Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 29 additions & 33 deletions framework/python/src/test_orc/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,8 @@ def _calculate_result(self):
for test_result in self.get_session().get_test_results():

# Check Required tests
if (test_result.required_result.lower() == "required"
and test_result.result not in [
TestResult.COMPLIANT,
TestResult.ERROR
]):
if (test_result.required_result.lower() == "required" and
test_result.result not in [TestResult.COMPLIANT, TestResult.ERROR]):
result = TestResult.NON_COMPLIANT

# Check Required if Applicable tests
Expand Down Expand Up @@ -360,12 +357,12 @@ def zip_results(self, device, timestamp: str, profile):

# Report file path
report_path = os.path.join(
LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder),
timestamp, "test", device.mac_addr.replace(":", ""))
LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder),
timestamp, "test", device.mac_addr.replace(":", ""))

# Parse string timestamp
date_timestamp: datetime.datetime = datetime.strptime(
timestamp, "%Y-%m-%dT%H:%M:%S")
timestamp, "%Y-%m-%dT%H:%M:%S")

# Find the report
test_report = None
Expand All @@ -388,17 +385,18 @@ def zip_results(self, device, timestamp: str, profile):

# Write the json report
with open(os.path.join(report_path, "report.json"),
"w", encoding="utf-8") as f:
"w",
encoding="utf-8") as f:
json.dump(test_report.to_json(), f, indent=2)

# Write the html report
with open(os.path.join(report_path, "report.html"),
"w", encoding="utf-8") as f:
"w",
encoding="utf-8") as f:
f.write(test_report.to_html())

# Write the pdf report
with open(os.path.join(report_path, "report.pdf"),
"wb") as f:
with open(os.path.join(report_path, "report.pdf"), "wb") as f:
f.write(test_report.to_pdf().getvalue())

# Define temp directory to store files before zipping
Expand Down Expand Up @@ -569,14 +567,13 @@ def _run_test_module(self, module):
for test_result in module_results:

# Convert dict from json into TestCase object
test_case = TestCase(
name=test_result["name"],
result=test_result["result"],
description=test_result["description"])
test_case = TestCase(name=test_result["name"],
result=test_result["result"],
description=test_result["description"])

# Add steps to resolve if test is non-compliant
if (test_case.result == TestResult.NON_COMPLIANT and
"recommendations" in test_result):
if (test_case.result == TestResult.NON_COMPLIANT
and "recommendations" in test_result):
test_case.recommendations = test_result["recommendations"]
else:
test_case.recommendations = []
Expand Down Expand Up @@ -666,17 +663,13 @@ def _load_test_packs(self):

LOGGER.debug(f"Loading test pack {test_pack_file}")

with open(os.path.join(
self._root_path,
TEST_PACKS_DIR,
test_pack_file), encoding="utf-8") as f:
with open(os.path.join(self._root_path, TEST_PACKS_DIR, test_pack_file),
encoding="utf-8") as f:
test_pack_json = json.load(f)

test_pack: TestPack = TestPack(
name = test_pack_json["name"],
tests = test_pack_json["tests"],
language = test_pack_json["language"]
)
test_pack: TestPack = TestPack(name=test_pack_json["name"],
tests=test_pack_json["tests"],
language=test_pack_json["language"])

self._test_packs.append(test_pack)

Expand All @@ -693,6 +686,12 @@ def _load_test_modules(self):
# corrupted during DHCP changes in the conn module
if "protocol" in module_dirs:
module_dirs.insert(0, module_dirs.pop(module_dirs.index("protocol")))
# Check if the directory services exists and move it higher in the index
# so it always runs before connection. Connection may cause too many
# DHCP changes causing nmap to use wrong IP during scan
if "services" in module_dirs and "conn" in module_dirs:
module_dirs.insert(module_dirs.index("conn"),
module_dirs.pop(module_dirs.index("services")))

for module_dir in module_dirs:

Expand Down Expand Up @@ -723,9 +722,7 @@ def _load_test_module(self, module_dir):
module_conf_file = os.path.join(self._root_path, modules_dir, module_dir,
MODULE_CONFIG)

module = TestModule(module_conf_file,
self,
self.get_session(),
module = TestModule(module_conf_file, self, self.get_session(),
extra_hosts)
if module.depends_on is not None:
self._load_test_module(module.depends_on)
Expand Down Expand Up @@ -786,6 +783,5 @@ def _set_test_modules_error(self, current_test):
start_idx = current_test if i == self._current_module else 0
for j in range(start_idx, len(self._test_modules_running[i].tests)):
self.get_session().set_test_result_error(
self._test_modules_running[i].tests[j],
"Test did not run, the device was disconnected"
)
self._test_modules_running[i].tests[j],
"Test did not run, the device was disconnected")
6 changes: 6 additions & 0 deletions modules/test/services/python/src/services_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def generate_module_report(self):

def _run_nmap(self):
LOGGER.info('Running nmap')
self._device_ipv4_addr = self._get_device_ipv4()
LOGGER.info('Resolved device IP: ' + str(self._device_ipv4_addr))

# Run the monitor method asynchronously to keep this method non-blocking
self._tcp_scan_thread = threading.Thread(target=self._scan_tcp_ports)
Expand Down Expand Up @@ -191,7 +193,9 @@ def _scan_tcp_ports(self):
--version-intensity 7 -T4 -oX - {self._ipv4_addr}''')[0]

LOGGER.info('TCP port scan complete')
LOGGER.debug(f'TCP Scan results raw: {nmap_results}')
nmap_results_json = self._nmap_results_to_json(nmap_results)
LOGGER.debug(f'TCP Scan results JSON: {nmap_results_json}')
self._scan_tcp_results = self._process_nmap_json_results(
nmap_results_json=nmap_results_json)

Expand All @@ -217,7 +221,9 @@ def _scan_udp_ports(self):
nmap_results = util.run_command( # pylint: disable=E1120
f'nmap -sU -sV -p {port_list} -oX - {self._ipv4_addr}')[0]
LOGGER.info('UDP port scan complete')
LOGGER.debug(f'UDP Scan results raw: {nmap_results}')
nmap_results_json = self._nmap_results_to_json(nmap_results)
LOGGER.debug(f'UDP Scan results JSON: {nmap_results_json}')
self._scan_udp_results = self._process_nmap_json_results(
nmap_results_json=nmap_results_json)

Expand Down