diff --git a/FHIRBulkImport/ADUtils.cs b/FHIRBulkImport/ADUtils.cs index 2b818fc..8b26a90 100644 --- a/FHIRBulkImport/ADUtils.cs +++ b/FHIRBulkImport/ADUtils.cs @@ -49,7 +49,7 @@ public static async Task GetAADAccessToken(string authority, string clie catch (Exception e) { log.LogError($"GetAADAccessToken: Exception getting access token: {e.Message}"); - return null; + return null; } } diff --git a/FHIRBulkImport/ExportAllOrchestrator.cs b/FHIRBulkImport/ExportAllOrchestrator.cs index 43a84f4..ce63ada 100644 --- a/FHIRBulkImport/ExportAllOrchestrator.cs +++ b/FHIRBulkImport/ExportAllOrchestrator.cs @@ -1,56 +1,65 @@ using System.Net.Http; using System.Threading.Tasks; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.DurableTask; -using Microsoft.Azure.WebJobs.Extensions.Http; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; -using System.Linq; +using System.Linq; using System; using Newtonsoft.Json; using System.Collections.Generic; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using System.IO; +using System.Threading; namespace FHIRBulkImport { public class ExportAllOrchestrator { - private static int _exportResourceCount = Utils.GetIntEnvironmentVariable("FS-EXPORTRESOURCECOUNT", "1000"); - private static string _storageAccount = Utils.GetEnvironmentVariable("FBI-STORAGEACCT"); - private static int _maxInstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); + private static int _exportResourceCount = Utils.GetIntEnvironmentVariable("FS_EXPORTRESOURCECOUNT", "1000"); + private static string _storageAccount = Utils.GetEnvironmentVariable("FBI_STORAGEACCT"); + private static int _maxInstances = Utils.GetIntEnvironmentVariable("FBI_MAXEXPORTS", "0"); private static int _maxParallelizationCount = 100; - private static int _parallelSearchBundleSize = _maxInstances = 
Utils.GetIntEnvironmentVariable("FBI-PARALLELSEARCHBUNDLESIZE", "50"); - private static RetryOptions _exportAllRetryOptions = new RetryOptions(firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI-EXPORTALLRETRYINTERVAL", "30")), maxNumberOfAttempts: 5) - { BackoffCoefficient = Utils.GetIntEnvironmentVariable("FBI-EXPORTALLBACKOFFCOEFFICIENT", "3") }; - - [FunctionName(nameof(ExportAllOrchestrator_HttpStart))] - public async Task ExportAllOrchestrator_HttpStart( - [HttpTrigger(AuthorizationLevel.Function, "post", Route = "$alt-export-all")] HttpRequestMessage req, - [DurableClient] IDurableOrchestrationClient starter, - ILogger log) + private static int _parallelSearchBundleSize = Utils.GetIntEnvironmentVariable("FBI_PARALLELSEARCHBUNDLESIZE", "50"); + private static RetryPolicy _exportAllRetryOptions = new RetryPolicy(firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI_EXPORTALLRETRYINTERVAL", "30")), maxNumberOfAttempts: 5, + backoffCoefficient: Convert.ToDouble(Utils.GetIntEnvironmentVariable("FBI_EXPORTALLBACKOFFCOEFFICIENT", "3"))); + + + [Function(nameof(ExportAllOrchestrator_HttpStart))] + public async Task ExportAllOrchestrator_HttpStart( + [HttpTrigger(AuthorizationLevel.Function, "post", Route = "$alt-export-all")] HttpRequestData req, + [DurableClient] DurableTaskClient starter, + FunctionContext context) { - string requestParameters = await req.Content.ReadAsStringAsync(); - var state = await ExportOrchestrator.runningInstances(starter, log); + var log = context.GetLogger("ExportAllOrchestrator_HttpStart"); + string requestParameters = await new StreamReader(req.Body).ReadToEndAsync(); + var orchestrator = new ExportOrchestrator(); + var state = await orchestrator.runningInstances(starter, context); int running = state.Count(); if (_maxInstances > 0 && running >= _maxInstances) { string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {_maxInstances}"; 
- StringContent sc = new StringContent("{\"error\":\"" + msg + "\""); - return new HttpResponseMessage() { Content = sc, StatusCode = System.Net.HttpStatusCode.TooManyRequests }; + string sc = "{\"error\":\"" + msg + "\"}"; + var response = req.CreateResponse(System.Net.HttpStatusCode.TooManyRequests); + await response.WriteStringAsync(sc); + return response; } // Function input comes from the request content. - string instanceId = await starter.StartNewAsync(nameof(ExportAll_RunOrchestrator), null, requestParameters); + string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync(nameof(ExportAll_RunOrchestrator), requestParameters); log.LogInformation($"Started orchestration with ID = '{instanceId}'."); return starter.CreateCheckStatusResponse(req, instanceId); } - [FunctionName(nameof(ExportAll_RunOrchestrator))] + [Function(nameof(ExportAll_RunOrchestrator))] public async Task ExportAll_RunOrchestrator( - [OrchestrationTrigger] IDurableOrchestrationContext context, - ILogger logger) + [OrchestrationTrigger] TaskOrchestrationContext context, + FunctionContext log) { + var logger = log.GetLogger("ExportAll_RunOrchestrator"); // Setup function level variables. 
JObject retVal = new JObject(); retVal["instanceid"] = context.InstanceId; @@ -83,10 +92,12 @@ public async Task ExportAll_RunOrchestrator( context.SetCustomStatus(retVal); var parallelizationTasks = options.ParallelSearchRanges.Select( - x => context.CallActivityWithRetryAsync>( + x => context.CallActivityAsync>( nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges), - _exportAllRetryOptions, - new GetCountsForListOfDateRangesRequest(options.ResourceType, x.Select(y => (y.Start, y.End, -1)).ToList()))); + + new GetCountsForListOfDateRangesRequest(options.ResourceType, x.Select(y => (y.Start, y.End, -1)).ToList()), + new TaskOptions(_exportAllRetryOptions) + )); var results = await Task.WhenAll(parallelizationTasks); @@ -114,10 +125,11 @@ public async Task ExportAll_RunOrchestrator( nextLink += $"&_lastUpdated=lt{searchRanges[i].End.Value.ToString("o")}"; } - exportTasks.Add(context.CallActivityWithRetryAsync( + exportTasks.Add(context.CallActivityAsync( nameof(ExportAllOrchestrator_GetAndWriteDataPage), - _exportAllRetryOptions, - new DataPageRequest(FhirRequestPath: nextLink, InstanceId: context.InstanceId, ParallelTaskIndex: i, ResourceType: options.ResourceType, Audience: options.Audience))); + + new DataPageRequest(FhirRequestPath: nextLink, InstanceId: context.InstanceId, ParallelTaskIndex: i, ResourceType: options.ResourceType, Audience: options.Audience), + new TaskOptions(_exportAllRetryOptions))); } // Start and continue export as long as there are pending tasks @@ -165,10 +177,11 @@ public async Task ExportAll_RunOrchestrator( if (fhirResult.NextLink != null) { - exportTasks.Add(context.CallActivityWithRetryAsync( + exportTasks.Add(context.CallActivityAsync( nameof(ExportAllOrchestrator_GetAndWriteDataPage), - _exportAllRetryOptions, - new DataPageRequest(FhirRequestPath: fhirResult.NextLink, InstanceId: fhirResult.InstanceId, ParallelTaskIndex: fhirResult.ParallelTaskIndex, ResourceType: options.ResourceType, Audience: options.Audience))); + + new 
DataPageRequest(FhirRequestPath: fhirResult.NextLink, InstanceId: fhirResult.InstanceId, ParallelTaskIndex: fhirResult.ParallelTaskIndex, ResourceType: options.ResourceType, Audience: options.Audience), + new TaskOptions(_exportAllRetryOptions))); } } @@ -184,11 +197,12 @@ public async Task ExportAll_RunOrchestrator( return retVal; } - [FunctionName(nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges))] + [Function(nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges))] public async Task> ExportAllOrchestrator_GetCountsForListOfDateRanges( [ActivityTrigger] GetCountsForListOfDateRangesRequest input, - ILogger logger) + FunctionContext context) { + var logger = context.GetLogger("ExportAllOrchestrator_GetCountsForListOfDateRanges"); JObject requestBody = new(); requestBody["resourceType"] = "Bundle"; requestBody["type"] = "batch"; @@ -253,12 +267,13 @@ public async Task ExportAll_RunOrchestrator( throw new Exception(message); } - [FunctionName(nameof(ExportAllOrchestrator_GetAndWriteDataPage))] + [Function(nameof(ExportAllOrchestrator_GetAndWriteDataPage))] public async Task ExportAllOrchestrator_GetAndWriteDataPage( [ActivityTrigger] DataPageRequest input, - [DurableClient] IDurableEntityClient ec, - ILogger logger) + [DurableClient] DurableTaskClient ec, + FunctionContext context) { + var logger = context.GetLogger("ExportAllOrchestrator_GetAndWriteDataPage"); logger.LogInformation($"Fetching page of resources using query {input.FhirRequestPath}"); var response = await FHIRUtils.CallFHIRServer(input.FhirRequestPath, "", HttpMethod.Get, logger, input.Audience); diff --git a/FHIRBulkImport/ExportOrchestrator.cs b/FHIRBulkImport/ExportOrchestrator.cs index 0d0cd50..1803cb7 100644 --- a/FHIRBulkImport/ExportOrchestrator.cs +++ b/FHIRBulkImport/ExportOrchestrator.cs @@ -1,26 +1,33 @@ -using System.Collections.Generic; +using Azure.Storage.Blobs.Specialized; +using DurableTask.Core; +using DurableTask.Core.Entities; +using Microsoft.Azure.Functions.Worker; 
+using Microsoft.Azure.Functions.Worker.Extensions.Http; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.Azure.Functions.Worker.Http; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using System; +using System.Collections; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.ComponentModel; using System.IO; +using System.Linq; +using System.Net; using System.Net.Http; +using System.Net.Http.Headers; using System.Text; -using System.Threading.Tasks; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.DurableTask; -using Microsoft.Azure.WebJobs.Extensions.Http; -using Microsoft.Azure.WebJobs.Host; -using Microsoft.Extensions.Logging; -using Newtonsoft.Json.Linq; -using System.Linq; -using System; using System.Threading; -using System.Net.Http.Headers; -using DurableTask.Core; -using Azure.Storage.Blobs.Specialized; -using System.ComponentModel; +using System.Threading.Tasks; +using System.Text.Json; namespace FHIRBulkImport { - public static class ExportOrchestrator + public class ExportOrchestrator { private static void IdentifyUniquePatientReferences(JObject resource, string patreffield, HashSet uniquePats) { @@ -73,28 +80,31 @@ private static JObject SetContextVariables(string instanceId, string ids = null, if (include != null) o["include"] = include; return o; } - [FunctionName("CountFileLines")] - public static async Task CountFileLines( + [Function("CountFileLines")] + public async Task CountFileLines( [ActivityTrigger] JObject ctx, - ILogger log) + FunctionContext context) { + var log = context.GetLogger("CountFileLines"); string instanceid = (string)ctx["instanceid"]; string blob = (string)ctx["filename"]; - return await FileHolderManager.CountLinesInBlob(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"),instanceid, blob,log); + return await 
FileHolderManager.CountLinesInBlob(Utils.GetEnvironmentVariable("FBI_STORAGEACCT"),instanceid, blob,log); } - [FunctionName("FileNames")] - public static async Task> FileNames( + [Function("FileNames")] + public async Task> FileNames( [ActivityTrigger] string instanceid, - ILogger log) + FunctionContext context) { + var log = context.GetLogger("FileNames"); + return await FileHolderManager.GetFileNames(instanceid,log); } - [FunctionName("ExportOrchestrator")] - public static async Task RunOrchestrator( - [OrchestrationTrigger] IDurableOrchestrationContext context, - ILogger log) + [Function("ExportOrchestrator")] + public async Task RunOrchestrator( + [OrchestrationTrigger] TaskOrchestrationContext context, + FunctionContext functionContext) { - + var log = functionContext.GetLogger("ExportOrchestrator"); JObject config = null; JObject retVal = new JObject(); HashSet uniquePats = new HashSet(); @@ -212,33 +222,34 @@ public static async Task RunOrchestrator( log.LogInformation($"Completed orchestration with ID = '{context.InstanceId}'."); return filecounts; } - [FunctionName("AppendBlob")] - public static async Task AppendBlob( + [Function("AppendBlob")] + public async Task AppendBlob( [ActivityTrigger] JToken ctx, - ILogger log) + FunctionContext context) { + var log = context.GetLogger("AppendBlob"); string instanceid = (string)ctx["instanceId"]; string rm = (string)ctx["ids"]; - var appendBlobClient = await StorageUtils.GetAppendBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceid}", "_completed_run.json"); + var appendBlobClient = await StorageUtils.GetAppendBlobClient(Utils.GetEnvironmentVariable("FBI_STORAGEACCT"), $"export/{instanceid}", "_completed_run.json"); using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(rm))) { await appendBlobClient.AppendBlockAsync(ms); } return true; } - [FunctionName("QueryFHIR")] - public static async Task> QueryFHIR( - [ActivityTrigger] IDurableActivityContext context, - ILogger log) + 
[Function("QueryFHIR")] + public async Task> QueryFHIR( + [ActivityTrigger] JToken input, + FunctionContext context) { - + var log = context.GetLogger("QueryFHIR"); HashSet uniquePats = new HashSet(); try { - JToken vars = context.GetInput(); - string query = vars["query"].ToString(); - string instanceid = vars["instanceid"].ToString(); - string patreffield = (string)vars["patreffield"]; + + string query = input["query"].ToString(); + string instanceid = input["instanceid"].ToString(); + string patreffield = (string)input["patreffield"]; var fhirresp = await FHIRUtils.CallFHIRServer(query, "", HttpMethod.Get, log); if (fhirresp.Success && !string.IsNullOrEmpty(fhirresp.Content)) { @@ -310,9 +321,10 @@ public static async Task> QueryFHIR( } return uniquePats; } - [FunctionName("ExportOrchestrator_ProcessPatientQueryPage")] - public static async Task ProcessPatientQueryPage([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log) + [Function("ExportOrchestrator_ProcessPatientQueryPage")] + public async Task ProcessPatientQueryPage([OrchestrationTrigger] TaskOrchestrationContext context, FunctionContext functionContext) { + var log = functionContext.GetLogger("ExportOrchestrator_ProcessPatientQueryPage"); JObject retVal = new JObject(); var vars = context.GetInput(); try @@ -370,31 +382,31 @@ public static async Task ProcessPatientQueryPage([OrchestrationTrigger] } return retVal; } - [FunctionName("FileTracker")] - public static void FileTracker ([EntityTrigger] IDurableEntityContext ctx,ILogger log) + [Function("FileTracker")] + public async Task Run([OrchestrationTrigger] TaskOrchestrationContext ctx, FunctionContext context) { - - switch (ctx.OperationName.ToLowerInvariant()) + var log = context.GetLogger("FileTracker"); + var input = ctx.GetInput(); + if (input.Operation == "set") + { + await ctx.CallActivityAsync("FileTracker_Set", input.Value); + } + else if (input.Operation == "get") { - //Set File Number - case "set": - 
ctx.SetState(ctx.GetInput()); - break; - case "get": - ctx.Return(ctx.GetState()); - break; + var value = await ctx.CallActivityAsync("FileTracker_Get", null); + ctx.SetCustomStatus(value); } + } - [FunctionName("ExportOrchestrator_GatherResources")] - public static async Task GatherResources([ActivityTrigger] IDurableActivityContext context, [DurableClient] IDurableEntityClient entityclient, ILogger log) + [Function("ExportOrchestrator_GatherResources")] + public static async Task GatherResources([ActivityTrigger] JToken input, [DurableClient] DurableTaskClient entityclient, FunctionContext context) { - - - JToken vars = context.GetInput(); + var log = context.GetLogger("ExportOrchestrator_GatherResources"); + int total = 0; - string query = vars["ids"].ToString(); - string instanceid = vars["instanceid"].ToString(); - var rt = (string)vars["resourcetype"]; + string query = input["ids"].ToString(); + string instanceid = input["instanceid"].ToString(); + var rt = (string)input["resourcetype"]; var fhirresp = await FHIRUtils.CallFHIRServer(query, "", HttpMethod.Get, log); if (fhirresp.Success && !string.IsNullOrEmpty(fhirresp.Content)) { @@ -427,7 +439,7 @@ public static async Task GatherResources([ActivityTrigger] IDurableActiv public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceType, string BlobUrl); - internal static async Task ConvertToNDJSON(JToken bundle, string instanceId, string resourceType, IDurableEntityClient entityclient, ILogger log, int? parallelFileId = null) + internal static async Task ConvertToNDJSON(JToken bundle, string instanceId, string resourceType, DurableTaskClient entityclient, ILogger log, int? parallelFileId = null) { ConvertToNDJSONResponse? retVal = null; int cnt = 0; @@ -454,13 +466,13 @@ public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceT { string parallelizationModifierStr = parallelFileId.HasValue ? 
$"-{parallelFileId.Value}" : string.Empty; string key = $"{instanceId}{parallelizationModifierStr}-{resourceType}"; - - var entityId = new EntityId(nameof(FileTracker), key); - var esresp = await entityclient.ReadEntityStateAsync(entityId); - int fileno = esresp.EntityState; + + var getInstanceId = await entityclient.ScheduleNewOrchestrationInstanceAsync("FileTracker", (Operation: "get", Key: key)); + var status = await entityclient.WaitForInstanceCompletionAsync(getInstanceId, CancellationToken.None); + int fileno = status?.SerializedOutput != null ? JsonSerializer.Deserialize<int>(status.SerializedOutput) : 0; var filename = resourceType + parallelizationModifierStr + "-" + (fileno + 1) + ".ndjson"; - var blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceId}", filename); - long maxfilesizeinbytes = Utils.GetIntEnvironmentVariable("FBI-MAXFILESIZEMB", "-1") * 1024000; + var blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI_STORAGEACCT"), $"export/{instanceId}", filename); + long maxfilesizeinbytes = Utils.GetIntEnvironmentVariable("FBI_MAXFILESIZEMB", "-1") * 1024000; int bytestoadd = System.Text.ASCIIEncoding.UTF8.GetByteCount(sb.ToString()); var props = blobclient.GetProperties(); long filetotalbytes = props.Value.ContentLength + bytestoadd; @@ -470,8 +482,8 @@ public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceT { fileno++; filename = resourceType + parallelizationModifierStr + "-" + (fileno + 1) + ".ndjson"; - blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI-STORAGEACCT"), $"export/{instanceId}", filename); - await entityclient.SignalEntityAsync(entityId, "set", fileno); + blobclient = StorageUtils.GetAppendBlobClientSync(Utils.GetEnvironmentVariable("FBI_STORAGEACCT"), $"export/{instanceId}", filename); + await entityclient.ScheduleNewOrchestrationInstanceAsync("FileTracker", (Operation: "set", Key: key, 
Value: fileno)); } // Write the data to blob storage @@ -490,109 +502,119 @@ public record struct ConvertToNDJSONResponse(int ResourceCount, string ResourceT return retVal; } - [FunctionName("ExportOrchestrator_HttpStart")] - public static async Task HttpStart( - [HttpTrigger(AuthorizationLevel.Function,"post",Route = "$alt-export")] HttpRequestMessage req, - [DurableClient] IDurableOrchestrationClient starter, - ILogger log) + [Function("ExportOrchestrator_HttpStart")] + public async Task HttpStart( + [HttpTrigger(AuthorizationLevel.Function,"post",Route = "$alt-export")] HttpRequestData req, + [DurableClient] DurableTaskClient starter, + FunctionContext context) { + var log = context.GetLogger("ExportOrchestrator_HttpStart"); - string config = await req.Content.ReadAsStringAsync(); - var state = await runningInstances(starter, log); + string config = await new StreamReader(req.Body).ReadToEndAsync(); + var state = await runningInstances(starter, context); int running = state.Count(); - int maxinstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); + int maxinstances = Utils.GetIntEnvironmentVariable("FBI_MAXEXPORTS", "0"); if (maxinstances > 0 && running >= maxinstances) { string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {maxinstances}"; - StringContent sc = new StringContent("{\"error\":\"" + msg + "\""); - return new HttpResponseMessage() { Content = sc, StatusCode = System.Net.HttpStatusCode.TooManyRequests}; + var response = req.CreateResponse(System.Net.HttpStatusCode.TooManyRequests); + await response.WriteStringAsync($"{{\"error\":\"{msg}\"}}"); + + return response; } // Function input comes from the request content. 
- string instanceId = await starter.StartNewAsync("ExportOrchestrator",null,config); + string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync("ExportOrchestrator",config); log.LogInformation($"Started orchestration with ID = '{instanceId}'."); return starter.CreateCheckStatusResponse(req, instanceId); } - [FunctionName("ExportOrchestrator_InstanceAction")] - public static async Task InstanceAction( - [HttpTrigger(AuthorizationLevel.Function, "get", Route = "$alt-export-manage/{instanceid}")] HttpRequestMessage req, - [DurableClient] IDurableOrchestrationClient starter,string instanceid, - ILogger log) + [Function("ExportOrchestrator_InstanceAction")] + public async Task InstanceAction( + [HttpTrigger(AuthorizationLevel.Function, "get", Route = "$alt-export-manage/{instanceid}")] HttpRequestData req, + [DurableClient] DurableTaskClient starter,string instanceid, + FunctionContext context) { - - var parms = System.Web.HttpUtility.ParseQueryString(req.RequestUri.Query); + var log = context.GetLogger("ExportOrchestrator_InstanceAction"); + var parms = System.Web.HttpUtility.ParseQueryString(req.Url.Query); string action = parms["action"]; - await starter.TerminateAsync(instanceid, "Terminated by User"); + await starter.TerminateInstanceAsync(instanceid, "Terminated by User"); StringContent sc = new StringContent($"Terminated {instanceid}"); var response = req.CreateResponse(System.Net.HttpStatusCode.OK); - response.Content = sc; - response.Content.Headers.ContentType = new MediaTypeHeaderValue("text/plain"); + response.Headers.Add("Content-Type", "text/plain"); + await response.WriteStringAsync($"Terminated {instanceid}"); return response; } - [FunctionName("ExportOrchestrator_ExportStatus")] - public static async Task ExportStatus( - [HttpTrigger(AuthorizationLevel.Function, "get", Route = "$alt-export-status")] HttpRequestMessage req, - [DurableClient] IDurableOrchestrationClient starter, - ILogger log) + [Function("ExportOrchestrator_ExportStatus")] + public 
async Task ExportStatus( + [HttpTrigger(AuthorizationLevel.Function, "get", Route = "$alt-export-status")] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext context) { - string config = await req.Content.ReadAsStringAsync(); - var state = await runningInstances(starter, log); + var log = context.GetLogger("ExportOrchestrator_ExportStatus"); + string config = await new StreamReader(req.Body).ReadToEndAsync(); + var state = await runningInstances(client, context); JArray retVal = new JArray(); - foreach (DurableOrchestrationStatus status in state) + foreach (var status in state) { + var statusWithInput = await client.GetInstanceAsync(status.InstanceId, getInputsAndOutputs: true); JObject o = new JObject(); o["instanceId"] = status.InstanceId; - o["createdDateTime"] = status.CreatedTime; + o["createdDateTime"] = status.CreatedAt; o["status"] = status.RuntimeStatus.ToString(); - TimeSpan span = (DateTime.UtcNow - status.CreatedTime); + TimeSpan span = (DateTime.UtcNow - status.CreatedAt); o["elapsedtimeinminutes"] = span.TotalMinutes; - o["input"] = status.Input; + o["input"] = statusWithInput?.SerializedInput ?? 
""; retVal.Add(o); } - StringContent sc = new StringContent(retVal.ToString()); - var response = req.CreateResponse(System.Net.HttpStatusCode.OK); - response.Content = sc; - response.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); + + var response = req.CreateResponse(HttpStatusCode.OK); + response.Headers.Add("Content-Type", "application/json"); + await response.WriteStringAsync(retVal.ToString()); return response; } - [FunctionName("ExportBlobTrigger")] - public static async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")] Stream myBlob, string name, [DurableClient] IDurableOrchestrationClient starter, ILogger log) + + [Function("ExportBlobTrigger")] + public async Task RunBlobTrigger([BlobTrigger("export-trigger/{name}", Connection = "FBI_STORAGEACCT_IDENTITY")] Stream myBlob, string name, [DurableClient] DurableTaskClient starter, FunctionContext context) { + var log = context.GetLogger("ExportBlobTrigger"); StreamReader reader = new StreamReader(myBlob); var text = await reader.ReadToEndAsync(); - var state = await runningInstances(starter, log); + var state = await runningInstances(starter, context); int running = state.Count(); - int maxinstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0"); + int maxinstances = Utils.GetIntEnvironmentVariable("FBI_MAXEXPORTS", "0"); if (maxinstances > 0 && running >= maxinstances) { string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {maxinstances}"; log.LogError($"ExportBlobTrigger:{msg}"); return; } - string instanceId = await starter.StartNewAsync("ExportOrchestrator", null, text); - var bc = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT")); + string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync("ExportOrchestrator", text); + var bc = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI_STORAGEACCT")); await 
StorageUtils.MoveTo(bc, "export-trigger", "export-trigger-processed", name, name, log); log.LogInformation($"Started orchestration with ID = '{instanceId}'."); } - public static async Task> runningInstances(IDurableOrchestrationClient client,ILogger log) + public async Task> runningInstances(DurableTaskClient client, FunctionContext context) { - var queryFilter = new OrchestrationStatusQueryCondition + + var queryFilter = new OrchestrationQuery { - RuntimeStatus = new[] + Statuses = new[] { OrchestrationRuntimeStatus.Pending, OrchestrationRuntimeStatus.Running } - + }; + var result = new List(); + await foreach (var item in client.GetAllInstancesAsync(queryFilter)) + { + result.Add(item); + } - OrchestrationStatusQueryResult result = await client.ListInstancesAsync( - queryFilter, - CancellationToken.None); - var retVal = new List(); - foreach (DurableOrchestrationStatus status in result.DurableOrchestrationState) + var retVal = new List(); + await foreach (var status in client.GetAllInstancesAsync(queryFilter)) { if (!status.InstanceId.Contains(":") && !status.InstanceId.StartsWith("@")) { @@ -600,25 +622,32 @@ public static async Task> runningInstanc } } return retVal; - + } - [FunctionName("ExportHistoryCleanUp")] + [Function("ExportHistoryCleanUp")] public static async Task CleanupOldRuns( [TimerTrigger("0 0 0 * * *")] TimerInfo timerInfo, - [DurableClient] IDurableOrchestrationClient orchestrationClient, - ILogger log) + [DurableClient] DurableTaskClient orchestrationClient, + FunctionContext context) { + var log = context.GetLogger("ExportHistoryCleanUp"); var createdTimeFrom = DateTime.MinValue; - var createdTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(Utils.GetIntEnvironmentVariable("FBI-EXPORTPURGEAFTERDAYS", "30"))); - var runtimeStatus = new List + + if (createdTimeFrom.Year < 1000) + { + createdTimeFrom = new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc); + } + + var createdTimeTo = 
DateTime.UtcNow.Subtract(TimeSpan.FromDays(Utils.GetIntEnvironmentVariable("FBI_EXPORTPURGEAFTERDAYS", "30"))); + var runtimeStatus = new List { - OrchestrationStatus.Completed, - OrchestrationStatus.Canceled, - OrchestrationStatus.Failed, - OrchestrationStatus.Terminated + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Canceled, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated }; - var result = await orchestrationClient.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus); - log.LogInformation($"Scheduled cleanup done, {result.InstancesDeleted} instances deleted"); + var result = await orchestrationClient.PurgeInstancesAsync(createdTimeFrom, createdTimeTo, runtimeStatus); + log.LogInformation($"Scheduled cleanup done and instances deleted"); } } diff --git a/FHIRBulkImport/FHIRBulkImport.csproj b/FHIRBulkImport/FHIRBulkImport.csproj index 5b106e4..4575027 100644 --- a/FHIRBulkImport/FHIRBulkImport.csproj +++ b/FHIRBulkImport/FHIRBulkImport.csproj @@ -1,19 +1,41 @@  + net8.0 v4 + Exe <_FunctionsSkipCleanOutput>true + 96064c59-55c4-4084-a2fd-99ed65f18a77 - + + + + + + + + + + + + + + + + + - - - - - - + + + + + + + + + @@ -24,4 +46,7 @@ + + + diff --git a/FHIRBulkImport/FHIRUtils.cs b/FHIRBulkImport/FHIRUtils.cs index 0b95b25..ae890f8 100644 --- a/FHIRBulkImport/FHIRUtils.cs +++ b/FHIRBulkImport/FHIRUtils.cs @@ -32,13 +32,13 @@ public enum BundleType public static class FHIRUtils { //AD Settings - private static bool isMsi = Utils.GetBoolEnvironmentVariable("FS-ISMSI", false); - private static string resource = Utils.GetEnvironmentVariable("FS-RESOURCE"); - private static string tenant = Utils.GetEnvironmentVariable("FS-TENANT-NAME"); - private static string clientid = Utils.GetEnvironmentVariable("FS-CLIENT-ID"); - private static string secret = Utils.GetEnvironmentVariable("FS-SECRET"); - private static string authority = Utils.GetEnvironmentVariable("FS-AUTHORITY", 
"https://login.microsoftonline.com"); - private static string fsurl = Utils.GetEnvironmentVariable("FS-URL"); + private static bool isMsi = Utils.GetBoolEnvironmentVariable("FS_ISMSI", false); + private static string resource = Utils.GetEnvironmentVariable("FS_RESOURCE"); + private static string tenant = Utils.GetEnvironmentVariable("FS_TENANT_NAME"); + private static string clientid = Utils.GetEnvironmentVariable("FS_CLIENT_ID"); + private static string secret = Utils.GetEnvironmentVariable("FS_SECRET"); + private static string authority = Utils.GetEnvironmentVariable("FS_AUTHORITY", "https://login.microsoftonline.com"); + private static string fsurl = Utils.GetEnvironmentVariable("FS_URL"); private static ConcurrentDictionary _tokens = new ConcurrentDictionary(); private static readonly HttpStatusCode[] httpStatusCodesWorthRetrying = { HttpStatusCode.RequestTimeout, // 408 @@ -51,10 +51,10 @@ public static class FHIRUtils private static HttpClient _fhirClient = new HttpClient( new SocketsHttpHandler() { - ResponseDrainTimeout = TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI-POOLEDCON-RESPONSEDRAINSECS", "60")), - PooledConnectionLifetime = TimeSpan.FromMinutes(Utils.GetIntEnvironmentVariable("FBI-POOLEDCON-LIFETIME", "5")), - PooledConnectionIdleTimeout = TimeSpan.FromMinutes(Utils.GetIntEnvironmentVariable("FBI-POOLEDCON-IDLETO", "2")), - MaxConnectionsPerServer = Utils.GetIntEnvironmentVariable("FBI-POOLEDCON-MAXCONNECTIONS", "20"), + ResponseDrainTimeout = TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI_POOLEDCON_RESPONSEDRAINSECS", "60")), + PooledConnectionLifetime = TimeSpan.FromMinutes(Utils.GetIntEnvironmentVariable("FBI_POOLEDCON_LIFETIME", "5")), + PooledConnectionIdleTimeout = TimeSpan.FromMinutes(Utils.GetIntEnvironmentVariable("FBI_POOLEDCON_IDLETO", "2")), + MaxConnectionsPerServer = Utils.GetIntEnvironmentVariable("FBI_POOLEDCON_MAXCONNECTIONS", "20"), }); public static async System.Threading.Tasks.Task CallFHIRServer(string 
path, string body, HttpMethod method, ILogger log, string customAudience = null) @@ -77,17 +77,17 @@ public static async System.Threading.Tasks.Task CallFHIRServer(str var retryPolicy = Policy .Handle() .OrResult(r => httpStatusCodesWorthRetrying.Contains(r.StatusCode)) - .WaitAndRetryAsync(Utils.GetIntEnvironmentVariable("FBI-POLLY-MAXRETRIES","3"), retryAttempt => - TimeSpan.FromMilliseconds(Utils.GetIntEnvironmentVariable("FBI-POLLY-RETRYMS", "500")), (result, timeSpan, retryCount, context) => + .WaitAndRetryAsync(Utils.GetIntEnvironmentVariable("FBI_POLLY_MAXRETRIES","3"), retryAttempt => + TimeSpan.FromMilliseconds(Utils.GetIntEnvironmentVariable("FBI_POLLY_RETRYMS", "500")), (result, timeSpan, retryCount, context) => { log.LogWarning($"FHIR Request failed on a retryable status...Waiting {timeSpan} before next retry. Attempt {retryCount}"); } ); - if (Utils.GetBoolEnvironmentVariable("FBI-POLLY-EXPONENTIAL")) + if (Utils.GetBoolEnvironmentVariable("FBI_POLLY_EXPONENTIAL")) { - int maxRetries = Utils.GetIntEnvironmentVariable("FBI-POLLY-MAXRETRIES", "3"); - double retryMs = Utils.GetIntEnvironmentVariable("FBI-POLLY-RETRYMS", "500"); + int maxRetries = Utils.GetIntEnvironmentVariable("FBI_POLLY_MAXRETRIES", "3"); + double retryMs = Utils.GetIntEnvironmentVariable("FBI_POLLY_RETRYMS", "500"); double jitterFactor = 0.2; // You can adjust this value as needed Random jitterRandom = new Random(); @@ -107,7 +107,7 @@ public static async System.Threading.Tasks.Task CallFHIRServer(str ); } - string bundleProcessingLogic = Utils.GetEnvironmentVariable("FBI-FHIR-BUNDLEPROCESSINGLOGIC", ""); + string bundleProcessingLogic = Utils.GetEnvironmentVariable("FBI_FHIR_BUNDLEPROCESSINGLOGIC", ""); HttpResponseMessage _fhirResponse = await retryPolicy.ExecuteAsync(async () => @@ -188,7 +188,7 @@ public static string[] SplitBundle(string requestBody, string originname, ILogge if (rtt.Equals("Bundle")) { JArray entries = (JArray)result["entry"]; - int mbs = 
Utils.GetIntEnvironmentVariable("FBI-MAXBUNDLESIZE", "500"); + int mbs = Utils.GetIntEnvironmentVariable("FBI_MAXBUNDLESIZE", "500"); if (entries.Count > mbs) { if (bt.Equals("batch")) @@ -317,7 +317,7 @@ public static async Task FromHttpResponseMessage(HttpResponseMessa { string s_retry = null; retVal.ResponseHeaders.TryGetValue("x-ms-retry-after-ms", out s_retry); - if (s_retry==null) s_retry = Environment.GetEnvironmentVariable("FBI-DEFAULTRETRY"); + if (s_retry==null) s_retry = Environment.GetEnvironmentVariable("FBI_DEFAULTRETRY"); int i = 0; if (!int.TryParse(s_retry, out i)) { diff --git a/FHIRBulkImport/ImportBundleBlobTrigger.cs b/FHIRBulkImport/ImportBundleBlobTrigger.cs index a7425cf..6caf31f 100644 --- a/FHIRBulkImport/ImportBundleBlobTrigger.cs +++ b/FHIRBulkImport/ImportBundleBlobTrigger.cs @@ -1,11 +1,12 @@ +using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Extensibility; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs; +using Microsoft.Extensions.Logging; using System; +using System.ComponentModel.DataAnnotations; using System.IO; using System.Threading.Tasks; -using Microsoft.ApplicationInsights.Extensibility; -using Microsoft.ApplicationInsights; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Host; -using Microsoft.Extensions.Logging; namespace FHIRBulkImport { @@ -16,12 +17,14 @@ public class ImportBundleBlobTrigger public ImportBundleBlobTrigger(TelemetryConfiguration telemetryConfiguration) { _telemetryClient = new TelemetryClient(telemetryConfiguration); - } - [Disable("FBI-DISABLE-BLOBTRIGGER")] - [FunctionName("ImportBundleBlobTrigger")] - public async Task Run([BlobTrigger("bundles/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")]Stream myBlob, string name, ILogger log) + } + + [Function("ImportBundleBlobTrigger")] + + public async Task Run([BlobTrigger("bundles/{name}", Connection = "FBI_STORAGEACCT_IDENTITY")]Stream myBlob, string name, 
FunctionContext context) { - await ImportUtils.ImportBundle(name, log, _telemetryClient); - } + var logger = context.GetLogger("ImportBundleBlobTrigger"); + await ImportUtils.ImportBundle(name, logger, _telemetryClient); + } } } diff --git a/FHIRBulkImport/ImportBundleEventGrid.cs b/FHIRBulkImport/ImportBundleEventGrid.cs index 5267b3d..19785f8 100644 --- a/FHIRBulkImport/ImportBundleEventGrid.cs +++ b/FHIRBulkImport/ImportBundleEventGrid.cs @@ -4,10 +4,11 @@ using System.Threading.Tasks; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.EventGrid; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Extensions.EventGrid; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; +using Azure.Messaging.EventGrid; namespace FHIRBulkImport { @@ -15,16 +16,18 @@ namespace FHIRBulkImport public class ImportBundleEventGrid { - [FunctionName("ImportBundleEventGrid")] - [return: Queue("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] - public static JObject Run([EventGridTrigger] JObject blobCreatedEvent, - ILogger log) + [Function("ImportBundleEventGrid")] + [QueueOutput("bundlequeue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] + public JObject Run([EventGridTrigger]EventGridEvent eventGridEvent, + FunctionContext context) { { - + var logger = context.GetLogger("ImportBundleEventGrid"); + var blobCreatedEvent = JObject.Parse(eventGridEvent.Data.ToString()); + logger.LogInformation("EventGrid trigger received event."); return blobCreatedEvent; } } } -} +} \ No newline at end of file diff --git a/FHIRBulkImport/ImportBundleHTTP.cs b/FHIRBulkImport/ImportBundleHTTP.cs index 3d0e5f7..b288fc5 100644 --- a/FHIRBulkImport/ImportBundleHTTP.cs +++ b/FHIRBulkImport/ImportBundleHTTP.cs @@ -2,9 +2,8 @@ using System.Linq; using System.IO; using System.Threading.Tasks; -using Microsoft.AspNetCore.Mvc; -using 
Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.Http; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; @@ -22,36 +21,52 @@ public ImportBundleHTTP(TelemetryConfiguration telemetryConfiguration) { _telemetryClient = new TelemetryClient(telemetryConfiguration); } - [Disable("FBI-DISABLE-HTTPEP")] - [FunctionName("ImportBundleHTTP")] - public static async Task Run( - [HttpTrigger(AuthorizationLevel.Function, "post", Route = "importbundle")] HttpRequest req, - ILogger log) + + [Function("ImportBundleHTTP")] + public async Task Run( + [HttpTrigger(AuthorizationLevel.Function, "post", Route = "importbundle")] HttpRequestData req, + FunctionContext context) { + var logger = context.GetLogger("ImportBundleHTTP"); string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); - string filename = req.Query["bundlename"]; + string filename = System.Web.HttpUtility.ParseQueryString(req.Url.Query)["bundlename"]; if (string.IsNullOrEmpty(filename)) filename = $"bundle{Guid.NewGuid().ToString().Replace("-", "")}.json"; if (!filename.ToLower().EndsWith(".json")) filename += ".json"; + var response = req.CreateResponse(); try { var o = JObject.Parse(requestBody); if (o["resourceType"] !=null && o["resourceType"].ToString().Equals("Bundle")) { - var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI-STORAGEACCT")); - await StorageUtils.WriteStringToBlob(cbclient, "bundles", filename, requestBody, log); - return new ContentResult() { Content = "{\"filename\":\"" + filename + "\"}", StatusCode = 202, ContentType = "application/json" }; + var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI_STORAGEACCT")); + await StorageUtils.WriteStringToBlob(cbclient, "bundles", filename, requestBody, logger); + response.StatusCode = 
System.Net.HttpStatusCode.Accepted; + response.Headers.Add("Content-Type", "application/json"); + await response.WriteStringAsync($"{{\"filename\":\"{filename}\"}}"); + return response; } - return new ContentResult() { Content = $"Not a Valid FHIR Bundle", StatusCode = 400, ContentType = "text/plain" }; + response.StatusCode = System.Net.HttpStatusCode.BadRequest; + response.Headers.Add("Content-Type", "text/plain"); + await response.WriteStringAsync("Not a Valid FHIR Bundle"); + return response; } catch (JsonReaderException jre) { - return new ContentResult() {Content=$"Invalid JSONRequest Body:{jre.Message}",StatusCode=400,ContentType="text/plain" }; + response.StatusCode = System.Net.HttpStatusCode.BadRequest; + response.Headers.Add("Content-Type", "text/plain"); + await response.WriteStringAsync($"Invalid JSONRequest Body:{jre.Message}"); + return response; + } catch (Exception e) { - return new ContentResult() { Content = $"Error processing request:{e.Message}", StatusCode = 500, ContentType = "text/plain" }; + response.StatusCode = System.Net.HttpStatusCode.InternalServerError; + response.Headers.Add("Content-Type", "text/plain"); + await response.WriteStringAsync($"Error processing request:{e.Message}"); + return response; + } } diff --git a/FHIRBulkImport/ImportBundleQueue.cs b/FHIRBulkImport/ImportBundleQueue.cs index ced843a..5e60af7 100644 --- a/FHIRBulkImport/ImportBundleQueue.cs +++ b/FHIRBulkImport/ImportBundleQueue.cs @@ -4,12 +4,12 @@ using System.Threading.Tasks; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.EventGrid; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; -using Microsoft.Azure.WebJobs.Extensions.DurableTask; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; using System.Collections.Generic; +using Microsoft.Azure.Functions.Worker.Extensions.Timer; using 
Azure.Storage.Queues; using Azure.Storage.Queues.Models; using Azure.Identity; @@ -25,33 +25,37 @@ public ImportBundleQueue(TelemetryConfiguration telemetryConfiguration) _telemetryClient = new TelemetryClient(telemetryConfiguration); } - [FunctionName("ImportBundleQueue")] - public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] JObject blobCreatedEvent, ILogger log) + [Function("ImportBundleQueue")] + public async Task Run([QueueTrigger("bundlequeue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] QueueMessage queueMessage, FunctionContext context) { + var logger = context.GetLogger("ImportBundleQueue"); + string bodyText = queueMessage.Body.ToString(); + JObject blobCreatedEvent = JObject.Parse(bodyText); string url = (string)blobCreatedEvent["data"]["url"]; - log.LogInformation($"ImportBundleEventGrid: Processing blob at {url}..."); - string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-BUNDLES", "bundles"); + logger.LogInformation($"ImportBundleEventGrid: Processing blob at {url}..."); + string container = Utils.GetEnvironmentVariable("FBI_CONTAINER_BUNDLES", "bundles"); string name = url.Substring(url.IndexOf($"/{container}/") + $"/{container}/".Length); - await ImportUtils.ImportBundle(name, log, _telemetryClient); + await ImportUtils.ImportBundle(name, logger, _telemetryClient); } - [FunctionName("PoisonQueueRetries")] + [Function("PoisonQueueRetries")] public static async Task PoisonQueueRetries( - [TimerTrigger("%FBI-POISONQUEUE-TIMER-CRON%")] TimerInfo timerInfo, - ILogger log) + [TimerTrigger("%FBI_POISONQUEUE_TIMER_CRON%")] TimerInfo timerInfo, + FunctionContext context) { - log.LogInformation($"PoisonQueueRetries:Checking for poison queue messages in bundlequeue-poison..."); - var sourceQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI-STORAGEACCT-QUEUEURI")}/bundlequeue-poison"),new DefaultAzureCredential()); + var logger = context.GetLogger("PoisonQueueRetries"); + 
logger.LogInformation($"PoisonQueueRetries:Checking for poison queue messages in bundlequeue-poison..."); + var sourceQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI_STORAGEACCT_QUEUEURI")}/bundlequeue-poison"),new DefaultAzureCredential()); await sourceQueue.CreateIfNotExistsAsync(); - var targetQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI-STORAGEACCT-QUEUEURI")}/bundlequeue"), new DefaultAzureCredential()); + var targetQueue = new QueueClient(new Uri($"{Utils.GetEnvironmentVariable("FBI_STORAGEACCT_QUEUEURI")}/bundlequeue"), new DefaultAzureCredential()); await targetQueue.CreateIfNotExistsAsync(); - int maxrequeuemessages = Utils.GetIntEnvironmentVariable("FBI-MAXREQUEUE-MESSAGE-COUNT", "100"); + int maxrequeuemessages = Utils.GetIntEnvironmentVariable("FBI_MAXREQUEUE_MESSAGE_COUNT", "100"); int messagesrequeued = 0; if (await sourceQueue.ExistsAsync()) { QueueProperties properties = sourceQueue.GetProperties(); // Retrieve the cached approximate message count. - int cachedMessagesCount = properties.ApproximateMessagesCount; - log.LogInformation($"PoisonQueueRetries:Found {cachedMessagesCount} messages in bundlequeue-poison....Re-queing upto {maxrequeuemessages}"); + int cachedMessagesCount = properties.ApproximateMessagesCount; + logger.LogInformation($"PoisonQueueRetries:Found {cachedMessagesCount} messages in bundlequeue-poison....Re-queing upto {maxrequeuemessages}"); while(cachedMessagesCount > 0 && messagesrequeued < maxrequeuemessages) { int batchsize = (maxrequeuemessages - messagesrequeued >= 32 ? 
32 : maxrequeuemessages - messagesrequeued); foreach (var message in sourceQueue.ReceiveMessages(maxMessages: batchsize).Value) @@ -63,7 +67,7 @@ public static async Task PoisonQueueRetries( properties = sourceQueue.GetProperties(); cachedMessagesCount = properties.ApproximateMessagesCount; } - log.LogInformation($"PoisonQueueRetries:Requeued {messagesrequeued} messages to bundlequeue"); + logger.LogInformation($"PoisonQueueRetries:Requeued {messagesrequeued} messages to bundlequeue"); } } diff --git a/FHIRBulkImport/ImportCompressedFiles.cs b/FHIRBulkImport/ImportCompressedFiles.cs index f83db41..9a8e158 100644 --- a/FHIRBulkImport/ImportCompressedFiles.cs +++ b/FHIRBulkImport/ImportCompressedFiles.cs @@ -6,10 +6,8 @@ using System.Threading.Tasks; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights; -using Microsoft.Azure.WebJobs; +using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Blob; using Azure.Storage.Blobs; namespace FHIRBulkImport @@ -22,24 +20,25 @@ public ImportCompressedFiles(TelemetryConfiguration telemetryConfiguration) { _telemetryClient = new TelemetryClient(telemetryConfiguration); } - [FunctionName("ImportCompressedFiles")] - public static async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAGEACCT-IDENTITY")]Stream myBlob, string name, ILogger log) + [Function("ImportCompressedFiles")] + public async Task Run([BlobTrigger("zip/{name}", Connection = "FBI_STORAGEACCT_IDENTITY")]Stream myBlob, string name, FunctionContext context) { + var logger = context.GetLogger("ImportCompressedFiles"); try { int filecnt = 0; if (name.Split('.').Last().ToLower() == "zip") { - var blobClient = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT")); + var blobClient = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI_STORAGEACCT")); var containerndjson = 
blobClient.GetBlobContainerClient("ndjson"); var containerbundles = blobClient.GetBlobContainerClient("bundles"); using (MemoryStream blobMemStream = new MemoryStream()) { - log.LogInformation($"ImportCompressedFiles: Decompressing {name} ..."); + logger.LogInformation($"ImportCompressedFiles: Decompressing {name} ..."); await myBlob.CopyToAsync(blobMemStream); - + blobMemStream.Position = 0; using (ZipArchive archive = new ZipArchive(blobMemStream)) { foreach (ZipArchiveEntry entry in archive.Entries) @@ -67,24 +66,24 @@ public static async Task Run([BlobTrigger("zip/{name}", Connection = "FBI-STORAG await blockBlob.UploadAsync(fileStream); } - log.LogInformation($"ImportCompressedFiles: Extracted {entry.FullName} to {destination.Name}/{validname}"); + logger.LogInformation($"ImportCompressedFiles: Extracted {entry.FullName} to {destination.Name}/{validname}"); } else { - log.LogInformation($"ImportCompressedFiles: Entry {entry.FullName} skipped does not end in .ndjson or .json"); + logger.LogInformation($"ImportCompressedFiles: Entry {entry.FullName} skipped does not end in .ndjson or .json"); } filecnt++; } } } - log.LogInformation($"ImportCompressedFiles: Completed Decompressing {name} extracted {filecnt} files..."); - await StorageUtils.MoveTo(blobClient, "zip", "zipprocessed", name, name,log); + logger.LogInformation($"ImportCompressedFiles: Completed Decompressing {name} extracted {filecnt} files..."); + await StorageUtils.MoveTo(blobClient, "zip", "zipprocessed", name, name,logger); } } catch (Exception ex) { - log.LogInformation($"ImportCompressedFiles: Error! Something went wrong: {ex.Message}"); + logger.LogInformation($"ImportCompressedFiles: Error! 
Something went wrong: {ex.Message}"); } } diff --git a/FHIRBulkImport/ImportNDJSONEventGird.cs b/FHIRBulkImport/ImportNDJSONEventGird.cs index 62d7494..af9bf43 100644 --- a/FHIRBulkImport/ImportNDJSONEventGird.cs +++ b/FHIRBulkImport/ImportNDJSONEventGird.cs @@ -4,8 +4,7 @@ using System.Threading.Tasks; using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.EventGrid; +using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; @@ -15,14 +14,15 @@ namespace FHIRBulkImport public static class ImportNDJSONEventGird { - [FunctionName("ImportNDJSON")] - [return: Queue("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] - public static JObject Run([EventGridTrigger]JObject blobCreatedEvent, - ILogger log) + [Function("ImportNDJSON")] + [QueueOutput("ndjsonqueue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] + public static JObject Run([EventGridTrigger]EventGridEvent eventGridEvent, + FunctionContext context) { - - return blobCreatedEvent; - + var logger = context.GetLogger("ImportNDJSON"); + var eventJson = JObject.Parse(eventGridEvent.Data.ToString()); + logger.LogInformation($"Full EventGrid received: {eventJson}"); + return eventJson; } diff --git a/FHIRBulkImport/ImportNDJSONQueue.cs b/FHIRBulkImport/ImportNDJSONQueue.cs index 3bff7a7..b8cb51a 100644 --- a/FHIRBulkImport/ImportNDJSONQueue.cs +++ b/FHIRBulkImport/ImportNDJSONQueue.cs @@ -3,8 +3,8 @@ using System.Text; using System.Threading.Tasks; using Azure.Storage.Queues.Models; -using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Host; + +using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; @@ -12,39 +12,55 @@ namespace FHIRBulkImport { public class ImportNDJSONQueue { - [FunctionName("ImportNDJSONQueue")] - public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STORAGEACCT-QUEUEURI-IDENTITY")] 
QueueMessage queueMessage,ILogger log) + [Function("ImportNDJSONQueue")] + public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI_STORAGEACCT_QUEUEURI_IDENTITY")] QueueMessage queueMessage, FunctionContext context) { - JObject blobCreatedEvent = JObject.Parse(queueMessage.Body.ToString()); + var logger = context.GetLogger("ImportNDJSONQueue"); + logger.LogInformation("Function triggered. Started Processing"); + string bodyText = queueMessage.Body.ToString(); + logger.LogInformation($"Queue Message Body: {bodyText}"); + JObject blobCreatedEvent; + try + { + blobCreatedEvent = JObject.Parse(bodyText); + } + catch (Exception ex) + { + logger.LogError($"Failed to parse queue message body: {ex.Message}"); + return; + } string url = (string)blobCreatedEvent["data"]["url"]; + logger.LogInformation($"Blob url extracted: {url}"); if (queueMessage.DequeueCount > 1) { - log.LogInformation($"ImportNDJSONQueue: Ignoring long running requeue of file {url} on dequeue {queueMessage.DequeueCount}"); + logger.LogInformation($"ImportNDJSONQueue: Ignoring long running requeue of file {url} on dequeue {queueMessage.DequeueCount}"); return; } int maxresourcesperbundle = 200; - var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI-STORAGEACCT")); - string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-NDJSON", "ndjson"); + var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI_STORAGEACCT")); + string container = Utils.GetEnvironmentVariable("FBI_CONTAINER_NDJSON", "ndjson"); string name = url.Substring(url.IndexOf($"/{container}/") + $"/{container}/".Length); - string mrbundlemax = System.Environment.GetEnvironmentVariable("FBI-MAXRESOURCESPERBUNDLE"); + logger.LogInformation($"Blob name resolved: {name}"); + string mrbundlemax = System.Environment.GetEnvironmentVariable("FBI_MAXRESOURCESPERBUNDLE"); if (!string.IsNullOrEmpty(mrbundlemax)) { if (!int.TryParse(mrbundlemax, out 
maxresourcesperbundle)) maxresourcesperbundle = 200; } - log.LogInformation($"NDJSONConverter: Processing blob at {url}..."); + logger.LogInformation($"NDJSONConverter: Processing blob at {url}..."); JObject rv = ImportUtils.initBundle(); int linecnt = 0; int total = 0; int bundlecnt = 0; int errcnt = 0; int fileno = 1; - Stream myBlob = await StorageUtils.GetStreamForBlob(cbclient, container, name,log); - if (myBlob==null) + Stream myBlob = await StorageUtils.GetStreamForBlob(cbclient, container, name, logger); + if (myBlob == null) { - log.LogWarning($"ImportNDJSONQueue:The blob {name} in container {container} does not exist or cannot be read."); + logger.LogWarning($"ImportNDJSONQueue:The blob {name} in container {container} does not exist or cannot be read."); return; } StringBuilder errsb = new StringBuilder(); + logger.LogInformation($"Starting to read blob stream for {name}"); using (StreamReader reader = new StreamReader(myBlob)) { string line; @@ -53,6 +69,7 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR linecnt++; JObject res = null; + logger.LogDebug($"Reading line {linecnt}"); try { res = JObject.Parse(line); @@ -62,14 +79,16 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR } catch (Exception e) { - log.LogError($"NDJSONConverter: File {name} is in error or contains invalid JSON at line number {linecnt}:{e.Message}"); + logger.LogError($"NDJSONConverter: File {name} is in error or contains invalid JSON at line number {linecnt}:{e.Message}"); errsb.Append($"{line}\n"); errcnt++; } if (bundlecnt >= maxresourcesperbundle) { - await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), log); + logger.LogInformation($"Writing bundle {name}-{fileno}.json with {bundlecnt} resources"); + await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), logger); + bundlecnt = 0; rv = null; rv = 
ImportUtils.initBundle(); @@ -77,17 +96,20 @@ public static async Task Run([QueueTrigger("ndjsonqueue", Connection = "FBI-STOR } if (bundlecnt > 0) { - await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), log); + logger.LogInformation($"Writing final bundle {name}-{fileno}.json with {bundlecnt} resources"); + await StorageUtils.WriteStringToBlob(cbclient, "bundles", $"{name}-{fileno++}.json", rv.ToString(), logger); } - await StorageUtils.MoveTo(cbclient, "ndjson", "ndjsonprocessed", name, $"{name}.processed", log); + logger.LogInformation($"Moving processed file to 'ndjsonprocessed'"); + await StorageUtils.MoveTo(cbclient, "ndjson", "ndjsonprocessed", name, $"{name}.processed", logger); if (errcnt > 0) { - await StorageUtils.WriteStringToBlob(cbclient, "ndjsonerr", $"{name}.err", errsb.ToString(), log); + logger.LogWarning($"Writing error file with {errcnt} errors"); + await StorageUtils.WriteStringToBlob(cbclient, "ndjsonerr", $"{name}.err", errsb.ToString(), logger); } - log.LogInformation($"NDJSONConverter: Processing file {name} completed with {total} resources created in {fileno - 1} bundles with {errcnt} errors..."); + logger.LogInformation($"NDJSONConverter: Processing file {name} completed with {total} resources created in {fileno - 1} bundles with {errcnt} errors..."); } } - + } -} +} \ No newline at end of file diff --git a/FHIRBulkImport/ImportUtils.cs b/FHIRBulkImport/ImportUtils.cs index a65a705..6c43b9f 100644 --- a/FHIRBulkImport/ImportUtils.cs +++ b/FHIRBulkImport/ImportUtils.cs @@ -20,16 +20,16 @@ public static class ImportUtils { //Unahandeled Exceptions worth retrying public static string MESSAGE_RETRY_SETTING = "request was canceled due to the configured HttpClient.Timeout,target machine actively refused it,an error occurred while sending the request"; - public static string[] EXCEPTION_MESSAGE_STRINGS_RETRY = 
Utils.GetEnvironmentVariable("FBI-UNHANDLED-RETRY-MESSAGES",MESSAGE_RETRY_SETTING).Split(','); + public static string[] EXCEPTION_MESSAGE_STRINGS_RETRY = Utils.GetEnvironmentVariable("FBI_UNHANDLED_RETRY_MESSAGES",MESSAGE_RETRY_SETTING).Split(','); public static async Task ImportBundle(string name, ILogger log, TelemetryClient telemetryClient) { // Setup for metrics - bool trbundles = Utils.GetBoolEnvironmentVariable("FBI-TRANSFORMBUNDLES", true); + bool trbundles = Utils.GetBoolEnvironmentVariable("FBI_TRANSFORMBUNDLES", true); log.LogInformation($"ImportFHIRBundles: Processing file Name:{name}..."); - var cbclient = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI-STORAGEACCT")); - string container = Utils.GetEnvironmentVariable("FBI-CONTAINER-BUNDLES", "bundles"); + var cbclient = StorageUtils.GetCloudBlobClient(Utils.GetEnvironmentVariable("FBI_STORAGEACCT")); + string container = Utils.GetEnvironmentVariable("FBI_CONTAINER_BUNDLES", "bundles"); Stream myBlob = await StorageUtils.GetStreamForBlob(cbclient, container, name, log); if (myBlob == null) { diff --git a/FHIRBulkImport/Program.cs b/FHIRBulkImport/Program.cs new file mode 100644 index 0000000..58793e7 --- /dev/null +++ b/FHIRBulkImport/Program.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.ApplicationInsights; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.ApplicationInsights; +using Microsoft.ApplicationInsights.Extensibility; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +public class Program +{ + private static async Task Main(string[] args) + { + var host = new HostBuilder() + .ConfigureAppConfiguration(config => + { + config.AddJsonFile("local.settings.json", optional: true, reloadOnChange: true) + 
.AddEnvironmentVariables(); + }) + .ConfigureFunctionsWorkerDefaults() + .ConfigureServices(services => { + services.AddApplicationInsightsTelemetryWorkerService(); + services.ConfigureFunctionsApplicationInsights(); + }) + .ConfigureLogging(logging => + { + logging.AddApplicationInsights(); + }) + .Build(); + await host.RunAsync(); + + } + } diff --git a/FHIRBulkImport/host.json b/FHIRBulkImport/host.json index 764d007..43eb1e6 100644 --- a/FHIRBulkImport/host.json +++ b/FHIRBulkImport/host.json @@ -18,7 +18,7 @@ "storageProvider": { "partitionCount": 3 }, - "extendedSessionsEnabled": true, + "extendedSessionsEnabled": false, "extendedSessionIdleTimeoutInSeconds": 30 } diff --git a/scripts/fhirBulkImport.bicep b/scripts/fhirBulkImport.bicep index 86afcbd..c94a4f1 100644 --- a/scripts/fhirBulkImport.bicep +++ b/scripts/fhirBulkImport.bicep @@ -223,54 +223,54 @@ resource fhirProxyAppSettings 'Microsoft.Web/sites/config@2021-03-01' = { AzureFunctionsJobHost__functionTimeout: '23:00:00' // Storage account to setup import from - 'FBI-STORAGEACCT': storageAccountUri - 'FBI-STORAGEACCT-QUEUEURI-IDENTITY__queueServiceUri': storageAccountQueueUri - 'FBI-STORAGEACCT-IDENTITY__blobServiceUri': storageAccountUri - 'FBI-STORAGEACCT-QUEUEURI': storageAccountQueueUri + 'FBI_STORAGEACCT': storageAccountUri + 'FBI_STORAGEACCT_QUEUEURI_IDENTITY__queueServiceUri': storageAccountQueueUri + 'FBI_STORAGEACCT_IDENTITY__blobServiceUri': storageAccountUri + 'FBI_STORAGEACCT_QUEUEURI': storageAccountQueueUri // URL for the FHIR endpoint - 'FS-URL': fhirUrl + 'FS_URL': fhirUrl // Resource for the FHIR endpoint. - 'FS-RESOURCE': empty(fhirAudience) ? fhirUrl : fhirAudience + 'FS_RESOURCE': empty(fhirAudience) ? fhirUrl : fhirAudience // Tenant of FHIR Server - 'FS-TENANT-NAME': tenantId + 'FS_TENANT_NAME': tenantId - 'FS-ISMSI': authenticationType == 'managedIdentity' ? 'true' : 'false' + 'FS_ISMSI': authenticationType == 'managedIdentity' ? 
'true' : 'false' - 'FS-CLIENT-ID': authenticationType == 'servicePrincipal' ? serviceAccountClientId : '' + 'FS_CLIENT_ID': authenticationType == 'servicePrincipal' ? serviceAccountClientId : '' - 'FS-SECRET': authenticationType == 'servicePrincipal' ? serviceAccountSecret : '' + 'FS_SECRET': authenticationType == 'servicePrincipal' ? serviceAccountSecret : '' // When loading bundles, convert transaction to batch bundles. Transform UUIDs and resolve ifNoneExist TRANSFORMBUNDLES: '${transformTransactionBundles}' // ADVANCED // Max number of resources in a bundle - 'FBI-MAXBUNDLESIZE': '500' + 'FBI_MAXBUNDLESIZE': '500' // When loading NDJSON, how many resources to put in a single bundle - 'FBI-MAXRESOURCESPERBUNDLE': '500' + 'FBI_MAXRESOURCESPERBUNDLE': '500' // Max HTTP retries on the FHIR Server - 'FBI-POLLY-MAXRETRIES': '3' + 'FBI_POLLY_MAXRETRIES': '3' // Retry delay for FHIR Server requests - 'FBI-POLLY-RETRYMS': '500' + 'FBI_POLLY_RETRYMS': '500' // ResponseDrainTimeout - 'FBI-POOLEDCON-RESPONSEDRAINSECS': '60' + 'FBI_POOLEDCON_RESPONSEDRAINSECS': '60' // PooledConnectionLifetime - 'FBI-POOLEDCON-LIFETIME': '5' + 'FBI_POOLEDCON_LIFETIME': '5' // PooledConnectionIdleTimeout - 'FBI-POOLEDCON-IDLETO': '2' + 'FBI_POOLEDCON_IDLETO': '2' // MaxConnectionsPerServer - 'FBI-POOLEDCON-MAXCONNECTIONS': '20' + 'FBI_POOLEDCON_MAXCONNECTIONS': '20' // Max file size to load. -1 disables this. - 'FBI-MAXFILESIZEMB': '-1' + 'FBI_MAXFILESIZEMB': '-1' // Max number of concurrent exports - 'FBI-MAXEXPORTS': '-1' + 'FBI_MAXEXPORTS': '-1' // How long to leave exports on the storage account - 'FBI-EXPORTPURGEAFTERDAYS': '30' + 'FBI_EXPORTPURGEAFTERDAYS': '30' // Period to run the poision queue function. 
- 'FBI-POISONQUEUE-TIMER-CRON': '0 */2 * * * *' + 'FBI_POISONQUEUE_TIMER_CRON': '0 */2 * * * *' } } diff --git a/scripts/fhirBulkImport.json b/scripts/fhirBulkImport.json index 869f243..42f6fac 100644 --- a/scripts/fhirBulkImport.json +++ b/scripts/fhirBulkImport.json @@ -358,35 +358,35 @@ "properties": { "AzureWebJobsStorage__accountname": "[format('{0}stor', variables('prefixNameCleanShort'))]", "FUNCTIONS_EXTENSION_VERSION": "~4", - "FUNCTIONS_WORKER_RUNTIME": "dotnet", + "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated", "FUNCTIONS_INPROC_NET8_ENABLED": "1", "APPLICATIONINSIGHTS_CONNECTION_STRING": "[reference(resourceId('Microsoft.Insights/components', format('{0}-ai', variables('prefixNameCleanShort'))), '2020-02-02-preview').ConnectionString]", "SCM_DO_BUILD_DURING_DEPLOYMENT": "true", "AzureWebJobs.ImportBundleBlobTrigger.Disabled": "1", "AzureFunctionsJobHost__functionTimeout": "23:00:00", - "FBI-STORAGEACCT": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", - "FBI-STORAGEACCT-QUEUEURI-IDENTITY__queueServiceUri": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", - "FBI-STORAGEACCT-IDENTITY__blobServiceUri": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", - "FBI-STORAGEACCT-QUEUEURI": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", - "FS-URL": "[variables('fhirUrl')]", - "FS-RESOURCE": "[if(empty(parameters('fhirAudience')), variables('fhirUrl'), parameters('fhirAudience'))]", - "FS-TENANT-NAME": "[variables('tenantId')]", - "FS-ISMSI": "[if(equals(parameters('authenticationType'), 'managedIdentity'), 'true', 'false')]", - "FS-CLIENT-ID": "[if(equals(parameters('authenticationType'), 'servicePrincipal'), parameters('serviceAccountClientId'), '')]", - "FS-SECRET": "[if(equals(parameters('authenticationType'), 'servicePrincipal'), parameters('serviceAccountSecret'), '')]", + "FBI_STORAGEACCT": 
"[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", + "FBI_STORAGEACCT_QUEUEURI_IDENTITY__queueServiceUri": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", + "FBI_STORAGEACCT_IDENTITY__blobServiceUri": "[format('https://{0}.blob.core.windows.net/',variables('storageAccountName'))]", + "FBI_STORAGEACCT_QUEUEURI": "[format('https://{0}.queue.core.windows.net/',variables('storageAccountName'))]", + "FS_URL": "[variables('fhirUrl')]", + "FS_RESOURCE": "[if(empty(parameters('fhirAudience')), variables('fhirUrl'), parameters('fhirAudience'))]", + "FS_TENANT_NAME": "[variables('tenantId')]", + "FS_ISMSI": "[if(equals(parameters('authenticationType'), 'managedIdentity'), 'true', 'false')]", + "FS_CLIENT_ID": "[if(equals(parameters('authenticationType'), 'servicePrincipal'), parameters('serviceAccountClientId'), '')]", + "FS_SECRET": "[if(equals(parameters('authenticationType'), 'servicePrincipal'), parameters('serviceAccountSecret'), '')]", "TRANSFORMBUNDLES": "[format('{0}', parameters('transformTransactionBundles'))]", - "FBI-MAXBUNDLESIZE": "500", - "FBI-MAXRESOURCESPERBUNDLE": "500", - "FBI-POLLY-MAXRETRIES": "3", - "FBI-POLLY-RETRYMS": "500", - "FBI-POOLEDCON-RESPONSEDRAINSECS": "60", - "FBI-POOLEDCON-LIFETIME": "5", - "FBI-POOLEDCON-IDLETO": "2", - "FBI-POOLEDCON-MAXCONNECTIONS": "20", - "FBI-MAXFILESIZEMB": "-1", - "FBI-MAXEXPORTS": "-1", - "FBI-EXPORTPURGEAFTERDAYS": "30", - "FBI-POISONQUEUE-TIMER-CRON": "0 */2 * * * *" + "FBI_MAXBUNDLESIZE": "500", + "FBI_MAXRESOURCESPERBUNDLE": "500", + "FBI_POLLY_MAXRETRIES": "3", + "FBI_POLLY_RETRYMS": "500", + "FBI_POOLEDCON_RESPONSEDRAINSECS": "60", + "FBI_POOLEDCON_LIFETIME": "5", + "FBI_POOLEDCON_IDLETO": "2", + "FBI_POOLEDCON_MAXCONNECTIONS": "20", + "FBI_MAXFILESIZEMB": "-1", + "FBI_MAXEXPORTS": "-1", + "FBI_EXPORTPURGEAFTERDAYS": "30", + "FBI_POISONQUEUE_TIMER_CRON": "0 */2 * * * *" }, "dependsOn": [ "[resourceId('Microsoft.Insights/components', 
format('{0}-ai', variables('prefixNameCleanShort')))]",