Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions j1939/electronic_control_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,10 @@ def send_message(self, can_id, extended_id, data, fd_format=False):
bitrate_switch=fd_format
)
with self._send_lock:
self._bus.send(msg)
# TODO: check error receivement
try:
self._bus.send(msg)
except can.CanError as e:
logger.error(f'not able to send message because {e}')

def notify(self, can_id, data, timestamp):
"""Feed incoming CAN message into this ecu.
Expand Down
127 changes: 85 additions & 42 deletions j1939/j1939_21.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt
self._rcv_buffer = {}
# Send buffers
self._snd_buffer = {}
# send que
self._snd_que = {}

# List of ControllerApplication
self._cas = []
Expand All @@ -51,7 +53,7 @@ def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt
self._minimum_tp_rts_cts_dt_interval = minimum_tp_rts_cts_dt_interval

# set minimum time between two tp-bam messages
if minimum_tp_bam_dt_interval == None:
if minimum_tp_bam_dt_interval is None:
self._minimum_tp_bam_dt_interval = self.Timeout.Tb
else:
self._minimum_tp_bam_dt_interval = minimum_tp_bam_dt_interval
Expand Down Expand Up @@ -89,7 +91,72 @@ def _buffer_hash(self, src_address, dest_address):
"""
return ((src_address & 0xFF) << 8) | (dest_address & 0xFF)

def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, data, time_limit, frame_format):

def _put_multi_msg(self, data, dest_address, src_address,priority, pgn, buffer_hash,pdu_specific):
message_size = len(data)
num_packets = (
int(message_size / 7)
if (message_size % 7 == 0)
else int(message_size / 7) + 1
)

# if the PF is between 240 and 255, the message can only be broadcast
if dest_address == ParameterGroupNumber.Address.GLOBAL:
# send BAM
self.__send_tp_bam(
src_address, priority, pgn.value, message_size, num_packets
)
# init new buffer for this connection
self._snd_buffer[buffer_hash] = {
"pgn": pgn.value,
"priority": priority,
"message_size": message_size,
"num_packages": num_packets,
"data": data,
"state": self.SendBufferState.SENDING_BM,
"deadline": time.time() + self._minimum_tp_bam_dt_interval,
"src_address": src_address,
"dest_address": ParameterGroupNumber.Address.GLOBAL,
"next_packet_to_send": 0,
}
else:
# send RTS/CTS
pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer
# init new buffer for this connection
self._snd_buffer[buffer_hash] = {
"pgn": pgn.value,
"priority": priority,
"message_size": message_size,
"num_packages": num_packets,
"data": data,
"state": self.SendBufferState.WAITING_CTS,
"deadline": time.time() + self.Timeout.T3,
"src_address": src_address,
"dest_address": pdu_specific,
"next_packet_to_send": 0,
"next_wait_on_cts": 0,
}
self.__send_tp_rts(
src_address,
pdu_specific,
priority,
pgn.value,
message_size,
num_packets,
min(self._max_cmdt_packets, num_packets),
)

def send_pgn(
self,
data_page,
pdu_format,
pdu_specific,
priority,
src_address,
data,
time_limit,
frame_format,
):
pgn = ParameterGroupNumber(data_page, pdu_format, pdu_specific)
if len(data) <= 8:
# send normal message
Expand All @@ -108,47 +175,16 @@ def send_pgn(self, data_page, pdu_format, pdu_specific, priority, src_address, d
buffer_hash = self._buffer_hash(src_address, dest_address)
if buffer_hash in self._snd_buffer:
# There is already a sequence active for this pair
# put in que
self._snd_que[pgn.value] = {'buffer_hash': buffer_hash,
'pdu_specific': pdu_specific,
'priority': priority,
'src_address':src_address,
'pgn': pgn,
'data': data,
'dest_address': dest_address}
return False
message_size = len(data)
num_packets = int(message_size / 7) if (message_size % 7 == 0) else int(message_size / 7) + 1

# if the PF is between 240 and 255, the message can only be broadcast
if dest_address == ParameterGroupNumber.Address.GLOBAL:
# send BAM
self.__send_tp_bam(src_address, priority, pgn.value, message_size, num_packets)

# init new buffer for this connection
self._snd_buffer[buffer_hash] = {
"pgn": pgn.value,
"priority": priority,
"message_size": message_size,
"num_packages": num_packets,
"data": data,
"state": self.SendBufferState.SENDING_BM,
"deadline": time.time() + self._minimum_tp_bam_dt_interval,
'src_address' : src_address,
'dest_address' : ParameterGroupNumber.Address.GLOBAL,
'next_packet_to_send' : 0,
}
else:
# send RTS/CTS
pgn.pdu_specific = 0 # this is 0 for peer-to-peer transfer
# init new buffer for this connection
self._snd_buffer[buffer_hash] = {
"pgn": pgn.value,
"priority": priority,
"message_size": message_size,
"num_packages": num_packets,
"data": data,
"state": self.SendBufferState.WAITING_CTS,
"deadline": time.time() + self.Timeout.T3,
'src_address' : src_address,
'dest_address' : pdu_specific,
'next_packet_to_send' : 0,
'next_wait_on_cts': 0,
}
self.__send_tp_rts(src_address, pdu_specific, priority, pgn.value, message_size, num_packets, min(self._max_cmdt_packets, num_packets))

self._put_multi_msg(data, dest_address, src_address, priority, pgn, buffer_hash, pdu_specific)
self.__job_thread_wakeup()

return True
Expand All @@ -175,6 +211,13 @@ def async_job_thread(self, now):
# TODO: should we notify our CAs about the cancelled transfer?
del self._rcv_buffer[bufid]

# get from que if buffer is empty
if not bool(self._snd_buffer):
for key, value in self._snd_que.items():
self._put_multi_msg(**value)
self._snd_que.pop(key)
break

# check send buffers
# using "list(x)" to prevent "RuntimeError: dictionary changed size during iteration"
for bufid in list(self._snd_buffer):
Expand Down