From dd25413fa76950baf1053e2ef38b18f69ed4e05c Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Sun, 4 Jan 2026 22:02:53 +0000 Subject: [PATCH 1/5] Move cron schedule to worker defined --- oxanus/examples/cron.rs | 6 +++++- oxanus/examples/cron_w_err.rs | 6 +++++- oxanus/src/config.rs | 7 ++++--- oxanus/src/worker.rs | 8 ++++++++ oxanus/tests/integration/cron.rs | 25 ++++++++++++++++++++++++- oxanus/tests/integration/shared.rs | 18 ------------------ 6 files changed, 46 insertions(+), 24 deletions(-) diff --git a/oxanus/examples/cron.rs b/oxanus/examples/cron.rs index 5a7bba9..63325f4 100644 --- a/oxanus/examples/cron.rs +++ b/oxanus/examples/cron.rs @@ -22,6 +22,10 @@ impl oxanus::Worker for TestWorker { tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(()) } + + fn cron_schedule() -> Option { + Some("*/5 * * * * *".to_string()) + } } #[derive(Serialize)] @@ -43,7 +47,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { let ctx = oxanus::Context::value(WorkerState {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage) - .register_cron_worker::("*/5 * * * * *", QueueOne) + .register_cron_worker::(QueueOne) .with_graceful_shutdown(tokio::signal::ctrl_c()); oxanus::run(config, ctx).await?; diff --git a/oxanus/examples/cron_w_err.rs b/oxanus/examples/cron_w_err.rs index d1e64a3..ed61ee9 100644 --- a/oxanus/examples/cron_w_err.rs +++ b/oxanus/examples/cron_w_err.rs @@ -37,6 +37,10 @@ impl oxanus::Worker for TestWorker { fn retry_delay(&self, _retries: u32) -> u64 { 0 } + + fn cron_schedule() -> Option { + Some("*/10 * * * * *".to_string()) + } } #[derive(Serialize)] @@ -58,7 +62,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { let ctx = oxanus::Context::value(WorkerState {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage) - .register_cron_worker::("*/10 * * * * *", QueueOne) + .register_cron_worker::(QueueOne) .with_graceful_shutdown(tokio::signal::ctrl_c()); oxanus::run(config, ctx).await?; diff --git a/oxanus/src/config.rs b/oxanus/src/config.rs index d972793..c36ad93 100644 --- a/oxanus/src/config.rs +++ b/oxanus/src/config.rs @@ -57,12 +57,13 @@ impl Config { self } - pub fn register_cron_worker(mut self, schedule: &str, queue: impl Queue) -> Self + pub fn register_cron_worker(mut self, queue: impl Queue) -> Self where - T: Worker + serde::de::DeserializeOwned + 'static, + W: Worker + serde::de::DeserializeOwned + 'static, { self.queues.insert(queue.config()); - self.registry.register_cron::(schedule, queue.key()); + let schedule = W::cron_schedule().expect("Cron Worker must have cron_schedule defined"); + self.registry.register_cron::(&schedule, queue.key()); self } diff --git a/oxanus/src/worker.rs b/oxanus/src/worker.rs index 6a161da..6cc43ba 100644 --- a/oxanus/src/worker.rs +++ b/oxanus/src/worker.rs @@ -33,6 +33,14 @@ pub trait Worker: Send + Sync + UnwindSafe { fn on_conflict(&self) -> JobConflictStrategy { JobConflictStrategy::Skip } + + /// 6 part cron schedule: "* * * * * *" + fn cron_schedule() -> Option + where + Self: Sized, + { + None + } } #[cfg(test)] diff --git a/oxanus/tests/integration/cron.rs b/oxanus/tests/integration/cron.rs index b0e76e4..dae936c 100644 --- a/oxanus/tests/integration/cron.rs +++ b/oxanus/tests/integration/cron.rs @@ -1,7 +1,30 @@ use crate::shared::*; use deadpool_redis::redis::AsyncCommands; +use serde::{Deserialize, Serialize}; use testresult::TestResult; +#[derive(Debug, Serialize, Deserialize)] +pub struct CronWorkerRedisCounter {} + +#[async_trait::async_trait] +impl oxanus::Worker for CronWorkerRedisCounter { + type Context = WorkerState; + type Error = WorkerError; + + async fn process( + &self, + oxanus::Context { ctx, .. }: &oxanus::Context, + ) -> Result<(), WorkerError> { + let mut redis = ctx.redis.get().await?; + let _: () = redis.incr("cron:counter", 1).await?; + Ok(()) + } + + fn cron_schedule() -> Option { + Some("* * * * * *".to_string()) + } +} + #[tokio::test] pub async fn test_cron() -> TestResult { let redis_pool = setup(); @@ -16,7 +39,7 @@ pub async fn test_cron() -> TestResult { .namespace(random_string()) .build_from_pool(redis_pool.clone())?; let config = oxanus::Config::new(&storage) - .register_cron_worker::("* * * * * *", QueueOne) + .register_cron_worker::(QueueOne) .exit_when_processed(2); oxanus::run(config, ctx).await?; diff --git a/oxanus/tests/integration/shared.rs b/oxanus/tests/integration/shared.rs index dc5b7ad..d910314 100644 --- a/oxanus/tests/integration/shared.rs +++ b/oxanus/tests/integration/shared.rs @@ -52,24 +52,6 @@ impl oxanus::Worker for WorkerRedisSet { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct CronWorkerRedisCounter {} - -#[async_trait::async_trait] -impl oxanus::Worker for CronWorkerRedisCounter { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { ctx, .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { - let mut redis = ctx.redis.get().await?; - let _: () = redis.incr("cron:counter", 1).await?; - Ok(()) - } -} - #[derive(Serialize)] pub struct QueueOne; From 13edfe5dc2c454667821f5f0de7fb3f77685b008 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Wed, 7 Jan 2026 18:08:06 +0000 Subject: [PATCH 2/5] Define cron workers with macro --- oxanus-macros/src/queue.rs | 4 ++-- oxanus-macros/src/worker.rs | 35 ++++++++++++++++++++++++++++++ oxanus/src/config.rs | 15 ++++++++++--- oxanus/src/queue.rs | 7 ++++++ oxanus/src/worker.rs | 37 +++++++++++++++++++++++++++++++- oxanus/tests/integration/cron.rs | 7 +++++- 6 files changed, 98 insertions(+), 7 deletions(-) diff --git a/oxanus-macros/src/queue.rs b/oxanus-macros/src/queue.rs index a8e2bc2..87b1175 100644 --- a/oxanus-macros/src/queue.rs +++ b/oxanus-macros/src/queue.rs @@ -80,8 +80,8 @@ pub fn expand_derive_queue(input: DeriveInput) -> TokenStream { quote! { #[automatically_derived] impl oxanus::Queue for #struct_ident { - fn to_config() -> QueueConfig { - QueueConfig::#kind(#key) + fn to_config() -> oxanus::QueueConfig { + oxanus::QueueConfig::#kind(#key) #concurrency #throttle } diff --git a/oxanus-macros/src/worker.rs b/oxanus-macros/src/worker.rs index c60a966..2364222 100644 --- a/oxanus-macros/src/worker.rs +++ b/oxanus-macros/src/worker.rs @@ -14,6 +14,7 @@ struct OxanusArgs { max_retries: Option, unique_id: Option, on_conflict: Option, + cron: Option, } #[derive(Debug)] @@ -31,6 +32,12 @@ enum UniqueIdSpec { CustomFunc(Path), } +#[derive(Debug, FromMeta)] +struct Cron { + schedule: Option, + queue: Option, +} + impl FromMeta for UniqueIdSpec { fn from_meta(meta: &Meta) -> darling::Result { match meta { @@ -136,6 +143,11 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { None => quote!(), }; + let cron = match args.cron { + Some(cron) => expand_cron(cron), + None => quote!(), + }; + quote! { #[automatically_derived] #[async_trait::async_trait] @@ -152,6 +164,8 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { #max_retries #on_conflict + + #cron } } } @@ -192,3 +206,24 @@ fn expand_unique_id(spec: UniqueIdSpec, fields: &Fields) -> TokenStream { } } } + +fn expand_cron(cron: Cron) -> TokenStream { + let cron_schedule = cron.schedule; + let queue = cron.queue; + quote! { + fn cron_schedule() -> Option + where + Self: Sized, + { + Some(#cron_schedule.to_string()) + } + + fn cron_queue_config() -> Option + where + Self: Sized, + { + use oxanus::Queue; + Some(#queue::to_config()) + } + } +} diff --git a/oxanus/src/config.rs b/oxanus/src/config.rs index c36ad93..16fd9e8 100644 --- a/oxanus/src/config.rs +++ b/oxanus/src/config.rs @@ -49,14 +49,23 @@ impl Config { self } - pub fn register_worker(mut self) -> Self + pub fn register_worker(mut self) -> Self where - T: Worker + serde::de::DeserializeOwned + 'static, + W: Worker + serde::de::DeserializeOwned + 'static, { - self.registry.register::(); + if let (Some(schedule), Some(queue_config)) = (W::cron_schedule(), W::cron_queue_config()) { + let key = queue_config + .static_key() + .expect("Statically defined cron workers can only use static queues"); + self.queues.insert(queue_config); + self.registry.register_cron::(&schedule, key); + } else { + self.registry.register::(); + } self } + /// Register a cron worker with a dynamic queue pub fn register_cron_worker(mut self, queue: impl Queue) -> Self where W: Worker + serde::de::DeserializeOwned + 'static, diff --git a/oxanus/src/queue.rs b/oxanus/src/queue.rs index 0011492..65d94cb 100644 --- a/oxanus/src/queue.rs +++ b/oxanus/src/queue.rs @@ -70,6 +70,13 @@ impl QueueConfig { self.throttle = Some(throttle); self } + + pub fn static_key(&self) -> Option { + match &self.kind { + QueueKind::Static { key } => Some(key.clone()), + _ => None, + } + } } #[derive(Debug, Clone)] diff --git a/oxanus/src/worker.rs b/oxanus/src/worker.rs index 6cc43ba..1265b89 100644 --- a/oxanus/src/worker.rs +++ b/oxanus/src/worker.rs @@ -1,4 +1,4 @@ -use crate::{context::Context, job_envelope::JobConflictStrategy}; +use crate::{QueueConfig, context::Context, job_envelope::JobConflictStrategy}; use std::panic::UnwindSafe; pub type BoxedWorker = Box>; @@ -41,6 +41,13 @@ pub trait Worker: Send + Sync + UnwindSafe { { None } + + fn cron_queue_config() -> Option + where + Self: Sized, + { + None + } } #[cfg(test)] @@ -220,4 +227,32 @@ mod tests { "worker_id_2_task_22" ); } + + #[cfg(feature = "macros")] + #[tokio::test] + async fn test_define_cron_worker_with_macro() { + use crate as oxanus; // needed for unit test + use crate::Queue; + use std::io::Error as WorkerError; + + #[derive(Serialize, oxanus::Queue)] + struct DefaultQueue; + + #[derive(Serialize, oxanus::Worker)] + #[oxanus(cron(schedule = "*/1 * * * * *", queue = DefaultQueue))] + struct TestCronWorker {} + + impl TestCronWorker { + async fn process(&self, _: &Context) -> Result<(), WorkerError> { + Ok(()) + } + } + + assert_eq!(TestCronWorker {}.unique_id(), None); + assert_eq!(TestCronWorker::cron_schedule().unwrap(), "*/1 * * * * *"); + assert_eq!( + TestCronWorker::cron_queue_config().unwrap(), + DefaultQueue::to_config(), + ); + } } diff --git a/oxanus/tests/integration/cron.rs b/oxanus/tests/integration/cron.rs index dae936c..dd1914d 100644 --- a/oxanus/tests/integration/cron.rs +++ b/oxanus/tests/integration/cron.rs @@ -1,5 +1,6 @@ use crate::shared::*; use deadpool_redis::redis::AsyncCommands; +use oxanus::{Queue, QueueConfig}; use serde::{Deserialize, Serialize}; use testresult::TestResult; @@ -23,6 +24,10 @@ impl oxanus::Worker for CronWorkerRedisCounter { fn cron_schedule() -> Option { Some("* * * * * *".to_string()) } + + fn cron_queue_config() -> Option { + Some(QueueOne::to_config()) + } } #[tokio::test] @@ -39,7 +44,7 @@ pub async fn test_cron() -> TestResult { .namespace(random_string()) .build_from_pool(redis_pool.clone())?; let config = oxanus::Config::new(&storage) - .register_cron_worker::(QueueOne) + .register_worker::() .exit_when_processed(2); oxanus::run(config, ctx).await?; From bb56af6027d4d244e9da2336f81cab614312a04a Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Wed, 7 Jan 2026 19:51:13 +0000 Subject: [PATCH 3/5] Migrate all examples --- oxanus-macros/src/worker.rs | 83 ++++++++++++++++++++++++---- oxanus/examples/cron.rs | 30 +++------- oxanus/examples/cron_w_err.rs | 44 ++++----------- oxanus/examples/dynamic.rs | 25 +++------ oxanus/examples/foo.rs | 100 ++++++++-------------------------- oxanus/examples/minimal.rs | 32 +++-------- oxanus/examples/resumable.rs | 41 +++----------- oxanus/examples/throttled.rs | 46 ++++------------ oxanus/examples/unique.rs | 41 +++----------- 9 files changed, 155 insertions(+), 287 deletions(-) diff --git a/oxanus-macros/src/worker.rs b/oxanus-macros/src/worker.rs index 2364222..44cab61 100644 --- a/oxanus-macros/src/worker.rs +++ b/oxanus-macros/src/worker.rs @@ -12,6 +12,7 @@ struct OxanusArgs { context: Option, error: Option, max_retries: Option, + retry_delay: Option, unique_id: Option, on_conflict: Option, cron: Option, @@ -32,12 +33,42 @@ enum UniqueIdSpec { CustomFunc(Path), } +#[derive(Debug)] +enum RetryDelay { + /// #[retry_delay = 3] + Value(u64), + /// #[retry_delay = mymod::func] + CustomFunc(Path), +} + #[derive(Debug, FromMeta)] struct Cron { - schedule: Option, + schedule: String, queue: Option, } +impl FromMeta for RetryDelay { + fn from_meta(meta: &Meta) -> darling::Result { + match meta { + Meta::NameValue(nv) => match &nv.value { + Expr::Lit(syn::ExprLit { + lit: syn::Lit::Int(lit), + .. + }) => { + let value = lit.base10_parse::()?; + Ok(RetryDelay::Value(value)) + } + Expr::Path(expr_path) => Ok(RetryDelay::CustomFunc(expr_path.path.clone())), + other => Err(Error::custom(format!( + "unsupported retry_delay value: {:?}", + other + ))), + }, + _ => Err(Error::custom("retry_delay must be a name-value attribute")), + } + } +} + impl FromMeta for UniqueIdSpec { fn from_meta(meta: &Meta) -> darling::Result { match meta { @@ -123,6 +154,11 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { None => quote!(), }; + let retry_delay = match args.retry_delay { + Some(retry_delay) => expand_retry_delay(retry_delay), + None => quote!(), + }; + let on_conflict = match args.on_conflict { Some(on_conflict) => quote! { fn on_conflict(&self) -> oxanus::JobConflictStrategy { @@ -163,6 +199,8 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { #max_retries + #retry_delay + #on_conflict #cron @@ -170,6 +208,25 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { } } +fn expand_retry_delay(retry_delay: RetryDelay) -> TokenStream { + match retry_delay { + RetryDelay::Value(value) => { + quote! { + fn retry_delay(&self, _retries: u32) -> u64 { + #value + } + } + } + RetryDelay::CustomFunc(func) => { + quote! { + fn retry_delay(&self, retries: u32) -> u64 { + #func(self, retries) + } + } + } + } +} + fn expand_unique_id(spec: UniqueIdSpec, fields: &Fields) -> TokenStream { let formatter = match spec { UniqueIdSpec::Shorthand(fmt) => { @@ -197,7 +254,7 @@ fn expand_unique_id(spec: UniqueIdSpec, fields: &Fields) -> TokenStream { } } - UniqueIdSpec::CustomFunc(path) => quote!(#path(self)), + UniqueIdSpec::CustomFunc(func) => quote!(#func(self)), }; quote! { @@ -209,7 +266,19 @@ fn expand_unique_id(spec: UniqueIdSpec, fields: &Fields) -> TokenStream { fn expand_cron(cron: Cron) -> TokenStream { let cron_schedule = cron.schedule; - let queue = cron.queue; + let cron_queue_config = match cron.queue { + Some(queue) => quote! { + fn cron_queue_config() -> Option + where + Self: Sized, + { + use oxanus::Queue; + Some(#queue::to_config()) + } + }, + None => quote!(), + }; + quote! { fn cron_schedule() -> Option where @@ -218,12 +287,6 @@ fn expand_cron(cron: Cron) -> TokenStream { Some(#cron_schedule.to_string()) } - fn cron_queue_config() -> Option - where - Self: Sized, - { - use oxanus::Queue; - Some(#queue::to_config()) - } + #cron_queue_config } } diff --git a/oxanus/examples/cron.rs b/oxanus/examples/cron.rs index 63325f4..15f6773 100644 --- a/oxanus/examples/cron.rs +++ b/oxanus/examples/cron.rs @@ -7,36 +7,22 @@ enum WorkerError {} #[derive(Debug, Serialize, Deserialize, Clone)] struct WorkerState {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(context = WorkerState)] +#[oxanus(cron(schedule = "*/5 * * * * *", queue = QueueOne))] struct TestWorker {} -#[async_trait::async_trait] -impl oxanus::Worker for TestWorker { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl TestWorker { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(()) } - - fn cron_schedule() -> Option { - Some("*/5 * * * * *".to_string()) - } } -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(key = "one")] struct QueueOne; -impl oxanus::Queue for QueueOne { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig::as_static("one") - } -} - #[tokio::main] pub async fn main() -> Result<(), oxanus::OxanusError> { tracing_subscriber::registry() @@ -47,7 +33,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { let ctx = oxanus::Context::value(WorkerState {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage) - .register_cron_worker::(QueueOne) + .register_worker::() .with_graceful_shutdown(tokio::signal::ctrl_c()); oxanus::run(config, ctx).await?; diff --git a/oxanus/examples/cron_w_err.rs b/oxanus/examples/cron_w_err.rs index ed61ee9..51394e9 100644 --- a/oxanus/examples/cron_w_err.rs +++ b/oxanus/examples/cron_w_err.rs @@ -9,48 +9,26 @@ enum WorkerError { } #[derive(Debug, Serialize, Deserialize, Clone)] -struct WorkerState {} +struct WorkerContext {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(max_retries = 3, retry_delay = 0)] +#[oxanus(cron(schedule = "*/10 * * * * *"))] struct TestWorker {} -#[async_trait::async_trait] -impl oxanus::Worker for TestWorker { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl TestWorker { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { if rand::rng().random_bool(0.5) { Err(WorkerError::GenericError("foo".to_string())) } else { Ok(()) } } - - fn max_retries(&self) -> u32 { - 3 - } - - fn retry_delay(&self, _retries: u32) -> u64 { - 0 - } - - fn cron_schedule() -> Option { - Some("*/10 * * * * *".to_string()) - } } -#[derive(Serialize)] -struct QueueOne; - -impl oxanus::Queue for QueueOne { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig::as_static("one") - } -} +#[derive(Serialize, oxanus::Queue)] +#[oxanus(prefix = "two")] +struct QueueDynamic(i32); #[tokio::main] pub async fn main() -> Result<(), oxanus::OxanusError> { @@ -59,10 +37,10 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .with(EnvFilter::from_default_env()) .init(); - let ctx = oxanus::Context::value(WorkerState {}); + let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage) - .register_cron_worker::(QueueOne) + .register_cron_worker::(QueueDynamic(2)) .with_graceful_shutdown(tokio::signal::ctrl_c()); oxanus::run(config, ctx).await?; diff --git a/oxanus/examples/dynamic.rs b/oxanus/examples/dynamic.rs index 815ce96..dad5b6d 100644 --- a/oxanus/examples/dynamic.rs +++ b/oxanus/examples/dynamic.rs @@ -7,24 +7,19 @@ enum WorkerError {} #[derive(Debug, Clone)] struct WorkerState {} -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(context = WorkerState)] struct Worker2Sec {} -#[async_trait::async_trait] -impl oxanus::Worker for Worker2Sec { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { - tokio::time::sleep(std::time::Duration::from_millis(2000)).await; +impl Worker2Sec { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(()) } } -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(prefix = "two")] struct QueueDynamic(Animal, i32); #[derive(Debug, Serialize)] @@ -34,12 +29,6 @@ enum Animal { Bird, } -impl oxanus::Queue for QueueDynamic { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig::as_dynamic("two") - } -} - #[tokio::main] pub async fn main() -> Result<(), oxanus::OxanusError> { tracing_subscriber::registry() diff --git a/oxanus/examples/foo.rs b/oxanus/examples/foo.rs index ec80b05..d5739eb 100644 --- a/oxanus/examples/foo.rs +++ b/oxanus/examples/foo.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] struct Worker1Sec { id: usize, payload: String, @@ -11,81 +11,57 @@ struct Worker1Sec { enum WorkerError {} #[derive(Debug, Clone)] -struct WorkerState {} +struct WorkerContext {} -#[async_trait::async_trait] -impl oxanus::Worker for Worker1Sec { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl Worker1Sec { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { tokio::time::sleep(std::time::Duration::from_millis(1000)).await; Ok(()) } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] struct Worker2Sec { id: usize, foo: i32, } -#[async_trait::async_trait] -impl oxanus::Worker for Worker2Sec { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl Worker2Sec { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { tokio::time::sleep(std::time::Duration::from_millis(2000)).await; Ok(()) } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] struct WorkerInstant {} -#[async_trait::async_trait] -impl oxanus::Worker for WorkerInstant { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl WorkerInstant { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { Ok(()) } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] struct WorkerInstant2 {} -#[async_trait::async_trait] -impl oxanus::Worker for WorkerInstant2 { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl WorkerInstant2 { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { Ok(()) } } -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(key = "one", concurrency = 1)] struct QueueOne; -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(prefix = "two")] struct QueueTwo(Animal, i32); -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(key = "throttled")] +#[oxanus(throttle(window_ms = 1500, limit = 1))] struct QueueThrottled; #[derive(Debug, Serialize)] @@ -95,38 +71,6 @@ enum Animal { Bird, } -impl oxanus::Queue for QueueOne { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig { - kind: oxanus::QueueKind::Static { - key: "one".to_string(), - }, - concurrency: 1, - throttle: None, - } - } -} - -impl oxanus::Queue for QueueTwo { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig::as_dynamic("two") - } -} - -impl oxanus::Queue for QueueThrottled { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig { - kind: oxanus::QueueKind::Static { - key: "throttled".to_string(), - }, - concurrency: 1, - throttle: Some(oxanus::QueueThrottle { - limit: 1, - window_ms: 1500, - }), - } - } -} #[tokio::main] pub async fn main() -> Result<(), oxanus::OxanusError> { tracing_subscriber::registry() @@ -134,7 +78,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .with(EnvFilter::from_default_env()) .init(); - let ctx = oxanus::Context::value(WorkerState {}); + let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage.clone()) .register_queue::() @@ -144,7 +88,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .register_worker::() .register_worker::() .register_worker::() - .exit_when_processed(13); + .exit_when_processed(12); storage .enqueue( diff --git a/oxanus/examples/minimal.rs b/oxanus/examples/minimal.rs index 3150550..f9c74a2 100644 --- a/oxanus/examples/minimal.rs +++ b/oxanus/examples/minimal.rs @@ -5,42 +5,24 @@ use tracing_subscriber::{EnvFilter, fmt, prelude::*}; enum WorkerError {} #[derive(Debug, Serialize, Deserialize, Clone)] -struct WorkerState {} +struct WorkerContext {} -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] struct TestWorker { sleep_s: u64, } -#[async_trait::async_trait] -impl oxanus::Worker for TestWorker { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl TestWorker { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { tokio::time::sleep(std::time::Duration::from_secs(self.sleep_s)).await; Ok(()) } } -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(key = "one", concurrency = 2)] struct QueueOne; -impl oxanus::Queue for QueueOne { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig { - kind: oxanus::QueueKind::Static { - key: "one".to_string(), - }, - concurrency: 2, - throttle: None, - } - } -} - #[tokio::main] pub async fn main() -> Result<(), oxanus::OxanusError> { tracing_subscriber::registry() @@ -48,7 +30,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .with(EnvFilter::from_default_env()) .init(); - let ctx = oxanus::Context::value(WorkerState {}); + let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage) .register_queue::() diff --git a/oxanus/examples/resumable.rs b/oxanus/examples/resumable.rs index 6e1fc28..2ef3897 100644 --- a/oxanus/examples/resumable.rs +++ b/oxanus/examples/resumable.rs @@ -12,23 +12,17 @@ enum WorkerError { #[derive(Debug, Serialize, Deserialize, Clone)] struct WorkerState {} -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(context = WorkerState, max_retries = 10, retry_delay = 3)] struct ResumableTestWorker {} -#[async_trait::async_trait] -impl oxanus::Worker for ResumableTestWorker { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { state, .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { - let progress = state.get::().await?; +impl ResumableTestWorker { + async fn process(&self, ctx: &oxanus::Context) -> Result<(), WorkerError> { + let progress = ctx.state.get::().await?; dbg!(&progress); - state.update(progress.unwrap_or(0) + 1).await?; + ctx.state.update(progress.unwrap_or(0) + 1).await?; if progress.unwrap_or(0) == 10 { Ok(()) @@ -36,31 +30,12 @@ impl oxanus::Worker for ResumableTestWorker { Err(WorkerError::GenericError("test".to_string())) } } - - fn max_retries(&self) -> u32 { - 10 - } - - fn retry_delay(&self, _retries: u32) -> u64 { - 3 - } } -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(key = "one", concurrency = 1)] struct QueueOne; -impl oxanus::Queue for QueueOne { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig { - kind: oxanus::QueueKind::Static { - key: "one".to_string(), - }, - concurrency: 1, - throttle: None, - } - } -} - #[tokio::main] pub async fn main() -> Result<(), oxanus::OxanusError> { tracing_subscriber::registry() diff --git a/oxanus/examples/throttled.rs b/oxanus/examples/throttled.rs index 459b88c..5fa5a0e 100644 --- a/oxanus/examples/throttled.rs +++ b/oxanus/examples/throttled.rs @@ -7,55 +7,31 @@ enum WorkerError {} #[derive(Debug, Clone)] struct WorkerState {} -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(context = WorkerState)] struct WorkerInstant {} -#[async_trait::async_trait] -impl oxanus::Worker for WorkerInstant { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl WorkerInstant { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { Ok(()) } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(context = WorkerState)] struct WorkerInstant2 {} -#[async_trait::async_trait] -impl oxanus::Worker for WorkerInstant2 { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl WorkerInstant2 { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { Ok(()) } } -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(key = "throttled")] +#[oxanus(throttle(window_ms = 2000, limit = 2))] struct QueueThrottled; -impl oxanus::Queue for QueueThrottled { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig { - kind: oxanus::QueueKind::Static { - key: "throttled".to_string(), - }, - concurrency: 1, - throttle: Some(oxanus::QueueThrottle { - limit: 2, - window_ms: 2000, - }), - } - } -} #[tokio::main] pub async fn main() -> Result<(), oxanus::OxanusError> { tracing_subscriber::registry() diff --git a/oxanus/examples/unique.rs b/oxanus/examples/unique.rs index a1fb351..e80012b 100644 --- a/oxanus/examples/unique.rs +++ b/oxanus/examples/unique.rs @@ -5,50 +5,25 @@ use tracing_subscriber::{EnvFilter, fmt, prelude::*}; enum WorkerError {} #[derive(Debug, Serialize, Deserialize, Clone)] -struct WorkerState {} +struct WorkerContext {} -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(unique_id = "worker2sec:{id}", on_conflict = Skip)] struct Worker2Sec { id: usize, } -#[async_trait::async_trait] -impl oxanus::Worker for Worker2Sec { - type Context = WorkerState; - type Error = WorkerError; - - async fn process( - &self, - oxanus::Context { .. }: &oxanus::Context, - ) -> Result<(), WorkerError> { +impl Worker2Sec { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { tokio::time::sleep(std::time::Duration::from_millis(2000)).await; Ok(()) } - - fn unique_id(&self) -> Option { - Some(format!("worker2sec:{}", self.id)) - } - - fn on_conflict(&self) -> oxanus::JobConflictStrategy { - oxanus::JobConflictStrategy::Skip - } } -#[derive(Serialize)] +#[derive(Serialize, oxanus::Queue)] +#[oxanus(key = "one")] struct QueueOne; -impl oxanus::Queue for QueueOne { - fn to_config() -> oxanus::QueueConfig { - oxanus::QueueConfig { - kind: oxanus::QueueKind::Static { - key: "one".to_string(), - }, - concurrency: 1, - throttle: None, - } - } -} - #[tokio::main] pub async fn main() -> Result<(), oxanus::OxanusError> { tracing_subscriber::registry() @@ -56,7 +31,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .with(EnvFilter::from_default_env()) .init(); - let ctx = oxanus::Context::value(WorkerState {}); + let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; let config = oxanus::Config::new(&storage) .register_queue::() From 0cca649b0f0e58fee99fbbec4d21ed85d39c4570 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Wed, 7 Jan 2026 21:54:09 +0000 Subject: [PATCH 4/5] clippy --- oxanus-macros/src/worker.rs | 7 +++---- oxanus/src/queue.rs | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/oxanus-macros/src/worker.rs b/oxanus-macros/src/worker.rs index 44cab61..e25fa42 100644 --- a/oxanus-macros/src/worker.rs +++ b/oxanus-macros/src/worker.rs @@ -60,8 +60,7 @@ impl FromMeta for RetryDelay { } Expr::Path(expr_path) => Ok(RetryDelay::CustomFunc(expr_path.path.clone())), other => Err(Error::custom(format!( - "unsupported retry_delay value: {:?}", - other + "Unsupported retry_delay value: {other:?}", ))), }, _ => Err(Error::custom("retry_delay must be a name-value attribute")), @@ -112,14 +111,14 @@ impl FromMeta for UniqueIdSpec { args.push((ident, nv.value)); } - _ => return Err(Error::custom("unsupported unique_id syntax")), + _ => return Err(Error::custom("Unsupported unique_id syntax")), } } let fmt = fmt.ok_or_else(|| Error::custom("missing fmt = \"...\""))?; Ok(UniqueIdSpec::NamedFormatter { fmt, args }) } - _ => Err(Error::custom("invalid unique_id attribute")), + _ => Err(Error::custom("Invalid unique_id attribute")), } } } diff --git a/oxanus/src/queue.rs b/oxanus/src/queue.rs index 65d94cb..1367498 100644 --- a/oxanus/src/queue.rs +++ b/oxanus/src/queue.rs @@ -74,7 +74,7 @@ impl QueueConfig { pub fn static_key(&self) -> Option { match &self.kind { QueueKind::Static { key } => Some(key.clone()), - _ => None, + QueueKind::Dynamic { .. } => None, } } } From e2dc99205a27cfe916ef8fb2f2dfe77798994a08 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Thu, 8 Jan 2026 10:35:03 +0000 Subject: [PATCH 5/5] more test cases --- oxanus/src/worker.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/oxanus/src/worker.rs b/oxanus/src/worker.rs index 1265b89..a3b6a30 100644 --- a/oxanus/src/worker.rs +++ b/oxanus/src/worker.rs @@ -98,7 +98,7 @@ mod tests { #[derive(Serialize, oxanus::Worker)] #[oxanus(error = std::fmt::Error)] - #[oxanus(max_retries = 3)] + #[oxanus(max_retries = 3, retry_delay = 10)] #[oxanus(on_conflict = Replace)] struct TestWorkerCustomError {} @@ -113,6 +113,7 @@ mod tests { assert_eq!(TestWorkerCustomError {}.unique_id(), None); assert_eq!(TestWorkerCustomError {}.max_retries(), 3); + assert_eq!(TestWorkerCustomError {}.retry_delay(1), 10); assert_eq!( TestWorkerCustomError {}.on_conflict(), JobConflictStrategy::Replace @@ -191,6 +192,7 @@ mod tests { #[derive(Serialize, oxanus::Worker)] #[oxanus(unique_id = Self::unique_id)] + #[oxanus(retry_delay = Self::retry_delay)] struct TestWorkerCustomUniqueId { id: i32, task: TestWorkerNestedTask, @@ -204,6 +206,10 @@ mod tests { fn unique_id(&self) -> Option { Some(format!("worker_id_{}_task_{}", self.id, self.task.name)) } + + fn retry_delay(&self, retries: u32) -> u64 { + retries as u64 * 2 + } } assert_eq!( @@ -216,16 +222,15 @@ mod tests { .unwrap(), "worker_id_1_task_11" ); - assert_eq!( - Worker::unique_id(&TestWorkerCustomUniqueId { - id: 2, - task: TestWorkerNestedTask { - name: "22".to_owned(), - } - }) - .unwrap(), - "worker_id_2_task_22" - ); + let worker2 = TestWorkerCustomUniqueId { + id: 2, + task: TestWorkerNestedTask { + name: "22".to_owned(), + }, + }; + assert_eq!(Worker::unique_id(&worker2).unwrap(), "worker_id_2_task_22"); + assert_eq!(worker2.retry_delay(1), 2); + assert_eq!(worker2.retry_delay(2), 4); } #[cfg(feature = "macros")]