Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion modules/test/tls/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ urllib3==2.2.2
cryptography==44.0.1
pyOpenSSL==24.3.0
lxml==5.1.0 # Requirement of pyshark but if upgraded automatically above 5.1 will cause a
pyshark==0.6
requests==2.32.3
python-nmap==0.7.1
93 changes: 58 additions & 35 deletions modules/test/tls/python/src/tls_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from tls_util import TLSUtil
from http_scan import HTTPScan
import os
import pyshark
import subprocess
from binascii import hexlify
from cryptography import x509
from cryptography.hazmat.backends import default_backend
Expand All @@ -28,6 +28,7 @@
from cryptography.x509 import GeneralNames, DNSName, ExtendedKeyUsage, ObjectIdentifier, SubjectAlternativeName
from jinja2 import Environment, FileSystemLoader


LOG_NAME = 'test_tls'
MODULE_REPORT_FILE_NAME = 'tls_report.j2.html'
STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap'
Expand Down Expand Up @@ -101,12 +102,12 @@ def generate_module_report(self):
pages[cert_num] = {}

# Extract certificate data
not_valid_before = cert.not_valid_before
not_valid_after = cert.not_valid_after
not_valid_before = cert.not_valid_before_utc
not_valid_after = cert.not_valid_after_utc
version_value = f'{cert.version.value + 1} ({hex(cert.version.value)})'
signature_alg_value = cert.signature_algorithm_oid._name
not_before = str(not_valid_before)
not_after = str(not_valid_after)
not_before = str(not_valid_before.replace(tzinfo=None))
not_after = str(not_valid_after.replace(tzinfo=None))
public_key = cert.public_key()
signed_by = 'None'

Expand Down Expand Up @@ -300,38 +301,60 @@ def generate_outbound_connection_table(self, outbound_conns):
return html_content

def extract_certificates_from_pcap(self, pcap_files, mac_address):
# Initialize a list to store packets
all_packets = []
# Iterate over each file
"""
Extracts TLS certificates from pcap files using tshark for
robust extraction.
Returns a dict keyed by (ip, port) with x509.
Certificate objects as values.
"""
certificates = {}
cert_count = 0
# MAC address for tshark must be colon-separated and lowercase
mac_colon = mac_address.lower() if mac_address else ''
for pcap_file in pcap_files:
# Open the capture file and filter by tls
packets = pyshark.FileCapture(pcap_file, display_filter='tls')
try:
# Iterate over each packet in the file and add it to the list
for packet in packets:
all_packets.append(packet)
finally:
# Close the capture file
packets.close()

certificates = {}
# Loop through each item (packet)
for packet in all_packets:
if 'TLS' in packet:
# Check if the packet's source matches the target MAC address
if 'eth' in packet and (packet.eth.src == mac_address):
# Look for attribute of x509
if hasattr(packet['TLS'], 'x509sat_utf8string'):
certificate_bytes = bytes.fromhex(
packet['TLS'].handshake_certificate.replace(':', ''))
# Parse the certificate bytes
certificate = x509.load_der_x509_certificate(
certificate_bytes, default_backend())
# Extract IP address and port from packet
ip_address = packet.ip.src
port = packet.tcp.srcport if 'tcp' in packet else packet.udp.srcport
# Store certificate in dictionary with IP address and port as key
certificates[(ip_address, port)] = certificate
# If no MAC address is provided, match old method:
# do not extract any certs
if not mac_colon:
continue
# Build tshark filter expression with MAC filter
filter_expr = f'tls.handshake.certificate && eth.src=={mac_colon}'
cmd = [
'tshark',
'-r', pcap_file,
'-Y', filter_expr,
'-T', 'fields',
'-e', 'ip.src',
'-e', 'tcp.srcport',
'-e', 'tls.handshake.certificate',
]
result = subprocess.run(cmd,
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
LOGGER.error(f'tshark failed on {pcap_file}: {result.stderr}')
continue
for line in result.stdout.splitlines():
parts = line.strip().split('\t')
if len(parts) != 3:
continue
ip, port, cert_hex = parts
if not ip or not port or not cert_hex:
continue
# tls.handshake.certificate can be a list (comma-separated DERs)
first_cert_hex = cert_hex.split(',')[0]
try:
cert_bytes = bytes.fromhex(first_cert_hex)
cert = x509.load_der_x509_certificate(cert_bytes, default_backend())
certificates[(ip, port)] = cert
cert_count += 1
except Exception as e:
LOGGER.info(
f'Failed to parse certificate from {pcap_file} {ip}:{port}: {e}')
except Exception as e:
LOGGER.error(f'Error running tshark on {pcap_file}: {e}')
sorted_keys = sorted(certificates.keys(), key=lambda x: (x[0], x[1]))
sorted_certificates = {k: certificates[k] for k in sorted_keys}
return sorted_certificates
Expand Down
20 changes: 11 additions & 9 deletions testing/unit/tls/tls_module_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,18 +760,20 @@ def make_tls_connection(self,
]
context.set_ciphers(':'.join(ciphers_str))

# Disable specific TLS versions based on the input
if tls_version != '1.1':
context.options |= ssl.OP_NO_TLSv1 # Disable TLS 1.0
context.options |= ssl.OP_NO_TLSv1_1 # Disable TLS 1.1
else:
context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2
context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3

# Set allowed TLS versions using minimum_version and maximum_version
if tls_version == '1.3':
context.options |= ssl.OP_NO_TLSv1_2 # Disable TLS 1.2
context.minimum_version = ssl.TLSVersion.TLSv1_3
context.maximum_version = ssl.TLSVersion.TLSv1_3
elif tls_version == '1.2':
context.options |= ssl.OP_NO_TLSv1_3 # Disable TLS 1.3
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.maximum_version = ssl.TLSVersion.TLSv1_2
elif tls_version == '1.1':
context.minimum_version = ssl.TLSVersion.TLSv1_1
context.maximum_version = ssl.TLSVersion.TLSv1_1
elif tls_version == '1.0':
context.minimum_version = ssl.TLSVersion.TLSv1
context.maximum_version = ssl.TLSVersion.TLSv1

# Create an SSL/TLS socket
with socket.create_connection((hostname, port), timeout=10) as sock:
Expand Down