@nand4011
Contributor

Move the topic subscription reconnection logic into a retry strategy and eligibility strategy that can be configured by the client.

Add the subscription retry strategy to TopicConfiguration. Add new constructors that take the retry strategy, and new ones that don't take a logger, which is only used inside the TopicConfiguration itself.

Prevent potential race conditions that could occur when unsubscribing from a connection while it is reconnecting by checking if the user called unsubscribe before reconnecting, and by making the variables used in reconnection atomic.

Call close in SubscriptionWrapper when a stream ends to stop a subscription from leaking threads from the retry executor service. We may want to share one thread pool across all subscriptions instead of creating a new one per subscription. We could also use it to execute the callbacks, which currently run on the gRPC thread pool and could cause backpressure if they are long-running.

Add empty default implementations for the test methods in IScsTopicConnection to clean up its use in the topic client.

Remove SendSubscribeOptions, since we already have a class that contains the callbacks, and because it had an unused subscription object.

Get rid of CancelableClientCallStreamObserver's ClientCallStreamObserver extension, because that class is meant for observing requests, not responses, and we only need a simple cancel method.

Make TopicClientLocalTest's unrecoverable error 'not found', because the previous error is now recoverable.
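
For readers skimming the description, the split between a retry strategy and an eligibility strategy can be pictured roughly as below. This is a minimal sketch, not the SDK's code: the type names `EligibilitySketch`, `RetryStrategySketch`, and `FixedDelaySketch` are hypothetical stand-ins, and only the `determineWhenToRetry(Throwable)` shape returning `Optional<Duration>` and the `isEligibleForRetry` name are taken from the diff and discussion in this PR.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical stand-in for an eligibility strategy: decides whether an error is worth a reconnect.
interface EligibilitySketch {
  boolean isEligibleForRetry(Throwable error);
}

// Hypothetical stand-in for a retry strategy.
interface RetryStrategySketch {
  // Empty means "do not reconnect"; otherwise wait for the returned delay first.
  Optional<Duration> determineWhenToRetry(Throwable error);
}

// Fixed-delay strategy that consults the eligibility strategy before answering.
final class FixedDelaySketch implements RetryStrategySketch {
  private final Duration delay;
  private final EligibilitySketch eligibility;

  FixedDelaySketch(Duration delay, EligibilitySketch eligibility) {
    this.delay = delay;
    this.eligibility = eligibility;
  }

  @Override
  public Optional<Duration> determineWhenToRetry(Throwable error) {
    return eligibility.isEligibleForRetry(error) ? Optional.of(delay) : Optional.empty();
  }
}
```

A caller would build something like `new FixedDelaySketch(Duration.ofMillis(500), myEligibility)` and schedule a reconnect only when the returned `Optional` is present.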

subscribeWithRetryInternal(future);
return future;
}
CompletableFuture<Void> subscribeWithRetry() {
Contributor Author

I combined this with the internal method because the public method didn't do anything other than create the future, and this way we don't have to give an invalid future to the internal method if we retry. I also made it package-private so we can call it from unit tests without giving end users access.


if (!isSubscribed.get()) {
completeExceptionally(
future, new UnknownException("Cannot subscribe to an unsubscribed subscription"));
Contributor Author

I wasn't sure what type of error code to use here.

Contributor

Is there a client SDK error in this SDK? Otherwise, unknown seems reasonable enough to me.

@Override
public void onError(Throwable t) {
if (firstMessage) {
firstMessage = false;
Contributor Author

We don't need to set firstMessage to false because the subscription has ended at this point and it no longer matters.

}

@Override
public Optional<Duration> determineWhenToRetry(Throwable error) {
Contributor Author

Do we need anything other than the error type? I'm using the error instead of the status so I can move some more retry logic into here. The gRPC method doesn't matter since this is only used for subscriptions. We might care about attempt number if we want a retry strategy that has an increasing delay between attempts, but the only implementation has a fixed delay for now.

Also, do we want to use the word 'retry' in the naming? It felt more natural in the documentation to use 'reconnect' to distinguish between reconnecting an existing subscription and retrying the call that made the subscription.

Contributor

Shouldn't retry eligibility be a separate concern from determineWhenToRetry? That is, determineWhenToRetry should be invoked if and only if the call is eligible for retry. Otherwise, anyone who implements a RetryStrategy will have to include the isEligibleForRetry test in their implementation.

Contributor

determineWhenToRetry calls isEligibleForRetry in the JS SDK as well.

Contributor Author

The cache client calls don't call the eligibility strategy independently from the retry strategy; it's always called by the retry strategy.

Contributor

Keeping track of attempt number could be nice, especially if printing in debug logs.

And since Java would be the first one with the subscription-specific retry strategy, I think either naming scheme would be fine.
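
To make the attempt-number idea in this thread concrete, here is a minimal sketch of a strategy whose delay grows with each attempt. Nothing like this exists in the PR, which ships only a fixed delay. The interface `AttemptAwareRetrySketch`, its extra `attemptNumber` parameter, and the class `ExponentialBackoffSketch` are all hypothetical.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical variant of the retry strategy that also receives the attempt number (1-based).
interface AttemptAwareRetrySketch {
  Optional<Duration> determineWhenToRetry(Throwable error, int attemptNumber);
}

// Doubles the delay on every attempt, capped at maxDelay.
final class ExponentialBackoffSketch implements AttemptAwareRetrySketch {
  private final Duration initialDelay;
  private final Duration maxDelay;

  ExponentialBackoffSketch(Duration initialDelay, Duration maxDelay) {
    this.initialDelay = initialDelay;
    this.maxDelay = maxDelay;
  }

  @Override
  public Optional<Duration> determineWhenToRetry(Throwable error, int attemptNumber) {
    // Clamp the exponent so the multiplier cannot overflow for large attempt numbers.
    int shift = Math.min(Math.max(attemptNumber - 1, 0), 20);
    Duration candidate = initialDelay.multipliedBy(1L << shift);
    return Optional.of(candidate.compareTo(maxDelay) > 0 ? maxDelay : candidate);
  }
}
```

The same attempt number could be included in debug log lines, as suggested above, without changing how the delay is computed.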

@nand4011 force-pushed the subscription-retry-improvements branch from 06d83a2 to c00a1e0 on March 19, 2025 18:29
private final SubscriptionRetryStrategy retryStrategy;
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
private final AtomicBoolean isConnectionLost = new AtomicBoolean(false);
private final AtomicBoolean isSubscribed = new AtomicBoolean(true);
Contributor

Is this used instead of SubscriptionState's private boolean isSubscribed?

Contributor Author

I'll look into combining those.

nonRetryableStatusCodes.add(Status.Code.PERMISSION_DENIED);
nonRetryableStatusCodes.add(Status.Code.UNAUTHENTICATED);
nonRetryableStatusCodes.add(Status.Code.CANCELLED);
nonRetryableStatusCodes.add(Status.Code.NOT_FOUND);
Contributor

What was the rationale behind adding NOT_FOUND again?

Contributor Author

That is for when a cache is deleted mid-stream. I can make an integration test to show that that is the error returned when that happens.
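
As a rough illustration of a status-code-based eligibility check, the sketch below treats a small set of codes as non-retryable. It assumes a local enum `CodeSketch` standing in for gRPC's `Status.Code` so the example stays self-contained, lists only the four codes visible in the diff excerpt above (the real set may contain more), and assumes every other code is retryable. `StatusEligibilitySketch` is a hypothetical name.

```java
import java.util.EnumSet;
import java.util.Set;

// Local stand-in for a handful of gRPC status codes; not the real io.grpc.Status.Code.
enum CodeSketch { CANCELLED, NOT_FOUND, PERMISSION_DENIED, UNAUTHENTICATED, UNAVAILABLE, INTERNAL }

final class StatusEligibilitySketch {
  // Codes after which reconnecting cannot help, e.g. NOT_FOUND when the cache was deleted mid-stream.
  private static final Set<CodeSketch> NON_RETRYABLE =
      EnumSet.of(
          CodeSketch.PERMISSION_DENIED,
          CodeSketch.UNAUTHENTICATED,
          CodeSketch.CANCELLED,
          CodeSketch.NOT_FOUND);

  private StatusEligibilitySketch() {}

  // Assumption of this sketch: anything outside the set is eligible for a reconnect.
  static boolean isEligibleForRetry(CodeSketch code) {
    return !NON_RETRYABLE.contains(code);
  }
}
```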

@malandis malandis requested a review from Copilot April 1, 2025 22:28
Copilot AI left a comment

Pull Request Overview

This PR refactors the subscription reconnection logic, introducing a configurable retry strategy and eligibility strategy to better handle reconnection attempts and prevent race conditions. Key changes include updating the subscription wrapper to use atomic state management and scheduled retries, refactoring the client to use callbacks instead of legacy options, and removing the obsolete SendSubscribeOptions.

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| SubscriptionWrapperTest.java | Updated tests to use the new ISubscriptionCallbacks interface and verify updated connection callback behavior. |
| SubscriptionRetryStrategy.java | Introduced a new interface for determining retry delays based on errors. |
| SubscriptionRetryEligibilityStrategy.java | Added an interface for determining subscription retry eligibility. |
| FixedDelaySubscriptionRetryStrategy.java | Implemented a fixed delay strategy for subscription retries. |
| DefaultSubscriptionRetryEligibilityStrategy.java | Provided a default eligibility strategy based on gRPC status codes. |
| TopicConfiguration.java | Updated constructors to accept a subscription retry strategy and loggers, and provided a method to override the strategy. |
| SubscriptionWrapper.java | Refactored to use atomic types for state management, integrated the new retry strategy logic, and improved resource cleanup. |
| SendSubscribeOptions.java | Removed obsolete subscription options in favor of new callback-based approach. |
| ScsTopicClient.java | Updated to use the new subscription callback interface and integrated the retry strategy for reconnects. |
| IScsTopicConnection.java | Adjusted default implementations for open and close methods. |
| CancelableClientCallStreamObserver.java | Revised generic type usage and simplified extension handling. |
| TopicClientLocalTest.java | Updated error expectations to reflect the new iteration of unrecoverable errors. |

Comments suppressed due to low confidence (1)

momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java:247

  • Ensure that subscription.get() is not null before calling cancel, to prevent a potential NullPointerException if unsubscribe is invoked before the subscription is set.
subscription.get().cancel("Unsubscribing from topic: " + topicName + " in cache: " + cacheName, null);

nand4011 and others added 2 commits May 30, 2025 10:43
@anitarua force-pushed the subscription-retry-improvements branch from c00a1e0 to 332726a on May 30, 2025 18:53
@anitarua
Contributor

Rebased onto main and added a commit for gracefully ending subscriptions when unsubscribe is called (should call onCompleted instead of onError). Will use this as the new base for subscriptions bookkeeping changes (#448)

@malandis malandis requested a review from Copilot June 2, 2025 23:36
Copilot AI left a comment

Pull Request Overview

This PR refactors topic subscription retry behavior into configurable strategy interfaces, updates the subscription wrapper to use atomic state and the new strategies, and wires the retry strategy through TopicConfiguration and ScsTopicClient. It also removes the legacy SendSubscribeOptions class and tightens up test behavior.

  • Introduces SubscriptionRetryStrategy and SubscriptionRetryEligibilityStrategy interfaces with a fixed-delay implementation
  • Refactors SubscriptionWrapper to use atomic flags, the new retry strategy, and to properly close resources
  • Updates TopicConfiguration and ScsTopicClient to accept and expose the subscription retry strategy; removes SendSubscribeOptions

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java | New interface for determining retry delay |
| src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java | New interface for retry eligibility checks |
| src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java | Fixed-delay strategy implementation |
| src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java | Default eligibility (gRPC status-based) implementation |
| src/main/java/momento/sdk/config/TopicConfiguration.java | Added retry strategy in constructors and getter |
| src/main/java/momento/sdk/SubscriptionWrapper.java | Refactored to use retry strategy, atomic state, and Closeable |
| src/main/java/momento/sdk/SendSubscribeOptions.java | Removed legacy options class |
| src/main/java/momento/sdk/ScsTopicClient.java | Updated subscribe path to use new wrapper signature |
| src/main/java/momento/sdk/IScsTopicConnection.java | Made open/close default methods |
| src/main/java/momento/sdk/CancelableClientCallStreamObserver.java | Dropped unnecessary extension to ClientCallStreamObserver |
| src/test/java/momento/sdk/SubscriptionWrapperTest.java | Adjusted test to new constructor and retry strategy |
| src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java | Changed test to mark NOT_FOUND as unrecoverable |

Comments suppressed due to low confidence (3)

momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java:21

  • [nitpick] The name DEFAULT_REQUEST_TIMEOUT_SECONDS implies a constant. Consider making it private static final long to follow Java constant naming conventions.
private final long DEFAULT_REQUEST_TIMEOUT_SECONDS = 5;

momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java:53

  • Missing import for java.time.Duration; this will cause a compile error. Add import java.time.Duration; at the top of the file.
new FixedDelaySubscriptionRetryStrategy(Duration.ofMillis(500)),

momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java:39

  • The anonymous ISubscriptionCallbacks implementation omits onDiscontinuity and onHeartbeat methods. All interface methods must be implemented or defaults provided to avoid compile errors.
new ISubscriptionCallbacks() {


  if (subscription != null) {
-   subscription.cancel("Timed out waiting for first message", null);
+   subscription.get().cancel("Timed out waiting for first message", null);
Copilot AI Jun 2, 2025

Potential NullPointerException: subscription.get() may be null if the scheduled task fires before subscription.set(...). Add a null check on subscription.get() before calling cancel.

Suggested change
subscription.get().cancel("Timed out waiting for first message", null);
CancelableClientCallStreamObserver<_SubscriptionItem> currentSubscription = subscription.get();
if (currentSubscription != null) {
currentSubscription.cancel("Timed out waiting for first message", null);
} else {
logger.warn("Subscription is null when attempting to cancel due to timeout.");
}

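
The null-check pattern suggested here can be tried in isolation with a small sketch. The names `CancelableSketch` and `TimeoutCancelSketch` are hypothetical stand-ins, not SDK classes. The point is to read the `AtomicReference` once into a local variable, so the null check and the `cancel` call see the same value even if another thread swaps the reference in between.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an observer that exposes only a cancel method.
interface CancelableSketch {
  void cancel(String message, Throwable cause);
}

final class TimeoutCancelSketch {
  private final AtomicReference<CancelableSketch> subscription = new AtomicReference<>();

  void set(CancelableSketch s) {
    subscription.set(s);
  }

  // Returns true if a subscription was present and has been cancelled.
  boolean cancelOnTimeout() {
    // Read the reference exactly once; calling get() twice could observe two different values.
    CancelableSketch current = subscription.get();
    if (current == null) {
      return false; // The timeout fired before the subscription was stored.
    }
    current.cancel("Timed out waiting for first message", null);
    return true;
  }
}
```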
final Optional<Duration> retryOpt = retryStrategy.determineWhenToRetry(t);
if (retryOpt.isPresent()) {
if (isSubscribed.get()) {
scheduleRetry(retryOpt.get(), () -> subscribeWithRetry());
Copilot AI Jun 2, 2025

Calling subscribeWithRetry() here spawns a new CompletableFuture that isn’t linked to the original returned future, so consumers may never see the retry outcome. Consider extracting the retry loop into a shared method that reuses the same future.

Suggested change
scheduleRetry(retryOpt.get(), () -> subscribeWithRetry());
scheduleRetry(retryOpt.get(), () -> retrySubscription(future, t));

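
One way to read this suggestion is sketched below: a single `CompletableFuture` is created up front and passed to every scheduled attempt, so the caller's future reflects the final outcome. `SharedFutureRetrySketch` and its `maxAttempts` cutoff are hypothetical and do not come from the PR. The author's earlier comment also notes that once a subscription has connected, the original future is already completed, so this pattern would apply only to the initial connection phase.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class SharedFutureRetrySketch {
  private final ScheduledExecutorService scheduler;
  private final long delayMillis;
  private final int maxAttempts;

  SharedFutureRetrySketch(ScheduledExecutorService scheduler, long delayMillis, int maxAttempts) {
    this.scheduler = scheduler;
    this.delayMillis = delayMillis;
    this.maxAttempts = maxAttempts;
  }

  // Creates the future once and hands the same instance to every attempt.
  CompletableFuture<Void> subscribe(BooleanSupplier attempt) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    tryOnce(attempt, future, 1);
    return future;
  }

  private void tryOnce(BooleanSupplier attempt, CompletableFuture<Void> future, int attemptNumber) {
    if (attempt.getAsBoolean()) {
      future.complete(null); // The caller sees success, however many attempts it took.
    } else if (attemptNumber >= maxAttempts) {
      future.completeExceptionally(
          new IllegalStateException("Gave up after " + attemptNumber + " attempts"));
    } else {
      // Schedule the next attempt after the fixed delay, reusing the same future.
      scheduler.schedule(
          () -> tryOnce(attempt, future, attemptNumber + 1), delayMillis, TimeUnit.MILLISECONDS);
    }
  }
}
```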
@anitarua anitarua merged commit 0a5e7fd into main Jun 3, 2025
6 checks passed
@anitarua anitarua deleted the subscription-retry-improvements branch June 3, 2025 16:18