From 85fe62f4bc093290dc5ff3be9d98c1bbdfe7c77d Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 10 Jan 2025 16:46:35 -0700 Subject: [PATCH 1/3] Add full port scan to determine any HTTPS/TLS server running on non-standard 443 port --- modules/test/tls/conf/module_config.json | 2 +- modules/test/tls/python/requirements.txt | 2 +- modules/test/tls/python/src/http_scan.py | 76 +++++++++++++++ modules/test/tls/python/src/tls_module.py | 108 ++++++++++++++++------ modules/test/tls/python/src/tls_util.py | 47 ++++++---- testing/unit/tls/tls_module_test.py | 65 ++++++------- 6 files changed, 218 insertions(+), 82 deletions(-) create mode 100644 modules/test/tls/python/src/http_scan.py diff --git a/modules/test/tls/conf/module_config.json b/modules/test/tls/conf/module_config.json index 9c83c85df..228fbb64d 100644 --- a/modules/test/tls/conf/module_config.json +++ b/modules/test/tls/conf/module_config.json @@ -9,7 +9,7 @@ "docker": { "depends_on": "base", "enable_container": true, - "timeout": 420 + "timeout": 540 }, "tests":[ { diff --git a/modules/test/tls/python/requirements.txt b/modules/test/tls/python/requirements.txt index a5719c77c..d7180f31b 100644 --- a/modules/test/tls/python/requirements.txt +++ b/modules/test/tls/python/requirements.txt @@ -18,4 +18,4 @@ pyOpenSSL==24.3.0 lxml==5.1.0 # Requirement of pyshark but if upgraded automatically above 5.1 will cause a pyshark==0.6 requests==2.32.3 - +python-nmap==0.7.1 diff --git a/modules/test/tls/python/src/http_scan.py b/modules/test/tls/python/src/http_scan.py new file mode 100644 index 000000000..a25f5215d --- /dev/null +++ b/modules/test/tls/python/src/http_scan.py @@ -0,0 +1,76 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Module that contains various methods for scaning for HTTP/HTTPS services""" +import nmap +import socket +import ssl + +LOGGER = None + + +class HTTPScan(): + """Helper class to scan for all HTTP/HTTPS services for a device""" + + def __init__(self, logger): + global LOGGER + LOGGER = logger + + def scan_all_ports(self, ip): + """Scans all ports and identifies potential HTTP/HTTPS ports.""" + nm = nmap.PortScanner() + nm.scan(hosts=ip, ports='1-65535', arguments='--open -sV') + + http_ports = [] + for host in nm.all_hosts(): + for proto in nm[host].all_protocols(): + for port in nm[host][proto].keys(): + service = nm[host][proto][port]['name'] + if 'http' in service: + http_ports.append(port) + return http_ports + + def is_https(self, ip, port): + """Attempts a TLS handshake to determine if the port serves HTTPS.""" + try: + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + with socket.create_connection((ip, port), timeout=2) as sock: + with context.wrap_socket(sock, server_hostname=ip): + return True + except ssl.SSLError: + return False + except Exception: # pylint: disable=W0718 + return False + + def verify_http_or_https(self, ip, ports): + """Classifies each port as HTTP or HTTPS.""" + results = {} + for port in ports: + if self.is_https(ip, port): + results[port] = 'HTTPS' + else: + results[port] = 'HTTP' + return results + + def scan_for_http_services(self, ip_address): + LOGGER.info(f'Scanning for HTTP ports on {ip_address}') + http_ports = self.scan_all_ports(ip_address) + results = None + if len(http_ports) > 0: + LOGGER.info(f'Checking HTTP ports on {ip_address}: {http_ports}') + results = self.verify_http_or_https(ip_address, http_ports) + for port, service_type in results.items(): + LOGGER.info(f'Port {port}: {service_type}') + return results diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index e9163843a..d551956e8 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -16,6 +16,7 @@ from test_module import TestModule from tls_util import TLSUtil +from http_scan import HTTPScan import os import pyshark from binascii import hexlify @@ -55,6 +56,8 @@ def __init__(self, global LOGGER LOGGER = self._get_logger() self._tls_util = TLSUtil(LOGGER) + self._http_scan = HTTPScan(LOGGER) + self._scan_results = None def generate_module_report(self): html_content = '

TLS Module

' @@ -387,53 +390,102 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): def _security_tls_v1_2_server(self): LOGGER.info('Running security.tls.v1_2_server') - self._resolve_device_ip() # If the ipv4 address wasn't resolved yet, try again + self._resolve_device_ip() + ports_valid = [] + ports_invalid = [] + result = None + details = '' + description = '' if self._device_ipv4_addr is not None: - tls_1_2_results = self._tls_util.validate_tls_server( - self._device_ipv4_addr, tls_version='1.2') - tls_1_3_results = self._tls_util.validate_tls_server( - self._device_ipv4_addr, tls_version='1.3') - results = self._tls_util.process_tls_server_results( - tls_1_2_results, tls_1_3_results) + if self._scan_results is None: + self._scan_results = self._http_scan.scan_for_http_services( + self._device_ipv4_addr) + if self._scan_results is not None: + for port, service_type in self._scan_results.items(): + if 'HTTPS' in service_type: + LOGGER.info(f'Inspecting Service on port {port}: {service_type}') + tls_1_2_results = self._tls_util.validate_tls_server( + host=self._device_ipv4_addr, port=port, tls_version='1.2') + tls_1_3_results = self._tls_util.validate_tls_server( + host=self._device_ipv4_addr, port=port, tls_version='1.3') + port_results = self._tls_util.process_tls_server_results( + tls_1_2_results, tls_1_3_results, port=port) + if port_results is not None: + result = port_results[ + 0] if result is None else result and port_results[0] + details += port_results[1] + if port_results[0]: + ports_valid.append(port) + else: + ports_invalid.append(port) + elif 'HTTP' in service_type: + # Any non-HTTPS service detetcted is automatically invalid + ports_invalid.append(port) + details += f'\nHTTP service detected on port {port}' + result = False + LOGGER.debug(f'Valid Ports: {ports_valid}') + LOGGER.debug(f'Invalid Ports: {ports_invalid}') # Determine results and return proper messaging and details - description = '' - result = results[0] - details = results[1] if result is None: result = 'Feature Not Detected' description = 'TLS 1.2 certificate could not be validated' elif result: - description = 'TLS 1.2 certificate is valid' + ports_csv = ','.join(map(str,ports_valid)) + description = f'TLS 1.2 certificate valid on ports: {ports_csv}' else: - description = 'TLS 1.2 certificate is invalid' + ports_csv = ','.join(map(str,ports_invalid)) + description = f'TLS 1.2 certificate invalid on ports: {ports_csv}' return result, description, details - else: LOGGER.error('Could not resolve device IP address. Skipping') return 'Error', 'Could not resolve device IP address' def _security_tls_v1_3_server(self): LOGGER.info('Running security.tls.v1_3_server') - self._resolve_device_ip() # If the ipv4 address wasn't resolved yet, try again + self._resolve_device_ip() + ports_valid = [] + ports_invalid = [] + result = None + details = '' + description = '' if self._device_ipv4_addr is not None: - results = self._tls_util.validate_tls_server(self._device_ipv4_addr, - tls_version='1.3') + if self._scan_results is None: + self._scan_results = self._http_scan.scan_for_http_services( + self._device_ipv4_addr) + if self._scan_results is not None: + for port, service_type in self._scan_results.items(): + if 'HTTPS' in service_type: + LOGGER.info(f'Inspecting Service on port {port}: {service_type}') + port_results = self._tls_util.validate_tls_server( + self._device_ipv4_addr, tls_version='1.3', port=port) + if port_results is not None: + result = port_results[ + 0] if result is None else result and port_results[0] + details += port_results[1] + if port_results[0]: + ports_valid.append(port) + else: + ports_invalid.append(port) + elif 'HTTP' in service_type: + # Any non-HTTPS service detetcted is automatically invalid + ports_invalid.append(port) + details += f'\nHTTP service detected on port {port}' + result = False + LOGGER.debug(f'Valid Ports: {ports_valid}') + LOGGER.debug(f'Invalid Ports: {ports_invalid}') # Determine results and return proper messaging and details - description = '' - result = results[0] - details = results[1] - description = '' if result is None: result = 'Feature Not Detected' description = 'TLS 1.3 certificate could not be validated' - elif results[0]: - description = 'TLS 1.3 certificate is valid' + elif result: + ports_csv = ','.join(map(str,ports_valid)) + description = f'TLS 1.3 certificate valid on ports: {ports_csv}' else: - description = 'TLS 1.3 certificate is invalid' + ports_csv = ','.join(map(str,ports_invalid)) + description = f'TLS 1.3 certificate invalid on ports: {ports_csv}' return result, description, details - else: LOGGER.error('Could not resolve device IP address') return 'Error', 'Could not resolve device IP address' @@ -472,14 +524,14 @@ def _security_tls_v1_0_client(self): def _security_tls_v1_2_client(self): LOGGER.info('Running security.tls.v1_2_client') return self._validate_tls_client(self._device_mac, - '1.2', - unsupported_versions=['1.0', '1.1']) + '1.2', + unsupported_versions=['1.0', '1.1']) def _security_tls_v1_3_client(self): LOGGER.info('Running security.tls.v1_3_client') return self._validate_tls_client(self._device_mac, - '1.3', - unsupported_versions=['1.0', '1.1']) + '1.3', + unsupported_versions=['1.0', '1.1']) def _validate_tls_client(self, client_mac, diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py index 37cd89133..d92379aab 100644 --- a/modules/test/tls/python/src/tls_util.py +++ b/modules/test/tls/python/src/tls_util.py @@ -221,11 +221,11 @@ def verify_public_key(self, public_key): else: return False, 'Key is not RSA or EC type' - def validate_signature(self, host): + def validate_signature(self, host, port): # Reconnect to the device but with validate signature option # set to true which will check for proper cert chains # within the valid CA root certs stored on the server - if self.validate_trusted_ca_signature(host): + if self.validate_trusted_ca_signature(host, port): LOGGER.info('Authorized Certificate Authority signature confirmed') return True, 'Authorized Certificate Authority signature confirmed' else: @@ -261,13 +261,14 @@ def validate_local_ca_signature(self, device_cert_path): LOGGER.error(str(e)) return False, None - def validate_trusted_ca_signature(self, host): + def validate_trusted_ca_signature(self, host, port): # Reconnect to the device but with validate signature option # set to true which will check for proper cert chains # within the valid CA root certs stored on the server LOGGER.info( 'Checking for valid signature from authorized Certificate Authorities') - public_cert = self.get_public_certificate(host, + public_cert = self.get_public_certificate(host=host, + port=port, validate_cert=True, tls_version='1.2') if public_cert: @@ -377,34 +378,41 @@ def get_certificate(self, uri, timeout=10): LOGGER.error(f'Error fetching certificate from URI: {e}') return certificate - def process_tls_server_results(self, tls_1_2_results, tls_1_3_results): + def process_tls_server_results(self, tls_1_2_results, tls_1_3_results, port): results = '' if tls_1_2_results[0] is None and tls_1_3_results[0] is not None: # Validate only TLS 1.3 results - description = 'TLS 1.3' + (' not' if not tls_1_3_results[0] else - '') + ' validated: ' + tls_1_3_results[1] + description = (f"""TLS 1.3 {'' if tls_1_3_results[0] else 'not '}""" + f"""validated on port {port}: """ + f"""{tls_1_3_results[1]}""") results = tls_1_3_results[0], description elif tls_1_3_results[0] is None and tls_1_2_results[0] is not None: # Vaidate only TLS 1.2 results - description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else - '') + ' validated: ' + tls_1_2_results[1] + description = (f"""TLS 1.2 {'' if tls_1_2_results[0] else 'not '}""" + f"""validated on port {port}: """ + f"""{tls_1_2_results[1]}""") results = tls_1_2_results[0], description elif tls_1_3_results[0] is not None and tls_1_2_results[0] is not None: # Validate both results - description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else - '') + ' validated: ' + tls_1_2_results[1] - description += '\nTLS 1.3' + (' not' if not tls_1_3_results[0] else - '') + ' validated: ' + tls_1_3_results[1] + description = (f"""TLS 1.2 {'' if tls_1_2_results[0] else 'not '}""" + f"""validated on port {port}: """ + f"""{tls_1_2_results[1]}""") + description += '\n'+(f"""TLS 1.3 {'' if tls_1_3_results[0] else 'not '}""" + f"""validated on port {port}: """ + f"""{tls_1_3_results[1]}""") results = tls_1_2_results[0] or tls_1_3_results[0], description else: - description = f'TLS 1.2 not validated: {tls_1_2_results[1]}' - description += f'\nTLS 1.3 not validated: {tls_1_3_results[1]}' + description = (f"""TLS 1.2 not validated on port {port}: """ + f"""{tls_1_2_results[1]}""") + description += '\n'+(f"""TLS 1.3 not validated on port {port}: """ + f"""{tls_1_3_results[1]}""") results = None, description LOGGER.info('TLS server test results: ' + str(results)) return results - def validate_tls_server(self, host, tls_version): - cert_pem = self.get_public_certificate(host, + def validate_tls_server(self, host, tls_version, port=443): + cert_pem = self.get_public_certificate(host=host, + port=port, validate_cert=False, tls_version=tls_version) if cert_pem: @@ -430,13 +438,12 @@ def validate_tls_server(self, host, tls_version): else: key_valid = [0] - sig_valid = self.validate_signature(host) + sig_valid = self.validate_signature(host=host, port=port) # Check results cert_valid = tr_valid[0] and key_valid[0] and sig_valid[0] test_details = tr_valid[1] + '\n' + key_valid[1] + '\n' + sig_valid[1] LOGGER.info('Certificate validated: ' + str(cert_valid)) - LOGGER.info('Test details:\n' + test_details) return cert_valid, test_details else: LOGGER.info('Failed to resolve public certificate') @@ -632,7 +639,7 @@ def get_non_tls_client_connection_ips(self, client_mac, capture_files): # Packet is not ACK or SYN src_ip = ipaddress.ip_address( - packet['_source']['layers']['ip.src'][0]) + packet['_source']['layers']['ip.src'][0]) src_subnet = ipaddress.ip_network(src_ip, strict=False) subnet_with_mask = ipaddress.ip_network( src_subnet, strict=False).supernet(new_prefix=24) diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py index f42c7e9d4..f69f82fc3 100644 --- a/testing/unit/tls/tls_module_test.py +++ b/testing/unit/tls/tls_module_test.py @@ -71,7 +71,7 @@ def security_tls_v1_2_server_test(self): tls_version='1.2') tls_1_3_results = None, 'No TLS 1.3' test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertTrue(test_results[0]) # Test 1.2 server when 1.3 connection is established @@ -80,7 +80,7 @@ def security_tls_v1_2_for_1_3_server_test(self): tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertTrue(test_results[0]) # Test 1.2 server when 1.2 and 1.3 connection is established @@ -90,7 +90,7 @@ def security_tls_v1_2_for_1_2_and_1_3_server_test(self): tls_1_3_results = TLS_UTIL.validate_tls_server('google.com', tls_version='1.3') test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertTrue(test_results[0]) # Test 1.2 server when 1.2 and failed 1.3 connection is established @@ -99,7 +99,7 @@ def security_tls_v1_2_for_1_2_and_1_3_fail_server_test(self): tls_version='1.2') tls_1_3_results = False, 'Signature faild' test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertTrue(test_results[0]) # Test 1.2 server when 1.3 and failed 1.2 connection is established @@ -108,10 +108,10 @@ def security_tls_v1_2_for_1_3_and_1_2_fail_server_test(self): tls_version='1.3') tls_1_2_results = False, 'Signature faild' test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertTrue(test_results[0]) - def security_tls_server_results_test(self, ): + def security_tls_server_results_test(self): # Generic messages to test they are passing through # to the results as expected fail_message = 'Certificate not validated' @@ -121,74 +121,75 @@ def security_tls_server_results_test(self, ): # Both None tls_1_2_results = None, none_message tls_1_3_results = None, none_message - expected = None, (f'TLS 1.2 not validated: {none_message}\n' - f'TLS 1.3 not validated: {none_message}') + expected = None, (f'TLS 1.2 not validated on port 443: {none_message}\n' + f'TLS 1.3 not validated on port 443: {none_message}') result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertEqual(result, expected) # TLS 1.2 Pass and TLS 1.3 None tls_1_2_results = True, success_message - expected = True, f'TLS 1.2 validated: {success_message}' + expected = True, f'TLS 1.2 validated on port 443: {success_message}' result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertEqual(result, expected) # TLS 1.2 Fail and TLS 1.3 None tls_1_2_results = False, fail_message - expected = False, f'TLS 1.2 not validated: {fail_message}' + expected = False, f'TLS 1.2 not validated on port 443: {fail_message}' result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertEqual(result, expected) # TLS 1.3 Pass and TLS 1.2 None tls_1_2_results = None, fail_message tls_1_3_results = True, success_message - expected = True, f'TLS 1.3 validated: {success_message}' + expected = True, f'TLS 1.3 validated on port 443: {success_message}' result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertEqual(result, expected) # TLS 1.3 Fail and TLS 1.2 None tls_1_3_results = False, fail_message - expected = False, f'TLS 1.3 not validated: {fail_message}' + expected = False, f'TLS 1.3 not validated on port 443: {fail_message}' result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertEqual(result, expected) # TLS 1.2 Pass and TLS 1.3 Pass tls_1_2_results = True, success_message tls_1_3_results = True, success_message - expected = True, (f'TLS 1.2 validated: {success_message}\n' - f'TLS 1.3 validated: {success_message}') + expected = True, (f'TLS 1.2 validated on port 443: {success_message}\n' + f'TLS 1.3 validated on port 443: {success_message}') result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) + self.assertEqual(result, expected) # TLS 1.2 Pass and TLS 1.3 Fail tls_1_2_results = True, success_message tls_1_3_results = False, fail_message - expected = True, (f'TLS 1.2 validated: {success_message}\n' - f'TLS 1.3 not validated: {fail_message}') + expected = True, (f'TLS 1.2 validated on port 443: {success_message}\n' + f'TLS 1.3 not validated on port 443: {fail_message}') result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertEqual(result, expected) # TLS 1.2 Fail and TLS 1.2 Pass tls_1_2_results = False, fail_message tls_1_3_results = True, success_message - expected = True, (f'TLS 1.2 not validated: {fail_message}\n' - f'TLS 1.3 validated: {success_message}') + expected = True, (f'TLS 1.2 not validated on port 443: {fail_message}\n' + f'TLS 1.3 validated on port 443: {success_message}') result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertEqual(result, expected) # TLS 1.2 Fail and TLS 1.2 Fail tls_1_3_results = False, fail_message - expected = False, (f'TLS 1.2 not validated: {fail_message}\n' - f'TLS 1.3 not validated: {fail_message}') + expected = False, (f'TLS 1.2 not validated on port 443: {fail_message}\n' + f'TLS 1.3 not validated on port 443: {fail_message}') result = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertEqual(result, expected) # Test 1.2 server when 1.3 and 1.2 failed connection is established @@ -196,7 +197,7 @@ def security_tls_v1_2_fail_server_test(self): tls_1_2_results = False, 'Signature faild' tls_1_3_results = False, 'Signature faild' test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertFalse(test_results[0]) # Test 1.2 server when 1.3 and 1.2 failed connection is established @@ -205,7 +206,7 @@ def security_tls_v1_2_none_server_test(self): tls_1_2_results = None, 'No cert' tls_1_3_results = None, 'No cert' test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results, - tls_1_3_results) + tls_1_3_results,port=443) self.assertIsNone(test_results[0]) def security_tls_v1_3_server_test(self): @@ -608,7 +609,7 @@ def download_public_cert(self, hostname, port=443): suite.addTest(TLSModuleTest('security_tls_v1_2_fail_server_test')) suite.addTest(TLSModuleTest('security_tls_v1_2_none_server_test')) - # # TLS 1.3 server tests + # TLS 1.3 server tests suite.addTest(TLSModuleTest('security_tls_v1_3_server_test')) # TLS client tests From d3fc978a47274b3047b118aa898776dfdcdd3771 Mon Sep 17 00:00:00 2001 From: jhughesbiot Date: Fri, 10 Jan 2025 16:57:31 -0700 Subject: [PATCH 2/3] pylint --- testing/unit/tls/tls_module_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py index f69f82fc3..205280329 100644 --- a/testing/unit/tls/tls_module_test.py +++ b/testing/unit/tls/tls_module_test.py @@ -163,7 +163,7 @@ def security_tls_server_results_test(self): f'TLS 1.3 validated on port 443: {success_message}') result = TLS_UTIL.process_tls_server_results(tls_1_2_results, tls_1_3_results,port=443) - + self.assertEqual(result, expected) # TLS 1.2 Pass and TLS 1.3 Fail From 448998b54cff1fb205517c77b5410464ebbafc0b Mon Sep 17 00:00:00 2001 From: MariusBaldovin Date: Mon, 13 Jan 2025 13:41:28 +0000 Subject: [PATCH 3/3] added tls filter into the pcap files processing --- modules/test/tls/python/src/tls_module.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py index d551956e8..433bb2943 100644 --- a/modules/test/tls/python/src/tls_module.py +++ b/modules/test/tls/python/src/tls_module.py @@ -356,8 +356,8 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address): all_packets = [] # Iterate over each file for pcap_file in pcap_files: - # Open the capture file - packets = pyshark.FileCapture(pcap_file) + # Open the capture file and filter by tls + packets = pyshark.FileCapture(pcap_file, display_filter='tls') try: # Iterate over each packet in the file and add it to the list for packet in packets: