diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 710c9f8c8..213f2f21f 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -33,6 +33,7 @@ use janus_messages::{ taskprov::TimePrecision, }; use leases::{acquired_aggregation_job_from_row, acquired_collection_job_from_row}; +use models::SqlIntervalTimePrecision; use opentelemetry::{ KeyValue, metrics::{Counter, Histogram, Meter}, @@ -1568,7 +1569,7 @@ WHERE report_aggregations.task_id = $1 /* task_id */ &task_info.pkey, /* batch_id */ &batch_id.get_encoded()?, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await?; @@ -1849,18 +1850,11 @@ WHERE aggregation_jobs.task_id = $1 /* task_id */ &task_info.pkey, /* aggregation_job_id */ &aggregation_job_id.as_ref(), /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? - .map(|row| { - Self::aggregation_job_from_row( - task_id, - aggregation_job_id, - &task_info.time_precision, - &row, - ) - }) + .map(|row| Self::aggregation_job_from_row(task_id, aggregation_job_id, &row)) .transpose() } @@ -1896,7 +1890,7 @@ WHERE aggregation_jobs.task_id = $1 &[ /* task_id */ &task_info.pkey, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? 
@@ -1905,7 +1899,6 @@ WHERE aggregation_jobs.task_id = $1 Self::aggregation_job_from_row( task_id, &row.get_bytea_and_convert::("aggregation_job_id")?, - &task_info.time_precision, &row, ) }) @@ -1919,12 +1912,11 @@ WHERE aggregation_jobs.task_id = $1 >( task_id: &TaskId, aggregation_job_id: &AggregationJobId, - time_precision: &TimePrecision, row: &Row, ) -> Result, Error> { let client_timestamp_interval = row - .get::<_, SqlInterval>("client_timestamp_interval") - .to_dap_time_interval(time_precision)?; + .get::<_, SqlIntervalTimePrecision>("client_timestamp_interval") + .into(); let mut job = AggregationJob::new( *task_id, @@ -1958,6 +1950,7 @@ WHERE aggregation_jobs.task_id = $1 maximum_acquire_count: usize, ) -> Result>, Error> { let now = self.clock.now(); + let now_seconds = now.timestamp(); let lease_expiry_time = add_date_time_duration(now, *lease_duration)?; let maximum_acquire_count: i64 = maximum_acquire_count.try_into()?; @@ -1976,16 +1969,15 @@ WITH incomplete_jobs AS ( WHERE aggregation_jobs.state = 'ACTIVE' AND aggregation_jobs.lease_expiry <= $2 AND UPPER(aggregation_jobs.client_timestamp_interval) >= - COALESCE($2::TIMESTAMP WITH TIME ZONE - tasks.report_expiry_age * '1 second'::INTERVAL, - '-infinity'::TIMESTAMP WITH TIME ZONE) - FOR UPDATE OF aggregation_jobs SKIP LOCKED LIMIT $3 + COALESCE(($3 - tasks.report_expiry_age) / tasks.time_precision, 0) + FOR UPDATE OF aggregation_jobs SKIP LOCKED LIMIT $4 ) UPDATE aggregation_jobs SET lease_expiry = $1, lease_token = gen_random_bytes(16), lease_attempts = lease_attempts + 1, - updated_at = $4, - updated_by = $5 + updated_at = $5, + updated_by = $6 FROM tasks WHERE tasks.id = aggregation_jobs.task_id AND aggregation_jobs.id IN (SELECT id FROM incomplete_jobs) @@ -1999,6 +1991,7 @@ RETURNING tasks.task_id, tasks.batch_mode, tasks.vdaf, &[ /* lease_expiry */ &lease_expiry_time, /* now */ &now, + /* now_seconds */ &now_seconds, /* limit */ &maximum_acquire_count, /* updated_at */ &now, /* updated_by */ 
&self.name, @@ -2070,7 +2063,8 @@ WHERE aggregation_jobs.task_id = $4 /* aggregation_job_id */ &lease.leased().aggregation_job_id().as_ref(), /* lease_expiry */ &lease.lease_expiry_time(), /* lease_token */ &lease.lease_token().as_ref(), - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2129,17 +2123,15 @@ ON CONFLICT(task_id, aggregation_job_id) DO UPDATE /* batch_id */ &aggregation_job.partial_batch_identifier().get_encoded()?, /* client_timestamp_interval */ - &SqlInterval::from_dap_time_interval( - aggregation_job.client_timestamp_interval(), - &task_info.time_precision, - )?, + &SqlIntervalTimePrecision::from(*aggregation_job.client_timestamp_interval()), /* state */ &aggregation_job.state(), /* step */ &(u16::from(aggregation_job.step()) as i32), /* last_request_hash */ &aggregation_job.last_request_hash(), /* created_at */ &now, /* updated_at */ &now, /* updated_by */ &self.name, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2173,7 +2165,7 @@ UPDATE aggregation_jobs SET updated_by = $5 WHERE aggregation_jobs.task_id = $6 AND aggregation_jobs.aggregation_job_id = $7 - AND UPPER(aggregation_jobs.client_timestamp_interval) >= $8::TIMESTAMP WITH TIME ZONE", + AND UPPER(aggregation_jobs.client_timestamp_interval) >= $8", ) .await?; check_single_row_mutation( @@ -2187,7 +2179,8 @@ WHERE aggregation_jobs.task_id = $6 /* updated_by */ &self.name, /* task_id */ &task_info.pkey, /* aggregation_job_id */ &aggregation_job.id().as_ref(), - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2237,7 +2230,7 @@ ORDER BY ord ASC", /* task_id */ &task_info.pkey, /* aggregation_job_id */ &aggregation_job_id.as_ref(), /* threshold */ - 
&task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? @@ -2300,7 +2293,7 @@ WHERE report_aggregations.task_id = $1 /* aggregation_job_id */ &aggregation_job_id.as_ref(), /* report_id */ &report_id.as_ref(), /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? @@ -2357,7 +2350,7 @@ WHERE report_aggregations.task_id = $1 &[ /* task_id */ &task_info.pkey, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? @@ -2723,7 +2716,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE /* created_at */ &now, /* updated_at */ &now, /* updated_by */ &self.name, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2812,7 +2806,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE /* created_at */ &now, /* updated_at */ &now, /* updated_by */ &self.name, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2877,7 +2872,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE /* created_at */ &now, /* updated_at */ &now, /* updated_by */ &self.name, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + &task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -2962,7 +2958,8 @@ WHERE report_aggregations.aggregation_job_id = aggregation_jobs.id .time() .as_date_time(task_info.time_precision)?, /* ord */ &TryInto::::try_into(report_aggregation.ord())?, - /* threshold */ &task_info.report_expiry_threshold(now)?, + /* threshold */ + 
&task_info.report_expiry_threshold_as_time_precision_units(now)?, ], ) .await?, @@ -5076,7 +5073,7 @@ WHERE id IN (SELECT id FROM aggregation_jobs_to_delete)", &[ /* task_id */ &task_info.pkey, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, /* limit */ &i64::try_from(limit)?, ], ) diff --git a/aggregator_core/src/datastore/leases.rs b/aggregator_core/src/datastore/leases.rs index a00e6e37e..cefa5cab5 100644 --- a/aggregator_core/src/datastore/leases.rs +++ b/aggregator_core/src/datastore/leases.rs @@ -175,7 +175,7 @@ WHERE aggregation_jobs.task_id = $1 /* task ID */ &task_info.pkey, /* aggregation_job_id */ &aggregation_job_id.as_ref(), /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? @@ -221,7 +221,7 @@ WHERE aggregation_jobs.task_id = $1 &[ /* task ID */ &task_info.pkey, /* threshold */ - &task_info.report_expiry_threshold(self.clock.now())?, + &task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?, ], ) .await? 
diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index a6b57d3fa..5968e1f42 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -26,8 +26,8 @@ use janus_messages::{ taskprov::TimePrecision, }; use postgres_protocol::types::{ - Range, RangeBound, empty_range_to_sql, range_from_sql, range_to_sql, timestamp_from_sql, - timestamp_to_sql, + Range, RangeBound, empty_range_to_sql, int8_from_sql, int8_to_sql, range_from_sql, + range_to_sql, timestamp_from_sql, timestamp_to_sql, }; use postgres_types::{FromSql, ToSql, accepts, to_sql_checked}; use prio::{ @@ -2153,6 +2153,106 @@ impl OutstandingBatch { } } +/// Wrapper around [`janus_messages::Interval`] that supports conversions to/from SQL INT8RANGE, +/// representing time intervals in time precision units. +/// +/// Once all tables are migrated to time precision units, this type can be renamed to `SqlInterval` +/// and the other can be deleted. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct SqlIntervalTimePrecision(Interval); + +impl From for SqlIntervalTimePrecision { + fn from(interval: Interval) -> Self { + Self(interval) + } +} + +impl From for Interval { + fn from(value: SqlIntervalTimePrecision) -> Self { + value.0 + } +} + +impl<'a> FromSql<'a> for SqlIntervalTimePrecision { + fn from_sql( + _: &postgres_types::Type, + raw: &'a [u8], + ) -> Result> { + match range_from_sql(raw)? 
{ + Range::Empty => Ok(Self(Interval::EMPTY)), + Range::Nonempty(RangeBound::Inclusive(None), _) + | Range::Nonempty(RangeBound::Exclusive(None), _) + | Range::Nonempty(_, RangeBound::Inclusive(None)) + | Range::Nonempty(_, RangeBound::Exclusive(None)) + | Range::Nonempty(RangeBound::Unbounded, _) + | Range::Nonempty(_, RangeBound::Unbounded) => Err("Interval must be bounded".into()), + Range::Nonempty(RangeBound::Exclusive(_), _) + | Range::Nonempty(_, RangeBound::Inclusive(_)) => { + Err("Interval must be half-open".into()) + } + Range::Nonempty( + RangeBound::Inclusive(Some(start_raw)), + RangeBound::Exclusive(Some(end_raw)), + ) => { + // These values are the start and end of the interval in time precision units. + let start_time = int8_from_sql(start_raw)? + .try_into() + .map_err(|_| "interval start must be positive")?; + let end_time: u64 = int8_from_sql(end_raw)? + .try_into() + .map_err(|_| "interval end must be positive")?; + + Ok(Interval::new( + Time::from_time_precision_units(start_time), + Duration::from_time_precision_units( + end_time + .checked_sub(start_time) + .ok_or("interval end must be >= start")?, + ), + )? + .into()) + } + } + } + + accepts!(INT8_RANGE); +} + +impl ToSql for SqlIntervalTimePrecision { + fn to_sql( + &self, + _: &postgres_types::Type, + out: &mut bytes::BytesMut, + ) -> Result> { + // Convert the interval start and end to values in time precision units.
+ if self.0 == Interval::EMPTY { + empty_range_to_sql(out); + return Ok(postgres_types::IsNull::No); + } + + let start = self.0.start().as_signed_time_precision_units()?; + let end = self.0.end().as_signed_time_precision_units()?; + + range_to_sql( + |out| { + int8_to_sql(start, out); + Ok(RangeBound::Inclusive(postgres_protocol::IsNull::No)) + }, + |out| { + int8_to_sql(end, out); + Ok(RangeBound::Exclusive(postgres_protocol::IsNull::No)) + }, + out, + )?; + + Ok(postgres_types::IsNull::No) + } + + accepts!(INT8_RANGE); + + to_sql_checked!(); +} + /// The SQL timestamp epoch is midnight UTC on 2000-01-01. This const represents /// the Unix epoch seconds (946_684_800) in the context of SQL_UNIT_TIME_PRECISION /// and should not be necessary after Issue #4206. diff --git a/db/00000000000001_initial_schema.up.sql b/db/00000000000001_initial_schema.up.sql index 68d2a8704..24ea1c562 100644 --- a/db/00000000000001_initial_schema.up.sql +++ b/db/00000000000001_initial_schema.up.sql @@ -1,6 +1,6 @@ -- Load pgcrypto for gen_random_bytes. CREATE EXTENSION pgcrypto; --- Load an extension to allow indexing over both BIGINT and TSRANGE in a multicolumn GiST index. +-- Load an extension to allow indexing over both BIGINT and INT8RANGE in a multicolumn GiST index. CREATE EXTENSION btree_gist; -- Identifies which aggregator role is being played for this task. 
@@ -263,8 +263,9 @@ CREATE TABLE aggregation_jobs( aggregation_job_id BYTEA NOT NULL, -- 16-byte AggregationJobID as defined by the DAP specification aggregation_param BYTEA NOT NULL, -- encoded aggregation parameter (opaque VDAF message) batch_id BYTEA NOT NULL, -- batch ID (leader-selected only; corresponds to identifier in BatchSelector) - -- the minimal interval containing all of client timestamps included in this aggregation job - client_timestamp_interval TSTZRANGE NOT NULL, + -- The minimal interval, in time precision units, containing all of the client timestamps + -- included in this aggregation job + client_timestamp_interval INT8RANGE NOT NULL, state AGGREGATION_JOB_STATE NOT NULL, -- current state of the aggregation job step INTEGER NOT NULL, -- current step of the aggregation job last_request_hash BYTEA, -- SHA-256 hash of the most recently received AggregationJobInitReq or AggregationJobContinueReq (helper only)