Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ members = [
]

[workspace.metadata]
protocol-version = 12
protocol-version = 13

[profile.release]
lto = true
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ public interface Resources {
*/
CompletableFuture<Optional<SimpleCloudGroup>> groupFromName(String name);

/**
* Retrieves a SimpleGroup object that contains the specified user as a member.
*
* @param uuid the uuid of the user to retrieve the group for
* @return a SimpleGroup instance
*/
CompletableFuture<Optional<SimpleCloudGroup>> groupFromUser(UUID uuid);

/**
* Retrieves an array of SimpleServer objects.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,5 @@ public interface CloudUser {

UUID uuid();

Optional<String> group();

Optional<UUID> server();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,6 @@ public CompletableFuture<Optional<SimpleCloudGroup>> groupFromName(String name)
});
}

@Override
public CompletableFuture<Optional<SimpleCloudGroup>> groupFromUser(UUID uuid) {
return Cloud.users().userFromUuid(uuid).thenCompose(user -> {
if (user.isPresent() && user.get().group().isPresent()) {
return this.groupFromName(user.get().group().get());
} else {
return CompletableFuture.completedFuture(Optional.empty());
}
});
}

@Override
public CompletableFuture<SimpleCloudServer[]> servers() {
return this.connection.servers().thenApply(list -> list.getServersList().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@
import java.util.Optional;
import java.util.UUID;

public record CloudUserImpl(String name, UUID uuid, Optional<String> group, Optional<UUID> server)
implements CloudUser {}
public record CloudUserImpl(String name, UUID uuid, Optional<UUID> server) implements CloudUser {}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,12 @@ public CompletableFuture<Optional<CloudUser>> userFromName(String name) {
return this.connection
.userFromName(name)
.thenApply(user -> {
Optional<String> group = Optional.empty();
Optional<UUID> server = Optional.empty();
if (user.hasGroup()) {
group = Optional.of(user.getGroup());
}
if (user.hasServer()) {
server = Optional.of(UUID.fromString(user.getServer()));
}
return Optional.of((CloudUser)
new CloudUserImpl(user.getName(), UUID.fromString(user.getId()), group, server));
return Optional.of(
(CloudUser) new CloudUserImpl(user.getName(), UUID.fromString(user.getId()), server));
})
.exceptionally(throwable -> {
if (throwable.getMessage().equals("NOT_FOUND")) {
Expand All @@ -50,15 +46,11 @@ public CompletableFuture<Optional<CloudUser>> userFromUuid(@NotNull UUID uuid) {
return this.connection
.user(uuid.toString())
.thenApply(user -> {
Optional<String> group = Optional.empty();
Optional<UUID> server = Optional.empty();
if (user.hasGroup()) {
group = Optional.of(user.getGroup());
}
if (user.hasServer()) {
server = Optional.of(UUID.fromString(user.getServer()));
}
return Optional.of((CloudUser) new CloudUserImpl(user.getName(), uuid, group, server));
return Optional.of((CloudUser) new CloudUserImpl(user.getName(), uuid, server));
})
.exceptionally(throwable -> {
if (throwable.getMessage().equals("NOT_FOUND")) {
Expand Down
5 changes: 2 additions & 3 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ bitflags = "2.9.0"
# Signal handling
ctrlc = "3.4.6"

# Unit system
# Server system
uuid = { version = "1.16.0", features = ["v4"] }

# Command line arguments
Expand All @@ -47,12 +47,11 @@ rcgen = "0.13.2"
# Plugins
wasmtime = { version = "32.0.0", default-features = false, features = ["addr2line", "threads", "std", "runtime", "demangle", "component-model", "cranelift", "parallel-compilation", "cache"], optional = true }
wasmtime-wasi = { version = "32.0.0", optional = true }
wasmtime-wasi-http = { version = "32.0.0", optional = true }
minreq = { version = "2.13.4", features = ["https-rustls"], optional = true }

[build-dependencies]
toml = "0.8.21"
tonic-build = "0.13.0"

[features]
wasm-plugins = ["dep:wasmtime", "dep:wasmtime-wasi", "dep:wasmtime-wasi-http", "dep:minreq"]
wasm-plugins = ["dep:wasmtime", "dep:wasmtime-wasi", "dep:minreq"]
25 changes: 15 additions & 10 deletions controller/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use subscriber::manager::SubscriberManager;
use tls::TlsSetting;
use tokio::{
select,
sync::{mpsc, watch},
sync::watch,
time::{interval, Instant, MissedTickBehavior},
};
use user::manager::UserManager;

use crate::{config::Config, network::NetworkStack, task::Task};
use crate::{config::Config, network::NetworkStack, task::manager::TaskManager};

pub mod auth;
pub mod group;
Expand All @@ -35,17 +35,14 @@ pub mod tls;
pub mod user;

pub const TICK_RATE: u64 = 10;
const TASK_BUFFER: usize = 128;

pub type TaskSender = mpsc::Sender<Task>;

#[derive(Getters, MutGetters)]
pub struct Controller {
/* State */
state: State,

/* Tasks */
tasks: (TaskSender, mpsc::Receiver<Task>),
tasks: TaskManager,

/* Shared Components */
pub shared: Arc<Shared>,
Expand All @@ -62,6 +59,7 @@ pub struct Controller {
config: Config,
}

// This is data that is shared between network thread/tasks and plugin execution threads/tasks
pub struct Shared {
pub auth: AuthManager,
pub subscribers: SubscriberManager,
Expand All @@ -78,9 +76,9 @@ impl Controller {
tls: TlsSetting::init(&config).await?,
});

let tasks = mpsc::channel(TASK_BUFFER);
let tasks = TaskManager::init();

let plugins = PluginManager::init(&config, &shared).await?;
let plugins = PluginManager::init(&config, &tasks.get_sender(), &shared).await?;
let nodes = NodeManager::init(&plugins).await?;
let groups = GroupManager::init(&nodes).await?;

Expand All @@ -101,10 +99,14 @@ impl Controller {
}

pub async fn run(&mut self) -> Result<()> {
// Set task system to ready
self.tasks.set_ready(true);

// Setup signal handlers
self.setup_handlers()?;

let network = NetworkStack::start(&self.config, &self.shared, &self.tasks.0);
let network =
NetworkStack::start(&self.config, self.shared.clone(), self.tasks.get_sender());

// Main loop
let mut interval = interval(Duration::from_millis(1000 / TICK_RATE));
Expand All @@ -114,13 +116,16 @@ impl Controller {

select! {
_ = interval.tick() => self.tick(&network).await?,
task = self.tasks.1.recv() => if let Some(task) = task {
task = self.tasks.recv() => if let Some(task) = task {
task.run(self).await?;
},
_ = self.state.signal.1.changed() => self.shutdown()?,
}
}

// Set task system to not ready
self.tasks.set_ready(false);

// Cleanup
self.cleanup(network).await?;

Expand Down
6 changes: 3 additions & 3 deletions controller/src/application/plugin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::future::join_all;
use simplelog::info;
use tick::Ticker;

use crate::{application::Shared, config::Config};
use crate::{application::Shared, config::Config, task::manager::TaskSender};

use super::BoxedPlugin;

Expand All @@ -21,13 +21,13 @@ pub struct PluginManager {
}

impl PluginManager {
pub async fn init(config: &Config, shared: &Arc<Shared>) -> Result<Self> {
pub async fn init(config: &Config, tasks: &TaskSender, shared: &Arc<Shared>) -> Result<Self> {
info!("Loading plugins...");

let mut plugins = HashMap::new();

#[cfg(feature = "wasm-plugins")]
init_wasm_plugins(config, shared, &mut plugins).await?;
init_wasm_plugins(config, tasks, shared, &mut plugins).await?;

info!("Loaded {} plugin(s)", plugins.len());
Ok(Self {
Expand Down
20 changes: 8 additions & 12 deletions controller/src/application/plugin/runtime/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ use tonic::async_trait;
use url::Url;
use wasmtime::{component::ResourceAny, AsContextMut, Engine, Store};
use wasmtime_wasi::{IoView, ResourceTable, WasiCtx, WasiView};
use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};

use crate::application::{
node::Capabilities,
plugin::{BoxedNode, Features, GenericPlugin, Information},
Shared,
use crate::{
application::{
node::Capabilities,
plugin::{BoxedNode, Features, GenericPlugin, Information},
Shared,
},
task::manager::TaskSender,
};

pub(crate) mod config;
Expand Down Expand Up @@ -44,14 +46,14 @@ pub mod generated {

pub(crate) struct PluginState {
/* Global */
tasks: TaskSender,
shared: Arc<Shared>,

/* Plugin */
name: String,

/* Wasmtime */
wasi: WasiCtx,
http: WasiHttpCtx,
resources: ResourceTable,
}

Expand Down Expand Up @@ -244,12 +246,6 @@ impl WasiView for PluginState {
}
}

impl WasiHttpView for PluginState {
fn ctx(&mut self) -> &mut WasiHttpCtx {
&mut self.http
}
}

impl From<bridge::Information> for Information {
fn from(val: bridge::Information) -> Self {
Information {
Expand Down
1 change: 1 addition & 0 deletions controller/src/application/plugin/runtime/wasm/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod log;
mod platform;
pub mod process;
pub mod screen;
mod server;
mod tls;

impl system::types::Host for PluginState {}
Expand Down
47 changes: 47 additions & 0 deletions controller/src/application/plugin/runtime/wasm/ext/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use anyhow::Result;
use tonic::async_trait;
use uuid::Uuid;

use crate::{
application::{
plugin::runtime::wasm::{
generated::{
exports::plugin::system::bridge,
plugin::system::{self, types::ErrorMessage},
},
PluginState,
},
Controller,
},
task::{plugin::PluginTask, BoxedAny, GenericTask},
};

impl system::server::Host for PluginState {
async fn get_server(
&mut self,
uuid: String,
) -> Result<Result<Option<bridge::Server>, ErrorMessage>> {
let Ok(uuid) = Uuid::parse_str(&uuid) else {
return Ok(Err("Failed to parse provided uuid".to_string()));
};

Ok(Ok(PluginTask::execute::<Option<bridge::Server>, _>(
&self.tasks,
GetServerTask(uuid),
)
.await?))
}
}

pub struct GetServerTask(pub Uuid);

#[async_trait]
impl GenericTask for GetServerTask {
async fn run(&mut self, controller: &mut Controller) -> Result<BoxedAny> {
let Some(server) = controller.servers.get_server(&self.0) else {
return PluginTask::new_ok(None::<bridge::Server>);
};

PluginTask::new_ok(Some::<bridge::Server>(server.into()))
}
}
Loading