-
Notifications
You must be signed in to change notification settings - Fork 5
chore: improve subscription reconnection logic #435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| subscribeWithRetryInternal(future); | ||
| return future; | ||
| } | ||
| CompletableFuture<Void> subscribeWithRetry() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I combined this with the internal method because the public method didn't do anything other than creating the future, and this way we don't have to give an invalid future to the internal method if we retry. I also made it package private so we can call if for unit tests without letting end users have access.
|
|
||
| if (!isSubscribed.get()) { | ||
| completeExceptionally( | ||
| future, new UnknownException("Cannot subscribe to an unsubscribed subscription")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure what type of error code to use here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a client sdk error in this sdk? otherwise unknown seems reasonable enough to me
| @Override | ||
| public void onError(Throwable t) { | ||
| if (firstMessage) { | ||
| firstMessage = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to set firstMessage to false because the subscription has ended at this point and it no longer matters.
| } | ||
|
|
||
| @Override | ||
| public Optional<Duration> determineWhenToRetry(Throwable error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need anything other than the error type? I'm using the error instead of the status so I can move some more retry logic into here. The gRPC method doesn't matter since this is only used for subscriptions. We might care about attempt number if we want a retry strategy that has an increasing delay between attempts, but the only implementation has a fixed delay for now.
Also, do we want to use the word 'retry' in the naming? It felt more natural in the documentation to use 'reconnect' to distinguish between reconnecting an existing subscription and retrying the call that made the subscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the retry eligibility be a separate concern vs determineWhenToRetry? Ie determineWhenToRetry should be invoked if and only if the call is eligible for retry. Otherwise anyone who implements a RetryStrategy will have to include the isEligibleForRetry test in their implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
determineWhenToRetry calls isEligibleForRetry in in js sdk as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cache client calls don't call the eligibility strategy independently from the retry strategy; it's always called by the retry strategy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeping track of attempt number could be nice, especially if printing in debug logs.
And since Java would be the first one with the subscription-specific retry strategy, I think either naming scheme would be fine.
06d83a2 to
c00a1e0
Compare
| private final SubscriptionRetryStrategy retryStrategy; | ||
| private final AtomicBoolean firstMessage = new AtomicBoolean(true); | ||
| private final AtomicBoolean isConnectionLost = new AtomicBoolean(false); | ||
| private final AtomicBoolean isSubscribed = new AtomicBoolean(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this used instead of SubscriptionState's private boolean isSubscribed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll look into combining those.
| nonRetryableStatusCodes.add(Status.Code.PERMISSION_DENIED); | ||
| nonRetryableStatusCodes.add(Status.Code.UNAUTHENTICATED); | ||
| nonRetryableStatusCodes.add(Status.Code.CANCELLED); | ||
| nonRetryableStatusCodes.add(Status.Code.NOT_FOUND); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what was the rationale behind adding NOT_FOUND again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is for when a cache is deleted mid-stream. I can make an integration test to show that that is the error returned when that happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors the subscription reconnection logic, introducing a configurable retry strategy and eligibility strategy to better handle reconnection attempts and prevent race conditions. Key changes include updating the subscription wrapper to use atomic state management and scheduled retries, refactoring the client to use callbacks instead of legacy options, and removing the obsolete SendSubscribeOptions.
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| SubscriptionWrapperTest.java | Updated tests to use the new ISubscriptionCallbacks interface and verify updated connection callback behavior. |
| SubscriptionRetryStrategy.java | Introduced a new interface for determining retry delays based on errors. |
| SubscriptionRetryEligibilityStrategy.java | Added an interface for determining subscription retry eligibility. |
| FixedDelaySubscriptionRetryStrategy.java | Implemented a fixed delay strategy for subscription retries. |
| DefaultSubscriptionRetryEligibilityStrategy.java | Provided a default eligibility strategy based on gRPC status codes. |
| TopicConfiguration.java | Updated constructors to accept a subscription retry strategy and loggers, and provided a method to override the strategy. |
| SubscriptionWrapper.java | Refactored to use atomic types for state management, integrated the new retry strategy logic, and improved resource cleanup. |
| SendSubscribeOptions.java | Removed obsolete subscription options in favor of new callback-based approach. |
| ScsTopicClient.java | Updated to use the new subscription callback interface and integrated the retry strategy for reconnects. |
| IScsTopicConnection.java | Adjusted default implementations for open and close methods. |
| CancelableClientCallStreamObserver.java | Revised generic type usage and simplified extension handling. |
| TopicClientLocalTest.java | Updated error expectations to reflect the new iteration of unrecoverable errors. |
Comments suppressed due to low confidence (1)
momento-sdk/src/main/java/momento/sdk/SubscriptionWrapper.java:247
- Ensure that subscription.get() is not null before calling cancel, to prevent a potential NullPointerException if unsubscribe is invoked before the subscription is set.
subscription.get().cancel("Unsubscribing from topic: " + topicName + " in cache: " + cacheName, null);
Move the topic subscription reconnection logic into a retry strategy and eligibility strategy that can be configured by the client. Add the subscription retry strategy to TopicConfiguration. Add new constructors that take the retry strategy, and new ones that don't take a logger, which is only used inside the TopicConfiguration itself. Prevent potential race conditions that could occur when unsubscribing from a connection while it is reconnecting by checking if the user called unsubscribe before reconnecting, and by making the variables used in reconnection atomic. Call close in SubscriptionWrapper when a stream ends to stop a subscription from leaking threads from the retry executor service. We may want to share a thread pool between the subscriptions instead of making a new one per subscription, so that we don't need to make a new one each time. We could also use it to execute the callbacks, which are currently executed in the gRPC thread pool and could cause backpressure if they are long-running. Add empty default implementations for the test methods in IScsTopicConnection to clean up its use in the topic client. Remove SendSubscribeOptions, since we already have a class that contains the callbacks, and because it had an unused subscription object. Get rid of CancelableClientCallStreamObserver's ClientCallStreamObserver extension, because that class is meant for observing requests, not responses, and we only need a simple cancel method. Make TopicClientLocalTest's unrecoverable error 'not found', because the previous error is now recoverable.
c00a1e0 to
332726a
Compare
|
Rebased onto main and added a commit for gracefully ending subscriptions when unsubscribe is called (should call onCompleted instead of onError). Will use this as the new base for subscriptions bookkeeping changes (#448) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors topic subscription retry behavior into configurable strategy interfaces, updates the subscription wrapper to use atomic state and the new strategies, and wires the retry strategy through TopicConfiguration and ScsTopicClient. It also removes the legacy SendSubscribeOptions class and tightens up test behavior.
- Introduces
SubscriptionRetryStrategyandSubscriptionRetryEligibilityStrategyinterfaces with a fixed-delay implementation - Refactors
SubscriptionWrapperto use atomic flags, the new retry strategy, and to properly close resources - Updates
TopicConfigurationandScsTopicClientto accept and expose the subscription retry strategy; removesSendSubscribeOptions
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| src/main/java/momento/sdk/retry/SubscriptionRetryStrategy.java | New interface for determining retry delay |
| src/main/java/momento/sdk/retry/SubscriptionRetryEligibilityStrategy.java | New interface for retry eligibility checks |
| src/main/java/momento/sdk/retry/FixedDelaySubscriptionRetryStrategy.java | Fixed-delay strategy implementation |
| src/main/java/momento/sdk/retry/DefaultSubscriptionRetryEligibilityStrategy.java | Default eligibility (gRPC status-based) implementation |
| src/main/java/momento/sdk/config/TopicConfiguration.java | Added retry strategy in constructors and getter |
| src/main/java/momento/sdk/SubscriptionWrapper.java | Refactored to use retry strategy, atomic state, and Closeable |
| src/main/java/momento/sdk/SendSubscribeOptions.java | Removed legacy options class |
| src/main/java/momento/sdk/ScsTopicClient.java | Updated subscribe path to use new wrapper signature |
| src/main/java/momento/sdk/IScsTopicConnection.java | Made open/close default methods |
| src/main/java/momento/sdk/CancelableClientCallStreamObserver.java | Dropped unnecessary extension to ClientCallStreamObserver |
| src/test/java/momento/sdk/SubscriptionWrapperTest.java | Adjusted test to new constructor and retry strategy |
| src/intTest/java/momento/sdk/retry/TopicClientLocalTest.java | Changed test to mark NOT_FOUND as unrecoverable |
Comments suppressed due to low confidence (3)
momento-sdk/src/main/java/momento/sdk/ScsTopicClient.java:21
- [nitpick] The name
DEFAULT_REQUEST_TIMEOUT_SECONDSimplies a constant. Consider making itprivate static final longto follow Java constant naming conventions.
private final long DEFAULT_REQUEST_TIMEOUT_SECONDS = 5;
momento-sdk/src/main/java/momento/sdk/config/TopicConfiguration.java:53
- Missing import for
java.time.Duration; this will cause a compile error. Addimport java.time.Duration;at the top of the file.
new FixedDelaySubscriptionRetryStrategy(Duration.ofMillis(500)),
momento-sdk/src/test/java/momento/sdk/SubscriptionWrapperTest.java:39
- The anonymous
ISubscriptionCallbacksimplementation omitsonDiscontinuityandonHeartbeatmethods. All interface methods must be implemented or defaults provided to avoid compile errors.
new ISubscriptionCallbacks() {
|
|
||
| if (subscription != null) { | ||
| subscription.cancel("Timed out waiting for first message", null); | ||
| subscription.get().cancel("Timed out waiting for first message", null); |
Copilot
AI
Jun 2, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential NullPointerException: subscription.get() may be null if the scheduled task fires before subscription.set(...). Add a null check on subscription.get() before calling cancel.
| subscription.get().cancel("Timed out waiting for first message", null); | |
| CancelableClientCallStreamObserver<_SubscriptionItem> currentSubscription = subscription.get(); | |
| if (currentSubscription != null) { | |
| currentSubscription.cancel("Timed out waiting for first message", null); | |
| } else { | |
| logger.warn("Subscription is null when attempting to cancel due to timeout."); | |
| } |
| final Optional<Duration> retryOpt = retryStrategy.determineWhenToRetry(t); | ||
| if (retryOpt.isPresent()) { | ||
| if (isSubscribed.get()) { | ||
| scheduleRetry(retryOpt.get(), () -> subscribeWithRetry()); |
Copilot
AI
Jun 2, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling subscribeWithRetry() here spawns a new CompletableFuture that isn’t linked to the original returned future, so consumers may never see the retry outcome. Consider extracting the retry loop into a shared method that reuses the same future.
| scheduleRetry(retryOpt.get(), () -> subscribeWithRetry()); | |
| scheduleRetry(retryOpt.get(), () -> retrySubscription(future, t)); |
Move the topic subscription reconnection logic into a retry strategy and eligibility strategy that can be configured by the client.
Add the subscription retry strategy to TopicConfiguration. Add new constructors that take the retry strategy, and new ones that don't take a logger, which is only used inside the TopicConfiguration itself.
Prevent potential race conditions that could occur when unsubscribing from a connection while it is reconnecting by checking if the user called unsubscribe before reconnecting, and by making the variables used in reconnection atomic.
Call close in SubscriptionWrapper when a stream ends to stop a subscription from leaking threads from the retry executor service. We may want to share a thread pool between the subscriptions instead of making a new one per subscription, so that we don't need to make a new one each time. We could also use it to execute the callbacks, which are currently executed in the gRPC thread pool and could cause backpressure if they are long-running.
Add empty default implementations for the test methods in IScsTopicConnection to clean up its use in the topic client.
Remove SendSubscribeOptions, since we already have a class that contains the callbacks, and because it had an unused subscription object.
Get rid of CancelableClientCallStreamObserver's ClientCallStreamObserver extension, because that class is meant for observing requests, not responses, and we only need a simple cancel method.
Make TopicClientLocalTest's unrecoverable error 'not found', because the previous error is now recoverable.