From f4ce000c39123ac24cf308990da6be8efc58e6b6 Mon Sep 17 00:00:00 2001 From: kellergoech <38539019+kellergoech@users.noreply.github.com> Date: Tue, 15 Jul 2025 17:30:18 +0200 Subject: [PATCH 1/7] Update multipakcet send to have a que Added multipacket send que, if multipacket send is busy, message qill go to que and will be processed when next possible. If same message arrvies when in que values will be updated. --- j1939/j1939_21.py | 127 +++++++++++++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 42 deletions(-) diff --git a/j1939/j1939_21.py b/j1939/j1939_21.py index 75553eb..faa3adc 100644 --- a/j1939/j1939_21.py +++ b/j1939/j1939_21.py @@ -43,6 +43,8 @@ def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt self._rcv_buffer = {} # Send buffers self._snd_buffer = {} + # send que + self._snd_que = {} # List of ControllerApplication self._cas = [] @@ -51,7 +53,7 @@ def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt self._minimum_tp_rts_cts_dt_interval = minimum_tp_rts_cts_dt_interval # set minimum time between two tp-bam messages - if minimum_tp_bam_dt_interval == None: + if minimum_tp_bam_dt_interval is None: self._minimum_tp_bam_dt_interval = self.Timeout.Tb else: self._minimum_tp_bam_dt_interval = minimum_tp_bam_dt_interval @@ -89,7 +91,72 @@ def _buffer_hash(self, src_address, dest_address): """ return ((src_address & 0xFF) << 8) | (dest_address & 0xFF) - def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, data, time_limit, frame_format): + + def _put_multi_msg(self, data, dest_address, src_address,priority, pgn, buffer_hash,pdu_specific): + message_size = len(data) + num_packets = ( + int(message_size / 7) + if (message_size % 7 == 0) + else int(message_size / 7) + 1 + ) + + # if the PF is between 240 and 255, the message can only be broadcast + if dest_address == ParameterGroupNumber.Address.GLOBAL: + # send BAM + self.__send_tp_bam( + src_address, priority, pgn.value, message_size, num_packets + ) + # init new buffer for this connection + self._snd_buffer[buffer_hash] = { + "pgn": pgn.value, + "priority": priority, + "message_size": message_size, + "num_packages": num_packets, + "data": data, + "state": self.SendBufferState.SENDING_BM, + "deadline": time.time() + self._minimum_tp_bam_dt_interval, + "src_address": src_address, + "dest_address": ParameterGroupNumber.Address.GLOBAL, + "next_packet_to_send": 0, + } + else: + # send RTS/CTS + pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer + # init new buffer for this connection + self._snd_buffer[buffer_hash] = { + "pgn": pgn.value, + "priority": priority, + "message_size": message_size, + "num_packages": num_packets, + "data": data, + "state": self.SendBufferState.WAITING_CTS, + "deadline": time.time() + self.Timeout.T3, + "src_address": src_address, + "dest_address": pdu_specific, + "next_packet_to_send": 0, + "next_wait_on_cts": 0, + } + self.__send_tp_rts( + src_address, + pdu_specific, + priority, + pgn.value, + message_size, + num_packets, + min(self._max_cmdt_packets, num_packets), + ) + + def send_pgn( + self, + data_page, + pdu_format, + pdu_specific, + priority, + src_address, + data, + time_limit, + frame_format, + ): pgn = ParameterGroupNumber(data_page, pdu_format, pdu_specific) if len(data) <= 8: # send normal message @@ -108,47 +175,16 @@ def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, d buffer_hash = self._buffer_hash(src_address, dest_address) if buffer_hash in self._snd_buffer: # There is already a sequence active for this pair + # put in que + self._snd_que[pgn] = {'buffer_hash': buffer_hash, + 'pdu_specific': pdu_specific, + 'priority': priority, + 'src_address':src_address, + 'pgn': pgn, + 'data': data, + 'dest_address': dest_address} return False - message_size = len(data) - num_packets = int(message_size / 7) if (message_size % 7 == 0) else int(message_size / 7) + 1 - - # if the PF is between 240 and 255, the message can only be broadcast - if dest_address == ParameterGroupNumber.Address.GLOBAL: - # send BAM - self.__send_tp_bam(src_address, priority, pgn.value, message_size, num_packets) - - # init new buffer for this connection - self._snd_buffer[buffer_hash] = { - "pgn": pgn.value, - "priority": priority, - "message_size": message_size, - "num_packages": num_packets, - "data": data, - "state": self.SendBufferState.SENDING_BM, - "deadline": time.time() + self._minimum_tp_bam_dt_interval, - 'src_address' : src_address, - 'dest_address' : ParameterGroupNumber.Address.GLOBAL, - 'next_packet_to_send' : 0, - } - else: - # send RTS/CTS - pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer - # init new buffer for this connection - self._snd_buffer[buffer_hash] = { - "pgn": pgn.value, - "priority": priority, - "message_size": message_size, - "num_packages": num_packets, - "data": data, - "state": self.SendBufferState.WAITING_CTS, - "deadline": time.time() + self.Timeout.T3, - 'src_address' : src_address, - 'dest_address' : pdu_specific, - 'next_packet_to_send' : 0, - 'next_wait_on_cts': 0, - } - self.__send_tp_rts(src_address, pdu_specific, priority, pgn.value, message_size, num_packets, min(self._max_cmdt_packets, num_packets)) - + self._put_multi_msg(data, dest_address, src_address, priority, pgn, buffer_hash, pdu_specific) self.__job_thread_wakeup() return True @@ -175,6 +211,13 @@ def async_job_thread(self, now): # TODO: should we notify our CAs about the cancelled transfer? del self._rcv_buffer[bufid] + # get from que if buffer is empty + if not bool(self._snd_buffer): + for key, value in self._snd_que.items(): + self._put_multi_msg(**value) + self._snd_que.pop(key) + break + # check send buffers # using "list(x)" to prevent "RuntimeError: dictionary changed size during iteration" for bufid in list(self._snd_buffer): From f18ea1f6ffd163f197e2dde06aef018a80e45085 Mon Sep 17 00:00:00 2001 From: kellergoech <38539019+kellergoech@users.noreply.github.com> Date: Mon, 4 Aug 2025 09:42:15 +0200 Subject: [PATCH 2/7] dont crash when sending not possible Only make an log entry if sending of message is not possible and dont crash the application. (automatically works again when CAN sending is working again) --- j1939/electronic_control_unit.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index 16fb98e..3e1d917 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -266,9 +266,10 @@ def send_message(self, can_id, extended_id, data, fd_format=False): is_fd=fd_format, bitrate_switch=fd_format ) - with self._send_lock: - self._bus.send(msg) - # TODO: check error receivement + try: + self._bus.send(msg) + except can.CanOperationError: + logger.info('not able to send message because CAN has not free Buffer space') def notify(self, can_id, data, timestamp): """Feed incoming CAN message into this ecu. From c7e1f8d9524c34e6f1073d0d044acf64e0044ed1 Mon Sep 17 00:00:00 2001 From: kellergoech <38539019+kellergoech@users.noreply.github.com> Date: Mon, 4 Aug 2025 09:44:08 +0200 Subject: [PATCH 3/7] set info to error --- j1939/electronic_control_unit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index 3e1d917..9d94f3e 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -269,7 +269,7 @@ def send_message(self, can_id, extended_id, data, fd_format=False): try: self._bus.send(msg) except can.CanOperationError: - logger.info('not able to send message because CAN has not free Buffer space') + logger.error('not able to send message because CAN has not free Buffer space') def notify(self, can_id, data, timestamp): """Feed incoming CAN message into this ecu. From 9fc967c5b4e5c45b3945b278fe6d9d9e9aee2790 Mon Sep 17 00:00:00 2001 From: kellergoech <38539019+kellergoech@users.noreply.github.com> Date: Mon, 4 Aug 2025 12:41:55 +0200 Subject: [PATCH 4/7] even more beautiful message --- j1939/electronic_control_unit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index 9d94f3e..28f3ceb 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -269,7 +269,7 @@ def send_message(self, can_id, extended_id, data, fd_format=False): try: self._bus.send(msg) except can.CanOperationError: - logger.error('not able to send message because CAN has not free Buffer space') + logger.error(f'not able to send message because {e}') def notify(self, can_id, data, timestamp): """Feed incoming CAN message into this ecu. From 1e4ed6f16b565e6f953cac522909e4f72a4f7169 Mon Sep 17 00:00:00 2001 From: kellergoech <38539019+kellergoech@users.noreply.github.com> Date: Mon, 4 Aug 2025 12:44:39 +0200 Subject: [PATCH 5/7] generic for all can errors --- j1939/electronic_control_unit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index 28f3ceb..a983b66 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -268,7 +268,7 @@ def send_message(self, can_id, extended_id, data, fd_format=False): ) try: self._bus.send(msg) - except can.CanOperationError: + except can.CanError as e: logger.error(f'not able to send message because {e}') def notify(self, can_id, data, timestamp): From 4581e3a8d74b623f4fb9a2415b29a743b9ad57de Mon Sep 17 00:00:00 2001 From: kellergoech <38539019+kellergoech@users.noreply.github.com> Date: Thu, 4 Sep 2025 11:53:55 +0200 Subject: [PATCH 6/7] Update j1939_21.py use pgn.value instead of pgn for send buffer index --- j1939/j1939_21.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/j1939/j1939_21.py b/j1939/j1939_21.py index faa3adc..c52535d 100644 --- a/j1939/j1939_21.py +++ b/j1939/j1939_21.py @@ -176,7 +176,7 @@ def send_pgn( if buffer_hash in self._snd_buffer: # There is already a sequence active for this pair # put in que - self._snd_que[pgn] = {'buffer_hash': buffer_hash, + self._snd_que[pgn.value] = {'buffer_hash': buffer_hash, 'pdu_specific': pdu_specific, 'priority': priority, 'src_address':src_address, From 32fb799a60705337da51641ae8becedb28bc574d Mon Sep 17 00:00:00 2001 From: kellergoech <38539019+kellergoech@users.noreply.github.com> Date: Thu, 4 Sep 2025 11:59:15 +0200 Subject: [PATCH 7/7] Update electronic_control_unit.py --- j1939/electronic_control_unit.py | 1 + 1 file changed, 1 insertion(+) diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index a983b66..e1c95b4 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -266,6 +266,7 @@ def send_message(self, can_id, extended_id, data, fd_format=False): is_fd=fd_format, bitrate_switch=fd_format ) + with self._send_lock: try: self._bus.send(msg) except can.CanError as e: