diff --git a/scripts/deploy_cluster/1.install_remote_env.py b/scripts/deploy_cluster/1.install_remote_env.py
index cdfbee3..5abb3c8 100644
--- a/scripts/deploy_cluster/1.install_remote_env.py
+++ b/scripts/deploy_cluster/1.install_remote_env.py
@@ -24,7 +24,15 @@ def os_system(command):
 # make ../install tar.gz
 os_system_sure("rm -rf install.tar.gz")
 CRAC_INSTALL_DIR = "/usr/jdk_crac"
-os_system_sure(f"cp -r {CRAC_INSTALL_DIR} ../install/inner/jdk_crac")
+
+# Check whether ../install/inner/jdk_crac already exists; copy it only if missing, otherwise skip.
+dst = "../install/inner/jdk_crac"
+if not os.path.exists(dst):
+    os_system(f"cp -r {CRAC_INSTALL_DIR} {dst}")
+else:
+    print(f"{dst} already exists, skipping copy.")
+
+# os_system_sure(f"cp -r {CRAC_INSTALL_DIR} ../install/inner/jdk_crac")
 os_system_sure("tar -czvf install.tar.gz -C ../install .")
 
 def deploy_to_nodes():
diff --git a/scripts/deploy_cluster/node_config.yaml b/scripts/deploy_cluster/node_config.yaml
index 7f54c57..7ec9644 100644
--- a/scripts/deploy_cluster/node_config.yaml
+++ b/scripts/deploy_cluster/node_config.yaml
@@ -1,11 +1,11 @@
 nodes:
-  9:
-    addr: 192.168.31.9:2500
+  4:
+    addr: 192.168.31.109:2500
     spec:
       - meta
       - master
-  10:
-    addr: 192.168.31.240:2500
+  11:
+    addr: 192.168.31.138:2500
     spec:
       - meta
       - worker
diff --git a/src/general/m_appmeta_manager/http.rs b/src/general/m_appmeta_manager/http.rs
index 76e2f01..263afc0 100644
--- a/src/general/m_appmeta_manager/http.rs
+++ b/src/general/m_appmeta_manager/http.rs
@@ -84,6 +84,7 @@ async fn call_app_fn(Path((app, func)): Path<(String, String)>, body: String) ->
     }
 }
 
