Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
target
*.zip
Python-3.10.12
test_temp_dir*
test_temp_dir*
app.jar
app.yml
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ tempfile="3.8"
# "z": optimize for binary size, but also turn off loop vectorization.
opt-level = 3 # Use slightly better optimizations.
overflow-checks = false # Disable integer overflow checks.

1 change: 1 addition & 0 deletions telego/README → scripts/telego/README
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
Link `bin_waverless` and `dist_waverless` into the telego project dir.
https://qcnoe3hd7k5c.feishu.cn/wiki/HKyFwat29i8PiEkxhCQcW9NdnTe
2 changes: 2 additions & 0 deletions scripts/telego/bin_waverless/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
prepare_cache
teledeploy
49 changes: 49 additions & 0 deletions scripts/telego/bin_waverless/deployment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
comment: 存算融合的serverless计算平台

# 嵌入式的安装脚本,上传到 teledeploy 公共fileserver
local_values:
pack_py:
read_from_file: template/pack.py # require output to prepare_cache
install_crac.py:
read_from_file: template/install_crac.py
install_wasmedge.py:
read_from_file: template/install_wasmedge.py

prepare:
# 使用脚本预处理,将必要资源准备到 teledeploy,上传到 teledeploy 公共fileserver
- pyscript: ${pack_py}
trans:
- copy:
- run_node.py: teledeploy/waverless_entry_amd64
- wasm_serverless: teledeploy/waverless_amd64
- wasm_edge.py: teledeploy/install_wasmedge_inner.py
- jdk_crac.tar.gz: teledeploy/jdk_crac.tar.gz
# 下载wasmedge,上传到 teledeploy 公共fileserver
- url: https://github.com/WasmEdge/WasmEdge/releases/download/0.13.3/WasmEdge-0.13.3-manylinux2014_x86_64.tar.gz
trans:
- copy:
- WasmEdge-0.13.3-manylinux2014_x86_64.tar.gz: teledeploy/WasmEdge-0.13.3-manylinux2014_x86_64.tar.gz
# 考虑到目前telego安装描述还不够,提供脚本安装能力
- filemap:
content: ${install_crac.py}
path: teledeploy/install_crac.py
mode: 755
# 脚本安装能力
- filemap:
content: ${install_wasmedge.py}
path: teledeploy/install_wasmedge.py
mode: 755

bin:
# waverless 二进制本体
waverless:
# waverless 入口脚本
waverless_entry:
# wasmedge 安装脚本
wasmedge:
no_default_installer: true
py_installer: "install_wasmedge.py ${BIN_PRJ} ${MAIN_NODE_IP}"
# crac 安装脚本
crac:
no_default_installer: true
py_installer: "install_crac.py ${BIN_PRJ} ${MAIN_NODE_IP}"
75 changes: 75 additions & 0 deletions scripts/telego/bin_waverless/template/install_crac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os,urllib.request,sys


# Directory this script creates, changes into, and downloads/unpacks the archive in.
install_dir="/teledeploy_secret/bin_crac"
# File name of the JDK CRaC archive fetched from the main node (port 8003).
crac_pack="jdk_crac.tar.gz"

def chdir(dir):
    """Log the destination on stdout, then make it the current working directory."""
    destination = dir
    print("chdir:", destination)
    os.chdir(destination)

def os_system(cmd):
    """Echo ``cmd`` to stdout and run it through the shell.

    The exit status is deliberately discarded; this is the best-effort
    variant of the command runner.
    """
    print("os_system:", cmd)
    _status = os.system(cmd)

def download(url,file):
    """Download ``url`` to ``file``, creating the destination directory first.

    Args:
        url: Source URL handed to ``urllib.request.urlretrieve``.
        file: Destination path; it is resolved to an absolute path before use.

    Raises:
        OSError: If the destination directory cannot be created.
        urllib.error.URLError: If the download itself fails.
    """
    target = os.path.abspath(file)
    # Create the directory in-process rather than via a ``mkdir -p`` shell
    # string: paths containing spaces or shell metacharacters work, and a
    # failure raises instead of being silently ignored.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    print(f"downloading {url} to {target}")
    urllib.request.urlretrieve(url, target)

