Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion FHIRBulkImport/ADUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static async Task<string> GetAADAccessToken(string authority, string clie
catch (Exception e)
{
log.LogError($"GetAADAccessToken: Exception getting access token: {e.Message}");
return null;
return null;
}

}
Expand Down
91 changes: 53 additions & 38 deletions FHIRBulkImport/ExportAllOrchestrator.cs
Original file line number Diff line number Diff line change
@@ -1,56 +1,65 @@
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System.Linq;
using System.Linq;
using System;
using Newtonsoft.Json;
using System.Collections.Generic;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using System.IO;
using System.Threading;

namespace FHIRBulkImport
{
public class ExportAllOrchestrator
{
private static int _exportResourceCount = Utils.GetIntEnvironmentVariable("FS-EXPORTRESOURCECOUNT", "1000");
private static string _storageAccount = Utils.GetEnvironmentVariable("FBI-STORAGEACCT");
private static int _maxInstances = Utils.GetIntEnvironmentVariable("FBI-MAXEXPORTS", "0");
private static int _exportResourceCount = Utils.GetIntEnvironmentVariable("FS_EXPORTRESOURCECOUNT", "1000");
private static string _storageAccount = Utils.GetEnvironmentVariable("FBI_STORAGEACCT");
private static int _maxInstances = Utils.GetIntEnvironmentVariable("FBI_MAXEXPORTS", "0");
private static int _maxParallelizationCount = 100;
private static int _parallelSearchBundleSize = _maxInstances = Utils.GetIntEnvironmentVariable("FBI-PARALLELSEARCHBUNDLESIZE", "50");
private static RetryOptions _exportAllRetryOptions = new RetryOptions(firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI-EXPORTALLRETRYINTERVAL", "30")), maxNumberOfAttempts: 5)
{ BackoffCoefficient = Utils.GetIntEnvironmentVariable("FBI-EXPORTALLBACKOFFCOEFFICIENT", "3") };

[FunctionName(nameof(ExportAllOrchestrator_HttpStart))]
public async Task<HttpResponseMessage> ExportAllOrchestrator_HttpStart(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "$alt-export-all")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
private static int _parallelSearchBundleSize = _maxInstances = Utils.GetIntEnvironmentVariable("FBI_PARALLELSEARCHBUNDLESIZE", "50");
private static RetryPolicy _exportAllRetryOptions = new RetryPolicy(firstRetryInterval: TimeSpan.FromSeconds(Utils.GetIntEnvironmentVariable("FBI_EXPORTALLRETRYINTERVAL", "30")), maxNumberOfAttempts: 5,
backoffCoefficient: Convert.ToDouble(Utils.GetIntEnvironmentVariable("FBI_EXPORTALLBACKCOEFFICIENT", "3")));


[Function(nameof(ExportAllOrchestrator_HttpStart))]
public async Task<HttpResponseData> ExportAllOrchestrator_HttpStart(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "$alt-export-all")] HttpRequestData req,
[DurableClient] DurableTaskClient starter,
FunctionContext context)
{
string requestParameters = await req.Content.ReadAsStringAsync();
var state = await ExportOrchestrator.runningInstances(starter, log);
var log = context.GetLogger("ExportAllOrchestrator_HttpStart");
string requestParameters = await new StreamReader(req.Body).ReadToEndAsync();
var orchestrator = new ExportOrchestrator();
var state = await orchestrator.runningInstances(starter, context);
int running = state.Count();

if (_maxInstances > 0 && running >= _maxInstances)
{
string msg = $"Unable to start export there are {running} exports the max concurrent allowed is {_maxInstances}";
StringContent sc = new StringContent("{\"error\":\"" + msg + "\"");
return new HttpResponseMessage() { Content = sc, StatusCode = System.Net.HttpStatusCode.TooManyRequests };
string sc = "{\"error\":\"" + msg + "\"}";
var response = req.CreateResponse(System.Net.HttpStatusCode.TooManyRequests);
await response.WriteStringAsync(sc);
return response;
}

// Function input comes from the request content.
string instanceId = await starter.StartNewAsync(nameof(ExportAll_RunOrchestrator), null, requestParameters);
string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync(nameof(ExportAll_RunOrchestrator), requestParameters);

log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return starter.CreateCheckStatusResponse(req, instanceId);
}

[FunctionName(nameof(ExportAll_RunOrchestrator))]
[Function(nameof(ExportAll_RunOrchestrator))]
public async Task<JObject> ExportAll_RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger logger)
[OrchestrationTrigger] TaskOrchestrationContext context,
FunctionContext log)
{
var logger = log.GetLogger("ExportAll_RunOrchestrator");
// Setup function level variables.
JObject retVal = new JObject();
retVal["instanceid"] = context.InstanceId;
Expand Down Expand Up @@ -83,10 +92,12 @@ public async Task<JObject> ExportAll_RunOrchestrator(
context.SetCustomStatus(retVal);

var parallelizationTasks = options.ParallelSearchRanges.Select(
x => context.CallActivityWithRetryAsync<List<(DateTime Start, DateTime End, int Count)>>(
x => context.CallActivityAsync<List<(DateTime Start, DateTime End, int Count)>>(
nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges),
_exportAllRetryOptions,
new GetCountsForListOfDateRangesRequest(options.ResourceType, x.Select(y => (y.Start, y.End, -1)).ToList())));

new GetCountsForListOfDateRangesRequest(options.ResourceType, x.Select(y => (y.Start, y.End, -1)).ToList()),
new TaskOptions(_exportAllRetryOptions)
));

