Summary
The current Parquet reader in dataframe works, but its control-flow and data-flow are largely “batch oriented”: we tend to read/accumulate buffers and intermediate structures (page bytes, decoded values, etc.) in lists/vectors before producing the final Column/DataFrame. This makes it harder to:
- guarantee bounded memory for large row groups / wide tables
- structure the decode pipeline as composable stages
- add concurrency safely (e.g. per-column-chunk, per-page, per-row-group)
- instrument progress/backpressure/early termination
This issue proposes refactoring the Parquet reader to use Streamly as the backbone for:
file → row groups → column chunks → pages → decoded values → column builders.
The goal is not to “rewrite everything”, but to introduce a streaming API and progressively migrate internals so we can keep memory stable and make future features easier (predicate pushdown, projection pushdown, row-group skipping, async IO, etc.).
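As a rough illustration of the intended shape, the spine could be expressed as a chain of Streamly streams plus a final fold. This is a minimal sketch assuming streamly's Streamly.Data.Stream and Streamly.Data.Fold modules; every type and function name below is a placeholder invented for this sketch (Column stands in for the DI.Column/Column representation), not an existing dataframe API. Later sketches in this issue reuse these placeholder types.

```haskell
import qualified Streamly.Data.Fold   as Fold
import qualified Streamly.Data.Stream as Stream
import           System.IO            (Handle)

-- Placeholder types standing in for whatever the reader already uses internally.
data FileMetadata    = FileMetadata
data RowGroupInfo    = RowGroupInfo
data ColumnChunkInfo = ColumnChunkInfo
data Page            = Page
data Value           = Value
data Column          = Column
data DataFrame       = DataFrame

-- The streaming "spine": each stage is a stream transformation, so memory stays
-- bounded and stages can later be fused, swapped, or run concurrently.
rowGroupStream    :: FileMetadata -> Stream.Stream IO RowGroupInfo
rowGroupStream    = undefined   -- Stage B
columnChunkStream :: RowGroupInfo -> Stream.Stream IO ColumnChunkInfo
columnChunkStream = undefined   -- Stage C
pageStream        :: Handle -> ColumnChunkInfo -> Stream.Stream IO Page
pageStream        = undefined   -- Stage D
decodedStream     :: ColumnChunkInfo -> Stream.Stream IO Page -> Stream.Stream IO (Maybe Value)
decodedStream     = undefined   -- Stage E
columnBuilder     :: ColumnChunkInfo -> Fold.Fold IO (Maybe Value) Column
columnBuilder     = undefined   -- Stage F
```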
Motivation / Why Streamly
Streamly gives us:
- Streaming IO with backpressure (avoid holding entire pages/columns in memory)
- Compositional pipelines (clear “stages” with well-defined responsibilities)
- Optional concurrency (map/parMap style over row groups / column chunks)
- A path to incremental decoding + building without inventing our own streaming abstractions
Even if we keep a “read whole column” convenience API at the top-level, the internals can be streaming.
Current implementation (as-is, high level)
This is a conceptual description of what the current reader effectively does (names may differ by module/function):
- Open file and parse footer/metadata to get:
  - schema, logical types
  - row groups, column chunks (offsets, sizes, compression codecs)
  - encodings, statistics, dictionary page presence, etc.
- For each row group:
  - For each column chunk:
    - Seek to the chunk offset
    - Read chunk bytes (or read page-by-page, but often collecting)
    - Parse PageHeader → read page payload
    - Decode:
      - repetition levels / definition levels
      - dictionary pages + indices
      - data pages (plain / dict / RLE / bit-pack)
      - decompress if needed (snappy/gzip/zstd/etc.)
    - Convert decoded values into a DI.Column/Column representation:
      - handle nullability via def levels
      - handle types (int32/int64/byte array/fixed len byte array, etc.)
      - produce final vector(s), sometimes via intermediate lists/vectors
- Combine decoded columns into a DataFrame.
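In code terms, the per-chunk shape is roughly the following. This is purely illustrative, reusing the placeholder types from the Summary sketch; every helper in the where clause is a hypothetical stand-in for existing parsing/decoding code. The point is that each intermediate ([Page], [Maybe Value]) is fully materialised before the next step runs:

```haskell
import qualified Data.ByteString as BS
import           System.IO       (Handle, SeekMode (AbsoluteSeek), hSeek)

-- Batch-oriented shape (illustrative): whole chunk read, all pages parsed,
-- all values decoded, then a list-to-vector conversion at the very end.
readColumnChunkBatch :: Handle -> ColumnChunkInfo -> IO Column
readColumnChunkBatch h chunk = do
  hSeek h AbsoluteSeek (chunkOffsetOf chunk)
  bytes <- BS.hGet h (chunkByteSizeOf chunk)            -- whole chunk in memory
  let pages   = parseAllPages bytes                     -- [Page] built eagerly
      decoded = decodeAllPages chunk pages              -- [Maybe Value] built eagerly
  pure (buildColumnFromList chunk decoded)              -- list -> final column
  where
    -- Hypothetical stand-ins for the existing code.
    chunkOffsetOf       = undefined :: ColumnChunkInfo -> Integer
    chunkByteSizeOf     = undefined :: ColumnChunkInfo -> Int
    parseAllPages       = undefined :: BS.ByteString -> [Page]
    decodeAllPages      = undefined :: ColumnChunkInfo -> [Page] -> [Maybe Value]
    buildColumnFromList = undefined :: ColumnChunkInfo -> [Maybe Value] -> Column
```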
Pain points in the current shape
- Intermediate accumulation: lists of pages, indices, values, def levels, etc.
- IO patterns: ad-hoc seeking/reading, unclear buffering strategy, some repeated allocation
- Difficult to add concurrency without risking spikes in memory or complicating code
- Harder to implement:
  - “stop early” (e.g. row limit)
  - projection pushdown (read only selected columns)
  - row-group skipping based on stats (without preloading more than necessary)
  - robust progress instrumentation
Proposal: Streamly-based Parquet decode pipeline
Guiding principles
- Keep a stable public API (or add a streaming variant alongside it).
- Define a small set of streaming primitives that become the “spine” of Parquet reading.
- Decode page-by-page, producing incremental column chunks.
- Avoid “decode everything into lists then convert”; instead “decode stream → builder”.
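To make the last bullet concrete, here is a minimal sketch of the "decode stream → builder" pattern, reusing the placeholder Value/Column types from the Summary sketch. The fold's accumulator just counts values and nulls as a stand-in for a real mutable vector + null bitmap builder; note how an optional row limit ("stop early") becomes a Stream.take:

```haskell
import qualified Streamly.Data.Fold   as Fold
import qualified Streamly.Data.Stream as Stream

-- Fold the decoded stream straight into a column builder instead of collecting
-- a list first.
buildColumnStreaming
  :: Maybe Int                          -- optional row limit ("stop early")
  -> Stream.Stream IO (Maybe Value)     -- decoded values, nulls as Nothing
  -> IO Column
buildColumnStreaming rowLimit decoded =
  Stream.fold columnFold (maybe id Stream.take rowLimit decoded)
  where
    -- Illustrative builder fold: counts values and nulls as a stand-in for
    -- appending to a growable vector + null bitmap.
    columnFold :: Fold.Fold IO (Maybe Value) Column
    columnFold =
      fmap (\(_values, _nulls) -> Column) $
        Fold.foldl'
          (\(v, n) mx -> case mx of
              Just _  -> (v + 1 :: Int, n)
              Nothing -> (v, n + 1 :: Int))
          (0, 0)
```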
New internal pipeline (conceptual)
Stage A — Metadata
- Parse footer/metadata as today (this is small; no need to stream).
Stage B — Row group streaming
- Stream RowGroupInfo
Stage C — Column chunk streaming
- For each row group:
  - Stream ColumnChunkInfo
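A possible shape for Stages B and C, sketched with Stream.fromList / Stream.concatMap over the already-parsed metadata (rowGroupsOf and columnChunksOf are hypothetical accessors; the types are the placeholders from the Summary sketch):

```haskell
import qualified Streamly.Data.Stream as Stream

-- Stage B: row groups are already listed in the footer, so streaming them is
-- just lifting that list into a pull-based stream.
rowGroupStream :: FileMetadata -> Stream.Stream IO RowGroupInfo
rowGroupStream meta = Stream.fromList (rowGroupsOf meta)
  where rowGroupsOf = undefined :: FileMetadata -> [RowGroupInfo]

-- Stage C: expand each row group into its column chunks. Projection pushdown
-- later becomes a Stream.filter over this stream; row-group skipping on stats
-- becomes a filter over rowGroupStream.
columnChunkStream :: RowGroupInfo -> Stream.Stream IO ColumnChunkInfo
columnChunkStream rg = Stream.fromList (columnChunksOf rg)
  where columnChunksOf = undefined :: RowGroupInfo -> [ColumnChunkInfo]

-- Stages B and C composed: all column chunks of the file, in row-group order.
allColumnChunks :: FileMetadata -> Stream.Stream IO ColumnChunkInfo
allColumnChunks meta = Stream.concatMap columnChunkStream (rowGroupStream meta)
```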
Stage D — Page streaming
- For a given column chunk:
  - Seek to chunk start
  - Stream:
    - PageHeader
    - PagePayload (decompressed bytes or a decompression stream)
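A sketch of Stage D using Stream.unfoldrM, emitting one (PageHeader, payload) pair at a time so only the current page is resident; the header/size helpers are hypothetical stand-ins for the existing page parsing code:

```haskell
import qualified Data.ByteString      as BS
import qualified Streamly.Data.Stream as Stream
import           System.IO            (Handle, SeekMode (AbsoluteSeek), hSeek)

data PageHeader = PageHeader   -- placeholder for the parsed page header

-- Stage D sketch: one (header, payload) pair at a time; only the current page
-- is held in memory.
pageStream :: Handle -> ColumnChunkInfo -> Stream.Stream IO (PageHeader, BS.ByteString)
pageStream h chunk = Stream.unfoldrM step (chunkOffsetOf chunk, chunkByteCount chunk)
  where
    step (offset, remaining)
      | remaining <= 0 = pure Nothing
      | otherwise = do
          hSeek h AbsoluteSeek (fromIntegral offset)   -- first iteration: seek to chunk start
          (header, headerLen) <- readPageHeader h
          payload <- BS.hGet h (pageDataSize header)
          let consumed = headerLen + BS.length payload
          pure (Just ((header, payload), (offset + consumed, remaining - consumed)))

    -- Hypothetical stand-ins for existing parsing/metadata code.
    chunkOffsetOf  = undefined :: ColumnChunkInfo -> Int
    chunkByteCount = undefined :: ColumnChunkInfo -> Int
    readPageHeader = undefined :: Handle -> IO (PageHeader, Int)   -- header + its byte length
    pageDataSize   = undefined :: PageHeader -> Int                -- (compressed) payload size
```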
Stage E — Decode streaming
- Convert page payload stream into typed value stream:
  - handle dictionary pages by updating per-chunk decode state
  - handle data pages by emitting decoded values incrementally
  - handle def/rep levels by emitting Maybe a or a (values + bitmap) representation
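A sketch of Stage E, consuming the (PageHeader, payload) pairs from the Stage D sketch. The stream is returned inside IO so the per-chunk dictionary state can live in an IORef; each page then expands into its decoded values via Stream.concatMapM. All decoder names are hypothetical:

```haskell
import qualified Data.ByteString      as BS
import           Data.IORef           (newIORef, readIORef, writeIORef)
import qualified Streamly.Data.Stream as Stream

-- Stage E sketch: dictionary pages update per-chunk decode state and emit no
-- values; data pages are decoded against the current dictionary and their
-- values emitted incrementally.
decodedStream
  :: ColumnChunkInfo
  -> Stream.Stream IO (PageHeader, BS.ByteString)
  -> IO (Stream.Stream IO (Maybe Value))
decodedStream chunk pages = do
  dictRef <- newIORef Nothing                          -- per-chunk dictionary state
  let decodeOne (header, payload)
        | isDictionaryPage header = do
            writeIORef dictRef (Just (decodeDictionaryPage chunk payload))
            pure Stream.nil                            -- dictionary pages emit nothing
        | otherwise = do
            dict <- readIORef dictRef
            -- def levels decide Nothing vs Just; values decoded plain or via dict
            pure (Stream.fromList (decodeDataPage chunk dict header payload))
  pure (Stream.concatMapM decodeOne pages)
  where
    -- Hypothetical stand-ins for the existing decoders.
    isDictionaryPage     = undefined :: PageHeader -> Bool
    decodeDictionaryPage = undefined :: ColumnChunkInfo -> BS.ByteString -> [Value]
    decodeDataPage       = undefined :: ColumnChunkInfo -> Maybe [Value] -> PageHeader -> BS.ByteString -> [Maybe Value]
```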
Stage F — Column building
- Feed the decoded stream into a builder:
  - Stream (Maybe a) or Stream a + null bitmap
  - produce final DI.Column/Column at end of stream
  - optionally produce chunked output and concatenate at end
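Finally, a sketch of how Stages B through F could compose end to end, reusing the stage functions sketched above (toDataFrame and columnBuilder are hypothetical stand-ins). A concurrent variant would swap Stream.mapM for a parMapM-style combinator from streamly's concurrent modules:

```haskell
import qualified Streamly.Data.Fold   as Fold
import qualified Streamly.Data.Stream as Stream
import           System.IO            (Handle)

-- One bounded-memory pass per column chunk; only the finished Columns are kept.
readParquetStreaming :: Handle -> FileMetadata -> IO DataFrame
readParquetStreaming h meta = do
  columns <- Stream.fold Fold.toList
           $ Stream.mapM readOneChunk      -- per-chunk concurrency could slot in here
           $ allColumnChunks meta          -- Stages B + C
  pure (toDataFrame meta columns)
  where
    readOneChunk chunk = do
      decoded <- decodedStream chunk (pageStream h chunk)   -- Stages D + E
      Stream.fold (columnBuilder chunk) decoded             -- Stage F: stream -> Column
    -- Hypothetical stand-ins.
    toDataFrame   = undefined :: FileMetadata -> [Column] -> DataFrame
    columnBuilder = undefined :: ColumnChunkInfo -> Fold.Fold IO (Maybe Value) Column
```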