### utils
def os_system_sure(command):
    """Run ``command`` through the shell and abort the script if it fails.

    Args:
        command: Shell command line to execute.

    Raises:
        SystemExit: With exit code 1 when the command returns a non-zero status.
    """
    print(f"执行命令:{command}")
    result = os.system(command)
    if result != 0:
        print(f"命令执行失败:{command}")
        # sys.exit instead of the builtin exit(): the latter is injected by
        # the ``site`` module and is not guaranteed to exist (e.g. python -S).
        sys.exit(1)
    print(f"命令执行成功:{command}")

# Expect exactly two arguments: the telego bin project name and the IP of the
# main node that serves the packaged files.
if len(sys.argv) != 3:
    print("usage: python3 install_crac.py <bin_prj> <main_node_ip>")
    sys.exit(1)
BIN_PRJ = sys.argv[1]
MAIN_NODE_IP = sys.argv[2]

os_system(f"mkdir -p {install_dir}")
chdir(install_dir)

# Fetch the archive from the main node's file server (port 8003).
url = f"http://{MAIN_NODE_IP}:8003/{BIN_PRJ}/{crac_pack}"
download(url, crac_pack)

# Extract the archive into the install dir. The result is checked so that a
# failed extraction cannot be followed by copying a stale jdk_crac directory
# left over from an earlier run.
os_system_sure(f"tar -xvf {crac_pack}")

# Replace any previous /usr/jdk_crac with the freshly extracted tree.
os_system_sure("rm -rf /usr/jdk_crac && cp -r jdk_crac /usr/jdk_crac")

# switch to jdk crac 17
def switch_to_jdk_crac():
    """Make the CRaC JDK the system default and point JAVA_HOME at it.

    Registers and selects ``java``, ``javac`` and ``jcmd`` from
    ``/usr/jdk_crac/bin`` through ``update-alternatives``, then rewrites
    ``/root/.bashrc`` so that it exports ``JAVA_HOME=/usr/jdk_crac``.
    Any failing ``update-alternatives`` call aborts the script via
    ``os_system_sure``.
    """
    CRAC_INSTALL_DIR = "/usr/jdk_crac"
    bashrc_path = "/root/.bashrc"
    java_home_line = f"export JAVA_HOME={CRAC_INSTALL_DIR}\n"

    for tool in ("java", "javac", "jcmd"):
        os_system_sure(f"update-alternatives --install /usr/bin/{tool} {tool} {CRAC_INSTALL_DIR}/bin/{tool} 100")
        os_system_sure(f"update-alternatives --set {tool} {CRAC_INSTALL_DIR}/bin/{tool}")

    # Check and update JAVA_HOME in /root/.bashrc; a missing file is treated
    # as empty so the export is simply created.
    try:
        with open(bashrc_path, "r") as env_file:
            lines = env_file.readlines()
    except FileNotFoundError:
        lines = []

    java_home_set = False
    for index, line in enumerate(lines):
        if line.startswith("export JAVA_HOME="):
            # Assign through the index: rebinding the loop variable would
            # leave the list (and therefore the file) unchanged.
            lines[index] = java_home_line
            java_home_set = True
    if not java_home_set:
        # Keep the new export on its own line even if the file lacked a
        # trailing newline.
        if lines and not lines[-1].endswith("\n"):
            lines[-1] += "\n"
        lines.append(java_home_line)
    print("env lines: ",lines)

    with open(bashrc_path, "w") as env_file:
        env_file.writelines(lines)
    print("\nsuccess switch to jdk_crac")
switch_to_jdk_crac()
39 changes: 39 additions & 0 deletions scripts/telego/bin_waverless/template/install_wasmedge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os,urllib.request,sys


