Skip to content

Conversation

@PhongChuong
Copy link
Collaborator

Pause the Publisher when we encounter an error when there is a send error.

When an error is encountered for a pending batch, we:

  1. In BatchWorker, pause publishing and send out errors for pending_msgs.
  2. In the pending batch, send out error for its messages.
  3. New messages in the rx receiver are handled as they are received by the BatchWorker.

A resume operation will be added in a later PR.

This PR also introduce PublishError. Further work is needed to handle error propagation more fully

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jan 15, 2026
@codecov
Copy link

codecov bot commented Jan 15, 2026

Codecov Report

❌ Patch coverage is 88.23529% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.82%. Comparing base (13bef44) to head (7030904).
⚠️ Report is 8 commits behind head on main.

Files with missing lines Patch % Lines
src/pubsub/src/publisher/worker.rs 66.66% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4286      +/-   ##
==========================================
- Coverage   94.85%   94.82%   -0.04%     
==========================================
  Files         187      187              
  Lines        7194     7207      +13     
==========================================
+ Hits         6824     6834      +10     
- Misses        370      373       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@PhongChuong PhongChuong marked this pull request as ready for review January 15, 2026 16:35
@PhongChuong PhongChuong requested a review from a team as a code owner January 15, 2026 16:35
@PhongChuong PhongChuong requested a review from suzmue January 15, 2026 16:35
Copy link
Member

@dbolduc dbolduc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get to the changes in worker.rs

Consider doing the error type refactor first, then the unit tests look more like what we want.


/// Publish is paused for the ordering key.
#[error("the ordering key was paused")]
OrderingKeyPaused(()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usability question, which is not trivial, so feel free to think about it later:

Publish consumes the application's PubsubMessage. If the operation fails, what should the application do to resend the message? Would they need to hold onto a clone of the message until the operation is complete?

It would be super convenient if we could give them their message back. The plumbing on our end could be brutal, but the application would appreciate it. 🤷

Copy link
Member

@dbolduc dbolduc Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our GAPICs all have this problem too.... idk if publishing is special. Something seems wrong about just dropping their message without trying to send it. But maybe I am worrying too much about the unhappy case. 🤷

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.
Ideally, in the error case we should give the user the message back. IMO, we can do this in 2 ways:

  1. Keep a clone of it internally and pass it back to the user if there is a failure.
  2. Augment generated code/error to pass the message back if there is an error during Send.
    I think it's a bigger discussion that should be left out of this PR.

}
}

fn convert_error(e: crate::error::PublishError) -> gax::error::Error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment: ah, I think you are hesitating to change Output to a Result<String, PublishError> because then we have to update all the code downstream of this.

Ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really yucky though. It might have been nicer to change the error type first.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should discuss as a group with @suzmue if returning gax::error::Error or Publish error directly is ideal. We decided to with gax::error::Error for now as it is consistent with the other clients.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels so wrong to me, but we do have a precedence for just throwing something in an Error::io

// TODO(#3626) - reconsider the error kind.
result.map_err(crate::Error::io)

Although in GCS, I think there was a more compelling reason. (We wanted to reuse the ReadObjectResponse type which was in terms of gax::Error.)

Copy link
Collaborator Author

@PhongChuong PhongChuong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.


/// Publish is paused for the ordering key.
#[error("the ordering key was paused")]
OrderingKeyPaused(()),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.
Ideally, in the error case we should give the user the message back. IMO, we can do this in 2 ways:

  1. Keep a clone of it internally and pass it back to the user if there is a failure.
  2. Augment generated code/error to pass the message back if there is an error during Send.
    I think it's a bigger discussion that should be left out of this PR.

}
}

fn convert_error(e: crate::error::PublishError) -> gax::error::Error {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should discuss as a group with @suzmue if returning gax::error::Error or Publish error directly is ideal. We decided to with gax::error::Error for now as it is consistent with the other clients.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants