Skip to content

Security: mijahauan/ka9q-python

Security

docs/SECURITY.md

Security and Design Review - ka9q-python & ka9q-radio

Review Date: 2024
Reviewer: Analysis of codebase architecture, security, and design patterns
Scope: ka9q-python library and ka9q-radio C source integration


Executive Summary

The ka9q-python library is well-architected with good separation of concerns, comprehensive validation, and thread-safe design. However, several security vulnerabilities, design inadequacies, and missed opportunities have been identified that should be addressed to improve robustness, security, and usability.

Risk Level Summary:

  • 🔴 Critical: 2 issues (Authentication, Cryptographic randomness)
  • 🟠 High: 4 issues (Input validation, Rate limiting, DoS, Resource exhaustion)
  • 🟡 Medium: 8 issues (Error handling, IPv6, Monitoring, Async support)
  • 🟢 Low: 10+ issues (Documentation, Caching, Convenience features)

1. SECURITY VULNERABILITIES

🔴 CRITICAL

1.1 No Authentication/Authorization on Multicast Commands

Risk: Any process on the network can send commands to radiod

Current State:

# Anyone can send commands - no authentication
control.create_channel(ssrc=12345, frequency_hz=14.074e6)

Impact:

  • Malicious actors can tune channels to arbitrary frequencies
  • Denial of service by deleting/corrupting channels
  • Eavesdropping possible by reconfiguring channels
  • No way to verify command source

Recommendation:

  1. Implement HMAC-based message authentication using shared secrets
  2. Add optional TLS/DTLS support for command transport
  3. Implement command sequence numbers to prevent replay attacks
  4. Add ACL support in radiod for source IP filtering

Code Example:

import hmac
import hashlib

class AuthenticatedRadiodControl(RadiodControl):
    def __init__(self, status_address, shared_secret=None):
        super().__init__(status_address)
        self.shared_secret = shared_secret
    
    def send_command(self, cmdbuffer):
        if self.shared_secret:
            # Add HMAC signature
            signature = hmac.new(
                self.shared_secret.encode(),
                cmdbuffer,
                hashlib.sha256
            ).digest()
            cmdbuffer.extend(signature)
        super().send_command(cmdbuffer)

1.2 Predictable Command Tags (Weak Randomness)

Risk: Command tag collision and predictability

Current State:

# control.py line 708, 814, 984
encode_int(cmdbuffer, StatusType.COMMAND_TAG, random.randint(1, 2**31))

Impact:

  • Predictable tags allow response spoofing
  • Possible tag collisions in high-throughput scenarios
  • Not suitable for security-critical applications

Recommendation:

import secrets

# Use cryptographically secure random
encode_int(cmdbuffer, StatusType.COMMAND_TAG, secrets.randbits(31))

🟠 HIGH

1.3 Insufficient Input Validation on String Parameters

Risk: Potential buffer overflow or injection in radiod

Current State:

def encode_string(buf: bytearray, type_val: int, s: str) -> int:
    s_bytes = s.encode('utf-8')
    # No validation on content, only length check
    if length >= 65536:
        raise ValueError(f"String too long: {length} bytes")

Issues:

  • No validation of string content (special characters, control codes)
  • No maximum length enforcement for specific fields (preset names should be ~16 chars max)
  • Potential for null byte injection
  • No sanitization of newlines or other control characters

Recommendation:

def validate_preset_name(preset: str) -> None:
    """Validate preset name is safe"""
    if not preset:
        raise ValidationError("Preset name cannot be empty")
    if len(preset) > 32:
        raise ValidationError(f"Preset name too long: {len(preset)} chars (max 32)")
    # Only allow alphanumeric, dash, underscore
    if not re.match(r'^[a-zA-Z0-9_-]+$', preset):
        raise ValidationError(f"Invalid preset name: {preset}")
    # Reject control characters
    if any(ord(c) < 32 for c in preset):
        raise ValidationError("Preset name contains control characters")

