
Parquet reader refactor: move to a Streamly-based streaming pipeline (bounded memory, clearer structure, optional concurrency) #133

@mchav


Summary

The current Parquet reader in dataframe works, but its control flow and data flow are largely “batch oriented”: we tend to read and accumulate buffers and intermediate structures (page bytes, decoded values, etc.) in lists/vectors before producing the final Column/DataFrame. This makes it harder to:

  • guarantee bounded memory for large row groups / wide tables
  • structure the decode pipeline as composable stages
  • add concurrency safely (e.g. per-column-chunk, per-page, per-row-group)
  • instrument progress/backpressure/early termination

This issue proposes refactoring the Parquet reader to use Streamly as the backbone for:
file → row groups → column chunks → pages → decoded values → column builders.

The goal is not to “rewrite everything”, but to introduce a streaming API and progressively migrate internals so we can keep memory stable and make future features easier (predicate pushdown, projection pushdown, row-group skipping, async IO, etc.).
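
For orientation, the split could look roughly like the sketch below. The names readParquet and readParquetStreaming are placeholders (not the library's actual exports), DataFrame/DataFrameChunk are stand-ins, and Stream is streamly's Streamly.Data.Stream type:

```haskell
import           Streamly.Data.Stream (Stream)

data DataFrame      = DataFrame       -- stand-in for the real type
data DataFrameChunk = DataFrameChunk  -- hypothetical chunked result

-- Existing-style convenience API: materialise the whole file at once.
readParquet :: FilePath -> IO DataFrame
readParquet = undefined

-- Possible streaming variant: drives the file → row groups → column chunks →
-- pages → values pipeline internally, keeping memory bounded.
readParquetStreaming :: FilePath -> Stream IO DataFrameChunk
readParquetStreaming = undefined
```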


Motivation / Why Streamly

Streamly gives us:

  • Streaming IO with backpressure (avoid holding entire pages/columns in memory)
  • Compositional pipelines (clear “stages” with well-defined responsibilities)
  • Optional concurrency (map/parMap style over row groups / column chunks)
  • A path to incremental decoding + building without inventing our own streaming abstractions

Even if we keep a “read whole column” convenience API at the top level, the internals can be streaming.


Current implementation (as-is, high level)

This is a conceptual description of what the current reader effectively does (names may differ by module/function); a rough sketch of this shape follows the list:

  1. Open file and parse footer/metadata to get:
     • schema, logical types
     • row groups, column chunks (offsets, sizes, compression codecs)
     • encodings, statistics, dictionary page presence, etc.
  2. For each row group:
     • For each column chunk:
       • Seek to the chunk offset
       • Read chunk bytes (or read page-by-page, but often collecting)
       • Parse PageHeader → read page payload
       • Decode:
         • repetition levels / definition levels
         • dictionary pages + indices
         • data pages (plain / dict / RLE / bit-pack)
         • decompress if needed (snappy/gzip/zstd/etc.)
       • Convert decoded values into a DI.Column / Column representation:
         • handle nullability via def levels
         • handle types (int32/int64/byte array/fixed len byte array, etc.)
         • produce final vector(s), sometimes via intermediate lists/vectors
  3. Combine decoded columns into a DataFrame.
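
To make the accumulate-then-convert shape concrete, here is a minimal sketch of that control flow. Every name in it (readColumnChunkBytes, splitIntoPages, decodePageToValues, the stand-in types) is invented for illustration; the real modules and functions differ:

```haskell
-- Hypothetical sketch of the current batch-oriented shape; names are placeholders.
import qualified Data.ByteString as BS
import qualified Data.Vector     as V
import           System.IO       (Handle)

data ColumnChunkInfo = ColumnChunkInfo                 -- stand-in metadata type
newtype Column       = Column (V.Vector (Maybe Int))   -- stand-in column type

readColumnChunkBytes :: Handle -> ColumnChunkInfo -> IO BS.ByteString
readColumnChunkBytes = undefined   -- seek to the chunk offset, read the whole chunk

splitIntoPages :: BS.ByteString -> [BS.ByteString]
splitIntoPages = undefined         -- parse PageHeaders, slice out the payloads

decodePageToValues :: BS.ByteString -> [Maybe Int]
decodePageToValues = undefined     -- decompress + decode levels and values

-- The whole chunk is read, every page is decoded into one big list, and only
-- then is the column materialised. Peak memory is roughly the full chunk bytes
-- plus the full decoded list.
readColumnChunk :: Handle -> ColumnChunkInfo -> IO Column
readColumnChunk h chunk = do
  bytes <- readColumnChunkBytes h chunk
  let values = concatMap decodePageToValues (splitIntoPages bytes)
  pure (Column (V.fromList values))
```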

Pain points in the current shape

  • Intermediate accumulation: lists of pages, indices, values, def levels, etc.
  • IO patterns: ad-hoc seeking/reading, unclear buffering strategy, some repeated allocation
  • Difficult to add concurrency without risking memory spikes or complicating the code
  • Harder to implement:
    • “stop early” (e.g. a row limit)
    • projection pushdown (read only selected columns)
    • row-group skipping based on stats (without preloading more than necessary)
    • robust progress instrumentation

Proposal: Streamly-based Parquet decode pipeline

Guiding principles

  • Keep a stable public API (or add a streaming variant alongside it).
  • Define a small set of streaming primitives that become the “spine” of Parquet reading.
  • Decode page-by-page, producing incremental column chunks.
  • Avoid “decode everything into lists then convert”; instead “decode stream → builder”.
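
Concretely, the “spine” could be a small set of stage functions along the lines below. This is a sketch only: the type names (FileMetadata, RowGroupInfo, ColumnChunkInfo, Page, Value, Column) are placeholders, and the signatures assume streamly >= 0.9's Streamly.Data.Stream and Streamly.Data.Fold:

```haskell
import           Streamly.Data.Fold   (Fold)
import           Streamly.Data.Stream (Stream)
import           System.IO            (Handle)

-- Placeholder types; the real reader has richer versions of these.
data FileMetadata    = FileMetadata
data RowGroupInfo    = RowGroupInfo
data ColumnChunkInfo = ColumnChunkInfo
data Page            = Page
data Value           = Value
data Column          = Column

-- Stage A: parse the footer as today (small, no streaming needed).
readMetadata :: Handle -> IO FileMetadata
readMetadata = undefined

-- Stage B: one element per row group.
rowGroups :: FileMetadata -> Stream IO RowGroupInfo
rowGroups = undefined

-- Stage C: one element per column chunk of a row group.
columnChunks :: RowGroupInfo -> Stream IO ColumnChunkInfo
columnChunks = undefined

-- Stage D: stream the pages of a chunk (header + decompressed payload).
pages :: Handle -> ColumnChunkInfo -> Stream IO Page
pages = undefined

-- Stage E: turn pages into decoded, nullable values.
decodedValues :: Stream IO Page -> Stream IO (Maybe Value)
decodedValues = undefined

-- Stage F: a Fold that builds the final column incrementally.
columnBuilder :: Fold IO (Maybe Value) Column
columnBuilder = undefined
```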

New internal pipeline (conceptual)

Stage A — Metadata

  • Parse footer/metadata as today (this is small; no need to stream).

Stage B — Row group streaming

  • Stream one RowGroupInfo per row group listed in the footer metadata

Stage C — Column chunk streaming

  • For each row group, stream one ColumnChunkInfo per column chunk
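
A sketch of Stages B and C, with the placeholder types redeclared here with just enough structure to show the shape: the footer already lists the row groups and their chunks, so these stages are essentially Stream.fromList plus concatMap, which also gives a natural seam for projection pushdown (filter chunks before any IO) and, later, concurrency (e.g. something like parMapM over chunks). Only the Streamly calls are real API; the field names are assumptions:

```haskell
import qualified Streamly.Data.Stream as Stream
import           Streamly.Data.Stream (Stream)

-- Minimal placeholder metadata, just enough for the sketch.
data ColumnChunkInfo = ColumnChunkInfo { chunkColumnName :: String }
data RowGroupInfo    = RowGroupInfo    { rowGroupChunks  :: [ColumnChunkInfo] }
data FileMetadata    = FileMetadata    { metaRowGroups   :: [RowGroupInfo] }

-- Stage B: the footer already lists the row groups; lift them into a stream.
rowGroups :: FileMetadata -> Stream IO RowGroupInfo
rowGroups = Stream.fromList . metaRowGroups

-- Stage C: flatten each row group into its column chunks.
columnChunks :: FileMetadata -> Stream IO ColumnChunkInfo
columnChunks meta =
  Stream.concatMap (Stream.fromList . rowGroupChunks) (rowGroups meta)

-- Projection pushdown falls out naturally: drop chunks we were not asked for
-- before any bytes are read.
projectedChunks :: [String] -> FileMetadata -> Stream IO ColumnChunkInfo
projectedChunks wanted meta =
  Stream.filter (\c -> chunkColumnName c `elem` wanted) (columnChunks meta)
```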

Stage D — Page streaming

  • For a given column chunk:
    • Seek to the chunk start
    • Stream (PageHeader, PagePayload) pairs, where the payload is either the decompressed bytes or a decompression stream
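
Stage D could be an unfold over the chunk's byte range: seek once, then repeatedly parse a PageHeader and read its payload until the chunk is exhausted, so at most one page payload is in memory at a time. In the sketch below, parsePageHeader, headerPayloadSize and decompress are hypothetical helpers; the only real API assumed is Stream.unfoldrM from Streamly.Data.Stream and Data.ByteString.hGet:

```haskell
import qualified Data.ByteString      as BS
import qualified Streamly.Data.Stream as Stream
import           Streamly.Data.Stream (Stream)
import           System.IO            (Handle)

-- Placeholder page types and decode helpers (illustrative only).
data PageHeader = PageHeader
data Page       = Page PageHeader BS.ByteString

parsePageHeader   :: Handle -> IO (PageHeader, Int)  -- header plus its encoded size
parsePageHeader   = undefined
headerPayloadSize :: PageHeader -> Int               -- compressed payload size
headerPayloadSize = undefined
decompress        :: PageHeader -> BS.ByteString -> BS.ByteString
decompress        = undefined

-- Stream the pages of one column chunk. Assumes the caller has already seeked
-- the handle to the chunk's start offset; the unfold state is the number of
-- chunk bytes still unread.
pages :: Handle -> Int -> Stream IO Page
pages h chunkSize = Stream.unfoldrM step chunkSize
  where
    step remaining
      | remaining <= 0 = pure Nothing
      | otherwise = do
          (header, headerLen) <- parsePageHeader h
          payload             <- BS.hGet h (headerPayloadSize header)
          let remaining' = remaining - headerLen - headerPayloadSize header
          pure (Just (Page header (decompress header payload), remaining'))
```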

Stage E — Decode streaming

  • Convert page payload stream into typed value stream:

    • handle dictionary pages by updating per-chunk decode state
    • handle data pages by emitting decoded values incrementally
    • handle def/rep levels by emitting Maybe a or a (values + bitmap) representation
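
A sketch of Stage E, staying with placeholder types: per-chunk dictionary state is threaded through an IORef here (an MVar or a stateful scan would work just as well); dictionary pages update that state and emit nothing, data pages are decoded against the current dictionary and emit their nullable values a page at a time. Only Stream.mapM, Stream.concatMap and Stream.fromList from Streamly.Data.Stream are assumed; ParsedPage and decodeDataPage are invented for the sketch:

```haskell
import           Data.IORef           (IORef, newIORef, readIORef, writeIORef)
import qualified Streamly.Data.Stream as Stream
import           Streamly.Data.Stream (Stream)

-- For this sketch, assume pages have already been classified into a small sum
-- type (a refinement of the Page type from the Stage D sketch).
data ParsedPage
  = DictionaryPage [Value]   -- decoded dictionary entries
  | DataPage [Int] [Int]     -- definition levels + encoded values/indices
data Value      = Value
type Dictionary = [Value]

decodeDataPage :: Dictionary -> [Int] -> [Int] -> [Maybe Value]
decodeDataPage = undefined   -- apply def levels + dictionary/plain decoding

-- Turn a stream of pages into a stream of decoded, nullable values.
-- Dictionary pages only update the per-chunk decode state; data pages emit
-- their values incrementally (one page's worth at a time).
decodedValues :: IORef Dictionary -> Stream IO ParsedPage -> Stream IO (Maybe Value)
decodedValues dictRef =
    Stream.concatMap Stream.fromList . Stream.mapM decodeOne
  where
    decodeOne (DictionaryPage dict) = do
      writeIORef dictRef dict          -- update per-chunk decode state
      pure []                          -- a dictionary page emits no values
    decodeOne (DataPage defLevels encoded) = do
      dict <- readIORef dictRef
      pure (decodeDataPage dict defLevels encoded)

-- Per-chunk wiring: allocate fresh dictionary state, then decode the pages.
decodeChunk :: Stream IO ParsedPage -> IO (Stream IO (Maybe Value))
decodeChunk pageStream = do
  dictRef <- newIORef []
  pure (decodedValues dictRef pageStream)
```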

Stage F — Column building

  • Feed the decoded stream into a builder:

    • Stream (Maybe a) or Stream a + null bitmap
    • produce final DI.Column / Column at end of stream
    • optionally produce chunked output and concatenate at end
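
Stage F is where Streamly's Fold pays off: the decoded stream is consumed by a builder fold, so the only full-size allocation left is the column itself. The sketch below uses Fold.foldl' plus fmap over the fold's result to keep it short; in the real reader the accumulator would be a growable mutable buffer (e.g. a MutArray or a vector builder) rather than a reversed list, and could just as well emit chunked output to be concatenated at the end:

```haskell
import qualified Data.Vector          as V
import qualified Streamly.Data.Fold   as Fold
import           Streamly.Data.Fold   (Fold)
import qualified Streamly.Data.Stream as Stream
import           Streamly.Data.Stream (Stream)

data Value     = Value
newtype Column = Column (V.Vector (Maybe Value))   -- stand-in column type

-- Build a column incrementally from the decoded value stream. A reversed list
-- keeps the sketch short; a mutable growable buffer avoids the list overhead.
columnBuilder :: Fold IO (Maybe Value) Column
columnBuilder =
  fmap (Column . V.fromList . reverse) (Fold.foldl' (flip (:)) [])

-- Per chunk: pages → decoded values → column, with only the current page and
-- the builder state alive at any point.
buildColumn :: Stream IO (Maybe Value) -> IO Column
buildColumn = Stream.fold columnBuilder
```

If we later want concurrency, this pipeline shape lets us swap the serial traversal of column chunks for something like parMapM from Streamly.Data.Stream.Prelude (streamly >= 0.9), bounded with maxThreads, without touching the decode or build stages.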
