From f2741f40e6cb14d1dc389e3d1a661038c024fd29 Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Wed, 6 Aug 2025 20:42:33 +0200 Subject: [PATCH 1/5] fix pyshark error --- modules/test/tls/python/requirements.txt | 1 - modules/test/tls/python/src/tls_module.py | 77 ++++++++++++++--------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/modules/test/tls/python/requirements.txt b/modules/test/tls/python/requirements.txt index 2cbc1db46..8820cfcc7 100644 --- a/modules/test/tls/python/requirements.txt +++ b/modules/test/tls/python/requirements.txt @@ -16,6 +16,5 @@ urllib3==2.2.2 cryptography==44.0.1 pyOpenSSL==24.3.0 lxml==5.1.0 # Requirement of pyshark but if upgraded automatically above 5.1 will cause a -pyshark==0.6 requests==2.32.3 python-nmap==0.7.1 diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 42245dfb9..ebe036617 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -18,7 +18,7 @@ from tls_util import TLSUtil from http_scan import HTTPScan import os -import pyshark +import subprocess from binascii import hexlify from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -28,6 +28,7 @@ from cryptography.x509 import GeneralNames, DNSName, ExtendedKeyUsage, ObjectIdentifier, SubjectAlternativeName from jinja2 import Environment, FileSystemLoader + LOG_NAME = 'test_tls' MODULE_REPORT_FILE_NAME = 'tls_report.j2.html' STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap' @@ -300,38 +301,52 @@ def generate_outbound_connection_table(self, outbound_conns): return html_content def extract_certificates_from_pcap(self, pcap_files, mac_address): - # Initialize a list to store packets - all_packets = [] - # Iterate over each file + """ + Extracts TLS certificates from pcap files using tshark for robust extraction. + Returns a dict keyed by (ip, port) with x509.Certificate objects as values. + """ + certificates = {} + cert_count = 0 + # MAC address for tshark must be colon-separated and lowercase + mac_colon = mac_address.lower() if mac_address else '' for pcap_file in pcap_files: - # Open the capture file and filter by tls - packets = pyshark.FileCapture(pcap_file, display_filter='tls') try: - # Iterate over each packet in the file and add it to the list - for packet in packets: - all_packets.append(packet) - finally: - # Close the capture file - packets.close() - - certificates = {} - # Loop through each item (packet) - for packet in all_packets: - if 'TLS' in packet: - # Check if the packet's source matches the target MAC address - if 'eth' in packet and (packet.eth.src == mac_address): - # Look for attribute of x509 - if hasattr(packet['TLS'], 'x509sat_utf8string'): - certificate_bytes = bytes.fromhex( - packet['TLS'].handshake_certificate.replace(':', '')) - # Parse the certificate bytes - certificate = x509.load_der_x509_certificate( - certificate_bytes, default_backend()) - # Extract IP address and port from packet - ip_address = packet.ip.src - port = packet.tcp.srcport if 'tcp' in packet else packet.udp.srcport - # Store certificate in dictionary with IP address and port as key - certificates[(ip_address, port)] = certificate + # If no MAC address is provided, match old method: do not extract any certs + if not mac_colon: + continue + # Build tshark filter expression with MAC filter + filter_expr = f'tls.handshake.certificate && eth.src=={mac_colon}' + cmd = [ + 'tshark', + '-r', pcap_file, + '-Y', filter_expr, + '-T', 'fields', + '-e', 'ip.src', + '-e', 'tcp.srcport', + '-e', 'tls.handshake.certificate', + ] + result = subprocess.run(cmd, capture_output=True, text=True, check=False) + if result.returncode != 0: + LOGGER.error(f"tshark failed on {pcap_file}: {result.stderr}") + continue + for line in result.stdout.splitlines(): + parts = line.strip().split('\t') + if len(parts) != 3: + continue + ip, port, cert_hex = parts + if not ip or not port or not cert_hex: + continue + # tls.handshake.certificate can be a list (comma-separated DERs), take the first + first_cert_hex = cert_hex.split(',')[0] + try: + cert_bytes = bytes.fromhex(first_cert_hex) + cert = x509.load_der_x509_certificate(cert_bytes, default_backend()) + certificates[(ip, port)] = cert + cert_count += 1 + except Exception as e: + LOGGER.info(f"Failed to parse certificate from {pcap_file} {ip}:{port}: {e}") + except Exception as e: + LOGGER.error(f"Error running tshark on {pcap_file}: {e}") sorted_keys = sorted(certificates.keys(), key=lambda x: (x[0], x[1])) sorted_certificates = {k: certificates[k] for k in sorted_keys} return sorted_certificates From 7459013b346d5951e2602c12be2fab2318f60dbe Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Wed, 6 Aug 2025 20:54:36 +0200 Subject: [PATCH 2/5] fix not_valid_before depracation warning --- modules/test/tls/python/src/tls_module.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index ebe036617..19f1e8bf1 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -102,12 +102,12 @@ def generate_module_report(self): pages[cert_num] = {} # Extract certificate data - not_valid_before = cert.not_valid_before - not_valid_after = cert.not_valid_after + not_valid_before = cert.not_valid_before_utc + not_valid_after = cert.not_valid_after_utc version_value = f'{cert.version.value + 1} ({hex(cert.version.value)})' signature_alg_value = cert.signature_algorithm_oid._name - not_before = str(not_valid_before) - not_after = str(not_valid_after) + not_before = str(not_valid_before.replace(tzinfo=None)) + not_after = str(not_valid_after.replace(tzinfo=None)) public_key = cert.public_key() signed_by = 'None' From 31dab72e7761fc7d015f5d063cf1780cd00db18f Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Wed, 6 Aug 2025 20:59:48 +0200 Subject: [PATCH 3/5] fix tests deprecation warning --- testing/unit/tls/tls_module_test.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py index cd849a773..cf4ee58c1 100644 --- a/testing/unit/tls/tls_module_test.py +++ b/testing/unit/tls/tls_module_test.py @@ -760,18 +760,20 @@ def make_tls_connection(self, ] context.set_ciphers(':'.join(ciphers_str)) - # Disable specific TLS versions based on the input - if tls_version != '1.1': - context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0 - context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1 - else: - context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 - context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 + # Set allowed TLS versions using minimum_version and maximum_version if tls_version == '1.3': - context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2 + context.minimum_version = ssl.TLSVersion.TLSv1_3 + context.maximum_version = ssl.TLSVersion.TLSv1_3 elif tls_version == '1.2': - context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3 + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.maximum_version = ssl.TLSVersion.TLSv1_2 + elif tls_version == '1.1': + context.minimum_version = ssl.TLSVersion.TLSv1_1 + context.maximum_version = ssl.TLSVersion.TLSv1_1 + elif tls_version == '1.0': + context.minimum_version = ssl.TLSVersion.TLSv1 + context.maximum_version = ssl.TLSVersion.TLSv1 # Create an SSL/TLS socket with socket.create_connection((hostname, port), timeout=10) as sock: From e4ba3a9e7edbdbb10ed3f0890a7cc4c850ca5348 Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Wed, 6 Aug 2025 21:09:25 +0200 Subject: [PATCH 4/5] pylint --- modules/test/tls/python/src/tls_module.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 19f1e8bf1..48135c55a 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -302,8 +302,10 @@ def generate_outbound_connection_table(self, outbound_conns): def extract_certificates_from_pcap(self, pcap_files, mac_address): """ - Extracts TLS certificates from pcap files using tshark for robust extraction. - Returns a dict keyed by (ip, port) with x509.Certificate objects as values. + Extracts TLS certificates from pcap files using tshark for + robust extraction. + Returns a dict keyed by (ip, port) with x509. + Certificate objects as values. """ certificates = {} cert_count = 0 @@ -311,7 +313,8 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): mac_colon = mac_address.lower() if mac_address else '' for pcap_file in pcap_files: try: - # If no MAC address is provided, match old method: do not extract any certs + # If no MAC address is provided, match old method: + # do not extract any certs if not mac_colon: continue # Build tshark filter expression with MAC filter @@ -325,7 +328,11 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): '-e', 'tcp.srcport', '-e', 'tls.handshake.certificate', ] - result = subprocess.run(cmd, capture_output=True, text=True, check=False) + result = subprocess.run(cmd, + capture_output=True, + text=True, + check=False + ) if result.returncode != 0: LOGGER.error(f"tshark failed on {pcap_file}: {result.stderr}") continue @@ -336,7 +343,7 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): ip, port, cert_hex = parts if not ip or not port or not cert_hex: continue - # tls.handshake.certificate can be a list (comma-separated DERs), take the first + # tls.handshake.certificate can be a list (comma-separated DERs) first_cert_hex = cert_hex.split(',')[0] try: cert_bytes = bytes.fromhex(first_cert_hex) @@ -344,7 +351,8 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): certificates[(ip, port)] = cert cert_count += 1 except Exception as e: - LOGGER.info(f"Failed to parse certificate from {pcap_file} {ip}:{port}: {e}") + LOGGER.info( + f"Failed to parse certificate from {pcap_file} {ip}:{port}: {e}") except Exception as e: LOGGER.error(f"Error running tshark on {pcap_file}: {e}") sorted_keys = sorted(certificates.keys(), key=lambda x: (x[0], x[1])) From fe5d709467ee1d05d423d9b64d58ea1308e514a9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Fri, 8 Aug 2025 12:34:19 +0200 Subject: [PATCH 5/5] fix inconsistent quotas --- modules/test/tls/python/src/tls_module.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index 48135c55a..c6c9ca86c 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -334,7 +334,7 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): check=False ) if result.returncode != 0: - LOGGER.error(f"tshark failed on {pcap_file}: {result.stderr}") + LOGGER.error(f'tshark failed on {pcap_file}: {result.stderr}') continue for line in result.stdout.splitlines(): parts = line.strip().split('\t') @@ -352,9 +352,9 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): cert_count += 1 except Exception as e: LOGGER.info( - f"Failed to parse certificate from {pcap_file} {ip}:{port}: {e}") + f'Failed to parse certificate from {pcap_file} {ip}:{port}: {e}') except Exception as e: - LOGGER.error(f"Error running tshark on {pcap_file}: {e}") + LOGGER.error(f'Error running tshark on {pcap_file}: {e}') sorted_keys = sorted(certificates.keys(), key=lambda x: (x[0], x[1])) sorted_certificates = {k: certificates[k] for k in sorted_keys} return sorted_certificates