Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion scripts/telego/dist_waverless/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ comment: 存算融合的serverless计算平台

local_values: {}

prepare: []
prepare:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prometheus 用telego部署是指单独创建一个telego k8s prometheus项目,具体见telego使用文档,问下曾俊怎么用

# 部署前自动生成 prometheus 配置文件
- pyscript: gen_prometheus_config.py
trans:
- copy:
- /etc/prometheus/prometheus.yml: prometheus-config/prometheus.yml

dist:
waverless-test:
Expand Down Expand Up @@ -111,3 +116,52 @@ dist:
cp /usr/bin/waverless_entry ./

./waverless_entry $DIST_UNIQUE_ID
# 部署 prometheus 和 grafana
k8s:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里也是,放到单独的prj yml里,此外,也不要放在waverless项目中,只是单纯放在telego_prj目录中

底下的配置参数也全部是错的,看一下文档还有template示例,看看到底怎么用

prometheus:
type: k8s
image: prom/prometheus:latest
conf:
- name: prometheus-config
mountPath: /etc/prometheus/prometheus.yml
subPath: prometheus.yml
configMap: prometheus-config
ports:
- 9090:9090
env: {}
resources:
limits:
cpu: "500m"
memory: "512Mi"
requests:
cpu: "100m"
memory: "128Mi"
command: ["/bin/prometheus", "--config.file=/etc/prometheus/prometheus.yml"]

grafana:
type: k8s
image: grafana/grafana:latest
ports:
- 3000:3000
env:
GF_SECURITY_ADMIN_PASSWORD: "admin"
resources:
limits:
cpu: "500m"
memory: "512Mi"
requests:
cpu: "100m"
memory: "128Mi"
#实际部署时会被prepare生成的覆盖
configmap:
prometheus-config:
prometheus.yml: |
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'waverless'
static_configs:
- targets:
- '1.2.3.4:2500'
- '5.6.7.8:2500'
- '9.10.11.12:2500'
20 changes: 20 additions & 0 deletions scripts/telego/dist_waverless/gen_prometheus_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个没必要写成脚本,统一写到prometheus telego项目的local values中


# Node ids whose DIST_CONF_<id>_* environment variables are inspected.
node_ids = ["1", "2", "3"]

# Destination of the generated Prometheus configuration.
CONFIG_PATH = "/etc/prometheus/prometheus.yml"


def collect_targets(ids, env=None):
    """Return the "ip:port" scrape targets for the given node ids.

    For each id, the IP is read from ``DIST_CONF_<id>_NODE_IP`` and the port
    from ``DIST_CONF_<id>_port`` (default ``"2500"``). Nodes without an IP
    (missing or empty variable) are skipped.

    Args:
        ids: iterable of node id strings.
        env: mapping to read from; defaults to ``os.environ``.

    Returns:
        List of ``"ip:port"`` strings in the order of ``ids``.
    """
    env = os.environ if env is None else env
    found = []
    for node_id in ids:
        ip = env.get(f"DIST_CONF_{node_id}_NODE_IP")
        # NOTE(review): the port key is lower-case while the IP key is
        # upper-case; kept as-is, confirm against the deployment tooling.
        port = env.get(f"DIST_CONF_{node_id}_port", "2500")
        if ip:
            found.append(f"{ip}:{port}")
    return found


def render_config(targets):
    """Render the Prometheus configuration text for the given targets.

    The targets are emitted as a single-line YAML flow sequence, so no
    trailing-comma bookkeeping is needed and an empty list renders as ``[]``.
    Nested keys are indented consistently so the result parses as YAML.
    """
    quoted = ", ".join(f"'{target}'" for target in targets)
    return (
        "global:\n"
        "  scrape_interval: 15s\n"
        "\n"
        "scrape_configs:\n"
        "  - job_name: 'waverless'\n"
        "    static_configs:\n"
        f"      - targets: [{quoted}]\n"
    )


def main():
    """Generate the configuration from the environment and write it to disk."""
    targets = collect_targets(node_ids)
    # The target directory may not exist yet on a fresh host.
    os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
    with open(CONFIG_PATH, "w") as f:
        f.write(render_config(targets))


if __name__ == "__main__":
    main()
8 changes: 8 additions & 0 deletions scripts/telego/dist_waverless/prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
global:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

放到独立项目

scrape_interval: 15s

scrape_configs:
- job_name: 'waverless'
static_configs:
- targets: ['192.168.31.109:2500'
]
3 changes: 2 additions & 1 deletion src/main/src/apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use async_trait::async_trait;
use axum::{http::StatusCode, routing::post, Json, Router};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use crate::metrics::metrics_handler;
use axum::routing::get;

