Skip to content
27 changes: 27 additions & 0 deletions j1939/Dm14Query.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,33 @@ def __init__(self, ca: j1939.ControllerApplication, user_level=7) -> None:
self.exception_queue = queue.Queue()
self.user_level = user_level

def unsubscribe_all(self) -> None:
"""
Unsubscribes all message handlers
"""
self._ca.unsubscribe(self._parse_dm15)
self._ca.unsubscribe(self._parse_dm16)

def reset_query(self) -> None:
"""
Resets query to remove transaction specific data
"""
self.state = QueryState.IDLE
self._dest_address = None
self.address = None
self.object_count = 0
self.object_byte_size = 1
self.signed = False
self.return_raw_bytes = False
self.direct = 0
self.command = None
self.bytes = bytearray()
self.mem_data = None
self.data_queue = queue.Queue()
self.exception_queue = queue.Queue()
self.unsubscribe_all()


def _wait_for_data(self) -> None:
"""
Determines whether to send data or wait to receive data based on the command type. If the command is a write command, then the data is sent.
Expand Down
23 changes: 15 additions & 8 deletions j1939/Dm14Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,10 @@ def parse_dm14(
self.state = ResponseState.SEND_PROCEED

case ResponseState.WAIT_OPERATION_COMPLETE:
self.state = ResponseState.IDLE
self.sa = None
self._ca.unsubscribe(self.parse_dm14)
self.reset_server()

case _:
self.reset_server()
raise ValueError("Invalid state")

def _send_dm15(
Expand Down Expand Up @@ -222,6 +221,7 @@ def _send_dm15(
data[length - 3] = edcp

case _:
self.reset_server()
raise ValueError("Invalid state")
self._ca.send_pgn(0, (pgn >> 8) & 0xFF, sa & 0xFF, 6, data)

Expand Down Expand Up @@ -255,12 +255,12 @@ def _parse_dm16(

if pgn != j1939.ParameterGroupNumber.PGN.DM16 or sa != self.sa:
return

length = min(data[0], len(data) - 1)
self.data_queue.put(data[1 : length + 1])
self._ca.unsubscribe(self._parse_dm16)
self._ca.subscribe(self.parse_dm14)
self.state = ResponseState.SEND_OPERATION_COMPLETE

self._send_dm15(
self.length,
self.direct,
Expand Down Expand Up @@ -329,11 +329,19 @@ def verify_key(self, seed: int, key: int) -> bool:
)
return self._key_from_seed(seed) == key

def reset_query(self) -> None:
def unsubscribe_all(self) -> None:
"""
Unsubscribes all message handlers
"""
self._ca.unsubscribe(self.parse_dm14)
self._ca.unsubscribe(self._parse_dm16)

def reset_server(self) -> None:
"""
Resets query to initial state
Resets server to remove transaction specific data
"""
self.state = ResponseState.IDLE
self.data_queue = queue.Queue()
self.sa = None
self.seed = None
self.key = None
Expand All @@ -346,8 +354,7 @@ def reset_query(self) -> None:
self.edcp = 0x07
self.status = j1939.Dm15Status.PROCEED.value
self.direct = 0
self._ca.unsubscribe(self.parse_dm14)
self._ca.unsubscribe(self._parse_dm16)
self.unsubscribe_all()

def respond(
self,
Expand Down
9 changes: 6 additions & 3 deletions j1939/electronic_control_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def notify(self, can_id, data, timestamp):
"""
self.j1939_dll.notify(can_id, data, timestamp)

def add_bus_filters(self, fitlers: can.typechecking.CanFilters | None):
def add_bus_filters(self, filters: can.typechecking.CanFilters | None):
"""Add bus filters to the underlying CAN bus.

:param filters:
Expand All @@ -297,7 +297,7 @@ def add_bus_filters(self, fitlers: can.typechecking.CanFilters | None):
"""
if self._bus is None:
raise RuntimeError("Not connected to CAN bus")
self._bus.set_filters(fitlers)
self._bus.set_filters(filters)