# Directory this script creates and changes into before downloading.
install_dir="/teledeploy_secret/bin_wasmedge"
# Files to fetch from the main node, as [file name, destination directory]
# pairs; a relative destination is resolved against the current directory.
files=[
["install_wasmedge_inner.py","./"],
["WasmEdge-0.13.3-manylinux2014_x86_64.tar.gz","/tmp/install/"]
]

def chdir(dir):
    """Print the directory being entered and switch the process into it."""
    where = dir
    print("chdir:", where)
    os.chdir(where)

def os_system(cmd):
    """Print ``cmd`` and execute it with the shell; the exit status is not checked."""
    print("os_system:", cmd)
    _ignored = os.system(cmd)

def download(url,file):
    """Download ``url`` to ``file``, creating the destination directory first.

    Args:
        url: Source URL handed to ``urllib.request.urlretrieve``.
        file: Destination path; it is resolved to an absolute path before use.

    Raises:
        OSError: If the destination directory cannot be created.
        urllib.error.URLError: If the download itself fails.
    """
    target = os.path.abspath(file)
    # os.makedirs replaces the ``mkdir -p`` shell string: no quoting issues
    # for unusual paths, and a failure raises instead of being ignored.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    print(f"downloading {url} to {target}")
    urllib.request.urlretrieve(url, target)

# Expect exactly two arguments: the telego bin project name and the IP of the
# main node that serves the packaged files.
if len(sys.argv) != 3:
    print("usage: python3 install_wasmedge.py <bin_prj> <main_node_ip>")
    sys.exit(1)
BIN_PRJ = sys.argv[1]
MAIN_NODE_IP = sys.argv[2]

os_system(f"mkdir -p {install_dir}")
chdir(install_dir)

# Fetch every listed file from the main node's file server (port 8003).
for file_name, dest_dir in files:
    url = f"http://{MAIN_NODE_IP}:8003/{BIN_PRJ}/{file_name}"
    download(url, os.path.join(dest_dir, file_name))

# Run the inner installer that was just downloaded into install_dir. The
# previous command referenced wasmedge_local_install.py, which this script
# never fetches, and ignored the exit status, so the install failed silently.
inner_installer = "install_wasmedge_inner.py"
install_cmd = f"python3 {inner_installer}"
print("os_system:", install_cmd)
if os.system(install_cmd) != 0:
    print(f"wasmedge install failed: {install_cmd}")
    sys.exit(1)
76 changes: 76 additions & 0 deletions scripts/telego/bin_waverless/template/pack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os

# Two directory levels above this file.
_THIS_FILE = os.path.abspath(__file__)
PRJ_DIR = os.path.dirname(os.path.dirname(_THIS_FILE))
# Local cache of already-prepared resources.
DOWNLOAD_CACHE_DIR = os.path.join(PRJ_DIR, "prepare_cache")
# Checkout of the waverless sources on the packaging host.
WAVERLESS_PATH = "/root/serverless_benchmark_plus/middlewares/waverless/waverless"

#########

# Three levels above the waverless checkout.
waverless_benchmark_path = os.path.abspath(os.path.join(WAVERLESS_PATH, "../../.."))

# Shell pipeline that installs CRaC locally and re-packs /usr/jdk_crac.
_crac_pack_cmd = " && ".join([
    "python3 " + os.path.join(WAVERLESS_PATH, "scripts/install/inner/install_crac.py"),
    "mkdir -p /teledeploy_secret/waverless",
    "rm -f /teledeploy_secret/waverless/jdk_crac.tar.gz",
    "tar -czvf /teledeploy_secret/waverless/jdk_crac.tar.gz -C /usr jdk_crac",
])

