diff --git a/j1939/Dm14Server.py b/j1939/Dm14Server.py index ff70463..6e52caa 100644 --- a/j1939/Dm14Server.py +++ b/j1939/Dm14Server.py @@ -164,6 +164,7 @@ def parse_dm14( self.reset_server() case _: + self.reset_server() raise ValueError("Invalid state") def _send_dm15( @@ -220,6 +221,7 @@ def _send_dm15( data[length - 3] = edcp case _: + self.reset_server() raise ValueError("Invalid state") self._ca.send_pgn(0, (pgn >> 8) & 0xFF, sa & 0xFF, 6, data) @@ -253,6 +255,7 @@ def _parse_dm16( if pgn != j1939.ParameterGroupNumber.PGN.DM16 or sa != self.sa: return + length = min(data[0], len(data) - 1) self.data_queue.put(data[1 : length + 1]) self._ca.unsubscribe(self._parse_dm16) @@ -388,5 +391,9 @@ def respond( self._wait_for_data() mem_data = None if self.state == ResponseState.WAIT_FOR_DM16: - mem_data = self.data_queue.get(block=True, timeout=max_timeout) + try: + mem_data = self.data_queue.get(block=True, timeout=max_timeout) + except queue.Empty: + self.reset_server() + raise RuntimeError("No data received from DM16 within timeout period") return mem_data diff --git a/j1939/memory_access.py b/j1939/memory_access.py index 7da12a5..bdc5b32 100644 --- a/j1939/memory_access.py +++ b/j1939/memory_access.py @@ -1,7 +1,8 @@ from enum import Enum +import threading +import time import j1939 - class DMState(Enum): IDLE = 1 REQUEST_STARTED = 2 @@ -20,12 +21,38 @@ def __init__(self, ca: j1939.ControllerApplication) -> None: self._ca = ca self.query = j1939.Dm14Query(ca) self.server = j1939.DM14Server(ca) + self.proceed = False self._ca.subscribe(self._listen_for_dm14) self.state = DMState.IDLE self.seed_security = False self._notify_query_received = None self._proceed_function = None + self._job_thread_end = threading.Event() + self._job_thread = threading.Thread(target=self._servicer, name='j1939.memory_access servicer_thread') + # A thread can be flagged as a "daemon thread". The significance of + # this flag is that the entire Python program exits when only daemon + # threads are left. + self._job_thread.daemon = True + self._job_thread.start() + + def __del__(self): + self._job_thread_end.set() + if self._job_thread.is_alive(): + self._job_thread.join() + + def _servicer(self): + """ + Job thread to service memory access requests + """ + while not self._job_thread_end.is_set(): + if (self.state == DMState.WAIT_RESPONSE) and self.proceed: + self.proceed = False + if self._notify_query_received is not None: + self._notify_query_received() # notify incoming request + time.sleep(0.001) # Add a small delay to yield control to other threads + + def _handle_error(self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray, error_code: int) -> None: """ Handles errors by resetting the state and unsubscribing from DM14 messages @@ -82,10 +109,10 @@ def _listen_for_dm14( self.server.access_level, 0x0, # placeholder for seed ) # call proceed function and pass in basic parameters - if self.proceed: - self._notify_query_received() # notify incoming request - else: + if not self.proceed: self._handle_error(priority, pgn, sa, timestamp, data, 0x100) + else: + self.proceed = True # no security, so always proceed case DMState.REQUEST_STARTED: self.server.parse_dm14(priority, pgn, sa, timestamp, data) @@ -111,10 +138,10 @@ def _listen_for_dm14( self.server.access_level, self.server.seed, ) # call proceed function and pass in basic parameters - if self.proceed: - self._notify_query_received() # notify incoming request - else: + if not self.proceed: self._handle_error(priority, pgn, sa, timestamp, data, 0x100) + else: + self.proceed = True # no proceed function, so always proceed else: self._handle_error(priority, pgn, sa, timestamp, data, 0x1003) @@ -122,6 +149,7 @@ def _listen_for_dm14( self.server.set_busy(True) self.server.parse_dm14(priority, pgn, sa, timestamp, data) self.server.set_busy(False) + case DMState.SERVER_CLEANUP: self.state = DMState.IDLE case _: @@ -150,6 +178,7 @@ def respond( if self.state is not DMState.WAIT_RESPONSE: return data + self.proceed = False self._ca.unsubscribe(self._listen_for_dm14) return_data = self.server.respond(proceed, data, error, edcp, max_timeout) self.state = DMState.SERVER_CLEANUP if self.server.state.value != DMState.IDLE.value else DMState.IDLE @@ -275,3 +304,4 @@ def reset(self) -> None: self._ca.subscribe(self._listen_for_dm14) self.server.reset_server() self.query.reset_query() + self.proceed = False diff --git a/test/test_ca.py b/test/test_ca.py index 28b0eb2..6f25a86 100644 --- a/test/test_ca.py +++ b/test/test_ca.py @@ -68,7 +68,7 @@ def test_addr_claim_fixed_reduced_time(feeder): identity_number=1234567, ) new_ca = feeder.ecu.add_ca(name=name, device_address=128) - new_ca.start(0.25) + new_ca.start(0.2) # wait until all messages are processed asynchronously # rounded up to account for scheduling delays diff --git a/test/test_memory_access.py b/test/test_memory_access.py index 9013eaa..ffa7977 100644 --- a/test/test_memory_access.py +++ b/test/test_memory_access.py @@ -1,8 +1,8 @@ import pytest from test_helpers.feeder import Feeder from test_helpers.conftest import feeder -import queue import j1939 +import time # fmt: off read_with_seed_key = [ @@ -255,7 +255,7 @@ def test_dm14_write(feeder, expected_messages): :param feeder: can message feeder :param expected_messages: list of expected messages """ - feeder.can_messages = expected_messages + feeder.can_messages = list(expected_messages) feeder.pdus_from_messages() ca = feeder.accept_all_messages( @@ -277,7 +277,7 @@ def test_dm14_read_busy( Tests the DM14 read query response to receiving another request :param feeder: can message feeder """ - feeder.can_messages = read_with_seed_key_busy + feeder.can_messages = list(read_with_seed_key_busy) feeder.pdus_from_messages() ca = feeder.accept_all_messages( @@ -304,18 +304,12 @@ def test_dm14_request_read(feeder, expected_messages): :param feeder: can message feeder :param expected_messages: list of expected messages """ - feeder.can_messages = expected_messages + feeder.can_messages = list(expected_messages) feeder.pdus_from_messages() ca = feeder.accept_all_messages( device_address_preferred=0xD4, bypass_address_claim=True ) - ca.send_pgn( - 0, - (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, - 0xF9 & 0xFF, - 6, - [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], - ) + global flag dm14 = j1939.MemoryAccess(ca) @@ -326,8 +320,18 @@ def test_dm14_request_read(feeder, expected_messages): if expected_messages == request_read_with_seed: dm14.set_seed_key_algorithm(key_from_seed) - while flag is False: + ca.send_pgn( + 0, + (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, + 0xF9 & 0xFF, + 6, + [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], + ) + timeout = 5.0 + start = time.monotonic() + while flag is False and (time.monotonic() - start) < timeout: pass + assert flag is True, "Timeout waiting for DM14 request" reset_flag() dm14.respond(True, [0x01], 0xFFFF, 0xFF) @@ -339,18 +343,12 @@ def test_dm14_request_read_busy(feeder): Tests the DM14 response to read query function while being busy responding to a read query response :param feeder: can message feeder """ - feeder.can_messages = request_read_with_seed_busy + feeder.can_messages = list(request_read_with_seed_busy) feeder.pdus_from_messages() ca = feeder.accept_all_messages( device_address_preferred=0xD4, bypass_address_claim=True ) - ca.send_pgn( - 0, - (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, - 0xF9 & 0xFF, - 6, - [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], - ) + global flag dm14 = j1939.MemoryAccess(ca) @@ -360,8 +358,19 @@ def test_dm14_request_read_busy(feeder): dm14.set_proceed(proceed) dm14.set_seed_key_algorithm(key_from_seed) - while flag is False: + ca.send_pgn( + 0, + (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, + 0xF9 & 0xFF, + 6, + [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], + ) + + timeout = 5.0 + start = time.monotonic() + while flag is False and (time.monotonic() - start) < timeout: pass + assert flag is True, "Timeout waiting for DM14 request" reset_flag() dm14.respond(True, [0x01], 0xFFFF, 0xFF) @@ -374,18 +383,12 @@ def test_dm14_busy_diff_addr(feeder): Tests the DM14 response to read query function from different source address while being busy responding to a read query response :param feeder: can message feeder """ - feeder.can_messages = receive_diff_sa_busy + feeder.can_messages = list(receive_diff_sa_busy) feeder.pdus_from_messages() ca = feeder.accept_all_messages( device_address_preferred=0xD4, bypass_address_claim=True ) - ca.send_pgn( - 0, - (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, - 0xF9 & 0xFF, - 6, - [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], - ) + global flag dm14 = j1939.MemoryAccess(ca) @@ -395,8 +398,19 @@ def test_dm14_busy_diff_addr(feeder): dm14.set_proceed(proceed) dm14.set_seed_key_algorithm(key_from_seed) - while flag is False: + ca.send_pgn( + 0, + (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, + 0xF9 & 0xFF, + 6, + [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], + ) + + timeout = 5.0 + start = time.monotonic() + while flag is False and (time.monotonic() - start) < timeout: pass + assert flag is True, "Timeout waiting for DM14 request" reset_flag() dm14.respond(True, [0x01], 0xFFFF, 0xFF) @@ -420,27 +434,33 @@ def test_dm14_request_write(feeder, expected_messages): ca = feeder.accept_all_messages( device_address_preferred=0xD4, bypass_address_claim=True ) - ca.send_pgn( - 0, - (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, - 0xF9 & 0xFF, - 6, - [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], - ) global flag dm14 = j1939.MemoryAccess(ca) - dm14.set_seed_generator(generate_seed) - dm14.set_proceed(proceed) - dm14.set_notify(global_flag) if expected_messages == request_write_with_seed: dm14.set_seed_key_algorithm(key_from_seed) dm14.set_verify_key(verify_key) + + dm14.set_seed_generator(generate_seed) + dm14.set_proceed(proceed) + dm14.set_notify(global_flag) + + ca.send_pgn( + 0, + (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, + 0xF9 & 0xFF, + 6, + [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], + ) + values = 0x11223344 - while flag is False: + timeout = 5.0 + start = time.monotonic() + while flag is False and (time.monotonic() - start) < timeout: pass + assert flag is True, "Timeout waiting for DM14 request" reset_flag() test = dm14.respond(True, [], 0xFFFF, 0xFF) assert values == int.from_bytes(test, byteorder="little", signed=False) @@ -457,9 +477,13 @@ def test_dm14_request_write(feeder, expected_messages): [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], ) - while flag is False: + timeout = 5.0 + start = time.monotonic() + while flag is False and (time.monotonic() - start) < timeout: pass + assert flag is True, "Timeout waiting for DM14 request" reset_flag() + test = dm14.respond(True, [], 0xFFFF, 0xFF) assert values == int.from_bytes(test, byteorder="little", signed=False) @@ -471,12 +495,19 @@ def test_dm14_request_write_timeout(feeder): Tests the DM14 response to write query function timeout waiting for a DM16 response :param feeder: can message feeder """ - with pytest.raises(queue.Empty) as excinfo: - feeder.can_messages = request_write_no_seed_timeout + with pytest.raises(RuntimeError) as excinfo: + feeder.can_messages = list(request_write_no_seed_timeout) feeder.pdus_from_messages() ca = feeder.accept_all_messages( device_address_preferred=0xD4, bypass_address_claim=True ) + global flag + + dm14 = j1939.MemoryAccess(ca) + dm14.set_seed_generator(generate_seed) + dm14.set_proceed(proceed) + dm14.set_notify(global_flag) + ca.send_pgn( 0, (j1939.ParameterGroupNumber.PGN.DM14 >> 8) & 0xFF, @@ -485,19 +516,14 @@ def test_dm14_request_write_timeout(feeder): [0x01, 0x13, 0x03, 0x00, 0x00, 0x92, 0x07, 0x00], ) - global flag - - dm14 = j1939.MemoryAccess(ca) - dm14.set_seed_generator(generate_seed) - dm14.set_proceed(proceed) - dm14.set_notify(global_flag) - - values = 0x11223344 - while flag is False: + timeout = 5.0 + start = time.monotonic() + while flag is False and (time.monotonic() - start) < timeout: pass + assert flag is True, "Timeout waiting for DM14 request" reset_flag() - test = dm14.respond(True, [], 0xFFFF, 0xFF) - + dm14.respond(True, [], 0xFFFF, 0xFF) + assert str(excinfo.value) is "No response received for DM16 data transfer" feeder.process_messages() @@ -718,7 +744,7 @@ def test_dm14_read_error_response(feeder, expected_messages): :param expected_messages: list of expected messages """ with pytest.raises(RuntimeError) as excinfo: - feeder.can_messages = expected_messages + feeder.can_messages = list(expected_messages) feeder.pdus_from_messages() ca = feeder.accept_all_messages( device_address_preferred=0xF9, bypass_address_claim=True @@ -744,7 +770,7 @@ def test_dm14_write_error_response(feeder, expected_messages): :param expected_messages: list of expected messages """ with pytest.raises(RuntimeError) as excinfo: - feeder.can_messages = expected_messages + feeder.can_messages = list(expected_messages) feeder.pdus_from_messages() ca = feeder.accept_all_messages( device_address_preferred=0xF9, bypass_address_claim=True