Table of Contents

Dynamic USB passthrough using virsh

USB passthrough can be a pain sometimes while using virt-manager.

If the previously connected device isn’t connected at the time of starting the VM, it will refuse to boot at all complaining about a missing USB device.

But manually adding & removing the USB device each time isn’t ideal, so let’s automate it!

Components:

  • Linux Host
  • Windows VM
  • USB device to pass from the host to the vm
  • Python
    • pyudev
    • subprocess
    • json
  • virsh
  • SystemD

The idea

Use udev to listen to usb events and pass that to the vm using virsh

Code

/etc/systemd/system/vm_usb_passthrough.service

The ExecStart requires to be modified with the actual location of the Python script.

[Unit]
Description=VM USB Passthrough

[Service]
Type=simple
WorkingDirectory=/home/[USER]/Documents/python/vm_usb_passthrough
ExecStart=/home/[USER]/Documents/python/vm_usb_passthrough/main.py
User=root

[Install]
WantedBy=multi-user.target

~/Documents/python/vm_usb_passthrough/config.json

Example config to pass a USB device by vendorID and ProductID to a specified VM

{
    "devices": [
        {
            "vid": "0483",
            "pid": "3748",
            "vm": "RDPWindows"
        }
    ]
}

~/Documents/python/vm_usb_passthrough/rquirements.txt

pyudev==0.24.0

~/Documents/python/vm_usb_passthrough/main.py

Place this Python script in a logical place that is not prune for user error (eg. deletion).

#!/usr/bin/python3

import json
import subprocess
import pyudev

config = None
current_devices = {}
known_devices = {}
connected_devices = {}

with open("config.json", 'r') as f:
    config = json.load(f)

if config is None:
    print("ERROR: config.json not found!")
    exit(-1)

def updateKnownDevices():
    global known_devices
    global current_devices

    current_devices = {}

    try:
        lsusb_output = subprocess.check_output(['lsusb']).decode('utf-8').strip().split("\n")
        for line in lsusb_output:
            vid, pid = line.split(" ")[5].split(':')
            vendor_name = line.split(" ")[6]
            product_name = line.split(" ")[7]

            known_devices[f'{vendor_name}:{product_name}'] = f'{vid}:{pid}'
            current_devices[f'{vendor_name}:{product_name}'] = f'{vid}:{pid}'
    except subprocess.CalledProcessError as e:
        pass

def get_target_vm_name_from_config(vid, pid):
    global config

    for entry in config["devices"]:
        if entry["vid"] == vid and entry["pid"] == pid:
            return entry["vm"]

    return None

def check_if_vm_is_running(name):
    vm_running = False
    try:
        if "running" in str(subprocess.check_output(['virsh', 'domstate', name])):
            vm_running = True
    except subprocess.CalledProcessError as e:
        pass

    return vm_running

def process_device_to_vm(vid, pid, is_connect):
    vm = get_target_vm_name_from_config(vid, pid)

    if vm is None:
        print(f'VM not found for device {vid}:{pid}...')
        return
    
    if not check_if_vm_is_running(vm):
        print(f'VM {vm} is not running, skipping device {vid}:{pid}...')
        return
    
    command = None
    if is_connect:
        print(f'Connecting device {vid}:{pid} to VM {vm}...')
        command = f"""bash -c 'echo "<hostdev mode=\\"subsystem\\" type=\\"usb\\" managed=\\"yes\\"><source><vendor id=\\"0x{vid}\\"/><product id=\\"0x{pid}\\"/></source></hostdev>" | sudo virsh attach-device {vm} /dev/stdin'"""
    else:
        print(f'Disconnecting device {vid}:{pid} from VM {vm}...')
        command = f"""bash -c 'echo "<hostdev mode=\\"subsystem\\" type=\\"usb\\" managed=\\"yes\\"><source><vendor id=\\"0x{vid}\\"/><product id=\\"0x{pid}\\"/></source></hostdev>" | sudo virsh detach-device {vm} /dev/stdin'"""

    if command is not None:
        try:
            subprocess.check_output(command, shell=True)
        except subprocess.CalledProcessError as e:
            pass

context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by("usb")

for action, device in monitor:
    updateKnownDevices()

    vendor_name = device.get("ID_VENDOR_FROM_DATABASE", None)
    product_name = device.get("ID_MODEL_FROM_DATABASE", None)

    if (vendor_name is None or product_name is None) and action != "remove":
        continue

    vid = None
    pid = None
    if vendor_name is not None and product_name is not None:
        for key in known_devices:
            value = known_devices[key]
            if vendor_name in key and product_name in key:
                vid, pid = value.split(":")

    if vid is None or pid is None:
        if action == "remove":
            for device in connected_devices:
                if connected_devices[device] == 1:
                    vid, pid = device.split(":")
                    if not (vid in current_devices.values() and pid in current_devices.values()):
                        connected_devices[device] = 0
                        process_device_to_vm(vid, pid, False)
                    break
        continue

    has_connected = False
    has_disconnected = False

    if f'{vid}:{pid}' not in connected_devices:
        connected_devices[f'{vid}:{pid}'] = -1

    if action == "bind":
        if connected_devices[f'{vid}:{pid}'] != 1:
            connected_devices[f'{vid}:{pid}'] = 1
            has_connected = True
    elif action == "remove":
        if connected_devices[f'{vid}:{pid}'] != 0:
            connected_devices[f'{vid}:{pid}'] = 0
            has_disconnected = True
    
    if has_connected:
        process_device_to_vm(vid, pid, True)
    elif has_disconnected:
        process_device_to_vm(vid, pid, False)

Starting

Installing the Python script requirements for the user specified in the service

# user
pip3 install /path/to/requirements.txt

# root
sudo pip3 install /path/to/requirements.txt

Enable and start the service

# systemctl daemon-reload
# systemctl enable vm_usb_passthrough.service
# systemctl start vm_usb_passthrough.service

Demo