diff --git a/Gigya.Microdot.Fakes/Discovery/AlwaysLocalHostDiscovery.cs b/Gigya.Microdot.Fakes/Discovery/AlwaysLocalHostDiscovery.cs index 9f0003fc..dd9d8b0b 100644 --- a/Gigya.Microdot.Fakes/Discovery/AlwaysLocalHostDiscovery.cs +++ b/Gigya.Microdot.Fakes/Discovery/AlwaysLocalHostDiscovery.cs @@ -51,5 +51,15 @@ public async Task GetNodes(DeploymentIdentifier deploymentIdentifier) public void Dispose() { } + + public TaskCompletionSource<(string version, Node[] nodes)> WaitForServiceChanges(DeploymentIdentifier deploymentIdentifier) + { + throw new NotImplementedException(); + } + + Task<(string version, Node[] nodes)> IDiscovery.WaitForServiceChanges(DeploymentIdentifier deploymentIdentifier) + { + throw new NotImplementedException(); + } } } \ No newline at end of file diff --git a/Gigya.Microdot.ServiceDiscovery/Rewrite/ConfigNodeSource.cs b/Gigya.Microdot.ServiceDiscovery/Rewrite/ConfigNodeSource.cs index b0ece487..1be16885 100644 --- a/Gigya.Microdot.ServiceDiscovery/Rewrite/ConfigNodeSource.cs +++ b/Gigya.Microdot.ServiceDiscovery/Rewrite/ConfigNodeSource.cs @@ -127,5 +127,10 @@ public void Dispose() { // nothing to shutdown } + + public void RegisterForSchemaChangeEvent(DeploymentIdentifier deploymentIdentifier, System.Threading.Tasks.TaskCompletionSource<(string version, Node[] nodes)> tcs) + { + throw new NotImplementedException(); + } } } diff --git a/Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulClient.cs b/Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulClient.cs index 557f1e47..22469e90 100644 --- a/Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulClient.cs +++ b/Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulClient.cs @@ -219,8 +219,9 @@ private async Task> Call(string commandPath, CancellationTo using (var timeoutcancellationToken = new CancellationTokenSource(httpTaskTimeout)) using (var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutcancellationToken.Token)) { - var response = await _httpClient.GetAsync(commandPath, HttpCompletionOption.ResponseContentRead, cancellationSource.Token).ConfigureAwait(false); - using (response) + var response = await _httpClient.GetAsync(commandPath, HttpCompletionOption.ResponseContentRead, cancellationSource.Token).ConfigureAwait(false); + + using (response) { responseContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false); consulResult.StatusCode = response.StatusCode; diff --git a/Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulNodeSource.cs b/Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulNodeSource.cs index 6a97c29c..7a4a53c5 100644 --- a/Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulNodeSource.cs +++ b/Gigya.Microdot.ServiceDiscovery/Rewrite/ConsulNodeSource.cs @@ -21,6 +21,8 @@ #endregion using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -55,8 +57,10 @@ internal class ConsulNodeSource : INodeSource private HealthMessage _healthStatus = new HealthMessage(Health.Info, message: null, suppressMessage: true); private readonly IDisposable _healthCheck; + private ConcurrentDictionary, Boolean> _taskCompletionSourcesList { get; } - public ConsulNodeSource( + + public ConsulNodeSource( DeploymentIdentifier deploymentIdentifier, ILog log, ConsulClient consulClient, @@ -64,7 +68,8 @@ public ConsulNodeSource( Func getConfig, Func getAggregatingHealthStatus) { - DeploymentIdentifier = deploymentIdentifier; + _taskCompletionSourcesList = new ConcurrentDictionary, Boolean>(); + DeploymentIdentifier = deploymentIdentifier; Log = log; ConsulClient = consulClient; DateTime = dateTime; @@ -89,7 +94,10 @@ public Node[] GetNodes() else return nodes.Nodes; } - + public void RegisterForSchemaChangeEvent(DeploymentIdentifier deploymentIdentifier, TaskCompletionSource<(string version, Node[] nodes)> tcs) + { + _taskCompletionSourcesList.TryAdd(tcs, false); + } private async Task UpdateLoop() @@ -111,8 +119,34 @@ private async Task UpdateLoop() if (deploymentVersionTask.IsCompleted) { deploymentVersion = deploymentVersionTask.Result; - if (deploymentVersion.Error == null && deploymentVersion.IsUndeployed == false) - lastKnownDeploymentVersion = deploymentVersion.ResponseObject; + if (deploymentVersion.Error == null && deploymentVersion.IsUndeployed == false) + { + if (lastKnownDeploymentVersion != deploymentVersion.ResponseObject) + { + List> toRemoveList = new List>(); + + foreach (KeyValuePair, Boolean> entry in _taskCompletionSourcesList) + { + if (entry.Value == false) + { + entry.Key.SetResult((deploymentVersion.ResponseObject, null)); + + _taskCompletionSourcesList.TryUpdate(entry.Key, false, true); + + toRemoveList.Add(entry.Key); + } + } + + foreach (TaskCompletionSource<(string version, Node[] nodes)> key in toRemoveList) + { + var val = new Boolean(); + _taskCompletionSourcesList.TryRemove(key, out val); + } + } + + lastKnownDeploymentVersion = deploymentVersion.ResponseObject; + } + deploymentVersionTask = ConsulClient.GetDeploymentVersion(DeploymentIdentifier, deploymentVersion.ModifyIndex ?? InitialModifyIndex, _shutdownToken.Token); } diff --git a/Gigya.Microdot.ServiceDiscovery/Rewrite/Discovery.cs b/Gigya.Microdot.ServiceDiscovery/Rewrite/Discovery.cs index 541ff6ab..41165e9a 100644 --- a/Gigya.Microdot.ServiceDiscovery/Rewrite/Discovery.cs +++ b/Gigya.Microdot.ServiceDiscovery/Rewrite/Discovery.cs @@ -47,7 +47,8 @@ internal sealed class Discovery : IDiscovery private Func CreateConfigNodeSource { get; } private Dictionary NodeSourceFactories { get; } - class NodeSourceAndAccesstime + + class NodeSourceAndAccesstime { public string NodeSourceType; public Task NodeSourceTask; @@ -77,7 +78,6 @@ public Discovery(Func getConfig, } - /// public ILoadBalancer CreateLoadBalancer(DeploymentIdentifier deploymentIdentifier, ReachabilityCheck reachabilityCheck, TrafficRoutingStrategy trafficRoutingStrategy) { @@ -85,7 +85,6 @@ public ILoadBalancer CreateLoadBalancer(DeploymentIdentifier deploymentIdentifie } - /// public async Task GetNodes(DeploymentIdentifier deploymentIdentifier) { @@ -115,7 +114,40 @@ public async Task GetNodes(DeploymentIdentifier deploymentIdentifier) else return null; } + public async Task<(string version, Node[] nodes)> WaitForServiceChanges(DeploymentIdentifier deploymentIdentifier) + { + TaskCompletionSource<(string version, Node[] nodes)> tcs = new TaskCompletionSource<(string version, Node[] nodes)>(); + + // We have a cached node source; query it + if (_nodeSources.TryGetValue(deploymentIdentifier, out Lazy lazySource)) + { + lazySource.Value.LastAccessTime = DateTime.UtcNow; + var nodeSource = await lazySource.Value.NodeSourceTask.ConfigureAwait(false); + + nodeSource.RegisterForSchemaChangeEvent(deploymentIdentifier, tcs); + } + + // No node source but the service is deployed; create one and query it + else if (await IsServiceDeployed(deploymentIdentifier).ConfigureAwait(false)) + { + string sourceType = GetConfiguredSourceType(deploymentIdentifier); + lazySource = _nodeSources.GetOrAdd(deploymentIdentifier, _ => new Lazy(() => + new NodeSourceAndAccesstime + { + NodeSourceType = sourceType, + LastAccessTime = DateTime.UtcNow, + NodeSourceTask = CreateNodeSource(sourceType, deploymentIdentifier) + })); + var nodeSource = await lazySource.Value.NodeSourceTask.ConfigureAwait(false); + + nodeSource.RegisterForSchemaChangeEvent(deploymentIdentifier, tcs); + } + // No node source and the service is not deployed; return null + else return (null, null); + + return await tcs.Task; + } private async Task IsServiceDeployed(DeploymentIdentifier deploymentIdentifier) { diff --git a/Gigya.Microdot.ServiceDiscovery/Rewrite/IDiscovery.cs b/Gigya.Microdot.ServiceDiscovery/Rewrite/IDiscovery.cs index c381ac10..12ae61b9 100644 --- a/Gigya.Microdot.ServiceDiscovery/Rewrite/IDiscovery.cs +++ b/Gigya.Microdot.ServiceDiscovery/Rewrite/IDiscovery.cs @@ -48,5 +48,7 @@ public interface IDiscovery: IDisposable /// identifier for service and env for which LoadBalancer is requested /// a list of , or null if the service is not deployed in the requested environment Task GetNodes(DeploymentIdentifier deploymentIdentifier); + + Task<(string version, Node[] nodes)> WaitForServiceChanges(DeploymentIdentifier deploymentIdentifier); } } diff --git a/Gigya.Microdot.ServiceDiscovery/Rewrite/INodeSource.cs b/Gigya.Microdot.ServiceDiscovery/Rewrite/INodeSource.cs index c5b43dc0..915c8eb8 100644 --- a/Gigya.Microdot.ServiceDiscovery/Rewrite/INodeSource.cs +++ b/Gigya.Microdot.ServiceDiscovery/Rewrite/INodeSource.cs @@ -21,6 +21,7 @@ #endregion using System; +using System.Threading.Tasks; using Gigya.Common.Contracts.Exceptions; using Gigya.Microdot.SharedLogic.Rewrite; @@ -37,5 +38,7 @@ public interface INodeSource: IDisposable /// A non-empty array of nodes. /// Thrown when no nodes are available, the service was undeployed or an error occurred. Node[] GetNodes(); + + void RegisterForSchemaChangeEvent(DeploymentIdentifier deploymentIdentifier, TaskCompletionSource<(string version, Node[] nodes)> tcs); } } diff --git a/Gigya.Microdot.ServiceDiscovery/Rewrite/LocalNodeSource.cs b/Gigya.Microdot.ServiceDiscovery/Rewrite/LocalNodeSource.cs index f5a1f719..82ae2e6b 100644 --- a/Gigya.Microdot.ServiceDiscovery/Rewrite/LocalNodeSource.cs +++ b/Gigya.Microdot.ServiceDiscovery/Rewrite/LocalNodeSource.cs @@ -43,6 +43,11 @@ public void Dispose() { // nothing to shutdown } + + public void RegisterForSchemaChangeEvent(DeploymentIdentifier deploymentIdentifier, System.Threading.Tasks.TaskCompletionSource<(string version, Node[] nodes)> tcs) + { + throw new System.NotImplementedException(); + } } } \ No newline at end of file