diff --git a/Cargo.lock b/Cargo.lock index 4942d96..5c9dab7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,69 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "actix" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc9ef49f64074352f73ef9ec8c060a5f5799c96715c986a17f10933c3da2955c" -dependencies = [ - "actix-macros", - "actix-rt", - "actix_derive", - "bitflags 2.4.2", - "bytes", - "crossbeam-channel", - "futures-core", - "futures-sink", - "futures-task", - "futures-util", - "log", - "once_cell", - "parking_lot", - "pin-project-lite", - "smallvec", - "tokio", - "tokio-util", -] - -[[package]] -name = "actix-macros" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" -dependencies = [ - "quote", - "syn", -] - -[[package]] -name = "actix-mpv" -version = "0.1.0" -dependencies = [ - "actix", -] - -[[package]] -name = "actix-rt" -version = "2.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" -dependencies = [ - "futures-core", - "tokio", -] - -[[package]] -name = "actix_derive" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c7db3d5a9718568e4cf4a537cfd7070e6e6ff7481510d0237fb529ac850f6d3" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "addr2line" version = "0.21.0" @@ -107,18 +44,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - -[[package]] -name = "bitflags" -version = "2.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" - [[package]] name = "bytes" version = "1.5.0" @@ -149,21 +74,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b" -dependencies = [ - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" - [[package]] name = "deranged" version = "0.3.11" @@ -175,9 +85,9 @@ dependencies = [ [[package]] name = "derive-new" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad" +checksum = "2cdc8d50f426189eef89dac62fabfa0abb27d5cc008f25bf4156a0203325becc" dependencies = [ "proc-macro2", "quote", @@ -336,16 +246,6 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" -[[package]] -name = "lock_api" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" -dependencies = [ - "autocfg", - "scopeguard", -] - [[package]] name = "log" version = "0.4.20" @@ -425,29 +325,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets", -] - [[package]] name = "paste" version = "1.0.14" @@ -480,9 +357,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -526,27 +403,12 @@ dependencies = [ "getrandom", ] -[[package]] -name = "redox_syscall" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "rustc-demangle" version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - [[package]] name = "serde" version = "1.0.196" @@ -576,15 +438,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "signal-hook-registry" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" -dependencies = [ - "libc", -] - [[package]] name = "slab" version = "0.4.9" @@ -612,9 +465,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.48" +version = "2.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e" dependencies = [ "proc-macro2", "quote", @@ -623,18 +476,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.56" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", @@ -670,24 +523,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" -[[package]] -name = "toactor_play" -version = "0.1.0" -dependencies = [ - "tokactor", - "tokio", -] - -[[package]] -name = "tokactor" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9347d106cab235abbab396336c004efbf0da0edc1b9e74db63333990cfdf8b58" -dependencies = [ - "tokio", - "tracing", -] - [[package]] name = "tokio" version = "1.35.1" @@ -695,13 +530,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", - "bytes", "libc", "mio", "num_cpus", - "parking_lot", "pin-project-lite", - "signal-hook-registry", "socket2", "tokio-macros", "windows-sys", @@ -720,37 +552,13 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.14" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", "tokio", - "tokio-util", -] - -[[package]] -name = "tokio-util" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", - "tokio", - "tracing", -] - -[[package]] -name = "tokio_play" -version = "0.1.0" -dependencies = [ - "futures", - "tokio", - "tokio-stream", ] [[package]] @@ -825,6 +633,7 @@ dependencies = [ "thiserror", "time", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", ] diff --git a/Cargo.toml b/Cargo.toml index c9468fe..d233124 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,9 +2,6 @@ resolver = "2" members = [ "src/uactor", - "experiments/actix-mpv", - "experiments/tokio_play", - "experiments/toactor_play", ] [workspace.dependencies] @@ -12,7 +9,7 @@ tokio = { version = "1.35.0", features = ["net", "sync", "time", "rt", "macros", futures = "0.3" # errors -thiserror = "1.0" +thiserror = "2" anyhow = { version = "1.0", features = ["backtrace"] } # tracing @@ -26,4 +23,4 @@ strum = { version = "0.26", features = ["derive"] } strum_macros = "0.26" derive_more = { version = "1", features = ["full"] } -derive-new = "0.6.0" +derive-new = "0.7" diff --git a/README.md b/README.md index 5188f8a..2ea3a07 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,9 @@ Examples can be found [here](src/uactor/examples). 9. Implemented support for actors for which it is necessary to work with multiple message sources (channels) [Example: Multi channel](src/uactor/examples/multiple_incoming_channels.rs) 10. Implemented shared state for actors [Example: Shared state](src/uactor/examples/shared_state.rs) +### Actor lifecycle +![Lifecycle.png](docs/assets/Lifecycle.png) + ### Other projects: 1. Actix 2. Ractor diff --git a/docs/assets/Lifecycle.png b/docs/assets/Lifecycle.png new file mode 100644 index 0000000..4a5666a Binary files /dev/null and b/docs/assets/Lifecycle.png differ diff --git a/experiments/actix-mpv/Cargo.toml b/experiments/actix-mpv/Cargo.toml deleted file mode 100644 index 96b1f4c..0000000 --- a/experiments/actix-mpv/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "actix-mpv" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -actix = "0.13" \ No newline at end of file diff --git a/experiments/actix-mpv/src/main.rs b/experiments/actix-mpv/src/main.rs deleted file mode 100644 index a23d803..0000000 --- a/experiments/actix-mpv/src/main.rs +++ /dev/null @@ -1,31 +0,0 @@ -#![allow(dead_code)] -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -use actix::{Actor, ActorContext, AsyncContext, Context, Handler, Message}; - -fn main() { - struct MyActor; - - struct WhoAmI; - - impl Message for WhoAmI { - type Result = Result, ()>; - } - - impl Actor for MyActor { - type Context = Context; - } - - impl Handler for MyActor { - type Result = Result, ()>; - - fn handle(&mut self, msg: WhoAmI, ctx: &mut Context) -> Self::Result { - ctx.stop(); - Ok(ctx.address()) - } - } - - // let who_addr = addr.do_send(WhoAmI{}); - println!("Hello, world!"); -} diff --git a/experiments/toactor_play/Cargo.toml b/experiments/toactor_play/Cargo.toml deleted file mode 100644 index 29acd80..0000000 --- a/experiments/toactor_play/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "toactor_play" -version = "0.1.0" -edition = "2021" -readme = "../../README.md" -homepage = "https://serde.rs/" - -[dependencies] -tokio = "1.35.1" -tokactor = "2.1.0" \ No newline at end of file diff --git a/experiments/toactor_play/src/main.rs b/experiments/toactor_play/src/main.rs deleted file mode 100644 index 3e3b141..0000000 --- a/experiments/toactor_play/src/main.rs +++ /dev/null @@ -1,71 +0,0 @@ -#![allow(dead_code)] -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -use tokactor::{Actor, Ask, AskResult, Ctx}; - -struct Router {} - -pub struct PingPong { - counter: u8, -} - -/// This is the types of message [PingPong] supports -#[derive(Debug, Clone)] -pub enum Msg { - Ping, - Pong, -} -impl Msg { - // retrieve the next message in the sequence - fn next(&self) -> Self { - match self { - Self::Ping => Self::Pong, - Self::Pong => Self::Ping, - } - } - // print out this message - fn print(&self) { - match self { - Self::Ping => print!("ping.."), - Self::Pong => print!("pong.."), - } - } -} - -impl Actor for PingPong {} -impl Ask for PingPong { - type Result = Msg; - - // This is our main message handler - fn handle(&mut self, message: Msg, _: &mut Ctx) -> AskResult { - message.print(); - self.counter += 1; - AskResult::Reply(message.next()) - } -} - -#[tokio::main] -async fn main() { - // let world = World::new().unwrap(); - // - // let db = world.with_state(async || Db::connect().await.unwrap()); - // - // let router = Router { db }; - // - // let tcp_input = world.tcp_component("localhost:8080", router); - // - // world.on_input(tcp_input); - // - // world.block_until_completion(); - let handle = PingPong { counter: 0 }.start(); - let mut message = Msg::Ping; - for _ in 0..10 { - message = handle.ask(message).await.unwrap(); - } - let actor = handle - .await - .expect("Ping-pong actor failed to exit properly"); - assert_eq!(actor.counter, 10); - println!("\nProcessed {} messages", actor.counter); -} diff --git a/experiments/tokio_play/Cargo.toml b/experiments/tokio_play/Cargo.toml deleted file mode 100644 index 9ba3842..0000000 --- a/experiments/tokio_play/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "tokio_play" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -tokio = { workspace = true } -futures = { workspace = true } -tokio-stream = { version = "0.1", features = ["sync"] } \ No newline at end of file diff --git a/experiments/tokio_play/src/main.rs b/experiments/tokio_play/src/main.rs deleted file mode 100644 index 437ddd7..0000000 --- a/experiments/tokio_play/src/main.rs +++ /dev/null @@ -1,151 +0,0 @@ -#![allow(dead_code)] -#![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_mut)] -use futures::StreamExt; -use std::rc::Rc; -use std::sync::Arc; -use std::time::Duration; -use tokio::runtime::Runtime; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::watch; -use tokio_stream::wrappers::WatchStream; - -async fn local() { - let nonsend_data = Rc::new("world"); - let local = tokio::task::LocalSet::new(); - - let nonsend_data2 = nonsend_data.clone(); - let util1 = local.run_until(async move { - // ... - tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; - println!("hello {}", nonsend_data2) - }); - - let util2 = local.run_until(async move { - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - println!("goodbye {}", nonsend_data) - }); - - util1.await; - util2.await; -} - -// async fn main1() { -// #[derive(Debug)] -// enum Message1 { -// Ping, -// Pong -// } -// -// #[derive(Debug)] -// enum ReqMsg { -// GET -// } -// -// #[derive(Debug)] -// enum RespMsg { -// Ok, -// Err, -// } -// -// async fn process1(mut rx1: Receiver, mut req_rx: Receiver, resp_tx: Sender){ -// -// } -// -// let (tx1, mut rx1) = tokio::sync::mpsc::channel::(10); -// let (req_tx, mut req_rx) = tokio::sync::mpsc::channel::(10); -// let (resp_tx, mut resp_rx) = tokio::sync::mpsc::channel::(10); -// tokio::spawn(async { -// println!("started 1 process"); -// let local = tokio::task::LocalSet::new(); -// -// local.run_until(async { -// local.spawn_local(async { -// println!("started 1.1 process"); -// loop { -// println!("1.1 loop"); -// // if let Some(msg) = rx1.recv().await { -// // println!("msg1: {msg:?}") -// // } -// } -// }); -// // local.spawn_local(async { -// // println!("started 1.2 process"); -// // loop { -// // println!("1.2 loop"); -// // // if let Some(msg) = req_rx.recv().await { -// // // println!("req: {msg:?}"); -// // // resp_tx.send(RespMsg::Ok).await; -// // // } -// // } -// // }); -// // handle.await; -// }).await; -// -// Ok::<(), std::io::Error>(()) -// }); -// -// // let process2_handle = tokio::spawn(async move { -// // let local = tokio::task::LocalSet::new(); -// // local.spawn_local(async move { -// // loop { -// // if let Some(msg) = resp_rx.recv().await { -// // println!("resp: {msg:?}") -// // } -// // } -// // }); -// // }); -// -// -// tx1.send(Message1::Ping); -// req_tx.send(ReqMsg::GET); -// -// // tokio::join!(process1_handle, process2_handle); -// tokio::time::sleep(Duration::from_secs(10)).await; -// } - -trait Message {} - -#[tokio::main] -async fn main() { - // use futures::FutureExt; - // use futures::future::select_all; - // use tokio::sync::mpsc; - // - // // Предположим, что Message и Receiver уже определены - // let receivers: Vec> = vec![/* ... */]; - // - // let mut futures = Vec::new(); - // for mut receiver in receivers { - // let future = Box::pin(async move { - // let option = receiver.recv().await; - // option - // }); - // futures.push(future); - // } - // - // while !futures.is_empty() { - // let (result, index, remaining) = select_all(futures).await; - // futures = remaining; - // - // if let Some(msg) = result { - // println!("msg: {msg}"); - // } - // } - - let (tx, rx1) = watch::channel::(100500); - // WatchStream::new() - let mut stream1 = tokio_stream::wrappers::WatchStream::new(rx1.clone()); - let mut stream2 = tokio_stream::wrappers::WatchStream::new(rx1); - let value = stream1.next().await.unwrap(); - println!("value 1: {value:?}"); - let value = stream2.next().await.unwrap(); - println!("value 2: {value:?}"); - - let (_, rx2) = watch::channel::(100500); - // WatchStream::new() - let mut stream1 = tokio_stream::wrappers::WatchStream::new(rx2); - - // WatchStream::from(&) -} diff --git a/src/uactor/Cargo.toml b/src/uactor/Cargo.toml index 45b50c5..ad60e48 100644 --- a/src/uactor/Cargo.toml +++ b/src/uactor/Cargo.toml @@ -11,6 +11,7 @@ description = "The fastest and most modular actor system that doesn't force you [dependencies] tokio = { workspace = true } +tokio-stream = "0.1" futures = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } @@ -44,6 +45,7 @@ name = "single_channel_actor" name = "supervised_actor" [features] -default = ["tokio_tracing"] +default = ["tokio_tracing", "enable_spawn_macros"] async_sender = [] -tokio_tracing = [] \ No newline at end of file +tokio_tracing = [] +enable_spawn_macros = [] \ No newline at end of file diff --git a/src/uactor/examples/dependency_injection.rs b/src/uactor/examples/dependency_injection.rs index 2a97e03..eb1f929 100644 --- a/src/uactor/examples/dependency_injection.rs +++ b/src/uactor/examples/dependency_injection.rs @@ -1,19 +1,15 @@ use time::ext::NumericalStdDuration; -use uactor::actor::MessageSender; +use uactor::actor::abstract_actor::MessageSender; use uactor::system::System; -use crate::actor1::Actor1; -use crate::actor1::Actor1Msg; -use crate::actor1::Actor1Ref; -use crate::actor2::Actor2; -use crate::actor2::Actor2Msg; -use crate::actor2::Actor2Ref; +use crate::actor1::{Actor1, Actor1MpscRef}; +use crate::actor2::{Actor2, Actor2MpscRef}; use crate::messages::{MessageWithoutReply, PingMsg, PongMsg}; use crate::services::{Service1, Service2}; mod messages { - use uactor::message::{Message, Reply}; + use uactor::actor::message::{Message, Reply}; pub struct PingMsg(pub Reply); @@ -30,15 +26,14 @@ mod messages { } mod actor1 { - use tokio::sync::mpsc::UnboundedSender; - - use uactor::actor::{Actor, HandleResult, Handler, MessageSender}; - use uactor::context::extensions::Service; - use uactor::context::Context; - use uactor::di::{Inject, InjectError}; + use uactor::actor::abstract_actor::{Actor, HandleResult, Handler, MessageSender}; + use uactor::actor::context::extensions::Service; + use uactor::actor::context::Context; + use uactor::data::data_publisher::DataPublisher; + use uactor::dependency_injection::{Inject, InjectError}; use uactor::system::System; - use crate::actor2::{Actor2Msg, Actor2Ref}; + use crate::actor2::{Actor2, Actor2MpscRef}; use crate::messages::{MessageWithoutReply, PingMsg, PongMsg, PrintMessage}; use crate::services::Service1; @@ -47,7 +42,7 @@ mod actor1 { #[derive(derive_more::Constructor)] pub struct Services { service1: Service, - actor2_ref: Actor2Ref>, + actor2_ref: Actor2MpscRef, } impl Inject for Services { @@ -56,8 +51,7 @@ mod actor1 { Self: Sized, { let service1 = system.get_service()?; - let actor2_ref = - system.get_actor::>>("actor2".into())?; + let actor2_ref = system.get_actor::("actor2".into())?; Ok(Services::new(service1, actor2_ref)) } } @@ -104,10 +98,10 @@ mod actor1 { } mod actor2 { - use uactor::actor::{Actor, HandleResult, Handler}; - use uactor::context::extensions::Service; - use uactor::context::Context; - use uactor::di::{Inject, InjectError}; + use uactor::actor::abstract_actor::{Actor, HandleResult, Handler}; + use uactor::actor::context::extensions::Service; + use uactor::actor::context::Context; + use uactor::dependency_injection::{Inject, InjectError}; use uactor::system::System; use crate::messages::{PingMsg, PongMsg, PrintMessage}; @@ -207,16 +201,17 @@ async fn main() -> anyhow::Result<()> { .build(); // Init actor (instance + spawn actor) - let actor1 = Actor1; - let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1); + let (actor1_ref, actor1_stream) = system.register_ref::("actor1"); // Init actor2 (instance + spawn actor) - let actor2 = Actor2; - let (actor2_ref, _) = uactor::spawn_with_ref!(system, actor2: Actor2); + let (actor2_ref, actor2_stream) = system.register_ref::("actor2"); // Run actors - system.run_actor::(actor1_ref.name()).await?; - system.run_actor::(actor2_ref.name()).await?; + let actor1 = Actor1; + system.spawn_actor(actor1_ref.name(), actor1, *actor1_ref.state(), (actor1_stream)).await?; + + let actor2 = Actor2; + system.spawn_actor(actor2_ref.name(), actor2, *actor2_ref.state(), (actor2_stream)).await?; // Case #1: send messages and call injected (not from &self) services inside handlers println!( diff --git a/src/uactor/examples/interval.rs b/src/uactor/examples/interval.rs index 31fb45e..c46c340 100644 --- a/src/uactor/examples/interval.rs +++ b/src/uactor/examples/interval.rs @@ -1,15 +1,16 @@ use time::ext::NumericalStdDuration; -use uactor::actor::MessageSender; +use tokio::sync::mpsc::UnboundedSender; +use uactor::actor::abstract_actor::MessageSender; use uactor::system::System; -use crate::actor1::Actor1; +use crate::actor1::{Actor1, Actor1MpscRef}; use crate::actor1::Actor1Msg; use crate::actor1::Actor1Ref; use crate::messages::{PingMsg, PongMsg}; mod messages { - use uactor::message::{Message, Reply}; + use uactor::actor::message::{Message, Reply}; pub struct PingMsg(pub Reply); @@ -20,9 +21,9 @@ mod messages { } mod actor1 { - use uactor::actor::{Actor, HandleResult, Handler}; - use uactor::context::Context; - use uactor::message::IntervalMessage; + use uactor::actor::abstract_actor::{Actor, HandleResult, Handler}; + use uactor::actor::context::Context; + use uactor::actor::message::IntervalMessage; use crate::messages::{PingMsg, PongMsg}; @@ -77,16 +78,15 @@ mod actor1 { #[tokio::main] async fn main() -> anyhow::Result<()> { - let actor1 = Actor1::default(); - let mut system = System::global().build(); // 1 second interval let interval = tokio::time::interval(1.std_seconds()); - let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1, interval); + let (actor1_ref, actor1_stream) = system.register_ref::("actor1"); - system.run_actor::(actor1_ref.name()).await?; + let actor1 = Actor1::default(); + system.spawn_actor(actor1_ref.name(), actor1, *actor1_ref.state(), (actor1_stream, interval)).await?; let pong = actor1_ref.ask::(PingMsg).await?; println!("main: received {pong:?} message"); diff --git a/src/uactor/examples/multiple_incoming_channels.rs b/src/uactor/examples/multiple_incoming_channels.rs index f7ca59b..c9e26b0 100644 --- a/src/uactor/examples/multiple_incoming_channels.rs +++ b/src/uactor/examples/multiple_incoming_channels.rs @@ -1,5 +1,5 @@ use std::time::Duration; - +use tokio::sync::mpsc; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -11,7 +11,7 @@ use crate::actor2::Actor2; use crate::messages::{PingPongMsg, ReqMsg, RespMsg}; pub mod messages { - use uactor::message::Message; + use uactor::actor::message::Message; use uactor::message_impl; #[derive(Debug)] @@ -35,13 +35,14 @@ pub mod messages { } pub mod actor1 { - use uactor::actor::{Actor, HandleResult, Handler}; - use uactor::context::Context; + use tokio::sync::mpsc; + use uactor::actor::abstract_actor::{Actor, HandleResult, Handler}; + use uactor::actor::context::Context; use crate::messages::{PingPongMsg, ReqMsg, RespMsg}; pub struct Actor1 { - pub resp_tx: tokio::sync::mpsc::Sender, + pub resp_tx: mpsc::Sender, } impl Actor for Actor1 { @@ -79,8 +80,8 @@ pub mod actor1 { } pub mod actor2 { - use uactor::actor::{Actor, HandleResult, Handler}; - use uactor::context::Context; + use uactor::actor::abstract_actor::{Actor, HandleResult, Handler}; + use uactor::actor::context::Context; use crate::messages::RespMsg; @@ -107,38 +108,36 @@ pub mod actor2 { } #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { tracing_subscriber::registry() .with(LevelFilter::TRACE) .with(tracing_subscriber::fmt::layer()) .init(); + // Initialize system + let mut system = System::global().build(); + // Initialize channels - let (ping_tx, ping_rx) = tokio::sync::mpsc::channel::(10); - let (req_tx, req_rx) = tokio::sync::mpsc::channel::(10); - let (resp_tx, resp_rx) = tokio::sync::mpsc::channel::(10); + let (ping_tx, ping_rx) = mpsc::channel::(10); + let (req_tx, req_rx) = mpsc::channel::(10); + let (resp_tx, resp_rx) = mpsc::channel::(10); // Initialize actors let actor1 = Actor1 { resp_tx }; let actor2 = Actor2; - // Initialize system - let mut system = System::global().build(); - - // Initialize actors - let (actor1_name, shared_state, handle1) = system.init_actor(actor1, None, (ping_rx, req_rx)); - let (actor2_name, shared_state, handle2) = system.init_actor(actor2, None, resp_rx); - // Run actors - system.run_actor::(actor1_name).await.unwrap(); - system.run_actor::(actor2_name).await.unwrap(); + let (_, handle1) = system.spawn_actor("actor1".into(), actor1, (), (ping_rx, req_rx)).await?; + let (_, handle2) = system.spawn_actor("actor2".into(), actor2, (), resp_rx).await?; // Send messages - ping_tx.send(PingPongMsg::Ping).await.unwrap(); - req_tx.send(ReqMsg::GET).await.unwrap(); + ping_tx.send(PingPongMsg::Ping).await?; + req_tx.send(ReqMsg::GET).await?; // Tokio aspects to stop spawned tasks without errors tokio::time::sleep(Duration::from_nanos(1)).await; handle1.abort_handle().abort(); handle2.abort_handle().abort(); + + Ok(()) } diff --git a/src/uactor/examples/shared_state.rs b/src/uactor/examples/shared_state.rs index 7e8a4ea..35a8a09 100644 --- a/src/uactor/examples/shared_state.rs +++ b/src/uactor/examples/shared_state.rs @@ -1,13 +1,14 @@ use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; use std::time::Duration; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use uactor::actor::{Actor, HandleResult, Handler, MessageSender}; -use uactor::context::Context; +use uactor::actor::abstract_actor::{Actor, HandleResult, Handler, MessageSender}; +use uactor::actor::context::Context; use uactor::system::System; -use uactor::message::{Message, Reply}; +use uactor::actor::message::Message; pub struct PingMsg; uactor::message_impl!(PingMsg); @@ -16,21 +17,27 @@ pub struct Actor1; #[derive(Default)] pub struct Actor1State { - pub counter: AtomicU8, + counter: AtomicU8, +} + +impl Actor1State { + pub fn get_counter(&self) -> u8 { + self.counter.load(Ordering::Relaxed) + } } impl Actor for Actor1 { type Context = Context; type Inject = (); - type State = Actor1State; + type State = Arc; } impl Handler for Actor1 { async fn handle( &mut self, _: &mut Self::Inject, - ping: PingMsg, - ctx: &mut Self::Context, + _ping: PingMsg, + _ctx: &mut Self::Context, state: &Self::State, ) -> HandleResult { state.counter.fetch_add(1, Ordering::Relaxed); @@ -49,19 +56,20 @@ async fn main() -> anyhow::Result<()> { let mut system = System::global().build(); - let actor1 = Actor1; - - let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1); + let (actor1_ref, actor1_stream) = system.register_ref::("actor1"); - system.run_actor::(actor1_ref.name()).await?; + let actor1 = Actor1; + let (_, handle) = system.spawn_actor(actor1_ref.name(), actor1, actor1_ref.state().clone(), (actor1_stream)).await?; - let pong = actor1_ref.send(PingMsg); - let pong = actor1_ref.send(PingMsg); - let pong = actor1_ref.send(PingMsg); + actor1_ref.send(PingMsg)?; + actor1_ref.send(PingMsg)?; + actor1_ref.send(PingMsg)?; tokio::time::sleep(Duration::from_secs(1)).await; - assert_eq!(actor1_ref.state.counter.load(Ordering::Relaxed), 3); + assert_eq!(actor1_ref.state.get_counter(), 3); + // stop the actor + handle.abort_handle().abort(); Ok(()) } diff --git a/src/uactor/examples/single_channel_actor.rs b/src/uactor/examples/single_channel_actor.rs index b00741f..4b4313c 100644 --- a/src/uactor/examples/single_channel_actor.rs +++ b/src/uactor/examples/single_channel_actor.rs @@ -1,13 +1,15 @@ -use uactor::actor::MessageSender; +use tokio::sync::mpsc::UnboundedSender; +use uactor::actor::abstract_actor::MessageSender; +use uactor::aliases::ActorName; use uactor::system::System; -use crate::actor1::Actor1; +use crate::actor1::{Actor1, Actor1MpscRef}; use crate::actor1::Actor1Msg; use crate::actor1::Actor1Ref; use crate::messages::PingMsg; mod messages { - use uactor::message::{Message, Reply}; + use uactor::actor::message::{Message, Reply}; pub struct PingMsg(pub Reply); @@ -18,8 +20,9 @@ mod messages { } mod actor1 { - use uactor::actor::{Actor, HandleResult, Handler}; - use uactor::context::Context; + use tokio::sync::mpsc::UnboundedReceiver; + use uactor::actor::abstract_actor::{Actor, HandleResult, Handler}; + use uactor::actor::context::Context; use crate::messages::{PingMsg, PongMsg}; @@ -55,8 +58,9 @@ async fn main() -> anyhow::Result<()> { let mut system = System::global().build(); - let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1); - system.run_actor::(actor1_ref.name()).await?; + let (actor1_ref, actor1_stream) = system.register_ref::("actor1"); + + system.spawn_actor(actor1_ref.name(), actor1, (), actor1_stream).await?; let pong = actor1_ref.ask(PingMsg).await?; println!("main: received {pong:?} message"); diff --git a/src/uactor/examples/supervised_actor.rs b/src/uactor/examples/supervised_actor.rs index 7649d16..d0dc8e1 100644 --- a/src/uactor/examples/supervised_actor.rs +++ b/src/uactor/examples/supervised_actor.rs @@ -1,18 +1,19 @@ use std::time::Duration; +use tokio::sync::mpsc::UnboundedSender; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use uactor::actor::MessageSender; +use uactor::actor::abstract_actor::MessageSender; use uactor::system::System; -use crate::actor1::Actor1; +use crate::actor1::{Actor1, Actor1MpscRef}; use crate::actor1::Actor1Msg; use crate::actor1::Actor1Ref; use crate::messages::PingMsg; -use crate::supervisor::{Supervisor, SupervisorMsg, SupervisorRef}; +use crate::supervisor::{Supervisor, SupervisorMpscRef, SupervisorMsg, SupervisorRef}; mod messages { - use uactor::message::{Message, Reply}; + use uactor::actor::message::{Message, Reply}; pub struct PingMsg(pub Reply); @@ -24,16 +25,13 @@ mod messages { mod actor1 { use crate::messages::{PingMsg, PongMsg}; - use crate::supervisor::{SupervisorMsg, SupervisorRef}; - use tokio::sync::mpsc; - use uactor::actor::{Actor, HandleResult, Handler}; - use uactor::context::supervised::SupervisedContext; - use uactor::context::ActorContext; + use uactor::actor::abstract_actor::{Actor, HandleResult, Handler}; + use uactor::actor::context::{ActorContext, Context}; pub struct Actor1; impl Actor for Actor1 { - type Context = SupervisedContext>>; + type Context = Context; type Inject = (); type State = (); } @@ -44,7 +42,7 @@ mod actor1 { _: &mut Self::Inject, ping: PingMsg, ctx: &mut Self::Context, - state: &Self::State, + _state: &Self::State, ) -> HandleResult { println!("actor1: Received ping message"); let PingMsg(reply) = ping; @@ -58,9 +56,8 @@ mod actor1 { } mod supervisor { - use std::os::macos::raw::stat; - use uactor::actor::{Actor, HandleResult, Handler}; - use uactor::context::{ActorDied, Context}; + use uactor::actor::abstract_actor::{Actor, HandleResult, Handler}; + use uactor::actor::context::{ActorDied, Context}; pub struct Supervisor; @@ -95,17 +92,18 @@ async fn main() -> anyhow::Result<()> { let mut system = System::global().build(); - let actor1 = Actor1; - let supervisor = Supervisor; + let (supervisor_ref, supervisor_stream) = system.register_ref::("supervisor"); + let (actor1_ref, actor1_stream) = system.register_ref::("actor1"); - let (actor1_ref, _) = uactor::spawn_with_ref!(system, actor1: Actor1); - let (supervisor_ref, _) = uactor::spawn_with_ref!(system, supervisor: Supervisor); + // Run supervisor + let supervisor = Supervisor; + system.spawn_actor(supervisor_ref.name(), supervisor, (), supervisor_stream).await?; - system - .run_actor::(supervisor_ref.name()) - .await?; - system.run_actor::(actor1_ref.name()).await?; + // Run actor1 + let actor1 = Actor1; + system.spawn_actor(actor1_ref.name(), actor1, (), actor1_stream).await?; + // ask actor1 to send a pong message let pong = actor1_ref.ask(PingMsg).await?; println!("main: received {pong:?} message"); diff --git a/src/uactor/src/actor.rs b/src/uactor/src/actor.rs deleted file mode 100644 index c35c5a5..0000000 --- a/src/uactor/src/actor.rs +++ /dev/null @@ -1,258 +0,0 @@ -use crate::context::ActorContext; -use crate::message::Message; -use std::future::Future; -use std::sync::Arc; - -pub trait State: std::any::Any + Send + 'static {} -impl State for T {} -pub type ActorPreStartResult = Result>; - -use crate::di::Inject; - -#[allow(unused_variables)] -pub trait Actor: Sized + Unpin + 'static { - /// Actor execution context type - type Context: ActorContext + Send; - - type Inject: Inject + Sized; - - type State: Default + Send + Sync; - - fn create_state(&mut self) -> Arc { - Arc::new(Default::default()) - } - - fn pre_start( - &mut self, - inject: &mut Self::Inject, - ctx: &mut Self::Context, - ) -> impl Future> + Send { - async { Ok(()) } - } -} -#[macro_export] -macro_rules! spawn_with_ref { - ($S: ident, $ActorName: ident, $ActorInstance: ident: $ActorType: ident, $($Timeout: ident),*) => {{ - uactor::paste! { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<[<$ActorType Msg>]>(); - let (name, shared_state, handle): (std::sync::Arc, std::sync::Arc<<$ActorType as uactor::actor::Actor>::State>, tokio::task::JoinHandle<()>) = $S.init_actor($ActorInstance, Some($ActorName.to_owned()), ($($Timeout,)* rx)); - let actor_ref = [<$ActorType Ref>]::new(name, tx, shared_state); - $S.insert_actor(actor_ref.name(), uactor::data_publisher::TryClone::try_clone(&actor_ref)?); - (actor_ref, handle) - } - }}; - - ($S: ident, $ActorInstance: ident: $ActorType: ident, $($Timeout: ident),*) => {{ - let actor_name: std::sync::Arc = stringify!($ActorInstance).into(); - - uactor::paste! { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<[<$ActorType Msg>]>(); - let (name, shared_state, handle): (std::sync::Arc, std::sync::Arc<<$ActorType as uactor::actor::Actor>::State>, tokio::task::JoinHandle<()>) = $S.init_actor($ActorInstance, Some(actor_name), ($($Timeout,)* rx)); - let actor_ref = [<$ActorType Ref>]::new(name, tx, shared_state); - $S.insert_actor(actor_ref.name(), uactor::data_publisher::TryClone::try_clone(&actor_ref)?); - (actor_ref, handle) - } - }}; - - ($S: ident, $ActorName: ident, $ActorInstance: ident: $ActorType: ident) => {{ - let actor_name: std::sync::Arc = stringify!($ActorInstance).into(); - uactor::paste! { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<[<$ActorType Msg>]>(); - let (name, shared_state, handle): (std::sync::Arc, std::sync::Arc<<$ActorType as uactor::actor::Actor>::State>, tokio::task::JoinHandle<()>) = $S.init_actor($ActorInstance, Some(actor_name), (rx)); - let actor_ref = [<$ActorType Ref>]::new(name, tx, shared_state); - $S.insert_actor(actor_ref.name(), uactor::data_publisher::TryClone::try_clone(&actor_ref)?); - (actor_ref, handle) - } - }}; - - ($S: ident, $ActorInstance: ident: $ActorType: ident) => {{ - let actor_name: std::sync::Arc = stringify!($ActorInstance).into(); - uactor::paste! { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<[<$ActorType Msg>]>(); - let (name, shared_state, handle): (std::sync::Arc, std::sync::Arc<<$ActorType as uactor::actor::Actor>::State>, tokio::task::JoinHandle<()>) = $S.init_actor($ActorInstance, Some(actor_name), (rx)); - let actor_ref = [<$ActorType Ref>]::new(name, tx, shared_state); - $S.insert_actor(actor_ref.name(), uactor::data_publisher::TryClone::try_clone(&actor_ref)?); - (actor_ref, handle) - } - }}; -} - -#[cfg(not(feature = "async_sender"))] -pub trait MessageSender -where - M: Message, -{ - fn send(&self, msg: M) -> crate::data_publisher::DataPublisherResult; - fn ask( - &self, - f: impl FnOnce(tokio::sync::oneshot::Sender) -> M, - ) -> impl Future>; -} - -#[cfg(feature = "async_sender")] -pub trait MessageSender -where - M: Message, -{ - async fn send(&self, msg: M) -> crate::data_publisher::DataPublisherResult; - async fn ask( - &self, - f: impl FnOnce(tokio::sync::oneshot::Sender) -> M, - ) -> Result; -} - -pub trait NamedActorRef { - fn static_name() -> &'static str; -} - -/// Example: -/// ``` -///# use std::future::Future; -/// use uactor::actor::{Actor, HandleResult}; -/// use uactor::context::Context; -/// use uactor::system::System; -/// let mut system = System::global().build(); -/// pub struct Actor1; -/// impl Actor for Actor1 { type Context = Context; type Inject = (); type State = (); } -/// let actor1 = Actor1; -/// use uactor::message::{Message}; -/// use uactor::message_impl; -/// pub struct Ping; -/// impl Message for Ping { fn static_name() -> &'static str { "Ping" } } -/// impl uactor::actor::Handler for Actor1 { async fn handle(&mut self, inject: &mut Self::Inject, msg: Ping, ctx: &mut Self::Context, state: &Self::State) -> HandleResult { todo!() } } -/// uactor::generate_actor_ref!(Actor1, { }); -/// ``` -/// let (mut actor1_ref, handle) = uactor::spawn_with_ref!(system, actor1: Actor1); -#[macro_export] -macro_rules! generate_actor_ref { - ($ActorType: ident, { $($Message: ident),* }) => { - uactor::paste! { - pub enum [<$ActorType Msg>] { - $($Message($Message)),* - } - - impl ::uactor::actor::NamedActorRef for [<$ActorType Msg>] { - fn static_name() -> &'static str { - stringify!([<$ActorType Msg>]) - } - } - - impl uactor::message::Message for [<$ActorType Msg>] { - fn static_name() -> &'static str { - stringify!([<$ActorType Msg>]) - } - - fn name(&self) -> String { - match self { - $( - Self::$Message(m) => m.name(), - )* - _ => Self::static_name().to_owned(), - } - } - } - - impl uactor::actor::Handler<[<$ActorType Msg>]> for $ActorType { - async fn handle( - &mut self, - inject: &mut ::Inject, - msg: [<$ActorType Msg>], - ctx: &mut ::Context, - state: &<$ActorType as uactor::actor::Actor>::State, - // state: &std::sync::Arc - // state: &Arc<{Self as uactor::actor::Actor}>::State - ) -> uactor::actor::HandleResult { - match msg { - $( - [<$ActorType Msg>]::$Message(m) => { - self.handle(inject, m, ctx, state).await?; - } - ),* - } - Ok(()) - } - } - - pub struct [<$ActorType Ref>] where T: uactor::data_publisher::DataPublisher]> + Clone { - name: std::sync::Arc, - state: std::sync::Arc<<$ActorType as uactor::actor::Actor>::State>, - sender: T, - } - - impl uactor::data_publisher::TryClone for [<$ActorType Ref>] where T: uactor::data_publisher::DataPublisher]> + Clone { - fn try_clone(&self) -> Result { - self.sender.try_clone().map(|sender| Self { name: self.name.clone(), sender, state: self.state.clone() }) - } - } - - impl Clone for [<$ActorType Ref>]]>> { - fn clone(&self) -> Self { - [<$ActorType Ref>]::new(self.name.clone(), self.sender.clone(), self.state.clone()) - } - } - - $( - #[cfg(not(feature = "async_sender"))] - impl uactor::actor::MessageSender<$Message> for [<$ActorType Ref>] where T: uactor::data_publisher::DataPublisher]> + Clone { - fn send(&self, msg: $Message) -> uactor::data_publisher::DataPublisherResult { - self.sender.publish([<$ActorType Msg>]::$Message(msg)) - } - - async fn ask(&self, f: impl FnOnce(tokio::sync::oneshot::Sender) -> $Message) -> Result { - let (tx, rx) = tokio::sync::oneshot::channel::(); - let message = f(tx); - self.sender.publish([<$ActorType Msg>]::$Message(message))?; - rx.await.map_err(uactor::data_publisher::DataPublisherErrors::from) - } - } - - #[cfg(feature = "async_sender")] - impl uactor::actor::MessageSender<$Message> for [<$ActorType Ref>] where T: uactor::data_publisher::DataPublisher]> + Clone { - async fn send(&self, msg: $Message) -> uactor::data_publisher::DataPublisherResult { - self.sender.publish([<$ActorType Msg>]::$Message(msg)).await - } - - async fn ask(&self, f: impl FnOnce(tokio::sync::oneshot::Sender) -> $Message) -> Result { - let (tx, rx) = tokio::sync::oneshot::channel::(); - let message = f(tx); - self.sender.publish([<$ActorType Msg>]::$Message(message))?; - rx.await.map_err(uactor::data_publisher::DataPublisherErrors::from) - } - } - )* - - impl [<$ActorType Ref>] where T: uactor::data_publisher::DataPublisher]> + Clone { - pub fn new(name: std::sync::Arc, sender: T, state: std::sync::Arc<<$ActorType as uactor::actor::Actor>::State>) -> Self { - let name = std::sync::Arc::from(name); - Self { name, sender, state } - } - - pub fn name(&self) -> std::sync::Arc { - self.name.clone() - } - - pub fn state(&self) -> &<$ActorType as uactor::actor::Actor>::State { - &self.state - } - } - } - - }; -} - -pub trait Handler -where - Self: Actor, - M: Message, -{ - /// This method is called for every message received by this actor. - fn handle( - &mut self, - inject: &mut Self::Inject, - msg: M, - ctx: &mut Self::Context, - state: &Self::State, - ) -> impl Future + Send; -} - -pub type HandleResult = Result<(), Box>; diff --git a/src/uactor/src/actor/abstract_actor.rs b/src/uactor/src/actor/abstract_actor.rs new file mode 100644 index 0000000..6591064 --- /dev/null +++ b/src/uactor/src/actor/abstract_actor.rs @@ -0,0 +1,82 @@ +use crate::actor::context::ActorContext; +use crate::actor::message::Message; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +pub trait State: std::any::Any + Send + 'static {} +impl State for T {} + +use crate::dependency_injection::Inject; + +#[allow(unused_variables)] +pub trait Actor: Sized + Unpin + 'static { + /// Actor execution context type + type Context: ActorContext + Send; + + type Inject: Inject + Sized; + + type State: Default + Send + Sync + Clone; + + fn create_state(&mut self) -> Arc { + Arc::new(Default::default()) + } + + fn on_start( + &mut self, + inject: &mut Self::Inject, + ctx: &mut Self::Context, + ) -> impl Future + Send { + async { } + } + + fn on_error(&mut self, ctx: &mut Self::Context, error: HandleError) -> impl Future + Send { + async move { + tracing::error!("Actor error: {:?}", error); + } + } +} + +pub trait Handler +where + Self: Actor, + M: Message, +{ + /// This method is called for every message received by this actor. + fn handle( + &mut self, + inject: &mut Self::Inject, + msg: M, + ctx: &mut Self::Context, + state: &Self::State, + ) -> impl Future + Send; +} + +pub type HandleError = Box; + +pub type HandleResult = Result<(), HandleError>; + + +#[cfg(not(feature = "async_sender"))] +pub trait MessageSender +where + M: Message, +{ + fn send(&self, msg: M) -> crate::data::data_publisher::DataPublisherResult; + fn ask( + &self, + f: impl FnOnce(tokio::sync::oneshot::Sender) -> M, + ) -> impl Future>; +} + +#[cfg(feature = "async_sender")] +pub trait MessageSender +where + M: Message, +{ + async fn send(&self, msg: M) -> crate::data::data_publisher::DataPublisherResult; + async fn ask( + &self, + f: impl FnOnce(tokio::sync::oneshot::Sender) -> M, + ) -> Result; +} \ No newline at end of file diff --git a/src/uactor/src/actor/actor_ref.rs b/src/uactor/src/actor/actor_ref.rs new file mode 100644 index 0000000..5c69806 --- /dev/null +++ b/src/uactor/src/actor/actor_ref.rs @@ -0,0 +1,150 @@ + +/// Example: +/// ``` +///# use std::future::Future; +/// use uactor::actor::abstract_actor::{Actor, HandleResult}; +/// use uactor::actor::context::Context; +/// use uactor::system::System; +/// let mut system = System::global().build(); +/// pub struct Actor1; +/// impl Actor for Actor1 { type Context = Context; type Inject = (); type State = (); } +/// let actor1 = Actor1; +/// use uactor::actor::message::{Message}; +/// use uactor::message_impl; +/// pub struct Ping; +/// impl Message for Ping { fn static_name() -> &'static str { "Ping" } } +/// impl uactor::actor::abstract_actor::Handler for Actor1 { async fn handle(&mut self, inject: &mut Self::Inject, msg: Ping, ctx: &mut Self::Context, state: &Self::State) -> HandleResult { todo!() } } +/// uactor::generate_actor_ref!(Actor1, { }); +/// ``` +/// let (mut actor1_ref, handle) = uactor::spawn_with_ref!(system, actor1: Actor1); +#[macro_export] +macro_rules! generate_actor_ref { + ($ActorType: ident, { $($Message: ident),* }) => { + uactor::paste! { + pub enum [<$ActorType Msg>] { + $($Message($Message)),* + } + + impl uactor::actor::message::Message for [<$ActorType Msg>] { + fn static_name() -> &'static str { + stringify!([<$ActorType Msg>]) + } + + fn name(&self) -> String { + match self { + $( + Self::$Message(m) => m.name(), + )* + _ => Self::static_name().to_owned(), + } + } + } + + impl uactor::actor::abstract_actor::Handler<[<$ActorType Msg>]> for $ActorType { + async fn handle( + &mut self, + inject: &mut ::Inject, + msg: [<$ActorType Msg>], + ctx: &mut ::Context, + state: &<$ActorType as uactor::actor::abstract_actor::Actor>::State, + // state: &std::sync::Arc + // state: &Arc<{Self as uactor::actor::abstract_actor::Actor}>::State + ) -> uactor::actor::abstract_actor::HandleResult { + match msg { + $( + [<$ActorType Msg>]::$Message(m) => { + self.handle(inject, m, ctx, state).await?; + } + ),* + } + Ok(()) + } + } + + pub type [<$ActorType MpscRef>] = [<$ActorType Ref>]]>>; + + pub struct [<$ActorType Ref>] where T: uactor::data::data_publisher::DataPublisher]> + Clone { + name: std::sync::Arc, + state: <$ActorType as uactor::actor::abstract_actor::Actor>::State, + sender: T, + } + + impl uactor::data::data_publisher::TryClone for [<$ActorType Ref>] where T: uactor::data::data_publisher::DataPublisher]> + Clone { + fn try_clone(&self) -> Result { + self.sender.try_clone().map(|sender| Self { name: self.name.clone(), sender, state: self.state.clone() }) + } + } + + impl Clone for [<$ActorType Ref>]]>> { + fn clone(&self) -> Self { + [<$ActorType Ref>]::new(self.name.clone(), self.sender.clone(), self.state.clone()) + } + } + + impl From<( + uactor::aliases::ActorName, + tokio::sync::mpsc::UnboundedSender<[<$ActorType Msg>]>, + <$ActorType as uactor::actor::abstract_actor::Actor>::State + )> for [<$ActorType Ref>]]>> + { + fn from((name, sender, state): ( + uactor::aliases::ActorName, + tokio::sync::mpsc::UnboundedSender<[<$ActorType Msg>]>, + <$ActorType as uactor::actor::abstract_actor::Actor>::State + )) -> Self { + Self { + name, + sender, + state, + } + } + } + + $( + #[cfg(not(feature = "async_sender"))] + impl uactor::actor::abstract_actor::MessageSender<$Message> for [<$ActorType Ref>] where T: uactor::data::data_publisher::DataPublisher]> + Clone { + fn send(&self, msg: $Message) -> uactor::data::data_publisher::DataPublisherResult { + self.sender.publish([<$ActorType Msg>]::$Message(msg)) + } + + async fn ask(&self, f: impl FnOnce(tokio::sync::oneshot::Sender) -> $Message) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel::(); + let message = f(tx); + self.sender.publish([<$ActorType Msg>]::$Message(message))?; + rx.await.map_err(uactor::data::data_publisher::DataPublisherErrors::from) + } + } + + #[cfg(feature = "async_sender")] + impl uactor::actor::abstract_actor::MessageSender<$Message> for [<$ActorType Ref>] where T: uactor::data::data_publisher::DataPublisher]> + Clone { + async fn send(&self, msg: $Message) -> uactor::data::data_publisher::DataPublisherResult { + self.sender.publish([<$ActorType Msg>]::$Message(msg)).await + } + + async fn ask(&self, f: impl FnOnce(tokio::sync::oneshot::Sender) -> $Message) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel::(); + let message = f(tx); + self.sender.publish([<$ActorType Msg>]::$Message(message))?; + rx.await.map_err(uactor::data::data_publisher::DataPublisherErrors::from) + } + } + )* + + impl [<$ActorType Ref>] where T: uactor::data::data_publisher::DataPublisher]> + Clone { + pub fn new(name: std::sync::Arc, sender: T, state: <$ActorType as uactor::actor::abstract_actor::Actor>::State) -> Self { + let name = std::sync::Arc::from(name); + Self { name, sender, state } + } + + pub fn name(&self) -> std::sync::Arc { + self.name.clone() + } + + pub fn state(&self) -> &<$ActorType as uactor::actor::abstract_actor::Actor>::State { + &self.state + } + } + } + + }; +} \ No newline at end of file diff --git a/src/uactor/src/context.rs b/src/uactor/src/actor/context.rs similarity index 76% rename from src/uactor/src/context.rs rename to src/uactor/src/actor/context.rs index 285de6a..6fb11bd 100644 --- a/src/uactor/src/context.rs +++ b/src/uactor/src/actor/context.rs @@ -1,7 +1,8 @@ -use crate::message::Message; +use crate::actor::message::Message; use crate::system::System; use std::future::Future; use std::sync::Arc; +use crate::actor::abstract_actor::{Actor, HandleError}; pub type ContextResult = Result>; pub type ContextInitializationError = Result; @@ -16,16 +17,16 @@ pub trait ActorContext: Sized + Unpin + 'static { Ok(()) } #[inline] - fn on_iteration(&mut self) -> ContextResult<()> { - Ok(()) - } + fn after_iteration(&mut self) -> () { } + #[inline] + fn on_error(&mut self, _error: &HandleError) -> () { } fn kill(&mut self); fn get_name(&self) -> &str; #[allow(clippy::wrong_self_convention)] fn is_alive(&self) -> bool { true } - fn create( + fn create( system: &mut System, name: Arc, ) -> impl Future> + Send; @@ -58,80 +59,11 @@ impl ActorContext for Context { self.alive } - async fn create(_: &mut System, name: Arc) -> ContextInitializationError { + async fn create(_: &mut System, name: Arc) -> ContextInitializationError { Ok(Context { alive: true, name }) } } -pub mod supervised { - use crate::actor::MessageSender; - use crate::context::{ActorContext, ActorDied, ContextInitializationError, ContextResult}; - use crate::data_publisher::TryClone; - use crate::system::{utils, System}; - use std::sync::Arc; - - #[derive(derive_more::Constructor)] - pub struct SupervisedContext - where - T: MessageSender, - { - pub alive: bool, - _id: usize, - supervisor: T, - name: Arc, - } - - impl ActorContext for SupervisedContext - where - T: MessageSender + Unpin + 'static + TryClone + Send + Sync, - { - fn on_die(&mut self, actor_name: Arc) -> ContextResult<()> { - if let Err(e) = self.supervisor.send(ActorDied(actor_name)) { - tracing::error!("Failed to notify supervisor about actor death: {:?}", e); - } - Ok(()) - } - - fn kill(&mut self) { - self.alive = false; - } - - fn get_name(&self) -> &str { - &self.name - } - - fn is_alive(&self) -> bool { - self.alive - } - - async fn create(system: &mut System, name: Arc) -> ContextInitializationError { - let mut found_actors: Vec = system.get_actors::().map_err(|e| e.to_string())?; - let is_more_one = found_actors.len() > 1; - - if is_more_one { - let msg = format!("SupervisedContext can't be used with more than one actor: {:?} of the same kind", utils::type_name::()); - tracing::error!(msg); - return Err(msg); - } else if found_actors.is_empty() { - let msg = format!( - "SupervisedContext can't be used without selected supervisor's actor: {:?}", - utils::type_name::() - ); - tracing::error!(msg); - return Err(msg); - } - - let supervisor = found_actors.remove(0); - Ok(Self { - alive: true, - _id: rand::random(), - supervisor, - name, - }) - } - } -} - pub mod extensions { use std::any::{Any, TypeId}; use std::collections::HashMap; @@ -186,7 +118,7 @@ pub mod extensions { /// # Example /// /// ``` - /// # use uactor::context::extensions::Extensions; + /// # use uactor::actor::context::extensions::Extensions; /// let mut ext = Extensions::new(); /// assert!(ext.insert(5i32).is_none()); /// assert!(ext.insert(4u8).is_none()); @@ -211,7 +143,7 @@ pub mod extensions { /// # Example /// /// ``` - /// # use uactor::context::extensions::Extensions; + /// # use uactor::actor::context::extensions::Extensions; /// let mut ext = Extensions::new(); /// assert!(ext.get::().is_none()); /// ext.insert(5i32); @@ -230,7 +162,7 @@ pub mod extensions { /// # Example /// /// ``` - /// # use uactor::context::extensions::Extensions; + /// # use uactor::actor::context::extensions::Extensions; /// let mut ext = Extensions::new(); /// ext.insert(String::from("Hello")); /// ext.get_mut::().unwrap().push_str(" World"); @@ -251,7 +183,7 @@ pub mod extensions { /// # Example /// /// ``` - /// # use uactor::context::extensions::Extensions; + /// # use uactor::actor::context::extensions::Extensions; /// let mut ext = Extensions::new(); /// ext.insert(5i32); /// assert_eq!(ext.remove::(), Some(5i32)); @@ -274,7 +206,7 @@ pub mod extensions { /// # Example /// /// ``` - /// # use uactor::context::extensions::Extensions; + /// # use uactor::actor::context::extensions::Extensions; /// let mut ext = Extensions::new(); /// ext.insert(5i32); /// ext.clear(); @@ -293,7 +225,7 @@ pub mod extensions { /// # Example /// /// ``` - /// # use uactor::context::extensions::Extensions; + /// # use uactor::actor::context::extensions::Extensions; /// let mut ext = Extensions::new(); /// assert!(ext.is_empty()); /// ext.insert(5i32); @@ -309,7 +241,7 @@ pub mod extensions { /// # Example /// /// ``` - /// # use uactor::context::extensions::Extensions; + /// # use uactor::actor::context::extensions::Extensions; /// let mut ext = Extensions::new(); /// assert_eq!(ext.len(), 0); /// ext.insert(5i32); @@ -328,7 +260,7 @@ pub mod extensions { /// # Example /// /// ``` - /// # use uactor::context::extensions::Extensions; + /// # use uactor::actor::context::extensions::Extensions; /// let mut ext_a = Extensions::new(); /// ext_a.insert(8u8); /// ext_a.insert(16u16); @@ -394,14 +326,16 @@ pub mod extensions { } pub mod actor_registry { - use crate::context::extensions::IdHasher; - use crate::data_publisher::{TryClone, TryCloneError}; + use crate::actor::context::extensions::IdHasher; + use crate::data::data_publisher::{DataPublisher, TryClone, TryCloneError}; use std::any::{Any, TypeId}; use std::collections::HashMap; use std::fmt; use std::hash::BuildHasherDefault; use std::ops::{Deref, DerefMut}; use std::sync::Arc; + use crate::actor::abstract_actor::Actor; + use crate::actor::message::Message; type AnyBoxed = Box; @@ -417,33 +351,47 @@ pub mod actor_registry { Self::default() } - // TODO: docs - pub fn insert( + pub fn register_ref( &mut self, actor_name: Arc, - val: T, - ) -> Option { - let entry = self.inner.entry(TypeId::of::()).or_default(); - entry.insert(actor_name, Box::new(val)).and_then(|boxed| { + channel: D, + state: A::State + ) -> Option + where + A: Actor, + M: Message, + D: DataPublisher + Send + Sync + 'static, + { + let entry = self.inner.entry(TypeId::of::()).or_default(); + entry.insert(actor_name, Box::new((channel, state))).and_then(|boxed| { (boxed as Box) .downcast() .ok() .map(|boxed| *boxed) }) } - // TODO: docs - pub fn get_all(&self) -> Option> { + pub fn get_all(&self) -> Option> + where + A: Actor, + M: Message, + D: DataPublisher + Send + Sync + 'static + { self.inner - .get(&TypeId::of::())? + .get(&TypeId::of::())? .values() .map(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref()) - .collect::>>() + .collect::>>() } // TODO: docs - pub fn get_actor(&self, actor_name: Arc) -> Option<&T> { - let boxed_actor_ref = self.inner.get(&TypeId::of::())?.get(&actor_name)?; + pub fn get_actor_ref(&self, actor_name: Arc) -> Option<&R> + where + T: Send + Sync + 'static, + R: 'static, + { + let group_by_type = self.inner.get(&TypeId::of::())?; + let boxed_actor_ref = group_by_type.get(&actor_name)?; (&**boxed_actor_ref as &(dyn Any + 'static)).downcast_ref() } diff --git a/src/uactor/src/message.rs b/src/uactor/src/actor/message.rs similarity index 100% rename from src/uactor/src/message.rs rename to src/uactor/src/actor/message.rs diff --git a/src/uactor/src/actor/mod.rs b/src/uactor/src/actor/mod.rs new file mode 100644 index 0000000..d61f4bc --- /dev/null +++ b/src/uactor/src/actor/mod.rs @@ -0,0 +1,5 @@ +pub mod abstract_actor; +pub mod context; +pub mod message; +pub mod actor_ref; +pub mod select; diff --git a/src/uactor/src/select.rs b/src/uactor/src/actor/select.rs similarity index 97% rename from src/uactor/src/select.rs rename to src/uactor/src/actor/select.rs index c63b95c..3dfd1af 100644 --- a/src/uactor/src/select.rs +++ b/src/uactor/src/actor/select.rs @@ -1,6 +1,6 @@ -use crate::actor::{Actor, HandleResult, Handler}; -use crate::datasource::DataSource; -use crate::message::Message; +use crate::actor::abstract_actor::{Actor, HandleResult, Handler}; +use crate::data::datasource::DataSource; +use crate::actor::message::Message; use std::future::pending; pub trait ActorSelect { diff --git a/src/uactor/src/aliases.rs b/src/uactor/src/aliases.rs new file mode 100644 index 0000000..4c772cd --- /dev/null +++ b/src/uactor/src/aliases.rs @@ -0,0 +1,3 @@ +use std::sync::Arc; + +pub type ActorName = Arc; \ No newline at end of file diff --git a/src/uactor/src/data_publisher.rs b/src/uactor/src/data/data_publisher.rs similarity index 98% rename from src/uactor/src/data_publisher.rs rename to src/uactor/src/data/data_publisher.rs index f826ff5..910da95 100644 --- a/src/uactor/src/data_publisher.rs +++ b/src/uactor/src/data/data_publisher.rs @@ -146,7 +146,6 @@ pub use sync_sender::*; #[cfg(not(feature = "async_sender"))] mod sync_sender { - use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{broadcast, mpsc, oneshot, watch}; pub trait DataPublisher: TryClone { @@ -201,7 +200,7 @@ mod sync_sender { } } - impl TryClone for UnboundedSender + impl TryClone for mpsc::UnboundedSender where T: Send, { diff --git a/src/uactor/src/datasource.rs b/src/uactor/src/data/datasource.rs similarity index 98% rename from src/uactor/src/datasource.rs rename to src/uactor/src/data/datasource.rs index 193cb34..4dfcb57 100644 --- a/src/uactor/src/datasource.rs +++ b/src/uactor/src/data/datasource.rs @@ -3,7 +3,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, oneshot, watch}; use tokio::time::Interval; -use crate::message::IntervalMessage; +use crate::actor::message::IntervalMessage; pub type DataSourceResult = Result; diff --git a/src/uactor/src/data/mod.rs b/src/uactor/src/data/mod.rs new file mode 100644 index 0000000..15d9183 --- /dev/null +++ b/src/uactor/src/data/mod.rs @@ -0,0 +1,2 @@ +pub mod datasource; +pub mod data_publisher; \ No newline at end of file diff --git a/src/uactor/src/dependency_injection/mod.rs b/src/uactor/src/dependency_injection/mod.rs new file mode 100644 index 0000000..99eb88f --- /dev/null +++ b/src/uactor/src/dependency_injection/mod.rs @@ -0,0 +1,56 @@ +use std::future::Future; + +use crate::actor::context::actor_registry::ActorRegistryErrors; +use crate::actor::context::extensions::ExtensionErrors; +use crate::system::System; + +#[derive(thiserror::Error, Debug)] +pub enum InjectError { + #[error(transparent)] + ExtensionErrors(#[from] ExtensionErrors), + #[error(transparent)] + ActorRegistryErrors(#[from] ActorRegistryErrors), +} + +/// Sample: +/// ``` +///# use uactor::actor::context::extensions::Service; +/// use uactor::dependency_injection::{Inject, InjectError}; +///# use uactor::system::System; +/// +/// pub struct References { +/// var1: Service, +/// var2: Service, +/// } +/// +/// impl Inject for References { +/// async fn inject(system: &System) -> Result +/// where +/// Self: Sized +/// { +/// let var1 = system.get_service::()?.clone(); +/// let var2 = system.get_service::()?.clone(); +/// Ok(Self { var1, var2 }) +/// } +/// } +/// ``` +pub trait Inject { + fn inject(system: &System) -> impl Future> + Send + where + Self: Sized; +} + +pub mod inject_impls { + use crate::actor::abstract_actor::Actor; + use crate::dependency_injection::{Inject, InjectError}; + use crate::system::System; + + impl Inject for () { + async fn inject(_: &System) -> Result + where + Self: Sized, + { + Ok(()) + } + } +} diff --git a/src/uactor/src/di.rs b/src/uactor/src/di.rs deleted file mode 100644 index 10d1291..0000000 --- a/src/uactor/src/di.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::future::Future; - -use crate::context::actor_registry::ActorRegistryErrors; -use crate::context::extensions::ExtensionErrors; -use crate::system::System; - -#[derive(thiserror::Error, Debug)] -pub enum InjectError { - #[error(transparent)] - ExtensionErrors(#[from] ExtensionErrors), - #[error(transparent)] - ActorRegistryErrors(#[from] ActorRegistryErrors), -} - -/// Sample: -/// ``` -///# use uactor::context::extensions::Service; -/// use uactor::di::{Inject, InjectError}; -///# use uactor::system::System; -/// -/// pub struct References { -/// var1: Service, -/// var2: Service, -/// } -/// -/// impl Inject for References { -/// async fn inject(system: &System) -> Result -/// where -/// Self: Sized -/// { -/// let var1 = system.get_service::()?.clone(); -/// let var2 = system.get_service::()?.clone(); -/// Ok(Self { var1, var2 }) -/// } -/// } -/// ``` -pub trait Inject { - fn inject(system: &System) -> impl Future> + Send - where - Self: Sized; -} - -pub mod inject_impls { - use crate::actor::NamedActorRef; - use crate::context::extensions::Service; - use crate::data_publisher::TryClone; - use crate::di::{Inject, InjectError}; - use crate::system::System; - use std::sync::Arc; - - impl Inject for () { - async fn inject(_: &System) -> Result - where - Self: Sized, - { - Ok(()) - } - } - - impl Inject for T1 - where - T1: DependencyProvider, - { - async fn inject(system: &System) -> Result - where - Self: Sized, - { - let result = T1::get_dependency(system)?; - Ok(result) - } - } - - impl Inject for (T1, T2) - where - T1: DependencyProvider, - T2: DependencyProvider, - { - async fn inject(system: &System) -> Result - where - Self: Sized, - { - let t1 = T1::get_dependency(system)?; - let t2 = T2::get_dependency(system)?; - Ok((t1, t2)) - } - } - - pub trait DependencyProvider { - type Dependency; - fn get_dependency(system: &System) -> Result; - } - - impl DependencyProvider for Service - where - T: Clone + Send + Sync + 'static, - { - type Dependency = Service; - - fn get_dependency(system: &System) -> Result { - let service = system.get_service()?; - Ok(service) - } - } - - impl DependencyProvider for T - where - T: NamedActorRef + TryClone + Clone + Send + Sync + 'static, - { - type Dependency = Self; - - fn get_dependency(system: &System) -> Result { - let actor_name = Self::static_name(); - let actor = system.get_actor(Arc::from(actor_name))?; - Ok(actor) - } - } -} diff --git a/src/uactor/src/lib.rs b/src/uactor/src/lib.rs index 5285a46..909f3d9 100644 --- a/src/uactor/src/lib.rs +++ b/src/uactor/src/lib.rs @@ -1,10 +1,8 @@ -pub mod actor; -pub mod context; -pub mod data_publisher; -pub mod datasource; -pub mod di; -pub mod message; -pub mod select; +pub mod dependency_injection; pub mod system; pub use paste::paste; + +pub mod data; +pub mod actor; +pub mod aliases; diff --git a/src/uactor/src/system.rs b/src/uactor/src/system.rs index 1e1432d..e9ac60c 100644 --- a/src/uactor/src/system.rs +++ b/src/uactor/src/system.rs @@ -1,16 +1,21 @@ -use crate::actor::Actor; -use crate::context::actor_registry::{ActorRegistry, ActorRegistryErrors}; -use crate::context::extensions::{ExtensionErrors, Extensions, Service}; -use crate::context::ActorContext; -use crate::data_publisher::{TryClone, TryCloneError}; -use crate::di::{Inject, InjectError}; -use crate::select::ActorSelect; +use crate::actor::abstract_actor::Actor; +use crate::actor::context::actor_registry::{ActorRegistry, ActorRegistryErrors}; +use crate::actor::context::extensions::{ExtensionErrors, Extensions, Service}; +use crate::actor::context::ActorContext; +use crate::data::data_publisher::{DataPublisher, TryClone, TryCloneError}; +use crate::dependency_injection::{Inject, InjectError}; +use crate::actor::select::ActorSelect; use crate::system::builder::SystemBuilder; use std::any::Any; use std::collections::HashMap; +use std::pin::pin; use std::sync::Arc; +use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; use tokio::task::JoinHandle; +use crate::actor; +use crate::actor::message::Message; +use crate::aliases::{ActorName}; #[derive(thiserror::Error, Debug)] pub enum ActorRunningError { @@ -32,6 +37,96 @@ pub struct System { actor_registry: ActorRegistry, } +impl System { + pub fn register_ref(&mut self, actor_name: &str) -> (R, tokio::sync::mpsc::UnboundedReceiver) + where + A: Actor, + M: Message + Send + 'static, + R: From<(ActorName, UnboundedSender, A::State)>, + { + let (mut tx, rx) = tokio::sync::mpsc::unbounded_channel::(); + + let actor_name: Arc = actor_name.to_owned().into(); + let state = A::State::default(); + let old_ref = self.actor_registry.register_ref::>(actor_name.clone(), tx.clone(), state.clone()); + if let Some(_old_ref) = old_ref { + tracing::warn!("The actor: {actor_name:?} has already been registered, old ref has been replaced"); + } + + (R::from((actor_name, tx, state)), rx) + } + + pub async fn spawn_actor( + &mut self, + actor_name: Arc, + mut actor: A, + state: A::State, + mut select: S, + ) -> Result<(::State, JoinHandle<()>), ActorRunningError> + where + A: Actor + Send, + S: ActorSelect + Send + 'static, + ::Inject: Inject + Sized + Send, + { + // let actor_name: Arc = actor_name.to_owned().into(); + + let system_name = self.name.clone(); + + let mut ctx = A::Context::create::(self, actor_name.clone()) + .await + .map_err(ActorRunningError::ContextError)?; + + let mut inject = A::Inject::inject(self).await?; + + let handle = { + let state = state.clone(); + tokio::spawn(async move { + tracing::debug!("The system: {:?} spawned actor: {:?}", system_name, actor_name); + + // call on_start + match ctx.on_start() { + Ok(_) => { + tracing::trace!("Starting the actor: {actor_name:?}"); + } + Err(err) => { + tracing::error!("Error during actor start: {err:?}"); + ctx.kill(); + } + } + + actor.on_start(&mut inject, &mut ctx).await; + + // main loop + while ctx.is_alive() { + tracing::trace!("iteration of the process: {actor_name:?}"); + let res = select.select(&mut inject, &mut ctx, &state, &mut actor).await; + ctx.after_iteration(); + + if let Err(err) = res { + tracing::error!("An error occurred while message handling by the \"{}\", error message: \"{}\"", ctx.get_name(), err); + + ctx.on_error(&err); + actor.on_error(&mut ctx, err).await; + } else { + tracing::trace!("{actor_name:?} successful iteration"); + } + } + // call on_die + match ctx.on_die(actor_name.clone()) { + Ok(_) => { + tracing::trace!("The actor: {actor_name:?} is dead"); + } + Err(err) => { + tracing::error!("Error during actor die: {err:?}"); + } + } + }) + }; + + Ok((state, handle)) + } +} + impl System {} impl System { @@ -52,13 +147,16 @@ impl System { Ok(service.clone()) } - pub fn get_actor(&self, actor_name: Arc) -> Result + pub fn get_actor(&self, actor_name: Arc) -> Result where - A: TryClone + Send + Sync + 'static, + A: Actor + Send + Sync + 'static, + M: Message, + D: DataPublisher + Send + Sync + 'static, + R: From<(ActorName, D, A::State)>, { - let actor_ref: &A = self + let (channel, state): &(D, A::State) = self .actor_registry - .get_actor::(actor_name.clone()) + .get_actor_ref::(actor_name.clone()) .ok_or_else(|| { let system_name = self.name.clone(); let kind = utils::type_name::(); @@ -69,41 +167,31 @@ impl System { actor_name, } })?; - let a = actor_ref.try_clone()?; - Ok(a) + + let reference = R::from((actor_name, channel.try_clone()?, state.clone())); + Ok(reference) } - pub fn get_actors(&self) -> Result, ActorRegistryErrors> + pub fn get_actors(&self) -> Result, ActorRegistryErrors> where - A: TryClone + Send + Sync + 'static, + A: Actor, + M: Message, + D: DataPublisher + Send + Sync + 'static { let actor_ref = self .actor_registry - .get_all() + .get_all::() .ok_or_else(|| { let system_name = self.name.clone(); let kind = std::any::type_name::().to_owned(); ActorRegistryErrors::NotRegisteredActorKind { system_name, kind } })? .into_iter() - .map(|i: &A| i.try_clone()) - .collect::, TryCloneError>>()?; + .map(|(c, state): &(D, A::State)| Ok((c.try_clone()?, state.clone()))) + .collect::, TryCloneError>>()?; Ok(actor_ref) } - pub fn insert_actor( - &mut self, - actor_name: Arc, - actor_ref: T, - ) { - tracing::info!( - "Insert actor: {actor_name:?}: {} into system context: {:?}", - std::any::type_name::(), - self.name - ); - self.actor_registry.insert::(actor_name, actor_ref); - } - pub fn insert_service(&mut self, data: T) { self.extensions.insert(Service(data)); } @@ -129,119 +217,8 @@ impl System { } } -impl System { - pub async fn run_actor(&mut self, actor_name: Arc) -> Result<(), ActorRunningError> - where - A: Actor + Any, - ::Inject: Inject + Sized + Send, - { - if let Some(tx) = self.initialized_actors.remove(&actor_name) { - let injected_dependencies = A::Inject::inject(self).await; - - let ctx = A::Context::create(self, actor_name.clone()) - .await - .map_err(ActorRunningError::ContextError)?; - - if let Err(err) = injected_dependencies.as_ref() { - tracing::error!( - "Can't inject dependencies for {actor_name:?}, actor not started. Err: {err:?}" - ) - } - let injected_dependencies = injected_dependencies?; - - if tx.send(Box::new((injected_dependencies, ctx))).is_err() { - return Err(ActorRunningError::Dropped(actor_name.clone())); - } - } else { - eprintln!("actor_name: {:?} already started", actor_name); - return Err(ActorRunningError::MissedInitializationOrAlreadyStarted( - actor_name.clone(), - )); - } - Ok(()) - } - - pub fn init_actor( - &mut self, - mut actor: A, - actor_name: Option>, - mut select: S, - ) -> (Arc, Arc, JoinHandle<()>) - where - A: Actor + Send, - S: ActorSelect + Send + 'static, - ::Inject: Inject + Sized + Send, - { - let system_name = self.name.clone(); - - let actor_name: Arc = actor_name.unwrap_or_else(|| { - let type_name = utils::type_name::(); - format!("{}-{}", type_name, (&type_name as *const String as i32)).into() - }); - let (actor_inject_tx, actor_inject_rx) = oneshot::channel::>(); - - self.initialized_actors.insert(actor_name.clone(), actor_inject_tx); - - let shared_state = actor.create_state(); - - let handle = { - let actor_name = actor_name.clone(); - let shared_state = shared_state.clone(); - - tokio::spawn(async move { - tracing::debug!("The system: {:?} spawned actor: {:?}", system_name, actor_name); - - if let Ok(boxed_state) = actor_inject_rx.await { - let (mut state, mut ctx) = { - let boxed_state = boxed_state - .downcast::<(::Inject, ::Context)>() - .expect("failed to downcast state"); - *boxed_state - }; - - // call on_start - match ctx.on_start() { - Ok(_) => { - tracing::trace!("Starting the actor: {actor_name:?}"); - } - Err(err) => { - tracing::error!("Error during actor start: {err:?}"); - ctx.kill(); - } - } - - // main loop - while ctx.is_alive() { - tracing::trace!("iteration of the process: {actor_name:?}"); - let res = select.select(&mut state, &mut ctx, &shared_state, &mut actor).await; - - if let Err(err) = res { - tracing::error!("An error occurred while message handling by the \"{}\", error message: \"{}\"", ctx.get_name(), err); - } else { - tracing::trace!("{actor_name:?} successful iteration"); - } - } - // call on_die - match ctx.on_die(actor_name.clone()) { - Ok(_) => { - tracing::trace!("The actor: {actor_name:?} is dead"); - } - Err(err) => { - tracing::error!("Error during actor die: {err:?}"); - } - } - } else { - tracing::error!("Can't run {actor_name:?}, system dropped"); - } - }) - }; - - (actor_name, shared_state, handle) - } -} - pub mod builder { - use crate::context::extensions::{Extensions, Service}; + use crate::actor::context::extensions::{Extensions, Service}; use crate::system::System; use std::sync::Arc;