-
Notifications
You must be signed in to change notification settings - Fork 198
[AURON #1746] Introduce NativeTakeOrderedAndProjectExec to fuse TakeOrdered + Project #1747
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
… TakeOrdered + Project
0589ddf to
1b1eb04
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces NativeTakeOrderedAndProjectExec to fuse the TakeOrdered and Project operations into a single native operator, improving performance by eliminating an intermediate operator. Previously, TakeOrderedAndProjectExec was converted to separate NativeTakeOrderedExec and NativeProjectExec operators, but now they're combined into a single fused operator.
Key changes:
- Renamed base classes from
NativeTakeOrderedBasetoNativeTakeOrderedAndProjectBaseand addedprojectListparameter to support projection - Updated the converter to pass
projectListdirectly to the native operator instead of wrapping with a separateProjectExec - Modified both
executeCollect()anddoExecuteNative()methods to apply projection when needed
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
NativeTakeOrderedAndProjectBase.scala |
Renamed base classes and added projection support; applies projection in both executeCollect and doExecuteNative paths |
Shims.scala |
Updated interface method signatures to include projectList parameter and return renamed types |
AuronConverters.scala |
Simplified conversion logic by passing projectList directly instead of creating separate ProjectExec |
NativeTakeOrderedAndProjectExec.scala |
Updated concrete implementation with new projectList parameter |
NativePartialTakeOrderedExec.scala |
Updated to extend renamed base class |
ShimsImpl.scala |
Updated implementation to match new interface signatures |
AuronExecSuite.scala |
Added test coverage for both executeCollect and doExecuteNative paths |
Comments suppressed due to low confidence (3)
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectBase.scala:170
- The class name
NativePartialTakeOrderedAndProjectBaseis misleading because this partial execution operator does not actually apply projection. The projection only happens in the final stage (in NativeTakeOrderedAndProjectBase). Consider keeping the original nameNativePartialTakeOrderedBaseor clarifying that this is just a partial step of the TakeOrderedAndProject operation.
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectBase.scala:218 - The friendlyName "PartialTakeOrderedAndProject" is misleading because this partial execution step does not apply projection. The projection only happens in the final stage. Consider using "PartialTakeOrdered" to better reflect what this stage actually does.
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedAndProjectBase.scala:131 - The early return at line 131 does not apply the projection. When
partial.outputPartitioning.numPartitions <= 1, the method returns the partial result directly without applying the project transformation. This means that if projectList != child.output, the returned data will have incorrect columns. The early return should also handle projection similar to how it's done later in the method (lines 158-164).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
spark-extension-shims-spark/src/test/scala/org.apache.auron/AuronExecSuite.scala
Show resolved
Hide resolved
|
cc @richox |
| // take top-K from the final partition | ||
| new NativeRDD( | ||
| sparkContext, | ||
| metrics = SparkMetricNode(metrics, shuffledRDD.metrics :: Nil), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MetricNode tree needs to be consistent with native plans, in this case there are two nested native plans but only one MetricNode, all the metrics in this plan will be updated to the wrong place.
Which issue does this PR close?
Closes #1746
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?