Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 81 additions & 152 deletions modules/test/tls/python/src/tls_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@
from cryptography.hazmat.primitives.asymmetric import rsa, dsa, ec
from cryptography.x509 import AuthorityKeyIdentifier, SubjectKeyIdentifier, BasicConstraints, KeyUsage
from cryptography.x509 import GeneralNames, DNSName, ExtendedKeyUsage, ObjectIdentifier, SubjectAlternativeName
from jinja2 import Environment, FileSystemLoader

LOG_NAME = 'test_tls'
MODULE_REPORT_FILE_NAME = 'tls_report.html'
MODULE_REPORT_FILE_NAME = 'tls_report.j2.html'
STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap'
MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap'
TLS_CAPTURE_FILE = '/runtime/output/tls.pcap'
GATEWAY_CAPTURE_FILE = '/runtime/network/gateway.pcap'
LOGGER = None

REPORT_TEMPLATE_FILE = 'report_template.jinja2'

class TLSModule(TestModule):
"""The TLS testing module."""
Expand All @@ -57,7 +58,24 @@ def __init__(self,
self._tls_util = TLSUtil(LOGGER)

def generate_module_report(self):
html_content = '<h4 class="page-heading">TLS Module</h4>'
# Load Jinja2 template
loader=FileSystemLoader(self._report_template_folder)
template = Environment(loader=loader).get_template(REPORT_TEMPLATE_FILE)
module_header='TLS Module'
# Summary table headers
summary_headers = [
'Expiry',
'Length',
'Type',
'Port number',
'Signed by',
]
# Cert table headers
cert_table_headers = ['Property', 'Value']
# Outbound connections table headers
outbound_headers = ['Destination IP', 'Port']
pages = {}
outbound_conns = None

# List of capture files to scan
pcap_files = [
Expand All @@ -66,39 +84,12 @@ def generate_module_report(self):
]
certificates = self.extract_certificates_from_pcap(pcap_files,
self._device_mac)

if len(certificates) > 0:

cert_tables = []
# pylint: disable=W0612
for cert_num, ((ip_address, port),
cert) in enumerate(certificates.items()):

# Add summary table
summary_table = '''
<table class="module-summary" style="width:100%;">
<thead>
<tr>
<th>Expiry</th>
<th>Length</th>
<th>Type</th>
<th>Port number</th>
<th>Signed by</th>
</tr>
</thead>
<tbody>
'''

# Generate the certificate table
cert_table = '''
<table class="module-data">
<thead>
<tr>
<th>Property</th>
<th>Value</th>
</tr>
</thead>
<tbody>'''
pages[cert_num] = {}

# Extract certificate data
not_valid_before = cert.not_valid_before
Expand All @@ -124,153 +115,91 @@ def generate_module_report(self):
cert.public_bytes(encoding=serialization.Encoding.DER))

# Append certification information
cert_table += f'''
<tr>
<td>Version</td>
<td>{version_value}</td>
</tr>
<tr>
<td>Signature Alg.</td>
<td>{signature_alg_value}</td>
</tr>
<tr>
<td>Validity from</td>
<td>{not_before}</td>
</tr>
<tr>
<td>Valid to</td>
<td>{not_after}</td>
</tr>
</tbody>
</table>
'''

subject_table = '''
<table class="module-data">
<thead>
<tr>
<th>Property</th>
<th>Value</th>
</tr>
</thead>
<tbody>'''
pages[cert_num]['cert_info_data'] = {
'Version': version_value,
'Signature Alg.': signature_alg_value,
'Validity from': not_before,
'Valid to': not_after,
}

# Append the subject information
pages[cert_num]['subject_data'] = {}
for val in cert.subject.rdns:
dn = val.rfc4514_string().split('=')
subject_table += f'''
<tr>
<td>{dn[0]}</td>
<td>{dn[1]}</td>
</tr>
'''

subject_table += '''
</tbody>
</table>'''
pages[cert_num]['subject_data'][dn[0]] = dn[1]

# Append issuer information
for val in cert.issuer.rdns:
dn = val.rfc4514_string().split('=')
if 'CN' in dn[0]:
signed_by = dn[1]

ext_table = ''

# Append extensions information
if cert.extensions:

ext_table = '''
<h5>Certificate Extensions</h5>
<table class="module-data" style="margin-bottom:20px;">
<thead>
<tr>
<th>Property</th>
<th>Value</th>
</tr>
</thead>
<tbody>'''

pages[cert_num]['cert_ext'] = {}
for extension in cert.extensions:
if isinstance(extension.value, list):
for extension_value in extension.value:
ext_table += f'''
<tr>
<td>{extension.oid._name}</td>
<td>{self.format_extension_value(extension_value.value)}</td>
</tr>
'''
extension_name = extension.oid._name
formatted_value = self.format_extension_value(
extension_value.value)
pages[cert_num]['cert_ext'][extension_name] = formatted_value
else:
ext_table += f'''
<tr>
<td>{extension.oid._name}</td>
<td>{self.format_extension_value(extension.value)}</td>
</tr>
'''

