diff --git a/src/main/src/general/app/m_executor.rs b/src/main/src/general/app/m_executor.rs index 7e90948..2c5ddf4 100644 --- a/src/main/src/general/app/m_executor.rs +++ b/src/main/src/general/app/m_executor.rs @@ -333,6 +333,7 @@ impl Executor { tracing::debug!("receive distribute task: {:?}", req); let app = req.app.to_owned(); let func = req.func.to_owned(); + // todo let (appmeta, _) = match self.view.appmeta_manager().get_app_meta(&app).await { Ok(Some(appmeta)) => appmeta, Ok(None) => { @@ -378,70 +379,177 @@ impl Executor { return; }; + //费新文 + // distribute task requires sync support + // if fnmeta.sync_async.asyncable() { + // let warn = format!( + // "func {} not support sync, meta:{:?}", + // func, fnmeta.sync_async + // ); + // tracing::warn!("{}", warn); + // if let Err(err) = resp + // .send_resp(DistributeTaskResp { + // success: false, + // err_msg: warn, + // }) + // .await + // { + // tracing::error!("send distribute task resp failed with err: {}", err); + // } + // return; + // } + + // // construct sync fn exe ctx + // let ctx = FnExeCtxSync::new( + // match FnExeCtxAsyncAllowedType::try_from(apptype) { // 这里修正为 FnExeCtxAsyncAllowedType + // Ok(v) => v, + // Err(err) => { + // let warn = format!("app type {:?} not supported, err: {}", apptype, err); + // tracing::warn!("{}", warn); + // if let Err(err) = resp + // .send_resp(DistributeTaskResp { + // success: false, + // err_msg: warn, + // }) + // .await + // { + // tracing::error!("send distribute task resp failed with err: {}", err); + // } + // return; + // } + // }, + // req.app, + // req.func, + // fnmeta.clone(), + // req.task_id as usize, + // match req.trigger.unwrap() { + // distribute_task_req::Trigger::EventNew(new) => EventCtx::KvSet { + // key: new.key, + // opeid: Some(new.opeid), + // }, + // distribute_task_req::Trigger::EventWrite(write) => EventCtx::KvSet { + // key: write.key, + // opeid: Some(write.opeid), + // }, + // }, + // ); + + // if let Err(err) = resp + // .send_resp(DistributeTaskResp { + // success: true, + // err_msg: "".to_owned(), + // }) + // .await + // { + // tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}"); + // } + // let _ = self.execute_sync(ctx); + + //判断函数是否支持异步或者同步 // distribute task requires async support if !fnmeta.sync_async.asyncable() { - let warn = format!( - "func {} not support async, meta:{:?}", - func, fnmeta.sync_async + //如果函数支持同步 + // construct sync fn exe ctx + let ctx = FnExeCtxSync::new( + match FnExeCtxAsyncAllowedType::try_from(apptype) { // 这里修正为 FnExeCtxAsyncAllowedType + Ok(v) => v, + Err(err) => { + let warn = format!("app type {:?} not supported, err: {}", apptype, err); + tracing::warn!("{}", warn); + if let Err(err) = resp + .send_resp(DistributeTaskResp { + success: false, + err_msg: warn, + }) + .await + { + tracing::error!("send distribute task resp failed with err: {}", err); + } + return; + } + }, + req.app, + req.func, + fnmeta.clone(), + req.task_id as usize, + match req.trigger.unwrap() { + distribute_task_req::Trigger::EventNew(new) => EventCtx::KvSet { + key: new.key, + opeid: Some(new.opeid), + }, + distribute_task_req::Trigger::EventWrite(write) => EventCtx::KvSet { + key: write.key, + opeid: Some(write.opeid), + }, + }, ); - tracing::warn!("{}", warn); + if let Err(err) = resp .send_resp(DistributeTaskResp { - success: false, - err_msg: warn, + success: true, + err_msg: "".to_owned(), }) .await { - tracing::error!("send distribute task resp failed with err: {}", err); + tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}"); } - return; - } + let _ = self.execute_sync(ctx); + - // construct async fn exe ctx - let ctx = FnExeCtxAsync::new( - match FnExeCtxAsyncAllowedType::try_from(apptype) { - Ok(v) => v, - Err(err) => { - let warn = format!("app type {:?} not supported, err: {}", apptype, err); - tracing::warn!("{}", warn); - if let Err(err) = resp - .send_resp(DistributeTaskResp { - success: false, - err_msg: warn, - }) - .await - { - tracing::error!("send distribute task resp failed with err: {}", err); + + } else { + //如果函数支持异步 + // construct async fn exe ctx + let ctx = FnExeCtxAsync::new( + match FnExeCtxAsyncAllowedType::try_from(apptype) { + Ok(v) => v, + Err(err) => { + let warn = format!("app type {:?} not supported, err: {}", apptype, err); + tracing::warn!("{}", warn); + if let Err(err) = resp + .send_resp(DistributeTaskResp { + success: false, + err_msg: warn, + }) + .await + { + tracing::error!("send distribute task resp failed with err: {}", err); + } + return; } - return; - } - }, - req.app, - req.func, - fnmeta.clone(), - req.task_id as usize, - match req.trigger.unwrap() { - distribute_task_req::Trigger::EventNew(new) => EventCtx::KvSet { - key: new.key, - opeid: Some(new.opeid), }, - distribute_task_req::Trigger::EventWrite(write) => EventCtx::KvSet { - key: write.key, - opeid: Some(write.opeid), + req.app, + req.func, + fnmeta.clone(), + req.task_id as usize, + match req.trigger.unwrap() { + distribute_task_req::Trigger::EventNew(new) => EventCtx::KvSet { + key: new.key, + opeid: Some(new.opeid), + }, + distribute_task_req::Trigger::EventWrite(write) => EventCtx::KvSet { + key: write.key, + opeid: Some(write.opeid), + }, }, - }, - ); + ); + + if let Err(err) = resp + .send_resp(DistributeTaskResp { + success: true, + err_msg: "".to_owned(), + }) + .await + { + tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}"); + } + let _ = self.execute(ctx).await; - if let Err(err) = resp - .send_resp(DistributeTaskResp { - success: true, - err_msg: "".to_owned(), - }) - .await - { - tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}"); } - let _ = self.execute(ctx).await; + + + + } pub async fn handle_http_task(&self, route: &str, text: String) -> WSResult> { diff --git a/src/main/src/general/data/m_data_general/mod.rs b/src/main/src/general/data/m_data_general/mod.rs index 19a372f..250da75 100644 --- a/src/main/src/general/data/m_data_general/mod.rs +++ b/src/main/src/general/data/m_data_general/mod.rs @@ -50,6 +50,21 @@ use tokio::sync::Semaphore; use tokio::task::JoinError; use ws_derive::LogicalModule; + +// 费新文 +// use crate::general::network::proto::sche::{DistributeTaskReq, DistributeTaskResp}; +// use crate::general::app::app_native::NativeAppInstance; +// use crate::general::app::instance::InstanceTrait; +// use crate::general::app::m_executor::{ +// EventCtx, +// FnExeCtxSync, +// FnExeCtxAsyncAllowedType, +// }; + + + + + logical_module_view_impl!(DataGeneralView); logical_module_view_impl!(DataGeneralView, p2p, P2PModule); logical_module_view_impl!(DataGeneralView, data_general, DataGeneral); @@ -103,11 +118,19 @@ pub struct DataGeneral { rpc_call_get_data_meta: RPCCaller, rpc_call_get_data: RPCCaller, + //费新文 + // rpc_call_distribute_task: RPCCaller, + + rpc_handler_write_once_data: RPCHandler, rpc_handler_batch_data: RPCHandler, rpc_handler_data_meta_update: RPCHandler, rpc_handler_get_data_meta: RPCHandler, rpc_handler_get_data: RPCHandler, + + //费新文 + // rpc_handler_distribute_task: RPCHandler, + // 批量数据接收状态管理 batch_receive_states: AsyncInitMap>, @@ -122,12 +145,21 @@ impl DataGeneral { rpc_call_batch_data: RPCCaller::new(), rpc_call_get_data_meta: RPCCaller::new(), rpc_call_get_data: RPCCaller::new(), + + //费新文 + // rpc_call_distribute_task: RPCCaller::new(), + + rpc_handler_write_once_data: RPCHandler::new(), rpc_handler_batch_data: RPCHandler::new(), rpc_handler_data_meta_update: RPCHandler::new(), rpc_handler_get_data_meta: RPCHandler::new(), rpc_handler_get_data: RPCHandler::new(), batch_receive_states: AsyncInitMap::new(), + + //费新文 + // rpc_handler_distribute_task: RPCHandler::new(), + } } @@ -1161,6 +1193,79 @@ impl DataGeneral { Ok(()) } + + + + //费新文 + // pub async fn distribute_task_to_worker( + // &self, + // worker_node: NodeID, + // app_name: String, + // func_name: String, + // task_id: u32, + // key: Option>, + // is_write_trigger: bool, + // ) -> WSResult { + // let mut req = DistributeTaskReq { + // app: app_name, + // func: func_name, + // task_id, + // trigger: None, + // }; + + // if let Some(key_data) = key { + // if is_write_trigger { + // req.trigger = Some(proto::sche::distribute_task_req::Trigger::EventWrite( + // proto::sche::distribute_task_req::DataEventTriggerWrite { + // key: key_data, + // opeid: task_id, + // } + // )); + // } else { + // req.trigger = Some(proto::sche::distribute_task_req::Trigger::EventNew( + // proto::sche::distribute_task_req::DataEventTriggerNew { + // key: key_data, + // opeid: task_id, + // } + // )); + // } + // } + + // self.rpc_call_distribute_task + // .call( + // self.view.p2p(), + // worker_node, + // req, + // Some(Duration::from_secs(30)), + // ) + // .await + // .map_err(|e| WsNetworkLogicErr::RPCErr { //这里错误类型需要换 + // message: e.to_string(), + // }.into()) + // } + + // async fn rpc_handle_distribute_task( + // &self, + // responsor: RPCResponsor, + // req: DistributeTaskReq, + // ) { + // tracing::debug!("rpc_handle_distribute_task with req({:?})", req); + + // // TODO: 这里需要实现具体的任务处理逻辑 + + // if let Err(e) = responsor + // .send_resp(DistributeTaskResp { + // success: true, + // err_msg: String::new(), + // }) + // .await { + // tracing::error!("Failed to send distribute task response: {}", e); + // } + // } + + + + } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -1542,6 +1647,11 @@ impl LogicalModule for DataGeneral { rpc_call_get_data_meta: RPCCaller::new(), rpc_call_get_data: RPCCaller::new(), + // //费新文 + // rpc_call_distribute_task: RPCCaller::new(), + // rpc_handler_distribute_task: RPCHandler::new(), + + rpc_handler_write_once_data: RPCHandler::new(), rpc_handler_batch_data: RPCHandler::new(), rpc_handler_data_meta_update: RPCHandler::new(), @@ -1564,10 +1674,30 @@ impl LogicalModule for DataGeneral { self.rpc_call_batch_data.regist(p2p); self.rpc_call_get_data_meta.regist(p2p); self.rpc_call_get_data.regist(p2p); + + + //费新文 + // self.rpc_call_distribute_task.regist(p2p); } // register rpc handlers { + + //费新文 + // let view = self.view.clone(); + // self.rpc_handler_distribute_task.regist( + // p2p, + // move |responsor: RPCResponsor, + // req: DistributeTaskReq| { + // let view = view.clone(); + // let _ = tokio::spawn(async move { + // view.data_general().rpc_handle_distribute_task(responsor, req).await; + // }); + // Ok(()) + // }, + // ); + + let view = self.view.clone(); self.rpc_handler_write_once_data .regist(p2p, move |responsor, req| { diff --git a/src/main/src/master/app/test.rs b/src/main/src/master/app/test.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/main/src/master/m_master.rs b/src/main/src/master/m_master.rs index 5c4849f..1e12724 100644 --- a/src/main/src/master/m_master.rs +++ b/src/main/src/master/m_master.rs @@ -162,6 +162,7 @@ impl Master { .master() .rpc_caller_distribute_task .call( + //理解 self.view.p2p(), self.select_node(), DistributeTaskReq {