diff --git a/Cargo.lock b/Cargo.lock index 064dc41..7b7733a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,8 +20,7 @@ dependencies = [ [[package]] name = "apalis" version = "1.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93be0eb33b912f5e66004d0b756423c285273259068b1c80a71d7842658189b" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ "apalis-core", "futures-util", @@ -34,8 +33,7 @@ dependencies = [ [[package]] name = "apalis-codec" version = "0.1.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5ed6bb8e64c360ed4ad666a6cbc42e9e6df73087461dc4071f510a3af284637" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ "apalis-core", "serde", @@ -45,8 +43,7 @@ dependencies = [ [[package]] name = "apalis-core" version = "1.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b1557d680ee4a9b42a76ab3a9572cee1a00d45e7eb455427d906c42774766e7" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ "futures-channel", "futures-core", @@ -71,7 +68,6 @@ dependencies = [ "apalis-sql", "apalis-workflow", "async-std", - "chrono", "futures", "futures-util", "once_cell", @@ -87,21 +83,20 @@ dependencies = [ [[package]] name = "apalis-sql" version = "1.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ade5d8faa60e9975b01d3bb1ebc5028589aa4986365eaa4d080d30ed3b5141f" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ "apalis-core", "chrono", "serde", "serde_json", "thiserror", + "time", ] [[package]] name = "apalis-workflow" version = "0.1.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc024da2d5d3ab59cc9fea099a2e2b20de5ff608f2e287abcb73aa45e4966a89" +source = "git+https://github.com/apalis-dev/apalis#82670cfd52c6054d69620e2fbe8c140d573fa7b5" dependencies = [ "apalis-core", "futures", @@ -345,9 +340,9 @@ checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" -version = "1.2.50" +version = "1.2.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f50d563227a1c37cc0a263f64eca3334388c01c5e4c4861a9def205c614383c" +checksum = "7a0aeaff4ff1a90589618835a598e545176939b97874f7abc7851caa0618f203" dependencies = [ "find-msvc-tools", "shlex", @@ -464,6 +459,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +dependencies = [ + "powerfmt", + "serde_core", +] + [[package]] name = "digest" version = "0.10.7" @@ -573,9 +578,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" +checksum = "645cbb3a84e60b7531617d5ae4e57f7e27308f6445f5abf653209ea76dec8dff" [[package]] name = "fixedbitset" @@ -1038,9 +1043,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ee5b5339afb4c41626dde77b7a611bd4f2c202b897852b4bcf5d03eddc61010" +checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "js-sys" @@ -1084,13 +1089,13 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libredox" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50" +checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ "bitflags 2.10.0", "libc", - "redox_syscall 0.6.0", + "redox_syscall 0.7.0", ] [[package]] @@ -1199,6 +1204,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1445,6 +1456,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1456,9 +1473,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.103" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +checksum = "9695f8df41bb4f3d222c95a67532365f569318332d03d5f3f67f37b20e6ebdf0" dependencies = [ "unicode-ident", ] @@ -1548,9 +1565,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5" +checksum = "49f3fe0889e69e2ae9e41f4d6c4c0181701d00e4697b356fb1f74173a5e0ee27" dependencies = [ "bitflags 2.10.0", ] @@ -1658,9 +1675,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] name = "ryu" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62049b2877bf12821e8f9ad256ee38fdc31db7387ec2d3b3f403024de2034aea" +checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" [[package]] name = "schannel" @@ -1732,9 +1749,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.147" +version = "1.0.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6af14725505314343e673e9ecb7cd7e8a36aa9791eb936235a3567cc31447ae4" +checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da" dependencies = [ "itoa", "memchr", @@ -1893,6 +1910,7 @@ dependencies = [ "sha2", "smallvec", "thiserror", + "time", "tokio", "tokio-stream", "tracing", @@ -1978,6 +1996,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", + "time", "tracing", "whoami", ] @@ -2016,6 +2035,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", + "time", "tracing", "whoami", ] @@ -2041,6 +2061,7 @@ dependencies = [ "serde_urlencoded", "sqlx-core", "thiserror", + "time", "tracing", "url", ] @@ -2129,6 +2150,37 @@ dependencies = [ "syn", ] +[[package]] +name = "time" +version = "0.3.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" + +[[package]] +name = "time-macros" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -2904,6 +2956,6 @@ dependencies = [ [[package]] name = "zmij" -version = "0.1.8" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1dccf46b25b205e4bebe1d5258a991df1cc17801017a845cb5b3fe0269781aa" +checksum = "e9747e91771f56fd7893e1164abd78febd14a670ceec257caad15e051de35f06" diff --git a/Cargo.toml b/Cargo.toml index 07a652e..e40e367 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,21 +14,22 @@ categories = ["asynchronous", "database", "network-programming"] publish = true [features] -default = ["migrate", "tokio-comp"] +default = ["migrate", "tokio-comp", "chrono"] migrate = ["sqlx/migrate", "sqlx/macros"] async-std-comp = ["async-std", "sqlx/runtime-async-std-rustls"] async-std-comp-native-tls = ["async-std", "sqlx/runtime-async-std-native-tls"] tokio-comp = ["tokio", "sqlx/runtime-tokio-rustls"] tokio-comp-native-tls = ["tokio", "sqlx/runtime-tokio-native-tls"] +chrono = ["apalis-sql/chrono", "sqlx/chrono"] +time = ["apalis-sql/time", "sqlx/time"] [dependencies] -apalis-core = { version = "1.0.0-rc.1", default-features = false, features = [ +apalis-core = { git = "https://github.com/apalis-dev/apalis", version = "1.0.0-rc.1", default-features = false, features = [ "sleep", ] } -apalis-sql = { version = "1.0.0-rc.1", default-features = false } -apalis-codec = { version = "0.1.0-rc.1", features = ["json"] } +apalis-sql = { git = "https://github.com/apalis-dev/apalis", version = "1.0.0-rc.1", default-features = false } +apalis-codec = { git = "https://github.com/apalis-dev/apalis", version = "0.1.0-rc.1", features = ["json"] } serde = { version = "1", features = ["derive"], default-features = false } -chrono = { version = "0.4", features = ["serde"], default-features = false } pin-project = "1.1.10" serde_json = "1" futures = "0.3.30" @@ -44,11 +45,11 @@ ulid = { version = "1", features = ["serde"] } [dependencies.sqlx] version = "0.8.1" default-features = false -features = ["chrono", "postgres", "json"] +features = ["postgres", "json"] [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } once_cell = "1.19.0" -apalis = { version = "1.0.0-rc.1" } -apalis-workflow = { version = "0.1.0-rc.1" } +apalis = { git = "https://github.com/apalis-dev/apalis", version = "1.0.0-rc.1" } +apalis-workflow = { git = "https://github.com/apalis-dev/apalis", version = "0.1.0-rc.1" } futures-util = "0.3.30" diff --git a/src/from_row.rs b/src/from_row.rs index d4a1dc3..f988429 100644 --- a/src/from_row.rs +++ b/src/from_row.rs @@ -1,4 +1,5 @@ -use chrono::{DateTime, Utc}; +use apalis_sql::{SqlDateTime, TaskRow}; + #[derive(Debug)] pub struct PgTaskRow { pub job: Option>, @@ -7,19 +8,19 @@ pub struct PgTaskRow { pub status: Option, pub attempts: Option, pub max_attempts: Option, - pub run_at: Option>, + pub run_at: Option, pub last_result: Option, - pub lock_at: Option>, + pub lock_at: Option, pub lock_by: Option, - pub done_at: Option>, + pub done_at: Option, pub priority: Option, pub metadata: Option, } -impl TryInto for PgTaskRow { +impl TryInto for PgTaskRow { type Error = sqlx::Error; - fn try_into(self) -> Result { - Ok(apalis_sql::from_row::TaskRow { + fn try_into(self) -> Result { + Ok(TaskRow { job: self.job.unwrap_or_default(), id: self .id diff --git a/src/lib.rs b/src/lib.rs index 34f49af..452e665 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,7 +36,7 @@ mod ack; mod fetcher; mod from_row; -pub type PgContext = apalis_sql::context::SqlContext; +pub type PgContext = apalis_sql::context::SqlContext; mod queries; pub mod shared; pub mod sink; @@ -252,8 +252,9 @@ where type CompactStream = TaskStream, Self::Error>; fn get_queue(&self) -> Queue { - self.config.queue().clone() + Queue::from(self.config.queue().to_string()) } + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { self.poll_basic(worker).boxed() } @@ -349,7 +350,7 @@ where type CompactStream = TaskStream, Self::Error>; fn get_queue(&self) -> Queue { - self.config.queue().clone() + Queue::from(self.config.queue().to_string()) } fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { diff --git a/src/queries/keep_alive.rs b/src/queries/keep_alive.rs index c0a5a4a..3332093 100644 --- a/src/queries/keep_alive.rs +++ b/src/queries/keep_alive.rs @@ -1,5 +1,5 @@ use apalis_core::worker::context::WorkerContext; -use chrono::Utc; +use apalis_sql::{SqlDateTime, SqlDateTimeExt}; use futures::{FutureExt, Stream, stream}; use sqlx::PgPool; @@ -36,7 +36,7 @@ pub async fn initial_heartbeat( storage_type: &str, ) -> Result<(), sqlx::Error> { reenqueue_orphaned(pool.clone(), config.clone()).await?; - let last_seen = Utc::now(); + let last_seen = SqlDateTime::now(); register_worker( pool, config.queue().to_string(), diff --git a/src/queries/list_workers.rs b/src/queries/list_workers.rs index adff2a5..20f42fc 100644 --- a/src/queries/list_workers.rs +++ b/src/queries/list_workers.rs @@ -1,5 +1,5 @@ use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker}; -use chrono::{DateTime, Utc}; +use apalis_sql::{SqlDateTime, SqlDateTimeExt}; use futures::TryFutureExt; use ulid::Ulid; @@ -9,8 +9,8 @@ pub struct WorkerRow { pub worker_type: String, pub storage_name: String, pub layers: Option, - pub last_seen: DateTime, - pub started_at: Option>, + pub last_seen: SqlDateTime, + pub started_at: Option, } use crate::{CompactType, PgContext, PostgresStorage}; @@ -42,8 +42,11 @@ where .map(|w| RunningWorker { id: w.id, backend: w.storage_name, - started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64, - last_heartbeat: w.last_seen.timestamp() as u64, + started_at: w + .started_at + .map(|t| t.to_unix_timestamp()) + .unwrap_or_default() as u64, + last_heartbeat: w.last_seen.to_unix_timestamp() as u64, layers: w.layers.unwrap_or_default(), queue: w.worker_type, }) @@ -73,8 +76,11 @@ where .map(|w| RunningWorker { id: w.id, backend: w.storage_name, - started_at: w.started_at.map(|t| t.timestamp()).unwrap_or_default() as u64, - last_heartbeat: w.last_seen.timestamp() as u64, + started_at: w + .started_at + .map(|t| t.to_unix_timestamp()) + .unwrap_or_default() as u64, + last_heartbeat: w.last_seen.to_unix_timestamp() as u64, layers: w.layers.unwrap_or_default(), queue: w.worker_type, }) diff --git a/src/queries/register_worker.rs b/src/queries/register_worker.rs index 641c4f5..7ed0f40 100644 --- a/src/queries/register_worker.rs +++ b/src/queries/register_worker.rs @@ -1,12 +1,12 @@ use apalis_core::worker::context::WorkerContext; -use chrono::{DateTime, Utc}; +use apalis_sql::SqlDateTime; use sqlx::PgPool; pub async fn register( pool: PgPool, worker_type: String, worker: WorkerContext, - last_seen: DateTime, + last_seen: SqlDateTime, backend_type: &str, ) -> Result<(), sqlx::Error> { let res = sqlx::query_file!( diff --git a/src/shared.rs b/src/shared.rs index 46a08a5..b21fe83 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -223,7 +223,7 @@ where type CompactStream = TaskStream, Self::Error>; fn get_queue(&self) -> Queue { - self.config.queue().clone() + Queue::from(self.config.queue().to_string()) } fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { diff --git a/src/sink.rs b/src/sink.rs index 2c0565f..6dcb89f 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -1,6 +1,5 @@ use apalis_codec::json::JsonCodec; -use apalis_sql::config::Config; -use chrono::{DateTime, Utc}; +use apalis_sql::{SqlDateTime, SqlDateTimeExt, config::Config}; use futures::{ FutureExt, Sink, TryFutureExt, future::{BoxFuture, Shared}, @@ -64,7 +63,9 @@ where .unwrap_or(Ulid::new().to_string()), ); job_data.push(task.args); - run_ats.push(DateTime::from_timestamp(task.parts.run_at as i64, 0).unwrap_or(Utc::now())); + run_ats.push(::from_unix_timestamp( + task.parts.run_at as i64, + )); priorities.push(task.parts.ctx.priority()); max_attempts_vec.push(task.parts.ctx.max_attempts()); metadata.push(serde_json::Value::Object(task.parts.ctx.meta().clone()));