Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.Bazel.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "c314e0d98e718728f62fcf5ba7112841a0a20cc4fb816e5eaaafd31a23a8bc82",
"checksum": "453c7d22d591a20f7c0289edbecf6681a0d22b8e8f4c6755e992892a1a9ff426",
"crates": {
"actix-codec 0.5.2": {
"name": "actix-codec",
Expand Down Expand Up @@ -12928,6 +12928,10 @@
"id": "pretty_env_logger 0.5.0",
"target": "pretty_env_logger"
},
{
"id": "prometheus 0.13.4",
"target": "prometheus"
},
{
"id": "prost 0.13.5",
"target": "prost"
Expand Down
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ url = { workspace = true }
hex = { workspace = true }
ic-registry-common-proto = { workspace = true }
base64 = { version = "0.22" }
prometheus = "0.13"

[dev-dependencies]
actix-rt = { workspace = true }
Expand Down
187 changes: 187 additions & 0 deletions rs/cli/src/commands/node_rewards/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fs;
use tabled::Tabled;
use url::Url;

#[derive(Subcommand, Debug, Clone)]
pub enum NodeRewardsMode {
Expand Down Expand Up @@ -43,6 +44,15 @@ pub enum NodeRewardsMode {
#[arg(long)]
compare_with_governance: bool,
},
/// Push node rewards metrics to VictoriaMetrics for a specific date
PushToVictoria {
/// Date in format YYYY-MM-DD
#[arg(long)]
date: Option<String>,
/// VictoriaMetrics URL (defaults to http://localhost:9090)
#[arg(long)]
victoria_url: Option<String>,
},
}

#[derive(Args, Debug)]
Expand Down Expand Up @@ -94,6 +104,16 @@ impl ExecutableCommand for NodeRewards {
let gov_rewards_list = governance_client.list_node_provider_rewards(None).await?;
// Run the selected subcommand and populate self attributes
match &self.mode {
NodeRewardsMode::PushToVictoria { date, victoria_url } => {
let url_string = victoria_url
.clone()
.or_else(|| std::env::var("VICTORIA_METRICS_URL").ok())
.unwrap_or_else(|| "http://localhost:9090".to_string());

let victoria_url = Url::parse(&url_string).map_err(|e| anyhow!("Invalid VictoriaMetrics URL '{}': {}", url_string, e))?;

return self.push_rewards_metrics_to_victoria(ctx, date.clone(), victoria_url).await;
}
NodeRewardsMode::Ongoing {
csv_detailed_output_path,
provider_id,
Expand Down Expand Up @@ -927,4 +947,171 @@ impl NodeRewards {
wtr.flush()?;
Ok(())
}

async fn push_rewards_metrics_to_victoria(&self, ctx: crate::ctx::DreContext, date: Option<String>, victoria_url: Url) -> anyhow::Result<()> {
use prometheus::{GaugeVec, Opts, Registry};
// Parse or default to yesterday
let target_date = if let Some(date) = date {
chrono::NaiveDate::parse_from_str(&date, "%Y-%m-%d")?
} else {
let today = chrono::Utc::now().date_naive();
today.pred_opt().unwrap()
};

info!("Pushing node rewards data to VictoriaMetrics for date: {}", target_date);

// Fetch data from node rewards canister
let (_, canister_agent) = ctx.create_ic_agent_canister_client().await?;
let node_rewards_client: NodeRewardsCanisterWrapper = canister_agent.into();

let date_utc = DateUtc::from(target_date);
let daily_results = node_rewards_client.get_rewards_daily(date_utc).await?;

// Create Prometheus registry and metrics
let registry = Registry::new();

let noon_timestamp_millis = target_date
.and_hms_opt(12, 0, 0)
.ok_or_else(|| anyhow!("Invalid date noon"))?
.and_utc()
.timestamp_millis();

// Define metrics
let nodes_count = GaugeVec::new(Opts::new("nodes_count", "Number of nodes per provider"), &["provider_id"])?;
registry.register(Box::new(nodes_count.clone()))?;

let base_rewards = GaugeVec::new(
Opts::new("total_base_rewards_xdr_permyriad", "Total base rewards in XDR permyriad"),
&["provider_id"],
)?;
registry.register(Box::new(base_rewards.clone()))?;

let adjusted_rewards = GaugeVec::new(
Opts::new("total_adjusted_rewards_xdr_permyriad", "Total adjusted rewards in XDR permyriad"),
&["provider_id"],
)?;
registry.register(Box::new(adjusted_rewards.clone()))?;

let original_failure_rate = GaugeVec::new(
Opts::new("original_failure_rate", "Original failure rate per node"),
&["provider_id", "node_id", "subnet_id"],
)?;
registry.register(Box::new(original_failure_rate.clone()))?;

let relative_failure_rate = GaugeVec::new(
Opts::new("relative_failure_rate", "Relative failure rate per node"),
&["provider_id", "node_id", "subnet_id"],
)?;
registry.register(Box::new(relative_failure_rate.clone()))?;

let subnet_failure = GaugeVec::new(Opts::new("subnet_failure_rate", "Failure rate per subnet"), &["subnet_id"])?;
registry.register(Box::new(subnet_failure.clone()))?;

let gov_timestamp = prometheus::Gauge::new(
"governance_latest_reward_event_timestamp_seconds",
"Latest governance reward event timestamp",
)?;
registry.register(Box::new(gov_timestamp.clone()))?;

// Set provider-level metrics
for (provider_id, provider_rewards) in daily_results.provider_results {
let provider_id_str = provider_id.to_string();

nodes_count
.with_label_values(&[&provider_id_str])
.set(provider_rewards.daily_nodes_rewards.len() as f64);

if let Some(base) = provider_rewards.total_base_rewards_xdr_permyriad {
base_rewards.with_label_values(&[&provider_id_str]).set(base as f64);
}

if let Some(adjusted) = provider_rewards.total_adjusted_rewards_xdr_permyriad {
adjusted_rewards.with_label_values(&[&provider_id_str]).set(adjusted as f64);
}

// Node-level metrics
for node_result in &provider_rewards.daily_nodes_rewards {
let node_id_str = node_result.node_id.map(|id| id.to_string()).unwrap_or_default();

if let Some(DailyNodeFailureRate::SubnetMember { node_metrics: Some(m) }) = &node_result.daily_node_failure_rate {
let subnet_id_str = m.subnet_assigned.map(|s| s.to_string()).unwrap_or_default();

if let Some(original_fr) = m.original_failure_rate {
original_failure_rate
.with_label_values(&[&provider_id_str, &node_id_str, &subnet_id_str])
.set(original_fr);
}

if let Some(relative_fr) = m.relative_failure_rate {
relative_failure_rate
.with_label_values(&[&provider_id_str, &node_id_str, &subnet_id_str])
.set(relative_fr);
}
}
}
}

// Set subnet-level metrics
for (subnet_id, failure_rate) in daily_results.subnets_failure_rate {
subnet_failure.with_label_values(&[&subnet_id.to_string()]).set(failure_rate);
}

// Set governance timestamp
let governance_client: GovernanceCanisterWrapper = ctx.create_ic_agent_canister_client().await?.1.into();
let gov_rewards_list = governance_client.list_node_provider_rewards(None).await?;
if let Some(last_rewards) = gov_rewards_list.first() {
gov_timestamp.set(last_rewards.timestamp as f64);
}

// Gather metrics and manually format with timestamps
let metric_families = registry.gather();
let mut buffer = String::new();

// Manually construct Prometheus format with timestamps
for mf in &metric_families {
for m in mf.get_metric() {
let metric_name = mf.get_name();

// Build label string
let mut labels = Vec::new();
for label in m.get_label() {
labels.push(format!("{}=\"{}\"", label.get_name(), label.get_value()));
}
let label_str = if labels.is_empty() {
String::new()
} else {
format!("{{{}}}", labels.join(","))
};

// Get metric value
let value = if m.has_gauge() {
m.get_gauge().get_value()
} else if m.has_counter() {
m.get_counter().get_value()
} else if m.has_untyped() {
m.get_untyped().get_value()
} else {
continue; // Skip unsupported metric types
};

// Format: metric_name{labels} value timestamp_ms
buffer.push_str(&format!("{}{} {} {}\n", metric_name, label_str, value, noon_timestamp_millis));
}
}

// Push to VictoriaMetrics via import API
let client = reqwest::Client::new();
let import_url = victoria_url
.join("/api/v1/import/prometheus")
.map_err(|e| anyhow!("Failed to construct VictoriaMetrics import URL: {}", e))?;
let response = client.post(import_url).header("Content-Type", "text/plain").body(buffer).send().await?;

if response.status().is_success() {
info!("Successfully pushed node rewards data for {} to VictoriaMetrics", target_date);
Ok(())
} else {
let error_text = response.text().await?;
Err(anyhow!("Failed to push to VictoriaMetrics: {}", error_text))
}
}
}
Loading