diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1099fbbba..6093479c7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,8 @@
 # Changelog
 
-## Ongoing
+## v0.44.11 - 2025-08-14
 
+- Improve reading from energy-logs cache via PR [314](https://github.com/plugwise/python-plugwise-usb/pull/314)
 - Improve energy-collection via PR [311](https://github.com/plugwise/python-plugwise-usb/pull/311)
 
 ## v0.44.10 - 2025-08-11
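For context on the cache-reading changes in PR 314: each cache entry serializes one pulse-log record as `address:slot:timestamp:pulses`, records are joined by `|`, and timestamps are now written zero-padded so a single `strptime` call parses them. A minimal round-trip sketch of that format; the record values below are illustrative, not taken from a real device:

```python
from datetime import UTC, datetime

# One illustrative pulse-log record: (address, slot, timestamp, pulses)
record = (400, 4, datetime(2025, 8, 14, 9, 0, 0, tzinfo=UTC), 120)
address, slot, ts, pulses = record

# Serialize the way the cache stores it (zero-padded timestamp fields)
line = f"{address}:{slot}:{ts.strftime('%Y-%m-%d-%H-%M-%S')}:{pulses}"
assert line == "400:4:2025-08-14-09-00-00:120"

# Parse it back; the timestamp contains no ":" so splitting on ":" is safe
a, s, raw_ts, p = line.split(":")
parsed = datetime.strptime(raw_ts, "%Y-%m-%d-%H-%M-%S").replace(tzinfo=UTC)
assert (int(a), int(s), parsed, int(p)) == record
```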
diff --git a/plugwise_usb/nodes/circle.py b/plugwise_usb/nodes/circle.py
index 8d79d9c75..8e644a26b 100644
--- a/plugwise_usb/nodes/circle.py
+++ b/plugwise_usb/nodes/circle.py
@@ -5,7 +5,7 @@
 from asyncio import Task, create_task, gather
 from collections.abc import Awaitable, Callable
 from dataclasses import replace
-from datetime import UTC, datetime
+from datetime import UTC, datetime, timedelta
 from functools import wraps
 import logging
 from math import ceil
@@ -80,6 +80,42 @@
 _LOGGER = logging.getLogger(__name__)
 
 
+def _collect_records(data: str) -> dict[int, dict[int, tuple[datetime, int]]]:
+    """Collect logs from a cache data string."""
+    logs: dict[int, dict[int, tuple[datetime, int]]] = {}
+    log_data = data.split("|")
+    for log_record in log_data:
+        log_fields = log_record.split(":")
+        if len(log_fields) == 4:
+            address = int(log_fields[0])
+            slot = int(log_fields[1])
+            pulses = int(log_fields[3])
+            # Parse the zero-padded timestamp; fall back to a manual split
+            try:
+                timestamp = datetime.strptime(
+                    log_fields[2], "%Y-%m-%d-%H-%M-%S"
+                ).replace(tzinfo=UTC)
+            except ValueError:
+                parts = log_fields[2].split("-")
+                if len(parts) != 6:
+                    continue
+                timestamp = datetime(
+                    year=int(parts[0]),
+                    month=int(parts[1]),
+                    day=int(parts[2]),
+                    hour=int(parts[3]),
+                    minute=int(parts[4]),
+                    second=int(parts[5]),
+                    tzinfo=UTC,
+                )
+            bucket = logs.setdefault(address, {})
+            # Keep the first occurrence (cache is newest-first); skip older duplicates
+            if slot not in bucket:
+                bucket[slot] = (timestamp, pulses)
+
+    return logs
+
+
 def raise_calibration_missing(func: FuncT) -> FuncT:
     """Validate energy calibration settings are available."""
 
@@ -381,7 +417,9 @@ async def energy_update(self) -> EnergyStatistics | None:  # noqa: PLR0911 PLR09
             return None
 
         # Try collecting energy-stats for _current_log_address
-        result = await self.energy_log_update(self._current_log_address, save_cache=True)
+        result = await self.energy_log_update(
+            self._current_log_address, save_cache=True
+        )
         if not result:
             _LOGGER.debug(
                 "async_energy_update | %s | Log rollover | energy_log_update from address %s failed",
@@ -415,7 +453,9 @@ async def energy_update(self) -> EnergyStatistics | None:  # noqa: PLR0911 PLR09
             return self._energy_counters.energy_statistics
 
         if len(missing_addresses) == 1:
-            result = await self.energy_log_update(missing_addresses[0], save_cache=True)
+            result = await self.energy_log_update(
+                missing_addresses[0], save_cache=True
+            )
             if result:
                 await self.power_update()
                 _LOGGER.debug(
@@ -528,9 +568,10 @@ async def get_missing_energy_logs(self) -> None:
         if self._cache_enabled:
             await self._energy_log_records_save_to_cache()
 
-    async def energy_log_update(self, address: int | None, save_cache: bool = True) -> bool:
-        """Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False."""
-        any_record_stored = False
+    async def energy_log_update(
+        self, address: int | None, save_cache: bool = True
+    ) -> bool:
+        """Request energy logs; return True when the request succeeded, False otherwise."""
         if address is None:
             return False
@@ -553,6 +594,7 @@ async def energy_log_update(self, address: int | None, save_cache: bool = True)
         # Forward historical energy log information to energy counters
         # Each response message contains 4 log counters (slots) of the
         # energy pulses collected during the previous hour of given timestamp
+        cache_updated = False
         for _slot in range(4, 0, -1):
             log_timestamp, log_pulses = response.log_data[_slot]
             _LOGGER.debug(
@@ -567,34 +609,32 @@ async def energy_log_update(self, address: int | None, save_cache: bool = True)
                 self._energy_counters.add_empty_log(response.log_address, _slot)
                 continue
 
-            await self._energy_log_record_update_state(
+            cache_updated |= await self._energy_log_record_update_state(
                 response.log_address,
                 _slot,
                 log_timestamp.replace(tzinfo=UTC),
                 log_pulses,
                 import_only=True,
             )
-            any_record_stored = True
 
         self._energy_counters.update()
-        if any_record_stored and self._cache_enabled and save_cache:
+        if cache_updated and save_cache:
             _LOGGER.debug(
                 "Saving energy record update to cache for %s", self._mac_in_str
             )
             await self.save_cache()
-        return any_record_stored
+        return True
 
     def _check_timestamp_is_recent(
         self, address: int, slot: int, timestamp: datetime
     ) -> bool:
         """Check if a log record timestamp is within the last MAX_LOG_HOURS hours."""
         age_seconds = max(
-            0.0,
-            (datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)).total_seconds()
+            0.0, (datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)).total_seconds()
         )
         if age_seconds > MAX_LOG_HOURS * 3600:
-            _LOGGER.warning(
+            _LOGGER.info(
                 "EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring...",
                 self._mac_in_str,
                 address,
@@ -611,37 +651,29 @@ async def _energy_log_records_load_from_cache(self) -> bool:
                 "Failed to restore energy log records from cache for node %s", self.name
             )
             return False
-        restored_logs: dict[int, list[int]] = {}
         if cache_data == "":
             _LOGGER.debug("Cache-record is empty")
             return False
-        log_data = cache_data.split("|")
-        for log_record in log_data:
-            log_fields = log_record.split(":")
-            if len(log_fields) == 4:
-                timestamp_energy_log = log_fields[2].split("-")
-                if len(timestamp_energy_log) == 6:
-                    address = int(log_fields[0])
-                    slot = int(log_fields[1])
-                    self._energy_counters.add_pulse_log(
-                        address=address,
-                        slot=slot,
-                        timestamp=datetime(
-                            year=int(timestamp_energy_log[0]),
-                            month=int(timestamp_energy_log[1]),
-                            day=int(timestamp_energy_log[2]),
-                            hour=int(timestamp_energy_log[3]),
-                            minute=int(timestamp_energy_log[4]),
-                            second=int(timestamp_energy_log[5]),
-                            tzinfo=UTC,
-                        ),
-                        pulses=int(log_fields[3]),
-                        import_only=True,
-                    )
-                    if restored_logs.get(address) is None:
-                        restored_logs[address] = []
-                    restored_logs[address].append(slot)
+        collected_logs = _collect_records(cache_data)
+
+        # Cutoff timestamp for filtering out stale records
+        skip_before = datetime.now(tz=UTC) - timedelta(hours=MAX_LOG_HOURS)
+
+        # Iterate newest-first: highest address first, then highest slot
+        for address in sorted(collected_logs, reverse=True):
+            for slot in sorted(collected_logs[address].keys(), reverse=True):
+                (timestamp, pulses) = collected_logs[address][slot]
+                # Keep only recent entries; skip anything at or before the cutoff
+                if timestamp <= skip_before:
+                    continue
+                self._energy_counters.add_pulse_log(
+                    address=address,
+                    slot=slot,
+                    pulses=pulses,
+                    timestamp=timestamp,
+                    import_only=True,
+                )
 
         self._energy_counters.update()
 
@@ -670,19 +702,19 @@ async def _energy_log_records_save_to_cache(self) -> None:
         logs: dict[int, dict[int, PulseLogRecord]] = (
             self._energy_counters.get_pulse_logs()
         )
-        cached_logs = ""
-        for address in sorted(logs.keys(), reverse=True):
-            for slot in sorted(logs[address].keys(), reverse=True):
-                log = logs[address][slot]
-                if cached_logs != "":
-                    cached_logs += "|"
-                cached_logs += f"{address}:{slot}:{log.timestamp.year}"
-                cached_logs += f"-{log.timestamp.month}-{log.timestamp.day}"
-                cached_logs += f"-{log.timestamp.hour}-{log.timestamp.minute}"
-                cached_logs += f"-{log.timestamp.second}:{log.pulses}"
-
+        # Serialize newest-first (logs is already sorted in that order)
+        records: list[str] = []
+        for address, record in logs.items():
+            for slot, log in record.items():
+                ts = log.timestamp
+                records.append(
+                    f"{address}:{slot}:{ts.strftime('%Y-%m-%d-%H-%M-%S')}:{log.pulses}"
+                )
+        cached_logs = "|".join(records)
         _LOGGER.debug("Saving energy logrecords to cache for %s", self._mac_in_str)
         self._set_cache(CACHE_ENERGY_COLLECTION, cached_logs)
+        # Persist the new cache entries to disk immediately
+        await self.save_cache(trigger_only=True)
 
     async def _energy_log_record_update_state(
         self,
@@ -699,21 +731,25 @@ async def _energy_log_record_update_state(
         if not self._cache_enabled:
             return False
 
-        log_cache_record = f"{address}:{slot}:{timestamp.year}"
-        log_cache_record += f"-{timestamp.month}-{timestamp.day}"
-        log_cache_record += f"-{timestamp.hour}-{timestamp.minute}"
-        log_cache_record += f"-{timestamp.second}:{pulses}"
+        log_cache_record = (
+            f"{address}:{slot}:{timestamp.strftime('%Y-%m-%d-%H-%M-%S')}:{pulses}"
+        )
         if (cached_logs := self._get_cache(CACHE_ENERGY_COLLECTION)) is not None:
-            if log_cache_record not in cached_logs:
+            entries = cached_logs.split("|") if cached_logs else []
+            if log_cache_record not in entries:
                 _LOGGER.debug(
                     "Adding logrecord (%s, %s) to cache of %s",
                     str(address),
                     str(slot),
                     self._mac_in_str,
                 )
-                self._set_cache(
-                    CACHE_ENERGY_COLLECTION, cached_logs + "|" + log_cache_record
+                new_cache = (
+                    f"{log_cache_record}|{cached_logs}"
+                    if cached_logs
+                    else log_cache_record
                 )
+                self._set_cache(CACHE_ENERGY_COLLECTION, new_cache)
+                await self.save_cache(trigger_only=True)
                 return True
             _LOGGER.debug(
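The `_collect_records` helper introduced above relies on the cache being written newest-first: when an `(address, slot)` pair appears twice, the first occurrence wins and the later, older duplicate is skipped. A standalone sketch of that rule, mirroring the helper's return shape; the cache string below is made up for illustration:

```python
from datetime import UTC, datetime

# Made-up cache string: the second record duplicates (400, 4) with older data
cache = (
    "400:4:2025-08-14-09-00-00:120"
    "|400:4:2025-08-13-09-00-00:80"
    "|399:1:2025-08-12-09-00-00:50"
)

logs: dict[int, dict[int, tuple[datetime, int]]] = {}
for rec in cache.split("|"):
    addr, slot, raw_ts, pulses = rec.split(":")
    ts = datetime.strptime(raw_ts, "%Y-%m-%d-%H-%M-%S").replace(tzinfo=UTC)
    # setdefault keeps the first (newest) entry and ignores older duplicates
    logs.setdefault(int(addr), {}).setdefault(int(slot), (ts, int(pulses)))

assert logs[400][4] == (datetime(2025, 8, 14, 9, tzinfo=UTC), 120)
assert logs[399][1] == (datetime(2025, 8, 12, 9, tzinfo=UTC), 50)
```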
diff --git a/pyproject.toml b/pyproject.toml
index cb97c1a36..2840f0420 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "plugwise_usb"
-version = "0.44.10"
+version = "0.44.11a8"
 license = "MIT"
 keywords = ["home", "automation", "plugwise", "module", "usb"]
 classifiers = [
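One consistency note: the changelog announces v0.44.11, while pyproject.toml moves to `0.44.11a8`, a PEP 440 alpha pre-release, so this diff appears to capture a pre-release build rather than the final tag. A quick sanity check of that ordering, assuming the third-party `packaging` distribution is installed:

```python
from packaging.version import Version

# "a8" marks the eighth alpha pre-release of 0.44.11; installers skip it
# unless pre-releases are explicitly requested
assert Version("0.44.11a8").is_prerelease
assert Version("0.44.10") < Version("0.44.11a8") < Version("0.44.11")
```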