Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/scrybe-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ tokio = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
testcontainers = "0.15"
chrono = { workspace = true }
124 changes: 124 additions & 0 deletions crates/scrybe-storage/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//! ClickHouse client with connection pooling.

use clickhouse::Client;
use scrybe_core::ScrybeError;

/// ClickHouse client for session storage.
///
/// Provides connection pooling and health checks for ClickHouse database.
#[derive(Clone)]
pub struct ClickHouseClient {
client: Client,
}

impl ClickHouseClient {
/// Create a new ClickHouse client.
///
/// # Arguments
///
/// * `url` - ClickHouse server URL (e.g., `http://localhost:8123`)
/// * `database` - Database name (default: "scrybe")
/// * `username` - Username (default: "default")
/// * `password` - Password
///
/// # Errors
///
/// Returns `ScrybeError::StorageError` if connection fails.
///
/// # Example
///
/// ```no_run
/// # use scrybe_storage::ClickHouseClient;
/// # async fn example() -> Result<(), scrybe_core::ScrybeError> {
/// let client = ClickHouseClient::new(
/// "http://localhost:8123",
/// "scrybe",
/// "default",
/// ""
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn new(
url: &str,
database: &str,
username: &str,
password: &str,
) -> Result<Self, ScrybeError> {
let client = Client::default()
.with_url(url)
.with_database(database)
.with_user(username)
.with_password(password);

// Test connection with PING
client.query("SELECT 1").execute().await.map_err(|e| {
ScrybeError::storage_error("clickhouse", format!("Connection failed: {}", e))
})?;

Ok(Self { client })
}

/// Get the underlying ClickHouse client.
pub fn client(&self) -> &Client {
&self.client
}

/// Check if ClickHouse is healthy.
///
/// # Errors
///
/// Returns `ScrybeError::StorageError` if health check fails.
pub async fn health_check(&self) -> Result<(), ScrybeError> {
self.client.query("SELECT 1").execute().await.map_err(|e| {
ScrybeError::storage_error("clickhouse", format!("Health check failed: {}", e))
})?;

Ok(())
}

/// Initialize database schema.
///
/// Creates the sessions table if it doesn't exist.
///
/// # Errors
///
/// Returns `ScrybeError::StorageError` if schema creation fails.
pub async fn init_schema(&self) -> Result<(), ScrybeError> {
let schema = r#"
CREATE TABLE IF NOT EXISTS sessions (
session_id UUID,
timestamp DateTime64(3, 'UTC'),
fingerprint_hash String,
ip String,
user_agent String,
network_signals String,
browser_signals String,
behavioral_signals String,
bot_probability Float32,
confidence_score Float32,
INDEX idx_fingerprint fingerprint_hash TYPE bloom_filter GRANULARITY 1,
INDEX idx_ip ip TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, session_id)
TTL timestamp + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;
"#;

self.client.query(schema).execute().await.map_err(|e| {
ScrybeError::storage_error("clickhouse", format!("Schema creation failed: {}", e))
})?;

Ok(())
}
}

#[cfg(test)]
mod tests {
#[tokio::test]
async fn test_clickhouse_client_compiles() {
// Placeholder - requires ClickHouse for full testing
assert!(true);
}
}
2 changes: 2 additions & 0 deletions crates/scrybe-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#![warn(rust_2018_idioms)]
#![deny(unsafe_code)]

pub mod client;
pub mod writer;

// Re-export main types
pub use client::ClickHouseClient;
pub use writer::SessionWriter;
103 changes: 98 additions & 5 deletions crates/scrybe-storage/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,121 @@
//! Session writer for ClickHouse storage.

use crate::client::ClickHouseClient;
use scrybe_core::{types::Session, ScrybeError};
use serde::Serialize;

/// Row format for ClickHouse sessions table.
#[derive(Debug, Serialize, clickhouse::Row)]
struct SessionRow {
session_id: String,
timestamp: i64,
fingerprint_hash: String,
ip: String,
user_agent: String,
network_signals: String,
browser_signals: String,
behavioral_signals: String,
bot_probability: f32,
confidence_score: f32,
}

impl SessionRow {
/// Convert a Session to ClickHouse row format.
fn from_session(session: &Session) -> Result<Self, ScrybeError> {
Ok(Self {
session_id: session.id.to_string(),
timestamp: session.timestamp.timestamp_millis(),
fingerprint_hash: session.fingerprint.hash.clone(),
ip: session.network.ip.to_string(),
user_agent: session.browser.user_agent.clone(),
network_signals: serde_json::to_string(&session.network).map_err(|e| {
ScrybeError::storage_error(
"clickhouse",
format!("JSON serialization failed: {}", e),
)
})?,
browser_signals: serde_json::to_string(&session.browser).map_err(|e| {
ScrybeError::storage_error(
"clickhouse",
format!("JSON serialization failed: {}", e),
)
})?,
behavioral_signals: serde_json::to_string(&session.behavioral).map_err(|e| {
ScrybeError::storage_error(
"clickhouse",
format!("JSON serialization failed: {}", e),
)
})?,
bot_probability: 0.0, // Will be filled by enrichment pipeline
confidence_score: 0.0, // Will be filled by enrichment pipeline
})
}
}

/// Writes session data to ClickHouse.
pub struct SessionWriter;
pub struct SessionWriter {
client: ClickHouseClient,
}

impl SessionWriter {
/// Create a new session writer.
pub fn new(client: ClickHouseClient) -> Self {
Self { client }
}

/// Write a session to ClickHouse.
///
/// # Errors
///
/// Returns `ScrybeError::StorageError` if the write fails.
pub async fn write(_session: &Session) -> Result<(), ScrybeError> {
// TODO: Implement actual ClickHouse write
pub async fn write(&self, session: &Session) -> Result<(), ScrybeError> {
let row = SessionRow::from_session(session)?;

let mut insert = self.client.client().insert("sessions").map_err(|e| {
ScrybeError::storage_error("clickhouse", format!("Insert preparation failed: {}", e))
})?;

insert.write(&row).await.map_err(|e| {
ScrybeError::storage_error("clickhouse", format!("Write failed: {}", e))
})?;

insert.end().await.map_err(|e| {
ScrybeError::storage_error("clickhouse", format!("Write completion failed: {}", e))
})?;

Ok(())
}

/// Batch write multiple sessions to ClickHouse.
///
/// More efficient for high-throughput scenarios.
///
/// # Errors
///
/// Returns `ScrybeError::StorageError` if the write fails.
pub async fn write_batch(_sessions: &[Session]) -> Result<(), ScrybeError> {
// TODO: Implement batch write
pub async fn write_batch(&self, sessions: &[Session]) -> Result<(), ScrybeError> {
if sessions.is_empty() {
return Ok(());
}

let mut insert = self.client.client().insert("sessions").map_err(|e| {
ScrybeError::storage_error("clickhouse", format!("Insert preparation failed: {}", e))
})?;

for session in sessions {
let row = SessionRow::from_session(session)?;
insert.write(&row).await.map_err(|e| {
ScrybeError::storage_error("clickhouse", format!("Write failed: {}", e))
})?;
}

insert.end().await.map_err(|e| {
ScrybeError::storage_error(
"clickhouse",
format!("Batch write completion failed: {}", e),
)
})?;

Ok(())
}
}
Expand Down
Loading