From 881e2365a63a2ee632dabb11f95a89d65160afed Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Thu, 5 Feb 2026 14:47:49 +0530 Subject: [PATCH 1/2] feat(tracing): add instrumentation hooks to liquid-cache builder Rewrite LiquidCacheClientBuilder locally to support object store and session state mapper hooks, enabling instrumentation for the liquid-cache code path. The builder now wraps object stores with tracing and adds the datafusion-tracing physical optimizer rule via the new extension points. --- src/engine.rs | 23 ++-- src/lib.rs | 1 + src/thirdparty/liquid_cache_client_builder.rs | 128 ++++++++++++++++++ src/thirdparty/mod.rs | 3 + 4 files changed, 144 insertions(+), 11 deletions(-) create mode 100644 src/thirdparty/liquid_cache_client_builder.rs create mode 100644 src/thirdparty/mod.rs diff --git a/src/engine.rs b/src/engine.rs index 4e3cf94..b882ea2 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -12,6 +12,7 @@ use crate::http::models::{ use crate::secrets::{EncryptedCatalogBackend, SecretManager, ENCRYPTED_PROVIDER_TYPE}; use crate::source::Source; use crate::storage::{FilesystemStorage, StorageManager}; +use crate::thirdparty::LiquidCacheClientBuilder; use anyhow::Result; use chrono::Utc; use datafusion::arrow::datatypes::Schema; @@ -23,7 +24,6 @@ use datafusion::execution::SessionStateBuilder; use datafusion::prelude::*; use datafusion_tracing::{instrument_with_info_spans, InstrumentationOptions}; use instrumented_object_store::instrument_object_store; -use liquid_cache_client::LiquidCacheClientBuilder; use object_store::ObjectStore; use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; @@ -2725,15 +2725,6 @@ type LiquidCacheConfig = (String, Vec); /// /// This function creates a SessionContext with tracing instrumentation: /// -/// **Without liquid-cache:** -/// 1. Object stores are wrapped with `instrument_object_store()` for I/O tracing -/// 2. DataFusion operators get spans via `datafusion-tracing` -/// -/// **With liquid-cache:** -/// Uses `LiquidCacheClientBuilder` to create the context. Object stores are registered -/// with the builder (for the liquid-cache server), not locally instrumented since -/// data fetching happens on the server side. -/// /// ## Arguments /// /// * `object_stores` - Pre-built object stores to wrap with instrumentation (non-liquid-cache path) @@ -2747,7 +2738,17 @@ fn build_instrumented_context( if let Some((server_address, store_configs)) = liquid_cache_config { info!(server = %server_address, "Building liquid-cache session context"); - let mut liquid_cache_builder = LiquidCacheClientBuilder::new(&server_address); + let mut liquid_cache_builder = LiquidCacheClientBuilder::new(&server_address) + .with_object_store_mapper(|store, url| { + let url_ref: &url::Url = url.as_ref(); + let prefix = url_ref.scheme(); + instrument_object_store(store, prefix) + }) + .with_session_state_mapper(|builder| { + builder.with_physical_optimizer_rule( + instrument_with_info_spans!(options: InstrumentationOptions::default()), + ) + }); // Register object stores with liquid-cache builder (server needs these configs) for (url, options) in store_configs { diff --git a/src/lib.rs b/src/lib.rs index ae9599d..43f0827 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ pub mod secrets; pub mod source; pub mod storage; pub mod telemetry; +pub mod thirdparty; pub use engine::{QueryResponse, RuntimeEngine, RuntimeEngineBuilder}; pub use source::Source; diff --git a/src/thirdparty/liquid_cache_client_builder.rs b/src/thirdparty/liquid_cache_client_builder.rs new file mode 100644 index 0000000..262efe6 --- /dev/null +++ b/src/thirdparty/liquid_cache_client_builder.rs @@ -0,0 +1,128 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::*; +use liquid_cache_client::PushdownOptimizer; +use object_store::ObjectStore; + +type ObjectStoreMapper = + Box, &ObjectStoreUrl) -> Arc + Send + Sync>; + +type SessionStateMapper = Box SessionStateBuilder + Send>; + +// Local rewrite of `liquid_cache_client::LiquidCacheClientBuilder` to support +// object store and session state instrumentation hooks. Remove this once the +// upstream library exposes these extension points. +pub struct LiquidCacheClientBuilder { + object_stores: Vec<(ObjectStoreUrl, HashMap)>, + cache_server: String, + object_store_mapper: Option, + session_state_mapper: Option, +} + +impl LiquidCacheClientBuilder { + /// Create a new builder for LiquidCache client state. + pub fn new(cache_server: impl AsRef) -> Self { + Self { + object_stores: vec![], + cache_server: cache_server.as_ref().to_string(), + object_store_mapper: None, + session_state_mapper: None, + } + } + + /// Add an object store to the builder. + /// Checkout for available options. + pub fn with_object_store( + mut self, + url: ObjectStoreUrl, + object_store_options: Option>, + ) -> Self { + self.object_stores + .push((url, object_store_options.unwrap_or_default())); + self + } + + /// Set a mapper function that transforms each object store before registration. + /// + /// This is useful for wrapping object stores with instrumentation or middleware. + /// The function receives the object store and its URL, and should return the + /// (possibly wrapped) object store. + pub fn with_object_store_mapper( + mut self, + f: impl Fn(Arc, &ObjectStoreUrl) -> Arc + + Send + + Sync + + 'static, + ) -> Self { + self.object_store_mapper = Some(Box::new(f)); + self + } + + /// Set a mapper function that transforms the `SessionStateBuilder` after the default + /// configuration and optimizer rules have been applied. + /// + /// This is useful for adding additional physical optimizer rules (e.g., tracing instrumentation). + pub fn with_session_state_mapper( + mut self, + f: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder + Send + 'static, + ) -> Self { + self.session_state_mapper = Some(Box::new(f)); + self + } + + /// Build the [SessionContext]. + pub fn build(self, config: SessionConfig) -> Result { + let mut session_config = config; + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; + session_config + .options_mut() + .execution + .parquet + .schema_force_view_types = false; + session_config + .options_mut() + .execution + .parquet + .binary_as_string = true; + session_config.options_mut().execution.batch_size = 8192 * 2; + + let runtime_env = Arc::new(RuntimeEnv::default()); + + // Register object stores + for (object_store_url, options) in &self.object_stores { + let (object_store, _path) = + object_store::parse_url_opts(object_store_url.as_ref(), options.clone()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let object_store: Arc = Arc::new(object_store); + let object_store = match &self.object_store_mapper { + Some(mapper) => mapper(object_store, object_store_url), + None => object_store, + }; + runtime_env.register_object_store(object_store_url.as_ref(), object_store); + } + + let mut state_builder = SessionStateBuilder::new() + .with_config(session_config) + .with_runtime_env(runtime_env) + .with_default_features() + .with_physical_optimizer_rule(Arc::new(PushdownOptimizer::new( + self.cache_server.clone(), + self.object_stores.clone(), + ))); + + if let Some(mapper) = self.session_state_mapper { + state_builder = mapper(state_builder); + } + + Ok(SessionContext::new_with_state(state_builder.build())) + } +} diff --git a/src/thirdparty/mod.rs b/src/thirdparty/mod.rs new file mode 100644 index 0000000..032d9f4 --- /dev/null +++ b/src/thirdparty/mod.rs @@ -0,0 +1,3 @@ +mod liquid_cache_client_builder; + +pub use liquid_cache_client_builder::LiquidCacheClientBuilder; From 8244c4c0718dbc53d7a7b273a7e41914df7228bd Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Thu, 5 Feb 2026 16:07:26 +0530 Subject: [PATCH 2/2] refactor(tracing): remove object store instrumentation from liquid-cache path Object store I/O happens on the liquid-cache server, not the client, so client-side object store wrappers see no traffic. Remove the with_object_store_mapper hook and keep only the session state mapper for datafusion-tracing instrumentation. --- src/engine.rs | 7 ++--- src/thirdparty/liquid_cache_client_builder.rs | 29 +------------------ 2 files changed, 3 insertions(+), 33 deletions(-) diff --git a/src/engine.rs b/src/engine.rs index b882ea2..e24911f 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -2738,12 +2738,9 @@ fn build_instrumented_context( if let Some((server_address, store_configs)) = liquid_cache_config { info!(server = %server_address, "Building liquid-cache session context"); + // Object store instrumentation is skipped here: actual I/O happens on the + // liquid-cache server, so client-side object store wrappers see no traffic. let mut liquid_cache_builder = LiquidCacheClientBuilder::new(&server_address) - .with_object_store_mapper(|store, url| { - let url_ref: &url::Url = url.as_ref(); - let prefix = url_ref.scheme(); - instrument_object_store(store, prefix) - }) .with_session_state_mapper(|builder| { builder.with_physical_optimizer_rule( instrument_with_info_spans!(options: InstrumentationOptions::default()), diff --git a/src/thirdparty/liquid_cache_client_builder.rs b/src/thirdparty/liquid_cache_client_builder.rs index 262efe6..6e042ef 100644 --- a/src/thirdparty/liquid_cache_client_builder.rs +++ b/src/thirdparty/liquid_cache_client_builder.rs @@ -7,10 +7,6 @@ use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::*; use liquid_cache_client::PushdownOptimizer; -use object_store::ObjectStore; - -type ObjectStoreMapper = - Box, &ObjectStoreUrl) -> Arc + Send + Sync>; type SessionStateMapper = Box SessionStateBuilder + Send>; @@ -20,7 +16,6 @@ type SessionStateMapper = Box SessionStateBui pub struct LiquidCacheClientBuilder { object_stores: Vec<(ObjectStoreUrl, HashMap)>, cache_server: String, - object_store_mapper: Option, session_state_mapper: Option, } @@ -30,7 +25,6 @@ impl LiquidCacheClientBuilder { Self { object_stores: vec![], cache_server: cache_server.as_ref().to_string(), - object_store_mapper: None, session_state_mapper: None, } } @@ -47,22 +41,6 @@ impl LiquidCacheClientBuilder { self } - /// Set a mapper function that transforms each object store before registration. - /// - /// This is useful for wrapping object stores with instrumentation or middleware. - /// The function receives the object store and its URL, and should return the - /// (possibly wrapped) object store. - pub fn with_object_store_mapper( - mut self, - f: impl Fn(Arc, &ObjectStoreUrl) -> Arc - + Send - + Sync - + 'static, - ) -> Self { - self.object_store_mapper = Some(Box::new(f)); - self - } - /// Set a mapper function that transforms the `SessionStateBuilder` after the default /// configuration and optimizer rules have been applied. /// @@ -102,12 +80,7 @@ impl LiquidCacheClientBuilder { let (object_store, _path) = object_store::parse_url_opts(object_store_url.as_ref(), options.clone()) .map_err(|e| DataFusionError::External(Box::new(e)))?; - let object_store: Arc = Arc::new(object_store); - let object_store = match &self.object_store_mapper { - Some(mapper) => mapper(object_store, object_store_url), - None => object_store, - }; - runtime_env.register_object_store(object_store_url.as_ref(), object_store); + runtime_env.register_object_store(object_store_url.as_ref(), Arc::new(object_store)); } let mut state_builder = SessionStateBuilder::new()