# Each entry is one resource group:
#   "cmd" - command needed to prepare the resource
#   "rsc" - path of the resource to be prepared
rscs = [
    # binary
    [
        {"cmd": "python3 " + os.path.join(WAVERLESS_PATH, "scripts/build/1.1build_core.py")},
        {"rsc": os.path.join(WAVERLESS_PATH, "scripts/build/pack/waverless_backend/wasm_serverless")},
    ],
    # entry script
    [
        {"rsc": os.path.join(WAVERLESS_PATH, "scripts/build/template/run_node.py")},
    ],
    # wasmedge installer
    [
        {"rsc": os.path.join(WAVERLESS_PATH, "scripts/install/inner/wasm_edge.py")},
    ],
    # crac
    [
        {"cmd": _crac_pack_cmd},
        {"rsc": "/teledeploy_secret/waverless/jdk_crac.tar.gz"},
    ],
]


def chdir(dir):
    """Report the new working directory on stdout and switch to it."""
    new_cwd = dir
    print("chdir:", new_cwd)
    os.chdir(new_cwd)

def os_system(cmd):
    """Log ``cmd`` and run it via the shell without checking the result."""
    print("os_system:", cmd)
    _unused_status = os.system(cmd)

def os_system_sure(cmd):
    """Log and run ``cmd`` via the shell, raising if it exits non-zero.

    Raises:
        Exception: When the command's status is not 0.
    """
    print("os_system_sure:", cmd)
    if os.system(cmd) == 0:
        return
    raise Exception(f"os_system_sure failed: {cmd}")

# Walk every resource group in rscs, run its preparation command(s) and copy
# the produced resource into DOWNLOAD_CACHE_DIR.
# NOTE(review): indentation is not visible in this view, so the exact nesting
# of the statements below (in particular the mkdir/cp pair) should be
# confirmed against the real file.
for rsc_ in rscs:
cmd=""
rsc=""
# NOTE(review): `copy` is assigned here but not read anywhere in the visible code.
copy=""

# First pass: find the resource path bound to this group, if any.
for item in rsc_:
if "rsc" in item:
rsc=item["rsc"]

# Second pass: execute the group's commands.
for item in rsc_:
if rsc=="":
# No target resource bound: run the command every time.
if "cmd" in item:
cmd=item["cmd"]
os_system_sure(cmd)
else:
# The cached copy is looked up by the resource's base file name.
rsc_file=rsc.split("/")[-1]
cache_rsc=os.path.join(DOWNLOAD_CACHE_DIR,rsc_file)

# A target resource is bound: run the command and refresh the resource
# only when the cached copy is missing (cache deleted).
if not os.path.exists(cache_rsc):
if "cmd" in item:
cmd=item["cmd"]
os_system_sure(cmd)
# copy to prepare_cache dir
os_system_sure(f"mkdir -p {DOWNLOAD_CACHE_DIR}")
os_system_sure(f"cp -r {rsc} {DOWNLOAD_CACHE_DIR}")

print("pack waverless related done!")
113 changes: 113 additions & 0 deletions scripts/telego/dist_waverless/deployment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# dist服务文档:https://qcnoe3hd7k5c.feishu.cn/wiki/Y9SkwEPmqiTov1knR8KctyJ0nJf
comment: 存算融合的serverless计算平台

local_values: {}

prepare: []

dist:
waverless-test:
# 运行在裸机上
type: raw_metal
# 每个unique服务的配置信息以及全局配置信息
conf:
global: {port: 2500}
1: {tag: "[meta, master]"}
2: {tag: "[meta, worker]"}
3: {tag: '[meta, worker]'}
# 每个unique服务的分布节点
distribution:
lab1: [1]
lab2: [2]
lab3: [3]
# 安装脚本
install: |
telego install --bin-prj bin_waverless2
# 有状态服务备份
state_backup: |
# 调试模式,不存储任何东西
rm -rf test_dir/kv_store_engine*
rm -rf test_dir/apps
rm -rf test_dir/files

