From bfa6084a3fb35f58ed9fe5945df2d15c51126cd5 Mon Sep 17 00:00:00 2001 From: ms-snarang Date: Tue, 19 Aug 2025 11:59:42 +0530 Subject: [PATCH 1/8] Isolated worker model changes --- FHIRBulkImport/ExportAllOrchestrator.cs | 83 ++++--- FHIRBulkImport/ExportOrchestrator.cs | 271 ++++++++++++---------- FHIRBulkImport/FHIRBulkImport.csproj | 32 ++- FHIRBulkImport/ImportBundleBlobTrigger.cs | 23 +- FHIRBulkImport/ImportBundleEventGrid.cs | 17 +- FHIRBulkImport/ImportBundleHTTP.cs | 43 ++-- FHIRBulkImport/ImportBundleQueue.cs | 26 ++- FHIRBulkImport/ImportCompressedFiles.cs | 21 +- FHIRBulkImport/ImportNDJSONEventGird.cs | 11 +- FHIRBulkImport/ImportNDJSONQueue.cs | 29 +-- FHIRBulkImport/Program.cs | 39 ++++ FHIRBulkImport/host.json | 2 +- scripts/fhirBulkImport.json | 4 +- 13 files changed, 358 insertions(+), 243 deletions(-) create mode 100644 FHIRBulkImport/Program.cs diff --git a/FHIRBulkImport/ExportAllOrchestrator.cs b/FHIRBulkImport/ExportAllOrchestrator.cs index 43a84f4..2de3a0f 100644 --- a/FHIRBulkImport/ExportAllOrchestrator.cs +++ b/FHIRBulkImport/ExportAllOrchestrator.cs @@ -1,14 +1,17 @@ using System.Net.Http; using System.Threading.Tasks; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.DurableTask; -using Microsoft.Azure.WebJobs.Extensions.Http; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; -using System.Linq; +using System.Linq; using System; using Newtonsoft.Json; using System.Collections.Generic; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using System.IO; +using System.Threading; namespace FHIRBulkImport { @@ -19,38 +22,44 @@ public class ExportAllOrchestrator private static int _maxInstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); private static int _maxParallelizationCount = 100; private static int _parallelSearchBundleSize = _maxInstances = Utils.GetIntEnvironmentVariable("FBI-PARALLELSEARCHBUNDLESIZE", "50"); - private static RetryOptions _exportAllRetryOptions = new RetryOptions(firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI-EXPORTALLRETRYINTERVAL", "30")), maxNumberOfAttempts: 5) - { BackoffCoefficient = Utils.GetIntEnvironmentVariable("FBI-EXPORTALLBACKOFFCOEFFICIENT", "3") }; - - [FunctionName(nameof(ExportAllOrchestrator_HttpStart))] - public async Task ExportAllOrchestrator_HttpStart( - [HttpTrigger(AuthorizationLevel.Function, "post", Route = "$alt-export-all")] HttpRequestMessage req, - [DurableClient] IDurableOrchestrationClient starter, - ILogger log) + private static RetryPolicy _exportAllRetryOptions = new RetryPolicy(firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI-EXPORTALLRETRYINTERVAL", "30")), maxNumberOfAttempts: 5, + backoffCoefficient: Convert.ToDouble(Utils.GetIntEnvironmentVariable("FBI-EXPORTALLBACKCOEFFICIENT", "3"))); + + + [Function(nameof(ExportAllOrchestrator_HttpStart))] + public async Task ExportAllOrchestrator_HttpStart( + [HttpTrigger(AuthorizationLevel.Function, "post", Route = "$alt-export-all")] HttpRequestData req, + [DurableClient] DurableTaskClient starter, + FunctionContext context) { - string requestParameters = await req.Content.ReadAsStringAsync(); - var state = await ExportOrchestrator.runningInstances(starter, log); + var log = context.GetLogger("ExportAllOrchestrator_HttpStart"); + string requestParameters = await new StreamReader(req.Body).ReadToEndAsync(); + var orchestrator = new ExportOrchestrator(); + var state = await orchestrator.runningInstances(starter, context); int running = state.Count(); if (_maxInstances > 0 && running >= _maxInstances) { string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {_maxInstances}"; - StringContent sc = new StringContent("{\"error\":\"" + msg + "\""); - return new HttpResponseMessage() { Content = sc, StatusCode = System.Net.HttpStatusCode.TooManyRequests }; + string sc = "{\"error\":\"" + msg + "\"}"; + var response = req.CreateResponse(System.Net.HttpStatusCode.TooManyRequests); + await response.WriteStringAsync(sc); + return response; } // Function input comes from the request content. - string instanceId = await starter.StartNewAsync(nameof(ExportAll_RunOrchestrator), null, requestParameters); + string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync(nameof(ExportAll_RunOrchestrator), requestParameters); log.LogInformation($"Started orchestration with ID = '{instanceId}'."); return starter.CreateCheckStatusResponse(req, instanceId); } - [FunctionName(nameof(ExportAll_RunOrchestrator))] + [Function(nameof(ExportAll_RunOrchestrator))] public async Task ExportAll_RunOrchestrator( - [OrchestrationTrigger] IDurableOrchestrationContext context, - ILogger logger) + [OrchestrationTrigger] TaskOrchestrationContext context, + FunctionContext log) { + var logger = log.GetLogger("ExportAll_RunOrchestrator"); // Setup function level variables. JObject retVal = new JObject(); retVal["instanceid"] = context.InstanceId; @@ -83,10 +92,12 @@ public async Task ExportAll_RunOrchestrator( context.SetCustomStatus(retVal); var parallelizationTasks = options.ParallelSearchRanges.Select( - x => context.CallActivityWithRetryAsync>( + x => context.CallActivityAsync>( nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges), - _exportAllRetryOptions, - new GetCountsForListOfDateRangesRequest(options.ResourceType, x.Select(y => (y.Start, y.End, -1)).ToList()))); + + new GetCountsForListOfDateRangesRequest(options.ResourceType, x.Select(y => (y.Start, y.End, -1)).ToList()), + new TaskOptions(_exportAllRetryOptions) + )); var results = await Task.WhenAll(parallelizationTasks); @@ -114,10 +125,11 @@ public async Task ExportAll_RunOrchestrator( nextLink += $"&_lastUpdated=lt{searchRanges[i].End.Value.ToString("o")}"; } - exportTasks.Add(context.CallActivityWithRetryAsync( + exportTasks.Add(context.CallActivityAsync( nameof(ExportAllOrchestrator_GetAndWriteDataPage), - _exportAllRetryOptions, - new DataPageRequest(FhirRequestPath: nextLink, InstanceId: context.InstanceId, ParallelTaskIndex: i, ResourceType: options.ResourceType, Audience: options.Audience))); + + new DataPageRequest(FhirRequestPath: nextLink, InstanceId: context.InstanceId, ParallelTaskIndex: i, ResourceType: options.ResourceType, Audience: options.Audience), + new TaskOptions(_exportAllRetryOptions))); } // Start and continue export as long as there are pending tasks @@ -165,10 +177,11 @@ public async Task ExportAll_RunOrchestrator( if (fhirResult.NextLink != null) { - exportTasks.Add(context.CallActivityWithRetryAsync( + exportTasks.Add(context.CallActivityAsync( nameof(ExportAllOrchestrator_GetAndWriteDataPage), - _exportAllRetryOptions, - new DataPageRequest(FhirRequestPath: fhirResult.NextLink, InstanceId: fhirResult.InstanceId, ParallelTaskIndex: fhirResult.ParallelTaskIndex, ResourceType: options.ResourceType, Audience: options.Audience))); + + new DataPageRequest(FhirRequestPath: fhirResult.NextLink, InstanceId: fhirResult.InstanceId, ParallelTaskIndex: fhirResult.ParallelTaskIndex, ResourceType: options.ResourceType, Audience: options.Audience), + new TaskOptions(_exportAllRetryOptions))); } } @@ -184,11 +197,12 @@ public async Task ExportAll_RunOrchestrator( return retVal; } - [FunctionName(nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges))] + [Function(nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges))] public async Task> ExportAllOrchestrator_GetCountsForListOfDateRanges( [ActivityTrigger] GetCountsForListOfDateRangesRequest input, - ILogger logger) + FunctionContext context) { + var logger = context.GetLogger("ExportAllOrchestrator_GetCountsForListOfDateRanges"); JObject requestBody = new(); requestBody["resourceType"] = "Bundle"; requestBody["type"] = "batch"; @@ -253,12 +267,13 @@ public async Task ExportAll_RunOrchestrator( throw new Exception(message); } - [FunctionName(nameof(ExportAllOrchestrator_GetAndWriteDataPage))] + [Function(nameof(ExportAllOrchestrator_GetAndWriteDataPage))] public async Task ExportAllOrchestrator_GetAndWriteDataPage( [ActivityTrigger] DataPageRequest input, - [DurableClient] IDurableEntityClient ec, - ILogger logger) + [DurableClient] DurableTaskClient ec, + FunctionContext context) { + var logger = context.GetLogger("ExportAllOrchestrator_GetAndWriteDataPage"); logger.LogInformation($"Fetching page of resources using query {input.FhirRequestPath}"); var response = await FHIRUtils.CallFHIRServer(input.FhirRequestPath, "", HttpMethod.Get, logger, input.Audience); diff --git a/FHIRBulkImport/ExportOrchestrator.cs b/FHIRBulkImport/ExportOrchestrator.cs index 0d0cd50..1161a00 100644 --- a/FHIRBulkImport/ExportOrchestrator.cs +++ b/FHIRBulkImport/ExportOrchestrator.cs @@ -1,26 +1,33 @@ -using System.Collections.Generic; +using Azure.Storage.Blobs.Specialized; +using DurableTask.Core; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.Http; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.Azure.Functions.Worker.Http; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using System; +using System.Collections; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.ComponentModel; using System.IO; +using System.Linq; +using System.Net; using System.Net.Http; +using System.Net.Http.Headers; using System.Text; -using System.Threading.Tasks; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.DurableTask; -using Microsoft.Azure.WebJobs.Extensions.Http; -using Microsoft.Azure.WebJobs.Host; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json.Linq; -using System.Linq; -using System; using System.Threading; -using System.Net.Http.Headers; -using DurableTask.Core; -using Azure.Storage.Blobs.Specialized; -using System.ComponentModel; +using System.Threading.Tasks; +using System.Text.Json; namespace FHIRBulkImport { - public static class ExportOrchestrator + public class ExportOrchestrator { private static void IdentifyUniquePatientReferences(JObject resource, string patreffield, HashSet uniquePats) { @@ -73,28 +80,31 @@ private static JObject SetContextVariables(string instanceId, string ids = null, if (include != null) o["include"] = include; return o; } - [FunctionName("CountFileLines")] - public static async Task CountFileLines( + [Function("CountFileLines")] + public async Task CountFileLines( [ActivityTrigger] JObject ctx, - ILogger log) + FunctionContext context) { + var log = context.GetLogger("CountFileLines"); string instanceid = (string)ctx["instanceid"]; string blob = (string)ctx["filename"]; return await FileHolderManager.CountLinesInBlob(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"),instanceid, blob,log); } - [FunctionName("FileNames")] - public static async Task> FileNames( + [Function("FileNames")] + public async Task> FileNames( [ActivityTrigger] string instanceid, - ILogger log) + FunctionContext context) { + var log = context.GetLogger("FileNames"); + return await FileHolderManager.GetFileNames(instanceid,log); } - [FunctionName("ExportOrchestrator")] - public static async Task RunOrchestrator( - [OrchestrationTrigger] IDurableOrchestrationContext context, - ILogger log) + [Function("ExportOrchestrator")] + public async Task RunOrchestrator( + [OrchestrationTrigger] TaskOrchestrationContext context, + FunctionContext functionContext) { - + var log = functionContext.GetLogger("ExportOrchestrator"); JObject config = null; JObject retVal = new JObject(); HashSet uniquePats = new HashSet(); @@ -212,11 +222,12 @@ public static async Task RunOrchestrator( log.LogInformation($"Completed orchestration with ID = '{context.InstanceId}'."); return filecounts; } - [FunctionName("AppendBlob")] - public static async Task AppendBlob( + [Function("AppendBlob")] + public async Task AppendBlob( [ActivityTrigger] JToken ctx, - ILogger log) + FunctionContext context) { + var log = context.GetLogger("AppendBlob"); string instanceid = (string)ctx["instanceId"]; string rm = (string)ctx["ids"]; var appendBlobClient = await StorageUtils.GetAppendBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceid}", "_completed_run.json"); @@ -226,19 +237,19 @@ public static async Task AppendBlob( } return true; } - [FunctionName("QueryFHIR")] - public static async Task> QueryFHIR( - [ActivityTrigger] IDurableActivityContext context, - ILogger log) + [Function("QueryFHIR")] + public async Task> QueryFHIR( + [ActivityTrigger] JToken input, + FunctionContext context) { - + var log = context.GetLogger("QueryFHIR"); HashSet uniquePats = new HashSet(); try { - JToken vars = context.GetInput(); - string query = vars["query"].ToString(); - string instanceid = vars["instanceid"].ToString(); - string patreffield = (string)vars["patreffield"]; + + string query = input["query"].ToString(); + string instanceid = input["instanceid"].ToString(); + string patreffield = (string)input["patreffield"]; var fhirresp = await FHIRUtils.CallFHIRServer(query, "", HttpMethod.Get, log); if (fhirresp.Success && !string.IsNullOrEmpty(fhirresp.Content)) { @@ -310,9 +321,10 @@ public static async Task> QueryFHIR( } return uniquePats; } - [FunctionName("ExportOrchestrator_ProcessPatientQueryPage")] - public static async Task ProcessPatientQueryPage([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log) + [Function("ExportOrchestrator_ProcessPatientQueryPage")] + public async Task ProcessPatientQueryPage([OrchestrationTrigger] TaskOrchestrationContext context, FunctionContext functionContext) { + var log = functionContext.GetLogger("ExportOrchestrator_ProcessPatientQueryPage"); JObject retVal = new JObject(); var vars = context.GetInput(); try @@ -370,31 +382,31 @@ public static async Task ProcessPatientQueryPage([OrchestrationTrigger] } return retVal; } - [FunctionName("FileTracker")] - public static void FileTracker ([EntityTrigger] IDurableEntityContext ctx,ILogger log) + [Function("FileTracker")] + public async Task Run([OrchestrationTrigger] TaskOrchestrationContext ctx, FunctionContext context) { - - switch (ctx.OperationName.ToLowerInvariant()) + var log = context.GetLogger("FileTracker"); + var input = ctx.GetInput(); + if (input.Operation == "set") + { + await ctx.CallActivityAsync("FileTracker_Set", input.Value); + } + else if (input.Operation == "get") { - //Set File Number - case "set": - ctx.SetState(ctx.GetInput()); - break; - case "get": - ctx.Return(ctx.GetState()); - break; + var value = await ctx.CallActivityAsync("FileTracker_Get", null); + ctx.SetCustomStatus(value); } + } - [FunctionName("ExportOrchestrator_GatherResources")] - public static async Task GatherResources([ActivityTrigger] IDurableActivityContext context, [DurableClient] IDurableEntityClient entityclient, ILogger log) + [Function("ExportOrchestrator_GatherResources")] + public static async Task GatherResources([ActivityTrigger] JToken input, [DurableClient] DurableTaskClient entityclient, FunctionContext context) { - - - JToken vars = context.GetInput(); + var log = context.GetLogger("ExportOrchestrator_GatherResources"); + int total = 0; - string query = vars["ids"].ToString(); - string instanceid = vars["instanceid"].ToString(); - var rt = (string)vars["resourcetype"]; + string query = input["ids"].ToString(); + string instanceid = input["instanceid"].ToString(); + var rt = (string)input["resourcetype"]; var fhirresp = await FHIRUtils.CallFHIRServer(query, "", HttpMethod.Get, log); if (fhirresp.Success && !string.IsNullOrEmpty(fhirresp.Content)) { @@ -427,7 +439,7 @@ public static async Task GatherResources([ActivityTrigger] IDurableActiv public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceType, string BlobUrl); - internal static async Task ConvertToNDJSON(JToken bundle, string instanceId, string resourceType, IDurableEntityClient entityclient, ILogger log, int? parallelFileId = null) + internal static async Task ConvertToNDJSON(JToken bundle, string instanceId, string resourceType, DurableTaskClient entityclient, ILogger log, int? parallelFileId = null) { ConvertToNDJSONResponse? retVal = null; int cnt = 0; @@ -454,10 +466,10 @@ public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceT { string parallelizationModifierStr = parallelFileId.HasValue ? $"-{parallelFileId.Value}" : string.Empty; string key = $"{instanceId}{parallelizationModifierStr}-{resourceType}"; - - var entityId = new EntityId(nameof(FileTracker), key); - var esresp = await entityclient.ReadEntityStateAsync(entityId); - int fileno = esresp.EntityState; + + var getInstanceId = await entityclient.ScheduleNewOrchestrationInstanceAsync("FileTracker", (Operation: "get", Key: key)); + var status = await entityclient.WaitForInstanceCompletionAsync(getInstanceId, CancellationToken.None); + int fileno = status?.SerializedOutput!= null ? JsonSerializer.Deserialize(status.SerializedOutput) : 0; var filename = resourceType + parallelizationModifierStr + "-" + (fileno + 1) + ".ndjson"; var blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceId}", filename); long maxfilesizeinbytes = Utils.GetIntEnvironmentVariable("FBI-MAXFILESIZEMB", "-1") * 1024000; @@ -471,7 +483,7 @@ public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceT fileno++; filename = resourceType + parallelizationModifierStr + "-" + (fileno + 1) + ".ndjson"; blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceId}", filename); - await entityclient.SignalEntityAsync(entityId, "set", fileno); + await entityclient.ScheduleNewOrchestrationInstanceAsync("FileTracker", (Operation: "set", Key: key, Value: fileno)); } // Write the data to blob storage @@ -490,79 +502,86 @@ public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceT return retVal; } - [FunctionName("ExportOrchestrator_HttpStart")] - public static async Task HttpStart( - [HttpTrigger(AuthorizationLevel.Function,"post",Route = "$alt-export")] HttpRequestMessage req, - [DurableClient] IDurableOrchestrationClient starter, - ILogger log) + [Function("ExportOrchestrator_HttpStart")] + public async Task HttpStart( + [HttpTrigger(AuthorizationLevel.Function,"post",Route = "$alt-export")] HttpRequestData req, + [DurableClient] DurableTaskClient starter, + FunctionContext context) { + var log = context.GetLogger("ExportOrchestrator_HttpStart"); - string config = await req.Content.ReadAsStringAsync(); - var state = await runningInstances(starter, log); + string config = await new StreamReader(req.Body).ReadToEndAsync(); + var state = await runningInstances(starter, context); int running = state.Count(); int maxinstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); if (maxinstances > 0 && running >= maxinstances) { string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {maxinstances}"; - StringContent sc = new StringContent("{\"error\":\"" + msg + "\""); - return new HttpResponseMessage() { Content = sc, StatusCode = System.Net.HttpStatusCode.TooManyRequests}; + var response = req.CreateResponse(System.Net.HttpStatusCode.TooManyRequests); + await response.WriteStringAsync($"{{\"error\":\"{msg}\"}}"); + + return response; } // Function input comes from the request content. - string instanceId = await starter.StartNewAsync("ExportOrchestrator",null,config); + string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync("ExportOrchestrator",config); log.LogInformation($"Started orchestration with ID = '{instanceId}'."); return starter.CreateCheckStatusResponse(req, instanceId); } - [FunctionName("ExportOrchestrator_InstanceAction")] - public static async Task InstanceAction( - [HttpTrigger(AuthorizationLevel.Function, "get", Route = "$alt-export-manage/{instanceid}")] HttpRequestMessage req, - [DurableClient] IDurableOrchestrationClient starter,string instanceid, - ILogger log) + [Function("ExportOrchestrator_InstanceAction")] + public async Task InstanceAction( + [HttpTrigger(AuthorizationLevel.Function, "get", Route = "$alt-export-manage/{instanceid}")] HttpRequestData req, + [DurableClient] DurableTaskClient starter,string instanceid, + FunctionContext context) { - - var parms = System.Web.HttpUtility.ParseQueryString(req.RequestUri.Query); + var log = context.GetLogger("ExportOrchestrator_InstanceAction"); + var parms = System.Web.HttpUtility.ParseQueryString(req.Url.Query); string action = parms["action"]; - await starter.TerminateAsync(instanceid, "Terminated by User"); + await starter.TerminateInstanceAsync(instanceid, "Terminated by User"); StringContent sc = new StringContent($"Terminated {instanceid}"); var response = req.CreateResponse(System.Net.HttpStatusCode.OK); - response.Content = sc; - response.Content.Headers.ContentType = new MediaTypeHeaderValue("text/plain"); + response.Headers.Add("Content-Type", "text/plain"); + await response.WriteStringAsync("Terminated abc"); return response; } - [FunctionName("ExportOrchestrator_ExportStatus")] - public static async Task ExportStatus( - [HttpTrigger(AuthorizationLevel.Function, "get", Route = "$alt-export-status")] HttpRequestMessage req, - [DurableClient] IDurableOrchestrationClient starter, - ILogger log) + [Function("ExportOrchestrator_ExportStatus")] + public async Task ExportStatus( + [HttpTrigger(AuthorizationLevel.Function, "get", Route = "$alt-export-status")] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext context) { - string config = await req.Content.ReadAsStringAsync(); - var state = await runningInstances(starter, log); + var log = context.GetLogger("ExportOrchestrator_ExportStatus"); + string config = await new StreamReader(req.Body).ReadToEndAsync(); + var state = await runningInstances(client, context); JArray retVal = new JArray(); - foreach (DurableOrchestrationStatus status in state) + foreach (var status in state) { + var statusWithInput = await client.GetInstanceAsync(status.InstanceId, getInputsAndOutputs: true); JObject o = new JObject(); o["instanceId"] = status.InstanceId; - o["createdDateTime"] = status.CreatedTime; + o["createdDateTime"] = status.CreatedAt; o["status"] = status.RuntimeStatus.ToString(); - TimeSpan span = (DateTime.UtcNow - status.CreatedTime); + TimeSpan span = (DateTime.UtcNow - status.CreatedAt); o["elapsedtimeinminutes"] = span.TotalMinutes; - o["input"] = status.Input; + o["input"] = statusWithInput?.SerializedInput ?? ""; retVal.Add(o); } - StringContent sc = new StringContent(retVal.ToString()); - var response = req.CreateResponse(System.Net.HttpStatusCode.OK); - response.Content = sc; - response.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); + + var response = req.CreateResponse(HttpStatusCode.OK); + response.Headers.Add("Content-Type", "application/json"); + await response.WriteStringAsync(retVal.ToString()); return response; } - [FunctionName("ExportBlobTrigger")] - public static async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")] Stream myBlob, string name, [DurableClient] IDurableOrchestrationClient starter, ILogger log) + + [Function("ExportBlobTrigger")] + public async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")] Stream myBlob, string name, [DurableClient] DurableTaskClient starter, FunctionContext context) { + var log = context.GetLogger("ExportBlobTrigger"); StreamReader reader = new StreamReader(myBlob); var text = await reader.ReadToEndAsync(); - var state = await runningInstances(starter, log); + var state = await runningInstances(starter, context); int running = state.Count(); int maxinstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); if (maxinstances > 0 && running >= maxinstances) @@ -571,28 +590,31 @@ public static async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Co log.LogError($"ExportBlobTrigger:{msg}"); return; } - string instanceId = await starter.StartNewAsync("ExportOrchestrator", null, text); + string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync("ExportOrchestrator", text); var bc = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT")); await StorageUtils.MoveTo(bc, "export-trigger", "export-trigger-processed", name, name, log); log.LogInformation($"Started orchestration with ID = '{instanceId}'."); } - public static async Task> runningInstances(IDurableOrchestrationClient client,ILogger log) + public async Task> runningInstances(DurableTaskClient client, FunctionContext context) { - var queryFilter = new OrchestrationStatusQueryCondition + + var queryFilter = new OrchestrationQuery { - RuntimeStatus = new[] + Statuses = new[] { OrchestrationRuntimeStatus.Pending, OrchestrationRuntimeStatus.Running } - + }; + var result = new List(); + await foreach (var item in client.GetAllInstancesAsync(queryFilter)) + { + result.Add(item); + } - OrchestrationStatusQueryResult result = await client.ListInstancesAsync( - queryFilter, - CancellationToken.None); - var retVal = new List(); - foreach (DurableOrchestrationStatus status in result.DurableOrchestrationState) + var retVal = new List(); + await foreach (var status in client.GetAllInstancesAsync(queryFilter)) { if (!status.InstanceId.Contains(":") && !status.InstanceId.StartsWith("@")) { @@ -600,25 +622,26 @@ public static async Task> runningInstanc } } return retVal; - + } - [FunctionName("ExportHistoryCleanUp")] + [Function("ExportHistoryCleanUp")] public static async Task CleanupOldRuns( [TimerTrigger("0 0 0 * * *")] TimerInfo timerInfo, - [DurableClient] IDurableOrchestrationClient orchestrationClient, - ILogger log) + [DurableClient] DurableTaskClient orchestrationClient, + FunctionContext context) { + var log = context.GetLogger("ExportHistoryCleanUp"); var createdTimeFrom = DateTime.MinValue; var createdTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(Utils.GetIntEnvironmentVariable("FBI-EXPORTPURGEAFTERDAYS", "30"))); - var runtimeStatus = new List + var runtimeStatus = new List { - OrchestrationStatus.Completed, - OrchestrationStatus.Canceled, - OrchestrationStatus.Failed, - OrchestrationStatus.Terminated + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Canceled, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated }; - var result = await orchestrationClient.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus); - log.LogInformation($"Scheduled cleanup done, {result.InstancesDeleted} instances deleted"); + var result = await orchestrationClient.PurgeInstancesAsync(createdTimeFrom, createdTimeTo, runtimeStatus); + log.LogInformation($"Scheduled cleanup done and instances deleted"); } } diff --git a/FHIRBulkImport/FHIRBulkImport.csproj b/FHIRBulkImport/FHIRBulkImport.csproj index 5b106e4..16ad223 100644 --- a/FHIRBulkImport/FHIRBulkImport.csproj +++ b/FHIRBulkImport/FHIRBulkImport.csproj @@ -1,19 +1,37 @@  + net8.0 v4 + Exe <_FunctionsSkipCleanOutput>true + 96064c59-55c4-4084-a2fd-99ed65f18a77 - + + + + + + + + + + + + + + - - - - - - + + + + + + + + diff --git a/FHIRBulkImport/ImportBundleBlobTrigger.cs b/FHIRBulkImport/ImportBundleBlobTrigger.cs index a7425cf..c86071d 100644 --- a/FHIRBulkImport/ImportBundleBlobTrigger.cs +++ b/FHIRBulkImport/ImportBundleBlobTrigger.cs @@ -1,11 +1,12 @@ +using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Extensibility; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs; +using Microsoft.Extensions.Logging; using System; +using System.ComponentModel.DataAnnotations; using System.IO; using System.Threading.Tasks; -using Microsoft.ApplicationInsights.Extensibility; -using Microsoft.ApplicationInsights; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Host; -using Microsoft.Extensions.Logging; namespace FHIRBulkImport { @@ -16,12 +17,14 @@ public class ImportBundleBlobTrigger public ImportBundleBlobTrigger(TelemetryConfiguration telemetryConfiguration) { _telemetryClient = new TelemetryClient(telemetryConfiguration); - } - [Disable("FBI-DISABLE-BLOBTRIGGER")] - [FunctionName("ImportBundleBlobTrigger")] - public async Task Run([BlobTrigger("bundles/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")]Stream myBlob, string name, ILogger log) + } + + [Function("ImportBundleBlobTrigger")] + + public async Task Run([BlobTrigger("bundles/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")]Stream myBlob, string name, FunctionContext context) { - await ImportUtils.ImportBundle(name, log, _telemetryClient); + var logger = context.GetLogger("ImportBundleBlobTrigger"); + await ImportUtils.ImportBundle(name, logger, _telemetryClient); } } } diff --git a/FHIRBulkImport/ImportBundleEventGrid.cs b/FHIRBulkImport/ImportBundleEventGrid.cs index 5267b3d..2eb2428 100644 --- a/FHIRBulkImport/ImportBundleEventGrid.cs +++ b/FHIRBulkImport/ImportBundleEventGrid.cs @@ -4,8 +4,8 @@ using System.Threading.Tasks; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.EventGrid; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.EventGrid; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; @@ -15,16 +15,17 @@ namespace FHIRBulkImport public class ImportBundleEventGrid { - [FunctionName("ImportBundleEventGrid")] - [return: Queue("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] - public static JObject Run([EventGridTrigger] JObject blobCreatedEvent, - ILogger log) + [Function("ImportBundleEventGrid")] + [QueueOutput("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] + public JObject Run([EventGridTrigger] JObject blobCreatedEvent, + FunctionContext context) { { - + var logger = context.GetLogger("ImportBundleEventGrid"); + logger.LogInformation("EventGrid trigger recieved event."); return blobCreatedEvent; } } } -} +} \ No newline at end of file diff --git a/FHIRBulkImport/ImportBundleHTTP.cs b/FHIRBulkImport/ImportBundleHTTP.cs index 3d0e5f7..811e518 100644 --- a/FHIRBulkImport/ImportBundleHTTP.cs +++ b/FHIRBulkImport/ImportBundleHTTP.cs @@ -2,9 +2,8 @@ using System.Linq; using System.IO; using System.Threading.Tasks; -using Microsoft.AspNetCore.Mvc; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.Http; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; @@ -22,36 +21,52 @@ public ImportBundleHTTP(TelemetryConfiguration telemetryConfiguration) { _telemetryClient = new TelemetryClient(telemetryConfiguration); } - [Disable("FBI-DISABLE-HTTPEP")] - [FunctionName("ImportBundleHTTP")] - public static async Task Run( - [HttpTrigger(AuthorizationLevel.Function, "post", Route = "importbundle")] HttpRequest req, - ILogger log) + + [Function("ImportBundleHTTP")] + public async Task Run( + [HttpTrigger(AuthorizationLevel.Function, "post", Route = "importbundle")] HttpRequestData req, + FunctionContext context) { + var logger = context.GetLogger("ImportBundleHTTP"); string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); - string filename = req.Query["bundlename"]; + string filename = System.Web.HttpUtility.ParseQueryString(req.Url.Query)["bundlename"]; if (string.IsNullOrEmpty(filename)) filename = $"bundle{Guid.NewGuid().ToString().Replace("-", "")}.json"; if (!filename.ToLower().EndsWith(".json")) filename += ".json"; + var response = req.CreateResponse(); try { var o = JObject.Parse(requestBody); if (o["resourceType"] !=null && o["resourceType"].ToString().Equals("Bundle")) { var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI-STORAGEACCT")); - await StorageUtils.WriteStringToBlob(cbclient, "bundles", filename, requestBody, log); - return new ContentResult() { Content = "{\"filename\":\"" + filename + "\"}", StatusCode = 202, ContentType = "application/json" }; + await StorageUtils.WriteStringToBlob(cbclient, "bundles", filename, requestBody, logger); + response.StatusCode = System.Net.HttpStatusCode.Accepted; + response.Headers.Add("Content-Type", "application/json"); + await response.WriteStringAsync($"{{\"filename\":\"{filename}\"}}"); + return response; } - return new ContentResult() { Content = $"Not a Valid FHIR Bundle", StatusCode = 400, ContentType = "text/plain" }; + response.StatusCode = System.Net.HttpStatusCode.BadRequest; + response.Headers.Add("Content-Type", "text/plain"); + await response.WriteStringAsync("Not a Valid FHIR Bundle"); + return response; } catch (JsonReaderException jre) { - return new ContentResult() {Content=$"Invalid JSONRequest Body:{jre.Message}",StatusCode=400,ContentType="text/plain" }; + response.StatusCode = System.Net.HttpStatusCode.BadRequest; + response.Headers.Add("Content-Type", "text/plain"); + await response.WriteStringAsync($"Invalid JSONRequest Body:{jre.Message}"); + return response; + } catch (Exception e) { - return new ContentResult() { Content = $"Error processing request:{e.Message}", StatusCode = 500, ContentType = "text/plain" }; + response.StatusCode = System.Net.HttpStatusCode.InternalServerError; + response.Headers.Add("Content-Type", "text/plain"); + await response.WriteStringAsync($"Error processing request:{e.Message}"); + return response; + } } diff --git a/FHIRBulkImport/ImportBundleQueue.cs b/FHIRBulkImport/ImportBundleQueue.cs index ced843a..4b0af88 100644 --- a/FHIRBulkImport/ImportBundleQueue.cs +++ b/FHIRBulkImport/ImportBundleQueue.cs @@ -4,12 +4,12 @@ using System.Threading.Tasks; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.EventGrid; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; -using Microsoft.Azure.WebJobs.Extensions.DurableTask; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; using System.Collections.Generic; +using Microsoft.Azure.Functions.Worker.Extensions.Timer; using Azure.Storage.Queues; using Azure.Storage.Queues.Models; using Azure.Identity; @@ -25,21 +25,23 @@ public ImportBundleQueue(TelemetryConfiguration telemetryConfiguration) _telemetryClient = new TelemetryClient(telemetryConfiguration); } - [FunctionName("ImportBundleQueue")] - public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] JObject blobCreatedEvent, ILogger log) + [Function("ImportBundleQueue")] + public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] JObject blobCreatedEvent, FunctionContext context) { + var logger = context.GetLogger("ImportBundleQueue"); string url = (string)blobCreatedEvent["data"]["url"]; - log.LogInformation($"ImportBundleEventGrid: Processing blob at {url}..."); + logger.LogInformation($"ImportBundleEventGrid: Processing blob at {url}..."); string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-BUNDLES", "bundles"); string name = url.Substring(url.IndexOf($"/{container}/") + $"/{container}/".Length); - await ImportUtils.ImportBundle(name, log, _telemetryClient); + await ImportUtils.ImportBundle(name, logger, _telemetryClient); } - [FunctionName("PoisonQueueRetries")] + [Function("PoisonQueueRetries")] public static async Task PoisonQueueRetries( [TimerTrigger("%FBI-POISONQUEUE-TIMER-CRON%")] TimerInfo timerInfo, - ILogger log) + FunctionContext context) { - log.LogInformation($"PoisonQueueRetries:Checking for poison queue messages in bundlequeue-poison..."); + var logger = context.GetLogger("PoisonQueueRetries"); + logger.LogInformation($"PoisonQueueRetries:Checking for poison queue messages in bundlequeue-poison..."); var sourceQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI-STORAGEACCT-QUEUEURI")}/bundlequeue-poison"),new DefaultAzureCredential()); await sourceQueue.CreateIfNotExistsAsync(); var targetQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI-STORAGEACCT-QUEUEURI")}/bundlequeue"), new DefaultAzureCredential()); @@ -51,7 +53,7 @@ public static async Task PoisonQueueRetries( QueueProperties properties = sourceQueue.GetProperties(); // Retrieve the cached approximate message count. int cachedMessagesCount = properties.ApproximateMessagesCount; - log.LogInformation($"PoisonQueueRetries:Found {cachedMessagesCount} messages in bundlequeue-poison....Re-queing upto {maxrequeuemessages}"); + logger.LogInformation($"PoisonQueueRetries:Found {cachedMessagesCount} messages in bundlequeue-poison....Re-queing upto {maxrequeuemessages}"); while(cachedMessagesCount > 0 && messagesrequeued < maxrequeuemessages) { int batchsize = (maxrequeuemessages - messagesrequeued >= 32 ? 32 : maxrequeuemessages - messagesrequeued); foreach (var message in sourceQueue.ReceiveMessages(maxMessages: batchsize).Value) @@ -63,7 +65,7 @@ public static async Task PoisonQueueRetries( properties = sourceQueue.GetProperties(); cachedMessagesCount = properties.ApproximateMessagesCount; } - log.LogInformation($"PoisonQueueRetries:Requeued {messagesrequeued} messages to bundlequeue"); + logger.LogInformation($"PoisonQueueRetries:Requeued {messagesrequeued} messages to bundlequeue"); } } diff --git a/FHIRBulkImport/ImportCompressedFiles.cs b/FHIRBulkImport/ImportCompressedFiles.cs index f83db41..7459c40 100644 --- a/FHIRBulkImport/ImportCompressedFiles.cs +++ b/FHIRBulkImport/ImportCompressedFiles.cs @@ -6,10 +6,8 @@ using System.Threading.Tasks; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights; -using Microsoft.Azure.WebJobs; +using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Blob; using Azure.Storage.Blobs; namespace FHIRBulkImport @@ -22,9 +20,10 @@ public ImportCompressedFiles(TelemetryConfiguration telemetryConfiguration) { _telemetryClient = new TelemetryClient(telemetryConfiguration); } - [FunctionName("ImportCompressedFiles")] - public static async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")]Stream myBlob, string name, ILogger log) + [Function("ImportCompressedFiles")] + public async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")]Stream myBlob, string name, FunctionContext context) { + var logger = context.GetLogger("ImportCompressedFiles"); try { int filecnt = 0; @@ -37,7 +36,7 @@ public static async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAG using (MemoryStream blobMemStream = new MemoryStream()) { - log.LogInformation($"ImportCompressedFiles: Decompressing {name} ..."); + logger.LogInformation($"ImportCompressedFiles: Decompressing {name} ..."); await myBlob.CopyToAsync(blobMemStream); using (ZipArchive archive = new ZipArchive(blobMemStream)) @@ -67,24 +66,24 @@ public static async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAG await blockBlob.UploadAsync(fileStream); } - log.LogInformation($"ImportCompressedFiles: Extracted {entry.FullName} to {destination.Name}/{validname}"); + logger.LogInformation($"ImportCompressedFiles: Extracted {entry.FullName} to {destination.Name}/{validname}"); } else { - log.LogInformation($"ImportCompressedFiles: Entry {entry.FullName} skipped does not end in .ndjson or .json"); + logger.LogInformation($"ImportCompressedFiles: Entry {entry.FullName} skipped does not end in .ndjson or .json"); } filecnt++; } } } - log.LogInformation($"ImportCompressedFiles: Completed Decompressing {name} extracted {filecnt} files..."); - await StorageUtils.MoveTo(blobClient, "zip", "zipprocessed", name, name,log); + logger.LogInformation($"ImportCompressedFiles: Completed Decompressing {name} extracted {filecnt} files..."); + await StorageUtils.MoveTo(blobClient, "zip", "zipprocessed", name, name,logger); } } catch (Exception ex) { - log.LogInformation($"ImportCompressedFiles: Error! Something went wrong: {ex.Message}"); + logger.LogInformation($"ImportCompressedFiles: Error! Something went wrong: {ex.Message}"); } } diff --git a/FHIRBulkImport/ImportNDJSONEventGird.cs b/FHIRBulkImport/ImportNDJSONEventGird.cs index 62d7494..4c01de7 100644 --- a/FHIRBulkImport/ImportNDJSONEventGird.cs +++ b/FHIRBulkImport/ImportNDJSONEventGird.cs @@ -4,8 +4,7 @@ using System.Threading.Tasks; using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.EventGrid; +using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; @@ -15,12 +14,12 @@ namespace FHIRBulkImport public static class ImportNDJSONEventGird { - [FunctionName("ImportNDJSON")] - [return: Queue("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] + [Function("ImportNDJSON")] + [QueueOutput("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] public static JObject Run([EventGridTrigger]JObject blobCreatedEvent, - ILogger log) + FunctionContext context) { - + var logger = context.GetLogger("ImportNDJSON"); return blobCreatedEvent; } diff --git a/FHIRBulkImport/ImportNDJSONQueue.cs b/FHIRBulkImport/ImportNDJSONQueue.cs index 3bff7a7..14fe609 100644 --- a/FHIRBulkImport/ImportNDJSONQueue.cs +++ b/FHIRBulkImport/ImportNDJSONQueue.cs @@ -3,8 +3,8 @@ using System.Text; using System.Threading.Tasks; using Azure.Storage.Queues.Models; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Host; + +using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; @@ -12,14 +12,15 @@ namespace FHIRBulkImport { public class ImportNDJSONQueue { - [FunctionName("ImportNDJSONQueue")] - public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] QueueMessage queueMessage,ILogger log) + [Function("ImportNDJSONQueue")] + public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] QueueMessage queueMessage,FunctionContext context) { + var logger = context.GetLogger("ImportNDJSONQueue"); JObject blobCreatedEvent = JObject.Parse(queueMessage.Body.ToString()); string url = (string)blobCreatedEvent["data"]["url"]; if (queueMessage.DequeueCount > 1) { - log.LogInformation($"ImportNDJSONQueue: Ignoring long running requeue of file {url} on dequeue {queueMessage.DequeueCount}"); + logger.LogInformation($"ImportNDJSONQueue: Ignoring long running requeue of file {url} on dequeue {queueMessage.DequeueCount}"); return; } int maxresourcesperbundle = 200; @@ -31,17 +32,17 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR { if (!int.TryParse(mrbundlemax, out maxresourcesperbundle)) maxresourcesperbundle = 200; } - log.LogInformation($"NDJSONConverter: Processing blob at {url}..."); + logger.LogInformation($"NDJSONConverter: Processing blob at {url}..."); JObject rv = ImportUtils.initBundle(); int linecnt = 0; int total = 0; int bundlecnt = 0; int errcnt = 0; int fileno = 1; - Stream myBlob = await StorageUtils.GetStreamForBlob(cbclient, container, name,log); + Stream myBlob = await StorageUtils.GetStreamForBlob(cbclient, container, name,logger); if (myBlob==null) { - log.LogWarning($"ImportNDJSONQueue:The blob {name} in container {container} does not exist or cannot be read."); + logger.LogWarning($"ImportNDJSONQueue:The blob {name} in container {container} does not exist or cannot be read."); return; } StringBuilder errsb = new StringBuilder(); @@ -62,14 +63,14 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR } catch (Exception e) { - log.LogError($"NDJSONConverter: File {name} is in error or contains invalid JSON at line number {linecnt}:{e.Message}"); + logger.LogError($"NDJSONConverter: File {name} is in error or contains invalid JSON at line number {linecnt}:{e.Message}"); errsb.Append($"{line}\n"); errcnt++; } if (bundlecnt >= maxresourcesperbundle) { - await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), log); + await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), logger); bundlecnt = 0; rv = null; rv = ImportUtils.initBundle(); @@ -77,14 +78,14 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR } if (bundlecnt > 0) { - await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), log); + await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), logger); } - await StorageUtils.MoveTo(cbclient, "ndjson", "ndjsonprocessed", name, $"{name}.processed", log); + await StorageUtils.MoveTo(cbclient, "ndjson", "ndjsonprocessed", name, $"{name}.processed", logger); if (errcnt > 0) { - await StorageUtils.WriteStringToBlob(cbclient, "ndjsonerr", $"{name}.err", errsb.ToString(), log); + await StorageUtils.WriteStringToBlob(cbclient, "ndjsonerr", $"{name}.err", errsb.ToString(), logger); } - log.LogInformation($"NDJSONConverter: Processing file {name} completed with {total} resources created in {fileno - 1} bundles with {errcnt} errors..."); + logger.LogInformation($"NDJSONConverter: Processing file {name} completed with {total} resources created in {fileno - 1} bundles with {errcnt} errors..."); } } diff --git a/FHIRBulkImport/Program.cs b/FHIRBulkImport/Program.cs new file mode 100644 index 0000000..c9c124c --- /dev/null +++ b/FHIRBulkImport/Program.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.ApplicationInsights; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Extensibility; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +public class Program +{ + private static async Task Main(string[] args) + { + var host = new HostBuilder() + .ConfigureAppConfiguration(config => + { + config.AddJsonFile("local.settings.json", optional: true, reloadOnChange: true) + .AddEnvironmentVariables(); + }) + .ConfigureFunctionsWorkerDefaults() + .ConfigureServices(services => { + services.AddApplicationInsightsTelemetryWorkerService(); + services.ConfigureFunctionsApplicationInsights(); + }) + .ConfigureLogging(logging => + { + logging.AddApplicationInsights(); + }) + .Build(); + host.Run(); + { + + } + } + } diff --git a/FHIRBulkImport/host.json b/FHIRBulkImport/host.json index 764d007..43eb1e6 100644 --- a/FHIRBulkImport/host.json +++ b/FHIRBulkImport/host.json @@ -18,7 +18,7 @@ "storageProvider": { "partitionCount": 3 }, - "extendedSessionsEnabled": true, + "extendedSessionsEnabled": false, "extendedSessionIdleTimeoutInSeconds": 30 } diff --git a/scripts/fhirBulkImport.json b/scripts/fhirBulkImport.json index 869f243..c08f69e 100644 --- a/scripts/fhirBulkImport.json +++ b/scripts/fhirBulkImport.json @@ -358,7 +358,7 @@ "properties": { "AzureWebJobsStorage__accountname": "[format('{0}stor', variables('prefixNameCleanShort'))]", "FUNCTIONS_EXTENSION_VERSION": "~4", - "FUNCTIONS_WORKER_RUNTIME": "dotnet", + "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated", "FUNCTIONS_INPROC_NET8_ENABLED": "1", "APPLICATIONINSIGHTS_CONNECTION_STRING": "[reference(resourceId('Microsoft.Insights/components', format('{0}-ai', variables('prefixNameCleanShort'))), '2020-02-02-preview').ConnectionString]", "SCM_DO_BUILD_DURING_DEPLOYMENT": "true", @@ -400,7 +400,7 @@ "name": "[format('{0}/{1}', format('{0}-func', variables('prefixNameCleanShort')), 'web')]", "properties": { "repoUrl": "[variables('repoUrl')]", - "branch": "main", + "branch": "personal/v-shanarang/isolatedworker", "isManualIntegration": true }, "dependsOn": [ From 298a33db704aa016272684a17a1d2ff840e51948 Mon Sep 17 00:00:00 2001 From: ms-snarang Date: Wed, 20 Aug 2025 18:01:11 +0530 Subject: [PATCH 2/8] islodated worker changes --- FHIRBulkImport/ImportBundleBlobTrigger.cs | 2 +- FHIRBulkImport/ImportBundleEventGrid.cs | 4 +++- FHIRBulkImport/ImportBundleQueue.cs | 4 +++- FHIRBulkImport/ImportNDJSONEventGird.cs | 7 ++++--- FHIRBulkImport/ImportNDJSONQueue.cs | 3 ++- FHIRBulkImport/Program.cs | 6 ++---- 6 files changed, 15 insertions(+), 11 deletions(-) diff --git a/FHIRBulkImport/ImportBundleBlobTrigger.cs b/FHIRBulkImport/ImportBundleBlobTrigger.cs index c86071d..ca553b7 100644 --- a/FHIRBulkImport/ImportBundleBlobTrigger.cs +++ b/FHIRBulkImport/ImportBundleBlobTrigger.cs @@ -25,6 +25,6 @@ public async Task Run([BlobTrigger("bundles/{name}", Connection = "FBI-STORAGEAC { var logger = context.GetLogger("ImportBundleBlobTrigger"); await ImportUtils.ImportBundle(name, logger, _telemetryClient); - } + } } } diff --git a/FHIRBulkImport/ImportBundleEventGrid.cs b/FHIRBulkImport/ImportBundleEventGrid.cs index 2eb2428..6f89ceb 100644 --- a/FHIRBulkImport/ImportBundleEventGrid.cs +++ b/FHIRBulkImport/ImportBundleEventGrid.cs @@ -8,6 +8,7 @@ using Microsoft.Azure.Functions.Worker.Extensions.EventGrid; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; +using Azure.Messaging.EventGrid; namespace FHIRBulkImport { @@ -17,11 +18,12 @@ public class ImportBundleEventGrid [Function("ImportBundleEventGrid")] [QueueOutput("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] - public JObject Run([EventGridTrigger] JObject blobCreatedEvent, + public JObject Run([EventGridTrigger]EventGridEvent eventGridEvent, FunctionContext context) { { var logger = context.GetLogger("ImportBundleEventGrid"); + var blobCreatedEvent = JObject.Parse(eventGridEvent.Data.ToString()); logger.LogInformation("EventGrid trigger recieved event."); return blobCreatedEvent; diff --git a/FHIRBulkImport/ImportBundleQueue.cs b/FHIRBulkImport/ImportBundleQueue.cs index 4b0af88..d04ad74 100644 --- a/FHIRBulkImport/ImportBundleQueue.cs +++ b/FHIRBulkImport/ImportBundleQueue.cs @@ -26,9 +26,11 @@ public ImportBundleQueue(TelemetryConfiguration telemetryConfiguration) } [Function("ImportBundleQueue")] - public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] JObject blobCreatedEvent, FunctionContext context) + public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] QueueMessage queueMessage, FunctionContext context) { var logger = context.GetLogger("ImportBundleQueue"); + string bodyText = queueMessage.Body.ToString(); + JObject blobCreatedEvent = JObject.Parse(bodyText); string url = (string)blobCreatedEvent["data"]["url"]; logger.LogInformation($"ImportBundleEventGrid: Processing blob at {url}..."); string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-BUNDLES", "bundles"); diff --git a/FHIRBulkImport/ImportNDJSONEventGird.cs b/FHIRBulkImport/ImportNDJSONEventGird.cs index 4c01de7..a277490 100644 --- a/FHIRBulkImport/ImportNDJSONEventGird.cs +++ b/FHIRBulkImport/ImportNDJSONEventGird.cs @@ -16,12 +16,13 @@ public static class ImportNDJSONEventGird [Function("ImportNDJSON")] [QueueOutput("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] - public static JObject Run([EventGridTrigger]JObject blobCreatedEvent, + public static JObject Run([EventGridTrigger]EventGridEvent eventGridEvent, FunctionContext context) { var logger = context.GetLogger("ImportNDJSON"); - return blobCreatedEvent; - + var eventJson = JObject.Parse(eventGridEvent.Data.ToString()); + logger.LogInformation($"Full EventGrid received: {eventJson}"); + return eventJson; } diff --git a/FHIRBulkImport/ImportNDJSONQueue.cs b/FHIRBulkImport/ImportNDJSONQueue.cs index 14fe609..a7ff412 100644 --- a/FHIRBulkImport/ImportNDJSONQueue.cs +++ b/FHIRBulkImport/ImportNDJSONQueue.cs @@ -16,7 +16,8 @@ public class ImportNDJSONQueue public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] QueueMessage queueMessage,FunctionContext context) { var logger = context.GetLogger("ImportNDJSONQueue"); - JObject blobCreatedEvent = JObject.Parse(queueMessage.Body.ToString()); + string bodyText = queueMessage.Body.ToString(); + JObject blobCreatedEvent = JObject.Parse(bodyText); string url = (string)blobCreatedEvent["data"]["url"]; if (queueMessage.DequeueCount > 1) { diff --git a/FHIRBulkImport/Program.cs b/FHIRBulkImport/Program.cs index c9c124c..58793e7 100644 --- a/FHIRBulkImport/Program.cs +++ b/FHIRBulkImport/Program.cs @@ -31,9 +31,7 @@ private static async Task Main(string[] args) logging.AddApplicationInsights(); }) .Build(); - host.Run(); - { - - } + await host.RunAsync(); + } } From 15cae716558366c99d48f8077326f8d7ac22656b Mon Sep 17 00:00:00 2001 From: ms-snarang Date: Thu, 21 Aug 2025 18:45:29 +0530 Subject: [PATCH 3/8] adding logger --- FHIRBulkImport/ImportNDJSONQueue.cs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/FHIRBulkImport/ImportNDJSONQueue.cs b/FHIRBulkImport/ImportNDJSONQueue.cs index a7ff412..6ace671 100644 --- a/FHIRBulkImport/ImportNDJSONQueue.cs +++ b/FHIRBulkImport/ImportNDJSONQueue.cs @@ -16,9 +16,21 @@ public class ImportNDJSONQueue public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] QueueMessage queueMessage,FunctionContext context) { var logger = context.GetLogger("ImportNDJSONQueue"); + logger.LogInformation("Function triggered. Started Processing"); string bodyText = queueMessage.Body.ToString(); - JObject blobCreatedEvent = JObject.Parse(bodyText); + logger.LogInformation($"Queue Message Body: {bodyText}"); + JObject blobCreatedEvent; + try + { + blobCreatedEvent = JObject.Parse(bodyText); + } + catch (Exception ex) + { + logger.LogError($"Failed to parse queue message body: {ex.Message}"); + return; + } string url = (string)blobCreatedEvent["data"]["url"]; + logger.LogInformation($"Blob url extracted: {url}"); if (queueMessage.DequeueCount > 1) { logger.LogInformation($"ImportNDJSONQueue: Ignoring long running requeue of file {url} on dequeue {queueMessage.DequeueCount}"); @@ -28,6 +40,7 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI-STORAGEACCT")); string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-NDJSON", "ndjson"); string name = url.Substring(url.IndexOf($"/{container}/") + $"/{container}/".Length); + logger.LogInformation($"Blob name resolved: {name}"); string mrbundlemax = System.Environment.GetEnvironmentVariable("FBI-MAXRESOURCESPERBUNDLE"); if (!string.IsNullOrEmpty(mrbundlemax)) { @@ -47,14 +60,16 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR return; } StringBuilder errsb = new StringBuilder(); + logger.LogInformation($"Starting to read blob stream for {name}"); using (StreamReader reader = new StreamReader(myBlob)) { string line; while ((line = reader.ReadLine()) != null) { - linecnt++; + linecnt++; JObject res = null; + logger.LogDebug($"Reading line {linecnt}"); try { res = JObject.Parse(line); @@ -71,7 +86,9 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR if (bundlecnt >= maxresourcesperbundle) { + logger.LogInformation($"Writing bundle {name}--{fileno++}.json with {bundlecnt} resources"); await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), logger); + bundlecnt = 0; rv = null; rv = ImportUtils.initBundle(); @@ -79,11 +96,14 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR } if (bundlecnt > 0) { + logger.LogInformation($"Writing final bundle {name}--{fileno++}.json with {bundlecnt} resources"); await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), logger); } + logger.LogInformation($"Moving processed file to 'ndjsonprocessed'"); await StorageUtils.MoveTo(cbclient, "ndjson", "ndjsonprocessed", name, $"{name}.processed", logger); if (errcnt > 0) { + logger.LogWarning($"Writing error file with {errcnt} errors"); await StorageUtils.WriteStringToBlob(cbclient, "ndjsonerr", $"{name}.err", errsb.ToString(), logger); } logger.LogInformation($"NDJSONConverter: Processing file {name} completed with {total} resources created in {fileno - 1} bundles with {errcnt} errors..."); From b0594eb883fde0f4a7ba20943845e6a73bec7dee Mon Sep 17 00:00:00 2001 From: ms-snarang Date: Mon, 25 Aug 2025 13:18:30 +0530 Subject: [PATCH 4/8] Variable name update --- FHIRBulkImport/ExportAllOrchestrator.cs | 12 +++---- FHIRBulkImport/ExportOrchestrator.cs | 22 ++++++------- FHIRBulkImport/FHIRBulkImport.csproj | 5 ++- FHIRBulkImport/FHIRUtils.cs | 38 +++++++++++------------ FHIRBulkImport/ImportBundleBlobTrigger.cs | 2 +- FHIRBulkImport/ImportBundleEventGrid.cs | 2 +- FHIRBulkImport/ImportBundleHTTP.cs | 2 +- FHIRBulkImport/ImportBundleQueue.cs | 12 +++---- FHIRBulkImport/ImportCompressedFiles.cs | 6 ++-- FHIRBulkImport/ImportNDJSONEventGird.cs | 2 +- FHIRBulkImport/ImportNDJSONQueue.cs | 2 +- FHIRBulkImport/ImportUtils.cs | 8 ++--- 12 files changed, 58 insertions(+), 55 deletions(-) diff --git a/FHIRBulkImport/ExportAllOrchestrator.cs b/FHIRBulkImport/ExportAllOrchestrator.cs index 2de3a0f..ce63ada 100644 --- a/FHIRBulkImport/ExportAllOrchestrator.cs +++ b/FHIRBulkImport/ExportAllOrchestrator.cs @@ -17,13 +17,13 @@ namespace FHIRBulkImport { public class ExportAllOrchestrator { - private static int _exportResourceCount = Utils.GetIntEnvironmentVariable("FS-EXPORTRESOURCECOUNT", "1000"); - private static string _storageAccount = Utils.GetEnvironmentVariable("FBI-STORAGEACCT"); - private static int _maxInstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); + private static int _exportResourceCount = Utils.GetIntEnvironmentVariable("FS_EXPORTRESOURCECOUNT", "1000"); + private static string _storageAccount = Utils.GetEnvironmentVariable("FBI_STORAGEACCT"); + private static int _maxInstances = Utils.GetIntEnvironmentVariable("FBI_MAXEXPORTS", "0"); private static int _maxParallelizationCount = 100; - private static int _parallelSearchBundleSize = _maxInstances = Utils.GetIntEnvironmentVariable("FBI-PARALLELSEARCHBUNDLESIZE", "50"); - private static RetryPolicy _exportAllRetryOptions = new RetryPolicy(firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI-EXPORTALLRETRYINTERVAL", "30")), maxNumberOfAttempts: 5, - backoffCoefficient: Convert.ToDouble(Utils.GetIntEnvironmentVariable("FBI-EXPORTALLBACKCOEFFICIENT", "3"))); + private static int _parallelSearchBundleSize = _maxInstances = Utils.GetIntEnvironmentVariable("FBI_PARALLELSEARCHBUNDLESIZE", "50"); + private static RetryPolicy _exportAllRetryOptions = new RetryPolicy(firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI_EXPORTALLRETRYINTERVAL", "30")), maxNumberOfAttempts: 5, + backoffCoefficient: Convert.ToDouble(Utils.GetIntEnvironmentVariable("FBI_EXPORTALLBACKCOEFFICIENT", "3"))); [Function(nameof(ExportAllOrchestrator_HttpStart))] diff --git a/FHIRBulkImport/ExportOrchestrator.cs b/FHIRBulkImport/ExportOrchestrator.cs index 1161a00..8f473c7 100644 --- a/FHIRBulkImport/ExportOrchestrator.cs +++ b/FHIRBulkImport/ExportOrchestrator.cs @@ -88,7 +88,7 @@ public async Task CountFileLines( var log = context.GetLogger("CountFileLines"); string instanceid = (string)ctx["instanceid"]; string blob = (string)ctx["filename"]; - return await FileHolderManager.CountLinesInBlob(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"),instanceid, blob,log); + return await FileHolderManager.CountLinesInBlob(Utils.GetEnvironmentVariable("FBI_STORAGEACCT"),instanceid, blob,log); } [Function("FileNames")] public async Task> FileNames( @@ -230,14 +230,14 @@ public async Task AppendBlob( var log = context.GetLogger("AppendBlob"); string instanceid = (string)ctx["instanceId"]; string rm = (string)ctx["ids"]; - var appendBlobClient = await StorageUtils.GetAppendBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceid}", "_completed_run.json"); + var appendBlobClient = await StorageUtils.GetAppendBlobClient(Utils.GetEnvironmentVariable("FBI_STORAGEACCT"), $"export/{instanceid}", "_completed_run.json"); using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(rm))) { await appendBlobClient.AppendBlockAsync(ms); } return true; } - [Function("QueryFHIR")] + [Function("QueryFHIR")] public async Task> QueryFHIR( [ActivityTrigger] JToken input, FunctionContext context) @@ -471,8 +471,8 @@ public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceT var status = await entityclient.WaitForInstanceCompletionAsync(getInstanceId, CancellationToken.None); int fileno = status?.SerializedOutput!= null ? JsonSerializer.Deserialize(status.SerializedOutput) : 0; var filename = resourceType + parallelizationModifierStr + "-" + (fileno + 1) + ".ndjson"; - var blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceId}", filename); - long maxfilesizeinbytes = Utils.GetIntEnvironmentVariable("FBI-MAXFILESIZEMB", "-1") * 1024000; + var blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI_STORAGEACCT"), $"export/{instanceId}", filename); + long maxfilesizeinbytes = Utils.GetIntEnvironmentVariable("FBI_MAXFILESIZEMB", "-1") * 1024000; int bytestoadd = System.Text.ASCIIEncoding.UTF8.GetByteCount(sb.ToString()); var props = blobclient.GetProperties(); long filetotalbytes = props.Value.ContentLength + bytestoadd; @@ -482,7 +482,7 @@ public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceT { fileno++; filename = resourceType + parallelizationModifierStr + "-" + (fileno + 1) + ".ndjson"; - blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceId}", filename); + blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI_STORAGEACCT"), $"export/{instanceId}", filename); await entityclient.ScheduleNewOrchestrationInstanceAsync("FileTracker", (Operation: "set", Key: key, Value: fileno)); } @@ -513,7 +513,7 @@ public async Task HttpStart( string config = await new StreamReader(req.Body).ReadToEndAsync(); var state = await runningInstances(starter, context); int running = state.Count(); - int maxinstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); + int maxinstances = Utils.GetIntEnvironmentVariable("FBI_MAXEXPORTS", "0"); if (maxinstances > 0 && running >= maxinstances) { string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {maxinstances}"; @@ -575,7 +575,7 @@ public async Task ExportStatus( } [Function("ExportBlobTrigger")] - public async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")] Stream myBlob, string name, [DurableClient] DurableTaskClient starter, FunctionContext context) + public async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Connection = "FBI_STORAGEACCT_IDENTITY")] Stream myBlob, string name, [DurableClient] DurableTaskClient starter, FunctionContext context) { var log = context.GetLogger("ExportBlobTrigger"); @@ -583,7 +583,7 @@ public async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Connectio var text = await reader.ReadToEndAsync(); var state = await runningInstances(starter, context); int running = state.Count(); - int maxinstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); + int maxinstances = Utils.GetIntEnvironmentVariable("FBI_MAXEXPORTS", "0"); if (maxinstances > 0 && running >= maxinstances) { string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {maxinstances}"; @@ -591,7 +591,7 @@ public async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Connectio return; } string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync("ExportOrchestrator", text); - var bc = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT")); + var bc = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI_STORAGEACCT")); await StorageUtils.MoveTo(bc, "export-trigger", "export-trigger-processed", name, name, log); log.LogInformation($"Started orchestration with ID = '{instanceId}'."); } @@ -632,7 +632,7 @@ public static async Task CleanupOldRuns( { var log = context.GetLogger("ExportHistoryCleanUp"); var createdTimeFrom = DateTime.MinValue; - var createdTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(Utils.GetIntEnvironmentVariable("FBI-EXPORTPURGEAFTERDAYS", "30"))); + var createdTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(Utils.GetIntEnvironmentVariable("FBI_EXPORTPURGEAFTERDAYS", "30"))); var runtimeStatus = new List { OrchestrationRuntimeStatus.Completed, diff --git a/FHIRBulkImport/FHIRBulkImport.csproj b/FHIRBulkImport/FHIRBulkImport.csproj index 16ad223..10b27c9 100644 --- a/FHIRBulkImport/FHIRBulkImport.csproj +++ b/FHIRBulkImport/FHIRBulkImport.csproj @@ -8,8 +8,10 @@ 96064c59-55c4-4084-a2fd-99ed65f18a77 + - + + @@ -24,6 +26,7 @@ + diff --git a/FHIRBulkImport/FHIRUtils.cs b/FHIRBulkImport/FHIRUtils.cs index 0b95b25..ae890f8 100644 --- a/FHIRBulkImport/FHIRUtils.cs +++ b/FHIRBulkImport/FHIRUtils.cs @@ -32,13 +32,13 @@ public enum BundleType public static class FHIRUtils { //AD Settings - private static bool isMsi = Utils.GetBoolEnvironmentVariable("FS-ISMSI", false); - private static string resource = Utils.GetEnvironmentVariable("FS-RESOURCE"); - private static string tenant = Utils.GetEnvironmentVariable("FS-TENANT-NAME"); - private static string clientid = Utils.GetEnvironmentVariable("FS-CLIENT-ID"); - private static string secret = Utils.GetEnvironmentVariable("FS-SECRET"); - private static string authority = Utils.GetEnvironmentVariable("FS-AUTHORITY", "https://login.microsoftonline.com"); - private static string fsurl = Utils.GetEnvironmentVariable("FS-URL"); + private static bool isMsi = Utils.GetBoolEnvironmentVariable("FS_ISMSI", false); + private static string resource = Utils.GetEnvironmentVariable("FS_RESOURCE"); + private static string tenant = Utils.GetEnvironmentVariable("FS_TENANT_NAME"); + private static string clientid = Utils.GetEnvironmentVariable("FS_CLIENT_ID"); + private static string secret = Utils.GetEnvironmentVariable("FS_SECRET"); + private static string authority = Utils.GetEnvironmentVariable("FS_AUTHORITY", "https://login.microsoftonline.com"); + private static string fsurl = Utils.GetEnvironmentVariable("FS_URL"); private static ConcurrentDictionary _tokens = new ConcurrentDictionary(); private static readonly HttpStatusCode[] httpStatusCodesWorthRetrying = { HttpStatusCode.RequestTimeout, // 408 @@ -51,10 +51,10 @@ public static class FHIRUtils private static HttpClient _fhirClient = new HttpClient( new SocketsHttpHandler() { - ResponseDrainTimeout = TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI-POOLEDCON-RESPONSEDRAINSECS", "60")), - PooledConnectionLifetime = TimeSpan.FromMinutes(Utils.GetIntEnvironmentVariable("FBI-POOLEDCON-LIFETIME", "5")), - PooledConnectionIdleTimeout = TimeSpan.FromMinutes(Utils.GetIntEnvironmentVariable("FBI-POOLEDCON-IDLETO", "2")), - MaxConnectionsPerServer = Utils.GetIntEnvironmentVariable("FBI-POOLEDCON-MAXCONNECTIONS", "20"), + ResponseDrainTimeout = TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI_POOLEDCON_RESPONSEDRAINSECS", "60")), + PooledConnectionLifetime = TimeSpan.FromMinutes(Utils.GetIntEnvironmentVariable("FBI_POOLEDCON_LIFETIME", "5")), + PooledConnectionIdleTimeout = TimeSpan.FromMinutes(Utils.GetIntEnvironmentVariable("FBI_POOLEDCON_IDLETO", "2")), + MaxConnectionsPerServer = Utils.GetIntEnvironmentVariable("FBI_POOLEDCON_MAXCONNECTIONS", "20"), }); public static async System.Threading.Tasks.Task CallFHIRServer(string path, string body, HttpMethod method, ILogger log, string customAudience = null) @@ -77,17 +77,17 @@ public static async System.Threading.Tasks.Task CallFHIRServer(str var retryPolicy = Policy .Handle() .OrResult(r => httpStatusCodesWorthRetrying.Contains(r.StatusCode)) - .WaitAndRetryAsync(Utils.GetIntEnvironmentVariable("FBI-POLLY-MAXRETRIES","3"), retryAttempt => - TimeSpan.FromMilliseconds(Utils.GetIntEnvironmentVariable("FBI-POLLY-RETRYMS", "500")), (result, timeSpan, retryCount, context) => + .WaitAndRetryAsync(Utils.GetIntEnvironmentVariable("FBI_POLLY_MAXRETRIES","3"), retryAttempt => + TimeSpan.FromMilliseconds(Utils.GetIntEnvironmentVariable("FBI_POLLY_RETRYMS", "500")), (result, timeSpan, retryCount, context) => { log.LogWarning($"FHIR Request failed on a retryable status...Waiting {timeSpan} before next retry. Attempt {retryCount}"); } ); - if (Utils.GetBoolEnvironmentVariable("FBI-POLLY-EXPONENTIAL")) + if (Utils.GetBoolEnvironmentVariable("FBI_POLLY_EXPONENTIAL")) { - int maxRetries = Utils.GetIntEnvironmentVariable("FBI-POLLY-MAXRETRIES", "3"); - double retryMs = Utils.GetIntEnvironmentVariable("FBI-POLLY-RETRYMS", "500"); + int maxRetries = Utils.GetIntEnvironmentVariable("FBI_POLLY_MAXRETRIES", "3"); + double retryMs = Utils.GetIntEnvironmentVariable("FBI_POLLY_RETRYMS", "500"); double jitterFactor = 0.2; // You can adjust this value as needed Random jitterRandom = new Random(); @@ -107,7 +107,7 @@ public static async System.Threading.Tasks.Task CallFHIRServer(str ); } - string bundleProcessingLogic = Utils.GetEnvironmentVariable("FBI-FHIR-BUNDLEPROCESSINGLOGIC", ""); + string bundleProcessingLogic = Utils.GetEnvironmentVariable("FBI_FHIR_BUNDLEPROCESSINGLOGIC", ""); HttpResponseMessage _fhirResponse = await retryPolicy.ExecuteAsync(async () => @@ -188,7 +188,7 @@ public static string[] SplitBundle(string requestBody, string originname, ILogge if (rtt.Equals("Bundle")) { JArray entries = (JArray)result["entry"]; - int mbs = Utils.GetIntEnvironmentVariable("FBI-MAXBUNDLESIZE", "500"); + int mbs = Utils.GetIntEnvironmentVariable("FBI_MAXBUNDLESIZE", "500"); if (entries.Count > mbs) { if (bt.Equals("batch")) @@ -317,7 +317,7 @@ public static async Task FromHttpResponseMessage(HttpResponseMessa { string s_retry = null; retVal.ResponseHeaders.TryGetValue("x-ms-retry-after-ms", out s_retry); - if (s_retry==null) s_retry = Environment.GetEnvironmentVariable("FBI-DEFAULTRETRY"); + if (s_retry==null) s_retry = Environment.GetEnvironmentVariable("FBI_DEFAULTRETRY"); int i = 0; if (!int.TryParse(s_retry, out i)) { diff --git a/FHIRBulkImport/ImportBundleBlobTrigger.cs b/FHIRBulkImport/ImportBundleBlobTrigger.cs index ca553b7..6caf31f 100644 --- a/FHIRBulkImport/ImportBundleBlobTrigger.cs +++ b/FHIRBulkImport/ImportBundleBlobTrigger.cs @@ -21,7 +21,7 @@ public ImportBundleBlobTrigger(TelemetryConfiguration telemetryConfiguration) [Function("ImportBundleBlobTrigger")] - public async Task Run([BlobTrigger("bundles/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")]Stream myBlob, string name, FunctionContext context) + public async Task Run([BlobTrigger("bundles/{name}", Connection = "FBI_STORAGEACCT_IDENTITY")]Stream myBlob, string name, FunctionContext context) { var logger = context.GetLogger("ImportBundleBlobTrigger"); await ImportUtils.ImportBundle(name, logger, _telemetryClient); diff --git a/FHIRBulkImport/ImportBundleEventGrid.cs b/FHIRBulkImport/ImportBundleEventGrid.cs index 6f89ceb..19785f8 100644 --- a/FHIRBulkImport/ImportBundleEventGrid.cs +++ b/FHIRBulkImport/ImportBundleEventGrid.cs @@ -17,7 +17,7 @@ public class ImportBundleEventGrid { [Function("ImportBundleEventGrid")] - [QueueOutput("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] + [QueueOutput("bundlequeue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] public JObject Run([EventGridTrigger]EventGridEvent eventGridEvent, FunctionContext context) { diff --git a/FHIRBulkImport/ImportBundleHTTP.cs b/FHIRBulkImport/ImportBundleHTTP.cs index 811e518..b288fc5 100644 --- a/FHIRBulkImport/ImportBundleHTTP.cs +++ b/FHIRBulkImport/ImportBundleHTTP.cs @@ -38,7 +38,7 @@ public async Task Run( var o = JObject.Parse(requestBody); if (o["resourceType"] !=null && o["resourceType"].ToString().Equals("Bundle")) { - var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI-STORAGEACCT")); + var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI_STORAGEACCT")); await StorageUtils.WriteStringToBlob(cbclient, "bundles", filename, requestBody, logger); response.StatusCode = System.Net.HttpStatusCode.Accepted; response.Headers.Add("Content-Type", "application/json"); diff --git a/FHIRBulkImport/ImportBundleQueue.cs b/FHIRBulkImport/ImportBundleQueue.cs index d04ad74..d233922 100644 --- a/FHIRBulkImport/ImportBundleQueue.cs +++ b/FHIRBulkImport/ImportBundleQueue.cs @@ -26,7 +26,7 @@ public ImportBundleQueue(TelemetryConfiguration telemetryConfiguration) } [Function("ImportBundleQueue")] - public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] QueueMessage queueMessage, FunctionContext context) + public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] QueueMessage queueMessage, FunctionContext context) { var logger = context.GetLogger("ImportBundleQueue"); string bodyText = queueMessage.Body.ToString(); @@ -39,22 +39,22 @@ public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI-STORAGEACCT } [Function("PoisonQueueRetries")] public static async Task PoisonQueueRetries( - [TimerTrigger("%FBI-POISONQUEUE-TIMER-CRON%")] TimerInfo timerInfo, + [TimerTrigger("%FBI_POISONQUEUE_TIMER_CRON%")] TimerInfo timerInfo, FunctionContext context) { var logger = context.GetLogger("PoisonQueueRetries"); logger.LogInformation($"PoisonQueueRetries:Checking for poison queue messages in bundlequeue-poison..."); - var sourceQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI-STORAGEACCT-QUEUEURI")}/bundlequeue-poison"),new DefaultAzureCredential()); + var sourceQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI_STORAGEACCT_QUEUEURI")}/bundlequeue-poison"),new DefaultAzureCredential()); await sourceQueue.CreateIfNotExistsAsync(); - var targetQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI-STORAGEACCT-QUEUEURI")}/bundlequeue"), new DefaultAzureCredential()); + var targetQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI_STORAGEACCT_QUEUEURI")}/bundlequeue"), new DefaultAzureCredential()); await targetQueue.CreateIfNotExistsAsync(); - int maxrequeuemessages = Utils.GetIntEnvironmentVariable("FBI-MAXREQUEUE-MESSAGE-COUNT", "100"); + int maxrequeuemessages = Utils.GetIntEnvironmentVariable("FBI_MAXREQUEUE_MESSAGE_COUNT", "100"); int messagesrequeued = 0; if (await sourceQueue.ExistsAsync()) { QueueProperties properties = sourceQueue.GetProperties(); // Retrieve the cached approximate message count. - int cachedMessagesCount = properties.ApproximateMessagesCount; + int cachedMessagesCount = properties.ApproximateMessagesCount; logger.LogInformation($"PoisonQueueRetries:Found {cachedMessagesCount} messages in bundlequeue-poison....Re-queing upto {maxrequeuemessages}"); while(cachedMessagesCount > 0 && messagesrequeued < maxrequeuemessages) { int batchsize = (maxrequeuemessages - messagesrequeued >= 32 ? 32 : maxrequeuemessages - messagesrequeued); diff --git a/FHIRBulkImport/ImportCompressedFiles.cs b/FHIRBulkImport/ImportCompressedFiles.cs index 7459c40..9a8e158 100644 --- a/FHIRBulkImport/ImportCompressedFiles.cs +++ b/FHIRBulkImport/ImportCompressedFiles.cs @@ -21,7 +21,7 @@ public ImportCompressedFiles(TelemetryConfiguration telemetryConfiguration) _telemetryClient = new TelemetryClient(telemetryConfiguration); } [Function("ImportCompressedFiles")] - public async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")]Stream myBlob, string name, FunctionContext context) + public async Task Run([BlobTrigger("zip/{name}", Connection = "FBI_STORAGEACCT_IDENTITY")]Stream myBlob, string name, FunctionContext context) { var logger = context.GetLogger("ImportCompressedFiles"); try @@ -30,7 +30,7 @@ public async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAGEACCT-I if (name.Split('.').Last().ToLower() == "zip") { - var blobClient = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT")); + var blobClient = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI_STORAGEACCT")); var containerndjson = blobClient.GetBlobContainerClient("ndjson"); var containerbundles = blobClient.GetBlobContainerClient("bundles"); @@ -38,7 +38,7 @@ public async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAGEACCT-I { logger.LogInformation($"ImportCompressedFiles: Decompressing {name} ..."); await myBlob.CopyToAsync(blobMemStream); - + blobMemStream.Position = 0; using (ZipArchive archive = new ZipArchive(blobMemStream)) { foreach (ZipArchiveEntry entry in archive.Entries) diff --git a/FHIRBulkImport/ImportNDJSONEventGird.cs b/FHIRBulkImport/ImportNDJSONEventGird.cs index a277490..af9bf43 100644 --- a/FHIRBulkImport/ImportNDJSONEventGird.cs +++ b/FHIRBulkImport/ImportNDJSONEventGird.cs @@ -15,7 +15,7 @@ public static class ImportNDJSONEventGird { [Function("ImportNDJSON")] - [QueueOutput("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] + [QueueOutput("ndjsonqueue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] public static JObject Run([EventGridTrigger]EventGridEvent eventGridEvent, FunctionContext context) { diff --git a/FHIRBulkImport/ImportNDJSONQueue.cs b/FHIRBulkImport/ImportNDJSONQueue.cs index 6ace671..186e016 100644 --- a/FHIRBulkImport/ImportNDJSONQueue.cs +++ b/FHIRBulkImport/ImportNDJSONQueue.cs @@ -13,7 +13,7 @@ namespace FHIRBulkImport public class ImportNDJSONQueue { [Function("ImportNDJSONQueue")] - public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] QueueMessage queueMessage,FunctionContext context) + public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] QueueMessage queueMessage,FunctionContext context) { var logger = context.GetLogger("ImportNDJSONQueue"); logger.LogInformation("Function triggered. Started Processing"); diff --git a/FHIRBulkImport/ImportUtils.cs b/FHIRBulkImport/ImportUtils.cs index a65a705..6c43b9f 100644 --- a/FHIRBulkImport/ImportUtils.cs +++ b/FHIRBulkImport/ImportUtils.cs @@ -20,16 +20,16 @@ public static class ImportUtils { //Unahandeled Exceptions worth retrying public static string MESSAGE_RETRY_SETTING = "request was canceled due to the configured HttpClient.Timeout,target machine actively refused it,an error occurred while sending the request"; - public static string[] EXCEPTION_MESSAGE_STRINGS_RETRY = Utils.GetEnvironmentVariable("FBI-UNHANDLED-RETRY-MESSAGES",MESSAGE_RETRY_SETTING).Split(','); + public static string[] EXCEPTION_MESSAGE_STRINGS_RETRY = Utils.GetEnvironmentVariable("FBI_UNHANDLED_RETRY_MESSAGES",MESSAGE_RETRY_SETTING).Split(','); public static async Task ImportBundle(string name, ILogger log, TelemetryClient telemetryClient) { // Setup for metrics - bool trbundles = Utils.GetBoolEnvironmentVariable("FBI-TRANSFORMBUNDLES", true); + bool trbundles = Utils.GetBoolEnvironmentVariable("FBI_TRANSFORMBUNDLES", true); log.LogInformation($"ImportFHIRBundles: Processing file Name:{name}..."); - var cbclient = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT")); - string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-BUNDLES", "bundles"); + var cbclient = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI_STORAGEACCT")); + string container = Utils.GetEnvironmentVariable("FBI_CONTAINER_BUNDLES", "bundles"); Stream myBlob = await StorageUtils.GetStreamForBlob(cbclient, container, name, log); if (myBlob == null) { From b1735ca70ef5781e152e2c9f1f815223d20d994b Mon Sep 17 00:00:00 2001 From: "Ganesh Kuber (Centific Technologies Inc)" Date: Fri, 29 Aug 2025 16:40:16 +0530 Subject: [PATCH 5/8] Changed '-' to '_' --- FHIRBulkImport/ADUtils.cs | 2 +- FHIRBulkImport/ExportOrchestrator.cs | 8 ++++- FHIRBulkImport/FHIRBulkImport.csproj | 4 +++ FHIRBulkImport/ImportBundleQueue.cs | 2 +- FHIRBulkImport/ImportNDJSONQueue.cs | 20 ++++++------- scripts/fhirBulkImport.bicep | 44 ++++++++++++++-------------- scripts/fhirBulkImport.json | 44 ++++++++++++++-------------- 7 files changed, 67 insertions(+), 57 deletions(-) diff --git a/FHIRBulkImport/ADUtils.cs b/FHIRBulkImport/ADUtils.cs index 2b818fc..8b26a90 100644 --- a/FHIRBulkImport/ADUtils.cs +++ b/FHIRBulkImport/ADUtils.cs @@ -49,7 +49,7 @@ public static async Task GetAADAccessToken(string authority, string clie catch (Exception e) { log.LogError($"GetAADAccessToken: Exception getting access token: {e.Message}"); - return null; + return null; } } diff --git a/FHIRBulkImport/ExportOrchestrator.cs b/FHIRBulkImport/ExportOrchestrator.cs index 8f473c7..1803cb7 100644 --- a/FHIRBulkImport/ExportOrchestrator.cs +++ b/FHIRBulkImport/ExportOrchestrator.cs @@ -632,7 +632,13 @@ public static async Task CleanupOldRuns( { var log = context.GetLogger("ExportHistoryCleanUp"); var createdTimeFrom = DateTime.MinValue; - var createdTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(Utils.GetIntEnvironmentVariable("FBI_EXPORTPURGEAFTERDAYS", "30"))); + + if (createdTimeFrom.Year < 1000) + { + createdTimeFrom = new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc); + } + + var createdTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(Utils.GetIntEnvironmentVariable("FBI_EXPORTPURGEAFTERDAYS", "30"))); var runtimeStatus = new List { OrchestrationRuntimeStatus.Completed, diff --git a/FHIRBulkImport/FHIRBulkImport.csproj b/FHIRBulkImport/FHIRBulkImport.csproj index 10b27c9..4575027 100644 --- a/FHIRBulkImport/FHIRBulkImport.csproj +++ b/FHIRBulkImport/FHIRBulkImport.csproj @@ -22,6 +22,7 @@ + @@ -45,4 +46,7 @@ + + + diff --git a/FHIRBulkImport/ImportBundleQueue.cs b/FHIRBulkImport/ImportBundleQueue.cs index d233922..5e60af7 100644 --- a/FHIRBulkImport/ImportBundleQueue.cs +++ b/FHIRBulkImport/ImportBundleQueue.cs @@ -33,7 +33,7 @@ public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI_STORAGEACCT JObject blobCreatedEvent = JObject.Parse(bodyText); string url = (string)blobCreatedEvent["data"]["url"]; logger.LogInformation($"ImportBundleEventGrid: Processing blob at {url}..."); - string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-BUNDLES", "bundles"); + string container = Utils.GetEnvironmentVariable("FBI_CONTAINER_BUNDLES", "bundles"); string name = url.Substring(url.IndexOf($"/{container}/") + $"/{container}/".Length); await ImportUtils.ImportBundle(name, logger, _telemetryClient); } diff --git a/FHIRBulkImport/ImportNDJSONQueue.cs b/FHIRBulkImport/ImportNDJSONQueue.cs index 186e016..b8cb51a 100644 --- a/FHIRBulkImport/ImportNDJSONQueue.cs +++ b/FHIRBulkImport/ImportNDJSONQueue.cs @@ -13,7 +13,7 @@ namespace FHIRBulkImport public class ImportNDJSONQueue { [Function("ImportNDJSONQueue")] - public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] QueueMessage queueMessage,FunctionContext context) + public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] QueueMessage queueMessage, FunctionContext context) { var logger = context.GetLogger("ImportNDJSONQueue"); logger.LogInformation("Function triggered. Started Processing"); @@ -37,11 +37,11 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STOR return; } int maxresourcesperbundle = 200; - var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI-STORAGEACCT")); - string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-NDJSON", "ndjson"); + var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI_STORAGEACCT")); + string container = Utils.GetEnvironmentVariable("FBI_CONTAINER_NDJSON", "ndjson"); string name = url.Substring(url.IndexOf($"/{container}/") + $"/{container}/".Length); logger.LogInformation($"Blob name resolved: {name}"); - string mrbundlemax = System.Environment.GetEnvironmentVariable("FBI-MAXRESOURCESPERBUNDLE"); + string mrbundlemax = System.Environment.GetEnvironmentVariable("FBI_MAXRESOURCESPERBUNDLE"); if (!string.IsNullOrEmpty(mrbundlemax)) { if (!int.TryParse(mrbundlemax, out maxresourcesperbundle)) maxresourcesperbundle = 200; @@ -53,8 +53,8 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STOR int bundlecnt = 0; int errcnt = 0; int fileno = 1; - Stream myBlob = await StorageUtils.GetStreamForBlob(cbclient, container, name,logger); - if (myBlob==null) + Stream myBlob = await StorageUtils.GetStreamForBlob(cbclient, container, name, logger); + if (myBlob == null) { logger.LogWarning($"ImportNDJSONQueue:The blob {name} in container {container} does not exist or cannot be read."); return; @@ -67,7 +67,7 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STOR while ((line = reader.ReadLine()) != null) { - linecnt++; + linecnt++; JObject res = null; logger.LogDebug($"Reading line {linecnt}"); try @@ -88,7 +88,7 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STOR { logger.LogInformation($"Writing bundle {name}--{fileno++}.json with {bundlecnt} resources"); await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), logger); - + bundlecnt = 0; rv = null; rv = ImportUtils.initBundle(); @@ -110,6 +110,6 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STOR } } - + } -} +} \ No newline at end of file diff --git a/scripts/fhirBulkImport.bicep b/scripts/fhirBulkImport.bicep index 86afcbd..121ff76 100644 --- a/scripts/fhirBulkImport.bicep +++ b/scripts/fhirBulkImport.bicep @@ -223,54 +223,54 @@ resource fhirProxyAppSettings 'Microsoft.Web/sites/config@2021-03-01' = { AzureFunctionsJobHost__functionTimeout: '23:00:00' // Storage account to setup import from - 'FBI-STORAGEACCT': storageAccountUri - 'FBI-STORAGEACCT-QUEUEURI-IDENTITY__queueServiceUri': storageAccountQueueUri - 'FBI-STORAGEACCT-IDENTITY__blobServiceUri': storageAccountUri - 'FBI-STORAGEACCT-QUEUEURI': storageAccountQueueUri + 'FBI_STORAGEACCT': storageAccountUri + 'FBI_STORAGEACCT_QUEUEURI_IDENTITY': storageAccountQueueUri + 'FBI_STORAGEACCT_IDENTITY': storageAccountUri + 'FBI_STORAGEACCT_QUEUEURI': storageAccountQueueUri // URL for the FHIR endpoint - 'FS-URL': fhirUrl + 'FS_URL': fhirUrl // Resource for the FHIR endpoint. - 'FS-RESOURCE': empty(fhirAudience) ? fhirUrl : fhirAudience + 'FS_RESOURCE': empty(fhirAudience) ? fhirUrl : fhirAudience // Tenant of FHIR Server - 'FS-TENANT-NAME': tenantId + 'FS_TENANT_NAME': tenantId - 'FS-ISMSI': authenticationType == 'managedIdentity' ? 'true' : 'false' + 'FS_ISMSI': authenticationType == 'managedIdentity' ? 'true' : 'false' - 'FS-CLIENT-ID': authenticationType == 'servicePrincipal' ? serviceAccountClientId : '' + 'FS_CLIENT_ID': authenticationType == 'servicePrincipal' ? serviceAccountClientId : '' - 'FS-SECRET': authenticationType == 'servicePrincipal' ? serviceAccountSecret : '' + 'FS_SECRET': authenticationType == 'servicePrincipal' ? serviceAccountSecret : '' // When loading bundles, convert transaction to batch bundles. Transform UUIDs and resolve ifNoneExist TRANSFORMBUNDLES: '${transformTransactionBundles}' // ADVANCED // Max number of resources in a bundle - 'FBI-MAXBUNDLESIZE': '500' + 'FBI_MAXBUNDLESIZE': '500' // When loading NDJSON, how many resources to put in a single bundle - 'FBI-MAXRESOURCESPERBUNDLE': '500' + 'FBI_MAXRESOURCESPERBUNDLE': '500' // Max HTTP retries on the FHIR Server - 'FBI-POLLY-MAXRETRIES': '3' + 'FBI_POLLY_MAXRETRIES': '3' // Retry delay for FHIR Server requests - 'FBI-POLLY-RETRYMS': '500' + 'FBI_POLLY_RETRYMS': '500' // ResponseDrainTimeout - 'FBI-POOLEDCON-RESPONSEDRAINSECS': '60' + 'FBI_POOLEDCON_RESPONSEDRAINSECS': '60' // PooledConnectionLifetime - 'FBI-POOLEDCON-LIFETIME': '5' + 'FBI_POOLEDCON_LIFETIME': '5' // PooledConnectionIdleTimeout - 'FBI-POOLEDCON-IDLETO': '2' + 'FBI_POOLEDCON_IDLETO': '2' // MaxConnectionsPerServer - 'FBI-POOLEDCON-MAXCONNECTIONS': '20' + 'FBI_POOLEDCON_MAXCONNECTIONS': '20' // Max file size to load. -1 disables this. - 'FBI-MAXFILESIZEMB': '-1' + 'FBI_MAXFILESIZEMB': '-1' // Max number of concurrent exports - 'FBI-MAXEXPORTS': '-1' + 'FBI_MAXEXPORTS': '-1' // How long to leave exports on the storage account - 'FBI-EXPORTPURGEAFTERDAYS': '30' + 'FBI_EXPORTPURGEAFTERDAYS': '30' // Period to run the poision queue function. - 'FBI-POISONQUEUE-TIMER-CRON': '0 */2 * * * *' + 'FBI_POISONQUEUE_TIMER_CRON': '0 */2 * * * *' } } diff --git a/scripts/fhirBulkImport.json b/scripts/fhirBulkImport.json index c08f69e..4db12a2 100644 --- a/scripts/fhirBulkImport.json +++ b/scripts/fhirBulkImport.json @@ -364,29 +364,29 @@ "SCM_DO_BUILD_DURING_DEPLOYMENT": "true", "AzureWebJobs.ImportBundleBlobTrigger.Disabled": "1", "AzureFunctionsJobHost__functionTimeout": "23:00:00", - "FBI-STORAGEACCT": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", - "FBI-STORAGEACCT-QUEUEURI-IDENTITY__queueServiceUri": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", - "FBI-STORAGEACCT-IDENTITY__blobServiceUri": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", - "FBI-STORAGEACCT-QUEUEURI": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", - "FS-URL": "[variables('fhirUrl')]", - "FS-RESOURCE": "[if(empty(parameters('fhirAudience')), variables('fhirUrl'), parameters('fhirAudience'))]", - "FS-TENANT-NAME": "[variables('tenantId')]", - "FS-ISMSI": "[if(equals(parameters('authenticationType'), 'managedIdentity'), 'true', 'false')]", - "FS-CLIENT-ID": "[if(equals(parameters('authenticationType'), 'servicePrincipal'), parameters('serviceAccountClientId'), '')]", - "FS-SECRET": "[if(equals(parameters('authenticationType'), 'servicePrincipal'), parameters('serviceAccountSecret'), '')]", + "FBI_STORAGEACCT": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", + "FBI_STORAGEACCT_QUEUEURI_IDENTITY": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", + "FBI_STORAGEACCT_IDENTITY": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", + "FBI_STORAGEACCT_QUEUEURI": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", + "FS_URL": "[variables('fhirUrl')]", + "FS_RESOURCE": "[if(empty(parameters('fhirAudience')), variables('fhirUrl'), parameters('fhirAudience'))]", + "FS_TENANT_NAME": "[variables('tenantId')]", + "FS_ISMSI": "[if(equals(parameters('authenticationType'), 'managedIdentity'), 'true', 'false')]", + "FS_CLIENT_ID": "[if(equals(parameters('authenticationType'), 'servicePrincipal'), parameters('serviceAccountClientId'), '')]", + "FS_SECRET": "[if(equals(parameters('authenticationType'), 'servicePrincipal'), parameters('serviceAccountSecret'), '')]", "TRANSFORMBUNDLES": "[format('{0}', parameters('transformTransactionBundles'))]", - "FBI-MAXBUNDLESIZE": "500", - "FBI-MAXRESOURCESPERBUNDLE": "500", - "FBI-POLLY-MAXRETRIES": "3", - "FBI-POLLY-RETRYMS": "500", - "FBI-POOLEDCON-RESPONSEDRAINSECS": "60", - "FBI-POOLEDCON-LIFETIME": "5", - "FBI-POOLEDCON-IDLETO": "2", - "FBI-POOLEDCON-MAXCONNECTIONS": "20", - "FBI-MAXFILESIZEMB": "-1", - "FBI-MAXEXPORTS": "-1", - "FBI-EXPORTPURGEAFTERDAYS": "30", - "FBI-POISONQUEUE-TIMER-CRON": "0 */2 * * * *" + "FBI_MAXBUNDLESIZE": "500", + "FBI_MAXRESOURCESPERBUNDLE": "500", + "FBI_POLLY_MAXRETRIES": "3", + "FBI_POLLY_RETRYMS": "500", + "FBI_POOLEDCON_RESPONSEDRAINSECS": "60", + "FBI_POOLEDCON_LIFETIME": "5", + "FBI_POOLEDCON_IDLETO": "2", + "FBI_POOLEDCON_MAXCONNECTIONS": "20", + "FBI_MAXFILESIZEMB": "-1", + "FBI_MAXEXPORTS": "-1", + "FBI_EXPORTPURGEAFTERDAYS": "30", + "FBI_POISONQUEUE_TIMER_CRON": "0 */2 * * * *" }, "dependsOn": [ "[resourceId('Microsoft.Insights/components', format('{0}-ai', variables('prefixNameCleanShort')))]", From 5aecdef0c92b15356bef4b31d03bffd43bdb4b30 Mon Sep 17 00:00:00 2001 From: ms-snarang Date: Wed, 3 Sep 2025 17:02:34 +0530 Subject: [PATCH 6/8] Infra update --- scripts/fhirBulkImport.bicep | 4 ++-- scripts/fhirBulkImport.json | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/fhirBulkImport.bicep b/scripts/fhirBulkImport.bicep index 121ff76..c94a4f1 100644 --- a/scripts/fhirBulkImport.bicep +++ b/scripts/fhirBulkImport.bicep @@ -224,8 +224,8 @@ resource fhirProxyAppSettings 'Microsoft.Web/sites/config@2021-03-01' = { // Storage account to setup import from 'FBI_STORAGEACCT': storageAccountUri - 'FBI_STORAGEACCT_QUEUEURI_IDENTITY': storageAccountQueueUri - 'FBI_STORAGEACCT_IDENTITY': storageAccountUri + 'FBI_STORAGEACCT_QUEUEURI_IDENTITY__queueServiceUri': storageAccountQueueUri + 'FBI_STORAGEACCT_IDENTITY__blobServiceUri': storageAccountUri 'FBI_STORAGEACCT_QUEUEURI': storageAccountQueueUri // URL for the FHIR endpoint diff --git a/scripts/fhirBulkImport.json b/scripts/fhirBulkImport.json index 4db12a2..42f6fac 100644 --- a/scripts/fhirBulkImport.json +++ b/scripts/fhirBulkImport.json @@ -365,8 +365,8 @@ "AzureWebJobs.ImportBundleBlobTrigger.Disabled": "1", "AzureFunctionsJobHost__functionTimeout": "23:00:00", "FBI_STORAGEACCT": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", - "FBI_STORAGEACCT_QUEUEURI_IDENTITY": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", - "FBI_STORAGEACCT_IDENTITY": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", + "FBI_STORAGEACCT_QUEUEURI_IDENTITY__queueServiceUri": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", + "FBI_STORAGEACCT_IDENTITY__blobServiceUri": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", "FBI_STORAGEACCT_QUEUEURI": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", "FS_URL": "[variables('fhirUrl')]", "FS_RESOURCE": "[if(empty(parameters('fhirAudience')), variables('fhirUrl'), parameters('fhirAudience'))]", @@ -400,7 +400,7 @@ "name": "[format('{0}/{1}', format('{0}-func', variables('prefixNameCleanShort')), 'web')]", "properties": { "repoUrl": "[variables('repoUrl')]", - "branch": "personal/v-shanarang/isolatedworker", + "branch": "main", "isManualIntegration": true }, "dependsOn": [ From 1f35e983ceb8491aa5a349e97de0d967eb052b6d Mon Sep 17 00:00:00 2001 From: ms-snarang Date: Thu, 4 Sep 2025 12:47:32 +0530 Subject: [PATCH 7/8] branch change --- scripts/fhirBulkImport.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/fhirBulkImport.json b/scripts/fhirBulkImport.json index 42f6fac..34c0a1e 100644 --- a/scripts/fhirBulkImport.json +++ b/scripts/fhirBulkImport.json @@ -400,7 +400,7 @@ "name": "[format('{0}/{1}', format('{0}-func', variables('prefixNameCleanShort')), 'web')]", "properties": { "repoUrl": "[variables('repoUrl')]", - "branch": "main", + "branch": "personal/v-shanarang/isolatedworker", "isManualIntegration": true }, "dependsOn": [ From a3974bcbb6488b56e28093d1952bbedbde92656d Mon Sep 17 00:00:00 2001 From: ms-snarang Date: Fri, 5 Sep 2025 14:50:21 +0530 Subject: [PATCH 8/8] Revert the branch to main --- scripts/fhirBulkImport.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/fhirBulkImport.json b/scripts/fhirBulkImport.json index 34c0a1e..42f6fac 100644 --- a/scripts/fhirBulkImport.json +++ b/scripts/fhirBulkImport.json @@ -400,7 +400,7 @@ "name": "[format('{0}/{1}', format('{0}-func', variables('prefixNameCleanShort')), 'web')]", "properties": { "repoUrl": "[variables('repoUrl')]", - "branch": "personal/v-shanarang/isolatedworker", + "branch": "main", "isManualIntegration": true }, "dependsOn": [