From ab3c004fbd9a8fea332b06e410091c61acf57adb Mon Sep 17 00:00:00 2001 From: yeyeto2788 Date: Tue, 10 Feb 2026 05:22:16 +0100 Subject: [PATCH 1/2] feature: Add sigrok skill implementation --- .gitignore | 3 + README.md | 28 + skills/common/__init__.py | 0 skills/common/signal_analysis.py | 238 ++++++ skills/logicmso/analyze_protocol.py | 231 +----- skills/sigrok/SKILL.md | 263 +++++++ skills/sigrok/analyze_capture.py | 1092 +++++++++++++++++++++++++++ skills/sigrok/examples.md | 346 +++++++++ 8 files changed, 1999 insertions(+), 202 deletions(-) create mode 100644 skills/common/__init__.py create mode 100644 skills/common/signal_analysis.py create mode 100644 skills/sigrok/SKILL.md create mode 100644 skills/sigrok/analyze_capture.py create mode 100644 skills/sigrok/examples.md diff --git a/.gitignore b/.gitignore index 95ad927..ced3406 100644 --- a/.gitignore +++ b/.gitignore @@ -192,3 +192,6 @@ $RECYCLE.BIN/ # Linux .directory .Trash-* + +# Claude local permissions +.claude\settings.local.json \ No newline at end of file diff --git a/README.md b/README.md index 8fc22ca..bbf1c86 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,20 @@ IoTHackBot is a collection of specialized tools and Claude Code skills designed - BusyBox command handling - Includes Python helper script and pre-built enumeration scripts +### Logic Analyzer & Signal Analysis + +- **sigrok** (skill) - Analyze logic analyzer captures using sigrok-cli and 131+ protocol decoders + - Native .sr file parsing (no sigrok-cli needed for timing analysis) + - Supports .sr, .csv, and .vcd formats + - Decode UART, SPI, I2C, CAN, JTAG, USB, 1-Wire, and many more + - Timing analysis with histograms, cluster detection, and protocol guessing + - Binary data extraction from decoded protocols + +- **logicmso** (skill) - Analyze captures from Saleae Logic MSO devices + - Decode protocols (UART, SPI, I2C) from exported binary files + - Digital and analog capture analysis + - Hardware reverse engineering and CTF challenges + ## Installation ### Prerequisites @@ -124,6 +138,18 @@ netflows capture.pcap --source-ip 192.168.1.100 netflows capture.pcap -s 192.168.1.100 --format quiet ``` +#### Analyze Logic Analyzer Captures +```bash +# Timing analysis of a sigrok capture (no sigrok-cli needed) +python3 skills/sigrok/analyze_capture.py capture.sr --histogram --clusters + +# Decode UART protocol (requires sigrok-cli) +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 + +# Analyze a specific channel +python3 skills/sigrok/analyze_capture.py capture.sr --channel D2 --raw +``` + #### Analyze Firmware ```bash # Identify file types @@ -149,7 +175,9 @@ IoTHackBot is available as a Claude Code plugin, providing AI-assisted security | **netflows** | Network flow extraction with DNS hostname resolution | | **nmap** | Professional network reconnaissance | | **onvifscan** | ONVIF device security testing | +| **logicmso** | Saleae Logic MSO capture analysis and protocol decoding | | **picocom** | UART console interaction | +| **sigrok** | Logic analyzer capture analysis with 131+ protocol decoders | | **telnetshell** | Telnet shell enumeration | | **wsdiscovery** | WS-Discovery device discovery | diff --git a/skills/common/__init__.py b/skills/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/skills/common/signal_analysis.py b/skills/common/signal_analysis.py new file mode 100644 index 0000000..918242e --- /dev/null +++ b/skills/common/signal_analysis.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +""" +Shared signal analysis utilities for logic analyzer skills. + +Provides timing analysis, cluster detection, protocol guessing, +histogram generation, and duration formatting used by both the +logicmso and sigrok skills. +""" + +import re +from typing import List, Tuple + +import numpy as np + + +# Common baud rates and their bit periods in microseconds +COMMON_BAUD_RATES = { + 300: 3333.33, + 1200: 833.33, + 2400: 416.67, + 4800: 208.33, + 9600: 104.17, + 19200: 52.08, + 38400: 26.04, + 57600: 17.36, + 115200: 8.68, + 230400: 4.34, + 460800: 2.17, + 921600: 1.09, +} + + +def analyze_timing(times: np.ndarray, initial_state: int, + duration: float) -> dict: + """ + Analyze timing characteristics of a digital signal. + + Args: + times: Array of transition timestamps in seconds. + initial_state: Starting logic level (0 or 1). + duration: Total capture duration in seconds. + + Returns: + Dict with timing statistics, or {'error': msg} on failure. + """ + if len(times) < 2: + return {'error': 'Not enough transitions'} + + durations_s = np.diff(times) + durations_us = durations_s * 1e6 + + # Separate HIGH and LOW durations + high_idx = 0 if initial_state == 0 else 1 + low_idx = 1 - high_idx + + high_durations_us = durations_us[high_idx::2] + low_durations_us = durations_us[low_idx::2] + + return { + 'total_transitions': len(times), + 'capture_duration_s': duration, + 'signal_duration_s': times[-1] - times[0] if len(times) > 0 else 0, + 'initial_state': 'HIGH' if initial_state else 'LOW', + 'all': { + 'min_us': float(durations_us.min()), + 'max_us': float(durations_us.max()), + 'mean_us': float(durations_us.mean()), + 'std_us': float(durations_us.std()), + }, + 'high': { + 'count': len(high_durations_us), + 'min_us': float(high_durations_us.min()) if len(high_durations_us) > 0 else 0, + 'max_us': float(high_durations_us.max()) if len(high_durations_us) > 0 else 0, + 'mean_us': float(high_durations_us.mean()) if len(high_durations_us) > 0 else 0, + }, + 'low': { + 'count': len(low_durations_us), + 'min_us': float(low_durations_us.min()) if len(low_durations_us) > 0 else 0, + 'max_us': float(low_durations_us.max()) if len(low_durations_us) > 0 else 0, + 'mean_us': float(low_durations_us.mean()) if len(low_durations_us) > 0 else 0, + }, + 'durations_us': durations_us, + 'high_durations_us': high_durations_us, + 'low_durations_us': low_durations_us, + } + + +def detect_clusters(durations_us: np.ndarray, + tolerance: float = 0.15) -> List[Tuple[float, int]]: + """ + Detect clusters of similar durations. + + Returns list of (center_value, count) tuples sorted by count (most + common first). + """ + if len(durations_us) == 0: + return [] + + sorted_durations = np.sort(durations_us) + clusters = [] + current_cluster = [sorted_durations[0]] + + for dur in sorted_durations[1:]: + cluster_mean = np.mean(current_cluster) + if cluster_mean > 0 and abs(dur - cluster_mean) / cluster_mean <= tolerance: + current_cluster.append(dur) + elif cluster_mean == 0 and dur == 0: + current_cluster.append(dur) + else: + clusters.append((float(np.mean(current_cluster)), len(current_cluster))) + current_cluster = [dur] + + if current_cluster: + clusters.append((float(np.mean(current_cluster)), len(current_cluster))) + + clusters.sort(key=lambda x: -x[1]) + return clusters + + +def guess_protocol(analysis: dict) -> List[Tuple[str, float, str]]: + """ + Attempt to guess the protocol based on timing characteristics. + + Returns list of (protocol_name, confidence, details) tuples sorted + by confidence (highest first). + """ + guesses = [] + + all_min = analysis['all']['min_us'] + all_max = analysis['all']['max_us'] + + # Check for UART (look for consistent bit period) + for baud, period_us in COMMON_BAUD_RATES.items(): + if 0.7 < all_min / period_us < 1.3: + multiples = analysis['durations_us'] / period_us + rounded = np.round(multiples) + error = np.abs(multiples - rounded).mean() + if error < 0.15: + guesses.append(( + f'UART ({baud} baud)', + max(0.3, 0.9 - error * 3), + f'Bit period ~{period_us:.1f}us' + )) + + # Check for 1-Wire (reset pulse ~480us, data pulses 1-120us) + if all_min < 20 and all_max > 400: + has_reset = any(400 < d < 600 for d in analysis['low_durations_us']) + has_short = any(d < 20 for d in analysis['durations_us']) + if has_reset and has_short: + guesses.append(( + '1-Wire', + 0.6, + 'Detected reset pulses and short data pulses' + )) + + # Check for CAN bus + can_bitrates = {125000: 8.0, 250000: 4.0, 500000: 2.0, 1000000: 1.0} + for bitrate, period_us in can_bitrates.items(): + if 0.7 < all_min / period_us < 1.3: + multiples = analysis['durations_us'] / period_us + rounded = np.round(multiples) + error = np.abs(multiples - rounded).mean() + if error < 0.15: + guesses.append(( + f'CAN ({bitrate // 1000}kbps)', + max(0.3, 0.85 - error * 3), + f'Bit period ~{period_us:.1f}us' + )) + + guesses.sort(key=lambda x: -x[1]) + return guesses + + +def format_duration(us: float) -> str: + """Format a duration in microseconds with appropriate units.""" + if us < 1000: + return f"{us:.1f}us" + elif us < 1000000: + return f"{us/1000:.2f}ms" + else: + return f"{us/1e6:.3f}s" + + +def print_histogram(durations_us: np.ndarray, bins: int = 20, + title: str = "Duration Histogram"): + """Print a simple ASCII histogram of timing durations.""" + if len(durations_us) == 0: + print(f"{title}: No data") + return + + hist, edges = np.histogram(durations_us, bins=bins) + max_count = max(hist) + + print(f"\n{title}") + print("=" * 60) + + for i, count in enumerate(hist): + left = edges[i] + right = edges[i + 1] + bar_len = int(40 * count / max_count) if max_count > 0 else 0 + bar = "#" * bar_len + label = f"{format_duration(left):>10s}-{format_duration(right):>10s}" + print(f"{label} |{bar} ({count})") + + +def export_transitions_csv(times: np.ndarray, initial_state: int, + output_path, label: str = ""): + """Export transitions to CSV file.""" + from pathlib import Path + output_path = Path(output_path) + + with open(output_path, 'w') as f: + f.write("index,time_s,state,duration_us\n") + + for i, t in enumerate(times): + state = (initial_state + i) % 2 + if i < len(times) - 1: + dur = (times[i + 1] - t) * 1e6 + else: + dur = 0 + f.write(f"{i},{t:.9f},{state},{dur:.3f}\n") + + print(f"Exported {len(times)} transitions to {output_path}") + + +def parse_sample_rate(text: str) -> float: + """ + Extract sample rate from a string like '24 MHz' or '1 kHz'. + + Returns sample rate in Hz, or 0.0 if not found. + """ + match = re.search(r'(\d+(?:\.\d+)?)\s*(Hz|kHz|MHz|GHz)', text) + if match: + val = float(match.group(1)) + unit = match.group(2) + multiplier = {'Hz': 1, 'kHz': 1e3, 'MHz': 1e6, 'GHz': 1e9} + return val * multiplier.get(unit, 1) + return 0.0 diff --git a/skills/logicmso/analyze_protocol.py b/skills/logicmso/analyze_protocol.py index c4fbb80..acf1faf 100755 --- a/skills/logicmso/analyze_protocol.py +++ b/skills/logicmso/analyze_protocol.py @@ -9,7 +9,6 @@ import argparse import sys from pathlib import Path -from collections import Counter import numpy as np @@ -19,25 +18,19 @@ print("Error: saleae-mso-api not installed. Run: pip install saleae-mso-api") sys.exit(1) +# Add parent directory to path for shared module imports +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from common.signal_analysis import ( + analyze_timing as _analyze_timing, + detect_clusters, + export_transitions_csv, + format_duration, + guess_protocol, + print_histogram, +) -# Common baud rates and their bit periods in microseconds -COMMON_BAUD_RATES = { - 300: 3333.33, - 1200: 833.33, - 2400: 416.67, - 4800: 208.33, - 9600: 104.17, - 19200: 52.08, - 38400: 26.04, - 57600: 17.36, - 115200: 8.68, - 230400: 4.34, - 460800: 2.17, - 921600: 1.09, -} - - -def load_capture(file_path: Path) -> tuple: + +def load_capture(file_path: Path) -> dict: """Load a Saleae binary capture file and return transition data.""" saleae_file = read_file(file_path) @@ -58,176 +51,13 @@ def load_capture(file_path: Path) -> tuple: def analyze_timing(data: dict) -> dict: """Analyze timing characteristics of the signal.""" - times = data['times'] - - if len(times) < 2: - return {'error': 'Not enough transitions'} - - durations_s = np.diff(times) - durations_us = durations_s * 1e6 - durations_ms = durations_s * 1e3 - - # Separate HIGH and LOW durations - initial = data['initial_state'] - high_idx = 0 if initial == 0 else 1 - low_idx = 1 - high_idx - - high_durations_us = durations_us[high_idx::2] - low_durations_us = durations_us[low_idx::2] - - return { - 'total_transitions': len(times), - 'capture_duration_s': data['end_time'] - data['begin_time'], - 'signal_duration_s': times[-1] - times[0] if len(times) > 0 else 0, - 'initial_state': 'HIGH' if initial else 'LOW', - 'all': { - 'min_us': float(durations_us.min()), - 'max_us': float(durations_us.max()), - 'mean_us': float(durations_us.mean()), - 'std_us': float(durations_us.std()), - }, - 'high': { - 'count': len(high_durations_us), - 'min_us': float(high_durations_us.min()) if len(high_durations_us) > 0 else 0, - 'max_us': float(high_durations_us.max()) if len(high_durations_us) > 0 else 0, - 'mean_us': float(high_durations_us.mean()) if len(high_durations_us) > 0 else 0, - }, - 'low': { - 'count': len(low_durations_us), - 'min_us': float(low_durations_us.min()) if len(low_durations_us) > 0 else 0, - 'max_us': float(low_durations_us.max()) if len(low_durations_us) > 0 else 0, - 'mean_us': float(low_durations_us.mean()) if len(low_durations_us) > 0 else 0, - }, - 'durations_us': durations_us, - 'high_durations_us': high_durations_us, - 'low_durations_us': low_durations_us, - } - - -def detect_clusters(durations_us: np.ndarray, tolerance: float = 0.15) -> list: - """ - Detect clusters of similar durations. - - Returns list of (center_value, count) tuples. - """ - if len(durations_us) == 0: - return [] - - sorted_durations = np.sort(durations_us) - clusters = [] - current_cluster = [sorted_durations[0]] - - for dur in sorted_durations[1:]: - # Check if this duration is within tolerance of current cluster - cluster_mean = np.mean(current_cluster) - if abs(dur - cluster_mean) / cluster_mean <= tolerance: - current_cluster.append(dur) - else: - # Save current cluster and start new one - clusters.append((np.mean(current_cluster), len(current_cluster))) - current_cluster = [dur] - - # Don't forget the last cluster - if current_cluster: - clusters.append((np.mean(current_cluster), len(current_cluster))) - - # Sort by count (most common first) - clusters.sort(key=lambda x: -x[1]) - - return clusters - - -def guess_protocol(analysis: dict) -> list: - """ - Attempt to guess the protocol based on timing characteristics. - - Returns list of (protocol_name, confidence, details) tuples. - """ - guesses = [] - - all_min = analysis['all']['min_us'] - all_max = analysis['all']['max_us'] - high_clusters = detect_clusters(analysis['high_durations_us']) - low_clusters = detect_clusters(analysis['low_durations_us']) - - # Check for UART (look for consistent bit period) - for baud, period_us in COMMON_BAUD_RATES.items(): - # Check if minimum duration is close to a baud rate bit period - if 0.7 < all_min / period_us < 1.3: - # Check if durations are multiples of the bit period - multiples = analysis['durations_us'] / period_us - rounded = np.round(multiples) - error = np.abs(multiples - rounded).mean() - if error < 0.15: - guesses.append(( - f'UART ({baud} baud)', - max(0.3, 0.9 - error * 3), - f'Bit period ~{period_us:.1f}us' - )) - - # Check for 1-Wire (reset pulse ~480us, data pulses 1-120us) - if all_min < 20 and all_max > 400: - has_reset = any(400 < d < 600 for d in analysis['low_durations_us']) - has_short = any(d < 20 for d in analysis['durations_us']) - if has_reset and has_short: - guesses.append(( - '1-Wire', - 0.6, - 'Detected reset pulses and short data pulses' - )) - - # Sort by confidence - guesses.sort(key=lambda x: -x[1]) - - return guesses - - -def print_histogram(durations_us: np.ndarray, bins: int = 20, title: str = "Duration Histogram"): - """Print a simple ASCII histogram.""" - if len(durations_us) == 0: - print(f"{title}: No data") - return - - hist, edges = np.histogram(durations_us, bins=bins) - max_count = max(hist) - - print(f"\n{title}") - print("=" * 60) - - for i, count in enumerate(hist): - left = edges[i] - right = edges[i + 1] - bar_len = int(40 * count / max_count) if max_count > 0 else 0 - bar = "#" * bar_len - - # Choose appropriate unit - if right < 1000: - label = f"{left:7.1f}-{right:7.1f}us" - elif right < 1000000: - label = f"{left/1000:7.2f}-{right/1000:7.2f}ms" - else: - label = f"{left/1e6:7.3f}-{right/1e6:7.3f}s" - - print(f"{label} |{bar} ({count})") + duration = data['end_time'] - data['begin_time'] + return _analyze_timing(data['times'], data['initial_state'], duration) def export_csv(data: dict, output_path: Path): """Export transitions to CSV file.""" - times = data['times'] - initial = data['initial_state'] - - with open(output_path, 'w') as f: - f.write("index,time_s,state,duration_us\n") - - for i, t in enumerate(times): - state = (initial + i) % 2 - if i < len(times) - 1: - dur = (times[i + 1] - t) * 1e6 - else: - dur = 0 - f.write(f"{i},{t:.9f},{state},{dur:.3f}\n") - - print(f"Exported {len(times)} transitions to {output_path}") + export_transitions_csv(data['times'], data['initial_state'], output_path) def main(): @@ -279,13 +109,19 @@ def main(): print("Timing Summary") print("-" * 40) a = analysis['all'] - print(f"All durations: min={a['min_us']:.1f}us max={a['max_us']:.1f}us mean={a['mean_us']:.1f}us") + print(f"All durations: min={format_duration(a['min_us'])} " + f"max={format_duration(a['max_us'])} " + f"mean={format_duration(a['mean_us'])}") h = analysis['high'] - print(f"HIGH pulses ({h['count']}): min={h['min_us']:.1f}us max={h['max_us']:.1f}us mean={h['mean_us']:.1f}us") - - l = analysis['low'] - print(f"LOW gaps ({l['count']}): min={l['min_us']:.1f}us max={l['max_us']:.1f}us mean={l['mean_us']:.1f}us") + print(f"HIGH pulses ({h['count']}): min={format_duration(h['min_us'])} " + f"max={format_duration(h['max_us'])} " + f"mean={format_duration(h['mean_us'])}") + + lo = analysis['low'] + print(f"LOW gaps ({lo['count']}): min={format_duration(lo['min_us'])} " + f"max={format_duration(lo['max_us'])} " + f"mean={format_duration(lo['mean_us'])}") print() # Protocol guesses @@ -306,18 +142,12 @@ def main(): high_clusters = detect_clusters(analysis['high_durations_us']) print("HIGH pulse clusters:") for center, count in high_clusters[:5]: - if center < 1000: - print(f" ~{center:.1f}us ({count} occurrences)") - else: - print(f" ~{center/1000:.2f}ms ({count} occurrences)") + print(f" ~{format_duration(center)} ({count} occurrences)") low_clusters = detect_clusters(analysis['low_durations_us']) print("LOW gap clusters:") for center, count in low_clusters[:5]: - if center < 1000: - print(f" ~{center:.1f}us ({count} occurrences)") - else: - print(f" ~{center/1000:.2f}ms ({count} occurrences)") + print(f" ~{format_duration(center)} ({count} occurrences)") print() # Raw values @@ -329,10 +159,7 @@ def main(): for i in range(min(args.n, len(durations))): state = "HIGH" if (i + initial) % 2 == 0 else "LOW" dur = durations[i] - if dur < 1000: - print(f" [{i:3d}] {state}: {dur:.1f}us") - else: - print(f" [{i:3d}] {state}: {dur/1000:.2f}ms") + print(f" [{i:3d}] {state}: {format_duration(dur)}") print() # Histogram diff --git a/skills/sigrok/SKILL.md b/skills/sigrok/SKILL.md new file mode 100644 index 0000000..3e0648c --- /dev/null +++ b/skills/sigrok/SKILL.md @@ -0,0 +1,263 @@ +--- +name: sigrok +description: Analyze logic analyzer captures (.sr, CSV, VCD) using sigrok-cli and 131+ protocol decoders. Decode UART, SPI, I2C, CAN, JTAG, USB, 1-Wire, and many more protocols from any sigrok-compatible hardware. Use for CTF challenges, hardware reverse engineering, and protocol decoding. +--- + +# Sigrok Capture Analysis + +This skill enables analysis of captured digital signals from any sigrok-compatible logic analyzer. It wraps `sigrok-cli` for protocol decoding (131+ decoders) and provides custom timing analysis, cluster detection, and protocol identification. + +Sigrok supports 258+ devices from 58 vendors, including Saleae Logic clones, DSLogic, Kingst LA series, fx2lafw-based analyzers, and many more. + +## Prerequisites + +- `sigrok-cli` installed — **Do NOT blindly install.** First check if it's available: + ```bash + sigrok-cli --version + ``` + Only if that fails, install it: + - Arch Linux: `sudo pacman -S sigrok-cli` + - Ubuntu/Debian: `sudo apt install sigrok-cli` + - Fedora: `sudo dnf install sigrok-cli` + - macOS: `brew install sigrok-cli` + - Windows: Download from https://sigrok.org/wiki/Downloads + - If you get a missing DLL error: `winget install --id=Microsoft.VCRedist.2010.x64` + +- `numpy` Python package (for timing analysis): + ```bash + python3 -c "import numpy; print('numpy available')" + ``` + Only if that fails: `pip install numpy` + +## Supported File Formats + +| Format | Extension | Description | +|--------|-----------|-------------| +| Sigrok session | `.sr` | Native PulseView/sigrok session files | +| CSV | `.csv` | Comma-separated values (sigrok or generic) | +| VCD | `.vcd` | Value Change Dump (standard digital waveform format) | + +All formats are auto-detected by file extension. + +## Quick Reference + +### File Information + +```bash +# Show capture metadata (channels, sample rate, duration) +python3 skills/sigrok/analyze_capture.py capture.sr --show +``` + +### Timing Analysis + +```bash +# Basic timing analysis with protocol guessing +python3 skills/sigrok/analyze_capture.py capture.sr + +# Show detailed timing histogram +python3 skills/sigrok/analyze_capture.py capture.sr --histogram + +# Show detected timing clusters +python3 skills/sigrok/analyze_capture.py capture.sr --clusters + +# Analyze a specific channel +python3 skills/sigrok/analyze_capture.py capture.sr --channel D2 + +# Show raw transition values +python3 skills/sigrok/analyze_capture.py capture.sr --raw -n 50 + +# Export transitions to CSV +python3 skills/sigrok/analyze_capture.py capture.sr --export transitions.csv +``` + +### Protocol Decoding + +```bash +# Decode UART at 115200 baud +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 + +# Decode with annotation filtering (show only TX data) +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --annotations uart=tx-data + +# Decode SPI +python3 skills/sigrok/analyze_capture.py capture.sr --decode spi:cpol=0:cpha=0 + +# Stacked decoders (I2C + EEPROM) +python3 skills/sigrok/analyze_capture.py capture.sr --decode i2c,eeprom24xx + +# CAN bus decoding +python3 skills/sigrok/analyze_capture.py capture.sr --decode can:bitrate=500000 + +# Extract binary data from decoder +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --binary-decode uart=tx --binary-out data.bin + +# List all available decoders +python3 skills/sigrok/analyze_capture.py --list-decoders + +# Include sample numbers in output +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --samplenum +``` + +### Direct sigrok-cli Usage + +For advanced use cases, invoke sigrok-cli directly: + +```bash +# Export .sr to CSV +sigrok-cli -i capture.sr -O csv > capture.csv + +# Decode with full output +sigrok-cli -i capture.sr -P uart:baudrate=115200 + +# Stacked decoders with annotation filter +sigrok-cli -i capture.sr -P i2c,eeprom24xx -A eeprom24xx + +# Binary output (raw decoded bytes) +sigrok-cli -i capture.sr -P uart:baudrate=115200 -B uart=tx > uart_data.bin + +# Show file details +sigrok-cli -i capture.sr --show + +# List all decoders with details +sigrok-cli -L +``` + +## Common Protocol Patterns + +### UART (Asynchronous Serial) +- **Idle state**: HIGH +- **Start bit**: LOW (1 bit period) +- **Data bits**: 8 bits, LSB first +- **Stop bit**: HIGH (1-2 bit periods) +- **Common baud rates**: 9600, 19200, 38400, 57600, 115200 +- **Bit period calculation**: `1/baud_rate` seconds +- **Decoder**: `uart:baudrate=115200` (adjust baud rate) +- **Channel mapping**: `uart:baudrate=115200:tx=D0:rx=D1` + +### SPI (Serial Peripheral Interface) +- **4 signals**: SCLK (clock), MOSI (master out), MISO (master in), CS (chip select) +- **Clock polarity (CPOL)**: Idle clock state (0=LOW, 1=HIGH) +- **Clock phase (CPHA)**: Sample edge (0=leading, 1=trailing) +- **Data**: Sampled on clock edges, typically 8 bits per transaction +- **Decoder**: `spi:cpol=0:cpha=0:clk=D0:mosi=D1:miso=D2:cs=D3` + +### I2C (Inter-Integrated Circuit) +- **2 signals**: SDA (data), SCL (clock) +- **Idle state**: Both HIGH (pulled up) +- **Start condition**: SDA falls while SCL is HIGH +- **Stop condition**: SDA rises while SCL is HIGH +- **Data**: 8 bits + ACK/NACK, MSB first +- **Address**: 7-bit (first byte after START) +- **Decoder**: `i2c:scl=D0:sda=D1` +- **Stacked**: `i2c,eeprom24xx` to decode EEPROM commands + +### CAN (Controller Area Network) +- **Single differential bus**: CANH/CANL (or single-ended capture) +- **Common bitrates**: 125kbps, 250kbps, 500kbps, 1Mbps +- **Frame**: SOF, arbitration ID, control, data, CRC, ACK, EOF +- **Decoder**: `can:bitrate=500000` + +### 1-Wire +- **Single signal**: DQ (data/power) +- **Idle state**: HIGH (pulled up) +- **Reset pulse**: Master pulls LOW for 480us minimum +- **Presence pulse**: Slave responds LOW for 60-240us +- **Write 0**: LOW for 60-120us +- **Write 1**: LOW for 1-15us, then release +- **Decoder**: `onewire_link` (link layer), `onewire_network` (network layer) + +### JTAG +- **4-5 signals**: TCK, TMS, TDI, TDO, (TRST) +- **Decoder**: `jtag:tck=D0:tms=D1:tdi=D2:tdo=D3` +- **Stacked**: `jtag,jtag_stm32` for STM32-specific decoding + +### USB +- **2 signals**: D+, D- +- **Decoder**: `usb_signalling:dp=D0:dm=D1` +- **Stacked**: `usb_signalling,usb_packet,usb_request` for full USB decode + +## Analysis Workflow + +### Step 1: Get File Overview +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --show +``` +Check channels, sample rate, and duration. + +### Step 2: Run Timing Analysis +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --clusters --histogram +``` +Look at timing distributions to identify the protocol. + +### Step 3: Try Protocol Decoders +Based on timing analysis, try appropriate decoders: +```bash +# If UART is guessed +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 + +# If SPI-like timing +python3 skills/sigrok/analyze_capture.py capture.sr --decode spi:cpol=0:cpha=0 + +# If I2C-like timing +python3 skills/sigrok/analyze_capture.py capture.sr --decode i2c +``` + +### Step 4: Extract Data +```bash +# Get decoded text +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --annotations uart=tx-data + +# Extract raw bytes +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 --binary-decode uart=tx --binary-out decoded.bin + +# Export timing data for external tools +python3 skills/sigrok/analyze_capture.py capture.sr --export timing.csv +``` + +## CTF Tips + +1. **Unknown protocol**: Start with `--clusters --histogram` to see timing distribution +2. **Try the helper script first**: `python3 skills/sigrok/analyze_capture.py capture.sr` gives automatic protocol guesses +3. **Multiple channels**: Use `--show` to see available channels, then `--channel D2` to analyze each +4. **Stacked decoders**: Use `i2c,eeprom24xx` or `spi,spiflash` to decode higher-level protocols +5. **Inverted signals**: Some captures have inverted logic — check initial state +6. **Binary extraction**: Use `--binary-decode` to get raw bytes for further analysis with `xxd` or `binwalk` +7. **Custom baud rate**: If standard rates don't match, calculate from timing clusters: `baud = 1e6 / cluster_us` +8. **List decoders**: Run `--list-decoders` to see all 131+ available decoders + +## Troubleshooting + +### "sigrok-cli not found" +Verify installation: +```bash +sigrok-cli --version +``` +Install if missing (see Prerequisites section above). + +### Missing DLL error on Windows +sigrok-cli depends on the Visual C++ 2010 Redistributable. Install it with: +```bash +winget install --id=Microsoft.VCRedist.2010.x64 +``` + +### "No transitions detected" +- Signal may be constant (stuck high/low) +- Check if the correct channel is selected with `--channel` +- Use `--show` to verify file has data + +### Decoder produces no output +- Check channel mapping: `uart:baudrate=115200:tx=D0` +- Verify baud rate/bitrate matches the signal +- Try without annotation filter first +- Use `--samplenum` to verify decoder is processing samples + +### Wrong timing values +- Check if file uses correct timescale (especially VCD files) +- Verify sample rate shown by `--show` +- For CSV files, ensure the time column is in seconds + +### "Unsupported format" +- Supported: `.sr`, `.csv`, `.vcd` +- For Saleae `.bin` files, use the `logicmso` skill instead +- Convert other formats to CSV or VCD first diff --git a/skills/sigrok/analyze_capture.py b/skills/sigrok/analyze_capture.py new file mode 100644 index 0000000..9c04f90 --- /dev/null +++ b/skills/sigrok/analyze_capture.py @@ -0,0 +1,1092 @@ +#!/usr/bin/env python3 +""" +Capture Analyzer for sigrok-compatible logic analyzer files. + +Analyzes digital signal captures (.sr, .csv, .vcd) to identify timing +patterns, decode protocols using sigrok-cli's 131+ decoders, and help +with hardware reverse engineering and CTF challenges. +""" + +import argparse +import re +import subprocess +import sys +import zipfile +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import numpy as np + +# Add parent directory to path for shared module imports +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from common.signal_analysis import ( + analyze_timing as _analyze_timing, + detect_clusters, + export_transitions_csv, + format_duration, + guess_protocol, + parse_sample_rate, + print_histogram, +) + + +# ============================================================================ +# CONSTANTS +# ============================================================================ + +SUPPORTED_EXTENSIONS = {'.sr', '.csv', '.vcd'} + +VCD_TIMESCALE_UNITS = { + 's': 1.0, + 'ms': 1e-3, + 'us': 1e-6, + 'ns': 1e-9, + 'ps': 1e-12, + 'fs': 1e-15, +} + + +# ============================================================================ +# DATA STRUCTURES +# ============================================================================ + +@dataclass +class CaptureData: + """Normalized capture data from any supported format.""" + times: np.ndarray # Transition timestamps in seconds + initial_state: int # Starting logic level (0 or 1) + sample_rate: float # Hz (estimated if not available) + channel_name: str # Name of the analyzed channel + file_format: str # 'sr', 'csv', or 'vcd' + duration: float = 0.0 # Total capture duration in seconds + total_samples: int = 0 # Total sample count (if known) + available_channels: List[str] = field(default_factory=list) + + +# ============================================================================ +# SIGROK-CLI INTERACTION +# ============================================================================ + +def check_sigrok_cli() -> Tuple[bool, str]: + """Check if sigrok-cli is installed and return version info.""" + try: + result = subprocess.run( + ['sigrok-cli', '--version'], + capture_output=True, + text=True, + timeout=10 + ) + if result.returncode == 0: + version = result.stdout.strip().split('\n')[0] + return True, version + return False, "sigrok-cli returned non-zero exit code" + except FileNotFoundError: + return False, "sigrok-cli not found in PATH" + except subprocess.TimeoutExpired: + return False, "sigrok-cli timed out" + except Exception as e: + return False, str(e) + + +def run_sigrok_cli(args: List[str], timeout: int = 60, + binary: bool = False) -> Tuple[int, str, str]: + """ + Run sigrok-cli with the given arguments. + + Returns (returncode, stdout, stderr). + """ + cmd = ['sigrok-cli'] + args + try: + result = subprocess.run( + cmd, + capture_output=True, + text=not binary, + timeout=timeout + ) + stdout = result.stdout if not binary else result.stdout.decode('latin-1') + stderr = result.stderr if not binary else result.stderr.decode('latin-1') + return result.returncode, stdout, stderr + except FileNotFoundError: + return 1, '', 'sigrok-cli not found' + except subprocess.TimeoutExpired: + return 1, '', 'sigrok-cli timed out' + + +def show_file_info(file_path: Path) -> Optional[dict]: + """ + Show capture file metadata using sigrok-cli --show. + + Returns dict with channels, sample_rate, duration, etc. + """ + rc, stdout, stderr = run_sigrok_cli(['-i', str(file_path), '--show']) + if rc != 0: + return None + + info = { + 'channels': [], + 'sample_rate': 0.0, + 'total_samples': 0, + 'duration': 0.0, + 'raw': stdout, + } + + for line in stdout.split('\n'): + line = line.strip() + if 'samplerate' in line.lower(): + info['sample_rate'] = parse_sample_rate(line) + elif 'total samples' in line.lower(): + match = re.search(r'(\d+)', line) + if match: + info['total_samples'] = int(match.group(1)) + elif line.startswith('D') or line.startswith('CH'): + info['channels'].append(line.split(':')[0].strip()) + + if info['sample_rate'] > 0 and info['total_samples'] > 0: + info['duration'] = info['total_samples'] / info['sample_rate'] + + return info + + +def list_decoders() -> List[Tuple[str, str]]: + """ + List available protocol decoders. + + Returns list of (name, description) tuples. + """ + rc, stdout, stderr = run_sigrok_cli(['-L']) + if rc != 0: + return [] + + decoders = [] + in_decoders = False + + for line in stdout.split('\n'): + if 'protocol decoder' in line.lower() or 'Supported protocol decoders' in line: + in_decoders = True + continue + if in_decoders: + if line.strip() == '' or line.startswith('Supported '): + if decoders: + break + continue + match = re.match(r'\s+(\S+)\s*-\s*(.*)', line) + if match: + decoders.append((match.group(1).strip(), match.group(2).strip())) + + return decoders + + +def decode_protocol(file_path: Path, decoder_spec: str, + annotations: str = None, + sample_numbers: bool = False) -> dict: + """ + Run sigrok-cli protocol decoder on a capture file. + + Args: + file_path: Path to .sr, .csv, or .vcd file + decoder_spec: Decoder specification, e.g. "uart:baudrate=115200" + or "i2c,eeprom24xx" for stacked decoders + annotations: Annotation filter, e.g. "uart=tx-data" + sample_numbers: Include sample numbers in output + + Returns dict with 'success', 'output', and 'error' keys. + """ + args = ['-i', str(file_path), '-P', decoder_spec] + + if annotations: + args.extend(['-A', annotations]) + if sample_numbers: + args.append('--protocol-decoder-samplenum') + + rc, stdout, stderr = run_sigrok_cli(args) + + return { + 'success': rc == 0, + 'output': stdout, + 'error': stderr if rc != 0 else None, + 'decoder': decoder_spec, + } + + +def extract_binary(file_path: Path, decoder_spec: str, + binary_spec: str, output_path: Path) -> dict: + """ + Extract binary data from protocol decoder to a file. + + Args: + file_path: Input capture file + decoder_spec: e.g. "uart:baudrate=115200" + binary_spec: e.g. "uart=tx" + output_path: Where to write binary output + + Returns dict with 'success' and 'error' keys. + """ + cmd = [ + 'sigrok-cli', '-i', str(file_path), + '-P', decoder_spec, + '-B', binary_spec, + ] + try: + with open(output_path, 'wb') as f: + result = subprocess.run( + cmd, + stdout=f, + stderr=subprocess.PIPE, + text=False, + timeout=60 + ) + if result.returncode == 0: + size = output_path.stat().st_size + return {'success': True, 'size': size, 'error': None} + else: + return { + 'success': False, + 'size': 0, + 'error': result.stderr.decode('utf-8', errors='replace'), + } + except Exception as e: + return {'success': False, 'size': 0, 'error': str(e)} + + +# ============================================================================ +# FILE LOADING +# ============================================================================ + +def load_capture(file_path: Path, channel: str = None) -> CaptureData: + """ + Load a capture file and return normalized transition data. + + Supports .sr (native parsing, with sigrok-cli fallback), .csv, and .vcd. + """ + if not file_path.exists(): + raise FileNotFoundError(f"File not found: {file_path}") + + suffix = file_path.suffix.lower() + if suffix not in SUPPORTED_EXTENSIONS: + raise ValueError( + f"Unsupported format: {suffix} " + f"(supported: {', '.join(SUPPORTED_EXTENSIONS)})" + ) + + if suffix == '.sr': + return _load_sr(file_path, channel) + elif suffix == '.csv': + return _load_csv(file_path, channel) + elif suffix == '.vcd': + return _load_vcd(file_path, channel) + + +def _load_sr(file_path: Path, channel: str = None) -> CaptureData: + """Load .sr file, trying native parsing first, falling back to sigrok-cli.""" + # Try native parsing first (no sigrok-cli dependency) + try: + return _load_sr_native(file_path, channel) + except Exception as native_err: + pass + + # Fall back to sigrok-cli CSV export + available, _ = check_sigrok_cli() + if not available: + raise RuntimeError( + f"Native .sr parsing failed ({native_err}), " + f"and sigrok-cli is not installed. " + f"Install sigrok-cli or check the .sr file." + ) + + info = show_file_info(file_path) + args = ['-i', str(file_path), '-O', 'csv'] + if channel: + args.extend(['-C', channel]) + + rc, stdout, stderr = run_sigrok_cli(args, timeout=120) + if rc != 0: + raise RuntimeError(f"sigrok-cli export failed: {stderr}") + + return _parse_sigrok_csv( + stdout, + channel=channel, + file_format='sr', + sample_rate=info['sample_rate'] if info else 0.0, + available_channels=info['channels'] if info else [], + ) + + +def _load_sr_native(file_path: Path, channel: str = None) -> CaptureData: + """Parse .sr file natively without sigrok-cli.""" + with zipfile.ZipFile(file_path, 'r') as zf: + meta = _parse_sr_metadata(zf) + + sample_rate = meta['samplerate'] + if sample_rate <= 0: + raise ValueError("No sample rate found in .sr metadata") + + unitsize = meta['unitsize'] + total_probes = meta['total_probes'] + capturefile = meta['capturefile'] + probe_names = meta['probes'] + + available_channels = [ + probe_names.get(i, f"D{i}") for i in range(total_probes) + ] + + # Determine which channel to extract + channel_idx, channel_name = _resolve_sr_channel( + channel, probe_names, total_probes + ) + + # Read logic data chunks + packed_data = _read_sr_logic_data(zf, capturefile) + + if len(packed_data) == 0: + raise ValueError("No logic data found in .sr file") + + # Extract transitions for the selected channel + times, initial_state = _extract_channel_transitions( + packed_data, channel_idx, unitsize, sample_rate + ) + + total_samples = len(packed_data) // unitsize + duration = total_samples / sample_rate + + return CaptureData( + times=times, + initial_state=initial_state, + sample_rate=sample_rate, + channel_name=channel_name, + file_format='sr', + duration=duration, + total_samples=total_samples, + available_channels=available_channels, + ) + + +def _parse_sr_metadata(zf: zipfile.ZipFile) -> Dict: + """Parse the metadata file from a sigrok .sr ZIP archive.""" + try: + raw = zf.read('metadata').decode('utf-8') + except KeyError: + raise ValueError("No 'metadata' file found in .sr archive") + + meta = { + 'samplerate': 0.0, + 'total_probes': 0, + 'unitsize': 1, + 'capturefile': 'logic-1', + 'probes': {}, # index -> name + } + + for line in raw.split('\n'): + line = line.strip() + if not line or line.startswith('[') or line.startswith(';'): + continue + + if '=' not in line: + continue + + key, _, value = line.partition('=') + key = key.strip().lower() + value = value.strip() + + if key == 'samplerate': + # Can be "24000000" or "24 MHz" + rate = parse_sample_rate(value) + if rate > 0: + meta['samplerate'] = rate + else: + try: + meta['samplerate'] = float(value) + except ValueError: + pass + elif key == 'total probes': + try: + meta['total_probes'] = int(value) + except ValueError: + pass + elif key == 'unitsize': + try: + meta['unitsize'] = int(value) + except ValueError: + pass + elif key == 'capturefile': + meta['capturefile'] = value + elif re.match(r'^probe(\d+)$', key): + idx = int(re.match(r'^probe(\d+)$', key).group(1)) - 1 # 0-based + meta['probes'][idx] = value + + if meta['total_probes'] == 0 and meta['probes']: + meta['total_probes'] = max(meta['probes'].keys()) + 1 + + return meta + + +def _read_sr_logic_data(zf: zipfile.ZipFile, capturefile: str) -> bytes: + """Read and concatenate logic data chunks from .sr archive.""" + names = sorted(zf.namelist()) + chunks = [] + + # Look for chunked files: logic-1-1, logic-1-2, ... or just logic-1 + pattern = re.compile(re.escape(capturefile) + r'(-\d+)?$') + data_files = sorted( + [n for n in names if pattern.match(n)], + key=lambda n: ( + int(re.search(r'-(\d+)$', n).group(1)) + if re.search(r'-(\d+)$', n) and n != capturefile + else 0 + ), + ) + + if not data_files: + raise ValueError( + f"No logic data files matching '{capturefile}' in .sr archive" + ) + + for name in data_files: + chunks.append(zf.read(name)) + + return b''.join(chunks) + + +def _resolve_sr_channel( + channel: Optional[str], + probe_names: Dict[int, str], + total_probes: int, +) -> Tuple[int, str]: + """Resolve a channel specifier to (index, name).""" + if channel is None: + idx = 0 + return idx, probe_names.get(idx, 'D0') + + # Try exact match on probe names + for i, name in probe_names.items(): + if name == channel: + return i, name + + # Try D0, D1, ... format + match = re.match(r'^D(\d+)$', channel, re.IGNORECASE) + if match: + idx = int(match.group(1)) + if idx < total_probes: + return idx, probe_names.get(idx, f'D{idx}') + + # Try numeric index + try: + idx = int(channel) + if 0 <= idx < total_probes: + return idx, probe_names.get(idx, f'D{idx}') + except ValueError: + pass + + available = [probe_names.get(i, f'D{i}') for i in range(total_probes)] + raise ValueError( + f"Channel '{channel}' not found. Available: {available}" + ) + + +def _extract_channel_transitions( + packed_data: bytes, + channel_idx: int, + unitsize: int, + sample_rate: float, +) -> Tuple[np.ndarray, int]: + """ + Extract transition timestamps for a single channel from packed binary data. + + Each sample is `unitsize` bytes. Channel N = bit N (LSB-first within bytes). + Returns (transition_times_array, initial_state). + """ + # Determine which byte and bit within that byte + byte_offset = channel_idx // 8 + bit_mask = 1 << (channel_idx % 8) + + if byte_offset >= unitsize: + raise ValueError( + f"Channel index {channel_idx} exceeds unitsize {unitsize} " + f"({unitsize * 8} bits available)" + ) + + total_samples = len(packed_data) // unitsize + + # Convert to numpy array for efficient processing + raw = np.frombuffer(packed_data, dtype=np.uint8) + + # Extract the relevant byte for each sample (stride = unitsize) + channel_bytes = raw[byte_offset::unitsize][:total_samples] + + # Extract the single bit + channel_bits = (channel_bytes & bit_mask).astype(bool).astype(np.uint8) + + initial_state = int(channel_bits[0]) + + # Find transitions (where consecutive samples differ) + transitions = np.where(np.diff(channel_bits) != 0)[0] + 1 + + if len(transitions) == 0: + raise ValueError( + f"No transitions detected (signal constant at {initial_state})" + ) + + # Convert sample indices to timestamps + transition_times = transitions.astype(np.float64) / sample_rate + + return transition_times, initial_state + + +def _load_csv(file_path: Path, channel: str = None) -> CaptureData: + """Load a CSV file with time and signal columns.""" + with open(file_path, 'r') as f: + content = f.read() + return _parse_sigrok_csv(content, channel=channel, file_format='csv') + + +def _parse_sigrok_csv(content: str, channel: str = None, + file_format: str = 'csv', + sample_rate: float = 0.0, + available_channels: List[str] = None) -> CaptureData: + """ + Parse sigrok-style CSV content into CaptureData. + + Sigrok CSV format has a header comment section followed by: + Time [s],D0,D1,...,Dn + 0.000000000,0,1,...,0 + """ + if available_channels is None: + available_channels = [] + + lines = content.strip().split('\n') + + # Skip comment lines (sigrok CSV starts with ;) + data_lines = [] + header = None + for line in lines: + line = line.strip() + if not line: + continue + if line.startswith(';'): + # Extract sample rate from comments if present + if 'samplerate' in line.lower(): + rate = parse_sample_rate(line) + if rate > 0: + sample_rate = rate + continue + if header is None: + header = line + continue + data_lines.append(line) + + if header is None or not data_lines: + raise ValueError("CSV file is empty or has no data rows") + + # Parse header to find columns + columns = [c.strip() for c in header.split(',')] + + # Find the time column + time_col = None + for i, col in enumerate(columns): + if 'time' in col.lower(): + time_col = i + break + if time_col is None: + time_col = 0 # Assume first column is time + + # Find the channel column + signal_cols = [i for i in range(len(columns)) if i != time_col] + if not signal_cols: + raise ValueError("No signal columns found in CSV") + + # Determine which channel to use + channel_col = None + channel_name = None + + if channel: + # User specified a channel name or index + for i, col in enumerate(columns): + if col.strip() == channel or col.strip() == f'D{channel}': + channel_col = i + channel_name = col.strip() + break + if channel_col is None: + try: + idx = int(channel) + if idx < len(signal_cols): + channel_col = signal_cols[idx] + channel_name = columns[channel_col].strip() + except ValueError: + pass + if channel_col is None: + raise ValueError( + f"Channel '{channel}' not found. " + f"Available: {[columns[i].strip() for i in signal_cols]}" + ) + else: + # Default to first signal channel + channel_col = signal_cols[0] + channel_name = columns[channel_col].strip() + + if not available_channels: + available_channels = [columns[i].strip() for i in signal_cols] + + # Parse data rows and extract transitions + times = [] + states = [] + + for line in data_lines: + parts = line.split(',') + if len(parts) <= max(time_col, channel_col): + continue + try: + t = float(parts[time_col].strip()) + s = int(parts[channel_col].strip()) + times.append(t) + states.append(s) + except (ValueError, IndexError): + continue + + if not times: + raise ValueError("No valid data rows found in CSV") + + # Extract only transition points (where signal changes) + initial_state = states[0] + transition_times = [] + + for i in range(1, len(times)): + if states[i] != states[i - 1]: + transition_times.append(times[i]) + + if not transition_times: + raise ValueError( + "No transitions detected on channel " + f"'{channel_name}' (signal constant at {initial_state})" + ) + + # Estimate sample rate from data if not known (use median for robustness) + if sample_rate == 0.0 and len(times) > 2: + intervals = np.diff(times[:min(1000, len(times))]) + dt = float(np.median(intervals)) + if dt > 0: + sample_rate = 1.0 / dt + elif sample_rate == 0.0 and len(times) > 1: + dt = times[1] - times[0] + if dt > 0: + sample_rate = 1.0 / dt + + duration = times[-1] - times[0] if len(times) > 1 else 0.0 + + return CaptureData( + times=np.array(transition_times), + initial_state=initial_state, + sample_rate=sample_rate, + channel_name=channel_name, + file_format=file_format, + duration=duration, + total_samples=len(times), + available_channels=available_channels, + ) + + +def _load_vcd(file_path: Path, channel: str = None) -> CaptureData: + """Parse a VCD (Value Change Dump) file.""" + with open(file_path, 'r') as f: + content = f.read() + + # Parse header + timescale = 1e-9 # Default: 1ns + variables = {} # var_id -> name + + lines = content.split('\n') + data_start = 0 + + for i, line in enumerate(lines): + line = line.strip() + + if '$timescale' in line: + # Support both integer and float values (e.g. 1ns, 100ps, 1.0ns) + match = re.search(r'(\d+(?:\.\d+)?)\s*(s|ms|us|ns|ps|fs)', line) + if match: + val = float(match.group(1)) + unit = match.group(2) + timescale = val * VCD_TIMESCALE_UNITS.get(unit, 1e-9) + + elif '$var' in line: + # Format: $var wire 1 ! D0 $end + match = re.match( + r'\$var\s+\w+\s+\d+\s+(\S+)\s+(\S+)', line + ) + if match: + var_id = match.group(1) + var_name = match.group(2) + variables[var_id] = var_name + + elif '$enddefinitions' in line: + data_start = i + 1 + break + + if not variables: + raise ValueError("No variables found in VCD file") + + available_channels = list(variables.values()) + + # Determine target variable + target_id = None + target_name = None + + if channel: + # Find by name or ID + for vid, vname in variables.items(): + if vname == channel or vid == channel: + target_id = vid + target_name = vname + break + if target_id is None: + raise ValueError( + f"Channel '{channel}' not found. " + f"Available: {available_channels}" + ) + else: + # Default to first variable + target_id = list(variables.keys())[0] + target_name = variables[target_id] + + # Parse data section + current_time = 0.0 + transitions = [] # (time_seconds, state) + + for line in lines[data_start:]: + line = line.strip() + if not line: + continue + + if line.startswith('#'): + try: + current_time = int(line[1:]) * timescale + except ValueError: + continue + + elif line.startswith('0') or line.startswith('1'): + # Single-bit value change: 0! or 1! + state = int(line[0]) + var_id = line[1:] + if var_id == target_id: + transitions.append((current_time, state)) + + elif line.startswith('b'): + # Multi-bit value: bXXXX var_id + parts = line.split() + if len(parts) == 2 and parts[1] == target_id: + try: + state = int(parts[0][1:], 2) & 1 # Take LSB + transitions.append((current_time, state)) + except ValueError: + continue + + if not transitions: + raise ValueError( + f"No transitions found for channel '{target_name}'" + ) + + initial_state = transitions[0][1] + + # Filter to only actual transitions (state changes) + transition_times = [] + prev_state = initial_state + for t, s in transitions[1:]: + if s != prev_state: + transition_times.append(t) + prev_state = s + + if not transition_times: + raise ValueError( + f"No state changes on channel '{target_name}' " + f"(constant at {initial_state})" + ) + + duration = transitions[-1][0] - transitions[0][0] + + return CaptureData( + times=np.array(transition_times), + initial_state=initial_state, + sample_rate=1.0 / timescale, + channel_name=target_name, + file_format='vcd', + duration=duration, + total_samples=len(transitions), + available_channels=available_channels, + ) + + +# ============================================================================ +# ANALYSIS WRAPPERS +# ============================================================================ + +def analyze_timing(data: CaptureData) -> dict: + """Analyze timing characteristics of the signal.""" + return _analyze_timing(data.times, data.initial_state, data.duration) + + +def export_csv(data: CaptureData, output_path: Path): + """Export transitions to CSV file.""" + export_transitions_csv(data.times, data.initial_state, output_path) + + +# ============================================================================ +# CLI +# ============================================================================ + +def main(): + parser = argparse.ArgumentParser( + description="Analyze logic analyzer captures using sigrok", + epilog=( + "Examples:\n" + " %(prog)s capture.sr\n" + " %(prog)s capture.sr --histogram --clusters\n" + " %(prog)s capture.sr --decode uart:baudrate=115200\n" + " %(prog)s capture.sr --decode i2c,eeprom24xx\n" + " %(prog)s capture.sr --export transitions.csv\n" + " %(prog)s --list-decoders\n" + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("file", type=Path, nargs='?', + help="Capture file (.sr, .csv, .vcd)") + + # Timing analysis options + timing = parser.add_argument_group('timing analysis') + timing.add_argument("--histogram", action="store_true", + help="Show timing histogram") + timing.add_argument("--bins", type=int, default=20, + help="Number of histogram bins (default: 20)") + timing.add_argument("--clusters", action="store_true", + help="Show detected timing clusters") + timing.add_argument("--raw", action="store_true", + help="Show raw transition durations") + timing.add_argument("-n", type=int, default=20, + help="Number of raw values to show (default: 20)") + + # Channel selection + parser.add_argument("--channel", "-C", type=str, default=None, + help="Channel to analyze (name or index, default: first)") + + # Export options + export = parser.add_argument_group('export') + export.add_argument("--export", type=Path, metavar="CSV", + help="Export transitions to CSV file") + export.add_argument("--binary-out", type=Path, metavar="FILE", + help="Extract binary data to file (use with --decode)") + + # Protocol decoding + decoding = parser.add_argument_group('protocol decoding') + decoding.add_argument("--decode", "-P", type=str, metavar="DECODER", + help="Protocol decoder spec (e.g., uart:baudrate=115200)") + decoding.add_argument("--annotations", "-A", type=str, metavar="FILTER", + help="Filter decoder annotations (e.g., uart=tx-data)") + decoding.add_argument("--binary-decode", "-B", type=str, metavar="SPEC", + help="Binary output spec (e.g., uart=tx)") + decoding.add_argument("--samplenum", action="store_true", + help="Include sample numbers in decoder output") + + # Info commands + info = parser.add_argument_group('information') + info.add_argument("--list-decoders", action="store_true", + help="List available protocol decoders") + info.add_argument("--show", action="store_true", + help="Show capture file metadata") + + args = parser.parse_args() + + # Handle --list-decoders (no file required) + if args.list_decoders: + available, version = check_sigrok_cli() + if not available: + print(f"Error: {version}") + _print_install_help() + sys.exit(1) + + decoders = list_decoders() + if decoders: + print(f"Available Protocol Decoders ({len(decoders)}):") + print("-" * 60) + for name, desc in decoders: + print(f" {name:25s} {desc}") + else: + print("No decoders found (check sigrok-cli installation)") + sys.exit(0) + + # All other commands require a file + if args.file is None: + parser.error("the following arguments are required: file") + + if not args.file.exists(): + print(f"Error: File not found: {args.file}") + sys.exit(1) + + # Commands that require sigrok-cli: --show, --decode + needs_sigrok = args.show or args.decode + + if needs_sigrok: + available, version = check_sigrok_cli() + if not available: + print(f"Error: {version}") + _print_install_help() + sys.exit(1) + + # Handle --show + if args.show: + info = show_file_info(args.file) + if info: + print(f"File: {args.file}") + print(info['raw']) + else: + print(f"Error: Could not read file info for {args.file}") + sys.exit(1) + sys.exit(0) + + # Handle protocol decoding + if args.decode: + # Binary extraction + if args.binary_out and args.binary_decode: + result = extract_binary( + args.file, args.decode, args.binary_decode, args.binary_out + ) + if result['success']: + print(f"Extracted {result['size']} bytes to {args.binary_out}") + else: + print(f"Error: {result['error']}") + sys.exit(1) + sys.exit(0) + + # Annotation decoding + result = decode_protocol( + args.file, args.decode, + annotations=args.annotations, + sample_numbers=args.samplenum, + ) + if result['success']: + if result['output']: + print(result['output'], end='') + else: + print("No decoder output (check decoder spec and channel mapping)") + else: + print(f"Error decoding: {result['error']}") + sys.exit(1) + + # If no timing analysis flags, stop here + if not (args.histogram or args.clusters or args.raw or args.export): + sys.exit(0) + + # Load capture for timing analysis + try: + data = load_capture(args.file, channel=args.channel) + except Exception as e: + print(f"Error loading file: {e}") + sys.exit(1) + + analysis = analyze_timing(data) + + if 'error' in analysis: + print(f"Error: {analysis['error']}") + sys.exit(1) + + # Print basic info + print(f"File: {args.file}") + print(f"Format: {data.file_format}") + print(f"Channel: {data.channel_name}") + if data.sample_rate > 0: + if data.sample_rate >= 1e6: + print(f"Sample rate: {data.sample_rate/1e6:.1f} MHz") + elif data.sample_rate >= 1e3: + print(f"Sample rate: {data.sample_rate/1e3:.1f} kHz") + else: + print(f"Sample rate: {data.sample_rate:.1f} Hz") + print(f"Capture duration: {analysis['capture_duration_s']:.3f}s") + print(f"Signal duration: {analysis['signal_duration_s']:.3f}s") + print(f"Initial state: {analysis['initial_state']}") + print(f"Total transitions: {analysis['total_transitions']}") + if data.available_channels: + print(f"Available channels: {', '.join(data.available_channels)}") + print() + + # Timing summary + print("Timing Summary") + print("-" * 40) + a = analysis['all'] + print(f"All durations: min={format_duration(a['min_us'])} " + f"max={format_duration(a['max_us'])} " + f"mean={format_duration(a['mean_us'])}") + + h = analysis['high'] + print(f"HIGH pulses ({h['count']}): min={format_duration(h['min_us'])} " + f"max={format_duration(h['max_us'])} " + f"mean={format_duration(h['mean_us'])}") + + lo = analysis['low'] + print(f"LOW gaps ({lo['count']}): min={format_duration(lo['min_us'])} " + f"max={format_duration(lo['max_us'])} " + f"mean={format_duration(lo['mean_us'])}") + print() + + # Protocol guesses + guesses = guess_protocol(analysis) + if guesses: + print("Protocol Guesses") + print("-" * 40) + for name, confidence, details in guesses: + print(f" {name} ({confidence*100:.0f}% confidence)") + print(f" {details}") + print() + + # Clusters + if args.clusters: + print("Detected Timing Clusters") + print("-" * 40) + + high_clusters = detect_clusters(analysis['high_durations_us']) + print("HIGH pulse clusters:") + for center, count in high_clusters[:5]: + print(f" ~{format_duration(center)} ({count} occurrences)") + + low_clusters = detect_clusters(analysis['low_durations_us']) + print("LOW gap clusters:") + for center, count in low_clusters[:5]: + print(f" ~{format_duration(center)} ({count} occurrences)") + print() + + # Raw values + if args.raw: + print(f"First {args.n} Transitions") + print("-" * 40) + durations = analysis['durations_us'] + initial = 0 if analysis['initial_state'] == 'LOW' else 1 + for i in range(min(args.n, len(durations))): + state = "HIGH" if (i + initial) % 2 == 0 else "LOW" + dur = durations[i] + print(f" [{i:3d}] {state}: {format_duration(dur)}") + print() + + # Histogram + if args.histogram: + print_histogram(analysis['durations_us'], bins=args.bins, + title="All Durations") + print_histogram(analysis['high_durations_us'], bins=args.bins, + title="HIGH Pulse Durations") + print_histogram(analysis['low_durations_us'], bins=args.bins, + title="LOW Gap Durations") + + # Export + if args.export: + export_csv(data, args.export) + + +def _print_install_help(): + """Print installation instructions for sigrok-cli.""" + print("\nInstall sigrok-cli:") + print(" Arch Linux: sudo pacman -S sigrok-cli") + print(" Ubuntu: sudo apt install sigrok-cli") + print(" Fedora: sudo dnf install sigrok-cli") + print(" macOS: brew install sigrok-cli") + print(" Windows: Download from https://sigrok.org/wiki/Downloads") + print(" If you get a missing DLL error, install the VC++ 2010 runtime:") + print(" winget install --id=Microsoft.VCRedist.2010.x64") + + +if __name__ == "__main__": + main() diff --git a/skills/sigrok/examples.md b/skills/sigrok/examples.md new file mode 100644 index 0000000..5c530c9 --- /dev/null +++ b/skills/sigrok/examples.md @@ -0,0 +1,346 @@ +# Sigrok Capture Analysis Examples + +## Example 1: Unknown Protocol Analysis + +**Scenario**: You have a captured signal from an unknown IoT device and need to identify the protocol. + +### Step 1: Check the file + +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --show +``` + +Output: +``` +File: capture.sr +Channels: D0, D1, D2, D3 +Sample rate: 24.0 MHz +Total samples: 2400000 +Duration: 0.100s +``` + +### Step 2: Get an overview + +```bash +python3 skills/sigrok/analyze_capture.py capture.sr +``` + +This shows basic timing statistics and automatic protocol guesses. + +### Step 3: Look at timing distribution + +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --histogram --clusters +``` + +Look for distinct timing clusters — these help identify the protocol: +- **2-3 clusters** → likely UART or 1-Wire +- **Very regular clusters** → likely SPI or I2C (clock-based) +- **Wide spread** → likely analog or mixed protocol + +### Step 4: Try the suggested decoder + +```bash +# If UART was guessed at 115200 baud +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=115200 +``` + +### Step 5: Export for further analysis + +```bash +python3 skills/sigrok/analyze_capture.py capture.sr --export transitions.csv +``` + +--- + +## Example 2: UART Signal Decoding + +**Scenario**: You captured UART communication from an IoT device's debug port. + +### Identify UART characteristics + +```bash +python3 skills/sigrok/analyze_capture.py uart_capture.sr --clusters +``` + +Look for: +- Signal idles HIGH +- Consistent bit period clusters +- Durations are multiples of the base bit period + +### Decode at a specific baud rate + +```bash +# Standard 115200 baud +python3 skills/sigrok/analyze_capture.py uart_capture.sr \ + --decode uart:baudrate=115200 + +# With channel mapping (TX on D0, RX on D1) +python3 skills/sigrok/analyze_capture.py uart_capture.sr \ + --decode uart:baudrate=115200:tx=D0:rx=D1 +``` + +### Extract only the transmitted data + +```bash +# Text annotations +python3 skills/sigrok/analyze_capture.py uart_capture.sr \ + --decode uart:baudrate=115200 \ + --annotations uart=tx-data + +# Raw binary output +python3 skills/sigrok/analyze_capture.py uart_capture.sr \ + --decode uart:baudrate=115200 \ + --binary-decode uart=tx \ + --binary-out uart_tx.bin + +# Inspect the binary +xxd uart_tx.bin | head -20 +``` + +### Custom baud rate detection + +If standard rates don't work, use timing analysis to find the baud rate: + +```bash +python3 skills/sigrok/analyze_capture.py uart_capture.sr --clusters --raw -n 30 +``` + +The smallest timing cluster represents the bit period. Calculate: +``` +baud_rate = 1,000,000 / cluster_us +``` + +For example, if the smallest cluster is ~8.7us: `1000000 / 8.7 = 114943` → try 115200 baud. + +--- + +## Example 3: SPI Analysis with Stacked Decoders + +**Scenario**: Multi-channel SPI capture with clock and data lines. + +### Identify SPI signals + +First check which channels are present: +```bash +python3 skills/sigrok/analyze_capture.py spi_capture.sr --show +``` + +Analyze each channel to identify clock vs data: +```bash +# Check D0 (likely CLK - should be very regular) +python3 skills/sigrok/analyze_capture.py spi_capture.sr --channel D0 --clusters + +# Check D1 (MOSI - data, less regular) +python3 skills/sigrok/analyze_capture.py spi_capture.sr --channel D1 --clusters +``` + +The channel with the most regular timing is the clock (SCLK). + +### Decode SPI + +```bash +# Basic SPI decode with channel mapping +python3 skills/sigrok/analyze_capture.py spi_capture.sr \ + --decode spi:clk=D0:mosi=D1:miso=D2:cs=D3 + +# With clock polarity/phase options +python3 skills/sigrok/analyze_capture.py spi_capture.sr \ + --decode spi:clk=D0:mosi=D1:miso=D2:cs=D3:cpol=0:cpha=0 +``` + +### Stacked decoder: SPI Flash + +```bash +# Decode SPI flash commands (read, write, erase) +python3 skills/sigrok/analyze_capture.py spi_capture.sr \ + --decode spi:clk=D0:mosi=D1:miso=D2:cs=D3,spiflash + +# Show only flash data +python3 skills/sigrok/analyze_capture.py spi_capture.sr \ + --decode spi:clk=D0:mosi=D1:miso=D2:cs=D3,spiflash \ + --annotations spiflash +``` + +--- + +## Example 4: I2C Address Discovery + +**Scenario**: Captured I2C communication and need to find device addresses. + +### Decode I2C + +```bash +# Basic I2C decode (SCL on D0, SDA on D1) +python3 skills/sigrok/analyze_capture.py i2c_capture.sr \ + --decode i2c:scl=D0:sda=D1 +``` + +Output shows START, address, R/W, data bytes, ACK/NACK, STOP. + +### Stacked decoder: EEPROM + +```bash +# Decode I2C EEPROM read/write operations +python3 skills/sigrok/analyze_capture.py i2c_capture.sr \ + --decode i2c:scl=D0:sda=D1,eeprom24xx + +# Show only EEPROM data +python3 skills/sigrok/analyze_capture.py i2c_capture.sr \ + --decode i2c:scl=D0:sda=D1,eeprom24xx \ + --annotations eeprom24xx +``` + +### Other I2C stacked decoders + +```bash +# Temperature sensor (LM75) +--decode i2c:scl=D0:sda=D1,lm75 + +# Real-time clock (DS1307) +--decode i2c:scl=D0:sda=D1,ds1307 + +# Accelerometer (ADXL345) +--decode i2c:scl=D0:sda=D1,adxl345 +``` + +--- + +## Example 5: CAN Bus Analysis + +**Scenario**: Captured CAN bus traffic from a vehicle or industrial device. + +### Decode CAN frames + +```bash +# CAN at 500kbps +python3 skills/sigrok/analyze_capture.py can_capture.sr \ + --decode can:bitrate=500000 + +# CAN at 250kbps +python3 skills/sigrok/analyze_capture.py can_capture.sr \ + --decode can:bitrate=250000 +``` + +### Timing analysis for bitrate detection + +```bash +python3 skills/sigrok/analyze_capture.py can_capture.sr --clusters +``` + +The smallest cluster corresponds to the bit period: +- 2.0us → 500kbps +- 4.0us → 250kbps +- 8.0us → 125kbps +- 1.0us → 1Mbps + +--- + +## Example 6: JTAG Tap Detection + +**Scenario**: JTAG pins identified on a PCB, need to analyze the communication. + +### Decode JTAG + +```bash +# Basic JTAG decode +python3 skills/sigrok/analyze_capture.py jtag_capture.sr \ + --decode jtag:tck=D0:tms=D1:tdi=D2:tdo=D3 + +# STM32-specific JTAG +python3 skills/sigrok/analyze_capture.py jtag_capture.sr \ + --decode jtag:tck=D0:tms=D1:tdi=D2:tdo=D3,jtag_stm32 +``` + +### SWD (Serial Wire Debug) — alternative to JTAG + +```bash +# SWD decode (ARM Cortex-M) +python3 skills/sigrok/analyze_capture.py swd_capture.sr \ + --decode swd:swclk=D0:swdio=D1 +``` + +--- + +## Example 7: Working with Different File Formats + +### Sigrok session files (.sr) + +Created by PulseView or sigrok-cli capture: +```bash +python3 skills/sigrok/analyze_capture.py capture.sr +python3 skills/sigrok/analyze_capture.py capture.sr --decode uart:baudrate=9600 +``` + +### CSV files + +Exported from logic analyzers or other tools: +```bash +# Sigrok-format CSV (Time [s],D0,D1,...) +python3 skills/sigrok/analyze_capture.py export.csv + +# Analyze specific channel +python3 skills/sigrok/analyze_capture.py export.csv --channel D2 +``` + +### VCD files (Value Change Dump) + +Common output from simulators and some logic analyzers: +```bash +python3 skills/sigrok/analyze_capture.py waveform.vcd +python3 skills/sigrok/analyze_capture.py waveform.vcd --channel CLK +``` + +--- + +## Example 8: Combined Timing + Decoding Workflow + +**Scenario**: Full analysis workflow from unknown capture to decoded data. + +```bash +# Step 1: Overview +python3 skills/sigrok/analyze_capture.py mystery.sr --show + +# Step 2: Timing analysis on each channel +python3 skills/sigrok/analyze_capture.py mystery.sr --channel D0 --clusters +python3 skills/sigrok/analyze_capture.py mystery.sr --channel D1 --clusters + +# Step 3: Decode based on findings +python3 skills/sigrok/analyze_capture.py mystery.sr \ + --decode uart:baudrate=115200:tx=D0 + +# Step 4: Extract the data +python3 skills/sigrok/analyze_capture.py mystery.sr \ + --decode uart:baudrate=115200:tx=D0 \ + --binary-decode uart=tx \ + --binary-out decoded.bin + +# Step 5: Analyze decoded data +xxd decoded.bin +strings decoded.bin +``` + +--- + +## Useful sigrok-cli Direct Commands + +For advanced scenarios beyond the helper script: + +```bash +# List all supported protocol decoders with details +sigrok-cli -L + +# Show decoder options +sigrok-cli -P uart --show + +# Decode with multiple independent decoder stacks +sigrok-cli -i capture.sr -P uart:tx=D0:baudrate=115200 -P spi:clk=D2:mosi=D3 + +# Export to different formats +sigrok-cli -i capture.sr -O csv > data.csv +sigrok-cli -i capture.sr -O vcd > data.vcd +sigrok-cli -i capture.sr -O bits > data.txt + +# Continuous sampling to file +sigrok-cli -d fx2lafw --config samplerate=1m --samples 1m -o capture.sr +``` From b5bf0f6b35a16e5a162af015a25eb11acb35aaa5 Mon Sep 17 00:00:00 2001 From: yeyeto2788 Date: Tue, 10 Feb 2026 05:25:04 +0100 Subject: [PATCH 2/2] fix: Fix typo on .gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ced3406..4733564 100644 --- a/.gitignore +++ b/.gitignore @@ -194,4 +194,4 @@ $RECYCLE.BIN/ .Trash-* # Claude local permissions -.claude\settings.local.json \ No newline at end of file +.claude/settings.local.json \ No newline at end of file