feat(capture): emit counter metrics as deltas instead of cumulative values #1696
…alues

Convert registry counter capture from cumulative to delta semantics:
- Add `counter_prev` field to StateMachine to track previous counter values
- First observation of each counter is skipped (used as baseline)
- Subsequent observations emit delta = current - prev
- Counter resets handled via saturating_sub (emit 0 if current < prev)

Add first_tick tracking to Accumulator to prevent outputting data for ticks before a key was first observed:
- Add `counter_first_tick` and `gauge_first_tick` HashMaps
- Skip output in flush() and drain() for ticks before first_tick

Add parquet schema versioning:
- `lading.schema_version = "2"`
- `lading.counter_semantics = "delta"`

This change enables downstream consumers to use counter values directly without post-processing to calculate deltas.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
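The delta rules in the commit message can be illustrated with a small sketch. This is not the PR's code: `DeltaTracker`, `observe`, and the `String` keys are hypothetical names chosen for illustration, and the standard library `HashMap` stands in for the `FxHashMap` used in the diff. Only the behaviour (skip the first observation, emit `current - prev` afterwards, clamp resets to 0 with `saturating_sub`) comes from the description above.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the PR's `counter_prev` bookkeeping.
/// Maps each counter key to the last cumulative value seen for it.
#[derive(Default)]
struct DeltaTracker {
    prev: HashMap<String, u64>,
}

impl DeltaTracker {
    /// Records `current` as the new baseline for `key`.
    ///
    /// Returns `None` on the first observation of a key (nothing to diff
    /// against yet), otherwise `Some(current - prev)`. If the counter went
    /// backwards (a reset), `saturating_sub` clamps the delta to 0.
    fn observe(&mut self, key: &str, current: u64) -> Option<u64> {
        self.prev
            .insert(key.to_string(), current)
            .map(|prev| current.saturating_sub(prev))
    }
}
```

With this shape, observing 100, 130, 10, 25 for the same key yields no output, then 30, then 0 (the reset), then 15, because the baseline always moves to the most recent value.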
.set_key_value_metadata(Some(vec![
    KeyValue::new("lading.schema_version".to_string(), "2".to_string()),
    KeyValue::new(
        "lading.counter_semantics".to_string(),
        "delta".to_string(),
    ),
]))
🔥
/// Previous counter values for delta calculation.
/// First observation of each counter is skipped to avoid emitting
/// the entire cumulative value as a "delta".
counter_prev: FxHashMap<Key, u64>,
Hmm, the only concern I have with this PR is how this new map interacts with expiration. I don't think it does, so is this a steady accumulation site?
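One way the accumulation concern could be addressed is to store the tick of the last observation next to each previous value and prune idle keys. The sketch below is purely illustrative and assumes nothing about how expiration actually works in this codebase: `ExpiringDeltaTracker`, `PrevEntry`, `expire`, and `max_idle_ticks` are all hypothetical names, and `HashMap` with `String` keys stands in for `FxHashMap<Key, _>`.

```rust
use std::collections::HashMap;

/// Hypothetical entry: previous cumulative value plus the tick it was seen at.
struct PrevEntry {
    value: u64,
    last_seen_tick: u64,
}

/// Hypothetical variant of a `counter_prev`-style map that can be pruned.
#[derive(Default)]
struct ExpiringDeltaTracker {
    prev: HashMap<String, PrevEntry>,
}

impl ExpiringDeltaTracker {
    /// Same delta rule as in the PR description: first observation is a
    /// baseline (`None`), later ones emit `current - prev`, clamped at 0.
    fn observe(&mut self, key: &str, current: u64, tick: u64) -> Option<u64> {
        let entry = PrevEntry { value: current, last_seen_tick: tick };
        self.prev
            .insert(key.to_string(), entry)
            .map(|old| current.saturating_sub(old.value))
    }

    /// Drops every key not observed within the last `max_idle_ticks` ticks,
    /// so the map cannot grow without bound when keys stop appearing.
    fn expire(&mut self, now_tick: u64, max_idle_ticks: u64) {
        self.prev
            .retain(|_, e| now_tick.saturating_sub(e.last_seen_tick) <= max_idle_ticks);
    }
}
```

The trade-off in this sketch is that a key returning after expiry is treated as new, so its first observation is skipped again as a baseline.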
// Verify we actually produced output
assert!(last_counter_value.is_some(), "No counter values in output");
// Note: First counter observation is skipped, so we have 599 counter values
I was confused by this; it would be worth stating that it's skipped because of the delta calculation. I guess that is done later, at L1584 - L1586.
/// Tick when each counter key was first written (for delta counter support)
counter_first_tick: FxHashMap<Key, u64>,
/// Tick when each gauge key was first written
gauge_first_tick: FxHashMap<Key, u64>,
Rather than having two whole separate maps, I would almost rather we alter the existing map to something like
counters: FxHashMap<Key, { first_tick: u64, buf: [u64; BUFFER_SIZE]}>,
That way the information is co-located in memory and we avoid two additional maps and their lookups.
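The co-located layout suggested in this comment could look roughly like the sketch below. It is an assumption-laden illustration, not the Accumulator's real code: `CounterSlot`, `Counters`, `write`, and `read` are hypothetical names, `BUFFER_SIZE = 4` is an arbitrary value, indexing the buffer by `tick % BUFFER_SIZE` is assumed, and `HashMap` with `String` keys replaces `FxHashMap<Key, _>` so the example has no external dependencies.

```rust
use std::collections::HashMap;

/// Arbitrary size for illustration; the real constant is not shown in the PR excerpt.
const BUFFER_SIZE: usize = 4;

/// Hypothetical per-key slot holding the first-written tick next to the
/// value buffer, so one lookup yields both.
struct CounterSlot {
    first_tick: u64,
    buf: [u64; BUFFER_SIZE],
}

#[derive(Default)]
struct Counters {
    slots: HashMap<String, CounterSlot>,
}

impl Counters {
    /// Writes `value` for `tick`; the first write for a key fixes `first_tick`.
    fn write(&mut self, key: &str, tick: u64, value: u64) {
        let slot = self.slots.entry(key.to_string()).or_insert(CounterSlot {
            first_tick: tick,
            buf: [0; BUFFER_SIZE],
        });
        slot.buf[(tick % BUFFER_SIZE as u64) as usize] = value;
    }

    /// Returns `None` for unknown keys and for ticks before the key was
    /// first written, mirroring the "skip output before first_tick" rule.
    fn read(&self, key: &str, tick: u64) -> Option<u64> {
        let slot = self.slots.get(key)?;
        if tick < slot.first_tick {
            return None;
        }
        Some(slot.buf[(tick % BUFFER_SIZE as u64) as usize])
    }
}
```

Here a single map access serves both the first-tick check and the buffer read, which is the locality benefit the comment describes.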
Summary
- … (`lading.schema_version = "2"`, `lading.counter_semantics = "delta"`)

Changes

StateMachine
- Add `counter_prev` field to track previous counter values
- Subsequent observations emit `delta = current - prev`
- Counter resets handled via `saturating_sub` (emit 0 if current < prev)

Accumulator
- Add `counter_first_tick` and `gauge_first_tick` HashMaps
- Skip output in `flush()` and `drain()` for ticks before a key's `first_tick`

Parquet Format
- `lading.schema_version = "2"`
- `lading.counter_semantics = "delta"`

Test plan
🤖 Generated with Claude Code