Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/test/tls/conf/module_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"docker": {
"depends_on": "base",
"enable_container": true,
"timeout": 420
"timeout": 540
},
"tests":[
{
Expand Down
2 changes: 1 addition & 1 deletion modules/test/tls/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ pyOpenSSL==24.3.0
lxml==5.1.0 # Requirement of pyshark but if upgraded automatically above 5.1 will cause a
pyshark==0.6
requests==2.32.3

python-nmap==0.7.1
76 changes: 76 additions & 0 deletions modules/test/tls/python/src/http_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module that contains various methods for scaning for HTTP/HTTPS services"""
import nmap
import socket
import ssl

LOGGER = None


class HTTPScan():
"""Helper class to scan for all HTTP/HTTPS services for a device"""

def __init__(self, logger):
global LOGGER
LOGGER = logger

def scan_all_ports(self, ip):
"""Scans all ports and identifies potential HTTP/HTTPS ports."""
nm = nmap.PortScanner()
nm.scan(hosts=ip, ports='1-65535', arguments='--open -sV')

http_ports = []
for host in nm.all_hosts():
for proto in nm[host].all_protocols():
for port in nm[host][proto].keys():
service = nm[host][proto][port]['name']
if 'http' in service:
http_ports.append(port)
return http_ports

def is_https(self, ip, port):
"""Attempts a TLS handshake to determine if the port serves HTTPS."""
try:
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
with socket.create_connection((ip, port), timeout=2) as sock:
with context.wrap_socket(sock, server_hostname=ip):
return True
except ssl.SSLError:
return False
except Exception: # pylint: disable=W0718
return False

def verify_http_or_https(self, ip, ports):
"""Classifies each port as HTTP or HTTPS."""
results = {}
for port in ports:
if self.is_https(ip, port):
results[port] = 'HTTPS'
else:
results[port] = 'HTTP'
return results

def scan_for_http_services(self, ip_address):
LOGGER.info(f'Scanning for HTTP ports on {ip_address}')
http_ports = self.scan_all_ports(ip_address)
results = None
if len(http_ports) > 0:
LOGGER.info(f'Checking HTTP ports on {ip_address}: {http_ports}')
results = self.verify_http_or_https(ip_address, http_ports)
for port, service_type in results.items():
LOGGER.info(f'Port {port}: {service_type}')
return results
112 changes: 82 additions & 30 deletions modules/test/tls/python/src/tls_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from test_module import TestModule
from tls_util import TLSUtil
from http_scan import HTTPScan
import os
import pyshark
from binascii import hexlify
Expand Down Expand Up @@ -55,6 +56,8 @@ def __init__(self,
global LOGGER
LOGGER = self._get_logger()
self._tls_util = TLSUtil(LOGGER)
self._http_scan = HTTPScan(LOGGER)
self._scan_results = None

def generate_module_report(self):
html_content = '<h4 class="page-heading">TLS Module</h4>'
Expand Down Expand Up @@ -353,8 +356,8 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address):
all_packets = []
# Iterate over each file
for pcap_file in pcap_files:
# Open the capture file
packets = pyshark.FileCapture(pcap_file)
# Open the capture file and filter by tls
packets = pyshark.FileCapture(pcap_file, display_filter='tls')
try:
# Iterate over each packet in the file and add it to the list
for packet in packets:
Expand Down Expand Up @@ -387,53 +390,102 @@ def extract_certificates_from_pcap(self, pcap_files, mac_address):

def _security_tls_v1_2_server(self):
LOGGER.info('Running security.tls.v1_2_server')
self._resolve_device_ip()
# If the ipv4 address wasn't resolved yet, try again
self._resolve_device_ip()
ports_valid = []
ports_invalid = []
result = None
details = ''
description = ''
if self._device_ipv4_addr is not None:
tls_1_2_results = self._tls_util.validate_tls_server(
self._device_ipv4_addr, tls_version='1.2')
tls_1_3_results = self._tls_util.validate_tls_server(
self._device_ipv4_addr, tls_version='1.3')
results = self._tls_util.process_tls_server_results(
tls_1_2_results, tls_1_3_results)
if self._scan_results is None:
self._scan_results = self._http_scan.scan_for_http_services(
self._device_ipv4_addr)
if self._scan_results is not None:
for port, service_type in self._scan_results.items():
if 'HTTPS' in service_type:
LOGGER.info(f'Inspecting Service on port {port}: {service_type}')
tls_1_2_results = self._tls_util.validate_tls_server(
host=self._device_ipv4_addr, port=port, tls_version='1.2')
tls_1_3_results = self._tls_util.validate_tls_server(
host=self._device_ipv4_addr, port=port, tls_version='1.3')
port_results = self._tls_util.process_tls_server_results(
tls_1_2_results, tls_1_3_results, port=port)
if port_results is not None:
result = port_results[
0] if result is None else result and port_results[0]
details += port_results[1]
if port_results[0]:
ports_valid.append(port)
else:
ports_invalid.append(port)
elif 'HTTP' in service_type:
# Any non-HTTPS service detetcted is automatically invalid
ports_invalid.append(port)
details += f'\nHTTP service detected on port {port}'
result = False
LOGGER.debug(f'Valid Ports: {ports_valid}')
LOGGER.debug(f'Invalid Ports: {ports_invalid}')
# Determine results and return proper messaging and details
description = ''
result = results[0]
details = results[1]
if result is None:
result = 'Feature Not Detected'
description = 'TLS 1.2 certificate could not be validated'
elif result:
description = 'TLS 1.2 certificate is valid'
ports_csv = ','.join(map(str,ports_valid))
description = f'TLS 1.2 certificate valid on ports: {ports_csv}'
else:
description = 'TLS 1.2 certificate is invalid'
ports_csv = ','.join(map(str,ports_invalid))
description = f'TLS 1.2 certificate invalid on ports: {ports_csv}'
return result, description, details

else:
LOGGER.error('Could not resolve device IP address. Skipping')
return 'Error', 'Could not resolve device IP address'

def _security_tls_v1_3_server(self):
LOGGER.info('Running security.tls.v1_3_server')
self._resolve_device_ip()
# If the ipv4 address wasn't resolved yet, try again
self._resolve_device_ip()
ports_valid = []
ports_invalid = []
result = None
details = ''
description = ''
if self._device_ipv4_addr is not None:
results = self._tls_util.validate_tls_server(self._device_ipv4_addr,
tls_version='1.3')
if self._scan_results is None:
self._scan_results = self._http_scan.scan_for_http_services(
self._device_ipv4_addr)
if self._scan_results is not None:
for port, service_type in self._scan_results.items():
if 'HTTPS' in service_type:
LOGGER.info(f'Inspecting Service on port {port}: {service_type}')
port_results = self._tls_util.validate_tls_server(
self._device_ipv4_addr, tls_version='1.3', port=port)
if port_results is not None:
result = port_results[
0] if result is None else result and port_results[0]
details += port_results[1]
if port_results[0]:
ports_valid.append(port)
else:
ports_invalid.append(port)
elif 'HTTP' in service_type:
# Any non-HTTPS service detetcted is automatically invalid
ports_invalid.append(port)
details += f'\nHTTP service detected on port {port}'
result = False
LOGGER.debug(f'Valid Ports: {ports_valid}')
LOGGER.debug(f'Invalid Ports: {ports_invalid}')
# Determine results and return proper messaging and details
description = ''
result = results[0]
details = results[1]
description = ''
if result is None:
result = 'Feature Not Detected'
description = 'TLS 1.3 certificate could not be validated'
elif results[0]:
description = 'TLS 1.3 certificate is valid'
elif result:
ports_csv = ','.join(map(str,ports_valid))
description = f'TLS 1.3 certificate valid on ports: {ports_csv}'
else:
description = 'TLS 1.3 certificate is invalid'
ports_csv = ','.join(map(str,ports_invalid))
description = f'TLS 1.3 certificate invalid on ports: {ports_csv}'
return result, description, details

else:
LOGGER.error('Could not resolve device IP address')
return 'Error', 'Could not resolve device IP address'
Expand Down Expand Up @@ -472,14 +524,14 @@ def _security_tls_v1_0_client(self):
def _security_tls_v1_2_client(self):
LOGGER.info('Running security.tls.v1_2_client')
return self._validate_tls_client(self._device_mac,
'1.2',
unsupported_versions=['1.0', '1.1'])
'1.2',
unsupported_versions=['1.0', '1.1'])

def _security_tls_v1_3_client(self):
LOGGER.info('Running security.tls.v1_3_client')
return self._validate_tls_client(self._device_mac,
'1.3',
unsupported_versions=['1.0', '1.1'])
'1.3',
unsupported_versions=['1.0', '1.1'])

def _validate_tls_client(self,
client_mac,
Expand Down
47 changes: 27 additions & 20 deletions modules/test/tls/python/src/tls_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,11 @@ def verify_public_key(self, public_key):
else:
return False, 'Key is not RSA or EC type'

def validate_signature(self, host):
def validate_signature(self, host, port):
# Reconnect to the device but with validate signature option
# set to true which will check for proper cert chains
# within the valid CA root certs stored on the server
if self.validate_trusted_ca_signature(host):
if self.validate_trusted_ca_signature(host, port):
LOGGER.info('Authorized Certificate Authority signature confirmed')
return True, 'Authorized Certificate Authority signature confirmed'
else:
Expand Down Expand Up @@ -261,13 +261,14 @@ def validate_local_ca_signature(self, device_cert_path):
LOGGER.error(str(e))
return False, None

def validate_trusted_ca_signature(self, host):
def validate_trusted_ca_signature(self, host, port):
# Reconnect to the device but with validate signature option
# set to true which will check for proper cert chains
# within the valid CA root certs stored on the server
LOGGER.info(
'Checking for valid signature from authorized Certificate Authorities')
public_cert = self.get_public_certificate(host,
public_cert = self.get_public_certificate(host=host,
port=port,
validate_cert=True,
tls_version='1.2')
if public_cert:
Expand Down Expand Up @@ -377,34 +378,41 @@ def get_certificate(self, uri, timeout=10):
LOGGER.error(f'Error fetching certificate from URI: {e}')
return certificate

def process_tls_server_results(self, tls_1_2_results, tls_1_3_results):
def process_tls_server_results(self, tls_1_2_results, tls_1_3_results, port):
results = ''
if tls_1_2_results[0] is None and tls_1_3_results[0] is not None:
# Validate only TLS 1.3 results
description = 'TLS 1.3' + (' not' if not tls_1_3_results[0] else
'') + ' validated: ' + tls_1_3_results[1]
description = (f"""TLS 1.3 {'' if tls_1_3_results[0] else 'not '}"""
f"""validated on port {port}: """
f"""{tls_1_3_results[1]}""")
results = tls_1_3_results[0], description
elif tls_1_3_results[0] is None and tls_1_2_results[0] is not None:
# Vaidate only TLS 1.2 results
description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else
'') + ' validated: ' + tls_1_2_results[1]
description = (f"""TLS 1.2 {'' if tls_1_2_results[0] else 'not '}"""
f"""validated on port {port}: """
f"""{tls_1_2_results[1]}""")
results = tls_1_2_results[0], description
elif tls_1_3_results[0] is not None and tls_1_2_results[0] is not None:
# Validate both results
description = 'TLS 1.2' + (' not' if not tls_1_2_results[0] else
'') + ' validated: ' + tls_1_2_results[1]
description += '\nTLS 1.3' + (' not' if not tls_1_3_results[0] else
'') + ' validated: ' + tls_1_3_results[1]
description = (f"""TLS 1.2 {'' if tls_1_2_results[0] else 'not '}"""
f"""validated on port {port}: """
f"""{tls_1_2_results[1]}""")
description += '\n'+(f"""TLS 1.3 {'' if tls_1_3_results[0] else 'not '}"""
f"""validated on port {port}: """
f"""{tls_1_3_results[1]}""")
results = tls_1_2_results[0] or tls_1_3_results[0], description
else:
description = f'TLS 1.2 not validated: {tls_1_2_results[1]}'
description += f'\nTLS 1.3 not validated: {tls_1_3_results[1]}'
description = (f"""TLS 1.2 not validated on port {port}: """
f"""{tls_1_2_results[1]}""")
description += '\n'+(f"""TLS 1.3 not validated on port {port}: """
f"""{tls_1_3_results[1]}""")
results = None, description
LOGGER.info('TLS server test results: ' + str(results))
return results

def validate_tls_server(self, host, tls_version):
cert_pem = self.get_public_certificate(host,
def validate_tls_server(self, host, tls_version, port=443):
cert_pem = self.get_public_certificate(host=host,
port=port,
validate_cert=False,
tls_version=tls_version)
if cert_pem:
Expand All @@ -430,13 +438,12 @@ def validate_tls_server(self, host, tls_version):
else:
key_valid = [0]

sig_valid = self.validate_signature(host)
sig_valid = self.validate_signature(host=host, port=port)

# Check results
cert_valid = tr_valid[0] and key_valid[0] and sig_valid[0]
test_details = tr_valid[1] + '\n' + key_valid[1] + '\n' + sig_valid[1]
LOGGER.info('Certificate validated: ' + str(cert_valid))
LOGGER.info('Test details:\n' + test_details)
return cert_valid, test_details
else:
LOGGER.info('Failed to resolve public certificate')
Expand Down Expand Up @@ -632,7 +639,7 @@ def get_non_tls_client_connection_ips(self, client_mac, capture_files):
# Packet is not ACK or SYN

src_ip = ipaddress.ip_address(
packet['_source']['layers']['ip.src'][0])
packet['_source']['layers']['ip.src'][0])
src_subnet = ipaddress.ip_network(src_ip, strict=False)
subnet_with_mask = ipaddress.ip_network(
src_subnet, strict=False).supernet(new_prefix=24)
Expand Down
Loading
Loading