db: migrate aggregation_jobs table to new times
#4393
```diff
@@ -26,8 +26,8 @@ use janus_messages::{
     taskprov::TimePrecision,
 };
 use postgres_protocol::types::{
-    Range, RangeBound, empty_range_to_sql, range_from_sql, range_to_sql, timestamp_from_sql,
-    timestamp_to_sql,
+    Range, RangeBound, empty_range_to_sql, int8_from_sql, int8_to_sql, range_from_sql,
+    range_to_sql, timestamp_from_sql, timestamp_to_sql,
 };
 use postgres_types::{FromSql, ToSql, accepts, to_sql_checked};
 use prio::{
```
```diff
@@ -2153,6 +2153,106 @@ impl OutstandingBatch {
     }
 }

+/// Wrapper around [`janus_messages::Interval`] that supports conversions to/from SQL INT8RANGE,
+/// representing time intervals in time precision units.
+///
+/// Once all tables are migrated to time precision units, this type can be renamed to `SqlInterval`
+/// and the other can be deleted.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub struct SqlIntervalTimePrecision(Interval);
+
+impl From<Interval> for SqlIntervalTimePrecision {
+    fn from(interval: Interval) -> Self {
+        Self(interval)
+    }
+}
+
+impl From<SqlIntervalTimePrecision> for Interval {
+    fn from(value: SqlIntervalTimePrecision) -> Self {
+        value.0
+    }
+}
+
+impl<'a> FromSql<'a> for SqlIntervalTimePrecision {
+    fn from_sql(
+        _: &postgres_types::Type,
+        raw: &'a [u8],
+    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+        match range_from_sql(raw)? {
+            Range::Empty => Ok(Self(Interval::EMPTY)),
+            Range::Nonempty(RangeBound::Inclusive(None), _)
+            | Range::Nonempty(RangeBound::Exclusive(None), _)
+            | Range::Nonempty(_, RangeBound::Inclusive(None))
+            | Range::Nonempty(_, RangeBound::Exclusive(None))
+            | Range::Nonempty(RangeBound::Unbounded, _)
+            | Range::Nonempty(_, RangeBound::Unbounded) => Err("Interval must be bounded".into()),
+            Range::Nonempty(RangeBound::Exclusive(_), _)
+            | Range::Nonempty(_, RangeBound::Inclusive(_)) => {
+                Err("Interval must be half-open".into())
+            }
+            Range::Nonempty(
+                RangeBound::Inclusive(Some(start_raw)),
+                RangeBound::Exclusive(Some(end_raw)),
+            ) => {
+                // These values are the start and end of the interval in time precision units.
+                let start_time = int8_from_sql(start_raw)?
+                    .try_into()
+                    .map_err(|_| "interval start must be positive")?;
+                let end_time: u64 = int8_from_sql(end_raw)?
+                    .try_into()
+                    .map_err(|_| "interval start must be positive")?;
```
**Collaborator:** nit: typo

Suggested change: the error message for the end bound repeats "interval start must be positive"; it should presumably read "interval end must be positive".

**Contributor (Author):** Shoot, I jumped the gun. I'll get this in the next PR.
```diff
+                Ok(Interval::new(
+                    Time::from_time_precision_units(start_time),
+                    Duration::from_time_precision_units(
+                        end_time
+                            .checked_sub(start_time)
+                            .ok_or("interval end must be >= start")?,
+                    ),
+                )?
+                .into())
+            }
+        }
+    }
+
+    accepts!(INT8_RANGE);
+}
+
+impl ToSql for SqlIntervalTimePrecision {
+    fn to_sql(
+        &self,
+        _: &postgres_types::Type,
+        out: &mut bytes::BytesMut,
+    ) -> Result<postgres_types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
+        // Convert the interval start and end to values in time precision units.
+        if self.0 == Interval::EMPTY {
+            empty_range_to_sql(out);
+            return Ok(postgres_types::IsNull::No);
+        }
+
+        let start = self.0.start().as_signed_time_precision_units()?;
+        let end = self.0.end().as_signed_time_precision_units()?;
+
+        range_to_sql(
+            |out| {
+                int8_to_sql(start, out);
+                Ok(RangeBound::Inclusive(postgres_protocol::IsNull::No))
+            },
+            |out| {
+                int8_to_sql(end, out);
+                Ok(RangeBound::Exclusive(postgres_protocol::IsNull::No))
+            },
+            out,
+        )?;
+
+        Ok(postgres_types::IsNull::No)
+    }
+
+    accepts!(INT8_RANGE);
+
+    to_sql_checked!();
+}
+
+/// The SQL timestamp epoch is midnight UTC on 2000-01-01. This const represents
+/// the Unix epoch seconds (946_684_800) in the context of SQL_UNIT_TIME_PRECISION
+/// and should not be necessary after Issue #4206.
```
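The bound-checking arithmetic in `from_sql` above reduces to plain unsigned integer math on the half-open range bounds. Here is a minimal standalone sketch of that arithmetic, using bare integers instead of the `janus_messages` types; the `interval_from_bounds` helper is hypothetical and exists only for illustration:

```rust
/// Hypothetical mirror of the bound checks in `SqlIntervalTimePrecision::from_sql`:
/// convert half-open [start, end) INT8RANGE bounds (signed, as SQL stores them)
/// into a (start_time, duration) pair in unsigned time precision units.
fn interval_from_bounds(start: i64, end: i64) -> Result<(u64, u64), String> {
    // The signed SQL INT8 bounds must fit in the unsigned DAP time domain.
    let start_time: u64 = start
        .try_into()
        .map_err(|_| "interval start must be positive".to_string())?;
    let end_time: u64 = end
        .try_into()
        .map_err(|_| "interval end must be positive".to_string())?;
    // The duration is end - start; ranges with end < start are rejected.
    let duration = end_time
        .checked_sub(start_time)
        .ok_or_else(|| "interval end must be >= start".to_string())?;
    Ok((start_time, duration))
}

fn main() {
    // [100, 105) in time precision units → start 100, duration 5.
    assert_eq!(interval_from_bounds(100, 105), Ok((100, 5)));
    // Negative bounds are rejected, as in the FromSql impl.
    assert!(interval_from_bounds(-1, 5).is_err());
    // end < start is rejected by checked_sub.
    assert!(interval_from_bounds(10, 5).is_err());
    println!("ok");
}
```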
```diff
@@ -1,6 +1,6 @@
 -- Load pgcrypto for gen_random_bytes.
 CREATE EXTENSION pgcrypto;
--- Load an extension to allow indexing over both BIGINT and TSRANGE in a multicolumn GiST index.
+-- Load an extension to allow indexing over both BIGINT and INT8RANGE in a multicolumn GiST index.
 CREATE EXTENSION btree_gist;

 -- Identifies which aggregator role is being played for this task.
```
```diff
@@ -263,8 +263,9 @@ CREATE TABLE aggregation_jobs(
 aggregation_job_id BYTEA NOT NULL, -- 16-byte AggregationJobID as defined by the DAP specification
 aggregation_param BYTEA NOT NULL, -- encoded aggregation parameter (opaque VDAF message)
 batch_id BYTEA NOT NULL, -- batch ID (leader-selected only; corresponds to identifier in BatchSelector)
--- the minimal interval containing all of client timestamps included in this aggregation job
-client_timestamp_interval TSTZRANGE NOT NULL,
+-- The minimal interval, in time precision units, containing all of client timestamps included
+-- in this aggregation job
+client_timestamp_interval INT8RANGE NOT NULL,
```
**Contributor:** On line 3 we have an out-of-date comment that maybe should note that the INT8RANGE is also allowed to be included in an index (line 288) due to the GiST extension.
```diff
 state AGGREGATION_JOB_STATE NOT NULL, -- current state of the aggregation job
 step INTEGER NOT NULL, -- current step of the aggregation job
 last_request_hash BYTEA, -- SHA-256 hash of the most recently received AggregationJobInitReq or AggregationJobContinueReq (helper only)
```
**Contributor (Author):** This little bit of math needs close scrutiny. `$3` is now in seconds since the epoch, so we're subtracting `report_expiry_age` (also in seconds, a `BIGINT`), then dividing by the `BIGINT` `time_precision` to get a quantity we can compare to `UPPER(aggregation_jobs.client_timestamp_interval)`. The way we do that in Janus boils down to `timestamp / precision_secs` (`Time::from_seconds_since_epoch`), so I think this is OK, because Postgres `/` is documented to "truncate the result" (https://www.postgresql.org/docs/9.5/functions-math.html).

**Collaborator:** The math LGTM, but we do have a functionality change here, I think? Previously, tasks without a `report_expiry_age` used `-infinity` in the comparison, while with the division here we're now getting `0`, so our `>=` is only going to match tasks with positive timestamps, while previously, with the result going to `-infinity`, it'd always match.

I guess that's just an edge case not particularly important for the real world... but you did ask me to apply scrutiny here.

**Contributor (Author):** I appreciate the scrutiny. This is worth diving into.

The nuance here is the signedness of time values. Previously, we stored SQL `TIMESTAMP`, which ranges over `[-infinity, infinity]` (is it possible to have a closed range on infinity? whatever). The intention was to use the smallest possible value so that it will always compare less than `UPPER(aggregation_jobs.client_timestamp_interval)`, which was `TIMESTAMP`.

Now, `UPPER(aggregation_jobs.client_timestamp_interval)` is a DAP `Time`, which ranges over `[0, 2^64-1]`. But we store it as SQL `BIGINT`, which is `[-2^63, 2^63-1]`. So should we compare against the smallest possible `Time` or the smallest possible `BIGINT`?

**Contributor (Author):** Furthermore, `TIMESTAMP`'s zero point does not line up with the Unix epoch that DAP uses, so some of our unit tests did care about negative SQL timestamps. Now, both `$3` and `UPPER(aggregation_jobs.client_timestamp_interval)` are using the Unix epoch as their zero point, so I think it would be fine to use either `0` or `x'8000000000000000'::bigint` as our fallback value to compare against.
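The division discussed in this thread can be checked concretely. Below is a minimal sketch, with assumed example values (not Janus code), of why truncating integer division makes the Rust side and the Postgres `BIGINT` `/` agree for non-negative timestamps, and why an `i64::MIN` fallback behaves like `-infinity` in the comparison:

```rust
fn main() {
    // An example Unix timestamp in seconds and a task time precision of 300 s.
    let timestamp: i64 = 1_700_000_123;
    let precision: i64 = 300;

    // Rust's integer `/` truncates toward zero, as does Postgres's BIGINT `/`
    // (per the Postgres docs cited above), so for non-negative input both
    // sides compute the same time-precision bucket.
    assert_eq!(timestamp / precision, 5_666_667);

    // Edge case from the thread: with no report_expiry_age, the old TIMESTAMP
    // scheme compared against -infinity (always matches). A literal 0 fallback
    // only stays below non-negative buckets, while x'8000000000000000'::bigint
    // is i64::MIN, which (like -infinity) is below every representable bucket.
    assert!(i64::MIN < 0);
    assert!(i64::MIN < timestamp / precision);
    println!("ok");
}
```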