
Genericise Flow Metrics' RetentionWindow#18624

Merged
yaauie merged 19 commits into elastic:main from yaauie:flow-metrics-genericise
Feb 10, 2026

Conversation

@yaauie
Member

@yaauie yaauie commented Jan 22, 2026

Release notes

[rn: skip]

What does this PR do?

This NET ZERO CHANGE refactor extracts the shareable internals of FlowMetrics.RetentionWindow into their own abstract class so that they can be shared by the incoming histogram implementation.

The initial implementation of FlowMetrics.RetentionWindow was heavily optimized around the nature of rates of change being calculable from first and last entries without a need for intermediate values; it preferred throwing captures away whenever doing so didn't impact its ability to meet the retention policy.

This refactor extracts RetentionWindow as an abstract generic class, and moves the responsibility for merging two captures and for calculating the value into subclasses, so that the implementations can provide meaningful specifics without needing to know anything about the linked-list data structures or atomic operations.

The ExtendedFlowMetric provided its lifetime values in a hacky way that couldn't be reused, so I also migrated the FlowMetricRetentionPolicy to a form that supports a lifetime value that effectively has an infinitely large resolution to avoid intermediate captures.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

@yaauie yaauie requested a review from andsel January 22, 2026 23:32
@github-actions
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)
  • /run exhaustive tests : Run the exhaustive tests Buildkite pipeline.

@mergify
Contributor

mergify bot commented Jan 22, 2026

This pull request does not have a backport label. Could you fix it @yaauie? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit.
  • If no backport is necessary, please add the backport-skip label

Comment on lines 75 to 107
    NodeStagingPair<CAPTURE> oldTailWithStaging = this.tail.getAndUpdate((committedNodeStagePair) -> {
        final Node<CAPTURE> committedNode = committedNodeStagePair.committed;
        final CAPTURE committedCapture = committedNode.capture;
        final CAPTURE stagedCapture = committedNodeStagePair.staged;

        final CAPTURE captureToCommit;
        final CAPTURE captureToStage;

        if (Objects.isNull(committedCapture)) {
            // if we don't have a commit yet, commit.
            if (Objects.nonNull(stagedCapture)) {
                captureToCommit = stagedCapture;
                captureToStage = newestCapture;
            } else {
                captureToCommit = newestCapture;
                captureToStage = null;
            }
        } else if (Objects.nonNull(stagedCapture) && committedCapture.nanoTime() < policy.commitBarrierNanos(newestCapture.nanoTime())) {
            // if the gap between newest and committed is bigger than resolution, commit staged and stage new
            captureToCommit = stagedCapture;
            captureToStage = newestCapture;
        } else {
            // otherwise merge into our stage
            captureToCommit = committedCapture;
            captureToStage = mergeCaptures(stagedCapture, newestCapture);
        }

        // apply our changes, keeping the committed Node if we're committing its capture.
        final Node<CAPTURE> nodeToCommit = (Objects.equals(committedCapture, captureToCommit) ? committedNode : new Node<>(captureToCommit));
        newTailWithStaging.set(nodeToCommit, captureToStage);

        return newTailWithStaging;
    });
Member Author


Additional context:

This is a re-implementation of the algorithm, primarily focused on using an atomically-swappable pair of values (a committed tail-Node<CAPTURE> plus a staged CAPTURE) instead of two separately-atomic values so that we can use AtomicReference#getAndUpdate to atomically swap a new pair in.

If the action results in a new node being in this pair, we know that the node is detached (not linked to by the previous tail) and must link the former tail node to it.

The code here defers out to the abstract function mergeCaptures so that the implementation can provide a meaningful merge action. In the case of FlowMetrics (which are okay dropping intermediate values), the implementation simply selects the youngest of the two captures.
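The committed-plus-staged pair swap described above can be sketched as follows. This is a minimal, hypothetical simplification (the real code also carries a linked-list Node and a reusable pair holder, omitted here); it shows only the atomic-pair idea and the flow-metric merge that keeps the youngest capture:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch: the committed capture and the staged capture travel together in
// one immutable pair, so a single AtomicReference#updateAndGet call swaps
// both atomically.
class PairSwapSketch {
    record Capture(long nanoTime, long count) {}
    record NodeStagingPair(Capture committed, Capture staged) {}

    private final AtomicReference<NodeStagingPair> tail =
            new AtomicReference<>(new NodeStagingPair(null, null));

    // For flow metrics, "merging" two captures just keeps the youngest one.
    static Capture merge(Capture oldC, Capture newC) {
        if (oldC == null) return newC;
        if (newC == null) return oldC;
        return oldC.nanoTime() > newC.nanoTime() ? oldC : newC;
    }

    NodeStagingPair append(Capture newest, long commitBarrierNanos) {
        return tail.updateAndGet(pair -> {
            if (pair.committed() == null) {
                // no commit yet: promote the staged capture if present
                return pair.staged() != null
                        ? new NodeStagingPair(pair.staged(), newest)
                        : new NodeStagingPair(newest, null);
            } else if (pair.staged() != null && pair.committed().nanoTime() < commitBarrierNanos) {
                // committed capture is older than the barrier: commit staged, stage newest
                return new NodeStagingPair(pair.staged(), newest);
            } else {
                // otherwise merge the newest capture into the stage
                return new NodeStagingPair(pair.committed(), merge(pair.staged(), newest));
            }
        });
    }
}
```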

    // ensure we are fully-compact.
    assertThat(flowMetric.estimateCapturesRetained(), is(lessThan(250)));
    assertThat(flowMetric.estimateExcessRetained(this::maxRetentionPlusMinResolutionBuffer), is(equalTo(Duration.ZERO)));
    assertThat(flowMetric.estimateCapturesRetained(), is(lessThan(252)));
Member Author


The lifetime metric retains up to 2 nodes, and wasn't previously counted.

    private final AtomicReference<Node> tail;
    private final AtomicReference<Node> head;
    private final FlowMetricRetentionPolicy policy;
    static class FlowRetentionWindow extends RetentionWindow<FlowCapture, Double> {
Member Author


The diff is a little difficult to read here, but our implementation that subclasses the now-abstract RetentionWindow contains all of the logic for merging FlowCaptures and calculating a value:

    static class FlowRetentionWindow extends RetentionWindow<FlowCapture, Double> {

        FlowRetentionWindow(FlowMetricRetentionPolicy policy, FlowCapture zeroCapture) {
            super(policy, zeroCapture);
        }

        @Override
        FlowCapture mergeCaptures(FlowCapture oldCapture, FlowCapture newCapture) {
            if (oldCapture == null) {
                return newCapture;
            }
            if (newCapture == null) {
                return oldCapture;
            }

            return (oldCapture.nanoTime() > newCapture.nanoTime()) ? oldCapture : newCapture;
        }

        @Override
        Optional<Double> calculateValue() {
            return calculateFromBaseline((compareCapture, baselineCapture) -> {
                if (compareCapture == null) { return Optional.empty(); }
                if (baselineCapture == null) { return Optional.empty(); }

                final OptionalDouble rate = calculateRate(compareCapture, baselineCapture);

                // convert from OptionalDouble to Optional<Double>
                return rate.isPresent() ? Optional.of(rate.getAsDouble()) : Optional.empty();
            });
        }
    }

-- ExtendedFlowMetric.FlowRetentionWindow@f2a18b67

Contributor

@andsel andsel left a comment


Very nice refactoring. I think that calculateValue is the place to compute the aggregation of histograms for the histogram metric.
I've left a couple of suggestions to get your feedback.

yaauie and others added 2 commits February 5, 2026 10:36
Co-authored-by: Andrea Selva <selva.andre@gmail.com>
Co-authored-by: Andrea Selva <selva.andre@gmail.com>
@yaauie
Member Author

yaauie commented Feb 5, 2026

I think that calculateValue is the place to compute the aggregation of histograms, for the histogram metric.

@andsel: I had seen your previous work on the HdrHistogram front that stored interval histograms (containing only data-points from the window), which would mean that the implementation there would need to walk the linked list to calculate, making the runtime query cost fairly high.

This would be made significantly easier if the values stored for histogram captures were lifetime values, since the HdrHistogram#subtract would allow us to similarly use only the head and youngest captures to calculate a value. It would also reduce the capture-time cost, since we wouldn't need to be merging captures into the staged capture and could simply select the youngest capture between the capture-to-append and the existing staged capture.
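The lifetime-value idea can be illustrated with a toy stand-in (a plain count-per-bucket map in place of a real HdrHistogram; names here are illustrative, not from the PR): when every capture stores lifetime totals, the interval view is simply newest minus baseline, no matter how many intermediate captures were discarded along the way.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for lifetime histograms: cumulative bucket counts.
// A window's value is computed from only two captures, newest minus
// baseline, mirroring what Histogram#subtract would do with real
// lifetime HdrHistograms.
class LifetimeHistogramSketch {
    static Map<Long, Long> subtract(Map<Long, Long> newest, Map<Long, Long> baseline) {
        Map<Long, Long> interval = new HashMap<>(newest);
        baseline.forEach((bucket, count) ->
                interval.merge(bucket, -count, Long::sum));
        interval.values().removeIf(c -> c == 0L); // drop empty buckets
        return interval;
    }
}
```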

@yaauie yaauie requested a review from andsel February 5, 2026 18:55
@elasticmachine

💚 Build Succeeded


cc @yaauie @andsel

     * @param newCapture a new capture to apply on top of the base
     * @return a {@link CAPTURE} representing both provided captures
     */
    abstract CAPTURE mergeCaptures(final CAPTURE oldCapture, final CAPTURE newCapture);
Member Author


note: this method only really exists because the proposed hdrhistogram implementation stores interval histograms instead of lifetime values, so it needs to merge fresh captures into the staged capture when the staged capture isn't yet old enough to commit.

If the hdrhistogram were to be implemented using lifetime values, then this could be simplified to simply select the later capture in both cases.

Contributor


The histogram PR does the staging on the client side. It can't create a snapshot for each recorded value and then let the retention window merge these micro-snapshots. Instead, it accumulates the measures in the recorder, which checks whether the resolution-nanos interval has passed to decide when a new snapshot is needed.
I don't think we have to create a new snapshot for each batch size that we measure; that would cost a lot, particularly in terms of memory usage.

@andsel
Contributor

andsel commented Feb 6, 2026

Hi @yaauie, reviewing your comment about subtract and lifetime histograms, I have some questions.

This would be made significantly easier if the values stored for histogram captures were lifetime values, since the HdrHistogram#subtract would allow us to similarly use only the head and youngest captures

If I understand correctly, you propose not to store incremental deltas of histograms, but snapshots that are absolute pictures of a lifelong accumulation, and then to use subtract to compute the time-delimited view. The idea could be interesting, but as far as I understand it is not feasible with Histogram's recorder, which provides only the snapshots between intervals, so it returns deltas and not absolute histograms. According to the official HdrHistogram documentation, Recorder is the preferred way to track values in highly concurrent environments.

The metric code has to be fast on taking action but can be slower on the reading side.

the implementation there would need to walk the linked list to calculate, making the runtime query cost fairly high.

Regarding the doubts about navigating the list of snapshots when getValue() is queried to provide a metric:
This operation is typically invoked externally via the HTTP API, meaning the call frequency is low (not many calls per millisecond), and it occurs on a "cold path," offline from the primary pipeline workers that record the values.

The RetentionWindow lists contain the following approximate number of snapshot nodes:

  • Last 1 minute: 20 nodes (every 3 seconds)
  • Last 5 minutes: 20 nodes (every 15 seconds)
  • Last 15 minutes: 30 nodes (every 30 seconds)
  • Last 1 hour: 60 nodes (every minute)
  • Last 24 hours: 96 nodes (every 15 minutes)

Given the current plan to use only the first three windows for this specific metric, traversing a maximum of 30 nodes is not anticipated to cause performance issues, especially considering the operation is on the cold path.
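The approximate node counts above follow directly from window length divided by capture resolution; a quick sanity check of that arithmetic:

```java
// Sanity-check of the approximate node counts listed above:
// nodes ≈ window seconds / resolution seconds.
class NodeCountCheck {
    static long nodes(long windowSeconds, long resolutionSeconds) {
        return windowSeconds / resolutionSeconds;
    }
}
```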

Let me know what you think.

Contributor

@andsel andsel left a comment


Code changes look good to me, but I have left a couple of comments:

  • one related to the need for the merge operation.
  • the other about using histogram's subtract and the possible related performance issue on the query side.

@yaauie
Member Author

yaauie commented Feb 9, 2026

The idea could be interesting, but as far as I understand it is not feasible with Histogram's recorder, which provides only the snapshots between intervals, so it returns deltas and not absolute histograms. According to the official HdrHistogram documentation, Recorder is the preferred way to track values in highly concurrent environments.

and

The metric code has to be fast on taking action but can be slower on the reading side.

I agree. But I think we can have the best of both worlds.

Flow metrics are typically registered with a periodic task to perform captures of the underlying metric, instead of making the capture logic be part of the write-time path for that metric. This is intentional, and keeps all of the complexities of managing that state out of the write-time path so that we avoid adding jitter to actual event processing.

Using a similar separation here would be helpful, and would also ensure that we have frequent-enough captures even if we have periods where a given histogram doesn't have additional values being tracked.

The current proposal in #18503 is to have one Recorder instance per RetentionWindow, each of which may conditionally block the current thread after writing to the recorder in order to perform the capture if the retention window's policy requires it. This means that a flow metric with 3 retention windows would have 3 opportunities for a thread to be blocked at write time to perform additional calculations (appending the capture to the window).

Since lifetime values would not need to be differentiated by retention window, we could have a single LifetimeHistogramMetric for each flow metric that uses a recorder internally (so that only a single Recorder#recordValue is on the writer's path). At value-read time (e.g., when the periodic capture task is invoked), we could use Recorder#getIntervalHistogram and merge the result into our lifetime value. A non-blocking spike example of that approach that takes care to return immutable wrappers is here.

While I brought this up in terms of how it enables a latest-minus-baseline calculation in the retention window, I think that the primary value is how it simplifies the metric write-time path.
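A rough sketch of that separation (hypothetical names; a toy atomic bucket array standing in for HdrHistogram's Recorder): pipeline workers touch only the record path, while the periodic capture task drains interval counts into the lifetime total, analogous to draining Recorder#getIntervalHistogram into a lifetime Histogram via add.

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Toy sketch of the proposed lifetime-metric shape: writers perform one
// cheap atomic increment, while the periodic capture task (cold path)
// drains the interval counts and folds them into a lifetime total.
class LifetimeMetricSketch {
    private final AtomicLongArray interval = new AtomicLongArray(64);
    private final long[] lifetime = new long[64];

    // hot path: called by pipeline workers
    void recordValue(int bucket) {
        interval.incrementAndGet(bucket);
    }

    // cold path: called by the periodic capture task
    long[] captureLifetime() {
        for (int i = 0; i < lifetime.length; i++) {
            lifetime[i] += interval.getAndSet(i, 0); // drain the interval
        }
        return lifetime.clone(); // snapshot for the retention window
    }
}
```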

@andsel andsel self-requested a review February 10, 2026 09:20
Contributor

@andsel andsel left a comment


LGTM

In light of #18624 (comment), this PR LGTM, and I'll create a PR that follows the guidance on implementing the Lifetime Histogram Metric, replacing #18503.

@yaauie yaauie merged commit b05f0dd into elastic:main Feb 10, 2026
12 checks passed
@yaauie yaauie deleted the flow-metrics-genericise branch February 10, 2026 16:38