diff --git a/CHANGELOG.md b/CHANGELOG.md index 353d96a..acde85a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - yyyy-mm-dd +### Changed + +- Internally refactor how nodes are "executed", split into two distinct + stages. "Planning" and "Executing". Previously, these two were intertwined and + difficult to maintain. +- Now zeroizes the key data in memory when dropped. + ### Fixed - Fix a bug where key permissions where being printed in decimal format instead diff --git a/Cargo.lock b/Cargo.lock index 30b5159..ae548ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -606,7 +606,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.60.2", ] [[package]] @@ -1901,7 +1901,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.60.2", ] [[package]] @@ -1922,6 +1922,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "secrecy" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" +dependencies = [ + "zeroize", +] + [[package]] name = "semver" version = "1.0.27" @@ -3179,6 +3188,7 @@ dependencies = [ "tracing-log", "tracing-subscriber", "wire-core", + "wire-plan", ] [[package]] @@ -3220,6 +3230,22 @@ dependencies = [ "zstd", ] +[[package]] +name = "wire-execute" +version = "1.1.1" +dependencies = [ + "base64", + "enum_dispatch", + "futures-util", + "itertools", + "prost", + "secrecy", + "tracing", + "wire-core", + "wire-key-agent", + "wire-keys", +] + [[package]] name = "wire-key-agent" version = "1.1.1" @@ -3235,6 +3261,33 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "wire-keys" +version = "1.1.1" +dependencies = [ + "either", + "im", + "secrecy", + "serde", + "sha2", + "tokio", + "wire-core", + "wire-key-agent", +] + +[[package]] +name = "wire-plan" +version = "1.1.1" +dependencies = [ + "enum_dispatch", + "im", + "serde", + "tokio", + "wire-core", + "wire-execute", + "wire-keys", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/Cargo.toml b/Cargo.toml index bac7cdb..f5d520c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,12 @@ [workspace] -members = ["crates/key_agent", "crates/core", "crates/cli"] +members = [ + "crates/key_agent", + "crates/core", + "crates/cli", + "crates/plan", + "crates/keys", + "crates/execute", +] resolver = "2" package.edition = "2024" package.version = "1.1.1" @@ -26,6 +33,7 @@ tracing-log = "0.2.0" tracing-subscriber = "0.3.20" im = { version = "15.1.0", features = ["serde"] } anyhow = "1.0.100" +secrecy = "0.10.3" prost = "0.14.1" nix = { version = "0.30.1", features = ["user", "poll", "term"] } miette = { version = "7.6.0", features = ["fancy"] } @@ -47,6 +55,5 @@ nix-compat = { git = "https://git.snix.dev/snix/snix.git", features = [ # ] } serde_json = { version = "1.0.145" } owo-colors = { version = "4.2.3", features = ["supports-colors"] } - -[profile.dev.package.sqlx-macros] -opt-level = 3 +enum_dispatch = { version = "0.3.13" } +itertools = "0.14.0" diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 86895ca..6ec1b72 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -10,13 +10,14 @@ workspace = true dhat-heap = [] [dependencies] +wire-core = { path = "../core" } +wire-plan = { path = "../plan" } clap = { workspace = true } clap-verbosity-flag = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-log = { workspace = true } tracing-subscriber = { workspace = true } -wire-core = { path = "../core" } serde_json = { workspace = true } miette = { workspace = true } thiserror = { workspace = true } @@ -24,7 +25,7 @@ enum-display-derive = "0.1.1" futures = "0.3.31" clap-num = "1.2.0" clap-markdown = "0.1.5" -itertools = "0.14.0" +itertools.workspace = true dhat = "0.3.2" clap_complete = { version = "4.5.60", features = ["unstable-dynamic"] } owo-colors = { workspace = true } diff --git a/crates/cli/src/apply.rs b/crates/cli/src/apply.rs index b83abbf..a6bcca9 100644 --- a/crates/cli/src/apply.rs +++ b/crates/cli/src/apply.rs @@ -2,16 +2,17 @@ // Copyright 2024-2025 wire Contributors use futures::{FutureExt, StreamExt}; -use itertools::{Either, Itertools}; +use itertools::Itertools; use miette::{Diagnostic, IntoDiagnostic, Result}; +use wire_plan::{Goal, plan_for_node}; use std::any::Any; -use std::collections::HashSet; -use std::io::{Read, stderr}; +use std::collections::{HashMap, HashSet}; +use std::io::Read; use std::sync::Arc; use std::sync::atomic::AtomicBool; use thiserror::Error; use tracing::{error, info}; -use wire_core::hive::node::{Context, GoalExecutor, Name, Node, Objective, StepState}; +use wire_core::hive::node::{Name, Node}; use wire_core::hive::{Hive, HiveLocation}; use wire_core::status::STATUS; use wire_core::{SubCommandModifiers, errors::HiveLibError}; @@ -96,7 +97,7 @@ where pub async fn apply( hive: &mut Hive, - should_shutdown: Arc, + _should_shutdown: Arc, location: HiveLocation, args: CommonVerbArgs, partition: Partitions, @@ -104,9 +105,9 @@ pub async fn apply( mut modifiers: SubCommandModifiers, ) -> Result<()> where - F: Fn(&Name, &Node) -> Objective, + F: Fn(&Name, &Node) -> Goal, { - let location = Arc::new(location); + let _location = Arc::new(location); let (tags, names) = resolve_targets(&args.on, &mut modifiers); @@ -137,65 +138,68 @@ where .lock() .add_many(&partitioned_names.iter().collect::>()); - let mut set = hive + let mut nodes: HashMap<_, _> = hive .nodes .iter_mut() .filter(|(name, _)| partitioned_names.contains(name)) .map(|(name, node)| { - info!("Resolved {:?} to include {}", args.on, name); - - let objective = make_objective(name, node); - - let context = Context { - node, + let plan = plan_for_node( + Node { + target: Arc::new(node.target), + build_remotely: node.build_remotely, + allow_local_deployment: node.allow_local_deployment, + tags: node.tags, + host_platform: node.host_platform, + privilege_escalation_command: Arc::new(node.privilege_escalation_command), + keys, + }, name, - objective, - state: StepState::default(), - hive_location: location.clone(), + goal, + hive_location.clone(), modifiers, - should_quit: should_shutdown.clone(), - }; - - GoalExecutor::new(context) - .execute() - .map(move |result| (name, result)) + should_quit.clone() + ); }) - .peekable(); + .collect(); + + // let plans = wire_plan::create_plans(nodes, goal); - if set.peek().is_none() { - error!("There are no nodes selected for deployment"); - } + // let plan = nodes.inter - let futures = futures::stream::iter(set).buffer_unordered(args.parallel); - let result = futures.collect::>().await; - let (successful, errors): (Vec<_>, Vec<_>) = - result - .into_iter() - .partition_map(|(name, result)| match result { - Ok(..) => Either::Left(name), - Err(err) => Either::Right((name, err)), - }); - - if !successful.is_empty() { - info!( - "Successfully applied goal to {} node(s): {:?}", - successful.len(), - successful - ); + if nodes.peek().is_none() { + error!("There are no nodes selected for deployment"); } - if !errors.is_empty() { - // clear the status bar if we are about to print error messages - STATUS.lock().clear(&mut stderr()); - - return Err(NodeErrors( - errors - .into_iter() - .map(|(name, error)| NodeError(name.clone(), error)) - .collect(), - ) - .into()); - } + // let futures = futures::stream::iter(set).buffer_unordered(args.parallel); + // let result = futures.collect::>().await; + // let (successful, errors): (Vec<_>, Vec<_>) = + // result + // .into_iter() + // .partition_map(|(name, result)| match result { + // Ok(..) => Either::Left(name), + // Err(err) => Either::Right((name, err)), + // }); + // + // if !successful.is_empty() { + // info!( + // "Successfully applied goal to {} node(s): {:?}", + // successful.len(), + // successful + // ); + // } + // + // if !errors.is_empty() { + // // clear the status bar if we are about to print error messages + // STATUS.lock().clear(&mut stderr()); + // + // return Err(NodeErrors( + // errors + // .into_iter() + // .map(|(name, error)| NodeError(name.clone(), error)) + // .collect(), + // ) + // .into()); + // } Ok(()) } diff --git a/crates/cli/src/cli.rs b/crates/cli/src/cli.rs index 2fadf0e..41c8bf6 100644 --- a/crates/cli/src/cli.rs +++ b/crates/cli/src/cli.rs @@ -13,6 +13,7 @@ use wire_core::SubCommandModifiers; use wire_core::commands::common::get_hive_node_names; use wire_core::hive::node::{Goal as HiveGoal, HandleUnreachable, Name, SwitchToConfigurationGoal}; use wire_core::hive::{Hive, get_hive_location}; +use wire_plan::ApplyGoal; use std::io::IsTerminal; use std::{ @@ -174,7 +175,7 @@ pub struct ApplyArgs { pub common: CommonVerbArgs, #[arg(value_enum, default_value_t)] - pub goal: Goal, + pub goal: CliApplyGoal, /// Skip key uploads. noop when [GOAL] = Keys #[arg(short, long, default_value_t = false)] @@ -267,7 +268,7 @@ pub enum Inspection { } #[derive(Clone, Debug, Default, ValueEnum, Display)] -pub enum Goal { +pub enum CliApplyGoal { /// Make the configuration the boot default and activate now #[default] Switch, @@ -285,26 +286,26 @@ pub enum Goal { DryActivate, } -impl TryFrom for HiveGoal { +impl TryFrom for ApplyGoal { type Error = miette::Error; - fn try_from(value: Goal) -> Result { + fn try_from(value: CliApplyGoal) -> Result { match value { - Goal::Build => Ok(HiveGoal::Build), - Goal::Push => Ok(HiveGoal::Push), - Goal::Boot => Ok(HiveGoal::SwitchToConfiguration( + CliApplyGoal::Build => Ok(ApplyGoal::Build), + CliApplyGoal::Push => Ok(ApplyGoal::Push), + CliApplyGoal::Boot => Ok(ApplyGoal::SwitchToConfiguration( SwitchToConfigurationGoal::Boot, )), - Goal::Switch => Ok(HiveGoal::SwitchToConfiguration( + CliApplyGoal::Switch => Ok(ApplyGoal::SwitchToConfiguration( SwitchToConfigurationGoal::Switch, )), - Goal::Test => Ok(HiveGoal::SwitchToConfiguration( + CliApplyGoal::Test => Ok(ApplyGoal::SwitchToConfiguration( SwitchToConfigurationGoal::Test, )), - Goal::DryActivate => Ok(HiveGoal::SwitchToConfiguration( + CliApplyGoal::DryActivate => Ok(ApplyGoal::SwitchToConfiguration( SwitchToConfigurationGoal::DryActivate, )), - Goal::Keys => Ok(HiveGoal::Keys), + CliApplyGoal::Keys => Ok(ApplyGoal::Keys), } } } diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 7ad74e6..7ef1761 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -80,7 +80,7 @@ async fn main() -> Result<()> { match args.command { cli::Commands::Apply(apply_args) => { let mut hive = Hive::new_from_path(&location, cache.clone(), modifiers).await?; - let goal: wire_core::hive::node::Goal = apply_args.goal.clone().try_into().unwrap(); + let goal: wire_plan::ApplyGoal = apply_args.goal.clone().try_into().unwrap(); // Respect user's --always-build-local arg hive.force_always_local(apply_args.always_build_local)?; @@ -92,17 +92,18 @@ async fn main() -> Result<()> { apply_args.common, Partitions::default(), |name, node| { - Objective::Apply(ApplyObjective { + wire_plan::Goal::Apply { goal, - no_keys: apply_args.no_keys, - reboot: apply_args.reboot, - substitute_on_destination: apply_args.substitute_on_destination, should_apply_locally: should_apply_locally( node.allow_local_deployment, &name.0, ), - handle_unreachable: apply_args.handle_unreachable.clone().into(), - }) + no_keys: apply_args.no_keys, + substitute_on_destination: apply_args.substitute_on_destination, + reboot: apply_args.reboot, + // handle_unreachable: apply_args.handle_unreachable.clone().into(), + host_platform: node.host_platform + } }, modifiers, ) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 648312a..6f591e6 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -27,8 +27,8 @@ rand = "0.9.2" tokio-util = { workspace = true } portable-pty = "0.9.0" anyhow.workspace = true -itertools = "0.14.0" -enum_dispatch = "0.3.13" +itertools.workspace = true +enum_dispatch.workspace = true sha2 = { workspace = true } base64 = { workspace = true } nix-compat = { workspace = true } diff --git a/crates/core/src/commands/builder.rs b/crates/core/src/commands/builder.rs index 87ada4e..cd79ab7 100644 --- a/crates/core/src/commands/builder.rs +++ b/crates/core/src/commands/builder.rs @@ -3,30 +3,31 @@ use std::fmt; -pub(crate) struct CommandStringBuilder { +pub struct CommandStringBuilder { command: String, } impl CommandStringBuilder { - pub(crate) fn nix() -> Self { + #[must_use] + pub fn nix() -> Self { Self { command: "nix".to_string(), } } - pub(crate) fn new>(s: S) -> Self { + pub fn new>(s: S) -> Self { Self { command: s.as_ref().trim().to_string(), } } - pub(crate) fn arg>(&mut self, argument: S) { + pub fn arg>(&mut self, argument: S) { let argument = argument.as_ref().trim(); self.command.push(' '); self.command.push_str(argument); } - pub(crate) fn opt_arg>(&mut self, opt: bool, argument: S) { + pub fn opt_arg>(&mut self, opt: bool, argument: S) { if !opt { return; } @@ -34,7 +35,7 @@ impl CommandStringBuilder { self.arg(argument); } - pub(crate) fn args>(&mut self, arguments: &[S]) { + pub fn args>(&mut self, arguments: &[S]) { for arg in arguments { self.arg(arg); } diff --git a/crates/core/src/commands/common.rs b/crates/core/src/commands/common.rs index ab31e49..e9bd57f 100644 --- a/crates/core/src/commands/common.rs +++ b/crates/core/src/commands/common.rs @@ -14,7 +14,7 @@ use crate::{ errors::{CommandError, HiveInitialisationError, HiveLibError}, hive::{ HiveLocation, - node::{Context, Objective, Push}, + node::{Context, Push, Target}, }, }; @@ -28,22 +28,20 @@ fn get_common_copy_path_help(error: &CommandError) -> Option { } } -pub async fn push(context: &Context<'_>, push: Push<'_>) -> Result<(), HiveLibError> { +pub async fn push(context: &Context, target: &Target, push: Push<'_>, substitute_on_destination: bool) -> Result<(), HiveLibError> { let mut command_string = CommandStringBuilder::nix(); command_string.args(&["--extra-experimental-features", "nix-command", "copy"]); - if let Objective::Apply(apply_objective) = context.objective { - command_string.opt_arg( - apply_objective.substitute_on_destination, - "--substitute-on-destination", - ); - } + command_string.opt_arg( + substitute_on_destination, + "--substitute-on-destination", + ); command_string.arg("--to"); command_string.args(&[ format!( "ssh://{user}@{host}", - user = context.node.target.user, - host = context.node.target.get_preferred_host()?, + user = target.user, + host = target.get_preferred_host()?, ), match push { Push::Derivation(drv) => format!("{drv} --derivation"), @@ -56,10 +54,7 @@ pub async fn push(context: &Context<'_>, push: Push<'_>) -> Result<(), HiveLibEr .mode(crate::commands::ChildOutputMode::Nix), HashMap::from([( "NIX_SSHOPTS".into(), - context - .node - .target - .create_ssh_opts(context.modifiers, false)?, + target.create_ssh_opts(context.modifiers, false)?, )]), ) .await?; diff --git a/crates/core/src/commands/mod.rs b/crates/core/src/commands/mod.rs index f3ab775..18becda 100644 --- a/crates/core/src/commands/mod.rs +++ b/crates/core/src/commands/mod.rs @@ -2,7 +2,7 @@ // Copyright 2024-2025 wire Contributors use crate::commands::pty::{InteractiveChildChip, interactive_command_with_env}; -use std::{collections::HashMap, str::from_utf8, sync::LazyLock}; +use std::{collections::HashMap, hash::BuildHasher, str::from_utf8, sync::{Arc, LazyLock}}; use aho_corasick::AhoCorasick; use gjson::Value; @@ -15,16 +15,16 @@ use crate::{ SubCommandModifiers, commands::noninteractive::{NonInteractiveChildChip, non_interactive_command_with_env}, errors::{CommandError, HiveLibError}, - hive::node::{Node, Target}, + hive::node::Target, }; -pub(crate) mod builder; +pub mod builder; pub mod common; pub(crate) mod noninteractive; pub(crate) mod pty; #[derive(Copy, Clone, Debug)] -pub(crate) enum ChildOutputMode { +pub enum ChildOutputMode { Nix, Generic, Interactive, @@ -37,7 +37,7 @@ pub enum Either { } #[derive(Debug)] -pub(crate) struct CommandArguments<'t, S: AsRef> { +pub struct CommandArguments<'t, S: AsRef> { modifiers: SubCommandModifiers, target: Option<&'t Target>, output_mode: ChildOutputMode, @@ -56,7 +56,7 @@ static AHO_CORASICK: LazyLock = LazyLock::new(|| { }); impl<'a, S: AsRef> CommandArguments<'a, S> { - pub(crate) const fn new(command_string: S, modifiers: SubCommandModifiers) -> Self { + pub const fn new(command_string: S, modifiers: SubCommandModifiers) -> Self { Self { command_string, keep_stdin_open: false, @@ -68,46 +68,52 @@ impl<'a, S: AsRef> CommandArguments<'a, S> { } } - pub(crate) const fn execute_on_remote(mut self, target: Option<&'a Target>) -> Self { + #[must_use] + pub const fn execute_on_remote(mut self, target: Option<&'a Target>) -> Self { self.target = target; self } - pub(crate) const fn mode(mut self, mode: ChildOutputMode) -> Self { + #[must_use] + pub const fn mode(mut self, mode: ChildOutputMode) -> Self { self.output_mode = mode; self } - pub(crate) const fn keep_stdin_open(mut self) -> Self { + #[must_use] + pub const fn keep_stdin_open(mut self) -> Self { self.keep_stdin_open = true; self } - pub(crate) fn elevated(mut self, node: &Node) -> Self { + #[must_use] + pub fn privileged(mut self, escalation_command: &[Arc]) -> Self { self.privilege_escalation_command = - Some(node.privilege_escalation_command.iter().join(" ")); + Some(escalation_command.iter().join(" ")); self } - pub(crate) const fn is_elevated(&self) -> bool { + #[must_use] + pub const fn is_elevated(&self) -> bool { self.privilege_escalation_command.is_some() } - pub(crate) const fn log_stdout(mut self) -> Self { + #[must_use] + pub const fn log_stdout(mut self) -> Self { self.log_stdout = true; self } } -pub(crate) async fn run_command>( +pub async fn run_command>( arguments: &CommandArguments<'_, S>, ) -> Result, HiveLibError> { run_command_with_env(arguments, HashMap::new()).await } -pub(crate) async fn run_command_with_env>( +pub async fn run_command_with_env, B: BuildHasher>( arguments: &CommandArguments<'_, S>, - envs: HashMap, + envs: HashMap, ) -> Result, HiveLibError> { // use the non interactive command runner when forced // ... or when there is no reason for interactivity, local and unprivileged @@ -124,10 +130,12 @@ pub(crate) async fn run_command_with_env>( )) } -pub(crate) trait WireCommandChip { +pub trait WireCommandChip { type ExitStatus; - + + #[allow(async_fn_in_trait)] async fn wait_till_success(self) -> Result; + #[allow(async_fn_in_trait)] async fn write_stdin(&mut self, data: Vec) -> Result<(), HiveLibError>; } diff --git a/crates/core/src/commands/noninteractive.rs b/crates/core/src/commands/noninteractive.rs index 399f5d9..3fc06e7 100644 --- a/crates/core/src/commands/noninteractive.rs +++ b/crates/core/src/commands/noninteractive.rs @@ -2,9 +2,7 @@ // Copyright 2024-2025 wire Contributors use std::{ - collections::{HashMap, VecDeque}, - process::ExitStatus, - sync::Arc, + collections::{HashMap, VecDeque}, hash::BuildHasher, process::ExitStatus, sync::Arc }; use crate::{ @@ -22,7 +20,7 @@ use tokio::{ }; use tracing::{Instrument, debug, instrument, trace}; -pub(crate) struct NonInteractiveChildChip { +pub struct NonInteractiveChildChip { error_collection: Arc>>, stdout_collection: Arc>>, child: Child, @@ -32,9 +30,9 @@ pub(crate) struct NonInteractiveChildChip { } #[instrument(skip_all, name = "run", fields(elevated = %arguments.is_elevated()))] -pub(crate) fn non_interactive_command_with_env>( +pub(crate) fn non_interactive_command_with_env, B: BuildHasher>( arguments: &CommandArguments, - envs: HashMap, + envs: HashMap, ) -> Result { let mut command = if let Some(target) = arguments.target { create_sync_ssh_command(target, arguments.modifiers)? diff --git a/crates/core/src/commands/pty/mod.rs b/crates/core/src/commands/pty/mod.rs index b914e90..cc8edf0 100644 --- a/crates/core/src/commands/pty/mod.rs +++ b/crates/core/src/commands/pty/mod.rs @@ -11,6 +11,7 @@ use nix::unistd::write as posix_write; use portable_pty::{CommandBuilder, NativePtySystem, PtyPair, PtySize}; use rand::distr::Alphabetic; use std::collections::VecDeque; +use std::hash::BuildHasher; use std::io::stderr; use std::sync::{LazyLock, Mutex}; use std::{ @@ -45,7 +46,7 @@ const THREAD_QUIT_SIGNAL: &[u8; 1] = b"q"; type Child = Box; -pub(crate) struct InteractiveChildChip { +pub struct InteractiveChildChip { child: Child, cancel_stdin_pipe_w: OwnedFd, @@ -124,9 +125,9 @@ fn create_starting_segment>( } #[instrument(skip_all, name = "run-int", fields(elevated = %arguments.is_elevated(), mode = ?arguments.output_mode))] -pub(crate) async fn interactive_command_with_env>( +pub(crate) async fn interactive_command_with_env, B: BuildHasher>( arguments: &CommandArguments<'_, S>, - envs: std::collections::HashMap, + envs: std::collections::HashMap, ) -> Result { print_authenticate_warning(arguments)?; diff --git a/crates/core/src/hive/mod.rs b/crates/core/src/hive/mod.rs index 0f0fd81..eec1a54 100644 --- a/crates/core/src/hive/mod.rs +++ b/crates/core/src/hive/mod.rs @@ -29,7 +29,7 @@ pub mod steps; #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(deny_unknown_fields)] pub struct Hive { - pub nodes: HashMap, + pub nodes: HashMap, #[serde(deserialize_with = "check_schema_version", rename = "_schema")] pub schema: u32, diff --git a/crates/core/src/hive/node.rs b/crates/core/src/hive/node.rs index 63d9c9a..1bc6733 100644 --- a/crates/core/src/hive/node.rs +++ b/crates/core/src/hive/node.rs @@ -5,29 +5,21 @@ use enum_dispatch::enum_dispatch; use gethostname::gethostname; use serde::{Deserialize, Serialize}; -use std::assert_matches::debug_assert_matches; use std::fmt::Display; use std::sync::Arc; use std::sync::atomic::AtomicBool; use tokio::sync::oneshot; -use tracing::{Instrument, Level, Span, debug, error, event, instrument, trace}; +use tracing::{debug, instrument}; use crate::commands::builder::CommandStringBuilder; use crate::commands::common::evaluate_hive_attribute; use crate::commands::{CommandArguments, WireCommandChip, run_command}; use crate::errors::NetworkError; use crate::hive::HiveLocation; -use crate::hive::steps::build::Build; -use crate::hive::steps::cleanup::CleanUp; -use crate::hive::steps::evaluate::Evaluate; -use crate::hive::steps::keys::{Key, Keys, PushKeyAgent, UploadKeyAt}; -use crate::hive::steps::ping::Ping; -use crate::hive::steps::push::{PushBuildOutput, PushEvaluatedOutput}; -use crate::status::STATUS; +use crate::hive::steps::keys::Key; use crate::{EvalGoal, StrictHostKeyChecking, SubCommandModifiers}; use super::HiveLibError; -use super::steps::activate::SwitchToConfiguration; #[derive( Serialize, Deserialize, Clone, Debug, Hash, Eq, PartialEq, PartialOrd, Ord, derive_more::Display, @@ -101,31 +93,31 @@ impl Default for Target { } } -#[cfg(test)] -impl<'a> Context<'a> { - fn create_test_context( - hive_location: HiveLocation, - name: &'a Name, - node: &'a mut Node, - ) -> Self { - Context { - name, - node, - hive_location: Arc::new(hive_location), - modifiers: SubCommandModifiers::default(), - objective: Objective::Apply(ApplyObjective { - goal: Goal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch), - no_keys: false, - reboot: false, - should_apply_locally: false, - substitute_on_destination: false, - handle_unreachable: HandleUnreachable::default(), - }), - state: StepState::default(), - should_quit: Arc::new(AtomicBool::new(false)), - } - } -} +// #[cfg(test)] +// impl<'a> Context<'a> { +// fn create_test_context( +// hive_location: HiveLocation, +// name: &'a Name, +// node: &'a mut Node, +// ) -> Self { +// Context { +// name, +// node, +// hive_location: Arc::new(hive_location), +// modifiers: SubCommandModifiers::default(), +// objective: Objective::Apply(ApplyObjective { +// goal: Goal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch), +// no_keys: false, +// reboot: false, +// should_apply_locally: false, +// substitute_on_destination: false, +// handle_unreachable: HandleUnreachable::default(), +// }), +// state: StepState::default(), +// should_quit: Arc::new(AtomicBool::new(false)), +// } +// } +// } impl Target { pub fn get_preferred_host(&self) -> Result<&Arc, HiveLibError> { @@ -295,13 +287,6 @@ pub enum Objective { BuildLocally, } -#[enum_dispatch] -pub(crate) trait ExecuteStep: Send + Sync + Display + std::fmt::Debug { - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError>; - - fn should_execute(&self, context: &Context) -> bool; -} - // may include other options such as FailAll in the future #[non_exhaustive] #[derive(Clone, Copy, Default)] @@ -319,87 +304,56 @@ pub struct StepState { pub key_agent_directory: Option, } -pub struct Context<'a> { - pub name: &'a Name, - pub node: &'a mut Node, +pub struct Context { + pub name: Name, pub hive_location: Arc, pub modifiers: SubCommandModifiers, pub state: StepState, pub should_quit: Arc, - pub objective: Objective, -} - -#[enum_dispatch(ExecuteStep)] -#[derive(Debug, PartialEq)] -enum Step { - Ping, - PushKeyAgent, - Keys, - Evaluate, - PushEvaluatedOutput, - Build, - PushBuildOutput, - SwitchToConfiguration, - CleanUp, -} - -impl Display for Step { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Ping(step) => step.fmt(f), - Self::PushKeyAgent(step) => step.fmt(f), - Self::Keys(step) => step.fmt(f), - Self::Evaluate(step) => step.fmt(f), - Self::PushEvaluatedOutput(step) => step.fmt(f), - Self::Build(step) => step.fmt(f), - Self::PushBuildOutput(step) => step.fmt(f), - Self::SwitchToConfiguration(step) => step.fmt(f), - Self::CleanUp(step) => step.fmt(f), - } - } -} - -pub struct GoalExecutor<'a> { - steps: Vec, - context: Context<'a>, } -/// returns Err if the application should shut down. -fn app_shutdown_guard(context: &Context) -> Result<(), HiveLibError> { - if context - .should_quit - .load(std::sync::atomic::Ordering::Relaxed) - { - return Err(HiveLibError::Sigint); - } - - Ok(()) +pub struct GoalExecutor { + // steps: Vec, + context: Context, } -impl<'a> GoalExecutor<'a> { +// /// returns Err if the application should shut down. +// fn app_shutdown_guard(context: &Context) -> Result<(), HiveLibError> { +// if context +// .should_quit +// .load(std::sync::atomic::Ordering::Relaxed) +// { +// return Err(HiveLibError::Sigint); +// } +// +// Ok(()) +// } + +impl<'a> GoalExecutor { #[must_use] - pub fn new(context: Context<'a>) -> Self { - Self { - steps: vec![ - Step::Ping(Ping), - Step::PushKeyAgent(PushKeyAgent), - Step::Keys(Keys { - filter: UploadKeyAt::NoFilter, - }), - Step::Keys(Keys { - filter: UploadKeyAt::PreActivation, - }), - Step::Evaluate(super::steps::evaluate::Evaluate), - Step::PushEvaluatedOutput(super::steps::push::PushEvaluatedOutput), - Step::Build(super::steps::build::Build), - Step::PushBuildOutput(super::steps::push::PushBuildOutput), - Step::SwitchToConfiguration(SwitchToConfiguration), - Step::Keys(Keys { - filter: UploadKeyAt::PostActivation, - }), - ], - context, - } + pub fn new(_context: Context) -> Self { + todo!() + // Self { + // steps: vec![ + // Step::Ping(Ping), + // Step::PushKeyAgent(PushKeyAgent), + // Step::Keys(Keys { + // filter: UploadKeyAt::NoFilter, + // }), + // Step::Keys(Keys { + // filter: UploadKeyAt::PreActivation, + // }), + // Step::Evaluate(super::steps::evaluate::Evaluate), + // Step::PushEvaluatedOutput(super::steps::push::PushEvaluatedOutput), + // Step::Build(super::steps::build::Build), + // Step::PushBuildOutput(super::steps::push::PushBuildOutput), + // Step::SwitchToConfiguration(SwitchToConfiguration), + // Step::Keys(Keys { + // filter: UploadKeyAt::PostActivation, + // }), + // ], + // context, + // } } #[instrument(skip_all, name = "eval")] @@ -422,518 +376,519 @@ impl<'a> GoalExecutor<'a> { } #[instrument(skip_all, fields(node = %self.context.name))] - pub async fn execute(mut self) -> Result<(), HiveLibError> { - app_shutdown_guard(&self.context)?; - - let (tx, rx) = oneshot::channel(); - self.context.state.evaluation_rx = Some(rx); - - // The name of this span should never be changed without updating - // `wire/cli/tracing_setup.rs` - debug_assert_matches!(Span::current().metadata().unwrap().name(), "execute"); - // This span should always have a `node` field by the same file - debug_assert!( - Span::current() - .metadata() - .unwrap() - .fields() - .field("node") - .is_some() - ); - - let spawn_evaluator = match self.context.objective { - Objective::Apply(apply_objective) => !matches!(apply_objective.goal, Goal::Keys), - Objective::BuildLocally => true, - }; - - if spawn_evaluator { - tokio::spawn( - GoalExecutor::evaluate_task( - tx, - self.context.hive_location.clone(), - self.context.name.clone(), - self.context.modifiers, - ) - .in_current_span(), - ); - } - - let steps = self - .steps - .iter() - .filter(|step| step.should_execute(&self.context)) - .inspect(|step| { - trace!("Will execute step `{step}` for {}", self.context.name); - }) - .collect::>(); - let length = steps.len(); - - for (position, step) in steps.iter().enumerate() { - app_shutdown_guard(&self.context)?; - - event!( - Level::INFO, - step = step.to_string(), - progress = format!("{}/{length}", position + 1) - ); - - STATUS - .lock() - .set_node_step(self.context.name, step.to_string()); - - if let Err(err) = step.execute(&mut self.context).await.inspect_err(|_| { - error!("Failed to execute `{step}`"); - }) { - // discard error from cleanup - let _ = CleanUp.execute(&mut self.context).await; - - if let Objective::Apply(apply_objective) = self.context.objective - && matches!(step, Step::Ping(..)) - && matches!( - apply_objective.handle_unreachable, - HandleUnreachable::Ignore, - ) - { - return Ok(()); - } - - STATUS.lock().mark_node_failed(self.context.name); - - return Err(err); - } - } - - STATUS.lock().mark_node_succeeded(self.context.name); - - Ok(()) + pub async fn execute(self) -> Result<(), HiveLibError> { + todo!() + // app_shutdown_guard(&self.context)?; + // + // let (tx, rx) = oneshot::channel(); + // self.context.state.evaluation_rx = Some(rx); + // + // // The name of this span should never be changed without updating + // // `wire/cli/tracing_setup.rs` + // debug_assert_matches!(Span::current().metadata().unwrap().name(), "execute"); + // // This span should always have a `node` field by the same file + // debug_assert!( + // Span::current() + // .metadata() + // .unwrap() + // .fields() + // .field("node") + // .is_some() + // ); + // + // let spawn_evaluator = match self.context.objective { + // Objective::Apply(apply_objective) => !matches!(apply_objective.goal, Goal::Keys), + // Objective::BuildLocally => true, + // }; + // + // if spawn_evaluator { + // tokio::spawn( + // GoalExecutor::evaluate_task( + // tx, + // self.context.hive_location.clone(), + // self.context.name.clone(), + // self.context.modifiers, + // ) + // .in_current_span(), + // ); + // } + // + // let steps = self + // .steps + // .iter() + // .filter(|step| step.should_execute(&self.context)) + // .inspect(|step| { + // trace!("Will execute step `{step}` for {}", self.context.name); + // }) + // .collect::>(); + // let length = steps.len(); + // + // for (position, step) in steps.iter().enumerate() { + // app_shutdown_guard(&self.context)?; + // + // event!( + // Level::INFO, + // step = step.to_string(), + // progress = format!("{}/{length}", position + 1) + // ); + // + // STATUS + // .lock() + // .set_node_step(self.context.name, step.to_string()); + // + // if let Err(err) = step.execute(&mut self.context).await.inspect_err(|_| { + // error!("Failed to execute `{step}`"); + // }) { + // // discard error from cleanup + // let _ = CleanUp.execute(&mut self.context).await; + // + // if let Objective::Apply(apply_objective) = self.context.objective + // && matches!(step, Step::Ping(..)) + // && matches!( + // apply_objective.handle_unreachable, + // HandleUnreachable::Ignore, + // ) + // { + // return Ok(()); + // } + // + // STATUS.lock().mark_node_failed(self.context.name); + // + // return Err(err); + // } + // } + // + // STATUS.lock().mark_node_succeeded(self.context.name); + // + // Ok(()) } } -#[cfg(test)] -mod tests { - use rand::distr::Alphabetic; - - use super::*; - use crate::{ - function_name, get_test_path, - hive::{Hive, get_hive_location}, - location, - }; - use std::{assert_matches::assert_matches, path::PathBuf}; - use std::{collections::HashMap, env}; - - fn get_steps(goal_executor: GoalExecutor) -> std::vec::Vec { - goal_executor - .steps - .into_iter() - .filter(|step| step.should_execute(&goal_executor.context)) - .collect::>() - } - - #[tokio::test] - #[cfg_attr(feature = "no_web_tests", ignore)] - async fn default_values_match() { - let mut path = get_test_path!(); - - let location = - get_hive_location(path.display().to_string(), SubCommandModifiers::default()) - .await - .unwrap(); - let hive = Hive::new_from_path(&location, None, SubCommandModifiers::default()) - .await - .unwrap(); - - let node = Node::default(); - - let mut nodes = HashMap::new(); - nodes.insert(Name("NAME".into()), node); - - path.push("hive.nix"); - - assert_eq!( - hive, - Hive { - nodes, - schema: Hive::SCHEMA_VERSION - } - ); - } - - #[tokio::test] - async fn order_build_locally() { - let location = location!(get_test_path!()); - let mut node = Node { - build_remotely: false, - ..Default::default() - }; - let name = &Name(function_name!().into()); - let executor = GoalExecutor::new(Context::create_test_context(location, name, &mut node)); - let steps = get_steps(executor); - - assert_eq!( - steps, - vec![ - Ping.into(), - PushKeyAgent.into(), - Keys { - filter: UploadKeyAt::PreActivation - } - .into(), - crate::hive::steps::evaluate::Evaluate.into(), - crate::hive::steps::build::Build.into(), - crate::hive::steps::push::PushBuildOutput.into(), - SwitchToConfiguration.into(), - Keys { - filter: UploadKeyAt::PostActivation - } - .into(), - ] - ); - } - - #[tokio::test] - async fn order_keys_only() { - let location = location!(get_test_path!()); - let mut node = Node::default(); - let name = &Name(function_name!().into()); - let mut context = Context::create_test_context(location, name, &mut node); - - let Objective::Apply(ref mut apply_objective) = context.objective else { - unreachable!() - }; - - apply_objective.goal = Goal::Keys; - - let executor = GoalExecutor::new(context); - let steps = get_steps(executor); - - assert_eq!( - steps, - vec![ - Ping.into(), - PushKeyAgent.into(), - Keys { - filter: UploadKeyAt::NoFilter - } - .into(), - ] - ); - } - - #[tokio::test] - async fn order_build() { - let location = location!(get_test_path!()); - let mut node = Node::default(); - let name = &Name(function_name!().into()); - let mut context = Context::create_test_context(location, name, &mut node); - - let Objective::Apply(ref mut apply_objective) = context.objective else { - unreachable!() - }; - apply_objective.goal = Goal::Build; - - let executor = GoalExecutor::new(context); - let steps = get_steps(executor); - - assert_eq!( - steps, - vec![ - Ping.into(), - crate::hive::steps::evaluate::Evaluate.into(), - crate::hive::steps::build::Build.into(), - crate::hive::steps::push::PushBuildOutput.into(), - ] - ); - } - - #[tokio::test] - async fn order_push_only() { - let location = location!(get_test_path!()); - let mut node = Node::default(); - let name = &Name(function_name!().into()); - let mut context = Context::create_test_context(location, name, &mut node); - - let Objective::Apply(ref mut apply_objective) = context.objective else { - unreachable!() - }; - apply_objective.goal = Goal::Push; - - let executor = GoalExecutor::new(context); - let steps = get_steps(executor); - - assert_eq!( - steps, - vec![ - Ping.into(), - crate::hive::steps::evaluate::Evaluate.into(), - crate::hive::steps::push::PushEvaluatedOutput.into(), - ] - ); - } - - #[tokio::test] - async fn order_remote_build() { - let location = location!(get_test_path!()); - let mut node = Node { - build_remotely: true, - ..Default::default() - }; - - let name = &Name(function_name!().into()); - let executor = GoalExecutor::new(Context::create_test_context(location, name, &mut node)); - let steps = get_steps(executor); - - assert_eq!( - steps, - vec![ - Ping.into(), - PushKeyAgent.into(), - Keys { - filter: UploadKeyAt::PreActivation - } - .into(), - crate::hive::steps::evaluate::Evaluate.into(), - crate::hive::steps::push::PushEvaluatedOutput.into(), - crate::hive::steps::build::Build.into(), - SwitchToConfiguration.into(), - Keys { - filter: UploadKeyAt::PostActivation - } - .into(), - ] - ); - } - - #[tokio::test] - async fn order_nokeys() { - let location = location!(get_test_path!()); - let mut node = Node::default(); - - let name = &Name(function_name!().into()); - let mut context = Context::create_test_context(location, name, &mut node); - - let Objective::Apply(ref mut apply_objective) = context.objective else { - unreachable!() - }; - apply_objective.no_keys = true; - - let executor = GoalExecutor::new(context); - let steps = get_steps(executor); - - assert_eq!( - steps, - vec![ - Ping.into(), - crate::hive::steps::evaluate::Evaluate.into(), - crate::hive::steps::build::Build.into(), - crate::hive::steps::push::PushBuildOutput.into(), - SwitchToConfiguration.into(), - ] - ); - } - - #[tokio::test] - async fn order_should_apply_locally() { - let location = location!(get_test_path!()); - let mut node = Node::default(); - - let name = &Name(function_name!().into()); - let mut context = Context::create_test_context(location, name, &mut node); - - let Objective::Apply(ref mut apply_objective) = context.objective else { - unreachable!() - }; - apply_objective.no_keys = true; - apply_objective.should_apply_locally = true; - - let executor = GoalExecutor::new(context); - let steps = get_steps(executor); - - assert_eq!( - steps, - vec![ - crate::hive::steps::evaluate::Evaluate.into(), - crate::hive::steps::build::Build.into(), - SwitchToConfiguration.into(), - ] - ); - } - - #[tokio::test] - async fn order_build_only() { - let location = location!(get_test_path!()); - let mut node = Node::default(); - - let name = &Name(function_name!().into()); - let mut context = Context::create_test_context(location, name, &mut node); - - context.objective = Objective::BuildLocally; - - let executor = GoalExecutor::new(context); - let steps = get_steps(executor); - - assert_eq!( - steps, - vec![ - crate::hive::steps::evaluate::Evaluate.into(), - crate::hive::steps::build::Build.into() - ] - ); - } - - #[test] - fn target_fails_increments() { - let mut target = Target::from_host("localhost"); - - assert_eq!(target.current_host, 0); - - for i in 0..100 { - target.host_failed(); - assert_eq!(target.current_host, i + 1); - } - } - - #[test] - fn get_preferred_host_fails() { - let mut target = Target { - hosts: vec![ - "un.reachable.1".into(), - "un.reachable.2".into(), - "un.reachable.3".into(), - "un.reachable.4".into(), - "un.reachable.5".into(), - ], - ..Default::default() - }; - - assert_ne!( - target.get_preferred_host().unwrap().to_string(), - "un.reachable.5" - ); - - for i in 1..=5 { - assert_eq!( - target.get_preferred_host().unwrap().to_string(), - format!("un.reachable.{i}") - ); - target.host_failed(); - } - - for _ in 0..5 { - assert_matches!( - target.get_preferred_host(), - Err(HiveLibError::NetworkError(NetworkError::HostsExhausted)) - ); - } - } - - #[test] - fn test_ssh_opts() { - let target = Target::from_host("hello-world"); - let subcommand_modifiers = SubCommandModifiers { - non_interactive: false, - ..Default::default() - }; - let tmp = format!( - "/tmp/{}", - rand::distr::SampleString::sample_string(&Alphabetic, &mut rand::rng(), 10) - ); - - std::fs::create_dir(&tmp).unwrap(); - - unsafe { env::set_var("XDG_RUNTIME_DIR", &tmp) } - - let args = [ - "-l".to_string(), - target.user.to_string(), - "-p".to_string(), - target.port.to_string(), - "-o".to_string(), - "StrictHostKeyChecking=accept-new".to_string(), - "-o".to_string(), - "PasswordAuthentication=no".to_string(), - "-o".to_string(), - "KbdInteractiveAuthentication=no".to_string(), - ]; - - assert_eq!( - target - .create_ssh_args(subcommand_modifiers, false, false) - .unwrap(), - args - ); - assert_eq!( - target.create_ssh_opts(subcommand_modifiers, false).unwrap(), - args.join(" ") - ); - - assert_eq!( - target - .create_ssh_args(subcommand_modifiers, false, true) - .unwrap(), - [ - "-l".to_string(), - target.user.to_string(), - "-p".to_string(), - target.port.to_string(), - "-o".to_string(), - "StrictHostKeyChecking=accept-new".to_string(), - "-o".to_string(), - "PasswordAuthentication=no".to_string(), - "-o".to_string(), - "KbdInteractiveAuthentication=no".to_string(), - ] - ); - - assert_eq!( - target - .create_ssh_args(subcommand_modifiers, true, true) - .unwrap(), - [ - "-l".to_string(), - target.user.to_string(), - "-p".to_string(), - target.port.to_string(), - "-o".to_string(), - "StrictHostKeyChecking=accept-new".to_string(), - "-o".to_string(), - "PasswordAuthentication=no".to_string(), - "-o".to_string(), - "KbdInteractiveAuthentication=no".to_string(), - ] - ); - - // forced non interactive is the same as --non-interactive - assert_eq!( - target - .create_ssh_args(subcommand_modifiers, true, false) - .unwrap(), - target - .create_ssh_args( - SubCommandModifiers { - non_interactive: true, - ..Default::default() - }, - false, - false - ) - .unwrap() - ); - } - - #[tokio::test] - async fn context_quits_sigint() { - let location = location!(get_test_path!()); - let mut node = Node::default(); - - let name = &Name(function_name!().into()); - let context = Context::create_test_context(location, name, &mut node); - context - .should_quit - .store(true, std::sync::atomic::Ordering::Relaxed); - let executor = GoalExecutor::new(context); - let status = executor.execute().await; - - assert_matches!(status, Err(HiveLibError::Sigint)); - } -} +// #[cfg(test)] +// mod tests { +// use rand::distr::Alphabetic; +// +// use super::*; +// use crate::{ +// function_name, get_test_path, +// hive::{Hive, get_hive_location}, +// location, +// }; +// use std::{assert_matches::assert_matches, path::PathBuf}; +// use std::{collections::HashMap, env}; +// +// fn get_steps(goal_executor: GoalExecutor) -> std::vec::Vec { +// goal_executor +// .steps +// .into_iter() +// .filter(|step| step.should_execute(&goal_executor.context)) +// .collect::>() +// } +// +// #[tokio::test] +// #[cfg_attr(feature = "no_web_tests", ignore)] +// async fn default_values_match() { +// let mut path = get_test_path!(); +// +// let location = +// get_hive_location(path.display().to_string(), SubCommandModifiers::default()) +// .await +// .unwrap(); +// let hive = Hive::new_from_path(&location, None, SubCommandModifiers::default()) +// .await +// .unwrap(); +// +// let node = Node::default(); +// +// let mut nodes = HashMap::new(); +// nodes.insert(Name("NAME".into()), node); +// +// path.push("hive.nix"); +// +// assert_eq!( +// hive, +// Hive { +// nodes, +// schema: Hive::SCHEMA_VERSION +// } +// ); +// } +// +// #[tokio::test] +// async fn order_build_locally() { +// let location = location!(get_test_path!()); +// let mut node = Node { +// build_remotely: false, +// ..Default::default() +// }; +// let name = &Name(function_name!().into()); +// let executor = GoalExecutor::new(Context::create_test_context(location, name, &mut node)); +// let steps = get_steps(executor); +// +// assert_eq!( +// steps, +// vec![ +// Ping.into(), +// PushKeyAgent.into(), +// Keys { +// filter: UploadKeyAt::PreActivation +// } +// .into(), +// crate::hive::steps::evaluate::Evaluate.into(), +// crate::hive::steps::build::Build.into(), +// crate::hive::steps::push::PushBuildOutput.into(), +// SwitchToConfiguration.into(), +// Keys { +// filter: UploadKeyAt::PostActivation +// } +// .into(), +// ] +// ); +// } +// +// #[tokio::test] +// async fn order_keys_only() { +// let location = location!(get_test_path!()); +// let mut node = Node::default(); +// let name = &Name(function_name!().into()); +// let mut context = Context::create_test_context(location, name, &mut node); +// +// let Objective::Apply(ref mut apply_objective) = context.objective else { +// unreachable!() +// }; +// +// apply_objective.goal = Goal::Keys; +// +// let executor = GoalExecutor::new(context); +// let steps = get_steps(executor); +// +// assert_eq!( +// steps, +// vec![ +// Ping.into(), +// PushKeyAgent.into(), +// Keys { +// filter: UploadKeyAt::NoFilter +// } +// .into(), +// ] +// ); +// } +// +// #[tokio::test] +// async fn order_build() { +// let location = location!(get_test_path!()); +// let mut node = Node::default(); +// let name = &Name(function_name!().into()); +// let mut context = Context::create_test_context(location, name, &mut node); +// +// let Objective::Apply(ref mut apply_objective) = context.objective else { +// unreachable!() +// }; +// apply_objective.goal = Goal::Build; +// +// let executor = GoalExecutor::new(context); +// let steps = get_steps(executor); +// +// assert_eq!( +// steps, +// vec![ +// Ping.into(), +// crate::hive::steps::evaluate::Evaluate.into(), +// crate::hive::steps::build::Build.into(), +// crate::hive::steps::push::PushBuildOutput.into(), +// ] +// ); +// } +// +// #[tokio::test] +// async fn order_push_only() { +// let location = location!(get_test_path!()); +// let mut node = Node::default(); +// let name = &Name(function_name!().into()); +// let mut context = Context::create_test_context(location, name, &mut node); +// +// let Objective::Apply(ref mut apply_objective) = context.objective else { +// unreachable!() +// }; +// apply_objective.goal = Goal::Push; +// +// let executor = GoalExecutor::new(context); +// let steps = get_steps(executor); +// +// assert_eq!( +// steps, +// vec![ +// Ping.into(), +// crate::hive::steps::evaluate::Evaluate.into(), +// crate::hive::steps::push::PushEvaluatedOutput.into(), +// ] +// ); +// } +// +// #[tokio::test] +// async fn order_remote_build() { +// let location = location!(get_test_path!()); +// let mut node = Node { +// build_remotely: true, +// ..Default::default() +// }; +// +// let name = &Name(function_name!().into()); +// let executor = GoalExecutor::new(Context::create_test_context(location, name, &mut node)); +// let steps = get_steps(executor); +// +// assert_eq!( +// steps, +// vec![ +// Ping.into(), +// PushKeyAgent.into(), +// Keys { +// filter: UploadKeyAt::PreActivation +// } +// .into(), +// crate::hive::steps::evaluate::Evaluate.into(), +// crate::hive::steps::push::PushEvaluatedOutput.into(), +// crate::hive::steps::build::Build.into(), +// SwitchToConfiguration.into(), +// Keys { +// filter: UploadKeyAt::PostActivation +// } +// .into(), +// ] +// ); +// } +// +// #[tokio::test] +// async fn order_nokeys() { +// let location = location!(get_test_path!()); +// let mut node = Node::default(); +// +// let name = &Name(function_name!().into()); +// let mut context = Context::create_test_context(location, name, &mut node); +// +// let Objective::Apply(ref mut apply_objective) = context.objective else { +// unreachable!() +// }; +// apply_objective.no_keys = true; +// +// let executor = GoalExecutor::new(context); +// let steps = get_steps(executor); +// +// assert_eq!( +// steps, +// vec![ +// Ping.into(), +// crate::hive::steps::evaluate::Evaluate.into(), +// crate::hive::steps::build::Build.into(), +// crate::hive::steps::push::PushBuildOutput.into(), +// SwitchToConfiguration.into(), +// ] +// ); +// } +// +// #[tokio::test] +// async fn order_should_apply_locally() { +// let location = location!(get_test_path!()); +// let mut node = Node::default(); +// +// let name = &Name(function_name!().into()); +// let mut context = Context::create_test_context(location, name, &mut node); +// +// let Objective::Apply(ref mut apply_objective) = context.objective else { +// unreachable!() +// }; +// apply_objective.no_keys = true; +// apply_objective.should_apply_locally = true; +// +// let executor = GoalExecutor::new(context); +// let steps = get_steps(executor); +// +// assert_eq!( +// steps, +// vec![ +// crate::hive::steps::evaluate::Evaluate.into(), +// crate::hive::steps::build::Build.into(), +// SwitchToConfiguration.into(), +// ] +// ); +// } +// +// #[tokio::test] +// async fn order_build_only() { +// let location = location!(get_test_path!()); +// let mut node = Node::default(); +// +// let name = &Name(function_name!().into()); +// let mut context = Context::create_test_context(location, name, &mut node); +// +// context.objective = Objective::BuildLocally; +// +// let executor = GoalExecutor::new(context); +// let steps = get_steps(executor); +// +// assert_eq!( +// steps, +// vec![ +// crate::hive::steps::evaluate::Evaluate.into(), +// crate::hive::steps::build::Build.into() +// ] +// ); +// } +// +// #[test] +// fn target_fails_increments() { +// let mut target = Target::from_host("localhost"); +// +// assert_eq!(target.current_host, 0); +// +// for i in 0..100 { +// target.host_failed(); +// assert_eq!(target.current_host, i + 1); +// } +// } +// +// #[test] +// fn get_preferred_host_fails() { +// let mut target = Target { +// hosts: vec![ +// "un.reachable.1".into(), +// "un.reachable.2".into(), +// "un.reachable.3".into(), +// "un.reachable.4".into(), +// "un.reachable.5".into(), +// ], +// ..Default::default() +// }; +// +// assert_ne!( +// target.get_preferred_host().unwrap().to_string(), +// "un.reachable.5" +// ); +// +// for i in 1..=5 { +// assert_eq!( +// target.get_preferred_host().unwrap().to_string(), +// format!("un.reachable.{i}") +// ); +// target.host_failed(); +// } +// +// for _ in 0..5 { +// assert_matches!( +// target.get_preferred_host(), +// Err(HiveLibError::NetworkError(NetworkError::HostsExhausted)) +// ); +// } +// } +// +// #[test] +// fn test_ssh_opts() { +// let target = Target::from_host("hello-world"); +// let subcommand_modifiers = SubCommandModifiers { +// non_interactive: false, +// ..Default::default() +// }; +// let tmp = format!( +// "/tmp/{}", +// rand::distr::SampleString::sample_string(&Alphabetic, &mut rand::rng(), 10) +// ); +// +// std::fs::create_dir(&tmp).unwrap(); +// +// unsafe { env::set_var("XDG_RUNTIME_DIR", &tmp) } +// +// let args = [ +// "-l".to_string(), +// target.user.to_string(), +// "-p".to_string(), +// target.port.to_string(), +// "-o".to_string(), +// "StrictHostKeyChecking=accept-new".to_string(), +// "-o".to_string(), +// "PasswordAuthentication=no".to_string(), +// "-o".to_string(), +// "KbdInteractiveAuthentication=no".to_string(), +// ]; +// +// assert_eq!( +// target +// .create_ssh_args(subcommand_modifiers, false, false) +// .unwrap(), +// args +// ); +// assert_eq!( +// target.create_ssh_opts(subcommand_modifiers, false).unwrap(), +// args.join(" ") +// ); +// +// assert_eq!( +// target +// .create_ssh_args(subcommand_modifiers, false, true) +// .unwrap(), +// [ +// "-l".to_string(), +// target.user.to_string(), +// "-p".to_string(), +// target.port.to_string(), +// "-o".to_string(), +// "StrictHostKeyChecking=accept-new".to_string(), +// "-o".to_string(), +// "PasswordAuthentication=no".to_string(), +// "-o".to_string(), +// "KbdInteractiveAuthentication=no".to_string(), +// ] +// ); +// +// assert_eq!( +// target +// .create_ssh_args(subcommand_modifiers, true, true) +// .unwrap(), +// [ +// "-l".to_string(), +// target.user.to_string(), +// "-p".to_string(), +// target.port.to_string(), +// "-o".to_string(), +// "StrictHostKeyChecking=accept-new".to_string(), +// "-o".to_string(), +// "PasswordAuthentication=no".to_string(), +// "-o".to_string(), +// "KbdInteractiveAuthentication=no".to_string(), +// ] +// ); +// +// // forced non interactive is the same as --non-interactive +// assert_eq!( +// target +// .create_ssh_args(subcommand_modifiers, true, false) +// .unwrap(), +// target +// .create_ssh_args( +// SubCommandModifiers { +// non_interactive: true, +// ..Default::default() +// }, +// false, +// false +// ) +// .unwrap() +// ); +// } +// +// #[tokio::test] +// async fn context_quits_sigint() { +// let location = location!(get_test_path!()); +// let mut node = Node::default(); +// +// let name = &Name(function_name!().into()); +// let context = Context::create_test_context(location, name, &mut node); +// context +// .should_quit +// .store(true, std::sync::atomic::Ordering::Relaxed); +// let executor = GoalExecutor::new(context); +// let status = executor.execute().await; +// +// assert_matches!(status, Err(HiveLibError::Sigint)); +// } +// } diff --git a/crates/core/src/hive/steps/activate.rs b/crates/core/src/hive/steps/activate.rs deleted file mode 100644 index 9483fdd..0000000 --- a/crates/core/src/hive/steps/activate.rs +++ /dev/null @@ -1,219 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later -// Copyright 2024-2025 wire Contributors - -use std::fmt::Display; - -use tracing::{error, info, instrument, warn}; - -use crate::{ - HiveLibError, - commands::{CommandArguments, WireCommandChip, builder::CommandStringBuilder, run_command}, - errors::{ActivationError, NetworkError}, - hive::node::{Context, ExecuteStep, Goal, Objective, SwitchToConfigurationGoal}, -}; - -#[derive(Debug, PartialEq)] -pub struct SwitchToConfiguration; - -impl Display for SwitchToConfiguration { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "switch-to-configuration") - } -} - -async fn wait_for_ping(ctx: &Context<'_>) -> Result<(), HiveLibError> { - let host = ctx.node.target.get_preferred_host()?; - let mut result = ctx.node.ping(ctx.modifiers).await; - - for num in 0..2 { - warn!("Trying to ping {host} (attempt {}/3)", num + 1); - - result = ctx.node.ping(ctx.modifiers).await; - - if result.is_ok() { - info!("Regained connection to {} via {host}", ctx.name); - - break; - } - } - - result -} - -async fn set_profile( - goal: SwitchToConfigurationGoal, - built_path: &String, - ctx: &Context<'_>, -) -> Result<(), HiveLibError> { - info!("Setting profiles in anticipation for switch-to-configuration {goal}"); - - let mut command_string = CommandStringBuilder::new("nix-env"); - command_string.args(&["-p", "/nix/var/nix/profiles/system", "--set"]); - command_string.arg(built_path); - - let Objective::Apply(apply_objective) = ctx.objective else { - unreachable!() - }; - - let child = run_command( - &CommandArguments::new(command_string, ctx.modifiers) - .mode(crate::commands::ChildOutputMode::Nix) - .execute_on_remote(if apply_objective.should_apply_locally { - None - } else { - Some(&ctx.node.target) - }) - .elevated(ctx.node), - ) - .await?; - - let _ = child - .wait_till_success() - .await - .map_err(HiveLibError::CommandError)?; - - info!("Set system profile"); - - Ok(()) -} - -impl ExecuteStep for SwitchToConfiguration { - fn should_execute(&self, ctx: &Context) -> bool { - let Objective::Apply(apply_objective) = ctx.objective else { - return false; - }; - - matches!(apply_objective.goal, Goal::SwitchToConfiguration(..)) - } - - #[allow(clippy::too_many_lines)] - #[instrument(skip_all, name = "activate")] - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - let built_path = ctx.state.build.as_ref().unwrap(); - - let Objective::Apply(apply_objective) = ctx.objective else { - unreachable!() - }; - - let Goal::SwitchToConfiguration(goal) = &apply_objective.goal else { - unreachable!("Cannot reach as guarded by should_execute") - }; - - if matches!( - goal, - // switch profile if switch or boot - // https://github.com/NixOS/nixpkgs/blob/a2c92aa34735a04010671e3378e2aa2d109b2a72/pkgs/by-name/ni/nixos-rebuild-ng/src/nixos_rebuild/services.py#L224 - SwitchToConfigurationGoal::Switch | SwitchToConfigurationGoal::Boot - ) { - set_profile(*goal, built_path, ctx).await?; - } - - info!("Running switch-to-configuration {goal}"); - - let mut command_string = - CommandStringBuilder::new(format!("{built_path}/bin/switch-to-configuration")); - command_string.arg(match goal { - SwitchToConfigurationGoal::Switch => "switch", - SwitchToConfigurationGoal::Boot => "boot", - SwitchToConfigurationGoal::Test => "test", - SwitchToConfigurationGoal::DryActivate => "dry-activate", - }); - - let child = run_command( - &CommandArguments::new(command_string, ctx.modifiers) - .execute_on_remote(if apply_objective.should_apply_locally { - None - } else { - Some(&ctx.node.target) - }) - .elevated(ctx.node) - .log_stdout(), - ) - .await?; - - let result = child.wait_till_success().await; - - match result { - Ok(_) => { - if !apply_objective.reboot { - return Ok(()); - } - - if apply_objective.should_apply_locally { - error!("Refusing to reboot local machine!"); - - return Ok(()); - } - - warn!("Rebooting {name}!", name = ctx.name); - - let reboot = run_command( - &CommandArguments::new("reboot now", ctx.modifiers) - .log_stdout() - .execute_on_remote(Some(&ctx.node.target)) - .elevated(ctx.node), - ) - .await?; - - // consume result, impossible to know if the machine failed to reboot or we - // simply disconnected - let _ = reboot - .wait_till_success() - .await - .map_err(HiveLibError::CommandError)?; - - info!("Rebooted {name}, waiting to reconnect...", name = ctx.name); - - if wait_for_ping(ctx).await.is_ok() { - return Ok(()); - } - - error!( - "Failed to get regain connection to {name} via {host} after reboot.", - name = ctx.name, - host = ctx.node.target.get_preferred_host()? - ); - - return Err(HiveLibError::NetworkError( - NetworkError::HostUnreachableAfterReboot( - ctx.node.target.get_preferred_host()?.to_string(), - ), - )); - } - Err(error) => { - warn!( - "Activation command for {name} exited unsuccessfully.", - name = ctx.name - ); - - // Bail if the command couldn't of broken the system - // and don't try to regain connection to localhost - if matches!(goal, SwitchToConfigurationGoal::DryActivate) - || apply_objective.should_apply_locally - { - return Err(HiveLibError::ActivationError( - ActivationError::SwitchToConfigurationError(*goal, ctx.name.clone(), error), - )); - } - - if wait_for_ping(ctx).await.is_ok() { - return Err(HiveLibError::ActivationError( - ActivationError::SwitchToConfigurationError(*goal, ctx.name.clone(), error), - )); - } - - error!( - "Failed to get regain connection to {name} via {host} after {goal} activation.", - name = ctx.name, - host = ctx.node.target.get_preferred_host()? - ); - - return Err(HiveLibError::NetworkError( - NetworkError::HostUnreachableAfterReboot( - ctx.node.target.get_preferred_host()?.to_string(), - ), - )); - } - } - } -} diff --git a/crates/core/src/hive/steps/build.rs b/crates/core/src/hive/steps/build.rs deleted file mode 100644 index 1a9a356..0000000 --- a/crates/core/src/hive/steps/build.rs +++ /dev/null @@ -1,89 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later -// Copyright 2024-2025 wire Contributors - -use std::fmt::Display; - -use tracing::{info, instrument}; - -use crate::{ - HiveLibError, - commands::{ - CommandArguments, Either, WireCommandChip, builder::CommandStringBuilder, - run_command_with_env, - }, - hive::node::{Context, ExecuteStep, Goal, Objective}, -}; - -#[derive(Debug, PartialEq)] -pub struct Build; - -impl Display for Build { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Build the node") - } -} - -impl ExecuteStep for Build { - fn should_execute(&self, ctx: &Context) -> bool { - match ctx.objective { - Objective::Apply(apply_objective) => { - !matches!(apply_objective.goal, Goal::Keys | Goal::Push) - } - Objective::BuildLocally => true, - } - } - - #[instrument(skip_all, name = "build")] - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - let top_level = ctx.state.evaluation.as_ref().unwrap(); - - let mut command_string = CommandStringBuilder::nix(); - command_string.args(&[ - "--extra-experimental-features", - "nix-command", - "build", - "--print-build-logs", - "--no-link", - "--print-out-paths", - ]); - command_string.arg(top_level.to_string()); - - let status = run_command_with_env( - &CommandArguments::new(command_string, ctx.modifiers) - // build remotely if asked for AND we arent applying locally - .execute_on_remote( - if ctx.node.build_remotely - && let Objective::Apply(apply_objective) = ctx.objective - && !apply_objective.should_apply_locally - { - Some(&ctx.node.target) - } else { - None - }, - ) - .mode(crate::commands::ChildOutputMode::Nix) - .log_stdout(), - std::collections::HashMap::new(), - ) - .await? - .wait_till_success() - .await - .map_err(|source| HiveLibError::NixBuildError { - name: ctx.name.clone(), - source, - })?; - - let stdout = match status { - Either::Left((_, stdout)) | Either::Right((_, stdout)) => stdout, - }; - - info!("Built output: {stdout:?}"); - - // print built path to stdout - println!("{stdout}"); - - ctx.state.build = Some(stdout); - - Ok(()) - } -} diff --git a/crates/core/src/hive/steps/cleanup.rs b/crates/core/src/hive/steps/cleanup.rs deleted file mode 100644 index f8964f0..0000000 --- a/crates/core/src/hive/steps/cleanup.rs +++ /dev/null @@ -1,28 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later -// Copyright 2024-2025 wire Contributors - -use std::fmt::Display; - -use crate::{ - errors::HiveLibError, - hive::node::{Context, ExecuteStep}, -}; - -#[derive(PartialEq, Debug)] -pub(crate) struct CleanUp; - -impl Display for CleanUp { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Clean up") - } -} - -impl ExecuteStep for CleanUp { - fn should_execute(&self, _ctx: &Context) -> bool { - false - } - - async fn execute(&self, _ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - Ok(()) - } -} diff --git a/crates/core/src/hive/steps/evaluate.rs b/crates/core/src/hive/steps/evaluate.rs deleted file mode 100644 index 72b4764..0000000 --- a/crates/core/src/hive/steps/evaluate.rs +++ /dev/null @@ -1,38 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later -// Copyright 2024-2025 wire Contributors - -use std::fmt::Display; - -use tracing::instrument; - -use crate::{ - HiveLibError, - hive::node::{Context, ExecuteStep, Goal, Objective}, -}; - -#[derive(Debug, PartialEq)] -pub struct Evaluate; - -impl Display for Evaluate { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Evaluate the node") - } -} - -impl ExecuteStep for Evaluate { - fn should_execute(&self, ctx: &Context) -> bool { - match ctx.objective { - Objective::Apply(apply_objective) => !matches!(apply_objective.goal, Goal::Keys), - Objective::BuildLocally => true, - } - } - - #[instrument(skip_all, name = "eval")] - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - let rx = ctx.state.evaluation_rx.take().unwrap(); - - ctx.state.evaluation = Some(rx.await.unwrap()?); - - Ok(()) - } -} diff --git a/crates/core/src/hive/steps/keys.rs b/crates/core/src/hive/steps/keys.rs index e3d896b..31f443f 100644 --- a/crates/core/src/hive/steps/keys.rs +++ b/crates/core/src/hive/steps/keys.rs @@ -1,37 +1,15 @@ // SPDX-License-Identifier: AGPL-3.0-or-later // Copyright 2024-2025 wire Contributors -use base64::Engine; -use base64::prelude::BASE64_STANDARD; -use futures::future::join_all; -use im::Vector; -use itertools::{Itertools, Position}; use owo_colors::OwoColorize; -use prost::Message; use prost::bytes::BytesMut; use serde::{Deserialize, Serialize}; -use sha2::{Digest, Sha256}; -use std::env; use std::fmt::Display; -use std::io::Cursor; -use std::iter::Peekable; use std::path::PathBuf; -use std::pin::Pin; -use std::process::Stdio; -use std::str::from_utf8; -use std::vec::IntoIter; -use tokio::io::AsyncReadExt as _; -use tokio::process::Command; -use tokio::{fs::File, io::AsyncRead}; use tokio_util::codec::LengthDelimitedCodec; -use tracing::{debug, instrument}; use crate::HiveLibError; -use crate::commands::builder::CommandStringBuilder; -use crate::commands::common::push; -use crate::commands::{CommandArguments, WireCommandChip, run_command}; use crate::errors::KeyError; -use crate::hive::node::{Context, ExecuteStep, Goal, Objective, Push, SwitchToConfigurationGoal}; #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)] #[serde(tag = "t", content = "c")] @@ -112,92 +90,6 @@ fn get_u32_unix_mode(key: &Key) -> Result { u32::from_str_radix(&key.permissions, 8).map_err(KeyError::ParseKeyPermissions) } -async fn create_reader(key: &'_ Key) -> Result>, KeyError> { - match &key.source { - Source::Path(path) => Ok(Box::pin(File::open(path).await.map_err(KeyError::File)?)), - Source::String(string) => Ok(Box::pin(Cursor::new(string))), - Source::Command(args) => { - let output = Command::new(args.first().ok_or(KeyError::Empty)?) - .args(&args[1..]) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .envs(key.environment.clone()) - .spawn() - .map_err(|err| KeyError::CommandSpawnError { - error: err, - command: args.join(" "), - command_span: Some((0..args.first().unwrap().len()).into()), - })? - .wait_with_output() - .await - .map_err(|err| KeyError::CommandResolveError { - error: err, - command: args.join(" "), - })?; - - if output.status.success() { - return Ok(Box::pin(Cursor::new(output.stdout))); - } - - Err(KeyError::CommandError( - output.status, - from_utf8(&output.stderr).unwrap().to_string(), - )) - } - } -} - -async fn process_key(key: &Key) -> Result<(wire_key_agent::keys::KeySpec, Vec), KeyError> { - let mut reader = create_reader(key).await?; - - let mut buf = Vec::new(); - - reader - .read_to_end(&mut buf) - .await - .expect("failed to read into buffer"); - - let destination: PathBuf = [key.dest_dir.clone(), key.name.clone()].iter().collect(); - - debug!("Staging push to {}", destination.clone().display()); - - Ok(( - wire_key_agent::keys::KeySpec { - length: buf - .len() - .try_into() - .expect("Failed to convert usize buf length to i32"), - user: key.user.clone(), - group: key.group.clone(), - unix_mode: get_u32_unix_mode(key)?, - destination: destination.into_os_string().into_string().unwrap(), - digest: Sha256::digest(&buf).to_vec(), - last: false, - }, - buf, - )) -} - -#[derive(Debug, PartialEq)] -pub struct Keys { - pub filter: UploadKeyAt, -} -#[derive(Debug, PartialEq)] -pub struct PushKeyAgent; - -impl Display for Keys { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Upload key @ {:?}", self.filter) - } -} - -impl Display for PushKeyAgent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Push the key agent") - } -} - pub struct SimpleLengthDelimWriter { codec: LengthDelimitedCodec, write_fn: F, @@ -207,14 +99,14 @@ impl SimpleLengthDelimWriter where F: AsyncFnMut(Vec) -> Result<(), HiveLibError>, { - fn new(write_fn: F) -> Self { + pub fn new(write_fn: F) -> Self { Self { codec: LengthDelimitedCodec::new(), write_fn, } } - async fn send(&mut self, data: prost::bytes::Bytes) -> Result<(), HiveLibError> { + pub async fn send(&mut self, data: prost::bytes::Bytes) -> Result<(), HiveLibError> { let mut buffer = BytesMut::new(); tokio_util::codec::Encoder::encode(&mut self.codec, data, &mut buffer) .map_err(HiveLibError::Encoding)?; @@ -224,218 +116,67 @@ where } } -impl ExecuteStep for Keys { - fn should_execute(&self, ctx: &Context) -> bool { - let Objective::Apply(apply_objective) = ctx.objective else { - return false; - }; - - if apply_objective.no_keys { - return false; - } - - // should execute if no filter, and the goal is keys. - // otherwise, only execute if the goal is switch and non-nofilter - matches!( - (&self.filter, &apply_objective.goal), - (UploadKeyAt::NoFilter, Goal::Keys) - | ( - UploadKeyAt::PreActivation | UploadKeyAt::PostActivation, - Goal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch) - ) - ) - } - - #[instrument(skip_all, name = "keys")] - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - let agent_directory = ctx.state.key_agent_directory.as_ref().unwrap(); - - let mut keys = self.select_keys(&ctx.node.keys).await?; - - if keys.peek().is_none() { - debug!("Had no keys to push, ending KeyStep early."); - return Ok(()); - } - - let command_string = - CommandStringBuilder::new(format!("{agent_directory}/bin/wire-key-agent")); - - let Objective::Apply(apply_objective) = ctx.objective else { - unreachable!() - }; - - let mut child = run_command( - &CommandArguments::new(command_string, ctx.modifiers) - .execute_on_remote(if apply_objective.should_apply_locally { - None - } else { - Some(&ctx.node.target) - }) - .elevated(ctx.node) - .keep_stdin_open() - .log_stdout(), - ) - .await?; - - let mut writer = SimpleLengthDelimWriter::new(async |data| child.write_stdin(data).await); - - for (position, (mut spec, buf)) in keys.with_position() { - if matches!(position, Position::Last | Position::Only) { - spec.last = true; - } - - debug!("Writing spec & buf for {:?}", spec); - - writer - .send(BASE64_STANDARD.encode(spec.encode_to_vec()).into()) - .await?; - writer.send(BASE64_STANDARD.encode(buf).into()).await?; - } - - let status = child - .wait_till_success() - .await - .map_err(HiveLibError::CommandError)?; - - debug!("status: {status:?}"); - - Ok(()) - } -} - -impl Keys { - async fn select_keys( - &self, - keys: &Vector, - ) -> Result)>>, HiveLibError> - { - let futures = keys - .iter() - .filter(|key| self.filter == UploadKeyAt::NoFilter || (key.upload_at == self.filter)) - .map(|key| async move { - process_key(key) - .await - .map_err(|err| HiveLibError::KeyError(key.name.clone(), err)) - }); - - Ok(join_all(futures) - .await - .into_iter() - .collect::, HiveLibError>>()? - .into_iter() - .peekable()) - } -} - -impl ExecuteStep for PushKeyAgent { - fn should_execute(&self, ctx: &Context) -> bool { - let Objective::Apply(apply_objective) = ctx.objective else { - return false; - }; - - if apply_objective.no_keys { - return false; - } - - matches!( - &apply_objective.goal, - Goal::Keys | Goal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch) - ) - } - - #[instrument(skip_all, name = "push_agent")] - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - let arg_name = format!( - "WIRE_KEY_AGENT_{platform}", - platform = ctx.node.host_platform.replace('-', "_") - ); - - let agent_directory = match env::var_os(&arg_name) { - Some(agent) => agent.into_string().unwrap(), - None => panic!( - "{arg_name} environment variable not set! \n - wire was not built with the ability to deploy keys to this platform. \n - Please create an issue: https://github.com/forallsys/wire/issues/new?template=bug_report.md" - ), - }; - - let Objective::Apply(apply_objective) = ctx.objective else { - unreachable!() - }; - - if !apply_objective.should_apply_locally { - push(ctx, Push::Path(&agent_directory)).await?; - } - - ctx.state.key_agent_directory = Some(agent_directory); - - Ok(()) - } -} - #[cfg(test)] mod tests { - use im::Vector; - - use crate::hive::steps::keys::{Key, Keys, UploadKeyAt, process_key}; - - fn new_key(upload_at: &UploadKeyAt) -> Key { - Key { - upload_at: upload_at.clone(), - source: super::Source::String(match upload_at { - UploadKeyAt::PreActivation => "pre".into(), - UploadKeyAt::PostActivation => "post".into(), - UploadKeyAt::NoFilter => "none".into(), - }), - ..Default::default() - } - } - - #[tokio::test] - async fn key_filtering() { - let keys = Vector::from(vec![ - new_key(&UploadKeyAt::PreActivation), - new_key(&UploadKeyAt::PostActivation), - new_key(&UploadKeyAt::PreActivation), - new_key(&UploadKeyAt::PostActivation), - ]); - - for (_, buf) in (Keys { - filter: crate::hive::steps::keys::UploadKeyAt::PreActivation, - }) - .select_keys(&keys) - .await - .unwrap() - { - assert_eq!(String::from_utf8_lossy(&buf), "pre"); - } - - for (_, buf) in (Keys { - filter: crate::hive::steps::keys::UploadKeyAt::PostActivation, - }) - .select_keys(&keys) - .await - .unwrap() - { - assert_eq!(String::from_utf8_lossy(&buf), "post"); - } - - // test that NoFilter processes all keys. - let processed_all = - futures::future::join_all(keys.iter().map(async |x| process_key(x).await)) - .await - .iter() - .flatten() - .cloned() - .collect::>(); - let no_filter = (Keys { - filter: crate::hive::steps::keys::UploadKeyAt::NoFilter, - }) - .select_keys(&keys) - .await - .unwrap() - .collect::>(); - - assert_eq!(processed_all, no_filter); - } + // use crate::hive::steps::keys::{Key, Keys, UploadKeyAt, process_key}; + // + // fn new_key(upload_at: &UploadKeyAt) -> Key { + // Key { + // upload_at: upload_at.clone(), + // source: super::Source::String(match upload_at { + // UploadKeyAt::PreActivation => "pre".into(), + // UploadKeyAt::PostActivation => "post".into(), + // UploadKeyAt::NoFilter => "none".into(), + // }), + // ..Default::default() + // } + // } + // + // #[tokio::test] + // async fn key_filtering() { + // let keys = Vector::from(vec![ + // new_key(&UploadKeyAt::PreActivation), + // new_key(&UploadKeyAt::PostActivation), + // new_key(&UploadKeyAt::PreActivation), + // new_key(&UploadKeyAt::PostActivation), + // ]); + // + // for (_, buf) in (Keys { + // filter: crate::hive::steps::keys::UploadKeyAt::PreActivation, + // }) + // .select_keys(&keys) + // .await + // .unwrap() + // { + // assert_eq!(String::from_utf8_lossy(&buf), "pre"); + // } + // + // for (_, buf) in (Keys { + // filter: crate::hive::steps::keys::UploadKeyAt::PostActivation, + // }) + // .select_keys(&keys) + // .await + // .unwrap() + // { + // assert_eq!(String::from_utf8_lossy(&buf), "post"); + // } + // + // // test that NoFilter processes all keys. + // let processed_all = + // futures::future::join_all(keys.iter().map(async |x| process_key(x).await)) + // .await + // .iter() + // .flatten() + // .cloned() + // .collect::>(); + // let no_filter = (Keys { + // filter: crate::hive::steps::keys::UploadKeyAt::NoFilter, + // }) + // .select_keys(&keys) + // .await + // .unwrap() + // .collect::>(); + // + // assert_eq!(processed_all, no_filter); + // } } diff --git a/crates/core/src/hive/steps/mod.rs b/crates/core/src/hive/steps/mod.rs index 3fbc77e..94ac297 100644 --- a/crates/core/src/hive/steps/mod.rs +++ b/crates/core/src/hive/steps/mod.rs @@ -1,10 +1,4 @@ // SPDX-License-Identifier: AGPL-3.0-or-later // Copyright 2024-2025 wire Contributors -pub mod activate; -pub mod build; -pub mod cleanup; -pub mod evaluate; pub mod keys; -pub mod ping; -pub mod push; diff --git a/crates/core/src/hive/steps/ping.rs b/crates/core/src/hive/steps/ping.rs deleted file mode 100644 index fcf31f6..0000000 --- a/crates/core/src/hive/steps/ping.rs +++ /dev/null @@ -1,58 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later -// Copyright 2024-2025 wire Contributors - -use std::fmt::Display; - -use tracing::{Level, event, instrument}; - -use crate::{ - HiveLibError, - hive::node::{Context, ExecuteStep, Objective}, -}; - -#[derive(Debug, PartialEq)] -pub struct Ping; - -impl Display for Ping { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Ping node") - } -} - -impl ExecuteStep for Ping { - fn should_execute(&self, ctx: &Context) -> bool { - let Objective::Apply(apply_objective) = ctx.objective else { - return false; - }; - - !apply_objective.should_apply_locally - } - - #[instrument(skip_all, name = "ping")] - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - loop { - event!( - Level::INFO, - status = "attempting", - host = ctx.node.target.get_preferred_host()?.to_string() - ); - - if ctx.node.ping(ctx.modifiers).await.is_ok() { - event!( - Level::INFO, - status = "success", - host = ctx.node.target.get_preferred_host()?.to_string() - ); - return Ok(()); - } - - // ? will take us out if we ran out of hosts - event!( - Level::WARN, - status = "failed to ping", - host = ctx.node.target.get_preferred_host()?.to_string() - ); - ctx.node.target.host_failed(); - } - } -} diff --git a/crates/core/src/hive/steps/push.rs b/crates/core/src/hive/steps/push.rs deleted file mode 100644 index 06cfc0f..0000000 --- a/crates/core/src/hive/steps/push.rs +++ /dev/null @@ -1,84 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later -// Copyright 2024-2025 wire Contributors - -use std::fmt::Display; - -use tracing::instrument; - -use crate::{ - HiveLibError, - commands::common::push, - hive::node::{Context, ExecuteStep, Goal, Objective}, -}; - -#[derive(Debug, PartialEq)] -pub struct PushEvaluatedOutput; -#[derive(Debug, PartialEq)] -pub struct PushBuildOutput; - -impl Display for PushEvaluatedOutput { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Push the evaluated output") - } -} - -impl Display for PushBuildOutput { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Push the build output") - } -} - -impl ExecuteStep for PushEvaluatedOutput { - fn should_execute(&self, ctx: &Context) -> bool { - let Objective::Apply(apply_objective) = ctx.objective else { - return false; - }; - - !matches!(apply_objective.goal, Goal::Keys) - && !apply_objective.should_apply_locally - && (ctx.node.build_remotely | matches!(apply_objective.goal, Goal::Push)) - } - - #[instrument(skip_all, name = "push_eval")] - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - let top_level = ctx.state.evaluation.as_ref().unwrap(); - - push(ctx, crate::hive::node::Push::Derivation(top_level)).await?; - - Ok(()) - } -} - -impl ExecuteStep for PushBuildOutput { - fn should_execute(&self, ctx: &Context) -> bool { - let Objective::Apply(apply_objective) = ctx.objective else { - return false; - }; - - if matches!(apply_objective.goal, Goal::Keys | Goal::Push) { - // skip if we are not building - return false; - } - - if ctx.node.build_remotely { - // skip if we are building remotely - return false; - } - - if apply_objective.should_apply_locally { - // skip step if we are applying locally - return false; - } - - true - } - - #[instrument(skip_all, name = "push_build")] - async fn execute(&self, ctx: &mut Context<'_>) -> Result<(), HiveLibError> { - let built_path = ctx.state.build.as_ref().unwrap(); - - push(ctx, crate::hive::node::Push::Path(built_path)).await?; - - Ok(()) - } -} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index f1782d5..9809373 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -13,7 +13,11 @@ use std::{ use tokio::sync::{AcquireError, Semaphore, SemaphorePermit}; -use crate::{errors::HiveLibError, hive::node::Name, status::STATUS}; +use crate::{ + errors::HiveLibError, + hive::node::Name, + status::STATUS, +}; pub mod cache; pub mod commands; diff --git a/crates/execute/Cargo.toml b/crates/execute/Cargo.toml new file mode 100644 index 0000000..24302c1 --- /dev/null +++ b/crates/execute/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "wire-execute" +edition.workspace = true +version.workspace = true + +[dependencies] +wire-keys = { path = "../keys" } +wire-core = { path = "../core" } +wire-key-agent = { path = "../key_agent" } +tracing.workspace = true +prost.workspace = true +itertools.workspace = true +futures-util.workspace = true +secrecy.workspace = true +base64 = { workspace = true } +enum_dispatch.workspace = true + +[lints] +workspace = true diff --git a/crates/execute/src/activate.rs b/crates/execute/src/activate.rs new file mode 100644 index 0000000..ab981d0 --- /dev/null +++ b/crates/execute/src/activate.rs @@ -0,0 +1,183 @@ +use std::sync::Arc; + +use tracing::{error, info, warn}; +use wire_core::{commands::{ChildOutputMode, CommandArguments, WireCommandChip, builder::CommandStringBuilder, run_command}, errors::{ActivationError, HiveLibError, NetworkError}, hive::node::{Context, SwitchToConfigurationGoal, Target}}; + +use crate::ExecuteStep; + +pub struct SwitchToConfiguration { + pub goal: SwitchToConfigurationGoal, + pub reboot: bool, + pub target: Option>, + pub privilege_escalation_command: Arc>> +} + +impl SwitchToConfiguration { + async fn set_profile( + &self, + ctx: &Context, + built_path: &String, + ) -> Result<(), HiveLibError> { + info!("Setting profiles in anticipation for switch-to-configuration {}", self.goal); + + let mut command_string = CommandStringBuilder::new("nix-env"); + command_string.args(&["-p", "/nix/var/nix/profiles/system", "--set"]); + command_string.arg(built_path); + + let child = run_command( + &CommandArguments::new(command_string, ctx.modifiers) + .mode(ChildOutputMode::Nix) + .execute_on_remote(self.target.as_deref()) + .privileged(&self.privilege_escalation_command), + ) + .await?; + + let _ = child + .wait_till_success() + .await + .map_err(HiveLibError::CommandError)?; + + info!("Set system profile"); + + Ok(()) + } + + async fn wait_for_ping(&self, _ctx: &Context) -> Result<(), HiveLibError> { + todo!() + // let host = ctx.node.target.get_preferred_host()?; + // let mut result = ctx.node.ping(ctx.modifiers).await; + // + // for num in 0..2 { + // warn!("Trying to ping {host} (attempt {}/3)", num + 1); + // + // result = ctx.node.ping(ctx.modifiers).await; + // + // if result.is_ok() { + // info!("Regained connection to {} via {host}", ctx.name); + // + // break; + // } + // } + // + // result + } +} + +impl ExecuteStep for SwitchToConfiguration { + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { + let built_path = ctx.state.build.as_ref().unwrap(); + + if matches!( + self.goal, + // switch profile if switch or boot + // https://github.com/NixOS/nixpkgs/blob/a2c92aa34735a04010671e3378e2aa2d109b2a72/pkgs/by-name/ni/nixos-rebuild-ng/src/nixos_rebuild/services.py#L224 + SwitchToConfigurationGoal::Switch | SwitchToConfigurationGoal::Boot + ) { + self.set_profile(ctx, built_path).await?; + } + + info!("Running switch-to-configuration {}", self.goal); + + let mut command_string = + CommandStringBuilder::new(format!("{built_path}/bin/switch-to-configuration")); + command_string.arg(match self.goal { + SwitchToConfigurationGoal::Switch => "switch", + SwitchToConfigurationGoal::Boot => "boot", + SwitchToConfigurationGoal::Test => "test", + SwitchToConfigurationGoal::DryActivate => "dry-activate", + }); + + let child = run_command( + &CommandArguments::new(command_string, ctx.modifiers) + .execute_on_remote(self.target.as_deref()) + .privileged(&self.privilege_escalation_command) + .log_stdout(), + ) + .await?; + + let result = child.wait_till_success().await; + + match result { + Ok(_) => { + if !self.reboot { + return Ok(()); + } + + let Some(ref target) = self.target else { + error!("Refusing to reboot local machine!"); + + return Ok(()); + }; + + warn!("Rebooting {name}!", name = ctx.name); + + let reboot = run_command( + &CommandArguments::new("reboot now", ctx.modifiers) + .log_stdout() + .execute_on_remote(self.target.as_deref()) + .privileged(&self.privilege_escalation_command), + ) + .await?; + + // consume result, impossible to know if the machine failed to reboot or we + // simply disconnected + let _ = reboot + .wait_till_success() + .await + .map_err(HiveLibError::CommandError)?; + + info!("Rebooted {name}, waiting to reconnect...", name = ctx.name); + + if self.wait_for_ping(ctx).await.is_ok() { + return Ok(()); + } + + error!( + "Failed to get regain connection to {name} via {host} after reboot.", + name = ctx.name, + host = target.get_preferred_host()? + ); + + return Err(HiveLibError::NetworkError( + NetworkError::HostUnreachableAfterReboot( + target.get_preferred_host()?.to_string(), + ), + )); + } + Err(error) => { + warn!( + "Activation command for {name} exited unsuccessfully.", + name = ctx.name + ); + + // Bail if the command couldn't of broken the system + // and don't try to regain connection to localhost + let Some(target) = self.target.clone().filter(|_| !matches!(self.goal, SwitchToConfigurationGoal::DryActivate)) else { + return Err(HiveLibError::ActivationError( + ActivationError::SwitchToConfigurationError(self.goal, ctx.name.clone(), error), + )); + }; + + if self.wait_for_ping(ctx).await.is_ok() { + return Err(HiveLibError::ActivationError( + ActivationError::SwitchToConfigurationError(self.goal, ctx.name.clone(), error), + )); + } + + error!( + "Failed to get regain connection to {name} via {host} after {goal} activation.", + name = ctx.name, + host = target.get_preferred_host()?, + goal = self.goal + ); + + return Err(HiveLibError::NetworkError( + NetworkError::HostUnreachableAfterReboot( + target.get_preferred_host()?.to_string(), + ), + )); + } + } + } +} + diff --git a/crates/execute/src/lib.rs b/crates/execute/src/lib.rs new file mode 100644 index 0000000..411f7ae --- /dev/null +++ b/crates/execute/src/lib.rs @@ -0,0 +1,246 @@ +use base64::Engine; +use wire_core::commands::ChildOutputMode; +use prost::Message; +use futures_util::future::join_all; +use itertools::Itertools; +use base64::prelude::BASE64_STANDARD; +use itertools::Position; +use enum_dispatch::enum_dispatch; +use secrecy::{ExposeSecret, SecretSlice}; +use wire_core::commands::builder::CommandStringBuilder; +use wire_core::hive::node::Target; +use wire_core::hive::steps::keys::SimpleLengthDelimWriter; +use wire_core::commands::{CommandArguments, Either, WireCommandChip, run_command, run_command_with_env}; +use wire_key_agent::keys::KeySpec; +use std::{env, sync::Arc}; +use tracing::{debug, info}; +use wire_core::{ + commands::common::push, + errors::HiveLibError, + hive::node::{Context, Push}, +}; +use wire_keys::Key; + +pub mod activate; + +#[enum_dispatch] +pub trait ExecuteStep { + #[allow(async_fn_in_trait)] + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError>; +} + +pub struct Ping; +pub struct PushKeyAgent { + pub substitute_on_destination: bool, + pub host_platform: Arc, + pub target: Arc, +} +pub struct Keys { + pub keys: Vec>, + pub target: Option>, + pub privilege_escalation_command: Arc>> +} +pub struct Evaluate; +pub struct PushEvaluatedOutput { + pub substitute_on_destination: bool, + pub target: Arc, +} +pub struct Build { + pub target: Option>, +} +pub struct PushBuildOutput { + pub substitute_on_destination: bool, + pub target: Arc, +} + +impl ExecuteStep for Ping { + async fn execute(&self, _ctx: &mut Context) -> Result<(), HiveLibError> { + loop { + todo!() + // event!( + // Level::INFO, + // status = "attempting", + // host = ctx.node.target.get_preferred_host()?.to_string() + // ); + // + // if ctx.node.ping(ctx.modifiers).await.is_ok() { + // event!( + // Level::INFO, + // status = "success", + // host = ctx.node.target.get_preferred_host()?.to_string() + // ); + // return Ok(()); + // } + // + // // ? will take us out if we ran out of hosts + // event!( + // Level::WARN, + // status = "failed to ping", + // host = ctx.node.target.get_preferred_host()?.to_string() + // ); + // ctx.node.target.host_failed(); + } + } +} + +impl ExecuteStep for PushKeyAgent { + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { + let arg_name = format!( + "WIRE_KEY_AGENT_{platform}", + platform = self.host_platform.replace('-', "_") + ); + + let agent_directory = match env::var_os(&arg_name) { + Some(agent) => agent.into_string().unwrap(), + None => panic!( + "{arg_name} environment variable not set! \n + wire was not built with the ability to deploy keys to this platform. \n + Please create an issue: https://github.com/forallsys/wire/issues/new?template=bug_report.md" + ), + }; + + push(ctx, &self.target, Push::Path(&agent_directory), self.substitute_on_destination).await?; + + ctx.state.key_agent_directory = Some(agent_directory); + + Ok(()) + } +} + +impl Keys { + async fn select_keys(&self) -> Result)>, HiveLibError>{ + let futures = self.keys + .iter() + .map(|key| async move { + key.read() + .await + .map_err(|err| HiveLibError::KeyError(key.name.clone(), err)) + }); + + Ok(join_all(futures) + .await + .into_iter() + .collect::, HiveLibError>>()? + .into_iter() + .peekable()) + } +} + +impl ExecuteStep for Keys { + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { + let agent_directory = ctx.state.key_agent_directory.as_ref().unwrap(); + + let command_string = + CommandStringBuilder::new(format!("{agent_directory}/bin/wire-key-agent")); + + let mut child = run_command( + &CommandArguments::new(command_string, ctx.modifiers) + .execute_on_remote(self.target.as_deref()) + .privileged(&self.privilege_escalation_command) + .keep_stdin_open() + .log_stdout(), + ) + .await?; + + let mut writer = SimpleLengthDelimWriter::new(async |data| child.write_stdin(data).await); + + let keys = self.select_keys().await?; + + for (position, (mut spec, buf)) in keys.with_position() { + if matches!(position, Position::Last | Position::Only) { + spec.last = true; + } + + debug!("Writing spec & buf for {:?}", spec); + + writer + .send(BASE64_STANDARD.encode(spec.encode_to_vec()).into()) + .await?; + writer.send(BASE64_STANDARD.encode(buf.expose_secret()).into()).await?; + } + + let status = child + .wait_till_success() + .await + .map_err(HiveLibError::CommandError)?; + + debug!("status: {status:?}"); + + Ok(()) + } +} + +impl ExecuteStep for Evaluate { + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { + let rx = ctx.state.evaluation_rx.take().unwrap(); + + ctx.state.evaluation = Some(rx.await.unwrap()?); + + Ok(()) + } +} + +impl ExecuteStep for PushEvaluatedOutput { + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { + let top_level = ctx.state.evaluation.as_ref().unwrap(); + + push(ctx, &self.target, Push::Derivation(top_level), self.substitute_on_destination).await?; + + Ok(()) + } +} + +impl ExecuteStep for Build { + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { + let top_level = ctx.state.evaluation.as_ref().unwrap(); + + let mut command_string = CommandStringBuilder::nix(); + command_string.args(&[ + "--extra-experimental-features", + "nix-command", + "build", + "--print-build-logs", + "--no-link", + "--print-out-paths", + ]); + command_string.arg(top_level.to_string()); + + let status = run_command_with_env( + &CommandArguments::new(command_string, ctx.modifiers) + .execute_on_remote(self.target.as_deref()) + .mode(ChildOutputMode::Nix) + .log_stdout(), + std::collections::HashMap::new(), + ) + .await? + .wait_till_success() + .await + .map_err(|source| HiveLibError::NixBuildError { + name: ctx.name.clone(), + source, + })?; + + let stdout = match status { + Either::Left((_, stdout)) | Either::Right((_, stdout)) => stdout, + }; + + info!("Built output: {stdout:?}"); + + // print built path to stdout + println!("{stdout}"); + + ctx.state.build = Some(stdout); + + Ok(()) + } +} + +impl ExecuteStep for PushBuildOutput { + async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { + let built_path = ctx.state.build.as_ref().unwrap(); + + push(ctx, &self.target, Push::Path(built_path), self.substitute_on_destination).await?; + + Ok(()) + } +} diff --git a/crates/keys/Cargo.toml b/crates/keys/Cargo.toml new file mode 100644 index 0000000..de19141 --- /dev/null +++ b/crates/keys/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "wire-keys" +edition.workspace = true +version.workspace = true + +[dependencies] +wire-core = { path = "../core" } +wire-key-agent = { path = "../key_agent" } +serde.workspace = true +tokio.workspace = true +im = { workspace = true } +sha2 = { workspace = true } +either = "1.15.0" +secrecy.workspace = true + +[lints] +workspace = true diff --git a/crates/keys/src/lib.rs b/crates/keys/src/lib.rs new file mode 100644 index 0000000..8264e8a --- /dev/null +++ b/crates/keys/src/lib.rs @@ -0,0 +1,164 @@ +use secrecy::{ExposeSecret, SecretSlice}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::{ + io::Cursor, path::PathBuf, pin::Pin, process::Stdio, str::from_utf8, +}; +use tokio::{ + fs::File, + io::{AsyncRead, AsyncReadExt}, + process::Command, +}; +use wire_core::{ + errors::KeyError, + hive::steps::keys::{Source, UploadKeyAt}, +}; + +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub struct Key { + pub name: String, + #[serde(rename = "destDir")] + pub dest_dir: String, + pub path: PathBuf, + pub group: String, + pub user: String, + pub permissions: String, + pub source: Source, + #[serde(rename = "uploadAt")] + pub upload_at: UploadKeyAt, + #[serde(default)] + pub environment: im::HashMap, +} + +// /// what nodes hold in execution +// #[derive(Debug)] +// pub struct StoredKey { +// pub key: Key, +// +// data: Arc>, +// } +// +// #[derive(Default)] +// pub struct KeyStore { +// // keys: HashSet>, +// // keys: HashMap, Option>>>, +// +// cache: DashMap>>> +// } + +// impl KeyStore { +// #[must_use] +// pub fn new() -> Self { +// Self { cache: DashMap::new() } +// } +// +// // pub fn insert(&mut self, key: Key) -> Arc { +// // let key = Arc::new(key); +// // +// // self.keys.entry(key.clone()).or_insert(None); +// // +// // if self.keys.contains_key(&key) { +// // return self.keys.get_key_value(&key).unwrap().0.clone() +// // } +// // +// // let value = Arc::new(key); +// // self.keys.insert(value.clone(), None); +// // +// // value +// // } +// // +// // pub async fn read(&mut self, key: Arc) -> Result<&Vec, KeyError> { +// // if let Some(Some(value)) = self.keys.get(&key) { +// // return Ok(value); +// // } +// // +// // let mut buf = Vec::new(); +// // +// // let mut reader = key.create_reader().await?; +// // +// // reader +// // .read_to_end(&mut buf) +// // .await +// // .expect("failed to read into buffer"); +// // +// // drop(reader); +// // +// // self.keys.insert(key.clone(), Some(buf)); +// // +// // Ok(self.keys.get(&key).unwrap().as_ref().unwrap()) +// // +// // } +// } + +impl Key { + async fn create_reader(&self) -> Result>, KeyError> { + match &self.source { + Source::Path(path) => Ok(Box::pin(File::open(path).await.map_err(KeyError::File)?)), + Source::String(string) => Ok(Box::pin(Cursor::new(string))), + Source::Command(args) => { + let output = Command::new(args.first().ok_or(KeyError::Empty)?) + .args(&args[1..]) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .envs(self.environment.clone()) + .spawn() + .map_err(|err| KeyError::CommandSpawnError { + error: err, + command: args.join(" "), + command_span: Some((0..args.first().unwrap().len()).into()), + })? + .wait_with_output() + .await + .map_err(|err| KeyError::CommandResolveError { + error: err, + command: args.join(" "), + })?; + + if output.status.success() { + return Ok(Box::pin(Cursor::new(output.stdout))); + } + + Err(KeyError::CommandError( + output.status, + from_utf8(&output.stderr).unwrap().to_string(), + )) + } + } + } + + fn get_u32_unix_mode(&self) -> Result { + u32::from_str_radix(&self.permissions, 8).map_err(KeyError::ParseKeyPermissions) + } + + + pub async fn read(&self) -> Result<(wire_key_agent::keys::KeySpec, SecretSlice), KeyError> { + let mut buf = Vec::new(); + let mut reader = self.create_reader().await?; + + reader + .read_to_end(&mut buf) + .await + .expect("failed to read into buffer"); + + let buf = SecretSlice::from(buf); + + let destination: PathBuf = [self.dest_dir.clone(), self.name.clone()].iter().collect(); + + Ok(( + wire_key_agent::keys::KeySpec { + length: buf + .expose_secret() + .len() + .try_into() + .expect("Failed to convert usize buf length to i32"), + user: self.user.clone(), + group: self.group.clone(), + unix_mode: self.get_u32_unix_mode()?, + destination: destination.into_os_string().into_string().unwrap(), + digest: Sha256::digest(buf.expose_secret()).to_vec(), + last: false, + }, + buf)) + } +} diff --git a/crates/plan/Cargo.toml b/crates/plan/Cargo.toml new file mode 100644 index 0000000..2bb8086 --- /dev/null +++ b/crates/plan/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "wire-plan" +edition.workspace = true +version.workspace = true + +[dependencies] +wire-core = { path = "../core" } +wire-keys = { path = "../keys" } +wire-execute = { path = "../execute" } +im = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +enum_dispatch = { workspace = true } + +[lints] +workspace = true diff --git a/crates/plan/src/lib.rs b/crates/plan/src/lib.rs new file mode 100644 index 0000000..fe0c011 --- /dev/null +++ b/crates/plan/src/lib.rs @@ -0,0 +1,463 @@ +use std::{collections::HashMap, sync::{Arc, atomic::AtomicBool}}; + +use enum_dispatch::enum_dispatch; +use serde::{Deserialize, Serialize}; +use wire_core::{ + SubCommandModifiers, hive::{ + HiveLocation, node::{Context, Name, StepState, SwitchToConfigurationGoal, Target}, steps::keys::UploadKeyAt + } +}; +use wire_execute::{ + Build, Evaluate, ExecuteStep, Keys, Ping, PushBuildOutput, PushEvaluatedOutput, PushKeyAgent, + activate::SwitchToConfiguration, +}; +use wire_keys::{Key}; + +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] +pub struct NodeRepr { + #[serde(rename = "target")] + pub target: Target, + + #[serde(rename = "buildOnTarget")] + pub build_remotely: bool, + + #[serde(rename = "allowLocalDeployment")] + pub allow_local_deployment: bool, + + #[serde(default)] + pub tags: im::HashSet, + + #[serde(rename(deserialize = "_keys", serialize = "keys"))] + pub keys: Vec, + + #[serde(rename(deserialize = "_hostPlatform", serialize = "host_platform"))] + pub host_platform: Arc, + + #[serde(rename( + deserialize = "privilegeEscalationCommand", + serialize = "privilege_escalation_command" + ))] + pub privilege_escalation_command: Vec>, +} + +pub struct Node { + pub target: Arc, + pub build_remotely: bool, + pub allow_local_deployment: bool, + pub tags: im::HashSet, + pub keys: Vec>, + pub host_platform: Arc, + pub privilege_escalation_command: Arc>>, +} + +pub enum ApplyGoal { + SwitchToConfiguration(SwitchToConfigurationGoal), + Push, + Build, + Keys, +} + +pub enum Goal { + Apply { + goal: ApplyGoal, + should_apply_locally: bool, + no_keys: bool, + substitute_on_destination: bool, + reboot: bool, + host_platform: Arc + }, + Build, +} + +#[enum_dispatch(ExecuteStep)] +enum Step { + Ping, + PushKeyAgent, + Keys, + Evaluate, + PushEvaluatedOutput, + Build, + PushBuildOutput, + SwitchToConfiguration, +} + +struct NodePlan { + context: Context, + steps: Vec, + node: Node, +} + +pub fn create_plans<'a>(nodes: HashMap, goal: &'_ Goal, hive_location: Arc, modifiers: &SubCommandModifiers, should_quit: Arc) -> Vec { + // let mut key_store = KeyStore::default(); + let mut plans = Vec::new(); + + for (name, node) in nodes { + let mut keys = Vec::with_capacity(node.keys.len()); + + // for key in node.keys { + // let key = key_store.insert(key); + // keys.push(key); + // } + + let plan = plan_for_node( + Node { + target: Arc::new(node.target), + build_remotely: node.build_remotely, + allow_local_deployment: node.allow_local_deployment, + tags: node.tags, + host_platform: node.host_platform, + privilege_escalation_command: Arc::new(node.privilege_escalation_command), + keys, + }, + name, + goal, + hive_location.clone(), + modifiers, + should_quit.clone() + ); + + // plans.push(plan); + } + + plans +} + +pub fn plan_for_node<'a>(node: Node, name: Name, goal: &'_ Goal, hive_location: Arc, modifiers: &SubCommandModifiers, should_quit: Arc) -> NodePlan { + match goal { + Goal::Build => NodePlan { + context: Context { + state: StepState::default(), + modifiers: *modifiers, + name, + hive_location, + should_quit + }, + steps: vec![ + Step::Evaluate(Evaluate), + Step::Build(Build { target: None }), + ], + node, + }, + Goal::Apply { + goal, + should_apply_locally, + no_keys, + substitute_on_destination, + reboot, + host_platform + } => { + let mut steps: Vec = Vec::new(); + + if !*should_apply_locally { + steps.push(Step::Ping(Ping)); + } + + if !*no_keys + && matches!( + &goal, + ApplyGoal::Keys + | ApplyGoal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch) + ) + { + if !*should_apply_locally { + steps.push(Step::PushKeyAgent(PushKeyAgent { + substitute_on_destination: *substitute_on_destination, + host_platform: host_platform.clone(), + target: node.target.clone() + })); + } + + let keys = match goal { + ApplyGoal::SwitchToConfiguration(SwitchToConfigurationGoal::Switch) => node + .keys + .iter() + .filter(|x| matches!(x.upload_at, UploadKeyAt::PreActivation)) + .cloned() + .collect(), + ApplyGoal::Keys => node.keys.clone(), + _ => unreachable!(), + }; + + if !keys.is_empty() { + steps.push(Step::Keys(Keys { + keys: node.keys.clone(), + target: if *should_apply_locally { + Some(node.target.clone()) + } else { + None + }, + privilege_escalation_command: node.privilege_escalation_command.clone() + })); + } + } + + steps.push(Step::Evaluate(Evaluate)); + + if !matches!(goal, ApplyGoal::Keys) + && !should_apply_locally + && (node.build_remotely | matches!(goal, ApplyGoal::Push)) + { + steps.push(Step::PushEvaluatedOutput(PushEvaluatedOutput { + substitute_on_destination: *substitute_on_destination, + target: node.target.clone() + })); + } + + if !matches!(goal, ApplyGoal::Keys | ApplyGoal::Push) { + steps.push(Step::Build(Build { + target: if node.build_remotely && !*should_apply_locally { + Some(node.target.clone()) + } else { None } + })); + } + + if !node.build_remotely + && !should_apply_locally + && !matches!(goal, ApplyGoal::Keys | ApplyGoal::Push) + { + steps.push(Step::PushBuildOutput(PushBuildOutput { + substitute_on_destination: *substitute_on_destination, + target: node.target.clone() + })); + } + + if let ApplyGoal::SwitchToConfiguration(goal) = goal { + steps.push(Step::SwitchToConfiguration(SwitchToConfiguration { + goal: *goal, + reboot: *reboot, + target: if *should_apply_locally { + Some(node.target.clone()) + } else { None }, + privilege_escalation_command: node.privilege_escalation_command.clone() + })); + } + + NodePlan { + context: Context { + state: StepState::default(), + name, + hive_location, + modifiers: *modifiers, + should_quit + }, + steps, + node, + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // fn get_steps(goal_executor: GoalExecutor) -> std::vec::Vec { + // goal_executor + // .steps + // .into_iter() + // .filter(|step| step.should_execute(&goal_executor.context)) + // .collect::>() + // } + + // #[tokio::test] + // async fn order_build_locally() { + // // let location = location!(get_test_path!()); + // // let mut node = Node { + // // build_remotely: false, + // // ..Default::default() + // // }; + // // let name = &Name(function_name!().into()); + // // let executor = GoalExecutor::new(Context::create_test_context(location, name, &mut node)); + // // let steps = get_steps(executor); + // + // let plan = plan_for_node(Node { + // build_remotely: false, + // ..Default::default() + // }, Name("".into()), &Goal::Build); + // + // assert_eq!( + // plan.steps, + // vec![ + // Step::Ping, + // Step::PushKeyAgent, + // Step::Keys, + // crate::hive::steps::evaluate::Evaluate.into(), + // crate::hive::steps::build::Build.into(), + // crate::hive::steps::push::PushBuildOutput.into(), + // SwitchToConfiguration.into(), + // Keys { + // filter: UploadKeyAt::PostActivation + // } + // .into(), + // ] + // ); + // } + + // #[tokio::test] + // async fn order_keys_only() { + // let location = location!(get_test_path!()); + // let mut node = Node::default(); + // let name = &Name(function_name!().into()); + // let mut context = Context::create_test_context(location, name, &mut node); + // + // let Objective::Apply(ref mut apply_objective) = context.objective else { + // unreachable!() + // }; + // + // apply_objective.goal = Goal::Keys; + // + // let executor = GoalExecutor::new(context); + // let steps = get_steps(executor); + // + // assert_eq!( + // steps, + // vec![ + // Ping.into(), + // PushKeyAgent.into(), + // Keys { + // filter: UploadKeyAt::NoFilter + // } + // .into(), + // ] + // ); + // } + // + // #[tokio::test] + // async fn order_build() { + // let location = location!(get_test_path!()); + // let mut node = Node::default(); + // let name = &Name(function_name!().into()); + // let mut context = Context::create_test_context(location, name, &mut node); + // + // let Objective::Apply(ref mut apply_objective) = context.objective else { + // unreachable!() + // }; + // apply_objective.goal = Goal::Build; + // + // let executor = GoalExecutor::new(context); + // let steps = get_steps(executor); + // + // assert_eq!( + // steps, + // vec![ + // Ping.into(), + // crate::hive::steps::evaluate::Evaluate.into(), + // crate::hive::steps::build::Build.into(), + // crate::hive::steps::push::PushBuildOutput.into(), + // ] + // ); + // } + // + // #[tokio::test] + // async fn order_push_only() { + // let location = location!(get_test_path!()); + // let mut node = Node::default(); + // let name = &Name(function_name!().into()); + // let mut context = Context::create_test_context(location, name, &mut node); + // + // let Objective::Apply(ref mut apply_objective) = context.objective else { + // unreachable!() + // }; + // apply_objective.goal = Goal::Push; + // + // let executor = GoalExecutor::new(context); + // let steps = get_steps(executor); + // + // assert_eq!( + // steps, + // vec![ + // Ping.into(), + // crate::hive::steps::evaluate::Evaluate.into(), + // crate::hive::steps::push::PushEvaluatedOutput.into(), + // ] + // ); + // } + // + // #[tokio::test] + // async fn order_remote_build() { + // let location = location!(get_test_path!()); + // let mut node = Node { + // build_remotely: true, + // ..Default::default() + // }; + // + // let name = &Name(function_name!().into()); + // let executor = GoalExecutor::new(Context::create_test_context(location, name, &mut node)); + // let steps = get_steps(executor); + // + // assert_eq!( + // steps, + // vec![ + // Ping.into(), + // PushKeyAgent.into(), + // Keys { + // filter: UploadKeyAt::PreActivation + // } + // .into(), + // crate::hive::steps::evaluate::Evaluate.into(), + // crate::hive::steps::push::PushEvaluatedOutput.into(), + // crate::hive::steps::build::Build.into(), + // SwitchToConfiguration.into(), + // Keys { + // filter: UploadKeyAt::PostActivation + // } + // .into(), + // ] + // ); + // } + // + // #[tokio::test] + // async fn order_nokeys() { + // let location = location!(get_test_path!()); + // let mut node = Node::default(); + // + // let name = &Name(function_name!().into()); + // let mut context = Context::create_test_context(location, name, &mut node); + // + // let Objective::Apply(ref mut apply_objective) = context.objective else { + // unreachable!() + // }; + // apply_objective.no_keys = true; + // + // let executor = GoalExecutor::new(context); + // let steps = get_steps(executor); + // + // assert_eq!( + // steps, + // vec![ + // Ping.into(), + // crate::hive::steps::evaluate::Evaluate.into(), + // crate::hive::steps::build::Build.into(), + // crate::hive::steps::push::PushBuildOutput.into(), + // SwitchToConfiguration.into(), + // ] + // ); + // } + // + // #[tokio::test] + // async fn order_should_apply_locally() { + // let location = location!(get_test_path!()); + // let mut node = Node::default(); + // + // let name = &Name(function_name!().into()); + // let mut context = Context::create_test_context(location, name, &mut node); + // + // let Objective::Apply(ref mut apply_objective) = context.objective else { + // unreachable!() + // }; + // apply_objective.no_keys = true; + // apply_objective.should_apply_locally = true; + // + // let executor = GoalExecutor::new(context); + // let steps = get_steps(executor); + // + // assert_eq!( + // steps, + // vec![ + // crate::hive::steps::evaluate::Evaluate.into(), + // crate::hive::steps::build::Build.into(), + // SwitchToConfiguration.into(), + // ] + // ); + // } +}