def _async_job_thread(self):
"""Asynchronous thread for handling various jobs
Expand Down Expand Up @@ -374,7 +374,10 @@ def _notify_subscribers(self, priority, pgn, sa, dest, timestamp, data):
logger.debug("notify subscribers for PGN {}".format(pgn))
# notify only the CA for which the message is intended
# each CA receives all broadcast messages
for dic in self._subscribers:

# TODO: this is ineffecient but there exists a possibility of removing subscribers during callback
# and adding new ones in while this is going and it can impact message receivement
for dic in self._subscribers.copy():
if (dic['dev_adr'] == None) or (dest == ParameterGroupNumber.Address.GLOBAL) or (callable(dic['dev_adr']) and dic['dev_adr'](dest)) or (dest == dic['dev_adr']):
dic['cb'](priority, pgn, sa, timestamp, data)

Expand Down
100 changes: 56 additions & 44 deletions j1939/memory_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ class DMState(Enum):
REQUEST_STARTED = 2
WAIT_RESPONSE = 3
WAIT_QUERY = 4
SERVER_CLEANUP = 5


class MemoryAccess:
def __init__(self, ca: j1939.ControllerApplication) -> None:
"""
Makes an overarching Memory access class

:param ca: Controller Application
"""
self._ca = ca
Expand All @@ -22,14 +24,33 @@ def __init__(self, ca: j1939.ControllerApplication) -> None:
self.state = DMState.IDLE
self.seed_security = False
self._notify_query_received = None
self._seed_key_valid = None
self._proceed_function = None

def _handle_error(self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray, error_code: int) -> None:
"""
Handles errors by resetting the state and unsubscribing from DM14 messages

:param priority: Priority of the message
:param pgn: Parameter Group Number of the message
:param sa: Source Address of the message
:param timestamp: Timestamp of the message
:param data: Data of the PDU
:param error_code: Error code to be set
"""
self.server.error = error_code
self.server.set_busy(True)
self.server.parse_dm14(
priority, pgn, sa, timestamp, data
)
self.server.set_busy(False)
self.reset()

def _listen_for_dm14(
self, priority: int, pgn: int, sa: int, timestamp: int, data: bytearray
) -> None:
"""
Listens for dm14 messages and passes them to the appropriate function

