
Add new config option maxBatchOutputSize to split batch into chunks before outputs #18680

Open
estolfo wants to merge 28 commits into elastic:main from estolfo:split-filter

Conversation

@estolfo
Contributor

@estolfo estolfo commented Jan 30, 2026

This PR adds a new config option, pipeline.batch.max_output_size which limits the number of events sent to outputs after filtering in a pipeline.
After events are filtered in a pipeline, they are sent in chunks of max_output_size to the outputs. This helps mitigate some OOM issues observed in the past due to a split filter exploding the size of batches.
Related to issue logstash-plugins/logstash-filter-split#48

Further work to add a config option limiting the chunk size in bytes could also help mitigate OOM issues, though this config option might be sufficient for the majority of cases.
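The chunking behavior described above can be sketched as follows. This is a hypothetical helper, not the PR's actual implementation; the class and method names are illustrative, with 0 meaning "unlimited" per the description:

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSketch {
    // Split a filtered batch into chunks of at most maxOutputSize events.
    // A maxOutputSize of 0 means unlimited: the whole batch passes through.
    static <T> List<List<T>> chunk(List<T> batch, int maxOutputSize) {
        List<List<T>> chunks = new ArrayList<>();
        int total = batch.size();
        if (maxOutputSize <= 0 || maxOutputSize >= total) {
            chunks.add(batch); // unlimited, or batch already fits in one chunk
            return chunks;
        }
        for (int offset = 0; offset < total; offset += maxOutputSize) {
            // copy each slice so the chunk is independent of the source batch
            chunks.add(new ArrayList<>(batch.subList(offset, Math.min(offset + maxOutputSize, total))));
        }
        return chunks;
    }
}
```

With a split filter that turns one event into thousands, each chunk handed to the outputs stays bounded by `max_output_size`, which is the OOM mitigation the PR targets.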

@github-actions
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build: Re-trigger the docs validation. (use unformatted text in the comment!)
  • /run exhaustive tests: Run the exhaustive tests Buildkite pipeline.

@mergify
Contributor

mergify bot commented Jan 30, 2026

This pull request does not have a backport label. Could you fix it @estolfo? 🙏
To fix this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8.\d is the label to automatically backport to the 8.\d branch, where \d is the digit.
  • If no backport is necessary, please add the backport-skip label

@elasticmachine

💛 Build succeeded, but was flaky

Failed CI Steps

@github-actions
Contributor

github-actions bot commented Feb 9, 2026

Contributor

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 7 comments.



Comment on lines 55 to 58
Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125),
Setting::NumericSetting.new("pipeline.batch.delay", 50), # in milliseconds
Setting::NumericSetting.new("pipeline.batch.max_output_size", 0), # 0 means unlimited
Setting::BooleanSetting.new("pipeline.unsafe_shutdown", false),

Copilot AI Feb 10, 2026


pipeline.batch.max_output_size is registered as a NumericSetting, which accepts floats and negative numbers without validation (see org.logstash.settings.NumericSetting). Since this value is used as an event-count limit, it should be an integer and should be validated as >= 0 (with 0 meaning unlimited). Consider introducing a dedicated non-negative integer setting (e.g., a variant of PositiveIntegerSetting that allows 0) and using it here so invalid values fail fast at startup.
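A minimal sketch of the suggested setting type, assuming the semantics from this comment (0 means unlimited, floats and negatives rejected at startup). The class name is hypothetical, not an actual Logstash class:

```java
// Hypothetical variant of PositiveIntegerSetting that also accepts 0.
public class NonNegativeIntegerSetting {
    private final String name;
    private final int value;

    public NonNegativeIntegerSetting(String name, Object raw) {
        this.name = name;
        // Reject floats, strings, and negative numbers so invalid
        // configuration fails fast at startup rather than at runtime.
        if (!(raw instanceof Integer) || (Integer) raw < 0) {
            throw new IllegalArgumentException(
                "Setting \"" + name + "\" must be a non-negative integer, got: " + raw);
        }
        this.value = (Integer) raw;
    }

    public int value() { return value; }
}
```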

Contributor Author


@jsvd what do you think about this? Should I create a new number type? I can't use PositiveIntegerSetting as that doesn't allow 0. The number provided is converted into an integer here

Member


I wonder if we should envision a set of defaults like “unlimited” (no chunking), “batch_size” (default to batch size) or positive number?
I think we will want to support these three scenarios in the future and the decision now should not block us from that in the future.

Contributor Author


When you say unlimited, what do you have in mind?
I thought unlimited was the same as batch size: without a max_output_size set, the code passes the whole batch to the outputs, unchunked.

Member


I can see two "defaults":

  1. "entire batch": all the events present in the batch object
  2. "pipeline's default batch size": chunk based on the default batch size of the pipeline

If I have batch size == 1000 it'd be nice that my output section chunks along the 1000 mark. This is mostly about ergonomics: I could see that in a Logstash 10.x we would default to chunking to the default batch size of the pipeline.

I think there are other ergonomic factors to take into account here, where a max_output_size of 1000 would mean a 1001-event batch gets split into two "smaller batches" of 1000 and 1, causing a lot of resource waste by sending a single-event batch

Contributor Author


If there is a default batch size of the pipeline, how would the batch end up having more than that amount?
I'm seeing that there are always batchSize number of items read from the queue (here and here)

About the batches of 1000 and 1 items, how likely is that scenario? And do you think writing code to handle it is worth the overhead, when chunking would provide benefits in most cases?

| Setting | Description | Default value |
| --- | --- | --- |
| `pipeline.workers` | The number of workers that will, in parallel, execute the filter and output stages of the pipeline. This setting uses the [`java.lang.Runtime.getRuntime.availableProcessors`](https://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.md#availableProcessors()) value as a default if not overridden by `pipeline.workers` in `pipelines.yml` or `pipeline.workers` from `logstash.yml`. If you have modified this setting and see that events are backing up, or that the CPU is not saturated, consider increasing this number to better utilize machine processing power. | Number of the host’s CPU cores |
| `pipeline.batch.size` | The maximum number of events an individual worker thread will collect from inputs before attempting to execute its filters and outputs. Larger batch sizes are generally more efficient, but come at the cost of increased memory overhead. You may need to increase JVM heap space in the `jvm.options` config file. See [Logstash Configuration Files](/reference/config-setting-files.md) for more info. | `125` |
| `pipeline.batch.delay` | When creating pipeline event batches, how long in milliseconds to wait for each event before dispatching an undersized batch to pipeline workers. | `50` |
| `pipeline.batch.max_output_size` | Maximum number of events that are passed together as a chunk to the outputs after being filtered. The default is 0 (unlimited). | `0` |
Member


I wonder if "unlimited" can create some confusion. Maybe we should frame it as: of the batch of events that passed through the filter section, how many to pass at a time to the output section. 0 would then mean the entire batch.
I'm also not super happy with how I worded it... 🤔

final int maxChunkSize = (maxBatchOutputSize > 0) ? maxBatchOutputSize : totalSize;

// send to consumer in chunks
for (int offset = 0; offset < totalSize; offset += maxChunkSize) {
Member


should we short circuit here when maxChunkSize is 0? i.e. will it still allocate a new array and put the entire batch on it for the unlimited case?
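One way to implement the short circuit being asked about, as a sketch under the assumption that the dispatch path takes a consumer callback (names are hypothetical, not the PR's actual code): the unlimited case hands the original list to the consumer without allocating any per-chunk copies.

```java
import java.util.List;
import java.util.function.Consumer;

public class DispatchSketch {
    static <T> void dispatch(List<T> batch, int maxBatchOutputSize, Consumer<List<T>> consumer) {
        int totalSize = batch.size();
        if (totalSize == 0) {
            return;
        }
        // Short circuit: unlimited, or batch already fits, so no chunking
        // and no extra array allocation for the whole-batch case.
        if (maxBatchOutputSize <= 0 || maxBatchOutputSize >= totalSize) {
            consumer.accept(batch);
            return;
        }
        for (int offset = 0; offset < totalSize; offset += maxBatchOutputSize) {
            // subList is a view, so chunking allocates no event copies either
            consumer.accept(batch.subList(offset, Math.min(offset + maxBatchOutputSize, totalSize)));
        }
    }
}
```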

Contributor Author


you and copilot are on the same page :) #18680 (comment)

You're right, I'm looking at changing the code now.

Contributor Author


resolved in 39a4d7e, dd21464

@elasticmachine

💚 Build Succeeded
