diff --git a/j1939/Dm14Query.py b/j1939/Dm14Query.py index d3be851..7e426ab 100644 --- a/j1939/Dm14Query.py +++ b/j1939/Dm14Query.py @@ -45,6 +45,33 @@ def __init__(self, ca: j1939.ControllerApplication, user_level=7) -> None: self.exception_queue = queue.Queue() self.user_level = user_level + def unsubscribe_all(self) -> None: + """ + Unsubscribes all message handlers + """ + self._ca.unsubscribe(self._parse_dm15) + self._ca.unsubscribe(self._parse_dm16) + + def reset_query(self) -> None: + """ + Resets query to remove transaction specific data + """ + self.state = QueryState.IDLE + self._dest_address = None + self.address = None + self.object_count = 0 + self.object_byte_size = 1 + self.signed = False + self.return_raw_bytes = False + self.direct = 0 + self.command = None + self.bytes = bytearray() + self.mem_data = None + self.data_queue = queue.Queue() + self.exception_queue = queue.Queue() + self.unsubscribe_all() + + def _wait_for_data(self) -> None: """ Determines whether to send data or wait to receive data based on the command type. If the command is a write command, then the data is sent. diff --git a/j1939/Dm14Server.py b/j1939/Dm14Server.py index 34865f4..02bc859 100644 --- a/j1939/Dm14Server.py +++ b/j1939/Dm14Server.py @@ -161,11 +161,10 @@ def parse_dm14( self.state = ResponseState.SEND_PROCEED case ResponseState.WAIT_OPERATION_COMPLETE: - self.state = ResponseState.IDLE - self.sa = None - self._ca.unsubscribe(self.parse_dm14) + self.reset_server() case _: + self.reset_server() raise ValueError("Invalid state") def _send_dm15( @@ -222,6 +221,7 @@ def _send_dm15( data[length - 3] = edcp case _: + self.reset_server() raise ValueError("Invalid state") self._ca.send_pgn(0, (pgn >> 8) & 0xFF, sa & 0xFF, 6, data) @@ -255,12 +255,12 @@ def _parse_dm16( if pgn != j1939.ParameterGroupNumber.PGN.DM16 or sa != self.sa: return - length = min(data[0], len(data) - 1) self.data_queue.put(data[1 : length + 1]) self._ca.unsubscribe(self._parse_dm16) self._ca.subscribe(self.parse_dm14) self.state = ResponseState.SEND_OPERATION_COMPLETE + self._send_dm15( self.length, self.direct, @@ -329,11 +329,19 @@ def verify_key(self, seed: int, key: int) -> bool: ) return self._key_from_seed(seed) == key - def reset_query(self) -> None: + def unsubscribe_all(self) -> None: + """ + Unsubscribes all message handlers + """ + self._ca.unsubscribe(self.parse_dm14) + self._ca.unsubscribe(self._parse_dm16) + + def reset_server(self) -> None: """ - Resets query to initial state + Resets server to remove transaction specific data """ self.state = ResponseState.IDLE + self.data_queue = queue.Queue() self.sa = None self.seed = None self.key = None @@ -346,8 +354,7 @@ def reset_query(self) -> None: self.edcp = 0x07 self.status = j1939.Dm15Status.PROCEED.value self.direct = 0 - self._ca.unsubscribe(self.parse_dm14) - self._ca.unsubscribe(self._parse_dm16) + self.unsubscribe_all() def respond( self, diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index 8a1622d..cbb5abc 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -288,7 +288,7 @@ def notify(self, can_id, data, timestamp): """ self.j1939_dll.notify(can_id, data, timestamp) - def add_bus_filters(self, fitlers: can.typechecking.CanFilters | None): + def add_bus_filters(self, filters: can.typechecking.CanFilters | None): """Add bus filters to the underlying CAN bus. :param filters: @@ -297,7 +297,7 @@ def add_bus_filters(self, fitlers: can.typechecking.CanFilters | None): """ if self._bus is None: raise RuntimeError("Not connected to CAN bus") - self._bus.set_filters(fitlers) + self._bus.set_filters(filters) def _async_job_thread(self): """Asynchronous thread for handling various jobs @@ -374,7 +374,10 @@ def _notify_subscribers(self, priority, pgn, sa, dest, timestamp, data): logger.debug("notify subscribers for PGN {}".format(pgn)) # notify only the CA for which the message is intended # each CA receives all broadcast messages - for dic in self._subscribers: + + # TODO: this is ineffecient but there exists a possibility of removing subscribers during callback + # and adding new ones in while this is going and it can impact message receivement + for dic in self._subscribers.copy(): if (dic['dev_adr'] == None) or (dest == ParameterGroupNumber.Address.GLOBAL) or (callable(dic['dev_adr']) and dic['dev_adr'](dest)) or (dest == dic['dev_adr']): dic['cb'](priority, pgn, sa, timestamp, data) diff --git a/j1939/memory_access.py b/j1939/memory_access.py index 2eb3448..7da12a5 100644 --- a/j1939/memory_access.py +++ b/j1939/memory_access.py @@ -7,12 +7,14 @@ class DMState(Enum): REQUEST_STARTED = 2 WAIT_RESPONSE = 3 WAIT_QUERY = 4 + SERVER_CLEANUP = 5 class MemoryAccess: def __init__(self, ca: j1939.ControllerApplication) -> None: """ Makes an overarching Memory access class + :param ca: Controller Application """ self._ca = ca @@ -22,14 +24,33 @@ def __init__(self, ca: j1939.ControllerApplication) -> None: self.state = DMState.IDLE self.seed_security = False self._notify_query_received = None - self._seed_key_valid = None self._proceed_function = None + def _handle_error(self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray, error_code: int) -> None: + """ + Handles errors by resetting the state and unsubscribing from DM14 messages + + :param priority: Priority of the message + :param pgn: Parameter Group Number of the message + :param sa: Source Address of the message + :param timestamp: Timestamp of the message + :param data: Data of the PDU + :param error_code: Error code to be set + """ + self.server.error = error_code + self.server.set_busy(True) + self.server.parse_dm14( + priority, pgn, sa, timestamp, data + ) + self.server.set_busy(False) + self.reset() + def _listen_for_dm14( self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray ) -> None: """ Listens for dm14 messages and passes them to the appropriate function + :param priority: Priority of the message :param pgn: Parameter Group Number of the message :param sa: Source Address of the message @@ -64,15 +85,7 @@ def _listen_for_dm14( if self.proceed: self._notify_query_received() # notify incoming request else: - self.server.error = 0x100 - self.server.set_busy(True) - self.server.parse_dm14( - priority, pgn, sa, timestamp, data - ) - self.server.set_busy(False) - self.server.reset_query() - self.state = DMState.IDLE - self.server.error = 0x0 + self._handle_error(priority, pgn, sa, timestamp, data, 0x100) case DMState.REQUEST_STARTED: self.server.parse_dm14(priority, pgn, sa, timestamp, data) @@ -101,32 +114,19 @@ def _listen_for_dm14( if self.proceed: self._notify_query_received() # notify incoming request else: - self.server.error = 0x100 - self.server.set_busy(True) - self.server.parse_dm14( - priority, pgn, sa, timestamp, data - ) - self.server.set_busy(False) - self.server.reset_query() - self.state = DMState.IDLE - self.server.error = 0x0 + self._handle_error(priority, pgn, sa, timestamp, data, 0x100) else: - self.server.error = 0x1003 - self.server.set_busy(True) - self.server.parse_dm14( - priority, pgn, sa, timestamp, data - ) - self.server.set_busy(False) - self.state = DMState.IDLE - self.server.error = 0x0 + self._handle_error(priority, pgn, sa, timestamp, data, 0x1003) case DMState.WAIT_QUERY: self.server.set_busy(True) self.server.parse_dm14(priority, pgn, sa, timestamp, data) self.server.set_busy(False) + case DMState.SERVER_CLEANUP: + self.state = DMState.IDLE case _: pass - + def respond( self, proceed: bool, @@ -137,6 +137,7 @@ def respond( ) -> list: """ Responds with requested data and error code, if applicable, to a read request + :param bool proceed: whether the operation is good to proceed :param list data: data to be sent to device :param int error: error code to be sent to device @@ -145,14 +146,15 @@ def respond( """ if data is None: data = [] - if self.state is DMState.WAIT_RESPONSE: - self._ca.unsubscribe(self._listen_for_dm14) - self.state = DMState.IDLE - return_data = self.server.respond(proceed, data, error, edcp, max_timeout) - self._ca.subscribe(self._listen_for_dm14) - return return_data - else: + + if self.state is not DMState.WAIT_RESPONSE: return data + + self._ca.unsubscribe(self._listen_for_dm14) + return_data = self.server.respond(proceed, data, error, edcp, max_timeout) + self.state = DMState.SERVER_CLEANUP if self.server.state.value != DMState.IDLE.value else DMState.IDLE + self._ca.subscribe(self._listen_for_dm14) + return return_data def read( self, @@ -167,6 +169,7 @@ def read( ) -> list: """ Make a dm14 read Query + :param int dest_address: destination address of the message :param int direct: direct address of the message :param int address: address of the message @@ -189,9 +192,10 @@ def read( return_raw_bytes, max_timeout, ) - self.state = DMState.IDLE + self.reset() return data else: + self.reset() raise RuntimeWarning("Process already Running") def write( @@ -205,6 +209,7 @@ def write( ) -> None: """ Send a write query to dest_address, requesting to write values at address + :param int dest_address: destination address of the message :param int direct: direct address of the message :param int address: address of the message @@ -218,7 +223,7 @@ def write( self.query.write( dest_address, direct, address, values, object_byte_size, max_timeout ) - self.state = DMState.IDLE + self.reset() def set_seed_generator(self, seed_generator: callable) -> None: """ @@ -229,7 +234,8 @@ def set_seed_generator(self, seed_generator: callable) -> None: def set_seed_key_algorithm(self, algorithm: callable) -> None: """ - set seed-key algorithm to be used for key generation + Sets seed-key algorithm to be used for key generation + :param callable algorithm: seed-key algorithm """ self.seed_security = True @@ -238,28 +244,34 @@ def set_seed_key_algorithm(self, algorithm: callable) -> None: def set_verify_key(self, verify_key: callable) -> None: """ - set verify key function to be used for verifying the key + Sets verify key function to be used for verifying the key + :param callable verify_key: verify key function """ self.server.set_verify_key(verify_key) def set_notify(self, notify: callable) -> None: """ - set notify function to be used for notifying the user of memory accesses + Sets notify function to be used for notifying the user of memory accesses + :param callable notify: notify function """ self._notify_query_received = notify def set_proceed(self, proceed: callable) -> None: """ - set proceed function to determine if a memory query is valid or not + Sets proceed function to determine if a memory query is valid or not + :param callable proceed: proceed function """ self._proceed_function = proceed - def reset_query(self) -> None: + def reset(self) -> None: """ - reset query for the server + Resets both server and query to remove transaction specific data """ + self.state = DMState.IDLE + self._ca.unsubscribe(self._listen_for_dm14) self._ca.subscribe(self._listen_for_dm14) - self.server.reset_query() + self.server.reset_server() + self.query.reset_query() diff --git a/test/test_ecu.py b/test/test_ecu.py index 6e0930e..65f8eb9 100644 --- a/test/test_ecu.py +++ b/test/test_ecu.py @@ -200,3 +200,25 @@ def test_add_bus_filters(feeder): ] feeder.ecu.add_bus_filters(filters) assert feeder.ecu._bus.filters == filters + +def test_subscribe(feeder): + """ + Test subscribing to callback + """ + call_count = 0 + + def callback(priority: int, pgn: int, sa: int, timestamp: int, data: bytearray): + nonlocal call_count + call_count += 1 + + feeder.ecu.subscribe(callback) + + feeder.can_messages = [ + (Feeder.MsgType.CANRX, 0x00FEB201, [1, 2, 3, 4, 5, 6, 7, 8], 0.0), + ] + + feeder.pdus = [(Feeder.MsgType.PDU, 65202, [1, 2, 3, 4, 5, 6, 7, 8])] + + feeder.receive() + + assert call_count == 1 diff --git a/test/test_memory_access.py b/test/test_memory_access.py index 067c11b..9013eaa 100644 --- a/test/test_memory_access.py +++ b/test/test_memory_access.py @@ -222,7 +222,7 @@ def test_dm14_read(feeder, expected_messages): :param feeder: can message feeder :param expected_messages: list of expected messages """ - feeder.can_messages = expected_messages + feeder.can_messages = list(expected_messages) feeder.pdus_from_messages() ca = feeder.accept_all_messages( @@ -237,6 +237,11 @@ def test_dm14_read(feeder, expected_messages): dm14.read(0xD4, 1, 0x92000003, 1) feeder.process_messages() + feeder.can_messages = list(expected_messages) + + dm14.read(0xD4, 1, 0x92000003, 1) + + feeder.process_messages() @pytest.mark.parametrize( @@ -410,7 +415,7 @@ def test_dm14_request_write(feeder, expected_messages): :param feeder: can message feeder :param expected_messages: list of expected messages """ - feeder.can_messages = expected_messages + feeder.can_messages = list(expected_messages) feeder.pdus_from_messages() ca = feeder.accept_all_messages( device_address_preferred=0xD4, bypass_address_claim=True @@ -442,6 +447,24 @@ def test_dm14_request_write(feeder, expected_messages): feeder.process_messages() + feeder.can_messages = list(expected_messages) + + ca.send_pgn( + 0, + (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, + 0xF9 & 0xFF, + 6, + [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], + ) + + while flag is False: + pass + reset_flag() + test = dm14.respond(True, [], 0xFFFF, 0xFF) + assert values == int.from_bytes(test, byteorder="little", signed=False) + + feeder.process_messages() + def test_dm14_request_write_timeout(feeder): """