diff --git a/src/engine.rs b/src/engine.rs index 4e3cf94..e24911f 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -12,6 +12,7 @@ use crate::http::models::{ use crate::secrets::{EncryptedCatalogBackend, SecretManager, ENCRYPTED_PROVIDER_TYPE}; use crate::source::Source; use crate::storage::{FilesystemStorage, StorageManager}; +use crate::thirdparty::LiquidCacheClientBuilder; use anyhow::Result; use chrono::Utc; use datafusion::arrow::datatypes::Schema; @@ -23,7 +24,6 @@ use datafusion::execution::SessionStateBuilder; use datafusion::prelude::*; use datafusion_tracing::{instrument_with_info_spans, InstrumentationOptions}; use instrumented_object_store::instrument_object_store; -use liquid_cache_client::LiquidCacheClientBuilder; use object_store::ObjectStore; use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; @@ -2725,15 +2725,6 @@ type LiquidCacheConfig = (String, Vec); /// /// This function creates a SessionContext with tracing instrumentation: /// -/// **Without liquid-cache:** -/// 1. Object stores are wrapped with `instrument_object_store()` for I/O tracing -/// 2. DataFusion operators get spans via `datafusion-tracing` -/// -/// **With liquid-cache:** -/// Uses `LiquidCacheClientBuilder` to create the context. Object stores are registered -/// with the builder (for the liquid-cache server), not locally instrumented since -/// data fetching happens on the server side. -/// /// ## Arguments /// /// * `object_stores` - Pre-built object stores to wrap with instrumentation (non-liquid-cache path) @@ -2747,7 +2738,14 @@ fn build_instrumented_context( if let Some((server_address, store_configs)) = liquid_cache_config { info!(server = %server_address, "Building liquid-cache session context"); - let mut liquid_cache_builder = LiquidCacheClientBuilder::new(&server_address); + // Object store instrumentation is skipped here: actual I/O happens on the + // liquid-cache server, so client-side object store wrappers see no traffic. 
+ let mut liquid_cache_builder = LiquidCacheClientBuilder::new(&server_address) + .with_session_state_mapper(|builder| { + builder.with_physical_optimizer_rule( + instrument_with_info_spans!(options: InstrumentationOptions::default()), + ) + }); // Register object stores with liquid-cache builder (server needs these configs) for (url, options) in store_configs { diff --git a/src/lib.rs b/src/lib.rs index ae9599d..43f0827 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ pub mod secrets; pub mod source; pub mod storage; pub mod telemetry; +pub mod thirdparty; pub use engine::{QueryResponse, RuntimeEngine, RuntimeEngineBuilder}; pub use source::Source; diff --git a/src/thirdparty/liquid_cache_client_builder.rs b/src/thirdparty/liquid_cache_client_builder.rs new file mode 100644 index 0000000..6e042ef --- /dev/null +++ b/src/thirdparty/liquid_cache_client_builder.rs @@ -0,0 +1,101 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::*; +use liquid_cache_client::PushdownOptimizer; + +type SessionStateMapper = Box SessionStateBuilder + Send>; + +// Local rewrite of `liquid_cache_client::LiquidCacheClientBuilder` to support +// object store and session state instrumentation hooks. Remove this once the +// upstream library exposes these extension points. +pub struct LiquidCacheClientBuilder { + object_stores: Vec<(ObjectStoreUrl, HashMap)>, + cache_server: String, + session_state_mapper: Option, +} + +impl LiquidCacheClientBuilder { + /// Create a new builder for LiquidCache client state. + pub fn new(cache_server: impl AsRef) -> Self { + Self { + object_stores: vec![], + cache_server: cache_server.as_ref().to_string(), + session_state_mapper: None, + } + } + + /// Add an object store to the builder. 
+ /// See [`object_store::parse_url_opts`] for available options. + pub fn with_object_store( + mut self, + url: ObjectStoreUrl, + object_store_options: Option>, + ) -> Self { + self.object_stores + .push((url, object_store_options.unwrap_or_default())); + self + } + + /// Set a mapper function that transforms the `SessionStateBuilder` after the default + /// configuration and optimizer rules have been applied. + /// + /// This is useful for adding additional physical optimizer rules (e.g., tracing instrumentation). + pub fn with_session_state_mapper( + mut self, + f: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder + Send + 'static, + ) -> Self { + self.session_state_mapper = Some(Box::new(f)); + self + } + + /// Build the [SessionContext]. + pub fn build(self, config: SessionConfig) -> Result { + let mut session_config = config; + session_config + .options_mut() + .execution + .parquet + .pushdown_filters = true; + session_config + .options_mut() + .execution + .parquet + .schema_force_view_types = false; + session_config + .options_mut() + .execution + .parquet + .binary_as_string = true; + session_config.options_mut().execution.batch_size = 8192 * 2; + + let runtime_env = Arc::new(RuntimeEnv::default()); + + // Register object stores + for (object_store_url, options) in &self.object_stores { + let (object_store, _path) = + object_store::parse_url_opts(object_store_url.as_ref(), options.clone()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + runtime_env.register_object_store(object_store_url.as_ref(), Arc::new(object_store)); + } + + let mut state_builder = SessionStateBuilder::new() + .with_config(session_config) + .with_runtime_env(runtime_env) + .with_default_features() + .with_physical_optimizer_rule(Arc::new(PushdownOptimizer::new( + self.cache_server.clone(), + self.object_stores.clone(), + ))); + + if let Some(mapper) = self.session_state_mapper { + state_builder = mapper(state_builder); + } + + Ok(SessionContext::new_with_state(state_builder.build())) + } +} diff 
--git a/src/thirdparty/mod.rs b/src/thirdparty/mod.rs new file mode 100644 index 0000000..032d9f4 --- /dev/null +++ b/src/thirdparty/mod.rs @@ -0,0 +1,3 @@ +mod liquid_cache_client_builder; + +pub use liquid_cache_client_builder::LiquidCacheClientBuilder;