diff --git a/modules/test/tls/python/requirements.txt b/modules/test/tls/python/requirements.txt index 2cbc1db46..8820cfcc7 100644 --- a/modules/test/tls/python/requirements.txt +++ b/modules/test/tls/python/requirements.txt @@ -16,6 +16,5 @@ urllib3==2.2.2 cryptography==44.0.1 pyOpenSSL==24.3.0 lxml==5.1.0 # Requirement of pyshark but if upgraded automatically above 5.1 will cause a -pyshark==0.6 requests==2.32.3 python-nmap==0.7.1 diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 42245dfb9..c6c9ca86c 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -18,7 +18,7 @@ from tls_util import TLSUtil from http_scan import HTTPScan import os -import pyshark +import subprocess from binascii import hexlify from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -28,6 +28,7 @@ from cryptography.x509 import GeneralNames, DNSName, ExtendedKeyUsage, ObjectIdentifier, SubjectAlternativeName from jinja2 import Environment, FileSystemLoader + LOG_NAME = 'test_tls' MODULE_REPORT_FILE_NAME = 'tls_report.j2.html' STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap' @@ -101,12 +102,12 @@ def generate_module_report(self): pages[cert_num] = {} # Extract certificate data - not_valid_before = cert.not_valid_before - not_valid_after = cert.not_valid_after + not_valid_before = cert.not_valid_before_utc + not_valid_after = cert.not_valid_after_utc version_value = f'{cert.version.value + 1} ({hex(cert.version.value)})' signature_alg_value = cert.signature_algorithm_oid._name - not_before = str(not_valid_before) - not_after = str(not_valid_after) + not_before = str(not_valid_before.replace(tzinfo=None)) + not_after = str(not_valid_after.replace(tzinfo=None)) public_key = cert.public_key() signed_by = 'None' @@ -300,38 +301,60 @@ def generate_outbound_connection_table(self, outbound_conns): return html_content def extract_certificates_from_pcap(self, pcap_files, mac_address): - # Initialize a list to store packets - all_packets = [] - # Iterate over each file + """ + Extracts TLS certificates from pcap files using tshark for + robust extraction. + Returns a dict keyed by (ip, port) with x509. + Certificate objects as values. + """ + certificates = {} + cert_count = 0 + # MAC address for tshark must be colon-separated and lowercase + mac_colon = mac_address.lower() if mac_address else '' for pcap_file in pcap_files: - # Open the capture file and filter by tls - packets = pyshark.FileCapture(pcap_file, display_filter='tls') try: - # Iterate over each packet in the file and add it to the list - for packet in packets: - all_packets.append(packet) - finally: - # Close the capture file - packets.close() - - certificates = {} - # Loop through each item (packet) - for packet in all_packets: - if 'TLS' in packet: - # Check if the packet's source matches the target MAC address - if 'eth' in packet and (packet.eth.src == mac_address): - # Look for attribute of x509 - if hasattr(packet['TLS'], 'x509sat_utf8string'): - certificate_bytes = bytes.fromhex( - packet['TLS'].handshake_certificate.replace(':', '')) - # Parse the certificate bytes - certificate = x509.load_der_x509_certificate( - certificate_bytes, default_backend()) - # Extract IP address and port from packet - ip_address = packet.ip.src - port = packet.tcp.srcport if 'tcp' in packet else packet.udp.srcport - # Store certificate in dictionary with IP address and port as key - certificates[(ip_address, port)] = certificate + # If no MAC address is provided, match old method: + # do not extract any certs + if not mac_colon: + continue + # Build tshark filter expression with MAC filter + filter_expr = f'tls.handshake.certificate && eth.src=={mac_colon}' + cmd = [ + 'tshark', + '-r', pcap_file, + '-Y', filter_expr, + '-T', 'fields', + '-e', 'ip.src', + '-e', 'tcp.srcport', + '-e', 'tls.handshake.certificate', + ] + result = subprocess.run(cmd, + capture_output=True, + text=True, + check=False + ) + if result.returncode != 0: + LOGGER.error(f'tshark failed on {pcap_file}: {result.stderr}') + continue + for line in result.stdout.splitlines(): + parts = line.strip().split('\t') + if len(parts) != 3: + continue + ip, port, cert_hex = parts + if not ip or not port or not cert_hex: + continue + # tls.handshake.certificate can be a list (comma-separated DERs) + first_cert_hex = cert_hex.split(',')[0] + try: + cert_bytes = bytes.fromhex(first_cert_hex) + cert = x509.load_der_x509_certificate(cert_bytes, default_backend()) + certificates[(ip, port)] = cert + cert_count += 1 + except Exception as e: + LOGGER.info( + f'Failed to parse certificate from {pcap_file} {ip}:{port}: {e}') + except Exception as e: + LOGGER.error(f'Error running tshark on {pcap_file}: {e}') sorted_keys = sorted(certificates.keys(), key=lambda x: (x[0], x[1])) sorted_certificates = {k: certificates[k] for k in sorted_keys} return sorted_certificates diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py index cd849a773..cf4ee58c1 100644 --- a/testing/unit/tls/tls_module_test.py +++ b/testing/unit/tls/tls_module_test.py @@ -760,18 +760,20 @@ def make_tls_connection(self, ] context.set_ciphers(':'.join(ciphers_str)) - # Disable specific TLS versions based on the input - if tls_version != '1.1': - context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0 - context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1 - else: - context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 - context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 + # Set allowed TLS versions using minimum_version and maximum_version if tls_version == '1.3': - context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 + context.minimum_version = ssl.TLSVersion.TLSv1_3 + context.maximum_version = ssl.TLSVersion.TLSv1_3 elif tls_version == '1.2': - context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.maximum_version = ssl.TLSVersion.TLSv1_2 + elif tls_version == '1.1': + context.minimum_version = ssl.TLSVersion.TLSv1_1 + context.maximum_version = ssl.TLSVersion.TLSv1_1 + elif tls_version == '1.0': + context.minimum_version = ssl.TLSVersion.TLSv1 + context.maximum_version = ssl.TLSVersion.TLSv1 # Create an SSL/TLS socket with socket.create_connection((hostname, port), timeout=10) as sock: