Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 33 additions & 36 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use janus_messages::{
taskprov::TimePrecision,
};
use leases::{acquired_aggregation_job_from_row, acquired_collection_job_from_row};
use models::SqlIntervalTimePrecision;
use opentelemetry::{
KeyValue,
metrics::{Counter, Histogram, Meter},
Expand Down Expand Up @@ -1568,7 +1569,7 @@ WHERE report_aggregations.task_id = $1
/* task_id */ &task_info.pkey,
/* batch_id */ &batch_id.get_encoded()?,
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
],
)
.await?;
Expand Down Expand Up @@ -1849,18 +1850,11 @@ WHERE aggregation_jobs.task_id = $1
/* task_id */ &task_info.pkey,
/* aggregation_job_id */ &aggregation_job_id.as_ref(),
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
],
)
.await?
.map(|row| {
Self::aggregation_job_from_row(
task_id,
aggregation_job_id,
&task_info.time_precision,
&row,
)
})
.map(|row| Self::aggregation_job_from_row(task_id, aggregation_job_id, &row))
.transpose()
}

Expand Down Expand Up @@ -1896,7 +1890,7 @@ WHERE aggregation_jobs.task_id = $1
&[
/* task_id */ &task_info.pkey,
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
],
)
.await?
Expand All @@ -1905,7 +1899,6 @@ WHERE aggregation_jobs.task_id = $1
Self::aggregation_job_from_row(
task_id,
&row.get_bytea_and_convert::<AggregationJobId>("aggregation_job_id")?,
&task_info.time_precision,
&row,
)
})
Expand All @@ -1919,12 +1912,11 @@ WHERE aggregation_jobs.task_id = $1
>(
task_id: &TaskId,
aggregation_job_id: &AggregationJobId,
time_precision: &TimePrecision,
row: &Row,
) -> Result<AggregationJob<SEED_SIZE, B, A>, Error> {
let client_timestamp_interval = row
.get::<_, SqlInterval>("client_timestamp_interval")
.to_dap_time_interval(time_precision)?;
.get::<_, SqlIntervalTimePrecision>("client_timestamp_interval")
.into();

let mut job = AggregationJob::new(
*task_id,
Expand Down Expand Up @@ -1958,6 +1950,7 @@ WHERE aggregation_jobs.task_id = $1
maximum_acquire_count: usize,
) -> Result<Vec<Lease<AcquiredAggregationJob>>, Error> {
let now = self.clock.now();
let now_seconds = now.timestamp();
let lease_expiry_time = add_date_time_duration(now, *lease_duration)?;
let maximum_acquire_count: i64 = maximum_acquire_count.try_into()?;

Expand All @@ -1976,16 +1969,15 @@ WITH incomplete_jobs AS (
WHERE aggregation_jobs.state = 'ACTIVE'
AND aggregation_jobs.lease_expiry <= $2
AND UPPER(aggregation_jobs.client_timestamp_interval) >=
COALESCE($2::TIMESTAMP WITH TIME ZONE - tasks.report_expiry_age * '1 second'::INTERVAL,
'-infinity'::TIMESTAMP WITH TIME ZONE)
FOR UPDATE OF aggregation_jobs SKIP LOCKED LIMIT $3
COALESCE(($3 - tasks.report_expiry_age) / tasks.time_precision, 0)
Copy link
Contributor Author

@tgeoghegan tgeoghegan Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This little bit of math needs close scrutiny. $3 is now in seconds since the epoch, so we're subtracting report_expiry_age (also in seconds, a BIGINT) then dividing by the BIGINT time_precision to get a quantity we can compare to UPPER(aggregation_jobs.client_timestamp_interval). The way we do that in Janus boils down to timestamp / precision_secs (Time::from_seconds_since_epoch) so I think this is OK, because Postgres / is documented to "truncate the result" (https://www.postgresql.org/docs/9.5/functions-math.html).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The math LGTM, but we do have a functionality change here, I think? Previously tasks without a report_expiry_age used -infinity in the comparison, while with the division here we're now getting 0, so our >= is only going to match tasks with positive timestamps, while previously with the result going to -infinity it'd always match.

I guess that's just an edge case not particularly important for the real world... but you did ask me to apply scrutiny here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate the scrutiny. This is worth diving into.

The nuance here is signedness of time values. Previously, we stored SQL TIMESTAMP, which ranges from [-infinity, infinity] (is it possible to have a closed range on infinity? whatever). The intention was to use the smallest possible value so that it will always compare less than UPPER(aggregation_jobs.client_timestamp_interval), which was TIMESTAMP.

Now, UPPER(aggregation_jobs.client_timestamp_interval) is a DAP Time, which ranges from 0, 2^64-1. But we store it as SQL BIGINT which is -2^63, +2^63-1. So should we compare against the smallest possible Time or the smallest possible BIGINT?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Furthermore, TIMESTAMP's zero point does not line up with the Unix epoch that DAP uses, so some of our unit tests did care about negative SQL timestamps. Now, both $3 and UPPER(aggregation_jobs.client_timestamp_interval) are using the Unix epoch as their zero point, so I think it would be fine to use either 0 or x'8000000000000000'::bigint as our fallback value to compare against.

FOR UPDATE OF aggregation_jobs SKIP LOCKED LIMIT $4
)
UPDATE aggregation_jobs SET
lease_expiry = $1,
lease_token = gen_random_bytes(16),
lease_attempts = lease_attempts + 1,
updated_at = $4,
updated_by = $5
updated_at = $5,
updated_by = $6
FROM tasks
WHERE tasks.id = aggregation_jobs.task_id
AND aggregation_jobs.id IN (SELECT id FROM incomplete_jobs)
Expand All @@ -1999,6 +1991,7 @@ RETURNING tasks.task_id, tasks.batch_mode, tasks.vdaf,
&[
/* lease_expiry */ &lease_expiry_time,
/* now */ &now,
/* now_seconds */ &now_seconds,
/* limit */ &maximum_acquire_count,
/* updated_at */ &now,
/* updated_by */ &self.name,
Expand Down Expand Up @@ -2070,7 +2063,8 @@ WHERE aggregation_jobs.task_id = $4
/* aggregation_job_id */ &lease.leased().aggregation_job_id().as_ref(),
/* lease_expiry */ &lease.lease_expiry_time(),
/* lease_token */ &lease.lease_token().as_ref(),
/* threshold */ &task_info.report_expiry_threshold(now)?,
/* threshold */
&task_info.report_expiry_threshold_as_time_precision_units(now)?,
],
)
.await?,
Expand Down Expand Up @@ -2129,17 +2123,15 @@ ON CONFLICT(task_id, aggregation_job_id) DO UPDATE
/* batch_id */
&aggregation_job.partial_batch_identifier().get_encoded()?,
/* client_timestamp_interval */
&SqlInterval::from_dap_time_interval(
aggregation_job.client_timestamp_interval(),
&task_info.time_precision,
)?,
&SqlIntervalTimePrecision::from(*aggregation_job.client_timestamp_interval()),
/* state */ &aggregation_job.state(),
/* step */ &(u16::from(aggregation_job.step()) as i32),
/* last_request_hash */ &aggregation_job.last_request_hash(),
/* created_at */ &now,
/* updated_at */ &now,
/* updated_by */ &self.name,
/* threshold */ &task_info.report_expiry_threshold(now)?,
/* threshold */
&task_info.report_expiry_threshold_as_time_precision_units(now)?,
],
)
.await?,
Expand Down Expand Up @@ -2173,7 +2165,7 @@ UPDATE aggregation_jobs SET
updated_by = $5
WHERE aggregation_jobs.task_id = $6
AND aggregation_jobs.aggregation_job_id = $7
AND UPPER(aggregation_jobs.client_timestamp_interval) >= $8::TIMESTAMP WITH TIME ZONE",
AND UPPER(aggregation_jobs.client_timestamp_interval) >= $8",
)
.await?;
check_single_row_mutation(
Expand All @@ -2187,7 +2179,8 @@ WHERE aggregation_jobs.task_id = $6
/* updated_by */ &self.name,
/* task_id */ &task_info.pkey,
/* aggregation_job_id */ &aggregation_job.id().as_ref(),
/* threshold */ &task_info.report_expiry_threshold(now)?,
/* threshold */
&task_info.report_expiry_threshold_as_time_precision_units(now)?,
],
)
.await?,
Expand Down Expand Up @@ -2237,7 +2230,7 @@ ORDER BY ord ASC",
/* task_id */ &task_info.pkey,
/* aggregation_job_id */ &aggregation_job_id.as_ref(),
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
],
)
.await?
Expand Down Expand Up @@ -2300,7 +2293,7 @@ WHERE report_aggregations.task_id = $1
/* aggregation_job_id */ &aggregation_job_id.as_ref(),
/* report_id */ &report_id.as_ref(),
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
],
)
.await?
Expand Down Expand Up @@ -2357,7 +2350,7 @@ WHERE report_aggregations.task_id = $1
&[
/* task_id */ &task_info.pkey,
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
],
)
.await?
Expand Down Expand Up @@ -2723,7 +2716,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE
/* created_at */ &now,
/* updated_at */ &now,
/* updated_by */ &self.name,
/* threshold */ &task_info.report_expiry_threshold(now)?,
/* threshold */
&task_info.report_expiry_threshold_as_time_precision_units(now)?,
],
)
.await?,
Expand Down Expand Up @@ -2812,7 +2806,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE
/* created_at */ &now,
/* updated_at */ &now,
/* updated_by */ &self.name,
/* threshold */ &task_info.report_expiry_threshold(now)?,
/* threshold */
&task_info.report_expiry_threshold_as_time_precision_units(now)?,
],
)
.await?,
Expand Down Expand Up @@ -2877,7 +2872,8 @@ ON CONFLICT(task_id, aggregation_job_id, ord) DO UPDATE
/* created_at */ &now,
/* updated_at */ &now,
/* updated_by */ &self.name,
/* threshold */ &task_info.report_expiry_threshold(now)?,
/* threshold */
&task_info.report_expiry_threshold_as_time_precision_units(now)?,
],
)
.await?,
Expand Down Expand Up @@ -2962,7 +2958,8 @@ WHERE report_aggregations.aggregation_job_id = aggregation_jobs.id
.time()
.as_date_time(task_info.time_precision)?,
/* ord */ &TryInto::<i64>::try_into(report_aggregation.ord())?,
/* threshold */ &task_info.report_expiry_threshold(now)?,
/* threshold */
&task_info.report_expiry_threshold_as_time_precision_units(now)?,
],
)
.await?,
Expand Down Expand Up @@ -5076,7 +5073,7 @@ WHERE id IN (SELECT id FROM aggregation_jobs_to_delete)",
&[
/* task_id */ &task_info.pkey,
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
/* limit */ &i64::try_from(limit)?,
],
)
Expand Down
4 changes: 2 additions & 2 deletions aggregator_core/src/datastore/leases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ WHERE aggregation_jobs.task_id = $1
/* task ID */ &task_info.pkey,
/* aggregation_job_id */ &aggregation_job_id.as_ref(),
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
],
)
.await?
Expand Down Expand Up @@ -221,7 +221,7 @@ WHERE aggregation_jobs.task_id = $1
&[
/* task ID */ &task_info.pkey,
/* threshold */
&task_info.report_expiry_threshold(self.clock.now())?,
&task_info.report_expiry_threshold_as_time_precision_units(self.clock.now())?,
],
)
.await?
Expand Down
104 changes: 102 additions & 2 deletions aggregator_core/src/datastore/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use janus_messages::{
taskprov::TimePrecision,
};
use postgres_protocol::types::{
Range, RangeBound, empty_range_to_sql, range_from_sql, range_to_sql, timestamp_from_sql,
timestamp_to_sql,
Range, RangeBound, empty_range_to_sql, int8_from_sql, int8_to_sql, range_from_sql,
range_to_sql, timestamp_from_sql, timestamp_to_sql,
};
use postgres_types::{FromSql, ToSql, accepts, to_sql_checked};
use prio::{
Expand Down Expand Up @@ -2153,6 +2153,106 @@ impl OutstandingBatch {
}
}

/// Wrapper around [`janus_messages::Interval`] that supports conversions to/from SQL INT8RANGE,
/// representing time intervals in time precision units (not seconds or SQL timestamps).
///
/// The SQL representation is a half-open range `[start, end)` of INT8 values, where each value
/// counts whole time-precision units since the Unix epoch.
///
/// Once all tables are migrated to time precision units, this type can be renamed to `SqlInterval`
/// and the other can be deleted.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SqlIntervalTimePrecision(Interval);

impl From<Interval> for SqlIntervalTimePrecision {
    /// Wraps a DAP [`Interval`] so it can be encoded as an SQL INT8RANGE.
    fn from(value: Interval) -> Self {
        Self(value)
    }
}

impl From<SqlIntervalTimePrecision> for Interval {
    /// Unwraps the inner DAP [`Interval`].
    fn from(wrapper: SqlIntervalTimePrecision) -> Self {
        wrapper.0
    }
}

impl<'a> FromSql<'a> for SqlIntervalTimePrecision {
fn from_sql(
_: &postgres_types::Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
match range_from_sql(raw)? {
Range::Empty => Ok(Self(Interval::EMPTY)),
Range::Nonempty(RangeBound::Inclusive(None), _)
| Range::Nonempty(RangeBound::Exclusive(None), _)
| Range::Nonempty(_, RangeBound::Inclusive(None))
| Range::Nonempty(_, RangeBound::Exclusive(None))
| Range::Nonempty(RangeBound::Unbounded, _)
| Range::Nonempty(_, RangeBound::Unbounded) => Err("Interval must be bounded".into()),
Range::Nonempty(RangeBound::Exclusive(_), _)
| Range::Nonempty(_, RangeBound::Inclusive(_)) => {
Err("Interval must be half-open".into())
}
Range::Nonempty(
RangeBound::Inclusive(Some(start_raw)),
RangeBound::Exclusive(Some(end_raw)),
) => {
// These values are the start and end of the interval in time precision units.
let start_time = int8_from_sql(start_raw)?
.try_into()
.map_err(|_| "interval start must be positive")?;
let end_time: u64 = int8_from_sql(end_raw)?
.try_into()
.map_err(|_| "interval start must be positive")?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: typo

Suggested change
.map_err(|_| "interval start must be positive")?;
.map_err(|_| "interval end must be positive")?;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoot, I jumped the gun. I'll get this in the next PR.


Ok(Interval::new(
Time::from_time_precision_units(start_time),
Duration::from_time_precision_units(
end_time
.checked_sub(start_time)
.ok_or("interval end must be >= start")?,
),
)?
.into())
}
}
}

accepts!(INT8_RANGE);
}

