diff --git a/framework/python/src/test_orc/test_orchestrator.py b/framework/python/src/test_orc/test_orchestrator.py index d28326cbc..deca20798 100644 --- a/framework/python/src/test_orc/test_orchestrator.py +++ b/framework/python/src/test_orc/test_orchestrator.py @@ -255,11 +255,8 @@ def _calculate_result(self): for test_result in self.get_session().get_test_results(): # Check Required tests - if (test_result.required_result.lower() == "required" - and test_result.result not in [ - TestResult.COMPLIANT, - TestResult.ERROR - ]): + if (test_result.required_result.lower() == "required" and + test_result.result not in [TestResult.COMPLIANT, TestResult.ERROR]): result = TestResult.NON_COMPLIANT # Check Required if Applicable tests @@ -360,12 +357,12 @@ def zip_results(self, device, timestamp: str, profile): # Report file path report_path = os.path.join( - LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder), - timestamp, "test", device.mac_addr.replace(":", "")) + LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder), + timestamp, "test", device.mac_addr.replace(":", "")) # Parse string timestamp date_timestamp: datetime.datetime = datetime.strptime( - timestamp, "%Y-%m-%dT%H:%M:%S") + timestamp, "%Y-%m-%dT%H:%M:%S") # Find the report test_report = None @@ -388,17 +385,18 @@ def zip_results(self, device, timestamp: str, profile): # Write the json report with open(os.path.join(report_path, "report.json"), - "w", encoding="utf-8") as f: + "w", + encoding="utf-8") as f: json.dump(test_report.to_json(), f, indent=2) # Write the html report with open(os.path.join(report_path, "report.html"), - "w", encoding="utf-8") as f: + "w", + encoding="utf-8") as f: f.write(test_report.to_html()) # Write the pdf report - with open(os.path.join(report_path, "report.pdf"), - "wb") as f: + with open(os.path.join(report_path, "report.pdf"), "wb") as f: f.write(test_report.to_pdf().getvalue()) # Define temp directory to store files before zipping @@ -569,14 +567,13 @@ def _run_test_module(self, module): for test_result in module_results: # Convert dict from json into TestCase object - test_case = TestCase( - name=test_result["name"], - result=test_result["result"], - description=test_result["description"]) + test_case = TestCase(name=test_result["name"], + result=test_result["result"], + description=test_result["description"]) # Add steps to resolve if test is non-compliant - if (test_case.result == TestResult.NON_COMPLIANT and - "recommendations" in test_result): + if (test_case.result == TestResult.NON_COMPLIANT + and "recommendations" in test_result): test_case.recommendations = test_result["recommendations"] else: test_case.recommendations = [] @@ -666,17 +663,13 @@ def _load_test_packs(self): LOGGER.debug(f"Loading test pack {test_pack_file}") - with open(os.path.join( - self._root_path, - TEST_PACKS_DIR, - test_pack_file), encoding="utf-8") as f: + with open(os.path.join(self._root_path, TEST_PACKS_DIR, test_pack_file), + encoding="utf-8") as f: test_pack_json = json.load(f) - test_pack: TestPack = TestPack( - name = test_pack_json["name"], - tests = test_pack_json["tests"], - language = test_pack_json["language"] - ) + test_pack: TestPack = TestPack(name=test_pack_json["name"], + tests=test_pack_json["tests"], + language=test_pack_json["language"]) self._test_packs.append(test_pack) @@ -693,6 +686,12 @@ def _load_test_modules(self): # corrupted during DHCP changes in the conn module if "protocol" in module_dirs: module_dirs.insert(0, module_dirs.pop(module_dirs.index("protocol"))) + # Check if the directory services exists and move it higher in the index + # so it always runs before connection. Connection may cause too many + # DHCP changes causing nmap to use wrong IP during scan + if "services" in module_dirs and "conn" in module_dirs: + module_dirs.insert(module_dirs.index("conn"), + module_dirs.pop(module_dirs.index("services"))) for module_dir in module_dirs: @@ -723,9 +722,7 @@ def _load_test_module(self, module_dir): module_conf_file = os.path.join(self._root_path, modules_dir, module_dir, MODULE_CONFIG) - module = TestModule(module_conf_file, - self, - self.get_session(), + module = TestModule(module_conf_file, self, self.get_session(), extra_hosts) if module.depends_on is not None: self._load_test_module(module.depends_on) @@ -786,6 +783,5 @@ def _set_test_modules_error(self, current_test): start_idx = current_test if i == self._current_module else 0 for j in range(start_idx, len(self._test_modules_running[i].tests)): self.get_session().set_test_result_error( - self._test_modules_running[i].tests[j], - "Test did not run, the device was disconnected" - ) + self._test_modules_running[i].tests[j], + "Test did not run, the device was disconnected") diff --git a/modules/test/services/python/src/services_module.py b/modules/test/services/python/src/services_module.py index 57db0a0ed..3ec713962 100644 --- a/modules/test/services/python/src/services_module.py +++ b/modules/test/services/python/src/services_module.py @@ -145,6 +145,8 @@ def generate_module_report(self): def _run_nmap(self): LOGGER.info('Running nmap') + self._device_ipv4_addr = self._get_device_ipv4() + LOGGER.info('Resolved device IP: ' + str(self._device_ipv4_addr)) # Run the monitor method asynchronously to keep this method non-blocking self._tcp_scan_thread = threading.Thread(target=self._scan_tcp_ports) @@ -191,7 +193,9 @@ def _scan_tcp_ports(self): --version-intensity 7 -T4 -oX - {self._ipv4_addr}''')[0] LOGGER.info('TCP port scan complete') + LOGGER.debug(f'TCP Scan results raw: {nmap_results}') nmap_results_json = self._nmap_results_to_json(nmap_results) + LOGGER.debug(f'TCP Scan results JSON: {nmap_results_json}') self._scan_tcp_results = self._process_nmap_json_results( nmap_results_json=nmap_results_json) @@ -217,7 +221,9 @@ def _scan_udp_ports(self): nmap_results = util.run_command( # pylint: disable=E1120 f'nmap -sU -sV -p {port_list} -oX - {self._ipv4_addr}')[0] LOGGER.info('UDP port scan complete') + LOGGER.debug(f'UDP Scan results raw: {nmap_results}') nmap_results_json = self._nmap_results_to_json(nmap_results) + LOGGER.debug(f'UDP Scan results JSON: {nmap_results_json}') self._scan_udp_results = self._process_nmap_json_results( nmap_results_json=nmap_results_json)