From 254b61fad46c7c8273735019666c2b5e3fe99294 Mon Sep 17 00:00:00 2001 From: Talha Can Havadar Date: Sat, 19 Jul 2025 21:30:06 +0200 Subject: [PATCH] sdwire: remove pyudev dependency This commit removes the need for pyudev to detect block devices which removes a blocker for this tool to run on operating systems that dont have udev (namely windows,macos, etc.). This also enables `--version` option for the cli and it fetches the information from pyproject.toml file --- poetry.lock | 75 +-- pyproject.toml | 2 +- sdwire/__init__.py | 5 + sdwire/backend/block_device_utils.py | 854 +++++++++++++++++++++++++++ sdwire/backend/detect.py | 125 ++-- sdwire/backend/device/sdwire.py | 64 +- sdwire/backend/device/sdwirec.py | 75 ++- sdwire/backend/device/usb_device.py | 32 +- sdwire/backend/utils.py | 64 +- sdwire/constants.py | 2 + sdwire/main.py | 18 +- 11 files changed, 1149 insertions(+), 167 deletions(-) create mode 100644 sdwire/backend/block_device_utils.py diff --git a/poetry.lock b/poetry.lock index 178992c..4567263 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "black" @@ -6,7 +6,6 @@ version = "24.10.0" description = "The uncompromising code formatter." optional = false python-versions = ">=3.9" -groups = ["dev"] files = [ {file = "black-24.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e6668650ea4b685440857138e5fe40cde4d652633b1bdffc62933d0db4ed9812"}, {file = "black-24.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1c536fcf674217e87b8cc3657b81809d3c085d7bf3ef262ead700da345bfa6ea"}, @@ -47,14 +46,13 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "click" -version = "8.1.7" +version = "8.2.1" description = "Composable command line interface toolkit" optional = false -python-versions = ">=3.7" -groups = ["main", "dev"] +python-versions = ">=3.10" files = [ - {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, - {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, + {file = "click-8.2.1-py3-none-any.whl", hash = "sha256:61a3265b914e850b85317d0b3109c7f8cd35a670f963866005d6ef1d5175a12b"}, + {file = "click-8.2.1.tar.gz", hash = "sha256:27c491cc05d968d271d5a1db13e3b5a184636d9d930f148c50b038f0d0646202"}, ] [package.dependencies] @@ -66,8 +64,6 @@ version = "0.4.6" description = "Cross-platform colored terminal text." optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" -groups = ["main", "dev"] -markers = "platform_system == \"Windows\"" files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, @@ -75,26 +71,24 @@ files = [ [[package]] name = "mypy-extensions" -version = "1.0.0" +version = "1.1.0" description = "Type system extensions for programs checked with the mypy type checker." optional = false -python-versions = ">=3.5" -groups = ["dev"] +python-versions = ">=3.8" files = [ - {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, - {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, + {file = "mypy_extensions-1.1.0-py3-none-any.whl", hash = "sha256:1be4cccdb0f2482337c4743e60421de3a356cd97508abadd57d47403e94f5505"}, + {file = "mypy_extensions-1.1.0.tar.gz", hash = "sha256:52e68efc3284861e772bbcd66823fde5ae21fd2fdb51c62a211403730b916558"}, ] [[package]] name = "packaging" -version = "24.1" +version = "25.0" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ - {file = "packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124"}, - {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"}, + {file = "packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484"}, + {file = "packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f"}, ] [[package]] @@ -103,7 +97,6 @@ version = "0.12.1" description = "Utility library for gitignore style pattern matching of file paths." optional = false python-versions = ">=3.8" -groups = ["dev"] files = [ {file = "pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08"}, {file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"}, @@ -111,20 +104,19 @@ files = [ [[package]] name = "platformdirs" -version = "4.3.6" +version = "4.3.8" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`." optional = false -python-versions = ">=3.8" -groups = ["dev"] +python-versions = ">=3.9" files = [ - {file = "platformdirs-4.3.6-py3-none-any.whl", hash = "sha256:73e575e1408ab8103900836b97580d5307456908a03e92031bab39e4554cc3fb"}, - {file = "platformdirs-4.3.6.tar.gz", hash = "sha256:357fb2acbc885b0419afd3ce3ed34564c13c9b95c89360cd9563f73aa5e2b907"}, + {file = "platformdirs-4.3.8-py3-none-any.whl", hash = "sha256:ff7059bb7eb1179e2685604f4aaf157cfd9535242bd23742eadc3c13542139b4"}, + {file = "platformdirs-4.3.8.tar.gz", hash = "sha256:3d512d96e16bcb959a814c9f348431070822a6496326a4be0911c40b5a74c2bc"}, ] [package.extras] -docs = ["furo (>=2024.8.6)", "proselint (>=0.14)", "sphinx (>=8.0.2)", "sphinx-autodoc-typehints (>=2.4)"] -test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=8.3.2)", "pytest-cov (>=5)", "pytest-mock (>=3.14)"] -type = ["mypy (>=1.11.2)"] +docs = ["furo (>=2024.8.6)", "proselint (>=0.14)", "sphinx (>=8.1.3)", "sphinx-autodoc-typehints (>=3)"] +test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=8.3.4)", "pytest-cov (>=6)", "pytest-mock (>=3.14)"] +type = ["mypy (>=1.14.1)"] [[package]] name = "pyftdi" @@ -132,7 +124,6 @@ version = "0.56.0" description = "FTDI device driver (pure Python)" optional = false python-versions = ">=3.9" -groups = ["main"] files = [ {file = "pyftdi-0.56.0-py3-none-any.whl", hash = "sha256:3ef0baadbf9031dde9d623ae66fac2d16ded36ce1b66c17765ca1944cb38b8b0"}, ] @@ -147,7 +138,6 @@ version = "3.5" description = "Python Serial Port Extension" optional = false python-versions = "*" -groups = ["main"] files = [ {file = "pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0"}, {file = "pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb"}, @@ -156,31 +146,18 @@ files = [ [package.extras] cp2110 = ["hidapi"] -[[package]] -name = "pyudev" -version = "0.24.3" -description = "A libudev binding" -optional = false -python-versions = ">=3.7" -groups = ["main"] -files = [ - {file = "pyudev-0.24.3-py3-none-any.whl", hash = "sha256:e8246f0a014fe370119ba2bc781bfbe62c0298d0d6b39c94e83102a8a3f56960"}, - {file = "pyudev-0.24.3.tar.gz", hash = "sha256:2e945427a21674893bb97632401db62139d91cea1ee96137cc7b07ad22198fc7"}, -] - [[package]] name = "pyusb" -version = "1.2.1" -description = "Python USB access module" +version = "1.3.1" +description = "Easy USB access for Python" optional = false -python-versions = ">=3.6.0" -groups = ["main"] +python-versions = ">=3.9.0" files = [ - {file = "pyusb-1.2.1-py3-none-any.whl", hash = "sha256:2b4c7cb86dbadf044dfb9d3a4ff69fd217013dbe78a792177a3feb172449ea36"}, - {file = "pyusb-1.2.1.tar.gz", hash = "sha256:a4cc7404a203144754164b8b40994e2849fde1cfff06b08492f12fff9d9de7b9"}, + {file = "pyusb-1.3.1-py3-none-any.whl", hash = "sha256:bf9b754557af4717fe80c2b07cc2b923a9151f5c08d17bdb5345dac09d6a0430"}, + {file = "pyusb-1.3.1.tar.gz", hash = "sha256:3af070b607467c1c164f49d5b0caabe8ac78dbed9298d703a8dbf9df4052d17e"}, ] [metadata] -lock-version = "2.1" +lock-version = "2.0" python-versions = "^3.12" -content-hash = "1a5cd4e89f0872ef9fa702b5a45f371cd965280783ce2f1cc9085a45dd20c721" +content-hash = "5e2aceec730a4767e01430c87c164e893393297c68a618d135c417072b5fc602" diff --git a/pyproject.toml b/pyproject.toml index 556547e..bbcd64e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ python = "^3.12" click = "^8.1.7" pyusb = "^1.2.1" pyftdi = "^0.56.0" -pyudev = "^0.24.3" + [tool.poetry.group.dev.dependencies] diff --git a/sdwire/__init__.py b/sdwire/__init__.py index e69de29..648222d 100644 --- a/sdwire/__init__.py +++ b/sdwire/__init__.py @@ -0,0 +1,5 @@ +"""SDWire CLI package initialization.""" + +from sdwire import main + +__all__ = ["main"] diff --git a/sdwire/backend/block_device_utils.py b/sdwire/backend/block_device_utils.py new file mode 100644 index 0000000..d78e914 --- /dev/null +++ b/sdwire/backend/block_device_utils.py @@ -0,0 +1,854 @@ +"""Block device utilities for mapping USB devices to system block devices. + +This module provides cross-platform functionality to map USB storage devices +to their corresponding block device paths (e.g., /dev/sda on Linux, /dev/disk2 on macOS). +It uses USB hierarchy and topology information for accurate device mapping. +""" + +import platform +import logging +import re +import subprocess +import json +import plistlib +from typing import Optional, List, Dict, Any +import usb.core +from sdwire.constants import ( + SDWIRE3_VID, + SDWIRE3_PID, + SDWIREC_VID, + SDWIREC_PID, + USB_MASS_STORAGE_CLASS_ID, +) + +log = logging.getLogger(__name__) + + +def map_usb_device_to_block_device(usb_device: usb.core.Device) -> Optional[str]: + """Map a USB device to its corresponding system block device path using USB hierarchy. + + This function provides cross-platform mapping from USB devices to block devices + using USB topology information rather than serial numbers for more reliable detection. + + Args: + usb_device: USB device object from pyusb representing the storage device + + Returns: + Block device path (e.g., '/dev/sda' on Linux, '/dev/disk2' on macOS) + or None if no corresponding block device is found + + Note: + - Uses USB bus, address, and port hierarchy for device correlation + - Handles both direct storage devices and hub-based topologies + - Falls back gracefully when USB information is not accessible + """ + system = platform.system().lower() + + if system == "linux": + return _map_usb_to_block_device_linux(usb_device) + elif system == "darwin": # macOS + return _map_usb_to_block_device_macos(usb_device) + else: + log.warning(f"Unsupported platform: {system}") + return None + + +def _get_usb_device_topology_key(usb_device: usb.core.Device) -> Optional[str]: + """Generate a topology-based key for USB device identification. + + This creates a unique identifier based on USB bus, address, and port + hierarchy rather than serial numbers which may not be unique. + + Args: + usb_device: USB device to generate key for + + Returns: + Topology key string or None if device info is not accessible + """ + try: + bus = getattr(usb_device, "bus", None) + address = getattr(usb_device, "address", None) + + if bus is None or address is None: + return None + + # Try to get port numbers for more specific topology info + try: + port_numbers = getattr(usb_device, "port_numbers", []) + if port_numbers: + port_path = ".".join(map(str, port_numbers)) + return f"{bus}:{address}@{port_path}" + except (AttributeError, usb.core.USBError): + pass + + # Fallback to bus:address + return f"{bus}:{address}" + + except Exception as e: + log.debug(f"Error generating topology key: {e}") + return None + + +def _find_block_device_via_ioregistry_direct( + vendor_id: int, + product_id: int, + bus: Optional[int] = None, + address: Optional[int] = None, +) -> Optional[str]: + """Find block device by searching IORegistry for USB mass storage devices. + + This method searches for IOMedia objects that have USB mass storage + devices in their parent chain with matching vendor/product IDs. + + Args: + vendor_id: USB vendor ID to match + product_id: USB product ID to match + bus: Optional USB bus number for more precise matching + address: Optional USB address for more precise matching + + Returns: + Block device path (e.g., '/dev/disk14') or None if not found + """ + try: + # Use system_profiler to find USB devices with media + result = subprocess.run( + ["system_profiler", "SPUSBDataType", "-xml"], + capture_output=True, + text=True, + timeout=10, + ) + + if result.returncode != 0: + return None + + # Parse the plist output + data = plistlib.loads(result.stdout.encode()) + + # Search for USB devices with matching vendor/product ID that have media + def search_usb_tree(items): + for item in items: + if isinstance(item, dict): + # Check if this device matches + item_vendor = item.get("vendor_id", "") + item_product = item.get("product_id", "") + + # Convert to int if in hex string format + try: + if isinstance(item_vendor, str): + # Handle format like "0x0bda (Realtek Semiconductor Corp.)" + vendor_match = re.search(r"0x([0-9a-fA-F]+)", item_vendor) + if vendor_match: + item_vendor = int(vendor_match.group(1), 16) + elif item_vendor.startswith("0x"): + item_vendor = int(item_vendor, 16) + + if isinstance(item_product, str): + product_match = re.search(r"0x([0-9a-fA-F]+)", item_product) + if product_match: + item_product = int(product_match.group(1), 16) + elif item_product.startswith("0x"): + item_product = int(item_product, 16) + + if item_vendor == vendor_id and item_product == product_id: + # If bus/address provided, verify they match + if bus is not None or address is not None: + location_id = item.get("location_id", "") + # Parse location ID format: "0x01142200 / 12" + location_match = re.search(r"/ (\d+)$", location_id) + if location_match: + device_address = int(location_match.group(1)) + if ( + address is not None + and device_address != address + ): + log.debug( + f"Address mismatch: looking for {address}, found {device_address}" + ) + continue + + # For more precise matching when multiple identical devices exist, + # we rely on the address from location_id since USB serial numbers + # are often identical for mass-produced devices + + # Check if this device has media + media = item.get("Media", []) + if media: + for m in media: + bsd_name = m.get("bsd_name", "") + if bsd_name and re.match(r"^disk\d+$", bsd_name): + log.debug( + f"Found USB device media directly: {bsd_name} for device at address {address}" + ) + return f"/dev/{bsd_name}" + except (ValueError, TypeError): + pass + + # Search children + if "_items" in item: + result = search_usb_tree(item["_items"]) + if result: + return result + + return None + + # Search through all USB buses + for bus in data: + if "_items" in bus: + result = search_usb_tree(bus["_items"]) + if result: + return result + + except Exception as e: + log.debug(f"Error in direct IORegistry search: {e}") + + return None + + +def _map_usb_to_block_device_linux(usb_device: usb.core.Device) -> Optional[str]: + """Map USB device to block device on Linux using lsblk and USB topology. + + This function uses lsblk to enumerate block devices and correlates them + with the USB device using bus and address information from sysfs. + + Args: + usb_device: USB device object from pyusb + + Returns: + Block device path (e.g., '/dev/sda') or None if not found + """ + try: + # Get device topology info + device_key = _get_usb_device_topology_key(usb_device) + if not device_key: + log.debug("Could not generate topology key for USB device") + return None + + # Use lsblk to get detailed block device information + result = subprocess.run( + ["lsblk", "-o", "NAME,TRAN,VENDOR,MODEL,SERIAL,SUBSYSTEMS", "-J"], + capture_output=True, + text=True, + timeout=10, + ) + + if result.returncode != 0: + log.debug("lsblk command failed") + return None + + try: + data = json.loads(result.stdout) + blockdevices = data.get("blockdevices", []) + + # Find block devices that match our USB device topology + for device in blockdevices: + if device.get("tran") != "usb": + continue + + device_name = device.get("name") + if not device_name: + continue + + # Check if this block device corresponds to our USB device + if _is_block_device_match_linux(device, usb_device, device_key): + log.debug(f"Found matching block device: {device_name}") + return f"/dev/{device_name}" + + except (json.JSONDecodeError, KeyError) as e: + log.debug(f"Failed to parse lsblk output: {e}") + + except Exception as e: + log.debug(f"Error mapping USB to block device on Linux: {e}") + + return None + + +def _is_block_device_match_linux( + block_device: Dict[str, Any], usb_device: usb.core.Device, device_key: str +) -> bool: + """Check if a block device matches the given USB device on Linux. + + Uses sysfs to correlate block devices with USB devices through bus and + address information. + + Args: + block_device: Block device info from lsblk + usb_device: USB device to match + device_key: Topology key for the USB device + + Returns: + True if the block device corresponds to the USB device + """ + try: + device_name = block_device.get("name") + if not device_name: + return False + + # Try to read USB info from sysfs + sysfs_path = f"/sys/block/{device_name}" + + # Follow the device link to find USB information + try: + result = subprocess.run( + ["readlink", "-f", f"{sysfs_path}/device"], + capture_output=True, + text=True, + timeout=5, + ) + + if result.returncode == 0: + device_path = result.stdout.strip() + + # Extract bus and address from the device path + # Typical path: /sys/devices/pci.../usb1/1-2/1-2.3/... + # We look for patterns like "1-2.3" which indicate USB bus 1, + # port path 2.3 + usb_match = re.search(r"/usb(\d+)/(\d+)-([0-9.]+)/", device_path) + if usb_match: + bus_num = int(usb_match.group(1)) + port_path = usb_match.group(3) + + # Check if the topology matches (bus and port path) + if ( + device_key.startswith(f"{bus_num}:") + and f"@{port_path}" in device_key + ): + return True + + except Exception as e: + log.debug(f"Error reading sysfs for {device_name}: {e}") + return False + + except Exception as e: + log.debug(f"Error checking block device match: {e}") + return False + + +def _map_usb_to_block_device_macos(usb_device: usb.core.Device) -> Optional[str]: + """Map USB device to block device on macOS using system_profiler and diskutil. + + This function uses macOS system tools to enumerate USB devices and find + corresponding disk devices using USB topology information. + + Args: + usb_device: USB device object from pyusb + + Returns: + Block device path (e.g., '/dev/disk2') or None if not found + """ + try: + vendor_id = getattr(usb_device, "idVendor", 0) + product_id = getattr(usb_device, "idProduct", 0) + bus = getattr(usb_device, "bus", None) + address = getattr(usb_device, "address", None) + + log.debug( + f"Searching for USB device: VID={vendor_id:04x}, PID={product_id:04x}, bus={bus}, addr={address}" + ) + + # Method 1: Direct IORegistry search for mass storage devices + block_device = _find_block_device_via_ioregistry_direct( + vendor_id, product_id, bus, address + ) + if block_device: + return block_device + + # Method 2: system_profiler approach + result = subprocess.run( + ["system_profiler", "SPUSBDataType", "-json"], + capture_output=True, + text=True, + timeout=15, + ) + + if result.returncode != 0: + return None + + try: + data = json.loads(result.stdout) + usb_data = data.get("SPUSBDataType", []) + + # Find our USB device in the tree and get its location info + location_id = _find_usb_device_location_macos( + usb_data, vendor_id, product_id + ) + if not location_id: + log.debug( + "Could not find USB device location in system_profiler output" + ) + return None + + # Now find the corresponding disk device + return _find_disk_by_usb_location_macos(location_id, vendor_id, product_id) + + except json.JSONDecodeError: + log.debug("Failed to parse system_profiler JSON output") + + except Exception as e: + log.debug(f"Error finding block device on macOS: {e}") + + return None + + +def _find_usb_device_location_macos( + usb_tree: List[Dict], + target_vid: int, + target_pid: int, +) -> Optional[str]: + """Find USB device location ID in macOS system_profiler tree. + + Args: + usb_tree: USB device tree from system_profiler + target_vid: Target vendor ID + target_pid: Target product ID + + Returns: + Location ID string or None if not found + """ + + def search_tree(items: List[Dict], depth: int = 0) -> Optional[str]: + for item in items: + if isinstance(item, dict): + # Check if this item matches our device + vendor_id_str = item.get("vendor_id", "") + product_id_str = item.get("product_id", "") + location_id = item.get("location_id", "") + + # Log for debugging + if vendor_id_str or product_id_str: + log.debug( + f"Checking USB device: vendor={vendor_id_str}, product={product_id_str}, location={location_id}" + ) + + try: + # Handle different formatting of vendor/product IDs + vendor_match = False + product_match = False + + # Check for exact hex match or partial match + if vendor_id_str: + vendor_hex = f"0x{target_vid:04x}" + if ( + vendor_hex.lower() in vendor_id_str.lower() + or str(target_vid) in vendor_id_str + or f"{target_vid:04x}" in vendor_id_str.lower() + ): + vendor_match = True + + if product_id_str: + product_hex = f"0x{target_pid:04x}" + if ( + product_hex.lower() in product_id_str.lower() + or str(target_pid) in product_id_str + or f"{target_pid:04x}" in product_id_str.lower() + ): + product_match = True + + if vendor_match and product_match: + log.debug( + f"Found matching USB device at location {location_id}" + ) + return location_id + + except Exception as e: + log.debug(f"Error checking USB device match: {e}") + + # Search children + children = item.get("_items", []) + if children: + result = search_tree(children, depth + 1) + if result: + return result + + return None + + return search_tree(usb_tree) + + +def _find_disk_by_usb_location_macos( + location_id: str, vendor_id: int, product_id: int +) -> Optional[str]: + """Find disk device corresponding to USB location ID on macOS. + + Args: + location_id: USB location ID from system_profiler + vendor_id: USB vendor ID for additional validation + product_id: USB product ID for additional validation + + Returns: + Block device path (e.g., '/dev/disk2') or None if not found + """ + try: + all_disks = _get_all_disks_macos() + if not all_disks: + return None + + # Check each disk to see if it corresponds to our USB device + for disk in all_disks: + if not _is_valid_disk_name(disk): + continue + + disk_data = _get_disk_info_macos(disk) + if not disk_data: + continue + + if _is_matching_usb_disk_macos(disk_data, vendor_id, product_id): + return f"/dev/{disk}" + + except Exception as e: + log.debug(f"Error finding disk by USB location on macOS: {e}") + + return None + + +def _get_all_disks_macos() -> Optional[List[str]]: + """Get list of all disks on macOS.""" + try: + result = subprocess.run( + ["diskutil", "list", "-plist"], capture_output=True, text=True, timeout=10 + ) + + if result.returncode != 0: + return None + + data = plistlib.loads(result.stdout.encode()) + return data.get("AllDisks", []) + + except Exception as e: + log.debug(f"Failed to get disk list on macOS: {e}") + return None + + +def _is_valid_disk_name(disk: str) -> bool: + """Check if disk name is valid (not a partition).""" + return disk.startswith("disk") and not disk.endswith("s1") + + +def _get_disk_info_macos(disk: str) -> Optional[Dict[str, Any]]: + """Get detailed information for a specific disk on macOS.""" + try: + result = subprocess.run( + ["diskutil", "info", "-plist", disk], + capture_output=True, + text=True, + timeout=5, + ) + + if result.returncode != 0: + return None + + return plistlib.loads(result.stdout.encode()) + + except Exception as e: + log.debug(f"Error getting disk info for {disk}: {e}") + return None + + +def _match_disk_via_ioregistry_macos( + disk_identifier: str, vendor_id: int, product_id: int +) -> bool: + """Match disk to USB device using IORegistry parent chain. + + Args: + disk_identifier: Disk identifier (e.g., 'disk14') + vendor_id: USB vendor ID to match + product_id: USB product ID to match + + Returns: + True if the disk is connected through the specified USB device + """ + try: + # First, get the IOMedia object for this disk + media_result = subprocess.run( + ["ioreg", "-c", "IOMedia", "-w0", "-r"], + capture_output=True, + text=True, + timeout=5, + ) + + if media_result.returncode != 0: + return False + + # Find the specific disk in the output + lines = media_result.stdout.split("\n") + disk_found = False + + for i, line in enumerate(lines): + if f'"BSD Name" = "{disk_identifier}"' in line: + disk_found = True + # Look backwards to find the IOMedia object path + for j in range(i, max(0, i - 10), -1): + if "class IOMedia" in lines[j] and "<" in lines[j]: + # Extract registry ID + match = re.search(r"id 0x([0-9a-fA-F]+)", lines[j]) + if match: + # Registry ID found but not currently used + break + break + + if not disk_found: + log.debug(f"Disk {disk_identifier} not found in IORegistry") + return False + + # Now trace upwards to find USB device + # Use ioreg to get the full parent chain + parent_result = subprocess.run( + ["ioreg", "-w0", "-p", "IOService", "-t"], + capture_output=True, + text=True, + timeout=10, + ) + + if parent_result.returncode != 0: + return False + + # Search for our disk and trace its parents + output = parent_result.stdout + lines = output.split("\n") + + # Find the disk and work backwards to find USB device + for i, line in enumerate(lines): + if disk_identifier in line and "IOMedia" in line: + # Work backwards from this line to find USB device info + indent_level = len(line) - len(line.lstrip()) + + # Search upwards for USB device with less indentation + for j in range(i, max(0, i - 100), -1): + parent_line = lines[j] + parent_indent = len(parent_line) - len(parent_line.lstrip()) + + # If we find a USB device at a higher level (less indented) + if parent_indent < indent_level and ( + "IOUSBHostDevice" in parent_line + or "IOUSBMassStorageDriver" in parent_line + ): + # Now check the next few lines for vendor/product IDs + for k in range(j, min(j + 20, len(lines))): + check_line = lines[k] + # Check indentation to ensure we're still in the same device + check_indent = len(check_line) - len(check_line.lstrip()) + if check_indent <= parent_indent and k > j: + break + + # Look for vendor and product IDs + vendor_match = re.search( + r'"idVendor"\s*=\s*0x([0-9a-fA-F]+)', check_line + ) + product_match = re.search( + r'"idProduct"\s*=\s*0x([0-9a-fA-F]+)', check_line + ) + + if ( + vendor_match + and int(vendor_match.group(1), 16) == vendor_id + ): + # Check for product ID in nearby lines + for m in range(k - 5, min(k + 5, len(lines))): + pm = re.search( + r'"idProduct"\s*=\s*0x([0-9a-fA-F]+)', lines[m] + ) + if pm and int(pm.group(1), 16) == product_id: + log.debug( + f"Found matching USB device for {disk_identifier}: VID={vendor_id:04x} PID={product_id:04x}" + ) + return True + + except Exception as e: + log.debug(f"Error matching disk via IORegistry: {e}") + + return False + + +def _is_matching_usb_disk_macos( + disk_data: Dict[str, Any], vendor_id: int, product_id: int +) -> bool: + """Check if disk data matches the expected USB device.""" + # Check if this is a USB device + bus_protocol = disk_data.get("BusProtocol", "") + if "USB" not in bus_protocol: + return False + + # Try to match using IORegistry information + # First, check if we can find this disk in IORegistry and trace its USB parent + disk_identifier = disk_data.get("DeviceIdentifier", "") + if disk_identifier: + log.debug( + f"Checking disk {disk_identifier} against VID={vendor_id:04x} PID={product_id:04x}" + ) + return _match_disk_via_ioregistry_macos(disk_identifier, vendor_id, product_id) + + return False + + +def find_sibling_storage_device( + control_device: Optional[usb.core.Device], +) -> Optional[usb.core.Device]: + """Find sibling mass storage device for SDWireC hub topology. + + For SDWireC devices, the FTDI control chip and mass storage device are + siblings under the same USB hub. This function finds the storage sibling. + + Args: + control_device: The FTDI control device + + Returns: + USB device object for the sibling mass storage device, or None if not + found + """ + if not control_device: + return None + + try: + control_topology = _get_device_topology_info(control_device) + if control_topology is None: + return None + + bus, control_ports = control_topology + + # Find all devices on the same bus + all_devices = _get_devices_on_bus(bus) + if not all_devices: + return None + + return _find_sibling_in_devices(control_device, control_ports, all_devices) + + except Exception as e: + log.debug(f"Error finding sibling storage device: {e}") + + return None + + +def _get_device_topology_info(device: usb.core.Device) -> Optional[tuple]: + """Get topology information for a USB device.""" + try: + bus = getattr(device, "bus", None) + if bus is None: + return None + + ports = getattr(device, "port_numbers", []) + if ports is None: + return None + + if not isinstance(ports, (list, tuple)): + return None + + if len(ports) < 2: # Need at least hub + device port + return None + + return (bus, ports) + + except (AttributeError, usb.core.USBError) as e: + log.debug(f"Could not get topology info for device: {e}") + return None + + +def _get_devices_on_bus(bus: int) -> Optional[List[usb.core.Device]]: + """Get all USB devices on a specific bus.""" + try: + devices_iter = usb.core.find(find_all=True, bus=bus) + if devices_iter is None: + return None + # Filter to only include Device objects, not Configuration objects + devices = [dev for dev in devices_iter if isinstance(dev, usb.core.Device)] + return devices + + except Exception as e: + log.debug(f"Error getting devices on bus {bus}: {e}") + return None + + +def _find_sibling_in_devices( + control_device: usb.core.Device, + control_ports: List[int], + all_devices: List[usb.core.Device], +) -> Optional[usb.core.Device]: + """Find sibling device among candidate devices.""" + for candidate in all_devices: + if candidate == control_device: + continue + + if _is_sibling_device(candidate, control_ports): + return candidate + + return None + + +def _is_sibling_device(candidate: usb.core.Device, control_ports: List[int]) -> bool: + """Check if candidate device is a sibling of the control device.""" + try: + candidate_ports = getattr(candidate, "port_numbers", []) + except (AttributeError, usb.core.USBError): + return False + + # Ensure candidate_ports is not None and is a list/tuple + if candidate_ports is None: + return False + + if not isinstance(candidate_ports, (list, tuple)): + return False + + # Check if they share the same parent hub (same port path except last element) + if ( + len(candidate_ports) >= 2 + and len(control_ports) >= 2 + and candidate_ports[:-1] == control_ports[:-1] + ): + + # Check if it's a mass storage device + if isinstance(candidate, usb.core.Device) and _is_mass_storage_device( + candidate + ): + log.debug(f"Found sibling storage device for SDWireC: {candidate}") + return True + + return False + + +def _is_mass_storage_device(device: usb.core.Device) -> bool: + """Check if a USB device is a mass storage device. + + Args: + device: USB device to check + + Returns: + True if the device is a mass storage device + """ + try: + # Check device class + device_class = getattr(device, "bDeviceClass", None) + if device_class == USB_MASS_STORAGE_CLASS_ID: + return True + + # Check interface class for composite devices + try: + config = device.get_active_configuration() + for interface in config: + if interface.bInterfaceClass == USB_MASS_STORAGE_CLASS_ID: + return True + except Exception as e: + log.debug(f"Could not access device interfaces: {e}") + # For composite devices where we can't access interfaces due to + # permissions, we'll use heuristics based on device class and + # known SDWire patterns + if device_class == 0: # Composite device + vendor_id = getattr(device, "idVendor", 0) + product_id = getattr(device, "idProduct", 0) + + # Known storage device patterns for SDWire ecosystem + if vendor_id == SDWIRE3_VID and product_id == SDWIRE3_PID: + return True + + # For SDWireC hub topology, if we find a composite device + # with permission issues that's a sibling of an FTDI device, + # it's likely the storage part. + if vendor_id == SDWIREC_VID and product_id == SDWIREC_PID: + return True + + # Generic heuristic: if we can't determine the device type + # due to permissions and it's a composite device, assume it + # might be storage in SDWire context + return True + + except Exception as e: + log.debug(f"Error checking if device is mass storage: {e}") + + return False diff --git a/sdwire/backend/detect.py b/sdwire/backend/detect.py index fa17f0c..250ae37 100644 --- a/sdwire/backend/detect.py +++ b/sdwire/backend/detect.py @@ -1,23 +1,38 @@ +"""SDWire device detection module. + +This module provides functions to detect and enumerate SDWire devices +connected to the system via USB, including both SDWire3 and SDWireC variants. +""" + import logging -from typing import List +from typing import List, Union from sdwire import constants -from .device.sdwire import SDWire, SDWIRE_GENERATION_SDWIRE3 -from .device.sdwirec import SDWireC -from .device.usb_device import PortInfo +from sdwire.backend.device.sdwire import SDWire, SDWIRE_GENERATION_SDWIRE3 +from sdwire.backend.device.sdwirec import SDWireC +from sdwire.backend.device.usb_device import PortInfo -import pyudev import usb.core import usb.util -from usb.core import Device log = logging.getLogger(__name__) def get_sdwirec_devices() -> List[SDWireC]: - devices: List[Device] = usb.core.find(find_all=True) + """Detect and return all connected SDWireC devices. + + Returns: + List of SDWireC device instances found on the system + """ + try: + found_devices = usb.core.find(find_all=True) + devices = list(found_devices or []) + except Exception as e: + log.debug("Error finding USB devices: %s", e) + return [] + if not devices: - log.info("no usb devices found while searching for SDWireC..") + log.debug("No USB devices found while searching for SDWireC") return [] device_list = [] @@ -26,9 +41,10 @@ def get_sdwirec_devices() -> List[SDWireC]: serial = None manufacturer = None try: - product = device.product - serial = device.serial_number - manufacturer = device.manufacturer + # Safe attribute access + product = getattr(device, "product", None) + serial = getattr(device, "serial_number", None) + manufacturer = getattr(device, "manufacturer", None) except Exception as e: log.debug( "not able to get usb product, serial_number and manufacturer information, err: %s", @@ -44,48 +60,59 @@ def get_sdwirec_devices() -> List[SDWireC]: return device_list -def get_sdwire_devices() -> List[SDWire]: - # Badgerd SDWire3 - # VID = 0bda PID = 0316 - # Badgerd SDWireC - # VID = 0x04e8 PID = 0x6001 - result = [] - devices: List[Device] = pyudev.Context().list_devices( - subsystem="usb", - ID_VENDOR_ID=f"{constants.SDWIRE3_VID:04x}", - ID_MODEL_ID=f"{constants.SDWIRE3_PID:04x}", - ) +def get_sdwire_devices() -> List[Union[SDWire, SDWireC]]: + """Detect and return all connected SDWire devices (both SDWire3 and SDWireC). + + This function searches for: + - SDWire3 devices (VID: 0x0bda, PID: 0x0316) + - SDWireC devices (VID: 0x04e8, PID: 0x6001) + + Returns: + List of SDWire device instances (SDWire or SDWireC) found on the system + """ + result: List[Union[SDWire, SDWireC]] = [] + try: + found_devices = usb.core.find( + find_all=True, + idVendor=constants.SDWIRE3_VID, + idProduct=constants.SDWIRE3_PID, + ) + devices = list(found_devices or []) + except Exception as e: + log.debug("Error finding SDWire3 devices: %s", e) + devices = [] + if not devices: log.info("no usb devices found while searching for SDWire..") - return [] - - for device in devices: - product = None - serial = None - bus = None - address = None - try: - product = int(f"0x{device.get('ID_MODEL_ID')}", 16) - vendor = int(f"0x{device.get('ID_VENDOR_ID')}", 16) - bus = int(device.get("BUSNUM")) - address = int(device.get("DEVNUM")) - serial = f"{device.get('ID_USB_SERIAL_SHORT')}:{bus}.{address}" - except Exception as e: - log.debug( - "not able to get usb product, serial_number and manufacturer information, err: %s", - e, - ) + else: + for device in devices: + product = None + serial = None + vendor = None + bus = None + address = None + try: + # Safe attribute access + product = getattr(device, "idProduct", None) + vendor = getattr(device, "idVendor", None) + bus = getattr(device, "bus", None) + address = getattr(device, "address", None) + serial_num = getattr(device, "serial_number", None) or "unknown" + serial = f"{serial_num}:{bus}.{address}" + except Exception as e: + log.debug( + "not able to get usb product, serial_number and manufacturer information, err: %s", + e, + ) - if product == constants.SDWIRE3_PID and vendor == constants.SDWIRE3_VID: - usb_device: List[Device] = usb.core.find( - idVendor=vendor, idProduct=product, bus=bus, address=address - ) - result.append( - SDWire( - port_info=PortInfo(device, product, vendor, serial, usb_device), - generation=SDWIRE_GENERATION_SDWIRE3, + if product == constants.SDWIRE3_PID and vendor == constants.SDWIRE3_VID: + result.append( + SDWire( + port_info=PortInfo(device, product, vendor, serial, device), + generation=SDWIRE_GENERATION_SDWIRE3, + ) ) - ) + # Search for legacy SDWireC devices legacy_devices = get_sdwirec_devices() diff --git a/sdwire/backend/device/sdwire.py b/sdwire/backend/device/sdwire.py index aa85269..c259b70 100644 --- a/sdwire/backend/device/sdwire.py +++ b/sdwire/backend/device/sdwire.py @@ -1,5 +1,8 @@ import logging -from .usb_device import USBDevice, PortInfo +from typing import Optional +import usb.core +from sdwire.backend.device.usb_device import USBDevice, PortInfo +from sdwire.backend.block_device_utils import map_usb_device_to_block_device log = logging.getLogger(__name__) @@ -12,15 +15,13 @@ class SDWire(USBDevice): def __init__(self, port_info: PortInfo, generation: int): super().__init__(port_info) self.generation = generation - for child in self.dev_string.children: - if ( - self.dev_string.device_path != child.device_path - and child.device_type == "disk" - ): - self.__block_dev = f"/dev/{child.device_path.split('/')[-1]}" - break - - def switch_ts(self): + self._update_block_device() + + def switch_ts(self) -> None: + if not self.usb_device: + log.error("USB device not available") + return + try: self.usb_device.attach_kernel_driver(0) self.usb_device.reset() @@ -30,7 +31,11 @@ def switch_ts(self): e, ) - def switch_dut(self): + def switch_dut(self) -> None: + if not self.usb_device: + log.error("USB device not available") + return + try: self.usb_device.detach_kernel_driver(0) self.usb_device.reset() @@ -40,12 +45,41 @@ def switch_dut(self): e, ) + def _update_block_device(self) -> None: + """Update block device detection based on current device state.""" + if not self.usb_device: + self.__block_dev = None + return + + try: + storage_device = self.storage_device + if storage_device is not None: + self.__block_dev = map_usb_device_to_block_device(storage_device) + log.debug(f"SDWire3: Found block device: {self.__block_dev}") + else: + self.__block_dev = None + except Exception as e: + log.debug(f"SDWire3: Block device detection failed: {e}") + self.__block_dev = None + @property - def block_dev(self): + def block_dev(self) -> Optional[str]: return self.__block_dev - def __str__(self): - return f"{self.serial_string}\t[{int(self.manufacturer_string):04x}::{int(self.product_string):04x}]\t\t{self.block_dev}" + @property + def storage_device(self) -> Optional[usb.core.Device]: + """Return the USB device that corresponds to the storage interface. + + For SDWire3, this is the same device we control (direct media controller). + + Returns: + usb.core.Device: The USB device for storage, or None if not available + """ + return self.usb_device + + def __str__(self) -> str: + block_dev_str = self.block_dev if self.block_dev is not None else "None" + return f"{self.serial_string}\t[{int(self.manufacturer_string):04x}::{int(self.product_string):04x}]\t\t{block_dev_str}" - def __repr__(self): + def __repr__(self) -> str: return self.__str__() diff --git a/sdwire/backend/device/sdwirec.py b/sdwire/backend/device/sdwirec.py index b83b77f..f9419b5 100644 --- a/sdwire/backend/device/sdwirec.py +++ b/sdwire/backend/device/sdwirec.py @@ -1,6 +1,9 @@ import logging +from typing import Optional from pyftdi.ftdi import Ftdi -from .usb_device import USBDevice, PortInfo +import usb.core +from sdwire.backend.device.usb_device import USBDevice, PortInfo +from sdwire.backend.block_device_utils import map_usb_device_to_block_device, find_sibling_storage_device log = logging.getLogger(__name__) @@ -10,35 +13,62 @@ class SDWireC(USBDevice): def __init__(self, port_info: PortInfo): super().__init__(port_info) - for d in self._pyudev_context.list_devices(ID_MODEL="sd-wire"): - d_serial = d.get("ID_USB_SERIAL_SHORT", None) - if d_serial is not None and d_serial == self.serial_string: - for sibling in d.parent.children: - if ( - d.device_path != sibling.device_path - and sibling.device_type == "disk" - ): - self.__block_dev = f"/dev/{sibling.device_path.split('/')[-1]}" - break - break - - def __str__(self): - return f"{self.serial_string}\t[{self.product_string}::{self.manufacturer_string}]\t{self.block_dev}" - - def __repr__(self): + self._update_block_device() + + def _update_block_device(self) -> None: + """Update block device detection for SDWireC.""" + if not self.usb_device: + self.__block_dev = None + return + + try: + storage_device = self.storage_device + if storage_device is not None: + self.__block_dev = map_usb_device_to_block_device(storage_device) + log.debug(f"SDWireC: Found block device: {self.__block_dev}") + else: + self.__block_dev = None + except Exception as e: + log.debug(f"SDWireC: Block device detection failed: {e}") + self.__block_dev = None + + def __str__(self) -> str: + block_dev_str = self.block_dev if self.block_dev is not None else "None" + return f"{self.serial_string}\t[{self.product_string}::{self.manufacturer_string}]\t{block_dev_str}" + + def __repr__(self) -> str: return self.__str__() @property - def block_dev(self): + def block_dev(self) -> Optional[str]: return self.__block_dev - def switch_ts(self): + @property + def storage_device(self) -> Optional[usb.core.Device]: + """Return the USB device that corresponds to the storage interface. + + For SDWireC, this is a sibling mass storage device under the same hub, + not the FTDI device we control. + + Returns: + USB device object for the sibling mass storage device, or None if + not found + """ + return find_sibling_storage_device(self.usb_device) + + def switch_ts(self) -> None: self._set_sdwire(1) - def switch_dut(self): + def switch_dut(self) -> None: self._set_sdwire(0) - def _set_sdwire(self, target): + def _set_sdwire(self, target: int) -> None: + if not self.usb_device: + log.error("USB device not available") + import sys + print("USB device not available") + sys.exit(1) + try: ftdi = Ftdi() ftdi.open_from_device(self.usb_device) @@ -47,7 +77,6 @@ def _set_sdwire(self, target): ftdi.close() except Exception as e: import sys - - log.debug("error while updating ftdi device", exc_info=1) + log.debug("error while updating ftdi device: %s", e, exc_info=True) print("couldnt switch sdwire device") sys.exit(1) diff --git a/sdwire/backend/device/usb_device.py b/sdwire/backend/device/usb_device.py index c382386..e2ff6a2 100644 --- a/sdwire/backend/device/usb_device.py +++ b/sdwire/backend/device/usb_device.py @@ -1,5 +1,7 @@ from collections import namedtuple -import pyudev +from typing import Optional +import usb.core + PortInfo = namedtuple( "PortInfo", ("device", "product", "manufacturer", "serial", "usb_device") @@ -7,29 +9,39 @@ class USBDevice: - __port_info = None - _pyudev_context = None + __port_info: Optional[PortInfo] = None + def __init__(self, port_info: PortInfo): self.__port_info = port_info - self._pyudev_context = pyudev.Context() + @property - def usb_device(self): - return self.__port_info.usb_device + def usb_device(self) -> Optional[usb.core.Device]: + if self.__port_info: + return self.__port_info.usb_device + return None @property def dev_string(self) -> str: - return self.__port_info.device + if self.__port_info and self.__port_info.device: + return self.__port_info.device + return "" @property def product_string(self) -> str: - return self.__port_info.product + if self.__port_info and self.__port_info.product: + return str(self.__port_info.product) + return "" @property def manufacturer_string(self) -> str: - return self.__port_info.manufacturer + if self.__port_info and self.__port_info.manufacturer: + return str(self.__port_info.manufacturer) + return "" @property def serial_string(self) -> str: - return self.__port_info.serial + if self.__port_info and self.__port_info.serial: + return str(self.__port_info.serial) + return "" diff --git a/sdwire/backend/utils.py b/sdwire/backend/utils.py index 1868748..2493f9b 100644 --- a/sdwire/backend/utils.py +++ b/sdwire/backend/utils.py @@ -1,30 +1,62 @@ +"""Utility functions for SDWire CLI operations.""" import sys import logging + import click -from .device.sdwire import SDWire -from .device.sdwirec import SDWireC -from . import detect +from sdwire.backend.device.sdwire import SDWire +from sdwire.backend.device.sdwirec import SDWireC +from sdwire.backend import detect log = logging.getLogger(__name__) -def handle_switch_host_command(ctx): - device = ctx.obj["device"] - device.switch_ts() +def handle_switch_host_command(ctx: click.Context) -> None: + """Handle switching device to host/TS mode. + + Args: + ctx: Click context containing device information + """ + try: + device = ctx.obj["device"] + device.switch_ts() + except Exception as e: + log.error(f"Failed to switch to host mode: {e}") + print(f"Error: Failed to switch device to host mode: {e}") + sys.exit(1) -def handle_switch_target_command(ctx): - device = ctx.obj["device"] - device.switch_dut() +def handle_switch_target_command(ctx: click.Context) -> None: + """Handle switching device to target/DUT mode. + Args: + ctx: Click context containing device information + """ + try: + device = ctx.obj["device"] + device.switch_dut() + except Exception as e: + log.error(f"Failed to switch to target mode: {e}") + print(f"Error: Failed to switch device to target mode: {e}") + sys.exit(1) -def handle_switch_off_command(ctx): - device = ctx.obj["device"] - if isinstance(device, SDWireC) or isinstance(device, SDWire): - log.info( - "SDWire3, SDWireC or legacy sdwire devices dont have off functionality" - ) - print("SDWireC and SDWire3 dont have off functionality implemented") + +def handle_switch_off_command(ctx: click.Context) -> None: + """Handle switching device to off mode. + + Args: + ctx: Click context containing device information + """ + try: + device = ctx.obj["device"] + if isinstance(device, (SDWireC, SDWire)): + log.info( + "SDWire3, SDWireC or legacy sdwire devices don't have off functionality" + ) + print("SDWireC and SDWire3 don't have off functionality implemented") + sys.exit(1) + except Exception as e: + log.error(f"Failed to process off command: {e}") + print(f"Error: Failed to process off command: {e}") sys.exit(1) diff --git a/sdwire/constants.py b/sdwire/constants.py index befa44a..a0779ba 100644 --- a/sdwire/constants.py +++ b/sdwire/constants.py @@ -3,3 +3,5 @@ SDWIREC_VID = 0x04E8 SDWIREC_PID = 0x6001 SDWIREC_PRODUCT_STRING = "sd-wire" + +USB_MASS_STORAGE_CLASS_ID = 8 diff --git a/sdwire/main.py b/sdwire/main.py index 3c6d6d0..b4d71f1 100644 --- a/sdwire/main.py +++ b/sdwire/main.py @@ -1,19 +1,29 @@ #!/usr/bin/env python +"""SDWire CLI main entry point. + +This module provides the command-line interface for controlling SDWire devices, +including listing devices and switching between host and DUT modes. +""" import logging +from typing import Optional +import importlib.metadata import click -from .backend import utils -from .backend import detect +from sdwire.backend import utils +from sdwire.backend import detect @click.group() @click.option("--debug", required=False, is_flag=True, help="Enable debug output") -def main(debug=None): +@click.version_option(version=importlib.metadata.version("sdwire"), prog_name="sdwire") +def main(debug: Optional[bool] = None) -> None: + """SDWire CLI - Control SDWire devices from command line.""" if debug: logging.basicConfig(level=logging.DEBUG) @main.command() -def list(): +def list() -> None: + """List all connected SDWire devices with their block device information.""" print("Serial\t\t\tProduct Info\t\tBlock Dev") for sdwire in detect.get_sdwire_devices(): print(sdwire)