diff --git a/.gitignore b/.gitignore index 95ad927..4733564 100644 --- a/.gitignore +++ b/.gitignore @@ -192,3 +192,6 @@ $RECYCLE.BIN/ # Linux .directory .Trash-* + +# Claude local permissions +.claude/settings.local.json \ No newline at end of file diff --git a/README.md b/README.md index 8fc22ca..cb07c47 100644 --- a/README.md +++ b/README.md @@ -1,270 +1,322 @@ -# IoTHackBot - -Open-source IoT security testing toolkit with integrated Claude Code skills for automated vulnerability discovery. - -## Overview - -IoTHackBot is a collection of specialized tools and Claude Code skills designed for security testing of IoT devices, IP cameras, and embedded systems. It provides both command-line tools and AI-assisted workflows for comprehensive IoT security assessments. - -## Tools Included - -### Network Discovery & Reconnaissance - -- **wsdiscovery** - WS-Discovery protocol scanner for discovering ONVIF cameras and IoT devices -- **iotnet** - IoT network traffic analyzer for detecting protocols and vulnerabilities -- **netflows** - Network flow extractor with DNS hostname resolution from pcap files -- **nmap** (skill) - Professional network reconnaissance with two-phase scanning strategy - -### Device-Specific Testing - -- **onvifscan** - ONVIF device security scanner - - Authentication bypass testing - - Credential brute-forcing - -### Firmware & File Analysis - -- **chipsec** (skill) - UEFI/BIOS firmware static analysis - - Detect known rootkits (LoJax, ThinkPwn, HackingTeam) - - Generate EFI executable inventories with hashes - - Decode firmware structure and extract NVRAM - -- **ffind** - Advanced file finder with type detection and filesystem extraction - - Identifies artifact file types - - Extracts ext2/3/4 and F2FS filesystems - - Designed for firmware analysis - -### Android Analysis - -- **apktool** (skill) - APK unpacking and resource extraction - - Decode AndroidManifest.xml - - Extract resources, layouts, strings - - Disassemble to smali code - -- **jadx** 
(skill) - APK decompilation - - Convert DEX to readable Java source - - Search for hardcoded credentials - - Analyze app logic - -### Hardware & Console Access - -- **picocom** (skill) - IoT UART console interaction for hardware testing - - Bootloader manipulation - - Shell enumeration - - Firmware extraction - - Includes Python helper script for automated interaction - -- **telnetshell** (skill) - IoT telnet shell interaction - - Unauthenticated shell testing - - Device enumeration - - BusyBox command handling - - Includes Python helper script and pre-built enumeration scripts - -## Installation - -### Prerequisites - -```bash -# Python dependencies -pip install colorama pyserial pexpect requests - -# System dependencies (Arch Linux) -sudo pacman -S nmap e2fsprogs f2fs-tools python python-pip inetutils - -# For other distributions, install equivalent packages -``` - -### Setup - -1. Clone the repository: -```bash -git clone https://github.com/BrownFineSecurity/iothackbot.git -cd iothackbot -``` - -2. Add the bin directory to your PATH: -```bash -export PATH="$PATH:$(pwd)/bin" -``` - -3. 
For permanent setup, add to your shell configuration: -```bash -echo 'export PATH="$PATH:/path/to/iothackbot/bin"' >> ~/.bashrc -``` - -## Usage - -### Quick Start Examples - -#### Discover ONVIF Devices -```bash -wsdiscovery 192.168.1.0/24 -``` - -#### Test ONVIF Device Security -```bash -onvifscan auth http://192.168.1.100 -onvifscan brute http://192.168.1.100 -``` - -#### Analyze Network Traffic -```bash -# Analyze PCAP file for IoT protocols -iotnet capture.pcap - -# Live capture -sudo iotnet -i eth0 -d 60 -``` - -#### Extract Network Flows -```bash -# Extract flows from device with DNS resolution -netflows capture.pcap --source-ip 192.168.1.100 - -# Get just hostname:port list -netflows capture.pcap -s 192.168.1.100 --format quiet -``` - -#### Analyze Firmware -```bash -# Identify file types -ffind firmware.bin - -# Extract filesystems (requires sudo) -sudo ffind firmware.bin -e -``` - -### Claude Code Plugin - -IoTHackBot is available as a Claude Code plugin, providing AI-assisted security testing with specialized skills. 
- -#### Available Skills - -| Skill | Description | -|-------|-------------| -| **chipsec** | UEFI/BIOS firmware static analysis - malware detection, EFI inventory | -| **apktool** | Android APK unpacking and resource extraction | -| **jadx** | Android APK decompilation to Java source | -| **ffind** | Firmware file analysis with filesystem extraction | -| **iotnet** | IoT network traffic analysis | -| **netflows** | Network flow extraction with DNS hostname resolution | -| **nmap** | Professional network reconnaissance | -| **onvifscan** | ONVIF device security testing | -| **picocom** | UART console interaction | -| **telnetshell** | Telnet shell enumeration | -| **wsdiscovery** | WS-Discovery device discovery | - -#### Plugin Installation - -**Option 1: Use directly during development** - -```bash -claude --plugin-dir /path/to/iothackbot -``` - -**Option 2: Install as local marketplace (persistent)** - -Add to `~/.claude/settings.json`: - -```json -{ - "extraKnownMarketplaces": { - "iothackbot-local": { - "source": { - "source": "directory", - "path": "/path/to/iothackbot" - } - } - }, - "enabledPlugins": { - "iothackbot": true - } -} -``` - -Then restart Claude Code for the settings to take effect. - -**Option 3: Project-specific setup** - -For use within a specific project, the skills are also available via the `.claude/skills/` symlink for backwards compatibility. 
- -## Tool Architecture - -All tools follow a consistent design pattern: - -- **CLI Layer** (`tools/iothackbot/*.py`) - Command-line interface with argparse -- **Core Layer** (`tools/iothackbot/core/*_core.py`) - Core functionality implementing ToolInterface -- **Binary** (`bin/*`) - Executable wrapper scripts - -This separation enables: -- Easy automation and chaining -- Consistent output formats (text, JSON, quiet) -- Standardized error handling -- Tool composition and pipelines - -## Configuration - -### IoT Detection Rules -`config/iot/detection_rules.json` - Custom IoT protocol detection rules for iotnet - -### Wordlists -- `wordlists/onvif-usernames.txt` - Default usernames for ONVIF devices -- `wordlists/onvif-passwords.txt` - Default passwords for ONVIF devices - -## Development - -### Adding New Tools - -See `TOOL_DEVELOPMENT_GUIDE.md` for detailed information on: -- Project structure standards -- Development patterns -- Output formatting guidelines -- Testing and integration - -### Key Interfaces - -- **ToolInterface** - Base interface for all tools -- **ToolConfig** - Standardized configuration object -- **ToolResult** - Standardized result object with success, data, errors, and metadata - -## Output Formats - -All tools support multiple output formats: - -```bash -# Human-readable text with colors (default) -onvifscan auth 192.168.1.100 - -# Machine-readable JSON -onvifscan auth 192.168.1.100 --format json - -# Minimal output -onvifscan auth 192.168.1.100 --format quiet -``` - -## Security & Ethics - -**IMPORTANT**: These tools are designed for authorized security testing only. - -- Only test devices you own or have explicit permission to test -- Respect scope limitations and rules of engagement -- Be aware of the impact on production systems -- Use appropriate timing to avoid denial of service -- Document all testing activities -- Follow responsible disclosure practices - -## Contributing - -Contributions are welcome! 
Please ensure: - -- New tools follow the architecture patterns in `TOOL_DEVELOPMENT_GUIDE.md` -- All tools support text, JSON, and quiet output formats -- Code includes proper error handling -- Documentation is clear and comprehensive - -## License - -MIT License - See LICENSE file for details - -## Disclaimer - -This toolkit is provided for educational and authorized security testing purposes only. Users are responsible for ensuring they have proper authorization before testing any systems. The authors are not responsible for misuse or damage caused by this toolkit. +# IoTHackBot + +Open-source IoT security testing toolkit with integrated Claude Code skills for automated vulnerability discovery. + +## Overview + +IoTHackBot is a collection of specialized tools and Claude Code skills designed for security testing of IoT devices, IP cameras, and embedded systems. It provides both command-line tools and AI-assisted workflows for comprehensive IoT security assessments. + +## Tools Included + +### Network Discovery & Reconnaissance + +- **wsdiscovery** - WS-Discovery protocol scanner for discovering ONVIF cameras and IoT devices +- **iotnet** - IoT network traffic analyzer for detecting protocols and vulnerabilities +- **netflows** - Network flow extractor with DNS hostname resolution from pcap files +- **nmap** (skill) - Professional network reconnaissance with two-phase scanning strategy + +### Device-Specific Testing + +- **onvifscan** - ONVIF device security scanner + - Authentication bypass testing + - Credential brute-forcing + +### Firmware & File Analysis + +- **chipsec** (skill) - UEFI/BIOS firmware static analysis + - Detect known rootkits (LoJax, ThinkPwn, HackingTeam) + - Generate EFI executable inventories with hashes + - Decode firmware structure and extract NVRAM + +- **ffind** - Advanced file finder with type detection and filesystem extraction + - Identifies artifact file types + - Extracts ext2/3/4 and F2FS filesystems + - Designed for firmware 
analysis + +### Android Analysis + +- **apktool** (skill) - APK unpacking and resource extraction + - Decode AndroidManifest.xml + - Extract resources, layouts, strings + - Disassemble to smali code + +- **jadx** (skill) - APK decompilation + - Convert DEX to readable Java source + - Search for hardcoded credentials + - Analyze app logic + +### Hardware & Console Access + +- **picocom** (skill) - IoT UART console interaction for hardware testing + - Bootloader manipulation + - Shell enumeration + - Firmware extraction + - Includes Python helper script for automated interaction + +- **telnetshell** (skill) - IoT telnet shell interaction + - Unauthenticated shell testing + - Device enumeration + - BusyBox command handling + - Includes Python helper script and pre-built enumeration scripts + +### Logic Analyzer & Signal Analysis + +- **sigrok** (skill) - Analyze logic analyzer captures using sigrok-cli and 131+ protocol decoders + - Native .sr file parsing (no sigrok-cli needed for timing analysis) + - Supports .sr, .csv, and .vcd formats + - Decode UART, SPI, I2C, CAN, JTAG, USB, 1-Wire, and many more + - Timing analysis with histograms, cluster detection, and protocol guessing + - Binary data extraction from decoded protocols + +- **logicmso** (skill) - Analyze captures from Saleae Logic MSO devices + - Decode protocols (UART, SPI, I2C) from exported binary files + - Digital and analog capture analysis + - Hardware reverse engineering and CTF challenges + +- **urh** (skill) - RF signal analysis and protocol decoding using Universal Radio Hacker + - Supports OOK, ASK, FSK, and PSK modulations + - Loads .complex/.cfile (float32), .complex16s, .complex16u, and .wav IQ formats + - Native IQ loading with numpy — spectrum, energy, and OOK demodulation without URH installed + - Protocol decoding via urh_cli with automatic modulation detection + - Covers 433MHz/315MHz remotes, Z-Wave, Zigbee, and arbitrary RF protocols + - Recording via URH GUI with any compatible SDR 
(RTL-SDR, HackRF, USRP, etc.) + +## Installation + +### Prerequisites + +```bash +# Python dependencies +pip install colorama pyserial pexpect requests + +# System dependencies (Arch Linux) +sudo pacman -S nmap e2fsprogs f2fs-tools python python-pip inetutils + +# For other distributions, install equivalent packages +``` + +### Setup + +1. Clone the repository: +```bash +git clone https://github.com/BrownFineSecurity/iothackbot.git +cd iothackbot +``` + +2. Add the bin directory to your PATH: +```bash +export PATH="$PATH:$(pwd)/bin" +``` + +3. For permanent setup, add to your shell configuration: +```bash +echo 'export PATH="$PATH:/path/to/iothackbot/bin"' >> ~/.bashrc +``` + +## Usage + +### Quick Start Examples + +#### Discover ONVIF Devices +```bash +wsdiscovery 192.168.1.0/24 +``` + +#### Test ONVIF Device Security +```bash +onvifscan auth http://192.168.1.100 +onvifscan brute http://192.168.1.100 +``` + +#### Analyze Network Traffic +```bash +# Analyze PCAP file for IoT protocols +iotnet capture.pcap + +# Live capture +sudo iotnet -i eth0 -d 60 +``` + +#### Extract Network Flows +```bash +# Extract flows from device with DNS resolution +netflows capture.pcap --source-ip 192.168.1.100 + +# Get just hostname:port list +netflows capture.pcap -s 192.168.1.100 --format quiet +``` + +#### Analyze Logic Analyzer Captures +```bash +# Timing analysis of a sigrok capture (no sigrok-cli needed) +python3 skills/sigrok/analyze_capture.py capture.sr --histogram --clusters + +# Decode UART protocol (requires sigrok-cli) +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 + +# Analyze a specific channel +python3 skills/sigrok/analyze_capture.py capture.sr --channel D2 --raw +``` + +#### Analyze RF Signals +```bash +# Signal overview and modulation hint +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 + +# Inspect power spectrum +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --spectrum + +# Demodulate OOK and show 
pulse timing clusters (e.g. 433MHz remote) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --clusters + +# Full protocol decoding via urh_cli +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --decode --modulation OOK +``` + +#### Analyze Firmware +```bash +# Identify file types +ffind firmware.bin + +# Extract filesystems (requires sudo) +sudo ffind firmware.bin -e +``` + +### Claude Code Plugin + +IoTHackBot is available as a Claude Code plugin, providing AI-assisted security testing with specialized skills. + +#### Available Skills + +| Skill | Description | +|-------|-------------| +| **chipsec** | UEFI/BIOS firmware static analysis - malware detection, EFI inventory | +| **apktool** | Android APK unpacking and resource extraction | +| **jadx** | Android APK decompilation to Java source | +| **ffind** | Firmware file analysis with filesystem extraction | +| **iotnet** | IoT network traffic analysis | +| **netflows** | Network flow extraction with DNS hostname resolution | +| **nmap** | Professional network reconnaissance | +| **onvifscan** | ONVIF device security testing | +| **logicmso** | Saleae Logic MSO capture analysis and protocol decoding | +| **picocom** | UART console interaction | +| **sigrok** | Logic analyzer capture analysis with 131+ protocol decoders | +| **urh** | RF signal analysis — OOK/FSK/ASK/PSK demodulation and protocol decoding | +| **telnetshell** | Telnet shell enumeration | +| **wsdiscovery** | WS-Discovery device discovery | + +#### Plugin Installation + +**Option 1: Use directly during development** + +```bash +claude --plugin-dir /path/to/iothackbot +``` + +**Option 2: Install as local marketplace (persistent)** + +Add to `~/.claude/settings.json`: + +```json +{ + "extraKnownMarketplaces": { + "iothackbot-local": { + "source": { + "source": "directory", + "path": "/path/to/iothackbot" + } + } + }, + "enabledPlugins": { + "iothackbot": true + } +} +``` + +Then restart Claude Code for the 
settings to take effect. + +**Option 3: Project-specific setup** + +For use within a specific project, the skills are also available via the `.claude/skills/` symlink for backwards compatibility. + +## Tool Architecture + +All tools follow a consistent design pattern: + +- **CLI Layer** (`tools/iothackbot/*.py`) - Command-line interface with argparse +- **Core Layer** (`tools/iothackbot/core/*_core.py`) - Core functionality implementing ToolInterface +- **Binary** (`bin/*`) - Executable wrapper scripts + +This separation enables: +- Easy automation and chaining +- Consistent output formats (text, JSON, quiet) +- Standardized error handling +- Tool composition and pipelines + +## Configuration + +### IoT Detection Rules +`config/iot/detection_rules.json` - Custom IoT protocol detection rules for iotnet + +### Wordlists +- `wordlists/onvif-usernames.txt` - Default usernames for ONVIF devices +- `wordlists/onvif-passwords.txt` - Default passwords for ONVIF devices + +## Development + +### Adding New Tools + +See `TOOL_DEVELOPMENT_GUIDE.md` for detailed information on: +- Project structure standards +- Development patterns +- Output formatting guidelines +- Testing and integration + +### Key Interfaces + +- **ToolInterface** - Base interface for all tools +- **ToolConfig** - Standardized configuration object +- **ToolResult** - Standardized result object with success, data, errors, and metadata + +## Output Formats + +All tools support multiple output formats: + +```bash +# Human-readable text with colors (default) +onvifscan auth 192.168.1.100 + +# Machine-readable JSON +onvifscan auth 192.168.1.100 --format json + +# Minimal output +onvifscan auth 192.168.1.100 --format quiet +``` + +## Security & Ethics + +**IMPORTANT**: These tools are designed for authorized security testing only. 
+ +- Only test devices you own or have explicit permission to test +- Respect scope limitations and rules of engagement +- Be aware of the impact on production systems +- Use appropriate timing to avoid denial of service +- Document all testing activities +- Follow responsible disclosure practices + +## Contributing + +Contributions are welcome! Please ensure: + +- New tools follow the architecture patterns in `TOOL_DEVELOPMENT_GUIDE.md` +- All tools support text, JSON, and quiet output formats +- Code includes proper error handling +- Documentation is clear and comprehensive + +## License + +MIT License - See LICENSE file for details + +## Disclaimer + +This toolkit is provided for educational and authorized security testing purposes only. Users are responsible for ensuring they have proper authorization before testing any systems. The authors are not responsible for misuse or damage caused by this toolkit. diff --git a/skills/common/__init__.py b/skills/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/skills/common/signal_analysis.py b/skills/common/signal_analysis.py new file mode 100644 index 0000000..918242e --- /dev/null +++ b/skills/common/signal_analysis.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +""" +Shared signal analysis utilities for logic analyzer skills. + +Provides timing analysis, cluster detection, protocol guessing, +histogram generation, and duration formatting used by both the +logicmso and sigrok skills. +""" + +import re +from typing import List, Tuple + +import numpy as np + + +# Common baud rates and their bit periods in microseconds +COMMON_BAUD_RATES = { + 300: 3333.33, + 1200: 833.33, + 2400: 416.67, + 4800: 208.33, + 9600: 104.17, + 19200: 52.08, + 38400: 26.04, + 57600: 17.36, + 115200: 8.68, + 230400: 4.34, + 460800: 2.17, + 921600: 1.09, +} + + +def analyze_timing(times: np.ndarray, initial_state: int, + duration: float) -> dict: + """ + Analyze timing characteristics of a digital signal. 
+ + Args: + times: Array of transition timestamps in seconds. + initial_state: Starting logic level (0 or 1). + duration: Total capture duration in seconds. + + Returns: + Dict with timing statistics, or {'error': msg} on failure. + """ + if len(times) < 2: + return {'error': 'Not enough transitions'} + + durations_s = np.diff(times) + durations_us = durations_s * 1e6 + + # Separate HIGH and LOW durations + high_idx = 0 if initial_state == 0 else 1 + low_idx = 1 - high_idx + + high_durations_us = durations_us[high_idx::2] + low_durations_us = durations_us[low_idx::2] + + return { + 'total_transitions': len(times), + 'capture_duration_s': duration, + 'signal_duration_s': times[-1] - times[0] if len(times) > 0 else 0, + 'initial_state': 'HIGH' if initial_state else 'LOW', + 'all': { + 'min_us': float(durations_us.min()), + 'max_us': float(durations_us.max()), + 'mean_us': float(durations_us.mean()), + 'std_us': float(durations_us.std()), + }, + 'high': { + 'count': len(high_durations_us), + 'min_us': float(high_durations_us.min()) if len(high_durations_us) > 0 else 0, + 'max_us': float(high_durations_us.max()) if len(high_durations_us) > 0 else 0, + 'mean_us': float(high_durations_us.mean()) if len(high_durations_us) > 0 else 0, + }, + 'low': { + 'count': len(low_durations_us), + 'min_us': float(low_durations_us.min()) if len(low_durations_us) > 0 else 0, + 'max_us': float(low_durations_us.max()) if len(low_durations_us) > 0 else 0, + 'mean_us': float(low_durations_us.mean()) if len(low_durations_us) > 0 else 0, + }, + 'durations_us': durations_us, + 'high_durations_us': high_durations_us, + 'low_durations_us': low_durations_us, + } + + +def detect_clusters(durations_us: np.ndarray, + tolerance: float = 0.15) -> List[Tuple[float, int]]: + """ + Detect clusters of similar durations. + + Returns list of (center_value, count) tuples sorted by count (most + common first). 
+ """ + if len(durations_us) == 0: + return [] + + sorted_durations = np.sort(durations_us) + clusters = [] + current_cluster = [sorted_durations[0]] + + for dur in sorted_durations[1:]: + cluster_mean = np.mean(current_cluster) + if cluster_mean > 0 and abs(dur - cluster_mean) / cluster_mean <= tolerance: + current_cluster.append(dur) + elif cluster_mean == 0 and dur == 0: + current_cluster.append(dur) + else: + clusters.append((float(np.mean(current_cluster)), len(current_cluster))) + current_cluster = [dur] + + if current_cluster: + clusters.append((float(np.mean(current_cluster)), len(current_cluster))) + + clusters.sort(key=lambda x: -x[1]) + return clusters + + +def guess_protocol(analysis: dict) -> List[Tuple[str, float, str]]: + """ + Attempt to guess the protocol based on timing characteristics. + + Returns list of (protocol_name, confidence, details) tuples sorted + by confidence (highest first). + """ + guesses = [] + + all_min = analysis['all']['min_us'] + all_max = analysis['all']['max_us'] + + # Check for UART (look for consistent bit period) + for baud, period_us in COMMON_BAUD_RATES.items(): + if 0.7 < all_min / period_us < 1.3: + multiples = analysis['durations_us'] / period_us + rounded = np.round(multiples) + error = np.abs(multiples - rounded).mean() + if error < 0.15: + guesses.append(( + f'UART ({baud} baud)', + max(0.3, 0.9 - error * 3), + f'Bit period ~{period_us:.1f}us' + )) + + # Check for 1-Wire (reset pulse ~480us, data pulses 1-120us) + if all_min < 20 and all_max > 400: + has_reset = any(400 < d < 600 for d in analysis['low_durations_us']) + has_short = any(d < 20 for d in analysis['durations_us']) + if has_reset and has_short: + guesses.append(( + '1-Wire', + 0.6, + 'Detected reset pulses and short data pulses' + )) + + # Check for CAN bus + can_bitrates = {125000: 8.0, 250000: 4.0, 500000: 2.0, 1000000: 1.0} + for bitrate, period_us in can_bitrates.items(): + if 0.7 < all_min / period_us < 1.3: + multiples = analysis['durations_us'] 
/ period_us + rounded = np.round(multiples) + error = np.abs(multiples - rounded).mean() + if error < 0.15: + guesses.append(( + f'CAN ({bitrate // 1000}kbps)', + max(0.3, 0.85 - error * 3), + f'Bit period ~{period_us:.1f}us' + )) + + guesses.sort(key=lambda x: -x[1]) + return guesses + + +def format_duration(us: float) -> str: + """Format a duration in microseconds with appropriate units.""" + if us < 1000: + return f"{us:.1f}us" + elif us < 1000000: + return f"{us/1000:.2f}ms" + else: + return f"{us/1e6:.3f}s" + + +def print_histogram(durations_us: np.ndarray, bins: int = 20, + title: str = "Duration Histogram"): + """Print a simple ASCII histogram of timing durations.""" + if len(durations_us) == 0: + print(f"{title}: No data") + return + + hist, edges = np.histogram(durations_us, bins=bins) + max_count = max(hist) + + print(f"\n{title}") + print("=" * 60) + + for i, count in enumerate(hist): + left = edges[i] + right = edges[i + 1] + bar_len = int(40 * count / max_count) if max_count > 0 else 0 + bar = "#" * bar_len + label = f"{format_duration(left):>10s}-{format_duration(right):>10s}" + print(f"{label} |{bar} ({count})") + + +def export_transitions_csv(times: np.ndarray, initial_state: int, + output_path, label: str = ""): + """Export transitions to CSV file.""" + from pathlib import Path + output_path = Path(output_path) + + with open(output_path, 'w') as f: + f.write("index,time_s,state,duration_us\n") + + for i, t in enumerate(times): + state = (initial_state + i) % 2 + if i < len(times) - 1: + dur = (times[i + 1] - t) * 1e6 + else: + dur = 0 + f.write(f"{i},{t:.9f},{state},{dur:.3f}\n") + + print(f"Exported {len(times)} transitions to {output_path}") + + +def parse_sample_rate(text: str) -> float: + """ + Extract sample rate from a string like '24 MHz' or '1 kHz'. + + Returns sample rate in Hz, or 0.0 if not found. 
+ """ + match = re.search(r'(\d+(?:\.\d+)?)\s*(Hz|kHz|MHz|GHz)', text) + if match: + val = float(match.group(1)) + unit = match.group(2) + multiplier = {'Hz': 1, 'kHz': 1e3, 'MHz': 1e6, 'GHz': 1e9} + return val * multiplier.get(unit, 1) + return 0.0 diff --git a/skills/logicmso/analyze_protocol.py b/skills/logicmso/analyze_protocol.py index c4fbb80..acf1faf 100755 --- a/skills/logicmso/analyze_protocol.py +++ b/skills/logicmso/analyze_protocol.py @@ -9,7 +9,6 @@ import argparse import sys from pathlib import Path -from collections import Counter import numpy as np @@ -19,25 +18,19 @@ print("Error: saleae-mso-api not installed. Run: pip install saleae-mso-api") sys.exit(1) +# Add parent directory to path for shared module imports +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from common.signal_analysis import ( + analyze_timing as _analyze_timing, + detect_clusters, + export_transitions_csv, + format_duration, + guess_protocol, + print_histogram, +) -# Common baud rates and their bit periods in microseconds -COMMON_BAUD_RATES = { - 300: 3333.33, - 1200: 833.33, - 2400: 416.67, - 4800: 208.33, - 9600: 104.17, - 19200: 52.08, - 38400: 26.04, - 57600: 17.36, - 115200: 8.68, - 230400: 4.34, - 460800: 2.17, - 921600: 1.09, -} - - -def load_capture(file_path: Path) -> tuple: + +def load_capture(file_path: Path) -> dict: """Load a Saleae binary capture file and return transition data.""" saleae_file = read_file(file_path) @@ -58,176 +51,13 @@ def load_capture(file_path: Path) -> tuple: def analyze_timing(data: dict) -> dict: """Analyze timing characteristics of the signal.""" - times = data['times'] - - if len(times) < 2: - return {'error': 'Not enough transitions'} - - durations_s = np.diff(times) - durations_us = durations_s * 1e6 - durations_ms = durations_s * 1e3 - - # Separate HIGH and LOW durations - initial = data['initial_state'] - high_idx = 0 if initial == 0 else 1 - low_idx = 1 - high_idx - - high_durations_us = durations_us[high_idx::2] - 
low_durations_us = durations_us[low_idx::2] - - return { - 'total_transitions': len(times), - 'capture_duration_s': data['end_time'] - data['begin_time'], - 'signal_duration_s': times[-1] - times[0] if len(times) > 0 else 0, - 'initial_state': 'HIGH' if initial else 'LOW', - 'all': { - 'min_us': float(durations_us.min()), - 'max_us': float(durations_us.max()), - 'mean_us': float(durations_us.mean()), - 'std_us': float(durations_us.std()), - }, - 'high': { - 'count': len(high_durations_us), - 'min_us': float(high_durations_us.min()) if len(high_durations_us) > 0 else 0, - 'max_us': float(high_durations_us.max()) if len(high_durations_us) > 0 else 0, - 'mean_us': float(high_durations_us.mean()) if len(high_durations_us) > 0 else 0, - }, - 'low': { - 'count': len(low_durations_us), - 'min_us': float(low_durations_us.min()) if len(low_durations_us) > 0 else 0, - 'max_us': float(low_durations_us.max()) if len(low_durations_us) > 0 else 0, - 'mean_us': float(low_durations_us.mean()) if len(low_durations_us) > 0 else 0, - }, - 'durations_us': durations_us, - 'high_durations_us': high_durations_us, - 'low_durations_us': low_durations_us, - } - - -def detect_clusters(durations_us: np.ndarray, tolerance: float = 0.15) -> list: - """ - Detect clusters of similar durations. - - Returns list of (center_value, count) tuples. 
- """ - if len(durations_us) == 0: - return [] - - sorted_durations = np.sort(durations_us) - clusters = [] - current_cluster = [sorted_durations[0]] - - for dur in sorted_durations[1:]: - # Check if this duration is within tolerance of current cluster - cluster_mean = np.mean(current_cluster) - if abs(dur - cluster_mean) / cluster_mean <= tolerance: - current_cluster.append(dur) - else: - # Save current cluster and start new one - clusters.append((np.mean(current_cluster), len(current_cluster))) - current_cluster = [dur] - - # Don't forget the last cluster - if current_cluster: - clusters.append((np.mean(current_cluster), len(current_cluster))) - - # Sort by count (most common first) - clusters.sort(key=lambda x: -x[1]) - - return clusters - - -def guess_protocol(analysis: dict) -> list: - """ - Attempt to guess the protocol based on timing characteristics. - - Returns list of (protocol_name, confidence, details) tuples. - """ - guesses = [] - - all_min = analysis['all']['min_us'] - all_max = analysis['all']['max_us'] - high_clusters = detect_clusters(analysis['high_durations_us']) - low_clusters = detect_clusters(analysis['low_durations_us']) - - # Check for UART (look for consistent bit period) - for baud, period_us in COMMON_BAUD_RATES.items(): - # Check if minimum duration is close to a baud rate bit period - if 0.7 < all_min / period_us < 1.3: - # Check if durations are multiples of the bit period - multiples = analysis['durations_us'] / period_us - rounded = np.round(multiples) - error = np.abs(multiples - rounded).mean() - if error < 0.15: - guesses.append(( - f'UART ({baud} baud)', - max(0.3, 0.9 - error * 3), - f'Bit period ~{period_us:.1f}us' - )) - - # Check for 1-Wire (reset pulse ~480us, data pulses 1-120us) - if all_min < 20 and all_max > 400: - has_reset = any(400 < d < 600 for d in analysis['low_durations_us']) - has_short = any(d < 20 for d in analysis['durations_us']) - if has_reset and has_short: - guesses.append(( - '1-Wire', - 0.6, - 'Detected 
reset pulses and short data pulses' - )) - - # Sort by confidence - guesses.sort(key=lambda x: -x[1]) - - return guesses - - -def print_histogram(durations_us: np.ndarray, bins: int = 20, title: str = "Duration Histogram"): - """Print a simple ASCII histogram.""" - if len(durations_us) == 0: - print(f"{title}: No data") - return - - hist, edges = np.histogram(durations_us, bins=bins) - max_count = max(hist) - - print(f"\n{title}") - print("=" * 60) - - for i, count in enumerate(hist): - left = edges[i] - right = edges[i + 1] - bar_len = int(40 * count / max_count) if max_count > 0 else 0 - bar = "#" * bar_len - - # Choose appropriate unit - if right < 1000: - label = f"{left:7.1f}-{right:7.1f}us" - elif right < 1000000: - label = f"{left/1000:7.2f}-{right/1000:7.2f}ms" - else: - label = f"{left/1e6:7.3f}-{right/1e6:7.3f}s" - - print(f"{label} |{bar} ({count})") + duration = data['end_time'] - data['begin_time'] + return _analyze_timing(data['times'], data['initial_state'], duration) def export_csv(data: dict, output_path: Path): """Export transitions to CSV file.""" - times = data['times'] - initial = data['initial_state'] - - with open(output_path, 'w') as f: - f.write("index,time_s,state,duration_us\n") - - for i, t in enumerate(times): - state = (initial + i) % 2 - if i < len(times) - 1: - dur = (times[i + 1] - t) * 1e6 - else: - dur = 0 - f.write(f"{i},{t:.9f},{state},{dur:.3f}\n") - - print(f"Exported {len(times)} transitions to {output_path}") + export_transitions_csv(data['times'], data['initial_state'], output_path) def main(): @@ -279,13 +109,19 @@ def main(): print("Timing Summary") print("-" * 40) a = analysis['all'] - print(f"All durations: min={a['min_us']:.1f}us max={a['max_us']:.1f}us mean={a['mean_us']:.1f}us") + print(f"All durations: min={format_duration(a['min_us'])} " + f"max={format_duration(a['max_us'])} " + f"mean={format_duration(a['mean_us'])}") h = analysis['high'] - print(f"HIGH pulses ({h['count']}): min={h['min_us']:.1f}us 
max={h['max_us']:.1f}us mean={h['mean_us']:.1f}us") - - l = analysis['low'] - print(f"LOW gaps ({l['count']}): min={l['min_us']:.1f}us max={l['max_us']:.1f}us mean={l['mean_us']:.1f}us") + print(f"HIGH pulses ({h['count']}): min={format_duration(h['min_us'])} " + f"max={format_duration(h['max_us'])} " + f"mean={format_duration(h['mean_us'])}") + + lo = analysis['low'] + print(f"LOW gaps ({lo['count']}): min={format_duration(lo['min_us'])} " + f"max={format_duration(lo['max_us'])} " + f"mean={format_duration(lo['mean_us'])}") print() # Protocol guesses @@ -306,18 +142,12 @@ def main(): high_clusters = detect_clusters(analysis['high_durations_us']) print("HIGH pulse clusters:") for center, count in high_clusters[:5]: - if center < 1000: - print(f" ~{center:.1f}us ({count} occurrences)") - else: - print(f" ~{center/1000:.2f}ms ({count} occurrences)") + print(f" ~{format_duration(center)} ({count} occurrences)") low_clusters = detect_clusters(analysis['low_durations_us']) print("LOW gap clusters:") for center, count in low_clusters[:5]: - if center < 1000: - print(f" ~{center:.1f}us ({count} occurrences)") - else: - print(f" ~{center/1000:.2f}ms ({count} occurrences)") + print(f" ~{format_duration(center)} ({count} occurrences)") print() # Raw values @@ -329,10 +159,7 @@ def main(): for i in range(min(args.n, len(durations))): state = "HIGH" if (i + initial) % 2 == 0 else "LOW" dur = durations[i] - if dur < 1000: - print(f" [{i:3d}] {state}: {dur:.1f}us") - else: - print(f" [{i:3d}] {state}: {dur/1000:.2f}ms") + print(f" [{i:3d}] {state}: {format_duration(dur)}") print() # Histogram diff --git a/skills/sigrok/SKILL.md b/skills/sigrok/SKILL.md new file mode 100644 index 0000000..3e0648c --- /dev/null +++ b/skills/sigrok/SKILL.md @@ -0,0 +1,263 @@ +--- +name: sigrok +description: Analyze logic analyzer captures (.sr, CSV, VCD) using sigrok-cli and 131+ protocol decoders. 
Decode UART, SPI, I2C, CAN, JTAG, USB, 1-Wire, and many more protocols from any sigrok-compatible hardware. Use for CTF challenges, hardware reverse engineering, and protocol decoding. +--- + +# Sigrok Capture Analysis + +This skill enables analysis of captured digital signals from any sigrok-compatible logic analyzer. It wraps `sigrok-cli` for protocol decoding (131+ decoders) and provides custom timing analysis, cluster detection, and protocol identification. + +Sigrok supports 258+ devices from 58 vendors, including Saleae Logic clones, DSLogic, Kingst LA series, fx2lafw-based analyzers, and many more. + +## Prerequisites + +- `sigrok-cli` installed — **Do NOT blindly install.** First check if it's available: + ```bash + sigrok-cli --version + ``` + Only if that fails, install it: + - Arch Linux: `sudo pacman -S sigrok-cli` + - Ubuntu/Debian: `sudo apt install sigrok-cli` + - Fedora: `sudo dnf install sigrok-cli` + - macOS: `brew install sigrok-cli` + - Windows: Download from https://sigrok.org/wiki/Downloads + - If you get a missing DLL error: `winget install --id=Microsoft.VCRedist.2010.x64` + +- `numpy` Python package (for timing analysis): + ```bash + python3 -c "import numpy; print('numpy available')" + ``` + Only if that fails: `pip install numpy` + +## Supported File Formats + +| Format | Extension | Description | +|--------|-----------|-------------| +| Sigrok session | `.sr` | Native PulseView/sigrok session files | +| CSV | `.csv` | Comma-separated values (sigrok or generic) | +| VCD | `.vcd` | Value Change Dump (standard digital waveform format) | + +All formats are auto-detected by file extension. 
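The extension-based dispatch can be sketched as follows (a minimal sketch: `detect_format` is an illustrative name, and the helper script additionally verifies the file exists before routing to a per-format loader):

```python
from pathlib import Path

# Mirrors the three supported formats listed above
SUPPORTED_EXTENSIONS = {'.sr', '.csv', '.vcd'}

def detect_format(file_path: str) -> str:
    """Return 'sr', 'csv', or 'vcd' based on the (case-insensitive) extension."""
    suffix = Path(file_path).suffix.lower()
    if suffix not in SUPPORTED_EXTENSIONS:
        raise ValueError(f"Unsupported format: {suffix or '(none)'}")
    return suffix.lstrip('.')

print(detect_format('capture.SR'))  # sr
```

Files with other extensions (e.g. Saleae `.bin`) are rejected rather than guessed at; convert those to CSV or VCD first, as noted under Troubleshooting.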
+ +## Quick Reference + +### File Information + +```bash +# Show capture metadata (channels, sample rate, duration) +python3 skills/sigrok/analyze_capture.py capture.sr --show +``` + +### Timing Analysis + +```bash +# Basic timing analysis with protocol guessing +python3 skills/sigrok/analyze_capture.py capture.sr + +# Show detailed timing histogram +python3 skills/sigrok/analyze_capture.py capture.sr --histogram + +# Show detected timing clusters +python3 skills/sigrok/analyze_capture.py capture.sr --clusters + +# Analyze a specific channel +python3 skills/sigrok/analyze_capture.py capture.sr --channel D2 + +# Show raw transition values +python3 skills/sigrok/analyze_capture.py capture.sr --raw -n 50 + +# Export transitions to CSV +python3 skills/sigrok/analyze_capture.py capture.sr --export transitions.csv +``` + +### Protocol Decoding + +```bash +# Decode UART at 115200 baud +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 + +# Decode with annotation filtering (show only TX data) +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --annotations uart=tx-data + +# Decode SPI +python3 skills/sigrok/analyze_capture.py capture.sr --decode spi:cpol=0:cpha=0 + +# Stacked decoders (I2C + EEPROM) +python3 skills/sigrok/analyze_capture.py capture.sr --decode i2c,eeprom24xx + +# CAN bus decoding +python3 skills/sigrok/analyze_capture.py capture.sr --decode can:bitrate=500000 + +# Extract binary data from decoder +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --binary-decode uart=tx --binary-out data.bin + +# List all available decoders +python3 skills/sigrok/analyze_capture.py --list-decoders + +# Include sample numbers in output +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --samplenum +``` + +### Direct sigrok-cli Usage + +For advanced use cases, invoke sigrok-cli directly: + +```bash +# Export .sr to CSV +sigrok-cli -i capture.sr -O 
csv > capture.csv + +# Decode with full output +sigrok-cli -i capture.sr -P uart:baudrate=115200 + +# Stacked decoders with annotation filter +sigrok-cli -i capture.sr -P i2c,eeprom24xx -A eeprom24xx + +# Binary output (raw decoded bytes) +sigrok-cli -i capture.sr -P uart:baudrate=115200 -B uart=tx > uart_data.bin + +# Show file details +sigrok-cli -i capture.sr --show + +# List all decoders with details +sigrok-cli -L +``` + +## Common Protocol Patterns + +### UART (Asynchronous Serial) +- **Idle state**: HIGH +- **Start bit**: LOW (1 bit period) +- **Data bits**: 8 bits, LSB first +- **Stop bit**: HIGH (1-2 bit periods) +- **Common baud rates**: 9600, 19200, 38400, 57600, 115200 +- **Bit period calculation**: `1/baud_rate` seconds +- **Decoder**: `uart:baudrate=115200` (adjust baud rate) +- **Channel mapping**: `uart:baudrate=115200:tx=D0:rx=D1` + +### SPI (Serial Peripheral Interface) +- **4 signals**: SCLK (clock), MOSI (master out), MISO (master in), CS (chip select) +- **Clock polarity (CPOL)**: Idle clock state (0=LOW, 1=HIGH) +- **Clock phase (CPHA)**: Sample edge (0=leading, 1=trailing) +- **Data**: Sampled on clock edges, typically 8 bits per transaction +- **Decoder**: `spi:cpol=0:cpha=0:clk=D0:mosi=D1:miso=D2:cs=D3` + +### I2C (Inter-Integrated Circuit) +- **2 signals**: SDA (data), SCL (clock) +- **Idle state**: Both HIGH (pulled up) +- **Start condition**: SDA falls while SCL is HIGH +- **Stop condition**: SDA rises while SCL is HIGH +- **Data**: 8 bits + ACK/NACK, MSB first +- **Address**: 7-bit (first byte after START) +- **Decoder**: `i2c:scl=D0:sda=D1` +- **Stacked**: `i2c,eeprom24xx` to decode EEPROM commands + +### CAN (Controller Area Network) +- **Single differential bus**: CANH/CANL (or single-ended capture) +- **Common bitrates**: 125kbps, 250kbps, 500kbps, 1Mbps +- **Frame**: SOF, arbitration ID, control, data, CRC, ACK, EOF +- **Decoder**: `can:bitrate=500000` + +### 1-Wire +- **Single signal**: DQ (data/power) +- **Idle state**: HIGH 
(pulled up) +- **Reset pulse**: Master pulls LOW for 480us minimum +- **Presence pulse**: Slave responds LOW for 60-240us +- **Write 0**: LOW for 60-120us +- **Write 1**: LOW for 1-15us, then release +- **Decoder**: `onewire_link` (link layer), `onewire_network` (network layer) + +### JTAG +- **4-5 signals**: TCK, TMS, TDI, TDO, (TRST) +- **Decoder**: `jtag:tck=D0:tms=D1:tdi=D2:tdo=D3` +- **Stacked**: `jtag,jtag_stm32` for STM32-specific decoding + +### USB +- **2 signals**: D+, D- +- **Decoder**: `usb_signalling:dp=D0:dm=D1` +- **Stacked**: `usb_signalling,usb_packet,usb_request` for full USB decode + +## Analysis Workflow + +### Step 1: Get File Overview +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --show +``` +Check channels, sample rate, and duration. + +### Step 2: Run Timing Analysis +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --clusters --histogram +``` +Look at timing distributions to identify the protocol. + +### Step 3: Try Protocol Decoders +Based on timing analysis, try appropriate decoders: +```bash +# If UART is guessed +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 + +# If SPI-like timing +python3 skills/sigrok/analyze_capture.py capture.sr --decode spi:cpol=0:cpha=0 + +# If I2C-like timing +python3 skills/sigrok/analyze_capture.py capture.sr --decode i2c +``` + +### Step 4: Extract Data +```bash +# Get decoded text +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --annotations uart=tx-data + +# Extract raw bytes +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --binary-decode uart=tx --binary-out decoded.bin + +# Export timing data for external tools +python3 skills/sigrok/analyze_capture.py capture.sr --export timing.csv +``` + +## CTF Tips + +1. **Unknown protocol**: Start with `--clusters --histogram` to see timing distribution +2. 
**Try the helper script first**: `python3 skills/sigrok/analyze_capture.py capture.sr` gives automatic protocol guesses +3. **Multiple channels**: Use `--show` to see available channels, then `--channel D2` to analyze each +4. **Stacked decoders**: Use `i2c,eeprom24xx` or `spi,spiflash` to decode higher-level protocols +5. **Inverted signals**: Some captures have inverted logic — check initial state +6. **Binary extraction**: Use `--binary-decode` to get raw bytes for further analysis with `xxd` or `binwalk` +7. **Custom baud rate**: If standard rates don't match, calculate from timing clusters: `baud = 1e6 / cluster_us` +8. **List decoders**: Run `--list-decoders` to see all 131+ available decoders + +## Troubleshooting + +### "sigrok-cli not found" +Verify installation: +```bash +sigrok-cli --version +``` +Install if missing (see Prerequisites section above). + +### Missing DLL error on Windows +sigrok-cli depends on the Visual C++ 2010 Redistributable. Install it with: +```bash +winget install --id=Microsoft.VCRedist.2010.x64 +``` + +### "No transitions detected" +- Signal may be constant (stuck high/low) +- Check if the correct channel is selected with `--channel` +- Use `--show` to verify file has data + +### Decoder produces no output +- Check channel mapping: `uart:baudrate=115200:tx=D0` +- Verify baud rate/bitrate matches the signal +- Try without annotation filter first +- Use `--samplenum` to verify decoder is processing samples + +### Wrong timing values +- Check if file uses correct timescale (especially VCD files) +- Verify sample rate shown by `--show` +- For CSV files, ensure the time column is in seconds + +### "Unsupported format" +- Supported: `.sr`, `.csv`, `.vcd` +- For Saleae `.bin` files, use the `logicmso` skill instead +- Convert other formats to CSV or VCD first diff --git a/skills/sigrok/analyze_capture.py b/skills/sigrok/analyze_capture.py new file mode 100644 index 0000000..9c04f90 --- /dev/null +++ b/skills/sigrok/analyze_capture.py @@ 
-0,0 +1,1092 @@ +#!/usr/bin/env python3 +""" +Capture Analyzer for sigrok-compatible logic analyzer files. + +Analyzes digital signal captures (.sr, .csv, .vcd) to identify timing +patterns, decode protocols using sigrok-cli's 131+ decoders, and help +with hardware reverse engineering and CTF challenges. +""" + +import argparse +import re +import subprocess +import sys +import zipfile +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import numpy as np + +# Add parent directory to path for shared module imports +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from common.signal_analysis import ( + analyze_timing as _analyze_timing, + detect_clusters, + export_transitions_csv, + format_duration, + guess_protocol, + parse_sample_rate, + print_histogram, +) + + +# ============================================================================ +# CONSTANTS +# ============================================================================ + +SUPPORTED_EXTENSIONS = {'.sr', '.csv', '.vcd'} + +VCD_TIMESCALE_UNITS = { + 's': 1.0, + 'ms': 1e-3, + 'us': 1e-6, + 'ns': 1e-9, + 'ps': 1e-12, + 'fs': 1e-15, +} + + +# ============================================================================ +# DATA STRUCTURES +# ============================================================================ + +@dataclass +class CaptureData: + """Normalized capture data from any supported format.""" + times: np.ndarray # Transition timestamps in seconds + initial_state: int # Starting logic level (0 or 1) + sample_rate: float # Hz (estimated if not available) + channel_name: str # Name of the analyzed channel + file_format: str # 'sr', 'csv', or 'vcd' + duration: float = 0.0 # Total capture duration in seconds + total_samples: int = 0 # Total sample count (if known) + available_channels: List[str] = field(default_factory=list) + + +# ============================================================================ +# 
SIGROK-CLI INTERACTION +# ============================================================================ + +def check_sigrok_cli() -> Tuple[bool, str]: + """Check if sigrok-cli is installed and return version info.""" + try: + result = subprocess.run( + ['sigrok-cli', '--version'], + capture_output=True, + text=True, + timeout=10 + ) + if result.returncode == 0: + version = result.stdout.strip().split('\n')[0] + return True, version + return False, "sigrok-cli returned non-zero exit code" + except FileNotFoundError: + return False, "sigrok-cli not found in PATH" + except subprocess.TimeoutExpired: + return False, "sigrok-cli timed out" + except Exception as e: + return False, str(e) + + +def run_sigrok_cli(args: List[str], timeout: int = 60, + binary: bool = False) -> Tuple[int, str, str]: + """ + Run sigrok-cli with the given arguments. + + Returns (returncode, stdout, stderr). + """ + cmd = ['sigrok-cli'] + args + try: + result = subprocess.run( + cmd, + capture_output=True, + text=not binary, + timeout=timeout + ) + stdout = result.stdout if not binary else result.stdout.decode('latin-1') + stderr = result.stderr if not binary else result.stderr.decode('latin-1') + return result.returncode, stdout, stderr + except FileNotFoundError: + return 1, '', 'sigrok-cli not found' + except subprocess.TimeoutExpired: + return 1, '', 'sigrok-cli timed out' + + +def show_file_info(file_path: Path) -> Optional[dict]: + """ + Show capture file metadata using sigrok-cli --show. + + Returns dict with channels, sample_rate, duration, etc. 
+ """ + rc, stdout, stderr = run_sigrok_cli(['-i', str(file_path), '--show']) + if rc != 0: + return None + + info = { + 'channels': [], + 'sample_rate': 0.0, + 'total_samples': 0, + 'duration': 0.0, + 'raw': stdout, + } + + for line in stdout.split('\n'): + line = line.strip() + if 'samplerate' in line.lower(): + info['sample_rate'] = parse_sample_rate(line) + elif 'total samples' in line.lower(): + match = re.search(r'(\d+)', line) + if match: + info['total_samples'] = int(match.group(1)) + elif line.startswith('D') or line.startswith('CH'): + info['channels'].append(line.split(':')[0].strip()) + + if info['sample_rate'] > 0 and info['total_samples'] > 0: + info['duration'] = info['total_samples'] / info['sample_rate'] + + return info + + +def list_decoders() -> List[Tuple[str, str]]: + """ + List available protocol decoders. + + Returns list of (name, description) tuples. + """ + rc, stdout, stderr = run_sigrok_cli(['-L']) + if rc != 0: + return [] + + decoders = [] + in_decoders = False + + for line in stdout.split('\n'): + if 'protocol decoder' in line.lower() or 'Supported protocol decoders' in line: + in_decoders = True + continue + if in_decoders: + if line.strip() == '' or line.startswith('Supported '): + if decoders: + break + continue + match = re.match(r'\s+(\S+)\s*-\s*(.*)', line) + if match: + decoders.append((match.group(1).strip(), match.group(2).strip())) + + return decoders + + +def decode_protocol(file_path: Path, decoder_spec: str, + annotations: str = None, + sample_numbers: bool = False) -> dict: + """ + Run sigrok-cli protocol decoder on a capture file. + + Args: + file_path: Path to .sr, .csv, or .vcd file + decoder_spec: Decoder specification, e.g. "uart:baudrate=115200" + or "i2c,eeprom24xx" for stacked decoders + annotations: Annotation filter, e.g. "uart=tx-data" + sample_numbers: Include sample numbers in output + + Returns dict with 'success', 'output', and 'error' keys. 
+ """ + args = ['-i', str(file_path), '-P', decoder_spec] + + if annotations: + args.extend(['-A', annotations]) + if sample_numbers: + args.append('--protocol-decoder-samplenum') + + rc, stdout, stderr = run_sigrok_cli(args) + + return { + 'success': rc == 0, + 'output': stdout, + 'error': stderr if rc != 0 else None, + 'decoder': decoder_spec, + } + + +def extract_binary(file_path: Path, decoder_spec: str, + binary_spec: str, output_path: Path) -> dict: + """ + Extract binary data from protocol decoder to a file. + + Args: + file_path: Input capture file + decoder_spec: e.g. "uart:baudrate=115200" + binary_spec: e.g. "uart=tx" + output_path: Where to write binary output + + Returns dict with 'success' and 'error' keys. + """ + cmd = [ + 'sigrok-cli', '-i', str(file_path), + '-P', decoder_spec, + '-B', binary_spec, + ] + try: + with open(output_path, 'wb') as f: + result = subprocess.run( + cmd, + stdout=f, + stderr=subprocess.PIPE, + text=False, + timeout=60 + ) + if result.returncode == 0: + size = output_path.stat().st_size + return {'success': True, 'size': size, 'error': None} + else: + return { + 'success': False, + 'size': 0, + 'error': result.stderr.decode('utf-8', errors='replace'), + } + except Exception as e: + return {'success': False, 'size': 0, 'error': str(e)} + + +# ============================================================================ +# FILE LOADING +# ============================================================================ + +def load_capture(file_path: Path, channel: str = None) -> CaptureData: + """ + Load a capture file and return normalized transition data. + + Supports .sr (native parsing, with sigrok-cli fallback), .csv, and .vcd. 
+    """
+    if not file_path.exists():
+        raise FileNotFoundError(f"File not found: {file_path}")
+
+    suffix = file_path.suffix.lower()
+    if suffix not in SUPPORTED_EXTENSIONS:
+        raise ValueError(
+            f"Unsupported format: {suffix} "
+            f"(supported: {', '.join(SUPPORTED_EXTENSIONS)})"
+        )
+
+    if suffix == '.sr':
+        return _load_sr(file_path, channel)
+    elif suffix == '.csv':
+        return _load_csv(file_path, channel)
+    elif suffix == '.vcd':
+        return _load_vcd(file_path, channel)
+
+
+def _load_sr(file_path: Path, channel: str = None) -> CaptureData:
+    """Load .sr file, trying native parsing first, falling back to sigrok-cli."""
+    # Try native parsing first (no sigrok-cli dependency)
+    try:
+        return _load_sr_native(file_path, channel)
+    except Exception as exc:
+        # Bind to an outer name: the 'as' target is unbound when the except
+        # block exits, so referencing it later would raise NameError.
+        native_err = exc
+
+    # Fall back to sigrok-cli CSV export
+    available, _ = check_sigrok_cli()
+    if not available:
+        raise RuntimeError(
+            f"Native .sr parsing failed ({native_err}), "
+            f"and sigrok-cli is not installed. "
+            "Install sigrok-cli or check the .sr file."
+ ) + + info = show_file_info(file_path) + args = ['-i', str(file_path), '-O', 'csv'] + if channel: + args.extend(['-C', channel]) + + rc, stdout, stderr = run_sigrok_cli(args, timeout=120) + if rc != 0: + raise RuntimeError(f"sigrok-cli export failed: {stderr}") + + return _parse_sigrok_csv( + stdout, + channel=channel, + file_format='sr', + sample_rate=info['sample_rate'] if info else 0.0, + available_channels=info['channels'] if info else [], + ) + + +def _load_sr_native(file_path: Path, channel: str = None) -> CaptureData: + """Parse .sr file natively without sigrok-cli.""" + with zipfile.ZipFile(file_path, 'r') as zf: + meta = _parse_sr_metadata(zf) + + sample_rate = meta['samplerate'] + if sample_rate <= 0: + raise ValueError("No sample rate found in .sr metadata") + + unitsize = meta['unitsize'] + total_probes = meta['total_probes'] + capturefile = meta['capturefile'] + probe_names = meta['probes'] + + available_channels = [ + probe_names.get(i, f"D{i}") for i in range(total_probes) + ] + + # Determine which channel to extract + channel_idx, channel_name = _resolve_sr_channel( + channel, probe_names, total_probes + ) + + # Read logic data chunks + packed_data = _read_sr_logic_data(zf, capturefile) + + if len(packed_data) == 0: + raise ValueError("No logic data found in .sr file") + + # Extract transitions for the selected channel + times, initial_state = _extract_channel_transitions( + packed_data, channel_idx, unitsize, sample_rate + ) + + total_samples = len(packed_data) // unitsize + duration = total_samples / sample_rate + + return CaptureData( + times=times, + initial_state=initial_state, + sample_rate=sample_rate, + channel_name=channel_name, + file_format='sr', + duration=duration, + total_samples=total_samples, + available_channels=available_channels, + ) + + +def _parse_sr_metadata(zf: zipfile.ZipFile) -> Dict: + """Parse the metadata file from a sigrok .sr ZIP archive.""" + try: + raw = zf.read('metadata').decode('utf-8') + except KeyError: + raise 
ValueError("No 'metadata' file found in .sr archive") + + meta = { + 'samplerate': 0.0, + 'total_probes': 0, + 'unitsize': 1, + 'capturefile': 'logic-1', + 'probes': {}, # index -> name + } + + for line in raw.split('\n'): + line = line.strip() + if not line or line.startswith('[') or line.startswith(';'): + continue + + if '=' not in line: + continue + + key, _, value = line.partition('=') + key = key.strip().lower() + value = value.strip() + + if key == 'samplerate': + # Can be "24000000" or "24 MHz" + rate = parse_sample_rate(value) + if rate > 0: + meta['samplerate'] = rate + else: + try: + meta['samplerate'] = float(value) + except ValueError: + pass + elif key == 'total probes': + try: + meta['total_probes'] = int(value) + except ValueError: + pass + elif key == 'unitsize': + try: + meta['unitsize'] = int(value) + except ValueError: + pass + elif key == 'capturefile': + meta['capturefile'] = value + elif re.match(r'^probe(\d+)$', key): + idx = int(re.match(r'^probe(\d+)$', key).group(1)) - 1 # 0-based + meta['probes'][idx] = value + + if meta['total_probes'] == 0 and meta['probes']: + meta['total_probes'] = max(meta['probes'].keys()) + 1 + + return meta + + +def _read_sr_logic_data(zf: zipfile.ZipFile, capturefile: str) -> bytes: + """Read and concatenate logic data chunks from .sr archive.""" + names = sorted(zf.namelist()) + chunks = [] + + # Look for chunked files: logic-1-1, logic-1-2, ... 
or just logic-1 + pattern = re.compile(re.escape(capturefile) + r'(-\d+)?$') + data_files = sorted( + [n for n in names if pattern.match(n)], + key=lambda n: ( + int(re.search(r'-(\d+)$', n).group(1)) + if re.search(r'-(\d+)$', n) and n != capturefile + else 0 + ), + ) + + if not data_files: + raise ValueError( + f"No logic data files matching '{capturefile}' in .sr archive" + ) + + for name in data_files: + chunks.append(zf.read(name)) + + return b''.join(chunks) + + +def _resolve_sr_channel( + channel: Optional[str], + probe_names: Dict[int, str], + total_probes: int, +) -> Tuple[int, str]: + """Resolve a channel specifier to (index, name).""" + if channel is None: + idx = 0 + return idx, probe_names.get(idx, 'D0') + + # Try exact match on probe names + for i, name in probe_names.items(): + if name == channel: + return i, name + + # Try D0, D1, ... format + match = re.match(r'^D(\d+)$', channel, re.IGNORECASE) + if match: + idx = int(match.group(1)) + if idx < total_probes: + return idx, probe_names.get(idx, f'D{idx}') + + # Try numeric index + try: + idx = int(channel) + if 0 <= idx < total_probes: + return idx, probe_names.get(idx, f'D{idx}') + except ValueError: + pass + + available = [probe_names.get(i, f'D{i}') for i in range(total_probes)] + raise ValueError( + f"Channel '{channel}' not found. Available: {available}" + ) + + +def _extract_channel_transitions( + packed_data: bytes, + channel_idx: int, + unitsize: int, + sample_rate: float, +) -> Tuple[np.ndarray, int]: + """ + Extract transition timestamps for a single channel from packed binary data. + + Each sample is `unitsize` bytes. Channel N = bit N (LSB-first within bytes). + Returns (transition_times_array, initial_state). 
+ """ + # Determine which byte and bit within that byte + byte_offset = channel_idx // 8 + bit_mask = 1 << (channel_idx % 8) + + if byte_offset >= unitsize: + raise ValueError( + f"Channel index {channel_idx} exceeds unitsize {unitsize} " + f"({unitsize * 8} bits available)" + ) + + total_samples = len(packed_data) // unitsize + + # Convert to numpy array for efficient processing + raw = np.frombuffer(packed_data, dtype=np.uint8) + + # Extract the relevant byte for each sample (stride = unitsize) + channel_bytes = raw[byte_offset::unitsize][:total_samples] + + # Extract the single bit + channel_bits = (channel_bytes & bit_mask).astype(bool).astype(np.uint8) + + initial_state = int(channel_bits[0]) + + # Find transitions (where consecutive samples differ) + transitions = np.where(np.diff(channel_bits) != 0)[0] + 1 + + if len(transitions) == 0: + raise ValueError( + f"No transitions detected (signal constant at {initial_state})" + ) + + # Convert sample indices to timestamps + transition_times = transitions.astype(np.float64) / sample_rate + + return transition_times, initial_state + + +def _load_csv(file_path: Path, channel: str = None) -> CaptureData: + """Load a CSV file with time and signal columns.""" + with open(file_path, 'r') as f: + content = f.read() + return _parse_sigrok_csv(content, channel=channel, file_format='csv') + + +def _parse_sigrok_csv(content: str, channel: str = None, + file_format: str = 'csv', + sample_rate: float = 0.0, + available_channels: List[str] = None) -> CaptureData: + """ + Parse sigrok-style CSV content into CaptureData. 
+ + Sigrok CSV format has a header comment section followed by: + Time [s],D0,D1,...,Dn + 0.000000000,0,1,...,0 + """ + if available_channels is None: + available_channels = [] + + lines = content.strip().split('\n') + + # Skip comment lines (sigrok CSV starts with ;) + data_lines = [] + header = None + for line in lines: + line = line.strip() + if not line: + continue + if line.startswith(';'): + # Extract sample rate from comments if present + if 'samplerate' in line.lower(): + rate = parse_sample_rate(line) + if rate > 0: + sample_rate = rate + continue + if header is None: + header = line + continue + data_lines.append(line) + + if header is None or not data_lines: + raise ValueError("CSV file is empty or has no data rows") + + # Parse header to find columns + columns = [c.strip() for c in header.split(',')] + + # Find the time column + time_col = None + for i, col in enumerate(columns): + if 'time' in col.lower(): + time_col = i + break + if time_col is None: + time_col = 0 # Assume first column is time + + # Find the channel column + signal_cols = [i for i in range(len(columns)) if i != time_col] + if not signal_cols: + raise ValueError("No signal columns found in CSV") + + # Determine which channel to use + channel_col = None + channel_name = None + + if channel: + # User specified a channel name or index + for i, col in enumerate(columns): + if col.strip() == channel or col.strip() == f'D{channel}': + channel_col = i + channel_name = col.strip() + break + if channel_col is None: + try: + idx = int(channel) + if idx < len(signal_cols): + channel_col = signal_cols[idx] + channel_name = columns[channel_col].strip() + except ValueError: + pass + if channel_col is None: + raise ValueError( + f"Channel '{channel}' not found. 
" + f"Available: {[columns[i].strip() for i in signal_cols]}" + ) + else: + # Default to first signal channel + channel_col = signal_cols[0] + channel_name = columns[channel_col].strip() + + if not available_channels: + available_channels = [columns[i].strip() for i in signal_cols] + + # Parse data rows and extract transitions + times = [] + states = [] + + for line in data_lines: + parts = line.split(',') + if len(parts) <= max(time_col, channel_col): + continue + try: + t = float(parts[time_col].strip()) + s = int(parts[channel_col].strip()) + times.append(t) + states.append(s) + except (ValueError, IndexError): + continue + + if not times: + raise ValueError("No valid data rows found in CSV") + + # Extract only transition points (where signal changes) + initial_state = states[0] + transition_times = [] + + for i in range(1, len(times)): + if states[i] != states[i - 1]: + transition_times.append(times[i]) + + if not transition_times: + raise ValueError( + "No transitions detected on channel " + f"'{channel_name}' (signal constant at {initial_state})" + ) + + # Estimate sample rate from data if not known (use median for robustness) + if sample_rate == 0.0 and len(times) > 2: + intervals = np.diff(times[:min(1000, len(times))]) + dt = float(np.median(intervals)) + if dt > 0: + sample_rate = 1.0 / dt + elif sample_rate == 0.0 and len(times) > 1: + dt = times[1] - times[0] + if dt > 0: + sample_rate = 1.0 / dt + + duration = times[-1] - times[0] if len(times) > 1 else 0.0 + + return CaptureData( + times=np.array(transition_times), + initial_state=initial_state, + sample_rate=sample_rate, + channel_name=channel_name, + file_format=file_format, + duration=duration, + total_samples=len(times), + available_channels=available_channels, + ) + + +def _load_vcd(file_path: Path, channel: str = None) -> CaptureData: + """Parse a VCD (Value Change Dump) file.""" + with open(file_path, 'r') as f: + content = f.read() + + # Parse header + timescale = 1e-9 # Default: 1ns + 
variables = {} # var_id -> name + + lines = content.split('\n') + data_start = 0 + + for i, line in enumerate(lines): + line = line.strip() + + if '$timescale' in line: + # Support both integer and float values (e.g. 1ns, 100ps, 1.0ns) + match = re.search(r'(\d+(?:\.\d+)?)\s*(s|ms|us|ns|ps|fs)', line) + if match: + val = float(match.group(1)) + unit = match.group(2) + timescale = val * VCD_TIMESCALE_UNITS.get(unit, 1e-9) + + elif '$var' in line: + # Format: $var wire 1 ! D0 $end + match = re.match( + r'\$var\s+\w+\s+\d+\s+(\S+)\s+(\S+)', line + ) + if match: + var_id = match.group(1) + var_name = match.group(2) + variables[var_id] = var_name + + elif '$enddefinitions' in line: + data_start = i + 1 + break + + if not variables: + raise ValueError("No variables found in VCD file") + + available_channels = list(variables.values()) + + # Determine target variable + target_id = None + target_name = None + + if channel: + # Find by name or ID + for vid, vname in variables.items(): + if vname == channel or vid == channel: + target_id = vid + target_name = vname + break + if target_id is None: + raise ValueError( + f"Channel '{channel}' not found. " + f"Available: {available_channels}" + ) + else: + # Default to first variable + target_id = list(variables.keys())[0] + target_name = variables[target_id] + + # Parse data section + current_time = 0.0 + transitions = [] # (time_seconds, state) + + for line in lines[data_start:]: + line = line.strip() + if not line: + continue + + if line.startswith('#'): + try: + current_time = int(line[1:]) * timescale + except ValueError: + continue + + elif line.startswith('0') or line.startswith('1'): + # Single-bit value change: 0! or 1! 
+ state = int(line[0]) + var_id = line[1:] + if var_id == target_id: + transitions.append((current_time, state)) + + elif line.startswith('b'): + # Multi-bit value: bXXXX var_id + parts = line.split() + if len(parts) == 2 and parts[1] == target_id: + try: + state = int(parts[0][1:], 2) & 1 # Take LSB + transitions.append((current_time, state)) + except ValueError: + continue + + if not transitions: + raise ValueError( + f"No transitions found for channel '{target_name}'" + ) + + initial_state = transitions[0][1] + + # Filter to only actual transitions (state changes) + transition_times = [] + prev_state = initial_state + for t, s in transitions[1:]: + if s != prev_state: + transition_times.append(t) + prev_state = s + + if not transition_times: + raise ValueError( + f"No state changes on channel '{target_name}' " + f"(constant at {initial_state})" + ) + + duration = transitions[-1][0] - transitions[0][0] + + return CaptureData( + times=np.array(transition_times), + initial_state=initial_state, + sample_rate=1.0 / timescale, + channel_name=target_name, + file_format='vcd', + duration=duration, + total_samples=len(transitions), + available_channels=available_channels, + ) + + +# ============================================================================ +# ANALYSIS WRAPPERS +# ============================================================================ + +def analyze_timing(data: CaptureData) -> dict: + """Analyze timing characteristics of the signal.""" + return _analyze_timing(data.times, data.initial_state, data.duration) + + +def export_csv(data: CaptureData, output_path: Path): + """Export transitions to CSV file.""" + export_transitions_csv(data.times, data.initial_state, output_path) + + +# ============================================================================ +# CLI +# ============================================================================ + +def main(): + parser = argparse.ArgumentParser( + description="Analyze logic analyzer captures using 
sigrok", + epilog=( + "Examples:\n" + " %(prog)s capture.sr\n" + " %(prog)s capture.sr --histogram --clusters\n" + " %(prog)s capture.sr --decode uart:baudrate=115200\n" + " %(prog)s capture.sr --decode i2c,eeprom24xx\n" + " %(prog)s capture.sr --export transitions.csv\n" + " %(prog)s --list-decoders\n" + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("file", type=Path, nargs='?', + help="Capture file (.sr, .csv, .vcd)") + + # Timing analysis options + timing = parser.add_argument_group('timing analysis') + timing.add_argument("--histogram", action="store_true", + help="Show timing histogram") + timing.add_argument("--bins", type=int, default=20, + help="Number of histogram bins (default: 20)") + timing.add_argument("--clusters", action="store_true", + help="Show detected timing clusters") + timing.add_argument("--raw", action="store_true", + help="Show raw transition durations") + timing.add_argument("-n", type=int, default=20, + help="Number of raw values to show (default: 20)") + + # Channel selection + parser.add_argument("--channel", "-C", type=str, default=None, + help="Channel to analyze (name or index, default: first)") + + # Export options + export = parser.add_argument_group('export') + export.add_argument("--export", type=Path, metavar="CSV", + help="Export transitions to CSV file") + export.add_argument("--binary-out", type=Path, metavar="FILE", + help="Extract binary data to file (use with --decode)") + + # Protocol decoding + decoding = parser.add_argument_group('protocol decoding') + decoding.add_argument("--decode", "-P", type=str, metavar="DECODER", + help="Protocol decoder spec (e.g., uart:baudrate=115200)") + decoding.add_argument("--annotations", "-A", type=str, metavar="FILTER", + help="Filter decoder annotations (e.g., uart=tx-data)") + decoding.add_argument("--binary-decode", "-B", type=str, metavar="SPEC", + help="Binary output spec (e.g., uart=tx)") + decoding.add_argument("--samplenum", 
action="store_true", + help="Include sample numbers in decoder output") + + # Info commands + info = parser.add_argument_group('information') + info.add_argument("--list-decoders", action="store_true", + help="List available protocol decoders") + info.add_argument("--show", action="store_true", + help="Show capture file metadata") + + args = parser.parse_args() + + # Handle --list-decoders (no file required) + if args.list_decoders: + available, version = check_sigrok_cli() + if not available: + print(f"Error: {version}") + _print_install_help() + sys.exit(1) + + decoders = list_decoders() + if decoders: + print(f"Available Protocol Decoders ({len(decoders)}):") + print("-" * 60) + for name, desc in decoders: + print(f" {name:25s} {desc}") + else: + print("No decoders found (check sigrok-cli installation)") + sys.exit(0) + + # All other commands require a file + if args.file is None: + parser.error("the following arguments are required: file") + + if not args.file.exists(): + print(f"Error: File not found: {args.file}") + sys.exit(1) + + # Commands that require sigrok-cli: --show, --decode + needs_sigrok = args.show or args.decode + + if needs_sigrok: + available, version = check_sigrok_cli() + if not available: + print(f"Error: {version}") + _print_install_help() + sys.exit(1) + + # Handle --show + if args.show: + info = show_file_info(args.file) + if info: + print(f"File: {args.file}") + print(info['raw']) + else: + print(f"Error: Could not read file info for {args.file}") + sys.exit(1) + sys.exit(0) + + # Handle protocol decoding + if args.decode: + # Binary extraction + if args.binary_out and args.binary_decode: + result = extract_binary( + args.file, args.decode, args.binary_decode, args.binary_out + ) + if result['success']: + print(f"Extracted {result['size']} bytes to {args.binary_out}") + else: + print(f"Error: {result['error']}") + sys.exit(1) + sys.exit(0) + + # Annotation decoding + result = decode_protocol( + args.file, args.decode, + 
annotations=args.annotations, + sample_numbers=args.samplenum, + ) + if result['success']: + if result['output']: + print(result['output'], end='') + else: + print("No decoder output (check decoder spec and channel mapping)") + else: + print(f"Error decoding: {result['error']}") + sys.exit(1) + + # If no timing analysis flags, stop here + if not (args.histogram or args.clusters or args.raw or args.export): + sys.exit(0) + + # Load capture for timing analysis + try: + data = load_capture(args.file, channel=args.channel) + except Exception as e: + print(f"Error loading file: {e}") + sys.exit(1) + + analysis = analyze_timing(data) + + if 'error' in analysis: + print(f"Error: {analysis['error']}") + sys.exit(1) + + # Print basic info + print(f"File: {args.file}") + print(f"Format: {data.file_format}") + print(f"Channel: {data.channel_name}") + if data.sample_rate > 0: + if data.sample_rate >= 1e6: + print(f"Sample rate: {data.sample_rate/1e6:.1f} MHz") + elif data.sample_rate >= 1e3: + print(f"Sample rate: {data.sample_rate/1e3:.1f} kHz") + else: + print(f"Sample rate: {data.sample_rate:.1f} Hz") + print(f"Capture duration: {analysis['capture_duration_s']:.3f}s") + print(f"Signal duration: {analysis['signal_duration_s']:.3f}s") + print(f"Initial state: {analysis['initial_state']}") + print(f"Total transitions: {analysis['total_transitions']}") + if data.available_channels: + print(f"Available channels: {', '.join(data.available_channels)}") + print() + + # Timing summary + print("Timing Summary") + print("-" * 40) + a = analysis['all'] + print(f"All durations: min={format_duration(a['min_us'])} " + f"max={format_duration(a['max_us'])} " + f"mean={format_duration(a['mean_us'])}") + + h = analysis['high'] + print(f"HIGH pulses ({h['count']}): min={format_duration(h['min_us'])} " + f"max={format_duration(h['max_us'])} " + f"mean={format_duration(h['mean_us'])}") + + lo = analysis['low'] + print(f"LOW gaps ({lo['count']}): min={format_duration(lo['min_us'])} " + 
f"max={format_duration(lo['max_us'])} " + f"mean={format_duration(lo['mean_us'])}") + print() + + # Protocol guesses + guesses = guess_protocol(analysis) + if guesses: + print("Protocol Guesses") + print("-" * 40) + for name, confidence, details in guesses: + print(f" {name} ({confidence*100:.0f}% confidence)") + print(f" {details}") + print() + + # Clusters + if args.clusters: + print("Detected Timing Clusters") + print("-" * 40) + + high_clusters = detect_clusters(analysis['high_durations_us']) + print("HIGH pulse clusters:") + for center, count in high_clusters[:5]: + print(f" ~{format_duration(center)} ({count} occurrences)") + + low_clusters = detect_clusters(analysis['low_durations_us']) + print("LOW gap clusters:") + for center, count in low_clusters[:5]: + print(f" ~{format_duration(center)} ({count} occurrences)") + print() + + # Raw values + if args.raw: + print(f"First {args.n} Transitions") + print("-" * 40) + durations = analysis['durations_us'] + initial = 0 if analysis['initial_state'] == 'LOW' else 1 + for i in range(min(args.n, len(durations))): + state = "HIGH" if (i + initial) % 2 == 0 else "LOW" + dur = durations[i] + print(f" [{i:3d}] {state}: {format_duration(dur)}") + print() + + # Histogram + if args.histogram: + print_histogram(analysis['durations_us'], bins=args.bins, + title="All Durations") + print_histogram(analysis['high_durations_us'], bins=args.bins, + title="HIGH Pulse Durations") + print_histogram(analysis['low_durations_us'], bins=args.bins, + title="LOW Gap Durations") + + # Export + if args.export: + export_csv(data, args.export) + + +def _print_install_help(): + """Print installation instructions for sigrok-cli.""" + print("\nInstall sigrok-cli:") + print(" Arch Linux: sudo pacman -S sigrok-cli") + print(" Ubuntu: sudo apt install sigrok-cli") + print(" Fedora: sudo dnf install sigrok-cli") + print(" macOS: brew install sigrok-cli") + print(" Windows: Download from https://sigrok.org/wiki/Downloads") + print(" If you get a 
missing DLL error, install the VC++ 2010 runtime:") + print(" winget install --id=Microsoft.VCRedist.2010.x64") + + +if __name__ == "__main__": + main() diff --git a/skills/sigrok/examples.md b/skills/sigrok/examples.md new file mode 100644 index 0000000..5c530c9 --- /dev/null +++ b/skills/sigrok/examples.md @@ -0,0 +1,346 @@ +# Sigrok Capture Analysis Examples + +## Example 1: Unknown Protocol Analysis + +**Scenario**: You have a captured signal from an unknown IoT device and need to identify the protocol. + +### Step 1: Check the file + +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --show +``` + +Output: +``` +File: capture.sr +Channels: D0, D1, D2, D3 +Sample rate: 24.0 MHz +Total samples: 2400000 +Duration: 0.100s +``` + +### Step 2: Get an overview + +```bash +python3 skills/sigrok/analyze_capture.py capture.sr +``` + +This shows basic timing statistics and automatic protocol guesses. + +### Step 3: Look at timing distribution + +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --histogram --clusters +``` + +Look for distinct timing clusters — these help identify the protocol: +- **2-3 clusters** → likely UART or 1-Wire +- **Very regular clusters** → likely SPI or I2C (clock-based) +- **Wide spread** → likely analog or mixed protocol + +### Step 4: Try the suggested decoder + +```bash +# If UART was guessed at 115200 baud +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 +``` + +### Step 5: Export for further analysis + +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --export transitions.csv +``` + +--- + +## Example 2: UART Signal Decoding + +**Scenario**: You captured UART communication from an IoT device's debug port. 
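Before decoding, it helps to know how a measured bit period maps to a baud rate. A minimal sketch (not part of the helper script) that snaps the smallest timing cluster, in microseconds, to the nearest standard rate:

```python
def estimate_baud(bit_period_us: float) -> int:
    """Snap a measured bit period (microseconds) to the nearest standard baud rate."""
    standard = [1200, 2400, 4800, 9600, 19200, 38400,
                57600, 115200, 230400, 460800, 921600]
    raw = 1_000_000 / bit_period_us  # period in us -> bits per second
    return min(standard, key=lambda b: abs(b - raw))

print(estimate_baud(8.7))    # a ~8.7 us cluster maps to 115200
print(estimate_baud(104.2))  # a ~104 us cluster maps to 9600
```

If the snapped rate fails to decode cleanly, try the raw value directly: sigrok's UART decoder accepts arbitrary baud rates.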
+ +### Identify UART characteristics + +```bash +python3 skills/sigrok/analyze_capture.py uart_capture.sr --clusters +``` + +Look for: +- Signal idles HIGH +- Consistent bit period clusters +- Durations are multiples of the base bit period + +### Decode at a specific baud rate + +```bash +# Standard 115200 baud +python3 skills/sigrok/analyze_capture.py uart_capture.sr \ + --decode uart:baudrate=115200 + +# With channel mapping (TX on D0, RX on D1) +python3 skills/sigrok/analyze_capture.py uart_capture.sr \ + --decode uart:baudrate=115200:tx=D0:rx=D1 +``` + +### Extract only the transmitted data + +```bash +# Text annotations +python3 skills/sigrok/analyze_capture.py uart_capture.sr \ + --decode uart:baudrate=115200 \ + --annotations uart=tx-data + +# Raw binary output +python3 skills/sigrok/analyze_capture.py uart_capture.sr \ + --decode uart:baudrate=115200 \ + --binary-decode uart=tx \ + --binary-out uart_tx.bin + +# Inspect the binary +xxd uart_tx.bin | head -20 +``` + +### Custom baud rate detection + +If standard rates don't work, use timing analysis to find the baud rate: + +```bash +python3 skills/sigrok/analyze_capture.py uart_capture.sr --clusters --raw -n 30 +``` + +The smallest timing cluster represents the bit period. Calculate: +``` +baud_rate = 1,000,000 / cluster_us +``` + +For example, if the smallest cluster is ~8.7us: `1000000 / 8.7 = 114943` → try 115200 baud. + +--- + +## Example 3: SPI Analysis with Stacked Decoders + +**Scenario**: Multi-channel SPI capture with clock and data lines. 
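One shortcut before walking the channels by hand: the clock line can usually be picked out programmatically, since SCLK toggles at a fixed period while data lines do not. A sketch under that assumption; the channel names and transition arrays are hypothetical (e.g. built from per-channel CSV exports):

```python
import numpy as np

def find_clock_channel(transitions: dict) -> str:
    """Pick the channel whose inter-transition intervals are most regular.

    transitions: channel name -> 1-D array of transition times (seconds).
    The clock toggles at a fixed period, so the coefficient of variation
    (std/mean) of its intervals is the lowest.
    """
    best, best_cv = None, float('inf')
    for name, times in transitions.items():
        deltas = np.diff(np.asarray(times))
        if len(deltas) < 2:
            continue
        cv = deltas.std() / deltas.mean()
        if cv < best_cv:
            best, best_cv = name, cv
    return best

rng = np.random.default_rng(0)
clk = np.arange(0, 1e-3, 5e-7)                  # periodic: toggles every 0.5 us
data = np.cumsum(rng.uniform(1e-6, 9e-6, 200))  # jittery data line
print(find_clock_channel({'D0': clk, 'D1': data}))  # -> D0
```

Treat the result as a candidate only; confirm it with `--clusters` before committing to a channel mapping.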
+ +### Identify SPI signals + +First check which channels are present: +```bash +python3 skills/sigrok/analyze_capture.py spi_capture.sr --show +``` + +Analyze each channel to identify clock vs data: +```bash +# Check D0 (likely CLK - should be very regular) +python3 skills/sigrok/analyze_capture.py spi_capture.sr --channel D0 --clusters + +# Check D1 (MOSI - data, less regular) +python3 skills/sigrok/analyze_capture.py spi_capture.sr --channel D1 --clusters +``` + +The channel with the most regular timing is the clock (SCLK). + +### Decode SPI + +```bash +# Basic SPI decode with channel mapping +python3 skills/sigrok/analyze_capture.py spi_capture.sr \ + --decode spi:clk=D0:mosi=D1:miso=D2:cs=D3 + +# With clock polarity/phase options +python3 skills/sigrok/analyze_capture.py spi_capture.sr \ + --decode spi:clk=D0:mosi=D1:miso=D2:cs=D3:cpol=0:cpha=0 +``` + +### Stacked decoder: SPI Flash + +```bash +# Decode SPI flash commands (read, write, erase) +python3 skills/sigrok/analyze_capture.py spi_capture.sr \ + --decode spi:clk=D0:mosi=D1:miso=D2:cs=D3,spiflash + +# Show only flash data +python3 skills/sigrok/analyze_capture.py spi_capture.sr \ + --decode spi:clk=D0:mosi=D1:miso=D2:cs=D3,spiflash \ + --annotations spiflash +``` + +--- + +## Example 4: I2C Address Discovery + +**Scenario**: Captured I2C communication and need to find device addresses. + +### Decode I2C + +```bash +# Basic I2C decode (SCL on D0, SDA on D1) +python3 skills/sigrok/analyze_capture.py i2c_capture.sr \ + --decode i2c:scl=D0:sda=D1 +``` + +Output shows START, address, R/W, data bytes, ACK/NACK, STOP. 
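To enumerate the unique device addresses, the annotation text can be post-processed. A sketch that counts address annotations; the exact wording of sigrok's i2c annotations varies by version, so the regex below is an assumption to adjust against real output:

```python
import re
from collections import Counter

def find_i2c_addresses(decoder_output: str) -> Counter:
    """Count 7-bit addresses seen in sigrok i2c annotation text.

    Matches lines like 'Address write: 0x50' or 'Address read: 50';
    the annotation wording is version-dependent (assumption).
    """
    counts = Counter()
    for m in re.finditer(r'Address (read|write): (0x)?([0-9A-Fa-f]+)', decoder_output):
        addr = int(m.group(3), 16)
        counts[f'0x{addr:02X} ({m.group(1)})'] += 1
    return counts

sample = ("i2c-1: Address write: 0x50\n"
          "i2c-1: Address read: 0x50\n"
          "i2c-1: Address write: 0x68\n")
for addr, n in find_i2c_addresses(sample).most_common():
    print(addr, n)
```

Each distinct 7-bit address corresponds to one device on the bus; the read/write split hints at its role (sensor vs. EEPROM, for example).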
+ +### Stacked decoder: EEPROM + +```bash +# Decode I2C EEPROM read/write operations +python3 skills/sigrok/analyze_capture.py i2c_capture.sr \ + --decode i2c:scl=D0:sda=D1,eeprom24xx + +# Show only EEPROM data +python3 skills/sigrok/analyze_capture.py i2c_capture.sr \ + --decode i2c:scl=D0:sda=D1,eeprom24xx \ + --annotations eeprom24xx +``` + +### Other I2C stacked decoders + +```bash +# Temperature sensor (LM75) +--decode i2c:scl=D0:sda=D1,lm75 + +# Real-time clock (DS1307) +--decode i2c:scl=D0:sda=D1,ds1307 + +# Accelerometer (ADXL345) +--decode i2c:scl=D0:sda=D1,adxl345 +``` + +--- + +## Example 5: CAN Bus Analysis + +**Scenario**: Captured CAN bus traffic from a vehicle or industrial device. + +### Decode CAN frames + +```bash +# CAN at 500kbps +python3 skills/sigrok/analyze_capture.py can_capture.sr \ + --decode can:bitrate=500000 + +# CAN at 250kbps +python3 skills/sigrok/analyze_capture.py can_capture.sr \ + --decode can:bitrate=250000 +``` + +### Timing analysis for bitrate detection + +```bash +python3 skills/sigrok/analyze_capture.py can_capture.sr --clusters +``` + +The smallest cluster corresponds to the bit period: +- 2.0us → 500kbps +- 4.0us → 250kbps +- 8.0us → 125kbps +- 1.0us → 1Mbps + +--- + +## Example 6: JTAG Tap Detection + +**Scenario**: JTAG pins identified on a PCB, need to analyze the communication. 
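When the pin mapping is uncertain, transition counts are a useful tell: TCK toggles on every clock cycle, so it is the busiest line, while TMS changes only on TAP state transitions. A sketch with hypothetical per-channel counts (take the real numbers from per-channel timing runs):

```python
def rank_jtag_candidates(transition_counts: dict) -> list:
    """Order channels from most to least active.

    In a JTAG capture the busiest line is almost always TCK;
    TDI/TDO sit in the middle; TMS is typically the quietest.
    """
    return sorted(transition_counts, key=transition_counts.get, reverse=True)

# Hypothetical counts taken from per-channel timing runs
counts = {'D0': 40210, 'D1': 312, 'D2': 19875, 'D3': 18990}
order = rank_jtag_candidates(counts)
print('likely TCK:', order[0])   # busiest line -> D0
print('likely TMS:', order[-1])  # quietest line -> D1
```

TDI and TDO cannot be told apart by counts alone; swap them in the decoder spec if the first attempt produces garbage.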
+ +### Decode JTAG + +```bash +# Basic JTAG decode +python3 skills/sigrok/analyze_capture.py jtag_capture.sr \ + --decode jtag:tck=D0:tms=D1:tdi=D2:tdo=D3 + +# STM32-specific JTAG +python3 skills/sigrok/analyze_capture.py jtag_capture.sr \ + --decode jtag:tck=D0:tms=D1:tdi=D2:tdo=D3,jtag_stm32 +``` + +### SWD (Serial Wire Debug) — alternative to JTAG + +```bash +# SWD decode (ARM Cortex-M) +python3 skills/sigrok/analyze_capture.py swd_capture.sr \ + --decode swd:swclk=D0:swdio=D1 +``` + +--- + +## Example 7: Working with Different File Formats + +### Sigrok session files (.sr) + +Created by PulseView or sigrok-cli capture: +```bash +python3 skills/sigrok/analyze_capture.py capture.sr +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=9600 +``` + +### CSV files + +Exported from logic analyzers or other tools: +```bash +# Sigrok-format CSV (Time [s],D0,D1,...) +python3 skills/sigrok/analyze_capture.py export.csv + +# Analyze specific channel +python3 skills/sigrok/analyze_capture.py export.csv --channel D2 +``` + +### VCD files (Value Change Dump) + +Common output from simulators and some logic analyzers: +```bash +python3 skills/sigrok/analyze_capture.py waveform.vcd +python3 skills/sigrok/analyze_capture.py waveform.vcd --channel CLK +``` + +--- + +## Example 8: Combined Timing + Decoding Workflow + +**Scenario**: Full analysis workflow from unknown capture to decoded data. 
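When many mystery captures need triage, the command sequence below can be assembled programmatically. A sketch that only builds the command lines (the helper path and the UART guess are assumptions to adapt); pass each list to `subprocess.run` to execute:

```python
from pathlib import Path

HELPER = 'skills/sigrok/analyze_capture.py'  # path assumed from this repo's layout

def triage_commands(capture: Path, channels: list, baud: int = 115200) -> list:
    """Build the overview, per-channel timing, and decode commands."""
    cmds = [['python3', HELPER, str(capture), '--show']]
    for ch in channels:
        cmds.append(['python3', HELPER, str(capture), '--channel', ch, '--clusters'])
    cmds.append(['python3', HELPER, str(capture),
                 '--decode', f'uart:baudrate={baud}:tx={channels[0]}'])
    return cmds

for cmd in triage_commands(Path('mystery.sr'), ['D0', 'D1']):
    print(' '.join(cmd))
```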
+ +```bash +# Step 1: Overview +python3 skills/sigrok/analyze_capture.py mystery.sr --show + +# Step 2: Timing analysis on each channel +python3 skills/sigrok/analyze_capture.py mystery.sr --channel D0 --clusters +python3 skills/sigrok/analyze_capture.py mystery.sr --channel D1 --clusters + +# Step 3: Decode based on findings +python3 skills/sigrok/analyze_capture.py mystery.sr \ + --decode uart:baudrate=115200:tx=D0 + +# Step 4: Extract the data +python3 skills/sigrok/analyze_capture.py mystery.sr \ + --decode uart:baudrate=115200:tx=D0 \ + --binary-decode uart=tx \ + --binary-out decoded.bin + +# Step 5: Analyze decoded data +xxd decoded.bin +strings decoded.bin +``` + +--- + +## Useful sigrok-cli Direct Commands + +For advanced scenarios beyond the helper script: + +```bash +# List all supported protocol decoders with details +sigrok-cli -L + +# Show decoder options +sigrok-cli -P uart --show + +# Decode with multiple independent decoder stacks +sigrok-cli -i capture.sr -P uart:tx=D0:baudrate=115200 -P spi:clk=D2:mosi=D3 + +# Export to different formats +sigrok-cli -i capture.sr -O csv > data.csv +sigrok-cli -i capture.sr -O vcd > data.vcd +sigrok-cli -i capture.sr -O bits > data.txt + +# Continuous sampling to file +sigrok-cli -d fx2lafw --config samplerate=1m --samples 1m -o capture.sr +``` diff --git a/skills/urh/SKILL.md b/skills/urh/SKILL.md new file mode 100644 index 0000000..d7bdb9e --- /dev/null +++ b/skills/urh/SKILL.md @@ -0,0 +1,364 @@ +--- +name: urh +description: Analyze and decode RF signals using Universal Radio Hacker (URH). Supports OOK, ASK, FSK, PSK modulations and common IoT RF protocols including 433MHz/315MHz remotes, Zigbee, Z-Wave, and more. Use for CTF RF challenges and IoT device RF analysis. +--- + +# Universal Radio Hacker (URH) RF Signal Analysis + +This skill enables analysis and decoding of RF signal captures using Universal Radio Hacker (URH). 
It wraps `urh_cli` for command-line protocol analysis and provides a helper script for spectrum inspection, demodulation, and OOK/FSK signal decoding. URH supports OOK, ASK, FSK, and PSK modulations, making it the primary tool for reverse engineering 433MHz/315MHz remotes, Zigbee, Z-Wave, and arbitrary RF protocols in both CTF and IoT security contexts. + +## Prerequisites + +- `urh` Python package — **Do NOT blindly pip install.** First check if it's already installed: + ```bash + python3 -c "import urh; print('urh available')" + ``` + Only if that fails, install it. URH requires Cython to build its C extensions, so install that first: + ```bash + pip install cython + pip install urh + ``` + +- **Windows standalone installer**: URH may also be installed as a standalone app at + `C:\Program Files\Universal Radio Hacker\urh_cli.exe`. + **Important limitation**: The standalone installer's `urh_cli.exe` only supports live SDR + capture (`-rx`/`-tx`). It **cannot** analyze saved `.complex`/`.complex16s` files offline. + For offline file analysis use either: + - The URH GUI (`urh.exe`) — open the project file or drag-and-drop signal files + - The pip-installed `urh` Python package with `analyze_signal.py` (this skill's helper) + +- Verify CLI tool is accessible: + ```bash + urh_cli --version + ``` + If `urh_cli` is not found but `urh` imports correctly, run it as a module: + ```bash + python3 -m urh.cli.urh_cli --version + ``` + +- `numpy` Python package (for the helper script): + ```bash + python3 -c "import numpy; print('numpy available')" + ``` + Only if that fails: `pip install numpy` + +- **For recording with an SDR**: A compatible SDR device and its driver must be installed + (e.g., RTL-SDR via `rtl-sdr`, HackRF via `hackrf`). URH detects these automatically at runtime. 
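The checks above can be bundled into one preflight function that mirrors the fallback order (console script on PATH first, then module invocation):

```python
import importlib.util
import shutil

def urh_preflight() -> dict:
    """Report which URH entry points are available, without installing anything."""
    status = {
        'urh_module': importlib.util.find_spec('urh') is not None,
        'numpy': importlib.util.find_spec('numpy') is not None,
        'urh_cli_on_path': shutil.which('urh_cli') is not None,
    }
    # python3 -m urh.cli.urh_cli works whenever the package imports,
    # even if the urh_cli console script is missing from PATH
    status['module_invocation'] = status['urh_module']
    return status

print(urh_preflight())
```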
+ +## Supported File Formats + +| Format | Extension | Description | +|--------|-----------|-------------| +| Complex float32 | `.complex`, `.cfile` | GNU Radio / URH native IQ, 32-bit float pairs | +| Complex int16 (signed) | `.complex16s` | 16-bit signed integer IQ pairs (e.g., HackRF) | +| Complex int16 (unsigned) | `.complex16u` | 16-bit unsigned integer IQ pairs (e.g., RTL-SDR raw) | +| WAV | `.wav` | Audio WAV — stereo channel as IQ (L=I, R=Q) | + +All formats are auto-detected by file extension. + +## Quick Reference + +### Signal Inspection + +```bash +# Show signal metadata and statistics (sample count, duration, amplitude stats) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 + +# Show power spectrum (ASCII FFT) to identify modulation type +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --spectrum + +# Show signal energy over time to identify packet boundaries +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --energy + +# Show first N raw IQ samples +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --raw -n 20 +``` + +### Demodulation and Pulse Analysis + +```bash +# Demodulate as OOK (most common for 433MHz remotes) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook + +# Demodulate as FSK (common for IoT sensors, Z-Wave) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate fsk + +# OOK with custom amplitude threshold (0.0–1.0, default: auto) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --threshold 0.3 + +# Show timing clusters after OOK demodulation (pulse-width analysis) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --clusters + +# Show pulse duration histogram after OOK demodulation +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --histogram + +# Show raw demodulated pulse transitions (-n limits count) +python3 
skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --raw -n 50 +``` + +### Protocol Decoding (urh_cli) + +```bash +# Auto-analyze with urh_cli (detects modulation automatically) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --decode + +# Specify modulation for urh_cli decoding +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --decode --modulation OOK +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --decode --modulation FSK +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --decode --modulation ASK +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --decode --modulation PSK +``` + +### Export + +```bash +# Export IQ samples to CSV +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --export iq.csv + +# Export OOK pulse transitions to CSV (like sigrok's --export) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --export pulses.csv +``` + +### Direct urh_cli Usage + +```bash +# Auto-detect modulation and print decoded bits +urh_cli analyze -f signal.complex -s 2000000 + +# Specify modulation explicitly +urh_cli analyze -f signal.complex -s 2000000 --modulation OOK + +# Analyze a WAV file +urh_cli analyze -f capture.wav -s 44100 --modulation OOK + +# Launch the full URH graphical interface (for recording, visual inspection, protocol editor) +urh +``` + +## Recording RF Signals + +URH recording is done through the **GUI** — launch it with: + +```bash +urh +``` + +In the GUI: + +1. Go to **File > Record Signal** +2. Select your SDR device (RTL-SDR, HackRF, USRP, LimeSDR, PlutoSDR, etc.) +3. Set the center frequency (e.g., 433.92 MHz for common remotes, 868 MHz for Z-Wave EU) +4. Set the sample rate (minimum 2× the signal bandwidth; 2 MSPS is a safe default for narrow-band IoT) +5. Set the gain (start around 20–30 dB for RTL-SDR) +6. Press **Record** and trigger the device under test +7. 
Save as `.complex` (URH native format) for subsequent analysis + +### Recommended SDR Settings by Protocol + +| Protocol | Frequency | Sample Rate | Notes | +|----------|-----------|-------------|-------| +| 433MHz remotes | 433.92 MHz | 2 MSPS | OOK, very common | +| 315MHz remotes | 315 MHz | 2 MSPS | OOK, North America | +| Z-Wave EU | 868.42 MHz | 2 MSPS | FSK, 9.6/40/100 kbps | +| Z-Wave US | 908.42 MHz | 2 MSPS | FSK | +| Zigbee | 2.405–2.480 GHz | 4+ MSPS | O-QPSK, needs HackRF/USRP | + +## Common RF Protocol Patterns + +### OOK (On-Off Keying) + +The most common modulation for 433MHz and 315MHz consumer RF remotes. + +- **Principle**: Carrier ON = 1-bit, carrier OFF = 0-bit (or vice versa) +- **Identifying features**: Signal amplitude alternates between high and near-zero; clear ON/OFF envelope +- **Typical bit rates**: 1 kbps – 10 kbps +- **Common encodings**: NRZ (direct), pulse-width (short/long pulse = 0/1), Manchester +- **Preamble**: Alternating 1/0 for AGC settling +- **Payload**: Typically 24–32 bits (device address + command) +- **Decoder**: `--demodulate ook` or `--modulation OOK` + +```bash +python3 skills/urh/analyze_signal.py remote.complex -s 2000000 --demodulate ook --clusters +urh_cli analyze -f remote.complex -s 2000000 --modulation OOK +``` + +### ASK (Amplitude Shift Keying) + +Generalization of OOK where the amplitude shifts between two non-zero levels. + +- **Principle**: Two amplitude levels encode 0 and 1 (unlike OOK, carrier is always present) +- **Identifying features**: Amplitude varies but signal is never fully off +- **Common in**: RFID (125 kHz), some proprietary protocols +- **Decoder**: `--modulation ASK` + +### FSK (Frequency Shift Keying) + +Common for IoT sensors, Z-Wave, and many 433MHz/868MHz devices. 
+ +- **Principle**: Two frequencies encode 0 and 1 (e.g., center ± deviation) +- **Identifying features**: Constant-amplitude signal that shifts frequency; visible as two lobes in the spectrum +- **Typical deviation**: 25 kHz – 100 kHz for narrowband IoT +- **Decoder**: `--demodulate fsk` or `--modulation FSK` + +```bash +python3 skills/urh/analyze_signal.py sensor.complex -s 2000000 --demodulate fsk +urh_cli analyze -f zwave.complex -s 2000000 --modulation FSK +``` + +### PSK (Phase Shift Keying) + +Used in protocols requiring higher data rates or better noise immunity. + +- **Principle**: Phase of the carrier encodes bits (BPSK: 2 phases, QPSK: 4 phases) +- **Identifying features**: Constant amplitude and frequency; phase reversals visible in I/Q constellation +- **Common in**: Zigbee (O-QPSK at 2.4 GHz), RFID +- **Decoder**: `--modulation PSK` + +### Common IoT Protocols + +#### 433MHz / 315MHz Remotes (OOK, Pulse-Width or Manchester) + +- **Modulation**: OOK +- **Bit encoding**: Pulse-width (short ≈ 300–500 µs = 0, long ≈ 900–1500 µs = 1) or Manchester +- **Frame**: 24–48 bits, repeated 3–10 times with guard gap +- **Attack**: Replay — record and retransmit the exact bit sequence + +#### Z-Wave (868 MHz EU / 908 MHz US) + +- **Modulation**: 2-FSK +- **Data rates**: 9.6 kbps (legacy), 40 kbps, 100 kbps (Z-Wave+) +- **Frame**: Preamble (0x55 bytes), SOF (0xF0), length, payload, checksum +- **Analysis**: Capture at 2 MSPS, demodulate FSK, look for 0x55 preamble + +#### Zigbee (2.4 GHz, O-QPSK) + +- **Modulation**: O-QPSK (requires HackRF or USRP — RTL-SDR cannot tune to 2.4 GHz) +- **Data rate**: 250 kbps +- **Note**: Full decoding requires Wireshark with the IEEE 802.15.4 dissector + +## Analysis Workflow + +### Step 1: Get Signal Overview + +```bash +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 +``` + +Check sample count, duration, and amplitude statistics. Confirm the file loaded correctly. 
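As a cross-check, a `.complex` file can also be read directly: it is interleaved float32 I/Q, which NumPy maps to `complex64`. A minimal sketch that reproduces the overview numbers on a synthetic capture (written to the current directory):

```python
import numpy as np

def quick_stats(path: str, sample_rate: float):
    """Print an overview for a float32 I/Q (.complex/.cfile) capture."""
    iq = np.fromfile(path, dtype=np.complex64)  # interleaved float32 I,Q pairs
    mag = np.abs(iq)
    print(f'samples:   {len(iq)}')
    print(f'duration:  {len(iq) / sample_rate:.4f} s')
    print(f'amplitude: mean={mag.mean():.4f} max={mag.max():.4f}')

# Write a tiny synthetic 10 kHz tone at 2 MSPS, then inspect it
tone = np.exp(2j * np.pi * 10e3 * np.arange(2000) / 2e6).astype(np.complex64)
tone.tofile('demo.complex')
quick_stats('demo.complex', 2e6)
```

If the amplitude statistics look like noise around zero, the dtype guess is wrong (e.g. the file is really int16 or uint8 I/Q), which is the most common cause of garbage downstream.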
+ +### Step 2: Inspect the Spectrum + +```bash +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --spectrum +``` + +- **Single lobe at center (0 Hz)**: likely OOK or ASK (baseband amplitude modulation) +- **Two symmetrical lobes offset from 0 Hz**: FSK (the offset is the frequency deviation) +- **Flat/spread spectrum**: PSK or higher-order modulation + +### Step 3: Check Signal Energy Over Time + +```bash +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --energy +``` + +Identify packet boundaries — bursts of high energy separated by silence. Count the number of frames. + +### Step 4: Demodulate + +```bash +# OOK (most common for narrowband IoT / remotes) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --clusters + +# FSK (if spectrum shows two lobes) +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate fsk +``` + +### Step 5: Decode with urh_cli + +```bash +urh_cli analyze -f signal.complex -s 2000000 --modulation OOK +``` + +### Step 6: Export and Analyze Bits + +```bash +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --export pulses.csv + +# Convert bit string to hex +python3 -c "bits='01001000...'; print(hex(int(bits, 2)))" +``` + +## CTF Tips + +1. **OOK is overwhelmingly common in CTF RF challenges**: If the frequency is 315/433 MHz and the signal has clear bursting, try OOK first. +2. **Start with `--spectrum`**: The spectrum shape is the fastest way to identify OOK vs FSK. +3. **Repeated frames**: Most RF remotes transmit the same frame 3–10 times. Use `--energy` to count packets. +4. **Pulse-width decoding**: If NRZ gives garbage, check `--clusters` — two timing clusters indicate pulse-width encoding (short=0, long=1 or vice versa). +5. **Preamble identification**: A long run of alternating bits at the start is sync/preamble — skip it, focus on what follows. +6. 
**Manchester encoding**: If the payload looks like `10101010...` throughout, you are seeing un-decoded Manchester. Every `10` → `1`, every `01` → `0`. +7. **URH GUI for difficult signals**: When CLI output is ambiguous, `urh` opens the graphical editor where you can visually set thresholds and zoom into individual bits. +8. **Sample rate must be exact**: If `-s` does not match the actual capture rate, bit timing will be wrong and decoding will fail. Check SDR software logs for the exact rate. +9. **Flag in the bits**: After decoding, convert the bit string to ASCII — CTF flags often appear directly in RF payloads as plaintext strings. +10. **Fixed vs rolling codes**: Simple 433MHz remotes use fixed codes (same bits every press) — safe to replay. Modern systems use rolling codes — replay will not work. + +## Troubleshooting + +### "urh_cli not found" + +Verify the package is installed: +```bash +python3 -c "import urh; print('urh available')" +``` + +If the import works but `urh_cli` is not on PATH: +```bash +python3 -m urh.cli.urh_cli analyze -f signal.complex -s 2000000 +``` + +### "No module named 'urh'" + +Install the package. URH requires Cython to build its C extensions — install it first: +```bash +pip install cython +pip install urh +``` + +### "You need Cython to build URH's extensions!" + +Cython is a build-time dependency for URH. Install it before installing URH: +```bash +pip install cython +pip install urh +``` + +### Decoded bits look like noise / all zeros / all ones + +- **Wrong sample rate**: The `-s` value must exactly match the rate used during capture. +- **Wrong modulation**: Try a different modulation type. +- **Inverted signal**: Try `--demodulate ook --threshold 0.7` or manually invert bits: + ```bash + python3 -c "bits=open('bits.txt').read().strip(); print(''.join('1' if b=='0' else '0' for b in bits))" + ``` + +### "No transitions detected" after OOK demodulation + +- Signal may be too weak or clipped. 
Check amplitude statistics with the basic run. +- Try a lower threshold: `--threshold 0.1` +- Use `--energy` to verify packets are actually present in the signal. + +### URH GUI fails to launch (Qt errors on Linux) + +```bash +# With X11 forwarding: +ssh -X user@host urh + +# Check Qt installation: +python3 -c "from PyQt5.QtWidgets import QApplication; print('Qt OK')" +``` + +### "Unsupported format" + +- GNU Radio `.cfile` → rename to `.complex` +- Raw RTL-SDR uint8 I/Q → this format is not supported; convert to `.complex16u` or float32 first +- If unsure about format, try `.complex` and check if the spectrum looks reasonable diff --git a/skills/urh/analyze_signal.py b/skills/urh/analyze_signal.py new file mode 100644 index 0000000..9372fc7 --- /dev/null +++ b/skills/urh/analyze_signal.py @@ -0,0 +1,783 @@ +#!/usr/bin/env python3 +""" +RF Signal Analyzer for URH (Universal Radio Hacker) compatible captures. + +Analyzes IQ signal files (.complex, .complex16s, .complex16u, .wav) to +identify modulation type, demodulate OOK/FSK signals, and decode protocols +using urh_cli. For CTF RF challenges and IoT device reverse engineering. 
+""" + +import argparse +import subprocess +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Optional, Tuple + +import numpy as np + +# Add parent directory to path for shared module imports +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from common.signal_analysis import ( + detect_clusters, + export_transitions_csv, + format_duration, + guess_protocol, + parse_sample_rate, + print_histogram, +) +from common.signal_analysis import analyze_timing as _analyze_timing + + +# ============================================================================ +# CONSTANTS +# ============================================================================ + +SUPPORTED_EXTENSIONS = {'.complex', '.cfile', '.complex16s', '.complex16u', '.wav'} + +# URH filename convention: signal_<sample_rate>_<center_frequency>.complex +# e.g. signal_2000000_433920000.complex or capture_2M_433.92M.complex +_RATE_PATTERN = r'(\d+(?:\.\d+)?)\s*([kMG]?)(?:_|\b)' + + +# ============================================================================ +# DATA STRUCTURES +# ============================================================================ + +@dataclass +class SignalData: + """Normalized IQ signal data from any supported format.""" + samples: np.ndarray # Complex float64 samples + sample_rate: float # Samples per second (Hz) + center_freq: float # Center frequency in Hz (0 if unknown) + file_format: str # 'complex', 'complex16s', 'complex16u', or 'wav' + duration: float # Total duration in seconds + n_samples: int # Total sample count + available_channels: List[str] = field(default_factory=list) + + +# ============================================================================ +# URH-CLI INTERACTION +# ============================================================================ + +def check_urh_cli() -> Tuple[bool, str]: + """Check if urh_cli is installed and return version info.""" + for cmd in [['urh_cli', '--version'], ['python3', '-m', 
'urh.cli.urh_cli', '--version']]: + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=10 + ) + if result.returncode == 0: + version = (result.stdout or result.stderr).strip().split('\n')[0] + return True, version + except FileNotFoundError: + continue + except subprocess.TimeoutExpired: + return False, "urh_cli timed out" + except Exception as e: + return False, str(e) + return False, "urh_cli not found in PATH" + + +def run_urh_cli(args: List[str], timeout: int = 60) -> Tuple[int, str, str]: + """ + Run urh_cli with the given arguments. + + Returns (returncode, stdout, stderr). + """ + # Try direct command first, fall back to module invocation + for cmd_prefix in [['urh_cli'], ['python3', '-m', 'urh.cli.urh_cli']]: + cmd = cmd_prefix + args + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + return result.returncode, result.stdout, result.stderr + except FileNotFoundError: + continue + except subprocess.TimeoutExpired: + return 1, '', 'urh_cli timed out' + return 1, '', 'urh_cli not found' + + +def _print_install_help(): + print("Install URH with: pip install urh") + print("Or run via module: python3 -m urh.cli.urh_cli") + + +# ============================================================================ +# FILENAME PARSING +# ============================================================================ + +def _parse_rate_from_name(name: str) -> float: + """ + Try to extract sample rate from a filename. 
+ + URH convention: signal_<sample_rate>_<center_freq>.complex + Examples: capture_2000000_433920000.complex, sig_2M_433.92M.complex + """ + import re + # Look for standalone numbers that could be a sample rate (common: 1M, 2M, 8M) + # Pattern: digits followed by optional k/M/G multiplier + matches = re.findall(r'(\d+(?:\.\d+)?)([kKmMgG]?)', name) + for val_str, unit in matches: + val = float(val_str) + multiplier = {'k': 1e3, 'K': 1e3, 'm': 1e6, 'M': 1e6, 'g': 1e9, 'G': 1e9}.get(unit, 1) + rate = val * multiplier + # Plausible sample rates: 8kHz – 56MHz + if 8e3 <= rate <= 56e6: + return rate + return 0.0 + + +def _parse_freq_from_name(name: str) -> float: + """ + Try to extract center frequency from a filename. + + Looks for frequency hints like 433, 433.92, 433920000. + Skips the first plausible sample-rate value to avoid returning + the sample rate as the center frequency on URH-convention filenames + like capture_2000000_433920000.complex. + """ + import re + matches = re.findall(r'(\d+(?:\.\d+)?)([kKmMgG]?)', name) + skipped_one = False + for val_str, unit in matches: + val = float(val_str) + multiplier = {'k': 1e3, 'K': 1e3, 'm': 1e6, 'M': 1e6, 'g': 1e9, 'G': 1e9}.get(unit, 1) + candidate = val * multiplier + # Skip the first value that falls in the sample-rate range (8 kHz – 56 MHz) + # so that we return the second numeric token (the center frequency) + if not skipped_one and 8e3 <= candidate <= 56e6: + skipped_one = True + continue + # Plausible RF center frequencies: 100 kHz – 6 GHz + if 1e5 <= candidate <= 6e9: + return candidate + return 0.0 + + +# ============================================================================ +# FILE LOADING +# ============================================================================ + +def load_signal(file_path: Path, sample_rate: float = 0.0, + center_freq: float = 0.0) -> SignalData: + """ + Load an IQ signal file and return normalized SignalData. + + Auto-detects format from file extension.
+ """ + ext = file_path.suffix.lower() + if ext in ('.complex', '.cfile'): + return _load_complex32(file_path, sample_rate, center_freq) + elif ext == '.complex16s': + return _load_complex16(file_path, sample_rate, center_freq, signed=True) + elif ext == '.complex16u': + return _load_complex16(file_path, sample_rate, center_freq, signed=False) + elif ext == '.wav': + return _load_wav(file_path, sample_rate, center_freq) + else: + raise ValueError( + f"Unsupported format '{ext}'. " + f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}" + ) + + +def _load_complex32(file_path: Path, sample_rate: float, + center_freq: float) -> SignalData: + """Load float32 interleaved IQ file (.complex, .cfile).""" + raw = np.fromfile(str(file_path), dtype=np.float32) + if len(raw) % 2 != 0: + raw = raw[:-1] + samples = raw[0::2].astype(np.float64) + 1j * raw[1::2].astype(np.float64) + + if sample_rate == 0.0: + sample_rate = _parse_rate_from_name(file_path.name) + if center_freq == 0.0: + center_freq = _parse_freq_from_name(file_path.name) + + n = len(samples) + duration = n / sample_rate if sample_rate > 0 else 0.0 + return SignalData( + samples=samples, + sample_rate=sample_rate, + center_freq=center_freq, + file_format='complex', + duration=duration, + n_samples=n, + ) + + +def _load_complex16(file_path: Path, sample_rate: float, + center_freq: float, signed: bool) -> SignalData: + """Load int16 interleaved IQ file (.complex16s or .complex16u).""" + dtype = np.int16 if signed else np.uint16 + raw = np.fromfile(str(file_path), dtype=dtype) + if len(raw) % 2 != 0: + raw = raw[:-1] + + i_vals = raw[0::2].astype(np.float64) + q_vals = raw[1::2].astype(np.float64) + + # Normalize to [-1, 1]; uint16 samples are offset-centered at 32768 + scale = 32768.0 + if not signed: + i_vals -= 32768.0 + q_vals -= 32768.0 + samples = (i_vals / scale) + 1j * (q_vals / scale) + + if sample_rate == 0.0: + sample_rate = _parse_rate_from_name(file_path.name) + if center_freq == 0.0: + center_freq =
_parse_freq_from_name(file_path.name) + + fmt = 'complex16s' if signed else 'complex16u' + n = len(samples) + duration = n / sample_rate if sample_rate > 0 else 0.0 + return SignalData( + samples=samples, + sample_rate=sample_rate, + center_freq=center_freq, + file_format=fmt, + duration=duration, + n_samples=n, + ) + + +def _load_wav(file_path: Path, sample_rate: float, + center_freq: float) -> SignalData: + """ + Load a WAV file as IQ signal. + + Stereo WAV: left channel = I, right channel = Q. + Mono WAV: treated as real-valued (Q = 0). + """ + try: + import wave + import struct + with wave.open(str(file_path), 'rb') as wf: + n_channels = wf.getnchannels() + samp_width = wf.getsampwidth() + wav_rate = wf.getframerate() + n_frames = wf.getnframes() + raw_bytes = wf.readframes(n_frames) + + if sample_rate == 0.0: + sample_rate = float(wav_rate) + + # Decode samples + fmt_map = {1: np.int8, 2: np.int16, 4: np.int32} + if samp_width not in fmt_map: + raise ValueError(f"Unsupported WAV sample width: {samp_width} bytes") + dtype = fmt_map[samp_width] + max_val = float(2 ** (8 * samp_width - 1)) + + all_samples = np.frombuffer(raw_bytes, dtype=dtype).astype(np.float64) / max_val + + if n_channels == 2: + i_vals = all_samples[0::2] + q_vals = all_samples[1::2] + samples = i_vals + 1j * q_vals + else: + samples = all_samples + 0j + + except ImportError: + raise ImportError("Could not import 'wave' module (standard library)") + + if center_freq == 0.0: + center_freq = _parse_freq_from_name(file_path.name) + + n = len(samples) + duration = n / sample_rate if sample_rate > 0 else 0.0 + return SignalData( + samples=samples, + sample_rate=sample_rate, + center_freq=center_freq, + file_format='wav', + duration=duration, + n_samples=n, + ) + + +# ============================================================================ +# SIGNAL ANALYSIS +# ============================================================================ + +def compute_amplitude_envelope(samples: np.ndarray) -> 
np.ndarray: + """Compute the amplitude (magnitude) envelope of the IQ signal.""" + return np.abs(samples) + + +def compute_spectrum(samples: np.ndarray, sample_rate: float, + n_bins: int = 512) -> Tuple[np.ndarray, np.ndarray]: + """ + Compute a power spectrum (FFT) of the IQ signal. + + Returns (freqs_hz, power_db) arrays, both of length n_bins. + Frequencies are centered on 0 Hz (baseband). + """ + # Downsample to n_bins samples, apply window, then FFT + if len(samples) >= n_bins: + # Decimate: average groups of samples down to n_bins points + step = len(samples) // n_bins + decimated = samples[:step * n_bins].reshape(n_bins, step).mean(axis=1) + else: + # Zero-pad to n_bins + decimated = np.zeros(n_bins, dtype=complex) + decimated[:len(samples)] = samples + + window = np.hanning(n_bins) + fft_out = np.fft.fftshift(np.fft.fft(decimated * window)) + power = 20 * np.log10(np.abs(fft_out) + 1e-10) + freqs = np.fft.fftshift(np.fft.fftfreq(n_bins, d=1.0 / sample_rate)) + return freqs, power + + +def compute_energy_profile(samples: np.ndarray, block_size: int = 512) -> np.ndarray: + """ + Compute the signal energy in consecutive blocks. + + Returns an array of RMS energy values, one per block. + """ + n_blocks = len(samples) // block_size + if n_blocks == 0: + return np.array([float(np.mean(np.abs(samples)))]) + trimmed = samples[:n_blocks * block_size].reshape(n_blocks, block_size) + return np.mean(np.abs(trimmed), axis=1) + + +def demodulate_ook(samples: np.ndarray, threshold: Optional[float] = None + ) -> Tuple[np.ndarray, float]: + """ + Demodulate OOK signal to a binary sequence via amplitude thresholding. + + Returns (binary_signal, threshold_used) where binary_signal is a 0/1 array. 
+ """ + envelope = compute_amplitude_envelope(samples) + env_min = float(envelope.min()) + env_max = float(envelope.max()) + + if threshold is None: + # Default: midpoint between noise floor and peak + threshold = env_min + (env_max - env_min) * 0.5 + + binary = (envelope >= threshold).astype(np.uint8) + return binary, threshold + + +def demodulate_fsk(samples: np.ndarray) -> np.ndarray: + """ + Demodulate FSK signal via instantaneous frequency. + + Returns a float array of per-sample phase differences in radians + (multiply by sample_rate / (2 * pi) to convert to Hz). + Positive values correspond to one symbol, negative to the other. + """ + # Instantaneous frequency from phase derivative + phase = np.angle(samples) + inst_freq = np.diff(np.unwrap(phase)) + return inst_freq + + +def extract_ook_transitions(binary_signal: np.ndarray, + sample_rate: float) -> Tuple[np.ndarray, int]: + """ + Extract transition timestamps (in seconds) from a binary OOK signal. + + Returns (times, initial_state) compatible with common/signal_analysis.py. + """ + if len(binary_signal) == 0: + return np.array([]), 0 + + initial_state = int(binary_signal[0]) + + # Find indices where the signal changes value + diffs = np.diff(binary_signal.astype(np.int8)) + transition_indices = np.where(diffs != 0)[0] + 1 # +1: index after transition + + # Convert indices to timestamps + times = transition_indices.astype(np.float64) / sample_rate + return times, initial_state + + +def detect_modulation_hint(signal: SignalData) -> str: + """ + Provide a coarse modulation hint based on spectrum and amplitude characteristics. + + Returns a string: 'OOK/ASK', 'FSK', 'PSK', or 'UNKNOWN'.
+ """ + env = compute_amplitude_envelope(signal.samples) + env_max = float(env.max()) + if env_max == 0: + return 'UNKNOWN' + + env_norm = env / env_max + + # OOK/ASK: amplitude varies significantly (ratio of std to mean is high) + env_std = float(env_norm.std()) + env_mean = float(env_norm.mean()) + amp_variation = env_std / (env_mean + 1e-9) + + # FSK: look for two lobes in the spectrum + if signal.sample_rate > 0: + _, power = compute_spectrum(signal.samples, signal.sample_rate) + half = len(power) // 2 + left_power = float(np.max(power[:half])) + right_power = float(np.max(power[half:])) + # Two roughly equal lobes on both sides: FSK + if abs(left_power - right_power) < 6 and left_power > -60: + return 'FSK' + + if amp_variation > 0.3: + return 'OOK/ASK' + + return 'PSK' + + +# ============================================================================ +# OUTPUT DISPLAY +# ============================================================================ + +def print_spectrum(freqs: np.ndarray, power: np.ndarray, + width: int = 60, title: str = "Power Spectrum"): + """Print an ASCII representation of the power spectrum.""" + p_min = float(power.min()) + p_max = float(power.max()) + p_range = p_max - p_min if p_max > p_min else 1.0 + + # Downsample to display width + n_bins = len(power) + step = max(1, n_bins // width) + display_power = [] + display_freqs = [] + for i in range(0, n_bins, step): + end = min(i + step, n_bins) + display_power.append(float(np.max(power[i:end]))) + display_freqs.append(float(np.mean(freqs[i:end]))) + + print(f"\n{title}") + print("=" * 60) + print(f"{'Freq (kHz)':>12} Spectrum") + print("-" * 60) + + for freq_hz, p in zip(display_freqs, display_power): + bar_len = int(40 * (p - p_min) / p_range) + bar = "#" * bar_len + freq_khz = freq_hz / 1e3 + print(f"{freq_khz:>12.1f} |{bar}") + + print(f"\n Peak power: {p_max:.1f} dB") + print(f" Noise floor: {p_min:.1f} dB") + + +def print_energy_profile(energy: np.ndarray, sample_rate: float, + 
block_size: int = 512, title: str = "Signal Energy"): + """Print an ASCII energy-over-time plot.""" + e_max = float(energy.max()) + if e_max == 0: + print(f"{title}: No signal energy detected") + return + + energy_norm = energy / e_max + block_duration_ms = block_size / sample_rate * 1000.0 + + print(f"\n{title} (block size: {block_duration_ms:.2f}ms)") + print("=" * 60) + + width = 40 + for i, e in enumerate(energy_norm): + bar_len = int(width * e) + bar = "#" * bar_len + t_ms = i * block_duration_ms + print(f"{t_ms:8.1f}ms |{bar}") + + +# ============================================================================ +# CLI +# ============================================================================ + +def main(): + parser = argparse.ArgumentParser( + description="Analyze RF IQ signal captures for URH-compatible files" + ) + parser.add_argument("file", type=Path, help="IQ signal file (.complex, .cfile, .complex16s, .complex16u, .wav)") + parser.add_argument("-s", "--sample-rate", type=float, default=0.0, metavar="HZ", + help="Sample rate in Hz (e.g. 2000000 for 2 MSPS). " + "Parsed from filename if not specified.") + parser.add_argument("-c", "--center-freq", type=float, default=0.0, metavar="HZ", + help="Center frequency in Hz (for display only).") + + # Display options + parser.add_argument("--spectrum", action="store_true", + help="Show ASCII power spectrum") + parser.add_argument("--energy", action="store_true", + help="Show signal energy over time (packet boundary detection)") + parser.add_argument("--raw", action="store_true", + help="Show raw IQ samples (basic stats) or raw pulse transitions " + "(after demodulation)") + parser.add_argument("-n", type=int, default=20, + help="Number of raw values to show (default: 20)") + + # Demodulation + parser.add_argument("--demodulate", choices=['ook', 'fsk', 'ask'], metavar="MODE", + help="Demodulate the signal (ook, fsk, ask). 
" + "After OOK demodulation, enables --clusters, --histogram, --raw for pulse analysis.") + parser.add_argument("--threshold", type=float, default=None, + help="OOK amplitude threshold 0.0–1.0 (default: auto midpoint)") + + # Pulse analysis (after OOK/ASK demodulation) + parser.add_argument("--clusters", action="store_true", + help="Show detected pulse duration clusters (after OOK demodulation)") + parser.add_argument("--histogram", action="store_true", + help="Show pulse duration histogram (after OOK demodulation)") + parser.add_argument("--bins", type=int, default=20, + help="Number of histogram bins (default: 20)") + + # Protocol decoding via urh_cli + parser.add_argument("--decode", action="store_true", + help="Run urh_cli auto-analysis for protocol decoding") + parser.add_argument("--modulation", choices=['OOK', 'ASK', 'FSK', 'PSK'], + help="Modulation hint for urh_cli --decode (OOK, ASK, FSK, PSK)") + + # Export + parser.add_argument("--export", type=Path, metavar="CSV", + help="Export to CSV. Without --demodulate: exports IQ samples. " + "With --demodulate ook: exports pulse transitions.") + + args = parser.parse_args() + + if not args.file.exists(): + print(f"Error: File not found: {args.file}") + sys.exit(1) + + ext = args.file.suffix.lower() + if ext not in SUPPORTED_EXTENSIONS: + print(f"Error: Unsupported file extension '{ext}'") + print(f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}") + sys.exit(1) + + # --- Load signal --- + try: + signal = load_signal(args.file, sample_rate=args.sample_rate, + center_freq=args.center_freq) + except Exception as e: + print(f"Error loading file: {e}") + sys.exit(1) + + if signal.sample_rate == 0.0: + print("Warning: Sample rate unknown. 
Use -s to specify it.") + print(" Example: -s 2000000 for 2 MSPS") + print() + + # --- Basic info --- + print(f"File: {args.file}") + print(f"Format: {signal.file_format}") + print(f"Samples: {signal.n_samples:,}") + if signal.sample_rate > 0: + rate_str = f"{signal.sample_rate/1e6:.3f} MSPS" if signal.sample_rate >= 1e6 else f"{signal.sample_rate/1e3:.1f} kSPS" + print(f"Sample rate: {rate_str}") + print(f"Duration: {signal.duration*1000:.2f} ms ({signal.duration:.4f} s)") + if signal.center_freq > 0: + freq_str = f"{signal.center_freq/1e6:.3f} MHz" if signal.center_freq >= 1e6 else f"{signal.center_freq/1e3:.1f} kHz" + print(f"Center freq: {freq_str}") + + env = compute_amplitude_envelope(signal.samples) + env_max = float(env.max()) + env_mean = float(env.mean()) + print() + print("Amplitude") + print("-" * 40) + print(f" Peak: {env_max:.4f}") + print(f" Mean: {env_mean:.4f}") + print(f" Std: {float(env.std()):.4f}") + + # Modulation hint + hint = detect_modulation_hint(signal) + print() + print(f"Modulation hint: {hint}") + print() + + # --- Spectrum --- + if args.spectrum: + if signal.sample_rate == 0.0: + print("Warning: Sample rate unknown — spectrum frequencies will be incorrect.") + sample_rate_for_fft = 1.0 + else: + sample_rate_for_fft = signal.sample_rate + freqs, power = compute_spectrum(signal.samples, sample_rate_for_fft) + print_spectrum(freqs, power) + print() + + # --- Energy profile --- + if args.energy: + block_size = max(64, signal.n_samples // 200) + energy = compute_energy_profile(signal.samples, block_size=block_size) + if signal.sample_rate > 0: + print_energy_profile(energy, signal.sample_rate, block_size=block_size) + else: + print_energy_profile(energy, 1.0, block_size=block_size) + print() + + # --- Raw IQ samples (no demodulation) --- + if args.raw and not args.demodulate: + print(f"First {args.n} IQ Samples") + print("-" * 40) + for i in range(min(args.n, signal.n_samples)): + s = signal.samples[i] + amp = abs(s) + print(f" [{i:4d}] 
I={s.real:+.4f} Q={s.imag:+.4f} |{amp:.4f}|") + print() + + # --- Demodulation --- + if args.demodulate: + mode = args.demodulate.lower() + + if mode in ('ook', 'ask'): + binary, thresh_used = demodulate_ook(signal.samples, threshold=args.threshold) + print(f"OOK Demodulation (threshold: {thresh_used:.4f})") + print("-" * 40) + + if signal.sample_rate > 0: + times, initial_state = extract_ook_transitions(binary, signal.sample_rate) + print(f"Transitions detected: {len(times)}") + if len(times) < 2: + print("Warning: Very few transitions detected.") + print(" Try adjusting --threshold or check that the signal contains pulses.") + print() + else: + analysis = _analyze_timing(times, initial_state, signal.duration) + if 'error' in analysis: + print(f"Timing analysis error: {analysis['error']}") + else: + from common.signal_analysis import format_duration + print(f"Initial state: {'HIGH' if initial_state else 'LOW'}") + a = analysis['all'] + print(f"All pulses: min={format_duration(a['min_us'])} " + f"max={format_duration(a['max_us'])} " + f"mean={format_duration(a['mean_us'])}") + h = analysis['high'] + print(f"HIGH pulses ({h['count']}): min={format_duration(h['min_us'])} " + f"max={format_duration(h['max_us'])}") + lo = analysis['low'] + print(f"LOW gaps ({lo['count']}): min={format_duration(lo['min_us'])} " + f"max={format_duration(lo['max_us'])}") + print() + + guesses = guess_protocol(analysis) + if guesses: + print("Protocol Guesses (from pulse timing)") + print("-" * 40) + for name, confidence, details in guesses: + print(f" {name} ({confidence*100:.0f}% confidence)") + print(f" {details}") + print() + + if args.clusters: + print("Detected Pulse Duration Clusters") + print("-" * 40) + high_clusters = detect_clusters(analysis['high_durations_us']) + print("HIGH pulse clusters:") + for center, count in high_clusters[:6]: + print(f" ~{format_duration(center)} ({count} occurrences)") + low_clusters = detect_clusters(analysis['low_durations_us']) + print("LOW gap 
clusters:") + for center, count in low_clusters[:6]: + print(f" ~{format_duration(center)} ({count} occurrences)") + print() + + if args.histogram: + print_histogram(analysis['durations_us'], bins=args.bins, + title="All Pulse Durations") + print_histogram(analysis['high_durations_us'], bins=args.bins, + title="HIGH Pulse Durations") + print_histogram(analysis['low_durations_us'], bins=args.bins, + title="LOW Gap Durations") + + if args.raw: + print(f"First {args.n} Pulse Transitions") + print("-" * 40) + durations = analysis['durations_us'] + for i in range(min(args.n, len(durations))): + state = "HIGH" if (i + initial_state) % 2 == 0 else "LOW" + print(f" [{i:3d}] {state}: {format_duration(durations[i])}") + print() + + if args.export: + export_transitions_csv(times, initial_state, args.export) + # (print message inside export_transitions_csv) + else: + # No sample rate: show raw binary sequence only + print(f"Binary (first {args.n*8} bits): ", end='') + print(''.join(map(str, binary[:args.n * 8]))) + print() + + elif mode == 'fsk': + inst_freq = demodulate_fsk(signal.samples) + threshold = float(np.median(inst_freq)) + binary_fsk = (inst_freq > threshold).astype(np.uint8) + print(f"FSK Demodulation (frequency threshold: {threshold:.4f})") + print("-" * 40) + print(f"Instantaneous freq: min={float(inst_freq.min()):.4f} " + f"max={float(inst_freq.max()):.4f} " + f"mean={float(inst_freq.mean()):.4f}") + print() + if args.raw: + print(f"First {args.n} demodulated FSK values (normalized freq)") + print("-" * 40) + for i in range(min(args.n, len(inst_freq))): + sym = "HI" if binary_fsk[i] else "LO" + print(f" [{i:4d}] freq={inst_freq[i]:+.4f} -> {sym}") + print() + + if args.export: + with open(args.export, 'w') as f: + f.write("index,inst_freq,symbol\n") + for i, (fr, sym) in enumerate(zip(inst_freq, binary_fsk)): + f.write(f"{i},{fr:.6f},{sym}\n") + print(f"Exported {len(inst_freq)} FSK samples to {args.export}") + + # --- Protocol decoding via urh_cli --- + if 
args.decode: + ok, version = check_urh_cli() + if not ok: + print(f"Error: urh_cli not available ({version})") + _print_install_help() + sys.exit(1) + + print(f"urh_cli Protocol Decoding [{version}]") + print("-" * 40) + + urh_args = ['analyze', '-f', str(args.file)] + if signal.sample_rate > 0: + urh_args += ['-s', str(int(signal.sample_rate))] + if args.modulation: + urh_args += ['--modulation', args.modulation] + + rc, stdout, stderr = run_urh_cli(urh_args) + if stdout: + print(stdout) + if stderr and rc != 0: + print(f"urh_cli error: {stderr.strip()}") + if rc != 0: + print("urh_cli returned non-zero exit code. Check the error message above.") + print() + + # --- Export IQ (without demodulation) --- + if args.export and not args.demodulate: + n_export = min(signal.n_samples, 100000) + with open(args.export, 'w') as f: + f.write("index,i,q,amplitude\n") + for i in range(n_export): + s = signal.samples[i] + f.write(f"{i},{s.real:.6f},{s.imag:.6f},{abs(s):.6f}\n") + print(f"Exported {n_export} IQ samples to {args.export}") + + +if __name__ == "__main__": + main() diff --git a/skills/urh/examples.md b/skills/urh/examples.md new file mode 100644 index 0000000..735cfa0 --- /dev/null +++ b/skills/urh/examples.md @@ -0,0 +1,243 @@ +# URH RF Signal Analysis Examples + +## Example 1: Unknown RF Signal — Full Analysis Pipeline + +**Scenario**: You have a captured RF signal from an unknown IoT device and need to identify the protocol. 
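
Every step below boils down to envelope detection followed by pulse-timing statistics. As a mental model (a self-contained sketch with made-up numbers, not the script's exact code), OOK demodulation is just thresholding the magnitude of the IQ samples and timing the transitions:

```python
import numpy as np

# Synthetic OOK capture: 1 ms symbols at 100 kSPS (illustrative values)
rate = 100_000
on = np.ones(100, dtype=complex)    # 1 ms of carrier present
off = np.zeros(100, dtype=complex)  # 1 ms of silence
samples = np.concatenate([off, on, off, on, on, off])

envelope = np.abs(samples)                 # amplitude envelope
threshold = envelope.min() + (envelope.max() - envelope.min()) * 0.5
binary = (envelope >= threshold).astype(np.uint8)

# Transition timestamps: where the thresholded signal changes state
edges = np.where(np.diff(binary.astype(np.int8)) != 0)[0] + 1
times = edges / rate
print(times)  # -> [0.001 0.002 0.003 0.005]
```

The gaps between those timestamps are the pulse durations; `analyze_signal.py` does the same thing at scale and then clusters the durations to recover the encoding.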
+ +### Step 1: Get an overview + +```bash +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 +``` + +Output: +``` +File: signal.complex +Format: complex +Samples: 200,000 +Sample rate: 2.000 MSPS +Duration: 100.00 ms (0.1000 s) + +Amplitude +---------------------------------------- + Peak: 0.8432 + Mean: 0.1234 + Std: 0.2156 + +Modulation hint: OOK/ASK +``` + +### Step 2: Inspect the spectrum + +```bash +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --spectrum +``` + +A single lobe centered at 0 Hz confirms this is OOK/ASK (baseband amplitude modulation). + +### Step 3: Check for packet boundaries + +```bash +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --energy +``` + +Look for bursts of high energy separated by silence. This shows how many frames are in the capture. + +### Step 4: Demodulate and analyze timing + +```bash +python3 skills/urh/analyze_signal.py signal.complex -s 2000000 --demodulate ook --clusters +``` + +Output: +``` +OOK Demodulation (threshold: 0.4216) +---------------------------------------- +Transitions detected: 156 +Initial state: LOW +All pulses: min=300.0us max=1200.0us mean=600.0us +HIGH pulses (78): min=300.0us max=900.0us +LOW gaps (78): min=300.0us max=1200.0us + +Detected Pulse Duration Clusters +---------------------------------------- +HIGH pulse clusters: + ~300.0us (52 occurrences) + ~900.0us (26 occurrences) +LOW gap clusters: + ~300.0us (48 occurrences) + ~1200.0us (8 occurrences) +``` + +Two pulse duration clusters (300µs and 900µs, ratio 1:3) → pulse-width encoding. + +### Step 5: Decode with urh_cli + +```bash +urh_cli analyze -f signal.complex -s 2000000 --modulation OOK +``` + +--- + +## Example 2: 433MHz Remote Control + +**Scenario**: Captured a 433.92 MHz garage door remote with 2 MSPS sample rate. 
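
Fixed-code remotes typically use pulse-width encoding: a short HIGH pulse encodes one bit value, a long HIGH pulse the other. Given the HIGH durations from the exported pulse CSV, decoding is one comparison per pulse. A sketch (the 300 µs/900 µs values and the short=0/long=1 mapping are illustrative assumptions; real devices vary):

```python
# HIGH pulse durations in microseconds (hypothetical capture)
highs = [300, 900, 300, 300, 900, 900, 300, 900]

# Split at the midpoint between the short and long clusters
cutoff = (min(highs) + max(highs)) / 2          # 600.0 us here
bits = ''.join('1' if d > cutoff else '0' for d in highs)
print(bits, hex(int(bits, 2)))  # -> 01001101 0x4d
```

If the result looks wrong, try the inverse mapping (short=1, long=0); both conventions occur in the wild.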
+ +### Identify the protocol + +```bash +python3 skills/urh/analyze_signal.py remote_433.complex -s 2000000 --demodulate ook --clusters --histogram +``` + +Typical output for a fixed-code 433MHz remote: +- 2–3 HIGH pulse clusters (preamble + short bit + long bit) +- Guard gap cluster much longer than data bits +- Repeated 3–5 times + +### Decode to bits + +```bash +urh_cli analyze -f remote_433.complex -s 2000000 --modulation OOK +``` + +### Export pulses for manual analysis + +```bash +python3 skills/urh/analyze_signal.py remote_433.complex -s 2000000 --demodulate ook --export pulses.csv +``` + +### Convert decoded bits to hex + +```bash +python3 -c "bits='010100110101...'; print(hex(int(bits, 2)))" +``` + +--- + +## Example 3: FSK Sensor Signal + +**Scenario**: Captured a 433MHz temperature/humidity sensor that uses FSK. + +### Identify FSK via spectrum + +```bash +python3 skills/urh/analyze_signal.py sensor.complex -s 2000000 --spectrum +``` + +Two symmetrical lobes offset from 0 Hz → FSK confirmed. + +### Demodulate FSK + +```bash +python3 skills/urh/analyze_signal.py sensor.complex -s 2000000 --demodulate fsk +``` + +### Decode with urh_cli + +```bash +urh_cli analyze -f sensor.complex -s 2000000 --modulation FSK +``` + +--- + +## Example 4: CTF RF Challenge + +**Scenario**: CTF provides `capture.cfile` (GNU Radio format). The flag is encoded in the transmission. + +### Step 1: Rename and check (GNU Radio .cfile is float32 IQ) + +```bash +# .cfile is the same as .complex, both are float32 IQ +python3 skills/urh/analyze_signal.py capture.cfile -s 2000000 +``` + +If sample rate is unknown, try common values: 1000000, 2000000, 8000000. 
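
A quick way to narrow down candidate rates before demodulating: a `.complex`/`.cfile` stores interleaved float32 I/Q, so each sample is 8 bytes, and file size divided by (8 × rate) gives the implied capture duration. Remote and sensor bursts usually span tens to hundreds of milliseconds, so a rate that implies microseconds or minutes is probably wrong. A sketch with a hypothetical file size:

```python
file_bytes = 1_600_000           # hypothetical .complex file size in bytes
n_samples = file_bytes // 8      # float32 I + float32 Q = 8 bytes per sample
for rate in (1_000_000, 2_000_000, 8_000_000):
    print(f"{rate/1e6:g} MSPS -> {n_samples / rate * 1000:.1f} ms")
# -> 1 MSPS -> 200.0 ms, 2 MSPS -> 100.0 ms, 8 MSPS -> 25.0 ms
```

All three are plausible here, so the tiebreaker is whether the decoded pulse durations land near common values (e.g. multiples of ~100 µs).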
+ +### Step 2: Try OOK (most common in CTF) + +```bash +python3 skills/urh/analyze_signal.py capture.cfile -s 2000000 --demodulate ook --clusters +``` + +### Step 3: Decode bits + +```bash +urh_cli analyze -f capture.cfile -s 2000000 --modulation OOK +``` + +### Step 4: Look for the flag + +```bash +# Convert bit string to ASCII +python3 -c " +bits = '0100011001001100010000010100011101111011...' +n = len(bits) - (len(bits) % 8) +chars = [chr(int(bits[i:i+8], 2)) for i in range(0, n, 8)] +print(''.join(chars)) +" +``` + +### Step 5: Try Manchester decoding if bits look like alternating pairs + +```bash +python3 -c " +bits = '1001100110...' # raw demodulated bits +# Manchester: 10 -> 1, 01 -> 0 +decoded = [] +for i in range(0, len(bits) - 1, 2): + pair = bits[i:i+2] + if pair == '10': decoded.append('1') + elif pair == '01': decoded.append('0') +print(''.join(decoded)) +" +``` + +--- + +## Example 5: Multiple File Formats + +### GNU Radio .cfile (float32 IQ) + +```bash +python3 skills/urh/analyze_signal.py capture.cfile -s 2000000 --demodulate ook +``` + +### HackRF .complex16s (int16 signed IQ) + +```bash +python3 skills/urh/analyze_signal.py hackrf_capture.complex16s -s 8000000 --spectrum +``` + +### RTL-SDR capture converted to .complex16u (uint16 IQ) + +```bash +python3 skills/urh/analyze_signal.py rtlsdr_capture.complex16u -s 2048000 --demodulate ook --clusters +``` + +### WAV file (stereo IQ — left=I, right=Q) + +```bash +python3 skills/urh/analyze_signal.py iq_capture.wav --spectrum +# (sample rate read automatically from WAV header) +``` + +--- + +## Useful Direct urh_cli Commands + +```bash +# Full auto-analysis (URH detects modulation) +urh_cli analyze -f signal.complex -s 2000000 + +# Specify modulation type +urh_cli analyze -f signal.complex -s 2000000 --modulation OOK +urh_cli analyze -f signal.complex -s 2000000 --modulation FSK +urh_cli analyze -f signal.complex -s 2000000 --modulation ASK +urh_cli analyze -f signal.complex -s 2000000 --modulation PSK + +# If urh_cli is
not on PATH: +python3 -m urh.cli.urh_cli analyze -f signal.complex -s 2000000 + +# Launch GUI for interactive analysis and recording +urh +```
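
---

## Synthesizing a Test Capture

To sanity-check the pipeline without SDR hardware, you can write a synthetic OOK signal in the float32 `.complex` layout (interleaved I/Q) and run it through `analyze_signal.py`. All parameters below are arbitrary illustrations; the output filename follows the `signal_<sample_rate>_<center_freq>` naming convention so the script can pick up the rate automatically:

```python
import numpy as np

rate = 2_000_000                   # 2 MSPS (illustrative)
bit_us = 500                       # 500 us per symbol
spb = rate * bit_us // 1_000_000   # samples per symbol

bits = [1, 0, 1, 1, 0, 0, 1, 0]
env = np.repeat(np.array(bits, dtype=np.float32), spb)

iq = np.zeros(2 * len(env), dtype=np.float32)
iq[0::2] = env                     # I channel carries the envelope; Q stays 0
iq.tofile('test_2000000_433920000.complex')
print(len(env), 'samples')  # -> 8000 samples
```

Then run `python3 skills/urh/analyze_signal.py test_2000000_433920000.complex --demodulate ook` and check that the reported HIGH/LOW pulse durations cluster near multiples of 500 µs.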