Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ library
DataFrame.IO.JSON,
DataFrame.IO.Unstable.CSV,
DataFrame.IO.Parquet,
DataFrame.IO.Parquet.Binary
DataFrame.IO.Parquet.Dictionary
DataFrame.IO.Parquet.Levels
DataFrame.IO.Parquet.Thrift
DataFrame.IO.Parquet.Binary,
DataFrame.IO.Parquet.Dictionary,
DataFrame.IO.Parquet.Levels,
DataFrame.IO.Parquet.Thrift,
DataFrame.IO.Parquet.ColumnStatistics,
DataFrame.IO.Parquet.Compression,
DataFrame.IO.Parquet.Encoding,
Expand Down Expand Up @@ -117,7 +117,10 @@ library
zlib >= 0.5 && < 1,
zstd >= 0.1.2.0 && < 0.2,
mmap >= 0.5.8 && < 0.6,
parallel >= 3.2.2.0 && < 5
parallel >= 3.2.2.0 && < 5,
filepath >= 1.4 && < 2,
Glob >= 0.10 && < 1

hs-source-dirs: src
c-sources: cbits/process_csv.c
include-dirs: cbits
Expand Down
2 changes: 1 addition & 1 deletion src/DataFrame.hs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ import DataFrame.IO.CSV as CSV (
writeCsv,
writeSeparated,
)
import DataFrame.IO.Parquet as Parquet (readParquet)
import DataFrame.IO.Parquet as Parquet (readParquet, readParquetFiles)
import DataFrame.IO.Unstable.CSV as UnstableCSV (
fastReadCsvUnstable,
fastReadTsvUnstable,
Expand Down
29 changes: 28 additions & 1 deletion src/DataFrame/IO/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

module DataFrame.IO.Parquet where
module DataFrame.IO.Parquet (
readParquet,
readParquetFiles,
) where

import Control.Monad
import Data.Bits
import Data.List (sort)
Expand All @@ -18,12 +21,18 @@ import Data.Word
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import qualified DataFrame.Operations.Core as DI
import DataFrame.Operations.Merge ()
import System.FilePath.Glob (glob)


import DataFrame.IO.Parquet.Dictionary
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Page
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import System.Directory (doesDirectoryExist)

import System.FilePath ((</>))

{- | Read a parquet file from path and load it into a dataframe.

Expand Down Expand Up @@ -107,6 +116,24 @@ readParquet path = do

pure $ DI.fromNamedColumns orderedColumns

-- | Read one or more parquet files and concatenate them into a single
-- dataframe.
--
-- If @path@ is a directory, every non-directory entry directly inside it
-- is read; otherwise @path@ is passed to 'glob' as a pattern (e.g.
-- @\"data\/*.parquet\"@), so a plain file path also works.
--
-- Matches are sorted before merging: 'glob' makes no ordering guarantee,
-- and sorting keeps the row order of the merged dataframe deterministic
-- across platforms.
--
-- Throws an 'IOError' (via 'userError') when nothing matches.
readParquetFiles :: FilePath -> IO DataFrame
readParquetFiles path = do
  isDir <- doesDirectoryExist path

  -- For a directory, read its immediate children; subdirectories are
  -- filtered out below rather than recursed into.
  let pat = if isDir then path </> "*" else path

  matches <- glob pat
  files <- filterM (fmap not . doesDirectoryExist) matches

  case sort files of
    [] ->
      ioError . userError $
        "readParquetFiles: no parquet files found for " ++ path
    -- Relies on the DataFrame Monoid instance (DataFrame.Operations.Merge)
    -- to concatenate the per-file frames.
    fs -> mconcat <$> mapM readParquet fs

readMetadataFromPath :: FilePath -> IO FileMetadata
readMetadataFromPath path = do
contents <- BSO.readFile path
Expand Down
Loading