Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions oxanus-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ syn = { version = "2", features = ["parsing", "proc-macro", "derive"] }

[features]
default = []
registry = []
11 changes: 11 additions & 0 deletions oxanus-macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod queue;
mod registry;
mod worker;

use queue::*;
use registry::*;
use worker::*;

use proc_macro::TokenStream;
Expand Down Expand Up @@ -44,3 +46,12 @@ pub fn derive_worker(input: TokenStream) -> TokenStream {

expand_derive_worker(input).into()
}

/// Helper to define a component registry.
#[proc_macro_error]
#[proc_macro_derive(Registry, attributes(oxanus))]
pub fn derive_registry(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);

expand_derive_registry(input).into()
}
28 changes: 27 additions & 1 deletion oxanus-macros/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use heck::ToSnakeCase;
use proc_macro_error2::abort;
use proc_macro2::TokenStream;
use quote::quote;
use syn::{Data, DeriveInput, Fields};
use syn::{Data, DeriveInput, Fields, Path};

#[derive(Debug, FromDeriveInput)]
#[darling(attributes(oxanus), supports(struct_any))]
struct OxanusArgs {
registry: Option<Path>,
key: Option<String>,
prefix: Option<String>,
concurrency: Option<usize>,
Expand Down Expand Up @@ -77,6 +78,29 @@ pub fn expand_derive_queue(input: DeriveInput) -> TokenStream {
None => quote!(),
};

let component_registry = match args.registry {
Some(registry) => quote!(#registry),
None => quote!(ComponentRegistry),
};

let registry = if cfg!(feature = "registry") && component_registry.to_string() != "None" {
quote! {
oxanus::register_component! {
#component_registry(oxanus::ComponentRegistry {
module_path: module_path!(),
type_name: stringify!(#struct_ident),
definition: || {
oxanus::ComponentDefinition::Queue(
<#struct_ident as oxanus::Queue>::to_config()
)
}
})
}
}
} else {
quote!()
};

quote! {
#[automatically_derived]
impl oxanus::Queue for #struct_ident {
Expand All @@ -86,5 +110,7 @@ pub fn expand_derive_queue(input: DeriveInput) -> TokenStream {
#throttle
}
}

#registry
}
}
52 changes: 52 additions & 0 deletions oxanus-macros/src/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use proc_macro_error2::abort;
use proc_macro2::TokenStream;
use quote::quote;
use syn::{Data, DeriveInput, Fields, PathArguments, Type};

