Genericise Flow Metrics' RetentionWindow #18624
Conversation
…ed to compute a flow metric.
…asses of DatapointCapture.
- Added a type parameter with a constraint to be a subclass of DatapointCapture, which is something that has a datapoint capture time
- Peeled the baseline method out into ExtendedFlowMetric; it's a wrapper on returnHead(nanos)
This pull request does not have a backport label. Could you fix it @yaauie? 🙏
NodeStagingPair<CAPTURE> oldTailWithStaging = this.tail.getAndUpdate((committedNodeStagePair) -> {
    final Node<CAPTURE> committedNode = committedNodeStagePair.committed;
    final CAPTURE committedCapture = committedNode.capture;
    final CAPTURE stagedCapture = committedNodeStagePair.staged;

    final CAPTURE captureToCommit;
    final CAPTURE captureToStage;

    if (Objects.isNull(committedCapture)) {
        // if we don't have a commit yet, commit.
        if (Objects.nonNull(stagedCapture)) {
            captureToCommit = stagedCapture;
            captureToStage = newestCapture;
        } else {
            captureToCommit = newestCapture;
            captureToStage = null;
        }
    } else if (Objects.nonNull(stagedCapture) && committedCapture.nanoTime() < policy.commitBarrierNanos(newestCapture.nanoTime())) {
        // if the gap between newest and committed is bigger than resolution, commit staged and stage new
        captureToCommit = stagedCapture;
        captureToStage = newestCapture;
    } else {
        // otherwise merge into our stage
        captureToCommit = committedCapture;
        captureToStage = mergeCaptures(stagedCapture, newestCapture);
    }

    // apply our changes, keeping the committed Node if we're committing its capture.
    final Node<CAPTURE> nodeToCommit = (Objects.equals(committedCapture, captureToCommit) ? committedNode : new Node<>(captureToCommit));
    newTailWithStaging.set(nodeToCommit, captureToStage);

    return newTailWithStaging;
});
Additional context:
This is a re-implementation of the algorithm, primarily focused on using an atomically-swappable pair of values (a committed tail-Node<CAPTURE> plus a staged CAPTURE) instead of two separately-atomic values so that we can use AtomicReference#getAndUpdate to atomically swap a new pair in.
If the action results in a new node being in this pair, we know that the node is detached (not linked to by the previous tail) and must link the former tail node to it.
The code here defers out to the abstract function mergeCaptures so that the implementation can provide a meaningful merge action. In the case of FlowMetrics (which are okay dropping intermediate values), the implementation simply selects the youngest of the two captures.
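The pattern described above can be reduced to a minimal, self-contained sketch. This is not the PR's actual `NodeStagingPair`/`Node` code: the pair is simplified to a hypothetical `CommitStagePair` of boxed `Long` captures, and the commit-barrier check is collapsed into a boolean, just to show how `AtomicReference#getAndUpdate` keeps the committed and staged values from drifting apart.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch (hypothetical names): a single AtomicReference holds an immutable
// (committed, staged) pair, and getAndUpdate swaps a replacement pair in atomically.
public class PairSwapSketch {
    record CommitStagePair(Long committed, Long staged) {}

    private final AtomicReference<CommitStagePair> tail =
            new AtomicReference<>(new CommitStagePair(null, null));

    Long committed() { return tail.get().committed(); }
    Long staged()    { return tail.get().staged(); }

    /** Stage a new capture; promote the previously-staged one when a commit is due. */
    Long append(long newestCapture, boolean commitDue) {
        final CommitStagePair old = tail.getAndUpdate(pair -> {
            if (pair.committed() == null) {
                // first capture becomes the committed baseline immediately
                return new CommitStagePair(newestCapture, null);
            } else if (commitDue && pair.staged() != null) {
                // promote staged -> committed, stage the newest
                return new CommitStagePair(pair.staged(), newestCapture);
            } else {
                // otherwise (re-)stage; here the "merge" simply keeps the newest
                return new CommitStagePair(pair.committed(), newestCapture);
            }
        });
        return old.committed(); // previous committed value, e.g. for node linking
    }

    public static void main(String[] args) {
        PairSwapSketch window = new PairSwapSketch();
        window.append(1L, false);               // 1 becomes committed
        window.append(2L, false);               // 2 is staged
        window.append(3L, true);                // 2 promoted, 3 staged
        System.out.println(window.committed()); // prints 2
        System.out.println(window.staged());    // prints 3
    }
}
```

Because the whole pair is swapped in one compare-and-set loop, a concurrent reader can never observe a commit without its matching stage, which is the property the two separately-atomic values could not guarantee.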
  // ensure we are fully-compact.
- assertThat(flowMetric.estimateCapturesRetained(), is(lessThan(250)));
  assertThat(flowMetric.estimateExcessRetained(this::maxRetentionPlusMinResolutionBuffer), is(equalTo(Duration.ZERO)));
+ assertThat(flowMetric.estimateCapturesRetained(), is(lessThan(252)));
The lifetime metric retains up to 2 nodes, and wasn't previously counted.
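The "retains up to 2 nodes" behavior falls out of a policy whose commit barrier is never crossed. A hedged sketch, assuming `commitBarrierNanos(newest)` is roughly `newest - resolution` (inferred from the check in the `getAndUpdate` block above; the real `FlowMetricRetentionPolicy` interface may differ):

```java
public class LifetimePolicySketch {
    // Hypothetical policy shape, inferred from the commit-barrier check in the
    // quoted code above; NOT the actual FlowMetricRetentionPolicy interface.
    interface RetentionPolicy {
        long resolutionNanos();

        // captures committed before this barrier are considered due for replacement
        default long commitBarrierNanos(long newestNanos) {
            return newestNanos - resolutionNanos();
        }
    }

    static boolean commitDue(RetentionPolicy policy, long committedNanos, long newestNanos) {
        return committedNanos < policy.commitBarrierNanos(newestNanos);
    }

    public static void main(String[] args) {
        // A lifetime window can use an effectively infinite resolution: the barrier
        // is never crossed, new captures always merge into the single staged slot,
        // so the window holds at most one committed node plus one staged capture.
        RetentionPolicy lifetime = () -> Long.MAX_VALUE;
        System.out.println(commitDue(lifetime, 0L, 60_000_000_000L));   // prints false

        // An ordinary window with a 10s resolution commits once captures age past it.
        RetentionPolicy tenSeconds = () -> 10_000_000_000L;
        System.out.println(commitDue(tenSeconds, 0L, 60_000_000_000L)); // prints true
    }
}
```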
private final AtomicReference<Node> tail;
private final AtomicReference<Node> head;
private final FlowMetricRetentionPolicy policy;
static class FlowRetentionWindow extends RetentionWindow<FlowCapture, Double> {
The diff is a little difficult to read here, but our implementation that subclasses the now-abstract RetentionWindow contains all of the logic for merging FlowCaptures and calculating a value:
static class FlowRetentionWindow extends RetentionWindow<FlowCapture, Double> {
    FlowRetentionWindow(FlowMetricRetentionPolicy policy, FlowCapture zeroCapture) {
        super(policy, zeroCapture);
    }

    @Override
    FlowCapture mergeCaptures(FlowCapture oldCapture, FlowCapture newCapture) {
        if (oldCapture == null) { return newCapture; }
        if (newCapture == null) { return oldCapture; }
        return (oldCapture.nanoTime() > newCapture.nanoTime()) ? oldCapture : newCapture;
    }

    @Override
    Optional<Double> calculateValue() {
        return calculateFromBaseline((compareCapture, baselineCapture) -> {
            if (compareCapture == null) { return Optional.empty(); }
            if (baselineCapture == null) { return Optional.empty(); }
            final OptionalDouble rate = calculateRate(compareCapture, baselineCapture);
            // convert from OptionalDouble to Optional<Double>
            return rate.isPresent() ? Optional.of(rate.getAsDouble()) : Optional.empty();
        });
    }
}
andsel
left a comment
Very nice refactoring. I think that calculateValue is the place to compute the aggregation of histograms for the histogram metric.
I've left a couple of suggestions to get your feedback.
...h-core/src/main/java/org/logstash/instrument/metrics/BuiltInFlowMetricRetentionPolicies.java
logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java
Co-authored-by: Andrea Selva <selva.andre@gmail.com>
@andsel: I had seen your previous work on the HdrHistogram front that stored interval histograms (containing only data-points from the window), which would mean that the implementation there would need to walk the linked list to calculate, making the runtime query cost fairly high. This would be made significantly easier if the values stored for histogram captures were lifetime values, since the …
💚 Build Succeeded
 * @param newCapture a new capture to apply on top of the base
 * @return a {@link CAPTURE} representing both provided captures
 */
abstract CAPTURE mergeCaptures(final CAPTURE oldCapture, final CAPTURE newCapture);
note: this method only really exists because the proposed hdrhistogram implementation stores interval histograms instead of lifetime values, so it needs to merge fresh captures into the staged capture when the staged capture isn't yet old enough to commit.
If the hdrhistogram were to be implemented using lifetime values, then this could be simplified to simply select the later capture in both cases.
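That simplification can be shown with a small sketch. The `LifetimeCapture` record here is hypothetical (not a type from this PR): because a lifetime value is cumulative, the later capture already subsumes the earlier one, so the merge degenerates to selecting whichever capture is newer.

```java
public class LifetimeMergeSketch {
    // Hypothetical capture type: a timestamp plus a lifetime (cumulative) value.
    record LifetimeCapture(long nanoTime, long cumulativeCount) {}

    // With lifetime values, "merging" is just keeping the later capture.
    static LifetimeCapture mergeCaptures(LifetimeCapture oldCapture, LifetimeCapture newCapture) {
        if (oldCapture == null) { return newCapture; }
        if (newCapture == null) { return oldCapture; }
        return (newCapture.nanoTime() >= oldCapture.nanoTime()) ? newCapture : oldCapture;
    }

    public static void main(String[] args) {
        LifetimeCapture earlier = new LifetimeCapture(100L, 10L);
        LifetimeCapture later = new LifetimeCapture(200L, 25L);
        // no data is lost: the later cumulative count includes the earlier one
        System.out.println(mergeCaptures(earlier, later).cumulativeCount()); // prints 25
    }
}
```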
The histogram PR does the staging on the client side. It can't create a snapshot for each recorded value and then let the retention window merge these micro-snapshots. Instead, it accumulates the measures in the recorder, which verifies when it needs to create a new snapshot by checking that the resolution-nanos interval has passed.
I don't think we have to create a new snapshot for each batch size that we measure; that would cost a lot, particularly in terms of memory usage.
Hi @yaauie, reviewing your comment about subtract and lifetime histograms I have some questions.
If I understand correctly, you propose not to store incremental deltas of histograms, but snapshots that are absolute pictures of a lifelong accumulation, and then use subtract to compute the time-delimited view. The idea could be interesting, but as far as I can understand it is not feasible with Histogram's recorder, which provides only the snapshots between intervals, so it returns deltas and not absolute histograms. Per the official HdrHistogram documentation, Recorder is the preferred way to track values in highly concurrent environments. The metric code has to be fast on the recording side but can be slower on the reading side.
Regarding the doubts about navigating the list of snapshots when the RetentionWindow lists contain the following approximate number of snapshot nodes:
Given the current plan to use only the first three windows for this specific metric, traversing a maximum of 30 nodes is not anticipated to cause performance issues, especially considering the operation is on the cold path. Let me know what you think.
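The trade-off under discussion — interval (delta) snapshots that must be walked and summed, versus lifetime (absolute) snapshots that allow a latest-minus-baseline subtraction — can be illustrated with plain counters standing in for real HdrHistogram snapshots:

```java
public class SnapshotSchemesSketch {
    public static void main(String[] args) {
        // Interval ("delta") snapshots, as a Recorder produces: each value covers
        // only the activity since the previous snapshot, so a windowed view
        // requires walking and summing every node in the window.
        long[] intervalSnapshots = {5, 3, 7, 2};
        long windowed = 0;
        for (long delta : intervalSnapshots) windowed += delta;
        System.out.println(windowed); // prints 17

        // Lifetime ("absolute") snapshots: each value is the accumulation since
        // start, so a windowed view is just newest minus baseline — O(1) at read
        // time, regardless of how many nodes the window retains.
        long[] lifetimeSnapshots = {5, 8, 15, 17};
        System.out.println(lifetimeSnapshots[3] - lifetimeSnapshots[1]); // prints 9
    }
}
```

For real histograms the subtraction is of course structural rather than scalar, but the read-path shape is the same; with only ~30 nodes across the three windows, either approach stays cheap on the cold path.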
andsel
left a comment
Code changes look good to me, but I have left a couple of comments:
- one related to the need for the merge operation
- the other about using subtract from the histogram and the possible related performance issue on the query side
and
I agree. But I think we can have the best of both worlds.

Flow metrics are typically registered with a periodic task to perform captures of the underlying metric, instead of making the capture logic be part of the write-time path for that metric. This is intentional, and keeps all of the complexities of managing that state out of the write-time path so that we avoid adding jitter to actual event processing. Using a similar separation here would be helpful, and would also ensure that we have frequent-enough captures even if we have periods where a given histogram doesn't have additional values being tracked.

The current proposal in #18503 is to have one …

Since lifetime values would not need to be differentiated by retention window, we could have a single …

While I brought this up in terms of how it enables a latest-minus-baseline calculation in the retention window, I think that the primary value is how it simplifies the metric write-time path.
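The separation described above can be sketched with JDK primitives (all names here are hypothetical, not the PR's API): the hot write path only touches a cheap `LongAdder`, while a `ScheduledExecutorService` performs captures out-of-band on a fixed period.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Sketch: capture logic lives in a periodic task, not in the write-time path.
public class PeriodicCaptureSketch {
    private final LongAdder counter = new LongAdder();

    void record(long amount) { counter.add(amount); }  // hot path: no capture/state logic

    long capture() { return counter.sum(); }           // cold path, run periodically

    public static void main(String[] args) throws InterruptedException {
        PeriodicCaptureSketch metric = new PeriodicCaptureSketch();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // captures happen on a schedule, even while no new values arrive
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("captured: " + metric.capture()),
                10, 10, TimeUnit.MILLISECONDS);
        for (int i = 0; i < 100; i++) metric.record(1);
        Thread.sleep(50);
        scheduler.shutdown();
        System.out.println(metric.capture()); // prints 100
    }
}
```

This keeps write-time jitter out of event processing and guarantees captures at a known cadence, which is the property the comment argues for.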
andsel
left a comment
LGTM
In light of #18624 (comment), this PR LGTM and I'll create a PR to follow the guidance on implementing the Lifetime Histogram Metric that replaces #18503
Release notes
[rn: skip]
What does this PR do?
This NET ZERO CHANGE refactor extracts the shareable internals of FlowMetrics.RetentionWindow into its own abstract class so that they can be shared by the incoming histogram implementation.

The initial implementation of FlowMetrics.RetentionWindow was heavily optimized around the nature of rates of change being calculable from first and last entries without a need for intermediate values; it preferred throwing captures away whenever doing so didn't impact its ability to meet the retention policy.

This refactor extracts RetentionWindow as an abstract generic class, and moves the responsibility for merging two captures and for calculating the value into subclasses so that the implementations can provide meaningful specifics without knowing anything else about the linked-list data structures or atomic operations.

The ExtendedFlowMetric provided its lifetime values in a hacky way that couldn't be reused, so I also migrated the FlowMetricRetentionPolicy to a form that supports a lifetime value that effectively has an infinitely large resolution to avoid intermediate captures.

Checklist
[ ] I have made corresponding changes to the documentation
[ ] I have made corresponding change to the default configuration files (and/or docker env variables)