From 60b362eeeaa20b080842e3a136d127720d06eb95 Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Mon, 26 Jan 2026 21:13:34 +0000 Subject: [PATCH 01/10] feat: address bug and clean up code --- j1939/Dm14Query.py | 19 ++++++++ j1939/Dm14Server.py | 5 +- j1939/controller_application.py | 7 +++ j1939/electronic_control_unit.py | 18 ++++++- j1939/memory_access.py | 80 +++++++++++++++++--------------- 5 files changed, 88 insertions(+), 41 deletions(-) diff --git a/j1939/Dm14Query.py b/j1939/Dm14Query.py index d3be851..d08b291 100644 --- a/j1939/Dm14Query.py +++ b/j1939/Dm14Query.py @@ -45,6 +45,25 @@ def __init__(self, ca: j1939.ControllerApplication, user_level=7) -> None: self.exception_queue = queue.Queue() self.user_level = user_level + def reset_query(self) -> None: + """ + Resets query to remove transaction specific data + """ + self.state = QueryState.IDLE + self._dest_address = None + self.address = None + self.object_count = 0 + self.object_byte_size = 1 + self.signed = False + self.return_raw_bytes = False + self.direct = 0 + self.command = None + self.bytes = bytearray() + self.mem_data = None + self.data_queue = queue.Queue() + self.exception_queue = queue.Queue() + + def _wait_for_data(self) -> None: """ Determines whether to send data or wait to receive data based on the command type. If the command is a write command, then the data is sent. diff --git a/j1939/Dm14Server.py b/j1939/Dm14Server.py index 34865f4..f100424 100644 --- a/j1939/Dm14Server.py +++ b/j1939/Dm14Server.py @@ -329,11 +329,12 @@ def verify_key(self, seed: int, key: int) -> bool: ) return self._key_from_seed(seed) == key - def reset_query(self) -> None: + def reset_server(self) -> None: """ - Resets query to initial state + Resets server to remove transaction specific data """ self.state = ResponseState.IDLE + self.data_queue = queue.Queue() self.sa = None self.seed = None self.key = None diff --git a/j1939/controller_application.py b/j1939/controller_application.py index bdf54ca..808a9ed 100644 --- a/j1939/controller_application.py +++ b/j1939/controller_application.py @@ -77,6 +77,13 @@ def subscribe(self, callback): """ self._ecu.subscribe(callback, self.message_acceptable) + def unique_subscribe(self, callback): + """Add the given callback to the message notification stream only once. + :param callback: + Function to call when message is received. + """ + self._ecu.unique_subscribe(callback, self.message_acceptable) + def unsubscribe(self, callback): """Stop listening for message. :param callback: diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index 8a1622d..be6c8aa 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -144,6 +144,20 @@ def subscribe(self, callback, device_address=None): """ self._subscribers.append({'cb': callback, 'dev_adr':device_address}) + def unique_subscribe(self, callback, device_address=None): + """Add the given callback to the message notification stream only once. + + :param callback: + Function to call when message is received. + :param int device_address: + Device address of the application. + This is a simple way for peer-to-peer reception without adding a controller-application. + """ + for dic in self._subscribers: + if dic['cb'] == callback: + return + self._subscribers.append({'cb': callback, 'dev_adr':device_address}) + def unsubscribe(self, callback): """Stop listening for message. @@ -288,7 +302,7 @@ def notify(self, can_id, data, timestamp): """ self.j1939_dll.notify(can_id, data, timestamp) - def add_bus_filters(self, fitlers: can.typechecking.CanFilters | None): + def add_bus_filters(self, filters: can.typechecking.CanFilters | None): """Add bus filters to the underlying CAN bus. :param filters: @@ -297,7 +311,7 @@ def add_bus_filters(self, fitlers: can.typechecking.CanFilters | None): """ if self._bus is None: raise RuntimeError("Not connected to CAN bus") - self._bus.set_filters(fitlers) + self._bus.set_filters(filters) def _async_job_thread(self): """Asynchronous thread for handling various jobs diff --git a/j1939/memory_access.py b/j1939/memory_access.py index 2eb3448..784ad43 100644 --- a/j1939/memory_access.py +++ b/j1939/memory_access.py @@ -13,6 +13,7 @@ class MemoryAccess: def __init__(self, ca: j1939.ControllerApplication) -> None: """ Makes an overarching Memory access class + :param ca: Controller Application """ self._ca = ca @@ -22,14 +23,33 @@ def __init__(self, ca: j1939.ControllerApplication) -> None: self.state = DMState.IDLE self.seed_security = False self._notify_query_received = None - self._seed_key_valid = None self._proceed_function = None + def _handle_error(self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray, error_code: int) -> None: + """ + Handles errors by resetting the state and unsubscribing from DM14 messages + + :param priority: Priority of the message + :param pgn: Parameter Group Number of the message + :param sa: Source Address of the message + :param timestamp: Timestamp of the message + :param data: Data of the PDU + :param error_code: Error code to be set + """ + self.server.error = error_code + self.server.set_busy(True) + self.server.parse_dm14( + priority, pgn, sa, timestamp, data + ) + self.server.set_busy(False) + self.reset() + def _listen_for_dm14( self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray ) -> None: """ Listens for dm14 messages and passes them to the appropriate function + :param priority: Priority of the message :param pgn: Parameter Group Number of the message :param sa: Source Address of the message @@ -64,15 +84,7 @@ def _listen_for_dm14( if self.proceed: self._notify_query_received() # notify incoming request else: - self.server.error = 0x100 - self.server.set_busy(True) - self.server.parse_dm14( - priority, pgn, sa, timestamp, data - ) - self.server.set_busy(False) - self.server.reset_query() - self.state = DMState.IDLE - self.server.error = 0x0 + self._handle_error(priority, pgn, sa, timestamp, data, 0x100) case DMState.REQUEST_STARTED: self.server.parse_dm14(priority, pgn, sa, timestamp, data) @@ -101,24 +113,9 @@ def _listen_for_dm14( if self.proceed: self._notify_query_received() # notify incoming request else: - self.server.error = 0x100 - self.server.set_busy(True) - self.server.parse_dm14( - priority, pgn, sa, timestamp, data - ) - self.server.set_busy(False) - self.server.reset_query() - self.state = DMState.IDLE - self.server.error = 0x0 + self._handle_error(priority, pgn, sa, timestamp, data, 0x100) else: - self.server.error = 0x1003 - self.server.set_busy(True) - self.server.parse_dm14( - priority, pgn, sa, timestamp, data - ) - self.server.set_busy(False) - self.state = DMState.IDLE - self.server.error = 0x0 + self._handle_error(priority, pgn, sa, timestamp, data, 0x1003) case DMState.WAIT_QUERY: self.server.set_busy(True) @@ -137,6 +134,7 @@ def respond( ) -> list: """ Responds with requested data and error code, if applicable, to a read request + :param bool proceed: whether the operation is good to proceed :param list data: data to be sent to device :param int error: error code to be sent to device @@ -189,7 +187,7 @@ def read( return_raw_bytes, max_timeout, ) - self.state = DMState.IDLE + self.reset() return data else: raise RuntimeWarning("Process already Running") @@ -205,6 +203,7 @@ def write( ) -> None: """ Send a write query to dest_address, requesting to write values at address + :param int dest_address: destination address of the message :param int direct: direct address of the message :param int address: address of the message @@ -218,18 +217,20 @@ def write( self.query.write( dest_address, direct, address, values, object_byte_size, max_timeout ) - self.state = DMState.IDLE + self.reset() def set_seed_generator(self, seed_generator: callable) -> None: """ Sets seed generator function to use + :param seed_generator: seed generator function """ self.server.set_seed_generator(seed_generator) def set_seed_key_algorithm(self, algorithm: callable) -> None: """ - set seed-key algorithm to be used for key generation + Sets seed-key algorithm to be used for key generation + :param callable algorithm: seed-key algorithm """ self.seed_security = True @@ -238,28 +239,33 @@ def set_seed_key_algorithm(self, algorithm: callable) -> None: def set_verify_key(self, verify_key: callable) -> None: """ - set verify key function to be used for verifying the key + Sets verify key function to be used for verifying the key + :param callable verify_key: verify key function """ self.server.set_verify_key(verify_key) def set_notify(self, notify: callable) -> None: """ - set notify function to be used for notifying the user of memory accesses + Sets notify function to be used for notifying the user of memory accesses + :param callable notify: notify function """ self._notify_query_received = notify def set_proceed(self, proceed: callable) -> None: """ - set proceed function to determine if a memory query is valid or not + Sets proceed function to determine if a memory query is valid or not + :param callable proceed: proceed function """ self._proceed_function = proceed - def reset_query(self) -> None: + def reset(self) -> None: """ - reset query for the server + Resets both server and query to remove transaction specific data """ - self._ca.subscribe(self._listen_for_dm14) - self.server.reset_query() + self.state = DMState.IDLE + self._ca.unique_subscribe(self._listen_for_dm14) + self.server.reset_server() + self.query.reset_query() From 3fb8d110e435f71d6d91a4cf0482668e7173ac8b Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Mon, 26 Jan 2026 21:13:44 +0000 Subject: [PATCH 02/10] test: add coverage for new function --- test/test_ecu.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/test/test_ecu.py b/test/test_ecu.py index 6e0930e..58086cc 100644 --- a/test/test_ecu.py +++ b/test/test_ecu.py @@ -200,3 +200,24 @@ def test_add_bus_filters(feeder): ] feeder.ecu.add_bus_filters(filters) assert feeder.ecu._bus.filters == filters + +def test_subscribe_unique(feeder): + """ + Test unique subscribing to messages + """ + call_count = 0 + + def callback(msg): + nonlocal call_count + call_count += 1 + + feeder.ecu.unique_subscribe(callback) + feeder.ecu.unique_subscribe(callback) # should not add again + + feeder.can_messages = [ + (Feeder.MsgType.CANRX, 0x00FEB201, [1, 2, 3, 4, 5, 6, 7, 8], 0.0), + ] + + feeder.receive() + + assert call_count == 1 From 191f5b3e150e7252a957e2295b63fb2cd9bf124b Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Tue, 27 Jan 2026 15:34:11 +0000 Subject: [PATCH 03/10] feat: use unique subscribe in dm14 --- j1939/Dm14Query.py | 8 ++++---- j1939/Dm14Server.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/j1939/Dm14Query.py b/j1939/Dm14Query.py index d08b291..bbc1cd8 100644 --- a/j1939/Dm14Query.py +++ b/j1939/Dm14Query.py @@ -76,7 +76,7 @@ def _wait_for_data(self) -> None: else: self.state = QueryState.WAIT_FOR_DM16 self._ca.unsubscribe(self._parse_dm15) - self._ca.subscribe(self._parse_dm16) + self._ca.unique_subscribe(self._parse_dm16) def _send_operation_complete(self) -> None: """ @@ -196,7 +196,7 @@ def _parse_dm16( # assert object_count == self.object_count self.mem_data = data[1 : length + 1] self._ca.unsubscribe(self._parse_dm16) - self._ca.subscribe(self._parse_dm15) + self._ca.unique_subscribe(self._parse_dm15) self.state = QueryState.WAIT_FOR_OPER_COMPLETE def _values_to_bytes(self, values: list) -> bytearray: @@ -256,7 +256,7 @@ def read( self.signed = signed self.return_raw_bytes = return_raw_bytes self.command = Command.READ - self._ca.subscribe(self._parse_dm15) + self._ca.unique_subscribe(self._parse_dm15) self._send_dm14(self.user_level) self.state = QueryState.WAIT_FOR_SEED # wait for operation completed DM15 message @@ -302,7 +302,7 @@ def write( self.command = Command.WRITE self.bytes = self._values_to_bytes(values) self.object_count = len(values) - self._ca.subscribe(self._parse_dm15) + self._ca.unique_subscribe(self._parse_dm15) self._send_dm14(self.user_level) self.state = QueryState.WAIT_FOR_SEED # wait for operation completed DM15 message diff --git a/j1939/Dm14Server.py b/j1939/Dm14Server.py index f100424..657b338 100644 --- a/j1939/Dm14Server.py +++ b/j1939/Dm14Server.py @@ -44,7 +44,7 @@ def _wait_for_data(self) -> None: Determines whether to send data or wait to receive data based on the command type. If the command is a read command, then the data requested is sent. """ - self._ca.subscribe(self._parse_dm16) + self._ca.unique_subscribe(self._parse_dm16) self._send_dm15( self.length, self.direct, @@ -66,7 +66,7 @@ def _wait_for_data(self) -> None: if (len(self.data)) <= 8: self.proceed = True self.state = ResponseState.SEND_OPERATION_COMPLETE - self._ca.subscribe(self.parse_dm14) + self._ca.unique_subscribe(self.parse_dm14) self._send_dm15( self.length, self.direct, @@ -238,7 +238,7 @@ def _send_dm16(self) -> None: data.extend([0xFF] * (self.length - byte_count - 1)) if byte_count > 8: - self._ca.subscribe(self._parse_dm16) + self._ca.unique_subscribe(self._parse_dm16) self._ca.send_pgn(0, (self._pgn >> 8) & 0xFF, self.sa & 0xFF, 7, data) def _parse_dm16( @@ -259,7 +259,7 @@ def _parse_dm16( length = min(data[0], len(data) - 1) self.data_queue.put(data[1 : length + 1]) self._ca.unsubscribe(self._parse_dm16) - self._ca.subscribe(self.parse_dm14) + self._ca.unique_subscribe(self.parse_dm14) self.state = ResponseState.SEND_OPERATION_COMPLETE self._send_dm15( self.length, From d3b1c8355e210e0f89c52b6b963fb7a2448b6b35 Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Tue, 27 Jan 2026 15:34:24 +0000 Subject: [PATCH 04/10] test: add coverage for sequential runs --- test/test_memory_access.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/test/test_memory_access.py b/test/test_memory_access.py index 067c11b..9013eaa 100644 --- a/test/test_memory_access.py +++ b/test/test_memory_access.py @@ -222,7 +222,7 @@ def test_dm14_read(feeder, expected_messages): :param feeder: can message feeder :param expected_messages: list of expected messages """ - feeder.can_messages = expected_messages + feeder.can_messages = list(expected_messages) feeder.pdus_from_messages() ca = feeder.accept_all_messages( @@ -237,6 +237,11 @@ def test_dm14_read(feeder, expected_messages): dm14.read(0xD4, 1, 0x92000003, 1) feeder.process_messages() + feeder.can_messages = list(expected_messages) + + dm14.read(0xD4, 1, 0x92000003, 1) + + feeder.process_messages() @pytest.mark.parametrize( @@ -410,7 +415,7 @@ def test_dm14_request_write(feeder, expected_messages): :param feeder: can message feeder :param expected_messages: list of expected messages """ - feeder.can_messages = expected_messages + feeder.can_messages = list(expected_messages) feeder.pdus_from_messages() ca = feeder.accept_all_messages( device_address_preferred=0xD4, bypass_address_claim=True @@ -442,6 +447,24 @@ def test_dm14_request_write(feeder, expected_messages): feeder.process_messages() + feeder.can_messages = list(expected_messages) + + ca.send_pgn( + 0, + (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, + 0xF9 & 0xFF, + 6, + [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], + ) + + while flag is False: + pass + reset_flag() + test = dm14.respond(True, [], 0xFFFF, 0xFF) + assert values == int.from_bytes(test, byteorder="little", signed=False) + + feeder.process_messages() + def test_dm14_request_write_timeout(feeder): """ From db18508da3ebfc02baea6d282cec1648e4b7c06e Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Tue, 27 Jan 2026 15:54:05 +0000 Subject: [PATCH 05/10] fix: broken subscribe test --- test/test_ecu.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/test_ecu.py b/test/test_ecu.py index 58086cc..d24b6fe 100644 --- a/test/test_ecu.py +++ b/test/test_ecu.py @@ -207,7 +207,7 @@ def test_subscribe_unique(feeder): """ call_count = 0 - def callback(msg): + def callback(priority: int, pgn: int, sa: int, timestamp: int, data: bytearray): nonlocal call_count call_count += 1 @@ -218,6 +218,8 @@ def callback(msg): (Feeder.MsgType.CANRX, 0x00FEB201, [1, 2, 3, 4, 5, 6, 7, 8], 0.0), ] + feeder.pdus = [(Feeder.MsgType.PDU, 65202, [1, 2, 3, 4, 5, 6, 7, 8])] + feeder.receive() assert call_count == 1 From c4ee9958cd51a50c1c341e3c9ef138fdd7b57335 Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Tue, 27 Jan 2026 16:26:44 +0000 Subject: [PATCH 06/10] feat: update subscribe logic and use that instead --- j1939/Dm14Query.py | 8 ++++---- j1939/Dm14Server.py | 8 ++++---- j1939/controller_application.py | 7 ------- j1939/electronic_control_unit.py | 15 ++------------- j1939/memory_access.py | 2 +- test/test_ecu.py | 8 ++++---- 6 files changed, 15 insertions(+), 33 deletions(-) diff --git a/j1939/Dm14Query.py b/j1939/Dm14Query.py index bbc1cd8..d08b291 100644 --- a/j1939/Dm14Query.py +++ b/j1939/Dm14Query.py @@ -76,7 +76,7 @@ def _wait_for_data(self) -> None: else: self.state = QueryState.WAIT_FOR_DM16 self._ca.unsubscribe(self._parse_dm15) - self._ca.unique_subscribe(self._parse_dm16) + self._ca.subscribe(self._parse_dm16) def _send_operation_complete(self) -> None: """ @@ -196,7 +196,7 @@ def _parse_dm16( # assert object_count == self.object_count self.mem_data = data[1 : length + 1] self._ca.unsubscribe(self._parse_dm16) - self._ca.unique_subscribe(self._parse_dm15) + self._ca.subscribe(self._parse_dm15) self.state = QueryState.WAIT_FOR_OPER_COMPLETE def _values_to_bytes(self, values: list) -> bytearray: @@ -256,7 +256,7 @@ def read( self.signed = signed self.return_raw_bytes = return_raw_bytes self.command = Command.READ - self._ca.unique_subscribe(self._parse_dm15) + self._ca.subscribe(self._parse_dm15) self._send_dm14(self.user_level) self.state = QueryState.WAIT_FOR_SEED # wait for operation completed DM15 message @@ -302,7 +302,7 @@ def write( self.command = Command.WRITE self.bytes = self._values_to_bytes(values) self.object_count = len(values) - self._ca.unique_subscribe(self._parse_dm15) + self._ca.subscribe(self._parse_dm15) self._send_dm14(self.user_level) self.state = QueryState.WAIT_FOR_SEED # wait for operation completed DM15 message diff --git a/j1939/Dm14Server.py b/j1939/Dm14Server.py index 657b338..f100424 100644 --- a/j1939/Dm14Server.py +++ b/j1939/Dm14Server.py @@ -44,7 +44,7 @@ def _wait_for_data(self) -> None: Determines whether to send data or wait to receive data based on the command type. If the command is a read command, then the data requested is sent. """ - self._ca.unique_subscribe(self._parse_dm16) + self._ca.subscribe(self._parse_dm16) self._send_dm15( self.length, self.direct, @@ -66,7 +66,7 @@ def _wait_for_data(self) -> None: if (len(self.data)) <= 8: self.proceed = True self.state = ResponseState.SEND_OPERATION_COMPLETE - self._ca.unique_subscribe(self.parse_dm14) + self._ca.subscribe(self.parse_dm14) self._send_dm15( self.length, self.direct, @@ -238,7 +238,7 @@ def _send_dm16(self) -> None: data.extend([0xFF] * (self.length - byte_count - 1)) if byte_count > 8: - self._ca.unique_subscribe(self._parse_dm16) + self._ca.subscribe(self._parse_dm16) self._ca.send_pgn(0, (self._pgn >> 8) & 0xFF, self.sa & 0xFF, 7, data) def _parse_dm16( @@ -259,7 +259,7 @@ def _parse_dm16( length = min(data[0], len(data) - 1) self.data_queue.put(data[1 : length + 1]) self._ca.unsubscribe(self._parse_dm16) - self._ca.unique_subscribe(self.parse_dm14) + self._ca.subscribe(self.parse_dm14) self.state = ResponseState.SEND_OPERATION_COMPLETE self._send_dm15( self.length, diff --git a/j1939/controller_application.py b/j1939/controller_application.py index 808a9ed..4191b91 100644 --- a/j1939/controller_application.py +++ b/j1939/controller_application.py @@ -76,13 +76,6 @@ def subscribe(self, callback): Function to call when message is received. """ self._ecu.subscribe(callback, self.message_acceptable) - - def unique_subscribe(self, callback): - """Add the given callback to the message notification stream only once. - :param callback: - Function to call when message is received. - """ - self._ecu.unique_subscribe(callback, self.message_acceptable) def unsubscribe(self, callback): """Stop listening for message. diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index be6c8aa..894c7be 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -132,7 +132,7 @@ def disconnect(self): self._bus = None def subscribe(self, callback, device_address=None): - """Add the given callback to the message notification stream. + """Add the given callback to the message notification stream. If it's not already subscribed :param callback: Function to call when message is received. @@ -142,19 +142,8 @@ def subscribe(self, callback, device_address=None): Only one device address can be entered. Multiple device addresses are only possible with controller applications. Note: TP.CMDT will only be received if the destination address is bound to a controller application. """ - self._subscribers.append({'cb': callback, 'dev_adr':device_address}) - - def unique_subscribe(self, callback, device_address=None): - """Add the given callback to the message notification stream only once. - - :param callback: - Function to call when message is received. - :param int device_address: - Device address of the application. - This is a simple way for peer-to-peer reception without adding a controller-application. - """ for dic in self._subscribers: - if dic['cb'] == callback: + if dic['cb'] == callback and dic['dev_adr'] == device_address: return self._subscribers.append({'cb': callback, 'dev_adr':device_address}) diff --git a/j1939/memory_access.py b/j1939/memory_access.py index 784ad43..d43ca85 100644 --- a/j1939/memory_access.py +++ b/j1939/memory_access.py @@ -266,6 +266,6 @@ def reset(self) -> None: Resets both server and query to remove transaction specific data """ self.state = DMState.IDLE - self._ca.unique_subscribe(self._listen_for_dm14) + self._ca.subscribe(self._listen_for_dm14) self.server.reset_server() self.query.reset_query() diff --git a/test/test_ecu.py b/test/test_ecu.py index d24b6fe..6527a96 100644 --- a/test/test_ecu.py +++ b/test/test_ecu.py @@ -201,9 +201,9 @@ def test_add_bus_filters(feeder): feeder.ecu.add_bus_filters(filters) assert feeder.ecu._bus.filters == filters -def test_subscribe_unique(feeder): +def test_subscribe(feeder): """ - Test unique subscribing to messages + Test subscribing to callback only once """ call_count = 0 @@ -211,8 +211,8 @@ def callback(priority: int, pgn: int, sa: int, timestamp: int, data: bytearray): nonlocal call_count call_count += 1 - feeder.ecu.unique_subscribe(callback) - feeder.ecu.unique_subscribe(callback) # should not add again + feeder.ecu.subscribe(callback) + feeder.ecu.subscribe(callback) # should not add again feeder.can_messages = [ (Feeder.MsgType.CANRX, 0x00FEB201, [1, 2, 3, 4, 5, 6, 7, 8], 0.0), From e73e052f27daed15741245b55344bd7520485abe Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Tue, 27 Jan 2026 16:28:13 +0000 Subject: [PATCH 07/10] style: remove white space --- j1939/controller_application.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/j1939/controller_application.py b/j1939/controller_application.py index 4191b91..bdf54ca 100644 --- a/j1939/controller_application.py +++ b/j1939/controller_application.py @@ -76,7 +76,7 @@ def subscribe(self, callback): Function to call when message is received. """ self._ecu.subscribe(callback, self.message_acceptable) - + def unsubscribe(self, callback): """Stop listening for message. :param callback: From 8e60a504e91bb9473c5fe43bca942b7b3cc722f6 Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Tue, 27 Jan 2026 20:52:53 +0000 Subject: [PATCH 08/10] feat: address another receive bug --- j1939/Dm14Query.py | 8 ++++++++ j1939/Dm14Server.py | 16 ++++++++++------ j1939/electronic_control_unit.py | 10 +++++----- j1939/memory_access.py | 24 +++++++++++++++--------- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/j1939/Dm14Query.py b/j1939/Dm14Query.py index d08b291..7e426ab 100644 --- a/j1939/Dm14Query.py +++ b/j1939/Dm14Query.py @@ -45,6 +45,13 @@ def __init__(self, ca: j1939.ControllerApplication, user_level=7) -> None: self.exception_queue = queue.Queue() self.user_level = user_level + def unsubscribe_all(self) -> None: + """ + Unsubscribes all message handlers + """ + self._ca.unsubscribe(self._parse_dm15) + self._ca.unsubscribe(self._parse_dm16) + def reset_query(self) -> None: """ Resets query to remove transaction specific data @@ -62,6 +69,7 @@ def reset_query(self) -> None: self.mem_data = None self.data_queue = queue.Queue() self.exception_queue = queue.Queue() + self.unsubscribe_all() def _wait_for_data(self) -> None: diff --git a/j1939/Dm14Server.py b/j1939/Dm14Server.py index f100424..ff70463 100644 --- a/j1939/Dm14Server.py +++ b/j1939/Dm14Server.py @@ -161,9 +161,7 @@ def parse_dm14( self.state = ResponseState.SEND_PROCEED case ResponseState.WAIT_OPERATION_COMPLETE: - self.state = ResponseState.IDLE - self.sa = None - self._ca.unsubscribe(self.parse_dm14) + self.reset_server() case _: raise ValueError("Invalid state") @@ -255,12 +253,12 @@ def _parse_dm16( if pgn != j1939.ParameterGroupNumber.PGN.DM16 or sa != self.sa: return - length = min(data[0], len(data) - 1) self.data_queue.put(data[1 : length + 1]) self._ca.unsubscribe(self._parse_dm16) self._ca.subscribe(self.parse_dm14) self.state = ResponseState.SEND_OPERATION_COMPLETE + self._send_dm15( self.length, self.direct, @@ -329,6 +327,13 @@ def verify_key(self, seed: int, key: int) -> bool: ) return self._key_from_seed(seed) == key + def unsubscribe_all(self) -> None: + """ + Unsubscribes all message handlers + """ + self._ca.unsubscribe(self.parse_dm14) + self._ca.unsubscribe(self._parse_dm16) + def reset_server(self) -> None: """ Resets server to remove transaction specific data @@ -347,8 +352,7 @@ def reset_server(self) -> None: self.edcp = 0x07 self.status = j1939.Dm15Status.PROCEED.value self.direct = 0 - self._ca.unsubscribe(self.parse_dm14) - self._ca.unsubscribe(self._parse_dm16) + self.unsubscribe_all() def respond( self, diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index 894c7be..cbb5abc 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -132,7 +132,7 @@ def disconnect(self): self._bus = None def subscribe(self, callback, device_address=None): - """Add the given callback to the message notification stream. If it's not already subscribed + """Add the given callback to the message notification stream. :param callback: Function to call when message is received. @@ -142,9 +142,6 @@ def subscribe(self, callback, device_address=None): Only one device address can be entered. Multiple device addresses are only possible with controller applications. Note: TP.CMDT will only be received if the destination address is bound to a controller application. """ - for dic in self._subscribers: - if dic['cb'] == callback and dic['dev_adr'] == device_address: - return self._subscribers.append({'cb': callback, 'dev_adr':device_address}) def unsubscribe(self, callback): @@ -377,7 +374,10 @@ def _notify_subscribers(self, priority, pgn, sa, dest, timestamp, data): logger.debug("notify subscribers for PGN {}".format(pgn)) # notify only the CA for which the message is intended # each CA receives all broadcast messages - for dic in self._subscribers: + + # TODO: this is ineffecient but there exists a possibility of removing subscribers during callback + # and adding new ones in while this is going and it can impact message receivement + for dic in self._subscribers.copy(): if (dic['dev_adr'] == None) or (dest == ParameterGroupNumber.Address.GLOBAL) or (callable(dic['dev_adr']) and dic['dev_adr'](dest)) or (dest == dic['dev_adr']): dic['cb'](priority, pgn, sa, timestamp, data) diff --git a/j1939/memory_access.py b/j1939/memory_access.py index d43ca85..7da12a5 100644 --- a/j1939/memory_access.py +++ b/j1939/memory_access.py @@ -7,6 +7,7 @@ class DMState(Enum): REQUEST_STARTED = 2 WAIT_RESPONSE = 3 WAIT_QUERY = 4 + SERVER_CLEANUP = 5 class MemoryAccess: @@ -121,9 +122,11 @@ def _listen_for_dm14( self.server.set_busy(True) self.server.parse_dm14(priority, pgn, sa, timestamp, data) self.server.set_busy(False) + case DMState.SERVER_CLEANUP: + self.state = DMState.IDLE case _: pass - + def respond( self, proceed: bool, @@ -143,14 +146,15 @@ def respond( """ if data is None: data = [] - if self.state is DMState.WAIT_RESPONSE: - self._ca.unsubscribe(self._listen_for_dm14) - self.state = DMState.IDLE - return_data = self.server.respond(proceed, data, error, edcp, max_timeout) - self._ca.subscribe(self._listen_for_dm14) - return return_data - else: + + if self.state is not DMState.WAIT_RESPONSE: return data + + self._ca.unsubscribe(self._listen_for_dm14) + return_data = self.server.respond(proceed, data, error, edcp, max_timeout) + self.state = DMState.SERVER_CLEANUP if self.server.state.value != DMState.IDLE.value else DMState.IDLE + self._ca.subscribe(self._listen_for_dm14) + return return_data def read( self, @@ -165,6 +169,7 @@ def read( ) -> list: """ Make a dm14 read Query + :param int dest_address: destination address of the message :param int direct: direct address of the message :param int address: address of the message @@ -190,6 +195,7 @@ def read( self.reset() return data else: + self.reset() raise RuntimeWarning("Process already Running") def write( @@ -222,7 +228,6 @@ def write( def set_seed_generator(self, seed_generator: callable) -> None: """ Sets seed generator function to use - :param seed_generator: seed generator function """ self.server.set_seed_generator(seed_generator) @@ -266,6 +271,7 @@ def reset(self) -> None: Resets both server and query to remove transaction specific data """ self.state = DMState.IDLE + self._ca.unsubscribe(self._listen_for_dm14) self._ca.subscribe(self._listen_for_dm14) self.server.reset_server() self.query.reset_query() From 1e9a5170bba32932ce22073c3db8b7b570d063d9 Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Tue, 27 Jan 2026 20:53:25 +0000 Subject: [PATCH 09/10] test: update test expectation --- test/test_ecu.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/test_ecu.py b/test/test_ecu.py index 6527a96..65f8eb9 100644 --- a/test/test_ecu.py +++ b/test/test_ecu.py @@ -203,7 +203,7 @@ def test_add_bus_filters(feeder): def test_subscribe(feeder): """ - Test subscribing to callback only once + Test subscribing to callback """ call_count = 0 @@ -212,7 +212,6 @@ def callback(priority: int, pgn: int, sa: int, timestamp: int, data: bytearray): call_count += 1 feeder.ecu.subscribe(callback) - feeder.ecu.subscribe(callback) # should not add again feeder.can_messages = [ (Feeder.MsgType.CANRX, 0x00FEB201, [1, 2, 3, 4, 5, 6, 7, 8], 0.0), From fb32385c9834d81aa478467b67d1eccb59ffb678 Mon Sep 17 00:00:00 2001 From: Koltan Hauersperger Date: Thu, 29 Jan 2026 17:43:48 +0000 Subject: [PATCH 10/10] feat: reset server on invalid state --- j1939/Dm14Server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/j1939/Dm14Server.py b/j1939/Dm14Server.py index ff70463..02bc859 100644 --- a/j1939/Dm14Server.py +++ b/j1939/Dm14Server.py @@ -164,6 +164,7 @@ def parse_dm14( self.reset_server() case _: + self.reset_server() raise ValueError("Invalid state") def _send_dm15( @@ -220,6 +221,7 @@ def _send_dm15( data[length - 3] = edcp case _: + self.reset_server() raise ValueError("Invalid state") self._ca.send_pgn(0, (pgn >> 8) & 0xFF, sa & 0xFF, 6, data)