From 59d8a2fdd422f4f34c8c598306df5afef46601d3 Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 21 May 2025 15:57:40 -0700 Subject: [PATCH 01/15] fix: add bookkeeping for number of active subscriptions --- .gitignore | 1 + Makefile | 4 + momento-sdk/build.gradle.kts | 5 + .../TopicsSubscriptionInitializationTest.java | 594 ++++++++++++++++++ .../momento/sdk/ISubscriptionCallbacks.java | 2 +- .../main/java/momento/sdk/ScsTopicClient.java | 83 ++- .../momento/sdk/ScsTopicGrpcStubsManager.java | 66 +- 7 files changed, 716 insertions(+), 39 deletions(-) create mode 100644 momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java diff --git a/.gitignore b/.gitignore index db9d3e1f..ef5d4ec8 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,4 @@ examples/lib/.classpath examples/lib/.project examples/lib/bin .vscode/ +momento-sdk/bin \ No newline at end of file diff --git a/Makefile b/Makefile index 52c1c1e7..075471f8 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,10 @@ test-leaderboard-service: test-topics-service: @CONSISTENT_READS=1 ./gradlew test-topics-service +## Run the topics subscription initialization tests +test-topics-subscription-initialization: + @CONSISTENT_READS=1 ./gradlew test-topics-subscription-initialization + ## Run the http service tests test-http-service: @echo "No tests for http service." diff --git a/momento-sdk/build.gradle.kts b/momento-sdk/build.gradle.kts index 439a9711..d43e215f 100644 --- a/momento-sdk/build.gradle.kts +++ b/momento-sdk/build.gradle.kts @@ -109,3 +109,8 @@ registerIntegrationTestTask( "test-retries", listOf("momento.sdk.retry.*") ) + +registerIntegrationTestTask( + "test-topics-subscription-initialization", + listOf("momento.sdk.subscriptionInitialization.*") +) diff --git a/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java b/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java new file mode 100644 index 00000000..14cb599f --- /dev/null +++ b/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java @@ -0,0 +1,594 @@ +package momento.sdk.subscriptionInitialization; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import momento.sdk.ISubscriptionCallbacks; +import momento.sdk.TopicClient; +import momento.sdk.auth.CredentialProvider; +import momento.sdk.config.TopicConfiguration; +import momento.sdk.config.transport.GrpcConfiguration; +import momento.sdk.config.transport.StaticTransportStrategy; +import momento.sdk.exceptions.MomentoErrorCode; +import momento.sdk.responses.topic.TopicMessage; +import momento.sdk.responses.topic.TopicSubscribeResponse; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.LoggerFactory; + +/******************************************************************/ +/* Do not run these tests in CI as they rely on using a cache with +/* a subscription limit >= 2010. +/* Provide the name of your dev cache with greater subscription +/* limits using the TEST_CACHE_NAME environment variable. +/******************************************************************/ + +public class TopicsSubscriptionInitializationTest { + private static String cacheName; + private static CredentialProvider credentialProvider; + private int unsubscribeCounter = 0; + + private ISubscriptionCallbacks callbacks() { + return new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() { + unsubscribeCounter++; + } + + @Override + public void onError(Throwable t) {} + }; + } + + @BeforeAll + static void setupAll() { + cacheName = System.getenv("TEST_CACHE_NAME"); + if (cacheName == null) { + throw new RuntimeException("TEST_CACHE_NAME environment variable not set"); + } + + credentialProvider = CredentialProvider.fromEnvVar("MOMENTO_API_KEY"); + } + + @Test + @Timeout(1000) + public void oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() { + unsubscribeCounter = 0; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(1); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + // Starting 100 subscriptions on 1 channel should be fine + List subscriptions = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + final TopicSubscribeResponse response = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Starting one more subscription should produce resource exhausted error + final TopicSubscribeResponse response = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Error.class); + assertEquals( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + ((TopicSubscribeResponse.Error) response).getErrorCode()); + + // Ending some subscriptions should free up streams and allow new subscriptions + subscriptions.get(0).unsubscribe(); + // Wait for the subscription to end + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + assertEquals(1, unsubscribeCounter); + + final TopicSubscribeResponse response2 = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response2).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response2); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + if (sub != null) { + sub.unsubscribe(); + } + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void twoStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests() { + unsubscribeCounter = 0; + final int numGrpcChannels = 2; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Unsubscribe half of the subscriptions + final int unsubscribeBurstSize = maxStreamCapacity / 2; + for (int i = 0; i < unsubscribeBurstSize; i++) { + subscriptions.get(i).unsubscribe(); + } + // Wait a bit for the subscription to end + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + assertEquals(unsubscribeBurstSize, unsubscribeCounter); + + // Burst of subscribe requests should succeed + final int subscribeBurstSize = maxStreamCapacity / 2 + 10; + List> subscribeRequests2 = new ArrayList<>(); + for (int i = 0; i < subscribeBurstSize; i++) { + final CompletableFuture subscribePromise = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests2.add(subscribePromise); + } + CompletableFuture.allOf(subscribeRequests2.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + List successfulSubscriptions2 = new ArrayList<>(); + int numFailedSubscriptions = 0; + for (CompletableFuture future : subscribeRequests2) { + TopicSubscribeResponse response = future.join(); + if (response instanceof TopicSubscribeResponse.Subscription) { + successfulSubscriptions2.add((TopicSubscribeResponse.Subscription) response); + } else { + numFailedSubscriptions++; + } + } + assertEquals(10, numFailedSubscriptions); + assertEquals(subscribeBurstSize - 10, successfulSubscriptions2.size()); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void twoStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity() { + final int numGrpcChannels = 2; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity / 2; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void twoStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity() { + final int numGrpcChannels = 2; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void twoStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity() { + final int numGrpcChannels = 2; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity + 10; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + int numFailedSubscriptions = 0; + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + if (response instanceof TopicSubscribeResponse.Error) { + numFailedSubscriptions++; + } else { + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + } + assertEquals(10, numFailedSubscriptions); + assertEquals(maxStreamCapacity, subscriptions.size()); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void tenStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity() { + final int numGrpcChannels = 10; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity / 2; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void tenStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity() { + final int numGrpcChannels = 10; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void tenStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity() { + final int numGrpcChannels = 10; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity + 10; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + int numFailedSubscriptions = 0; + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + if (response instanceof TopicSubscribeResponse.Error) { + numFailedSubscriptions++; + } else { + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + } + assertEquals(10, numFailedSubscriptions); + assertEquals(maxStreamCapacity, subscriptions.size()); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void twentyStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity() { + final int numGrpcChannels = 20; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } + + @Test + @Timeout(1000) + public void twentyStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity() { + final int numGrpcChannels = 20; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity + 10; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + int numFailedSubscriptions = 0; + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + if (response instanceof TopicSubscribeResponse.Error) { + numFailedSubscriptions++; + } else { + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + } + assertEquals(10, numFailedSubscriptions); + assertEquals(maxStreamCapacity, subscriptions.size()); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } +} diff --git a/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java b/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java index be876716..3d7bfad9 100644 --- a/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java +++ b/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java @@ -33,7 +33,7 @@ default void onHeartbeat() {} /** Called when the connection to the topic is lost. */ default void onConnectionLost() { - logger.info("Connection to topic lost"); + // logger.info("Connection to topic lost"); } /** Called when the connection to the topic is restored. */ diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java index cf32f214..177dd895 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java @@ -118,40 +118,55 @@ private CompletableFuture sendSubscribe( String cacheName, String topicName, ISubscriptionCallbacks callbacks) { final SubscriptionState subscriptionState = new SubscriptionState(); - final IScsTopicConnection connection = - (request, subscription) -> - topicGrpcStubsManager.getNextStreamStub().subscribe(request, subscription); - - long configuredTimeoutSeconds = - topicGrpcStubsManager - .getConfiguration() - .getTransportStrategy() - .getGrpcConfiguration() - .getDeadline() - .getSeconds(); - long firstMessageSubscribeTimeoutSeconds = - configuredTimeoutSeconds > 0 ? configuredTimeoutSeconds : DEFAULT_REQUEST_TIMEOUT_SECONDS; - - @SuppressWarnings("resource") // the wrapper closes itself when a subscription ends. - final SubscriptionWrapper subscriptionWrapper = - new SubscriptionWrapper( - cacheName, - topicName, - connection, - callbacks, - subscriptionState, - firstMessageSubscribeTimeoutSeconds, - subscriptionRetryStrategy); - final CompletableFuture subscribeFuture = subscriptionWrapper.subscribeWithRetry(); - return subscribeFuture.handle( - (v, ex) -> { - if (ex != null) { - return new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(ex)); - } else { - subscriptionState.setUnsubscribeFn(subscriptionWrapper::unsubscribe); - return new TopicSubscribeResponse.Subscription(subscriptionState); - } - }); + try { + // Wrap in try-catch because getNextStreamStub() can throw an exception + // if the number of active subscriptions is already at max capacity. + final StreamStubWithCount stubWithCount = topicGrpcStubsManager.getNextStreamStub(); + + final IScsTopicConnection connection = + (request, subscription) -> stubWithCount.getStub().subscribe(request, subscription); + + long configuredTimeoutSeconds = + topicGrpcStubsManager + .getConfiguration() + .getTransportStrategy() + .getGrpcConfiguration() + .getDeadline() + .getSeconds(); + long firstMessageSubscribeTimeoutSeconds = + configuredTimeoutSeconds > 0 ? configuredTimeoutSeconds : DEFAULT_REQUEST_TIMEOUT_SECONDS; + + @SuppressWarnings("resource") // the wrapper closes itself when a subscription ends. + final SubscriptionWrapper subscriptionWrapper = + new SubscriptionWrapper( + cacheName, + topicName, + connection, + callbacks, + subscriptionState, + firstMessageSubscribeTimeoutSeconds, + subscriptionRetryStrategy); + + final CompletableFuture subscribeFuture = subscriptionWrapper.subscribeWithRetry(); + return subscribeFuture.handle( + (v, ex) -> { + if (ex != null) { + return new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(ex)); + } else { + subscriptionState.setUnsubscribeFn( + () -> { + // Revise the unsubscribe function to decrement the count of active + // subscriptions + stubWithCount.decrementCount(); + subscriptionWrapper.unsubscribe(); + }); + return new TopicSubscribeResponse.Subscription(subscriptionState); + } + }); + } catch (Throwable e) { + return CompletableFuture.completedFuture( + new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(e))); + } } @Override diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index a4f43626..3a274011 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -19,8 +19,37 @@ import momento.sdk.config.TopicConfiguration; import momento.sdk.config.middleware.Middleware; import momento.sdk.config.middleware.MiddlewareRequestHandlerContext; +import momento.sdk.exceptions.ClientSdkException; +import momento.sdk.exceptions.MomentoErrorCode; import momento.sdk.internal.GrpcChannelOptions; +// Helper class for bookkeeping the number of active concurrent subscriptions. +final class StreamStubWithCount { + private final PubsubGrpc.PubsubStub stub; + private final AtomicInteger count = new AtomicInteger(0); + + StreamStubWithCount(PubsubGrpc.PubsubStub stub) { + this.stub = stub; + } + + PubsubGrpc.PubsubStub getStub() { + return stub; + } + + int getCount() { + return count.get(); + } + + int incrementCount() { + return count.incrementAndGet(); + } + + int decrementCount() { + int decremented = count.decrementAndGet(); + return decremented; + } +} + /** * Manager responsible for GRPC channels and stubs for the Topics. * @@ -35,7 +64,7 @@ final class ScsTopicGrpcStubsManager implements Closeable { private final AtomicInteger unaryIndex = new AtomicInteger(0); private final List streamChannels; - private final List streamStubs; + private final List streamStubs; private final AtomicInteger streamIndex = new AtomicInteger(0); public static final UUID CONNECTION_ID_KEY = UUID.randomUUID(); @@ -44,6 +73,7 @@ final class ScsTopicGrpcStubsManager implements Closeable { private final int numStreamGrpcChannels; private final TopicConfiguration configuration; private final Duration deadline; + private final int maximumActiveSubscriptions; ScsTopicGrpcStubsManager( @Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) { @@ -54,6 +84,9 @@ final class ScsTopicGrpcStubsManager implements Closeable { this.numStreamGrpcChannels = configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels(); + // Each stream grpc channel can support 100 concurrent active subscriptions + this.maximumActiveSubscriptions = this.numStreamGrpcChannels * 100; + this.unaryChannels = IntStream.range(0, this.numUnaryGrpcChannels) .mapToObj(i -> setupConnection(credentialProvider, configuration)) @@ -65,7 +98,10 @@ final class ScsTopicGrpcStubsManager implements Closeable { .mapToObj(i -> setupConnection(credentialProvider, configuration)) .collect(Collectors.toList()); this.streamStubs = - streamChannels.stream().map(PubsubGrpc::newStub).collect(Collectors.toList()); + streamChannels.stream() + .map(PubsubGrpc::newStub) + .map(StreamStubWithCount::new) + .collect(Collectors.toList()); } private static ManagedChannel setupConnection( @@ -100,8 +136,30 @@ PubsubGrpc.PubsubStub getNextUnaryStub() { } /** Round-robin subscribe stub. */ - PubsubGrpc.PubsubStub getNextStreamStub() { - return streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels); + StreamStubWithCount getNextStreamStub() { + // First check if there's available capacity on any of the stubs + int totalActiveSubscriptions = 0; + for (StreamStubWithCount stubWithCount : streamStubs) { + totalActiveSubscriptions += stubWithCount.getCount(); + } + if (totalActiveSubscriptions < this.maximumActiveSubscriptions) { + // Try to get a client with capacity for another subscription. + // Allow up to maximumActiveSubscriptions attempts. + for (int i = 0; i < this.maximumActiveSubscriptions; i++) { + // Round-robin to the next stub + final StreamStubWithCount stubWithCount = + streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels); + if (stubWithCount.getCount() < 100) { + stubWithCount.incrementCount(); + return stubWithCount; + } + } + } + + // Otherwise return an error + throw new ClientSdkException( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + "Maximum number of active subscriptions reached"); } TopicConfiguration getConfiguration() { From f9508db627f2bed7b9b931ad7614e12a79a49e2c Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 21 May 2025 15:59:36 -0700 Subject: [PATCH 02/15] cleanup --- .../src/main/java/momento/sdk/ISubscriptionCallbacks.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java b/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java index 3d7bfad9..be876716 100644 --- a/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java +++ b/momento-sdk/src/main/java/momento/sdk/ISubscriptionCallbacks.java @@ -33,7 +33,7 @@ default void onHeartbeat() {} /** Called when the connection to the topic is lost. */ default void onConnectionLost() { - // logger.info("Connection to topic lost"); + logger.info("Connection to topic lost"); } /** Called when the connection to the topic is restored. */ From f0b03d541b4fd9f00fe6b1b84b8fbef4e97b5af4 Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 21 May 2025 16:07:38 -0700 Subject: [PATCH 03/15] exclude new tests from integration test suite --- momento-sdk/build.gradle.kts | 1 + 1 file changed, 1 insertion(+) diff --git a/momento-sdk/build.gradle.kts b/momento-sdk/build.gradle.kts index d43e215f..0e1fa4c9 100644 --- a/momento-sdk/build.gradle.kts +++ b/momento-sdk/build.gradle.kts @@ -58,6 +58,7 @@ tasks.named("analyzeTestClassesDependencies").configure { tasks.named("integrationTest") { filter { excludeTestsMatching("momento.sdk.retry.*") + excludeTestsMatching("momento.sdk.subscriptionInitialization.*") } } From 15a2b677434f0b32de5ac464f0f33fd29b542053 Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 30 May 2025 13:23:16 -0700 Subject: [PATCH 04/15] chore: clean up resource in subscription wrapper test --- .../src/test/java/momento/sdk/SubscriptionWrapperTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java index ceb0e44b..169dca09 100644 --- a/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java +++ b/momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java @@ -121,5 +121,7 @@ public void subscribe( waitingForSubscriptionAttempt.acquire(); assertTrue(gotConnectionRestoredCallback.get()); + + subscriptionWrapper.close(); } } From 7cd5e567543ffeb2a72d71817505b45a514a441c Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 30 May 2025 16:08:43 -0700 Subject: [PATCH 05/15] make sure active subscriptions count is decremented upon error as well, not just on unsubscribe --- .../TopicsSubscriptionInitializationTest.java | 66 +++++++++++++++++++ .../main/java/momento/sdk/ScsTopicClient.java | 11 ++-- .../momento/sdk/ScsTopicGrpcStubsManager.java | 3 +- .../java/momento/sdk/SubscriptionWrapper.java | 3 + .../sdk/internal/SubscriptionState.java | 18 +++++ 5 files changed, 92 insertions(+), 9 deletions(-) diff --git a/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java b/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java index 14cb599f..69c0c36c 100644 --- a/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java +++ b/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java @@ -7,6 +7,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import momento.sdk.ISubscriptionCallbacks; import momento.sdk.TopicClient; import momento.sdk.auth.CredentialProvider; @@ -591,4 +593,68 @@ public void twentyStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacit } topicClient.close(); } + + @Test + @Timeout(1000) + public void shouldDecrementActiveSubscriptionsCountWhenSubscribeRequestsFail() { + final int numGrpcChannels = 1; + final int maxStreamCapacity = 100 * numGrpcChannels; + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration( + new StaticTransportStrategy(grpcConfig), + LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); + TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); + + final Semaphore errorSemaphore = new Semaphore(0); + final AtomicInteger errorCounter = new AtomicInteger(0); + + final ISubscriptionCallbacks callbacks = + new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) { + errorCounter.incrementAndGet(); + errorSemaphore.release(); + } + }; + + // Should successfully start the maximum number of subscriptions because 10 attempts ran into + // NOT_FOUND_ERROR and the errors should have decremented the active subscriptions count. + List successfulSubscriptions = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity + 10; i++) { + String cacheNameToUse = cacheName; + if (i % 11 == 0) { + cacheNameToUse = "this-cache-does-not-exist"; + } + TopicSubscribeResponse attempt = + topicClient.subscribe(cacheNameToUse, "test-topic", callbacks).join(); + if (attempt instanceof TopicSubscribeResponse.Subscription) { + successfulSubscriptions.add((TopicSubscribeResponse.Subscription) attempt); + } else { + assertThat(attempt).isInstanceOf(TopicSubscribeResponse.Error.class); + assertThat(((TopicSubscribeResponse.Error) attempt).getErrorCode()) + .isEqualTo(MomentoErrorCode.NOT_FOUND_ERROR); + errorCounter.incrementAndGet(); + } + } + + // Assert that we have received maxStreamCapacity number of successful subscriptions + assertThat(successfulSubscriptions.size()).isEqualTo(maxStreamCapacity); + + // Assert that we have received 10 NOT_FOUND_ERRORs + assertThat(errorCounter).hasValue(10); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : successfulSubscriptions) { + sub.unsubscribe(); + } + topicClient.close(); + } } diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java index 177dd895..10980306 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java @@ -151,15 +151,12 @@ private CompletableFuture sendSubscribe( return subscribeFuture.handle( (v, ex) -> { if (ex != null) { + stubWithCount.decrementCount(); return new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(ex)); } else { - subscriptionState.setUnsubscribeFn( - () -> { - // Revise the unsubscribe function to decrement the count of active - // subscriptions - stubWithCount.decrementCount(); - subscriptionWrapper.unsubscribe(); - }); + subscriptionState.setDecrementActiveSubscriptionsCountFn( + stubWithCount::decrementCount); + subscriptionState.setUnsubscribeFn(subscriptionWrapper::unsubscribe); return new TopicSubscribeResponse.Subscription(subscriptionState); } }); diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index 3a274011..ced8d7f4 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -45,8 +45,7 @@ int incrementCount() { } int decrementCount() { - int decremented = count.decrementAndGet(); - return decremented; + return count.decrementAndGet(); } } diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java index 1906c828..280cb60c 100644 --- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java +++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java @@ -241,6 +241,7 @@ public void onCompleted() { } private void completeExceptionally(CompletableFuture future, Throwable t) { + subscriptionState.decrementActiveSubscriptionsCount(); future.completeExceptionally( new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(t))); close(); @@ -251,6 +252,7 @@ private void scheduleRetry(Duration retryDelay, Runnable retryAction) { } private void handleSubscriptionCompleted() { + subscriptionState.decrementActiveSubscriptionsCount(); callbacks.onCompleted(); } @@ -346,6 +348,7 @@ public void unsubscribe() { @Override public void close() { + subscriptionState.decrementActiveSubscriptionsCount(); scheduler.shutdown(); } } diff --git a/momento-sdk/src/main/java/momento/sdk/internal/SubscriptionState.java b/momento-sdk/src/main/java/momento/sdk/internal/SubscriptionState.java index 5ba3702e..1e1a05bf 100644 --- a/momento-sdk/src/main/java/momento/sdk/internal/SubscriptionState.java +++ b/momento-sdk/src/main/java/momento/sdk/internal/SubscriptionState.java @@ -3,6 +3,7 @@ /** Represents the state of a subscription to a topic. */ public class SubscriptionState { + private Runnable decrementActiveSubscriptionsCountFn; private Runnable unsubscribeFn; private Long lastTopicSequenceNumber; private Long lastTopicSequencePage; @@ -10,6 +11,7 @@ public class SubscriptionState { /** Constructs a new SubscriptionState instance with default values. */ public SubscriptionState() { + this.decrementActiveSubscriptionsCountFn = () -> {}; this.unsubscribeFn = () -> {}; this.isSubscribed = false; } @@ -59,8 +61,24 @@ public void setUnsubscribeFn(Runnable unsubscribeFn) { /** Unsubscribes from the topic, executing the unsubscribe function. */ public void unsubscribe() { if (isSubscribed) { + decrementActiveSubscriptionsCountFn.run(); unsubscribeFn.run(); this.isSubscribed = false; } } + + /** + * Sets the function to be decrement the active subscriptions count for a given stub. + * + * @param decrementActiveSubscriptionsCountFn The function to decrement the active subscriptions + * count. + */ + public void setDecrementActiveSubscriptionsCountFn(Runnable decrementActiveSubscriptionsCountFn) { + this.decrementActiveSubscriptionsCountFn = decrementActiveSubscriptionsCountFn; + } + + /** Decrements the active subscriptions count for a given stub. */ + public void decrementActiveSubscriptionsCount() { + decrementActiveSubscriptionsCountFn.run(); + } } From 25dc9bff9c777f8bc6b8ac56227bd4a3f3842ddc Mon Sep 17 00:00:00 2001 From: anitarua Date: Mon, 2 Jun 2025 12:10:42 -0700 Subject: [PATCH 06/15] clean up subscription initialization tests --- momento-sdk/build.gradle.kts | 5 - .../sdk/retry/BaseMomentoLocalTestClass.java | 35 + .../TopicsSubscriptionInitializationTest.java | 398 +++++++++++ .../TopicsSubscriptionInitializationTest.java | 660 ------------------ 4 files changed, 433 insertions(+), 665 deletions(-) create mode 100644 momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java delete mode 100644 momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java diff --git a/momento-sdk/build.gradle.kts b/momento-sdk/build.gradle.kts index 0e1fa4c9..b6eb7f64 100644 --- a/momento-sdk/build.gradle.kts +++ b/momento-sdk/build.gradle.kts @@ -110,8 +110,3 @@ registerIntegrationTestTask( "test-retries", listOf("momento.sdk.retry.*") ) - -registerIntegrationTestTask( - "test-topics-subscription-initialization", - listOf("momento.sdk.subscriptionInitialization.*") -) diff --git a/momento-sdk/src/intTest/java/momento/sdk/retry/BaseMomentoLocalTestClass.java b/momento-sdk/src/intTest/java/momento/sdk/retry/BaseMomentoLocalTestClass.java index 886494b9..ed4159db 100644 --- a/momento-sdk/src/intTest/java/momento/sdk/retry/BaseMomentoLocalTestClass.java +++ b/momento-sdk/src/intTest/java/momento/sdk/retry/BaseMomentoLocalTestClass.java @@ -14,6 +14,8 @@ import momento.sdk.config.Configurations; import momento.sdk.config.TopicConfiguration; import momento.sdk.config.TopicConfigurations; +import momento.sdk.config.transport.GrpcConfiguration; +import momento.sdk.config.transport.StaticTransportStrategy; import momento.sdk.responses.cache.control.CacheCreateResponse; import momento.sdk.retry.utils.MomentoLocalMiddleware; import momento.sdk.retry.utils.MomentoLocalMiddlewareArgs; @@ -126,6 +128,39 @@ public static void withCacheAndTopicClient( } } + public static void withCacheAndTopicClientWithNumStreamChannels( + int numStreamChannels, + MomentoLocalMiddlewareArgs testMetricsMiddlewareArgs, + TopicTestCallback testCallback) + throws Exception { + + final String cacheName = testCacheName(); + final String hostname = + Optional.ofNullable(System.getenv("MOMENTO_HOSTNAME")).orElse("127.0.0.1"); + final int port = + Optional.ofNullable(System.getenv("MOMENTO_PORT")).map(Integer::parseInt).orElse(8080); + final CredentialProvider credentialProvider = new MomentoLocalProvider(hostname, port); + + final GrpcConfiguration grpcConfig = + new GrpcConfiguration(Duration.ofMillis(15000)) + .withNumStreamGrpcChannels(numStreamChannels); + final TopicConfiguration topicConfiguration = + new TopicConfiguration(new StaticTransportStrategy(grpcConfig)) + .withMiddleware(new MomentoLocalMiddleware(testMetricsMiddlewareArgs)); + + try (final CacheClient cacheClient = + CacheClient.builder( + credentialProvider, Configurations.Laptop.latest(), DEFAULT_TTL_SECONDS) + .build(); + final TopicClient topicClient = + TopicClient.builder(credentialProvider, topicConfiguration).build()) { + if (cacheClient.createCache(cacheName).join() instanceof CacheCreateResponse.Error) { + throw new RuntimeException("Failed to create cache: " + cacheName); + } + testCallback.run(topicClient, cacheName); + } + } + @FunctionalInterface public interface CacheTestCallback { void run(CacheClient cc, String cacheName) throws Exception; diff --git a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java new file mode 100644 index 00000000..52749da2 --- /dev/null +++ b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java @@ -0,0 +1,398 @@ +package momento.sdk.retry; + +import static momento.sdk.retry.BaseMomentoLocalTestClass.withCacheAndTopicClientWithNumStreamChannels; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.slf4j.LoggerFactory.getLogger; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import momento.sdk.ISubscriptionCallbacks; +import momento.sdk.exceptions.MomentoErrorCode; +import momento.sdk.responses.topic.TopicMessage; +import momento.sdk.responses.topic.TopicSubscribeResponse; +import momento.sdk.retry.utils.MomentoLocalMiddlewareArgs; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; + +public class TopicsSubscriptionInitializationTest { + private int unsubscribeCounter = 0; + + private ISubscriptionCallbacks callbacks() { + return new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() { + unsubscribeCounter++; + } + + @Override + public void onError(Throwable t) {} + }; + } + + private static Logger logger; + + @BeforeAll + static void setup() { + logger = getLogger(TopicsSubscriptionInitializationTest.class); + } + + @Test + @Timeout(30) + public void oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() + throws Exception { + unsubscribeCounter = 0; + + withCacheAndTopicClientWithNumStreamChannels( + 1, + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + // These should all succeed + // Starting 100 subscriptions on 1 channel should be fine + List subscriptions = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + final TopicSubscribeResponse response = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Starting one more subscription should produce resource exhausted error + final TopicSubscribeResponse response = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Error.class); + assertEquals( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + ((TopicSubscribeResponse.Error) response).getErrorCode()); + + // Ending a subscription should free up one new stream + subscriptions.get(0).unsubscribe(); + // Wait for the subscription to end + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + assertEquals(1, unsubscribeCounter); + + final TopicSubscribeResponse response2 = + topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); + assertThat(response2).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response2); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + if (sub != null) { + sub.unsubscribe(); + } + } + }); + } + + @ParameterizedTest + @ValueSource(ints = {2, 10, 20}) + @Timeout(30) + public void multipleStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests( + int numGrpcChannels) throws Exception { + unsubscribeCounter = 0; + final int maxStreamCapacity = 100 * numGrpcChannels; + + withCacheAndTopicClientWithNumStreamChannels( + numGrpcChannels, + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Unsubscribe half of the subscriptions + final int unsubscribeBurstSize = maxStreamCapacity / 2; + for (int i = 0; i < unsubscribeBurstSize; i++) { + subscriptions.get(i).unsubscribe(); + } + // Wait a bit for the subscription to end + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + assertEquals(unsubscribeBurstSize, unsubscribeCounter); + + // Burst of subscribe requests should succeed + final int subscribeBurstSize = maxStreamCapacity / 2 + 10; + List> subscribeRequests2 = new ArrayList<>(); + for (int i = 0; i < subscribeBurstSize; i++) { + final CompletableFuture subscribePromise = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests2.add(subscribePromise); + } + CompletableFuture.allOf(subscribeRequests2.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + List successfulSubscriptions2 = new ArrayList<>(); + int numFailedSubscriptions = 0; + for (CompletableFuture future : subscribeRequests2) { + TopicSubscribeResponse response = future.join(); + if (response instanceof TopicSubscribeResponse.Subscription) { + successfulSubscriptions2.add((TopicSubscribeResponse.Subscription) response); + } else { + numFailedSubscriptions++; + } + } + assertEquals(10, numFailedSubscriptions); + assertEquals(subscribeBurstSize - 10, successfulSubscriptions2.size()); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + }); + } + + @ParameterizedTest + @ValueSource(ints = {2, 10, 20}) + @Timeout(30) + public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity( + int numGrpcChannels) throws Exception { + final int maxStreamCapacity = 100 * numGrpcChannels; + + withCacheAndTopicClientWithNumStreamChannels( + numGrpcChannels, + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + }); + } + + @ParameterizedTest + @ValueSource(ints = {2, 10, 20}) + @Timeout(30) + public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity( + int numGrpcChannels) throws Exception { + final int maxStreamCapacity = 100 * numGrpcChannels; + + withCacheAndTopicClientWithNumStreamChannels( + numGrpcChannels, + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity + 10; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + int numFailedSubscriptions = 0; + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + if (response instanceof TopicSubscribeResponse.Error) { + numFailedSubscriptions++; + } else { + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + } + assertEquals(10, numFailedSubscriptions); + assertEquals(maxStreamCapacity, subscriptions.size()); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + }); + } + + @ParameterizedTest + @ValueSource(ints = {2, 10}) + @Timeout(30) + public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity( + int numGrpcChannels) throws Exception { + final int maxStreamCapacity = 100 * numGrpcChannels; + + withCacheAndTopicClientWithNumStreamChannels( + numGrpcChannels, + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + List> subscribeRequests = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity / 2; i++) { + final CompletableFuture response = + topicClient.subscribe(cacheName, "test-topic", callbacks()); + subscribeRequests.add(response); + } + // Wait for all the subscribe requests to complete + CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); + + // Wait a bit for all subscriptions to be fully established + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Verify they all succeeded + List subscriptions = new ArrayList<>(); + for (CompletableFuture future : subscribeRequests) { + TopicSubscribeResponse response = future.join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + sub.unsubscribe(); + } + }); + } + + @Test + @Timeout(30) + public void shouldDecrementActiveSubscriptionsCountWhenSubscribeRequestsFail() throws Exception { + final int numGrpcChannels = 1; + final int maxStreamCapacity = 100 * numGrpcChannels; + + withCacheAndTopicClientWithNumStreamChannels( + numGrpcChannels, + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()).build(), + (topicClient, cacheName) -> { + final Semaphore errorSemaphore = new Semaphore(0); + final AtomicInteger errorCounter = new AtomicInteger(0); + + final ISubscriptionCallbacks callbacks = + new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() {} + + @Override + public void onError(Throwable t) { + errorCounter.incrementAndGet(); + errorSemaphore.release(); + } + }; + + // Should successfully start the maximum number of subscriptions because 10 attempts ran + // into NOT_FOUND_ERROR. The errors should have decremented the active subscriptions + // count. + List successfulSubscriptions = new ArrayList<>(); + for (int i = 0; i < maxStreamCapacity + 10; i++) { + String cacheNameToUse = cacheName; + if (i % 11 == 0) { + cacheNameToUse = "this-cache-does-not-exist"; + } + TopicSubscribeResponse attempt = + topicClient.subscribe(cacheNameToUse, "test-topic", callbacks).join(); + if (attempt instanceof TopicSubscribeResponse.Subscription) { + successfulSubscriptions.add((TopicSubscribeResponse.Subscription) attempt); + } else { + assertThat(attempt).isInstanceOf(TopicSubscribeResponse.Error.class); + assertThat(((TopicSubscribeResponse.Error) attempt).getErrorCode()) + .isEqualTo(MomentoErrorCode.NOT_FOUND_ERROR); + errorCounter.incrementAndGet(); + } + } + + // Assert that we have received maxStreamCapacity number of successful subscriptions + assertThat(successfulSubscriptions.size()).isEqualTo(maxStreamCapacity); + + // Assert that we have received 10 NOT_FOUND_ERRORs + assertThat(errorCounter).hasValue(10); + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : successfulSubscriptions) { + sub.unsubscribe(); + } + }); + } +} diff --git a/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java b/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java deleted file mode 100644 index 69c0c36c..00000000 --- a/momento-sdk/src/intTest/java/momento/sdk/subscriptionInitialization/TopicsSubscriptionInitializationTest.java +++ /dev/null @@ -1,660 +0,0 @@ -package momento.sdk.subscriptionInitialization; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import momento.sdk.ISubscriptionCallbacks; -import momento.sdk.TopicClient; -import momento.sdk.auth.CredentialProvider; -import momento.sdk.config.TopicConfiguration; -import momento.sdk.config.transport.GrpcConfiguration; -import momento.sdk.config.transport.StaticTransportStrategy; -import momento.sdk.exceptions.MomentoErrorCode; -import momento.sdk.responses.topic.TopicMessage; -import momento.sdk.responses.topic.TopicSubscribeResponse; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import org.slf4j.LoggerFactory; - -/******************************************************************/ -/* Do not run these tests in CI as they rely on using a cache with -/* a subscription limit >= 2010. -/* Provide the name of your dev cache with greater subscription -/* limits using the TEST_CACHE_NAME environment variable. -/******************************************************************/ - -public class TopicsSubscriptionInitializationTest { - private static String cacheName; - private static CredentialProvider credentialProvider; - private int unsubscribeCounter = 0; - - private ISubscriptionCallbacks callbacks() { - return new ISubscriptionCallbacks() { - @Override - public void onItem(TopicMessage message) {} - - @Override - public void onCompleted() { - unsubscribeCounter++; - } - - @Override - public void onError(Throwable t) {} - }; - } - - @BeforeAll - static void setupAll() { - cacheName = System.getenv("TEST_CACHE_NAME"); - if (cacheName == null) { - throw new RuntimeException("TEST_CACHE_NAME environment variable not set"); - } - - credentialProvider = CredentialProvider.fromEnvVar("MOMENTO_API_KEY"); - } - - @Test - @Timeout(1000) - public void oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() { - unsubscribeCounter = 0; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(1); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - // Starting 100 subscriptions on 1 channel should be fine - List subscriptions = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - final TopicSubscribeResponse response = - topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Starting one more subscription should produce resource exhausted error - final TopicSubscribeResponse response = - topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); - assertThat(response).isInstanceOf(TopicSubscribeResponse.Error.class); - assertEquals( - MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, - ((TopicSubscribeResponse.Error) response).getErrorCode()); - - // Ending some subscriptions should free up streams and allow new subscriptions - subscriptions.get(0).unsubscribe(); - // Wait for the subscription to end - try { - Thread.sleep(200); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - assertEquals(1, unsubscribeCounter); - - final TopicSubscribeResponse response2 = - topicClient.subscribe(cacheName, "test-topic", callbacks()).join(); - assertThat(response2).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response2); - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - if (sub != null) { - sub.unsubscribe(); - } - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void twoStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests() { - unsubscribeCounter = 0; - final int numGrpcChannels = 2; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - - // Unsubscribe half of the subscriptions - final int unsubscribeBurstSize = maxStreamCapacity / 2; - for (int i = 0; i < unsubscribeBurstSize; i++) { - subscriptions.get(i).unsubscribe(); - } - // Wait a bit for the subscription to end - try { - Thread.sleep(200); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - assertEquals(unsubscribeBurstSize, unsubscribeCounter); - - // Burst of subscribe requests should succeed - final int subscribeBurstSize = maxStreamCapacity / 2 + 10; - List> subscribeRequests2 = new ArrayList<>(); - for (int i = 0; i < subscribeBurstSize; i++) { - final CompletableFuture subscribePromise = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests2.add(subscribePromise); - } - CompletableFuture.allOf(subscribeRequests2.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - List successfulSubscriptions2 = new ArrayList<>(); - int numFailedSubscriptions = 0; - for (CompletableFuture future : subscribeRequests2) { - TopicSubscribeResponse response = future.join(); - if (response instanceof TopicSubscribeResponse.Subscription) { - successfulSubscriptions2.add((TopicSubscribeResponse.Subscription) response); - } else { - numFailedSubscriptions++; - } - } - assertEquals(10, numFailedSubscriptions); - assertEquals(subscribeBurstSize - 10, successfulSubscriptions2.size()); - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void twoStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity() { - final int numGrpcChannels = 2; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity / 2; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void twoStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity() { - final int numGrpcChannels = 2; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void twoStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity() { - final int numGrpcChannels = 2; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity + 10; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - int numFailedSubscriptions = 0; - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - if (response instanceof TopicSubscribeResponse.Error) { - numFailedSubscriptions++; - } else { - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - } - assertEquals(10, numFailedSubscriptions); - assertEquals(maxStreamCapacity, subscriptions.size()); - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void tenStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity() { - final int numGrpcChannels = 10; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity / 2; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void tenStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity() { - final int numGrpcChannels = 10; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void tenStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity() { - final int numGrpcChannels = 10; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity + 10; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - int numFailedSubscriptions = 0; - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - if (response instanceof TopicSubscribeResponse.Error) { - numFailedSubscriptions++; - } else { - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - } - assertEquals(10, numFailedSubscriptions); - assertEquals(maxStreamCapacity, subscriptions.size()); - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void twentyStreamChannels_handlesBurstOfSubscribeRequestsAtMaxCapacity() { - final int numGrpcChannels = 20; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void twentyStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapacity() { - final int numGrpcChannels = 20; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - List> subscribeRequests = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity + 10; i++) { - final CompletableFuture response = - topicClient.subscribe(cacheName, "test-topic", callbacks()); - subscribeRequests.add(response); - } - // Wait for all the subscribe requests to complete - CompletableFuture.allOf(subscribeRequests.toArray(new CompletableFuture[0])).join(); - - // Wait a bit for all subscriptions to be fully established - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Test interrupted while waiting for subscriptions", e); - } - - // Verify they all succeeded - List subscriptions = new ArrayList<>(); - int numFailedSubscriptions = 0; - for (CompletableFuture future : subscribeRequests) { - TopicSubscribeResponse response = future.join(); - if (response instanceof TopicSubscribeResponse.Error) { - numFailedSubscriptions++; - } else { - assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); - subscriptions.add((TopicSubscribeResponse.Subscription) response); - } - } - assertEquals(10, numFailedSubscriptions); - assertEquals(maxStreamCapacity, subscriptions.size()); - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : subscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } - - @Test - @Timeout(1000) - public void shouldDecrementActiveSubscriptionsCountWhenSubscribeRequestsFail() { - final int numGrpcChannels = 1; - final int maxStreamCapacity = 100 * numGrpcChannels; - final GrpcConfiguration grpcConfig = - new GrpcConfiguration(Duration.ofMillis(15000)).withNumStreamGrpcChannels(numGrpcChannels); - final TopicConfiguration topicConfiguration = - new TopicConfiguration( - new StaticTransportStrategy(grpcConfig), - LoggerFactory.getLogger(TopicsSubscriptionInitializationTest.class)); - TopicClient topicClient = TopicClient.builder(credentialProvider, topicConfiguration).build(); - - final Semaphore errorSemaphore = new Semaphore(0); - final AtomicInteger errorCounter = new AtomicInteger(0); - - final ISubscriptionCallbacks callbacks = - new ISubscriptionCallbacks() { - @Override - public void onItem(TopicMessage message) {} - - @Override - public void onCompleted() {} - - @Override - public void onError(Throwable t) { - errorCounter.incrementAndGet(); - errorSemaphore.release(); - } - }; - - // Should successfully start the maximum number of subscriptions because 10 attempts ran into - // NOT_FOUND_ERROR and the errors should have decremented the active subscriptions count. - List successfulSubscriptions = new ArrayList<>(); - for (int i = 0; i < maxStreamCapacity + 10; i++) { - String cacheNameToUse = cacheName; - if (i % 11 == 0) { - cacheNameToUse = "this-cache-does-not-exist"; - } - TopicSubscribeResponse attempt = - topicClient.subscribe(cacheNameToUse, "test-topic", callbacks).join(); - if (attempt instanceof TopicSubscribeResponse.Subscription) { - successfulSubscriptions.add((TopicSubscribeResponse.Subscription) attempt); - } else { - assertThat(attempt).isInstanceOf(TopicSubscribeResponse.Error.class); - assertThat(((TopicSubscribeResponse.Error) attempt).getErrorCode()) - .isEqualTo(MomentoErrorCode.NOT_FOUND_ERROR); - errorCounter.incrementAndGet(); - } - } - - // Assert that we have received maxStreamCapacity number of successful subscriptions - assertThat(successfulSubscriptions.size()).isEqualTo(maxStreamCapacity); - - // Assert that we have received 10 NOT_FOUND_ERRORs - assertThat(errorCounter).hasValue(10); - - // Cleanup - for (TopicSubscribeResponse.Subscription sub : successfulSubscriptions) { - sub.unsubscribe(); - } - topicClient.close(); - } -} From 69250c66779b55a799623b54d7f44985b70da367 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 3 Jun 2025 09:43:08 -0700 Subject: [PATCH 07/15] fix: add null check in subscription wrapper --- momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java index 280cb60c..d0ab0838 100644 --- a/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java +++ b/momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java @@ -92,7 +92,7 @@ CompletableFuture subscribeWithRetry() { logger.warn( "First message timeout exceeded for topic {} on cache {}", topicName, cacheName); - if (subscription != null) { + if (subscription.get() != null) { subscription.get().cancel("Timed out waiting for first message", null); } From 0861dbc21dd2d28a4e900cb674de0be94d2ffb28 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 3 Jun 2025 11:32:38 -0700 Subject: [PATCH 08/15] add mid-stream error test --- .../TopicsSubscriptionInitializationTest.java | 69 ++++++++++++++++++- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java index 52749da2..2527224c 100644 --- a/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java +++ b/momento-sdk/src/intTest/java/momento/sdk/retry/TopicsSubscriptionInitializationTest.java @@ -6,6 +6,7 @@ import static org.slf4j.LoggerFactory.getLogger; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -88,7 +89,7 @@ public void oneStreamChannel_doesNotSilentlyQueueSubscribeRequestOnFullChannel() subscriptions.get(0).unsubscribe(); // Wait for the subscription to end try { - Thread.sleep(200); + Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Test interrupted while waiting for subscriptions", e); @@ -153,7 +154,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeAndUnsubscribeRequests } // Wait a bit for the subscription to end try { - Thread.sleep(200); + Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Test interrupted while waiting for subscriptions", e); @@ -292,7 +293,7 @@ public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtOverMaxCapac } @ParameterizedTest - @ValueSource(ints = {2, 10}) + @ValueSource(ints = {2, 10, 20}) @Timeout(30) public void multipleStreamChannels_handlesBurstOfSubscribeRequestsAtHalfOfMaxCapacity( int numGrpcChannels) throws Exception { @@ -395,4 +396,66 @@ public void onError(Throwable t) { } }); } + + @Test + @Timeout(30) + public void oneStreamChannel_properlyDecrementsWhenErrorOccursMidStream() throws Exception { + unsubscribeCounter = 0; + final AtomicInteger unsubscribeOnErrorCounter = new AtomicInteger(0); + final ISubscriptionCallbacks callbacks = + new ISubscriptionCallbacks() { + @Override + public void onItem(TopicMessage message) {} + + @Override + public void onCompleted() { + System.out.println("onCompleted"); + unsubscribeCounter++; + } + + @Override + public void onError(Throwable t) { + System.out.println("onError"); + unsubscribeOnErrorCounter.incrementAndGet(); + } + }; + + final MomentoLocalMiddlewareArgs middlewareArgs = + new MomentoLocalMiddlewareArgs.Builder(logger, UUID.randomUUID().toString()) + .streamError(MomentoErrorCode.NOT_FOUND_ERROR) + .streamErrorRpcList(Collections.singletonList(MomentoRpcMethod.TOPIC_SUBSCRIBE)) + .streamErrorMessageLimit(3) + .build(); + + withCacheAndTopicClientWithNumStreamChannels( + 1, + middlewareArgs, + (topicClient, cacheName) -> { + List subscriptions = new ArrayList<>(); + + // Subscribe but expecting an error after a couple of heartbeats + final TopicSubscribeResponse response = + topicClient.subscribe(cacheName, "topic", callbacks).join(); + assertThat(response).isInstanceOf(TopicSubscribeResponse.Subscription.class); + subscriptions.add((TopicSubscribeResponse.Subscription) response); + + // Wait for the subscription that ran into the error to be closed + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Test interrupted while waiting for subscriptions", e); + } + + // Cleanup + for (TopicSubscribeResponse.Subscription sub : subscriptions) { + if (sub != null) { + sub.unsubscribe(); + } + } + + assertEquals(0, unsubscribeCounter); + assertEquals(1, unsubscribeOnErrorCounter.get()); + }); + } } From 38577cf7371d4137e439c85fc175eace5e881cb2 Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 4 Jun 2025 10:15:53 -0700 Subject: [PATCH 09/15] simplify getNextStreamStub method --- .../momento/sdk/ScsTopicGrpcStubsManager.java | 28 ++++++------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index ced8d7f4..f45d420a 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -72,7 +72,6 @@ final class ScsTopicGrpcStubsManager implements Closeable { private final int numStreamGrpcChannels; private final TopicConfiguration configuration; private final Duration deadline; - private final int maximumActiveSubscriptions; ScsTopicGrpcStubsManager( @Nonnull CredentialProvider credentialProvider, @Nonnull TopicConfiguration configuration) { @@ -83,9 +82,6 @@ final class ScsTopicGrpcStubsManager implements Closeable { this.numStreamGrpcChannels = configuration.getTransportStrategy().getGrpcConfiguration().getNumStreamGrpcChannels(); - // Each stream grpc channel can support 100 concurrent active subscriptions - this.maximumActiveSubscriptions = this.numStreamGrpcChannels * 100; - this.unaryChannels = IntStream.range(0, this.numUnaryGrpcChannels) .mapToObj(i -> setupConnection(credentialProvider, configuration)) @@ -136,22 +132,14 @@ PubsubGrpc.PubsubStub getNextUnaryStub() { /** Round-robin subscribe stub. */ StreamStubWithCount getNextStreamStub() { - // First check if there's available capacity on any of the stubs - int totalActiveSubscriptions = 0; - for (StreamStubWithCount stubWithCount : streamStubs) { - totalActiveSubscriptions += stubWithCount.getCount(); - } - if (totalActiveSubscriptions < this.maximumActiveSubscriptions) { - // Try to get a client with capacity for another subscription. - // Allow up to maximumActiveSubscriptions attempts. - for (int i = 0; i < this.maximumActiveSubscriptions; i++) { - // Round-robin to the next stub - final StreamStubWithCount stubWithCount = - streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels); - if (stubWithCount.getCount() < 100) { - stubWithCount.incrementCount(); - return stubWithCount; - } + // Try to get a client with capacity for another subscription + // by round-robining through the stubs. + for (int i = 0; i < this.streamStubs.size(); i++) { + final StreamStubWithCount stubWithCount = + streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels); + if (stubWithCount.getCount() < 100) { + stubWithCount.incrementCount(); + return stubWithCount; } } From 3cea9fd50e9a420333d3d0262c4ab4a98a561a79 Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 4 Jun 2025 10:17:08 -0700 Subject: [PATCH 10/15] catch more specific error --- momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java index 10980306..11931e57 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java @@ -10,6 +10,7 @@ import momento.sdk.auth.CredentialProvider; import momento.sdk.config.TopicConfiguration; import momento.sdk.exceptions.CacheServiceExceptionMapper; +import momento.sdk.exceptions.ClientSdkException; import momento.sdk.internal.SubscriptionState; import momento.sdk.responses.topic.TopicPublishResponse; import momento.sdk.responses.topic.TopicSubscribeResponse; @@ -160,9 +161,9 @@ private CompletableFuture sendSubscribe( return new TopicSubscribeResponse.Subscription(subscriptionState); } }); - } catch (Throwable e) { + } catch (ClientSdkException e) { return CompletableFuture.completedFuture( - new TopicSubscribeResponse.Error(CacheServiceExceptionMapper.convert(e))); + new TopicSubscribeResponse.Error(e)); } } From bf1064787b5282deaf5801386efae1c2cecb25ac Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 4 Jun 2025 14:47:42 -0700 Subject: [PATCH 11/15] catch another type of possible error in sendSubscribe --- momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java index 11931e57..867fb441 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java @@ -162,8 +162,11 @@ private CompletableFuture sendSubscribe( } }); } catch (ClientSdkException e) { - return CompletableFuture.completedFuture( - new TopicSubscribeResponse.Error(e)); + // getNextStreamStub() may throw a ClientSdkException + return CompletableFuture.completedFuture(new TopicSubscribeResponse.Error(e)); + } catch (TopicSubscribeResponse.Error e) { + // subscribeWithRetry() may throw a TopicSubscribeResponse.Error + return CompletableFuture.completedFuture(e); } } From 276089cddf334713f85add17ca5800df4e952eeb Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 4 Jun 2025 14:48:00 -0700 Subject: [PATCH 12/15] add correct test target for subscription initialization tests --- momento-sdk/build.gradle.kts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/momento-sdk/build.gradle.kts b/momento-sdk/build.gradle.kts index b6eb7f64..ee586dc3 100644 --- a/momento-sdk/build.gradle.kts +++ b/momento-sdk/build.gradle.kts @@ -110,3 +110,8 @@ registerIntegrationTestTask( "test-retries", listOf("momento.sdk.retry.*") ) + +registerIntegrationTestTask( + "test-topics-subscription-initialization", + listOf("momento.sdk.retry.TopicsSubscriptionInitializationTest") +) From 8f76b5e233f0bee93c1adde6ad4475390c52bf6d Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 4 Jun 2025 14:48:22 -0700 Subject: [PATCH 13/15] add acquireStubOrThrow method to StreamStubWithCount --- .../momento/sdk/ScsTopicGrpcStubsManager.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index f45d420a..04a30d48 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -47,6 +47,17 @@ int incrementCount() { int decrementCount() { return count.decrementAndGet(); } + + void acquireStubOrThrow() throws ClientSdkException { + if (count.incrementAndGet() <= 100) { + return; + } else { + count.decrementAndGet(); + throw new ClientSdkException( + MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, + "Maximum number of active subscriptions reached"); + } + } } /** @@ -134,16 +145,20 @@ PubsubGrpc.PubsubStub getNextUnaryStub() { StreamStubWithCount getNextStreamStub() { // Try to get a client with capacity for another subscription // by round-robining through the stubs. - for (int i = 0; i < this.streamStubs.size(); i++) { + // Allow up to two attempts per stub. + for (int i = 0; i < this.streamStubs.size() * 2; i++) { final StreamStubWithCount stubWithCount = streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels); - if (stubWithCount.getCount() < 100) { - stubWithCount.incrementCount(); + try { + stubWithCount.acquireStubOrThrow(); return stubWithCount; + } catch (ClientSdkException e) { + // If the stub is at capacity, continue to the next one. + continue; } } - // Otherwise return an error + // Otherwise return an error if no stubs have capacity. throw new ClientSdkException( MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED, "Maximum number of active subscriptions reached"); From 332d7060b744fdf898d7edc3fb786b99ad9507a6 Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 4 Jun 2025 15:00:44 -0700 Subject: [PATCH 14/15] allow up to max streams attempts for acquiring stub to accommodate bursts --- .../src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java index 04a30d48..484df6ef 100644 --- a/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java +++ b/momento-sdk/src/main/java/momento/sdk/ScsTopicGrpcStubsManager.java @@ -145,8 +145,9 @@ PubsubGrpc.PubsubStub getNextUnaryStub() { StreamStubWithCount getNextStreamStub() { // Try to get a client with capacity for another subscription // by round-robining through the stubs. - // Allow up to two attempts per stub. - for (int i = 0; i < this.streamStubs.size() * 2; i++) { + // Allow up to maximumActiveSubscriptions attempts to account for large bursts of requests. + final int maximumActiveSubscriptions = this.numStreamGrpcChannels * 100; + for (int i = 0; i < maximumActiveSubscriptions; i++) { final StreamStubWithCount stubWithCount = streamStubs.get(streamIndex.getAndIncrement() % this.numStreamGrpcChannels); try { From cfca7db290c3ed70f637872fc808ba4cab0925cd Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 4 Jun 2025 15:00:59 -0700 Subject: [PATCH 15/15] clean up test exclusion --- momento-sdk/build.gradle.kts | 1 - 1 file changed, 1 deletion(-) diff --git a/momento-sdk/build.gradle.kts b/momento-sdk/build.gradle.kts index ee586dc3..89dc6634 100644 --- a/momento-sdk/build.gradle.kts +++ b/momento-sdk/build.gradle.kts @@ -58,7 +58,6 @@ tasks.named("analyzeTestClassesDependencies").configure { tasks.named("integrationTest") { filter { excludeTestsMatching("momento.sdk.retry.*") - excludeTestsMatching("momento.sdk.subscriptionInitialization.*") } }