-
Notifications
You must be signed in to change notification settings - Fork 7
prometheus #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
prometheus #58
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,12 @@ comment: 存算融合的serverless计算平台 | |
|
|
||
| local_values: {} | ||
|
|
||
| prepare: [] | ||
| prepare: | ||
| # 部署前自动生成 prometheus 配置文件 | ||
| - pyscript: gen_prometheus_config.py | ||
| trans: | ||
| - copy: | ||
| - /etc/prometheus/prometheus.yml: prometheus-config/prometheus.yml | ||
|
|
||
| dist: | ||
| waverless-test: | ||
|
|
@@ -111,3 +116,52 @@ dist: | |
| cp /usr/bin/waverless_entry ./ | ||
|
|
||
| ./waverless_entry $DIST_UNIQUE_ID | ||
| # 部署 prometheus 和 grafana | ||
| k8s: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这里也是,放到单独的prj yml里,此外,也不要放在waverless项目中,只是单纯放在telego_prj目录中 底下的配置参数也全部是错的,看一下文档还有template示例,看看到底怎么用 |
||
| prometheus: | ||
| type: k8s | ||
| image: prom/prometheus:latest | ||
| conf: | ||
| - name: prometheus-config | ||
| mountPath: /etc/prometheus/prometheus.yml | ||
| subPath: prometheus.yml | ||
| configMap: prometheus-config | ||
| ports: | ||
| - 9090:9090 | ||
| env: {} | ||
| resources: | ||
| limits: | ||
| cpu: "500m" | ||
| memory: "512Mi" | ||
| requests: | ||
| cpu: "100m" | ||
| memory: "128Mi" | ||
| command: ["/bin/prometheus", "--config.file=/etc/prometheus/prometheus.yml"] | ||
|
|
||
| grafana: | ||
| type: k8s | ||
| image: grafana/grafana:latest | ||
| ports: | ||
| - 3000:3000 | ||
| env: | ||
| GF_SECURITY_ADMIN_PASSWORD: "admin" | ||
| resources: | ||
| limits: | ||
| cpu: "500m" | ||
| memory: "512Mi" | ||
| requests: | ||
| cpu: "100m" | ||
| memory: "128Mi" | ||
| #实际部署时会被prepare生成的覆盖 | ||
| configmap: | ||
| prometheus-config: | ||
| prometheus.yml: | | ||
| global: | ||
| scrape_interval: 15s | ||
| scrape_configs: | ||
| - job_name: 'waverless' | ||
| static_configs: | ||
| - targets: | ||
| - '1.2.3.4:2500' | ||
| - '5.6.7.8:2500' | ||
| - '9.10.11.12:2500' | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| import os | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这个没必要写成脚本,统一写到prometheus telego项目的local values中 |
||
|
|
||
| node_ids = ["1", "2", "3"] | ||
|
|
||
| targets = [] | ||
| for node_id in node_ids: | ||
| ip = os.environ.get(f"DIST_CONF_{node_id}_NODE_IP") | ||
| port = os.environ.get(f"DIST_CONF_{node_id}_port", "2500") | ||
| if ip: | ||
| targets.append(f"'{ip}:{port}'") | ||
|
|
||
| with open("/etc/prometheus/prometheus.yml", "w") as f: | ||
| f.write("global:\n scrape_interval: 15s\n\n") | ||
| f.write("scrape_configs:\n - job_name: 'waverless'\n static_configs:\n - targets: [\n") | ||
| for i, t in enumerate(targets): | ||
| if i < len(targets) - 1: | ||
| f.write(f" {t},\n") | ||
| else: | ||
| f.write(f" {t}\n") | ||
| f.write(" ]\n") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| global: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 放到独立项目 |
||
| scrape_interval: 15s | ||
|
|
||
| scrape_configs: | ||
| - job_name: 'waverless' | ||
| static_configs: | ||
| - targets: ['192.168.31.109:2500' | ||
| ] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,8 @@ use async_raft::State as RaftState; | |
| use axum::extract::{Multipart, State}; | ||
| use axum::http::StatusCode; | ||
| use axum::response::{IntoResponse, Response}; | ||
| use axum::routing::post; | ||
| use axum::routing::{get, post}; | ||
| use crate::metrics::metrics_handler; // 导入metrics处理函数 | ||
| use axum::Router; | ||
| use serde::Serialize; | ||
| use std::io; | ||
|
|
@@ -24,7 +25,10 @@ impl DataGeneral { | |
|
|
||
| with_option!(router_holder.option_mut(), router => { | ||
| // router.route("/upload_data", post(handle_write_data)) | ||
| router.merge(Router::new().route("/upload_data", post(handle_upload_data).with_state(self.view.clone()))) | ||
| router.merge(Router::new().route("/upload_data", post(handle_upload_data).with_state(self.view.clone())) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| .route("/metrics", get(metrics_handler)) | ||
| ) | ||
|
|
||
| }); | ||
| Ok(()) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| // waverless/src/main/src/general/metrics/collector.rs | ||
| use crate::general::{LogicalModule, LogicalModuleNewArgs}; | ||
| use crate::result::WSResult; | ||
| use crate::util::JoinHandleWrapper; | ||
| use prometheus_client::metrics::gauge::Gauge; | ||
| use prometheus_client::registry::Registry; | ||
| use std::sync::{Arc, Mutex}; | ||
| use std::time::{Duration, SystemTime, UNIX_EPOCH}; | ||
| use tokio::time; | ||
| use sysinfo::{System, SystemExt, ProcessExt}; | ||
| use std::process; | ||
| use super::types::NodeMetrics; | ||
| use async_trait::async_trait; | ||
|
|
||
| /// 指标收集器,负责收集本节点的性能指标 | ||
| /// 在Worker节点上运行时,会自动将指标通过RPC发送给Master节点 | ||
| pub struct MetricsCollector { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这里上面要加上#[derive(LogicalModule)] |
||
| // 本地收集的指标 | ||
| metrics: Arc<Mutex<Registry>>, | ||
|
|
||
| // 进程指标 | ||
| process_cpu: Arc<Gauge>, | ||
| process_memory: Arc<Gauge>, | ||
|
|
||
| // 模块引用 | ||
| args: LogicalModuleNewArgs, | ||
|
|
||
| // 是否是 Master 节点 | ||
| is_master: bool, | ||
|
|
||
| // 当前进程ID | ||
| pid: i32, | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl LogicalModule for MetricsCollector { | ||
| fn inner_new(args: LogicalModuleNewArgs) -> Self { | ||
| let process_cpu = Arc::new(Gauge::default()); | ||
| let process_memory = Arc::new(Gauge::default()); | ||
|
|
||
| let mut registry = Registry::default(); | ||
|
|
||
| // 注册进程指标 | ||
| registry.register( | ||
| "waverless_process_cpu_percent", | ||
| "当前进程CPU使用率(百分比)", | ||
| process_cpu.clone(), | ||
| ); | ||
| registry.register( | ||
| "waverless_process_memory_bytes", | ||
| "当前进程内存使用量(字节)", | ||
| process_memory.clone(), | ||
| ); | ||
|
|
||
| let is_master = args.nodes_config.this.1.is_master(); | ||
| let pid = process::id() as i32; | ||
|
|
||
| Self { | ||
| metrics: Arc::new(Mutex::new(registry)), | ||
| process_cpu, | ||
| process_memory, | ||
| args: args.clone(), | ||
| is_master, | ||
| pid, | ||
| } | ||
| } | ||
|
|
||
| async fn init(&self) -> WSResult<()> { | ||
| // 初始化时立即收集一次指标 | ||
| self.collect_process_metrics(); | ||
| tracing::info!("指标收集器已初始化 (节点类型: {})", | ||
| if self.is_master { "Master" } else { "Worker" }); | ||
| Ok(()) | ||
| } | ||
|
|
||
| async fn start(&self) -> WSResult<Vec<JoinHandleWrapper>> { | ||
| let process_cpu = self.process_cpu.clone(); | ||
| let process_memory = self.process_memory.clone(); | ||
| let is_master = self.is_master; | ||
| let args = self.args.clone(); | ||
| let pid = self.pid; | ||
|
|
||
| // 启动定期收集任务 | ||
| let handle = tokio::spawn(async move { | ||
| let mut interval = time::interval(Duration::from_secs(15)); | ||
| let mut sys = System::new_all(); | ||
|
|
||
| loop { | ||
| interval.tick().await; | ||
|
|
||
| // 收集进程指标 | ||
| sys.refresh_all(); | ||
|
|
||
| // 获取当前进程 | ||
| if let Some(process) = sys.process(pid) { | ||
| // 进程CPU使用率 | ||
| let cpu_usage = process.cpu_usage(); | ||
| process_cpu.set((cpu_usage * 100.0) as i64); // 转为百分比整数 | ||
|
|
||
| // 进程内存使用 | ||
| let mem_usage = process.memory(); | ||
| process_memory.set(mem_usage as i64); | ||
|
|
||
| tracing::debug!("收集到本地指标: CPU={}%, 内存={}字节", | ||
| cpu_usage, mem_usage); | ||
| } | ||
|
|
||
| // 如果是 Worker 节点,上报指标给 Master | ||
| if !is_master { | ||
| let node_id = args.nodes_config.this.0; | ||
| let node_addr = args.nodes_config.this.1.addr.to_string(); | ||
|
|
||
| let metrics = NodeMetrics { | ||
| node_id, | ||
| node_addr, | ||
| process_cpu_percent: (process_cpu.get() as f64) / 100.0, | ||
| process_memory_bytes: process_memory.get(), | ||
| timestamp: SystemTime::now() | ||
| .duration_since(UNIX_EPOCH) | ||
| .unwrap() | ||
| .as_secs(), | ||
| }; | ||
|
|
||
| // 使用 P2P 模块发送 RPC 请求 | ||
| if let Ok(modules) = args.inner.upgrade() { | ||
| if let Some(modules) = modules.as_ref() { | ||
| let p2p = &modules.p2p; | ||
| let master_node = args.nodes_config.get_master_node(); | ||
|
|
||
| // 发送 RPC 请求给 Master | ||
| match p2p.send_rpc_request( | ||
| master_node, | ||
| "report_metrics", | ||
| serde_json::to_string(&metrics).unwrap(), | ||
| ).await { | ||
| Ok(_) => tracing::debug!("成功发送指标到Master节点"), | ||
| Err(e) => tracing::warn!("发送指标到Master节点失败: {:?}", e), | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| tracing::info!("指标收集器已启动"); | ||
| Ok(vec![JoinHandleWrapper::new(handle)]) | ||
| } | ||
| } | ||
|
|
||
| /// 公共方法 | ||
| impl MetricsCollector { | ||
| /// 收集进程指标 | ||
| pub fn collect_process_metrics(&self) { | ||
| let mut sys = System::new_all(); | ||
| sys.refresh_all(); | ||
|
|
||
| if let Some(process) = sys.process(self.pid) { | ||
| self.process_cpu.set((process.cpu_usage() * 100.0) as i64); | ||
| self.process_memory.set(process.memory() as i64); | ||
| } | ||
| } | ||
|
|
||
| /// 获取指标 Registry | ||
| pub fn registry(&self) -> Arc<Mutex<Registry>> { | ||
| self.metrics.clone() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| // waverless/src/main/src/general/metrics/mod.rs | ||
| mod types; | ||
| mod collector; | ||
|
|
||
| pub use types::{NodeMetrics, AggregatedMetrics}; | ||
| pub use collector::MetricsCollector; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| use serde::{Serialize, Deserialize}; | ||
| use std::collections::HashMap; | ||
|
|
||
| // 节点指标数据 | ||
| #[derive(Clone, Serialize, Deserialize)] | ||
| pub struct NodeMetrics { | ||
| pub node_id: u32, | ||
| pub node_addr: String, | ||
|
|
||
| // 进程指标 | ||
| pub process_cpu_percent: f64, | ||
| pub process_memory_bytes: i64, | ||
|
|
||
| pub timestamp: u64, | ||
| } | ||
|
|
||
| // 聚合的指标数据 | ||
| #[derive(Default)] | ||
| pub struct AggregatedMetrics { | ||
| // 节点ID到指标的映射 | ||
| pub node_metrics: HashMap<u32, NodeMetrics>, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
prometheus 用telego部署是指单独创建一个telego k8s prometheus项目,具体见telego使用文档,问下曾俊怎么用