diff --git a/src/main/src/general/app/app_owned/wasm_host_funcs/kv.rs b/src/main/src/general/app/app_owned/wasm_host_funcs/kv.rs index 74f2d34..e0ba1ac 100644 --- a/src/main/src/general/app/app_owned/wasm_host_funcs/kv.rs +++ b/src/main/src/general/app/app_owned/wasm_host_funcs/kv.rs @@ -124,7 +124,7 @@ async fn kv_batch_ope( ) -> Result, HostFuncError> { let opes_arg_ptr = args[0].to_i32(); let opes_arg_len = args[1].to_i32(); - let opes_id = utils::mutref::(&caller, args[2].to_i32()); + let _opes_id = utils::mutref::(&caller, args[2].to_i32()); let args = utils::i32slice(&caller, opes_arg_ptr, opes_arg_len); let func_ctx = unsafe { #[cfg(feature = "unsafe-log")] @@ -153,7 +153,7 @@ async fn kv_batch_ope( proto::kv::kv_request::KvPutRequest { kv: Some(KvPair { key: key.to_owned(), - value: value.to_owned(), + values: vec![value.to_owned()], }), }, )), @@ -167,6 +167,7 @@ async fn kv_batch_ope( requests.push(KvRequest { op: Some(proto::kv::kv_request::Op::Get( proto::kv::kv_request::KvGetRequest { + idxs: vec![0], range: Some(KeyRange { start: key.to_owned(), end: vec![], @@ -221,69 +222,70 @@ async fn kv_batch_ope( } } // tracing::debug!("requests:{:?}", requests); - let prev_kv_opeid = func_ctx + let _prev_kv_opeid = func_ctx .event_ctx_mut() .take_prev_kv_opeid() .map_or(-1, |v| v as i64); match m_kv_user_client() .kv_requests( - func_ctx.app(), - func_ctx.func(), + todo!("wasm kv is not updated for src wait"), KvRequests { requests, app: func_ctx.app().to_owned(), func: func_ctx.func().to_owned(), - prev_kv_opeid, + prev_kv_opeid: _prev_kv_opeid, }, ) .await { - Ok(res) => { - let id = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - // Write back the results to wasm runtime - let mut cur_idx = 1; - let mut resps = res.responses.iter(); - // Construct the requests - for _ in 0..ope_cnt { - let ope_type = args[cur_idx]; - match ope_type as usize { - // set - SET_ID => { - let _ = resps.next().unwrap(); - cur_idx += 5; - } - // get - GET_ID => { - let kvs = resps.next().unwrap().common_kvs().unwrap(); - // get len - *utils::mutref::(&caller, args[cur_idx + 3]) = if kvs.len() > 0 { - kvs.get(0).unwrap().value.len() as i32 - } else { - -1 - }; - cur_idx += 4; - } - // lock - LOCK_ID => { - if let Some(lockid) = resps.next().unwrap().lock_id() { - // lock id is allocated by the remote when call the lock - *utils::mutref::(&caller, args[cur_idx + 4]) = lockid; - } else { - // unlock, no response - } - cur_idx += 5; - } - DELETE_ID => { - let _ = resps.next().unwrap(); - cur_idx += 3; - } - _ => { - panic!("not implemented"); - } - } - } - RECENT_KV_CACHE.insert(id, res); - *opes_id = id; + Ok(_res) => { + // https://fvd360f8oos.feishu.cn/wiki/M4ubwJkvcichuHkiGhjc0miHn5f#share-FuaYd5qZboOkhZxYMc2cCV0fndh + todo!("wasm kv is not updated for multi value items (dataset)"); + // let id = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + // // Write back the results to wasm runtime + // let mut cur_idx = 1; + // let mut resps = res.responses.iter(); + // // Construct the requests + // for _ in 0..ope_cnt { + // let ope_type = args[cur_idx]; + // match ope_type as usize { + // // set + // SET_ID => { + // let _ = resps.next().unwrap(); + // cur_idx += 5; + // } + // // get + // GET_ID => { + // let get_resp = resps.next().unwrap().get_kvs().unwrap(); + // // get len + // *utils::mutref::(&caller, args[cur_idx + 3]) = if get_resp.len() > 0 { + // kvs.get(0).unwrap().value.len() as i32 + // } else { + // -1 + // }; + // cur_idx += 4; + // } + // // lock + // LOCK_ID => { + // if let Some(lockid) = resps.next().unwrap().lock_id() { + // // lock id is allocated by the remote when call the lock + // *utils::mutref::(&caller, args[cur_idx + 4]) = lockid; + // } else { + // // unlock, no response + // } + // cur_idx += 5; + // } + // DELETE_ID => { + // let _ = resps.next().unwrap(); + // cur_idx += 3; + // } + // _ => { + // panic!("not implemented"); + // } + // } + // } + // RECENT_KV_CACHE.insert(id, res); + // *opes_id = id; } Err(err) => { tracing::error!("kv batch ope error:{}", err); @@ -304,13 +306,18 @@ fn kv_batch_res(caller: Caller, args: Vec) -> Result, while cur_idx < args.len() { let ope_idx = args[cur_idx]; if let Some(res) = res.responses.get(ope_idx as usize) { - if let Some(kvs) = res.common_kvs() { - if let Some(kv) = kvs.get(0) { - let slice = - utils::mutu8sclice(&caller, args[cur_idx + 1], kv.value.len() as i32) - .unwrap(); - slice.copy_from_slice(kvs.get(0).unwrap().value.as_slice()); - } + if let Some(_kvs) = res.get_kvs() { + // https://fvd360f8oos.feishu.cn/wiki/M4ubwJkvcichuHkiGhjc0miHn5f#share-FuaYd5qZboOkhZxYMc2cCV0fndh + todo!("wasm kv is not updated for multi value items (dataset)"); + // if let Some(kv) = kvs.get(0) { + // let slice = utils::mutu8sclice( + // &caller, + // args[cur_idx + 1], + // kv.values[0].len() as i32, + // ) + // .unwrap(); + // slice.copy_from_slice(kv.values[0].as_slice()); + // } } else if let Some(_lock_id) = res.lock_id() { // do nothing } else { diff --git a/src/main/src/general/app/app_owned/wasm_host_funcs/mod.rs b/src/main/src/general/app/app_owned/wasm_host_funcs/mod.rs index 30e3516..65a6fe6 100644 --- a/src/main/src/general/app/app_owned/wasm_host_funcs/mod.rs +++ b/src/main/src/general/app/app_owned/wasm_host_funcs/mod.rs @@ -13,12 +13,10 @@ use result::ResultFuncsRegister; mod utils { use super::UnsafeFunctionCtx; - use crate::general::app::m_executor::{FnExeCtxAsync}; + use crate::general::app::m_executor::FnExeCtxAsync; use crate::general::app::InstanceManager; - use crate::{ - general::m_os::OperatingSystem, sys::LogicalModulesRef, util::SendNonNull, - worker::m_kv_user_client::KvUserClient, - }; + use crate::general::data::m_kv_user_client::KvUserClient; + use crate::{general::m_os::OperatingSystem, sys::LogicalModulesRef, util::SendNonNull}; use wasmedge_sdk::{Caller, Instance, Memory}; pub trait WasmCtx { @@ -127,14 +125,16 @@ mod utils { } pub fn m_kv_user_client() -> &'static KvUserClient { - unsafe { - &(*MODULES.as_ref().unwrap().inner.as_ptr()) - .as_ref() - .unwrap() - .kv_user_client - .as_ref() - .unwrap() - } + // https://fvd360f8oos.feishu.cn/wiki/M4ubwJkvcichuHkiGhjc0miHn5f#share-FuaYd5qZboOkhZxYMc2cCV0fndh + todo!("wasm kv is not updated for src wait and multi value items (dataset)"); + // unsafe { + // &(*MODULES.as_ref().unwrap().inner.as_ptr()) + // .as_ref() + // .unwrap() + // .kv_user_client + // .as_ref() + // .unwrap() + // } } pub fn m_fs<'a>() -> &'a OperatingSystem { diff --git a/src/main/src/general/app/app_shared/mod.rs b/src/main/src/general/app/app_shared/mod.rs index 9716879..358f460 100644 --- a/src/main/src/general/app/app_shared/mod.rs +++ b/src/main/src/general/app/app_shared/mod.rs @@ -2,6 +2,7 @@ pub mod java; pub mod process; pub mod process_instance_man_related; pub mod process_rpc; +pub mod process_rpc_proto_ext; use crate::general::app::instance::InstanceTrait; use crate::general::app::m_executor::{FnExeCtxAsync, FnExeCtxSync}; diff --git a/src/main/src/general/app/app_shared/process.rs b/src/main/src/general/app/app_shared/process.rs index 9d0859b..37f02ef 100644 --- a/src/main/src/general/app/app_shared/process.rs +++ b/src/main/src/general/app/app_shared/process.rs @@ -215,10 +215,22 @@ impl InstanceTrait for ProcessInstance { fn_ctx.func() ); tracing::debug!("before process_rpc::call_func "); - let res = - process_rpc::call_func(fn_ctx.app(), fn_ctx.func(), fn_ctx.http_str_unwrap()).await; + + let res = process_rpc::call_func( + proc_proto::FnTaskId { + call_node_id: fn_ctx.task_id().call_node_id, + task_id: fn_ctx.task_id().task_id, + }, + fn_ctx.app(), + fn_ctx.func(), + fn_ctx.format_arg_to_pass(), + ) + .await; tracing::debug!("after process_rpc::call_func "); - return res.map(|v| Some(v.ret_str)); + match res { + Ok(resp) => Ok(Some(resp.ret_str)), + Err(e) => Err(e.into()), + } } /// Process instances don't support synchronous execution diff --git a/src/main/src/general/app/app_shared/process_rpc.rs b/src/main/src/general/app/app_shared/process_rpc.rs index 9c2dd2a..2ab0b13 100644 --- a/src/main/src/general/app/app_shared/process_rpc.rs +++ b/src/main/src/general/app/app_shared/process_rpc.rs @@ -6,6 +6,8 @@ use self::proc_proto::{FuncCallReq, FuncCallResp}; use super::SharedInstance; use crate::general::app; use crate::general::app::app_shared::process_rpc::proc_proto::AppStarted; +use crate::general::app::app_shared::process_rpc_proto_ext::{ProcRpcExtKvReq, ProcRpcReqExt}; +use crate::general::network::rpc_model::ProcRpcTaskId; use crate::{ general::network::rpc_model::{self, HashValue, MsgIdBind, ReqMsg, RpcCustom}, modules_global_bridge::process_func::ModulesGlobalBrigeInstanceManager, @@ -119,16 +121,19 @@ impl RpcCustom for ProcessRpc { Some(HashValue::Str(res.appid)) } - fn handle_remote_call(_conn: &HashValue, id: u8, buf: &[u8]) -> bool { - tracing::debug!("handle_remote_call: id: {}", id); - let _ = match id { - 4 => (), - id => { - tracing::warn!("handle_remote_call: unsupported id: {}", id); - return false; - } - }; - let err = match id { + fn handle_remote_call( + &self, + conn: &HashValue, + msgid: u8, + taskid: ProcRpcTaskId, + buf: &[u8], + ) -> bool { + tracing::debug!("handle_remote_call: id: {}", msgid); + // let _ = match id { + // 4 => (), + + // }; + let err = match msgid { 4 => match proc_proto::UpdateCheckpoint::decode(buf) { Ok(_req) => { tracing::debug!("function requested for checkpoint, but we ignore it"); @@ -143,7 +148,57 @@ impl RpcCustom for ProcessRpc { } Err(e) => e, }, - _ => unreachable!(), + + 5 => match proc_proto::KvRequest::decode(buf) { + Ok(req) => { + tracing::debug!("function requested for kv"); + let proc_rpc = self.clone(); + let srctaskid = req.fn_taskid(); + let conn = conn.clone(); + let _ = tokio::spawn(async move { + let proc_rpc_res = proc_rpc + .0 + .kv_user_client() + .kv_requests(srctaskid, req.to_proto_kvrequests()) + .await; + match proc_rpc_res { + Ok(mut res) => { + tracing::debug!("function kv request success, sending response"); + match rpc_model::send_resp::( + conn, + taskid, + proc_proto::KvResponse::from(res.responses.pop().unwrap()), + ) + .await + { + Ok(_) => { + tracing::debug!("send kv response success"); + } + Err(e) => { + tracing::warn!("send kv response failed: {:?}", e); + } + } + } + Err(e) => { + tracing::warn!("function kv request failed, error: {:?}", e); + } + } + }); + return true; + } + Err(e) => { + tracing::warn!( + "decode kv request failed with buf length: {}, parital content: {:?}", + buf.len(), + &buf[..20] + ); + e + } + }, + id => { + tracing::warn!("handle_remote_call: unsupported id: {}", id); + return false; + } }; tracing::warn!("handle_remote_call error: {:?}", err); true @@ -168,18 +223,46 @@ impl MsgIdBind for proc_proto::FuncCallResp { } } +impl MsgIdBind for proc_proto::UpdateCheckpoint { + fn id() -> u16 { + 4 + } +} + +impl MsgIdBind for proc_proto::KvRequest { + fn id() -> u16 { + 5 + } +} + +impl MsgIdBind for proc_proto::KvResponse { + fn id() -> u16 { + 6 + } +} + impl ReqMsg for FuncCallReq { type Resp = FuncCallResp; } -pub async fn call_func(app: &str, func: &str, arg: String) -> WSResult { +impl ReqMsg for proc_proto::KvRequest { + type Resp = proc_proto::KvResponse; +} + +pub async fn call_func( + srcfnid: proc_proto::FnTaskId, + app: &str, + func: &str, + arg: String, +) -> WSResult { rpc_model::call( FuncCallReq { + src_task_id: Some(srcfnid), func: func.to_owned(), arg_str: arg, }, HashValue::Str(app.into()), - Duration::from_secs(20), + Duration::from_secs(120), ) .await } diff --git a/src/main/src/general/app/app_shared/process_rpc_proto.proto b/src/main/src/general/app/app_shared/process_rpc_proto.proto index b98801b..392c70e 100644 --- a/src/main/src/general/app/app_shared/process_rpc_proto.proto +++ b/src/main/src/general/app/app_shared/process_rpc_proto.proto @@ -1,22 +1,108 @@ +syntax = "proto3"; package process_rpc_proto; + +//////////////////////////////////////////////////////////// +// Category: Sub Structure >> + + +message FnTaskId{ + uint32 call_node_id=1; + uint32 task_id=2; +} + + +//////////////////////////////////////////////////////////// +// Category: Outgoing >> // Sample message. message AppStarted { - required string appid = 1; - optional string http_port = 2; - required uint32 pid=3; + string appid = 1; + string http_port = 2; + uint32 pid=3; } +// << Category: Incomming +//////////////////////////////////////////////////////////// +// Category: Outgoing RPC >> + message FuncCallReq{ - required string func=1; - required string arg_str=2; + FnTaskId src_task_id=1; + string func=2; + string arg_str=3; } message FuncCallResp{ - required string ret_str=1; + string ret_str=1; } message UpdateCheckpoint{ -} \ No newline at end of file +} + + +message KeyRange { + bytes start=1; + bytes end=2; +} + +message KvPair{ + bytes key=1; + repeated bytes values=2; +} + + +message KvRequest { + message KvPutRequest{ + FnTaskId src_task_id=1; + string app_fn=2; + // required + KvPair kv=3; + } + message KvGetRequest{ + FnTaskId src_task_id=1; + string app_fn=2; + // required + KeyRange range=3; + // required + repeated uint32 idxs=4; + } + message KvDeleteRequest{ + FnTaskId src_task_id=1; + string app_fn=2; + // required + KeyRange range=3; + } + oneof op { + KvPutRequest set=1; + KvGetRequest get=2; + KvDeleteRequest delete=3; + } +} + +// message KvLockWaitAcquireNotifyRequest{ +// uint32 release_id=1; +// } + +// message KvLockWaitAcquireNotifyResponse{} + +message KvPairs{ + repeated KvPair kvs=1; +} + +message KvResponse{ + message KvGetResponse{ + repeated uint32 idxs=1; + repeated bytes values=2; + } + message KvPutOrDelResponse{ + KvPair kv=1; + } + oneof resp { + KvGetResponse get=1; + KvPutOrDelResponse put_or_del=2; + // 0 is invalid lock id + uint32 lock_id=3; + } +} + diff --git a/src/main/src/general/app/app_shared/process_rpc_proto_ext.rs b/src/main/src/general/app/app_shared/process_rpc_proto_ext.rs new file mode 100644 index 0000000..713e053 --- /dev/null +++ b/src/main/src/general/app/app_shared/process_rpc_proto_ext.rs @@ -0,0 +1,147 @@ +use crate::general::network::proto; + +use super::process_rpc::proc_proto; + +impl From for proto::FnTaskId { + fn from(taskid: proc_proto::FnTaskId) -> Self { + proto::FnTaskId { + call_node_id: taskid.call_node_id, + task_id: taskid.task_id, + } + } +} + +impl From for proc_proto::KvResponse { + fn from(response: proto::kv::KvResponse) -> Self { + proc_proto::KvResponse { + resp: Some(match response.resp.unwrap() { + proto::kv::kv_response::Resp::Get(get) => { + proc_proto::kv_response::Resp::Get(proc_proto::kv_response::KvGetResponse { + idxs: get.idxs, + values: get.values, + }) + } + proto::kv::kv_response::Resp::PutOrDel(put_or_del) => { + let kv = put_or_del.kv.unwrap(); + proc_proto::kv_response::Resp::PutOrDel( + proc_proto::kv_response::KvPutOrDelResponse { + kv: Some(proc_proto::KvPair { + key: kv.key, + values: kv.values, + }), + }, + ) + } + proto::kv::kv_response::Resp::LockId(lock_id) => { + proc_proto::kv_response::Resp::LockId(lock_id) + } + }), + } + } +} + +pub trait ProcRpcReqExt { + fn app_fn(&self) -> &str; + fn app_name(&self) -> &str { + self.app_fn().split("/").next().unwrap() + } + fn func_name(&self) -> &str { + self.app_fn().split("/").nth(1).unwrap() + } + fn fn_taskid(&self) -> proto::FnTaskId; +} + +impl ProcRpcReqExt for proc_proto::KvRequest { + fn app_fn(&self) -> &str { + match &self.op { + Some(proc_proto::kv_request::Op::Set(set)) => set.app_fn.as_str(), + Some(proc_proto::kv_request::Op::Get(get)) => get.app_fn.as_str(), + Some(proc_proto::kv_request::Op::Delete(delete)) => delete.app_fn.as_str(), + None => panic!("no app_fn in kv request"), + } + } + fn fn_taskid(&self) -> proto::FnTaskId { + match &self.op { + Some(proc_proto::kv_request::Op::Set(set)) => { + proto::FnTaskId::from(set.src_task_id.clone().unwrap()) + } + Some(proc_proto::kv_request::Op::Get(get)) => { + proto::FnTaskId::from(get.src_task_id.clone().unwrap()) + } + Some(proc_proto::kv_request::Op::Delete(delete)) => { + proto::FnTaskId::from(delete.src_task_id.clone().unwrap()) + } + None => panic!("no fn_taskid in kv request"), + } + } +} + +pub trait ProcRpcExtKvReq { + fn to_proto_kvrequests(self) -> proto::kv::KvRequests; +} + +impl ProcRpcExtKvReq for proc_proto::KvRequest { + fn to_proto_kvrequests(self) -> proto::kv::KvRequests { + let app = self.app_name().to_string(); + let func = self.func_name().to_string(); + + match self.op { + Some(proc_proto::kv_request::Op::Set(set)) => { + let kv = set.kv.unwrap(); + proto::kv::KvRequests { + app, + func, + requests: vec![proto::kv::KvRequest { + op: Some(proto::kv::kv_request::Op::Set( + proto::kv::kv_request::KvPutRequest { + kv: Some(proto::kv::KvPair { + key: kv.key, + values: kv.values, + }), + }, + )), + }], + prev_kv_opeid: 0, + } + } + Some(proc_proto::kv_request::Op::Get(get)) => { + let range = get.range.unwrap(); + proto::kv::KvRequests { + app, + func, + requests: vec![proto::kv::KvRequest { + op: Some(proto::kv::kv_request::Op::Get( + proto::kv::kv_request::KvGetRequest { + range: Some(proto::kv::KeyRange { + start: range.start, + end: range.end, + }), + idxs: get.idxs, + }, + )), + }], + prev_kv_opeid: 0, + } + } + Some(proc_proto::kv_request::Op::Delete(delete)) => { + let range = delete.range.unwrap(); + proto::kv::KvRequests { + app, + func, + requests: vec![proto::kv::KvRequest { + op: Some(proto::kv::kv_request::Op::Delete( + proto::kv::kv_request::KvDeleteRequest { + range: Some(proto::kv::KeyRange { + start: range.start, + end: range.end, + }), + }, + )), + }], + prev_kv_opeid: 0, + } + } + None => panic!("no op in kv request"), + } + } +} diff --git a/src/main/src/general/app/m_executor.rs b/src/main/src/general/app/m_executor.rs index ff8467f..dd8d1f4 100644 --- a/src/main/src/general/app/m_executor.rs +++ b/src/main/src/general/app/m_executor.rs @@ -3,6 +3,7 @@ use crate::general::app::instance::m_instance_manager::UnsafeFunctionCtx; use crate::general::app::instance::InstanceTrait; use crate::general::app::AppType; use crate::general::app::FnMeta; +use crate::general::data::m_data_general::DATA_UID_PREFIX_FN_KV; use crate::general::network::m_p2p::RPCCaller; use crate::general::network::m_p2p::TaskId; use crate::general::network::proto::FnTaskId; @@ -27,6 +28,8 @@ use crate::{ }; use async_trait::async_trait; use dashmap::DashMap; +use serde::Deserialize; +use serde::Serialize; use std::time::Duration; use std::{ ptr::NonNull, @@ -148,12 +151,12 @@ impl FnExeCtxAsync { } } - pub fn http_str_unwrap(&self) -> String { - match &self.inner.event_ctx { - EventCtx::Http(text) => text.clone(), - _ => panic!("not http event ctx"), - } - } + // pub fn http_str_unwrap(&self) -> String { + // match &self.inner.event_ctx { + // EventCtx::Http(text) => text.clone(), + // _ => panic!("not http event ctx"), + // } + // } pub fn set_result(&mut self, result: Option) { self.inner.res = result; @@ -262,7 +265,7 @@ pub struct Executor { task_subwait_for: DashMap>, // this runing task id -> src waiting rpc - task_subwait_by: DashMap>, + task_subwait_by: DashMap>, rpc_handler_distribute_task: RPCHandler, rpc_caller_listen_for_task_done: RPCCaller, @@ -270,6 +273,13 @@ pub struct Executor { rpc_handler_add_wait_target: RPCHandler, } +#[derive(Serialize, Deserialize)] +struct FnDataEventArg { + src_called_by: NodeID, + src_taskid: u32, + trigger_data_key: String, +} + /// Base trait for function execution contexts pub trait FnExeCtxBase { /// Get the application name @@ -280,6 +290,39 @@ pub trait FnExeCtxBase { fn event_ctx(&self) -> &EventCtx; /// Get mutable reference to event context fn event_ctx_mut(&mut self) -> &mut EventCtx; + /// Get fn type + fn app_type(&self) -> AppType; + /// format arg to pass to function + fn format_arg_to_pass(&self) -> String { + match &self.event_ctx() { + EventCtx::Http(text) => text.clone(), + EventCtx::KvSet { + key, src_task_id, .. + } => { + let key_str = std::str::from_utf8(&key).unwrap(); + let trigger_data_key = match self.app_type() { + AppType::Jar | AppType::Wasm => { + // remove prefix fkv + key_str + .strip_prefix(DATA_UID_PREFIX_FN_KV) + .unwrap_or(key_str) + .to_string() + } + AppType::Native => { + // keep the key as is + key_str.to_string() + } + }; + + let arg = FnDataEventArg { + trigger_data_key, + src_called_by: src_task_id.call_node_id, + src_taskid: src_task_id.task_id, + }; + serde_json::to_string(&arg).unwrap() + } + } + } } impl FnExeCtxBase for FnExeCtxAsync { @@ -295,6 +338,9 @@ impl FnExeCtxBase for FnExeCtxAsync { fn event_ctx_mut(&mut self) -> &mut EventCtx { &mut self.inner.event_ctx } + fn app_type(&self) -> AppType { + self.inner.app_type + } } impl FnExeCtxBase for FnExeCtxSync { @@ -310,11 +356,16 @@ impl FnExeCtxBase for FnExeCtxSync { fn event_ctx_mut(&mut self) -> &mut EventCtx { &mut self.inner.event_ctx } + fn app_type(&self) -> AppType { + self.inner.app_type + } } impl Executor { - pub async fn wait_for_subtasks(&self, thistask: &u32) { + /// return last task response + pub async fn wait_for_subtasks(&self, thistask: &u32) -> Option { let mut done_tasks = vec![]; + let mut last_res = None; loop { if !self.task_subwait_for.contains_key(&thistask) { tracing::debug!( @@ -330,7 +381,7 @@ impl Executor { done_tasks.push(task.clone()); let view = self.view.clone(); let wait_task = tokio::spawn(async move { - let _ = view + let res: Result = view .executor() .rpc_caller_listen_for_task_done .call( @@ -342,19 +393,34 @@ impl Executor { Some(Duration::from_secs(180)), ) .await; + res.map_err(|err| { + tracing::error!("listen for task done failed with err: {}", err); + err + }) + .unwrap() }); wait_tasks.push(wait_task); } } for wait_task in wait_tasks { - let _ = wait_task.await; + let res = wait_task.await.unwrap(); + if !res.success { + tracing::error!( + "listen for task done failed with err: {}", + res.response_or_errmsg + ); + } else { + tracing::debug!("listen for task done success: {}", res.response_or_errmsg); + last_res = Some(res.response_or_errmsg); + } } } + last_res } - pub fn notify_subwait_done(&self, taskid: &FnTaskId) { + pub fn notify_subwait_done(&self, taskid: &FnTaskId, res: String) { loop { if let Some((_, sender)) = self.task_subwait_by.remove(&taskid) { - let _ = sender.send(()); + let _ = sender.send(res); } return; } @@ -430,23 +496,26 @@ impl Executor { }; let res = sub.recv().await; tracing::debug!("task is done: {:?}", res); - if res.is_ok() { - let _ = responsor - .send_resp(proto::ListenForTaskDoneResp { - success: true, - err_msg: "".to_owned(), - }) - .await - .todo_handle("listen task done"); - } else { - tracing::warn!("listen task done failed: {:?}", res); - let _ = responsor - .send_resp(proto::ListenForTaskDoneResp { - success: false, - err_msg: format!("err:{:?}", res), - }) - .await - .todo_handle("listen task done"); + match res { + Ok(res) => { + let _ = responsor + .send_resp(proto::ListenForTaskDoneResp { + success: true, + response_or_errmsg: res, + }) + .await + .todo_handle("listen task done"); + } + Err(err) => { + tracing::warn!("listen task done failed: {:?}", err); + let _ = responsor + .send_resp(proto::ListenForTaskDoneResp { + success: false, + response_or_errmsg: format!("err:{:?}", err), + }) + .await + .todo_handle("listen task done"); + } } }); Ok(()) @@ -505,6 +574,22 @@ impl Executor { self.execute_sync(ctx) } + pub fn handle_exec_result(&self, taskid: &FnTaskId, res: WSResult>) { + let res_str = match res { + Ok(Some(res)) => res, + Ok(None) => "".to_owned(), + Err(err) => { + tracing::warn!( + "handle failed exec result for taskid: {:?} with err: {}", + taskid, + err + ); + format!("err:{:?}", err) + } + }; + self.notify_subwait_done(taskid, res_str); + } + pub async fn handle_distribute_task( &self, resp: RPCResponsor, @@ -679,8 +764,14 @@ impl Executor { tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}"); } let taskid = ctx.inner.task_id.clone(); - let _ = self.execute_sync(ctx); - self.notify_subwait_done(&taskid); + let res = self.execute_sync(ctx); + self.handle_exec_result(&taskid, res); + // let res_str = match res { + // Ok(Some(res)) => res, + // Ok(None) => "".to_owned(), + // Err(err) => format!("err:{:?}", err), + // }; + // self.notify_subwait_done(&taskid, res_str); } else { //如果函数支持异步 // construct async fn exe ctx @@ -731,10 +822,12 @@ impl Executor { } let taskid = ctx.task_id().clone(); - let _ = self.execute(ctx).await; + let res = self.execute(ctx).await; + + self.handle_exec_result(&taskid, res); // notify src task - self.notify_subwait_done(&taskid); + // self.notify_subwait_done(&taskid); // self.take_subwaitings_for_task(&ctx.task_id()) } } @@ -832,7 +925,7 @@ impl Executor { }; // wait for sub tasks done - self.wait_for_subtasks(&task_id.task_id).await; + let _ = self.wait_for_subtasks(&task_id.task_id).await; res } diff --git a/src/main/src/general/app/mod.rs b/src/main/src/general/app/mod.rs index 364ecaa..bc34092 100644 --- a/src/main/src/general/app/mod.rs +++ b/src/main/src/general/app/mod.rs @@ -7,6 +7,7 @@ pub mod m_executor; pub mod v_os; use super::data::m_data_general::{DataSetMetaV2, GetOrDelDataArg, GetOrDelDataArgType}; +use super::data::m_kv_user_client::KvUserClient; use super::m_os::APPS_REL_DIR; use crate::general::app::app_native::native_apps; use crate::general::app::instance::m_instance_manager::InstanceManager; @@ -67,6 +68,7 @@ logical_module_view_impl!(View, master, Option); logical_module_view_impl!(View, instance_manager, InstanceManager); logical_module_view_impl!(View, data_general, DataGeneral); logical_module_view_impl!(View, executor, Executor); +logical_module_view_impl!(View, kv_user_client, KvUserClient); #[derive(Debug, Serialize, Deserialize)] #[serde(untagged)] @@ -1122,7 +1124,7 @@ impl AppMetaManager { ) .await?; // wait for sub task done(checkpoint) - self.view.executor().wait_for_subtasks(&task.task_id).await; + let _ = self.view.executor().wait_for_subtasks(&task.task_id).await; tracing::debug!("app uploaded, wait for sub task done"); Ok(()) } diff --git a/src/main/src/general/data/m_data_general/http.rs b/src/main/src/general/data/m_data_general/http.rs index bd379fd..6114cad 100644 --- a/src/main/src/general/data/m_data_general/http.rs +++ b/src/main/src/general/data/m_data_general/http.rs @@ -15,6 +15,8 @@ use axum::routing::post; use axum::Router; use serde::Serialize; use std::io; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; impl DataGeneral { pub fn register_http(&self) -> WSResult<()> { @@ -32,38 +34,57 @@ impl DataGeneral { struct UploadDataResponse { err_msg: String, } -impl UploadDataResponse { - fn is_err(&self) -> bool { - self.err_msg.len() > 0 - } -} +// impl UploadDataResponse { +// fn is_err(&self) -> bool { +// self.err_msg.len() > 0 +// } +// } -#[derive(Debug, Serialize)] -struct UploadDataResponses { - responses: Vec, -} +// #[derive(Debug, Serialize)] +// struct UploadDataResponses { +// responses: Vec, +// } -impl UploadDataResponses { - fn contains_err(&self) -> bool { - self.responses.iter().any(|r| r.is_err()) - } -} +// impl UploadDataResponses { +// fn contains_err(&self) -> bool { +// self.responses.iter().any(|r| r.is_err()) +// } +// } async fn handle_upload_data( State(view): State, mut multipart: Multipart, ) -> Response { - let mut responses = UploadDataResponses { - responses: Vec::new(), - }; + // let mut responses = UploadDataResponses { + // responses: Vec::new(), + // }; + let mut data_items = Vec::new(); + let mut first_field = true; + let mut unique_id: Option = None; + let req_arrive_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); while let Ok(Some(field)) = multipart.next_field().await { - let Some(name) = field.name() else { - responses.responses.push(UploadDataResponse { - err_msg: "field name is None".to_string(), - }); - continue; - }; - let name = name.to_string(); + if first_field { + first_field = false; + // require name for first field as unique id + let Some(name) = field.name() else { + // responses.responses.push(UploadDataResponse { + // err_msg: "field name is None".to_string(), + // }); + return ( + StatusCode::BAD_REQUEST, + serde_json::to_string(&UploadDataResponse { + err_msg: "field name is None".to_string(), + }) + .unwrap(), + ) + .into_response(); + }; + unique_id = Some(name.to_string()); + } + let data = match field .bytes() .await @@ -71,34 +92,17 @@ async fn handle_upload_data( { Ok(data) => data, Err(err) => { - responses - .responses - .push(UploadDataResponse { err_msg: err }); - continue; + return ( + StatusCode::BAD_REQUEST, + serde_json::to_string(&UploadDataResponse { err_msg: err }).unwrap(), + ) + .into_response(); } }; - tracing::debug!("data received: {}, start writing to system", &name); - let taskid = view.executor().register_sub_task(); - let taskid_value = taskid.task_id; - let _ = view - .data_general() - .write_data( - new_data_unique_id_fn_kv(name.as_bytes()), - vec![DataItemArgWrapper::new(proto::DataItem::new_mem_data( - data.to_vec(), - ))], - Some(( - view.p2p().nodes_config.this_node(), - proto::DataOpeType::Write, - proto::data_schedule_context::OpeRole::new_upload_data(), - taskid, - )), - ) - .await - .todo_handle("write data failed when upload data"); - - view.executor().wait_for_subtasks(&taskid_value).await; + data_items.push(DataItemArgWrapper::new(proto::DataItem::new_mem_data( + data.to_vec(), + ))); // let name = field.name().unwrap_or("").to_string(); // if name == "app_name" { @@ -111,14 +115,64 @@ async fn handle_upload_data( // })?); // } } - - if responses.contains_err() { - ( + let Some(unique_id) = unique_id else { + return ( StatusCode::BAD_REQUEST, - serde_json::to_string(&responses).unwrap(), + serde_json::to_string(&UploadDataResponse { + err_msg: "unique id is not specified".to_string(), + }) + .unwrap(), ) - .into_response() - } else { - (StatusCode::OK, serde_json::to_string(&responses).unwrap()).into_response() - } + .into_response(); + }; + + tracing::debug!("data received: {}, start writing to system", &unique_id); + let taskid = view.executor().register_sub_task(); + let taskid_value = taskid.task_id; + let _ = view + .data_general() + .write_data( + new_data_unique_id_fn_kv(unique_id.as_bytes()), + // vec![DataItemArgWrapper::new(proto::DataItem::new_mem_data( + // data.to_vec(), + // ))], + data_items, + Some(( + view.p2p().nodes_config.this_node(), + proto::DataOpeType::Write, + proto::data_schedule_context::OpeRole::new_upload_data(), + taskid, + )), + ) + .await + .todo_handle("write data failed when upload data"); + + let res = view.executor().wait_for_subtasks(&taskid_value).await; + let res_str = match res { + Some(res) => { + // try deserialize res + match serde_json::from_str::(&res) { + Ok(mut res) => { + let _ = res.as_object_mut().unwrap().insert( + "req_arrive_time".to_string(), + serde_json::Value::Number((req_arrive_time as u64).into()), + ); + serde_json::to_string(&res).unwrap() + } + Err(err) => { + tracing::warn!( + "deserialize upload data response failed: {}, can't insert req_arrive_time", + err + ); + res + } + } + } + None => "".to_owned(), + }; + + // no sub trigger result collection + tracing::debug!("upload data result: {}", res_str); + + (StatusCode::OK, res_str).into_response() } diff --git a/src/main/src/worker/m_kv_user_client.rs b/src/main/src/general/data/m_kv_user_client.rs similarity index 65% rename from src/main/src/worker/m_kv_user_client.rs rename to src/main/src/general/data/m_kv_user_client.rs index fac8f0a..857dd2e 100644 --- a/src/main/src/worker/m_kv_user_client.rs +++ b/src/main/src/general/data/m_kv_user_client.rs @@ -1,3 +1,5 @@ +use crate::general::network::proto::FnTaskId; +use crate::general::network::proto_ext::data_ope_role::ProtoExtDataOpeRole; use crate::general::network::proto_ext::ProtoExtDataItem; use crate::{ general::{ @@ -23,14 +25,14 @@ use crate::{ util::JoinHandleWrapper, }; use async_trait::async_trait; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use ws_derive::LogicalModule; logical_module_view_impl!(KvUserClientView); logical_module_view_impl!(KvUserClientView, p2p, P2PModule); logical_module_view_impl!(KvUserClientView, data_general, DataGeneral); logical_module_view_impl!(KvUserClientView, dist_lock, DistLock); -logical_module_view_impl!(KvUserClientView, kv_user_client, Option); +logical_module_view_impl!(KvUserClientView, kv_user_client, KvUserClient); #[derive(LogicalModule)] pub struct KvUserClient { @@ -45,11 +47,11 @@ impl LogicalModule for KvUserClient { where Self: Sized, { - unsafe { - *(&*KV_USER_CLIENT as *const Option - as *mut Option) = - Some(KvUserClientView::new(args.logical_modules_ref.clone())); - } + // unsafe { + // *(&*KV_USER_CLIENT as *const Option + // as *mut Option) = + // Some(KvUserClientView::new(args.logical_modules_ref.clone())); + // } Self { // testmap: SkipMap::new(), view: KvUserClientView::new(args.logical_modules_ref.clone()), @@ -76,16 +78,16 @@ impl LogicalModule for KvUserClient { // } // } -lazy_static::lazy_static! { - static ref KV_USER_CLIENT: Option=None; - // static ref RECENT_Kv_CACHE: Cache>=Cache::>::builder() - // .time_to_live(Duration::from_secs(10)) - // .weigher(|_key, value| -> u32 { value.len().try_into().unwrap_or(u32::MAX) }) - // // This cache will hold up to 32MiB of values. - // .max_capacity(32 * 1024 * 1024) - // .build(); - // static ref NEXT_CACHE_ID: AtomicI32=AtomicI32::new(0); -} +// lazy_static::lazy_static! { +// static ref KV_USER_CLIENT: Option=None; +// // static ref RECENT_Kv_CACHE: Cache>=Cache::>::builder() +// // .time_to_live(Duration::from_secs(10)) +// // .weigher(|_key, value| -> u32 { value.len().try_into().unwrap_or(u32::MAX) }) +// // // This cache will hold up to 32MiB of values. +// // .max_capacity(32 * 1024 * 1024) +// // .build(); +// // static ref NEXT_CACHE_ID: AtomicI32=AtomicI32::new(0); +// } // pub fn kv_user_client() -> &'static KvUserClient { // let res = &*KV_USER_CLIENT as *const Option as *mut Option; @@ -122,21 +124,26 @@ lazy_static::lazy_static! { impl KvUserClient { pub async fn kv_requests( &self, - app_name: &str, - func_name: &str, - reqs: proto::kv::KvRequests, + src_taskid: FnTaskId, + proto::kv::KvRequests { + app: app_name, + func: func_name, + requests, + .. + }: proto::kv::KvRequests, // responsor: RPCResponsor, ) -> WSResult { let mut kv_responses = KvResponses { responses: vec![] }; // pre-collect each operation's event trigger info // let mut kv_opeid = None; - for req in reqs.requests.into_iter() { + for req in requests.into_iter() { // let mut sub_tasks = vec![]; let response = match req.op.unwrap() { - proto::kv::kv_request::Op::Set(set) => { - Some(self.handle_kv_set(app_name, func_name, set).await) - } + proto::kv::kv_request::Op::Set(set) => Some( + self.handle_kv_set(src_taskid.clone(), &app_name, &func_name, set) + .await, + ), proto::kv::kv_request::Op::Get(get) => Some(self.handle_kv_get(get).await), proto::kv::kv_request::Op::Delete(delete) => { Some(self.handle_kv_delete(delete).await) @@ -200,141 +207,93 @@ impl KvUserClient { async fn handle_kv_set( &self, - _app_name: &str, - _func_name: &str, - _set: proto::kv::kv_request::KvPutRequest, + src_taskid: FnTaskId, + app_name: &str, + func_name: &str, + set: proto::kv::kv_request::KvPutRequest, ) -> KvResponse { - // let proto::kv::KvPair { key, value } = set.kv.unwrap(); - // let cur_node = self.view.p2p().nodes_config.this_node(); - // tracing::debug!("handle_kv_set: key: {:?}", key); - - // let data_general = self.view.data_general(); - //返回结果未处理 曾俊 - { - todo!() - // if let Err(e) = data_general - // .write_data( - // new_data_unique_id_fn_kv(&key), - // //原代码: - // // vec![proto::DataItem { - // // data_item_dispatch: Some(proto::data_item::DataItemDispatch::RawBytes(value)), - // // }], - // //修改后封装成要求的DataItemArgWrapper类型 tmpzipfile设置为Uninitialized状态 在DataItemArgWrapper结构体中添加了一个new方法 曾俊 - // vec![DataItemArgWrapper::new(value)], - // Some(( - // cur_node, - // proto::DataOpeType::Write, - // proto::data_schedule_context::OpeRole::FuncCall(proto::DataOpeRoleFuncCall { - // app_func: format!("{}/{}", app_name, func_name), - // node_id: cur_node, - // }), - // )), - // ) - // .await - // { - // tracing::error!("Failed to write data: {}", e); - // } - } - - // .todo_handle("This part of the code needs to be implemented."); - KvResponse::new_common(vec![]) - } - - fn convert_get_data_res_to_kv_response( - key: Vec, - uid: Vec, - _meta: DataSetMetaV2, - splits: HashMap, - ) -> WSResult> { - tracing::debug!( - "convert_get_data_res_to_kv_response uid: {:?}, split keys: {:?}", - uid, - splits.keys().collect::>() - ); - if splits.len() != 1 { - return Err(WSError::WsDataError( - WsDataError::KvGotWrongSplitCountAndIdx { - unique_id: uid.clone(), - idx: splits.keys().cloned().collect(), - }, - )); - } - - let (idx, data_item) = splits.into_iter().next().unwrap(); - if idx != 0 { - return Err(WSError::WsDataError( - WsDataError::KvGotWrongSplitCountAndIdx { - unique_id: uid.clone(), - idx: vec![idx], - }, - )); - } - - let data_item_dispatch = data_item.data_item_dispatch.unwrap(); - let raw_bytes = match data_item_dispatch { - proto::data_item::DataItemDispatch::RawBytes(value) => value, - _ => { - return Err(WSError::WsDataError(WsDataError::KvDeserializeErr { - unique_id: uid, - context: format!( - "data_item_dispatch({}) is not RawBytes", - proto::DataItem { - data_item_dispatch: Some(data_item_dispatch), - } - .to_string(), + let kv = set.kv.unwrap(); + let key = kv.key; + let values = kv.values; + + let res = self + .view + .data_general() + .write_data( + new_data_unique_id_fn_kv(&key), + values + .into_iter() + .map(|v| DataItemArgWrapper::new(proto::DataItem::new_mem_data(v))) + .collect(), + Some(( + self.view.p2p().nodes_config.this_node(), + proto::DataOpeType::Write, + proto::data_schedule_context::OpeRole::new_fn_call( + app_name, + func_name, + self.view.p2p().nodes_config.this_node(), ), - })) - } - }; + src_taskid, + )), + ) + .await; - Ok(vec![proto::kv::KvPair { - key: key, - value: raw_bytes, - }]) + match res { + Err(err) => { + tracing::warn!("kv_requests set err:{:?}", err); + return KvResponse::new_put_or_del(proto::kv::KvPair { + key: vec![], // err so we put empty + values: vec![], + }); + } + Ok(_) => KvResponse::new_put_or_del(proto::kv::KvPair { + key, + values: vec![], + }), + } } async fn handle_kv_get(&self, get: proto::kv::kv_request::KvGetRequest) -> KvResponse { tracing::debug!("handle_kv_get:{:?}", get); + let idxs: Vec = get.idxs.into_iter().map(|i| i as u8).collect(); + let range = get.range.unwrap(); + let key = range.start; let data_general = self.view.data_general(); - let uid = new_data_unique_id_fn_kv(&get.range.as_ref().unwrap().start); + let uid = new_data_unique_id_fn_kv(&key); let got = data_general .get_or_del_datas(GetOrDelDataArg { meta: None, unique_id: uid.clone(), - ty: GetOrDelDataArgType::All, + ty: GetOrDelDataArgType::PartialMany { + idxs: idxs.iter().map(|i| *i).collect(), + }, }) .await; - let got = match got { - Ok((meta, splits)) => match Self::convert_get_data_res_to_kv_response( - get.range.unwrap().start, - uid, - meta, - splits, - ) { - Ok(res) => res, - Err(err) => { - tracing::warn!("get kv data error:{:?}", err); - vec![] + match got { + Ok((_meta, mut idx_2_items)) => { + let mut values = vec![]; + for idx in idxs.iter() { + if let Some(mut item) = idx_2_items.remove(idx) { + values.push(item.take_mem_data()); + } } - }, - Err(WSError::WsDataError(WsDataError::DataSetNotFound { uniqueid })) => { - tracing::debug!("get kv data not found, uid({:?})", uniqueid); - vec![] + KvResponse::new_get(idxs, values) } Err(err) => { - tracing::warn!("get kv data error:{:?}", err); - vec![] + tracing::error!("kv get err: {:?}", err); + KvResponse::new_get(vec![], vec![]) } - }; - KvResponse::new_common(got) + } } async fn handle_kv_delete(&self, delete: proto::kv::kv_request::KvDeleteRequest) -> KvResponse { tracing::debug!("handle_kv_delete:{:?}", delete); + let range = delete.range.unwrap(); + let key = range.start; let data_general = self.view.data_general(); - let uid = new_data_unique_id_fn_kv(&delete.range.as_ref().unwrap().start); + let uid = new_data_unique_id_fn_kv(&key); let deleted = data_general .get_or_del_datas(GetOrDelDataArg { meta: None, @@ -343,29 +302,21 @@ impl KvUserClient { }) .await; - let deleted = match deleted { - Ok((deleted_meta, deleted_splits)) => match Self::convert_get_data_res_to_kv_response( - delete.range.unwrap().start, - uid, - deleted_meta, - deleted_splits, - ) { - Ok(res) => res, - Err(err) => { - tracing::warn!("delete kv data error:{:?}", err); - vec![] + match deleted { + Ok((_meta, mut idx_2_items)) => { + let mut values = vec![]; + + let ordered_idxs: BTreeSet = idx_2_items.iter().map(|(i, _)| *i).collect(); + for idx in ordered_idxs.iter() { + values.push(idx_2_items.remove(idx).unwrap().take_mem_data()); } - }, - Err(WSError::WsDataError(WsDataError::DataSetNotFound { uniqueid })) => { - tracing::debug!("delete kv data not found, uid({:?})", uniqueid); - vec![] + KvResponse::new_put_or_del(proto::kv::KvPair { key, values }) } Err(err) => { - tracing::warn!("delete kv data error:{:?}", err); - vec![] + tracing::error!("kv get err: {:?}", err); + KvResponse::new_get(vec![], vec![]) } - }; - KvResponse::new_common(deleted) + } } // async fn handle_kv_lock( // &self, @@ -445,6 +396,7 @@ mod test { proto::{ self, kv::{KvRequest, KvRequests}, + FnTaskId, }, proto_ext::KvRequestExt, }, @@ -460,29 +412,34 @@ mod test { let func = "test_func"; let test_key = "test_key"; let test_value = "test_value"; + let test_taskid = FnTaskId { + task_id: 0, + call_node_id: view.p2p().nodes_config.this_node(), + }; // first time get should be none { let res = view .kv_user_client() .kv_requests( - app, - func, + // fake taskid + test_taskid.clone(), KvRequests { app: app.to_owned(), func: func.to_owned(), prev_kv_opeid: -1, - requests: vec![KvRequest::new_get(test_key.as_bytes().to_owned())], + requests: vec![KvRequest::new_get(test_key.as_bytes().to_owned(), vec![0])], }, ) .await .unwrap(); assert!(res.responses.len() == 1); match res.responses[0].resp.clone().unwrap() { - proto::kv::kv_response::Resp::CommonResp(kv_response) => { - assert!(kv_response.kvs.len() == 0); + proto::kv::kv_response::Resp::Get(kv_response) => { + assert!(kv_response.idxs.len() == 0); + assert!(kv_response.values.len() == 0); } - proto::kv::kv_response::Resp::LockId(_) => panic!(), + resp => panic!("require get resp, but get {:?}", resp), } tracing::debug!("first time get is none"); } @@ -492,15 +449,14 @@ mod test { let res = view .kv_user_client() .kv_requests( - app, - func, + test_taskid.clone(), KvRequests { app: app.to_owned(), func: func.to_owned(), prev_kv_opeid: -1, requests: vec![KvRequest::new_set(proto::kv::KvPair { key: test_key.as_bytes().to_owned(), - value: test_value.as_bytes().to_owned(), + values: vec![test_value.as_bytes().to_owned()], })], }, ) @@ -508,15 +464,15 @@ mod test { .unwrap(); assert!(res.responses.len() == 1); match res.responses[0].resp.clone().unwrap() { - proto::kv::kv_response::Resp::CommonResp(kv_response) => { - assert!(kv_response.kvs.len() == 0); + proto::kv::kv_response::Resp::PutOrDel(kv_response) => { + assert!(kv_response.kv.is_none()); // assert_eq!(str::from_utf8(&kv_response.kvs[0].key).unwrap(), test_key); // assert_eq!( // str::from_utf8(&kv_response.kvs[0].value).unwrap(), // test_value // ); } - proto::kv::kv_response::Resp::LockId(_) => panic!(), + resp => panic!("require common resp, but get {:?}", resp), } tracing::debug!("set success"); @@ -524,25 +480,25 @@ mod test { let res = view .kv_user_client() .kv_requests( - app, - func, + test_taskid.clone(), KvRequests { app: app.to_owned(), func: func.to_owned(), prev_kv_opeid: -1, - requests: vec![KvRequest::new_get(test_key.as_bytes().to_owned())], + requests: vec![KvRequest::new_get(test_key.as_bytes().to_owned(), vec![0])], }, ) .await .unwrap(); assert!(res.responses.len() == 1); match res.responses[0].resp.clone().unwrap() { - proto::kv::kv_response::Resp::CommonResp(kv_response) => { - assert_eq!(kv_response.kvs.len(), 1); - assert!(kv_response.kvs[0].key == test_key.as_bytes().to_owned()); - assert!(kv_response.kvs[0].value == test_value.as_bytes().to_owned()); + proto::kv::kv_response::Resp::Get(kv_response) => { + // assert_eq!(kv_response.kvs.len(), 1); + // assert!(kv_response.kvs[0].key == test_key.as_bytes().to_owned()); + assert!(kv_response.idxs[0] == 0); + assert!(kv_response.values[0] == test_value.as_bytes().to_owned()); } - proto::kv::kv_response::Resp::LockId(_) => panic!(), + resp => panic!("require get resp, but get {:?}", resp), } tracing::debug!("get after set success"); @@ -550,8 +506,7 @@ mod test { let res = view .kv_user_client() .kv_requests( - app, - func, + test_taskid.clone(), KvRequests { app: app.to_owned(), func: func.to_owned(), @@ -563,11 +518,12 @@ mod test { .unwrap(); assert!(res.responses.len() == 1); match res.responses[0].resp.clone().unwrap() { - proto::kv::kv_response::Resp::CommonResp(kv_response) => { - assert!(kv_response.kvs[0].key == test_key.as_bytes().to_owned()); - assert!(kv_response.kvs[0].value == test_value.as_bytes().to_owned()); + proto::kv::kv_response::Resp::PutOrDel(kv_response) => { + let kv = kv_response.kv.unwrap(); + assert!(kv.key == test_key.as_bytes().to_owned()); + assert!(kv.values[0] == test_value.as_bytes().to_owned()); } - proto::kv::kv_response::Resp::LockId(_) => panic!(), + resp => panic!("require common resp, but get {:?}", resp), } tracing::debug!("delete after get success"); @@ -575,8 +531,7 @@ mod test { let res = view .kv_user_client() .kv_requests( - app, - func, + test_taskid.clone(), KvRequests { app: app.to_owned(), func: func.to_owned(), @@ -588,10 +543,10 @@ mod test { .unwrap(); assert!(res.responses.len() == 1); match res.responses[0].resp.clone().unwrap() { - proto::kv::kv_response::Resp::CommonResp(kv_response) => { - assert!(kv_response.kvs.len() == 0); + proto::kv::kv_response::Resp::PutOrDel(kv_response) => { + assert!(kv_response.kv.is_none()); } - proto::kv::kv_response::Resp::LockId(_) => panic!(), + resp => panic!("expected delete resp, got:{:?}", resp), } tracing::debug!("delete again is none"); } diff --git a/src/main/src/general/data/mod.rs b/src/main/src/general/data/mod.rs index b88a7a9..ad0b3f9 100644 --- a/src/main/src/general/data/mod.rs +++ b/src/main/src/general/data/mod.rs @@ -2,3 +2,4 @@ pub mod kv_interface; pub mod m_data_general; pub mod m_dist_lock; pub mod m_kv_store_engine; +pub mod m_kv_user_client; diff --git a/src/main/src/general/network/proto_ext/data_ope_role.rs b/src/main/src/general/network/proto_ext/data_ope_role.rs index 48afabf..699d76e 100644 --- a/src/main/src/general/network/proto_ext/data_ope_role.rs +++ b/src/main/src/general/network/proto_ext/data_ope_role.rs @@ -1,11 +1,26 @@ -use crate::general::network::proto; +use crate::{general::network::proto, sys::NodeID}; pub trait ProtoExtDataOpeRole { fn new_upload_data() -> proto::data_schedule_context::OpeRole; + fn new_fn_call( + app_name: &str, + func_name: &str, + thisnode: NodeID, + ) -> proto::data_schedule_context::OpeRole; } impl ProtoExtDataOpeRole for proto::data_schedule_context::OpeRole { fn new_upload_data() -> proto::data_schedule_context::OpeRole { proto::data_schedule_context::OpeRole::UploadData(proto::DataOpeRoleUploadData {}) } + fn new_fn_call( + app_name: &str, + func_name: &str, + thisnode: NodeID, + ) -> proto::data_schedule_context::OpeRole { + proto::data_schedule_context::OpeRole::FuncCall(proto::DataOpeRoleFuncCall { + app_func: format!("{}/{}", app_name, func_name), + node_id: thisnode, + }) + } } diff --git a/src/main/src/general/network/proto_ext/mod.rs b/src/main/src/general/network/proto_ext/mod.rs index 1049916..b4d7f88 100644 --- a/src/main/src/general/network/proto_ext/mod.rs +++ b/src/main/src/general/network/proto_ext/mod.rs @@ -47,9 +47,17 @@ pub trait ProtoExtDataItem: Sized { fn new_partial_raw_bytes(rawbytes: impl Into>, range: Range) -> WSResult; fn new_file_data(path: &str, is_dir: bool) -> Self; fn new_mem_data(mem: Vec) -> Self; + /// panic if not mem data + fn take_mem_data(&mut self) -> Vec; } impl ProtoExtDataItem for proto::DataItem { + fn take_mem_data(&mut self) -> Vec { + match &mut self.data_item_dispatch { + Some(proto::data_item::DataItemDispatch::RawBytes(bytes)) => std::mem::take(bytes), + _ => panic!("DataItem is not a mem data"), + } + } fn new_partial_raw_bytes(rawbytes: impl Into>, range: Range) -> WSResult { let bytes = rawbytes.into(); if range.end > bytes.len() { @@ -259,16 +267,27 @@ impl AsRef<[u8]> for proto::DataItem { pub trait ProtoExtKvResponse { fn new_lock(lock_id: u32) -> KvResponse; - fn new_common(kvs: Vec) -> KvResponse; + fn new_get(idxs: Vec, values: Vec>) -> KvResponse; + fn new_put_or_del(kv: proto::kv::KvPair) -> KvResponse; fn lock_id(&self) -> Option; - fn common_kvs(&self) -> Option<&Vec>; + fn get_kvs(&self) -> Option<&proto::kv::kv_response::KvGetResponse>; } impl ProtoExtKvResponse for KvResponse { - fn new_common(kvs: Vec) -> KvResponse { + fn new_get(idxs: Vec, values: Vec>) -> KvResponse { + KvResponse { + resp: Some(proto::kv::kv_response::Resp::Get( + proto::kv::kv_response::KvGetResponse { + idxs: idxs.iter().map(|v| *v as u32).collect(), + values, + }, + )), + } + } + fn new_put_or_del(kv: proto::kv::KvPair) -> KvResponse { KvResponse { - resp: Some(proto::kv::kv_response::Resp::CommonResp( - proto::kv::kv_response::KvResponse { kvs }, + resp: Some(proto::kv::kv_response::Resp::PutOrDel( + proto::kv::kv_response::KvPutOrDelResponse { kv: Some(kv) }, )), } } @@ -279,13 +298,16 @@ impl ProtoExtKvResponse for KvResponse { } fn lock_id(&self) -> Option { match self.resp.as_ref().unwrap() { - proto::kv::kv_response::Resp::CommonResp(_) => None, + proto::kv::kv_response::Resp::Get(_) | proto::kv::kv_response::Resp::PutOrDel(_) => { + None + } proto::kv::kv_response::Resp::LockId(id) => Some(*id), } } - fn common_kvs(&self) -> Option<&Vec> { + fn get_kvs(&self) -> Option<&proto::kv::kv_response::KvGetResponse> { match self.resp.as_ref().unwrap() { - proto::kv::kv_response::Resp::CommonResp(resp) => Some(&resp.kvs), + proto::kv::kv_response::Resp::Get(res) => Some(res), + proto::kv::kv_response::Resp::PutOrDel(_) => None, proto::kv::kv_response::Resp::LockId(_) => None, } } @@ -293,7 +315,7 @@ impl ProtoExtKvResponse for KvResponse { pub trait KvRequestExt { fn new_set(kv: proto::kv::KvPair) -> Self; - fn new_get(key: Vec) -> Self; + fn new_get(key: Vec, idxs: Vec) -> Self; fn new_delete(key: Vec) -> Self; fn new_lock(ope: DistLockOpe, key: Vec) -> Self; } @@ -306,10 +328,11 @@ impl KvRequestExt for proto::kv::KvRequest { )), } } - fn new_get(key: Vec) -> Self { + fn new_get(key: Vec, idxs: Vec) -> Self { proto::kv::KvRequest { op: Some(proto::kv::kv_request::Op::Get( proto::kv::kv_request::KvGetRequest { + idxs: idxs.iter().map(|v| *v as u32).collect(), range: Some(proto::kv::KeyRange { start: key, end: vec![], diff --git a/src/main/src/general/network/proto_src/kv.proto b/src/main/src/general/network/proto_src/kv.proto index ca19e91..d9e7f16 100644 --- a/src/main/src/general/network/proto_src/kv.proto +++ b/src/main/src/general/network/proto_src/kv.proto @@ -6,9 +6,9 @@ message KeyRange { bytes end=2; } -message KvPair { +message KvPair{ bytes key=1; - bytes value=2; + repeated bytes values=2; } @@ -20,6 +20,8 @@ message KvRequest { message KvGetRequest{ // required KeyRange range=1; + // required + repeated uint32 idxs=2; } message KvDeleteRequest{ // required @@ -63,13 +65,18 @@ message KvPairs{ } message KvResponse{ - message KvResponse{ - repeated KvPair kvs=1; + message KvGetResponse{ + repeated uint32 idxs=1; + repeated bytes values=2; + } + message KvPutOrDelResponse{ + KvPair kv=1; } oneof resp { - KvResponse common_resp=1; + KvGetResponse get=1; + KvPutOrDelResponse put_or_del=2; // 0 is invalid lock id - uint32 lock_id=2; + uint32 lock_id=3; } } diff --git a/src/main/src/general/network/proto_src/sche.proto b/src/main/src/general/network/proto_src/sche.proto index 6d616b2..51a8198 100644 --- a/src/main/src/general/network/proto_src/sche.proto +++ b/src/main/src/general/network/proto_src/sche.proto @@ -73,5 +73,5 @@ message ListenForTaskDoneReq{ message ListenForTaskDoneResp{ bool success=1; - string err_msg=2; + string response_or_errmsg=2; } \ No newline at end of file diff --git a/src/main/src/general/network/rpc_model.rs b/src/main/src/general/network/rpc_model.rs index 3b7cd80..bbbeac4 100644 --- a/src/main/src/general/network/rpc_model.rs +++ b/src/main/src/general/network/rpc_model.rs @@ -14,7 +14,7 @@ use std::{ }; use tokio::{net::UnixListener, sync::oneshot}; -use crate::result::{WSResult, WsFuncError, WsRpcErr}; +use crate::result::{ProcRpcErr, WSResult, WsFuncError, WsRpcErr}; // start from the begining #[async_trait] @@ -23,7 +23,13 @@ pub trait RpcCustom: Clone + Sized + Send + 'static { fn bind(a: Self::SpawnArgs) -> UnixListener; // return true if the id matches remote call pack - fn handle_remote_call(conn: &HashValue, id: u8, buf: &[u8]) -> bool; + fn handle_remote_call( + &self, + conn: &HashValue, + msgid: u8, + taskid: ProcRpcTaskId, + buf: &[u8], + ) -> bool; async fn verify(&self, buf: &[u8]) -> Option; // fn deserialize(id: u16, buf: &[u8]); } @@ -46,6 +52,8 @@ async fn accept_task(r: R, a: R::SpawnArgs) { } } +pub type ProcRpcTaskId = u32; + #[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)] pub enum HashValue { Int(i64), @@ -65,6 +73,47 @@ pub fn close_conn(id: &HashValue) { let _ = CONN_MAP.write().remove(id); } +pub async fn send_resp( + conn: HashValue, + taskid: ProcRpcTaskId, + resp: Req::Resp, +) -> WSResult<()> { + let tx = { + let conn_map = CONN_MAP.read(); + let Some(conn_state) = conn_map.get(&conn) else { + return Err(ProcRpcErr::ConnIdNotFound { + conn, + context: "err at rpc_model::send_resp".to_string(), + } + .into()); + }; + conn_state.tx.clone() + }; + + let mut buf = BytesMut::with_capacity(resp.encoded_len() + 8); + buf.put_i16(Req::Resp::id() as i16); + buf.put_i32(resp.encoded_len() as i32); + buf.put_i32(taskid as i32); + resp.encode(&mut buf).unwrap(); + + tx.send(buf.into()).await.map_err(|err| { + tracing::warn!("failed to send resp: {:?}", err); + ProcRpcErr::SendRespFailed { + err, + context: "err at rpc_model::send_resp".to_string(), + } + .into() + }) +} +// let _ = CONN_MAP +// .write() +// .get_mut(&conn) +// .unwrap() +// .tx +// .send(resp.encode_to_vec()) +// .await; +// Ok(()) + pub async fn call( req: Req, conn: HashValue, @@ -97,6 +146,7 @@ pub async fn call( // send the request let mut buf = BytesMut::with_capacity(req.encoded_len() + 8); + buf.put_i16(Req::id() as i16); buf.put_i32(req.encoded_len() as i32); buf.put_i32(next_task as i32); req.encode(&mut buf).unwrap(); @@ -147,10 +197,11 @@ async fn listen_task(r: R, socket: tokio::net::UnixStream) -> WSRe tracing::debug!("new connection: {:?}", socket.peer_addr().unwrap()); let (mut sockrx, socktx) = socket.into_split(); - let mut buf = [0; 1024]; + let mut buf = vec![0; 1024]; let mut len = 0; let (conn, rx) = - match listen_task_ext::verify_remote::(r, &mut sockrx, &mut len, &mut buf).await { + match listen_task_ext::verify_remote::(r.clone(), &mut sockrx, &mut len, &mut buf).await + { Ok((conn, rx)) => (conn, rx), Err(err) => { tracing::debug!("verify failed {:?}", err); @@ -160,7 +211,7 @@ async fn listen_task(r: R, socket: tokio::net::UnixStream) -> WSRe listen_task_ext::spawn_send_loop(rx, socktx); - listen_task_ext::read_loop::(conn, &mut sockrx, &mut len, &mut buf).await; + listen_task_ext::read_loop::(r, conn, &mut sockrx, &mut len, &mut buf).await; Ok(()) } @@ -203,7 +254,7 @@ pub(super) mod listen_task_ext { .into()); } - let verify_msg_len = consume_i32(0, buf, len); + let verify_msg_len = consume_i32(0, buf, len) as usize; // println!("waiting for verify msg {}", verify_msg_len); if !wait_for_len(sockrx, len, verify_msg_len, buf).await { @@ -248,42 +299,91 @@ pub(super) mod listen_task_ext { } pub(super) async fn read_loop( + r: R, conn: super::HashValue, socket: &mut OwnedReadHalf, len: &mut usize, - buf: &mut [u8], + buf: &mut Vec, ) { *len = 0; - let mut offset = 0; + // let mut offset = 0; + let mut history_read_len = vec![]; loop { + // fn reset_when_overflow( + // buf: &mut Vec, + // offset: &mut usize, + // len: &mut usize, + // tarlen: usize, + // ) { + // if buf.len() < *offset + tarlen { + // tracing::debug!( + // "reset when overflow, offset: {}, len: {}, tarlen: {}", + // *offset, + // *len, + // tarlen + // ); + // buf.copy_within(*offset.., 0); + // *len -= *offset; + // *offset = 0; + // if buf.len() < tarlen { + // buf.resize(tarlen, 0); + // } + // tracing::debug!( + // "reseted with offset: {}, len: {}, tarlen: {}, buflen: {}", + // *offset, + // *len, + // tarlen, + // buf.len() + // ); + // } + // } + + // reset_when_overflow(buf, &mut offset, &mut *len, 9); let (msg_len, msg_id, taskid) = { - let buf = &mut buf[offset..]; + let buf = &mut buf[0..9]; + *len = 0; if !wait_for_len(socket, len, 9, buf).await { tracing::warn!("failed to read head len, stop rd loop"); return; } - offset += 9; + assert_eq!(*len, 9); + // offset += 9; ( - consume_i32(0, buf, len), + consume_i32(0, buf, len) as usize, consume_u8(4, buf, len), consume_i32(5, buf, len) as u32, ) }; + history_read_len.push(9); { - if buf.len() < offset + msg_len { - // move forward - buf.copy_within(offset.., 0); - offset = 0; + tracing::debug!( + "history_read_len: {:?}, total: {}", + history_read_len, + history_read_len.iter().sum::() + ); + // reset_when_overflow(buf, &mut offset, &mut *len, msg_len); + // if buf.len() < offset + msg_len { + // // move forward + // buf.copy_within(offset.., 0); + // offset = 0; + // *len -= offset; + // if buf.len() < msg_len { + // buf.resize(msg_len, 0); + // } + // } + if buf.len() < msg_len { + buf.resize(msg_len, 0); } - let buf = &mut buf[offset..]; - + let buf = &mut buf[0..msg_len]; + *len = 0; if !wait_for_len(socket, len, msg_len, buf).await { tracing::warn!("failed to read head len, stop rd loop"); return; } + assert_eq!(*len, msg_len); - if !R::handle_remote_call(&conn, msg_id, &buf[..msg_len]) { + if !r.handle_remote_call(&conn, msg_id, taskid, &buf[..msg_len]) { tracing::debug!("msg id not remote call to sys, seen as sys call response"); let Some(cb) = CALL_MAP.write().remove(&taskid) else { tracing::warn!( @@ -299,8 +399,9 @@ pub(super) mod listen_task_ext { } // update the buf meta - offset += msg_len; - *len -= msg_len; + // offset += msg_len; + // *len -= msg_len; + history_read_len.push(msg_len); } // match socket.read(buf).await { @@ -348,9 +449,10 @@ pub(super) mod listen_task_ext { tarlen: usize, buf: &mut [u8], ) -> bool { + // let mut write_offset = *len; while *len < tarlen { tracing::debug!("current len: {}, target len: {}", *len, tarlen); - match socket.read(buf).await { + match socket.read(&mut buf[*len..]).await { Ok(n) => { if n == 0 { tracing::warn!("connection closed"); @@ -358,6 +460,7 @@ pub(super) mod listen_task_ext { } // println!("recv: {:?}", buf[..n]); *len += n; + // write_offset += n; } Err(e) => { tracing::warn!("failed to read from socket; err = {:?}", e); @@ -375,10 +478,10 @@ pub(super) mod listen_task_ext { } // 4字节u32 长度 - pub(super) fn consume_i32(off: usize, buf: &mut [u8], len: &mut usize) -> usize { + pub(super) fn consume_i32(off: usize, buf: &mut [u8], len: &mut usize) -> i32 { // let ret = bincode::deserialize::(&buf[off..off + 4]).unwrap() as usize; let ret = Bytes::copy_from_slice(&buf[off..off + 4]).get_i32(); *len -= 4; - ret as usize + ret } } diff --git a/src/main/src/master/app/test.rs b/src/main/src/master/app/test.rs index b447c7f..5399d80 100644 --- a/src/main/src/master/app/test.rs +++ b/src/main/src/master/app/test.rs @@ -1,6 +1,7 @@ use crate::{ config::{NodeConfig, NodesConfig}, general::app::View, + sys::{LogicalModulesRef, Sys}, util::command::CommandDebugStdio, }; use axum::body::Bytes; @@ -15,41 +16,7 @@ use std::{collections::HashMap, env, fs, path::PathBuf, process::Stdio}; // #[cfg(test)] use crate::general::test_utils; -#[tokio::test(flavor = "multi_thread")] -async fn test_app_upload() -> Result<(), Box> { - // install java related by scripts/install/2.3install_java_related.py - // run real time output command - let (stdout_task, stderr_task, mut child) = Command::new("bash") - .arg("-c") - .arg("python3 scripts/install/2.3install_java_related.py") - .current_dir("../../../../../middlewares/waverless/waverless") - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn_debug() - .await - .unwrap(); - let status = child.wait().await.unwrap(); - if !status.success() { - panic!( - "install java related failed, stderr: {}, stdout: {}", - stderr_task.await.unwrap(), - stdout_task.await.unwrap() - ); - } - - // 使用 get_test_sys 新建两个系统模块(一个 master,一个 worker) - let ( - _sys_guard, // 互斥锁守卫 - _master_logical_modules, // 系统 0 (Master) 的逻辑模块引用 - worker_logical_modules, // 系统 1 (Worker) 的逻辑模块引用 - ) = test_utils::get_test_sys().await; - - // 延迟等待连接稳定 - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - - //调用 bencher 的 prepare 模式触发应用上传 - tracing::debug!("test_app_upload uploading app"); - +async fn bencher(app_fn_name: &str, prepare: bool) { // 创建临时配置文件 let temp_dir = tempfile::tempdir().expect("Failed to create temp directory"); let config_path = temp_dir.path().join("cluster_config.yml"); @@ -75,15 +42,26 @@ worker: // 获取配置文件的绝对路径 let config_path_str = config_path.to_str().expect("Invalid path"); - let command_str = format!( - "echo $PWD && \ + let command_str = if prepare { + format!( + "echo $PWD && \ cargo run -- \ - simple_demo/simple \ + {} \ --with-wl \ --prepare \ --config {}", - config_path_str - ); + app_fn_name, config_path_str + ) + } else { + format!( + "echo $PWD && \ + cargo run -- \ + {} \ + --with-wl \ + --config {}", + app_fn_name, config_path_str + ) + }; let (stdout_task, stderr_task, mut child) = Command::new("bash") .arg("-c") @@ -109,6 +87,52 @@ worker: stderr_task.await.unwrap() ); } +} + +async fn start_sys_with_app_uploaded<'a>( + app_fn_name: &str, +) -> ( + tokio::sync::MutexGuard< + 'a, + std::option::Option<((Sys, LogicalModulesRef), (Sys, LogicalModulesRef))>, + >, + LogicalModulesRef, + LogicalModulesRef, +) { + // install java related by scripts/install/2.3install_java_related.py + // run real time output command + let (stdout_task, stderr_task, mut child) = Command::new("bash") + .arg("-c") + .arg("python3 scripts/install/2.3install_java_related.py") + .current_dir("../../../../../middlewares/waverless/waverless") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn_debug() + .await + .unwrap(); + let status = child.wait().await.unwrap(); + if !status.success() { + panic!( + "install java related failed, stderr: {}, stdout: {}", + stderr_task.await.unwrap(), + stdout_task.await.unwrap() + ); + } + + // 使用 get_test_sys 新建两个系统模块(一个 master,一个 worker) + let ( + sys_guard, // 互斥锁守卫 + master_logical_modules, // 系统 0 (Master) 的逻辑模块引用 + worker_logical_modules, // 系统 1 (Worker) 的逻辑模块引用 + ) = test_utils::get_test_sys().await; + + // 延迟等待连接稳定 + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + //调用 bencher 的 prepare 模式触发应用上传 + tracing::debug!("test_app_upload uploading app"); + + bencher(app_fn_name, true).await; tracing::debug!( "test_app_upload app uploaded", @@ -119,7 +143,7 @@ worker: // tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; // 应用名称 - let appname = "simple_demo"; + let appname = app_fn_name.split('/').next().unwrap(); // 读取本地 ZIP 文件的内容 let zip_path = format!("../../../../../middlewares/waverless/{}.zip", appname); let zip_content = tokio::fs::read(zip_path) @@ -153,7 +177,7 @@ worker: // 调用数据接口校验应用是否上传完成 tracing::debug!("test_app_upload verifying app meta"); - let app_meta = app_meta_manager2.get_app_meta("simple_demo").await; + let app_meta = app_meta_manager2.get_app_meta(appname).await; assert!(app_meta.is_ok(), "Failed to get app meta"); let app_meta = app_meta.unwrap(); assert!(app_meta.is_some(), "App meta data not found"); @@ -164,8 +188,19 @@ worker: tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; tracing::debug!("test_app_upload waited {}s", i + 1); } + + (sys_guard, master_logical_modules, worker_logical_modules) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_app_upload() -> Result<(), Box> { + const APP_FN_NAME: &str = "simple_demo/simple"; + // tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + let (_sys_guard, _master_logical_modules, _worker_logical_modules) = + start_sys_with_app_uploaded(APP_FN_NAME).await; + // 发起对函数的 http 请求校验应用是否运行 // 发起对函数的 http 请求校验应用是否运行 tracing::debug!("test_app_upload try calling test app"); let client = reqwest::Client::new(); @@ -212,3 +247,15 @@ worker: Ok(()) // 返回 Ok(()) 表示成功 } + +#[tokio::test(flavor = "multi_thread")] +async fn test_write_data_trigger_app() -> Result<(), Box> { + const APP_FN_NAME: &str = "img_resize/resize"; + + let (_sys_guard, _master_logical_modules, _worker_logical_modules) = + start_sys_with_app_uploaded(APP_FN_NAME).await; + + bencher(APP_FN_NAME, false).await; + + Ok(()) +} diff --git a/src/main/src/master/data/m_data_master.rs b/src/main/src/master/data/m_data_master.rs index 454fa6e..4080317 100644 --- a/src/main/src/master/data/m_data_master.rs +++ b/src/main/src/master/data/m_data_master.rs @@ -356,6 +356,7 @@ impl DataMaster { app_meta_encoded, }) = ctx.ope_role.as_ref().unwrap() { + tracing::debug!("update app meta for data({:?})", req.unique_id); let meta: AppMeta = bincode::deserialize(&app_meta_encoded).map_err(|e| { WsDataError::DataDecodeError { reason: format!("err: {:?}", e), diff --git a/src/main/src/master/m_master.rs b/src/main/src/master/m_master.rs index daa0a06..14e632c 100644 --- a/src/main/src/master/m_master.rs +++ b/src/main/src/master/m_master.rs @@ -226,6 +226,8 @@ impl Master { .into()); } + tracing::debug!("trigger func call for data({:?})", ctx.data_unique_id); + // Generate task and operation IDs let task_id = self.view.executor().register_sub_task(); let opeid = self.ope_id_allocator.fetch_add(1, Ordering::Relaxed); diff --git a/src/main/src/metrics.rs b/src/main/src/metrics.rs new file mode 100644 index 0000000..b73ec41 --- /dev/null +++ b/src/main/src/metrics.rs @@ -0,0 +1,44 @@ +use prometheus::{register_counter, register_gauge, Counter, Gauge, Encoder, TextEncoder}; +use lazy_static::lazy_static; +use axum::response::IntoResponse; +use sysinfo::{System, SystemExt}; + +lazy_static! { + // HTTP请求总数 + pub static ref HTTP_REQUESTS_TOTAL: Counter = register_counter!( + "waverless_http_requests_total", + "HTTP请求总数" + ).unwrap(); + + // 函数调用总数 + pub static ref FUNCTION_CALLS_TOTAL: Counter = register_counter!( + "waverless_function_calls_total", + "函数调用总数" + ).unwrap(); + + // 批处理任务数 + pub static ref BATCH_TASKS_TOTAL: Counter = register_counter!( + "waverless_batch_tasks_total", + "批处理任务数" + ).unwrap(); + + // 节点内存使用量(单位:字节) + pub static ref NODE_MEMORY_USAGE: Gauge = register_gauge!( + "waverless_node_memory_usage_bytes", + "节点内存使用量" + ).unwrap(); +} + +// /metrics 路由的 handler +pub async fn metrics_handler() -> impl IntoResponse { + // 每次请求时更新内存指标 + let mut sys = System::new_all(); + sys.refresh_memory(); + NODE_MEMORY_USAGE.set(sys.used_memory() as f64 * 1024.0); // 转为字节 + + let encoder = TextEncoder::new(); + let metric_families = prometheus::gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() +} \ No newline at end of file diff --git a/src/main/src/result.rs b/src/main/src/result.rs index 8f746a4..38c1641 100644 --- a/src/main/src/result.rs +++ b/src/main/src/result.rs @@ -5,7 +5,7 @@ use camelpaste::paste; use prost::{DecodeError, Message}; use qp2p::{EndpointError, SendError}; use thiserror::Error; -use tokio::task::JoinError; +use tokio::{sync::mpsc, task::JoinError}; use wasmedge_sdk::error::WasmEdgeError; use zip_extract::ZipExtractError; @@ -124,6 +124,17 @@ pub enum WsPermissionErr { }, } +#[derive(Debug)] +pub enum ProcRpcErr { + ConnIdNotFound { + conn: HashValue, + context: String, + }, + SendRespFailed { + err: mpsc::error::SendError>, + context: String, + }, +} #[derive(Debug)] pub enum WsFuncError { WasmError(Box), @@ -404,6 +415,9 @@ pub enum WSError { #[error("Runtime error: {0:?}")] WsRuntimeErr(WsRuntimeErr), + #[error("ProcRpc error: {0:?}")] + WsProcRpcErr(ProcRpcErr), + #[error("Not Implemented")] NotImplemented, } @@ -474,6 +488,12 @@ impl From for WSError { } } +impl From for WSError { + fn from(e: ProcRpcErr) -> Self { + WSError::WsProcRpcErr(e) + } +} + pub struct ErrCvt(pub T); macro_rules! impl_err_convertor { diff --git a/src/main/src/sys.rs b/src/main/src/sys.rs index 4296248..b3e1b72 100644 --- a/src/main/src/sys.rs +++ b/src/main/src/sys.rs @@ -1,6 +1,7 @@ use crate::general::app::app_owned::wasm_host_funcs; use crate::general::app::instance::m_instance_manager::InstanceManager; use crate::general::app::m_executor::Executor; +use crate::general::data::m_kv_user_client::KvUserClient; use crate::{ config::NodesConfig, general::{ @@ -17,7 +18,7 @@ use crate::{ m_metric_observor::MetricObservor, }, modules_global_bridge, util, - worker::{m_kv_user_client::KvUserClient, m_worker::WorkerCore}, + worker::m_worker::WorkerCore, }; use crate::{result::WSResult, util::JoinHandleWrapper}; use async_trait::async_trait; @@ -355,7 +356,9 @@ start_modules!( instance_manager, InstanceManager, executor, - Executor + Executor, + kv_user_client, + KvUserClient ], [ metric_observor, @@ -371,5 +374,5 @@ start_modules!( app_master, MasterAppMgmt ], - [worker, WorkerCore, kv_user_client, KvUserClient] + [worker, WorkerCore] ); diff --git a/src/main/src/worker/mod.rs b/src/main/src/worker/mod.rs index efe51cf..911a333 100644 --- a/src/main/src/worker/mod.rs +++ b/src/main/src/worker/mod.rs @@ -1,5 +1,4 @@ pub mod m_http_handler; -pub mod m_kv_user_client; pub mod m_worker; // use axum::{extract::Path, routing::get, Router}; diff --git a/test/files/node_config.yaml b/test/files/node_config.yaml deleted file mode 100644 index 78aef3a..0000000 --- a/test/files/node_config.yaml +++ /dev/null @@ -1,7 +0,0 @@ -nodes: - 1: - addr: 127.0.0.1:2600 - spec: [meta,master] - 2: - addr: 127.0.0.1:2605 - spec: [meta,worker] diff --git a/test/kv_store_engine_1/conf b/test/kv_store_engine_1/conf deleted file mode 100644 index 4154d7c..0000000 --- a/test/kv_store_engine_1/conf +++ /dev/null @@ -1,4 +0,0 @@ -segment_size: 524288 -use_compression: false -version: 0.34 -vQ \ No newline at end of file diff --git a/test/kv_store_engine_1/db b/test/kv_store_engine_1/db deleted file mode 100644 index 03cab51..0000000 Binary files a/test/kv_store_engine_1/db and /dev/null differ