+// KV DEBUG
 async fn upload_app(mut multipart: Multipart) -> Response {
     tracing::debug!("upload_app called");
     // only worker can upload app
diff --git a/src/general/m_appmeta_manager/mod.rs b/src/general/m_appmeta_manager/mod.rs
index 68eb25a..51f76f0 100644
--- a/src/general/m_appmeta_manager/mod.rs
+++ b/src/general/m_appmeta_manager/mod.rs
@@ -138,6 +138,8 @@ impl<'de> Deserialize<'de> for FnMetaYaml {
             .ok_or_else(|| serde::de::Error::custom("not a map"))?;
         // let calls = map.remove("calls").ok_or_else(|| serde::de::Error::missing_field("calls"))?;
         let mut calls = vec![];
+
+        // KV DEBUG
         fn parse_http_call<'de, D: Deserializer<'de>>(
             map: &serde_yaml::Value,
         ) -> Result {
@@ -695,6 +697,7 @@ impl AppMetaManager {
         // let appdir = self.fs_layer.concat_app_dir(app);
         let appmeta = self.fs_layer.read_app_meta(tmpapp).await?;
 
+        // KV DEBUG
         // TODO: 2.check project dir
         // 3. if java, take snapshot
         if let AppType::Jar = appmeta.app_type {
@@ -736,6 +739,8 @@ impl AppMetaManager {
             .await
             .is_some())
     }
+
+    // KV DEBUG
     pub async fn app_uploaded(&self, appname: String, data: Bytes) -> WSResult<()> {
         // 1. tmpapp name & dir
         // TODO: fobidden tmpapp public access
@@ -777,6 +782,7 @@ impl AppMetaManager {
         };
 
         // 3. check meta
+        tracing::debug!("begin check meta");
         let res = self.construct_tmp_app(&tmpapp).await;
         let appmeta = match res {
             Err(e) => {
diff --git a/src/general/m_os/mod.rs b/src/general/m_os/mod.rs
index ee978b1..d51c059 100644
--- a/src/general/m_os/mod.rs
+++ b/src/general/m_os/mod.rs
@@ -111,6 +111,8 @@ impl OperatingSystem {
     pub fn app_path(&self, app: &str) -> PathBuf {
         self.view.appmeta_manager().fs_layer.concat_app_dir(app)
     }
+
+    // KV DEBUG
     pub fn start_process(&self, p: OsProcessType) -> process::Child {
         let (mut binding, log_file) = match p {
             OsProcessType::JavaApp(app) => {
diff --git a/src/general/network/proto_src/kv.proto b/src/general/network/proto_src/kv.proto
index 4cc9bf0..c3578ba 100644
--- a/src/general/network/proto_src/kv.proto
+++ b/src/general/network/proto_src/kv.proto
@@ -41,7 +41,7 @@ message KvPairs{
 message KvResponse{
     message KvResponse{
-        repeated KvPair kvs=1;
+        repeated KvPair kvs=1;
     }
     oneof resp {
         KvResponse common_resp=1;
@@ -49,6 +49,10 @@ message KvResponse{
     }
 }
 
+message KvResponses{
+    repeated KvResponse responses=1;
+}
+
 message KvRequests{
     string app=1;
     string func=2;
@@ -56,10 +60,6 @@ message KvRequests{
     int64 prev_kv_opeid=4;
 }
 
-message KvResponses{
-    repeated KvResponse responses=1;
-}
-
 // message MetaKvRequest{
 //     KvRequest request=1;
 // }
diff --git a/src/general/network/rpc_model.rs b/src/general/network/rpc_model.rs
index f6249d2..45c3b71 100644
--- a/src/general/network/rpc_model.rs
+++ b/src/general/network/rpc_model.rs
@@ -87,12 +87,15 @@ pub async fn call(
     // register the call back
     let (wait_tx, wait_rx) = oneshot::channel();
     let next_task = NEXT_TASK_ID.fetch_add(1, Ordering::SeqCst);
+    tracing::debug!("insert into CALL_MAP next_task:{:?}", next_task.clone());
    let _ = CALL_MAP.write().insert(next_task, wait_tx);
+    tracing::debug!("insert after CALL_MAP.write(): {:?}", CALL_MAP.write());
 
     // send the request
     let mut buf = BytesMut::with_capacity(req.encoded_len() + 8);
     buf.put_i32(req.encoded_len() as i32);
     buf.put_i32(next_task as i32);
+    buf.put_i32(2); // message type (reqType)
     req.encode(&mut buf).unwrap();
 
     tracing::debug!("send request: {:?} with len: {}", req, buf.len() - 8);
@@ -122,15 +125,16 @@ pub async fn call(
 //     Disconnected,
 // }
 
-struct ConnState {
+#[derive(Debug)]
+pub struct ConnState {
     /// record the waiters
     // Connecting(Vec<oneshot::Sender<Vec<u8>>>),
-    tx: tokio::sync::mpsc::Sender<Vec<u8>>,
+    pub tx: tokio::sync::mpsc::Sender<Vec<u8>>,
 }
 
 lazy_static! {
     /// This is an example for using doc comment attributes
-    static ref CONN_MAP: RwLock<HashMap<HashValue, ConnState>> = RwLock::new(HashMap::new());
+    pub static ref CONN_MAP: RwLock<HashMap<HashValue, ConnState>> = RwLock::new(HashMap::new());
     static ref CALL_MAP: RwLock<HashMap<u32, oneshot::Sender<Vec<u8>>>> = RwLock::new(HashMap::new());
@@ -147,13 +151,19 @@ async fn listen_task(socket: tokio::net::UnixStream) {
     let Some((conn, rx)) = listen_task_ext::verify_remote::<R>(&mut sockrx, &mut len, &mut buf).await else {
-        tracing::debug!("verify failed");
+        tracing::warn!("verify failed");
         return;
     };
+    tracing::debug!("verify_remote done");
+
     listen_task_ext::spawn_send_loop(rx, socktx);
+    tracing::debug!("spawn_send_loop done");
+
     listen_task_ext::read_loop::<R>(conn, &mut sockrx, &mut len, &mut buf).await;
+
+    tracing::debug!("read_loop done");
 }
 
 pub(super) mod listen_task_ext {
@@ -172,8 +182,8 @@ pub(super) mod listen_task_ext {
     pub(super) async fn verify_remote<R: RpcCustom>(
         sockrx: &mut OwnedReadHalf,
-        len: &mut usize,
-        buf: &mut [u8],
+        len: &mut usize, // 0
+        buf: &mut [u8],  // 0
     ) -> Option<(HashValue, Receiver<Vec<u8>>)> {
         async fn verify_remote_inner(
             sockrx: &mut OwnedReadHalf,
@@ -188,6 +198,8 @@ pub(super) mod listen_task_ext {
            let verify_msg_len = consume_i32(0, buf, len);
 
+            tracing::debug!("len: {}, verify_msg_len: {}", len, verify_msg_len);
+
             // println!("waiting for verify msg {}", verify_msg_len);
             if !wait_for_len(sockrx, len, verify_msg_len, buf).await {
                 tracing::warn!("failed to read verify msg");
@@ -195,18 +207,24 @@ pub(super) mod listen_task_ext {
             }
             // println!("wait done");
 
+            tracing::debug!("wait_for_len done");
+
             let Some(id) = R::verify(&buf[4..4 + verify_msg_len]).await else {
-                tracing::warn!("verify failed");
+                tracing::warn!("verify failed in verify_remote_inner");
                 return None;
             };
             let (tx, rx) = tokio::sync::mpsc::channel(10);
+            // Figure out why conn_map still holds the id of the previous connection; find every place this conn_map is used.
             let mut write_conn_map = CONN_MAP.write();
+            tracing::debug!("write_conn_map: {:?}", write_conn_map);
             if write_conn_map.contains_key(&id) {
                 tracing::warn!("conflict conn id: {:?}", id);
                 return None;
             }
             let _ = write_conn_map.insert(id.clone(), ConnState { tx });
+            tracing::debug!("insert into CONN_MAP id:{:?}", id.clone());
+            tracing::debug!("insert after CONN_MAP.write(): {:?}", write_conn_map);
 
             // println!("verify success");
             Some((id, rx))
@@ -230,6 +248,7 @@ pub(super) mod listen_task_ext {
         *len = 0;
         let mut offset = 0;
         loop {
+
             let (msg_len, msg_id, taskid) = {
                 let buf = &mut buf[offset..];
                 if !wait_for_len(socket, len, 9, buf).await {
@@ -243,6 +262,8 @@ pub(super) mod listen_task_ext {
                     consume_i32(5, buf, len) as u32,
                 )
             };
+
+            tracing::debug!("2 len: {}, msg_len: {}, msg_id: {}, taskid: {}", len, msg_len, msg_id, taskid);
 
             {
                 if buf.len() < offset + msg_len {
@@ -269,12 +290,15 @@ pub(super) mod listen_task_ext {
                 };
                 let msg = buf[..msg_len].to_vec();
+                tracing::debug!("msg: {:?}", msg);
                 cb.send(msg).unwrap();
             }
 
             // update the buf meta
             offset += msg_len;
             *len -= msg_len;
+
+            tracing::debug!("1 len: {}, msg_len: {}, msg_id: {}, taskid: {}", len, msg_len, msg_id, taskid);
         }
 
         // match socket.read(buf).await {
@@ -331,6 +355,7 @@ pub(super) mod listen_task_ext {
                     return false;
                 }
                 // println!("recv: {:?}", buf[..n]);
+                tracing::debug!("len += {}", n);
                 *len += n;
             }
             Err(e) => {
diff --git a/src/worker/func/shared/java.rs b/src/worker/func/shared/java.rs
index 210be5f..1a18ae5 100644
--- a/src/worker/func/shared/java.rs
+++ b/src/worker/func/shared/java.rs
@@ -9,6 +9,7 @@ use crate::{
 
 use super::process::PID;
 
+// KV DEBUG
 pub(super) fn cold_start(app: &str, os: &OperatingSystem) -> WSResult {
     tracing::debug!("java cold start {}", app);
     let p = os.start_process(OsProcessType::JavaApp(app.to_owned()));
diff --git a/src/worker/func/shared/process.rs b/src/worker/func/shared/process.rs
index 58e4b3a..f8c2c60 100644
--- a/src/worker/func/shared/process.rs
+++ b/src/worker/func/shared/process.rs
@@ -147,6 +147,7 @@ impl ProcessInstance {
         true
     }
 
+    // KV DEBUG
     pub async fn wait_for_verify(&self) -> proc_proto::AppStarted {
         if let Some(v) = self.state.0.read().0.as_connected() {
             return v.clone();
diff --git a/src/worker/func/shared/process_instance_man_related.rs b/src/worker/func/shared/process_instance_man_related.rs
index 33116c2..e3b749e 100644
--- a/src/worker/func/shared/process_instance_man_related.rs
+++ b/src/worker/func/shared/process_instance_man_related.rs
@@ -3,7 +3,10 @@ use std::time::Duration;
 use tokio::process::Command;
 
 use crate::{
-    general::m_appmeta_manager::AppType,
+    general::{
+        m_appmeta_manager::{AppType},
+        network::rpc_model::{self, HashValue},
+    },
     result::{WSResult, WsFuncError},
     worker::func::{
         m_instance_manager::{EachAppCache, InstanceManager},
@@ -15,6 +18,7 @@ use super::{process::ProcessInstance, SharedInstance};
 
 impl InstanceManager {
     pub async fn update_checkpoint(&self, app_name: &str, restart: bool) -> WSResult<()> {
+        tracing::debug!("start update_checkpoint");
         async fn debug_port_left() {
             tracing::debug!("debug port left");
             // only for test
@@ -40,6 +44,7 @@ impl InstanceManager {
             .into());
         };
         // state 2 connecting, make others wait
+        tracing::debug!("state 2 connecting, make others wait");
         {
             proc_ins.before_checkpoint();
             tokio::time::sleep(Duration::from_secs(3)).await;
@@ -51,6 +56,11 @@ impl InstanceManager {
                 AppType::Jar => java::take_snapshot(app_name, self.view.os()).await,
                 AppType::Wasm => unreachable!(),
             }
+
+            // Manually close the connection after the snapshot is taken.
+            tracing::debug!("manually close after snapshot; remove the leftover app from CONN_MAP");
+            rpc_model::close_conn(&HashValue::Str(app_name.to_string().clone()));
+
         }
         // recover by criu
         tokio::time::sleep(Duration::from_secs(3)).await;
@@ -80,6 +90,7 @@ impl InstanceManager {
         Ok(())
     }
 
+    // KV DEBUG
     pub async fn make_checkpoint_for_app(&self, app: &str) -> WSResult<()> {
         tracing::debug!("make checkpoint for app: {}", app);
         let p = self.get_process_instance(&AppType::Jar, app);
diff --git a/src/worker/func/shared/process_rpc.rs b/src/worker/func/shared/process_rpc.rs
index 78be47a..8a31e05 100644
--- a/src/worker/func/shared/process_rpc.rs
+++ b/src/worker/func/shared/process_rpc.rs
@@ -1,19 +1,23 @@
 pub mod proc_proto {
     include!(concat!(env!("OUT_DIR"), "/process_rpc_proto.rs"));
 }
-
+use crate::general::network::rpc_model::{CONN_MAP, ConnState};
 use crate::{
     general::network::rpc_model::{self, HashValue, MsgIdBind, ReqMsg, RpcCustom},
     modules_global_bridge::process_func::{
         ModulesGlobalBrigeAppMetaManager, ModulesGlobalBrigeInstanceManager,
     },
-    result::WSResult,
+    result::{WSResult, WsRpcErr},
     sys::LogicalModulesRef,
     worker::func::shared::process_rpc::proc_proto::AppStarted,
 };
 use async_trait::async_trait;
-use parking_lot::Mutex;
-use prost::Message;
+use parking_lot::{Mutex, RwLock};
+use proc_proto::{kv_request::Op, kv_response::{self, CommonKvResponse, Resp}, KvPair, KvResponse, KvResponses};
+use prost::{
+    bytes::{BufMut, BytesMut},
+    Message,
+};
 use std::{collections::HashMap, path::Path, time::Duration};
 use tokio::sync::oneshot;
 
@@ -37,6 +41,8 @@ lazy_static::lazy_static! {
 #[async_trait]
 impl RpcCustom for ProcessRpc {
     type SpawnArgs = String;
+
+    // Creates a Unix socket listener used to accept client connections; once a connection is made, data is received through this socket.
     fn bind(a: String) -> tokio::net::UnixListener {
         clean_sock_file(&a);
         tokio::net::UnixListener::bind(&a).unwrap()
     }
@@ -53,15 +59,23 @@ impl RpcCustom for ProcessRpc {
     //     };
     // }
 
+
+    // TODO: log every place that returns None so we can see which check fails
+    // As soon as data arrives, the backend verifies it first.
     async fn verify(buf: &[u8]) -> Option<HashValue> {
+
+        // First try to decode the buffer as a proc_proto::AppStarted struct.
         let res = proc_proto::AppStarted::decode(buf);
         let res: proc_proto::AppStarted = match res {
             Ok(res) => res,
+
+            // If decoding fails, the request is ignored.
             Err(_) => {
+                tracing::debug!("decode failed, request ignored");
                 return None;
             }
         };
-
+        // Then look up the corresponding instance by appid and set its verified state.
         unsafe {
             let appman = ProcessRpc::global_m_app_meta_manager();
             let ishttp = {
@@ -77,6 +91,7 @@ impl RpcCustom for ProcessRpc {
             if ishttp && !with_http_port
             // || (!ishttp && with_http_port) <<< seems ok
             {
+                tracing::debug!("ishttp && !with_http_port check failed");
                 return None;
             }
 
@@ -91,6 +106,7 @@ impl RpcCustom for ProcessRpc {
                 return None;
             };
             if !s.0.set_verifyed(res.clone()) {
+                tracing::warn!("failed to set verifyed");
                 return None;
             }
         }
@@ -98,15 +114,18 @@ impl RpcCustom for ProcessRpc {
         Some(HashValue::Str(res.appid))
     }
 
-    fn handle_remote_call(_conn: &HashValue, id: u8, buf: &[u8]) -> bool {
+    // Main logic for handling remote calls: the message id identifies the request type, and each type is handled accordingly.
+    fn handle_remote_call(conn: &HashValue, id: u8, buf: &[u8]) -> bool {
         tracing::debug!("handle_remote_call: id: {}", id);
         let _ = match id {
             4 => (),
+            5 => (),
             id => {
                 tracing::warn!("handle_remote_call: unsupported id: {}", id);
                 return false;
             }
         };
+        // TODO: add another branch for handling the new message type
         let err = match id {
             4 => match proc_proto::UpdateCheckpoint::decode(buf) {
                 Ok(_req) => {
@@ -122,6 +141,119 @@ impl RpcCustom for ProcessRpc {
                 }
                 Err(e) => e,
             },
+            5 => match proc_proto::KvRequests::decode(buf) {
+                Ok(req) => {
+                    tracing::debug!("test java kv");
+                    // TODO: does this need real handling logic, similar to the spawn under branch 4?
+
+                    // Receive the KvRequests, convert them into KvResponses according to their content, then send the result back.
+                    let mut kv_responses: Vec<KvResponse> = Vec::new();
+
+                    // Iterate over every request in the KvRequests.
+                    for request in req.requests.clone().iter() {
+                        match request.op.clone() {
+                            Some(op) => {
+                                match op {
+                                    Op::Set(kv_put) => {
+                                        // Decode kv_put.kv.key and kv_put.kv.value as Strings.
+                                        let key_str = String::from_utf8(kv_put.kv.key.clone()).unwrap();
+                                        let value_str = String::from_utf8(kv_put.kv.value.clone()).unwrap();
+                                        tracing::debug!("kv_put, key={:?}, value={:?}", key_str, value_str);
+
+                                        let key = (key_str + "_response").into_bytes();
+                                        let value = (value_str + "_response").into_bytes();
+
+                                        // Build the KvResponse.
+                                        let kv_response = KvResponse {
+                                            resp: Some(Resp::CommonResp(CommonKvResponse{kvs: vec![KvPair{key, value}]}))
+                                        };
+                                        kv_responses.push(kv_response);
+                                    },
+                                    Op::Get(kv_get) => {
+                                        let start_str = String::from_utf8(kv_get.range.start.clone()).unwrap();
+                                        let end_str = String::from_utf8(kv_get.range.end.clone()).unwrap();
+                                        tracing::debug!("kv_get, start={:?}, end={:?}", start_str, end_str);
+
+                                        let key = (start_str + "_response").into_bytes();
+                                        let value = (end_str + "_response").into_bytes();
+
+                                        // Build the KvResponse.
+                                        let kv_response = KvResponse {
+                                            resp: Some(Resp::CommonResp(CommonKvResponse{kvs: vec![KvPair{key, value}]}))
+                                        };
+                                        kv_responses.push(kv_response);
+                                    },
+                                    Op::Delete(kv_delete) => {
+                                        let start_str = String::from_utf8(kv_delete.range.start.clone()).unwrap();
+                                        let end_str = String::from_utf8(kv_delete.range.end.clone()).unwrap();
+                                        tracing::debug!("kv_delete, start={:?}, end={:?}", start_str, end_str);
+
+                                        let key = (start_str + "_response").into_bytes();
+                                        let value = (end_str + "_response").into_bytes();
+
+                                        // Build the KvResponse.
+                                        let kv_response = KvResponse {
+                                            resp: Some(Resp::CommonResp(CommonKvResponse{kvs: vec![KvPair{key, value}]}))
+                                        };
+                                        kv_responses.push(kv_response);
+                                    },
+                                    Op::Lock(kv_lock) => {
+                                        let read_or_write = kv_lock.read_or_write;
+                                        let release_ids = kv_lock.release_id;
+                                        let start_str = String::from_utf8(kv_lock.range.start.clone()).unwrap();
+                                        let end_str = String::from_utf8(kv_lock.range.end.clone()).unwrap();
+                                        tracing::debug!("kv_lock, read_or_write={:?}, release_id={:?}, start={:?}, end={:?}", read_or_write, release_ids, start_str, end_str);
+
+                                        let key = (start_str + "_response").into_bytes();
+                                        let value = (end_str + "_response").into_bytes();
+
+                                        // Build the KvResponse.
+                                        let kv_response = KvResponse {
+                                            resp: Some(Resp::CommonResp(CommonKvResponse{kvs: vec![KvPair{key, value}]}))
+                                        };
+                                        kv_responses.push(kv_response);
+                                    },
+                                }
+                            },
+                            None => {},
+                        }
+                    }
+
+                    let kv_responses = KvResponses { responses: kv_responses };
+
+                    let conn = conn.clone();
+                    let _: tokio::task::JoinHandle<WSResult<()>> = tokio::spawn(async move {
+                        let tx = {
+                            let mut conn_map = CONN_MAP.write(); // take the lock briefly; do not hold it across an await
+                            match conn_map.get_mut(&conn) {
+                                Some(state) => {
+                                    state.tx.clone()
+                                },
+                                None => {
+                                    // Return an error result.
+                                    return Err(WsRpcErr::ConnectionNotEstablished(conn).into());
+                                }
+                            }
+                        };
+
+                        // Remaining logic: build the response frame.
+                        let mut buf = BytesMut::with_capacity(kv_responses.encoded_len() + 8);
+                        // length
+                        buf.put_i32(kv_responses.encoded_len() as i32);
+                        // taskid
+                        buf.put_i32(9999);
+                        // message type (reqType)
+                        buf.put_i32(1);
+                        // response payload; without it the peer has nothing to decode
+                        kv_responses.encode(&mut buf).unwrap();
+
+                        tx.send(buf.into()).await.unwrap();
+
+                        Ok(())
+                    });
+
+                    return true;
+                }
+                Err(e) => e,
+            },
             _ => unreachable!(),
         };
         tracing::warn!("handle_remote_call error: {:?}", err);
@@ -147,11 +279,21 @@ impl MsgIdBind for proc_proto::FuncCallResp {
     }
 }
 
+impl MsgIdBind for proc_proto::KvRequests {
+    fn id() -> u16 {
+        5
+    }
+}
+
 impl ReqMsg for FuncCallReq {
     type Resp = FuncCallResp;
 }
 
+// TODO: check whether it is this app or func that distinguishes the request type
 pub async fn call_func(app: &str, func: &str, arg: String) -> WSResult {
+
+    tracing::debug!("CALL_FUNC: app:{}, func:{}, arg:{}", app, func, arg);
+
     rpc_model::call(
         FuncCallReq {
             func: func.to_owned(),
diff --git a/src/worker/func/shared/process_rpc_proto.proto b/src/worker/func/shared/process_rpc_proto.proto
index b98801b..ea5c5fd 100644
--- a/src/worker/func/shared/process_rpc_proto.proto
+++ b/src/worker/func/shared/process_rpc_proto.proto
@@ -1,5 +1,7 @@
 package process_rpc_proto;
 
+// TODO: proto messages for the data being passed back and forth
+
 // Sample message.
 message AppStarted {
     required string appid = 1;
@@ -19,4 +21,64 @@ message FuncCallResp{
 
 message UpdateCheckpoint{
+}
+
+message KeyRange {
+    required bytes start=1;
+    required bytes end=2;
+}
+
+message KvPair {
+    required bytes key=1;
+    required bytes value=2;
+}
+
+message KvRequest {
+    message KvPutRequest{
+        required KvPair kv=1;
+    }
+    message KvGetRequest{
+        required KeyRange range=1;
+    }
+
+    message KvDeleteRequest{
+        required KeyRange range=1;
+    }
+
+    message KvLockRequest{
+        required bool read_or_write=1;
+        repeated uint32 release_id=2;
+        required KeyRange range=3;
+    }
+    oneof op {
+        KvPutRequest set=1;
+        KvGetRequest get=2;
+        KvDeleteRequest delete=3;
+        KvLockRequest lock=4;
+    }
+}
+
+message KvPairs{
+    repeated KvPair kvs=1;
+}
+
+message KvResponse {
+    message CommonKvResponse {
+        repeated KvPair kvs=1;
+    }
+    oneof resp {
+        CommonKvResponse common_resp=1;
+        uint32 lock_id=2;
+    }
+}
+
+message KvResponses{
+    repeated KvResponse responses=1;
+}
+
+message KvRequests{
+    required string app=1;
+    required string func=2;
+    repeated KvRequest requests=3;
+    required int64 prev_kv_opeid=4;
 
 }
\ No newline at end of file
diff --git a/src/worker/m_executor.rs b/src/worker/m_executor.rs
index 6666e3d..778b50e 100644
--- a/src/worker/m_executor.rs
+++ b/src/worker/m_executor.rs
@@ -89,7 +89,7 @@ impl LogicalModule for Executor {
         let view = self.view.clone();
         self.view.executor().rpc_handler_distribute_task.regist(
             self.view.p2p(),
-            move |responser, r| {
+            move |responser, r: proto::sche::DistributeTaskReq| {
                 // tracing::info!("rpc recv: {:?}", r);
                 let view = view.clone();
                 let _ = tokio::spawn(async move {