diff --git a/j1939/electronic_control_unit.py b/j1939/electronic_control_unit.py index cbb5abc..21d81f4 100644 --- a/j1939/electronic_control_unit.py +++ b/j1939/electronic_control_unit.py @@ -267,8 +267,10 @@ def send_message(self, can_id, extended_id, data, fd_format=False): bitrate_switch=fd_format ) with self._send_lock: - self._bus.send(msg) - # TODO: check error receivement + try: + self._bus.send(msg) + except can.CanError as e: + logger.error(f'not able to send message because {e}') def notify(self, can_id, data, timestamp): """Feed incoming CAN message into this ecu. diff --git a/j1939/j1939_21.py b/j1939/j1939_21.py index 75553eb..c52535d 100644 --- a/j1939/j1939_21.py +++ b/j1939/j1939_21.py @@ -43,6 +43,8 @@ def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt self._rcv_buffer = {} # Send buffers self._snd_buffer = {} + # send que + self._snd_que = {} # List of ControllerApplication self._cas = [] @@ -51,7 +53,7 @@ def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt self._minimum_tp_rts_cts_dt_interval = minimum_tp_rts_cts_dt_interval # set minimum time between two tp-bam messages - if minimum_tp_bam_dt_interval == None: + if minimum_tp_bam_dt_interval is None: self._minimum_tp_bam_dt_interval = self.Timeout.Tb else: self._minimum_tp_bam_dt_interval = minimum_tp_bam_dt_interval @@ -89,7 +91,72 @@ def _buffer_hash(self, src_address, dest_address): """ return ((src_address & 0xFF) << 8) | (dest_address & 0xFF) - def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, data, time_limit, frame_format): + + def _put_multi_msg(self, data, dest_address, src_address,priority, pgn, buffer_hash,pdu_specific): + message_size = len(data) + num_packets = ( + int(message_size / 7) + if (message_size % 7 == 0) + else int(message_size / 7) + 1 + ) + + # if the PF is between 240 and 255, the message can only be broadcast + if dest_address == ParameterGroupNumber.Address.GLOBAL: + # send BAM + self.__send_tp_bam( + src_address, priority, pgn.value, message_size, num_packets + ) + # init new buffer for this connection + self._snd_buffer[buffer_hash] = { + "pgn": pgn.value, + "priority": priority, + "message_size": message_size, + "num_packages": num_packets, + "data": data, + "state": self.SendBufferState.SENDING_BM, + "deadline": time.time() + self._minimum_tp_bam_dt_interval, + "src_address": src_address, + "dest_address": ParameterGroupNumber.Address.GLOBAL, + "next_packet_to_send": 0, + } + else: + # send RTS/CTS + pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer + # init new buffer for this connection + self._snd_buffer[buffer_hash] = { + "pgn": pgn.value, + "priority": priority, + "message_size": message_size, + "num_packages": num_packets, + "data": data, + "state": self.SendBufferState.WAITING_CTS, + "deadline": time.time() + self.Timeout.T3, + "src_address": src_address, + "dest_address": pdu_specific, + "next_packet_to_send": 0, + "next_wait_on_cts": 0, + } + self.__send_tp_rts( + src_address, + pdu_specific, + priority, + pgn.value, + message_size, + num_packets, + min(self._max_cmdt_packets, num_packets), + ) + + def send_pgn( + self, + data_page, + pdu_format, + pdu_specific, + priority, + src_address, + data, + time_limit, + frame_format, + ): pgn = ParameterGroupNumber(data_page, pdu_format, pdu_specific) if len(data) <= 8: # send normal message @@ -108,47 +175,16 @@ def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, d buffer_hash = self._buffer_hash(src_address, dest_address) if buffer_hash in self._snd_buffer: # There is already a sequence active for this pair + # put in que + self._snd_que[pgn.value] = {'buffer_hash': buffer_hash, + 'pdu_specific': pdu_specific, + 'priority': priority, + 'src_address':src_address, + 'pgn': pgn, + 'data': data, + 'dest_address': dest_address} return False - message_size = len(data) - num_packets = int(message_size / 7) if (message_size % 7 == 0) else int(message_size / 7) + 1 - - # if the PF is between 240 and 255, the message can only be broadcast - if dest_address == ParameterGroupNumber.Address.GLOBAL: - # send BAM - self.__send_tp_bam(src_address, priority, pgn.value, message_size, num_packets) - - # init new buffer for this connection - self._snd_buffer[buffer_hash] = { - "pgn": pgn.value, - "priority": priority, - "message_size": message_size, - "num_packages": num_packets, - "data": data, - "state": self.SendBufferState.SENDING_BM, - "deadline": time.time() + self._minimum_tp_bam_dt_interval, - 'src_address' : src_address, - 'dest_address' : ParameterGroupNumber.Address.GLOBAL, - 'next_packet_to_send' : 0, - } - else: - # send RTS/CTS - pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer - # init new buffer for this connection - self._snd_buffer[buffer_hash] = { - "pgn": pgn.value, - "priority": priority, - "message_size": message_size, - "num_packages": num_packets, - "data": data, - "state": self.SendBufferState.WAITING_CTS, - "deadline": time.time() + self.Timeout.T3, - 'src_address' : src_address, - 'dest_address' : pdu_specific, - 'next_packet_to_send' : 0, - 'next_wait_on_cts': 0, - } - self.__send_tp_rts(src_address, pdu_specific, priority, pgn.value, message_size, num_packets, min(self._max_cmdt_packets, num_packets)) - + self._put_multi_msg(data, dest_address, src_address, priority, pgn, buffer_hash, pdu_specific) self.__job_thread_wakeup() return True @@ -175,6 +211,13 @@ def async_job_thread(self, now): # TODO: should we notify our CAs about the cancelled transfer? del self._rcv_buffer[bufid] + # get from que if buffer is empty + if not bool(self._snd_buffer): + for key, value in self._snd_que.items(): + self._put_multi_msg(**value) + self._snd_que.pop(key) + break + # check send buffers # using "list(x)" to prevent "RuntimeError: dictionary changed size during iteration" for bufid in list(self._snd_buffer):