From 0aa55bf64721e76b90a0e7eca2ddb67733528af9 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Fri, 20 Feb 2026 13:53:52 -0800 Subject: [PATCH 1/4] db: migrate `aggregation_jobs` table to new times Store client report intervals as `INT8RANGE`, representing a range of timestamps in time precision units. To that end, we add `SqlIntervalTimePrecision`, a simplified version of `SqlInterval` that gives us conversion to/from `INT8RANGE`. Once all `TSTZRANGE`s are converted to `INT8RANGE`, the old `SqlInterval` gets deleted and `SqlIntervalTimePrecision` is renamed. Part of #4206 --- aggregator_core/src/datastore.rs | 71 ++++++++-------- aggregator_core/src/datastore/leases.rs | 4 +- aggregator_core/src/datastore/models.rs | 104 +++++++++++++++++++++++- db/00000000000001_initial_schema.up.sql | 5 +- 4 files changed, 142 insertions(+), 42 deletions(-) diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 710c9f8c8..44e1d285b 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -33,6 +33,7 @@ use janus_messages::{ taskprov::TimePrecision, }; use leases::{acquired_aggregation_job_from_row, acquired_collection_job_from_row}; +use models::SqlIntervalTimePrecision; use opentelemetry::{ KeyValue, metrics::{Counter, Histogram, Meter}, @@ -1568,7 +1569,7 @@ WHERE report_aggregations.task_id = $1 /* task_id */ &task_info.pkey, /* batch_id */ &batch_id.get_encoded()?, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await?; @@ -1849,18 +1850,11 @@ WHERE aggregation_jobs.task_id = $1 /* task_id */ &task_info.pkey, /* aggregation_job_id */ &aggregation_job_id.as_ref(), /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? 
- .map(|row| { - Self::aggregation_job_from_row( - task_id, - aggregation_job_id, - &task_info.time_precision, - &row, - ) - }) + .map(|row| Self::aggregation_job_from_row(task_id, aggregation_job_id, &row)) .transpose() } @@ -1896,7 +1890,7 @@ WHERE aggregation_jobs.task_id = $1 &[ /* task_id */ &task_info.pkey, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? @@ -1905,7 +1899,6 @@ WHERE aggregation_jobs.task_id = $1 Self::aggregation_job_from_row( task_id, &row.get_bytea_and_convert::("aggregation_job_id")?, - &task_info.time_precision, &row, ) }) @@ -1919,12 +1912,11 @@ WHERE aggregation_jobs.task_id = $1 >( task_id: &TaskId, aggregation_job_id: &AggregationJobId, - time_precision: &TimePrecision, row: &Row, ) -> Result, Error> { let client_timestamp_interval = row - .get::<_, SqlInterval>("client_timestamp_interval") - .to_dap_time_interval(time_precision)?; + .get::<_, SqlIntervalTimePrecision>("client_timestamp_interval") + .into(); let mut job = AggregationJob::new( *task_id, @@ -1958,6 +1950,7 @@ WHERE aggregation_jobs.task_id = $1 maximum_acquire_count: usize, ) -> Result>, Error> { let now = self.clock.now(); + let now_seconds = now.timestamp(); let lease_expiry_time = add_date_time_duration(now, *lease_duration)?; let maximum_acquire_count: i64 = maximum_acquire_count.try_into()?; @@ -1967,6 +1960,8 @@ WHERE aggregation_jobs.task_id = $1 // We generate the token on the DB to allow each acquired job to receive its own distinct // token. This is not strictly necessary as we only care about token collisions on a // per-row basis. + // TODO(timg): problem here: agg job client_timestamp_interval is time precisions + // but tasks.report_expiry_age is BIGINT... 
ah maybe we can do the math in SQL let stmt = self .prepare_cached( "-- acquire_incomplete_aggregation_jobs() @@ -1976,16 +1971,15 @@ WITH incomplete_jobs AS ( WHERE aggregation_jobs.state = 'ACTIVE' AND aggregation_jobs.lease_expiry <= $2 AND UPPER(aggregation_jobs.client_timestamp_interval) >= - COALESCE($2::TIMESTAMP WITH TIME ZONE - tasks.report_expiry_age * '1 second'::INTERVAL, - '-infinity'::TIMESTAMP WITH TIME ZONE) - FOR UPDATE OF aggregation_jobs SKIP LOCKED LIMIT $3 + COALESCE(($3 - tasks.report_expiry_age) / tasks.time_precision, 0) + FOR UPDATE OF aggregation_jobs SKIP LOCKED LIMIT $4 ) UPDATE aggregation_jobs SET lease_expiry = $1, lease_token = gen_random_bytes(16), lease_attempts = lease_attempts + 1, - updated_at = $4, - updated_by = $5 + updated_at = $5, + updated_by = $6 FROM tasks WHERE tasks.id = aggregation_jobs.task_id AND aggregation_jobs.id IN (SELECT id FROM incomplete_jobs) @@ -1999,6 +1993,7 @@ RETURNING tasks.task_id, tasks.batch_mode, tasks.vdaf, &[ /* lease_expiry */ &lease_expiry_time, /* now */ &now, + /* now_seconds */ &now_seconds, /* limit */ &maximum_acquire_count, /* updated_at */ &now, /* updated_by */ &self.name, @@ -2070,7 +2065,8 @@ WHERE aggregation_jobs.task_id = $4 /* aggregation_job_id */ &lease.leased().aggregation_job_id().as_ref(), /* lease_expiry */ &lease.lease_expiry_time(), /* lease_token */ &lease.lease_token().as_ref(), - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2129,17 +2125,15 @@ ON CONFLICT(task_id, aggregation_job_id) DO UPDATE /* batch_id */ &aggregation_job.partial_batch_identifier().get_encoded()?, /* client_timestamp_interval */ - &SqlInterval::from_dap_time_interval( - aggregation_job.client_timestamp_interval(), - &task_info.time_precision, - )?, + &SqlIntervalTimePrecision::from(*aggregation_job.client_timestamp_interval()), /* state */ &aggregation_job.state(), /* step */ 
&(u16::from(aggregation_job.step()) as i32), /* last_request_hash */ &aggregation_job.last_request_hash(), /* created_at */ &now, /* updated_at */ &now, /* updated_by */ &self.name, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2173,7 +2167,7 @@ UPDATE aggregation_jobs SET updated_by = $5 WHERE aggregation_jobs.task_id = $6 AND aggregation_jobs.aggregation_job_id = $7 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= $8::TIMESTAMP WITH TIME ZONE", + AND UPPER(aggregation_jobs.client_timestamp_interval) >= $8", ) .await?; check_single_row_mutation( @@ -2187,7 +2181,8 @@ WHERE aggregation_jobs.task_id = $6 /* updated_by */ &self.name, /* task_id */ &task_info.pkey, /* aggregation_job_id */ &aggregation_job.id().as_ref(), - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2237,7 +2232,7 @@ ORDER BY ord ASC", /* task_id */ &task_info.pkey, /* aggregation_job_id */ &aggregation_job_id.as_ref(), /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? @@ -2300,7 +2295,7 @@ WHERE report_aggregations.task_id = $1 /* aggregation_job_id */ &aggregation_job_id.as_ref(), /* report_id */ &report_id.as_ref(), /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? @@ -2357,7 +2352,7 @@ WHERE report_aggregations.task_id = $1 &[ /* task_id */ &task_info.pkey, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? 
@@ -2723,7 +2718,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE /* created_at */ &now, /* updated_at */ &now, /* updated_by */ &self.name, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2812,7 +2808,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE /* created_at */ &now, /* updated_at */ &now, /* updated_by */ &self.name, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2877,7 +2874,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE /* created_at */ &now, /* updated_at */ &now, /* updated_by */ &self.name, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2962,7 +2960,8 @@ WHERE report_aggregations.aggregation_job_id = aggregation_jobs.id .time() .as_date_time(task_info.time_precision)?, /* ord */ &TryInto::::try_into(report_aggregation.ord())?, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -5076,7 +5075,7 @@ WHERE id IN (SELECT id FROM aggregation_jobs_to_delete)", &[ /* task_id */ &task_info.pkey, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, /* limit */ &i64::try_from(limit)?, ], ) diff --git a/aggregator_core/src/datastore/leases.rs b/aggregator_core/src/datastore/leases.rs index a00e6e37e..cefa5cab5 100644 --- a/aggregator_core/src/datastore/leases.rs +++ b/aggregator_core/src/datastore/leases.rs @@ -175,7 +175,7 @@ WHERE aggregation_jobs.task_id = $1 /* task ID */ &task_info.pkey, /* aggregation_job_id */ &aggregation_job_id.as_ref(), /* threshold */ - 
&task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? @@ -221,7 +221,7 @@ WHERE aggregation_jobs.task_id = $1 &[ /* task ID */ &task_info.pkey, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index a6b57d3fa..bf60bf297 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -26,8 +26,8 @@ use janus_messages::{ taskprov::TimePrecision, }; use postgres_protocol::types::{ - Range, RangeBound, empty_range_to_sql, range_from_sql, range_to_sql, timestamp_from_sql, - timestamp_to_sql, + Range, RangeBound, empty_range_to_sql, int8_from_sql, int8_to_sql, range_from_sql, + range_to_sql, timestamp_from_sql, timestamp_to_sql, }; use postgres_types::{FromSql, ToSql, accepts, to_sql_checked}; use prio::{ @@ -2153,6 +2153,106 @@ impl OutstandingBatch { } } +/// Wrapper around [`janus_messages::Interval`] that supports conversions to/from SQL INT8RANGE, +/// representing time intervals in time precision units. +/// +/// Once all tables are migrated to time precision units, this type can be renamed to `SqlInterval` +/// and the other can be deleted. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct SqlIntervalTimePrecision(Interval); + +impl From for SqlIntervalTimePrecision { + fn from(interval: Interval) -> Self { + Self(interval) + } +} + +impl From for Interval { + fn from(value: SqlIntervalTimePrecision) -> Self { + value.0 + } +} + +impl<'a> FromSql<'a> for SqlIntervalTimePrecision { + fn from_sql( + _: &postgres_types::Type, + raw: &'a [u8], + ) -> Result> { + match range_from_sql(raw)? 
{ + Range::Empty => Ok(Self(Interval::EMPTY)), + Range::Nonempty(RangeBound::Inclusive(None), _) + | Range::Nonempty(RangeBound::Exclusive(None), _) + | Range::Nonempty(_, RangeBound::Inclusive(None)) + | Range::Nonempty(_, RangeBound::Exclusive(None)) + | Range::Nonempty(RangeBound::Unbounded, _) + | Range::Nonempty(_, RangeBound::Unbounded) => Err("Interval must be bounded".into()), + Range::Nonempty(RangeBound::Exclusive(_), _) + | Range::Nonempty(_, RangeBound::Inclusive(_)) => { + Err("Interval must be half-open".into()) + } + Range::Nonempty( + RangeBound::Inclusive(Some(start_raw)), + RangeBound::Exclusive(Some(end_raw)), + ) => { + // These values are the start and end of the interval in time precision units. + let start_time = int8_from_sql(start_raw)?; + let end_time = int8_from_sql(end_raw)?; + + Ok(Interval::new( + Time::from_time_precision_units( + start_time + .try_into() + .map_err(|_| "interval start must be positive")?, + ), + Duration::from_time_precision_units( + (end_time - start_time) + .try_into() + .map_err(|_| "interval duration must be positive")?, + ), + )? + .into()) + } + } + } + + accepts!(INT8_RANGE); +} + +impl ToSql for SqlIntervalTimePrecision { + fn to_sql( + &self, + _: &postgres_types::Type, + out: &mut bytes::BytesMut, + ) -> Result> { + // Convert the interval start and end to SQL timestamps. + if self.0 == Interval::EMPTY { + empty_range_to_sql(out); + return Ok(postgres_types::IsNull::No); + } + + let start = self.0.start().as_signed_time_precision_units()?; + let end = self.0.end().as_signed_time_precision_units()?; + + range_to_sql( + |out| { + int8_to_sql(start, out); + Ok(RangeBound::Inclusive(postgres_protocol::IsNull::No)) + }, + |out| { + int8_to_sql(end, out); + Ok(RangeBound::Exclusive(postgres_protocol::IsNull::No)) + }, + out, + )?; + + Ok(postgres_types::IsNull::No) + } + + accepts!(INT8_RANGE); + + to_sql_checked!(); +} + /// The SQL timestamp epoch is midnight UTC on 2000-01-01. 
This const represents /// the Unix epoch seconds (946_684_800) in the context of SQL_UNIT_TIME_PRECISION /// and should not be necessary after Issue #4206. diff --git a/db/00000000000001_initial_schema.up.sql b/db/00000000000001_initial_schema.up.sql index 68d2a8704..bd3d3b430 100644 --- a/db/00000000000001_initial_schema.up.sql +++ b/db/00000000000001_initial_schema.up.sql @@ -263,8 +263,9 @@ CREATE TABLE aggregation_jobs( aggregation_job_id BYTEA NOT NULL, -- 16-byte AggregationJobID as defined by the DAP specification aggregation_param BYTEA NOT NULL, -- encoded aggregation parameter (opaque VDAF message) batch_id BYTEA NOT NULL, -- batch ID (leader-selected only; corresponds to identifier in BatchSelector) - -- the minimal interval containing all of client timestamps included in this aggregation job - client_timestamp_interval TSTZRANGE NOT NULL, + -- The minimal interval, in time precision units, containing all of client timestamps included + -- in this aggregation job + client_timestamp_interval INT8RANGE NOT NULL, state AGGREGATION_JOB_STATE NOT NULL, -- current state of the aggregation job step INTEGER NOT NULL, -- current step of the aggregation job last_request_hash BYTEA, -- SHA-256 hash of the most recently received AggregationJobInitReq or AggregationJobContinueReq (helper only) From 5419e1cb1fe2576d87bf99e51f83e427379cba39 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Mon, 23 Feb 2026 17:13:23 -0800 Subject: [PATCH 2/4] remove comment --- aggregator_core/src/datastore.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 44e1d285b..213f2f21f 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -1960,8 +1960,6 @@ WHERE aggregation_jobs.task_id = $1 // We generate the token on the DB to allow each acquired job to receive its own distinct // token. This is not strictly necessary as we only care about token collisions on a // per-row basis. 
- // TODO(timg): problem here: agg job client_timestamp_interval is time precisions - // but tasks.report_expiry_age is BIGINT... ah maybe we can do the math in SQL let stmt = self .prepare_cached( "-- acquire_incomplete_aggregation_jobs() From 519c7df43adf70933f1ded060dd8a4980ea49ba4 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Tue, 24 Feb 2026 11:22:52 -0800 Subject: [PATCH 3/4] jcjones review --- aggregator_core/src/datastore/models.rs | 6 ++++-- db/00000000000001_initial_schema.up.sql | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index bf60bf297..ecace8f0a 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -2205,7 +2205,9 @@ impl<'a> FromSql<'a> for SqlIntervalTimePrecision { .map_err(|_| "interval start must be positive")?, ), Duration::from_time_precision_units( - (end_time - start_time) + end_time + .checked_sub(start_time) + .ok_or("interval end must be >= start")? .try_into() .map_err(|_| "interval duration must be positive")?, ), @@ -2224,7 +2226,7 @@ impl ToSql for SqlIntervalTimePrecision { _: &postgres_types::Type, out: &mut bytes::BytesMut, ) -> Result> { - // Convert the interval start and end to SQL timestamps. + // Convert the interval start and end to values in time precision units. if self.0 == Interval::EMPTY { empty_range_to_sql(out); return Ok(postgres_types::IsNull::No); diff --git a/db/00000000000001_initial_schema.up.sql b/db/00000000000001_initial_schema.up.sql index bd3d3b430..24ea1c562 100644 --- a/db/00000000000001_initial_schema.up.sql +++ b/db/00000000000001_initial_schema.up.sql @@ -1,6 +1,6 @@ -- Load pgcrypto for gen_random_bytes. CREATE EXTENSION pgcrypto; --- Load an extension to allow indexing over both BIGINT and TSRANGE in a multicolumn GiST index. +-- Load an extension to allow indexing over both BIGINT and INT8RANGE in a multicolumn GiST index. 
 CREATE EXTENSION btree_gist;
 
 -- Identifies which aggregator role is being played for this task.

From 21557937bb8bbbf170f9842795beb50b738cef48 Mon Sep 17 00:00:00 2001
From: Tim Geoghegan
Date: Tue, 24 Feb 2026 15:20:20 -0800
Subject: [PATCH 4/4] david review

---
 aggregator_core/src/datastore/models.rs | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs
index ecace8f0a..5968e1f42 100644
--- a/aggregator_core/src/datastore/models.rs
+++ b/aggregator_core/src/datastore/models.rs
@@ -2195,21 +2195,19 @@ impl<'a> FromSql<'a> for SqlIntervalTimePrecision {
                 RangeBound::Exclusive(Some(end_raw)),
             ) => {
                 // These values are the start and end of the interval in time precision units.
-                let start_time = int8_from_sql(start_raw)?;
-                let end_time = int8_from_sql(end_raw)?;
+                let start_time = int8_from_sql(start_raw)?
+                    .try_into()
+                    .map_err(|_| "interval start must be positive")?;
+                let end_time: u64 = int8_from_sql(end_raw)?
+                    .try_into()
+                    .map_err(|_| "interval end must be positive")?;
 
                 Ok(Interval::new(
-                    Time::from_time_precision_units(
-                        start_time
-                            .try_into()
-                            .map_err(|_| "interval start must be positive")?,
-                    ),
+                    Time::from_time_precision_units(start_time),
                     Duration::from_time_precision_units(
                         end_time
                             .checked_sub(start_time)
-                            .ok_or("interval end must be >= start")?
-                            .try_into()
-                            .map_err(|_| "interval duration must be positive")?,
+                            .ok_or("interval end must be >= start")?,
                     ),
                 )?
                 .into())