def encode_string(buf: bytearray, type_val: int, s: str, max_length: int = 65535) -> int:
    if len(s) > max_length:
        raise ValidationError(f"String too long: {len(s)} > {max_length}")
    # ... rest of encoding

1.4 No Rate Limiting on Commands

Risk: Network flooding and DoS

Current State: No rate limiting - can send unlimited commands

Impact:

  • Can flood radiod with commands, causing CPU/memory exhaustion
  • Can saturate multicast network
  • No backpressure mechanism

Recommendation:

import time
from collections import deque

class RateLimitedRadiodControl(RadiodControl):
    def __init__(self, status_address, max_commands_per_sec=100):
        super().__init__(status_address)
        self.max_commands_per_sec = max_commands_per_sec
        self.command_times = deque(maxlen=max_commands_per_sec)
        self._rate_limit_lock = threading.Lock()
    
    def send_command(self, cmdbuffer, **kwargs):
        with self._rate_limit_lock:
            now = time.time()
            # Remove timestamps older than 1 second
            while self.command_times and now - self.command_times[0] > 1.0:
                self.command_times.popleft()
            
            # Check if we've exceeded rate limit
            if len(self.command_times) >= self.max_commands_per_sec:
                sleep_time = 1.0 - (now - self.command_times[0])
                if sleep_time > 0:
                    logger.warning(f"Rate limit reached, sleeping {sleep_time:.3f}s")
                    time.sleep(sleep_time)
            
            self.command_times.append(now)
            return super().send_command(cmdbuffer, **kwargs)

1.5 Socket Descriptor Leaks in Error Paths

Risk: Resource exhaustion

Current State: Some error paths may not properly close sockets

Found in:

# discovery.py line 107
status_sock = _create_status_listener_socket(multicast_addr)
# If error occurs before finally block, socket may leak

Recommendation:

def discover_channels_native(status_address: str, listen_duration: float = 2.0):
    status_sock = None
    try:
        multicast_addr = resolve_multicast_address(status_address, timeout=2.0)
        status_sock = _create_status_listener_socket(multicast_addr)
        # ... operations ...
    except Exception as e:
        logger.error(f"Error: {e}")
        raise
    finally:
        if status_sock:
            try:
                status_sock.close()
            except Exception as e:
                logger.warning(f"Error closing socket: {e}")

1.6 Potential Buffer Overflows in Decode Functions

Risk: Malformed status packets could cause errors

Current State:

def decode_double(data: bytes, length: int) -> float:
    # Reconstruct 8-byte big-endian representation
    value_bytes = b'\x00' * (8 - length) + data[:length]
    # What if length > 8? Buffer overflow in slice operation
    return struct.unpack('>d', value_bytes)[0]

Recommendation:

def decode_double(data: bytes, length: int) -> float:
    """Decode a double with bounds checking"""
    if length > 8:
        logger.warning(f"Invalid double length: {length} (max 8)")
        length = 8  # Truncate to prevent issues
    if length < 0:
        raise ValidationError(f"Negative length: {length}")
    
    value_bytes = b'\x00' * (8 - length) + data[:length]
    return struct.unpack('>d', value_bytes)[0]

🟡 MEDIUM

1.7 No Timeout Configuration for Network Operations

Current: Hardcoded timeouts scattered throughout code

Recommendation:

class RadiodControl:
    def __init__(self, status_address, 
                 connect_timeout=5.0,
                 command_timeout=2.0,
                 status_timeout=5.0):
        self.connect_timeout = connect_timeout
        self.command_timeout = command_timeout
        self.status_timeout = status_timeout

1.8 Limited IPv6 Support

Issue: IPv6 socket decoding is incomplete (line 379-388 in control.py)

Current:

elif length == 10:
    # IPv6: 8 bytes address + 2 bytes port (big-endian)
    # Note: This is a truncated IPv6 address (only 8 bytes instead of 16)

This is a protocol limitation from ka9q-radio, but should be documented.


2. DESIGN INADEQUACIES

