diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 8a78c357b951e..8b83ed7994bd8 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -325,30 +325,31 @@ async fn execute_query( enable_scheduler: bool, ) -> Result> { let plan = ctx.sql(sql).await?; - let plan = plan.into_unoptimized_plan(); + let (state, plan) = plan.into_parts(); if debug { println!("=== Logical plan ===\n{:?}\n", plan); } - let plan = ctx.optimize(&plan)?; + let plan = state.optimize(&plan)?; if debug { println!("=== Optimized logical plan ===\n{:?}\n", plan); } - let physical_plan = ctx.create_physical_plan(&plan).await?; + let physical_plan = state.create_physical_plan(&plan).await?; if debug { println!( "=== Physical plan ===\n{}\n", displayable(physical_plan.as_ref()).indent() ); } - let task_ctx = ctx.task_ctx(); let result = if enable_scheduler { let scheduler = Scheduler::new(num_cpus::get()); - let results = scheduler.schedule(physical_plan.clone(), task_ctx).unwrap(); + let results = scheduler + .schedule(physical_plan.clone(), state.task_ctx()) + .unwrap(); results.stream().try_collect().await? } else { - collect(physical_plan.clone(), task_ctx).await? + collect(physical_plan.clone(), state.task_ctx()).await? }; if debug { println!( diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 60d79490cf6fb..8b1877f2f816f 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -510,6 +510,11 @@ impl DataFrame { &self.plan } + /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`] + pub fn into_parts(self) -> (SessionState, LogicalPlan) { + (self.session_state, self.plan) + } + /// Return the logical plan represented by this DataFrame without running the optimizers /// /// Note: This method should not be used outside testing, as it loses the snapshot diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index fcfcf1ca69035..c48a8b2380095 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -991,11 +991,17 @@ impl SessionContext { } /// Optimizes the logical plan by applying optimizer rules. + #[deprecated( + note = "Use SessionState::optimize to ensure a consistent state for planning and execution" + )] pub fn optimize(&self, plan: &LogicalPlan) -> Result { self.state.read().optimize(plan) } /// Creates a physical plan from a logical plan. + #[deprecated( + note = "Use SessionState::create_physical_plan or DataFrame::create_physical_plan to ensure a consistent state for planning and execution" + )] pub async fn create_physical_plan( &self, logical_plan: &LogicalPlan, diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index f9d0c555532c8..b97473e14cef0 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -208,11 +208,12 @@ async fn custom_source_dataframe() -> Result<()> { let ctx = SessionContext::new(); let table = ctx.read_table(Arc::new(CustomTableProvider))?; - let logical_plan = LogicalPlanBuilder::from(table.into_optimized_plan()?) + let (state, plan) = table.into_parts(); + let logical_plan = LogicalPlanBuilder::from(plan) .project(vec![col("c2")])? .build()?; - let optimized_plan = ctx.optimize(&logical_plan)?; + let optimized_plan = state.optimize(&logical_plan)?; match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScan { @@ -235,13 +236,12 @@ async fn custom_source_dataframe() -> Result<()> { ); assert_eq!(format!("{:?}", optimized_plan), expected); - let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; + let physical_plan = state.create_physical_plan(&optimized_plan).await?; assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); - let task_ctx = ctx.task_ctx(); - let batches = collect(physical_plan, task_ctx).await?; + let batches = collect(physical_plan, state.task_ctx()).await?; let origin_rec_batch = TEST_CUSTOM_RECORD_BATCH!()?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -261,10 +261,7 @@ async fn optimizers_catch_all_statistics() { .await .unwrap(); - let physical_plan = ctx - .create_physical_plan(&df.into_optimized_plan().unwrap()) - .await - .unwrap(); + let physical_plan = df.create_physical_plan().await.unwrap(); // when the optimization kicks in, the source is replaced by an EmptyExec assert!( diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 58f22716bbf17..d3bf4909c7f6f 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -210,15 +210,15 @@ impl ContextWithParquet { .expect("getting input"); let pretty_input = pretty_format_batches(&input).unwrap().to_string(); - let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan"); + let state = self.ctx.state(); + let logical_plan = state.optimize(&logical_plan).expect("optimizing plan"); - let physical_plan = self - .ctx + let physical_plan = state .create_physical_plan(&logical_plan) .await .expect("creating physical plan"); - let task_ctx = self.ctx.task_ctx(); + let task_ctx = state.task_ctx(); let results = datafusion::physical_plan::collect(physical_plan.clone(), task_ctx) .await .expect("Running"); diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 97f043d9e0221..77e9558f5878d 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -1109,6 +1109,7 @@ async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> #[tokio::test] async fn aggregate_with_alias() -> Result<()> { let ctx = SessionContext::new(); + let state = ctx.state(); let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Utf8, false), @@ -1120,9 +1121,8 @@ async fn aggregate_with_alias() -> Result<()> { .project(vec![col("c1"), sum(col("c2")).alias("total_salary")])? .build()?; - let plan = ctx.optimize(&plan)?; - - let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?; + let plan = state.optimize(&plan)?; + let physical_plan = state.create_physical_plan(&Arc::new(plan)).await?; assert_eq!("c1", physical_plan.schema().field(0).name().as_str()); assert_eq!( "total_salary", diff --git a/datafusion/core/tests/sql/explain.rs b/datafusion/core/tests/sql/explain.rs index 2dd4d4f64f3a8..4774f95465d08 100644 --- a/datafusion/core/tests/sql/explain.rs +++ b/datafusion/core/tests/sql/explain.rs @@ -37,8 +37,11 @@ fn optimize_explain() { panic!("plan was not an explain: {:?}", plan); } + let ctx = SessionContext::new(); + let state = ctx.state(); + // now optimize the plan and expect to see more plans - let optimized_plan = SessionContext::new().optimize(&plan).unwrap(); + let optimized_plan = state.optimize(&plan).unwrap(); if let LogicalPlan::Explain(e) = &optimized_plan { // should have more than one plan assert!( diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 90fd91164fe17..d3848c10aa7b7 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -257,9 +257,9 @@ async fn csv_explain_plans() { ); // Optimized logical plan - // + let state = ctx.state(); let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan); - let plan = ctx.optimize(plan).expect(&msg); + let plan = state.optimize(plan).expect(&msg); let optimized_logical_schema = plan.schema(); // Both schema has to be the same assert_eq!(logical_schema, optimized_logical_schema.as_ref()); @@ -334,12 +334,11 @@ async fn csv_explain_plans() { // Physical plan // Create plan let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); - let plan = ctx.create_physical_plan(&plan).await.expect(&msg); + let plan = state.create_physical_plan(&plan).await.expect(&msg); // // Execute plan let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); - let task_ctx = ctx.task_ctx(); - let results = collect(plan, task_ctx).await.expect(&msg); + let results = collect(plan, state.task_ctx()).await.expect(&msg); let actual = result_vec(&results); // flatten to a single string let actual = actual.into_iter().map(|r| r.join("\t")).collect::(); @@ -481,9 +480,9 @@ async fn csv_explain_verbose_plans() { ); // Optimized logical plan - // let msg = format!("Optimizing logical plan for '{}': {:?}", sql, dataframe); - let plan = dataframe.into_optimized_plan().expect(&msg); + let (state, plan) = dataframe.into_parts(); + let plan = state.optimize(&plan).expect(&msg); let optimized_logical_schema = plan.schema(); // Both schema has to be the same assert_eq!(&logical_schema, optimized_logical_schema.as_ref()); @@ -558,7 +557,7 @@ async fn csv_explain_verbose_plans() { // Physical plan // Create plan let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); - let plan = ctx.create_physical_plan(&plan).await.expect(&msg); + let plan = state.create_physical_plan(&plan).await.expect(&msg); // // Execute plan let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 41ccdb9a24bbb..c7822e5b4a3d0 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -1022,13 +1022,11 @@ async fn try_execute_to_batches( ) -> Result> { let dataframe = ctx.sql(sql).await?; let logical_schema = dataframe.schema().clone(); + let (state, plan) = dataframe.into_parts(); - let optimized = ctx.optimize(dataframe.logical_plan())?; - let optimized_logical_schema = optimized.schema(); - let results = dataframe.collect().await?; - - assert_eq!(&logical_schema, optimized_logical_schema.as_ref()); - Ok(results) + let optimized = state.optimize(&plan)?; + assert_eq!(&logical_schema, optimized.schema().as_ref()); + DataFrame::new(state, optimized).collect().await } /// Execute query and return results as a Vec of RecordBatches diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index 524317dd6d743..2ba80bcd0819b 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -172,7 +172,8 @@ async fn projection_on_table_scan() -> Result<()> { .project(vec![col("c2")])? .build()?; - let optimized_plan = ctx.optimize(&logical_plan)?; + let state = ctx.state(); + let optimized_plan = state.optimize(&logical_plan)?; match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScan { @@ -192,12 +193,11 @@ async fn projection_on_table_scan() -> Result<()> { \n TableScan: test projection=[c2]"; assert_eq!(format!("{:?}", optimized_plan), expected); - let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; + let physical_plan = state.create_physical_plan(&optimized_plan).await?; assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); - let task_ctx = ctx.task_ctx(); - let batches = collect(physical_plan, task_ctx).await?; + let batches = collect(physical_plan, state.task_ctx()).await?; assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::()); Ok(()) @@ -215,8 +215,8 @@ async fn preserve_nullability_on_projection() -> Result<()> { .project(vec![col("c1")])? .build()?; - let plan = ctx.optimize(&plan)?; - let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?; + let dataframe = DataFrame::new(ctx.state(), plan); + let physical_plan = dataframe.create_physical_plan().await?; assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable()); Ok(()) } @@ -247,9 +247,8 @@ async fn project_cast_dictionary() { .unwrap(); let logical_plan = builder.project(vec![expr]).unwrap().build().unwrap(); - - let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap(); - let actual = collect(physical_plan, ctx.task_ctx()).await.unwrap(); + let df = DataFrame::new(ctx.state(), logical_plan); + let actual = df.collect().await.unwrap(); let expected = vec![ "+----------------------------------------------------------------------------------+", @@ -289,7 +288,8 @@ async fn projection_on_memory_scan() -> Result<()> { assert_fields_eq(&plan, vec!["b"]); let ctx = SessionContext::new(); - let optimized_plan = ctx.optimize(&plan)?; + let state = ctx.state(); + let optimized_plan = state.optimize(&plan)?; match &optimized_plan { LogicalPlan::Projection(Projection { input, .. }) => match &**input { LogicalPlan::TableScan(TableScan { @@ -312,13 +312,12 @@ async fn projection_on_memory_scan() -> Result<()> { ); assert_eq!(format!("{:?}", optimized_plan), expected); - let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; + let physical_plan = state.create_physical_plan(&optimized_plan).await?; assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("b", physical_plan.schema().field(0).name().as_str()); - let task_ctx = ctx.task_ctx(); - let batches = collect(physical_plan, task_ctx).await?; + let batches = collect(physical_plan, state.task_ctx()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); assert_eq!(4, batches[0].num_rows()); diff --git a/datafusion/core/tests/sql/udf.rs b/datafusion/core/tests/sql/udf.rs index eb416901a3e2f..d07fdd91d3063 100644 --- a/datafusion/core/tests/sql/udf.rs +++ b/datafusion/core/tests/sql/udf.rs @@ -90,10 +90,7 @@ async fn scalar_udf() -> Result<()> { "Projection: t.a, t.b, my_add(t.a, t.b)\n TableScan: t projection=[a, b]" ); - let plan = ctx.optimize(&plan)?; - let plan = ctx.create_physical_plan(&plan).await?; - let task_ctx = ctx.task_ctx(); - let result = collect(plan, task_ctx).await?; + let result = DataFrame::new(ctx.state(), plan).collect().await?; let expected = vec![ "+-----+-----+-----------------+", diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index 6ecd6dcf410d5..ddd8898e4d26d 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -1067,9 +1067,10 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> { for sql in &sql { let df = ctx.sql(sql).await?; - let plan = df.into_optimized_plan()?; + let (state, plan) = df.into_parts(); + let plan = state.optimize(&plan)?; if create_physical { - let _ = ctx.create_physical_plan(&plan).await?; + let _ = state.create_physical_plan(&plan).await?; } }