diff --git a/__pycache__/pylib.cpython-38.pyc b/__pycache__/pylib.cpython-38.pyc index 10fc8c9..7cf33e3 100644 Binary files a/__pycache__/pylib.cpython-38.pyc and b/__pycache__/pylib.cpython-38.pyc differ diff --git a/bencher/.gitignore b/bencher/.gitignore new file mode 100644 index 0000000..3f197ab --- /dev/null +++ b/bencher/.gitignore @@ -0,0 +1 @@ +prepare_data \ No newline at end of file diff --git a/bencher/Cargo.lock b/bencher/Cargo.lock index 292a616..618c7c6 100644 --- a/bencher/Cargo.lock +++ b/bencher/Cargo.lock @@ -189,6 +189,7 @@ dependencies = [ "base64 0.13.1", "clap", "enum_dispatch", + "futures", "goose", "image", "interprocess", diff --git a/bencher/Cargo.toml b/bencher/Cargo.toml index 922daed..e5826ce 100644 --- a/bencher/Cargo.toml +++ b/bencher/Cargo.toml @@ -7,19 +7,20 @@ edition = "2021" [dependencies] goose = "^0.16.4" -tokio = {version="1.1",features=["process"]} +tokio = { version = "1.1", features = ["process"] } clap = { version = "4.5.7", features = ["derive"] } image = "0.24.1" rand = "0.8.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +serde_yaml = "0.9" base64 = "0.13" rust-s3 = "0.34.0" lazy_static = "1.4.0" -reqwest = { version="0.12.5", features = ["json"]} +reqwest = { version = "0.12.5", features = ["json"] } async-trait = "0.1.74" enum_dispatch = "0.3.13" -serde_yaml = "0.9" interprocess = "2.2.0" tracing = "0.1.40" -tracing-subscriber = "0.3" \ No newline at end of file +tracing-subscriber = "0.3" +futures = "0.3.30" diff --git a/bencher/app_fn_entries.yaml b/bencher/app_fn_entries.yaml new file mode 100644 index 0000000..3fa03e9 --- /dev/null +++ b/bencher/app_fn_entries.yaml @@ -0,0 +1,134 @@ +minio: + endpoint: "http://192.168.31.54:9009" + access_key: "minioadmin" + secret_key: "minioadmin123" + compose_path: "../middlewares/minio/" + +models: + simple_demo: + # name: "Simple Demo" + # description: "A simple demo function" + # language: "java" + # runtime: "java11" + # handler: "test.functions.Simple" + # memory: 256 + # timeout: 30 + prepare_data: [] + prepare_scripts: [] + fns: + simple: + args: {} + + img_resize: + fns: + resize: + args: + use_minio: true + image_s3_path: "image_to_resize.png" + target_width: 50 + target_height: 50 + prepare_data: ["image_to_resize.png"] + prepare_scripts: + - | + from PIL import Image, ImageDraw + import random + + def generate_random_image(width, height, output_path): + # 创建一个空白图像 + image = Image.new('RGB', (width, height), 'white') + draw = ImageDraw.Draw(image) + + # 生成随机颜色的像素 + for x in range(width): + for y in range(height): + r = random.randint(0, 255) + g = random.randint(0, 255) + b = random.randint(0, 255) + draw.point((x, y), fill=(r, g, b)) + + # 保存图像 + image.save(output_path) + print(f"Image saved to {output_path}") + + # 指定图片的宽度和高度 + width = 800 + height = 600 + output_path = 'image_to_resize.png' + + # 生成随机图片 + generate_random_image(width, height, output_path) + + word_count: + prepare_data: + - large_text_file.txt + prepare_scripts: + - | + import os + + def generate_paragraph(): + paragraph = """ + In a far-off land, nestled among the mountains, lies a small village known for its unique culture and friendly inhabitants. Each spring, the villagers hold a grand festival to celebrate the arrival of the flowering season. Visitors from all corners gather to enjoy this beautiful moment. During the festival, people don traditional attire and dance joyfully, filling the village with laughter and cheer. 
+ """ + return paragraph + + def generate_large_text_file(file_path, target_size_gb): + target_size_bytes = target_size_gb * 1024 * 1024 * 1024 + paragraph = generate_paragraph() + paragraph_length = len(paragraph.encode('utf-8')) + + with open(file_path, 'w') as file: + while os.path.getsize(file_path) < target_size_bytes: + file.write(paragraph) + + final_size = os.path.getsize(file_path) / (1024 * 1024 * 1024) + print(f"Generated file size: {final_size:.2f} GB") + + # 生成一个 1 GB 大小的文件 + file_path = 'large_text_file.txt' + target_size_gb = 1 + generate_large_text_file(file_path, target_size_gb) + fns: + count: + args: + text_s3_path: "large_text_file.txt" + +# 函数配置项变体 +replicas: {} +benchlist: {} +# img_resize2: # 函数 和 是否使用minio排列组合,每种两个 +# source: img_resize + +# img_resize3: +# source: img_resize +# args: +# use_minio: false +# img_resize4: +# source: img_resize +# args: +# use_minio: false + + + +# benchlist: +# img_resize: +# img_resize2: +# img_resize3: +# img_resize4: + + +# parallel_composition: +# parallel: +# args: +# loopTime: 10000000 +# parallelIndex: 100 +# sequential: +# args: +# loopTime: 10000000 + + + +# fns: +# split: +# args: +# text_s3_path: "random_words.txt" +# count: \ No newline at end of file diff --git a/bencher/bench.yaml b/bencher/bench.yaml deleted file mode 100644 index 8c46fa5..0000000 --- a/bencher/bench.yaml +++ /dev/null @@ -1,10 +0,0 @@ -img_resize: - resize: - -parallel_composition: - parallel: - sequential: - -word_count: - split_word: - diff --git a/bencher/scripts/run_with_all_log.sh b/bencher/scripts/run_with_all_log.sh new file mode 100644 index 0000000..c73efba --- /dev/null +++ b/bencher/scripts/run_with_all_log.sh @@ -0,0 +1,4 @@ +# run with following command: +# source scripts/run_with_all_log.sh + +export RUST_LOG=trace,debug \ No newline at end of file diff --git a/bencher/src/demo_img_resize.rs b/bencher/src/_deperacated/demo_img_resize.rs similarity index 100% rename from bencher/src/demo_img_resize.rs rename to bencher/src/_deperacated/demo_img_resize.rs diff --git a/bencher/src/demo_parallel.rs b/bencher/src/_deperacated/demo_parallel.rs similarity index 100% rename from bencher/src/demo_parallel.rs rename to bencher/src/_deperacated/demo_parallel.rs diff --git a/bencher/src/demo_sequential.rs b/bencher/src/_deperacated/demo_sequential.rs similarity index 100% rename from bencher/src/demo_sequential.rs rename to bencher/src/_deperacated/demo_sequential.rs diff --git a/bencher/src/demo_word_count.rs b/bencher/src/_deperacated/demo_word_count.rs similarity index 94% rename from bencher/src/demo_word_count.rs rename to bencher/src/_deperacated/demo_word_count.rs index f3d3ff7..72f7f03 100644 --- a/bencher/src/demo_word_count.rs +++ b/bencher/src/_deperacated/demo_word_count.rs @@ -97,10 +97,24 @@ impl SpecTarget for WordCount { receive_resp_time - start_call_ms ); + // | cold start time + // | + // | fn_start_ms + println!("- req trans time: {}", req_arrive_time - start_call_ms); println!("- app verify time: {}", bf_exec_time - req_arrive_time); - println!("- cold start time: {}", recover_begin_time - bf_exec_time); - println!("- cold start time2: {}", fn_start_ms - recover_begin_time); + println!( + "- cold start time: {}", + if bf_exec_time > recover_begin_time { + recover_begin_time - bf_exec_time + } else { + 0 + } + ); + println!( + "- cold start time2: {}", + fn_start_ms - recover_begin_time.max(req_arrive_time) + ); println!("- exec time:{}", fn_end_ms - fn_start_ms); if fn_end_ms > receive_resp_time { println!( diff --git 
a/bencher/src/common_prepare.rs b/bencher/src/common_prepare.rs new file mode 100644 index 0000000..b004199 --- /dev/null +++ b/bencher/src/common_prepare.rs @@ -0,0 +1,74 @@ +use std::{collections::HashMap, path::PathBuf, process::Stdio}; + +use tokio::{fs, process::Command}; + +use crate::config::Config; + +/// returns the prepared data files for each app +/// app->[data1,data2] +pub async fn prepare_data( + target_apps: Vec<String>, + config: &Config, +) -> HashMap<String, Vec<PathBuf>> { + let mut prepare_data = HashMap::new(); + let model_apps: Vec<String> = target_apps + .clone() + .into_iter() + .filter(|app| config.models.contains_key(app)) + .collect(); + + for app in model_apps { + fs::create_dir_all(PathBuf::from("prepare_data").join(&app)) + .await + .unwrap(); + let app_entry = config.models.get(&app).unwrap(); + for (i, script) in app_entry.prepare_scripts.iter().enumerate() { + let script_path = PathBuf::from("prepare_data") + .join(&app) + .join(format!("prepare_{}.py", i)); + let script_dir = PathBuf::from("prepare_data").join(&app); + let abs_script_dir = script_dir.canonicalize().unwrap(); + // let abs_script_path = script_path.canonicalize().unwrap(); + fs::write(&script_path, script).await.unwrap(); + + tracing::debug!( + "prepare data for {} with script {}", + app, + script_path.to_str().unwrap() + ); + let res = Command::new("python3") + .args(&[script_path.file_name().unwrap().to_str().unwrap(), &*app]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(abs_script_dir) + .spawn() + .unwrap() + .wait() + .await + .unwrap(); + if !res.success() { + panic!( + "prepare data for {} with script {} failed", + app, + script_path.to_str().unwrap() + ); + } + } + + for data in app_entry.prepare_data.iter() { + let data_path = PathBuf::from("prepare_data").join(&app).join(data); + if !data_path.exists() { + panic!("prepare data failed {:?}", data); + } + prepare_data + .entry(app.to_owned()) + .or_insert(vec![]) + .push(data_path); + } + // for data in app_entry.prepare_data.iter() { + // prepare_data.push(data.clone()); + // } + } + + prepare_data +} diff --git a/bencher/src/config.rs b/bencher/src/config.rs new file mode 100644 index 0000000..5233d70 --- /dev/null +++ b/bencher/src/config.rs @@ -0,0 +1,104 @@ +use serde_yaml::Value; +use std::{collections::HashMap, fs::File, io::BufReader}; + +#[derive(serde::Deserialize, Debug, Clone)] +pub struct FnDetails { + pub args: Option<HashMap<String, Value>>, +} + +#[derive(serde::Deserialize, Debug, Clone)] +pub struct AppFnEntry { + pub prepare_data: Vec<String>, + pub prepare_scripts: Vec<String>, + pub fns: HashMap<String, FnDetails>, +} + +#[derive(serde::Deserialize, Debug, Clone)] +pub struct AppFnReplica { + pub source: String, + pub fns: HashMap<String, FnDetails>, +} + +// app: +// prepare_data: +// - file1 +// prepare_scripts: +// - script1 +// fns: +// fn1: +// args: +// pub type AppFnEntries = HashMap<String, AppFnEntry>; + +#[derive(serde::Deserialize, Debug, Clone)] +pub struct MinioConfig { + pub endpoint: String, + pub access_key: String, + pub secret_key: String, + pub compose_path: String, +} + +impl MinioConfig { + pub fn one_line(&self) -> String { + format!("{},{},{}", self.endpoint, self.access_key, self.secret_key) + } +} + +#[derive(serde::Deserialize, Debug, Clone)] +pub struct Config { + pub models: HashMap<String, AppFnEntry>, + pub replicas: HashMap<String, AppFnReplica>, + pub benchlist: HashMap<String, Value>, + pub minio: MinioConfig, +} + +impl Config { + // todo: add replica support + pub fn get(&self, app: &str) -> Option<&AppFnEntry> { + self.models.get(app) + } + + pub fn get_fn_details(&self, app: &str, func: &str) -> Option<FnDetails> { + let mut fndetail = if self.models.contains_key(app) { 
self.models.get(app).unwrap().fns.get(func).cloned() + } else if let Some(replica) = self.replicas.get(app) { + // replica args will cover model args + let source = replica.source.clone(); + let source_fn_details = self.get_fn_details(&source, func); + if let Some(mut source_fn_details) = source_fn_details { + // cover by replica args + for (key, value) in replica.fns.get(func).unwrap().args.as_ref().unwrap() { + source_fn_details + .args + .as_mut() + .unwrap() + .insert(key.clone(), value.clone()); + } + Some(source_fn_details) + } else { + None + } + } else { + None + }; + fndetail.map(|mut fndetail| { + let args = fndetail.args.as_mut().unwrap(); + if let Some(Value::Bool(true)) = args.get("use_minio") { + args.insert("use_minio".to_owned(), Value::from(self.minio.one_line())); + } else { + // remove use_minio + args.remove("use_minio"); + } + + fndetail + }) + } +} + +pub fn load_config() -> Config { + let f = File::open("app_fn_entries.yaml").expect("open app_fn_entries.yaml failed"); + let freader = BufReader::new(f); + let app_fn_entries: Config = serde_yaml::from_reader(freader).unwrap_or_else(|e| { + panic!("parse 'app_fn_entries.yaml' failed with {:?}", e); + }); + app_fn_entries +} diff --git a/bencher/src/main.rs b/bencher/src/main.rs index 6f2ad5a..46f314a 100644 --- a/bencher/src/main.rs +++ b/bencher/src/main.rs @@ -1,23 +1,28 @@ -mod demo_img_resize; -mod demo_parallel; -mod demo_sequential; -mod demo_word_count; +mod common_prepare; +mod config; mod metric; +mod minio; +mod mode_bench; +mod mode_call_once; +mod mode_first_call; +mod mode_prepare; mod parse; -mod parse_app; mod parse_platform; mod parse_test_mode; mod platform_ow; mod platform_wl; mod prometheus; +mod test_call_once; +mod util; +// mod reponse; use async_trait::async_trait; use clap::Parser; +use config::Config; use enum_dispatch::enum_dispatch; use goose::prelude::*; use parse::Cli; -use parse_app::App; use s3::creds::Credentials; use s3::Bucket; use s3::BucketConfiguration; @@ -30,63 +35,10 @@ use std::sync::mpsc; use std::time::Duration; use tokio::sync::oneshot; use tokio::sync::Mutex; - -lazy_static::lazy_static! 
{ - pub static ref BUCKET:Bucket={ - let(tx,rx)=mpsc::channel(); - tokio::spawn(async move{ - let bucket_name="serverless-bench"; - let region=Region::Custom { - region: "eu-central-1".to_owned(), - endpoint: "http://192.168.31.96:9009".to_owned(), - }; - let credentials= Credentials { - access_key: Some("minioadmin".to_owned()), - secret_key: Some("minioadmin123".to_owned()), - security_token: None, - session_token: None, - expiration: None, - }; - - let mut bucket=Bucket::new(bucket_name,region.clone(), credentials.clone()).unwrap().with_path_style(); - - let bucket_exist=match bucket.exists().await{ - Err(e)=>{ - tracing::warn!("test s3 is not started, automatically start it"); - // docker-compose up -d at ../middlewares/minio/ - process::Command::new("docker-compose") - .arg("up") - .arg("-d") - .current_dir(PathBuf::from("../middlewares/minio/")) - .output() - .expect("failed to start minio"); - tokio::time::sleep(Duration::from_secs(15)).await; - bucket.exists().await.unwrap() - } - Ok(ok)=>ok - }; - - if bucket_exist { - for b in bucket.list("".to_owned(),None).await.unwrap(){ - bucket.delete_object(b.name).await.unwrap(); - // bucket.delete().await.unwrap(); - } - }else{ - bucket = Bucket::create_with_path_style( - bucket_name, - region, - credentials, - BucketConfiguration::default(), - ) - .await.unwrap() - .bucket; - } - - tx.send(bucket); - }); - rx.recv().unwrap() - }; -} +use tracing::Level; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::Layer; fn is_bench_mode(cli: &Cli) -> bool { cli.bench_mode > 0 @@ -96,17 +48,20 @@ fn is_first_call_mode(cli: &Cli) -> bool { cli.first_call_mode > 0 } -fn is_once_mode(cli: &Cli) -> bool { - !is_bench_mode(cli) && !is_first_call_mode(cli) +fn is_prepare_mode(cli: &Cli) -> bool { + cli.prepare > 0 } -#[enum_dispatch(SpecTarget)] -enum SpecTargetBind { - ImgResize(demo_img_resize::ImgResize), - WordCount(demo_word_count::WordCount), - Parallel(demo_parallel::Parallel), - Sequential(demo_sequential::Sequential), +fn is_once_mode(cli: &Cli) -> bool { + !is_bench_mode(cli) && !is_first_call_mode(cli) } +// #[enum_dispatch(SpecTarget)] +// enum SpecTargetBind { +// ImgResize(demo_img_resize::ImgResize), +// WordCount(demo_word_count::WordCount), +// Parallel(demo_parallel::Parallel), +// Sequential(demo_sequential::Sequential), +// } /// unit: ms #[derive(Debug, Serialize, Deserialize, Clone)] @@ -152,29 +107,29 @@ impl Metric { } } -#[enum_dispatch] -trait SpecTarget: Send + 'static { - fn app(&self) -> App; - fn set_platform(&mut self, platform: PlatformOpsBind); - fn get_platform(&mut self) -> &mut PlatformOpsBind; +// #[enum_dispatch] +// trait SpecTarget: Send + 'static { +// fn app(&self) -> App; +// fn set_platform(&mut self, platform: PlatformOpsBind); +// fn get_platform(&mut self) -> &mut PlatformOpsBind; - async fn prepare_once(&mut self, _seed: String, _cli: Cli) { - unimplemented!() - } - async fn call_once(&mut self, _cli: Cli) -> Metric { - unimplemented!() - } - async fn prepare_bench(&mut self, _seed: String, _cli: Cli) {} - async fn call_bench(&mut self, _cli: Cli) { - unimplemented!() - } - async fn prepare_first_call(&mut self, _seed: String, _cli: Cli) { - unimplemented!() - } - async fn call_first_call(&mut self, _cli: Cli) { - unimplemented!() - } -} +// async fn prepare_once(&mut self, _seed: String, _cli: Cli) { +// unimplemented!() +// } +// async fn call_once(&mut self, _cli: Cli) -> Metric { +// unimplemented!() +// } +// async fn 
prepare_bench(&mut self, _seed: String, _cli: Cli) {} +// async fn call_bench(&mut self, _cli: Cli) { +// unimplemented!() +// } +// async fn prepare_first_call(&mut self, _seed: String, _cli: Cli) { +// unimplemented!() +// } +// async fn call_first_call(&mut self, _cli: Cli) { +// unimplemented!() +// } +// } #[enum_dispatch(PlatformOps)] enum PlatformOpsBind { @@ -184,69 +139,131 @@ enum PlatformOpsBind { #[enum_dispatch] pub trait PlatformOps: Send + 'static { + fn cli(&self) -> &Cli; async fn remove_all_fn(&self); async fn upload_fn(&mut self, demo: &str, rename_sub: &str); async fn call_fn(&self, app: &str, func: &str, arg_json_value: &serde_json::Value) -> String; + async fn prepare_apps_bin(&self, apps: Vec, config: &Config); } + +// pub trait PlatformOpsExt: PlatformOps { +// fn config_path_string(&self) -> String { +// self.cli().config_path() +// } +// } // pub struct CallRes { // out: String, // err: String, // } +pub fn start_tracing() { + let my_filter = tracing_subscriber::filter::filter_fn(|v| { + // println!("{}", v.module_path().unwrap()); + // println!("{}", v.name()); + // if v.module_path().unwrap().contains("quinn_proto") { + // return false; + // } + + // if v.module_path().unwrap().contains("qp2p::wire_msg") { + // return false; + // } + + // println!("{}", v.target()); + if let Some(mp) = v.module_path() { + if mp.contains("async_raft") { + return false; + } + if mp.contains("hyper") { + return false; + } + } + + // if v.module_path().unwrap().contains("less::network::p2p") { + // return false; + // } + + // v.level() == &tracing::Level::ERROR + // || v.level() == &tracing::Level::WARN + // || v.level() == &tracing::Level::INFO + v.level() != &tracing::Level::TRACE + // v.level() == &tracing::Level::INFO + // true + }); + let my_layer = tracing_subscriber::fmt::layer(); + tracing_subscriber::registry() + .with(my_layer.with_filter(my_filter)) + .init(); +} + #[tokio::main] async fn main() -> Result<(), GooseError> { // don't go thouph proxy when performance std::env::remove_var("http_proxy"); std::env::remove_var("https_proxy"); - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber).unwrap(); - let cli = Cli::parse(); - let seed = "helloworld"; + start_tracing(); - assert!( - !(cli.with_ow > 0 && cli.with_wl > 0), - "Cannot run with both OpenWhisk and Waverless" - ); + // tracing::debug!( + // "bencher running at dir {}", + // std::env::current_dir().unwrap() + // ); + // debug abs running dir - assert!( - cli.bench_mode + cli.first_call_mode <= 1, - "Cannot test multiple modes at one time {}", - cli.bench_mode + cli.first_call_mode + tracing::debug!( + "bencher running at dir {}", + std::env::current_dir().unwrap().display() ); - let mut target = if cli.img_resize > 0 { - SpecTargetBind::from(demo_img_resize::ImgResize::default()) - } else if cli.word_count > 0 { - SpecTargetBind::from(demo_word_count::WordCount::default()) - } else if cli.parallel > 0 { - SpecTargetBind::from(demo_parallel::Parallel::default()) - } else if cli.sequential > 0 { - SpecTargetBind::from(demo_sequential::Sequential::default()) - } else { - unreachable!() - }; - target.set_platform(if cli.with_ow > 0 { - PlatformOpsBind::from(platform_ow::PlatfromOw::default()) + let cli = Cli::parse(); + cli.check_app_fn().check_platform().check_mode(); + + let config = config::load_config(); + + minio::init_bucket(&config.minio).await; + + let seed = "helloworld"; + tracing::debug!("Preparing paltform >>>"); + let mut platform = if 
cli.with_ow > 0 { + PlatformOpsBind::from(platform_ow::PlatfromOw::new(&cli, &config)) } else if cli.with_wl > 0 { - PlatformOpsBind::from(platform_wl::PlatfromWl::new()) + PlatformOpsBind::from(platform_wl::PlatfromWl::new(&cli, config.clone())) } else { - panic!(); - }); + panic!("no platform specified, please specify by --with-ow or --with-wl"); + }; + + tracing::debug!("dispatching mode >>>"); + fn print_mode(mode: &str, preparing: bool) { + tracing::debug!("==========================="); + tracing::debug!("Current mode is {mode}"); + tracing::debug!("Preparing: {preparing}"); + tracing::debug!("==========================="); + } + if is_prepare_mode(&cli) { + common_prepare::prepare_data(cli.target_apps(), &config).await; + platform.prepare_apps_bin(cli.target_apps(), &config).await; + } if is_bench_mode(&cli) { - target.prepare_bench(seed.to_owned(), cli.clone()).await; - target.call_bench(cli).await; + print_mode("bench", is_bench_mode(&cli)); + unimplemented!(); + // target.prepare_bench(seed.to_owned(), cli.clone()).await; + // target.call_bench(cli).await; } else if is_first_call_mode(&cli) { - target - .prepare_first_call(seed.to_owned(), cli.clone()) - .await; - target.call_first_call(cli).await; + print_mode("first_call", is_bench_mode(&cli)); + if is_prepare_mode(&cli) { + mode_first_call::prepare(&mut platform, &config, cli.clone()).await; + } else { + mode_first_call::call(&mut platform, cli, &config).await; + } } else if is_once_mode(&cli) { - target.prepare_once(seed.to_owned(), cli.clone()).await; - // wait for the system to be stable - tokio::time::sleep(Duration::from_secs(5)).await; - target.call_once(cli).await; + print_mode("first_call", is_bench_mode(&cli)); + if is_prepare_mode(&cli) { + mode_call_once::prepare(&mut platform, seed.to_owned(), cli.clone()).await; + } else { + mode_call_once::call(&mut platform, cli, &config).await; + } + } else { + panic!("unreachable") } Ok(()) diff --git a/bencher/src/minio.rs b/bencher/src/minio.rs new file mode 100644 index 0000000..c13993d --- /dev/null +++ b/bencher/src/minio.rs @@ -0,0 +1,84 @@ +use std::{path::PathBuf, process::Stdio, time::Duration}; + +use s3::{creds::Credentials, Bucket, BucketConfiguration, Region}; +use tokio::{process::Command, sync::OnceCell}; + +use crate::{config::MinioConfig, util::CommandDebugStdio}; + +pub async fn init_bucket(config: &MinioConfig) -> Bucket { + let bucket_name = "serverless-bench"; + let region = Region::Custom { + region: "eu-central-1".to_owned(), + endpoint: config.endpoint.to_owned(), + }; + let credentials = Credentials { + access_key: Some(config.access_key.to_owned()), + secret_key: Some(config.secret_key.to_owned()), + security_token: None, + session_token: None, + expiration: None, + }; + + let mut bucket = Bucket::new(bucket_name, region.clone(), credentials.clone()) + .unwrap() + .with_path_style(); + + let bucket_exist = match bucket.exists().await { + Err(e) => { + tracing::warn!("test s3 is not started, automatically start it"); + // docker-compose up -d at ../middlewares/minio/ + // check if docker-compose is installed + if !Command::new("docker-compose") + .arg("--version") + .output() + .await + .is_ok() + { + panic!("docker-compose is not installed"); + } + let (_, _, mut res) = Command::new("docker-compose") + .arg("up") + .arg("-d") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(PathBuf::from(config.compose_path.clone())) + .spawn_debug() + .await; + let res = res.wait().await.unwrap(); + + assert!( + res.success(), + "failed to start 
minio at {}", + config.compose_path + ); + tokio::time::sleep(Duration::from_secs(15)).await; + bucket.exists().await.unwrap() + } + Ok(ok) => ok, + }; + + if bucket_exist { + for b in bucket.list("".to_owned(), None).await.unwrap() { + bucket.delete_object(b.name).await.unwrap(); + // bucket.delete().await.unwrap(); + } + } else { + bucket = Bucket::create_with_path_style( + bucket_name, + region, + credentials, + BucketConfiguration::default(), + ) + .await + .unwrap() + .bucket; + } + + BUCKET.set(bucket).unwrap(); + + BUCKET.get().unwrap().clone() +} + +lazy_static::lazy_static! { + pub static ref BUCKET: OnceCell = OnceCell::new(); +} diff --git a/bencher/src/mode_bench.rs b/bencher/src/mode_bench.rs new file mode 100644 index 0000000..13cb390 --- /dev/null +++ b/bencher/src/mode_bench.rs @@ -0,0 +1,3 @@ +pub fn call_bench() { + unimplemented!(); +} diff --git a/bencher/src/mode_call_once.rs b/bencher/src/mode_call_once.rs new file mode 100644 index 0000000..a9242e7 --- /dev/null +++ b/bencher/src/mode_call_once.rs @@ -0,0 +1,145 @@ +use std::{ + collections::HashMap, + fs, + time::{SystemTime, UNIX_EPOCH}, +}; + +use clap::Args; +use serde_yaml::Value; + +use crate::{config::Config, parse::Cli, Metric, PlatformOps, PlatformOpsBind}; + +pub async fn prepare(platform: &mut PlatformOpsBind, seed: String, cli: Cli) { + platform.remove_all_fn().await; + platform.upload_fn(&cli.app().unwrap(), "").await; + // self.prepare_img(&seed); +} + +pub async fn call(platform: &mut PlatformOpsBind, cli: Cli, config: &Config) -> Metric { + // read image from file + // let img = fs::read("img_resize/image_0.jpg").unwrap(); + let app = cli.app().unwrap(); + let func = cli.func().unwrap(); + // BUCKET + // // .lock() + // // .await + // .put_object(format!("image_{}.jpg", 0), &img) + // .await + // .unwrap(); + + let fndetail = config.get_fn_details(&app, &func).unwrap(); + // let args = cli.func_details().args; + // let arg = Args { + // image_s3_path: format!("image_{}.jpg", 0), + // target_width: 50, + // target_height: 50, + // }; + + let start_call_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + // let output = platform + // .call_fn("img_resize", "resize", &serde_json::to_value(args).unwrap()) + // .await; + let output = platform + .call_fn( + &cli.app().unwrap(), + &cli.func().unwrap(), + &serde_json::to_value(fndetail.args).unwrap(), + ) + .await; + // tracing::info!("debug output {}", output); + let res: serde_json::Value = serde_json::from_str(&output).unwrap_or_else(|e| { + tracing::error!("failed to parse json: {}", e); + panic!("output is not json: '{}'", output); + }); + + let mut req_arrive_time = res + .get("req_arrive_time") + .map(|v| v.as_u64().unwrap()) + .unwrap_or(0); + + let mut bf_exec_time = res + .get("bf_exec_time") + .map(|v| v.as_u64().unwrap()) + .unwrap_or(0); + + let mut recover_begin_time = res + .get("recover_begin_time") + .map(|v| v.as_u64().unwrap()) + .unwrap_or(0); + + let fn_start_ms = res.get("fn_start_time").unwrap().as_u64().unwrap(); + { + if req_arrive_time == 0 { + req_arrive_time = fn_start_ms; + } + if bf_exec_time == 0 { + bf_exec_time = fn_start_ms; + } + if recover_begin_time == 0 { + recover_begin_time = fn_start_ms; + } + } + + let fn_end_ms = res.get("fn_end_time").unwrap().as_u64().unwrap(); + + let receive_resp_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + + // | + println!("debug output: {:?}", output); + println!( + 
"\ntotal request latency: {} ms", + receive_resp_time - start_call_ms + ); + + println!("- req trans time: {}", req_arrive_time - start_call_ms); + // if recover_begin_time<=req_arrive_time{ + + // } + // 系统调用函数时刻 - 请求到达系统 + println!("- app verify time: {}", bf_exec_time - req_arrive_time); + // 开始冷启动时刻 + println!( + "- cold start time: {}", + if recover_begin_time > bf_exec_time { + recover_begin_time - bf_exec_time + } else { + 0 + } + ); + + // 冷启动和请求到达系统谁更新 + println!( + "- cold start time2: {}", + fn_start_ms - recover_begin_time.max(req_arrive_time) + ); + + println!("- exec time:{}", fn_end_ms - fn_start_ms); + if fn_end_ms > receive_resp_time { + println!( + "- system time is not synced, lag with {} ms", + fn_end_ms - receive_resp_time + ); + } else { + println!("- receive resp time: {}", receive_resp_time - fn_end_ms); + } + + // let res: Resp = serde_json::from_str(&output).expect("Failed to parse JSON response"); + // let res = BUCKET.get_object(&res.resized_image).await.unwrap(); + // std::fs::write("resized_image.jpg", res.as_slice()).unwrap(); + + Metric { + start_call_time: start_call_ms, + req_arrive_time, + bf_exec_time, + recover_begin_time, + fn_start_time: fn_start_ms, + fn_end_time: fn_end_ms, + receive_resp_time, + } +} diff --git a/bencher/src/mode_first_call.rs b/bencher/src/mode_first_call.rs new file mode 100644 index 0000000..694bb3e --- /dev/null +++ b/bencher/src/mode_first_call.rs @@ -0,0 +1,66 @@ +use crate::{ + config::Config, metric::Recorder, mode_call_once, parse::Cli, parse_platform::Platform, + parse_test_mode::TestMode, PlatformOps, PlatformOpsBind, +}; + +pub async fn prepare(platform: &mut PlatformOpsBind, config: &Config, cli: Cli) { + platform.remove_all_fn().await; + platform.prepare_apps_bin(cli.target_apps(), config).await; + // cli.prepare_img(&seed); +} + +pub async fn call(platform: &mut PlatformOpsBind, cli: Cli, config: &Config) { + let app = cli.app().expect("first call mode must have app name"); + let func = cli.func().expect("first call mode must have func name"); + + let mut recorder = Recorder::new(app.to_string(), TestMode::from(&cli), Platform::from(&cli)); + + let mut metrics = vec![]; + for _ in 0..20 { + platform.upload_fn("simple_demo", "").await; + let m = mode_call_once::call(platform, cli.clone(), config).await; //self.call_once(cli.clone()).await; + recorder.record(m.clone()); + // prometheus::upload_fn_call_metric("simple_demo", &m).await; + metrics.push(m); + } + recorder.persist(); + + println!("Average metrics:"); + println!( + "\ntotal request latency: {}", + metrics.iter().map(|v| v.get_total_req()).sum::() as f32 / metrics.len() as f32 + ); + + println!( + "- req trans time: {}", + metrics.iter().map(|v| v.get_req_trans_time()).sum::() as f32 / metrics.len() as f32 + ); + + println!( + "- app verify time: {}", + metrics.iter().map(|v| v.get_app_verify_time()).sum::() as f32 / metrics.len() as f32 + ); + + println!( + "- cold start time: {}", + metrics.iter().map(|v| v.get_cold_start_time()).sum::() as f32 / metrics.len() as f32 + ); + + println!( + "- cold start time2: {}", + metrics + .iter() + .map(|v| v.get_cold_start_time2()) + .sum::() as f32 + / metrics.len() as f32 + ); + + println!( + "- exec time: {}", + metrics.iter().map(|v| v.get_exec_time()).sum::() as f32 / metrics.len() as f32 + ); + // println!("- app verify time: {}", bf_exec_time - req_arrive_time); + // println!("- cold start time: {}", recover_begin_time - bf_exec_time); + // println!("- cold start time2: {}", fn_start_ms - recover_begin_time); + // 
println!("- exec time:{}", fn_end_ms - fn_start_ms); +} diff --git a/bencher/src/mode_prepare.rs b/bencher/src/mode_prepare.rs new file mode 100644 index 0000000..db96dd3 --- /dev/null +++ b/bencher/src/mode_prepare.rs @@ -0,0 +1,5 @@ +use crate::parse::Cli; + +fn prepare(cli:Cli){ + +} \ No newline at end of file diff --git a/bencher/src/parse.rs b/bencher/src/parse.rs index 2502db3..8f18599 100644 --- a/bencher/src/parse.rs +++ b/bencher/src/parse.rs @@ -1,23 +1,25 @@ +use core::panic; +use std::collections::HashMap; +use std::fs::File; +use std::io::BufReader; + use clap::arg; use clap::value_parser; use clap::{command, Command}; use clap::{Parser, Subcommand}; +use serde_yaml::Value; + +use crate::config::Config; +use crate::config::FnDetails; #[derive(Parser, Clone)] #[command(version, about, long_about = None)] pub struct Cli { // #[arg(action = clap::ArgAction::Count)] - #[arg(long, action = clap::ArgAction::Count)] - pub img_resize: u8, - - #[arg(long, action = clap::ArgAction::Count)] - pub word_count: u8, + pub app_fn: String, #[arg(long, action = clap::ArgAction::Count)] - pub parallel: u8, - - #[arg(long, action = clap::ArgAction::Count)] - pub sequential: u8, + pub prepare: u8, #[arg(long, action = clap::ArgAction::Count)] pub with_ow: u8, @@ -31,4 +33,103 @@ pub struct Cli { // create many function copy and collect the average cold start #[arg(long, action = clap::ArgAction::Count)] pub first_call_mode: u8, + + #[arg(short, long, default_value = "../middlewares/cluster_config.yml")] + config: String, +} + +impl Cli { + pub fn target_apps(&self) -> Vec { + vec![self.app().unwrap()] + } + pub fn app(&self) -> Option { + if self.app_fn.find("/").is_none() { + return None; + } + return Some(self.app_fn.split("/").next().unwrap().to_owned()); + } + pub fn func(&self) -> Option { + if self.app_fn.find("/").is_none() { + return None; + } + let mut iter = self.app_fn.split("/"); + iter.next(); + return Some(iter.next().unwrap().to_owned()); + } + // pub fn func_details(&self, config: &Config) -> FnDetails { + // let app = self.app().unwrap_or_else(|| { + // panic!("missing app name, cur input is {}", self.app_fn); + // }); + // let func = self.func().unwrap_or_else(|| { + // panic!("missing fn name, cur input is {}", self.app_fn); + // }); + + // //read 'app_fn_entries.yaml' + // if config.models.contains_key(&app) { + + // }else{ + // // replica, read source first, then compose fn details + // } + + // let f = config + // .get(&app) + // .unwrap_or_else(|| panic!("app '{}' is not in app_fn_entries.yaml", app)) + // .fns + // .get(&func) + // .unwrap_or_else(|| { + // panic!( + // "function '{}' is not in app '{}' in app_fn_entries.yaml", + // func, app + // ) + // }); + // f.clone() + // } + pub fn check_app_fn(&self) -> &Self { + let app = self.app().unwrap_or_else(|| { + panic!("missing app name, cur input is {}", self.app_fn); + }); + let func = self.func().unwrap_or_else(|| { + panic!("missing fn name, cur input is {}", self.app_fn); + }); + + //read 'app_fn_entries.yaml' + let f = File::open("app_fn_entries.yaml").expect("open app_fn_entries.yaml failed"); + let freader = BufReader::new(f); + let app_fn_entries: Config = serde_yaml::from_reader(freader).unwrap_or_else(|e| { + panic!("parse 'app_fn_entries.yaml' failed with {:?}", e); + }); + + let _ = app_fn_entries + .get(&app) + .unwrap_or_else(|| panic!("app '{}' is not in app_fn_entries.yaml", app)) + .fns + .get(&func) + .unwrap_or_else(|| { + panic!( + "function '{}' is not in app '{}' in app_fn_entries.yaml", + func, app 
+ ) + }); + self + } + pub fn check_platform(&self) -> &Self { + assert!( + !(self.with_ow > 0 && self.with_wl > 0), + "Cannot run with both OpenWhisk and Waverless" + ); + self + } + + pub fn check_mode(&self) -> &Self { + assert!( + self.bench_mode + self.first_call_mode <= 1, + "Cannot test multiple modes at one time {}", + self.bench_mode + self.first_call_mode + ); + self + } + + pub fn cluster_config(&self) -> String { + self.config.clone() + } } diff --git a/bencher/src/parse_app.rs b/bencher/src/parse_app.rs deleted file mode 100644 index 227bd2a..0000000 --- a/bencher/src/parse_app.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::parse::Cli; - -pub enum App { - ImgResize, - WordCount, - Parallel, - Sequential, -} - -impl From<&Cli> for App { - fn from(cli: &Cli) -> Self { - if cli.img_resize > 0 { - App::ImgResize - } else if cli.word_count > 0 { - App::WordCount - } else if cli.parallel > 0 { - App::Parallel - } else if cli.sequential > 0 { - App::Sequential - } else { - unimplemented!() - } - } -} - -impl ToString for App { - fn to_string(&self) -> String { - match self { - App::ImgResize => "img_resize".to_owned(), - App::WordCount => "word_count".to_owned(), - App::Parallel => "parallel".to_owned(), - App::Sequential => "sequential".to_owned(), - } - } -} diff --git a/bencher/src/platform_ow.rs b/bencher/src/platform_ow.rs index 15a0dce..5e9ea17 100644 --- a/bencher/src/platform_ow.rs +++ b/bencher/src/platform_ow.rs @@ -6,21 +6,55 @@ use std::{ }; use tokio::process::{self, Command}; -use crate::PlatformOps; +use crate::{config::Config, parse::Cli, PlatformOps}; pub struct PlatfromOw { pub gen_demos: HashSet, + cli: Cli, + config: Config, } -impl Default for PlatfromOw { - fn default() -> Self { - PlatfromOw { +impl PlatfromOw { + pub fn new(cli: &Cli, config: &Config) -> Self { + Self { gen_demos: HashSet::new(), + cli: cli.clone(), + config: config.clone(), } } } +// impl Default for PlatfromOw { +// fn default() -> Self { +// PlatfromOw { +// gen_demos: HashSet::new(), +// } +// } +// } + impl PlatformOps for PlatfromOw { + async fn prepare_apps_bin(&self, apps: Vec, config: &Config) { + process::Command::new("python3") + .args(&["../middlewares/openwhisk/7.clean_all_fns.py"]) + .status() + .await + .expect("Failed to clean openwhisk fns"); + + for app in apps { + let mut cmd2 = process::Command::new("python3"); + cmd2.args(&["../middlewares/openwhisk/8.add_func.py", &app, &app]); + tracing::info!("run cmd: {:?}", cmd2); + let res = cmd2 + .status() + .await + .expect(&format!("Failed to add func {}", app)); + assert!(res.success()); + } + } + + fn cli(&self) -> &Cli { + &self.cli + } async fn remove_all_fn(&self) { process::Command::new("python3") .args(&["../middlewares/openwhisk/7.clean_all_fns.py"]) diff --git a/bencher/src/platform_wl.rs b/bencher/src/platform_wl.rs index d902d9c..53fbce0 100644 --- a/bencher/src/platform_wl.rs +++ b/bencher/src/platform_wl.rs @@ -1,35 +1,43 @@ use tokio::process; +use crate::config::Config; +use crate::parse::Cli; use crate::PlatformOps; use std::collections::HashSet; use std::{collections::HashMap, fs::File, io::BufReader, str::from_utf8}; pub struct PlatfromWl { + cli: Cli, master_url: String, worker_url: String, - gen_demos: HashSet, + // gen_demos: HashSet, + config: Config, } impl PlatfromWl { - pub fn new() -> Self { - let file = File::open("../middlewares/cluster_config.yml").unwrap(); - let reader = BufReader::new(file); - let config: HashMap> = - serde_yaml::from_reader(reader).unwrap(); + pub fn new(cli: &Cli, config: Config) 
-> Self { let mut res = Self { + cli: cli.clone(), master_url: "".to_owned(), worker_url: "".to_owned(), - gen_demos: HashSet::new(), + // gen_demos: HashSet::new(), + config: config.clone(), }; + + let file = File::open(cli.cluster_config()).unwrap(); + let reader = BufReader::new(file); + let config: HashMap> = + serde_yaml::from_reader(reader).unwrap(); + for (_, conf) in config { if conf.contains_key("is_master") { res.master_url = format!( - "http://{}:2501", + "http://{}", conf.get("ip").unwrap().as_str().unwrap().to_owned() ); } else { res.worker_url = format!( - "http://{}:2501", + "http://{}", conf.get("ip").unwrap().as_str().unwrap().to_owned() ); } @@ -39,19 +47,39 @@ impl PlatfromWl { } impl PlatformOps for PlatfromWl { - async fn remove_all_fn(&self) {} - async fn upload_fn(&mut self, demo: &str, rename_sub: &str) { - if !self.gen_demos.contains(demo) { + async fn prepare_apps_bin(&self, apps: Vec, config: &Config) { + let model_apps: Vec = apps + .clone() + .into_iter() + .filter(|app| config.models.contains_key(app)) + .collect(); + + for app in model_apps { let res = process::Command::new("python3") - .args(&["../demos/scripts/1.gen_waverless_app.py", demo]) + .args(&["../demos/scripts/1.gen_waverless_app.py", &app]) .status() .await - .expect(&format!("Failed to gen demo {}", demo)); - assert!(res.success()); - self.gen_demos.insert(demo.to_owned()); + .expect(&format!("Failed to gen demo {}", app)); + assert!(res.success(), "Failed to gen demo app: {}", app); + // self.gen_demos.insert(demo.to_owned()); } + } + + fn cli(&self) -> &Cli { + &self.cli + } + async fn remove_all_fn(&self) {} + async fn upload_fn(&mut self, demo: &str, rename_sub: &str) { + // if !self.gen_demos.contains(demo) { + + // } process::Command::new("python3") - .args(&["../middlewares/waverless/3.add_func.py", demo, rename_sub]) + .args(&[ + "../middlewares/waverless/3.add_func.py", + demo, + rename_sub, + &self.cli().cluster_config(), + ]) .status() .await .expect(&format!("Failed to add func {} as {}", demo, rename_sub)); diff --git a/bencher/src/response.rs b/bencher/src/response.rs new file mode 100644 index 0000000..d0a5cca --- /dev/null +++ b/bencher/src/response.rs @@ -0,0 +1,7 @@ +use std::collections::HashMap; + +use serde_yaml::Value; + +pub type Resp = HashMap; + +pub trait RespMetricExt {} diff --git a/bencher/src/test_call_once.rs b/bencher/src/test_call_once.rs new file mode 100644 index 0000000..4b4c52d --- /dev/null +++ b/bencher/src/test_call_once.rs @@ -0,0 +1,107 @@ +use std::{ + collections::HashMap, + fs, + time::{SystemTime, UNIX_EPOCH}, +}; + +use clap::Args; +use serde_yaml::Value; + +use crate::{config::Config, parse::Cli, Metric, PlatformOps, PlatformOpsBind}; + +pub async fn prepare(platform: &mut PlatformOpsBind, seed: String, cli: Cli) { + platform.remove_all_fn().await; + platform + .upload_fn("simple_demo", "/root/ygy/demos/simple_demo/pack") + .await; +} + +// call once mod only got one function +pub async fn call(platform: &mut PlatformOpsBind, cli: Cli, config: &Config) -> Metric { + let app = cli.app().unwrap(); + let func = cli.func().unwrap(); + let args = config + .get_fn_details(&app, &func) + .unwrap() + .args + .unwrap_or_default(); + + let start_call_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + + let output = platform + .call_fn(&app, &func, &serde_json::to_value(args).unwrap()) + .await; + + let res: serde_json::Value = serde_json::from_str(&output).unwrap_or_else(|e| { + 
tracing::error!("failed to parse json: {}", e); + panic!("output is not json: '{}'", output); + }); + + let mut req_arrive_time = res + .get("req_arrive_time") + .map(|v| v.as_u64().unwrap()) + .unwrap_or(0); + + let mut bf_exec_time = res + .get("bf_exec_time") + .map(|v| v.as_u64().unwrap()) + .unwrap_or(0); + + let mut recover_begin_time = res + .get("recover_begin_time") + .map(|v| v.as_u64().unwrap()) + .unwrap_or(0); + + let fn_start_ms = res.get("fn_start_time").unwrap().as_u64().unwrap(); + { + if req_arrive_time == 0 { + req_arrive_time = fn_start_ms; + } + if bf_exec_time == 0 { + bf_exec_time = fn_start_ms; + } + if recover_begin_time == 0 { + recover_begin_time = fn_start_ms; + } + } + + let fn_end_ms = res.get("fn_end_time").unwrap().as_u64().unwrap(); + + let receive_resp_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + + println!("debug output: {:?}", output); + println!( + "\ntotal request latency: {}", + receive_resp_time - start_call_ms + ); + + println!("- req trans time: {}", req_arrive_time - start_call_ms); + println!("- app verify time: {}", bf_exec_time - req_arrive_time); + println!("- cold start time: {}", recover_begin_time - bf_exec_time); + println!("- cold start time2: {}", fn_start_ms - recover_begin_time); + println!("- exec time:{}", fn_end_ms - fn_start_ms); + if fn_end_ms > receive_resp_time { + println!( + "- system time is not synced, lag with {} ms", + fn_end_ms - receive_resp_time + ); + } else { + println!("- receive resp time: {}", receive_resp_time - fn_end_ms); + } + + Metric { + start_call_time: start_call_ms, + req_arrive_time, + bf_exec_time, + recover_begin_time, + fn_start_time: fn_start_ms, + fn_end_time: fn_end_ms, + receive_resp_time, + } +} diff --git a/bencher/src/util.rs b/bencher/src/util.rs new file mode 100644 index 0000000..a1dc853 --- /dev/null +++ b/bencher/src/util.rs @@ -0,0 +1,52 @@ +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::{Child, Command}, +}; + +pub trait CommandDebugStdio { + async fn spawn_debug( + &mut self, + ) -> ( + tokio::task::JoinHandle, + tokio::task::JoinHandle, + Child, + ); +} + +impl CommandDebugStdio for Command { + async fn spawn_debug( + &mut self, + ) -> ( + tokio::task::JoinHandle, + tokio::task::JoinHandle, + Child, + ) { + let mut child = self.spawn().unwrap(); + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + + // 分别处理 stdout 和 stderr + let mut stdout_reader = BufReader::new(stdout).lines(); + let mut stderr_reader = BufReader::new(stderr).lines(); + + let stdout_task = tokio::spawn(async move { + let mut all = String::new(); + while let Ok(Some(line)) = stdout_reader.next_line().await { + println!("[STDOUT] {}", line); + all += &format!("[STDOUT] {}\n", line); + } + all + }); + + let stderr_task = tokio::spawn(async move { + let mut all = String::new(); + while let Ok(Some(line)) = stderr_reader.next_line().await { + eprintln!("[STDERR] {}", line); + all += &format!("[STDERR] {}\n", line); + } + all + }); + + (stdout_task, stderr_task, child) + } +} diff --git a/config_easytier.py b/config_easytier.py new file mode 100644 index 0000000..e15495b --- /dev/null +++ b/config_easytier.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 + +########################################### +# EasyTier 配置参数 +########################################### +# 网络名称,确保唯一性 +NETWORK_NAME = "cuit_cloud" +# 网络密钥,用于加密通信 +NETWORK_SECRET = "827385543" +# 是否使用官方公共节点 +USE_PUBLIC_NODE = True +# 官方公共节点地址 +PUBLIC_NODE 
= "tcp://115.29.224.208:11010" +# 本地监听地址(多个进程需要避免端口冲突) +LISTEN_PORT = "11010" +# ipv4 +IPV4="10.126.126.78" +# 是否启用子网代理 +ENABLE_SUBNET_PROXY = False +# 要代理的子网(如果启用子网代理) +SUBNET_TO_PROXY = "192.168.1.0/24" +########################################### + + + +import os +import sys + +# 切换到脚本所在目录 +script_dir = os.path.dirname(os.path.abspath(__file__)) +os.chdir(script_dir) + +########################################### +# 路径配置 +########################################### +# 下载目录 +DOWNLOAD_DIR = '/teledeploy_secret/bin_easytier' +# 安装目录 +INSTALL_DIR = os.path.join(DOWNLOAD_DIR, 'install') +# 可执行文件名 +EXEC_NAMES = ['easytier-cli', 'easytier-core', 'easytier-web'] +MAIN_EXEC = 'easytier-core' +# 可执行文件的完整路径 +EXEC_PATHS = {name: os.path.join(INSTALL_DIR, 'easytier', name) for name in EXEC_NAMES} +# 软链接路径 +LINK_PATHS = {name: os.path.join('/usr/bin', name) for name in EXEC_NAMES} +# 主软链接路径 +MAIN_LINK = os.path.join('/usr/bin', 'easytier') + + +import subprocess +import platform +import shutil +import time +import urllib.request +import zipfile +import os +from pathlib import Path + +def run_with_root(cmd): + """使用root权限运行命令,如果不是root用户则添加sudo""" + if isinstance(cmd, list): + cmd = ' '.join(cmd) + if os.geteuid() != 0: + cmd = 'sudo ' + cmd + return os.system(cmd) + +def check_system(): + """检查系统环境""" + if platform.system() != "Linux": + print("目前只支持 Linux 系统") + sys.exit(1) + + # 检查必要的命令是否存在 + required_commands = ['curl', 'tar', 'systemctl'] + for cmd in required_commands: + if shutil.which(cmd) is None: + print(f"缺少必要的命令: {cmd}") + sys.exit(1) + +def verify_zip_file(file_path): + """验证zip文件是否有效""" + try: + # 检查文件是否存在且非空 + if not os.path.exists(file_path) or os.path.getsize(file_path) == 0: + return False + + # 尝试打开zip文件 + with zipfile.ZipFile(file_path, 'r') as zip_ref: + # 检查zip文件完整性 + if zip_ref.testzip() is not None: + return False + # 检查是否包含 easytier 文件 + file_list = zip_ref.namelist() + return any('easytier' in f.lower() for f in file_list) + + except zipfile.BadZipFile: + return False + except Exception: + return False + +def show_progress(count, block_size, total_size): + """显示下载进度""" + if total_size > 0: + percent = min(100, count * block_size * 100 / total_size) + # 计算进度条长度,总长50字符 + bar_len = 50 + filled_len = int(bar_len * count * block_size / total_size) + bar = '=' * filled_len + '-' * (bar_len - filled_len) + # 计算下载速度和已下载大小 + downloaded = count * block_size + downloaded_mb = downloaded / (1024 * 1024) + total_mb = total_size / (1024 * 1024) + # 输出进度信息 + print(f'\r[下载进度] [{bar}] {percent:.1f}% ({downloaded_mb:.1f}MB/{total_mb:.1f}MB)', end='') + if count * block_size >= total_size: + print() # 下载完成后换行 + +def download_easytier(): + """下载最新版本的 EasyTier到指定目录""" + easytier_zip = os.path.join(DOWNLOAD_DIR, 'easytier.zip') + + try: + need_download = True + + # 检查已存在的文件 + if os.path.exists(easytier_zip): + print("检查已下载的文件...") + if verify_zip_file(easytier_zip): + print("使用已下载的 EasyTier 安装包") + need_download = False + else: + print("已下载的文件无效,将重新下载") + os.remove(easytier_zip) + + # 下载文件 + if need_download: + print("正在下载 EasyTier...") + os.makedirs(DOWNLOAD_DIR, exist_ok=True) + download_url = "https://github.com/EasyTier/EasyTier/releases/download/v2.2.0/easytier-linux-x86_64-v2.2.0.zip" + + # 使用 urllib 下载文件,显示进度 + try: + urllib.request.urlretrieve(download_url, easytier_zip, show_progress) + except urllib.error.URLError as e: + raise Exception(f"下载失败: {str(e)}") + + # 验证新下载的文件 + if not verify_zip_file(easytier_zip): + raise Exception("下载的文件无效,请检查网络连接") + + # 解压文件 + print("正在解压文件...") + # 
解压之前要rm INSTALL_DIR + os.system("rm -rf " + INSTALL_DIR) + os.makedirs(INSTALL_DIR, exist_ok=True) + + with zipfile.ZipFile(easytier_zip, 'r') as zip_ref: + # 查看解压后的文件结构 + file_list = zip_ref.namelist() + print("文件列表:", file_list) + + # 解压文件 + zip_ref.extractall(INSTALL_DIR) + + # 检查解压后的目录结构 + print("安装目录内容:") + os.system(f"ls -la {INSTALL_DIR}") + + # 如果目录不存在,创建它 + extracted_dir = os.path.join(INSTALL_DIR, 'easytier') + if not os.path.exists(extracted_dir): + os.makedirs(extracted_dir) + # 移动所有可执行文件到这个目录 + for exec_name in EXEC_NAMES: + src = os.path.join(INSTALL_DIR, f'easytier-linux-x86_64/{exec_name}') + dst = os.path.join(extracted_dir, exec_name) + if os.path.exists(src): + os.rename(src, dst) + else: + raise Exception(f"未找到可执行文件: {exec_name}") + + # 检查所有可执行文件是否存在 + for exec_name in EXEC_NAMES: + if not os.path.exists(EXEC_PATHS[exec_name]): + raise Exception(f"解压失败,未找到 {exec_name} 可执行文件") + # 设置可执行权限 + os.chmod(EXEC_PATHS[exec_name], 0o755) + + # 创建软链接 + for exec_name in EXEC_NAMES: + if os.path.exists(LINK_PATHS[exec_name]): + run_with_root(f'rm -rf {LINK_PATHS[exec_name]}') + run_with_root(f'ln -s {EXEC_PATHS[exec_name]} {LINK_PATHS[exec_name]}') + + # 创建主软链接 (easytier -> easytier-core) + if os.path.exists(MAIN_LINK): + run_with_root(f'rm -rf {MAIN_LINK}') + run_with_root(f'ln -s {EXEC_PATHS[MAIN_EXEC]} {MAIN_LINK}') + + + except Exception as e: + if os.path.exists(easytier_zip): + os.remove(easytier_zip) + raise e + +def create_systemd_service(): + """创建并启用 systemd 服务""" + print("\n开始配置 EasyTier 服务...") + + # 准备配置命令 + cmd = [ + LINK_PATHS[MAIN_EXEC], + '--network-name', NETWORK_NAME, + '--network-secret', NETWORK_SECRET, + '-i', # 使用DHCP自动分配IP + IPV4 + ] + + if USE_PUBLIC_NODE: + # 使用公共节点 + cmd.extend(['-p', PUBLIC_NODE]) + + # 如果不使用公共节点,则监听本地地址 + cmd.extend(['--listeners', LISTEN_PORT]) + + # 生成systemd服务文件 + service_content = f"""[Unit] +Description=EasyTier Service +After=network.target + +[Service] +Type=simple +ExecStart={' '.join(cmd)} +Restart=always +User=root + +[Install] +WantedBy=multi-user.target +""" + + # 写入服务文件 + service_path = '/etc/systemd/system/easytier.service' + with open('/tmp/easytier.service', 'w') as f: + f.write(service_content) + run_with_root(f'mv /tmp/easytier.service {service_path}') + + # 启用并启动服务 + run_with_root('systemctl daemon-reload') + run_with_root('systemctl enable easytier') + run_with_root('systemctl restart easytier') + + print("EasyTier 服务配置完成!") + + + +def check_installation(): + """检查 EasyTier 是否已安装""" + return os.path.exists('/opt/easytier/easytier') + +def main(): + try: + check_system() + download_easytier() + create_systemd_service() + + # 等待服务启动 + time.sleep(2) + + # 显示状态 + print("\n当前配置信息:") + print(f"网络名称: {NETWORK_NAME}") + if USE_PUBLIC_NODE: + print(f"使用公共节点: {PUBLIC_NODE}") + else: + print(f"监听地址: {LISTEN_PORT}") + + print("\n使用以下命令查看服务状态:") + print("systemctl status easytier") + + except Exception as e: + print(f"错误: {str(e)}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/demos/_java_serverless_lib/core/pom.xml b/demos/_java_serverless_lib/core/pom.xml index 98e5708..0d3af23 100644 --- a/demos/_java_serverless_lib/core/pom.xml +++ b/demos/_java_serverless_lib/core/pom.xml @@ -44,6 +44,11 @@ org.crac crac + + io.minio + minio + 8.5.2 + diff --git a/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/BeanConfig.java b/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/BeanConfig.java index 402edb0..b3101f3 100644 --- 
a/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/BeanConfig.java +++ b/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/BeanConfig.java @@ -24,4 +24,9 @@ public RpcHandleOwner rpcHandleOwner() { public CracManager cracManager() { return new CracManager(); } + + @Bean + public DataApi dataApi() { + return new DataApi(); + } } diff --git a/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/DataApi.java b/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/DataApi.java new file mode 100644 index 0000000..e30dfdc --- /dev/null +++ b/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/DataApi.java @@ -0,0 +1,202 @@ +package io.serverless_lib; + +import io.minio.BucketExistsArgs; +import io.minio.GetObjectArgs; +import io.minio.MinioClient; +import io.minio.MakeBucketArgs; +import io.minio.PutObjectArgs; +import io.minio.RemoveObjectArgs; +import io.minio.errors.MinioException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.Map; + +/** + * DataApi handles data storage, supporting an in-memory mode and a MinIO mode. + */ +public class DataApi { + + private volatile boolean initialized = false; + private Map<String, byte[]> memoryStorage; + private MinioClient minioClient; + private String storageBucket; + private String storageMode; + + /** + * Lazy initialization using thread-safe double-checked locking. + * @param use_minio MinIO connection string; when empty, an in-memory map is used for storage + */ + public void init(String use_minio) { + // First check, so the lock is not taken on every call + if (initialized) { + return; + } + + // Synchronize to protect the initialization process + synchronized (this) { + // Second check, in case another thread has already finished initialization + if (initialized) { + return; + } + + if (use_minio == null || use_minio.isEmpty()) { + // In-memory map mode + memoryStorage = new ConcurrentHashMap<>(); + storageMode = "memory"; + System.out.println("DataApi initialized in memory mode"); + } else { + // Connect to MinIO + try { + // Parse the MinIO connection string (example format: "http://localhost:9000,accessKey,secretKey,bucketName") + String[] parts = use_minio.split(","); + String endpoint = parts[0]; + String accessKey = parts.length > 1 ? parts[1] : ""; + String secretKey = parts.length > 2 ? parts[2] : ""; + storageBucket = parts.length > 3 ? parts[3] : "serverless-bench"; + + minioClient = MinioClient.builder() + .endpoint(endpoint) + .credentials(accessKey, secretKey) + .build(); + + // Make sure the bucket exists + boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(storageBucket).build()); + if (!found) { + minioClient.makeBucket(MakeBucketArgs.builder().bucket(storageBucket).build()); + } + + storageMode = "minio"; + System.out.println("DataApi initialized in MinIO mode: " + endpoint); + } catch (Exception e) { + // Failed to connect to MinIO, fall back to memory mode + memoryStorage = new ConcurrentHashMap<>(); + storageMode = "memory"; + System.err.println("Failed to connect to MinIO, fallback to memory mode: " + e.getMessage()); + } + } + + // Finally, set the initialized flag + initialized = true; + } + } + + /** + * Store a value. + * @param key the key + * @param value the value as a byte array + * @throws IOException if the operation fails + */ + public void put(String key, byte[] value) throws IOException { + if (!initialized) { + throw new IllegalStateException("DataApi not initialized. 
Call init() first."); + } + + if ("memory".equals(storageMode)) { + memoryStorage.put(key, value); + } else { + try { + // 将字节数组上传到MinIO + ByteArrayInputStream bais = new ByteArrayInputStream(value); + minioClient.putObject( + PutObjectArgs.builder() + .bucket(storageBucket) + .object(key) + .stream(bais, value.length, -1) + .contentType("application/octet-stream") + .build() + ); + bais.close(); + } catch (MinioException | InvalidKeyException | NoSuchAlgorithmException e) { + throw new IOException("Failed to put object to MinIO: " + e.getMessage(), e); + } + } + } + + /** + * 获取数据 + * @param key 键 + * @return 字节数组值,如果键不存在则返回null + * @throws IOException 操作失败时抛出异常 + */ + public byte[] get(String key) throws IOException { + if (!initialized) { + throw new IllegalStateException("DataApi not initialized. Call init() first."); + } + + if ("memory".equals(storageMode)) { + return memoryStorage.get(key); + } else { + try { + // 从MinIO获取对象 + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + InputStream stream = minioClient.getObject( + GetObjectArgs.builder() + .bucket(storageBucket) + .object(key) + .build() + ); + + // 读取字节到输出流 + byte[] buffer = new byte[16384]; + int bytesRead; + while ((bytesRead = stream.read(buffer)) > 0) { + baos.write(buffer, 0, bytesRead); + } + + stream.close(); + baos.close(); + return baos.toByteArray(); + + } catch (MinioException | InvalidKeyException | NoSuchAlgorithmException e) { + throw new IOException("Failed to get object from MinIO: " + e.getMessage(), e); + } + } + } + + /** + * 删除数据 + * @param key 要删除的键 + * @throws IOException 操作失败时抛出异常 + */ + public void delete(String key) throws IOException { + if (!initialized) { + throw new IllegalStateException("DataApi not initialized. Call init() first."); + } + + if ("memory".equals(storageMode)) { + memoryStorage.remove(key); + } else { + try { + // 从MinIO删除对象 + minioClient.removeObject( + RemoveObjectArgs.builder() + .bucket(storageBucket) + .object(key) + .build() + ); + } catch (MinioException | InvalidKeyException | NoSuchAlgorithmException e) { + throw new IOException("Failed to delete object from MinIO: " + e.getMessage(), e); + } + } + } + + /** + * 检查是否已初始化 + */ + public boolean isInitialized() { + return initialized; + } + + /** + * 获取当前存储模式 + */ + public String getStorageMode() { + return storageMode; + } +} \ No newline at end of file diff --git a/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/UdsBackend.java b/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/UdsBackend.java index 5811017..caeb486 100644 --- a/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/UdsBackend.java +++ b/demos/_java_serverless_lib/core/src/main/java/io/serverless_lib/UdsBackend.java @@ -212,6 +212,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) // 读取长度字段 int length = in.readInt(); int taskId = in.readInt(); + System.out.println("Received message from server: " + length + " bytes, taskId: " + taskId); // 确保有足够的字节来读取数据 if (in.readableBytes() < length) { diff --git a/demos/_java_serverless_lib/scripts/prepare_protoc.py b/demos/_java_serverless_lib/scripts/prepare_protoc.py index 33de384..cdc6611 100644 --- a/demos/_java_serverless_lib/scripts/prepare_protoc.py +++ b/demos/_java_serverless_lib/scripts/prepare_protoc.py @@ -31,7 +31,7 @@ def run_cmd_return(cmd): with open("config.yaml") as f: conf=yaml.safe_load(f) waverless_rel_path=conf["waverless_rel_path"] -proto_src_dir=os.path.join(waverless_rel_path,"src/worker/func/shared/") 
+proto_src_dir=os.path.join(waverless_rel_path,"src/main/src/general/app/app_shared/") proto_src=os.path.join(proto_src_dir,'process_rpc_proto.proto') diff --git a/demos/img_resize/src/main/java/test/functions/Resize.java b/demos/img_resize/src/main/java/test/functions/Resize.java index 1cbf9d9..57d229c 100644 --- a/demos/img_resize/src/main/java/test/functions/Resize.java +++ b/demos/img_resize/src/main/java/test/functions/Resize.java @@ -16,15 +16,20 @@ import java.io.InputStream; import java.nio.ByteBuffer; import javax.imageio.ImageIO; +import org.springframework.beans.factory.annotation.Autowired; +import io.serverless_lib.DataApi; public class Resize { // Initialize Minio Client - MinioClient minioClient = - MinioClient.builder() - .endpoint("http://192.168.31.96:9009") - .credentials("minioadmin", "minioadmin123") - .build(); + // MinioClient minioClient = + // MinioClient.builder() + // .endpoint("http://192.168.31.96:9009") + // .credentials("minioadmin", "minioadmin123") + // .build(); + @Autowired + DataApi dataApi; + // 辅助方法:将输入流读取到 ByteBuffer private static ByteBuffer readToByteBuffer(InputStream inputStream) throws IOException { // Start with a default buffer size @@ -80,30 +85,41 @@ public JsonObject call(JsonObject args) { String imagepath = args.get("image_s3_path").getAsString(); int targetWidth = args.get("target_width").getAsInt(); int targetHeight = args.get("target_height").getAsInt(); + String useMinio = args.get("use_minio").getAsString(); + // print useMinio + System.out.println("--------------------------------"); + System.out.println("imagepath: " + imagepath); + System.out.println("targetWidth: " + targetWidth); + System.out.println("targetHeight: " + targetHeight); + System.out.println("useMinio: " + useMinio); + System.out.println("--------------------------------"); + + dataApi.init(useMinio); JsonObject result = new JsonObject(); try { // Download the file from the bucket - GetObjectArgs getObjectArgs = GetObjectArgs.builder() - .bucket("serverless-bench") - .object(imagepath) - .build(); - InputStream downloadedStream = minioClient.getObject(getObjectArgs); - ByteBuffer bf=readToByteBuffer(downloadedStream); + // GetObjectArgs getObjectArgs = GetObjectArgs.builder() + // .bucket("serverless-bench") + // .object(imagepath) + // .build(); + byte[] imageData = dataApi.get(imagepath); + // ByteBuffer bf=readToByteBuffer(downloadedStream); - byte[] resizedImage = resizeImage(bf.array(), targetWidth, targetHeight); + byte[] resizedImage = resizeImage(imageData, targetWidth, targetHeight); - ByteArrayInputStream inputStream = new ByteArrayInputStream(resizedImage); - - minioClient.putObject( - PutObjectArgs.builder() - .bucket("serverless-bench") - .object(renameFile(imagepath)) - .stream(inputStream, resizedImage.length, -1) - .contentType("image/jpeg") - .build() - ); + // ByteArrayInputStream inputStream = new ByteArrayInputStream(resizedImage); + + dataApi.put(renameFile(imagepath), resizedImage); + // minioClient.putObject( + // PutObjectArgs.builder() + // .bucket("serverless-bench") + // .object(renameFile(imagepath)) + // .stream(inputStream, resizedImage.length, -1) + // .contentType("image/jpeg") + // .build() + // ); diff --git a/demos/simple_demo/dependency-reduced-pom.xml b/demos/simple_demo/dependency-reduced-pom.xml new file mode 100644 index 0000000..2dfebc8 --- /dev/null +++ b/demos/simple_demo/dependency-reduced-pom.xml @@ -0,0 +1,44 @@ + + + 4.0.0 + test + simple-demo + 1.0-SNAPSHOT + + + + maven-compiler-plugin + 3.8.1 + + 11 + 11 + + + + 
maven-shade-plugin + 3.4.1 + + + package + + shade + + + simple_demo-1.0-SNAPSHOT-jar-with-dependencies + + + test.Application + + + + + + + + + + 11 + 11 + UTF-8 + + diff --git a/demos/simple_demo/pack/app.jar b/demos/simple_demo/pack/app.jar new file mode 100644 index 0000000..9f80c25 Binary files /dev/null and b/demos/simple_demo/pack/app.jar differ diff --git a/demos/simple_demo/pack/app.yml b/demos/simple_demo/pack/app.yml new file mode 100644 index 0000000..ee4f437 --- /dev/null +++ b/demos/simple_demo/pack/app.yml @@ -0,0 +1,7 @@ +name: simple_demo +version: 1.0.0 +functions: + - name: simple + handler: test.functions.Simple + memory: 128 + timeout: 30 \ No newline at end of file diff --git a/demos/simple_demo/pom.xml b/demos/simple_demo/pom.xml new file mode 100644 index 0000000..25b724e --- /dev/null +++ b/demos/simple_demo/pom.xml @@ -0,0 +1,61 @@ + + + 4.0.0 + + test + simple-demo + 1.0-SNAPSHOT + + + 11 + 11 + UTF-8 + + + + + com.google.code.gson + gson + 2.8.9 + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + 11 + + + + org.apache.maven.plugins + maven-shade-plugin + 3.4.1 + + + package + + shade + + + simple_demo-1.0-SNAPSHOT-jar-with-dependencies + + + test.Application + + + + + + + + + + \ No newline at end of file diff --git a/demos/simple_demo/src/main/java/test/Application.java b/demos/simple_demo/src/main/java/test/Application.java new file mode 100644 index 0000000..48a8c35 --- /dev/null +++ b/demos/simple_demo/src/main/java/test/Application.java @@ -0,0 +1,13 @@ +package test; + +import com.google.gson.JsonObject; +import test.functions.Simple; + +public class Application { + public static void main(String[] args) { + Simple simple = new Simple(); + JsonObject input = new JsonObject(); + JsonObject result = simple.call(input); + System.out.println(result.toString()); + } +} \ No newline at end of file diff --git a/demos/simple_demo/src/main/java/test/functions/Simple.java b/demos/simple_demo/src/main/java/test/functions/Simple.java new file mode 100644 index 0000000..3518158 --- /dev/null +++ b/demos/simple_demo/src/main/java/test/functions/Simple.java @@ -0,0 +1,11 @@ +package test.functions; + +import com.google.gson.JsonObject; + +public class Simple { + public JsonObject call(JsonObject input) { + JsonObject result = new JsonObject(); + result.addProperty("message", "Hello from Simple function!"); + return result; + } +} \ No newline at end of file diff --git a/middlewares/1.firsttime_deploy_openwhisk.py b/middlewares/1.firsttime_deploy_openwhisk.py new file mode 100644 index 0000000..3ffa63a --- /dev/null +++ b/middlewares/1.firsttime_deploy_openwhisk.py @@ -0,0 +1,9 @@ +import os,sys +CUR_FDIR = os.path.dirname(os.path.abspath(__file__)); cur_scan=CUR_FDIR; scan=[["pylib.py" in os.listdir(cur_scan),cur_scan,exec('global cur_scan;cur_scan=os.path.join(cur_scan, "..")')] for _ in range(10)]; found_pylib=[x[0] for x in scan]; pylib_dir_idx=found_pylib.index(True); assert pylib_dir_idx>=0, "pylib.py not found"; print(scan[pylib_dir_idx][1]); ROOT_DIR=os.path.abspath(os.path.join(CUR_FDIR, scan[pylib_dir_idx][1])); sys.path.append(ROOT_DIR) +import pylib + +os.chdir("openwhisk") + +pylib.os_system_sure(f"python3 1.prepare.py") +pylib.os_system_sure(f"python3 3.deploy_ow.py") + diff --git a/middlewares/1.firsttime_deploy_waverless.py b/middlewares/1.firsttime_deploy_waverless.py new file mode 100644 index 0000000..f8c1655 --- /dev/null +++ b/middlewares/1.firsttime_deploy_waverless.py @@ -0,0 +1,11 @@ +import os,sys +CUR_FDIR = 
os.path.dirname(os.path.abspath(__file__)); cur_scan=CUR_FDIR; scan=[["pylib.py" in os.listdir(cur_scan),cur_scan,exec('global cur_scan;cur_scan=os.path.join(cur_scan, "..")')] for _ in range(10)]; found_pylib=[x[0] for x in scan]; pylib_dir_idx=found_pylib.index(True); assert pylib_dir_idx>=0, "pylib.py not found"; print(scan[pylib_dir_idx][1]); ROOT_DIR=os.path.abspath(os.path.join(CUR_FDIR, scan[pylib_dir_idx][1])); sys.path.append(ROOT_DIR) +import pylib + +os.chdir(CUR_FDIR) +pylib.os_system_sure(f"telego cmd --cmd deploy/bin_waverless/prepare") +# pylib.os_system_sure(f"telego cmd --cmd deploy/bin_waverless2/upload") + +pylib.os_system_sure(f"telego cmd --cmd deploy/dist_waverless/prepare") +# pylib.os_system_sure(f"telego cmd --cmd deploy/dist_waverless/upload") +# pylib.os_system_sure(f"telego cmd --cmd deploy/dist_waverless/apply") diff --git a/middlewares/2.update_waverless_bin.py b/middlewares/2.update_waverless_bin.py new file mode 100644 index 0000000..4dde3ee --- /dev/null +++ b/middlewares/2.update_waverless_bin.py @@ -0,0 +1,12 @@ +import os,sys +CUR_FDIR = os.path.dirname(os.path.abspath(__file__)); cur_scan=CUR_FDIR; scan=[["pylib.py" in os.listdir(cur_scan),cur_scan,exec('global cur_scan;cur_scan=os.path.join(cur_scan, "..")')] for _ in range(10)]; found_pylib=[x[0] for x in scan]; pylib_dir_idx=found_pylib.index(True); assert pylib_dir_idx>=0, "pylib.py not found"; print(scan[pylib_dir_idx][1]); ROOT_DIR=os.path.abspath(os.path.join(CUR_FDIR, scan[pylib_dir_idx][1])); sys.path.append(ROOT_DIR) +import pylib + +os.chdir(CUR_FDIR) + +pylib.os_system(f"rm -rf waverless/bin_waverless2/prepare_cache/waverless_amd64") +pylib.os_system(f"rm -rf waverless/bin_waverless2/prepare_cache/waverless_entry_amd64") +pylib.os_system_sure(f"telego cmd --cmd deploy/bin_waverless/prepare") +pylib.os_system_sure(f"telego cmd --cmd deploy/bin_waverless/upload") + + diff --git a/middlewares/3.redeploy_waverless.py b/middlewares/3.redeploy_waverless.py new file mode 100644 index 0000000..7621d11 --- /dev/null +++ b/middlewares/3.redeploy_waverless.py @@ -0,0 +1,7 @@ +import os,sys +CUR_FDIR = os.path.dirname(os.path.abspath(__file__)); cur_scan=CUR_FDIR; scan=[["pylib.py" in os.listdir(cur_scan),cur_scan,exec('global cur_scan;cur_scan=os.path.join(cur_scan, "..")')] for _ in range(10)]; found_pylib=[x[0] for x in scan]; pylib_dir_idx=found_pylib.index(True); assert pylib_dir_idx>=0, "pylib.py not found"; print(scan[pylib_dir_idx][1]); ROOT_DIR=os.path.abspath(os.path.join(CUR_FDIR, scan[pylib_dir_idx][1])); sys.path.append(ROOT_DIR) +import pylib + +os.chdir(CUR_FDIR) + +pylib.os_system_sure(f"telego cmd --cmd deploy/dist_waverless/apply/default") diff --git a/middlewares/4.debug_waverless.py b/middlewares/4.debug_waverless.py new file mode 100644 index 0000000..53e78a6 --- /dev/null +++ b/middlewares/4.debug_waverless.py @@ -0,0 +1,44 @@ +import subprocess, sys, os +import json + +def get_pods_on_node(node_name): + try: + # 执行 kubectl 命令获取节点上的 Pod 列表 + result = subprocess.run( + ["kubectl", "get", "pods", "--all-namespaces", "-o", "json"], + capture_output=True, + text=True, + check=True + ) + + # 解析 JSON 输出 + pods = json.loads(result.stdout) + + # 筛选出指定节点上的 Pod + pods_on_node = [ + pod for pod in pods['items'] + if pod['spec']['nodeName'] == node_name + ] + + return pods_on_node + + except subprocess.CalledProcessError as e: + print(f"Error executing kubectl command: {e}") + return [] + except json.JSONDecodeError as e: + print(f"Error parsing JSON output: {e}") + return [] + +# 示例调用 +if 
len(sys.argv) < 2: + print("Usage: python script.py ") + sys.exit(1) + +node_name = sys.argv[1] +pods = get_pods_on_node(node_name) +for pod in pods: + # print(f"Pod Name: {pod['metadata']['name']} Namespace: {pod['metadata']['namespace']}") + podname=pod['metadata']['name'] + if podname.find("waverless")>=0: + print(f"Pod Name: {podname} Namespace: {pod['metadata']['namespace']}") + os.system(f"kubectl logs -f -n {pod['metadata']['namespace']} {podname}") \ No newline at end of file diff --git a/middlewares/cluster_config.yml b/middlewares/cluster_config.yml index a6a88fa..5937522 100644 --- a/middlewares/cluster_config.yml +++ b/middlewares/cluster_config.yml @@ -1,11 +1,14 @@ -# lab1: -# ip: 192.168.31.162 -# is_master: -lab2: - ip: 192.168.31.87 +# according to waverless/scripts/telego/dist/deployment.yml + +lab1: + ip: 192.168.31.162:2501 is_master: +lab2: + ip: 192.168.31.87:2501 + lab3: - ip: 192.168.31.96 + ip: 192.168.31.96:2501 + # is_master: # lab7: # ip: 192.168.31.89 @@ -19,4 +22,9 @@ lab3: # lab11: # ip: 192.168.31.138 # lab12: -# ip: 192.168.31.171 \ No newline at end of file +# ip: 192.168.31.171 +# master: +# ip: 127.0.0.1 +# is_master: +# worker: +# ip: 127.0.0.1 \ No newline at end of file diff --git a/middlewares/waverless/3.add_func.py b/middlewares/waverless/3.add_func.py index 044b15f..ad592ce 100644 --- a/middlewares/waverless/3.add_func.py +++ b/middlewares/waverless/3.add_func.py @@ -7,7 +7,27 @@ CUR_FPATH = os.path.abspath(__file__) CUR_FDIR = os.path.dirname(CUR_FPATH) # chdir to the directory of this script + + +def run_cmd_with_res(cmd): + print(f"执行命令:{cmd}") + result = os.popen(cmd) + # print(f"执行结果:{result.read()}") + return result.read() + + +import sys +if len(sys.argv) !=4: + print("Usage: python 3.add_func.py ") + exit(1) +demo_app=sys.argv[1] +rename_sub=sys.argv[2] +cluster_config_path=sys.argv[3] +cluster_config_path=os.path.abspath(cluster_config_path) + +# before chdir, we transform the cluster_config_path to absolute path os.chdir(CUR_FDIR) +################################################################################################# class FunctionContainer: pass def load_functions_into_object(file_path, obj): @@ -27,23 +47,8 @@ def load_functions_into_object(file_path, obj): ################################################################################################# - -def run_cmd_with_res(cmd): - print(f"执行命令:{cmd}") - result = os.popen(cmd) - # print(f"执行结果:{result.read()}") - return result.read() - - -import sys -if len(sys.argv) !=3: - print("Usage: python 3.add_func.py ") - exit(1) -demo_app=sys.argv[1] -rename_sub=sys.argv[2] - # targetname=sys.argv[2] -srcyaml=pylib.read_yaml("../cluster_config.yml") +srcyaml=pylib.read_yaml(cluster_config_path) first_worker_ip="" for n in srcyaml: if "is_master" not in srcyaml[n]: @@ -54,8 +59,13 @@ def run_cmd_with_res(cmd): os.chdir(f"../../demos") # pylib.os_system_sure(f"python3 scripts/1.gen_waverless_app.py {demo_app}") def upload_app(appname,rename): - appdir=f"scripts/waverless/{appname}/pack" - + abssrc=os.path.abspath(f"scripts/waverless/{appname}/target/{appname}-1.0-SNAPSHOT-jar-with-dependencies.jar") + appdir=os.path.abspath(f"scripts/waverless/{appname}/pack") + target=os.path.abspath(f"scripts/waverless/{appname}/pack/app.jar") + print(f"copying {abssrc} to {target}") + # appdir=f"scripts/waverless/{appname}/pack" + # 复制 JAR 文件到应用包 + os.system(f"cp {abssrc} {target}") os.chdir(appdir) @@ -63,6 +73,7 @@ def upload_app(appname,rename): entries_concat=" ".join(entries) 
print(f"{appdir} contains {entries_concat}") + print(f"zipping app pack: {entries_concat} to {rename}.zip") os.system(f"zip -r {rename}.zip {entries_concat}") os.system(f"mv {rename}.zip {CUR_FDIR}") os.chdir(CUR_FDIR) @@ -72,8 +83,9 @@ def upload_app(appname,rename): f= open(filepath, 'rb') files.append((rename, (filepath.split('/')[-1], f, 'application/zip'))) + print(f"uploading {filepath} to {first_worker_ip}") try: - response = requests.post(f'http://{first_worker_ip}:2501/appmgmt/upload_app', files=files) + response = requests.post(f'http://{first_worker_ip}/appmgmt/upload_app', files=files) print(response.status_code, response.text) except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") diff --git a/middlewares/waverless/waverless b/middlewares/waverless/waverless index 1778957..11469fe 160000 --- a/middlewares/waverless/waverless +++ b/middlewares/waverless/waverless @@ -1 +1 @@ -Subproject commit 177895768f85c7c063cee89997dbae229f5ecf51 +Subproject commit 11469fe17f28fe4fe763a61bdf74ba9f161bdabf