#[derive(Debug, Serialize, Deserialize)]
pub struct NodeBasic {
Expand Down Expand Up @@ -185,6 +187,5 @@ pub fn add_routers(mut router: Router) -> Router {
)
}
router = router.route("/run_service_action", post(run_service_action));

router
}
8 changes: 6 additions & 2 deletions src/main/src/general/data/m_data_general/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use async_raft::State as RaftState;
use axum::extract::{Multipart, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::post;
use axum::routing::{get, post};
use crate::metrics::metrics_handler; // 导入metrics处理函数
use axum::Router;
use serde::Serialize;
use std::io;
Expand All @@ -24,7 +25,10 @@ impl DataGeneral {

with_option!(router_holder.option_mut(), router => {
// router.route("/upload_data", post(handle_write_data))
router.merge(Router::new().route("/upload_data", post(handle_upload_data).with_state(self.view.clone())))
router.merge(Router::new().route("/upload_data", post(handle_upload_data).with_state(self.view.clone()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

监控单独成独立的两个LogicalModule模块,不要乱塞到data这里,之前不是说了吗??

image

.route("/metrics", get(metrics_handler))
)

});
Ok(())
}
Expand Down
167 changes: 167 additions & 0 deletions src/main/src/general/metrics/collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// waverless/src/main/src/general/metrics/collector.rs
use crate::general::{LogicalModule, LogicalModuleNewArgs};
use crate::result::WSResult;
use crate::util::JoinHandleWrapper;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time;
use sysinfo::{System, SystemExt, ProcessExt};
use std::process;
use super::types::NodeMetrics;
use async_trait::async_trait;

/// Metrics collector responsible for gathering this node's performance metrics.
/// When running on a Worker node, the collected metrics are also sent to the
/// Master node via RPC.
pub struct MetricsCollector {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里上面要加上#[derive(LogicalMoudle)]

// Locally collected metrics: the registry in which the process gauges
// below are registered.
metrics: Arc<Mutex<Registry>>,

// Process metrics.
// `process_cpu` holds the sampled CPU usage multiplied by 100 and cast to an
// integer; `process_memory` holds the sampled process memory value.
process_cpu: Arc<Gauge>,
process_memory: Arc<Gauge>,

// Module construction arguments, kept to reach the node configuration and
// the other modules at runtime.
args: LogicalModuleNewArgs,

// Whether this node is the Master node.
is_master: bool,

// Id of the current process, used to look the process up when sampling.
pid: i32,
}

/// Lifecycle integration for [`MetricsCollector`]: construction, a one-off
/// initial sample, and a background task that samples every 15 seconds and,
/// on non-master nodes, reports each sample to the master node.
#[async_trait]
impl LogicalModule for MetricsCollector {
    /// Creates the collector and registers the process CPU and memory gauges
    /// in a fresh registry.
    fn inner_new(args: LogicalModuleNewArgs) -> Self {
        let process_cpu = Arc::new(Gauge::default());
        let process_memory = Arc::new(Gauge::default());

        let mut registry = Registry::default();

        // Register the process-level gauges.
        registry.register(
            "waverless_process_cpu_percent",
            "当前进程CPU使用率(百分比)",
            process_cpu.clone(),
        );
        registry.register(
            "waverless_process_memory_bytes",
            "当前进程内存使用量(字节)",
            process_memory.clone(),
        );

        let is_master = args.nodes_config.this.1.is_master();
        // NOTE(review): `as i32` wraps for ids above i32::MAX — confirm the
        // pid type expected by the process lookup.
        let pid = process::id() as i32;

        Self {
            metrics: Arc::new(Mutex::new(registry)),
            process_cpu,
            process_memory,
            // `args` is owned and no longer needed here, so it is moved
            // instead of cloned.
            args,
            is_master,
            pid,
        }
    }

    /// Takes one sample immediately and logs which node role is running.
    async fn init(&self) -> WSResult<()> {
        self.collect_process_metrics();
        tracing::info!(
            "指标收集器已初始化 (节点类型: {})",
            if self.is_master { "Master" } else { "Worker" }
        );
        Ok(())
    }

    /// Spawns the periodic sampling task and returns its join handle.
    ///
    /// The task never panics on a bad clock or a serialization failure; it
    /// logs and moves on to the next tick so collection keeps running.
    async fn start(&self) -> WSResult<Vec<JoinHandleWrapper>> {
        let process_cpu = self.process_cpu.clone();
        let process_memory = self.process_memory.clone();
        let is_master = self.is_master;
        let args = self.args.clone();
        let pid = self.pid;

        let handle = tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(15));
            let mut sys = System::new_all();

            loop {
                interval.tick().await;

                // Refresh system information before sampling the process.
                sys.refresh_all();

                if let Some(process) = sys.process(pid) {
                    // The gauge is integer-valued, so the CPU reading is
                    // stored scaled by 100; the report below divides it back.
                    // NOTE(review): the gauge is registered under a
                    // `..._percent` name but holds the scaled value — confirm
                    // the unit intended for scraping.
                    let cpu_usage = process.cpu_usage();
                    process_cpu.set((cpu_usage * 100.0) as i64);

                    let mem_usage = process.memory();
                    process_memory.set(mem_usage as i64);

                    tracing::debug!(
                        "收集到本地指标: CPU={}%, 内存={}字节",
                        cpu_usage,
                        mem_usage
                    );
                }

                // Only non-master nodes report to the master.
                if is_master {
                    continue;
                }

                let node_id = args.nodes_config.this.0;
                let node_addr = args.nodes_config.this.1.addr.to_string();

                let metrics = NodeMetrics {
                    node_id,
                    node_addr,
                    process_cpu_percent: (process_cpu.get() as f64) / 100.0,
                    process_memory_bytes: process_memory.get(),
                    timestamp: unix_timestamp_secs(),
                };

                // A serialization failure skips this report instead of
                // aborting the whole collection task.
                let payload = match serde_json::to_string(&metrics) {
                    Ok(payload) => payload,
                    Err(e) => {
                        tracing::warn!("序列化指标失败: {:?}", e);
                        continue;
                    }
                };

                // Send the report through the P2P module, if the module set
                // is still reachable.
                if let Ok(modules) = args.inner.upgrade() {
                    if let Some(modules) = modules.as_ref() {
                        let p2p = &modules.p2p;
                        let master_node = args.nodes_config.get_master_node();

                        match p2p
                            .send_rpc_request(master_node, "report_metrics", payload)
                            .await
                        {
                            Ok(_) => tracing::debug!("成功发送指标到Master节点"),
                            Err(e) => tracing::warn!("发送指标到Master节点失败: {:?}", e),
                        }
                    }
                }
            }
        });

        tracing::info!("指标收集器已启动");
        Ok(vec![JoinHandleWrapper::new(handle)])
    }
}

