Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::http::models::{
use crate::secrets::{EncryptedCatalogBackend, SecretManager, ENCRYPTED_PROVIDER_TYPE};
use crate::source::Source;
use crate::storage::{FilesystemStorage, StorageManager};
use crate::thirdparty::LiquidCacheClientBuilder;
use anyhow::Result;
use chrono::Utc;
use datafusion::arrow::datatypes::Schema;
Expand All @@ -23,7 +24,6 @@ use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_tracing::{instrument_with_info_spans, InstrumentationOptions};
use instrumented_object_store::instrument_object_store;
use liquid_cache_client::LiquidCacheClientBuilder;
use object_store::ObjectStore;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -2725,15 +2725,6 @@ type LiquidCacheConfig = (String, Vec<ObjectStoreConfig>);
///
/// This function creates a SessionContext with tracing instrumentation:
///
/// **Without liquid-cache:**
/// 1. Object stores are wrapped with `instrument_object_store()` for I/O tracing
/// 2. DataFusion operators get spans via `datafusion-tracing`
///
/// **With liquid-cache:**
/// Uses `LiquidCacheClientBuilder` to create the context. Object stores are registered
/// with the builder (for the liquid-cache server), not locally instrumented since
/// data fetching happens on the server side.
///
/// ## Arguments
///
/// * `object_stores` - Pre-built object stores to wrap with instrumentation (non-liquid-cache path)
Expand All @@ -2747,7 +2738,14 @@ fn build_instrumented_context(
if let Some((server_address, store_configs)) = liquid_cache_config {
info!(server = %server_address, "Building liquid-cache session context");

let mut liquid_cache_builder = LiquidCacheClientBuilder::new(&server_address);
// Object store instrumentation is skipped here: actual I/O happens on the
// liquid-cache server, so client-side object store wrappers see no traffic.
let mut liquid_cache_builder = LiquidCacheClientBuilder::new(&server_address)
.with_session_state_mapper(|builder| {
builder.with_physical_optimizer_rule(
instrument_with_info_spans!(options: InstrumentationOptions::default()),
)
});

// Register object stores with liquid-cache builder (server needs these configs)
for (url, options) in store_configs {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod secrets;
pub mod source;
pub mod storage;
pub mod telemetry;
pub mod thirdparty;

pub use engine::{QueryResponse, RuntimeEngine, RuntimeEngineBuilder};
pub use source::Source;
101 changes: 101 additions & 0 deletions src/thirdparty/liquid_cache_client_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use liquid_cache_client::PushdownOptimizer;

type SessionStateMapper = Box<dyn FnOnce(SessionStateBuilder) -> SessionStateBuilder + Send>;

// Local rewrite of `liquid_cache_client::LiquidCacheClientBuilder` to support
// object store and session state instrumentation hooks. Remove this once the
// upstream library exposes these extension points.
pub struct LiquidCacheClientBuilder {
object_stores: Vec<(ObjectStoreUrl, HashMap<String, String>)>,
cache_server: String,
session_state_mapper: Option<SessionStateMapper>,
}

impl LiquidCacheClientBuilder {
/// Create a new builder for LiquidCache client state.
pub fn new(cache_server: impl AsRef<str>) -> Self {
Self {
object_stores: vec![],
cache_server: cache_server.as_ref().to_string(),
session_state_mapper: None,
}
}

/// Add an object store to the builder.
/// Checkout <https://docs.rs/object_store/latest/object_store/fn.parse_url_opts.html> for available options.
pub fn with_object_store(
mut self,
url: ObjectStoreUrl,
object_store_options: Option<HashMap<String, String>>,
) -> Self {
self.object_stores
.push((url, object_store_options.unwrap_or_default()));
self
}

/// Set a mapper function that transforms the `SessionStateBuilder` after the default
/// configuration and optimizer rules have been applied.
///
/// This is useful for adding additional physical optimizer rules (e.g., tracing instrumentation).
pub fn with_session_state_mapper(
mut self,
f: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder + Send + 'static,
) -> Self {
self.session_state_mapper = Some(Box::new(f));
self
}

/// Build the [SessionContext].
pub fn build(self, config: SessionConfig) -> Result<SessionContext> {
let mut session_config = config;
session_config
.options_mut()
.execution
.parquet
.pushdown_filters = true;
session_config
.options_mut()
.execution
.parquet
.schema_force_view_types = false;
session_config
.options_mut()
.execution
.parquet
.binary_as_string = true;
session_config.options_mut().execution.batch_size = 8192 * 2;

let runtime_env = Arc::new(RuntimeEnv::default());

// Register object stores
for (object_store_url, options) in &self.object_stores {
let (object_store, _path) =
object_store::parse_url_opts(object_store_url.as_ref(), options.clone())
.map_err(|e| DataFusionError::External(Box::new(e)))?;
runtime_env.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
}

let mut state_builder = SessionStateBuilder::new()
.with_config(session_config)
.with_runtime_env(runtime_env)
.with_default_features()
.with_physical_optimizer_rule(Arc::new(PushdownOptimizer::new(
self.cache_server.clone(),
self.object_stores.clone(),
)));

if let Some(mapper) = self.session_state_mapper {
state_builder = mapper(state_builder);
}

Ok(SessionContext::new_with_state(state_builder.build()))
}
}
3 changes: 3 additions & 0 deletions src/thirdparty/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod liquid_cache_client_builder;

pub use liquid_cache_client_builder::LiquidCacheClientBuilder;
Loading