Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 157 additions & 49 deletions src/main/src/general/app/m_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ impl Executor {
tracing::debug!("receive distribute task: {:?}", req);
let app = req.app.to_owned();
let func = req.func.to_owned();
// todo
let (appmeta, _) = match self.view.appmeta_manager().get_app_meta(&app).await {
Ok(Some(appmeta)) => appmeta,
Ok(None) => {
Expand Down Expand Up @@ -378,70 +379,177 @@ impl Executor {
return;
};

//费新文
// distribute task requires sync support
// if fnmeta.sync_async.asyncable() {
// let warn = format!(
// "func {} not support sync, meta:{:?}",
// func, fnmeta.sync_async
// );
// tracing::warn!("{}", warn);
// if let Err(err) = resp
// .send_resp(DistributeTaskResp {
// success: false,
// err_msg: warn,
// })
// .await
// {
// tracing::error!("send distribute task resp failed with err: {}", err);
// }
// return;
// }

// // construct sync fn exe ctx
// let ctx = FnExeCtxSync::new(
// match FnExeCtxAsyncAllowedType::try_from(apptype) { // 这里修正为 FnExeCtxAsyncAllowedType
// Ok(v) => v,
// Err(err) => {
// let warn = format!("app type {:?} not supported, err: {}", apptype, err);
// tracing::warn!("{}", warn);
// if let Err(err) = resp
// .send_resp(DistributeTaskResp {
// success: false,
// err_msg: warn,
// })
// .await
// {
// tracing::error!("send distribute task resp failed with err: {}", err);
// }
// return;
// }
// },
// req.app,
// req.func,
// fnmeta.clone(),
// req.task_id as usize,
// match req.trigger.unwrap() {
// distribute_task_req::Trigger::EventNew(new) => EventCtx::KvSet {
// key: new.key,
// opeid: Some(new.opeid),
// },
// distribute_task_req::Trigger::EventWrite(write) => EventCtx::KvSet {
// key: write.key,
// opeid: Some(write.opeid),
// },
// },
// );

// if let Err(err) = resp
// .send_resp(DistributeTaskResp {
// success: true,
// err_msg: "".to_owned(),
// })
// .await
// {
// tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}");
// }
// let _ = self.execute_sync(ctx);

//判断函数是否支持异步或者同步
// distribute task requires async support
if !fnmeta.sync_async.asyncable() {
let warn = format!(
"func {} not support async, meta:{:?}",
func, fnmeta.sync_async
//如果函数支持同步
// construct sync fn exe ctx
let ctx = FnExeCtxSync::new(
match FnExeCtxAsyncAllowedType::try_from(apptype) { // 这里修正为 FnExeCtxAsyncAllowedType
Ok(v) => v,
Err(err) => {
let warn = format!("app type {:?} not supported, err: {}", apptype, err);
tracing::warn!("{}", warn);
if let Err(err) = resp
.send_resp(DistributeTaskResp {
success: false,
err_msg: warn,
})
.await
{
tracing::error!("send distribute task resp failed with err: {}", err);
}
return;
}
},
req.app,
req.func,
fnmeta.clone(),
req.task_id as usize,
match req.trigger.unwrap() {
distribute_task_req::Trigger::EventNew(new) => EventCtx::KvSet {
key: new.key,
opeid: Some(new.opeid),
},
distribute_task_req::Trigger::EventWrite(write) => EventCtx::KvSet {
key: write.key,
opeid: Some(write.opeid),
},
},
);
tracing::warn!("{}", warn);

if let Err(err) = resp
.send_resp(DistributeTaskResp {
success: false,
err_msg: warn,
success: true,
err_msg: "".to_owned(),
})
.await
{
tracing::error!("send distribute task resp failed with err: {}", err);
tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}");
}
return;
}
let _ = self.execute_sync(ctx);

// construct async fn exe ctx
let ctx = FnExeCtxAsync::new(
match FnExeCtxAsyncAllowedType::try_from(apptype) {
Ok(v) => v,
Err(err) => {
let warn = format!("app type {:?} not supported, err: {}", apptype, err);
tracing::warn!("{}", warn);
if let Err(err) = resp
.send_resp(DistributeTaskResp {
success: false,
err_msg: warn,
})
.await
{
tracing::error!("send distribute task resp failed with err: {}", err);

} else {
//如果函数支持异步
// construct async fn exe ctx
let ctx = FnExeCtxAsync::new(
match FnExeCtxAsyncAllowedType::try_from(apptype) {
Ok(v) => v,
Err(err) => {
let warn = format!("app type {:?} not supported, err: {}", apptype, err);
tracing::warn!("{}", warn);
if let Err(err) = resp
.send_resp(DistributeTaskResp {
success: false,
err_msg: warn,
})
.await
{
tracing::error!("send distribute task resp failed with err: {}", err);
}
return;
}
return;
}
},
req.app,
req.func,
fnmeta.clone(),
req.task_id as usize,
match req.trigger.unwrap() {
distribute_task_req::Trigger::EventNew(new) => EventCtx::KvSet {
key: new.key,
opeid: Some(new.opeid),
},
distribute_task_req::Trigger::EventWrite(write) => EventCtx::KvSet {
key: write.key,
opeid: Some(write.opeid),
req.app,
req.func,
fnmeta.clone(),
req.task_id as usize,
match req.trigger.unwrap() {
distribute_task_req::Trigger::EventNew(new) => EventCtx::KvSet {
key: new.key,
opeid: Some(new.opeid),
},
distribute_task_req::Trigger::EventWrite(write) => EventCtx::KvSet {
key: write.key,
opeid: Some(write.opeid),
},
},
},
);
);

if let Err(err) = resp
.send_resp(DistributeTaskResp {
success: true,
err_msg: "".to_owned(),
})
.await
{
tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}");
}
let _ = self.execute(ctx).await;

if let Err(err) = resp
.send_resp(DistributeTaskResp {
success: true,
err_msg: "".to_owned(),
})
.await
{
tracing::error!("send sche resp for app:{app} fn:{func} failed with err: {err}");
}
let _ = self.execute(ctx).await;




}

pub async fn handle_http_task(&self, route: &str, text: String) -> WSResult<Option<String>> {
Expand Down
Loading
Loading