2.1 Lack of Connection State Management

Issue: No connection health monitoring

Recommendation:

class RadiodControl:
    def __init__(self, status_address):
        # ...
        self._last_status_time = None
        self._connection_state = "disconnected"
    
    def is_connected(self) -> bool:
        """Check if radiod is responding"""
        if self._last_status_time is None:
            return False
        return time.time() - self._last_status_time < 10.0
    
    def health_check(self) -> dict:
        """Perform health check"""
        try:
            channels = discover_channels(self.status_address, listen_duration=1.0)
            return {
                'connected': True,
                'channel_count': len(channels),
                'last_status': self._last_status_time
            }
        except Exception as e:
            return {'connected': False, 'error': str(e)}

2.2 No Async/Await Support

Issue: Blocking operations in async applications

Current: All operations are synchronous, blocking event loops

Recommendation: Add async version

import asyncio

class AsyncRadiodControl:
    async def create_channel_async(self, ssrc, frequency_hz, **kwargs):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None, 
            self.create_channel, 
            ssrc, 
            frequency_hz
        )
    
    async def tune_async(self, ssrc, **kwargs):
        # Proper async implementation with asyncio.DatagramProtocol
        pass

2.3 No Built-in Metrics/Observability

Issue: Difficult to monitor in production

Recommendation:

from dataclasses import dataclass, field
from typing import Dict
import time

@dataclass
class Metrics:
    commands_sent: int = 0
    commands_failed: int = 0
    status_received: int = 0
    avg_response_time: float = 0.0
    errors: Dict[str, int] = field(default_factory=dict)
    
    def record_command(self, success: bool, duration: float = 0.0):
        self.commands_sent += 1
        if not success:
            self.commands_failed += 1
        # Update avg response time with exponential moving average
        alpha = 0.1
        self.avg_response_time = (alpha * duration + 
                                  (1 - alpha) * self.avg_response_time)

class RadiodControl:
    def __init__(self, status_address):
        # ...
        self.metrics = Metrics()
    
    def get_metrics(self) -> dict:
        return {
            'commands_sent': self.metrics.commands_sent,
            'commands_failed': self.metrics.commands_failed,
            'success_rate': (self.metrics.commands_sent - self.metrics.commands_failed) / 
                           max(1, self.metrics.commands_sent),
            'avg_response_time_ms': self.metrics.avg_response_time * 1000,
        }

2.4 Missing Channel Lifecycle Management

Issue: No automatic cleanup of unused channels

Recommendation:

class ManagedChannel:
    """Managed channel with automatic cleanup"""
    def __init__(self, control, ssrc, **kwargs):
        self.control = control
        self.ssrc = ssrc
        self.created_at = time.time()
        self.last_used = time.time()
    
    def __enter__(self):
        self.control.create_channel(ssrc=self.ssrc, ...)
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Could send LOCK=0, frequency=0 to free channel
        pass

# Usage:
with ManagedChannel(control, ssrc=12345, frequency_hz=14.074e6) as ch:
    # Channel exists
    pass
# Channel automatically cleaned up

2.5 No Configuration File Support

Issue: All configuration in code

Recommendation:

import configparser
from pathlib import Path

def load_config(config_file: Path = None) -> dict:
    """Load configuration from file"""
    config = configparser.ConfigParser()
    
    # Default locations
    config_paths = [
        Path.home() / '.ka9q' / 'config.ini',
        Path('/etc/ka9q/config.ini'),
        config_file,
    ]
    
    for path in config_paths:
        if path and path.exists():
            config.read(path)
            break
    
    return {
        'status_address': config.get('radiod', 'status_address', 
                                     fallback='radiod.local'),
        'timeout': config.getfloat('radiod', 'timeout', fallback=5.0),
        # ...
    }

2.6 TODO: Extract Decoder to Standalone Module

Found: Line 110 in discovery.py

# TODO: Extract decoder into standalone module to avoid this
temp_control = RadiodControl.__new__(RadiodControl)

