diff --git a/Cargo.lock b/Cargo.lock index 35d88d5..3a37605 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -594,6 +594,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "inventory" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc61209c082fbeb19919bee74b176221b27223e27b65d781eb91af24eb1fb46e" +dependencies = [ + "rustversion", +] + [[package]] name = "itoa" version = "1.0.17" @@ -746,6 +755,8 @@ dependencies = [ "dotenvy", "futures", "gethostname", + "inventory", + "oxanus", "oxanus-macros", "rand", "redis 1.0.2", diff --git a/oxanus-macros/Cargo.toml b/oxanus-macros/Cargo.toml index e032022..bd452b1 100644 --- a/oxanus-macros/Cargo.toml +++ b/oxanus-macros/Cargo.toml @@ -17,3 +17,4 @@ syn = { version = "2", features = ["parsing", "proc-macro", "derive"] } [features] default = [] +registry = [] diff --git a/oxanus-macros/src/lib.rs b/oxanus-macros/src/lib.rs index 91faf0d..cd2a233 100644 --- a/oxanus-macros/src/lib.rs +++ b/oxanus-macros/src/lib.rs @@ -1,7 +1,9 @@ mod queue; +mod registry; mod worker; use queue::*; +use registry::*; use worker::*; use proc_macro::TokenStream; @@ -44,3 +46,12 @@ pub fn derive_worker(input: TokenStream) -> TokenStream { expand_derive_worker(input).into() } + +/// Helper to define a component registry. +#[proc_macro_error] +#[proc_macro_derive(Registry, attributes(oxanus))] +pub fn derive_registry(input: TokenStream) -> TokenStream { + let input = parse_macro_input!(input as DeriveInput); + + expand_derive_registry(input).into() +} diff --git a/oxanus-macros/src/queue.rs b/oxanus-macros/src/queue.rs index 87b1175..b3852ff 100644 --- a/oxanus-macros/src/queue.rs +++ b/oxanus-macros/src/queue.rs @@ -3,11 +3,12 @@ use heck::ToSnakeCase; use proc_macro_error2::abort; use proc_macro2::TokenStream; use quote::quote; -use syn::{Data, DeriveInput, Fields}; +use syn::{Data, DeriveInput, Fields, Path}; #[derive(Debug, FromDeriveInput)] #[darling(attributes(oxanus), supports(struct_any))] struct OxanusArgs { + registry: Option, key: Option, prefix: Option, concurrency: Option, @@ -77,6 +78,29 @@ pub fn expand_derive_queue(input: DeriveInput) -> TokenStream { None => quote!(), }; + let component_registry = match args.registry { + Some(registry) => quote!(#registry), + None => quote!(ComponentRegistry), + }; + + let registry = if cfg!(feature = "registry") && component_registry.to_string() != "None" { + quote! { + oxanus::register_component! { + #component_registry(oxanus::ComponentRegistry { + module_path: module_path!(), + type_name: stringify!(#struct_ident), + definition: || { + oxanus::ComponentDefinition::Queue( + <#struct_ident as oxanus::Queue>::to_config() + ) + } + }) + } + } + } else { + quote!() + }; + quote! { #[automatically_derived] impl oxanus::Queue for #struct_ident { @@ -86,5 +110,7 @@ pub fn expand_derive_queue(input: DeriveInput) -> TokenStream { #throttle } } + + #registry } } diff --git a/oxanus-macros/src/registry.rs b/oxanus-macros/src/registry.rs new file mode 100644 index 0000000..7ff2a84 --- /dev/null +++ b/oxanus-macros/src/registry.rs @@ -0,0 +1,52 @@ +use proc_macro_error2::abort; +use proc_macro2::TokenStream; +use quote::quote; +use syn::{Data, DeriveInput, Fields, PathArguments, Type}; + +pub fn expand_derive_registry(input: DeriveInput) -> TokenStream { + if !cfg!(feature = "registry") { + return quote!(); + } + + let inner_type = match &input.data { + Data::Struct(data) => match &data.fields { + Fields::Unnamed(fields) if fields.unnamed.len() == 1 => &fields.unnamed[0].ty, + _ => abort!( + input.ident, + "Expected a tuple struct with exactly one field", + ), + }, + _ => abort!(input.ident, "Expected a struct",), + }; + + let type_path = match inner_type { + Type::Path(path) => path, + _ => abort!(input.ident, "Expected a struct with inner type",), + }; + + let generics = match type_path.path.segments.last() { + Some(segment) => match &segment.arguments { + PathArguments::AngleBracketed(args) => args, + _ => abort!( + inner_type, + "Expected generic arguments ", + ), + }, + _ => abort!(input.ident, "Expected a struct with inner type",), + }; + + let struct_ident = &input.ident; + + quote! { + oxanus::create_component_registry!(#struct_ident); + + impl #struct_ident { + pub fn build_config(storage: &oxanus::Storage) -> oxanus::Config::#generics { + oxanus::ComponentRegistry::#generics::build_config( + &storage, + oxanus::iterate_components::<#struct_ident>().map(|x| &x.0), + ) + } + } + } +} diff --git a/oxanus-macros/src/worker.rs b/oxanus-macros/src/worker.rs index e25fa42..9a6f317 100644 --- a/oxanus-macros/src/worker.rs +++ b/oxanus-macros/src/worker.rs @@ -11,6 +11,7 @@ use syn::{ struct OxanusArgs { context: Option, error: Option, + registry: Option, max_retries: Option, retry_delay: Option, unique_id: Option, @@ -183,6 +184,31 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { None => quote!(), }; + let component_registry = match args.registry { + Some(registry) => quote!(#registry), + None => quote!(ComponentRegistry), + }; + + let registry = if cfg!(feature = "registry") && component_registry.to_string() != "None" { + quote! { + oxanus::register_component! { + #component_registry(oxanus::ComponentRegistry { + module_path: module_path!(), + type_name: stringify!(#struct_ident), + definition: || { + oxanus::ComponentDefinition::Worker(oxanus::WorkerConfig { + name: std::any::type_name::<#struct_ident>().to_owned(), + factory: oxanus::job_factory::<#struct_ident, #type_context, #type_error>, + kind: <#struct_ident as oxanus::Worker>::to_config(), + }) + } + }) + } + } + } else { + quote!() + }; + quote! { #[automatically_derived] #[async_trait::async_trait] @@ -204,6 +230,8 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream { #cron } + + #registry } } diff --git a/oxanus/Cargo.toml b/oxanus/Cargo.toml index 1c5241f..485a15e 100644 --- a/oxanus/Cargo.toml +++ b/oxanus/Cargo.toml @@ -18,6 +18,7 @@ default = ["sentry", "tracing-instrument", "macros"] sentry = ["sentry-core"] tracing-instrument = [] macros = ["oxanus-macros"] +registry = ["inventory", "oxanus-macros?/registry"] [dependencies] async-trait = "0.1" @@ -27,6 +28,7 @@ deadpool-redis = "^0.22" redis = { version = "^1.0", features = ["aio", "tokio-comp"] } futures = { version = "^0.3" } gethostname = "^1.0" +inventory = { version = "0.3", optional = true } sentry-core = { version = "^0.46", optional = true } serde = { version = "^1.0.218", features = ["derive"] } serde_json = { version = "^1.0", features = ["preserve_order"] } @@ -45,6 +47,8 @@ rand = "^0.9" testresult = "^0.4" tracing-subscriber = { version = "^0.3", features = ["env-filter"] } +oxanus = { path = ".", features = ["registry"] } + [[bench]] name = "concurrency" harness = false diff --git a/oxanus/examples/cron.rs b/oxanus/examples/cron.rs index 15f6773..446963b 100644 --- a/oxanus/examples/cron.rs +++ b/oxanus/examples/cron.rs @@ -1,19 +1,22 @@ use serde::{Deserialize, Serialize}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; +#[derive(oxanus::Registry)] +struct ComponentRegistry(oxanus::ComponentRegistry); + #[derive(Debug, thiserror::Error)] enum WorkerError {} #[derive(Debug, Serialize, Deserialize, Clone)] -struct WorkerState {} +struct WorkerContext {} #[derive(Debug, Serialize, Deserialize, oxanus::Worker)] -#[oxanus(context = WorkerState)] +#[oxanus(context = WorkerContext)] #[oxanus(cron(schedule = "*/5 * * * * *", queue = QueueOne))] struct TestWorker {} impl TestWorker { - async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(()) } @@ -30,11 +33,10 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .with(EnvFilter::from_default_env()) .init(); - let ctx = oxanus::Context::value(WorkerState {}); + let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; - let config = oxanus::Config::new(&storage) - .register_worker::() - .with_graceful_shutdown(tokio::signal::ctrl_c()); + let config = + ComponentRegistry::build_config(&storage).with_graceful_shutdown(tokio::signal::ctrl_c()); oxanus::run(config, ctx).await?; diff --git a/oxanus/examples/cron_w_err.rs b/oxanus/examples/cron_w_err.rs index 51394e9..b029b67 100644 --- a/oxanus/examples/cron_w_err.rs +++ b/oxanus/examples/cron_w_err.rs @@ -12,6 +12,7 @@ enum WorkerError { struct WorkerContext {} #[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(registry = None)] #[oxanus(max_retries = 3, retry_delay = 0)] #[oxanus(cron(schedule = "*/10 * * * * *"))] struct TestWorker {} @@ -27,6 +28,7 @@ impl TestWorker { } #[derive(Serialize, oxanus::Queue)] +#[oxanus(registry = None)] #[oxanus(prefix = "two")] struct QueueDynamic(i32); diff --git a/oxanus/examples/dynamic.rs b/oxanus/examples/dynamic.rs index dad5b6d..e57f9b7 100644 --- a/oxanus/examples/dynamic.rs +++ b/oxanus/examples/dynamic.rs @@ -1,18 +1,20 @@ use serde::{Deserialize, Serialize}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; +#[derive(oxanus::Registry)] +struct ComponentRegistry(oxanus::ComponentRegistry); + #[derive(Debug, thiserror::Error)] enum WorkerError {} #[derive(Debug, Clone)] -struct WorkerState {} +struct WorkerContext {} #[derive(Debug, Serialize, Deserialize, oxanus::Worker)] -#[oxanus(context = WorkerState)] struct Worker2Sec {} impl Worker2Sec { - async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { tokio::time::sleep(std::time::Duration::from_secs(1)).await; Ok(()) } @@ -36,12 +38,9 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .with(EnvFilter::from_default_env()) .init(); - let ctx = oxanus::Context::value(WorkerState {}); + let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; - let config = oxanus::Config::new(&storage.clone()) - .register_queue::() - .register_worker::() - .exit_when_processed(5); + let config = ComponentRegistry::build_config(&storage).exit_when_processed(5); storage .enqueue(QueueDynamic(Animal::Cat, 2), Worker2Sec {}) diff --git a/oxanus/examples/foo.rs b/oxanus/examples/foo.rs index d5739eb..eae15da 100644 --- a/oxanus/examples/foo.rs +++ b/oxanus/examples/foo.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; +#[derive(oxanus::Registry)] +struct ComponentRegistry(oxanus::ComponentRegistry); + #[derive(Debug, Serialize, Deserialize, oxanus::Worker)] struct Worker1Sec { id: usize, @@ -80,15 +83,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; - let config = oxanus::Config::new(&storage.clone()) - .register_queue::() - .register_queue::() - .register_queue::() - .register_worker::() - .register_worker::() - .register_worker::() - .register_worker::() - .exit_when_processed(12); + let config = ComponentRegistry::build_config(&storage).exit_when_processed(12); storage .enqueue( diff --git a/oxanus/examples/minimal.rs b/oxanus/examples/minimal.rs index f9c74a2..309261a 100644 --- a/oxanus/examples/minimal.rs +++ b/oxanus/examples/minimal.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; +#[derive(oxanus::Registry)] +struct ComponentRegistry(oxanus::ComponentRegistry); + #[derive(Debug, thiserror::Error)] enum WorkerError {} @@ -32,9 +35,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; - let config = oxanus::Config::new(&storage) - .register_queue::() - .register_worker::() + let config = ComponentRegistry::build_config(&storage) .with_graceful_shutdown(tokio::signal::ctrl_c()) .exit_when_processed(1); diff --git a/oxanus/examples/resumable.rs b/oxanus/examples/resumable.rs index 2ef3897..a29efab 100644 --- a/oxanus/examples/resumable.rs +++ b/oxanus/examples/resumable.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; +#[derive(oxanus::Registry)] +struct ComponentRegistry(oxanus::ComponentRegistry); + #[derive(Debug, thiserror::Error)] enum WorkerError { #[error("Generic error: {0}")] @@ -10,14 +13,14 @@ enum WorkerError { } #[derive(Debug, Serialize, Deserialize, Clone)] -struct WorkerState {} +struct WorkerContext {} #[derive(Debug, Serialize, Deserialize, oxanus::Worker)] -#[oxanus(context = WorkerState, max_retries = 10, retry_delay = 3)] +#[oxanus(max_retries = 10, retry_delay = 3)] struct ResumableTestWorker {} impl ResumableTestWorker { - async fn process(&self, ctx: &oxanus::Context) -> Result<(), WorkerError> { + async fn process(&self, ctx: &oxanus::Context) -> Result<(), WorkerError> { let progress = ctx.state.get::().await?; dbg!(&progress); @@ -43,11 +46,9 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .with(EnvFilter::from_default_env()) .init(); - let ctx = oxanus::Context::value(WorkerState {}); + let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; - let config = oxanus::Config::new(&storage) - .register_queue::() - .register_worker::() + let config = ComponentRegistry::build_config(&storage) .with_graceful_shutdown(tokio::signal::ctrl_c()) .exit_when_processed(11); diff --git a/oxanus/examples/throttled.rs b/oxanus/examples/throttled.rs index 5fa5a0e..e7ace18 100644 --- a/oxanus/examples/throttled.rs +++ b/oxanus/examples/throttled.rs @@ -1,28 +1,29 @@ use serde::{Deserialize, Serialize}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; +#[derive(oxanus::Registry)] +struct ComponentRegistry(oxanus::ComponentRegistry); + #[derive(Debug, thiserror::Error)] enum WorkerError {} #[derive(Debug, Clone)] -struct WorkerState {} +struct WorkerContext {} #[derive(Debug, Serialize, Deserialize, oxanus::Worker)] -#[oxanus(context = WorkerState)] struct WorkerInstant {} impl WorkerInstant { - async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { Ok(()) } } #[derive(Debug, Serialize, Deserialize, oxanus::Worker)] -#[oxanus(context = WorkerState)] struct WorkerInstant2 {} impl WorkerInstant2 { - async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { + async fn process(&self, _: &oxanus::Context) -> Result<(), WorkerError> { Ok(()) } } @@ -39,13 +40,9 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { .with(EnvFilter::from_default_env()) .init(); - let ctx = oxanus::Context::value(WorkerState {}); + let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; - let config = oxanus::Config::new(&storage.clone()) - .register_queue::() - .register_worker::() - .register_worker::() - .exit_when_processed(8); + let config = ComponentRegistry::build_config(&storage).exit_when_processed(8); storage.enqueue(QueueThrottled, WorkerInstant {}).await?; storage.enqueue(QueueThrottled, WorkerInstant2 {}).await?; diff --git a/oxanus/examples/unique.rs b/oxanus/examples/unique.rs index e80012b..f63d047 100644 --- a/oxanus/examples/unique.rs +++ b/oxanus/examples/unique.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; use tracing_subscriber::{EnvFilter, fmt, prelude::*}; +#[derive(oxanus::Registry)] +struct ComponentRegistry(oxanus::ComponentRegistry); + #[derive(Debug, thiserror::Error)] enum WorkerError {} @@ -33,9 +36,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> { let ctx = oxanus::Context::value(WorkerContext {}); let storage = oxanus::Storage::builder().build_from_env()?; - let config = oxanus::Config::new(&storage) - .register_queue::() - .register_worker::(); + let config = ComponentRegistry::build_config(&storage); storage.enqueue(QueueOne, Worker2Sec { id: 1 }).await?; storage.enqueue(QueueOne, Worker2Sec { id: 1 }).await?; diff --git a/oxanus/src/config.rs b/oxanus/src/config.rs index 16fd9e8..8846c8b 100644 --- a/oxanus/src/config.rs +++ b/oxanus/src/config.rs @@ -5,7 +5,7 @@ use tokio_util::sync::CancellationToken; use crate::Storage; use crate::queue::{Queue, QueueConfig}; use crate::worker::Worker; -use crate::worker_registry::WorkerRegistry; +use crate::worker_registry::{WorkerConfig, WorkerRegistry}; pub struct Config { pub(crate) registry: WorkerRegistry, @@ -35,17 +35,21 @@ impl Config { where Q: Queue, { - self.queues.insert(Q::to_config()); + self.register_queue_with(Q::to_config()); self } + pub fn register_queue_with(&mut self, config: QueueConfig) { + self.queues.insert(config); + } + pub fn register_queue_with_concurrency(mut self, concurrency: usize) -> Self where Q: Queue, { let mut config = Q::to_config(); config.concurrency = concurrency; - self.queues.insert(config); + self.register_queue_with(config); self } @@ -57,7 +61,7 @@ impl Config { let key = queue_config .static_key() .expect("Statically defined cron workers can only use static queues"); - self.queues.insert(queue_config); + self.register_queue_with(queue_config); self.registry.register_cron::(&schedule, key); } else { self.registry.register::(); @@ -70,12 +74,16 @@ impl Config { where W: Worker + serde::de::DeserializeOwned + 'static, { - self.queues.insert(queue.config()); + self.register_queue_with(queue.config()); let schedule = W::cron_schedule().expect("Cron Worker must have cron_schedule defined"); self.registry.register_cron::(&schedule, queue.key()); self } + pub fn register_worker_with(&mut self, config: WorkerConfig) { + self.registry.register_worker_with(config); + } + pub fn exit_when_processed(mut self, processed: u64) -> Self { self.exit_when_processed = Some(processed); self @@ -96,6 +104,17 @@ impl Config { std::mem::swap(&mut self.shutdown_signal, &mut shutdown_signal); shutdown_signal } + + pub fn has_registered_queue(&self) -> bool { + self.queues.contains(&Q::to_config()) + } + + pub fn has_registered_worker(&self) -> bool + where + W: Worker, + { + self.registry.has_registered::() + } } #[cfg(any(target_os = "linux", target_os = "macos"))] diff --git a/oxanus/src/lib.rs b/oxanus/src/lib.rs index 2a2cdb8..26a8226 100644 --- a/oxanus/src/lib.rs +++ b/oxanus/src/lib.rs @@ -96,6 +96,9 @@ mod worker; mod worker_event; mod worker_registry; +#[cfg(feature = "registry")] +mod registry; + #[cfg(test)] mod test_helper; @@ -109,6 +112,10 @@ pub use crate::queue::{Queue, QueueConfig, QueueKind, QueueThrottle}; pub use crate::storage::Storage; pub use crate::storage_builder::{StorageBuilder, StorageBuilderTimeouts}; pub use crate::worker::Worker; +pub use crate::worker_registry::{WorkerConfig, WorkerConfigKind, job_factory}; + +#[cfg(feature = "registry")] +pub use registry::*; #[cfg(feature = "macros")] -pub use oxanus_macros::{Queue, Worker}; +pub use oxanus_macros::{Queue, Registry, Worker}; diff --git a/oxanus/src/queue.rs b/oxanus/src/queue.rs index 1367498..6f79cad 100644 --- a/oxanus/src/queue.rs +++ b/oxanus/src/queue.rs @@ -195,6 +195,10 @@ mod tests { fn test_define_queue_with_macro() { use crate as oxanus; // needed for unit test + #[derive(oxanus::Registry)] + #[allow(dead_code)] + struct ComponentRegistry(oxanus::ComponentRegistry<(), ()>); + #[derive(Serialize, oxanus::Queue)] struct DefaultQueue; diff --git a/oxanus/src/registry.rs b/oxanus/src/registry.rs new file mode 100644 index 0000000..12617bc --- /dev/null +++ b/oxanus/src/registry.rs @@ -0,0 +1,48 @@ +use crate::{Config, QueueConfig, Storage, worker_registry::WorkerConfig}; + +pub struct ComponentRegistry { + /// `module_path!()` + pub module_path: &'static str, + /// `stringify!(MyStruct)` + pub type_name: &'static str, + pub definition: fn() -> ComponentDefinition, +} + +pub enum ComponentDefinition { + Queue(QueueConfig), + Worker(WorkerConfig), +} + +/// Macro to create a component registry +pub use inventory::collect as create_component_registry; + +/// Macro to register a Queue or Worker +pub use inventory::submit as register_component; + +/// Helper type to iterate components +pub use inventory::iter as iterate_components; + +impl ComponentRegistry +where + DT: 'static, + ET: 'static, +{ + pub fn build_config( + storage: &Storage, + items: impl Iterator, + ) -> Config { + let mut config = Config::new(storage); + for component in items { + tracing::info!( + "Registering {}::{}", + component.module_path, + component.type_name + ); + match (component.definition)() { + ComponentDefinition::Queue(q) => config.register_queue_with(q), + ComponentDefinition::Worker(w) => config.register_worker_with(w), + } + } + config + } +} diff --git a/oxanus/src/worker.rs b/oxanus/src/worker.rs index a3b6a30..b50f3db 100644 --- a/oxanus/src/worker.rs +++ b/oxanus/src/worker.rs @@ -1,4 +1,4 @@ -use crate::{QueueConfig, context::Context, job_envelope::JobConflictStrategy}; +use crate::{QueueConfig, WorkerConfigKind, context::Context, job_envelope::JobConflictStrategy}; use std::panic::UnwindSafe; pub type BoxedWorker = Box>; @@ -48,28 +48,53 @@ pub trait Worker: Send + Sync + UnwindSafe { { None } + + fn to_config() -> WorkerConfigKind + where + Self: Sized, + { + #[allow(clippy::collapsible_if)] // requires 1.88 + if let (Some(schedule), Some(queue_config)) = + (Self::cron_schedule(), Self::cron_queue_config()) + { + if let Some(queue_key) = queue_config.static_key() { + return WorkerConfigKind::Cron { + schedule, + queue_key, + }; + } + } + WorkerConfigKind::Normal + } } +#[cfg(feature = "macros")] #[cfg(test)] mod tests { use super::{JobConflictStrategy, Worker}; + use crate as oxanus; use crate::Context; use crate::test_helper::create_worker_context; - use serde::Serialize; - use std::sync::{Arc, Mutex}; + use serde::{Deserialize, Serialize}; + use std::io::Error as WorkerError; + use std::sync::{Arc, Mutex}; // needed for unit test #[derive(Clone, Default)] struct WorkerContext { count: Arc>, } - #[cfg(feature = "macros")] + #[derive(oxanus::Registry)] + #[allow(dead_code)] + struct ComponentRegistry(oxanus::ComponentRegistry); + + #[derive(oxanus::Registry)] + #[allow(dead_code)] + struct ComponentRegistryFmt(oxanus::ComponentRegistry); + #[tokio::test] async fn test_define_worker_with_macro() { - use crate as oxanus; - use std::io::Error as WorkerError; - - #[derive(Serialize, oxanus::Worker)] + #[derive(Serialize, Deserialize, oxanus::Worker)] struct TestWorker {} impl TestWorker { @@ -96,8 +121,8 @@ mod tests { assert_eq!(*ctx.count.lock().unwrap(), 2); - #[derive(Serialize, oxanus::Worker)] - #[oxanus(error = std::fmt::Error)] + #[derive(Serialize, Deserialize, oxanus::Worker)] + #[oxanus(error = std::fmt::Error, registry = ComponentRegistryFmt)] #[oxanus(max_retries = 3, retry_delay = 10)] #[oxanus(on_conflict = Replace)] struct TestWorkerCustomError {} @@ -119,7 +144,7 @@ mod tests { JobConflictStrategy::Replace ); - #[derive(Serialize, oxanus::Worker)] + #[derive(Serialize, Deserialize, oxanus::Worker)] #[oxanus(unique_id = "test_worker_{id}")] struct TestWorkerUniqueId { id: i32, @@ -141,14 +166,14 @@ mod tests { "test_worker_12" ); - #[derive(Serialize, oxanus::Worker)] + #[derive(Serialize, Deserialize, oxanus::Worker)] #[oxanus(unique_id(fmt = "test_worker_{id}_{task}", id = self.id, task = self.task.name))] struct TestWorkerNestedUniqueId { id: i32, task: TestWorkerNestedTask, } - #[derive(Serialize, Default)] + #[derive(Serialize, Deserialize, Default)] struct TestWorkerNestedTask { name: String, } @@ -190,7 +215,7 @@ mod tests { "test_worker_2_task2" ); - #[derive(Serialize, oxanus::Worker)] + #[derive(Serialize, Deserialize, oxanus::Worker)] #[oxanus(unique_id = Self::unique_id)] #[oxanus(retry_delay = Self::retry_delay)] struct TestWorkerCustomUniqueId { @@ -233,7 +258,6 @@ mod tests { assert_eq!(worker2.retry_delay(2), 4); } - #[cfg(feature = "macros")] #[tokio::test] async fn test_define_cron_worker_with_macro() { use crate as oxanus; // needed for unit test @@ -243,7 +267,7 @@ mod tests { #[derive(Serialize, oxanus::Queue)] struct DefaultQueue; - #[derive(Serialize, oxanus::Worker)] + #[derive(Serialize, Deserialize, oxanus::Worker)] #[oxanus(cron(schedule = "*/1 * * * * *", queue = DefaultQueue))] struct TestCronWorker {} diff --git a/oxanus/src/worker_registry.rs b/oxanus/src/worker_registry.rs index fc14d73..f291d81 100644 --- a/oxanus/src/worker_registry.rs +++ b/oxanus/src/worker_registry.rs @@ -12,12 +12,34 @@ pub struct WorkerRegistry { pub schedules: HashMap, } +pub struct WorkerConfig { + pub name: String, + pub factory: JobFactory, + pub kind: WorkerConfigKind, +} + +pub enum WorkerConfigKind { + Normal, + Cron { schedule: String, queue_key: String }, +} + #[derive(Debug, Clone)] pub struct CronJob { pub schedule: cron::Schedule, pub queue_key: String, } +pub fn job_factory< + T: Worker + serde::de::DeserializeOwned + 'static, + DT, + ET, +>( + value: serde_json::Value, +) -> Result, OxanusError> { + let job: T = serde_json::from_value(value)?; + Ok(Box::new(job)) +} + impl WorkerRegistry { pub fn new() -> Self { Self { @@ -30,20 +52,9 @@ impl WorkerRegistry { where T: Worker + serde::de::DeserializeOwned + 'static, { - fn factory< - T: Worker + serde::de::DeserializeOwned + 'static, - DT, - ET, - >( - value: serde_json::Value, - ) -> Result, OxanusError> { - let job: T = serde_json::from_value(value)?; - Ok(Box::new(job)) - } - let name = type_name::(); - self.jobs.insert(name.to_string(), factory::); + self.jobs.insert(name.to_string(), job_factory::); self } @@ -69,6 +80,41 @@ impl WorkerRegistry { self } + pub fn register_worker_with(&mut self, config: WorkerConfig) { + match config.kind { + WorkerConfigKind::Normal => { + self.jobs.insert(config.name, config.factory); + } + WorkerConfigKind::Cron { + schedule, + queue_key, + } => { + // we can enforce cron worker being `struct Worker {}` in macro + + self.jobs.insert(config.name.clone(), config.factory); + + let schedule = cron::Schedule::from_str(&schedule).unwrap_or_else(|_| { + panic!("{}: Invalid cron schedule: {schedule}", config.name) + }); + + self.schedules.insert( + config.name, + CronJob { + schedule, + queue_key, + }, + ); + } + } + } + + pub fn has_registered(&self) -> bool + where + T: Worker, + { + self.jobs.contains_key(type_name::()) + } + pub fn build( &self, name: &str, diff --git a/oxanus/tests/integration/main.rs b/oxanus/tests/integration/main.rs index 6962453..6a1358d 100644 --- a/oxanus/tests/integration/main.rs +++ b/oxanus/tests/integration/main.rs @@ -3,6 +3,7 @@ mod dead; mod drain; mod dynamic; mod panic; +mod registry; mod retry; mod shared; mod standard; diff --git a/oxanus/tests/integration/registry.rs b/oxanus/tests/integration/registry.rs new file mode 100644 index 0000000..1b6c330 --- /dev/null +++ b/oxanus/tests/integration/registry.rs @@ -0,0 +1,73 @@ +use crate::shared::{WorkerError, WorkerState as WorkerContext, random_string, setup}; +use deadpool_redis::redis::AsyncCommands; +use serde::{Deserialize, Serialize}; +use testresult::TestResult; + +#[derive(oxanus::Registry)] +struct ComponentRegistry(oxanus::ComponentRegistry); + +#[derive(Serialize, oxanus::Queue)] +#[oxanus(key = "two")] +struct QueueTwo; + +#[derive(Debug, Clone, Serialize, Deserialize, oxanus::Worker)] +pub struct WorkerCounter { + pub key: String, +} + +impl WorkerCounter { + async fn process(&self, ctx: &oxanus::Context) -> Result<(), WorkerError> { + let mut redis = ctx.ctx.redis.get().await?; + let _: () = redis.incr(&self.key, 1).await?; + Ok(()) + } +} + +#[derive(Debug, Serialize, Deserialize, oxanus::Worker)] +#[oxanus(cron(schedule = "* * * * * *", queue = QueueTwo))] +pub struct CronWorkerCounter {} + +impl CronWorkerCounter { + async fn process(&self, ctx: &oxanus::Context) -> Result<(), WorkerError> { + let mut redis = ctx.ctx.redis.get().await?; + let _: () = redis.incr("test_worker:counter", 1).await?; + Ok(()) + } +} + +#[tokio::test] +pub async fn test_registry() -> TestResult { + let redis_pool = setup(); + let mut redis_conn = redis_pool.get().await?; + let _: i64 = redis_conn.del("test_worker:counter").await?; + + let ctx = oxanus::Context::value(WorkerContext { + redis: redis_pool.clone(), + }); + + let storage = oxanus::Storage::builder() + .namespace(random_string()) + .build_from_pool(redis_pool.clone())?; + + let config = ComponentRegistry::build_config(&storage).exit_when_processed(2); + + // no need to manually register, here we verify they were registered + assert!(config.has_registered_queue::()); + assert!(config.has_registered_worker::()); + assert!(config.has_registered_worker::()); + + let worker = WorkerCounter { + key: "test_worker:counter".to_owned(), + }; + + storage.enqueue(QueueTwo, worker).await?; + + oxanus::run(config, ctx).await?; + + let mut redis_conn = redis_pool.get().await?; + let value: Option = redis_conn.get("test_worker:counter").await?; + + assert_eq!(value, Some(2)); + + Ok(()) +}