impl ToSql for SqlIntervalTimePrecision {
    /// Encodes the interval as an SQL INT8RANGE whose endpoints are in time precision units.
    fn to_sql(
        &self,
        _: &postgres_types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres_types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        if self.0 == Interval::EMPTY {
            // The canonical empty interval is written as the SQL empty range.
            empty_range_to_sql(out);
        } else {
            // Express both endpoints in time-precision units, then write the range in the
            // half-open form [lower, upper) that the `FromSql` implementation expects.
            let lower = self.0.start().as_signed_time_precision_units()?;
            let upper = self.0.end().as_signed_time_precision_units()?;

            range_to_sql(
                |buf| {
                    int8_to_sql(lower, buf);
                    Ok(RangeBound::Inclusive(postgres_protocol::IsNull::No))
                },
                |buf| {
                    int8_to_sql(upper, buf);
                    Ok(RangeBound::Exclusive(postgres_protocol::IsNull::No))
                },
                out,
            )?;
        }

        Ok(postgres_types::IsNull::No)
    }

    accepts!(INT8_RANGE);

    to_sql_checked!();
}

/// The SQL timestamp epoch is midnight UTC on 2000-01-01. This const represents
/// the Unix epoch seconds (946_684_800) in the context of SQL_UNIT_TIME_PRECISION
/// and should not be necessary after Issue #4206.
Expand Down
7 changes: 4 additions & 3 deletions db/00000000000001_initial_schema.up.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- Load pgcrypto for gen_random_bytes.
CREATE EXTENSION pgcrypto;
-- Load an extension to allow indexing over both BIGINT and TSRANGE in a multicolumn GiST index.
-- Load an extension to allow indexing over both BIGINT and INT8RANGE in a multicolumn GiST index.
CREATE EXTENSION btree_gist;

-- Identifies which aggregator role is being played for this task.
Expand Down Expand Up @@ -263,8 +263,9 @@ CREATE TABLE aggregation_jobs(
aggregation_job_id BYTEA NOT NULL, -- 16-byte AggregationJobID as defined by the DAP specification
aggregation_param BYTEA NOT NULL, -- encoded aggregation parameter (opaque VDAF message)
batch_id BYTEA NOT NULL, -- batch ID (leader-selected only; corresponds to identifier in BatchSelector)
-- the minimal interval containing all of client timestamps included in this aggregation job
client_timestamp_interval TSTZRANGE NOT NULL,
-- The minimal interval, in time precision units, containing all of the client timestamps
-- included in this aggregation job
client_timestamp_interval INT8RANGE NOT NULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On line 3 we have an out of date comment that maybe should note that the INT8RANGE is also allowed to be an index (line 288) due to the GiST extension

state AGGREGATION_JOB_STATE NOT NULL, -- current state of the aggregation job
step INTEGER NOT NULL, -- current step of the aggregation job
last_request_hash BYTEA, -- SHA-256 hash of the most recently received AggregationJobInitReq or AggregationJobContinueReq (helper only)
Expand Down
Loading