Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 66 additions & 59 deletions src/main/src/general/app/app_owned/wasm_host_funcs/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn kv_batch_ope<T>(
) -> Result<Vec<WasmValue>, HostFuncError> {
let opes_arg_ptr = args[0].to_i32();
let opes_arg_len = args[1].to_i32();
let opes_id = utils::mutref::<i32>(&caller, args[2].to_i32());
let _opes_id = utils::mutref::<i32>(&caller, args[2].to_i32());
let args = utils::i32slice(&caller, opes_arg_ptr, opes_arg_len);
let func_ctx = unsafe {
#[cfg(feature = "unsafe-log")]
Expand Down Expand Up @@ -153,7 +153,7 @@ async fn kv_batch_ope<T>(
proto::kv::kv_request::KvPutRequest {
kv: Some(KvPair {
key: key.to_owned(),
value: value.to_owned(),
values: vec![value.to_owned()],
}),
},
)),
Expand All @@ -167,6 +167,7 @@ async fn kv_batch_ope<T>(
requests.push(KvRequest {
op: Some(proto::kv::kv_request::Op::Get(
proto::kv::kv_request::KvGetRequest {
idxs: vec![0],
range: Some(KeyRange {
start: key.to_owned(),
end: vec![],
Expand Down Expand Up @@ -221,69 +222,70 @@ async fn kv_batch_ope<T>(
}
}
// tracing::debug!("requests:{:?}", requests);
let prev_kv_opeid = func_ctx
let _prev_kv_opeid = func_ctx
.event_ctx_mut()
.take_prev_kv_opeid()
.map_or(-1, |v| v as i64);
match m_kv_user_client()
.kv_requests(
func_ctx.app(),
func_ctx.func(),
todo!("wasm kv is not updated for src wait"),
KvRequests {
requests,
app: func_ctx.app().to_owned(),
func: func_ctx.func().to_owned(),
prev_kv_opeid,
prev_kv_opeid: _prev_kv_opeid,
},
)
.await
{
Ok(res) => {
let id = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// Write back the results to wasm runtime
let mut cur_idx = 1;
let mut resps = res.responses.iter();
// Construct the requests
for _ in 0..ope_cnt {
let ope_type = args[cur_idx];
match ope_type as usize {
// set
SET_ID => {
let _ = resps.next().unwrap();
cur_idx += 5;
}
// get
GET_ID => {
let kvs = resps.next().unwrap().common_kvs().unwrap();
// get len
*utils::mutref::<i32>(&caller, args[cur_idx + 3]) = if kvs.len() > 0 {
kvs.get(0).unwrap().value.len() as i32
} else {
-1
};
cur_idx += 4;
}
// lock
LOCK_ID => {
if let Some(lockid) = resps.next().unwrap().lock_id() {
// lock id is allocated by the remote when call the lock
*utils::mutref::<u32>(&caller, args[cur_idx + 4]) = lockid;
} else {
// unlock, no response
}
cur_idx += 5;
}
DELETE_ID => {
let _ = resps.next().unwrap();
cur_idx += 3;
}
_ => {
panic!("not implemented");
}
}
}
RECENT_KV_CACHE.insert(id, res);
*opes_id = id;
Ok(_res) => {
// https://fvd360f8oos.feishu.cn/wiki/M4ubwJkvcichuHkiGhjc0miHn5f#share-FuaYd5qZboOkhZxYMc2cCV0fndh
todo!("wasm kv is not updated for multi value items (dataset)");
// let id = NEXT_CACHE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// // Write back the results to wasm runtime
// let mut cur_idx = 1;
// let mut resps = res.responses.iter();
// // Construct the requests
// for _ in 0..ope_cnt {
// let ope_type = args[cur_idx];
// match ope_type as usize {
// // set
// SET_ID => {
// let _ = resps.next().unwrap();
// cur_idx += 5;
// }
// // get
// GET_ID => {
// let get_resp = resps.next().unwrap().get_kvs().unwrap();
// // get len
// *utils::mutref::<i32>(&caller, args[cur_idx + 3]) = if get_resp.len() > 0 {
// kvs.get(0).unwrap().value.len() as i32
// } else {
// -1
// };
// cur_idx += 4;
// }
// // lock
// LOCK_ID => {
// if let Some(lockid) = resps.next().unwrap().lock_id() {
// // lock id is allocated by the remote when call the lock
// *utils::mutref::<u32>(&caller, args[cur_idx + 4]) = lockid;
// } else {
// // unlock, no response
// }
// cur_idx += 5;
// }
// DELETE_ID => {
// let _ = resps.next().unwrap();
// cur_idx += 3;
// }
// _ => {
// panic!("not implemented");
// }
// }
// }
// RECENT_KV_CACHE.insert(id, res);
// *opes_id = id;
}
Err(err) => {
tracing::error!("kv batch ope error:{}", err);
Expand All @@ -304,13 +306,18 @@ fn kv_batch_res(caller: Caller, args: Vec<WasmValue>) -> Result<Vec<WasmValue>,
while cur_idx < args.len() {
let ope_idx = args[cur_idx];
if let Some(res) = res.responses.get(ope_idx as usize) {
if let Some(kvs) = res.common_kvs() {
if let Some(kv) = kvs.get(0) {
let slice =
utils::mutu8sclice(&caller, args[cur_idx + 1], kv.value.len() as i32)
.unwrap();
slice.copy_from_slice(kvs.get(0).unwrap().value.as_slice());
}
if let Some(_kvs) = res.get_kvs() {
// https://fvd360f8oos.feishu.cn/wiki/M4ubwJkvcichuHkiGhjc0miHn5f#share-FuaYd5qZboOkhZxYMc2cCV0fndh
todo!("wasm kv is not updated for multi value items (dataset)");
// if let Some(kv) = kvs.get(0) {
// let slice = utils::mutu8sclice(
// &caller,
// args[cur_idx + 1],
// kv.values[0].len() as i32,
// )
// .unwrap();
// slice.copy_from_slice(kv.values[0].as_slice());
// }
} else if let Some(_lock_id) = res.lock_id() {
// do nothing
} else {
Expand Down
26 changes: 13 additions & 13 deletions src/main/src/general/app/app_owned/wasm_host_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ use result::ResultFuncsRegister;
mod utils {

use super::UnsafeFunctionCtx;
use crate::general::app::m_executor::{FnExeCtxAsync};
use crate::general::app::m_executor::FnExeCtxAsync;
use crate::general::app::InstanceManager;
use crate::{
general::m_os::OperatingSystem, sys::LogicalModulesRef, util::SendNonNull,
worker::m_kv_user_client::KvUserClient,
};
use crate::general::data::m_kv_user_client::KvUserClient;
use crate::{general::m_os::OperatingSystem, sys::LogicalModulesRef, util::SendNonNull};
use wasmedge_sdk::{Caller, Instance, Memory};

pub trait WasmCtx {
Expand Down Expand Up @@ -127,14 +125,16 @@ mod utils {
}

pub fn m_kv_user_client() -> &'static KvUserClient {
unsafe {
&(*MODULES.as_ref().unwrap().inner.as_ptr())
.as_ref()
.unwrap()
.kv_user_client
.as_ref()
.unwrap()
}
// https://fvd360f8oos.feishu.cn/wiki/M4ubwJkvcichuHkiGhjc0miHn5f#share-FuaYd5qZboOkhZxYMc2cCV0fndh
todo!("wasm kv is not updated for src wait and multi value items (dataset)");
// unsafe {
// &(*MODULES.as_ref().unwrap().inner.as_ptr())
// .as_ref()
// .unwrap()
// .kv_user_client
// .as_ref()
// .unwrap()
// }
}

pub fn m_fs<'a>() -> &'a OperatingSystem {
Expand Down
1 change: 1 addition & 0 deletions src/main/src/general/app/app_shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod java;
pub mod process;
pub mod process_instance_man_related;
pub mod process_rpc;
pub mod process_rpc_proto_ext;

use crate::general::app::instance::InstanceTrait;
use crate::general::app::m_executor::{FnExeCtxAsync, FnExeCtxSync};
Expand Down
18 changes: 15 additions & 3 deletions src/main/src/general/app/app_shared/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,22 @@ impl InstanceTrait for ProcessInstance {
fn_ctx.func()
);
tracing::debug!("before process_rpc::call_func ");
let res =
process_rpc::call_func(fn_ctx.app(), fn_ctx.func(), fn_ctx.http_str_unwrap()).await;

let res = process_rpc::call_func(
proc_proto::FnTaskId {
call_node_id: fn_ctx.task_id().call_node_id,
task_id: fn_ctx.task_id().task_id,
},
fn_ctx.app(),
fn_ctx.func(),
fn_ctx.format_arg_to_pass(),
)
.await;
tracing::debug!("after process_rpc::call_func ");
return res.map(|v| Some(v.ret_str));
match res {
Ok(resp) => Ok(Some(resp.ret_str)),
Err(e) => Err(e.into()),
}
}

/// Process instances don't support synchronous execution
Expand Down
109 changes: 96 additions & 13 deletions src/main/src/general/app/app_shared/process_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use self::proc_proto::{FuncCallReq, FuncCallResp};
use super::SharedInstance;
use crate::general::app;
use crate::general::app::app_shared::process_rpc::proc_proto::AppStarted;
use crate::general::app::app_shared::process_rpc_proto_ext::{ProcRpcExtKvReq, ProcRpcReqExt};
use crate::general::network::rpc_model::ProcRpcTaskId;
use crate::{
general::network::rpc_model::{self, HashValue, MsgIdBind, ReqMsg, RpcCustom},
modules_global_bridge::process_func::ModulesGlobalBrigeInstanceManager,
Expand Down Expand Up @@ -119,16 +121,19 @@ impl RpcCustom for ProcessRpc {
Some(HashValue::Str(res.appid))
}

fn handle_remote_call(_conn: &HashValue, id: u8, buf: &[u8]) -> bool {
tracing::debug!("handle_remote_call: id: {}", id);
let _ = match id {
4 => (),
id => {
tracing::warn!("handle_remote_call: unsupported id: {}", id);
return false;
}
};
let err = match id {
fn handle_remote_call(
&self,
conn: &HashValue,
msgid: u8,
taskid: ProcRpcTaskId,
buf: &[u8],
) -> bool {
tracing::debug!("handle_remote_call: id: {}", msgid);
// let _ = match id {
// 4 => (),

// };
let err = match msgid {
4 => match proc_proto::UpdateCheckpoint::decode(buf) {
Ok(_req) => {
tracing::debug!("function requested for checkpoint, but we ignore it");
Expand All @@ -143,7 +148,57 @@ impl RpcCustom for ProcessRpc {
}
Err(e) => e,
},
_ => unreachable!(),

5 => match proc_proto::KvRequest::decode(buf) {
Ok(req) => {
tracing::debug!("function requested for kv");
let proc_rpc = self.clone();
let srctaskid = req.fn_taskid();
let conn = conn.clone();
let _ = tokio::spawn(async move {
let proc_rpc_res = proc_rpc
.0
.kv_user_client()
.kv_requests(srctaskid, req.to_proto_kvrequests())
.await;
match proc_rpc_res {
Ok(mut res) => {
tracing::debug!("function kv request success, sending response");
match rpc_model::send_resp::<proc_proto::KvRequest>(
conn,
taskid,
proc_proto::KvResponse::from(res.responses.pop().unwrap()),
)
.await
{
Ok(_) => {
tracing::debug!("send kv response success");
}
Err(e) => {
tracing::warn!("send kv response failed: {:?}", e);
}
}
}
Err(e) => {
tracing::warn!("function kv request failed, error: {:?}", e);
}
}
});
return true;
}
Err(e) => {
tracing::warn!(
"decode kv request failed with buf length: {}, parital content: {:?}",
buf.len(),
&buf[..20]
);
e
}
},
id => {
tracing::warn!("handle_remote_call: unsupported id: {}", id);
return false;
}
};
tracing::warn!("handle_remote_call error: {:?}", err);
true
Expand All @@ -168,18 +223,46 @@ impl MsgIdBind for proc_proto::FuncCallResp {
}
}

impl MsgIdBind for proc_proto::UpdateCheckpoint {
fn id() -> u16 {
4
}
}

impl MsgIdBind for proc_proto::KvRequest {
fn id() -> u16 {
5
}
}

impl MsgIdBind for proc_proto::KvResponse {
fn id() -> u16 {
6
}
}

impl ReqMsg for FuncCallReq {
type Resp = FuncCallResp;
}

pub async fn call_func(app: &str, func: &str, arg: String) -> WSResult<FuncCallResp> {
impl ReqMsg for proc_proto::KvRequest {
type Resp = proc_proto::KvResponse;
}

pub async fn call_func(
srcfnid: proc_proto::FnTaskId,
app: &str,
func: &str,
arg: String,
) -> WSResult<FuncCallResp> {
rpc_model::call(
FuncCallReq {
src_task_id: Some(srcfnid),
func: func.to_owned(),
arg_str: arg,
},
HashValue::Str(app.into()),
Duration::from_secs(20),
Duration::from_secs(120),
)
.await
}
Loading
Loading