@Marenz Marenz commented Jan 21, 2026

This PR proposes a solution to the stacked resampler timing issue. I'm opening this as an RFC to discuss the approach before finalizing.

Problem

When resampling already-resampled data (e.g., 1s samples → 15min aggregates), there's a race condition at window boundaries. If both resamplers fire at the same moment, the higher-level resampler may process BEFORE the lower-level one has emitted its boundary sample.

Minimal reproduction

import asyncio
from datetime import datetime, timedelta, timezone
from frequenz.channels import Broadcast
from frequenz.quantities import Quantity
from frequenz.sdk.timeseries import Sample, ResamplerConfig
from frequenz.sdk.timeseries._resampling._resampler import Resampler

async def demo():
    samples_received = []

    def track_samples(samples, config, props):
        samples_received.append(list(samples))
        return sum(v for _, v in samples) / len(samples) if samples else float("nan")

    # First resampler: 2s periods
    first = Resampler(ResamplerConfig(
        resampling_period=timedelta(seconds=2),
        max_data_age_in_periods=1.0,
    ))

    # Second resampler: 4s periods (consumes first's output)
    second = Resampler(ResamplerConfig(
        resampling_period=timedelta(seconds=4),
        max_data_age_in_periods=1.0,
        resampling_function=track_samples,
    ))

    raw_chan = Broadcast[Sample[Quantity]](name="raw")
    inter_chan = Broadcast[Sample[Quantity]](name="inter")

    async def forward(sample):
        await inter_chan.new_sender().send(sample)

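    async def discard(sample):
        """No-op sink; the resampler awaits its sink, so it must be a coroutine function."""

    first.add_timeseries("first", raw_chan.new_receiver(), forward)
    second.add_timeseries("second", inter_chan.new_receiver(), discard)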

    # Send samples
    ts = datetime.now(timezone.utc)
    for i in range(1, 9):
        await raw_chan.new_sender().send(
            Sample(ts + timedelta(seconds=i), Quantity(float(i)))
        )

    # t=4: Both fire concurrently - THIS IS THE BUG
    await asyncio.sleep(4.0)
    await asyncio.gather(
        second.resample(one_shot=True),  # Runs before first emits!
        first.resample(one_shot=True),
    )

    # Expected: 2 samples per window
    # Actual: 1 sample per window (boundary sample missing)
    print(f"Samples per window: {[len(s) for s in samples_received]}")

asyncio.run(demo())
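
The ordering effect can also be modelled without the SDK. The following is a simplified sketch, not the resampler's actual logic: `run_boundary_tick` is a hypothetical helper, plain lists stand in for the channels, and windows are assumed to be closed on the right, i.e. (start, end].

```python
def run_boundary_tick(order):
    """Run the shared t=4 tick with the two stages executed in `order`.

    Returns the intermediate samples the second stage sees for its window.
    """
    # Raw (timestamp, value) samples, one per second (illustrative values).
    raw = [(1, 1.0), (2, 2.0), (3, 3.0), (4, 4.0)]
    # The first stage (2s period) already emitted its t=2 sample earlier.
    inter = [(2, 1.5)]
    seen_by_second = []

    for stage in order:
        if stage == "first":
            # Average the raw samples in (2, 4] and emit the result at t=4.
            values = [v for t, v in raw if 2 < t <= 4]
            inter.append((4, sum(values) / len(values)))
        elif stage == "second":
            # Collect the intermediate samples in (0, 4].
            seen_by_second = [s for s in inter if 0 < s[0] <= 4]
    return seen_by_second


buggy = run_boundary_tick(["second", "first"])  # higher-level stage runs first
fixed = run_boundary_tick(["first", "second"])  # lower-level stage runs first
print(len(buggy), len(fixed))
```

When the second stage runs first, it sees only the t=2 sample; when the first stage runs first, it sees both the t=2 and the t=4 samples.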

Impact

  • For 15-minute windows losing 1 second: ~0.1% data loss
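  • For aggregation functions like sum or max: the error can be larger, since a sum is short by the dropped value and a max is wrong whenever the dropped sample was the largest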
  • For shorter higher-level periods: more significant impact
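
As a rough check of these figures, the sketch below drops the boundary sample from a synthetic 15-minute window of 1s samples. The values are made up for illustration (the boundary sample is deliberately the largest), and the code does not use the SDK.

```python
# Synthetic window: 900 one-second samples with values 1.0 .. 900.0.
# These values are illustrative only, not real measurements.
window = [float(i) for i in range(1, 901)]

# Model the race: the sample that lands exactly on the window boundary
# (the last one in this synthetic window) is missing.
truncated = window[:-1]

fraction_lost = 1 - len(truncated) / len(window)
sum_error = sum(window) - sum(truncated)
max_error = max(window) - max(truncated)

print(f"samples lost: {fraction_lost:.2%}")
print(f"sum error: {sum_error}")
print(f"max error: {max_error}")
```

One sample out of 900 is about 0.11% of the window, but the sum is off by the full value of the dropped sample, which no later window corrects.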

Proposed Solution

This PR adds:

  1. Resampler.trigger(timestamp) - Allows external control of when resampling occurs, without waiting for the internal timer.

  2. ResamplerStack - Coordinates multiple resamplers:

    • Executes them in dependency order (lower-level first)
    • Adds yields between each to ensure channel delivery
    • For continuous mode: uses a single GCD-based timer
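
To make the coordination idea concrete, here is a minimal sketch of the three points above. It is not the implementation in this PR: `ToyResampler`, `ToyStack`, `gcd_period` and `run_ticks` are hypothetical names, ticks are simulated rather than driven by a real timer, and the stack assumes the caller passes resamplers lower-level first.

```python
import asyncio
import math
from datetime import timedelta
from functools import reduce


def gcd_period(periods):
    """Return the greatest common divisor of the given timedelta periods."""
    micros = [p // timedelta(microseconds=1) for p in periods]
    return timedelta(microseconds=reduce(math.gcd, micros))


class ToyResampler:
    """Stand-in for a resampler: records when it was triggered."""

    def __init__(self, name, period, log):
        self.name = name
        self.period = period
        self._log = log

    async def trigger(self, elapsed):
        self._log.append((elapsed, self.name))


class ToyStack:
    """Triggers resamplers in the given order on a single GCD-based tick."""

    def __init__(self, resamplers):
        self._resamplers = list(resamplers)  # assumed lower-level first
        self._base = gcd_period(r.period for r in self._resamplers)

    async def run_ticks(self, count):
        for n in range(1, count + 1):
            elapsed = n * self._base
            for resampler in self._resamplers:
                # Only trigger resamplers whose period boundary falls on this tick.
                if elapsed % resampler.period == timedelta(0):
                    await resampler.trigger(elapsed)
                    # Yield to the event loop so pending deliveries can run
                    # before the next (higher-level) resampler is triggered.
                    await asyncio.sleep(0)


log = []
first = ToyResampler("first", timedelta(seconds=2), log)
second = ToyResampler("second", timedelta(seconds=4), log)
asyncio.run(ToyStack([first, second]).run_ticks(2))
print([(e.total_seconds(), name) for e, name in log])
```

With 2s and 4s periods the shared tick is 2s, and on the 4s tick the lower-level resampler is always triggered before the higher-level one.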

Usage

from datetime import timedelta

from frequenz.sdk.timeseries import Resampler, ResamplerConfig, ResamplerStack

first = Resampler(ResamplerConfig(resampling_period=timedelta(seconds=2)))
second = Resampler(ResamplerConfig(resampling_period=timedelta(seconds=4)))

# Stack them (lower-level first)
stack = ResamplerStack([first, second])

# Run - handles ordering automatically
await stack.resample()

Alternatives Considered

  1. Add asyncio.sleep(0) yields inside Resampler.resample() - Tried this but it doesn't reliably solve the problem because both resamplers enter the timer loop before either processes.

  2. Emit samples slightly before window boundary - Would require changing timestamp semantics.

  3. Use open intervals (samples at exactly window_end go to next window) - Breaking change to current behavior.

  4. Have higher-level resamplers wait longer - Fragile, doesn't guarantee ordering.
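
To illustrate why alternative 3 would be a breaking change, the sketch below compares the two window conventions. It assumes the current behavior corresponds to windows closed on the right, (start, end], as the wording of that alternative implies; `window_index` is a hypothetical helper and not part of the SDK.

```python
import math


def window_index(ts, period, right_closed=True):
    """Return the index of the window that a sample at time `ts` falls into.

    right_closed=True models windows of the form (start, end];
    right_closed=False models windows of the form [start, end).
    """
    if right_closed:
        return math.ceil(ts / period) - 1
    return math.floor(ts / period)


# A sample exactly on the boundary of a 4s window.
boundary_closed = window_index(4.0, 4.0)
boundary_open = window_index(4.0, 4.0, right_closed=False)

# A sample inside the window is assigned identically under both conventions.
interior = (window_index(3.0, 4.0), window_index(3.0, 4.0, right_closed=False))

print(boundary_closed, boundary_open, interior)
```

Only samples that land exactly on a boundary move to the next window, but those are precisely the samples a stacked lower-level resampler emits, so every existing aggregate over stacked data would shift.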

Questions for Discussion

  1. Is ResamplerStack the right abstraction? Should this be handled differently?

  2. Should the continuous mode use WallClockTimer instead of Timer for better alignment?

  3. Should we also provide a way to automatically detect stacked resamplers (e.g., via channel inspection)?

  4. Is exporting Resampler publicly the right approach, or should users only use ResamplerStack?

Add Resampler.trigger() method and ResamplerStack class to solve the
timing issue when stacking resamplers (resampling already-resampled
data). When both resamplers fire simultaneously, the higher-level one
may process before the lower-level one emits its boundary sample.

Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
@Marenz Marenz requested a review from a team as a code owner January 21, 2026 13:58
@Marenz Marenz requested review from shsms and removed request for a team January 21, 2026 13:58
@github-actions github-actions bot added the part:tests and part:data-pipeline labels Jan 21, 2026