ext_table += '''
</tbody>
</table>'''

# Add summary table row
summary_table += f'''
<tr>
<td>{not_after}</td>
<td>{cert_length}</td>
<td>{public_key_type}</td>
<td>{port}</td>
<td>{signed_by}</td>
</tr>
</tbody>
</table>
'''

# Merge all table HTML
summary_table = f'\n{summary_table}'

summary_table += f'''
<div id="paired" style="display:flex;justify-content:space-between;">
<div style="margin-right:20px;">
<h5>Certificate Information</h5>
{cert_table}
</div>
<div>
<h5>Subject Information</h5>
{subject_table}
</div>
</div>'''

if ext_table is not None:
summary_table += f'\n\n{ext_table}'

cert_tables.append(summary_table)
formatted_value = self.format_extension_value(
extension.value)
pages[cert_num]['cert_ext'][extension.oid._name] = formatted_value

pages[cert_num]['summary_data'] = [
not_after,
cert_length,
public_key_type,
port,
signed_by
]

outbound_conns = self._tls_util.get_all_outbound_connections(
device_mac=self._device_mac, capture_files=pcap_files)
conn_table = self.generate_outbound_connection_table(outbound_conns)

html_content += summary_table + '\n'.join('\n' + tables
for tables in cert_tables)
html_content += conn_table

report_jinja = ''
if pages:
for num,page in pages.items():
module_header_repr = module_header if num == 0 else None
cert_ext=page['cert_ext'] if 'cert_ext' in page else None
page_html = template.render(
base_template=self._base_template_file,
module_header=module_header_repr,
summary_headers=summary_headers,
summary_data=page['summary_data'],
cert_info_data=page['cert_info_data'],
subject_data=page['subject_data'],
cert_table_headers=cert_table_headers,
cert_ext=cert_ext,
ountbound_headers=outbound_headers,
)
report_jinja += page_html
if outbound_conns:
out_page = template.render(
base_template=self._base_template_file,
ountbound_headers=outbound_headers,
outbound_conns=outbound_conns
)
report_jinja += out_page
else:
html_content += ('''
<div class="callout-container info">
<div class="icon"></div>
No TLS certificates found on the device
</div>''')

LOGGER.debug('Module report:\n' + html_content)
report_jinja = template.render(
base_template=self._base_template_file,
module_header = module_header,
)
LOGGER.debug('Module report:\n' + report_jinja)

# Use os.path.join to create the complete file path
report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME)
jinja_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME)

# Write the content to a file
with open(report_path, 'w', encoding='utf-8') as file:
file.write(html_content)
with open(jinja_path, 'w', encoding='utf-8') as file:
file.write(report_jinja)

LOGGER.info('Module report generated at: ' + str(report_path))
return report_path
LOGGER.info('Module report generated at: ' + str(jinja_path))
return jinja_path

def format_extension_value(self, value):
if isinstance(value, bytes):
Expand Down Expand Up @@ -533,5 +462,5 @@ def _validate_tls_client(self,

def _resolve_device_ip(self):
# If the ipv4 address wasn't resolved yet, try again
if self._device_ipv4_addr is None:
if self._device_ipv4_addr is None: # pylint: disable=E0203
self._device_ipv4_addr = self._get_device_ipv4()
90 changes: 90 additions & 0 deletions modules/test/tls/resources/report_template.jinja2
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
{% extends base_template %}
{% block content %}
{% if cert_info_data and subject_data %}
<div style="display:flex;justify-content:space-between;">
<div style="margin-right:20px;">
<h5>Certificate Information</h5>
<table class="module-data">
<thead>
<tr>
{% for header in cert_table_headers%}
<th>{{ header }}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for k,v in cert_info_data.items() %}
<tr>
<td>{{ k }}</td>
<td>{{ v }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<div>
<h5>Subject Information</h5>
<table class="module-data">
<thead>
<tr>
{% for header in cert_table_headers%}
<th>{{ header }}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for k,v in subject_data.items() %}
<tr>
<td>{{ k }}</td>
<td>{{ v }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
{% if cert_ext %}
<h5>Certificate Extensions</h5>
<table class="module-data" style="margin-bottom:20px;">
<thead>
<tr>
<th>Property</th>
<th>Value</th>
</tr>
</thead>
<tbody>
{% for k,v in cert_ext.items()%}
<tr>
<td>{{ k }}</td>
<td>{{ v }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
{% elif ountbound_headers %}
<h1>Outbound Connections</h1>
<table class="module-data">
<thead>
<tr>
{% for header in ountbound_headers%}
<th>{{ header }}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for ip, port in outbound_conns%}
<tr>
<td>{{ip}}</td>
<td>{{port}}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<div class="callout-container info">
<div class="icon"></div>
No TLS certificates found on the device
</div>
{% endif %}
{% endblock %}
Loading
Loading