Skip to content

Conversation

@XorSum
Copy link
Contributor

@XorSum XorSum commented Dec 3, 2025

Which issue does this PR close?

Closes #1693.

Rationale for this change

What changes are included in this PR?

  • Introduce split_with_default_batch_size method, which split large batch into many small batches with default - batch_size.
  • Split the output stream of sort merge join.

Are there any user-facing changes?

No

How was this patch tested?

Performed manual testing with the code specified in the issue.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses issue #1693 by introducing batch size normalization for sort merge join output. The sort merge join operator can produce very large batches that exceed the configured batch size, leading to memory and performance issues. The solution splits these large batches into smaller ones and then applies coalescing to ensure optimal batch sizes.

Key Changes:

  • Added split_with_default_batch_size method to ExecutionContext that splits large batches into chunks of target batch size
  • Modified SortMergeJoinExec to apply both split and coalesce operations to its output stream

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
native-engine/datafusion-ext-plans/src/common/execution_context.rs Introduces split_with_default_batch_size method with internal SplitLargeBatchStream that slices large batches into smaller chunks using the configured batch size
native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs Updates output_with_streams to wrap the output with both split and coalesce operations for batch size normalization

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +144 to +147
pub fn split_with_default_batch_size(
self: &Arc<Self>,
input: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
Copy link

Copilot AI Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The split_with_default_batch_size method lacks documentation. Consider adding a doc comment that explains:

  • The purpose of this method (splitting large batches into smaller ones)
  • The behavior when batches are already smaller than the target size
  • The target batch size used (from batch_size())
  • The relationship with coalesce_with_default_batch_size

Example:

/// Splits large record batches into smaller batches with sizes not exceeding
/// the default batch size. Batches smaller than or equal to the target size
/// are passed through unchanged. Empty batches are filtered out.
///
/// This is typically used in combination with `coalesce_with_default_batch_size`
/// to normalize batch sizes in a stream.

Copilot uses AI. Check for mistakes.
Comment on lines +144 to +225
pub fn split_with_default_batch_size(
self: &Arc<Self>,
input: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
struct SplitLargeBatchStream {
input: SendableRecordBatchStream,
current_batch: Option<RecordBatch>,
current_offset: usize,
}

impl SplitLargeBatchStream {
fn split_next_chunk(&mut self) -> Option<RecordBatch> {
let batch = self.current_batch.as_ref()?;
let target_batch_size = batch_size();
let num_rows = batch.num_rows();

if self.current_offset >= num_rows {
self.current_batch = None;
return None;
}

let chunk_size = std::cmp::min(target_batch_size, num_rows - self.current_offset);
let chunk = batch.slice(self.current_offset, chunk_size);
self.current_offset += chunk_size;

if self.current_offset >= num_rows {
self.current_batch = None;
}

Some(chunk)
}
}

impl RecordBatchStream for SplitLargeBatchStream {
fn schema(&self) -> SchemaRef {
self.input.schema()
}
}

impl Stream for SplitLargeBatchStream {
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
if let Some(chunk) = self.split_next_chunk() {
return Poll::Ready(Some(Ok(chunk)));
}

match ready!(self.input.as_mut().poll_next_unpin(cx)) {
Some(Ok(batch)) => {
if batch.is_empty() {
continue;
}

let target_batch_size = batch_size();
if target_batch_size == 0 {
return Poll::Ready(Some(Err(DataFusionError::Internal(
"Invalid batch size: 0".to_string(),
))));
}

let num_rows = batch.num_rows();
if num_rows <= target_batch_size {
return Poll::Ready(Some(Ok(batch)));
} else {
self.current_batch = Some(batch);
self.current_offset = 0;
}
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
}
}
}
}

Box::pin(SplitLargeBatchStream {
input,
current_batch: None,
current_offset: 0,
})
}
Copy link

Copilot AI Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new split_with_default_batch_size method lacks test coverage. Consider adding unit tests to verify:

  • Splitting large batches (e.g., 10000 rows) into multiple smaller batches
  • Passing through batches already at or below target size
  • Handling empty batches
  • Error handling for zero batch size
  • Edge cases like batches with exactly target_batch_size rows

This is important to ensure the splitting logic works correctly, especially for the error case at line 199-202.

Copilot uses AI. Check for mistakes.
Comment on lines +208 to +209
Ok(exec_ctx
.coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
Copy link

Copilot AI Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The composition coalesce_with_default_batch_size(split_with_default_batch_size(output)) may create unnecessary overhead. After splitting large batches to the target size, the coalesce operation will immediately try to merge them back if they're too small (less than 1/4 of batch_size based on line 275 of execution_context.rs).

Consider whether both operations are needed here, or if just split_with_default_batch_size would suffice for the sort merge join output. The coalesce operation is typically used for small batches from sources like filters or unions, but after splitting, batches should already be close to the target size.

Suggested change
Ok(exec_ctx
.coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
Ok(exec_ctx.split_with_default_batch_size(output))

Copilot uses AI. Check for mistakes.
@xumingming
Copy link
Contributor

@XorSum I wonder why do we need to do this manual batch split, the join algorithm itself already have batch_size handling logic, there is bug in it?

fn should_flush(&self) -> bool {
self.lindices.len() >= self.join_params.batch_size
}

@XorSum
Copy link
Contributor Author

XorSum commented Dec 4, 2025

@XorSum I wonder why do we need to do this manual batch split, the join algorithm itself already have batch_size handling logic, there is bug in it?

fn should_flush(&self) -> bool {
self.lindices.len() >= self.join_params.batch_size
}

Thanks for the comment! I do not yet understand how the join batch_size handling logic works, however I did find a reproducible bug(#1693). An extremely large batch (100 million rows) is generated after the join operation, and lead to the OversizedAllocationException.

We can verify this fact by printing the num_rows in native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs.

截屏2025-12-04 19 43 15 截屏2025-12-04 19 42 53

@richox
Copy link
Contributor

richox commented Dec 4, 2025

@XorSum I wonder why do we need to do this manual batch split, the join algorithm itself already have batch_size handling logic, there is bug in it?

fn should_flush(&self) -> bool {
self.lindices.len() >= self.join_params.batch_size
}

Thanks for the comment! I do not yet understand how the join batch_size handling logic works, however I did find a reproducible bug(#1693). An extremely large batch (100 million rows) is generated after the join operation, and lead to the OversizedAllocationException.

In some extreme cases which have a lot of duplicated keys in both join sides, the should_flush() function might not be checked in time, huge batches will be created in thess cases.
however, i dont think this patch can solve the issue because the huge batch has already been generated and cause oom before it is splitted.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Arrow Vector OversizedAllocationException when processing large batches

3 participants