Add new config option maxBatchOutputSize to split batch into chunks before outputs #18680
estolfo wants to merge 28 commits into elastic:main from
Conversation
This pull request does not have a backport label. Could you fix it @estolfo? 🙏
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 7 comments.
```ruby
Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125),
Setting::NumericSetting.new("pipeline.batch.delay", 50), # in milliseconds
Setting::NumericSetting.new("pipeline.batch.max_output_size", 0), # 0 means unlimited
Setting::BooleanSetting.new("pipeline.unsafe_shutdown", false),
```
pipeline.batch.max_output_size is registered as a NumericSetting, which accepts floats and negative numbers without validation (see org.logstash.settings.NumericSetting). Since this value is used as an event-count limit, it should be an integer and should be validated as >= 0 (with 0 meaning unlimited). Consider introducing a dedicated non-negative integer setting (e.g., a variant of PositiveIntegerSetting that allows 0) and using it here so invalid values fail fast at startup.
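For illustration only, a minimal sketch of the kind of fail-fast, non-negative integer setting being suggested; the class name `NonNegativeIntegerSetting` and its shape are hypothetical and are not modeled on the actual `org.logstash.settings` classes:

```java
// Hypothetical sketch: a setting that only accepts integers >= 0 and fails
// fast at construction/assignment time (0 meaning "unlimited").
public final class NonNegativeIntegerSetting {
    private final String name;
    private final int defaultValue;
    private Integer value;

    public NonNegativeIntegerSetting(String name, int defaultValue) {
        this.name = name;
        this.defaultValue = validate(defaultValue);
    }

    public void set(int newValue) {
        this.value = validate(newValue);
    }

    public int value() {
        return value != null ? value : defaultValue;
    }

    private int validate(int candidate) {
        if (candidate < 0) {
            throw new IllegalArgumentException(
                "Setting \"" + name + "\" must be >= 0 (0 means unlimited), got: " + candidate);
        }
        return candidate;
    }
}
```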
I wonder if we should envision a set of defaults like “unlimited” (no chunking), “batch_size” (default to batch size), or a positive number?
I think we will want to support these three scenarios in the future and the decision now should not block us from that in the future.
When you say unlimited, what do you have in mind?
I thought unlimited was the same as batch size: without a max_output_size set, the code passes the whole batch to the outputs, unchunked.
I can see two "defaults":
- "entire batch": all the events present in the batch object
- "pipeline's default batch size": chunk based on the default batch size of the pipeline
If I have batch size == 1000, it'd be nice if my output section chunked along the 1000 mark. This is mostly about ergonomics: I could see that in a Logstash 10.x we would default to chunking to the pipeline's default batch size.
I think there are other ergonomic factors to take into account here, where a max_output_size of 1000 would mean a 1001-event batch produces two "smaller batches" of 1000 and 1, wasting resources to send a single-event batch.
If there is a default batch size for the pipeline, how would the batch end up with more than that amount?
I'm seeing that there are always batchSize items read from the queue (here and here).
About the batches of 1000 and 1 items, how likely is that scenario? And do you think writing code to handle it is worth the overhead, when chunking would provide benefits in most cases?
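To make the arithmetic concrete, a small standalone sketch of the chunking loop with the 1001/1000 example from this thread (illustrative values, not code from the PR):

```java
// Illustrative chunking arithmetic: a batch of 1001 events with a
// max_output_size of 1000 produces two chunks, of 1000 and 1 events.
public class ChunkMath {
    public static void main(String[] args) {
        int totalSize = 1001;
        int maxChunkSize = 1000;
        for (int offset = 0; offset < totalSize; offset += maxChunkSize) {
            int chunkSize = Math.min(maxChunkSize, totalSize - offset);
            System.out.println("chunk at offset " + offset + " with " + chunkSize + " events");
        }
        // prints: chunk at offset 0 with 1000 events
        //         chunk at offset 1000 with 1 events
    }
}
```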
| `pipeline.workers` | The number of workers that will, in parallel, execute the filter and output stages of the pipeline. This setting uses the [`java.lang.Runtime.getRuntime.availableProcessors`](https://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.md#availableProcessors()) value as a default if not overridden by `pipeline.workers` in `pipelines.yml` or `pipeline.workers` from `logstash.yml`. If you have modified this setting and see that events are backing up, or that the CPU is not saturated, consider increasing this number to better utilize machine processing power. | Number of the host’s CPU cores |
| `pipeline.batch.size` | The maximum number of events an individual worker thread will collect from inputs before attempting to execute its filters and outputs. Larger batch sizes are generally more efficient, but come at the cost of increased memory overhead. You may need to increase JVM heap space in the `jvm.options` config file. See [Logstash Configuration Files](/reference/config-setting-files.md) for more info. | `125` |
| `pipeline.batch.delay` | When creating pipeline event batches, how long in milliseconds to wait for each event before dispatching an undersized batch to pipeline workers. | `50` |
| `pipeline.batch.max_output_size` | Maximum number of events that are passed together as a chunk to the outputs after being filtered. The default is 0 (unlimited). | `0` |
I wonder if unlimited can create some confusion... maybe we should frame it as: of the batch of events that passed through the filter section, how many to pass at a time to the output section. 0 would then mean the entire batch.
also not super happy about how I worded it... 🤔
```java
final int maxChunkSize = (maxBatchOutputSize > 0) ? maxBatchOutputSize : totalSize;

// send to consumer in chunks
for (int offset = 0; offset < totalSize; offset += maxChunkSize) {
```
should we short-circuit here when maxBatchOutputSize is 0? i.e. will it still allocate a new array and copy the entire batch into it for the unlimited case?
you and copilot are on the same page :) #18680 (comment)
You're right, I'm looking at changing the code now.
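A possible shape for that change, sketched with generic types rather than the actual CompiledPipeline internals (the class and method names here are hypothetical, not the final code in this PR):

```java
import java.util.List;
import java.util.function.Consumer;

final class ChunkedOutputSketch {
    // Sketch only: pass the whole batch through when chunking is disabled,
    // and otherwise hand out sub-list views instead of copying into new arrays.
    static <T> void sendInChunks(List<T> batch, int maxBatchOutputSize, Consumer<List<T>> consumer) {
        final int totalSize = batch.size();
        if (maxBatchOutputSize <= 0 || maxBatchOutputSize >= totalSize) {
            consumer.accept(batch); // unlimited case: no extra allocation
            return;
        }
        for (int offset = 0; offset < totalSize; offset += maxBatchOutputSize) {
            final int end = Math.min(offset + maxBatchOutputSize, totalSize);
            consumer.accept(batch.subList(offset, end)); // view over the batch, not a copy
        }
    }
}
```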
This PR adds a new config option, `pipeline.batch.max_output_size`, which limits the number of events sent to outputs after filtering in a pipeline. After events are filtered in a pipeline, they are sent in chunks of `max_output_size` to the outputs. This helps mitigate some OOM issues observed in the past due to a split filter exploding the size of batches.

Related to issue logstash-plugins/logstash-filter-split#48
Further work to add a config option limiting the chunk size in bytes could also help mitigate OOM issues, though this config option might be sufficient for the majority of cases.
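As an illustrative example (the numbers are hypothetical, not taken from the linked issue): with `pipeline.batch.size` of 125 and a split filter that emits 100 events per input event, a single batch grows to 12,500 events before reaching the outputs; with `pipeline.batch.max_output_size` set to 1000, the outputs would instead receive 13 chunks of at most 1000 events each.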