From 76cbdf639d502fdd27895556895428067a3a0c32 Mon Sep 17 00:00:00 2001 From: Rex Ng Date: Mon, 8 Sep 2025 20:36:45 +0800 Subject: [PATCH] Use Task-based parsing --- DataParser.Console/DataParser.Console.fsproj | 3 +-- DataParser.Console/FileWrite.fs | 18 +++++++++++++----- DataParser.Console/Map.fs | 10 ---------- DataParser.Console/Program.fs | 18 ++++++++---------- DataParser.Console/ResultMap.fs | 19 ++----------------- DataParser.Console/Task.fs | 18 ++---------------- DataParser.Console/TaskBuilder.fs | 20 -------------------- DataParser.Console/TaskOperators.fs | 15 +++++++++++++++ 8 files changed, 41 insertions(+), 80 deletions(-) delete mode 100644 DataParser.Console/Map.fs delete mode 100644 DataParser.Console/TaskBuilder.fs create mode 100644 DataParser.Console/TaskOperators.fs diff --git a/DataParser.Console/DataParser.Console.fsproj b/DataParser.Console/DataParser.Console.fsproj index 8432683..6ff1e0e 100644 --- a/DataParser.Console/DataParser.Console.fsproj +++ b/DataParser.Console/DataParser.Console.fsproj @@ -7,9 +7,8 @@ - + - diff --git a/DataParser.Console/FileWrite.fs b/DataParser.Console/FileWrite.fs index c5caacd..07aff70 100644 --- a/DataParser.Console/FileWrite.fs +++ b/DataParser.Console/FileWrite.fs @@ -1,15 +1,22 @@ module DataParser.Console.FileWrite +open System open System.Text.Json +open System.Threading.Tasks open System.IO open DataParser.Console.DataFiles -let writeOutputFile folderPath (fileMap : DataFileParseResult) = +let newLineUtf8 = System.Text.Encoding.UTF8.GetBytes Environment.NewLine + +let writeOutputFileAsync folderPath (fileMap : DataFileParseResult) = let serializeElement (JsonObject jsonObject) = - let serialized = JsonSerializer.Serialize jsonObject - System.Text.Encoding.UTF8.GetBytes $"{serialized}\n" + let serialized = JsonSerializer.SerializeToUtf8Bytes jsonObject + Array.concat [| serialized; newLineUtf8 |] - let writeBytes (stream: Stream) (bytes: byte array) = stream.Write bytes + let writeBytesAsync (stream: Stream) (bytes: byte array) = + bytes + |> stream.WriteAsync + |> _.AsTask() let createOutputFilePath = (+) folderPath << sprintf "/%s" << formatOutputFileName @@ -17,4 +24,5 @@ let writeOutputFile folderPath (fileMap : DataFileParseResult) = ignore <| Directory.CreateDirectory folderPath let filePath = createOutputFilePath fileMap.DataFileName use fs = File.Open (filePath, FileMode.Create) - Seq.iter (writeBytes fs << serializeElement) fileMap.JsonElements + let tasks = Seq.map (writeBytesAsync fs << serializeElement) fileMap.JsonElements + Task.WhenAll tasks diff --git a/DataParser.Console/Map.fs b/DataParser.Console/Map.fs deleted file mode 100644 index 28f84fa..0000000 --- a/DataParser.Console/Map.fs +++ /dev/null @@ -1,10 +0,0 @@ -module Map - -open System.Threading.Tasks -open Microsoft.FSharp.Control.TaskBuilder - -let traverseTask (f: 'b -> Task<'c>) = - Map.fold (fun acc k v -> task { - let! t = f v - and! acc' = acc - return Map.add k t acc' }) (task { return Map.empty }) diff --git a/DataParser.Console/Program.fs b/DataParser.Console/Program.fs index 1a895a7..6b4cfc6 100644 --- a/DataParser.Console/Program.fs +++ b/DataParser.Console/Program.fs @@ -1,4 +1,5 @@ open System +open System.Threading.Tasks open DataParser.Console.FileRead open DataParser.Console.FileWrite open ResultMap @@ -12,26 +13,21 @@ let DataFolderPath = "./data" [] let OutputFolderPath = "./output" -let okHandler _ = writeOutputFile OutputFolderPath +let okHandler _ = writeOutputFileAsync OutputFolderPath let errorHandler filePath errors = eprintfn $"Error occurred during processing data file: {filePath}. Errors are : %+A{errors}" let consolidateResults (ResultMap dataFileFormats) = - let folder acc k v = - match v with + let folder acc k = function | Ok dataFileFormat -> task { let! parseResult = parseDataFile dataFileFormat - match parseResult with - | Ok result -> - return! Task.liftA3 Map.add (Task.singleton k) (Task.singleton (Ok result)) acc - | Error e -> - return! Task.liftA3 Map.add (Task.singleton k) (Task.singleton (Error e)) acc + return! Map.add Task.singleton k <*> Task.singleton parseResult <*> acc } | Error e -> task { - return! Task.liftA3 Map.add (Task.singleton k) (Task.singleton (Error e)) acc + return! Map.add Task.singleton k <*> Task.singleton (Error e) <*> acc } Map.fold folder (Task.singleton Map.empty) dataFileFormats @@ -50,8 +46,10 @@ let t = let! consolidatedResults = consolidateResults dataFileFormats + let result = ResultMap.either okHandler ((<<) Task.fromUnit << errorHandler) consolidatedResults + printfn "Writing to output folder..." - ResultMap.biIter okHandler errorHandler consolidatedResults + do! Task.WhenAll(Map.values result) printfn "Processing complete. Press Enter to exit." ignore <| Console.ReadLine() diff --git a/DataParser.Console/ResultMap.fs b/DataParser.Console/ResultMap.fs index a284221..a69f9dc 100644 --- a/DataParser.Console/ResultMap.fs +++ b/DataParser.Console/ResultMap.fs @@ -17,21 +17,6 @@ module ResultMap = ResultMap << Map.map (fun _ -> Result.map f) << unResultMap - - let traverseTask f (ResultMap m) = - let folder acc k v = task { - let! acc' = acc - match v with - | Ok x -> - let! t = f x - return Map.add k (Ok t) acc' - | Error e -> - return Map.add k (Error e) acc' - - } - - Map.fold folder (task { return Map.empty }) m - |> Task.map ResultMap let bindResult f = ResultMap @@ -43,6 +28,6 @@ module ResultMap = let tryFind key (ResultMap m) = Map.tryFind key m - let biIter f g (ResultMap x) = + let either f g (ResultMap x) = let go k = function Ok v -> f k v | Error e -> g k e - Map.iter go x + Map.map go x diff --git a/DataParser.Console/Task.fs b/DataParser.Console/Task.fs index 8738f93..f16fd84 100644 --- a/DataParser.Console/Task.fs +++ b/DataParser.Console/Task.fs @@ -3,28 +3,14 @@ module Task open System.Threading.Tasks -let map f x = task { - let! result = x - return f result -} - -let bind f x = task { - let! result = x - return! f result -} +let map = () let toUnit (x: Task) = task { do! x return () } -let () = map - -let (<*>) (f: Task<'a -> 'b>) (x: Task<'a>) = task { - let tasks = [|f :> Task; x :> Task|] - let! _ = Task.WhenAll(tasks) - return f.Result x.Result -} +let fromUnit (x: unit) = Task.FromResult x :> Task let liftA3 f x y z = f x <*> y <*> z diff --git a/DataParser.Console/TaskBuilder.fs b/DataParser.Console/TaskBuilder.fs deleted file mode 100644 index fcddeab..0000000 --- a/DataParser.Console/TaskBuilder.fs +++ /dev/null @@ -1,20 +0,0 @@ -namespace Microsoft.FSharp.Control - -open System.Threading.Tasks - -type TaskBuilder () = - member _.MergeSources (x: Task<'a>, y: Task<'b>) = task { - let! _ = Task.WhenAll(x :> Task, y :> Task) - return x.Result, y.Result - } - - member _.Bind (p: Task<'a>, k: 'a -> Task<'b>) : Task<'b> = task { - let! v = p - return! k v - } - - member _.Return (v: 'a) : Task<'a> = Task.FromResult v - - -module TaskBuilder = - let task = TaskBuilder() diff --git a/DataParser.Console/TaskOperators.fs b/DataParser.Console/TaskOperators.fs new file mode 100644 index 0000000..583ebf2 --- /dev/null +++ b/DataParser.Console/TaskOperators.fs @@ -0,0 +1,15 @@ +[] +module TaskOperators + +open System.Threading.Tasks + +let () f x = task { + let! result = x + return f result +} + +let (<*>) (f: Task<'a -> 'b>) (x: Task<'a>) = task { + let tasks = [|f :> Task; x :> Task|] + let! _ = Task.WhenAll(tasks) + return f.Result x.Result +}