# mkdir -p backup
#mv apps backup || true # allow to fail
#mv files backup || true # allow to fail
#mv kv_store_engine* backup || true # allow to fail
# 有状态服务恢复
state_restore: |
# mv backup/* . || true # allow to fail
############################################################
#
# telego项目结构
# 二进制执行目录
# \_ test_dir
# \_ files
# | \_ node_config.yaml
# \_ apps
# | |_ app1
# | |_ app2
# | \_ ...
# \_ kv_store_engine
#
############################################################

mkdir -p test_dir/files
mkdir -p test_dir/apps

ls test_dir > debug_exec_dir.log
ls test_dir/apps > debug_apps_dir.log

# 根据 dist conf 动态生成目标 config 格式
cat > gen_nodes_config.py << EOF
import os, subprocess
# Generate test_dir/files/node_config.yaml from the DIST_* environment variables.
# DIST_UNIQUE_ID_LIST is a comma-separated env variable; abort if it is missing.
if 'DIST_UNIQUE_ID_LIST' not in os.environ:
print("DIST_UNIQUE_ID_LIST is not set")
exit(1)
DIST_UNIQUE_ID_LIST = os.environ.get('DIST_UNIQUE_ID_LIST', '').split(',')
# Splitting an empty string yields [''], so normalise that case to an empty list.
if DIST_UNIQUE_ID_LIST and DIST_UNIQUE_ID_LIST[0] == '':
DIST_UNIQUE_ID_LIST = []
with open("test_dir/files/node_config.yaml", "w") as f:
f.write("nodes:\n")

# One entry per unique id.
for unique_id in DIST_UNIQUE_ID_LIST:
print(f"processing {unique_id}")

# Read this id's settings from environment variables.
# NOTE(review): os.getenv returns None for an unset variable, which would be
# written out literally as "None" - confirm these are always set.
ip = os.getenv(f"DIST_CONF_{unique_id}_NODE_IP")
port = os.getenv(f"DIST_CONF_{unique_id}_port")
spec = os.getenv(f"DIST_CONF_{unique_id}_tag")

# Write the entry to node_config.yaml.
f.write(f" {unique_id}:\n")
f.write(f" addr: {ip}:{port}\n")
f.write(f" spec: {spec}\n")
def kill_process_by_port(port):
    """Force-kill every process that lsof reports for ``port``.

    Args:
        port: Port number (string or int) interpolated into the ``lsof -i:``
            selector.

    A failing lsof invocation is reported on stdout and otherwise ignored.
    """
    try:
        # lsof -t prints bare process IDs, one per line.
        cmd = f"lsof -t -i:{port}"
        output = subprocess.check_output(cmd, shell=True).decode()
    except subprocess.CalledProcessError as e:
        print(f"出错了: {e}")
        return

    # Several processes can hold the same port, so handle each PID on its own
    # instead of feeding the whole multi-line output to int().
    pids = output.split()
    if not pids:
        print(f"没有找到监听端口 {port} 的进程")
        return
    for pid in pids:
        try:
            os.kill(int(pid), 9)  # 9 is SIGKILL: terminate unconditionally
            print(f"进程 {pid} 已被终止")
        except ProcessLookupError:
            # The process exited between the lsof call and the kill.
            pass
kill_process_by_port("2500")
EOF

# 启动脚本
entrypoint: |
echo "start waverless with id $DIST_UNIQUE_ID"
# only host contains python3
python3 gen_nodes_config.py
export RUST_LOG=debug
rm -rf ./wasm_serverless
ln -s /usr/bin/waverless ./wasm_serverless
cp /usr/bin/waverless_entry ./

./waverless_entry $DIST_UNIQUE_ID
4 changes: 2 additions & 2 deletions src/main/build.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io::Result;
fn main() -> Result<()> {
let mut config = prost_build::Config::new();
config
.type_attribute("BatchRequestId", "#[derive(Eq, Hash)]");
config.type_attribute("FnTaskId", "#[derive(Eq, Hash)]");
config.type_attribute("BatchRequestId", "#[derive(Eq, Hash)]");
config.compile_protos(
&[
"src/general/network/proto_src/kv.proto",
Expand Down
Loading