Issue: Discovery module creates fake RadiodControl instance just to access decoder

Recommendation: Refactor decoders into ka9q/protocol.py

# ka9q/protocol.py
def decode_status_packet(buffer: bytes) -> dict:
    """Standalone status packet decoder"""
    # Move _decode_status_response here
    pass

# discovery.py
from .protocol import decode_status_packet

def discover_channels_native(...):
    # ...
    status = decode_status_packet(buffer)

3. MISSED OPPORTUNITIES

3.1 No Command Batching

Opportunity: Reduce network overhead by batching commands

class BatchedRadiodControl(RadiodControl):
    def __init__(self, *args, batch_size=10, batch_timeout=0.1, **kwargs):
        super().__init__(*args, **kwargs)
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self._batch = []
        self._batch_lock = threading.Lock()
        self._batch_timer = None
    
    def _flush_batch(self):
        """Send all batched commands"""
        with self._batch_lock:
            if not self._batch:
                return
            
            # Combine into single multicast packet
            combined = bytearray()
            for cmd in self._batch:
                combined.extend(cmd)
            
            super().send_command(combined)
            self._batch.clear()
    
    def send_command(self, cmdbuffer, batch=True):
        if not batch:
            return super().send_command(cmdbuffer)
        
        with self._batch_lock:
            self._batch.append(cmdbuffer)
            
            # Flush if batch is full
            if len(self._batch) >= self.batch_size:
                self._flush_batch()
            else:
                # Set timer to flush after timeout
                if self._batch_timer:
                    self._batch_timer.cancel()
                self._batch_timer = threading.Timer(
                    self.batch_timeout, 
                    self._flush_batch
                )
                self._batch_timer.start()

3.2 No Caching for Discovery

Opportunity: Cache discovered channels to reduce network traffic

import functools
import time

def cached_discovery(ttl_seconds=5.0):
    """Decorator to cache discovery results"""
    cache = {'result': None, 'timestamp': 0}
    
    @functools.wraps(discover_channels)
    def wrapper(*args, **kwargs):
        now = time.time()
        if cache['result'] and (now - cache['timestamp']) < ttl_seconds:
            logger.debug(f"Using cached discovery (age: {now - cache['timestamp']:.1f}s)")
            return cache['result']
        
        result = discover_channels(*args, **kwargs)
        cache['result'] = result
        cache['timestamp'] = now
        return result
    
    return wrapper

# Usage
discover_channels_cached = cached_discovery(ttl_seconds=10.0)

3.3 No Event/Callback System for Status Updates

Opportunity: Real-time notifications instead of polling

from typing import Callable, Dict
import threading

class EventDrivenRadiodControl(RadiodControl):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._callbacks: Dict[str, list] = {
            'channel_created': [],
            'frequency_changed': [],
            'status_update': [],
        }
        self._listener_thread = None
        self._running = False
    
    def on(self, event: str, callback: Callable):
        """Register event callback"""
        if event not in self._callbacks:
            self._callbacks[event] = []
        self._callbacks[event].append(callback)
    
    def start_listening(self):
        """Start background status listener"""
        self._running = True
        self._listener_thread = threading.Thread(
            target=self._listen_loop,
            daemon=True
        )
        self._listener_thread.start()
    
    def _listen_loop(self):
        """Background listener for status updates"""
        status_sock = self._setup_status_listener()
        while self._running:
            try:
                buffer, _ = status_sock.recvfrom(8192)
                status = self._decode_status_response(buffer)
                
                # Trigger callbacks
                for callback in self._callbacks['status_update']:
                    callback(status)
            except socket.timeout:
                continue

3.4 No Higher-Level Abstractions for Common Use Cases

Opportunity: Convenience classes for specific applications

