diff --git a/modbus_tcp/__init__.py b/modbus_tcp/__init__.py index a4f3779ce..d2202ade0 100755 --- a/modbus_tcp/__init__.py +++ b/modbus_tcp/__init__.py @@ -27,23 +27,22 @@ from lib.model.smartplugin import SmartPlugin from datetime import datetime import threading - +import asyncio +import struct +import logging from .webif import WebInterface from pymodbus.constants import Endian from pymodbus.payload import BinaryPayloadDecoder from pymodbus.payload import BinaryPayloadBuilder - -from pymodbus.client.tcp import ModbusTcpClient from pymodbus import ModbusException -import struct +# pymodbus async client +from pymodbus.client import AsyncModbusTcpClient -import logging AttrAddress = 'modBusAddress' AttrType = 'modBusDataType' - AttrFactor = 'modBusFactor' AttrByteOrder = 'modBusByteOrder' AttrWordOrder = 'modBusWordOrder' @@ -53,7 +52,6 @@ BAD_VALUE_SINT16 = 0x8000 BAD_VALUE_SINT32 = 0x80000000 - BAD_VALUE_UINT16 = 0xFFFF BAD_VALUE_UINT32 = 0xFFFFFFFF BAD_VALUE_UINT64 = 0xFFFFFFFFFFFFFFFF @@ -65,7 +63,7 @@ class modbus_tcp(SmartPlugin): devices. """ - PLUGIN_VERSION = '1.0.15' + PLUGIN_VERSION = '1.1.0' def __init__(self, sh, *args, **kwargs): """ @@ -85,71 +83,141 @@ def __init__(self, sh, *args, **kwargs): # Call init code of parent class (SmartPlugin) super().__init__() + self._sh = sh self._host = self.get_parameter_value('host') self._port = self.get_parameter_value('port') - self._cycle = self.get_parameter_value('cycle') # the frequency in seconds how often the device should be accessed - if self._cycle == 0: - self._cycle = None - self._crontab = self.get_parameter_value('crontab') # the more complex way to specify the device query frequency + # cycle reinterpretation: we keep the original value for mode decisions + self._cycle_param = self.get_parameter_value('cycle') + self._cycle = self._cycle_param # keep for scheduler flush usage when > 0 + + self._crontab = self.get_parameter_value('crontab') if self._crontab == '': self._crontab = None - if not (self._cycle or self._crontab): - self.logger.error(f"{self.get_fullname()}: no update cycle or crontab set. Modbus will not be queried automatically") self._slaveUnit = self.get_parameter_value('slaveUnit') self._slaveUnitRegisterDependend = False self._pause_item_path = self.get_parameter_value('pause_item') + self._pause_item = None - self._sh = sh self._regToRead = {} self._regToWrite = {} self._pollStatus = {} - self.connected = False - self._Mclient = ModbusTcpClient(self._host, port=self._port) + # Buffer for cycle > 0 mode: store only latest value per reg-key + # { reg_key: {'value': v, 'dt': datetime, 'raw': raw} } + self._latest_values = {} + + # Threading lock for data shared between SmartHomeNG thread(s) and asyncio thread self.lock = threading.Lock() + # Async infrastructure (own event loop thread) + self._loop = None + self._loop_thread = None + self._loop_started_evt = threading.Event() + + self._stop_evt_async = None # asyncio.Event created inside our loop + self._async_main_task = None # main coroutine task + self._connection_task = None # background reconnect/connection keeper + self._reader_task = None # continuous acquisition task + self._client_lock = None # asyncio.Lock to serialize requests on one TCP connection + + self._aclient = None # AsyncModbusTcpClient instance (created in run()/async thread) + + self.connected = False + self.error_count = 0 + + # Small delay between single register reads to avoid a tight loop (still "continuous", but polite) + self._read_inter_request_delay = 0.01 + self.init_webinterface(WebInterface) + # --------------------------------------------------------------------- + # SmartHomeNG lifecycle + # --------------------------------------------------------------------- + def run(self): """ - Run method for the plugin + Start plugin: + - create own asyncio loop in a dedicated thread + - build Modbus connection + - start async connection management and (depending on cycle) acquisition task + - optionally schedule "flush" to items for cycle > 0 """ self.logger.debug(f"Plugin '{self.get_fullname()}': run method called") if self.alive: return - self.alive = True - if self._cycle or self._crontab: - self.error_count = 0 # Initialize error count - self.scheduler_add('poll_device_' + self._host, self.poll_device, cycle=self._cycle, cron=self._crontab, prio=5) - self.logger.debug(f"Plugin '{self.get_fullname()}': run method finished ") + # Start our own event loop thread + self._start_asyncio_thread() + + # Build connection + tasks inside our asyncio loop (NOT blocking here) + self._submit_coro(self._async_start(), descr="async_start") + + # Scheduler is only used as "flush trigger" for buffered mode (cycle > 0). + # For cycle == 0/None: no incoming data is processed (no reads, no flush). + # For cycle < 0: immediate writes from reader task (no flush scheduler). + if self._cycle_param is not None and self._cycle_param > 0: + self.error_count = 0 + self.scheduler_add( + 'flush_items_' + self._host, + self._flush_buffer_to_items, + cycle=self._cycle_param, + cron=self._crontab, + prio=5 + ) + + self.logger.debug(f"Plugin '{self.get_fullname()}': run method finished") def stop(self): """ - Stop method for the plugin + Stop plugin: + - stop scheduler flush + - stop async tasks, close client, stop loop thread cleanly """ - self.alive = False self.logger.debug(f"Plugin '{self.get_fullname()}': stop method called") - self.scheduler_remove('poll_device_' + self._host) - self._Mclient.close() - self.connected = False + self.alive = False + + # Remove flush scheduler (if added) + try: + self.scheduler_remove('flush_items_' + self._host) + except Exception: + pass + + # Stop async side and loop thread + try: + if self._loop and self._loop.is_running(): + fut = asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop) + try: + fut.result(timeout=5) + except Exception: + pass + + # stop loop + self._loop.call_soon_threadsafe(self._loop.stop) + + if self._loop_thread and self._loop_thread.is_alive(): + self._loop_thread.join(timeout=5) + finally: + self._loop = None + self._loop_thread = None + self._aclient = None + self.connected = False + self.logger.debug(f"Plugin '{self.get_fullname()}': stop method finished") + # --------------------------------------------------------------------- + # Item parsing / update + # --------------------------------------------------------------------- + def parse_item(self, item): """ Default plugin parse_item method. Is called when the plugin is initialized. - The plugin can, corresponding to its attribute keywords, decide what to do with - the item in future, like adding it to an internal array for future reference - - :param item: The item to process. """ - # check for pause item - if item.property.path == self._pause_item_path: + if self._pause_item_path and item.property.path == self._pause_item_path: self.logger.debug(f'pause item {item.property.path} registered') self._pause_item = item self.add_item(item, updating=True) @@ -158,7 +226,6 @@ def parse_item(self, item): if self.has_iattr(item.conf, AttrAddress): self.logger.debug(f"parse item: {item}") regAddr = int(self.get_iattr_value(item.conf, AttrAddress)) - objectType = 'HoldingRegister' value = item() dataType = 'uint16' @@ -170,56 +237,133 @@ def parse_item(self, item): if self.has_iattr(item.conf, AttrType): dataType = self.get_iattr_value(item.conf, AttrType) + if self.has_iattr(item.conf, AttrSlaveUnit): slaveUnit = int(self.get_iattr_value(item.conf, AttrSlaveUnit)) - if (slaveUnit) != self._slaveUnit: + if slaveUnit != self._slaveUnit: self._slaveUnitRegisterDependend = True + if self.has_iattr(item.conf, AttrObjectType): objectType = self.get_iattr_value(item.conf, AttrObjectType) - reg = self.makedictkey(objectType,regAddr,slaveUnit) + reg = self.makedictkey(objectType, regAddr, slaveUnit) if self.has_iattr(item.conf, AttrDirection): dataDirection = self.get_iattr_value(item.conf, AttrDirection) + if self.has_iattr(item.conf, AttrFactor): factor = float(self.get_iattr_value(item.conf, AttrFactor)) + if self.has_iattr(item.conf, AttrByteOrder): byteOrderStr = self.get_iattr_value(item.conf, AttrByteOrder) + if self.has_iattr(item.conf, AttrWordOrder): wordOrderStr = self.get_iattr_value(item.conf, AttrWordOrder) - try: # den letzten Teil des Strings extrahieren, in Großbuchstaben und in Endian-Konstante wandeln + try: byteOrder = Endian[(str(byteOrderStr).split('.')[-1]).upper()] except Exception as e: self.logger.warning(f"Invalid byteOrder -> default(Endian.BIG) is used. Error:{e}") byteOrder = Endian.BIG - try: # den letzten Teil des Strings extrahieren, in Großbuchstaben und in Endian-Konstante wandeln + try: wordOrder = Endian[(str(wordOrderStr).split('.')[-1]).upper()] except Exception as e: - self.logger.warning(f"Invalid byteOrder -> default(Endian.BIG) is used. Error:{e}") + self.logger.warning(f"Invalid wordOrder -> default(Endian.BIG) is used. Error:{e}") wordOrder = Endian.BIG - regPara = {'regAddr': regAddr, 'slaveUnit': slaveUnit, 'dataType': dataType, 'factor': factor, - 'byteOrder': byteOrder, - 'wordOrder': wordOrder, 'item': item, 'value': value, 'objectType': objectType, - 'dataDir': dataDirection} + regPara = { + 'regAddr': regAddr, + 'slaveUnit': slaveUnit, + 'dataType': dataType, + 'factor': factor, + 'byteOrder': byteOrder, + 'wordOrder': wordOrder, + 'item': item, + 'value': value, + 'objectType': objectType, + 'dataDir': dataDirection + } + if dataDirection == 'read': self._regToRead.update({reg: regPara}) self.logger.info(f"parse item: {item} Attributes {regPara}") + elif dataDirection == 'read_write': self._regToRead.update({reg: regPara}) self._regToWrite.update({reg: regPara}) self.logger.info(f"parse item: {item} Attributes {regPara}") return self.update_item + elif dataDirection == 'write': self._regToWrite.update({reg: regPara}) self.logger.info(f"parse item: {item} Attributes {regPara}") return self.update_item + else: self.logger.warning("Invalid data direction -> default(read) is used") self._regToRead.update({reg: regPara}) + def update_item(self, item, caller=None, source=None, dest=None): + """ + Item has been updated (write to device). + Writes are scheduled onto the plugin's own asyncio loop (non-blocking). + """ + objectType = 'HoldingRegister' + slaveUnit = self._slaveUnit + dataDirection = 'read' + + # check for pause item + if self._pause_item is not None and item is self._pause_item: + if caller != self.get_shortname(): + self.logger.debug(f'pause item changed to {item()}') + if item() and self.alive: + self.stop() + elif (not item()) and (not self.alive): + self.run() + return + + # ignore changes triggered by ourselves + if caller == self.get_fullname(): + return + + if not self.alive: + return + + if self.has_iattr(item.conf, AttrDirection): + dataDirection = self.get_iattr_value(item.conf, AttrDirection) + if not (dataDirection == 'read_write' or dataDirection == 'write'): + self.logger.debug(f'update_item: {item} Writing is not allowed - selected dataDirection:{dataDirection}') + return + + if self.has_iattr(item.conf, AttrAddress): + regAddr = int(self.get_iattr_value(item.conf, AttrAddress)) + else: + self.logger.warning(f'update_item:{item} Item has no register address') + return + + if self.has_iattr(item.conf, AttrSlaveUnit): + slaveUnit = int(self.get_iattr_value(item.conf, AttrSlaveUnit)) + if slaveUnit != self._slaveUnit: + self._slaveUnitRegisterDependend = True + + if self.has_iattr(item.conf, AttrObjectType): + objectType = self.get_iattr_value(item.conf, AttrObjectType) + + reg = self.makedictkey(objectType, regAddr, slaveUnit) + + if reg in self._regToWrite: + regPara = self._regToWrite[reg] + self.logger.debug(f'update_item:{item} value:{item()} regToWrite: {reg}') + + # IMPORTANT ASYNC TRANSITION: + # schedule non-blocking write coroutine onto our own asyncio loop + self._submit_coro(self.__write_Registers_async(regPara, item()), descr=f"write_{reg}") + + # --------------------------------------------------------------------- + # Logging helper + # --------------------------------------------------------------------- + def log_error(self, message): """ Logs an error message based on error count @@ -236,173 +380,357 @@ def log_error(self, message): if self.error_count % 100 == 0: self.logger.error(f"{message} [Logging suppressed every 100th error]") - def poll_device(self): + # --------------------------------------------------------------------- + # Scheduler flush for cycle > 0 (buffered writes to items) + # --------------------------------------------------------------------- + + def _flush_buffer_to_items(self): """ - Poll data from modbus device - It is called by the scheduler which is set within run() method. + Called by SmartHomeNG scheduler every 'cycle' seconds (cycle > 0). + Writes the latest buffered values to items. + Older values are implicitly discarded because we only keep the latest value per register key. """ if not self.alive: return - - if self.lock.locked(): - self.log_error(f"poll_device already called an not ready for next poll") + # cycle==0/None => do not write anything + if self._cycle_param is None or self._cycle_param == 0: return - with self.lock: - try: - if self._Mclient.connect(): - self.logger.debug(f"connected to {str(self._Mclient)}") - self.connected = True - self.error_count = 0 - else: - self.error_count += 1 - # Logs an error message based on error count - self.log_error(f"could not connect to {self._host}:{self._port}, connection_attempts: {self.error_count}") - self.connected = False - return - - except Exception as e: - self.error_count += 1 - # Logs an error message based on error count - self.log_error(f"connection exception: {str(self._Mclient)} {e}, errors: {self.error_count}") - self.connected = False - return + now = datetime.now() + regCount = 0 - startTime = datetime.now() - regCount = 0 + with self.lock: + # snapshot keys to minimize time under lock + keys = list(self._latest_values.keys()) + + for reg in keys: + with self.lock: + entry = self._latest_values.get(reg) + regPara = self._regToRead.get(reg) + if entry is None or regPara is None: + continue - for reg, regPara in self._regToRead.items(): - try: - raw_value = self.__read_Registers(regPara) - # self.logger.debug(f"value read: {value} type: {type(value)}") - except ModbusException as e: - self.logger.error(f"ModbusException raised while reading: {e}") - break + value = entry.get('value') + dt = entry.get('dt') - if raw_value is None: + # avoid re-writing the same buffered timestamp repeatedly + last_item_write_dt = regPara.get('last_item_write_dt') + if last_item_write_dt is not None and dt is not None and dt <= last_item_write_dt: continue - if self.is_NaN( raw_value, regPara['dataType']): - self.logger.debug(f"value read: {raw_value} type: {type(value)} is a bad Value") - continue - - value = raw_value - if regPara['factor'] != 1 and isinstance(value, (int, float)): - value *= regPara['factor'] - # self.logger.debug(f"value {value} multiply by: {regPara['factor']}") - + regPara['last_item_write_dt'] = dt + + try: item = regPara['item'] item(value, self.get_fullname()) regCount += 1 - if 'read_dt' in regPara: - regPara['last_read_dt'] = regPara['read_dt'] - - if 'value' in regPara: - regPara['last_value'] = regPara['value'] + with self.lock: + if 'read_dt' in regPara: + regPara['last_read_dt'] = regPara.get('read_dt') + if 'value' in regPara: + regPara['last_value'] = regPara.get('value') + regPara['read_dt'] = dt if dt is not None else now + regPara['value'] = value - regPara['read_dt'] = datetime.now() - regPara['value'] = value + except Exception as e: + self.logger.error(f"flush: cannot write item for {reg}: {e}") - endTime = datetime.now() - duration = endTime - startTime - if regCount > 0: - self._pollStatus['last_dt'] = datetime.now() + if regCount > 0: + with self.lock: + self._pollStatus['last_dt'] = now self._pollStatus['regCount'] = regCount - self.logger.debug(f"poll_device: {regCount} register read required {duration} seconds") - def update_item(self, item, caller=None, source=None, dest=None): + self.logger.debug(f"flush_buffer_to_items: wrote {regCount} buffered values") + + # --------------------------------------------------------------------- + # Async loop thread management + # --------------------------------------------------------------------- + + def _start_asyncio_thread(self): + if self._loop_thread and self._loop_thread.is_alive(): + return + + self._loop_started_evt.clear() + self._loop = asyncio.new_event_loop() + self._loop_thread = threading.Thread( + target=self._asyncio_thread_worker, + name=f"modbus_tcp_asyncio_{self._host}", + daemon=True + ) + self._loop_thread.start() + # Wait briefly until loop is running + self._loop_started_evt.wait(timeout=5) + + def _asyncio_thread_worker(self): + """ + Runs in dedicated thread. + Own event loop: required by task. + """ + asyncio.set_event_loop(self._loop) + self._loop_started_evt.set() + try: + self._loop.run_forever() + finally: + # Best-effort cleanup of pending tasks + try: + pending = asyncio.all_tasks(loop=self._loop) + for t in pending: + t.cancel() + if pending: + self._loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + except Exception: + pass + try: + self._loop.close() + except Exception: + pass + + def _submit_coro(self, coro, descr="coro"): """ - Item has been updated + Thread-safe scheduling of a coroutine onto our private event loop. + Never blocks SmartHomeNG threads. + """ + if not self._loop or not self._loop.is_running(): + self.logger.error(f"{self.get_fullname()}: cannot submit {descr}, asyncio loop not running") + return None + fut = asyncio.run_coroutine_threadsafe(coro, self._loop) + + def _done_callback(f): + try: + exc = f.exception() + if exc: + self.logger.error(f"{self.get_fullname()}: async task '{descr}' failed: {exc}") + except Exception as e: + self.logger.error(f"{self.get_fullname()}: async task '{descr}' callback error: {e}") + + fut.add_done_callback(_done_callback) + return fut - This method is called, if the value of an item has been updated by SmartHomeNG. - It should write the changed value out to the device (hardware/interface) that - is managed by this plugin. + # --------------------------------------------------------------------- + # Async start/stop + tasks + # --------------------------------------------------------------------- - :param item: item to be updated towards the plugin - :param caller: if given it represents the callers name - :param source: if given it represents the source - :param dest: if given it represents the dest + async def _async_start(self): """ - objectType = 'HoldingRegister' - slaveUnit = self._slaveUnit - dataDirection = 'read' + Executed inside plugin's own asyncio loop thread. + Must create client + connection in run() context (requirement). + """ + # Create async primitives in the correct loop + self._stop_evt_async = asyncio.Event() + self._client_lock = asyncio.Lock() - # check for pause item - if item is self._pause_item: - if caller != self.get_shortname(): - self.logger.debug(f'pause item changed to {item()}') - if item() and self.alive: - self.stop() - elif not item() and not self.alive: - self.run() - return + # Create client in our loop thread (important for asyncio transports) + self._aclient = AsyncModbusTcpClient(self._host, port=self._port) + self.connected = False + self.error_count = 0 - if caller == self.get_fullname(): - # self.logger.debug(f'item was changed by the plugin itself - caller:{caller} source:{source} dest:{dest}') - return + # Start background connection management (reconnect on drop) + self._connection_task = asyncio.create_task(self._connection_keeper(), name="connection_keeper") - if self.has_iattr(item.conf, AttrDirection): - dataDirection = self.get_iattr_value(item.conf, AttrDirection) - if not (dataDirection == 'read_write' or dataDirection == 'write'): - self.logger.debug(f'update_item: {item} Writing is not allowed - selected dataDirection:{dataDirection}') - return - # else: - # self.logger.debug(f'update_item:{item} dataDirection: {dataDirection}') - if self.has_iattr(item.conf, AttrAddress): - regAddr = int(self.get_iattr_value(item.conf, AttrAddress)) - else: - self.logger.warning(f'update_item:{item} Item has no register address') - return - if self.has_iattr(item.conf, AttrSlaveUnit): - slaveUnit = int(self.get_iattr_value(item.conf, AttrSlaveUnit)) - if (slaveUnit) != self._slaveUnit: - self._slaveUnitRegisterDependend = True - if self.has_iattr(item.conf, AttrObjectType): - objectType = self.get_iattr_value(item.conf, AttrObjectType) - # else: - # self.logger.debug(f'update_item:{item} default modBusObjectTyp: {objectType}') + # Start acquisition task depending on cycle mode + # cycle in (None, 0) => do NOT process incoming data (no reads, no item writes) + if self._cycle_param is not None and self._cycle_param != 0: + self._reader_task = asyncio.create_task(self._acquisition_loop(), name="acquisition_loop") + else: + self.logger.info( + f"{self.get_fullname()}: cycle is None/0 -> no incoming data processing (no reads, no item updates)" + ) - # Dict-key construction: objectType.regAddr.slaveUnit e.g. HoldingRegister.528.1 - reg = self.makedictkey(objectType,regAddr,slaveUnit) - - if reg in self._regToWrite: - with self.lock: - regPara = self._regToWrite[reg] - self.logger.debug(f'update_item:{item} value:{item()} regToWrite: {reg}') - try: - if self._Mclient.connect(): - self.logger.debug(f"connected to {str(self._Mclient)}") - self.connected = True - self.error_count = 0 - else: - self.error_count += 1 - # Logs an error message based on error count - self.log_error(f"could not connect to {self._host}:{self._port}, connection_attempts: {self.error_count}") - self.connected = False - return + async def _async_stop(self): + """ + Executed inside plugin's own asyncio loop thread. + Cancels tasks and closes client. + """ + try: + if self._stop_evt_async: + self._stop_evt_async.set() + + # Cancel tasks + for task in [self._reader_task, self._connection_task, self._async_main_task]: + if task and not task.done(): + task.cancel() + # Close client + try: + if self._aclient: + self._aclient.close() + except Exception: + pass + + self.connected = False + finally: + self._reader_task = None + self._connection_task = None + self._async_main_task = None + self._aclient = None + + async def _connection_keeper(self): + """ + Keeps the TCP connection alive and performs automatic reconnect. + Runs forever until stop event is set. + """ + backoff = 1.0 + while self.alive and self._stop_evt_async and (not self._stop_evt_async.is_set()): + try: + if self._aclient is None: + self._aclient = AsyncModbusTcpClient(self._host, port=self._port) + self.connected = False + + # If pymodbus exposes .connected use it; else rely on our flag + pm_connected = getattr(self._aclient, "connected", None) + if pm_connected is False: + self.connected = False + + if not self.connected: + try: + ok = await self._aclient.connect() except Exception as e: + ok = False self.error_count += 1 - # Logs an error message based on error count - self.log_error(f"connection exception: {str(self._Mclient)} {e}, errors: {self.error_count}") + self.log_error( + f"connection exception: {self._host}:{self._port} {e}, errors: {self.error_count}" + ) + + if ok: + self.connected = True + self.error_count = 0 + backoff = 1.0 + self.logger.info(f"connected to {self._host}:{self._port}") + else: self.connected = False - return + self.error_count += 1 + self.log_error( + f"could not connect to {self._host}:{self._port}, connection_attempts: {self.error_count}" + ) + await asyncio.sleep(backoff) + backoff = min(backoff * 2.0, 30.0) + continue + + # Connected: sleep a bit, acquisition/writes will detect drops on errors + await asyncio.sleep(1.0) + + except asyncio.CancelledError: + break + except Exception as e: + # Never crash the plugin + self.logger.error(f"connection_keeper unexpected error: {e}") + await asyncio.sleep(2.0) + + async def _wait_connected(self): + """ + Wait until connected or stop/alive condition ends. + """ + while self.alive and self._stop_evt_async and (not self._stop_evt_async.is_set()) and (not self.connected): + await asyncio.sleep(0.2) + return self.connected + + async def _acquisition_loop(self): + """ + Continuous async acquisition of Modbus values. + + This replaces classic scheduler polling. Values are acquired asynchronously and: + - cycle > 0: buffered in self._latest_values (only latest per item) + - cycle < 0: written immediately to items (no buffering) + """ + while self.alive and self._stop_evt_async and (not self._stop_evt_async.is_set()): + try: + # Wait for connection (reconnect handled by connection_keeper) + if not await self._wait_connected(): + await asyncio.sleep(0.5) + continue + + startTime = datetime.now() + regCount = 0 + + # Iterate over a snapshot of regToRead to avoid long locks + with self.lock: + regs = list(self._regToRead.items()) + + for reg, regPara in regs: + if not self.alive or self._stop_evt_async.is_set(): + break try: - self.__write_Registers(regPara, item()) + raw_value = await self.__read_Registers_async(regPara) + except ModbusException as e: + self.logger.error(f"ModbusException raised while reading: {e}") + raw_value = None except Exception as e: - self.logger.error(f"something went wrong in the __write_Registers function: {e}") - - def __write_Registers(self, regPara, value): - """Writes a given value to the register given in dict regPara + # Most likely connection drop => mark disconnected and let keeper reconnect + self.logger.error(f"read exception: {e}") + self.connected = False + try: + if self._aclient: + self._aclient.close() + except Exception: + pass + raw_value = None + + if raw_value is None: + await asyncio.sleep(self._read_inter_request_delay) + continue + + if self.is_NaN(raw_value, regPara['dataType']): + await asyncio.sleep(self._read_inter_request_delay) + continue + + value = raw_value + if regPara['factor'] != 1 and isinstance(value, (int, float)): + value *= regPara['factor'] + + dt = datetime.now() + + # cycle < 0: immediate write to item (no buffering, no scheduler delay) + if self._cycle_param is not None and self._cycle_param < 0: + try: + item = regPara['item'] + item(value, self.get_fullname()) + except Exception as e: + self.logger.error(f"immediate item write failed for {reg}: {e}") + with self.lock: + if 'read_dt' in regPara: + regPara['last_read_dt'] = regPara.get('read_dt') + if 'value' in regPara: + regPara['last_value'] = regPara.get('value') + regPara['read_dt'] = dt + regPara['value'] = value + + # cycle > 0: buffer latest only; flush scheduler writes later + else: + with self.lock: + self._latest_values[reg] = {'value': value, 'dt': dt, 'raw': raw_value} + + regCount += 1 + await asyncio.sleep(self._read_inter_request_delay) + + duration = datetime.now() - startTime + if regCount > 0: + with self.lock: + self._pollStatus['last_dt'] = datetime.now() + self._pollStatus['regCount'] = regCount + self.logger.debug(f"acquisition_loop: read {regCount} register(s) in {duration} seconds") + + # Yield control; do not spin too aggressively + await asyncio.sleep(0) + + except asyncio.CancelledError: + break + except Exception as e: + # Never crash the plugin + self.logger.error(f"acquisition_loop unexpected error: {e}") + await asyncio.sleep(1.0) - Args: - regPara (dict): key/value for object type, address, slaveUnit, datatype - value: the value to be written to the register + # --------------------------------------------------------------------- + # Async Modbus IO (read/write) - based on original logic, but await-based + # --------------------------------------------------------------------- - Returns: - _type_: _description_ + async def __write_Registers_async(self, regPara, value): + """ + Async version of __write_Registers(): + - no blocking connect/read/write + - uses persistent AsyncModbusTcpClient + - serializes access with asyncio.Lock """ objectType = regPara['objectType'] address = regPara['regAddr'] @@ -417,11 +745,18 @@ def __write_Registers(self, regPara, value): except: bits = 16 + # Wait for connection (reconnect in background) + if not await self._wait_connected(): + self.log_error(f"write skipped (not connected): {self._host}:{self._port}") + return + if regPara['factor'] != 1: - # self.logger.debug(f"value {value} divided by: {regPara['factor']}") value = value * (1 / regPara['factor']) - self.logger.debug(f"write {value} to {objectType}.{address}.{address} (address.slaveUnit) dataType:{dataTypeStr}") + self.logger.debug( + f"write {value} to {objectType}.{address}.{address} (address.slaveUnit) dataType:{dataTypeStr}" + ) + builder = BinaryPayloadBuilder(byteorder=bo, wordorder=wo) if dataType.lower() == 'uint': @@ -433,6 +768,7 @@ def __write_Registers(self, regPara, value): builder.add_64bit_uint(int(value)) else: self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}") + return elif dataType.lower() == 'int': if bits == 16: builder.add_16bit_int(int(value)) @@ -442,6 +778,7 @@ def __write_Registers(self, regPara, value): builder.add_64bit_int(int(value)) else: self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}") + return elif dataType.lower() == 'float': if bits == 32: builder.add_32bit_float(value) @@ -449,6 +786,7 @@ def __write_Registers(self, regPara, value): builder.add_64bit_float(value) else: self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}") + return elif dataType.lower() == 'string': builder.add_string(value) elif dataType.lower() == 'bit': @@ -461,104 +799,113 @@ def __write_Registers(self, regPara, value): builder.add_bits(value) else: self.logger.error(f"Value is not a bitstring: {value}") + return else: self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}") - return None - - if objectType == 'Coil': - result = self._Mclient.write_coil(address, value, slave=slaveUnit) - elif objectType == 'HoldingRegister': - registers = builder.to_registers() - result = self._Mclient.write_registers(address, registers, slave=slaveUnit) - elif objectType == 'DiscreteInput': - self.logger.warning(f"this object type cannot be written {objectType}:{address} slaveUnit:{slaveUnit}") return - elif objectType == 'InputRegister': - self.logger.warning(f"this object type cannot be written {objectType}:{address} slaveUnit:{slaveUnit}") - return - else: + + # IMPORTANT ASYNC TRANSITION: actual Modbus write is awaited, never blocking a SmartHomeNG thread + try: + async with self._client_lock: + if objectType == 'Coil': + result = await self._aclient.write_coil(address, value, slave=slaveUnit) + elif objectType == 'HoldingRegister': + registers = builder.to_registers() + result = await self._aclient.write_registers(address, registers, slave=slaveUnit) + elif objectType == 'DiscreteInput': + self.logger.warning(f"this object type cannot be written {objectType}:{address} slaveUnit:{slaveUnit}") + return + elif objectType == 'InputRegister': + self.logger.warning(f"this object type cannot be written {objectType}:{address} slaveUnit:{slaveUnit}") + return + else: + return + except Exception as e: + self.logger.error(f"write exception: {e}") + self.connected = False + try: + if self._aclient: + self._aclient.close() + except Exception: + pass return - if result.isError(): + if result is None or result.isError(): self.logger.error(f"write error: {result} {objectType}.{address}.{slaveUnit} (address.slaveUnit)") - return None - - if 'write_dt' in regPara: - regPara['last_write_dt'] = regPara['write_dt'] - regPara['write_dt'] = datetime.now() - else: - regPara.update({'write_dt': datetime.now()}) - - if 'write_value' in regPara: - regPara['last_write_value'] = regPara['write_value'] - regPara['write_value'] = value - else: - regPara.update({'write_value': value}) - - # regPara['write_dt'] = datetime.now() - # regPara['write_value'] = value + return - def __read_Registers(self, regPara: dict): - """Reads a register from modbus with parameters in passed dict + # Update stats dict (shared with webif) - protect with threading lock + with self.lock: + if 'write_dt' in regPara: + regPara['last_write_dt'] = regPara['write_dt'] + regPara['write_dt'] = datetime.now() + else: + regPara.update({'write_dt': datetime.now()}) - Args: - regPara (dict): key/value for object type, address, slaveUnit, datatype + if 'write_value' in regPara: + regPara['last_write_value'] = regPara['write_value'] + regPara['write_value'] = value + else: + regPara.update({'write_value': value}) - Returns: - int/float/string: the read value + async def __read_Registers_async(self, regPara: dict): + """ + Async version of __read_Registers() - returns decoded value. """ objectType = regPara['objectType'] dataTypeStr = regPara['dataType'] - dataType = ''.join(filter(str.isalpha, dataTypeStr)) # get the base type from eg. 'uint32' --> 'uint' + dataType = ''.join(filter(str.isalpha, dataTypeStr)) bo = regPara['byteOrder'] wo = regPara['wordOrder'] slaveUnit = regPara['slaveUnit'] - registerCount = 0 address = regPara['regAddr'] - value = None try: - bits = int(''.join(filter(str.isdigit, dataTypeStr))) # get only bits from e.g. 'uint32' --> 32 - except: + bits = int(''.join(filter(str.isdigit, dataTypeStr))) + except Exception: bits = 16 if dataType.lower() == 'string': - registerCount = int(bits / 2) # bei string: bits = bytes !! string16 -> 16Byte - 8 registerCount + registerCount = int(bits / 2) # string: bits means bytes -> string16 = 16 bytes -> 8 registers else: registerCount = int(bits / 16) if not self.connected: - self.logger.error(f"not connected to {self._host}:{self._port}") - return - - # self.logger.debug(f"read {objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount}") - if objectType == 'Coil': - result = self._Mclient.read_coils(address, count=registerCount, slave=slaveUnit) - elif objectType == 'DiscreteInput': - result = self._Mclient.read_discrete_inputs(address, count=registerCount, slave=slaveUnit) - elif objectType == 'InputRegister': - result = self._Mclient.read_input_registers(address, count=registerCount, slave=slaveUnit) - elif objectType == 'HoldingRegister': - result = self._Mclient.read_holding_registers(address, count=registerCount, slave=slaveUnit) - else: - self.logger.error(f"{AttrObjectType} not supported: {objectType}") - return + # connection_keeper will reconnect; keep this silent-ish + return None - # https://pymodbus.readthedocs.io/en/latest/source/client.html#client-response-handling - if result.isError(): - self.logger.error(f"read error: {result} {objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount}") - return + # IMPORTANT ASYNC TRANSITION: actual Modbus read is awaited + try: + async with self._client_lock: + if objectType == 'Coil': + result = await self._aclient.read_coils(address, count=registerCount, slave=slaveUnit) + elif objectType == 'DiscreteInput': + result = await self._aclient.read_discrete_inputs(address, count=registerCount, slave=slaveUnit) + elif objectType == 'InputRegister': + result = await self._aclient.read_input_registers(address, count=registerCount, slave=slaveUnit) + elif objectType == 'HoldingRegister': + result = await self._aclient.read_holding_registers(address, count=registerCount, slave=slaveUnit) + else: + self.logger.error(f"{AttrObjectType} not supported: {objectType}") + return None + except Exception as e: + # Connection likely lost + raise e + + if result is None or result.isError(): + self.logger.error( + f"read error: {result} {objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount}" + ) + return None - if objectType == 'Coil': - value = result.bits[0] - elif objectType == 'DiscreteInput': - value = result.bits[0] - elif objectType == 'InputRegister': - decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=bo, wordorder=wo) - else: - decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=bo, wordorder=wo) + # Decode + if objectType == 'Coil' or objectType == 'DiscreteInput': + return result.bits[0] - self.logger.debug(f"read {objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount} result:{result}") + decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=bo, wordorder=wo) + self.logger.debug( + f"read {objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount} result:{result}" + ) try: if dataType.lower() == 'uint': @@ -570,6 +917,7 @@ def __read_Registers(self, regPara: dict): return decoder.decode_64bit_uint() else: self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}") + elif dataType.lower() == 'int': if bits == 16: return decoder.decode_16bit_int() @@ -579,6 +927,7 @@ def __read_Registers(self, regPara: dict): return decoder.decode_64bit_int() else: self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}") + elif dataType.lower() == 'float': if bits == 32: return decoder.decode_32bit_float() @@ -586,39 +935,50 @@ def __read_Registers(self, regPara: dict): return decoder.decode_64bit_float() else: self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}") + elif dataType.lower() == 'string': # bei string: bits = bytes !! string16 -> 16Byte ret = decoder.decode_string(bits) return str(ret, 'ASCII') + elif dataType.lower() == 'bit': - if objectType == 'Coil' or objectType == 'DiscreteInput': - # self.logger.debug(f"read bit value: {value}") - return value - else: - self.logger.debug(f"read bits values: {value.decode_bits()}") - return decoder.decode_bits() + # for register-based bits (not coils/discrete inputs) + return decoder.decode_bits() + else: self.logger.error(f"Number of bits or datatype not supported : {dataTypeStr}") - except struct.error as e: - self.logger.error(f"unable to unpack data for datatype={dataType.lower()} for read {objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount}") + + except struct.error: + self.logger.error( + f"unable to unpack data for datatype={dataType.lower()} for read " + f"{objectType}.{address}.{slaveUnit} (address.slaveUnit) regCount:{registerCount}" + ) + return None + + return None + + # --------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------- @staticmethod - def is_NaN( value, dataType: str) -> bool: + def is_NaN(value, dataType: str) -> bool: """ Check if a returned value is a bad value and return True if it is """ if dataType == 'int16': - return value == BAD_VALUE_SINT16 + return value == BAD_VALUE_SINT16 elif dataType == 'int32': - return value == BAD_VALUE_SINT32 + return value == BAD_VALUE_SINT32 elif dataType == 'uint16': - return value == BAD_VALUE_UINT16 + return value == BAD_VALUE_UINT16 elif dataType == 'uint32': return value == BAD_VALUE_UINT32 elif dataType == 'uint64': - return value == BAD_VALUE_UINT64 + return value == BAD_VALUE_UINT64 + return False @staticmethod def makedictkey(objectType: str, regAddr, slaveUnit) -> str: # dictionary key: objectType.regAddr.slaveUnit // HoldingRegister.528.1 - return f"{str(objectType)}.{str(regAddr)}.{str(slaveUnit)}" + return f"{str(objectType)}.{str(regAddr)}.{str(slaveUnit)}" \ No newline at end of file diff --git a/modbus_tcp/plugin.yaml b/modbus_tcp/plugin.yaml index 135c96f36..be3e6d609 100755 --- a/modbus_tcp/plugin.yaml +++ b/modbus_tcp/plugin.yaml @@ -40,10 +40,10 @@ parameters: cycle: type: int default: 300 - valid_min: 0 + valid_min: -1 description: - de: 'Update Zyklus in Sekunden. Wenn der Wert 0 ist, wird keine Abfrage über cycle ausgeführt' - en: 'Update cycle in seconds. If value is 0 then no query will be made by means of cycle' + de: 'Update Zyklus in Sekunden. Wenn der Wert 0 ist, wird keine Abfrage über cycle ausgeführt, wenn er negativ ist, wird sofort wieder ausgeführt' + en: 'Update cycle in seconds. If value is 0 then no query will be made by means of cycle, if negative, it will be executed immediately' crontab: type: str