/// Seconds since the Unix epoch, or 0 when the system clock reads earlier
/// than the epoch (instead of panicking).
fn unix_timestamp_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}

/// Public helpers of the collector.
impl MetricsCollector {
    /// Samples the current process once and stores the readings in the
    /// CPU and memory gauges. Does nothing if the process cannot be found.
    pub fn collect_process_metrics(&self) {
        let mut system = System::new_all();
        system.refresh_all();

        let current = match system.process(self.pid) {
            Some(current) => current,
            None => return,
        };

        // CPU usage is stored scaled by 100 because the gauge is integer-valued.
        self.process_cpu.set((current.cpu_usage() * 100.0) as i64);
        self.process_memory.set(current.memory() as i64);
    }

    /// Returns a shared handle to the metrics registry.
    pub fn registry(&self) -> Arc<Mutex<Registry>> {
        Arc::clone(&self.metrics)
    }
}
6 changes: 6 additions & 0 deletions src/main/src/general/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// waverless/src/main/src/general/metrics/mod.rs
mod types;
mod collector;

pub use types::{NodeMetrics, AggregatedMetrics};
pub use collector::MetricsCollector;
22 changes: 22 additions & 0 deletions src/main/src/general/metrics/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use serde::{Serialize, Deserialize};
use std::collections::HashMap;

/// Metrics reported by a single node.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NodeMetrics {
    /// Id of the reporting node.
    pub node_id: u32,
    /// Address of the reporting node, as a string.
    pub node_addr: String,

    // Process metrics.
    /// CPU usage of the node's process, in percent.
    pub process_cpu_percent: f64,
    /// Memory usage of the node's process, in bytes.
    pub process_memory_bytes: i64,

    /// Time the sample was taken, in seconds since the Unix epoch.
    pub timestamp: u64,
}

/// Aggregated metrics across nodes.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct AggregatedMetrics {
    /// Mapping from node id to that node's metrics.
    pub node_metrics: HashMap<u32, NodeMetrics>,
}
2 changes: 1 addition & 1 deletion src/main/src/general/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ pub mod data;
pub mod m_metric_publisher;
pub mod m_os;
pub mod network;

pub mod metrics;
#[cfg(test)]
pub mod test_utils;
1 change: 1 addition & 0 deletions src/main/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod modules_global_bridge;
pub mod result;
pub mod sys;
pub mod util;
pub mod metrics;

#[tokio::main]
async fn main() {
Expand Down
Loading
Loading