Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion DataParser.Console/DataParser.Console.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>
<TargetFramework>net10.0</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>

Expand All @@ -18,6 +18,9 @@
<Compile Include="DataFileParseResult.fs" />
<Compile Include="FileRead.fs" />
<Compile Include="FileWrite.fs" />
<Compile Include="FreeIO.fs" />
<Compile Include="FreeIOInterpreter.fs" />
<Compile Include="PureIOInterpreter.fs" />
<Compile Include="Program.fs"/>
</ItemGroup>

Expand Down
63 changes: 63 additions & 0 deletions DataParser.Console/FreeIO.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
namespace DataParser.Console

open ResultMap
open DataParser.Console.DataFiles
open DataParser.Console.Core
open DataParser.Console.FormatFiles

/// IO algebra for the application (free monad instructions)
type IO<'next> =
| ReadSpecs of folder:string * (ResultMap<FormatName, FormatLine list, Error> -> 'next)
| ParseDataFile of dataFile:DataFileFormat * (Result<DataFileParseResult, Error list> -> 'next)
| WriteOutput of folder:string * DataFileParseResult * (unit -> 'next)
| LogError of string * (unit -> 'next)
| LogInfo of string * (unit -> 'next)

/// Free monad over IO
type FreeIO<'a> =
| Pure of 'a
| Free of IO<FreeIO<'a>>

module FreeIOOps =
let rec bind (f:'a -> FreeIO<'b>) (m:FreeIO<'a>) : FreeIO<'b> =
match m with
| Pure x -> f x
| Free op ->
match op with
| ReadSpecs(path, next) -> Free(ReadSpecs(path, next >> bind f))
| ParseDataFile(df, next) -> Free(ParseDataFile(df, next >> bind f))
| WriteOutput(folder, r, next) -> Free(WriteOutput(folder, r, next >> bind f))
| LogError(msg, next) -> Free(LogError(msg, next >> bind f))
| LogInfo(msg, next) -> Free(LogInfo(msg, next >> bind f))

let map f m = bind (f >> Pure) m

let liftF (op: IO<'a>) : FreeIO<'a> =
match op with
| ReadSpecs(p, k) -> Free(ReadSpecs(p, k >> Pure))
| ParseDataFile(d, k) -> Free(ParseDataFile(d, k >> Pure))
| WriteOutput(fol, r, k) -> Free(WriteOutput(fol, r, k >> Pure))
| LogError(msg, k) -> Free(LogError(msg, k >> Pure))
| LogInfo(msg, k) -> Free(LogInfo(msg, k >> Pure))

// smart constructors
let readSpecs folder = Free(ReadSpecs(folder, Pure))
let parseDataFile df = Free(ParseDataFile(df, Pure))
let writeOutput folder result = Free(WriteOutput(folder, result, Pure))
let logError msg = Free(LogError(msg, Pure))
let logInfo msg = Free(LogInfo(msg, Pure))

type IOBuilder() =
member _.Bind(m, f) = bind f m
member _.Return(x) = Pure x
member _.ReturnFrom(x:FreeIO<'a>) = x
member _.Zero() = Pure ()
member _.Delay(f: unit -> FreeIO<'a>) = f()
member _.For(seq: seq<'a>, body: 'a -> FreeIO<unit>) : FreeIO<unit> =
// fold left to sequence effects in order
Seq.fold (fun acc v -> bind (fun _ -> body v) acc) (Pure ()) seq
member _.Combine(comp, cont) =
// sequence two computations: run comp then cont
bind (fun _ -> cont) comp

let io = IOBuilder()
29 changes: 29 additions & 0 deletions DataParser.Console/FreeIOInterpreter.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace DataParser.Console

open System.Threading.Tasks
open DataParser.Console.FileRead

module FreeIOInterpreter =

let rec interpret = function
| Pure x -> Task.FromResult x
| Free (ReadSpecs(folder, next)) -> task {
let! specs = readAllSpecFilesAsync folder
return! interpret (next specs)
}
| Free (ParseDataFile(df, next)) -> task {
let! r = parseDataFile df
return! interpret (next r)
}
| Free (WriteOutput(folder, result, next)) -> task {
do! DataParser.Console.FileWrite.writeOutputFileAsync folder result
return! interpret (next())
}
| Free (LogError(msg, next)) -> task {
eprintfn "%s" msg
return! interpret (next())
}
| Free (LogInfo(msg, next)) -> task {
printfn "%s" msg
return! interpret (next())
}
74 changes: 28 additions & 46 deletions DataParser.Console/Program.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
open System
open System.Threading.Tasks
open System
open DataParser.Console.FreeIOOps
open DataParser.Console.FreeIOInterpreter
open DataParser.Console.FileRead
open DataParser.Console.FileWrite
open ResultMap

[<Literal>]
Expand All @@ -13,52 +13,34 @@ let DataFolderPath = "./data"
[<Literal>]
let OutputFolderPath = "./output"

let okHandler _ = writeOutputFileAsync OutputFolderPath
let makeProgram specFolder dataFolder outputFolder =
io {
do! logInfo "Reading spec files..."
let! specs = readSpecs specFolder

let errorHandler filePath errors =
eprintfn $"Error occurred during processing data file: {filePath}. Errors are : %+A{errors}"
let dataFileInfos = getDataFileInfos dataFolder

let consolidateResultsAsync (ResultMap dataFileFormats) =
let folder acc k = function
| Ok dataFileFormat ->
task {
let! parseResult = parseDataFile dataFileFormat
return! Map.add <!> Task.singleton k <*> Task.singleton parseResult <*> acc
}
| Error e ->
Map.add <!> Task.singleton k <*> Task.singleton (Error e) <*> acc

Map.fold folder (Task.singleton Map.empty) dataFileFormats
|> Task.map ResultMap

printfn "Reading spec files..."

let t =
task {
let! specs = readAllSpecFilesAsync SpecFolderPath

let dataFileInfos = getDataFileInfos DataFolderPath

printfn "Parsing data files..."
do! logInfo "Parsing data files..."
let dataFileFormats = getDataFileFormats specs dataFileInfos

let! consolidatedResults = consolidateResultsAsync dataFileFormats

let result =
let errorHandler filePath =
Task.toUnit << Task.fromUnit << Task.singleton << errorHandler filePath
ResultMap.either okHandler errorHandler consolidatedResults

printfn "Writing to output folder..."
let tasks =
result
|> Map.values
|> Seq.map Task.fromUnit
|> Seq.toArray
do! Task.WhenAll tasks

printfn "Processing complete. Press Enter to exit."
ignore <| Console.ReadLine()
// iterate over entries and parse/write
let (ResultMap m) = dataFileFormats
for KeyValue(filePath, entry) in m do
match entry with
| Ok dataFileFormat ->
let! parseResult = DataParser.Console.FreeIOOps.parseDataFile dataFileFormat
match parseResult with
| Ok parseRes -> do! writeOutput outputFolder parseRes
| Error errs -> do! logError (sprintf "Error occurred during processing data file: %A. Errors are : %A" filePath errs)
| Error errs ->
do! logError (sprintf "Error occurred during processing data file: %A. Errors are : %A" filePath errs)

do! logInfo "Processing complete."
}

t.GetAwaiter().GetResult()
// run the program (when executed as an app)
interpret (makeProgram SpecFolderPath DataFolderPath OutputFolderPath)
|> Task.runSynchronously

printfn "Processing complete. Press Enter to exit."
ignore <| Console.ReadLine()
43 changes: 43 additions & 0 deletions DataParser.Console/PureIoInterpreter.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace DataParser.Console

open ResultMap
open DataParser.Console.DataFiles

module PureIOInterpreter =
open Core
open FormatFiles

type RecordedAction =
| ReadSpecs of folder: string
| ParseDataFile of filePath: string
| WriteOutput of folder: string * DataFileParseResult
| LogError of string
| LogInfo of string

/// Interpret a FreeIO program purely by invoking provided handlers and recording actions.
let rec interpretPure (onReadSpecs: string -> ResultMap<FormatName, FormatLine list, Error>)
(onParse: DataFileFormat -> Result<DataFileParseResult, Error list>)
(program: FreeIO<'a>) : ('a * RecordedAction list) =
match program with
| Pure x -> (x, [])
| Free op ->
match op with
| IO.ReadSpecs(folder, next) ->
let specs = onReadSpecs folder
let (res, actions) = interpretPure onReadSpecs onParse (next specs)
(res, RecordedAction.ReadSpecs folder :: actions)
| IO.ParseDataFile(df, next) ->
// choose to call the parse handler and record which file was parsed
let parseRes = onParse df
let (res, actions) = interpretPure onReadSpecs onParse (next parseRes)
let filePathStr = match df.FilePath with FilePath s -> s
(res, RecordedAction.ParseDataFile filePathStr :: actions)
| IO.WriteOutput(folder, result, next) ->
let (res, actions) = interpretPure onReadSpecs onParse (next())
(res, RecordedAction.WriteOutput(folder, result) :: actions)
| IO.LogError(msg, next) ->
let (res, actions) = interpretPure onReadSpecs onParse (next())
(res, RecordedAction.LogError msg :: actions)
| IO.LogInfo(msg, next) ->
let (res, actions) = interpretPure onReadSpecs onParse (next())
(res, RecordedAction.LogInfo msg :: actions)
2 changes: 2 additions & 0 deletions DataParser.Console/Task.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module Task

open System.Threading.Tasks

let runSynchronously (task: Task) = task.GetAwaiter().GetResult()

let map = (<!>)

let toUnit (x: Task) = task { do! x }
Expand Down
2 changes: 1 addition & 1 deletion DataParser.Tests/DataParser.Tests.fsproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<TargetFramework>net10.0</TargetFramework>

<IsPackable>false</IsPackable>
<GenerateProgramFile>false</GenerateProgramFile>
Expand Down
57 changes: 57 additions & 0 deletions DataParser.Tests/ProgramTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
module DataParser.Tests.ProgramTests

open System
open System.IO
open Swensen.Unquote
open DataParser.Console
open ResultMap

open Xunit

[<Fact>]
let ``program should request specs, parse data and request write`` () =
// arrange: create a temp workspace with specs and data
let baseDir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString())
Directory.CreateDirectory(baseDir) |> ignore
let specsDir = Path.Combine(baseDir, "specs")
let dataDir = Path.Combine(baseDir, "data")
let outDir = Path.Combine(baseDir, "output")
Directory.CreateDirectory(specsDir) |> ignore
Directory.CreateDirectory(dataDir) |> ignore
Directory.CreateDirectory(outDir) |> ignore

// write a simple spec file
let specContent = "\"column name\",width,datatype\nname,10,TEXT\n"
File.WriteAllText(Path.Combine(specsDir, "person.csv"), specContent)

// write a simple data file with a name of 10 chars
let dataLine = "ABCDEFGHIJ\n"
File.WriteAllText(Path.Combine(dataDir, "person_2020-01-01.txt"), dataLine)

// handlers that the pure interpreter will use
let readHandler (folder: string) : ResultMap<FormatName, FormatLine list, Error> =
// read all csv files and parse them using existing parser
let items =
Directory.GetFiles(folder, "*.csv")
|> Array.map (fun fp ->
let formatName = FormatName (Path.GetFileNameWithoutExtension fp)
let text = File.ReadAllText(fp)
let parsed = DataParser.Console.FormatFiles.parseFormatFile text
formatName, parsed)
ResultMap << Map.ofArray <| items

let parseHandler (df: DataFileFormat) : Result<DataFileParseResult, Error list> =
// reuse the real parse path by calling the task-based parser synchronously
DataParser.Console.FileRead.parseDataFile df |> fun t -> t.GetAwaiter().GetResult()

// act: build the program with our temp dirs and run the pure interpreter
let program = Program.makeProgram specsDir dataDir outDir
let ((), actions) = PureIoInterpreter.interpretPure readHandler parseHandler program

// assert: actions include ReadSpecs, ParseDataFile, WriteOutput and final LogInfo
let hasReadSpecs = actions |> List.exists (function PureIoInterpreter.ReadSpecs _ -> true | _ -> false)
let hasParse = actions |> List.exists (function PureIoInterpreter.ParseDataFile _ -> true | _ -> false)
let hasWrite = actions |> List.exists (function PureIoInterpreter.WriteOutput(_, _) -> true | _ -> false)
let hasFinalLog = actions |> List.exists (function PureIoInterpreter.LogInfo m -> m = "Processing complete." | _ -> false)

test <@ hasReadSpecs = true && hasParse = true && hasWrite = true && hasFinalLog = true @>
Loading