class WSPRMonitor(RadiodControl):
    """High-level interface for WSPR monitoring"""
    WSPR_BANDS = {
        '160m': 1.8366e6,
        '80m': 3.5686e6,
        '40m': 7.0386e6,
        '30m': 10.1387e6,
        '20m': 14.0956e6,
    }
    
    def monitor_all_bands(self):
        """Create channels for all WSPR bands"""
        for band, freq in self.WSPR_BANDS.items():
            self.create_channel(
                ssrc=int(freq),
                frequency_hz=freq,
                preset='usb',
                sample_rate=12000
            )
            logger.info(f"Monitoring {band} WSPR")

class SatelliteTracker(RadiodControl):
    """High-level interface for satellite tracking with Doppler"""
    def track_satellite(self, satellite_name: str, observer_location: tuple):
        """Track satellite with automatic Doppler correction"""
        # Integration with orbital prediction libraries
        pass

3.5 No Built-in RTP Stream Handling

Opportunity: Complete solution for receiving audio/IQ data

Current: Library only controls channels, doesn't handle RTP streams

Recommendation:

import socket
import struct

class RTPReceiver:
    """Receive RTP audio/IQ streams from radiod"""
    def __init__(self, multicast_addr: str, port: int):
        self.addr = multicast_addr
        self.port = port
        self.sock = None
    
    def start(self):
        """Start receiving RTP packets"""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(('0.0.0.0', self.port))
        
        # Join multicast group
        mreq = struct.pack('4sl', 
                          socket.inet_aton(self.addr),
                          socket.INADDR_ANY)
        self.sock.setsockopt(socket.IPPROTO_IP, 
                            socket.IP_ADD_MEMBERSHIP, 
                            mreq)
    
    def receive_samples(self, num_samples: int = 1024):
        """Receive IQ samples from RTP stream"""
        # Parse RTP header, extract samples
        pass

# Integration with RadiodControl
class RadiodWithRTP(RadiodControl):
    def create_channel_with_receiver(self, ssrc, frequency_hz, **kwargs):
        """Create channel and return RTP receiver"""
        # Discover multicast address for this channel
        self.create_channel(ssrc, frequency_hz, **kwargs)
        channels = discover_channels(self.status_address)
        
        if ssrc in channels:
            channel = channels[ssrc]
            rtp = RTPReceiver(
                channel.multicast_address,
                channel.port
            )
            rtp.start()
            return rtp

3.6 No Connection Pooling for Multiple Radiod Instances

Opportunity: Manage multiple radiod instances efficiently

class RadiodPool:
    """Connection pool for multiple radiod instances"""
    def __init__(self):
        self._connections: Dict[str, RadiodControl] = {}
        self._lock = threading.Lock()
    
    def get_connection(self, status_address: str) -> RadiodControl:
        """Get or create connection to radiod instance"""
        with self._lock:
            if status_address not in self._connections:
                self._connections[status_address] = RadiodControl(status_address)
            return self._connections[status_address]
    
    def close_all(self):
        """Close all connections"""
        for conn in self._connections.values():
            conn.close()

3.7 No Automatic Service Discovery with Auto-Connect

Opportunity: Zero-configuration usage

class AutoRadiodControl:
    """Automatically discover and connect to radiod"""
    @classmethod
    def auto_connect(cls, service_name: str = None):
        """Discover and connect to radiod automatically"""
        services = discover_radiod_services()
        
        if not services:
            raise DiscoveryError("No radiod services found on network")
        
        if service_name:
            # Find specific service
            for svc in services:
                if service_name in svc['name']:
                    return cls(svc['address'])
            raise DiscoveryError(f"Service {service_name} not found")
        else:
            # Use first available
            return cls(services[0]['address'])

# Usage:
control = AutoRadiodControl.auto_connect()
# OR
control = AutoRadiodControl.auto_connect(service_name='radiod@hf')

4. KA9Q-RADIO C SOURCE CONSIDERATIONS

4.1 Buffer Safety in C Code

Observation: Based on the C source headers, radiod uses:

  • Manual string manipulation
  • Fixed-size buffers
  • TLV parsing

Recommendation for Python Integration:

  • Always validate string lengths before sending
  • Never send untrusted input without sanitization
  • Implement fuzzing tests for TLV encoder/decoder