:param priority: Priority of the message
:param pgn: Parameter Group Number of the message
:param sa: Source Address of the message
Expand Down Expand Up @@ -64,15 +85,7 @@ def _listen_for_dm14(
if self.proceed:
self._notify_query_received() # notify incoming request
else:
self.server.error = 0x100
self.server.set_busy(True)
self.server.parse_dm14(
priority, pgn, sa, timestamp, data
)
self.server.set_busy(False)
self.server.reset_query()
self.state = DMState.IDLE
self.server.error = 0x0
self._handle_error(priority, pgn, sa, timestamp, data, 0x100)

case DMState.REQUEST_STARTED:
self.server.parse_dm14(priority, pgn, sa, timestamp, data)
Expand Down Expand Up @@ -101,32 +114,19 @@ def _listen_for_dm14(
if self.proceed:
self._notify_query_received() # notify incoming request
else:
self.server.error = 0x100
self.server.set_busy(True)
self.server.parse_dm14(
priority, pgn, sa, timestamp, data
)
self.server.set_busy(False)
self.server.reset_query()
self.state = DMState.IDLE
self.server.error = 0x0
self._handle_error(priority, pgn, sa, timestamp, data, 0x100)
else:
self.server.error = 0x1003
self.server.set_busy(True)
self.server.parse_dm14(
priority, pgn, sa, timestamp, data
)
self.server.set_busy(False)
self.state = DMState.IDLE
self.server.error = 0x0
self._handle_error(priority, pgn, sa, timestamp, data, 0x1003)

case DMState.WAIT_QUERY:
self.server.set_busy(True)
self.server.parse_dm14(priority, pgn, sa, timestamp, data)
self.server.set_busy(False)
case DMState.SERVER_CLEANUP:
self.state = DMState.IDLE
case _:
pass

def respond(
self,
proceed: bool,
Expand All @@ -137,6 +137,7 @@ def respond(
) -> list:
"""
Responds with requested data and error code, if applicable, to a read request

:param bool proceed: whether the operation is good to proceed
:param list data: data to be sent to device
:param int error: error code to be sent to device
Expand All @@ -145,14 +146,15 @@ def respond(
"""
if data is None:
data = []
if self.state is DMState.WAIT_RESPONSE:
self._ca.unsubscribe(self._listen_for_dm14)
self.state = DMState.IDLE
return_data = self.server.respond(proceed, data, error, edcp, max_timeout)
self._ca.subscribe(self._listen_for_dm14)
return return_data
else:

if self.state is not DMState.WAIT_RESPONSE:
return data

self._ca.unsubscribe(self._listen_for_dm14)
return_data = self.server.respond(proceed, data, error, edcp, max_timeout)
self.state = DMState.SERVER_CLEANUP if self.server.state.value != DMState.IDLE.value else DMState.IDLE
self._ca.subscribe(self._listen_for_dm14)
return return_data

def read(
self,
Expand All @@ -167,6 +169,7 @@ def read(
) -> list:
"""
Make a dm14 read Query

:param int dest_address: destination address of the message
:param int direct: direct address of the message
:param int address: address of the message
Expand All @@ -189,9 +192,10 @@ def read(
return_raw_bytes,
max_timeout,
)
self.state = DMState.IDLE
self.reset()
return data
else:
self.reset()
raise RuntimeWarning("Process already Running")

def write(
Expand All @@ -205,6 +209,7 @@ def write(
) -> None:
"""
Send a write query to dest_address, requesting to write values at address

:param int dest_address: destination address of the message
:param int direct: direct address of the message
:param int address: address of the message
Expand All @@ -218,7 +223,7 @@ def write(
self.query.write(
dest_address, direct, address, values, object_byte_size, max_timeout
)
self.state = DMState.IDLE
self.reset()

def set_seed_generator(self, seed_generator: callable) -> None:
"""
Expand All @@ -229,7 +234,8 @@ def set_seed_generator(self, seed_generator: callable) -> None:

def set_seed_key_algorithm(self, algorithm: callable) -> None:
"""
set seed-key algorithm to be used for key generation
Sets seed-key algorithm to be used for key generation

:param callable algorithm: seed-key algorithm
"""
self.seed_security = True
Expand All @@ -238,28 +244,34 @@ def set_seed_key_algorithm(self, algorithm: callable) -> None:

def set_verify_key(self, verify_key: callable) -> None:
"""
set verify key function to be used for verifying the key
Sets verify key function to be used for verifying the key

:param callable verify_key: verify key function
"""
self.server.set_verify_key(verify_key)

def set_notify(self, notify: callable) -> None:
"""
set notify function to be used for notifying the user of memory accesses
Sets notify function to be used for notifying the user of memory accesses

:param callable notify: notify function
"""
self._notify_query_received = notify

def set_proceed(self, proceed: callable) -> None:
"""
set proceed function to determine if a memory query is valid or not
Sets proceed function to determine if a memory query is valid or not

:param callable proceed: proceed function
"""
self._proceed_function = proceed

def reset_query(self) -> None:
def reset(self) -> None:
"""
reset query for the server
Resets both server and query to remove transaction specific data
"""
self.state = DMState.IDLE
self._ca.unsubscribe(self._listen_for_dm14)
self._ca.subscribe(self._listen_for_dm14)
self.server.reset_query()
self.server.reset_server()
self.query.reset_query()
22 changes: 22 additions & 0 deletions test/test_ecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,25 @@ def test_add_bus_filters(feeder):
]
feeder.ecu.add_bus_filters(filters)
assert feeder.ecu._bus.filters == filters

def test_subscribe(feeder):
"""
Test subscribing to callback
"""
call_count = 0

def callback(priority: int, pgn: int, sa: int, timestamp: int, data: bytearray):
nonlocal call_count
call_count += 1

feeder.ecu.subscribe(callback)

feeder.can_messages = [
(Feeder.MsgType.CANRX, 0x00FEB201, [1, 2, 3, 4, 5, 6, 7, 8], 0.0),
]

feeder.pdus = [(Feeder.MsgType.PDU, 65202, [1, 2, 3, 4, 5, 6, 7, 8])]

feeder.receive()

assert call_count == 1
Loading