Dynamic USB passthrough using virsh

Table of Contents
Dynamic USB passthrough using virsh
USB passthrough can be a pain sometimes while using virt-manager.
If the previously connected device isn’t connected at the time of starting the VM, it will refuse to boot at all complaining about a missing USB device.
But manually adding & removing the USB device each time isn’t ideal, so let’s automate it!
Components:
- Linux Host
- Windows VM
- USB device to pass from the host to the vm
- Python
- pyudev
- subprocess
- json
- virsh
- SystemD
The idea
Use udev to listen to usb events and pass that to the vm using virsh
Code
/etc/systemd/system/vm_usb_passthrough.service
The ExecStart
requires to be modified with the actual location of the Python script.
[Unit]
Description=VM USB Passthrough
[Service]
Type=simple
WorkingDirectory=/home/[USER]/Documents/python/vm_usb_passthrough
ExecStart=/home/[USER]/Documents/python/vm_usb_passthrough/main.py
User=root
[Install]
WantedBy=multi-user.target
~/Documents/python/vm_usb_passthrough/config.json
Example config to pass a USB device by vendorID and ProductID to a specified VM
{
"devices": [
{
"vid": "0483",
"pid": "3748",
"vm": "RDPWindows"
}
]
}
~/Documents/python/vm_usb_passthrough/rquirements.txt
pyudev==0.24.0
~/Documents/python/vm_usb_passthrough/main.py
Place this Python script in a logical place that is not prune for user error (eg. deletion).
#!/usr/bin/python3
import json
import subprocess
import pyudev
config = None
current_devices = {}
known_devices = {}
connected_devices = {}
with open("config.json", 'r') as f:
config = json.load(f)
if config is None:
print("ERROR: config.json not found!")
exit(-1)
def updateKnownDevices():
global known_devices
global current_devices
current_devices = {}
try:
lsusb_output = subprocess.check_output(['lsusb']).decode('utf-8').strip().split("\n")
for line in lsusb_output:
vid, pid = line.split(" ")[5].split(':')
vendor_name = line.split(" ")[6]
product_name = line.split(" ")[7]
known_devices[f'{vendor_name}:{product_name}'] = f'{vid}:{pid}'
current_devices[f'{vendor_name}:{product_name}'] = f'{vid}:{pid}'
except subprocess.CalledProcessError as e:
pass
def get_target_vm_name_from_config(vid, pid):
global config
for entry in config["devices"]:
if entry["vid"] == vid and entry["pid"] == pid:
return entry["vm"]
return None
def check_if_vm_is_running(name):
vm_running = False
try:
if "running" in str(subprocess.check_output(['virsh', 'domstate', name])):
vm_running = True
except subprocess.CalledProcessError as e:
pass
return vm_running
def process_device_to_vm(vid, pid, is_connect):
vm = get_target_vm_name_from_config(vid, pid)
if vm is None:
print(f'VM not found for device {vid}:{pid}...')
return
if not check_if_vm_is_running(vm):
print(f'VM {vm} is not running, skipping device {vid}:{pid}...')
return
command = None
if is_connect:
print(f'Connecting device {vid}:{pid} to VM {vm}...')
command = f"""bash -c 'echo "<hostdev mode=\\"subsystem\\" type=\\"usb\\" managed=\\"yes\\"><source><vendor id=\\"0x{vid}\\"/><product id=\\"0x{pid}\\"/></source></hostdev>" | sudo virsh attach-device {vm} /dev/stdin'"""
else:
print(f'Disconnecting device {vid}:{pid} from VM {vm}...')
command = f"""bash -c 'echo "<hostdev mode=\\"subsystem\\" type=\\"usb\\" managed=\\"yes\\"><source><vendor id=\\"0x{vid}\\"/><product id=\\"0x{pid}\\"/></source></hostdev>" | sudo virsh detach-device {vm} /dev/stdin'"""
if command is not None:
try:
subprocess.check_output(command, shell=True)
except subprocess.CalledProcessError as e:
pass
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by("usb")
for action, device in monitor:
updateKnownDevices()
vendor_name = device.get("ID_VENDOR_FROM_DATABASE", None)
product_name = device.get("ID_MODEL_FROM_DATABASE", None)
if (vendor_name is None or product_name is None) and action != "remove":
continue
vid = None
pid = None
if vendor_name is not None and product_name is not None:
for key in known_devices:
value = known_devices[key]
if vendor_name in key and product_name in key:
vid, pid = value.split(":")
if vid is None or pid is None:
if action == "remove":
for device in connected_devices:
if connected_devices[device] == 1:
vid, pid = device.split(":")
if not (vid in current_devices.values() and pid in current_devices.values()):
connected_devices[device] = 0
process_device_to_vm(vid, pid, False)
break
continue
has_connected = False
has_disconnected = False
if f'{vid}:{pid}' not in connected_devices:
connected_devices[f'{vid}:{pid}'] = -1
if action == "bind":
if connected_devices[f'{vid}:{pid}'] != 1:
connected_devices[f'{vid}:{pid}'] = 1
has_connected = True
elif action == "remove":
if connected_devices[f'{vid}:{pid}'] != 0:
connected_devices[f'{vid}:{pid}'] = 0
has_disconnected = True
if has_connected:
process_device_to_vm(vid, pid, True)
elif has_disconnected:
process_device_to_vm(vid, pid, False)
Starting
Installing the Python script requirements for the user specified in the service
# user
pip3 install /path/to/requirements.txt
# root
sudo pip3 install /path/to/requirements.txt
Enable and start the service
# systemctl daemon-reload
# systemctl enable vm_usb_passthrough.service
# systemctl start vm_usb_passthrough.service