4.2 Multicast Protocol Considerations

Observation: radiod protocol has no built-in authentication

This is a fundamental protocol limitation - changes would require updates to both C and Python implementations.

Recommendation:

  • Document security model clearly
  • Provide guidance on network isolation
  • Consider proposing authenticated protocol extension to ka9q-radio project

5. TESTING GAPS

5.1 Missing Test Coverage

No tests for:

  • Malformed packet handling
  • Concurrent access from multiple threads
  • Socket exhaustion scenarios
  • Network partition recovery
  • IPv6 functionality
  • Large-scale stress testing

Recommendation: Add comprehensive test suite

# tests/test_security.py
def test_malformed_packets():
    """Test handling of malformed status packets"""
    control = RadiodControl("test.local")
    
    malformed_packets = [
        b'',  # Empty
        b'\x00',  # Only type byte
        b'\x00\x21\x08',  # Incomplete
        b'\x00\x21\xFF' + b'\x00' * 300,  # Invalid length
    ]
    
    for packet in malformed_packets:
        status = control._decode_status_response(packet)
        # Should not crash, should return empty or minimal dict

def test_concurrent_commands():
    """Test thread safety"""
    control = RadiodControl("test.local")
    errors = []
    
    def worker():
        try:
            for i in range(100):
                control.set_frequency(ssrc=10000, frequency_hz=14.0e6 + i)
        except Exception as e:
            errors.append(e)
    
    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    assert len(errors) == 0, f"Concurrent errors: {errors}"

6. DOCUMENTATION IMPROVEMENTS

6.1 Security Considerations Section Needed

Add to README.md:

## Security Considerations

⚠️ **IMPORTANT**: The ka9q-radio protocol does not include authentication or encryption.

### Network Security
- Deploy radiod on isolated network segments
- Use firewall rules to restrict multicast access
- Consider VPN for remote access
- Monitor for unauthorized command traffic

### Input Validation
- All user inputs are validated before transmission
- String parameters are length-limited and sanitized
- Numeric parameters are range-checked

### Threat Model
This library is designed for trusted network environments, such as:
- Private LANs behind firewalls
- Laboratory/research networks
- Amateur radio operators on home networks

NOT suitable for:
- Public internet exposure
- Multi-tenant environments
- Untrusted networks

7. PRIORITY RECOMMENDATIONS

Immediate (Week 1)

  1. ✅ Replace random.randint() with secrets.randbits() for command tags
  2. ✅ Add comprehensive input validation for all string parameters
  3. ✅ Fix socket leaks in error paths
  4. ✅ Add bounds checking to all decode functions

Short-term (Month 1)

  1. ✅ Implement rate limiting
  2. ✅ Add metrics/observability
  3. ✅ Extract decoder to standalone module (fix TODO)
  4. ✅ Add security documentation

Medium-term (Quarter 1)

  1. ✅ Add async/await support
  2. ✅ Implement event/callback system
  3. ✅ Add command batching
  4. ✅ Add discovery caching

Long-term (Year 1)

  1. ✅ Propose authenticated protocol extension to ka9q-radio project
  2. ✅ Add complete RTP stream handling
  3. ✅ Build higher-level abstractions for common use cases
  4. ✅ Comprehensive security audit and penetration testing

8. CONCLUSION

The ka9q-python library demonstrates solid software engineering with good architecture, thread safety, and comprehensive validation. However, the lack of authentication in the underlying protocol and several implementation gaps create security and robustness concerns.

Key Strengths:

  • Well-structured, modular design
  • Comprehensive parameter validation
  • Thread-safe implementation
  • Good error handling and logging
  • Clear documentation

Key Weaknesses:

  • No authentication/authorization
  • Weak cryptographic randomness
  • Limited async support
  • Missing observability features
  • Protocol security limitations

Overall Assessment: Suitable for trusted networks with improvements needed for production environments and security-critical applications.


END OF REVIEW

There aren’t any published security advisories