diff --git a/modules/test/tls/conf/module_config.json b/modules/test/tls/conf/module_config.json
index 9c83c85df..228fbb64d 100644
--- a/modules/test/tls/conf/module_config.json
+++ b/modules/test/tls/conf/module_config.json
@@ -9,7 +9,7 @@
"docker": {
"depends_on": "base",
"enable_container": true,
- "timeout": 420
+ "timeout": 540
},
"tests":[
{
diff --git a/modules/test/tls/python/requirements.txt b/modules/test/tls/python/requirements.txt
index a5719c77c..d7180f31b 100644
--- a/modules/test/tls/python/requirements.txt
+++ b/modules/test/tls/python/requirements.txt
@@ -18,4 +18,4 @@ pyOpenSSL==24.3.0
lxml==5.1.0 # Requirement of pyshark but if upgraded automatically above 5.1 will cause a
pyshark==0.6
requests==2.32.3
-
+python-nmap==0.7.1
diff --git a/modules/test/tls/python/src/http_scan.py b/modules/test/tls/python/src/http_scan.py
new file mode 100644
index 000000000..a25f5215d
--- /dev/null
+++ b/modules/test/tls/python/src/http_scan.py
@@ -0,0 +1,76 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Module that contains various methods for scaning for HTTP/HTTPS services"""
+import nmap
+import socket
+import ssl
+
+LOGGER = None
+
+
+class HTTPScan():
+ """Helper class to scan for all HTTP/HTTPS services for a device"""
+
+ def __init__(self, logger):
+ global LOGGER
+ LOGGER = logger
+
+ def scan_all_ports(self, ip):
+ """Scans all ports and identifies potential HTTP/HTTPS ports."""
+ nm = nmap.PortScanner()
+ nm.scan(hosts=ip, ports='1-65535', arguments='--open -sV')
+
+ http_ports = []
+ for host in nm.all_hosts():
+ for proto in nm[host].all_protocols():
+ for port in nm[host][proto].keys():
+ service = nm[host][proto][port]['name']
+ if 'http' in service:
+ http_ports.append(port)
+ return http_ports
+
+ def is_https(self, ip, port):
+ """Attempts a TLS handshake to determine if the port serves HTTPS."""
+ try:
+ context = ssl.create_default_context()
+ context.check_hostname = False
+ context.verify_mode = ssl.CERT_NONE
+ with socket.create_connection((ip, port), timeout=2) as sock:
+ with context.wrap_socket(sock, server_hostname=ip):
+ return True
+ except ssl.SSLError:
+ return False
+ except Exception: # pylint: disable=W0718
+ return False
+
+ def verify_http_or_https(self, ip, ports):
+ """Classifies each port as HTTP or HTTPS."""
+ results = {}
+ for port in ports:
+ if self.is_https(ip, port):
+ results[port] = 'HTTPS'
+ else:
+ results[port] = 'HTTP'
+ return results
+
+ def scan_for_http_services(self, ip_address):
+ LOGGER.info(f'Scanning for HTTP ports on {ip_address}')
+ http_ports = self.scan_all_ports(ip_address)
+ results = None
+ if len(http_ports) > 0:
+ LOGGER.info(f'Checking HTTP ports on {ip_address}: {http_ports}')
+ results = self.verify_http_or_https(ip_address, http_ports)
+ for port, service_type in results.items():
+ LOGGER.info(f'Port {port}: {service_type}')
+ return results
diff --git a/modules/test/tls/python/src/tls_module.py b/modules/test/tls/python/src/tls_module.py
index e9163843a..433bb2943 100644
--- a/modules/test/tls/python/src/tls_module.py
+++ b/modules/test/tls/python/src/tls_module.py
@@ -16,6 +16,7 @@
from test_module import TestModule
from tls_util import TLSUtil
+from http_scan import HTTPScan
import os
import pyshark
from binascii import hexlify
@@ -55,6 +56,8 @@ def __init__(self,
global LOGGER
LOGGER = self._get_logger()
self._tls_util = TLSUtil(LOGGER)
+ self._http_scan = HTTPScan(LOGGER)
+ self._scan_results = None
def generate_module_report(self):
html_content = '
TLS Module
'
@@ -353,8 +356,8 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address):
all_packets = []
# Iterate over each file
for pcap_file in pcap_files:
- # Open the capture file
- packets = pyshark.FileCapture(pcap_file)
+ # Open the capture file and filter by tls
+ packets = pyshark.FileCapture(pcap_file, display_filter='tls')
try:
# Iterate over each packet in the file and add it to the list
for packet in packets:
@@ -387,53 +390,102 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address):
def _security_tls_v1_2_server(self):
LOGGER.info('Running security.tls.v1_2_server')
- self._resolve_device_ip()
# If the ipv4 address wasn't resolved yet, try again
+ self._resolve_device_ip()
+ ports_valid = []
+ ports_invalid = []
+ result = None
+ details = ''
+ description = ''
if self._device_ipv4_addr is not None:
- tls_1_2_results = self._tls_util.validate_tls_server(
- self._device_ipv4_addr, tls_version='1.2')
- tls_1_3_results = self._tls_util.validate_tls_server(
- self._device_ipv4_addr, tls_version='1.3')
- results = self._tls_util.process_tls_server_results(
- tls_1_2_results, tls_1_3_results)
+ if self._scan_results is None:
+ self._scan_results = self._http_scan.scan_for_http_services(
+ self._device_ipv4_addr)
+ if self._scan_results is not None:
+ for port, service_type in self._scan_results.items():
+ if 'HTTPS' in service_type:
+ LOGGER.info(f'Inspecting Service on port {port}: {service_type}')
+ tls_1_2_results = self._tls_util.validate_tls_server(
+ host=self._device_ipv4_addr, port=port, tls_version='1.2')
+ tls_1_3_results = self._tls_util.validate_tls_server(
+ host=self._device_ipv4_addr, port=port, tls_version='1.3')
+ port_results = self._tls_util.process_tls_server_results(
+ tls_1_2_results, tls_1_3_results, port=port)
+ if port_results is not None:
+ result = port_results[
+ 0] if result is None else result and port_results[0]
+ details += port_results[1]
+ if port_results[0]:
+ ports_valid.append(port)
+ else:
+ ports_invalid.append(port)
+ elif 'HTTP' in service_type:
+ # Any non-HTTPS service detetcted is automatically invalid
+ ports_invalid.append(port)
+ details += f'\nHTTP service detected on port {port}'
+ result = False
+ LOGGER.debug(f'Valid Ports: {ports_valid}')
+ LOGGER.debug(f'Invalid Ports: {ports_invalid}')
# Determine results and return proper messaging and details
- description = ''
- result = results[0]
- details = results[1]
if result is None:
result = 'Feature Not Detected'
description = 'TLS 1.2 certificate could not be validated'
elif result:
- description = 'TLS 1.2 certificate is valid'
+ ports_csv = ','.join(map(str,ports_valid))
+ description = f'TLS 1.2 certificate valid on ports: {ports_csv}'
else:
- description = 'TLS 1.2 certificate is invalid'
+ ports_csv = ','.join(map(str,ports_invalid))
+ description = f'TLS 1.2 certificate invalid on ports: {ports_csv}'
return result, description, details
-
else:
LOGGER.error('Could not resolve device IP address. Skipping')
return 'Error', 'Could not resolve device IP address'
def _security_tls_v1_3_server(self):
LOGGER.info('Running security.tls.v1_3_server')
- self._resolve_device_ip()
# If the ipv4 address wasn't resolved yet, try again
+ self._resolve_device_ip()
+ ports_valid = []
+ ports_invalid = []
+ result = None
+ details = ''
+ description = ''
if self._device_ipv4_addr is not None:
- results = self._tls_util.validate_tls_server(self._device_ipv4_addr,
- tls_version='1.3')
+ if self._scan_results is None:
+ self._scan_results = self._http_scan.scan_for_http_services(
+ self._device_ipv4_addr)
+ if self._scan_results is not None:
+ for port, service_type in self._scan_results.items():
+ if 'HTTPS' in service_type:
+ LOGGER.info(f'Inspecting Service on port {port}: {service_type}')
+ port_results = self._tls_util.validate_tls_server(
+ self._device_ipv4_addr, tls_version='1.3', port=port)
+ if port_results is not None:
+ result = port_results[
+ 0] if result is None else result and port_results[0]
+ details += port_results[1]
+ if port_results[0]:
+ ports_valid.append(port)
+ else:
+ ports_invalid.append(port)
+ elif 'HTTP' in service_type:
+ # Any non-HTTPS service detetcted is automatically invalid
+ ports_invalid.append(port)
+ details += f'\nHTTP service detected on port {port}'
+ result = False
+ LOGGER.debug(f'Valid Ports: {ports_valid}')
+ LOGGER.debug(f'Invalid Ports: {ports_invalid}')
# Determine results and return proper messaging and details
- description = ''
- result = results[0]
- details = results[1]
- description = ''
if result is None:
result = 'Feature Not Detected'
description = 'TLS 1.3 certificate could not be validated'
- elif results[0]:
- description = 'TLS 1.3 certificate is valid'
+ elif result:
+ ports_csv = ','.join(map(str,ports_valid))
+ description = f'TLS 1.3 certificate valid on ports: {ports_csv}'
else:
- description = 'TLS 1.3 certificate is invalid'
+ ports_csv = ','.join(map(str,ports_invalid))
+ description = f'TLS 1.3 certificate invalid on ports: {ports_csv}'
return result, description, details
-
else:
LOGGER.error('Could not resolve device IP address')
return 'Error', 'Could not resolve device IP address'
@@ -472,14 +524,14 @@ def _security_tls_v1_0_client(self):
def _security_tls_v1_2_client(self):
LOGGER.info('Running security.tls.v1_2_client')
return self._validate_tls_client(self._device_mac,
- '1.2',
- unsupported_versions=['1.0', '1.1'])
+ '1.2',
+ unsupported_versions=['1.0', '1.1'])
def _security_tls_v1_3_client(self):
LOGGER.info('Running security.tls.v1_3_client')
return self._validate_tls_client(self._device_mac,
- '1.3',
- unsupported_versions=['1.0', '1.1'])
+ '1.3',
+ unsupported_versions=['1.0', '1.1'])
def _validate_tls_client(self,
client_mac,
diff --git a/modules/test/tls/python/src/tls_util.py b/modules/test/tls/python/src/tls_util.py
index 37cd89133..d92379aab 100644
--- a/modules/test/tls/python/src/tls_util.py
+++ b/modules/test/tls/python/src/tls_util.py
@@ -221,11 +221,11 @@ def verify_public_key(self, public_key):
else:
return False, 'Key is not RSA or EC type'
- def validate_signature(self, host):
+ def validate_signature(self, host, port):
# Reconnect to the device but with validate signature option
# set to true which will check for proper cert chains
# within the valid CA root certs stored on the server
- if self.validate_trusted_ca_signature(host):
+ if self.validate_trusted_ca_signature(host, port):
LOGGER.info('Authorized Certificate Authority signature confirmed')
return True, 'Authorized Certificate Authority signature confirmed'
else:
@@ -261,13 +261,14 @@ def validate_local_ca_signature(self, device_cert_path):
LOGGER.error(str(e))
return False, None
- def validate_trusted_ca_signature(self, host):
+ def validate_trusted_ca_signature(self, host, port):
# Reconnect to the device but with validate signature option
# set to true which will check for proper cert chains
# within the valid CA root certs stored on the server
LOGGER.info(
'Checking for valid signature from authorized Certificate Authorities')
- public_cert = self.get_public_certificate(host,
+ public_cert = self.get_public_certificate(host=host,
+ port=port,
validate_cert=True,
tls_version='1.2')
if public_cert:
@@ -377,34 +378,41 @@ def get_certificate(self, uri, timeout=10):
LOGGER.error(f'Error fetching certificate from URI: {e}')
return certificate
- def process_tls_server_results(self, tls_1_2_results, tls_1_3_results):
+ def process_tls_server_results(self, tls_1_2_results, tls_1_3_results, port):
results = ''
if tls_1_2_results[0] is None and tls_1_3_results[0] is not None:
# Validate only TLS 1.3 results
- description = 'TLS 1.3' + (' not' if not tls_1_3_results[0] else
- '') + ' validated: ' + tls_1_3_results[1]
+ description = (f"""TLS 1.3 {'' if tls_1_3_results[0] else 'not '}"""
+ f"""validated on port {port}: """
+ f"""{tls_1_3_results[1]}""")
results = tls_1_3_results[0], description
elif tls_1_3_results[0] is None and tls_1_2_results[0] is not None:
# Vaidate only TLS 1.2 results
- description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else
- '') + ' validated: ' + tls_1_2_results[1]
+ description = (f"""TLS 1.2 {'' if tls_1_2_results[0] else 'not '}"""
+ f"""validated on port {port}: """
+ f"""{tls_1_2_results[1]}""")
results = tls_1_2_results[0], description
elif tls_1_3_results[0] is not None and tls_1_2_results[0] is not None:
# Validate both results
- description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else
- '') + ' validated: ' + tls_1_2_results[1]
- description += '\nTLS 1.3' + (' not' if not tls_1_3_results[0] else
- '') + ' validated: ' + tls_1_3_results[1]
+ description = (f"""TLS 1.2 {'' if tls_1_2_results[0] else 'not '}"""
+ f"""validated on port {port}: """
+ f"""{tls_1_2_results[1]}""")
+ description += '\n'+(f"""TLS 1.3 {'' if tls_1_3_results[0] else 'not '}"""
+ f"""validated on port {port}: """
+ f"""{tls_1_3_results[1]}""")
results = tls_1_2_results[0] or tls_1_3_results[0], description
else:
- description = f'TLS 1.2 not validated: {tls_1_2_results[1]}'
- description += f'\nTLS 1.3 not validated: {tls_1_3_results[1]}'
+ description = (f"""TLS 1.2 not validated on port {port}: """
+ f"""{tls_1_2_results[1]}""")
+ description += '\n'+(f"""TLS 1.3 not validated on port {port}: """
+ f"""{tls_1_3_results[1]}""")
results = None, description
LOGGER.info('TLS server test results: ' + str(results))
return results
- def validate_tls_server(self, host, tls_version):
- cert_pem = self.get_public_certificate(host,
+ def validate_tls_server(self, host, tls_version, port=443):
+ cert_pem = self.get_public_certificate(host=host,
+ port=port,
validate_cert=False,
tls_version=tls_version)
if cert_pem:
@@ -430,13 +438,12 @@ def validate_tls_server(self, host, tls_version):
else:
key_valid = [0]
- sig_valid = self.validate_signature(host)
+ sig_valid = self.validate_signature(host=host, port=port)
# Check results
cert_valid = tr_valid[0] and key_valid[0] and sig_valid[0]
test_details = tr_valid[1] + '\n' + key_valid[1] + '\n' + sig_valid[1]
LOGGER.info('Certificate validated: ' + str(cert_valid))
- LOGGER.info('Test details:\n' + test_details)
return cert_valid, test_details
else:
LOGGER.info('Failed to resolve public certificate')
@@ -632,7 +639,7 @@ def get_non_tls_client_connection_ips(self, client_mac, capture_files):
# Packet is not ACK or SYN
src_ip = ipaddress.ip_address(
- packet['_source']['layers']['ip.src'][0])
+ packet['_source']['layers']['ip.src'][0])
src_subnet = ipaddress.ip_network(src_ip, strict=False)
subnet_with_mask = ipaddress.ip_network(
src_subnet, strict=False).supernet(new_prefix=24)
diff --git a/testing/unit/tls/tls_module_test.py b/testing/unit/tls/tls_module_test.py
index f42c7e9d4..205280329 100644
--- a/testing/unit/tls/tls_module_test.py
+++ b/testing/unit/tls/tls_module_test.py
@@ -71,7 +71,7 @@ def security_tls_v1_2_server_test(self):
tls_version='1.2')
tls_1_3_results = None, 'No TLS 1.3'
test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertTrue(test_results[0])
# Test 1.2 server when 1.3 connection is established
@@ -80,7 +80,7 @@ def security_tls_v1_2_for_1_3_server_test(self):
tls_1_3_results = TLS_UTIL.validate_tls_server('google.com',
tls_version='1.3')
test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertTrue(test_results[0])
# Test 1.2 server when 1.2 and 1.3 connection is established
@@ -90,7 +90,7 @@ def security_tls_v1_2_for_1_2_and_1_3_server_test(self):
tls_1_3_results = TLS_UTIL.validate_tls_server('google.com',
tls_version='1.3')
test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertTrue(test_results[0])
# Test 1.2 server when 1.2 and failed 1.3 connection is established
@@ -99,7 +99,7 @@ def security_tls_v1_2_for_1_2_and_1_3_fail_server_test(self):
tls_version='1.2')
tls_1_3_results = False, 'Signature faild'
test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertTrue(test_results[0])
# Test 1.2 server when 1.3 and failed 1.2 connection is established
@@ -108,10 +108,10 @@ def security_tls_v1_2_for_1_3_and_1_2_fail_server_test(self):
tls_version='1.3')
tls_1_2_results = False, 'Signature faild'
test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertTrue(test_results[0])
- def security_tls_server_results_test(self, ):
+ def security_tls_server_results_test(self):
# Generic messages to test they are passing through
# to the results as expected
fail_message = 'Certificate not validated'
@@ -121,74 +121,75 @@ def security_tls_server_results_test(self, ):
# Both None
tls_1_2_results = None, none_message
tls_1_3_results = None, none_message
- expected = None, (f'TLS 1.2 not validated: {none_message}\n'
- f'TLS 1.3 not validated: {none_message}')
+ expected = None, (f'TLS 1.2 not validated on port 443: {none_message}\n'
+ f'TLS 1.3 not validated on port 443: {none_message}')
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertEqual(result, expected)
# TLS 1.2 Pass and TLS 1.3 None
tls_1_2_results = True, success_message
- expected = True, f'TLS 1.2 validated: {success_message}'
+ expected = True, f'TLS 1.2 validated on port 443: {success_message}'
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertEqual(result, expected)
# TLS 1.2 Fail and TLS 1.3 None
tls_1_2_results = False, fail_message
- expected = False, f'TLS 1.2 not validated: {fail_message}'
+ expected = False, f'TLS 1.2 not validated on port 443: {fail_message}'
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertEqual(result, expected)
# TLS 1.3 Pass and TLS 1.2 None
tls_1_2_results = None, fail_message
tls_1_3_results = True, success_message
- expected = True, f'TLS 1.3 validated: {success_message}'
+ expected = True, f'TLS 1.3 validated on port 443: {success_message}'
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertEqual(result, expected)
# TLS 1.3 Fail and TLS 1.2 None
tls_1_3_results = False, fail_message
- expected = False, f'TLS 1.3 not validated: {fail_message}'
+ expected = False, f'TLS 1.3 not validated on port 443: {fail_message}'
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertEqual(result, expected)
# TLS 1.2 Pass and TLS 1.3 Pass
tls_1_2_results = True, success_message
tls_1_3_results = True, success_message
- expected = True, (f'TLS 1.2 validated: {success_message}\n'
- f'TLS 1.3 validated: {success_message}')
+ expected = True, (f'TLS 1.2 validated on port 443: {success_message}\n'
+ f'TLS 1.3 validated on port 443: {success_message}')
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
+
self.assertEqual(result, expected)
# TLS 1.2 Pass and TLS 1.3 Fail
tls_1_2_results = True, success_message
tls_1_3_results = False, fail_message
- expected = True, (f'TLS 1.2 validated: {success_message}\n'
- f'TLS 1.3 not validated: {fail_message}')
+ expected = True, (f'TLS 1.2 validated on port 443: {success_message}\n'
+ f'TLS 1.3 not validated on port 443: {fail_message}')
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertEqual(result, expected)
# TLS 1.2 Fail and TLS 1.2 Pass
tls_1_2_results = False, fail_message
tls_1_3_results = True, success_message
- expected = True, (f'TLS 1.2 not validated: {fail_message}\n'
- f'TLS 1.3 validated: {success_message}')
+ expected = True, (f'TLS 1.2 not validated on port 443: {fail_message}\n'
+ f'TLS 1.3 validated on port 443: {success_message}')
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertEqual(result, expected)
# TLS 1.2 Fail and TLS 1.2 Fail
tls_1_3_results = False, fail_message
- expected = False, (f'TLS 1.2 not validated: {fail_message}\n'
- f'TLS 1.3 not validated: {fail_message}')
+ expected = False, (f'TLS 1.2 not validated on port 443: {fail_message}\n'
+ f'TLS 1.3 not validated on port 443: {fail_message}')
result = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertEqual(result, expected)
# Test 1.2 server when 1.3 and 1.2 failed connection is established
@@ -196,7 +197,7 @@ def security_tls_v1_2_fail_server_test(self):
tls_1_2_results = False, 'Signature faild'
tls_1_3_results = False, 'Signature faild'
test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertFalse(test_results[0])
# Test 1.2 server when 1.3 and 1.2 failed connection is established
@@ -205,7 +206,7 @@ def security_tls_v1_2_none_server_test(self):
tls_1_2_results = None, 'No cert'
tls_1_3_results = None, 'No cert'
test_results = TLS_UTIL.process_tls_server_results(tls_1_2_results,
- tls_1_3_results)
+ tls_1_3_results,port=443)
self.assertIsNone(test_results[0])
def security_tls_v1_3_server_test(self):
@@ -608,7 +609,7 @@ def download_public_cert(self, hostname, port=443):
suite.addTest(TLSModuleTest('security_tls_v1_2_fail_server_test'))
suite.addTest(TLSModuleTest('security_tls_v1_2_none_server_test'))
- # # TLS 1.3 server tests
+ # TLS 1.3 server tests
suite.addTest(TLSModuleTest('security_tls_v1_3_server_test'))
# TLS client tests