Merged
11 changes: 10 additions & 1 deletion scripts/telego/dist_waverless/deployment.yml
@@ -15,11 +15,20 @@ dist:
1: {tag: "[meta, master]"}
2: {tag: "[meta, worker]"}
3: {tag: '[meta, worker]'}
7: {tag: '[meta, worker]'}
8: {tag: '[meta, worker]'}
9: {tag: '[meta, worker]'}
10: {tag: '[meta, worker]'}

# distribution nodes for each unique service
distribution:
lab1: [1]
lab2: [2]
lab3: [3]
lab7: [7]
lab8: [8]
lab9: [9]
lab10: [10]
# install script
install: |
telego install --bin-prj bin_waverless
@@ -105,7 +114,7 @@ dist:
echo "start waverless with id $DIST_UNIQUE_ID"
# only host contains python3
python3 gen_nodes_config.py
export RUST_LOG=debug
export RUST_LOG=info
rm -rf ./wasm_serverless
ln -s /usr/bin/waverless ./wasm_serverless
cp /usr/bin/waverless_entry ./
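A note on the RUST_LOG switch above: the startup script now exports RUST_LOG=info instead of debug. Assuming the binary filters logs through tracing_subscriber's EnvFilter (an assumption; the subscriber setup is not part of this diff), the variable is honored roughly as in this sketch:

use tracing_subscriber::EnvFilter;

fn init_tracing() {
    // Read RUST_LOG (e.g. "info"); fall back to "info" when it is unset or
    // unparsable. Requires the "env-filter" feature of tracing-subscriber.
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    tracing_subscriber::fmt().with_env_filter(filter).init();
}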
5 changes: 5 additions & 0 deletions src/main/src/general/app/app_native/app_checkpoint.rs
@@ -91,6 +91,11 @@ impl InstanceManager {

pub async fn make_checkpoint_for_app(&self, app: &str) -> WSResult<()> {
tracing::debug!("make checkpoint for app: {}", app);
// remove app checkpoint-dir
let app_dir = self.view.os().app_path(app);
let _ = std::fs::remove_dir_all(app_dir.join("checkpoint-dir"));
let _ = std::fs::remove_file(app_dir.join("checkpoint.log"));

let p = self.get_process_instance(&AppType::Jar, app);
let _ = p.wait_for_verify().await;
tracing::debug!("wait_for_verify done2");
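The cleanup added above deliberately discards both Results, because a missing checkpoint-dir or checkpoint.log on a first checkpoint is not an error. A minimal standalone sketch of the same idempotent-cleanup step (app_dir stands in for self.view.os().app_path(app)):

use std::path::Path;

fn reset_checkpoint(app_dir: &Path) {
    // Best-effort cleanup: NotFound just means there is nothing to remove,
    // so the Results are intentionally ignored.
    let _ = std::fs::remove_dir_all(app_dir.join("checkpoint-dir"));
    let _ = std::fs::remove_file(app_dir.join("checkpoint.log"));
}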
90 changes: 59 additions & 31 deletions src/main/src/general/app/mod.rs
@@ -621,12 +621,12 @@ impl From<(AppType, FnMetaYaml)> for FnMeta {
lazy_static::lazy_static! {
static ref VIEW: Option<View> = None;
}
fn view() -> &'static View {
tracing::debug!("get view");
let res = unsafe { util::non_null(&*VIEW).as_ref().as_ref().unwrap() };
tracing::debug!("get view end");
res
}
// fn view() -> &'static View {
// tracing::debug!("get view");
// let res = unsafe { util::non_null(&*VIEW).as_ref().as_ref().unwrap() };
// tracing::debug!("get view end");
// res
// }

#[derive(LogicalModule)]
pub struct AppMetaManager {
@@ -963,33 +963,61 @@ impl AppMetaManager {
// return Ok(Some((res, None)));
// }

fn map_app_data_res(
datameta: Result<(DataSetMetaV2, HashMap<u8, proto::DataItem>), WSError>,
) -> Result<Option<(DataSetMetaV2, proto::DataItem)>, WSError> {
match datameta {
Err(err) => match err {
WSError::WsDataError(WsDataError::DataSetNotFound { uniqueid }) => {
tracing::debug!(
"get_app_meta not exist, uniqueid: {:?}",
std::str::from_utf8(&*uniqueid)
);
Ok(None)
}
_ => {
tracing::warn!("get_app_meta failed with err {:?}", err);
Err(err)
}
},
Ok((datameta, mut datas)) => {
let meta: proto::DataItem = datas.remove(&0).unwrap();
Ok(Some((datameta, meta)))
}
}
}
// self.app_metas.get(app)
tracing::debug!("calling get_or_del_data to get app meta, app: {}", app);
let datameta = view()
.data_general()
.get_or_del_datas(GetOrDelDataArg {
meta: None,
unique_id: format!("{}{}", DATA_UID_PREFIX_APP_META, app).into(),
ty: GetOrDelDataArgType::PartialOne { idx: 0 },
})
.await;

// only one data item
let (datameta, meta): (DataSetMetaV2, proto::DataItem) = match datameta {
Err(err) => match err {
WSError::WsDataError(WsDataError::DataSetNotFound { uniqueid }) => {
tracing::debug!(
"get_app_meta not exist, uniqueid: {:?}",
std::str::from_utf8(&*uniqueid)
);
return Ok(None);
}
_ => {
tracing::warn!("get_app_meta failed with err {:?}", err);
return Err(err);
}
},
Ok((datameta, mut datas)) => (datameta, datas.remove(&0).unwrap()),
let unique_id = format!("{}{}", DATA_UID_PREFIX_APP_META, app);

let mut datameta_meta = None;
for i in 0..2 {
let datameta = self
.view
.data_general()
.get_or_del_datas(GetOrDelDataArg {
meta: None,
unique_id: unique_id.clone().into(),
ty: GetOrDelDataArgType::PartialOne { idx: 0 },
})
.await;
// if let Ok((datameta, _)) = datameta {}
let res = map_app_data_res(datameta);
if let Ok(Some((datameta, meta))) = res {
datameta_meta = Some((datameta, meta));
break;
}
tracing::debug!("get_app_meta get_or_del_datas failed, uid({}) data idx({}), will retry for the {} time",
unique_id,
0,
i+1,
);
tokio::time::sleep(Duration::from_secs(1)).await;
}

let Some((datameta, meta)) = datameta_meta else {
tracing::warn!("get_app_meta failed after retry 2 times, app: {}", app);
return Ok(None);
};

let proto::DataItem {
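The rewrite above does two things: error handling moves into the map_app_data_res helper, and get_or_del_datas is attempted twice with a one-second pause before giving up. Stripped of the app-meta specifics, the bounded-retry shape looks like this sketch (names are illustrative, not from the codebase):

use std::time::Duration;

// Try an async operation up to `attempts` times, sleeping between tries
// (like the loop above, it also sleeps after the final failure).
// Returns None when every attempt failed.
async fn retry_async<T, E, F, Fut>(attempts: usize, mut op: F) -> Option<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    for _ in 0..attempts {
        if let Ok(v) = op().await {
            return Some(v);
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    None
}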
27 changes: 14 additions & 13 deletions src/main/src/general/data/m_data_general/batch_handler.rs
@@ -22,8 +22,8 @@ use std::{
use tokio::sync::{futures::Notified, oneshot, Mutex, Notify, RwLock};
use tracing;

/// Default data block size (4MB)
pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024 * 1024;
/// Default data block size (1MB)
pub const DEFAULT_BLOCK_SIZE: usize = 1 * 1024 * 1024;

enum BatchDoneMsg {
Done {
@@ -509,7 +509,8 @@ impl DataGeneral {
let opetype = opetype.clone();
let responsor = responsor.clone();
let version = dataset_meta.version;
let node_id = split.node_id as NodeID;
// let node_id = split.node_id as NodeID;
let split = split.clone();
// let data_offset = split.data_offset;
let _ = tokio::spawn(async move {
// first read the partial block from target node
@@ -518,7 +519,7 @@
.rpc_call_get_data
.call(
view.p2p(),
node_id,
split.node_id,
proto::GetOneDataRequest {
unique_id: unique_id.clone(),
idxs: vec![idx as u32],
@@ -529,23 +530,23 @@
)
.await
.unwrap_or_else(|err| {
panic!("batch one recev {:?} error: {}", request_id, err);
panic!("batch one fetch {:?} error: {}", request_id, err);
});

if partial_block.data.len() != 1 {
tracing::warn!(
"batch one recev partial_block wrong count, idx({}), count({})",
let err_msg = format!(
"batch one fetch partial_block wrong count, key({:?}), idx({}), count({}), supposed split range({}-{})",
std::str::from_utf8(&unique_id).map(|v|v.to_string()).unwrap_or(format!("{:?}", unique_id)),
idx,
partial_block.data.len()
partial_block.data.len(),
split.data_offset,
split.data_offset + split.data_size
);
tracing::warn!("{}", err_msg);
responsor
.done(BatchDoneMsg::Error {
version: version,
error_message: format!(
"batch one recev partial_block wrong count, idx({}), count({})",
idx,
partial_block.data.len()
),
error_message: err_msg,
request_id: request_id.clone(),
// required_result: None,
})
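Two small things happen in this hunk: the task now clones the whole split instead of extracting node_id up front, which lets the enriched error message report the expected byte range; and the misleading "recev" wording becomes "fetch". The clone-before-spawn move, reduced to a sketch (Split is a hypothetical stand-in for the proto split type):

#[derive(Clone)]
struct Split {
    node_id: u32,
    data_offset: u64,
    data_size: u64,
}

fn spawn_fetch(split: &Split) {
    // Clone before `async move` so the spawned task owns its own copy and
    // no borrow of the caller's data escapes into the task.
    let split = split.clone();
    let _ = tokio::spawn(async move {
        println!(
            "fetching range {}-{} from node {}",
            split.data_offset,
            split.data_offset + split.data_size,
            split.node_id
        );
    });
}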
12 changes: 11 additions & 1 deletion src/main/src/general/data/m_data_general/mod.rs
@@ -624,11 +624,21 @@ impl DataGeneral {
},
Some(Duration::from_secs(60)),
)
.await?;
.await
.map_err(|err| {
tracing::error!(
"{} write_data version_schedule_resp error: {}",
log_tag,
err
);
err
})?;

// Clone the response to extend its lifetime
let version = version_schedule_resp.version;
let splits = version_schedule_resp.split.clone();
// debug schedule plan
tracing::debug!("{} schedule plan: {:?}", log_tag, version_schedule_resp);

// Process each data item
let mut iter = WantIdxIter::new(&GetOrDelDataArgType::All, datas.len() as u8);
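Replacing the bare `?` with map_err keeps the error propagating to the caller but logs it at the failure site first. The pattern in isolation (remote_call and the String error type are stand-ins, not project APIs):

async fn call_with_logging(log_tag: &str) -> Result<u64, String> {
    let resp = remote_call().await.map_err(|err| {
        // Log locally, then return the same error unchanged for `?`.
        tracing::error!("{} write_data version_schedule_resp error: {}", log_tag, err);
        err
    })?;
    Ok(resp)
}

async fn remote_call() -> Result<u64, String> {
    Err("timeout".to_string()) // stand-in for the real RPC
}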
13 changes: 8 additions & 5 deletions src/main/src/main.rs
@@ -41,10 +41,13 @@ async fn main() {
let config = config::read_config(args.this_id, args.files_dir);
tracing::info!("config: {:?}", config);
// dist_kv_raft::tikvraft_proxy::start();
let mut sys=Sys::new(config);
let modules_ref=sys.new_logical_modules_ref();
let mut sys = Sys::new(config);
let modules_ref = sys.new_logical_modules_ref();
// modules_global_bridge::modules_ref_scope(modules_ref, async move{sys.wait_for_end().await;}) since modules_ref_scope was changed to an async function, .await was added here (曾俊)
modules_global_bridge::modules_ref_scope(modules_ref, async move{sys.wait_for_end().await;}).await;
modules_global_bridge::modules_ref_scope(modules_ref, async move {
sys.wait_for_end().await;
})
.await;
}

pub fn start_tracing() {
@@ -89,8 +92,8 @@ pub fn start_tracing() {
// }

// v.level() == &tracing::Level::ERROR
// || v.level() == &tracing::Level::WARN
// || v.level() == &tracing::Level::INFO
// || v.level() == &tracing::Level::WARN
// || v.level() == &tracing::Level::INFO
v.level() != &tracing::Level::TRACE
// v.level() == &tracing::Level::INFO
// true
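The only behavioral change in main is the trailing .await: once modules_ref_scope became async, its returned future has to be driven or the body never runs. A hypothetical helper with the same shape (not the project's actual signature):

// Keeps `guard` alive until the wrapped future completes, then drops it.
async fn ref_scope<G, F>(guard: G, fut: F)
where
    F: std::future::Future<Output = ()>,
{
    fut.await;
    drop(guard);
}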
14 changes: 13 additions & 1 deletion src/main/src/master/app/fddg.rs
@@ -90,11 +90,16 @@ impl FDDGMgmt {
&*_hold.as_ref().unwrap()
};

// let new_map_cb = || {

// };

let node = self
.prefix_key_to_functions
.search_or_insert(&insert_key, || {
new_map! (HashMap {
app_name.to_string() => {
// one app fn meta
(app_type, new_map! (HashMap {
fn_name.to_string() => fn_meta.clone(),
}))
@@ -107,7 +112,14 @@
.and_modify(|(_app_type, fn_names)| {
let _ = fn_names.insert(fn_name.to_string(), fn_meta.clone());
})
.or_insert_with(|| panic!("app_name not found, should be created when search"));
.or_insert_with(|| {
(
app_type,
new_map! (HashMap {
fn_name.to_string() => fn_meta.clone(),
}),
)
});
}
}
Ok(())
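The fix above replaces an or_insert_with closure that always panicked with one that actually builds the missing (app_type, fn_map) entry, so and_modify and or_insert_with now cover both branches of the entry API. The standard pattern in a self-contained sketch:

use std::collections::HashMap;

fn register_fn(map: &mut HashMap<String, Vec<String>>, app: &str, func: &str) {
    map.entry(app.to_string())
        // App already present: just record the additional function.
        .and_modify(|fns| fns.push(func.to_string()))
        // App seen for the first time: create the entry instead of panicking.
        .or_insert_with(|| vec![func.to_string()]);
}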
23 changes: 23 additions & 0 deletions src/main/src/master/app/test.rs
@@ -200,6 +200,9 @@ async fn test_app_upload() -> Result<(), Box<dyn std::error::Error>> {
let (_sys_guard, _master_logical_modules, _worker_logical_modules) =
start_sys_with_app_uploaded(APP_FN_NAME).await;

tracing::debug!("test_app_upload uploading app again, some bug comes at later upload");
bencher(APP_FN_NAME, true).await;

// Send an HTTP request to the function to verify the app is running
tracing::debug!("test_app_upload try calling test app");
@@ -245,6 +248,9 @@
assert!(res.get("fn_start_time").is_some(), "Missing fn_start_time");
assert!(res.get("fn_end_time").is_some(), "Missing fn_end_time");

tracing::debug!("test_app_upload uploading app again, some bug comes at later upload");
bencher(APP_FN_NAME, true).await;

Ok(()) // 返回 Ok(()) 表示成功
}

@@ -257,5 +263,22 @@ async fn test_write_data_trigger_app() -> Result<(), Box<dyn std::error::Error>>

bencher(APP_FN_NAME, false).await;

tracing::debug!(
"test_write_data_trigger_app uploading app again, some bug comes at later upload"
);
bencher(APP_FN_NAME, true).await;

// Call the app again
tracing::debug!(
"test_write_data_trigger_app calling app again, some bug comes at later upload"
);
bencher(APP_FN_NAME, false).await;

// Re-upload
tracing::debug!(
"test_write_data_trigger_app uploading app again, some bug comes at later upload"
);
bencher(APP_FN_NAME, true).await;

Ok(())
}
8 changes: 8 additions & 0 deletions src/main/src/master/data/m_data_master.rs
@@ -309,8 +309,16 @@ impl DataMaster {
.plan_for_write_data(&req.unique_id, ctx, FuncTriggerType::DataWrite)
.await?;

tracing::debug!(
"master planned for write data({:?}) cache_modes: {:?}, fetching meta lock",
req.unique_id,
item_cache_modes
);

let update_version_lock = kv_store_engine.with_rwlock(&metakey_bytes);
let _guard = update_version_lock.write();
tracing::debug!("master got meta lock for data({:?})", req.unique_id);

let dataset_meta = kv_store_engine.get(&metakey, true, KvAdditionalConf::default());

// let takeonce=Some((new_meta,new_))
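The added debug lines bracket the meta-lock acquisition, so a slow or stuck write lock becomes visible in the logs between "fetching meta lock" and "got meta lock". The shape of the pattern, assuming a parking_lot-style RwLock (an assumption; the actual lock comes from kv_store_engine.with_rwlock):

use parking_lot::RwLock;

fn update_meta(lock: &RwLock<Vec<u8>>, unique_id: &[u8]) {
    tracing::debug!("fetching meta lock for data({:?})", unique_id);
    let _guard = lock.write();
    tracing::debug!("got meta lock for data({:?})", unique_id);
    // ... read-modify-write of the dataset meta happens under the guard ...
}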