pub fn expand_derive_registry(input: DeriveInput) -> TokenStream {
if !cfg!(feature = "registry") {
return quote!();
}

let inner_type = match &input.data {
Data::Struct(data) => match &data.fields {
Fields::Unnamed(fields) if fields.unnamed.len() == 1 => &fields.unnamed[0].ty,
_ => abort!(
input.ident,
"Expected a tuple struct with exactly one field",
),
},
_ => abort!(input.ident, "Expected a struct",),
};

let type_path = match inner_type {
Type::Path(path) => path,
_ => abort!(input.ident, "Expected a struct with inner type",),
};

let generics = match type_path.path.segments.last() {
Some(segment) => match &segment.arguments {
PathArguments::AngleBracketed(args) => args,
_ => abort!(
inner_type,
"Expected generic arguments <WorkerContext, WorkerError>",
),
},
_ => abort!(input.ident, "Expected a struct with inner type",),
};

let struct_ident = &input.ident;

quote! {
oxanus::create_component_registry!(#struct_ident);

impl #struct_ident {
pub fn build_config(storage: &oxanus::Storage) -> oxanus::Config::#generics {
oxanus::ComponentRegistry::#generics::build_config(
&storage,
oxanus::iterate_components::<#struct_ident>().map(|x| &x.0),
)
}
}
}
}
28 changes: 28 additions & 0 deletions oxanus-macros/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use syn::{
struct OxanusArgs {
context: Option<Path>,
error: Option<Path>,
registry: Option<Path>,
max_retries: Option<u32>,
retry_delay: Option<RetryDelay>,
unique_id: Option<UniqueIdSpec>,
Expand Down Expand Up @@ -183,6 +184,31 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream {
None => quote!(),
};

let component_registry = match args.registry {
Some(registry) => quote!(#registry),
None => quote!(ComponentRegistry),
};

let registry = if cfg!(feature = "registry") && component_registry.to_string() != "None" {
quote! {
oxanus::register_component! {
#component_registry(oxanus::ComponentRegistry {
module_path: module_path!(),
type_name: stringify!(#struct_ident),
definition: || {
oxanus::ComponentDefinition::Worker(oxanus::WorkerConfig {
name: std::any::type_name::<#struct_ident>().to_owned(),
factory: oxanus::job_factory::<#struct_ident, #type_context, #type_error>,
kind: <#struct_ident as oxanus::Worker>::to_config(),
})
}
})
}
}
} else {
quote!()
};

quote! {
#[automatically_derived]
#[async_trait::async_trait]
Expand All @@ -204,6 +230,8 @@ pub fn expand_derive_worker(input: DeriveInput) -> TokenStream {

#cron
}

#registry
}
}

Expand Down
4 changes: 4 additions & 0 deletions oxanus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ default = ["sentry", "tracing-instrument", "macros"]
sentry = ["sentry-core"]
tracing-instrument = []
macros = ["oxanus-macros"]
registry = ["inventory", "oxanus-macros?/registry"]

[dependencies]
async-trait = "0.1"
Expand All @@ -27,6 +28,7 @@ deadpool-redis = "^0.22"
redis = { version = "^1.0", features = ["aio", "tokio-comp"] }
futures = { version = "^0.3" }
gethostname = "^1.0"
inventory = { version = "0.3", optional = true }
sentry-core = { version = "^0.46", optional = true }
serde = { version = "^1.0.218", features = ["derive"] }
serde_json = { version = "^1.0", features = ["preserve_order"] }
Expand All @@ -45,6 +47,8 @@ rand = "^0.9"
testresult = "^0.4"
tracing-subscriber = { version = "^0.3", features = ["env-filter"] }

oxanus = { path = ".", features = ["registry"] }

[[bench]]
name = "concurrency"
harness = false
16 changes: 9 additions & 7 deletions oxanus/examples/cron.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use serde::{Deserialize, Serialize};
use tracing_subscriber::{EnvFilter, fmt, prelude::*};

#[derive(oxanus::Registry)]
struct ComponentRegistry(oxanus::ComponentRegistry<WorkerContext, WorkerError>);

#[derive(Debug, thiserror::Error)]
enum WorkerError {}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct WorkerState {}
struct WorkerContext {}

#[derive(Debug, Serialize, Deserialize, oxanus::Worker)]
#[oxanus(context = WorkerState)]
#[oxanus(context = WorkerContext)]
#[oxanus(cron(schedule = "*/5 * * * * *", queue = QueueOne))]
struct TestWorker {}

impl TestWorker {
async fn process(&self, _: &oxanus::Context<WorkerState>) -> Result<(), WorkerError> {
async fn process(&self, _: &oxanus::Context<WorkerContext>) -> Result<(), WorkerError> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
}
Expand All @@ -30,11 +33,10 @@ pub async fn main() -> Result<(), oxanus::OxanusError> {
.with(EnvFilter::from_default_env())
.init();

let ctx = oxanus::Context::value(WorkerState {});
let ctx = oxanus::Context::value(WorkerContext {});
let storage = oxanus::Storage::builder().build_from_env()?;
let config = oxanus::Config::new(&storage)
.register_worker::<TestWorker>()
.with_graceful_shutdown(tokio::signal::ctrl_c());
let config =
ComponentRegistry::build_config(&storage).with_graceful_shutdown(tokio::signal::ctrl_c());

oxanus::run(config, ctx).await?;

Expand Down
2 changes: 2 additions & 0 deletions oxanus/examples/cron_w_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum WorkerError {
struct WorkerContext {}

#[derive(Debug, Serialize, Deserialize, oxanus::Worker)]
#[oxanus(registry = None)]
#[oxanus(max_retries = 3, retry_delay = 0)]
#[oxanus(cron(schedule = "*/10 * * * * *"))]
struct TestWorker {}
Expand All @@ -27,6 +28,7 @@ impl TestWorker {
}

#[derive(Serialize, oxanus::Queue)]
#[oxanus(registry = None)]
#[oxanus(prefix = "two")]
struct QueueDynamic(i32);

Expand Down
15 changes: 7 additions & 8 deletions oxanus/examples/dynamic.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use serde::{Deserialize, Serialize};
use tracing_subscriber::{EnvFilter, fmt, prelude::*};

#[derive(oxanus::Registry)]
struct ComponentRegistry(oxanus::ComponentRegistry<WorkerContext, WorkerError>);

#[derive(Debug, thiserror::Error)]
enum WorkerError {}

#[derive(Debug, Clone)]
struct WorkerState {}
struct WorkerContext {}

#[derive(Debug, Serialize, Deserialize, oxanus::Worker)]
#[oxanus(context = WorkerState)]
struct Worker2Sec {}

impl Worker2Sec {
async fn process(&self, _: &oxanus::Context<WorkerState>) -> Result<(), WorkerError> {
async fn process(&self, _: &oxanus::Context<WorkerContext>) -> Result<(), WorkerError> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(())
}
Expand All @@ -36,12 +38,9 @@ pub async fn main() -> Result<(), oxanus::OxanusError> {
.with(EnvFilter::from_default_env())
.init();

let ctx = oxanus::Context::value(WorkerState {});
let ctx = oxanus::Context::value(WorkerContext {});
let storage = oxanus::Storage::builder().build_from_env()?;
let config = oxanus::Config::new(&storage.clone())
.register_queue::<QueueDynamic>()
.register_worker::<Worker2Sec>()
.exit_when_processed(5);
let config = ComponentRegistry::build_config(&storage).exit_when_processed(5);

storage
.enqueue(QueueDynamic(Animal::Cat, 2), Worker2Sec {})
Expand Down
13 changes: 4 additions & 9 deletions oxanus/examples/foo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use serde::{Deserialize, Serialize};
use tracing_subscriber::{EnvFilter, fmt, prelude::*};

#[derive(oxanus::Registry)]
struct ComponentRegistry(oxanus::ComponentRegistry<WorkerContext, WorkerError>);
Comment on lines +4 to +5
Copy link
Collaborator Author

@tyt2y3 tyt2y3 Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ComponentRegistry is a unique type owned by user's crate.

Each ComponentRegistry is unique, and would only contain types that's registered to it statically.

The derive macros generate register_component!(ComponentRegistry, TestWorker) under the hood.


#[derive(Debug, Serialize, Deserialize, oxanus::Worker)]
struct Worker1Sec {
id: usize,
Expand Down Expand Up @@ -80,15 +83,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> {

let ctx = oxanus::Context::value(WorkerContext {});
let storage = oxanus::Storage::builder().build_from_env()?;
let config = oxanus::Config::new(&storage.clone())
.register_queue::<QueueOne>()
.register_queue::<QueueTwo>()
.register_queue::<QueueThrottled>()
.register_worker::<Worker1Sec>()
.register_worker::<Worker2Sec>()
.register_worker::<WorkerInstant>()
.register_worker::<WorkerInstant2>()
Comment on lines -83 to -90
Copy link
Collaborator Author

@tyt2y3 tyt2y3 Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On runtime, build_config would iterate all queues and workers and build the config object.

Manual register is no longer needed.

.exit_when_processed(12);
let config = ComponentRegistry::build_config(&storage).exit_when_processed(12);

storage
.enqueue(
Expand Down
7 changes: 4 additions & 3 deletions oxanus/examples/minimal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use serde::{Deserialize, Serialize};
use tracing_subscriber::{EnvFilter, fmt, prelude::*};

#[derive(oxanus::Registry)]
struct ComponentRegistry(oxanus::ComponentRegistry<WorkerContext, WorkerError>);

#[derive(Debug, thiserror::Error)]
enum WorkerError {}

Expand Down Expand Up @@ -32,9 +35,7 @@ pub async fn main() -> Result<(), oxanus::OxanusError> {

let ctx = oxanus::Context::value(WorkerContext {});
let storage = oxanus::Storage::builder().build_from_env()?;
let config = oxanus::Config::new(&storage)
.register_queue::<QueueOne>()
.register_worker::<TestWorker>()
let config = ComponentRegistry::build_config(&storage)
.with_graceful_shutdown(tokio::signal::ctrl_c())
.exit_when_processed(1);

Expand Down
Loading