var results = await Task.WhenAll(parallelizationTasks);

Expand Down Expand Up @@ -114,10 +125,11 @@ public async Task<JObject> ExportAll_RunOrchestrator(
nextLink += $"&_lastUpdated=lt{searchRanges[i].End.Value.ToString("o")}";
}

exportTasks.Add(context.CallActivityWithRetryAsync<DataPageResult>(
exportTasks.Add(context.CallActivityAsync<DataPageResult>(
nameof(ExportAllOrchestrator_GetAndWriteDataPage),
_exportAllRetryOptions,
new DataPageRequest(FhirRequestPath: nextLink, InstanceId: context.InstanceId, ParallelTaskIndex: i, ResourceType: options.ResourceType, Audience: options.Audience)));

new DataPageRequest(FhirRequestPath: nextLink, InstanceId: context.InstanceId, ParallelTaskIndex: i, ResourceType: options.ResourceType, Audience: options.Audience),
new TaskOptions(_exportAllRetryOptions)));
}

// Start and continue export as long as there are pending tasks
Expand Down Expand Up @@ -165,10 +177,11 @@ public async Task<JObject> ExportAll_RunOrchestrator(

if (fhirResult.NextLink != null)
{
exportTasks.Add(context.CallActivityWithRetryAsync<DataPageResult>(
exportTasks.Add(context.CallActivityAsync<DataPageResult>(
nameof(ExportAllOrchestrator_GetAndWriteDataPage),
_exportAllRetryOptions,
new DataPageRequest(FhirRequestPath: fhirResult.NextLink, InstanceId: fhirResult.InstanceId, ParallelTaskIndex: fhirResult.ParallelTaskIndex, ResourceType: options.ResourceType, Audience: options.Audience)));

new DataPageRequest(FhirRequestPath: fhirResult.NextLink, InstanceId: fhirResult.InstanceId, ParallelTaskIndex: fhirResult.ParallelTaskIndex, ResourceType: options.ResourceType, Audience: options.Audience),
new TaskOptions(_exportAllRetryOptions)));
}
}

Expand All @@ -184,11 +197,12 @@ public async Task<JObject> ExportAll_RunOrchestrator(
return retVal;
}

[FunctionName(nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges))]
[Function(nameof(ExportAllOrchestrator_GetCountsForListOfDateRanges))]
public async Task<List<(DateTime Start, DateTime End, int Count)>> ExportAllOrchestrator_GetCountsForListOfDateRanges(
[ActivityTrigger] GetCountsForListOfDateRangesRequest input,
ILogger logger)
FunctionContext context)
{
var logger = context.GetLogger("ExportAllOrchestrator_GetCountsForListOfDateRanges");
JObject requestBody = new();
requestBody["resourceType"] = "Bundle";
requestBody["type"] = "batch";
Expand Down Expand Up @@ -253,12 +267,13 @@ public async Task<JObject> ExportAll_RunOrchestrator(
throw new Exception(message);
}

[FunctionName(nameof(ExportAllOrchestrator_GetAndWriteDataPage))]
[Function(nameof(ExportAllOrchestrator_GetAndWriteDataPage))]
public async Task<DataPageResult> ExportAllOrchestrator_GetAndWriteDataPage(
[ActivityTrigger] DataPageRequest input,
[DurableClient] IDurableEntityClient ec,
ILogger logger)
[DurableClient] DurableTaskClient ec,
FunctionContext context)
{
var logger = context.GetLogger("ExportAllOrchestrator_GetAndWriteDataPage");
logger.LogInformation($"Fetching page of resources using query {input.FhirRequestPath}");

var response = await FHIRUtils.CallFHIRServer(input.FhirRequestPath, "", HttpMethod.Get, logger, input.Audience);
Expand Down
Loading
Loading