diff --git a/Cargo.toml b/Cargo.toml index 449624a7..e2fdd6ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ members = [ ] [workspace.metadata] -protocol-version = 12 +protocol-version = 13 [profile.release] lto = true \ No newline at end of file diff --git a/clients/java/api/src/main/java/io/atomic/cloud/api/resource/Resources.java b/clients/java/api/src/main/java/io/atomic/cloud/api/resource/Resources.java index 3a352a90..7433108e 100644 --- a/clients/java/api/src/main/java/io/atomic/cloud/api/resource/Resources.java +++ b/clients/java/api/src/main/java/io/atomic/cloud/api/resource/Resources.java @@ -24,14 +24,6 @@ public interface Resources { */ CompletableFuture> groupFromName(String name); - /** - * Retrieves a SimpleGroup object that contains the specified user as a member. - * - * @param uuid the uuid of the user to retrieve the group for - * @return a SimpleGroup instance - */ - CompletableFuture> groupFromUser(UUID uuid); - /** * Retrieves an array of SimpleServer objects. * diff --git a/clients/java/api/src/main/java/io/atomic/cloud/api/user/CloudUser.java b/clients/java/api/src/main/java/io/atomic/cloud/api/user/CloudUser.java index 512276c9..f8adf788 100644 --- a/clients/java/api/src/main/java/io/atomic/cloud/api/user/CloudUser.java +++ b/clients/java/api/src/main/java/io/atomic/cloud/api/user/CloudUser.java @@ -9,7 +9,5 @@ public interface CloudUser { UUID uuid(); - Optional group(); - Optional server(); } diff --git a/clients/java/common/src/main/java/io/atomic/cloud/common/resource/ResourceManager.java b/clients/java/common/src/main/java/io/atomic/cloud/common/resource/ResourceManager.java index 6cc62a37..2a954585 100644 --- a/clients/java/common/src/main/java/io/atomic/cloud/common/resource/ResourceManager.java +++ b/clients/java/common/src/main/java/io/atomic/cloud/common/resource/ResourceManager.java @@ -39,17 +39,6 @@ public CompletableFuture> groupFromName(String name) }); } - @Override - public CompletableFuture> groupFromUser(UUID uuid) { - return Cloud.users().userFromUuid(uuid).thenCompose(user -> { - if (user.isPresent() && user.get().group().isPresent()) { - return this.groupFromName(user.get().group().get()); - } else { - return CompletableFuture.completedFuture(Optional.empty()); - } - }); - } - @Override public CompletableFuture servers() { return this.connection.servers().thenApply(list -> list.getServersList().stream() diff --git a/clients/java/common/src/main/java/io/atomic/cloud/common/user/CloudUserImpl.java b/clients/java/common/src/main/java/io/atomic/cloud/common/user/CloudUserImpl.java index 3130c6c5..90bd601e 100644 --- a/clients/java/common/src/main/java/io/atomic/cloud/common/user/CloudUserImpl.java +++ b/clients/java/common/src/main/java/io/atomic/cloud/common/user/CloudUserImpl.java @@ -4,5 +4,4 @@ import java.util.Optional; import java.util.UUID; -public record CloudUserImpl(String name, UUID uuid, Optional group, Optional server) - implements CloudUser {} +public record CloudUserImpl(String name, UUID uuid, Optional server) implements CloudUser {} diff --git a/clients/java/common/src/main/java/io/atomic/cloud/common/user/UserManager.java b/clients/java/common/src/main/java/io/atomic/cloud/common/user/UserManager.java index 8f20a258..214a204f 100644 --- a/clients/java/common/src/main/java/io/atomic/cloud/common/user/UserManager.java +++ b/clients/java/common/src/main/java/io/atomic/cloud/common/user/UserManager.java @@ -25,16 +25,12 @@ public CompletableFuture> userFromName(String name) { return this.connection .userFromName(name) .thenApply(user -> { - Optional group = Optional.empty(); Optional server = Optional.empty(); - if (user.hasGroup()) { - group = Optional.of(user.getGroup()); - } if (user.hasServer()) { server = Optional.of(UUID.fromString(user.getServer())); } - return Optional.of((CloudUser) - new CloudUserImpl(user.getName(), UUID.fromString(user.getId()), group, server)); + return Optional.of( + (CloudUser) new CloudUserImpl(user.getName(), UUID.fromString(user.getId()), server)); }) .exceptionally(throwable -> { if (throwable.getMessage().equals("NOT_FOUND")) { @@ -50,15 +46,11 @@ public CompletableFuture> userFromUuid(@NotNull UUID uuid) { return this.connection .user(uuid.toString()) .thenApply(user -> { - Optional group = Optional.empty(); Optional server = Optional.empty(); - if (user.hasGroup()) { - group = Optional.of(user.getGroup()); - } if (user.hasServer()) { server = Optional.of(UUID.fromString(user.getServer())); } - return Optional.of((CloudUser) new CloudUserImpl(user.getName(), uuid, group, server)); + return Optional.of((CloudUser) new CloudUserImpl(user.getName(), uuid, server)); }) .exceptionally(throwable -> { if (throwable.getMessage().equals("NOT_FOUND")) { diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 96ff52fd..1379daf2 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -20,7 +20,7 @@ bitflags = "2.9.0" # Signal handling ctrlc = "3.4.6" -# Unit system +# Server system uuid = { version = "1.16.0", features = ["v4"] } # Command line arguments @@ -47,7 +47,6 @@ rcgen = "0.13.2" # Plugins wasmtime = { version = "32.0.0", default-features = false, features = ["addr2line", "threads", "std", "runtime", "demangle", "component-model", "cranelift", "parallel-compilation", "cache"], optional = true } wasmtime-wasi = { version = "32.0.0", optional = true } -wasmtime-wasi-http = { version = "32.0.0", optional = true } minreq = { version = "2.13.4", features = ["https-rustls"], optional = true } [build-dependencies] @@ -55,4 +54,4 @@ toml = "0.8.21" tonic-build = "0.13.0" [features] -wasm-plugins = ["dep:wasmtime", "dep:wasmtime-wasi", "dep:wasmtime-wasi-http", "dep:minreq"] \ No newline at end of file +wasm-plugins = ["dep:wasmtime", "dep:wasmtime-wasi", "dep:minreq"] \ No newline at end of file diff --git a/controller/src/application.rs b/controller/src/application.rs index b9cd6dec..5faa3723 100644 --- a/controller/src/application.rs +++ b/controller/src/application.rs @@ -18,12 +18,12 @@ use subscriber::manager::SubscriberManager; use tls::TlsSetting; use tokio::{ select, - sync::{mpsc, watch}, + sync::watch, time::{interval, Instant, MissedTickBehavior}, }; use user::manager::UserManager; -use crate::{config::Config, network::NetworkStack, task::Task}; +use crate::{config::Config, network::NetworkStack, task::manager::TaskManager}; pub mod auth; pub mod group; @@ -35,9 +35,6 @@ pub mod tls; pub mod user; pub const TICK_RATE: u64 = 10; -const TASK_BUFFER: usize = 128; - -pub type TaskSender = mpsc::Sender; #[derive(Getters, MutGetters)] pub struct Controller { @@ -45,7 +42,7 @@ pub struct Controller { state: State, /* Tasks */ - tasks: (TaskSender, mpsc::Receiver), + tasks: TaskManager, /* Shared Components */ pub shared: Arc, @@ -62,6 +59,7 @@ pub struct Controller { config: Config, } +// This is data that is shared between network thread/tasks and plugin execution threads/tasks pub struct Shared { pub auth: AuthManager, pub subscribers: SubscriberManager, @@ -78,9 +76,9 @@ impl Controller { tls: TlsSetting::init(&config).await?, }); - let tasks = mpsc::channel(TASK_BUFFER); + let tasks = TaskManager::init(); - let plugins = PluginManager::init(&config, &shared).await?; + let plugins = PluginManager::init(&config, &tasks.get_sender(), &shared).await?; let nodes = NodeManager::init(&plugins).await?; let groups = GroupManager::init(&nodes).await?; @@ -101,10 +99,14 @@ impl Controller { } pub async fn run(&mut self) -> Result<()> { + // Set task system to ready + self.tasks.set_ready(true); + // Setup signal handlers self.setup_handlers()?; - let network = NetworkStack::start(&self.config, &self.shared, &self.tasks.0); + let network = + NetworkStack::start(&self.config, self.shared.clone(), self.tasks.get_sender()); // Main loop let mut interval = interval(Duration::from_millis(1000 / TICK_RATE)); @@ -114,13 +116,16 @@ impl Controller { select! { _ = interval.tick() => self.tick(&network).await?, - task = self.tasks.1.recv() => if let Some(task) = task { + task = self.tasks.recv() => if let Some(task) = task { task.run(self).await?; }, _ = self.state.signal.1.changed() => self.shutdown()?, } } + // Set task system to not ready + self.tasks.set_ready(false); + // Cleanup self.cleanup(network).await?; diff --git a/controller/src/application/plugin/manager.rs b/controller/src/application/plugin/manager.rs index c976b741..79c5b7ee 100644 --- a/controller/src/application/plugin/manager.rs +++ b/controller/src/application/plugin/manager.rs @@ -5,7 +5,7 @@ use futures::future::join_all; use simplelog::info; use tick::Ticker; -use crate::{application::Shared, config::Config}; +use crate::{application::Shared, config::Config, task::manager::TaskSender}; use super::BoxedPlugin; @@ -21,13 +21,13 @@ pub struct PluginManager { } impl PluginManager { - pub async fn init(config: &Config, shared: &Arc) -> Result { + pub async fn init(config: &Config, tasks: &TaskSender, shared: &Arc) -> Result { info!("Loading plugins..."); let mut plugins = HashMap::new(); #[cfg(feature = "wasm-plugins")] - init_wasm_plugins(config, shared, &mut plugins).await?; + init_wasm_plugins(config, tasks, shared, &mut plugins).await?; info!("Loaded {} plugin(s)", plugins.len()); Ok(Self { diff --git a/controller/src/application/plugin/runtime/wasm.rs b/controller/src/application/plugin/runtime/wasm.rs index b21394a1..3638fd4b 100644 --- a/controller/src/application/plugin/runtime/wasm.rs +++ b/controller/src/application/plugin/runtime/wasm.rs @@ -10,12 +10,14 @@ use tonic::async_trait; use url::Url; use wasmtime::{component::ResourceAny, AsContextMut, Engine, Store}; use wasmtime_wasi::{IoView, ResourceTable, WasiCtx, WasiView}; -use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; -use crate::application::{ - node::Capabilities, - plugin::{BoxedNode, Features, GenericPlugin, Information}, - Shared, +use crate::{ + application::{ + node::Capabilities, + plugin::{BoxedNode, Features, GenericPlugin, Information}, + Shared, + }, + task::manager::TaskSender, }; pub(crate) mod config; @@ -44,6 +46,7 @@ pub mod generated { pub(crate) struct PluginState { /* Global */ + tasks: TaskSender, shared: Arc, /* Plugin */ @@ -51,7 +54,6 @@ pub(crate) struct PluginState { /* Wasmtime */ wasi: WasiCtx, - http: WasiHttpCtx, resources: ResourceTable, } @@ -244,12 +246,6 @@ impl WasiView for PluginState { } } -impl WasiHttpView for PluginState { - fn ctx(&mut self) -> &mut WasiHttpCtx { - &mut self.http - } -} - impl From for Information { fn from(val: bridge::Information) -> Self { Information { diff --git a/controller/src/application/plugin/runtime/wasm/ext.rs b/controller/src/application/plugin/runtime/wasm/ext.rs index 2d04c310..db801e26 100644 --- a/controller/src/application/plugin/runtime/wasm/ext.rs +++ b/controller/src/application/plugin/runtime/wasm/ext.rs @@ -17,6 +17,7 @@ mod log; mod platform; pub mod process; pub mod screen; +mod server; mod tls; impl system::types::Host for PluginState {} diff --git a/controller/src/application/plugin/runtime/wasm/ext/server.rs b/controller/src/application/plugin/runtime/wasm/ext/server.rs new file mode 100644 index 00000000..ffe78d8c --- /dev/null +++ b/controller/src/application/plugin/runtime/wasm/ext/server.rs @@ -0,0 +1,47 @@ +use anyhow::Result; +use tonic::async_trait; +use uuid::Uuid; + +use crate::{ + application::{ + plugin::runtime::wasm::{ + generated::{ + exports::plugin::system::bridge, + plugin::system::{self, types::ErrorMessage}, + }, + PluginState, + }, + Controller, + }, + task::{plugin::PluginTask, BoxedAny, GenericTask}, +}; + +impl system::server::Host for PluginState { + async fn get_server( + &mut self, + uuid: String, + ) -> Result, ErrorMessage>> { + let Ok(uuid) = Uuid::parse_str(&uuid) else { + return Ok(Err("Failed to parse provided uuid".to_string())); + }; + + Ok(Ok(PluginTask::execute::, _>( + &self.tasks, + GetServerTask(uuid), + ) + .await?)) + } +} + +pub struct GetServerTask(pub Uuid); + +#[async_trait] +impl GenericTask for GetServerTask { + async fn run(&mut self, controller: &mut Controller) -> Result { + let Some(server) = controller.servers.get_server(&self.0) else { + return PluginTask::new_ok(None::); + }; + + PluginTask::new_ok(Some::(server.into())) + } +} diff --git a/controller/src/application/plugin/runtime/wasm/init.rs b/controller/src/application/plugin/runtime/wasm/init.rs index 8db0d733..7e6d8b1a 100644 --- a/controller/src/application/plugin/runtime/wasm/init.rs +++ b/controller/src/application/plugin/runtime/wasm/init.rs @@ -9,7 +9,6 @@ use wasmtime::{ Engine, Store, }; use wasmtime_wasi::{DirPerms, FilePerms, ResourceTable, WasiCtxBuilder}; -use wasmtime_wasi_http::WasiHttpCtx; use crate::{ application::{ @@ -18,6 +17,7 @@ use crate::{ }, config::Config, storage::Storage, + task::manager::TaskSender, }; use super::{ @@ -29,6 +29,7 @@ use super::{ #[allow(clippy::too_many_lines)] pub async fn init_wasm_plugins( global_config: &Config, + tasks: &TaskSender, shared: &Arc, plugins: &mut HashMap, ) -> Result<()> { @@ -90,7 +91,8 @@ pub async fn init_wasm_plugins( &name, &source, global_config, - shared, + tasks.clone(), + shared.clone(), &plugins_config, &data_directory, &config_directory, @@ -169,7 +171,8 @@ impl Plugin { name: &str, source: &Source, global_config: &Config, - shared: &Arc, + tasks: TaskSender, + shared: Arc, plugins_config: &PluginsConfig, data_directory: &Path, config_directory: &Path, @@ -189,7 +192,6 @@ impl Plugin { let mut linker = Linker::new(&engine); wasmtime_wasi::add_to_linker_async(&mut linker)?; - wasmtime_wasi_http::add_only_http_to_linker_async(&mut linker)?; generated::Plugin::add_to_linker(&mut linker, |state: &mut PluginState| state)?; let mut wasi = WasiCtxBuilder::new(); @@ -232,10 +234,10 @@ impl Plugin { let mut store = Store::new( &engine, PluginState { - shared: shared.clone(), + tasks, + shared, name: name.to_string(), wasi, - http: WasiHttpCtx::new(), resources, }, ); diff --git a/controller/src/application/plugin/runtime/wasm/node.rs b/controller/src/application/plugin/runtime/wasm/node.rs index 0213de53..e34e9009 100644 --- a/controller/src/application/plugin/runtime/wasm/node.rs +++ b/controller/src/application/plugin/runtime/wasm/node.rs @@ -256,6 +256,7 @@ impl From<&Server> for bridge::Server { group: val.group().clone(), allocation: val.allocation().into(), token: val.token().clone(), + connected_users: *val.connected_users(), } } } @@ -268,6 +269,7 @@ impl From for bridge::Server { group: val.group().clone(), allocation: val.allocation().into(), token: val.token().clone(), + connected_users: *val.connected_users(), } } } diff --git a/controller/src/application/server/manager.rs b/controller/src/application/server/manager.rs index b10df8c8..b362c834 100644 --- a/controller/src/application/server/manager.rs +++ b/controller/src/application/server/manager.rs @@ -136,12 +136,13 @@ impl ServerManager { for server in self.servers.values() { if server.heart.is_dead() { match server.state { - State::Starting | State::Running => { + State::Starting | State::Restarting => { warn!("Unit {} failed to establish online status within the expected startup time of {:.2?}.", server.id, config.restart_timeout()); } - _ => { + State::Running => { warn!("Server {} has not checked in for {:.2?}, indicating a potential error.", server.id, server.heart.timeout); } + State::Stopping => continue, // We ignore that the server has not checked in because he is stopping } self.restart_requests .push(RestartRequest::new(None, server.id().clone())); diff --git a/controller/src/application/server/screen/manager.rs b/controller/src/application/server/screen/manager.rs index 466a90bb..21815f63 100644 --- a/controller/src/application/server/screen/manager.rs +++ b/controller/src/application/server/screen/manager.rs @@ -19,7 +19,8 @@ use crate::{ use super::{BoxedScreen, ScreenPullJoinHandle, ScreenWriteJoinHandle}; -const SCREEN_TICK_RATE: u64 = TICK_RATE / 2; +const SCREEN_TICK_RATE: u64 = TICK_RATE / 3; +const PASSIVE_SCREEN_TICK_RATE_MULTIPLIER: u64 = 250; type SubscriberHolder = Vec>; @@ -101,7 +102,8 @@ impl ScreenManager { } struct ActiveScreen { - interval: Interval, + // First is for normal ticks and second is for passiv ticks to prevent buffers from overflowing + intervals: (Interval, Interval), screen: BoxedScreen, handle: Option, subscribers: SubscriberHolder, @@ -110,10 +112,20 @@ struct ActiveScreen { impl ActiveScreen { pub fn new(screen: BoxedScreen) -> Self { - let mut interval = interval(Duration::from_millis(1000 / SCREEN_TICK_RATE)); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut intervals = ( + interval(Duration::from_millis(1000 / SCREEN_TICK_RATE)), + interval(Duration::from_millis( + (1000 / SCREEN_TICK_RATE) * PASSIVE_SCREEN_TICK_RATE_MULTIPLIER, + )), + ); + intervals + .0 + .set_missed_tick_behavior(MissedTickBehavior::Skip); + intervals + .1 + .set_missed_tick_behavior(MissedTickBehavior::Skip); Self { - interval, + intervals, screen, handle: None, subscribers: vec![], @@ -127,13 +139,13 @@ impl ActiveScreen { pub async fn push(&mut self, subscriber: Subscriber) { if self.cache.has_data() - && subscriber + && !subscriber .send_message(ScreenLines { lines: self.cache.clone_items(), }) .await { - warn!("Failed to send initial screen data to subscriber!",); + warn!("Failed to send initial screen data to subscriber!"); return; } @@ -141,7 +153,7 @@ impl ActiveScreen { } pub async fn tick(&mut self) -> Result<()> { - if self.interval.tick().now_or_never().is_none() { + if self.intervals.0.tick().now_or_never().is_none() { // Skip tick return Ok(()); } @@ -149,8 +161,8 @@ impl ActiveScreen { // Remove all dead subscribers self.subscribers.retain(Subscriber::is_alive); - if self.subscribers.is_empty() { - // If no one is watching dont pull + if self.intervals.1.tick().now_or_never().is_none() && self.subscribers.is_empty() { + // If no one is watching dont pull and no passiv tick is needed return Ok(()); } diff --git a/controller/src/application/subscriber.rs b/controller/src/application/subscriber.rs index 61fcd6c6..a86dfc87 100644 --- a/controller/src/application/subscriber.rs +++ b/controller/src/application/subscriber.rs @@ -57,8 +57,10 @@ impl Subscriber { Dispatch::Plugin(sender) => { if let Err(error) = sender.send(Ok(message)).await { error!("Failed to send plugin message: {}", error); + false + } else { + true } - false } } } diff --git a/controller/src/application/subscriber/manager/event/server.rs b/controller/src/application/subscriber/manager/event/server.rs index 1efa4dd9..0e2af713 100644 --- a/controller/src/application/subscriber/manager/event/server.rs +++ b/controller/src/application/subscriber/manager/event/server.rs @@ -17,6 +17,8 @@ pub struct ServerEvent { pub allocation: Allocation, #[getset(get = "pub")] pub token: String, + #[getset(get = "pub")] + pub connected_users: u32, } impl From<&Server> for ServerEvent { @@ -26,6 +28,7 @@ impl From<&Server> for ServerEvent { group: value.group().clone(), allocation: value.allocation().clone(), token: value.token().clone(), + connected_users: *value.connected_users(), } } } diff --git a/controller/src/network.rs b/controller/src/network.rs index 66457799..e500c8e1 100644 --- a/controller/src/network.rs +++ b/controller/src/network.rs @@ -17,10 +17,7 @@ use tokio::{ }; use tonic::transport::{Identity, Server, ServerTlsConfig}; -use crate::{ - application::{Shared, TaskSender}, - config::Config, -}; +use crate::{application::Shared, config::Config, task::manager::TaskSender}; mod auth; pub mod client; @@ -34,7 +31,7 @@ pub struct NetworkStack { } impl NetworkStack { - pub fn start(config: &Config, shared: &Arc, queue: &TaskSender) -> Self { + pub fn start(config: &Config, shared: Arc, queue: TaskSender) -> Self { async fn run( bind: SocketAddr, identity: Option, @@ -77,8 +74,6 @@ impl NetworkStack { .tls .as_ref() .map(|(_, identity)| identity.clone()); - let shared = shared.clone(); - let queue = queue.clone(); let task = spawn(async move { if let Err(error) = run(bind, identity, shared, queue, receiver).await { diff --git a/controller/src/network/client.rs b/controller/src/network/client.rs index 2a69f807..1e77f741 100644 --- a/controller/src/network/client.rs +++ b/controller/src/network/client.rs @@ -17,9 +17,9 @@ use uuid::Uuid; use crate::{ application::{ auth::AuthType, server::NameAndUuid, subscriber::Subscriber, - user::transfer::TransferTarget, Shared, TaskSender, + user::transfer::TransferTarget, Shared, }, - task::Task, + task::{manager::TaskSender, network::TonicTask}, VERSION, }; @@ -64,7 +64,7 @@ impl ClientService for ClientServiceImpl { // Heartbeat async fn beat(&self, request: Request<()>) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::Server, &self.0, request, |_, auth| { + TonicTask::execute::<(), _, _>(AuthType::Server, &self.0, request, |_, auth| { Ok(Box::new(BeatTask(auth))) }) .await?, @@ -74,7 +74,7 @@ impl ClientService for ClientServiceImpl { // Ready state async fn set_ready(&self, request: Request) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::Server, &self.0, request, |request, auth| { + TonicTask::execute::<(), _, _>(AuthType::Server, &self.0, request, |request, auth| { Ok(Box::new(SetReadyTask(auth, *request.get_ref()))) }) .await?, @@ -84,7 +84,7 @@ impl ClientService for ClientServiceImpl { // Health async fn set_running(&self, request: Request<()>) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::Server, &self.0, request, |_, auth| { + TonicTask::execute::<(), _, _>(AuthType::Server, &self.0, request, |_, auth| { Ok(Box::new(SetRunningTask(auth))) }) .await?, @@ -92,7 +92,7 @@ impl ClientService for ClientServiceImpl { } async fn request_stop(&self, request: Request<()>) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::Server, &self.0, request, |_, auth| { + TonicTask::execute::<(), _, _>(AuthType::Server, &self.0, request, |_, auth| { Ok(Box::new(RequestStopTask(auth))) }) .await?, @@ -102,7 +102,7 @@ impl ClientService for ClientServiceImpl { // User async fn user_connected(&self, request: Request) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::Server, &self.0, request, |request, auth| { + TonicTask::execute::<(), _, _>(AuthType::Server, &self.0, request, |request, auth| { let request = request.into_inner(); let name = request.name; @@ -123,7 +123,7 @@ impl ClientService for ClientServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::Server, &self.0, request, |request, auth| { + TonicTask::execute::<(), _, _>(AuthType::Server, &self.0, request, |request, auth| { let request = request.into_inner(); let Ok(uuid) = Uuid::from_str(&request.id) else { @@ -140,7 +140,7 @@ impl ClientService for ClientServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::Server, &self.0, request, @@ -162,7 +162,7 @@ impl ClientService for ClientServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::Server, &self.0, request, @@ -177,15 +177,18 @@ impl ClientService for ClientServiceImpl { } async fn get_users(&self, request: Request<()>) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::Server, &self.0, request, |_, _| { - Ok(Box::new(GetUsersTask)) - }) + TonicTask::execute::( + AuthType::Server, + &self.0, + request, + |_, _| Ok(Box::new(GetUsersTask)), + ) .await?, )) } async fn get_user_count(&self, request: Request<()>) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::Server, &self.0, request, |_, _| { + TonicTask::execute::(AuthType::Server, &self.0, request, |_, _| { Ok(Box::new(UserCountTask)) }) .await?, @@ -195,7 +198,7 @@ impl ClientService for ClientServiceImpl { // Transfer async fn transfer_users(&self, request: Request) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::Server, &self.0, request, |request, auth| { + TonicTask::execute::(AuthType::Server, &self.0, request, |request, auth| { let request = request.into_inner(); let target = match request.target { @@ -243,7 +246,7 @@ impl ClientService for ClientServiceImpl { &self, request: Request<()>, ) -> Result, Status> { - let auth = Task::get_auth(AuthType::Server, &request)?; + let auth = TonicTask::get_auth(AuthType::Server, &request)?; let server = auth .get_server() .expect("Should be ok. Because type is checked in get_auth"); @@ -296,7 +299,7 @@ impl ClientService for ClientServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::Server, &self.0, request, @@ -318,7 +321,7 @@ impl ClientService for ClientServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::Server, &self.0, request, @@ -336,7 +339,7 @@ impl ClientService for ClientServiceImpl { request: Request<()>, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::Server, &self.0, request, @@ -352,7 +355,7 @@ impl ClientService for ClientServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::Server, &self.0, request, @@ -370,7 +373,7 @@ impl ClientService for ClientServiceImpl { request: Request<()>, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::Server, &self.0, request, diff --git a/controller/src/network/client/beat.rs b/controller/src/network/client/beat.rs index 1ff0339b..71f5ae0f 100644 --- a/controller/src/network/client/beat.rs +++ b/controller/src/network/client/beat.rs @@ -3,7 +3,7 @@ use tonic::async_trait; use crate::{ application::{auth::Authorization, Controller}, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct BeatTask(pub Authorization); @@ -16,9 +16,9 @@ impl GenericTask for BeatTask { .get_server() .and_then(|server| controller.servers.get_server_mut(server.uuid())) else { - return Task::new_link_error(); + return TonicTask::new_link_error(); }; server.heart_mut().beat(); - Task::new_empty() + TonicTask::new_empty() } } diff --git a/controller/src/network/client/group.rs b/controller/src/network/client/group.rs index 51a24632..9a1f8997 100644 --- a/controller/src/network/client/group.rs +++ b/controller/src/network/client/group.rs @@ -4,7 +4,7 @@ use tonic::{async_trait, Status}; use crate::{ application::{group::Group, Controller}, network::proto::common::common_group::{List, Short}, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct GetGroupTask(pub String); @@ -14,17 +14,17 @@ pub struct GetGroupsTask; impl GenericTask for GetGroupTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(group) = controller.groups.get_group(&self.0) else { - return Task::new_err(Status::not_found("Group not found")); + return TonicTask::new_err(Status::not_found("Group not found")); }; - Task::new_ok(Short::from(&group)) + TonicTask::new_ok(Short::from(&group)) } } #[async_trait] impl GenericTask for GetGroupsTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(List { + TonicTask::new_ok(List { groups: controller .groups .get_groups() diff --git a/controller/src/network/client/health.rs b/controller/src/network/client/health.rs index 65d1b733..dbf012a4 100644 --- a/controller/src/network/client/health.rs +++ b/controller/src/network/client/health.rs @@ -7,7 +7,7 @@ use crate::{ server::{manager::StopRequest, State}, Controller, }, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct SetRunningTask(pub Authorization); @@ -21,10 +21,10 @@ impl GenericTask for SetRunningTask { .get_server() .and_then(|server| controller.servers.get_server_mut(server.uuid())) else { - return Task::new_link_error(); + return TonicTask::new_link_error(); }; server.set_state(State::Running); - Task::new_empty() + TonicTask::new_empty() } } @@ -36,11 +36,11 @@ impl GenericTask for RequestStopTask { .get_server() .and_then(|server| controller.servers.resolve_server(server.uuid())) else { - return Task::new_link_error(); + return TonicTask::new_link_error(); }; controller .servers .schedule_stop(StopRequest::new(None, server)); - Task::new_empty() + TonicTask::new_empty() } } diff --git a/controller/src/network/client/ready.rs b/controller/src/network/client/ready.rs index 45ef9d32..d8f7b7e0 100644 --- a/controller/src/network/client/ready.rs +++ b/controller/src/network/client/ready.rs @@ -3,7 +3,7 @@ use tonic::async_trait; use crate::{ application::{auth::Authorization, Controller}, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct SetReadyTask(pub Authorization, pub bool); @@ -16,9 +16,9 @@ impl GenericTask for SetReadyTask { .get_server() .and_then(|server| controller.servers.get_server_mut(server.uuid())) else { - return Task::new_link_error(); + return TonicTask::new_link_error(); }; server.set_ready(self.1, &controller.shared).await; - Task::new_empty() + TonicTask::new_empty() } } diff --git a/controller/src/network/client/server.rs b/controller/src/network/client/server.rs index 425ec993..d31bff95 100644 --- a/controller/src/network/client/server.rs +++ b/controller/src/network/client/server.rs @@ -5,7 +5,7 @@ use uuid::Uuid; use crate::{ application::{server::Server, Controller}, network::proto::common::common_server::{List, Short}, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct GetServerTask(pub Uuid); @@ -16,10 +16,10 @@ pub struct GetServersTask; impl GenericTask for GetServerTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(server) = controller.servers.get_server(&self.0) else { - return Task::new_err(Status::not_found("Server not found")); + return TonicTask::new_err(Status::not_found("Server not found")); }; - Task::new_ok(Short::from(&server)) + TonicTask::new_ok(Short::from(&server)) } } @@ -27,17 +27,17 @@ impl GenericTask for GetServerTask { impl GenericTask for GetServerFromNameTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(server) = controller.servers.get_server_from_name(&self.0) else { - return Task::new_err(Status::not_found("Server not found")); + return TonicTask::new_err(Status::not_found("Server not found")); }; - Task::new_ok(Short::from(&server)) + TonicTask::new_ok(Short::from(&server)) } } #[async_trait] impl GenericTask for GetServersTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(List { + TonicTask::new_ok(List { servers: controller .servers .get_servers() diff --git a/controller/src/network/client/user.rs b/controller/src/network/client/user.rs index 84794066..916779de 100644 --- a/controller/src/network/client/user.rs +++ b/controller/src/network/client/user.rs @@ -10,7 +10,7 @@ use crate::{ Controller, }, network::proto::common::common_user::{Item, List}, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct UserConnectedTask(pub Authorization, pub NameAndUuid); @@ -28,10 +28,10 @@ impl GenericTask for UserConnectedTask { .get_server() .and_then(|server| controller.servers.get_server_mut(server.uuid())) else { - return Task::new_link_error(); + return TonicTask::new_link_error(); }; controller.users.user_connected(server, self.1.clone()); - Task::new_empty() + TonicTask::new_empty() } } @@ -43,12 +43,12 @@ impl GenericTask for UserDisconnectedTask { .get_server() .and_then(|server| controller.servers.get_server_mut(server.uuid())) else { - return Task::new_link_error(); + return TonicTask::new_link_error(); }; if controller.users.user_disconnected(server, &self.1) == ActionResult::Denied { - return Task::new_permission_error("You are not allowed to disconnect this user"); + return TonicTask::new_permission_error("You are not allowed to disconnect this user"); } - Task::new_empty() + TonicTask::new_empty() } } @@ -56,26 +56,17 @@ impl GenericTask for UserDisconnectedTask { impl GenericTask for GetUserTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(user) = controller.users.get_user(&self.0) else { - return Task::new_err(Status::not_found("User not found")); + return TonicTask::new_err(Status::not_found("User not found")); }; - let (server, group) = if let CurrentServer::Connected(server) = user.server() { - let server = server.uuid(); - ( - Some(server.to_string()), - controller - .servers - .get_server(server) - .and_then(|server| server.group().clone()), - ) - } else { - (None, None) - }; - Task::new_ok(Item { + TonicTask::new_ok(Item { name: user.id().name().clone(), id: user.id().uuid().to_string(), - group, - server, + server: if let CurrentServer::Connected(server) = user.server() { + Some(server.uuid().to_string()) + } else { + None + }, }) } } @@ -84,59 +75,37 @@ impl GenericTask for GetUserTask { impl GenericTask for GetUserFromNameTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(user) = controller.users.get_user_from_name(&self.0) else { - return Task::new_err(Status::not_found("User not found")); + return TonicTask::new_err(Status::not_found("User not found")); }; - let (server, group) = if let CurrentServer::Connected(server) = user.server() { - let server = server.uuid(); - ( - Some(server.to_string()), - controller - .servers - .get_server(server) - .and_then(|server| server.group().clone()), - ) - } else { - (None, None) - }; - Task::new_ok(Item { + TonicTask::new_ok(Item { name: user.id().name().clone(), id: user.id().uuid().to_string(), - group, - server, + server: if let CurrentServer::Connected(server) = user.server() { + Some(server.uuid().to_string()) + } else { + None + }, }) } } -// TODO: This call is very expensive -// TODO: Remove or find a different solution #[async_trait] impl GenericTask for GetUsersTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(List { + TonicTask::new_ok(List { users: controller .users .get_users() .iter() - .map(|user| { - let (server, group) = if let CurrentServer::Connected(server) = user.server() { - let server = server.uuid(); - ( - Some(server.to_string()), - controller - .servers - .get_server(server) - .and_then(|server| server.group().clone()), - ) + .map(|user| Item { + name: user.id().name().clone(), + id: user.id().uuid().to_string(), + server: if let CurrentServer::Connected(server) = user.server() { + Some(server.uuid().to_string()) } else { - (None, None) - }; - Item { - name: user.id().name().clone(), - id: user.id().uuid().to_string(), - group, - server, - } + None + }, }) .collect(), }) @@ -146,6 +115,6 @@ impl GenericTask for GetUsersTask { #[async_trait] impl GenericTask for UserCountTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(controller.users.get_user_count()) + TonicTask::new_ok(controller.users.get_user_count()) } } diff --git a/controller/src/network/manage.rs b/controller/src/network/manage.rs index 6fcdafac..259d2069 100644 --- a/controller/src/network/manage.rs +++ b/controller/src/network/manage.rs @@ -21,9 +21,9 @@ use crate::{ server::{DiskRetention, FallbackPolicy, Resources, Specification}, subscriber::Subscriber, user::transfer::TransferTarget, - Shared, TaskSender, + Shared, }, - task::Task, + task::{manager::TaskSender, network::TonicTask}, VERSION, }; @@ -63,7 +63,7 @@ impl ManageService for ManageServiceImpl { // Power async fn request_stop(&self, request: Request<()>) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::User, &self.0, request, |_, _| { + TonicTask::execute::<(), _, _>(AuthType::User, &self.0, request, |_, _| { Ok(Box::new(RequestStopTask())) }) .await?, @@ -73,7 +73,7 @@ impl ManageService for ManageServiceImpl { // Resource async fn set_resource(&self, request: Request) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::User, &self.0, request, |request, _| { + TonicTask::execute::<(), _, _>(AuthType::User, &self.0, request, |request, _| { let request = request.into_inner(); let Ok(category) = Category::try_from(request.category) else { @@ -91,7 +91,7 @@ impl ManageService for ManageServiceImpl { } async fn delete_resource(&self, request: Request) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::User, &self.0, request, |request, _| { + TonicTask::execute::<(), _, _>(AuthType::User, &self.0, request, |request, _| { let request = request.into_inner(); let Ok(category) = Category::try_from(request.category) else { @@ -110,7 +110,7 @@ impl ManageService for ManageServiceImpl { request: Request<()>, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -126,7 +126,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::User, &self.0, request, |request, _| { + TonicTask::execute::<(), _, _>(AuthType::User, &self.0, request, |request, _| { let request = request.into_inner(); let capabilities = match request.capabilities { @@ -158,7 +158,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -205,7 +205,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -223,9 +223,12 @@ impl ManageService for ManageServiceImpl { request: Request<()>, ) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::User, &self.0, request, |_, _| { - Ok(Box::new(GetNodesTask())) - }) + TonicTask::execute::( + AuthType::User, + &self.0, + request, + |_, _| Ok(Box::new(GetNodesTask())), + ) .await?, )) } @@ -236,7 +239,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::<(), _, _>(AuthType::User, &self.0, request, |request, _| { + TonicTask::execute::<(), _, _>(AuthType::User, &self.0, request, |request, _| { let request = request.into_inner(); let constraints = match request.constraints { @@ -336,7 +339,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -434,7 +437,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -448,9 +451,12 @@ impl ManageService for ManageServiceImpl { request: Request<()>, ) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::User, &self.0, request, |_, _| { - Ok(Box::new(GetGroupsTask)) - }) + TonicTask::execute::( + AuthType::User, + &self.0, + request, + |_, _| Ok(Box::new(GetGroupsTask)), + ) .await?, )) } @@ -461,7 +467,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::User, &self.0, request, |request, _| { + TonicTask::execute::(AuthType::User, &self.0, request, |request, _| { let request = request.into_inner(); let resources = match request.resources { @@ -540,7 +546,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -562,7 +568,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -580,9 +586,12 @@ impl ManageService for ManageServiceImpl { request: Request<()>, ) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::User, &self.0, request, |_, _| { - Ok(Box::new(GetServersTask)) - }) + TonicTask::execute::( + AuthType::User, + &self.0, + request, + |_, _| Ok(Box::new(GetServersTask)), + ) .await?, )) } @@ -619,7 +628,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -641,7 +650,7 @@ impl ManageService for ManageServiceImpl { request: Request, ) -> Result, Status> { Ok(Response::new( - Task::execute::( + TonicTask::execute::( AuthType::User, &self.0, request, @@ -656,15 +665,18 @@ impl ManageService for ManageServiceImpl { } async fn get_users(&self, request: Request<()>) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::User, &self.0, request, |_, _| { - Ok(Box::new(GetUsersTask)) - }) + TonicTask::execute::( + AuthType::User, + &self.0, + request, + |_, _| Ok(Box::new(GetUsersTask)), + ) .await?, )) } async fn get_user_count(&self, request: Request<()>) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::User, &self.0, request, |_, _| { + TonicTask::execute::(AuthType::User, &self.0, request, |_, _| { Ok(Box::new(UserCountTask)) }) .await?, @@ -674,7 +686,7 @@ impl ManageService for ManageServiceImpl { // Transfer async fn transfer_users(&self, request: Request) -> Result, Status> { Ok(Response::new( - Task::execute::(AuthType::User, &self.0, request, |request, auth| { + TonicTask::execute::(AuthType::User, &self.0, request, |request, auth| { let request = request.into_inner(); let target = match request.target { diff --git a/controller/src/network/manage/group.rs b/controller/src/network/manage/group.rs index 7d3054ac..be1cee2c 100644 --- a/controller/src/network/manage/group.rs +++ b/controller/src/network/manage/group.rs @@ -14,7 +14,7 @@ use crate::{ server::{self, Fallback}, }, }, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct CreateGroupTask( @@ -52,9 +52,9 @@ impl GenericTask for CreateGroupTask { ) .await { - return Task::new_err(error.into()); + return TonicTask::new_err(error.into()); } - Task::new_empty() + TonicTask::new_empty() } } @@ -74,8 +74,8 @@ impl GenericTask for UpdateGroupTask { ) .await { - Ok(group) => return Task::new_ok(Detail::from(group)), - Err(error) => Task::new_err(error.into()), + Ok(group) => return TonicTask::new_ok(Detail::from(group)), + Err(error) => TonicTask::new_err(error.into()), } } } @@ -84,17 +84,17 @@ impl GenericTask for UpdateGroupTask { impl GenericTask for GetGroupTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(group) = controller.groups.get_group(&self.0) else { - return Task::new_err(Status::not_found("Group not found")); + return TonicTask::new_err(Status::not_found("Group not found")); }; - Task::new_ok(Detail::from(group)) + TonicTask::new_ok(Detail::from(group)) } } #[async_trait] impl GenericTask for GetGroupsTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(List { + TonicTask::new_ok(List { groups: controller .groups .get_groups() diff --git a/controller/src/network/manage/node.rs b/controller/src/network/manage/node.rs index b4ec8742..948f09b8 100644 --- a/controller/src/network/manage/node.rs +++ b/controller/src/network/manage/node.rs @@ -8,7 +8,7 @@ use crate::{ Controller, }, network::proto::manage::node::{self, Detail, List, Short}, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct CreateNodeTask(pub String, pub String, pub Capabilities, pub Url); @@ -24,9 +24,9 @@ impl GenericTask for CreateNodeTask { .create_node(&self.0, &self.1, &self.2, &self.3, &controller.plugins) .await { - return Task::new_err(error.into()); + return TonicTask::new_err(error.into()); } - Task::new_empty() + TonicTask::new_empty() } } @@ -38,8 +38,8 @@ impl GenericTask for UpdateNodeTask { .update_node(&self.0, self.1.as_ref(), self.2.as_ref()) .await { - Ok(node) => return Task::new_ok(Detail::from(node)), - Err(error) => Task::new_err(error.into()), + Ok(node) => return TonicTask::new_ok(Detail::from(node)), + Err(error) => TonicTask::new_err(error.into()), } } } @@ -48,17 +48,17 @@ impl GenericTask for UpdateNodeTask { impl GenericTask for GetNodeTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(node) = controller.nodes.get_node(&self.0) else { - return Task::new_err(Status::not_found("Node not found")); + return TonicTask::new_err(Status::not_found("Node not found")); }; - Task::new_ok(Detail::from(node)) + TonicTask::new_ok(Detail::from(node)) } } #[async_trait] impl GenericTask for GetNodesTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(List { + TonicTask::new_ok(List { nodes: controller .nodes .get_nodes() diff --git a/controller/src/network/manage/plugin.rs b/controller/src/network/manage/plugin.rs index 531a3a24..791fed8f 100644 --- a/controller/src/network/manage/plugin.rs +++ b/controller/src/network/manage/plugin.rs @@ -4,7 +4,7 @@ use tonic::async_trait; use crate::{ application::Controller, network::proto::manage::plugin::{List, Short}, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct GetPluginsTask(); @@ -12,7 +12,7 @@ pub struct GetPluginsTask(); #[async_trait] impl GenericTask for GetPluginsTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(List { + TonicTask::new_ok(List { plugins: controller .plugins .get_plugins_keys() diff --git a/controller/src/network/manage/power.rs b/controller/src/network/manage/power.rs index 58c62e83..ade85ca2 100644 --- a/controller/src/network/manage/power.rs +++ b/controller/src/network/manage/power.rs @@ -3,7 +3,7 @@ use tonic::async_trait; use crate::{ application::Controller, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct RequestStopTask(); @@ -12,6 +12,6 @@ pub struct RequestStopTask(); impl GenericTask for RequestStopTask { async fn run(&mut self, controller: &mut Controller) -> Result { controller.signal_shutdown(); - Task::new_empty() + TonicTask::new_empty() } } diff --git a/controller/src/network/manage/resource.rs b/controller/src/network/manage/resource.rs index 813d2c3b..105c4cca 100644 --- a/controller/src/network/manage/resource.rs +++ b/controller/src/network/manage/resource.rs @@ -5,7 +5,7 @@ use uuid::Uuid; use crate::{ application::{server::manager::StopRequest, Controller}, network::proto::manage::resource::Category, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct SetResourceTask(pub Category, pub String, pub bool); @@ -22,9 +22,9 @@ impl GenericTask for SetResourceTask { .get_node_mut(&self.1) .ok_or(Status::not_found("Node not found"))?; if let Err(error) = node.set_active(self.2).await { - return Task::new_err(Status::internal(error.to_string())); + return TonicTask::new_err(Status::internal(error.to_string())); } - Task::new_empty() + TonicTask::new_empty() } Category::Group => { let group = controller @@ -32,11 +32,11 @@ impl GenericTask for SetResourceTask { .get_group_mut(&self.1) .ok_or(Status::not_found("Group not found"))?; if let Err(error) = group.set_active(self.2, &mut controller.servers).await { - return Task::new_err(Status::internal(error.to_string())); + return TonicTask::new_err(Status::internal(error.to_string())); } - Task::new_empty() + TonicTask::new_empty() } - Category::Server => Task::new_err(Status::unimplemented( + Category::Server => TonicTask::new_err(Status::unimplemented( "This category is not supported for this action", )), } @@ -53,27 +53,27 @@ impl GenericTask for DeleteResourceTask { .delete_node(&self.1, &controller.servers, &controller.groups) .await { - return Task::new_err(error.into()); + return TonicTask::new_err(error.into()); } - Task::new_empty() + TonicTask::new_empty() } Category::Group => { if let Err(error) = controller.groups.delete_group(&self.1).await { - return Task::new_err(error.into()); + return TonicTask::new_err(error.into()); } - Task::new_empty() + TonicTask::new_empty() } Category::Server => { let Ok(uuid) = Uuid::parse_str(&self.1) else { - return Task::new_err(Status::invalid_argument("Invalid UUID")); + return TonicTask::new_err(Status::invalid_argument("Invalid UUID")); }; let id = match controller.servers.get_server(&uuid) { Some(server) => server.id().clone(), - None => return Task::new_err(Status::not_found("Server not found")), + None => return TonicTask::new_err(Status::not_found("Server not found")), }; controller.servers.schedule_stop(StopRequest::new(None, id)); - Task::new_empty() + TonicTask::new_empty() } } } diff --git a/controller/src/network/manage/server.rs b/controller/src/network/manage/server.rs index 5eac5d17..414bcdd9 100644 --- a/controller/src/network/manage/server.rs +++ b/controller/src/network/manage/server.rs @@ -13,7 +13,7 @@ use crate::{ common::{common_server::List, Address}, manage::server::{self, Detail}, }, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct ScheduleServerTask( @@ -46,7 +46,7 @@ impl GenericTask for ScheduleServerTask { ); controller.servers.schedule_start(request); - Task::new_ok(uuid) + TonicTask::new_ok(uuid) } } @@ -54,10 +54,10 @@ impl GenericTask for ScheduleServerTask { impl GenericTask for GetServerTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(server) = controller.servers.get_server(&self.0) else { - return Task::new_err(Status::not_found("Server not found")); + return TonicTask::new_err(Status::not_found("Server not found")); }; - Task::new_ok(Detail::from(server)) + TonicTask::new_ok(Detail::from(server)) } } @@ -65,17 +65,17 @@ impl GenericTask for GetServerTask { impl GenericTask for GetServerFromNameTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(server) = controller.servers.get_server_from_name(&self.0) else { - return Task::new_err(Status::not_found("Server not found")); + return TonicTask::new_err(Status::not_found("Server not found")); }; - Task::new_ok(Detail::from(server)) + TonicTask::new_ok(Detail::from(server)) } } #[async_trait] impl GenericTask for GetServersTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(List { + TonicTask::new_ok(List { servers: controller .servers .get_servers() diff --git a/controller/src/network/manage/transfer.rs b/controller/src/network/manage/transfer.rs index 0937d356..8c6c1966 100644 --- a/controller/src/network/manage/transfer.rs +++ b/controller/src/network/manage/transfer.rs @@ -8,7 +8,7 @@ use crate::{ user::transfer::{Transfer, TransferTarget}, Controller, }, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct TransferUsersTask(pub Authorization, pub Vec, pub TransferTarget); @@ -29,13 +29,13 @@ impl GenericTask for TransferUsersTask { &controller.groups, ) { Ok(transfer) => transfer, - Err(error) => return Task::new_err(error.into()), + Err(error) => return TonicTask::new_err(error.into()), }; if let Err(error) = Transfer::transfer_user(&mut transfer, &controller.shared).await { - return Task::new_err(error); + return TonicTask::new_err(error); } count += 1; } - Task::new_ok(count) + TonicTask::new_ok(count) } } diff --git a/controller/src/network/manage/user.rs b/controller/src/network/manage/user.rs index f9643dae..7331e080 100644 --- a/controller/src/network/manage/user.rs +++ b/controller/src/network/manage/user.rs @@ -5,7 +5,7 @@ use uuid::Uuid; use crate::{ application::{user::CurrentServer, Controller}, network::proto::common::common_user::{Item, List}, - task::{BoxedAny, GenericTask, Task}, + task::{network::TonicTask, BoxedAny, GenericTask}, }; pub struct GetUserTask(pub Uuid); @@ -17,26 +17,17 @@ pub struct UserCountTask; impl GenericTask for GetUserTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(user) = controller.users.get_user(&self.0) else { - return Task::new_err(Status::not_found("User not found")); + return TonicTask::new_err(Status::not_found("User not found")); }; - let (server, group) = if let CurrentServer::Connected(server) = user.server() { - let server = server.uuid(); - ( - Some(server.to_string()), - controller - .servers - .get_server(server) - .and_then(|server| server.group().clone()), - ) - } else { - (None, None) - }; - Task::new_ok(Item { + TonicTask::new_ok(Item { name: user.id().name().clone(), id: user.id().uuid().to_string(), - group, - server, + server: if let CurrentServer::Connected(server) = user.server() { + Some(server.uuid().to_string()) + } else { + None + }, }) } } @@ -45,59 +36,37 @@ impl GenericTask for GetUserTask { impl GenericTask for GetUserFromNameTask { async fn run(&mut self, controller: &mut Controller) -> Result { let Some(user) = controller.users.get_user_from_name(&self.0) else { - return Task::new_err(Status::not_found("User not found")); + return TonicTask::new_err(Status::not_found("User not found")); }; - let (server, group) = if let CurrentServer::Connected(server) = user.server() { - let server = server.uuid(); - ( - Some(server.to_string()), - controller - .servers - .get_server(server) - .and_then(|server| server.group().clone()), - ) - } else { - (None, None) - }; - Task::new_ok(Item { + TonicTask::new_ok(Item { name: user.id().name().clone(), id: user.id().uuid().to_string(), - group, - server, + server: if let CurrentServer::Connected(server) = user.server() { + Some(server.uuid().to_string()) + } else { + None + }, }) } } -// TODO: This call is very expensive -// TODO: Remove or find a different solution #[async_trait] impl GenericTask for GetUsersTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(List { + TonicTask::new_ok(List { users: controller .users .get_users() .iter() - .map(|user| { - let (server, group) = if let CurrentServer::Connected(server) = user.server() { - let server = server.uuid(); - ( - Some(server.to_string()), - controller - .servers - .get_server(server) - .and_then(|server| server.group().clone()), - ) + .map(|user| Item { + name: user.id().name().clone(), + id: user.id().uuid().to_string(), + server: if let CurrentServer::Connected(server) = user.server() { + Some(server.uuid().to_string()) } else { - (None, None) - }; - Item { - name: user.id().name().clone(), - id: user.id().uuid().to_string(), - group, - server, - } + None + }, }) .collect(), }) @@ -107,6 +76,6 @@ impl GenericTask for GetUsersTask { #[async_trait] impl GenericTask for UserCountTask { async fn run(&mut self, controller: &mut Controller) -> Result { - Task::new_ok(controller.users.get_user_count()) + TonicTask::new_ok(controller.users.get_user_count()) } } diff --git a/controller/src/task.rs b/controller/src/task.rs index 8725a328..330ebd72 100644 --- a/controller/src/task.rs +++ b/controller/src/task.rs @@ -1,18 +1,15 @@ -use std::{ - any::{type_name, Any}, - borrow::Cow, -}; +use std::any::Any; use anyhow::{anyhow, Result}; -use common::error::FancyError; -use simplelog::debug; -use tokio::sync::oneshot::{channel, Sender}; -use tonic::{async_trait, Request, Status}; +use tokio::sync::oneshot::Sender; +use tonic::async_trait; -use crate::application::{ - auth::{AuthType, Authorization}, - Controller, TaskSender, -}; +use crate::application::Controller; + +pub mod manager; + +pub mod network; +pub mod plugin; pub type BoxedTask = Box; pub type BoxedAny = Box; @@ -23,88 +20,12 @@ pub struct Task { } impl Task { - #[allow(clippy::result_large_err)] - pub fn get_auth(auth: AuthType, request: &Request) -> Result { - match request.extensions().get::() { - Some(data) if data.is_type(auth) => Ok(data.clone()), - _ => Err(Status::permission_denied("Not linked")), - } - } - - pub async fn execute( - auth: AuthType, - queue: &TaskSender, - request: Request, - task: F, - ) -> Result - where - F: FnOnce(Request, Authorization) -> Result, - { - let data = Self::get_auth(auth, &request)?; - debug!("Executing task with a return type of: {}", type_name::()); - match Task::create::(queue, task(request, data)?).await { - Ok(value) => value, - Err(error) => { - FancyError::print_fancy(&error, false); - Err(Status::internal(error.to_string())) - } - } - } - - pub async fn create( - queue: &TaskSender, - task: BoxedTask, - ) -> Result> { - let (sender, receiver) = channel(); - queue - .send(Task { task, sender }) - .await - .map_err(|_| anyhow!("Failed to send task to task queue"))?; - let result = receiver.await??; - match result.downcast::() { - Ok(result) => Ok(Ok(*result)), - Err(result) => match result.downcast::() { - Ok(result) => Ok(Err(*result)), - Err(_) => Err(anyhow!( - "Failed to downcast task result to the expected type. Check task implementation" - )), - }, - } - } - pub async fn run(mut self, controller: &mut Controller) -> Result<()> { let task = self.task.run(controller).await; self.sender .send(task) .map_err(|_| anyhow!("Failed to send task result to the task sender")) } - - #[allow(clippy::unnecessary_wraps)] - pub fn new_ok(value: T) -> Result { - Ok(Box::new(value)) - } - - pub fn new_empty() -> Result { - Self::new_ok(()) - } - - #[allow(clippy::unnecessary_wraps)] - pub fn new_err(value: Status) -> Result { - Ok(Box::new(value)) - } - - pub fn new_permission_error<'a, T>(message: T) -> Result - where - T: Into>, - { - Self::new_err(Status::permission_denied(message.into())) - } - - pub fn new_link_error() -> Result { - Self::new_err(Status::failed_precondition( - "Your token is not linked to the required resource for this action", - )) - } } #[async_trait] diff --git a/controller/src/task/manager.rs b/controller/src/task/manager.rs new file mode 100644 index 00000000..a9bef4d0 --- /dev/null +++ b/controller/src/task/manager.rs @@ -0,0 +1,54 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use anyhow::{anyhow, Result}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +use super::Task; + +const TASK_BUFFER: usize = 128; + +#[derive(Clone)] +pub struct TaskSender(Arc, Sender); + +pub struct TaskManager { + ready: Arc, + + sender: Sender, + receiver: Receiver, +} + +impl TaskManager { + pub fn init() -> Self { + let (sender, receiver) = channel(TASK_BUFFER); + Self { + ready: Arc::new(AtomicBool::new(false)), + sender, + receiver, + } + } + + pub fn set_ready(&self, ready: bool) { + self.ready.store(ready, Ordering::Relaxed); + } + + pub fn get_sender(&self) -> TaskSender { + TaskSender(self.ready.clone(), self.sender.clone()) + } + + pub async fn recv(&mut self) -> Option { + self.receiver.recv().await + } +} + +impl TaskSender { + pub fn inner(&self) -> Result<&Sender> { + if self.0.load(Ordering::Relaxed) { + Ok(&self.1) + } else { + Err(anyhow!("Attempting to use the task system before it is ready or after it has been shut down. Was it used during the initialization or cleanup phase?")) + } + } +} diff --git a/controller/src/task/network.rs b/controller/src/task/network.rs new file mode 100644 index 00000000..34b09a16 --- /dev/null +++ b/controller/src/task/network.rs @@ -0,0 +1,100 @@ +// Tasks that are used by the tonic network code + +use std::{any::type_name, borrow::Cow}; + +use anyhow::{anyhow, Result}; +use common::error::FancyError; +use simplelog::debug; +use tokio::sync::oneshot::channel; +use tonic::{Request, Status}; + +use crate::{ + application::auth::{AuthType, Authorization}, + task::Task, +}; + +use super::{manager::TaskSender, BoxedAny, BoxedTask}; + +pub struct TonicTask; + +impl TonicTask { + #[allow(clippy::result_large_err)] + pub fn get_auth(auth: AuthType, request: &Request) -> Result { + match request.extensions().get::() { + Some(data) if data.is_type(auth) => Ok(data.clone()), + _ => Err(Status::permission_denied("Not linked")), + } + } + + pub async fn execute( + auth: AuthType, + queue: &TaskSender, + request: Request, + task: F, + ) -> Result + where + F: FnOnce(Request, Authorization) -> Result, + { + let data = Self::get_auth(auth, &request)?; + debug!( + "Executing tonic task with a return type of: {}", + type_name::() + ); + match Self::create::(queue, task(request, data)?).await { + Ok(value) => value, + Err(error) => { + FancyError::print_fancy(&error, false); + Err(Status::internal(error.to_string())) + } + } + } + + pub async fn create( + queue: &TaskSender, + task: BoxedTask, + ) -> Result> { + let (sender, receiver) = channel(); + queue + .inner()? + .send(Task { task, sender }) + .await + .map_err(|_| anyhow!("Failed to send task to task queue"))?; + let result = receiver.await??; + match result.downcast::() { + Ok(result) => Ok(Ok(*result)), + Err(result) => match result.downcast::() { + Ok(result) => Ok(Err(*result)), + Err(_) => Err(anyhow!( + "Failed to downcast task result to the expected type. Check task implementation" + )), + }, + } + } + + #[allow(clippy::unnecessary_wraps)] + pub fn new_ok(value: T) -> Result { + Ok(Box::new(value)) + } + + pub fn new_empty() -> Result { + Self::new_ok(()) + } + + #[allow(clippy::unnecessary_wraps)] + pub fn new_err(value: Status) -> Result { + Ok(Box::new(value)) + } + + pub fn new_permission_error<'a, T>(message: T) -> Result + where + T: Into>, + { + Self::new_err(Status::permission_denied(message.into())) + } + + pub fn new_link_error() -> Result { + Self::new_err(Status::failed_precondition( + "Your token is not linked to the required resource for this action", + )) + } +} diff --git a/controller/src/task/plugin.rs b/controller/src/task/plugin.rs new file mode 100644 index 00000000..cc73dae0 --- /dev/null +++ b/controller/src/task/plugin.rs @@ -0,0 +1,65 @@ +// Tasks that are used by the plugin system + +use std::any::type_name; + +use anyhow::{anyhow, Result}; +use simplelog::debug; +use tokio::sync::oneshot::channel; + +use crate::task::Task; + +use super::{manager::TaskSender, BoxedAny, BoxedTask, GenericTask}; + +pub struct PluginTask; + +impl PluginTask { + pub async fn execute(queue: &TaskSender, task: T) -> Result + where + T: GenericTask + Send + 'static, + { + debug!( + "Executing plugin task with a return type of: {}", + type_name::() + ); + match Self::create::(queue, Box::new(task)).await { + Ok(value) => value, + Err(error) => Err(error), + } + } + + pub async fn create( + queue: &TaskSender, + task: BoxedTask, + ) -> Result> { + let (sender, receiver) = channel(); + queue + .inner()? + .send(Task { task, sender }) + .await + .map_err(|_| anyhow!("Failed to send task to task queue"))?; + let result = receiver.await??; + match result.downcast::() { + Ok(result) => Ok(Ok(*result)), + Err(result) => match result.downcast::() { + Ok(result) => Ok(Err(*result)), + Err(_) => Err(anyhow!( + "Failed to downcast task result to the expected type. Check task implementation" + )), + }, + } + } + + #[allow(clippy::unnecessary_wraps)] + pub fn new_ok(value: T) -> Result { + Ok(Box::new(value)) + } + + pub fn _new_empty() -> Result { + Self::new_ok(()) + } + + #[allow(clippy::unnecessary_wraps)] + pub fn _new_err(value: anyhow::Error) -> Result { + Ok(Box::new(value)) + } +} diff --git a/plugins/cloudflare/Cargo.toml b/plugins/cloudflare/Cargo.toml index bcae3e20..6da43619 100644 --- a/plugins/cloudflare/Cargo.toml +++ b/plugins/cloudflare/Cargo.toml @@ -24,5 +24,12 @@ serde = { version = "1.0.219", features = ["derive"] } toml = "0.8.22" walkdir = "2.5.0" +# Servers +regex = "1.11.1" + +# Cloudflare API +url = { version = "2.5.4", features = ["serde"] } +serde_json = "1.0.140" + [build-dependencies] toml = "0.8.22" \ No newline at end of file diff --git a/plugins/cloudflare/configs/config.toml b/plugins/cloudflare/configs/config.toml new file mode 100644 index 00000000..559d1b19 --- /dev/null +++ b/plugins/cloudflare/configs/config.toml @@ -0,0 +1,29 @@ +# How often the plugin should update the DNS records and weights +# NOTE: This is in updates per minute +rate = 6 + +[account] +# The cloudflare API token +token = "" + +[[entries]] +# The Zone ID from your Cloudflare dashboard +zone = "" +# The DNS record name under which the plugin will create SRV records +name = "_minecraft._tcp.mynetwork.net" +# A regular expression to identify servers that should receive an SRV record +servers = "^lobby-.*" +# The priority assigned to the SRV records +priority = 5 + +# The weight is calculated using a formula where k is a constant, a is a constant, max is the maximum number of players on a server, +# and x is the current number of players on the server. +# NOTE: Adjust the values as necessary: https://www.geogebra.org/m/fmat9nc2 +[entries.weight] +# Scale factor: Multiplies the entire result to address resolution issues in scenarios with a low max value +a = 1.5 +# Determines the rate of weight reduction; higher values result in a steeper decline +# NOTE: Since weights can only be represented as integers, excessively high values may cause resolution issues. +k = 2.5 +# Maximum number of players allowed per server +max = 20 \ No newline at end of file diff --git a/plugins/cloudflare/src/listener.rs b/plugins/cloudflare/src/listener.rs index 58ffb879..6be1f9ef 100644 --- a/plugins/cloudflare/src/listener.rs +++ b/plugins/cloudflare/src/listener.rs @@ -1,20 +1,73 @@ -use crate::generated::{ - exports::plugin::system::event::GuestListener, - plugin::system::{data_types::Server, types::ErrorMessage}, +use std::{cell::RefCell, rc::Rc}; + +use anyhow::Result; +use regex::Regex; + +use crate::{ + error, + generated::{ + exports::plugin::system::event::GuestListener, + plugin::system::{data_types::Server, types::ErrorMessage}, + }, + plugin::{batcher::Batcher, config::Entry}, }; -pub struct Listener(); +pub struct Listener { + /* Configuration */ + entries: Vec<(Regex, Entry)>, + + /* Batcher */ + batcher: Rc>, +} + +impl Listener { + pub fn new(entries: &[Entry], batcher: Rc>) -> Self { + Self { + entries: entries + .iter() + .filter_map(|entry| match Regex::new(&entry.servers) { + Ok(servers) => Some((servers, entry.clone())), + Err(error) => { + error!( + "Failed to compile regex({}) for entry({}): {}", + entry.servers, entry.name, error + ); + None + } + }) + .collect(), + batcher, + } + } +} impl GuestListener for Listener { fn server_start(&self, _: Server) -> Result<(), ErrorMessage> { - Ok(()) + unimplemented!() } - fn server_stop(&self, _: Server) -> Result<(), ErrorMessage> { + fn server_stop(&self, server: Server) -> Result<(), ErrorMessage> { + for (regex, entry) in &self.entries { + if regex.is_match(&server.name) { + self.batcher.borrow_mut().delete(entry.clone(), server.uuid); + break; + } + } Ok(()) } - fn server_change_ready(&self, _: Server, _: bool) -> Result<(), ErrorMessage> { + fn server_change_ready(&self, server: Server, ready: bool) -> Result<(), ErrorMessage> { + if !ready { + // Only run if the server is ready + return Ok(()); + } + + for (regex, entry) in &self.entries { + if regex.is_match(&server.name) { + self.batcher.borrow_mut().create(entry.clone(), server); + break; + } + } Ok(()) } } diff --git a/plugins/cloudflare/src/main.rs b/plugins/cloudflare/src/main.rs index 8d44f819..9fe3a2a7 100644 --- a/plugins/cloudflare/src/main.rs +++ b/plugins/cloudflare/src/main.rs @@ -1,5 +1,6 @@ #![no_main] #![warn(clippy::all, clippy::pedantic)] +#![feature(let_chains)] use dummy::{node::Node, screen::Screen}; use generated::{ @@ -17,6 +18,7 @@ mod dummy; mod listener; mod log; mod plugin; +mod storage; #[allow(clippy::all)] pub mod generated { diff --git a/plugins/cloudflare/src/plugin.rs b/plugins/cloudflare/src/plugin.rs index dff85d42..4603c269 100644 --- a/plugins/cloudflare/src/plugin.rs +++ b/plugins/cloudflare/src/plugin.rs @@ -1,6 +1,18 @@ +use std::{ + cell::RefCell, + rc::Rc, + time::{Duration, Instant}, +}; + use anyhow::Result; +use backend::Backend; +use batcher::Batcher; +use config::Config; +use dns::manager::Records; + use crate::{ + error, generated::{ exports::plugin::system::{ bridge::{ @@ -14,32 +26,76 @@ use crate::{ listener::Listener, }; +pub mod backend; +pub mod batcher; +pub mod config; +pub mod dns; +pub mod math; + // Include the build information generated by build.rs include!(concat!(env!("OUT_DIR"), "/build_info.rs")); pub const AUTHORS: [&str; 1] = ["HttpRafa"]; pub const FEATURES: Features = Features::LISTENER; -pub struct Cloudflare(); +pub struct Cloudflare { + /* Configuration */ + config: RefCell, + + /* Batcher */ + batcher: Rc>, + + /* Managers */ + backend: RefCell, + records: RefCell, + + /* Tick Intervals */ + last: RefCell, +} impl GuestPlugin for Cloudflare { fn new(_: String) -> Self { - Self {} + Self { + config: RefCell::new(Config::default()), // Dummy config + batcher: Rc::new(RefCell::new(Batcher::default())), + backend: RefCell::new(Backend::default()), + records: RefCell::new(Records::default()), + last: RefCell::new(Instant::now()), + } } fn init(&self) -> Information { + fn inner(own: &Cloudflare) -> Result<()> { + // Load configuration + { + let config = Config::parse()?; + own.backend.replace(Backend::new(&config)); + own.records.replace(Records::new(&config)); + own.config.replace(config); + } + Ok(()) + } + Information { authors: AUTHORS.iter().map(|author| (*author).to_string()).collect(), version: VERSION.to_string(), features: FEATURES, - ready: true, + ready: if let Err(error) = inner(self) { + error!("Failed to initialize plugin: {}", error); + false + } else { + true + }, } } fn init_listener(&self) -> (Events, GenericListener) { ( - Events::SERVER_START | Events::SERVER_STOP, - GenericListener::new(Listener()), + Events::SERVER_STOP | Events::SERVER_CHANGE_READY, + GenericListener::new(Listener::new( + &self.config.borrow().entries, + self.batcher.clone(), + )), ) } @@ -53,10 +109,21 @@ impl GuestPlugin for Cloudflare { } fn tick(&self) -> Result<(), ScopedErrors> { + if self.last.borrow().elapsed() + >= Duration::from_secs(60 / u64::from(self.config.borrow().rate)) + { + self.last.replace(Instant::now()); + + // Execute update + self.records + .borrow_mut() + .tick(&self.backend.borrow(), &mut self.batcher.borrow_mut()); + } Ok(()) } fn shutdown(&self) -> Result<(), ScopedErrors> { + self.records.borrow_mut().shutdown(&self.backend.borrow()); Ok(()) } } diff --git a/plugins/cloudflare/src/plugin/backend.rs b/plugins/cloudflare/src/plugin/backend.rs new file mode 100644 index 00000000..e557f913 --- /dev/null +++ b/plugins/cloudflare/src/plugin/backend.rs @@ -0,0 +1,17 @@ +use super::config::Config; + +pub mod batch; +mod common; + +#[derive(Default)] +pub struct Backend { + token: String, +} + +impl Backend { + pub fn new(config: &Config) -> Self { + Self { + token: config.account.token.clone(), + } + } +} diff --git a/plugins/cloudflare/src/plugin/backend/batch.rs b/plugins/cloudflare/src/plugin/backend/batch.rs new file mode 100644 index 00000000..654394b9 --- /dev/null +++ b/plugins/cloudflare/src/plugin/backend/batch.rs @@ -0,0 +1,24 @@ +use data::{BBatch, BBatchResult}; + +use crate::error; + +use super::Backend; + +pub mod data; +pub mod delete; +pub mod record; + +impl Backend { + pub fn send_batch(&self, zone: &str, batch: &BBatch) -> Option { + let response = + self.post_object_to_api(&format!("zones/{zone}/dns_records/batch"), &batch)?; + if !response.success { + error!("Failed to send batched dns updates:"); + for (i, error) in response.errors.into_iter().enumerate() { + error!("{i}: {:?}", error); + } + return None; + } + Some(response.result) + } +} diff --git a/plugins/cloudflare/src/plugin/backend/batch/data.rs b/plugins/cloudflare/src/plugin/backend/batch/data.rs new file mode 100644 index 00000000..5eb5801a --- /dev/null +++ b/plugins/cloudflare/src/plugin/backend/batch/data.rs @@ -0,0 +1,21 @@ +use serde::{Deserialize, Serialize}; + +use super::{ + delete::BDelete, + record::{BRRecord, BRecord}, +}; + +#[derive(Serialize, Default, Clone)] +pub struct BBatch { + pub deletes: Vec, + pub patches: Vec, + pub posts: Vec, + pub puts: Vec<()>, +} + +#[derive(Deserialize, Clone)] +pub struct BBatchResult { + pub deletes: Option>, + pub patches: Option>, + pub posts: Option>, +} diff --git a/plugins/cloudflare/src/plugin/backend/batch/delete.rs b/plugins/cloudflare/src/plugin/backend/batch/delete.rs new file mode 100644 index 00000000..9d440eaf --- /dev/null +++ b/plugins/cloudflare/src/plugin/backend/batch/delete.rs @@ -0,0 +1,16 @@ +use serde::Serialize; + +use crate::plugin::dns::Record; + +#[derive(Serialize, Clone)] +pub struct BDelete { + id: String, +} + +impl From<&Record> for BDelete { + fn from(value: &Record) -> Self { + Self { + id: value.id.clone().unwrap_or_default(), + } + } +} diff --git a/plugins/cloudflare/src/plugin/backend/batch/record.rs b/plugins/cloudflare/src/plugin/backend/batch/record.rs new file mode 100644 index 00000000..522172e1 --- /dev/null +++ b/plugins/cloudflare/src/plugin/backend/batch/record.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; + +use crate::plugin::{config::Entry, dns::Record}; + +#[derive(Serialize, Deserialize, Clone)] +pub struct BData { + pub priority: u16, + pub weight: u16, + pub port: u16, + pub target: String, +} + +#[derive(Serialize, Clone)] +pub struct BRecord { + pub comment: String, + pub data: BData, + pub name: String, + pub proxied: bool, + pub ttl: u16, + pub r#type: String, + pub id: Option, +} + +#[derive(Deserialize, Clone)] +pub struct BRRecord { + pub comment: String, + pub id: String, + pub data: BData, +} + +impl BRecord { + pub fn new(entry: &Entry, record: &Record) -> Option { + let address = record.server.allocation.ports.first()?; + + Some(Self { + comment: record.server.uuid.clone(), + data: BData { + priority: entry.priority, + weight: record.weight, + port: address.port, + target: address.host.clone(), + }, + name: entry.name.clone(), + proxied: false, + ttl: 1, + r#type: "SRV".to_string(), + id: record.id.clone(), + }) + } +} diff --git a/plugins/cloudflare/src/plugin/backend/common.rs b/plugins/cloudflare/src/plugin/backend/common.rs new file mode 100644 index 00000000..e52d5638 --- /dev/null +++ b/plugins/cloudflare/src/plugin/backend/common.rs @@ -0,0 +1,110 @@ +use data::BObject; +use serde::{de::DeserializeOwned, Serialize}; + +use crate::{ + debug, error, + generated::plugin::system::http::{send_http_request, Header, Method, Response}, +}; + +use super::Backend; + +pub mod data; +pub mod error; + +pub const CLOUDFLARE_API_URL: &str = "https://api.cloudflare.com/client/v4"; + +impl Backend { + pub fn post_object_to_api( + &self, + target: &str, + object: &T, + ) -> Option> { + let body = serde_json::to_vec(object).ok(); + self.send_to_api_parse(Method::Post, target, 200, body.as_deref(), None) + } + + fn send_to_api_parse( + &self, + method: Method, + target: &str, + expected_code: u32, + body: Option<&[u8]>, + page: Option, + ) -> Option { + let mut url = format!("{CLOUDFLARE_API_URL}/{target}"); + if let Some(page) = page { + url = format!("{}?page={}", &url, &page); + } + debug!( + "Sending request to the cloudflare api: {:?} {}", + method, &url + ); + let response = send_http_request( + method, + &url, + &[ + Header { + key: "Authorization".to_string(), + value: format!("Bearer {}", self.token), + }, + Header { + key: "Content-Type".to_string(), + value: "application/json".to_string(), + }, + ], + body, + ); + if let Some(response) = Self::handle_response::(&url, body, response, expected_code) { + return Some(response); + } + None + } + + fn check_response( + url: &str, + body: Option<&[u8]>, + response: Option, + expected_code: u32, + ) -> Option { + response.as_ref()?; + let response = response.unwrap(); + if response.status_code != expected_code { + error!( + "An unexpected error occurred while sending a request to the cloudflare api at {}: Received {} status code {} - {}", + url, + response.status_code, + response.reason_phrase, + String::from_utf8_lossy(&response.bytes) + ); + if let Some(body) = body { + debug!("Sended body: {}", String::from_utf8_lossy(body)); + } + debug!( + "Response body: {}", + String::from_utf8_lossy(&response.bytes) + ); + return None; + } + Some(response) + } + + fn handle_response( + url: &str, + body: Option<&[u8]>, + response: Option, + expected_code: u32, + ) -> Option { + let response = Self::check_response(url, body, response, expected_code)?; + let body = response.bytes; + let response = serde_json::from_slice::(&body); + if let Err(error) = response { + error!( + "Failed to parse response from the cloudflare api at URL {}: {}", + url, &error + ); + debug!("Response body: {}", String::from_utf8_lossy(&body)); + return None; + } + Some(response.unwrap()) + } +} diff --git a/plugins/cloudflare/src/plugin/backend/common/data.rs b/plugins/cloudflare/src/plugin/backend/common/data.rs new file mode 100644 index 00000000..9a4457b1 --- /dev/null +++ b/plugins/cloudflare/src/plugin/backend/common/data.rs @@ -0,0 +1,10 @@ +use serde::Deserialize; + +use super::error::BError; + +#[derive(Deserialize)] +pub struct BObject { + pub errors: Vec, + pub success: bool, + pub result: T, +} diff --git a/plugins/cloudflare/src/plugin/backend/common/error.rs b/plugins/cloudflare/src/plugin/backend/common/error.rs new file mode 100644 index 00000000..9c076b6b --- /dev/null +++ b/plugins/cloudflare/src/plugin/backend/common/error.rs @@ -0,0 +1,9 @@ +use serde::Deserialize; + +#[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] +pub struct BError { + pub code: u32, + pub message: String, + pub documentation_url: String, +} diff --git a/plugins/cloudflare/src/plugin/batcher.rs b/plugins/cloudflare/src/plugin/batcher.rs new file mode 100644 index 00000000..07f0f7f3 --- /dev/null +++ b/plugins/cloudflare/src/plugin/batcher.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; + +use crate::generated::plugin::system::data_types::{Server, Uuid}; + +use super::config::Entry; + +pub enum Action { + Create(Server), + Delete, +} + +#[derive(Default)] +pub struct Batcher { + inner: HashMap)>, // Grouped by zone, entry and uuid of server +} + +impl Batcher { + pub fn create(&mut self, entry: Entry, server: Server) { + self.inner + .entry(entry.zone.clone()) + .or_insert((entry, HashMap::new())) + .1 + .insert(server.uuid.clone(), Action::Create(server)); + } + pub fn delete(&mut self, entry: Entry, uuid: String) { + self.inner + .entry(entry.zone.clone()) + .or_insert((entry, HashMap::new())) + .1 + .insert(uuid, Action::Delete); + } + pub fn drain(&mut self, zone: &str) -> Option<&mut (Entry, HashMap)> { + self.inner.get_mut(zone) + } +} diff --git a/plugins/cloudflare/src/plugin/config.rs b/plugins/cloudflare/src/plugin/config.rs new file mode 100644 index 00000000..a98ac59a --- /dev/null +++ b/plugins/cloudflare/src/plugin/config.rs @@ -0,0 +1,92 @@ +use std::{ + fs, + hash::{Hash, Hasher}, +}; + +use anyhow::Result; +use common::file::SyncLoadFromTomlFile; +use serde::Deserialize; + +use crate::storage::Storage; + +const DEFAULT_CONFIG: &str = + include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/configs/config.toml")); + +#[derive(Deserialize, Default)] +pub struct Account { + pub token: String, +} + +#[derive(Deserialize, Clone)] +pub struct Weight { + pub a: f64, + pub k: f64, + pub max: f64, +} + +#[derive(Deserialize, Clone)] +pub struct Entry { + pub zone: String, + pub name: String, + pub servers: String, + pub priority: u16, + pub weight: Weight, +} + +#[derive(Deserialize, Default)] +pub struct Config { + pub rate: u16, + pub account: Account, + pub entries: Vec, +} + +impl Config { + pub fn parse() -> Result { + let path = Storage::primary_config_file(); + if !path.exists() { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + fs::write(&path, DEFAULT_CONFIG)?; + } + Self::from_file(&path) + } +} + +impl Hash for Weight { + fn hash(&self, state: &mut H) { + state.write_u64(self.a.to_bits()); + state.write_u64(self.k.to_bits()); + state.write_u64(self.max.to_bits()); + } +} + +impl PartialEq for Weight { + fn eq(&self, other: &Self) -> bool { + self.a.to_bits() == other.a.to_bits() + && self.k.to_bits() == other.k.to_bits() + && self.max.to_bits() == other.max.to_bits() + } +} +impl Eq for Weight {} + +impl PartialEq for Entry { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.servers == other.servers + && self.priority == other.priority + && self.weight == other.weight + } +} +impl Eq for Entry {} + +impl Hash for Entry { + fn hash(&self, state: &mut H) { + self.name.hash(state); + self.servers.hash(state); + self.priority.hash(state); + self.weight.hash(state); + } +} + +impl SyncLoadFromTomlFile for Config {} diff --git a/plugins/cloudflare/src/plugin/dns.rs b/plugins/cloudflare/src/plugin/dns.rs new file mode 100644 index 00000000..23a9f0a8 --- /dev/null +++ b/plugins/cloudflare/src/plugin/dns.rs @@ -0,0 +1,38 @@ +use crate::generated::plugin::system::{data_types::Server, server::get_server}; + +use super::{config::Weight, math::WeightCalc}; + +pub mod manager; + +pub struct Record { + pub server: Server, + pub id: Option, + pub weight: u16, +} + +impl Record { + pub fn new(values: &Weight, server: Server, id: Option) -> Self { + let mut record = Self { + server, + id, + weight: 0, + }; + record.update_weight(values); + record + } + + fn update_weight(&mut self, values: &Weight) -> bool { + let new = WeightCalc::calc_from(self.server.connected_users, values); + let changed = new != self.weight; + self.weight = new; + changed + } + + pub fn update(&mut self, values: &Weight) -> bool { + if let Ok(Some(server)) = get_server(&self.server.uuid) { + self.server = server; + } + + self.update_weight(values) + } +} diff --git a/plugins/cloudflare/src/plugin/dns/manager.rs b/plugins/cloudflare/src/plugin/dns/manager.rs new file mode 100644 index 00000000..fd84c230 --- /dev/null +++ b/plugins/cloudflare/src/plugin/dns/manager.rs @@ -0,0 +1,179 @@ +use std::collections::HashMap; + +use crate::{ + generated::plugin::system::data_types::Uuid, + info, + plugin::{ + backend::{ + batch::{ + data::{BBatch, BBatchResult}, + delete::BDelete, + record::BRecord, + }, + Backend, + }, + batcher::{Action, Batcher}, + config::{Config, Entry}, + }, +}; + +use super::Record; + +#[derive(Default)] +pub struct Zone { + records: HashMap>, +} + +#[derive(Default)] +pub struct Records { + zones: HashMap, +} + +impl Records { + pub fn new(config: &Config) -> Self { + let mut zones: HashMap = HashMap::new(); + for entry in &config.entries { + zones + .entry(entry.zone.clone()) + .or_default() + .records + .entry(entry.clone()) + .or_default(); + } + info!("Found {} unique zones", zones.len()); + Self { zones } + } + + pub fn tick(&mut self, backend: &Backend, batcher: &mut Batcher) { + for (zone_id, zone) in &mut self.zones { + let mut batch = BBatch::default(); + + // Process create/delete actions and prepare placeholder records + if let Some((entry, actions)) = batcher.drain(zone_id) { + for (uuid, action) in actions.drain() { + match action { + Action::Create(server) => { + // Prepare DNS record and send create + let record = Record::new(&entry.weight, server, None); + if let Some(brecord) = BRecord::new(entry, &record) { + batch.posts.push(brecord); + // Insert placeholder to be updated once CF returns an ID + zone.records + .entry(entry.clone()) + .or_default() + .insert(uuid.clone(), record); + } + } + Action::Delete => { + // Send delete for existing record + if let Some(record_map) = zone.records.get(entry) + && let Some(existing_record) = record_map.get(&uuid) + { + // If this record is a placeholder we skip it. + if existing_record.id.is_none() { + continue; + } + + batch.deletes.push(BDelete::from(existing_record)); + } + } + } + } + } + + // Update weights for all existing records + for (entry, record_map) in &mut zone.records { + for record in record_map.values_mut() { + // If this record is a placeholder we skip it. + if record.id.is_none() { + continue; + } + + if record.update(&entry.weight) + && let Some(brec) = BRecord::new(entry, record) + { + batch.patches.push(brec); + } + } + } + + if batch.deletes.is_empty() + && batch.patches.is_empty() + && batch.posts.is_empty() + && batch.puts.is_empty() + { + // No request to cloudflare required + continue; + } + + // Execute batch and apply Cloudflare response + if let Some(BBatchResult { + posts, + patches, + deletes, + }) = backend.send_batch(zone_id, &batch) + { + // Apply deletions + if let Some(deletes) = deletes { + for delete_result in deletes { + for record_map in zone.records.values_mut() { + record_map.remove(&delete_result.comment); + } + } + } + + // Apply creations: set CF-assigned IDs on placeholders + if let Some(posts) = posts { + for created_record in posts { + for record_map in zone.records.values_mut() { + if let Some(record) = record_map.get_mut(&created_record.comment) { + // Only update placeholders (empty id) + if record.id.is_none() { + record.id = Some(created_record.id.clone()); + } + break; + } + } + } + } + + // Apply patches: update id (in case CF changed) and weight + if let Some(patches) = patches { + for patch_record in patches { + for record_map in zone.records.values_mut() { + if let Some(record) = record_map.get_mut(&patch_record.comment) { + record.weight = patch_record.data.weight; + break; + } + } + } + } + } + } + } + + pub fn shutdown(&mut self, backend: &Backend) { + info!("Deleting created records..."); + + let mut count = 0; + for (zone_id, zone) in &mut self.zones { + let mut batch = BBatch::default(); + + for record_map in zone.records.values_mut() { + for record in record_map.values_mut() { + // If this record is a placeholder we skip it. + if record.id.is_none() { + continue; + } + + batch.deletes.push(BDelete::from(&*record)); + } + } + + count += batch.deletes.len(); + backend.send_batch(zone_id, &batch); + } + + info!("Deleted {} records...", count); + } +} diff --git a/plugins/cloudflare/src/plugin/math.rs b/plugins/cloudflare/src/plugin/math.rs new file mode 100644 index 00000000..f46c7e7d --- /dev/null +++ b/plugins/cloudflare/src/plugin/math.rs @@ -0,0 +1,18 @@ +use super::config::Weight; + +pub struct WeightCalc; + +impl WeightCalc { + pub fn calc_from(x: u32, values: &Weight) -> u16 { + Self::calc(x, values.max, values.k, values.a) + } + + #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] + pub fn calc(x: u32, max: f64, k: f64, a: f64) -> u16 { + let numerator = (-k * (f64::from(x) / max)).exp() - (-k).exp(); + let denominator = 1.0 - (-k).exp(); + let weight = a * max * (numerator / denominator); + + weight.round().clamp(0.0, f64::from(u16::MAX)) as u16 + } +} diff --git a/plugins/cloudflare/src/storage.rs b/plugins/cloudflare/src/storage.rs new file mode 100644 index 00000000..58b10bfc --- /dev/null +++ b/plugins/cloudflare/src/storage.rs @@ -0,0 +1,25 @@ +use std::path::PathBuf; + +/* Configs */ +const CONFIG_DIRECTORY: &str = "/configs"; +const PRIMARY_CONFIG_FILE: &str = "config.toml"; + +/* Data */ +//const DATA_DIRECTORY: &str = "/data"; + +pub struct Storage; + +impl Storage { + /* Configs */ + pub fn configs_directory() -> PathBuf { + PathBuf::from(CONFIG_DIRECTORY) + } + pub fn primary_config_file() -> PathBuf { + Self::configs_directory().join(PRIMARY_CONFIG_FILE) + } + + /* Data */ + //pub fn data_directory(host: bool) -> PathBuf { + // PathBuf::from(DATA_DIRECTORY) + //} +} diff --git a/protocol/grpc/common/user.proto b/protocol/grpc/common/user.proto index f63b0960..d96b4469 100644 --- a/protocol/grpc/common/user.proto +++ b/protocol/grpc/common/user.proto @@ -12,7 +12,6 @@ message CommonUser { message Item { string name = 1; string id = 2; - optional string group = 3; optional string server = 4; } } \ No newline at end of file diff --git a/protocol/wit/plugin.wit b/protocol/wit/plugin.wit index 46c1d767..930684ff 100644 --- a/protocol/wit/plugin.wit +++ b/protocol/wit/plugin.wit @@ -92,9 +92,17 @@ interface data-types { group: option, allocation: allocation, token: string, + connected-users: u32, } } +interface server { + use types.{error-message}; + use data-types.{uuid, server}; + + get-server: func(uuid: uuid) -> result, error-message>; +} + interface log { variant level { debug, @@ -240,6 +248,7 @@ world plugin { export screen; export event; import guard; + import server; import log; import platform; import tls;