diff --git a/dataframe.cabal b/dataframe.cabal index 615720c..5cb8921 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -77,10 +77,10 @@ library DataFrame.IO.JSON, DataFrame.IO.Unstable.CSV, DataFrame.IO.Parquet, - DataFrame.IO.Parquet.Binary - DataFrame.IO.Parquet.Dictionary - DataFrame.IO.Parquet.Levels - DataFrame.IO.Parquet.Thrift + DataFrame.IO.Parquet.Binary, + DataFrame.IO.Parquet.Dictionary, + DataFrame.IO.Parquet.Levels, + DataFrame.IO.Parquet.Thrift, DataFrame.IO.Parquet.ColumnStatistics, DataFrame.IO.Parquet.Compression, DataFrame.IO.Parquet.Encoding, @@ -117,7 +117,10 @@ library zlib >= 0.5 && < 1, zstd >= 0.1.2.0 && < 0.2, mmap >= 0.5.8 && < 0.6, - parallel >= 3.2.2.0 && < 5 + parallel >= 3.2.2.0 && < 5, + filepath >= 1.4 && < 2, + Glob >= 0.10 && < 1 + hs-source-dirs: src c-sources: cbits/process_csv.c include-dirs: cbits diff --git a/src/DataFrame.hs b/src/DataFrame.hs index d11b760..c50c43b 100644 --- a/src/DataFrame.hs +++ b/src/DataFrame.hs @@ -251,7 +251,7 @@ import DataFrame.IO.CSV as CSV ( writeCsv, writeSeparated, ) -import DataFrame.IO.Parquet as Parquet (readParquet) +import DataFrame.IO.Parquet as Parquet (readParquet, readParquetFiles) import DataFrame.IO.Unstable.CSV as UnstableCSV ( fastReadCsvUnstable, fastReadTsvUnstable, diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index dca9b47..7079ece 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -2,7 +2,10 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeApplications #-} -module DataFrame.IO.Parquet where +module DataFrame.IO.Parquet ( + readParquet, + readParquetFiles, +) where import Control.Monad import Data.Bits @@ -18,12 +21,18 @@ import Data.Word import qualified DataFrame.Internal.Column as DI import DataFrame.Internal.DataFrame (DataFrame) import qualified DataFrame.Operations.Core as DI +import DataFrame.Operations.Merge () +import System.FilePath.Glob (glob) + import DataFrame.IO.Parquet.Dictionary import 
DataFrame.IO.Parquet.Levels import DataFrame.IO.Parquet.Page import DataFrame.IO.Parquet.Thrift import DataFrame.IO.Parquet.Types +import System.Directory (doesDirectoryExist) + +import System.FilePath ((</>)) {- | Read a parquet file from path and load it into a dataframe. @@ -107,6 +116,24 @@ readParquet path = do pure $ DI.fromNamedColumns orderedColumns +readParquetFiles :: FilePath -> IO DataFrame +readParquetFiles path = do + isDir <- doesDirectoryExist path + + let pat = if isDir then path </> "*" else path + + matches <- glob pat + + files <- filterM (fmap not . doesDirectoryExist) matches + + case files of + [] -> + error $ +
        "readParquetFiles: no parquet files found for " ++ path + _ -> do + dfs <- mapM readParquet files + pure (mconcat dfs) + readMetadataFromPath :: FilePath -> IO FileMetadata readMetadataFromPath path = do contents <- BSO.readFile path