From ccc47638b3b6b55f5f8d4e0969cb350200ab0363 Mon Sep 17 00:00:00 2001 From: ActivePeter <1020401660@qq.com> Date: Thu, 22 May 2025 17:29:39 +0800 Subject: [PATCH] fixed app upload again and cache hit checkpoint bug --- scripts/telego/dist_waverless/deployment.yml | 11 ++- .../general/app/app_native/app_checkpoint.rs | 5 ++ src/main/src/general/app/mod.rs | 90 ++++++++++++------- .../data/m_data_general/batch_handler.rs | 27 +++--- .../src/general/data/m_data_general/mod.rs | 12 ++- src/main/src/main.rs | 13 +-- src/main/src/master/app/fddg.rs | 14 ++- src/main/src/master/app/test.rs | 23 +++++ src/main/src/master/data/m_data_master.rs | 8 ++ 9 files changed, 151 insertions(+), 52 deletions(-) diff --git a/scripts/telego/dist_waverless/deployment.yml b/scripts/telego/dist_waverless/deployment.yml index 0668ad2..6dc0e4c 100644 --- a/scripts/telego/dist_waverless/deployment.yml +++ b/scripts/telego/dist_waverless/deployment.yml @@ -15,11 +15,20 @@ dist: 1: {tag: "[meta, master]"} 2: {tag: "[meta, worker]"} 3: {tag: '[meta, worker]'} + 7: {tag: '[meta, worker]'} + 8: {tag: '[meta, worker]'} + 9: {tag: '[meta, worker]'} + 10: {tag: '[meta, worker]'} + # 每个unique服务的分布节点 distribution: lab1: [1] lab2: [2] lab3: [3] + lab7: [7] + lab8: [8] + lab9: [9] + lab10: [10] # 安装脚本 install: | telego install --bin-prj bin_waverless @@ -105,7 +114,7 @@ dist: echo "start waverless with id $DIST_UNIQUE_ID" # only host contains python3 python3 gen_nodes_config.py - export RUST_LOG=debug + export RUST_LOG=info rm -rf ./wasm_serverless ln -s /usr/bin/waverless ./wasm_serverless cp /usr/bin/waverless_entry ./ diff --git a/src/main/src/general/app/app_native/app_checkpoint.rs b/src/main/src/general/app/app_native/app_checkpoint.rs index b607964..0313f53 100644 --- a/src/main/src/general/app/app_native/app_checkpoint.rs +++ b/src/main/src/general/app/app_native/app_checkpoint.rs @@ -91,6 +91,11 @@ impl InstanceManager { pub async fn make_checkpoint_for_app(&self, app: &str) -> WSResult<()> { tracing::debug!("make checkpoint for app: {}", app); + // remove app checkpoint-dir + let app_dir = self.view.os().app_path(app); + let _ = std::fs::remove_dir_all(app_dir.join("checkpoint-dir")); + let _ = std::fs::remove_file(app_dir.join("checkpoint.log")); + let p = self.get_process_instance(&AppType::Jar, app); let _ = p.wait_for_verify().await; tracing::debug!("wait_for_verify done2"); diff --git a/src/main/src/general/app/mod.rs b/src/main/src/general/app/mod.rs index bc34092..5aadfa2 100644 --- a/src/main/src/general/app/mod.rs +++ b/src/main/src/general/app/mod.rs @@ -621,12 +621,12 @@ impl From<(AppType, FnMetaYaml)> for FnMeta { lazy_static::lazy_static! 
{
     static ref VIEW: Option<View> = None;
 }
-fn view() -> &'static View {
-    tracing::debug!("get view");
-    let res = unsafe { util::non_null(&*VIEW).as_ref().as_ref().unwrap() };
-    tracing::debug!("get view end");
-    res
-}
+// fn view() -> &'static View {
+//     tracing::debug!("get view");
+//     let res = unsafe { util::non_null(&*VIEW).as_ref().as_ref().unwrap() };
+//     tracing::debug!("get view end");
+//     res
+// }
 
 #[derive(LogicalModule)]
 pub struct AppMetaManager {
@@ -963,33 +963,61 @@ impl AppMetaManager {
         // return Ok(Some((res, None)));
         // }
 
+        fn map_app_data_res(
+            datameta: Result<(DataSetMetaV2, HashMap<u8, proto::DataItem>), WSError>,
+        ) -> Result<Option<(DataSetMetaV2, proto::DataItem)>, WSError> {
+            match datameta {
+                Err(err) => match err {
+                    WSError::WsDataError(WsDataError::DataSetNotFound { uniqueid }) => {
+                        tracing::debug!(
+                            "get_app_meta not exist, uniqueid: {:?}",
+                            std::str::from_utf8(&*uniqueid)
+                        );
+                        Ok(None)
+                    }
+                    _ => {
+                        tracing::warn!("get_app_meta failed with err {:?}", err);
+                        Err(err)
+                    }
+                },
+                Ok((datameta, mut datas)) => {
+                    let meta: proto::DataItem = datas.remove(&0).unwrap();
+                    Ok(Some((datameta, meta)))
+                }
+            }
+        }
+
         // self.app_metas.get(app)
         tracing::debug!("calling get_or_del_data to get app meta, app: {}", app);
-        let datameta = view()
-            .data_general()
-            .get_or_del_datas(GetOrDelDataArg {
-                meta: None,
-                unique_id: format!("{}{}", DATA_UID_PREFIX_APP_META, app).into(),
-                ty: GetOrDelDataArgType::PartialOne { idx: 0 },
-            })
-            .await;
-
-        // only one data item
-        let (datameta, meta): (DataSetMetaV2, proto::DataItem) = match datameta {
-            Err(err) => match err {
-                WSError::WsDataError(WsDataError::DataSetNotFound { uniqueid }) => {
-                    tracing::debug!(
-                        "get_app_meta not exist, uniqueid: {:?}",
-                        std::str::from_utf8(&*uniqueid)
-                    );
-                    return Ok(None);
-                }
-                _ => {
-                    tracing::warn!("get_app_meta failed with err {:?}", err);
-                    return Err(err);
-                }
-            },
-            Ok((datameta, mut datas)) => (datameta, datas.remove(&0).unwrap()),
+        let unique_id = format!("{}{}", DATA_UID_PREFIX_APP_META, app);
+
+        let mut datameta_meta = None;
+        for i in 0..2 {
+            let datameta = self
+                .view
+                .data_general()
+                .get_or_del_datas(GetOrDelDataArg {
+                    meta: None,
+                    unique_id: unique_id.clone().into(),
+                    ty: GetOrDelDataArgType::PartialOne { idx: 0 },
+                })
+                .await;
+            // if let Ok((datameta, _)) = datameta {}
+            let res = map_app_data_res(datameta);
+            if let Ok(Some((datameta, meta))) = res {
+                datameta_meta = Some((datameta, meta));
+                break;
+            }
+            tracing::debug!("get_app_meta get_or_del_datas failed, uid({}) data idx({}), will retry for the {} time",
+                unique_id,
+                0,
+                i+1,
+            );
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+
+        let Some((datameta, meta)) = datameta_meta else {
+            tracing::warn!("get_app_meta failed after retry 2 times, app: {}", app);
+            return Ok(None);
         };
 
         let proto::DataItem {
diff --git a/src/main/src/general/data/m_data_general/batch_handler.rs b/src/main/src/general/data/m_data_general/batch_handler.rs
index 30e3aea..466608b 100644
--- a/src/main/src/general/data/m_data_general/batch_handler.rs
+++ b/src/main/src/general/data/m_data_general/batch_handler.rs
@@ -22,8 +22,8 @@ use std::{
 use tokio::sync::{futures::Notified, oneshot, Mutex, Notify, RwLock};
 use tracing;
 
-/// 默认数据块大小 (4MB)
-pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024 * 1024;
+/// 默认数据块大小 (1MB)
+pub const DEFAULT_BLOCK_SIZE: usize = 1 * 1024 * 1024;
 
 enum BatchDoneMsg {
     Done {
@@ -509,7 +509,8 @@ impl DataGeneral {
                     let opetype = opetype.clone();
                     let responsor = responsor.clone();
                     let version = dataset_meta.version;
-                    let node_id = split.node_id as NodeID;
+
// let node_id = split.node_id as NodeID; + let split = split.clone(); // let data_offset = split.data_offset; let _ = tokio::spawn(async move { // first read the partial block from target node @@ -518,7 +519,7 @@ impl DataGeneral { .rpc_call_get_data .call( view.p2p(), - node_id, + split.node_id, proto::GetOneDataRequest { unique_id: unique_id.clone(), idxs: vec![idx as u32], @@ -529,23 +530,23 @@ impl DataGeneral { ) .await .unwrap_or_else(|err| { - panic!("batch one recev {:?} error: {}", request_id, err); + panic!("batch one fetch {:?} error: {}", request_id, err); }); if partial_block.data.len() != 1 { - tracing::warn!( - "batch one recev partial_block wrong count, idx({}), count({})", + let err_msg = format!( + "batch one fetch partial_block wrong count, key({:?}), idx({}), count({}), supposed split range({}-{})", + std::str::from_utf8(&unique_id).map(|v|v.to_string()).unwrap_or(format!("{:?}", unique_id)), idx, - partial_block.data.len() + partial_block.data.len(), + split.data_offset, + split.data_offset + split.data_size ); + tracing::warn!("{}", err_msg); responsor .done(BatchDoneMsg::Error { version: version, - error_message: format!( - "batch one recev partial_block wrong count, idx({}), count({})", - idx, - partial_block.data.len() - ), + error_message: err_msg, request_id: request_id.clone(), // required_result: None, }) diff --git a/src/main/src/general/data/m_data_general/mod.rs b/src/main/src/general/data/m_data_general/mod.rs index 5a33ae5..196bc7c 100644 --- a/src/main/src/general/data/m_data_general/mod.rs +++ b/src/main/src/general/data/m_data_general/mod.rs @@ -624,11 +624,21 @@ impl DataGeneral { }, Some(Duration::from_secs(60)), ) - .await?; + .await + .map_err(|err| { + tracing::error!( + "{} write_data version_schedule_resp error: {}", + log_tag, + err + ); + err + })?; // Clone the response to extend its lifetime let version = version_schedule_resp.version; let splits = version_schedule_resp.split.clone(); + // debug schedule plan + tracing::debug!("{} schedule plan: {:?}", log_tag, version_schedule_resp); // 处理每个数据项 let mut iter = WantIdxIter::new(&GetOrDelDataArgType::All, datas.len() as u8); diff --git a/src/main/src/main.rs b/src/main/src/main.rs index fefd8ca..49d76cf 100644 --- a/src/main/src/main.rs +++ b/src/main/src/main.rs @@ -41,10 +41,13 @@ async fn main() { let config = config::read_config(args.this_id, args.files_dir); tracing::info!("config: {:?}", config); // dist_kv_raft::tikvraft_proxy::start(); - let mut sys=Sys::new(config); - let modules_ref=sys.new_logical_modules_ref(); + let mut sys = Sys::new(config); + let modules_ref = sys.new_logical_modules_ref(); // modules_global_bridge::modules_ref_scope(modules_ref, async move{sys.wait_for_end().await;}) 由于modules_ref_scope改为了异步函数,所以这里加上.await 曾俊 - modules_global_bridge::modules_ref_scope(modules_ref, async move{sys.wait_for_end().await;}).await; + modules_global_bridge::modules_ref_scope(modules_ref, async move { + sys.wait_for_end().await; + }) + .await; } pub fn start_tracing() { @@ -89,8 +92,8 @@ pub fn start_tracing() { // } // v.level() == &tracing::Level::ERROR - // || v.level() == &tracing::Level::WARN - // || v.level() == &tracing::Level::INFO + // || v.level() == &tracing::Level::WARN + // || v.level() == &tracing::Level::INFO v.level() != &tracing::Level::TRACE // v.level() == &tracing::Level::INFO // true diff --git a/src/main/src/master/app/fddg.rs b/src/main/src/master/app/fddg.rs index ada8d90..27390dc 100644 --- a/src/main/src/master/app/fddg.rs +++ 
b/src/main/src/master/app/fddg.rs
@@ -90,11 +90,16 @@ impl FDDGMgmt {
             &*_hold.as_ref().unwrap()
         };
 
+        // let new_map_cb = || {
+
+        // };
+
         let node = self
             .prefix_key_to_functions
             .search_or_insert(&insert_key, || {
                 new_map! (HashMap {
                     app_name.to_string() => {
+                        // one app fn meta
                         (app_type, new_map! (HashMap {
                             fn_name.to_string() => fn_meta.clone(),
                         }))
@@ -107,7 +112,14 @@ impl FDDGMgmt {
             .and_modify(|(_app_type, fn_names)| {
                 let _ = fn_names.insert(fn_name.to_string(), fn_meta.clone());
             })
-            .or_insert_with(|| panic!("app_name not found, should be created when search"));
+            .or_insert_with(|| {
+                (
+                    app_type,
+                    new_map! (HashMap {
+                        fn_name.to_string() => fn_meta.clone(),
+                    }),
+                )
+            });
         }
     }
     Ok(())
diff --git a/src/main/src/master/app/test.rs b/src/main/src/master/app/test.rs
index 5399d80..94fda2b 100644
--- a/src/main/src/master/app/test.rs
+++ b/src/main/src/master/app/test.rs
@@ -200,6 +200,9 @@ async fn test_app_upload() -> Result<(), Box<dyn std::error::Error>> {
     let (_sys_guard, _master_logical_modules, _worker_logical_modules) =
         start_sys_with_app_uploaded(APP_FN_NAME).await;
 
+    tracing::debug!("test_app_upload uploading app again, some bug comes at later upload");
+    bencher(APP_FN_NAME, true).await;
+
     // 发起对函数的 http 请求校验应用是否运行
     // 发起对函数的 http 请求校验应用是否运行
     tracing::debug!("test_app_upload try calling test app");
@@ -245,6 +248,9 @@ async fn test_app_upload() -> Result<(), Box<dyn std::error::Error>> {
     assert!(res.get("fn_start_time").is_some(), "Missing fn_start_time");
     assert!(res.get("fn_end_time").is_some(), "Missing fn_end_time");
 
+    tracing::debug!("test_app_upload uploading app again, some bug comes at later upload");
+    bencher(APP_FN_NAME, true).await;
+
     Ok(()) // 返回 Ok(()) 表示成功
 }
 
@@ -257,5 +263,22 @@ async fn test_write_data_trigger_app() -> Result<(), Box<dyn std::error::Error>>
 
     bencher(APP_FN_NAME, false).await;
 
+    tracing::debug!(
+        "test_write_data_trigger_app uploading app again, some bug comes at later upload"
+    );
+    bencher(APP_FN_NAME, true).await;
+
+    // 重新调用
+    tracing::debug!(
+        "test_write_data_trigger_app calling app again, some bug comes at later upload"
+    );
+    bencher(APP_FN_NAME, false).await;
+
+    // 重新上传
+    tracing::debug!(
+        "test_write_data_trigger_app uploading app again, some bug comes at later upload"
+    );
+    bencher(APP_FN_NAME, true).await;
+
     Ok(())
 }
diff --git a/src/main/src/master/data/m_data_master.rs b/src/main/src/master/data/m_data_master.rs
index 4080317..7c8d38c 100644
--- a/src/main/src/master/data/m_data_master.rs
+++ b/src/main/src/master/data/m_data_master.rs
@@ -309,8 +309,16 @@ impl DataMaster {
             .plan_for_write_data(&req.unique_id, ctx, FuncTriggerType::DataWrite)
             .await?;
 
+        tracing::debug!(
+            "master planned for write data({:?}) cache_modes: {:?}, fetching meta lock",
+            req.unique_id,
+            item_cache_modes
+        );
+
         let update_version_lock = kv_store_engine.with_rwlock(&metakey_bytes);
         let _guard = update_version_lock.write();
+        tracing::debug!("master got meta lock for data({:?})", req.unique_id);
+
         let dataset_meta = kv_store_engine.get(&metakey, true, KvAdditionalConf::default());
 
         // let takeonce=Some((new_meta,new_))
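
Note on the retry added to get_app_meta above: it uses a fixed attempt count (2) and a fixed 1s delay between attempts. The following is a minimal standalone sketch of that pattern factored into a generic helper; it is not part of the patch, it only assumes the tokio runtime the project already depends on, and the names here (retry_with_delay, the demo in main) are hypothetical.

// Illustrative sketch only (not part of the patch): a generic retry helper that
// mirrors the fixed-count / fixed-delay loop added to get_app_meta.
use std::future::Future;
use std::time::Duration;

/// Retry an async fallible operation up to `attempts` times, sleeping `delay`
/// between failed attempts. Returns the last error if every attempt fails.
async fn retry_with_delay<T, E, F, Fut>(attempts: usize, delay: Duration, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut last_err = None;
    for attempt in 1..=attempts {
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) => {
                last_err = Some(e);
                if attempt < attempts {
                    // Same shape as the patch: log, wait, then try again.
                    eprintln!("attempt {attempt} failed, retrying after {delay:?}");
                    tokio::time::sleep(delay).await;
                }
            }
        }
    }
    // `attempts` is expected to be >= 1, so an error is always recorded here.
    Err(last_err.expect("attempts must be at least 1"))
}

#[tokio::main]
async fn main() {
    // Stand-in for the get_or_del_datas + map_app_data_res call: fails once,
    // then succeeds, so the helper returns Ok on the second attempt.
    let mut calls = 0u32;
    let res: Result<&str, &str> = retry_with_delay(2, Duration::from_secs(1), || {
        calls += 1;
        let ok = calls >= 2;
        async move {
            if ok {
                Ok("app meta")
            } else {
                Err("data set not found yet")
            }
        }
    })
    .await;
    println!("result: {res:?}");
}

If other metadata lookups end up needing the same treatment, routing the existing get_or_del_datas + map_app_data_res call through a helper of this shape would avoid duplicating the loop in each call site.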