diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 1b2f865e6..bd8c8c022 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "c314e0d98e718728f62fcf5ba7112841a0a20cc4fb816e5eaaafd31a23a8bc82", + "checksum": "453c7d22d591a20f7c0289edbecf6681a0d22b8e8f4c6755e992892a1a9ff426", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -12928,6 +12928,10 @@ "id": "pretty_env_logger 0.5.0", "target": "pretty_env_logger" }, + { + "id": "prometheus 0.13.4", + "target": "prometheus" + }, { "id": "prost 0.13.5", "target": "prost" diff --git a/Cargo.lock b/Cargo.lock index 60ed5ce56..913e1a361 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2221,6 +2221,7 @@ dependencies = [ "log", "mockall", "pretty_env_logger", + "prometheus", "prost", "regex", "registry-canister", @@ -3290,7 +3291,7 @@ version = "0.9.0" source = "git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f#79ca6c4c6907acb74ea658b05fcb3706787b810f" dependencies = [ "candid", - "ic-error-types 0.2.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", + "ic-error-types 0.2.0", "ic-interfaces-adapter-client", "ic-protobuf", "serde", @@ -3409,7 +3410,7 @@ dependencies = [ "ic-canonical-state-tree-hash", "ic-certification-version", "ic-crypto-tree-hash", - "ic-error-types 0.2.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", + "ic-error-types 0.2.0", "ic-protobuf", "ic-registry-routing-table", "ic-registry-subnet-type", @@ -3445,7 +3446,7 @@ dependencies = [ "candid", "ic-cdk-executor", "ic-cdk-macros", - "ic-error-types 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ic-error-types 0.2.0", "ic-management-canister-types", "ic0", "serde", @@ -4079,7 +4080,7 @@ source = "git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb dependencies = [ "ic-base-types", "ic-crypto-interfaces-sig-verification", - "ic-error-types 0.2.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", + "ic-error-types 0.2.0", "ic-heap-bytes", "ic-interfaces-state-manager", "ic-management-canister-types-private", @@ -4271,7 +4272,7 @@ dependencies = [ "ic-base-types", "ic-btc-interface", "ic-btc-replica-types", - "ic-error-types 0.2.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", + "ic-error-types 0.2.0", "ic-protobuf", "ic-utils 0.9.0", "num-traits", @@ -4373,7 +4374,7 @@ dependencies = [ "dfn_candid", "dfn_core", "ic-base-types", - "ic-error-types 0.2.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", + "ic-error-types 0.2.0", "ic-ledger-core", "ic-management-canister-types-private", "ic-nervous-system-canisters", @@ -4821,7 +4822,7 @@ dependencies = [ "ic-base-types", "ic-nervous-system-proto", "ic-protobuf", - "rewards-calculation 0.9.0", + "rewards-calculation 0.9.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", "rust_decimal", "serde", ] @@ -4834,7 +4835,7 @@ dependencies = [ "bincode", "candid", "erased-serde 0.3.31", - "ic-error-types 0.2.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", + "ic-error-types 0.2.0", "prost", "serde", "serde_json", @@ -5105,7 +5106,7 @@ dependencies = [ "ic-certification-version", "ic-config", "ic-crypto-sha2", - "ic-error-types 0.2.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", + "ic-error-types 0.2.0", "ic-interfaces", "ic-limits", "ic-logger", @@ -5521,7 +5522,7 @@ dependencies = [ "ic-crypto-internal-types", "ic-crypto-sha2", "ic-crypto-tree-hash", - "ic-error-types 0.2.0 (git+https://github.com/dfinity/ic.git?rev=79ca6c4c6907acb74ea658b05fcb3706787b810f)", + "ic-error-types 0.2.0", "ic-heap-bytes", "ic-limits", "ic-management-canister-types-private", diff --git a/rs/cli/Cargo.toml b/rs/cli/Cargo.toml index 22e4d6ee8..793282e5e 100644 --- a/rs/cli/Cargo.toml +++ b/rs/cli/Cargo.toml @@ -90,6 +90,7 @@ url = { workspace = true } hex = { workspace = true } ic-registry-common-proto = { workspace = true } base64 = { version = "0.22" } +prometheus = "0.13" [dev-dependencies] actix-rt = { workspace = true } diff --git a/rs/cli/src/commands/node_rewards/mod.rs b/rs/cli/src/commands/node_rewards/mod.rs index 2181e7885..c00b9cdcd 100644 --- a/rs/cli/src/commands/node_rewards/mod.rs +++ b/rs/cli/src/commands/node_rewards/mod.rs @@ -16,6 +16,7 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::fs; use tabled::Tabled; +use url::Url; #[derive(Subcommand, Debug, Clone)] pub enum NodeRewardsMode { @@ -43,6 +44,15 @@ pub enum NodeRewardsMode { #[arg(long)] compare_with_governance: bool, }, + /// Push node rewards metrics to VictoriaMetrics for a specific date + PushToVictoria { + /// Date in format YYYY-MM-DD + #[arg(long)] + date: Option, + /// VictoriaMetrics URL (defaults to http://localhost:9090) + #[arg(long)] + victoria_url: Option, + }, } #[derive(Args, Debug)] @@ -94,6 +104,16 @@ impl ExecutableCommand for NodeRewards { let gov_rewards_list = governance_client.list_node_provider_rewards(None).await?; // Run the selected subcommand and populate self attributes match &self.mode { + NodeRewardsMode::PushToVictoria { date, victoria_url } => { + let url_string = victoria_url + .clone() + .or_else(|| std::env::var("VICTORIA_METRICS_URL").ok()) + .unwrap_or_else(|| "http://localhost:9090".to_string()); + + let victoria_url = Url::parse(&url_string).map_err(|e| anyhow!("Invalid VictoriaMetrics URL '{}': {}", url_string, e))?; + + return self.push_rewards_metrics_to_victoria(ctx, date.clone(), victoria_url).await; + } NodeRewardsMode::Ongoing { csv_detailed_output_path, provider_id, @@ -927,4 +947,171 @@ impl NodeRewards { wtr.flush()?; Ok(()) } + + async fn push_rewards_metrics_to_victoria(&self, ctx: crate::ctx::DreContext, date: Option, victoria_url: Url) -> anyhow::Result<()> { + use prometheus::{GaugeVec, Opts, Registry}; + // Parse or default to yesterday + let target_date = if let Some(date) = date { + chrono::NaiveDate::parse_from_str(&date, "%Y-%m-%d")? + } else { + let today = chrono::Utc::now().date_naive(); + today.pred_opt().unwrap() + }; + + info!("Pushing node rewards data to VictoriaMetrics for date: {}", target_date); + + // Fetch data from node rewards canister + let (_, canister_agent) = ctx.create_ic_agent_canister_client().await?; + let node_rewards_client: NodeRewardsCanisterWrapper = canister_agent.into(); + + let date_utc = DateUtc::from(target_date); + let daily_results = node_rewards_client.get_rewards_daily(date_utc).await?; + + // Create Prometheus registry and metrics + let registry = Registry::new(); + + let noon_timestamp_millis = target_date + .and_hms_opt(12, 0, 0) + .ok_or_else(|| anyhow!("Invalid date noon"))? + .and_utc() + .timestamp_millis(); + + // Define metrics + let nodes_count = GaugeVec::new(Opts::new("nodes_count", "Number of nodes per provider"), &["provider_id"])?; + registry.register(Box::new(nodes_count.clone()))?; + + let base_rewards = GaugeVec::new( + Opts::new("total_base_rewards_xdr_permyriad", "Total base rewards in XDR permyriad"), + &["provider_id"], + )?; + registry.register(Box::new(base_rewards.clone()))?; + + let adjusted_rewards = GaugeVec::new( + Opts::new("total_adjusted_rewards_xdr_permyriad", "Total adjusted rewards in XDR permyriad"), + &["provider_id"], + )?; + registry.register(Box::new(adjusted_rewards.clone()))?; + + let original_failure_rate = GaugeVec::new( + Opts::new("original_failure_rate", "Original failure rate per node"), + &["provider_id", "node_id", "subnet_id"], + )?; + registry.register(Box::new(original_failure_rate.clone()))?; + + let relative_failure_rate = GaugeVec::new( + Opts::new("relative_failure_rate", "Relative failure rate per node"), + &["provider_id", "node_id", "subnet_id"], + )?; + registry.register(Box::new(relative_failure_rate.clone()))?; + + let subnet_failure = GaugeVec::new(Opts::new("subnet_failure_rate", "Failure rate per subnet"), &["subnet_id"])?; + registry.register(Box::new(subnet_failure.clone()))?; + + let gov_timestamp = prometheus::Gauge::new( + "governance_latest_reward_event_timestamp_seconds", + "Latest governance reward event timestamp", + )?; + registry.register(Box::new(gov_timestamp.clone()))?; + + // Set provider-level metrics + for (provider_id, provider_rewards) in daily_results.provider_results { + let provider_id_str = provider_id.to_string(); + + nodes_count + .with_label_values(&[&provider_id_str]) + .set(provider_rewards.daily_nodes_rewards.len() as f64); + + if let Some(base) = provider_rewards.total_base_rewards_xdr_permyriad { + base_rewards.with_label_values(&[&provider_id_str]).set(base as f64); + } + + if let Some(adjusted) = provider_rewards.total_adjusted_rewards_xdr_permyriad { + adjusted_rewards.with_label_values(&[&provider_id_str]).set(adjusted as f64); + } + + // Node-level metrics + for node_result in &provider_rewards.daily_nodes_rewards { + let node_id_str = node_result.node_id.map(|id| id.to_string()).unwrap_or_default(); + + if let Some(DailyNodeFailureRate::SubnetMember { node_metrics: Some(m) }) = &node_result.daily_node_failure_rate { + let subnet_id_str = m.subnet_assigned.map(|s| s.to_string()).unwrap_or_default(); + + if let Some(original_fr) = m.original_failure_rate { + original_failure_rate + .with_label_values(&[&provider_id_str, &node_id_str, &subnet_id_str]) + .set(original_fr); + } + + if let Some(relative_fr) = m.relative_failure_rate { + relative_failure_rate + .with_label_values(&[&provider_id_str, &node_id_str, &subnet_id_str]) + .set(relative_fr); + } + } + } + } + + // Set subnet-level metrics + for (subnet_id, failure_rate) in daily_results.subnets_failure_rate { + subnet_failure.with_label_values(&[&subnet_id.to_string()]).set(failure_rate); + } + + // Set governance timestamp + let governance_client: GovernanceCanisterWrapper = ctx.create_ic_agent_canister_client().await?.1.into(); + let gov_rewards_list = governance_client.list_node_provider_rewards(None).await?; + if let Some(last_rewards) = gov_rewards_list.first() { + gov_timestamp.set(last_rewards.timestamp as f64); + } + + // Gather metrics and manually format with timestamps + let metric_families = registry.gather(); + let mut buffer = String::new(); + + // Manually construct Prometheus format with timestamps + for mf in &metric_families { + for m in mf.get_metric() { + let metric_name = mf.get_name(); + + // Build label string + let mut labels = Vec::new(); + for label in m.get_label() { + labels.push(format!("{}=\"{}\"", label.get_name(), label.get_value())); + } + let label_str = if labels.is_empty() { + String::new() + } else { + format!("{{{}}}", labels.join(",")) + }; + + // Get metric value + let value = if m.has_gauge() { + m.get_gauge().get_value() + } else if m.has_counter() { + m.get_counter().get_value() + } else if m.has_untyped() { + m.get_untyped().get_value() + } else { + continue; // Skip unsupported metric types + }; + + // Format: metric_name{labels} value timestamp_ms + buffer.push_str(&format!("{}{} {} {}\n", metric_name, label_str, value, noon_timestamp_millis)); + } + } + + // Push to VictoriaMetrics via import API + let client = reqwest::Client::new(); + let import_url = victoria_url + .join("/api/v1/import/prometheus") + .map_err(|e| anyhow!("Failed to construct VictoriaMetrics import URL: {}", e))?; + let response = client.post(import_url).header("Content-Type", "text/plain").body(buffer).send().await?; + + if response.status().is_success() { + info!("Successfully pushed node rewards data for {} to VictoriaMetrics", target_date); + Ok(()) + } else { + let error_text = response.text().await?; + Err(anyhow!("Failed to push to VictoriaMetrics: {}", error_text)) + } + } }