diff --git a/modules/test/conn/python/src/connection_module.py b/modules/test/conn/python/src/connection_module.py index fdb1ae5cc..4a0cd8376 100644 --- a/modules/test/conn/python/src/connection_module.py +++ b/modules/test/conn/python/src/connection_module.py @@ -44,7 +44,7 @@ class ConnectionModule(TestModule): """Connection Test module""" - def __init__(self, + def __init__(self, # pylint: disable=R0917 module, conf_file=None, results_dir=None, @@ -192,51 +192,50 @@ def _connection_dhcp_address(self): LOGGER.info('Running connection.dhcp_address') lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec) - if lease is not None: - if 'ip' in lease: - ip_addr = lease['ip'] - LOGGER.info('IP Resolved: ' + ip_addr) - LOGGER.info('Attempting to ping device...') - ping_success = self._ping(self._device_ipv4_addr) - LOGGER.debug('Ping success: ' + str(ping_success)) - if ping_success: - return True, 'Device responded to leased IP address' - else: - return False, 'Device did not respond to leased IP address' - else: - LOGGER.info('No IP information found in lease: ' + self._device_mac) - return False, 'No IP information found in lease: ' + self._device_mac - else: - LOGGER.info('No DHCP lease could be found for MAC ' + self._device_mac + + if lease is None: + message = (f'No DHCP lease could be found for MAC {self._device_mac}' + ' at the time of this test') - return (False, 'No DHCP lease could be found for MAC ' + - self._device_mac + ' at the time of this test') + LOGGER.info(message) + return False, message + if 'ip' not in lease: + message = f'No IP information found in lease: {self._device_mac}' + LOGGER.info(message) + return False, message + ip_addr = lease['ip'] + LOGGER.info('IP Resolved: ' + ip_addr) + LOGGER.info('Attempting to ping device...') + ping_success = self._ping(self._device_ipv4_addr) + LOGGER.debug('Ping success: ' + str(ping_success)) + if not ping_success: + return False, 'Device did not respond to leased IP address' + return True, 'Device responded to leased IP address' def _connection_mac_address(self): LOGGER.info('Running connection.mac_address') - if self._device_mac is not None: - LOGGER.info('MAC address found: ' + self._device_mac) - return True, 'MAC address found: ' + self._device_mac - else: + if self._device_mac is None: LOGGER.info('No MAC address found: ' + self._device_mac) return False, 'No MAC address found.' + message = f'MAC address found: {self._device_mac}' + LOGGER.info(message) + return True, message def _connection_mac_oui(self): LOGGER.info('Running connection.mac_oui') manufacturer = self._get_oui_manufacturer(self._device_mac) - if manufacturer is not None: - LOGGER.info('OUI Manufacturer found: ' + manufacturer) - return True, 'OUI Manufacturer found: ' + manufacturer - else: - LOGGER.info('No OUI Manufacturer found for: ' + self._device_mac) - return False, 'No OUI Manufacturer found for: ' + self._device_mac + if manufacturer is None: + msg = f'No OUI Manufacturer found for: {self._device_mac}' + LOGGER.info(msg) + return False, msg + msg = f'OUI Manufacturer found: {manufacturer}' + LOGGER.info(msg) + return True, msg def _connection_single_ip(self): LOGGER.info('Running connection.single_ip') result = None if self._device_mac is None: - LOGGER.info('No MAC address found: ') + LOGGER.info('No MAC address found.') return result, 'No MAC address found.' # Read all the pcap files containing DHCP packet information @@ -286,187 +285,149 @@ def _connection_target_ping(self): if self._device_ipv4_addr is None: LOGGER.error('No device IP could be resolved') return 'Error', 'Could not resolve device IP address' - else: - if self._ping(self._device_ipv4_addr): - return True, 'Device responds to ping' - else: - return False, 'Device does not respond to ping' + if not self._ping(self._device_ipv4_addr): + return False, 'Device does not respond to ping' + return True, 'Device responds to ping' def _connection_ipaddr_ip_change(self, config): - result = None LOGGER.info('Running connection.ipaddr.ip_change') # Resolve the configured lease wait time - if 'lease_wait_time_sec' in config: - self._lease_wait_time_sec = config['lease_wait_time_sec'] - - if self._dhcp_util.setup_single_dhcp_server(): - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + if (not 'lease_wait_time_sec' in config or + not self._dhcp_util.setup_single_dhcp_server()): + return None, 'Failed to configure network for test' + self._lease_wait_time_sec = config['lease_wait_time_sec'] + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec) - if lease is not None: - LOGGER.info('Current device lease resolved') - LOGGER.debug(str(lease)) - # Figure out how to calculate a valid IP address - ip_address = '10.10.10.30' - if self._dhcp_util.add_reserved_lease(lease['hostname'], + if lease is None: + message = ('Device has no current DHCP lease so ' + + 'this test could not be run') + LOGGER.info(message) + return None, message + LOGGER.info('Current device lease resolved') + LOGGER.debug(str(lease)) + # Figure out how to calculate a valid IP address + ip_address = '10.10.10.30' + if not self._dhcp_util.add_reserved_lease(lease['hostname'], lease['hw_addr'], ip_address): - self._dhcp_util.wait_for_lease_expire(lease, - self._lease_wait_time_sec) - LOGGER.info('Checking device accepted new IP') - for _ in range(5): - LOGGER.info('Pinging device at IP: ' + ip_address) - if self._ping(ip_address): - LOGGER.debug('Ping success') - LOGGER.debug('Reserved lease confirmed active in device') - result = True, 'Device has accepted an IP address change' - LOGGER.debug('Restoring DHCP failover configuration') - break - else: - LOGGER.info('Device did not respond to ping') - result = False, 'Device did not accept IP address change' - time.sleep(5) # Wait 5 seconds before trying again - self._dhcp_util.delete_reserved_lease(lease['hw_addr']) - else: - result = None, 'Failed to create reserved lease for device' + return None, 'Failed to create reserved lease for device' + self._dhcp_util.wait_for_lease_expire(lease, + self._lease_wait_time_sec) + LOGGER.info('Checking device accepted new IP') + for _ in range(5): + LOGGER.info('Pinging device at IP: ' + ip_address) + if self._ping(ip_address): + LOGGER.debug('Ping success') + LOGGER.debug('Reserved lease confirmed active in device') + result = True, 'Device has accepted an IP address change' + LOGGER.debug('Restoring DHCP failover configuration') + break else: - LOGGER.info('Device has no current DHCP lease so ' + - 'this test could not be run') - result = None, ('Device has no current DHCP lease so ' + - 'this test could not be run') - # Restore the network - self._dhcp_util.restore_failover_dhcp_server() - LOGGER.info('Waiting 30 seconds for reserved lease to expire') - time.sleep(30) - self._dhcp_util.get_cur_lease(mac_address=self._device_mac, - timeout=self._lease_wait_time_sec) - else: - result = None, 'Failed to configure network for test' + LOGGER.info('Device did not respond to ping') + result = False, 'Device did not accept IP address change' + time.sleep(5) # Wait 5 seconds before trying again + self._dhcp_util.delete_reserved_lease(lease['hw_addr']) + # Restore the network + self._dhcp_util.restore_failover_dhcp_server() + LOGGER.info('Waiting 30 seconds for reserved lease to expire') + time.sleep(30) + self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) return result def _connection_ipaddr_dhcp_failover(self, config): - result = None LOGGER.info('Running connection.ipaddr.dhcp_failover') - # Resolve the configured lease wait time if 'lease_wait_time_sec' in config: self._lease_wait_time_sec = config['lease_wait_time_sec'] - # Confirm that both servers are online primary_status = self._dhcp_util.get_dhcp_server_status( dhcp_server_primary=True) secondary_status = self._dhcp_util.get_dhcp_server_status( dhcp_server_primary=False) - if primary_status and secondary_status: - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + if not primary_status or not secondary_status: + LOGGER.error('Network is not ready for this test. Skipping') + return None, 'Network is not ready for this test' + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec) - if lease is not None: - LOGGER.info('Current device lease resolved') - if self._dhcp_util.is_lease_active(lease): - # Shutdown the primary server - if self._dhcp_util.stop_dhcp_server(dhcp_server_primary=True): - # Wait until the current lease is expired - self._dhcp_util.wait_for_lease_expire(lease, - self._lease_wait_time_sec) - # Make sure the device has received a new lease from the - # secondary server - if self._dhcp_util.get_cur_lease(mac_address=self._device_mac, - timeout=self._lease_wait_time_sec): - if self._dhcp_util.is_lease_active(lease): - result = True, ('Secondary DHCP server lease confirmed active ' - 'in device') - else: - result = False, 'Could not validate lease is active in device' - else: - result = False, ('Device did not recieve a new lease from ' - 'secondary DHCP server') - self._dhcp_util.start_dhcp_server(dhcp_server_primary=True) - else: - result = None, 'Failed to shutdown primary DHCP server' - else: - result = False, 'Device did not respond to ping' - else: - result = ( + if lease is None: + return ( None, 'Device has no current DHCP lease so this test could not be run') - else: - LOGGER.error('Network is not ready for this test. Skipping') - result = None, 'Network is not ready for this test' - return result + LOGGER.info('Current device lease resolved') + if not self._dhcp_util.is_lease_active(lease): + return False, 'Device did not respond to ping' + # Shutdown the primary server + if not self._dhcp_util.stop_dhcp_server(dhcp_server_primary=True): + return None, 'Failed to shutdown primary DHCP server' + # Wait until the current lease is expired + self._dhcp_util.wait_for_lease_expire(lease, + self._lease_wait_time_sec) + # Make sure the device has received a new lease from the + # secondary server + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) + if lease is None: + return False, ('Device did not recieve a new lease from ' + 'secondary DHCP server') + if not self._dhcp_util.is_lease_active(lease): + return False, 'Could not validate lease is active in device' + return True, ('Secondary DHCP server lease confirmed active ' + 'in device') - def _connection_dhcp_disconnect(self): + def _connection_dhcp_disconnect(self) -> tuple[str | bool, str]: LOGGER.info('Running connection.dhcp.disconnect') - result = None - description = '' dev_iface = os.getenv('DEV_IFACE') - + rpc_error_msg = 'Unable to connect to gRPC server' try: iface_status = self.host_client.check_interface_status(dev_iface) - if iface_status.code == 200: - LOGGER.info('Successfully resolved iface status') - if iface_status.status: - lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, + except Exception: + LOGGER.error(rpc_error_msg) + return 'Error', rpc_error_msg + if iface_status.code != 200: + return 'Error', 'Device interface could not be resolved' + LOGGER.info('Successfully resolved iface status') + if not iface_status.status: + return 'Error', 'Device interface is down' + lease = self._dhcp_util.get_cur_lease(mac_address=self._device_mac, timeout=self._lease_wait_time_sec) - if lease is not None: - LOGGER.info('Current device lease resolved') - if self._dhcp_util.is_lease_active(lease): - - # Disable the device interface - iface_down = self.host_client.set_iface_down(dev_iface) - if iface_down: - LOGGER.info('Device interface set to down state') - - # Wait for the lease to expire - self._dhcp_util.wait_for_lease_expire(lease, - self._lease_wait_time_sec) - - # Wait an additonal 10 seconds to better test a true disconnect - # state - LOGGER.info('Waiting 10 seconds before bringing iface back up') - time.sleep(10) - - # Enable the device interface - iface_up = self.host_client.set_iface_up(dev_iface) - if iface_up: - LOGGER.info('Device interface set to up state') - - # Confirm device receives a new lease - if self._dhcp_util.get_cur_lease( - mac_address=self._device_mac, - timeout=self._lease_wait_time_sec): - if self._dhcp_util.is_lease_active(lease): - result = True - description = ( - 'Device received a DHCP lease after disconnect') - else: - result = False - description = ( - 'Could not confirm DHCP lease active after disconnect') - else: - result = False - description = ( - 'Device did not recieve a DHCP lease after disconnect') - else: - result = 'Error' - description = 'Failed to set device interface to up state' - else: - result = 'Error' - description = 'Failed to set device interface to down state' - else: - result = 'Error' - description = 'No active lease available for device' - else: - result = 'Error' - description = 'Device interface is down' - else: - result = 'Error' - description = 'Device interface could not be resolved' - + if lease is None or not self._dhcp_util.is_lease_active(lease): + return 'Error', 'No active lease available for device' + try: + # Disable the device interface + iface_down = self.host_client.set_iface_down(dev_iface) except Exception: - LOGGER.error('Unable to connect to gRPC server') - result = 'Error' - description = ( - 'Unable to connect to gRPC server' - ) - return result, description + LOGGER.error(rpc_error_msg) + return 'Error', rpc_error_msg + if not iface_down: + return 'Error', 'Failed to set device interface to down state' + LOGGER.info('Device interface set to down state') + + # Wait for the lease to expire + self._dhcp_util.wait_for_lease_expire(lease, + self._lease_wait_time_sec) + + # Wait an additonal 10 seconds to better test a true disconnect + # state + LOGGER.info('Waiting 10 seconds before bringing iface back up') + time.sleep(10) + try: + # Enable the device interface + iface_up = self.host_client.set_iface_up(dev_iface) + except Exception: + LOGGER.error(rpc_error_msg) + return 'Error', rpc_error_msg + if not iface_up: + return 'Error', 'Failed to set device interface to up state' + LOGGER.info('Device interface set to up state') + # Confirm device receives a new lease + lease = self._dhcp_util.get_cur_lease( + mac_address=self._device_mac, + timeout=self._lease_wait_time_sec) + if lease is None: + return False, 'Device did not recieve a DHCP lease after disconnect' + if not self._dhcp_util.is_lease_active(lease): + return False, 'Could not confirm DHCP lease active after disconnect' + return True, 'Device received a DHCP lease after disconnect' def _connection_dhcp_disconnect_ip_change(self): LOGGER.info('Running connection.dhcp.disconnect_ip_change') @@ -589,21 +550,18 @@ def _get_oui_manufacturer(self, mac_address): def _connection_ipv6_slaac(self): LOGGER.info('Running connection.ipv6_slaac') - result = None - slac_test, sends_ipv6 = self._has_slaac_address() if slac_test: - result = True, f'Device has formed SLAAC address {self._device_ipv6_addr}' + return True, f'Device has formed SLAAC address {self._device_ipv6_addr}' elif slac_test is None: - result = 'Error', 'An error occurred whilst running this test' + return 'Error', 'An error occurred whilst running this test' else: if sends_ipv6: LOGGER.info('Device does not support IPv6 SLAAC') - result = False, 'Device does not support IPv6 SLAAC' + return False, 'Device does not support IPv6 SLAAC' else: LOGGER.info('Device does not support IPv6') - result = False, 'Device does not support IPv6' - return result + return False, 'Device does not support IPv6' def _has_slaac_address(self): packet_capture = (rdpcap(self.startup_capture_file) + @@ -630,19 +588,14 @@ def _has_slaac_address(self): def _connection_ipv6_ping(self): LOGGER.info('Running connection.ipv6_ping') - result = None if self._device_ipv6_addr is None: LOGGER.info('No IPv6 SLAAC address found. Cannot ping') - result = False, 'No IPv6 SLAAC address found. Cannot ping' - else: - if self._ping(self._device_ipv6_addr, ipv6=True): - LOGGER.info(f'Device responds to IPv6 ping on {self._device_ipv6_addr}') - result = True, ('Device responds to IPv6 ping on ' + - f'{self._device_ipv6_addr}') - else: - LOGGER.info('Device does not respond to IPv6 ping') - result = False, 'Device does not respond to IPv6 ping' - return result + return False, 'No IPv6 SLAAC address found. Cannot ping' + if self._ping(self._device_ipv6_addr, ipv6=True): + LOGGER.info(f'Device responds to IPv6 ping on {self._device_ipv6_addr}') + return True, f'Device responds to IPv6 ping on {self._device_ipv6_addr}' + LOGGER.info('Device does not respond to IPv6 ping') + return False, 'Device does not respond to IPv6 ping' def _ping(self, host, ipv6=False): LOGGER.info('Pinging: ' + str(host)) @@ -661,40 +614,34 @@ def restore_failover_dhcp_server(self, subnet): if response.code == 200: LOGGER.info('DHCP server configuration restored') return True - else: - LOGGER.error('Failed to start secondary DHCP server') - return False - else: - LOGGER.error('Failed to enabled failover in primary DHCP server') + LOGGER.error('Failed to start secondary DHCP server') return False - else: - LOGGER.error('Failed to restore original subnet') + LOGGER.error('Failed to enabled failover in primary DHCP server') return False + LOGGER.error('Failed to restore original subnet') + return False def setup_single_dhcp_server(self): # Shutdown the secondary DHCP Server LOGGER.info('Stopping secondary DHCP server') response = self.dhcp2_client.stop_dhcp_server() - if response.code == 200: - LOGGER.info('Secondary DHCP server stop command success') - time.sleep(3) # Give some time for the server to stop - LOGGER.info('Checking secondary DHCP server status') - response = self.dhcp2_client.get_status() - if response.code == 200: - LOGGER.info('Secondary DHCP server stopped') - LOGGER.info('Configuring primary DHCP server') - # Move primary DHCP server from failover into - # a single DHCP server config - response = self.dhcp1_client.disable_failover() - if response.code == 200: - LOGGER.info('Primary DHCP server failover disabled') - else: - return False, 'Failed to disable primary DHCP server failover' - return True, 'Single DHCP server configured' - else: - return False, 'Secondary DHCP server still running' - else: + if response.code != 200: return False, 'Secondary DHCP server stop command failed' + LOGGER.info('Secondary DHCP server stop command success') + time.sleep(3) # Give some time for the server to stop + LOGGER.info('Checking secondary DHCP server status') + response = self.dhcp2_client.get_status() + if response.code != 200: + return False, 'Secondary DHCP server still running' + LOGGER.info('Secondary DHCP server stopped') + LOGGER.info('Configuring primary DHCP server') + # Move primary DHCP server from failover into + # a single DHCP server config + response = self.dhcp1_client.disable_failover() + if response.code != 200: + return False, 'Failed to disable primary DHCP server failover' + LOGGER.info('Primary DHCP server failover disabled') + return True, 'Single DHCP server configured' def _communication_network_type(self): try: @@ -764,9 +711,8 @@ def enable_failover(self): if response.code == 200: LOGGER.info('Primary DHCP server enabled') return True - else: - LOGGER.error('Failed to disable primary DHCP server failover') - return False + LOGGER.error('Failed to disable primary DHCP server failover') + return False def is_ip_in_range(self, ip, start_ip, end_ip): ip_int = int(''.join(format(int(octet), '08b') for octet in ip.split('.')), @@ -912,8 +858,7 @@ def _change_subnet(self, subnet): LOGGER.debug('Subnet change confirmed') return True LOGGER.debug('Failed to confirm subnet change') - else: - LOGGER.debug('Subnet change request failed.') + LOGGER.debug('Subnet change request failed.') return False def test_subnets(self, subnets):