Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/frequenz/sdk/timeseries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
ResamplingFunction2,
)
from ._resampling._exceptions import ResamplingError, SourceStoppedError
from ._resampling._resampler import Resampler, ResamplerStack
from ._resampling._wall_clock_timer import (
ClocksInfo,
TickInfo,
Expand All @@ -68,8 +69,10 @@
"MovingWindow",
"PeriodicFeatureExtractor",
"ReceiverFetcher",
"Resampler",
"ResamplerConfig",
"ResamplerConfig2",
"ResamplerStack",
"ResamplingError",
"ResamplingFunction",
"ResamplingFunction2",
Expand Down
176 changes: 173 additions & 3 deletions src/frequenz/sdk/timeseries/_resampling/_resampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,35 @@ def remove_timeseries(self, source: Source) -> bool:
return False
return True

async def trigger(self, timestamp: datetime) -> None:
"""Trigger resampling at the specified timestamp.

This method allows external control of when resampling occurs, which is
useful for coordinating multiple stacked resamplers via `ResamplerStack`.

Unlike `resample()`, this method does not wait for a timer - it immediately
triggers resampling for all timeseries at the given timestamp.

Args:
timestamp: The timestamp to use for the resampling window end.

Raises:
ResamplingError: If any timeseries source or sink encounters errors.
"""
resampler_sources = list(self._resamplers)
results = await asyncio.gather(
*[r.resample(timestamp) for r in self._resamplers.values()],
return_exceptions=True,
)

exceptions = {
source: result
for source, result in zip(resampler_sources, results)
if isinstance(result, (Exception, asyncio.CancelledError))
}
if exceptions:
raise ResamplingError(exceptions)

async def resample(self, *, one_shot: bool = False) -> None:
"""Start resampling all known timeseries.

Expand Down Expand Up @@ -314,9 +343,9 @@ def _update_source_sample_period(self, now: datetime) -> bool:
Returns:
Whether the source sample period was changed (was really updated).
"""
assert (
self._buffer.maxlen is not None and self._buffer.maxlen > 0
), "We need a maxlen of at least 1 to update the sample period"
assert self._buffer.maxlen is not None and self._buffer.maxlen > 0, (
"We need a maxlen of at least 1 to update the sample period"
)

config = self._config
props = self._source_properties
Expand Down Expand Up @@ -565,3 +594,144 @@ async def resample(self, timestamp: datetime) -> None: # noqa: DOC503
raise SourceStoppedError(self._source)

await self._sink(self._helper.resample(timestamp))


class ResamplerStack:
"""Manages a stack of resamplers that feed into each other.

When stacking resamplers (resampling already-resampled data), there's a timing
issue: if both resamplers fire at the same moment, the higher-level resampler
may process BEFORE the lower-level one has emitted its boundary sample, causing
data loss.

This class solves the problem by:
1. Using a single coordinating timer based on the GCD of all resampler periods
2. Executing resamplers in the correct order (lower-level first)
3. Adding yields between resamplers to ensure channel delivery completes

Example:
```python
# First resampler: raw data -> 2 second samples
first_resampler = Resampler(ResamplerConfig(resampling_period=timedelta(seconds=2)))

# Second resampler: 2 second samples -> 4 second samples
second_resampler = Resampler(ResamplerConfig(resampling_period=timedelta(seconds=4)))

# Stack them (order matters: lower-level first)
stack = ResamplerStack([first_resampler, second_resampler])

# Run the stack continuously - handles ordering automatically
await stack.resample()
```
"""

def __init__(self, resamplers: list[Resampler]) -> None:
"""Initialize a resampler stack.

Args:
resamplers: List of resamplers in dependency order. Lower-level resamplers
(those that produce data consumed by others) should come first.
"""
self._resamplers = resamplers
self._timer: Timer | None = None

async def stop(self) -> None:
"""Stop all resamplers in the stack."""
await asyncio.gather(*[r.stop() for r in self._resamplers])

async def resample(self, *, one_shot: bool = False) -> None:
"""Run all resamplers in the stack in the correct order.

This method ensures that lower-level resamplers emit their samples before
higher-level resamplers process, preventing boundary sample loss.

For continuous mode, a single coordinating timer is used based on the GCD
of all resampler periods. Each resampler is triggered only when its period
aligns with the current tick.

Args:
one_shot: Whether to run only one resampling cycle.

Raises:
ResamplingError: If any resampler encounters an error.
"""
if not self._resamplers:
return

if one_shot:
await self._run_one_shot()
else:
await self._run_continuous()

async def _run_one_shot(self) -> None:
"""Run one resampling cycle for all resamplers."""
for resampler in self._resamplers:
await resampler.resample(one_shot=True)
# Yield to allow channel delivery to complete
await asyncio.sleep(0)

async def _run_continuous(self) -> None:
"""Run resamplers continuously with a coordinating timer.

Uses the GCD of all resampler periods as the base tick interval.
On each tick, determines which resamplers should fire and triggers
them in order.
"""
periods = [r.config.resampling_period for r in self._resamplers]
gcd_period = self._gcd_timedeltas(periods)

# Create a single coordinating timer
self._timer = Timer(gcd_period, TriggerAllMissed())

# Track when each resampler should next fire
# Start with the current time aligned to the GCD period
now = datetime.now(timezone.utc)
next_fires: list[datetime] = []
for resampler in self._resamplers:
period = resampler.config.resampling_period
align_to = resampler.config.align_to or now
# Calculate next fire time aligned to the resampler's period
elapsed = (now - align_to) % period
if elapsed:
next_fire = now + period - elapsed
else:
next_fire = now + period
next_fires.append(next_fire)

async for _ in self._timer:
tick_time = datetime.now(timezone.utc)

# Trigger resamplers whose time has come, in order
for i, resampler in enumerate(self._resamplers):
if tick_time >= next_fires[i]:
await resampler.trigger(next_fires[i])
# Update next fire time
next_fires[i] += resampler.config.resampling_period
# Yield to allow channel delivery
await asyncio.sleep(0)

@staticmethod
def _gcd_timedeltas(deltas: list[timedelta]) -> timedelta:
"""Calculate the GCD of a list of timedeltas.

Args:
deltas: List of timedeltas to find GCD of.

Returns:
The GCD as a timedelta.
"""
if not deltas:
return timedelta(seconds=1)

def gcd(a: int, b: int) -> int:
while b:
a, b = b, a % b
return a

# Convert to microseconds for integer GCD
micros = [int(d.total_seconds() * 1_000_000) for d in deltas]
result = micros[0]
for m in micros[1:]:
result = gcd(result, m)

return timedelta(microseconds=result)
Loading
Loading