feat(tracing): add instrumentation hooks to liquid-cache builder #101

Draft
anoop-narang wants to merge 2 commits into main from liquid-cache-instrumentation

@anoop-narang
Contributor

Rewrite LiquidCacheClientBuilder locally to support object store and session state mapper hooks, enabling instrumentation for the liquid-cache code path. The builder now wraps object stores with tracing and adds the datafusion-tracing physical optimizer rule via the new extension points.
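
To make the session state mapper hook concrete, here is a minimal sketch of the pattern using stand-in types only. `SessionState`, `ClientBuilder`, `with_session_state_mapper`, `add_tracing_rule`, and the rule names are all hypothetical and are not the DataFusion, datafusion-tracing, or liquid-cache APIs; the real builder in this PR may differ.

```rust
// Stand-in for an engine's session state. NOT the DataFusion type.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionState {
    pub cache_server: String,
    // Rule names stand in for real physical optimizer rule objects.
    pub physical_optimizer_rules: Vec<String>,
}

// A mapper receives the state the builder assembled and returns a modified one.
type SessionStateMapper = Box<dyn FnOnce(SessionState) -> SessionState>;

// Hypothetical builder exposing a single extension point.
pub struct ClientBuilder {
    cache_server: String,
    session_state_mapper: Option<SessionStateMapper>,
}

impl ClientBuilder {
    pub fn new(cache_server: impl Into<String>) -> Self {
        Self {
            cache_server: cache_server.into(),
            session_state_mapper: None,
        }
    }

    // Register a hook that runs after the builder has installed its own rules.
    pub fn with_session_state_mapper(
        mut self,
        mapper: impl FnOnce(SessionState) -> SessionState + 'static,
    ) -> Self {
        self.session_state_mapper = Some(Box::new(mapper));
        self
    }

    pub fn build(self) -> SessionState {
        // The builder's own (illustrative) rule goes in first.
        let state = SessionState {
            cache_server: self.cache_server,
            physical_optimizer_rules: vec!["push_down_to_cache".to_string()],
        };
        // Apply the caller's hook last, if one was registered.
        match self.session_state_mapper {
            Some(mapper) => mapper(state),
            None => state,
        }
    }
}

// Example hook: append an (illustrative) instrumentation rule.
pub fn add_tracing_rule(mut state: SessionState) -> SessionState {
    state.physical_optimizer_rules.push("instrument".to_string());
    state
}
```

Calling `ClientBuilder::new(addr).with_session_state_mapper(add_tracing_rule).build()` yields a state whose rule list ends with the instrumentation rule, so the hook sees everything the builder installed before it.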

@anoop-narang
Contributor Author

/build

@github-actions

github-actions bot commented Feb 5, 2026

🐳 Docker image built and pushed:

ghcr.io/hotdata-dev/runtimedb:liquid-cache-instrumentation

…che path

Object store I/O happens on the liquid-cache server, not the client,
so client-side object store wrappers see no traffic. Remove the
with_object_store_mapper hook and keep only the session state mapper
for datafusion-tracing instrumentation.
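
A toy model of why the client-side wrapper sees no traffic, assuming the client only forwards the scan to the server. Every name here (`ObjectStore`, `InMemoryStore`, `CountingStore`, `CacheServer`, `CacheClient`) is a stand-in defined for this sketch, not the `object_store` crate or liquid-cache types.

```rust
use std::cell::Cell;

// Minimal stand-in for an object store interface.
pub trait ObjectStore {
    fn get(&self, path: &str) -> Vec<u8>;
}

// Fake backing store: returns the path bytes as the "file contents".
pub struct InMemoryStore;

impl ObjectStore for InMemoryStore {
    fn get(&self, path: &str) -> Vec<u8> {
        path.as_bytes().to_vec()
    }
}

// Wrapper that counts reads, playing the role of a tracing wrapper.
pub struct CountingStore<S> {
    inner: S,
    reads: Cell<usize>,
}

impl<S: ObjectStore> CountingStore<S> {
    pub fn new(inner: S) -> Self {
        Self { inner, reads: Cell::new(0) }
    }

    pub fn reads(&self) -> usize {
        self.reads.get()
    }
}

impl<S: ObjectStore> ObjectStore for CountingStore<S> {
    fn get(&self, path: &str) -> Vec<u8> {
        self.reads.set(self.reads.get() + 1);
        self.inner.get(path)
    }
}

// The server performs the scan with its own store.
pub struct CacheServer<S> {
    pub store: S,
}

impl<S: ObjectStore> CacheServer<S> {
    pub fn scan(&self, path: &str) -> usize {
        self.store.get(path).len()
    }
}

// The client holds a wrapped store but only forwards the request,
// so its wrapper is never invoked.
pub struct CacheClient<'a, C, S> {
    pub local_store: C,
    pub server: &'a CacheServer<S>,
}

impl<'a, C, S: ObjectStore> CacheClient<'a, C, S> {
    pub fn query(&self, path: &str) -> usize {
        self.server.scan(path)
    }
}
```

After one `query`, the server-side counter is 1 and the client-side counter is still 0, which is the behavior described above and the reason the object store hook was dropped.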
@anoop-narang
Contributor Author

/build

@github-actions

github-actions bot commented Feb 5, 2026

🐳 Docker image built and pushed:

ghcr.io/hotdata-dev/runtimedb:liquid-cache-instrumentation

@anoop-narang
Contributor Author

Error 1

thread 'tokio-runtime-worker' (8) panicked at /home/runner/.cargo/git/checkouts/liquid-cache-0b27d150652af81d/5e63d81/src/client/src/client_exec.rs:357:64:
called `Result::unwrap()` on an `Err` value: Internal("Unsupported plan and extension codec failed with [This feature is not implemented: PhysicalExtensionCodec is not provided]. Plan: InstrumentedExec { inner: RepartitionExec { input: InstrumentedExec { inner: DataSourceExec { data_source: FileScanConfig {object_store_url=ObjectStoreUrl { url: Url { scheme: \"s3\", cannot_be_a_base: false, username: \"\", password: None, host: Some(Domain(\"amzn-hotdata-demo-bucket\")), port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[cache/connmuuaniq0fa811pl1chnwmr4h2q/tpch_sf1/part/1mhZWzsL/data.parquet]]}, projection=[p_type]}, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { 
orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: \"p_type\", data_type: Utf8 }], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } } }, state: Mutex { data: NotInitialized }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, preserve_order: false, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: \"p_type\", data_type: Utf8 }], metadata: {} } }, partitioning: RoundRobinBatch(4), emission_type: Incremental, boundedness: Bounded, evaluation_type: Eager, scheduling_type: Cooperative, output_ordering: None } } }")

Error 2

thread 'tokio-runtime-worker' (8) panicked at /home/runner/.cargo/git/checkouts/liquid-cache-0b27d150652af81d/5e63d81/src/client/src/client_exec.rs:357:64:
called `Result::unwrap()` on an `Err` value: Internal("Unsupported plan and extension codec failed with [This feature is not implemented: PhysicalExtensionCodec is not provided]. Plan: InstrumentedExec { inner: AggregateExec { mode: Partial, group_by: PhysicalGroupBy { expr: [], null_expr: [], groups: [] }, aggr_expr: [AggregateFunctionExpr { fun: AggregateUDF { inner: Avg { signature: Signature { type_signature: UserDefined, volatility: Immutable, parameter_names: None }, aliases: [\"mean\"] } }, args: [Column { name: \"o_totalprice\", index: 0 }], arg_fields: [Field { name: \"o_totalprice\", data_type: Float64 }], return_field: Field { name: \"avg\", data_type: Float64, nullable: true }, name: \"avg(tpch.tpch_sf1.orders.o_totalprice)\", human_display: \"avg(tpch.tpch_sf1.orders.o_totalprice)\", schema: Schema { fields: [Field { name: \"o_totalprice\", data_type: Float64 }], metadata: {} }, order_bys: [], ignore_nulls: false, ordering_fields: [], is_distinct: false, is_reversed: false, input_fields: [Field { name: \"o_totalprice\", data_type: Float64 }], is_nullable: true }], filter_expr: [None], limit: None, input: InstrumentedExec { inner: RepartitionExec { input: InstrumentedExec { inner: DataSourceExec { data_source: FileScanConfig {object_store_url=ObjectStoreUrl { url: Url { scheme: \"s3\", cannot_be_a_base: false, username: \"\", password: None, host: Some(Domain(\"amzn-hotdata-demo-bucket\")), port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, 
min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[cache/connmuuaniq0fa811pl1chnwmr4h2q/tpch_sf1/orders/o6eg7o0k/data.parquet]]}, projection=[o_totalprice]}, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: \"o_totalprice\", data_type: Float64 }], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } } }, state: Mutex { data: NotInitialized }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, preserve_order: false, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: \"o_totalprice\", data_type: Float64 }], metadata: {} } }, partitioning: 
RoundRobinBatch(4), emission_type: Incremental, boundedness: Bounded, evaluation_type: Eager, scheduling_type: Cooperative, output_ordering: None } } }, schema: Schema { fields: [Field { name: \"avg(tpch.tpch_sf1.orders.o_totalprice)[count]\", data_type: UInt64, nullable: true }, Field { name: \"avg(tpch.tpch_sf1.orders.o_totalprice)[sum]\", data_type: Float64, nullable: true }], metadata: {} }, input_schema: Schema { fields: [Field { name: \"o_totalprice\", data_type: Float64 }], metadata: {} }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, required_input_ordering: None, input_order_mode: Linear, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {Column { name: \"avg(tpch.tpch_sf1.orders.o_totalprice)\", index: 0 }: 0}, classes: [EquivalenceClass { exprs: {Column { name: \"avg(tpch.tpch_sf1.orders.o_totalprice)\", index: 0 }}, constant: Some(Heterogeneous) }] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [Unique([])] }, schema: Schema { fields: [Field { name: \"avg(tpch.tpch_sf1.orders.o_totalprice)[count]\", data_type: UInt64, nullable: true }, Field { name: \"avg(tpch.tpch_sf1.orders.o_totalprice)[sum]\", data_type: Float64, nullable: true }], metadata: {} } }, partitioning: RoundRobinBatch(4), emission_type: Final, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: NonCooperative, output_ordering: None } } }")
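
Both panics report the same thing: the plan handed to the serializer contains `InstrumentedExec` nodes, and no `PhysicalExtensionCodec` is available to encode them. The sketch below models that failure with a toy plan enum and shows one possible direction, unwrapping the instrumentation nodes before encoding. This is an assumption for illustration, not what this PR implements; `Plan`, `encode`, and `strip_instrumentation` are hypothetical names, and the encoder returns a `Result` rather than calling `unwrap()`.

```rust
// Toy physical plan. `Instrumented` plays the role of a wrapper node
// that the built-in encoder does not know about.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { file: String },
    Repartition { input: Box<Plan> },
    Instrumented { inner: Box<Plan> },
}

// Encoder that only understands built-in nodes. Unknown nodes produce an
// error instead of a panic, so the caller can decide how to react.
pub fn encode(plan: &Plan) -> Result<String, String> {
    match plan {
        Plan::Scan { file } => Ok(format!("scan({file})")),
        Plan::Repartition { input } => Ok(format!("repartition({})", encode(input)?)),
        Plan::Instrumented { .. } => {
            Err("unsupported plan: no extension codec provided".to_string())
        }
    }
}

// Recursively remove wrapper nodes, keeping the wrapped plan intact.
pub fn strip_instrumentation(plan: Plan) -> Plan {
    match plan {
        Plan::Instrumented { inner } => strip_instrumentation(*inner),
        Plan::Repartition { input } => Plan::Repartition {
            input: Box::new(strip_instrumentation(*input)),
        },
        scan @ Plan::Scan { .. } => scan,
    }
}
```

With a plan shaped like Error 1 (`Instrumented(Repartition(Instrumented(Scan)))`), `encode` returns an error, while `encode(&strip_instrumentation(plan))` succeeds. The alternative would be to supply a codec that knows how to encode the wrapper node.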
