diff --git a/DataParser.Console/DataParser.Console.fsproj b/DataParser.Console/DataParser.Console.fsproj index 6ff1e0e..43ef940 100644 --- a/DataParser.Console/DataParser.Console.fsproj +++ b/DataParser.Console/DataParser.Console.fsproj @@ -2,7 +2,7 @@ Exe - net9.0 + net10.0 Linux @@ -18,6 +18,9 @@ + + + diff --git a/DataParser.Console/FreeIO.fs b/DataParser.Console/FreeIO.fs new file mode 100644 index 0000000..976d3a1 --- /dev/null +++ b/DataParser.Console/FreeIO.fs @@ -0,0 +1,63 @@ +namespace DataParser.Console + +open ResultMap +open DataParser.Console.DataFiles +open DataParser.Console.Core +open DataParser.Console.FormatFiles + +/// IO algebra for the application (free monad instructions) +type IO<'next> = + | ReadSpecs of folder:string * (ResultMap -> 'next) + | ParseDataFile of dataFile:DataFileFormat * (Result -> 'next) + | WriteOutput of folder:string * DataFileParseResult * (unit -> 'next) + | LogError of string * (unit -> 'next) + | LogInfo of string * (unit -> 'next) + +/// Free monad over IO +type FreeIO<'a> = + | Pure of 'a + | Free of IO> + +module FreeIOOps = + let rec bind (f:'a -> FreeIO<'b>) (m:FreeIO<'a>) : FreeIO<'b> = + match m with + | Pure x -> f x + | Free op -> + match op with + | ReadSpecs(path, next) -> Free(ReadSpecs(path, next >> bind f)) + | ParseDataFile(df, next) -> Free(ParseDataFile(df, next >> bind f)) + | WriteOutput(folder, r, next) -> Free(WriteOutput(folder, r, next >> bind f)) + | LogError(msg, next) -> Free(LogError(msg, next >> bind f)) + | LogInfo(msg, next) -> Free(LogInfo(msg, next >> bind f)) + + let map f m = bind (f >> Pure) m + + let liftF (op: IO<'a>) : FreeIO<'a> = + match op with + | ReadSpecs(p, k) -> Free(ReadSpecs(p, k >> Pure)) + | ParseDataFile(d, k) -> Free(ParseDataFile(d, k >> Pure)) + | WriteOutput(fol, r, k) -> Free(WriteOutput(fol, r, k >> Pure)) + | LogError(msg, k) -> Free(LogError(msg, k >> Pure)) + | LogInfo(msg, k) -> Free(LogInfo(msg, k >> Pure)) + + // smart constructors + let readSpecs folder = Free(ReadSpecs(folder, Pure)) + let parseDataFile df = Free(ParseDataFile(df, Pure)) + let writeOutput folder result = Free(WriteOutput(folder, result, Pure)) + let logError msg = Free(LogError(msg, Pure)) + let logInfo msg = Free(LogInfo(msg, Pure)) + + type IOBuilder() = + member _.Bind(m, f) = bind f m + member _.Return(x) = Pure x + member _.ReturnFrom(x:FreeIO<'a>) = x + member _.Zero() = Pure () + member _.Delay(f: unit -> FreeIO<'a>) = f() + member _.For(seq: seq<'a>, body: 'a -> FreeIO) : FreeIO = + // fold left to sequence effects in order + Seq.fold (fun acc v -> bind (fun _ -> body v) acc) (Pure ()) seq + member _.Combine(comp, cont) = + // sequence two computations: run comp then cont + bind (fun _ -> cont) comp + + let io = IOBuilder() diff --git a/DataParser.Console/FreeIOInterpreter.fs b/DataParser.Console/FreeIOInterpreter.fs new file mode 100644 index 0000000..5dbcf1b --- /dev/null +++ b/DataParser.Console/FreeIOInterpreter.fs @@ -0,0 +1,29 @@ +namespace DataParser.Console + +open System.Threading.Tasks +open DataParser.Console.FileRead + +module FreeIOInterpreter = + + let rec interpret = function + | Pure x -> Task.FromResult x + | Free (ReadSpecs(folder, next)) -> task { + let! specs = readAllSpecFilesAsync folder + return! interpret (next specs) + } + | Free (ParseDataFile(df, next)) -> task { + let! r = parseDataFile df + return! interpret (next r) + } + | Free (WriteOutput(folder, result, next)) -> task { + do! DataParser.Console.FileWrite.writeOutputFileAsync folder result + return! interpret (next()) + } + | Free (LogError(msg, next)) -> task { + eprintfn "%s" msg + return! interpret (next()) + } + | Free (LogInfo(msg, next)) -> task { + printfn "%s" msg + return! interpret (next()) + } diff --git a/DataParser.Console/Program.fs b/DataParser.Console/Program.fs index 56fae78..8fa6646 100644 --- a/DataParser.Console/Program.fs +++ b/DataParser.Console/Program.fs @@ -1,7 +1,7 @@ -open System -open System.Threading.Tasks +open System +open DataParser.Console.FreeIOOps +open DataParser.Console.FreeIOInterpreter open DataParser.Console.FileRead -open DataParser.Console.FileWrite open ResultMap [] @@ -13,52 +13,34 @@ let DataFolderPath = "./data" [] let OutputFolderPath = "./output" -let okHandler _ = writeOutputFileAsync OutputFolderPath +let makeProgram specFolder dataFolder outputFolder = + io { + do! logInfo "Reading spec files..." + let! specs = readSpecs specFolder -let errorHandler filePath errors = - eprintfn $"Error occurred during processing data file: {filePath}. Errors are : %+A{errors}" + let dataFileInfos = getDataFileInfos dataFolder -let consolidateResultsAsync (ResultMap dataFileFormats) = - let folder acc k = function - | Ok dataFileFormat -> - task { - let! parseResult = parseDataFile dataFileFormat - return! Map.add Task.singleton k <*> Task.singleton parseResult <*> acc - } - | Error e -> - Map.add Task.singleton k <*> Task.singleton (Error e) <*> acc - - Map.fold folder (Task.singleton Map.empty) dataFileFormats - |> Task.map ResultMap - -printfn "Reading spec files..." - -let t = - task { - let! specs = readAllSpecFilesAsync SpecFolderPath - - let dataFileInfos = getDataFileInfos DataFolderPath - - printfn "Parsing data files..." + do! logInfo "Parsing data files..." let dataFileFormats = getDataFileFormats specs dataFileInfos - let! consolidatedResults = consolidateResultsAsync dataFileFormats - - let result = - let errorHandler filePath = - Task.toUnit << Task.fromUnit << Task.singleton << errorHandler filePath - ResultMap.either okHandler errorHandler consolidatedResults - - printfn "Writing to output folder..." - let tasks = - result - |> Map.values - |> Seq.map Task.fromUnit - |> Seq.toArray - do! Task.WhenAll tasks - - printfn "Processing complete. Press Enter to exit." - ignore <| Console.ReadLine() + // iterate over entries and parse/write + let (ResultMap m) = dataFileFormats + for KeyValue(filePath, entry) in m do + match entry with + | Ok dataFileFormat -> + let! parseResult = DataParser.Console.FreeIOOps.parseDataFile dataFileFormat + match parseResult with + | Ok parseRes -> do! writeOutput outputFolder parseRes + | Error errs -> do! logError (sprintf "Error occurred during processing data file: %A. Errors are : %A" filePath errs) + | Error errs -> + do! logError (sprintf "Error occurred during processing data file: %A. Errors are : %A" filePath errs) + + do! logInfo "Processing complete." } -t.GetAwaiter().GetResult() +// run the program (when executed as an app) +interpret (makeProgram SpecFolderPath DataFolderPath OutputFolderPath) +|> Task.runSynchronously + +printfn "Processing complete. Press Enter to exit." +ignore <| Console.ReadLine() diff --git a/DataParser.Console/PureIoInterpreter.fs b/DataParser.Console/PureIoInterpreter.fs new file mode 100644 index 0000000..4468929 --- /dev/null +++ b/DataParser.Console/PureIoInterpreter.fs @@ -0,0 +1,43 @@ +namespace DataParser.Console + +open ResultMap +open DataParser.Console.DataFiles + +module PureIOInterpreter = + open Core + open FormatFiles + + type RecordedAction = + | ReadSpecs of folder: string + | ParseDataFile of filePath: string + | WriteOutput of folder: string * DataFileParseResult + | LogError of string + | LogInfo of string + + /// Interpret a FreeIO program purely by invoking provided handlers and recording actions. + let rec interpretPure (onReadSpecs: string -> ResultMap) + (onParse: DataFileFormat -> Result) + (program: FreeIO<'a>) : ('a * RecordedAction list) = + match program with + | Pure x -> (x, []) + | Free op -> + match op with + | IO.ReadSpecs(folder, next) -> + let specs = onReadSpecs folder + let (res, actions) = interpretPure onReadSpecs onParse (next specs) + (res, RecordedAction.ReadSpecs folder :: actions) + | IO.ParseDataFile(df, next) -> + // choose to call the parse handler and record which file was parsed + let parseRes = onParse df + let (res, actions) = interpretPure onReadSpecs onParse (next parseRes) + let filePathStr = match df.FilePath with FilePath s -> s + (res, RecordedAction.ParseDataFile filePathStr :: actions) + | IO.WriteOutput(folder, result, next) -> + let (res, actions) = interpretPure onReadSpecs onParse (next()) + (res, RecordedAction.WriteOutput(folder, result) :: actions) + | IO.LogError(msg, next) -> + let (res, actions) = interpretPure onReadSpecs onParse (next()) + (res, RecordedAction.LogError msg :: actions) + | IO.LogInfo(msg, next) -> + let (res, actions) = interpretPure onReadSpecs onParse (next()) + (res, RecordedAction.LogInfo msg :: actions) diff --git a/DataParser.Console/Task.fs b/DataParser.Console/Task.fs index 7c1f0bd..40607f3 100644 --- a/DataParser.Console/Task.fs +++ b/DataParser.Console/Task.fs @@ -3,6 +3,8 @@ module Task open System.Threading.Tasks +let runSynchronously (task: Task) = task.GetAwaiter().GetResult() + let map = () let toUnit (x: Task) = task { do! x } diff --git a/DataParser.Tests/DataParser.Tests.fsproj b/DataParser.Tests/DataParser.Tests.fsproj index 59f85d4..0cae1c9 100644 --- a/DataParser.Tests/DataParser.Tests.fsproj +++ b/DataParser.Tests/DataParser.Tests.fsproj @@ -1,7 +1,7 @@ - net9.0 + net10.0 false false diff --git a/DataParser.Tests/ProgramTests.fs b/DataParser.Tests/ProgramTests.fs new file mode 100644 index 0000000..b79f3fe --- /dev/null +++ b/DataParser.Tests/ProgramTests.fs @@ -0,0 +1,57 @@ +module DataParser.Tests.ProgramTests + +open System +open System.IO +open Swensen.Unquote +open DataParser.Console +open ResultMap + +open Xunit + +[] +let ``program should request specs, parse data and request write`` () = + // arrange: create a temp workspace with specs and data + let baseDir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString()) + Directory.CreateDirectory(baseDir) |> ignore + let specsDir = Path.Combine(baseDir, "specs") + let dataDir = Path.Combine(baseDir, "data") + let outDir = Path.Combine(baseDir, "output") + Directory.CreateDirectory(specsDir) |> ignore + Directory.CreateDirectory(dataDir) |> ignore + Directory.CreateDirectory(outDir) |> ignore + + // write a simple spec file + let specContent = "\"column name\",width,datatype\nname,10,TEXT\n" + File.WriteAllText(Path.Combine(specsDir, "person.csv"), specContent) + + // write a simple data file with a name of 10 chars + let dataLine = "ABCDEFGHIJ\n" + File.WriteAllText(Path.Combine(dataDir, "person_2020-01-01.txt"), dataLine) + + // handlers that the pure interpreter will use + let readHandler (folder: string) : ResultMap = + // read all csv files and parse them using existing parser + let items = + Directory.GetFiles(folder, "*.csv") + |> Array.map (fun fp -> + let formatName = FormatName (Path.GetFileNameWithoutExtension fp) + let text = File.ReadAllText(fp) + let parsed = DataParser.Console.FormatFiles.parseFormatFile text + formatName, parsed) + ResultMap << Map.ofArray <| items + + let parseHandler (df: DataFileFormat) : Result = + // reuse the real parse path by calling the task-based parser synchronously + DataParser.Console.FileRead.parseDataFile df |> fun t -> t.GetAwaiter().GetResult() + + // act: build the program with our temp dirs and run the pure interpreter + let program = Program.makeProgram specsDir dataDir outDir + let ((), actions) = PureIoInterpreter.interpretPure readHandler parseHandler program + + // assert: actions include ReadSpecs, ParseDataFile, WriteOutput and final LogInfo + let hasReadSpecs = actions |> List.exists (function PureIoInterpreter.ReadSpecs _ -> true | _ -> false) + let hasParse = actions |> List.exists (function PureIoInterpreter.ParseDataFile _ -> true | _ -> false) + let hasWrite = actions |> List.exists (function PureIoInterpreter.WriteOutput(_, _) -> true | _ -> false) + let hasFinalLog = actions |> List.exists (function PureIoInterpreter.LogInfo m -> m = "Processing complete." | _ -> false) + + test <@ hasReadSpecs = true && hasParse = true && hasWrite = true && hasFinalLog = true @>