From da3c14559f9d0c344563f6837e85455b5e3c216f Mon Sep 17 00:00:00 2001 From: Aliaksandr Nikitsin Date: Wed, 25 Jun 2025 13:53:12 +0200 Subject: [PATCH] discover open posrt before full scan --- modules/test/services/conf/module_config.json | 2 +- .../services/python/src/services_module.py | 42 +++++++++++++++---- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/modules/test/services/conf/module_config.json b/modules/test/services/conf/module_config.json index b37435eda..6c481821e 100644 --- a/modules/test/services/conf/module_config.json +++ b/modules/test/services/conf/module_config.json @@ -9,7 +9,7 @@ "docker": { "depends_on": "base", "enable_container": true, - "timeout": 900 + "timeout": 1200 }, "tests": [ { diff --git a/modules/test/services/python/src/services_module.py b/modules/test/services/python/src/services_module.py index 06ceed67d..23abcaebf 100644 --- a/modules/test/services/python/src/services_module.py +++ b/modules/test/services/python/src/services_module.py @@ -186,18 +186,42 @@ def _process_port_results(self): if self._scan_udp_results is not None: self._scan_results.update(self._scan_udp_results) + def _nmap_open_ports(self, nmap_results: str) -> list[str]: + # Returns the list of open ports from nmap xml output + open_ports = [] + try: + xml_data = xmltodict.parse(nmap_results) + if 'host' in xml_data['nmaprun']: + for entry in xml_data['nmaprun']['host']['ports']['port']: + open_ports.append(entry['@portid']) + except Exception as e: + LOGGER.error(f'Error parsing Nmap output: {e}') + return open_ports + def _scan_tcp_ports(self): + # Scans TCP ports to detect open ports. + host = self._device_ipv4_addr + # Preliminary command for quick open port detection. + nmap_command_prelim = f'''nmap --open -sT -Pn -v -n -T5 + --version-intensity 0 --min-rate 1000 + -p- -oX - {host}''' + # Command for detecting services using open ports. + nmap_command = f'''nmap --open -sT -sV -Pn -v -p 1-65535 + --version-intensity 7 -T4 -oX - {host}''' LOGGER.info(f'Running nmap TCP port scan for {self._device_ipv4_addr}') - nmap_results = util.run_command( # pylint: disable=E1120 - f'''nmap --open -sT -sV -Pn -v -p 1-65535 - --version-intensity 7 -T4 -oX - {self._device_ipv4_addr}''')[0] - + nmap_results_prelim = util.run_command(nmap_command_prelim)[0] + open_ports = self._nmap_open_ports(nmap_results_prelim) + if open_ports: + LOGGER.info(f'Open TCP ports detected: {open_ports}') + nmap_results = util.run_command(nmap_command)[0] + LOGGER.debug(f'TCP Scan results raw: {nmap_results}') + nmap_results_json = self._nmap_results_to_json(nmap_results) + LOGGER.debug(f'TCP Scan results JSON: {nmap_results_json}') + self._scan_tcp_results = self._process_nmap_json_results( + nmap_results_json=nmap_results_json) + else: + LOGGER.info('No open TCP ports detected.') LOGGER.info('TCP port scan complete') - LOGGER.debug(f'TCP Scan results raw: {nmap_results}') - nmap_results_json = self._nmap_results_to_json(nmap_results) - LOGGER.debug(f'TCP Scan results JSON: {nmap_results_json}') - self._scan_tcp_results = self._process_nmap_json_results( - nmap_results_json=nmap_results_json) def _scan_udp_ports(self):