Skip to content
Merged
26 changes: 26 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
/target/
Cargo.lock

# Editor and IDE
*.swp
*.swo
*~
.DS_Store
.idea/
.vscode/
*.iml

# Environment and secrets
.env
.env.*
*.pem
*.key
*.p12
*.pfx
secrets/
credentials/

# Logs
*.log
logs/

# Coverage
*.profraw
*.profdata
coverage/
lcov.info
tarpaulin-report.html
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ parking_lot = "0.12"
arc-swap = "1.7"
regex = "1.10"
rand = "0.8"
url = "2.5"

[dev-dependencies]
tokio-test = "0.4"
Expand Down
1 change: 0 additions & 1 deletion benches/proxy_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use rust_servicemesh::ratelimit::{RateLimitConfig, RateLimiter};
use rust_servicemesh::retry::{RetryConfig, RetryPolicy};
use rust_servicemesh::router::{PathMatch, Route, Router};
use std::net::{IpAddr, Ipv4Addr};
use std::time::Duration;

fn bench_circuit_breaker(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
Expand Down
170 changes: 160 additions & 10 deletions src/admin.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,125 @@
//! Admin endpoints for health checks and metrics.
//! Admin endpoints for health checks, readiness, and metrics.

use crate::metrics::Metrics;
use http::{Request, Response, StatusCode};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::body::{Bytes, Incoming};
use serde::Serialize;
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::Service;
use tracing::{debug, warn};

/// Readiness state that can be shared across the application.
#[derive(Debug)]
pub struct ReadinessState {
ready: AtomicBool,
}

impl ReadinessState {
/// Creates a new readiness state, initially not ready.
pub fn new() -> Self {
Self {
ready: AtomicBool::new(false),
}
}

/// Sets the readiness state to ready.
pub fn set_ready(&self) {
self.ready.store(true, Ordering::SeqCst);
}

/// Sets the readiness state to not ready.
pub fn set_not_ready(&self) {
self.ready.store(false, Ordering::SeqCst);
}

/// Checks if the service is ready.
pub fn is_ready(&self) -> bool {
self.ready.load(Ordering::SeqCst)
}
}

impl Default for ReadinessState {
fn default() -> Self {
Self::new()
}
}

/// Health check response format.
#[derive(Debug, Serialize)]
struct HealthResponse {
status: &'static str,
}

/// Readiness check response format.
#[derive(Debug, Serialize)]
struct ReadinessResponse {
ready: bool,
#[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
}

/// Admin service for health checks and metrics endpoints.
///
/// Serves:
/// - `/health` - Health check endpoint returning 200 OK
/// - `/health` - Liveness check endpoint returning 200 OK
/// - `/ready` - Readiness check endpoint (200 if ready, 503 if not)
/// - `/metrics` - Prometheus metrics in text format
///
/// # Example
///
/// ```no_run
/// use rust_servicemesh::admin::AdminService;
/// use std::sync::Arc;
///
/// let service = AdminService::new();
/// // Or with custom readiness state:
/// // let service = AdminService::with_readiness(Arc::new(ReadinessState::new()));
/// ```
#[derive(Clone)]
pub struct AdminService;
pub struct AdminService {
readiness: Arc<ReadinessState>,
}

impl AdminService {
/// Creates a new admin service.
/// Creates a new admin service with default readiness (starts as ready).
pub fn new() -> Self {
Self
let readiness = Arc::new(ReadinessState::new());
readiness.set_ready(); // Default to ready for backwards compatibility
Self { readiness }
}

/// Handles admin requests for health and metrics endpoints.
/// Creates an admin service with custom readiness state.
pub fn with_readiness(readiness: Arc<ReadinessState>) -> Self {
Self { readiness }
}

/// Returns the readiness state reference.
pub fn readiness(&self) -> &Arc<ReadinessState> {
&self.readiness
}

/// Handles admin requests for health, readiness, and metrics endpoints.
async fn handle_request(
readiness: Arc<ReadinessState>,
req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
let path = req.uri().path();

match path {
"/health" => {
"/health" | "/healthz" => {
debug!("health check requested");
Ok(Self::health_response())
}
"/ready" | "/readyz" => {
debug!("readiness check requested");
Ok(Self::readiness_response(&readiness))
}
"/metrics" => {
debug!("metrics requested");
match Metrics::encode() {
Expand All @@ -61,12 +137,54 @@ impl AdminService {
}
}

/// Creates a health check response.
/// Creates a health check response (liveness probe).
fn health_response() -> Response<BoxBody<Bytes, hyper::Error>> {
let response = HealthResponse { status: "healthy" };
let body = serde_json::to_string(&response)
.unwrap_or_else(|_| r#"{"status":"healthy"}"#.to_string());

Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(
Full::new(Bytes::from("healthy"))
Full::new(Bytes::from(body))
.map_err(|never| match never {})
.boxed(),
)
.unwrap_or_else(|_| {
Response::new(
Full::new(Bytes::new())
.map_err(|never| match never {})
.boxed(),
)
})
}

/// Creates a readiness check response.
fn readiness_response(readiness: &ReadinessState) -> Response<BoxBody<Bytes, hyper::Error>> {
let is_ready = readiness.is_ready();
let response = ReadinessResponse {
ready: is_ready,
reason: if is_ready {
None
} else {
Some("service not ready".to_string())
},
};
let body = serde_json::to_string(&response)
.unwrap_or_else(|_| format!(r#"{{"ready":{}}}"#, is_ready));

let status = if is_ready {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};

Response::builder()
.status(status)
.header("Content-Type", "application/json")
.body(
Full::new(Bytes::from(body))
.map_err(|never| match never {})
.boxed(),
)
Expand Down Expand Up @@ -134,7 +252,8 @@ impl Service<Request<Incoming>> for AdminService {
}

fn call(&mut self, req: Request<Incoming>) -> Self::Future {
Box::pin(Self::handle_request(req))
let readiness = Arc::clone(&self.readiness);
Box::pin(Self::handle_request(readiness, req))
}
}

Expand All @@ -146,6 +265,37 @@ mod tests {
fn test_health_response() {
let response = AdminService::health_response();
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response.headers().get("Content-Type").unwrap(),
"application/json"
);
}

#[test]
fn test_readiness_response_ready() {
let state = ReadinessState::new();
state.set_ready();
let response = AdminService::readiness_response(&state);
assert_eq!(response.status(), StatusCode::OK);
}

#[test]
fn test_readiness_response_not_ready() {
let state = ReadinessState::new();
let response = AdminService::readiness_response(&state);
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
}

#[test]
fn test_readiness_state() {
let state = ReadinessState::new();
assert!(!state.is_ready());

state.set_ready();
assert!(state.is_ready());

state.set_not_ready();
assert!(!state.is_ready());
}

#[test]
Expand Down
Loading