feat: introduce semaphore-based scheduling backpressure #5753
base: main
Conversation
Lance's read pipeline is split into independent scheduling and decoding loops, designed to maximize both IO and CPU parallelism. When we schedule a range of rows to read, the scheduler breaks those ranges up into "scan lines" and issues requests against the storage backend at the maximum parallelism advertised by the backend. For each scan line, a message is sent over an unbounded mpsc channel to the decoder. The message includes futures that wrap the results of the column download operations.

Although requests against storage are limited by the advertised capabilities of the store, the limitation applies only to in-flight IO requests. Nothing stops the results of completed IO requests from piling up in memory prior to decoding. This works fine when IO is slower than decoding, but if IO is significantly faster than decoding (for example, RAM- or NVMe-based storage, or particularly slow decoding tasks), we can accumulate a lot of downloaded data in memory well before we need it.

This patch introduces a semaphore-based backpressure mechanism. Permits are required prior to scheduling and released after completion of the decoding futures, limiting the amount of work that can accumulate between the scheduler and the consumer.
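At a high level the mechanism looks like the sketch below. This is illustrative only; `ScanLine`, the permit count, and the two loops are stand-ins, not the actual Lance types. The key point is that a permit is acquired before a scan line is scheduled and is released only when the decoded message is dropped.

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

/// One scheduled unit of work: the (simulated) downloaded bytes plus the
/// permit that was acquired before the request was issued.
struct ScanLine {
    data: Vec<u8>,
    _permit: OwnedSemaphorePermit, // released when the message is dropped
}

#[tokio::main]
async fn main() {
    // At most 4 scan lines may sit between the scheduler and the decoder.
    let permits = Arc::new(Semaphore::new(4));
    let (tx, mut rx) = mpsc::unbounded_channel::<ScanLine>();

    // Scheduling loop: acquire a permit *before* issuing the IO request, so
    // completed-but-undecoded results cannot pile up without bound.
    let scheduler = tokio::spawn({
        let permits = permits.clone();
        async move {
            for i in 0u8..32 {
                let permit = permits.clone().acquire_owned().await.unwrap();
                let data = vec![i; 1024]; // stand-in for a column download
                tx.send(ScanLine { data, _permit: permit }).unwrap();
            }
        }
    });

    // Decoding loop: dropping the message releases the permit, letting the
    // scheduler issue the next scan line.
    while let Some(line) = rx.recv().await {
        let _decoded = line.data.len(); // stand-in for decode work
    }
    scheduler.await.unwrap();
}
```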
This is in a WIP/discussion state -- there are some unresolved questions. One big question is how to make this work optimally against a storage-backed memory cache, which is the original reason for the experiment. The problem is that some requests will behave like object storage and some will behave like memory, so we may need something smarter/more adaptive than a fixed limit on the channel size. It may be useful to allow the caller to bring their own semaphore, to keep this complexity out of lance itself (see the sketch below). I'm also not sure whether the IO latency simulation in the benchmark is good enough; if we merge this we will probably want to scale the benchmark back and make it a bit more targeted. Here are the current textual benchmark results ("unbounded" is the current behavior): I validated the effectiveness in my real application's memory cache, but as mentioned, I think more work is required to cover the cold data case.
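For the "bring your own semaphore" idea, one hypothetical shape (not an existing Lance API) would be an optional, caller-supplied permit pool on the scan options, so a single pool can be shared across scans or sized differently for cached vs. cold data:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Hypothetical option: the caller supplies (and may share) the semaphore.
/// `None` would preserve today's unbounded behavior.
#[derive(Default)]
pub struct BackpressureOptions {
    pub permits: Option<Arc<Semaphore>>,
}

fn main() {
    // One pool shared by several scans against a memory-cache-backed store.
    let shared = Arc::new(Semaphore::new(64));
    let _opts = BackpressureOptions {
        permits: Some(shared.clone()),
    };
}
```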
Another issue with this patch is that it precludes some IO coalescing that is currently performed in the unbounded case. Ideally we would have some way to independently control the degree of coalescing and the degree of permitting/readahead, but currently the coalescing happens in the same step (scheduling) as the storage request dispatch. For this it could make sense to split scheduling/decoding into plan/schedule/decode or schedule/execute/decode; the permitting could then happen between plan and schedule, downstream of IO coalescing (see the sketch below).
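To make that concrete, here is a rough sketch (the stage names and types are mine, not a proposed API) of a plan/schedule/decode split in which coalescing happens while no permits are held, and permits are acquired only when requests are actually dispatched:

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

/// Plan output: byte ranges that have already been coalesced.
struct PlannedRequest {
    ranges: Vec<std::ops::Range<u64>>,
}

/// Schedule output: the dispatched request plus its permit.
struct IssuedRequest {
    ranges: Vec<std::ops::Range<u64>>,
    _permit: OwnedSemaphorePermit,
}

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(4));
    let (plan_tx, mut plan_rx) = mpsc::unbounded_channel::<PlannedRequest>();
    let (sched_tx, mut sched_rx) = mpsc::unbounded_channel::<IssuedRequest>();

    // Plan: coalesce adjacent ranges while holding no permits, so
    // backpressure does not limit how much coalescing can happen.
    tokio::spawn(async move {
        for i in 0u64..16 {
            let start = i * 8192;
            let ranges = vec![start..start + 4096, start + 4096..start + 8192];
            plan_tx.send(PlannedRequest { ranges }).unwrap();
        }
    });

    // Schedule: acquire the permit here, after coalescing, immediately
    // before the storage request would be dispatched.
    tokio::spawn(async move {
        while let Some(req) = plan_rx.recv().await {
            let permit = permits.clone().acquire_owned().await.unwrap();
            sched_tx
                .send(IssuedRequest { ranges: req.ranges, _permit: permit })
                .unwrap();
        }
    });

    // Decode: dropping the message releases the permit.
    while let Some(req) = sched_rx.recv().await {
        let _bytes: u64 = req.ranges.iter().map(|r| r.end - r.start).sum();
    }
}
```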
```rust
let next_task = ReadBatchTask {
    task: task.boxed(),
    num_rows: num_rows as u32,
    backpressure_permits: Vec::new(), // Permits are now inside the task future
};
```
I think my preference would be to just remove the tokio::spawn call up above this point. Does that solve the backpressure issue? See this change here for an example: a9efde8
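To illustrate the suggestion (a toy contrast under my own naming, not the actual Lance code): spawning the decode task drives it to completion eagerly, whereas returning the future unspawned means it is only polled when the consumer awaits the next batch, so the consumer's own pace acts as backpressure.

```rust
use futures::future::BoxFuture;
use futures::FutureExt;

// Eager: tokio::spawn starts driving the decode work immediately, so its
// result (and the downloaded bytes it holds onto) can sit in memory long
// before anyone asks for the batch.
fn eager(decode: BoxFuture<'static, Vec<u8>>) -> BoxFuture<'static, Vec<u8>> {
    tokio::spawn(decode).map(|res| res.unwrap()).boxed()
}

// Lazy: return the future as-is; it is only polled when the consumer awaits
// it, so the consumer's demand limits how much work is in flight.
fn lazy(decode: BoxFuture<'static, Vec<u8>>) -> BoxFuture<'static, Vec<u8>> {
    decode
}

#[tokio::main]
async fn main() {
    let decode = async { vec![0u8; 4] }.boxed();
    let batch = lazy(decode).await; // nothing ran until this await
    assert_eq!(batch.len(), 4);

    let decode = async { vec![0u8; 4] }.boxed();
    let batch = eager(decode).await; // work may already have run on another task
    assert_eq!(batch.len(), 4);
}
```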