feat: store and forward consumer #10

Open
joyhe208 wants to merge 16 commits into main from joy/automatic-reupload

@joyhe208 joyhe208 commented Sep 8, 2025

TL;DR

Added store-and-forward capability to the Nominal streaming client, enabling resilient data streaming with automatic recovery from network failures.

What changed?

  • Added StoreAndForwardNominalCoreConsumer that streams data to Nominal Core and falls back to local file storage when streaming fails
  • Implemented automatic file re-upload functionality when the connection is restored
  • Added StreamHealthMonitor to track stream health and detect when to attempt re-uploads
  • Replaced StreamingClient with NominalApiClients that includes streaming, upload, and ingest clients
  • Added support for multipart uploads of large files
  • Added reqwest and async-channel dependencies for HTTP requests and async communication
  • Reorganized client structure to support both streaming and batch uploads
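
For readers new to the pattern, here is a minimal sketch of store-and-forward under simplifying assumptions. It is not the PR's implementation: `Consumer`, `FlakySink`, and `StoreAndForward` are hypothetical names, and an in-memory queue stands in for the on-disk fallback files.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the crate's consumer trait; the real
/// `WriteRequestConsumer` signature is not shown in this PR excerpt.
pub trait Consumer {
    fn consume(&mut self, batch: &[u8]) -> Result<(), String>;
}

/// Primary sink that can be toggled offline to simulate a network outage.
pub struct FlakySink {
    pub online: bool,
    pub received: Vec<Vec<u8>>,
}

impl Consumer for FlakySink {
    fn consume(&mut self, batch: &[u8]) -> Result<(), String> {
        if !self.online {
            return Err("network unreachable".to_string());
        }
        self.received.push(batch.to_vec());
        Ok(())
    }
}

/// Tries the primary sink first and stores the batch locally when that fails.
pub struct StoreAndForward<P: Consumer> {
    pub primary: P,
    /// In-memory stand-in (assumption) for the on-disk fallback files.
    pub stored: VecDeque<Vec<u8>>,
}

impl<P: Consumer> StoreAndForward<P> {
    pub fn new(primary: P) -> Self {
        Self { primary, stored: VecDeque::new() }
    }

    /// Streams the batch, falling back to local storage on failure.
    pub fn consume(&mut self, batch: &[u8]) {
        if self.primary.consume(batch).is_err() {
            self.stored.push_back(batch.to_vec());
        }
    }

    /// Re-sends stored batches in order and stops at the first failure,
    /// putting the failed batch back. Returns the number forwarded.
    pub fn forward_stored(&mut self) -> usize {
        let mut forwarded = 0;
        while let Some(batch) = self.stored.pop_front() {
            if self.primary.consume(&batch).is_err() {
                self.stored.push_front(batch);
                break;
            }
            forwarded += 1;
        }
        forwarded
    }
}
```

In this sketch, `forward_stored` would be called once the connection is judged healthy again; the PR does this with file re-upload rather than an in-memory queue.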

How to test?

  1. Create a stream with store-and-forward capability:

     let consumer = StoreAndForwardNominalCoreConsumer::new(
         core_consumer,
         fallback_consumer,
         ReuploadOpts::default()
     );

  2. Test resilience by simulating network failures (the consumer will automatically fall back to file storage)

  3. Verify automatic re-upload by checking logs for "File path queued for reupload" messages after connectivity is restored
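
The last step depends on the stream being judged healthy again before re-upload starts. One way such a check could work is sketched below. This is an assumption for illustration only: `HealthTracker` and `recovery_threshold` are hypothetical, and the PR's `StreamHealthMonitor` may use different signals.

```rust
/// Hypothetical health tracker; the internals of the PR's
/// `StreamHealthMonitor` are not shown in this excerpt.
pub struct HealthTracker {
    healthy: bool,
    consecutive_ok: u32,
    recovery_threshold: u32,
}

impl HealthTracker {
    /// `recovery_threshold` is the number of consecutive successes required
    /// before an unhealthy stream counts as recovered.
    pub fn new(recovery_threshold: u32) -> Self {
        Self { healthy: true, consecutive_ok: 0, recovery_threshold }
    }

    /// Records one request outcome. Returns `true` only on the transition
    /// from unhealthy to healthy, which is when a re-upload would be attempted.
    pub fn record(&mut self, ok: bool) -> bool {
        if !ok {
            self.healthy = false;
            self.consecutive_ok = 0;
            return false;
        }
        if self.healthy {
            return false;
        }
        self.consecutive_ok += 1;
        if self.consecutive_ok >= self.recovery_threshold {
            self.healthy = true;
            self.consecutive_ok = 0;
            return true;
        }
        false
    }

    pub fn is_healthy(&self) -> bool {
        self.healthy
    }
}
```

Requiring several consecutive successes avoids triggering a re-upload on a single lucky request during a flapping connection.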

Why make this change?

This change significantly improves the reliability of the Nominal streaming client by:

  • Ensuring no data is lost during network outages or service disruptions
  • Automatically recovering and re-uploading data when connectivity is restored
  • Providing a more robust solution for production environments where network reliability cannot be guaranteed

The store-and-forward pattern is essential for mission-critical data pipelines that cannot afford to lose data during temporary outages.

@joyhe208 joyhe208 changed the title feat: automatic reupload feat: store and forward consumer Sep 9, 2025
@alxhill alxhill self-requested a review September 10, 2025 01:03

@alxhill alxhill left a comment

one main q, haven't taken a deep dive in the re-upload code itself but the broad monitoring/checking approach looks good

src/stream.rs Outdated
Comment on lines 255 to 298
    let consumer = Arc::new(consumer);

    Self::create_internal(
        opts,
        |running, unflushed_points, request_rx, dispatcher_id| {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{dispatcher_id}"))
                .spawn({
                    let consumer = Arc::clone(&consumer);
                    debug!("starting request dispatcher from factory");
                    move || {
                        request_dispatcher(running, unflushed_points, request_rx, consumer);
                    }
                })
                .unwrap();
        },
    )
}

pub fn new_with_consumer_factory<C: WriteRequestConsumerFactory + 'static>(
    consumer_factory: C,
    opts: NominalStreamOpts,
) -> Self {
    Self::create_internal(
        opts,
        |running, unflushed_points, request_rx, dispatcher_id| {
            let consumer = Arc::new(
                consumer_factory
                    .create_consumer(dispatcher_id)
                    .expect("Failed to create consumer"),
            );

            thread::Builder::new()
                .name(format!("nmstream_dispatch_{dispatcher_id}"))
                .spawn({
                    debug!("starting request dispatcher from factory");
                    move || {
                        request_dispatcher(running, unflushed_points, request_rx, consumer);
                    }
                })
                .unwrap();
        },
    )
}

Contributor

In main, I added a builder so users don't have to construct the consumers manually - good to integrate this so the "defaults" are sane/easy to use

Comment on lines +53 to +56
pub trait WriteRequestConsumerFactory: Send + Sync {
    type Consumer: WriteRequestConsumer;
    fn create_consumer(&self, id: usize) -> Result<Self::Consumer, Box<dyn Error + Send + Sync>>;
}

Contributor

So I don't love that we need a whole separate "factory" trait here, feels a little unnecessary just for creating one consumer for each thread?

Two ideas on my mind are:

  1. have the consumer use thread::current().id() to identify itself. works, but makes it hard to do a separate "init" stage for e.g. file creation
  2. add WriteRequestConsumer: Clone + ... as a bound. then, consumers that don't need a factory are implemented on Arc<T> instead of T (i.e. impl WriteRequestConsumer for Arc<NominalCoreConsumer>), while the AvroFileConsumer can implement clone() manually, copy over the opts and increment an ID.

thoughts? particularly on 2?
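
A rough sketch of what idea 2 could look like, under stated assumptions. The trait here is a simplified, hypothetical version with a single `describe` method, `CoreConsumer` and `FileConsumer` are stand-ins for `NominalCoreConsumer` and `AvroFileConsumer`, and the shared counter is just one way to "increment an ID" on clone.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical, simplified trait carrying the proposed `Clone` bound.
pub trait WriteRequestConsumer: Clone + Send + 'static {
    fn describe(&self) -> String;
}

/// Consumer that all threads can share: the trait is implemented on
/// `Arc<CoreConsumer>`, so cloning only bumps the reference count.
pub struct CoreConsumer {
    pub endpoint: String,
}

impl WriteRequestConsumer for Arc<CoreConsumer> {
    fn describe(&self) -> String {
        format!("core -> {}", self.endpoint)
    }
}

/// Consumer that needs one instance per thread: each clone copies the
/// options and takes the next ID from a counter shared by all clones.
pub struct FileConsumer {
    pub dir: String,
    pub id: usize,
    next_id: Arc<AtomicUsize>,
}

impl FileConsumer {
    pub fn new(dir: &str) -> Self {
        Self { dir: dir.to_string(), id: 0, next_id: Arc::new(AtomicUsize::new(1)) }
    }
}

impl Clone for FileConsumer {
    fn clone(&self) -> Self {
        Self {
            dir: self.dir.clone(),
            id: self.next_id.fetch_add(1, Ordering::Relaxed),
            next_id: Arc::clone(&self.next_id),
        }
    }
}

impl WriteRequestConsumer for FileConsumer {
    fn describe(&self) -> String {
        format!("{}/part_{}.avro", self.dir, self.id)
    }
}

/// Clones the consumer once per dispatcher, as the stream would per thread.
pub fn per_dispatcher<C: WriteRequestConsumer>(consumer: &C, n: usize) -> Vec<String> {
    (0..n).map(|_| consumer.clone().describe()).collect()
}
```

With this shape, the stream only needs `C: WriteRequestConsumer` and calls `clone()` per dispatcher; whether a clone shares state or gets its own file is decided by the consumer type.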

Contributor Author

#2 mostly makes sense. But I'm not sure how flexible the Clone solution will be in the long run, because clone() can't return a Result, which might be awkward if creating the new consumer actually fails. It can't take any arguments either (e.g. an id), not that we would really need an id; I guess we could just generate something randomly.
I do agree the trait is a bit too heavyweight, though. We could potentially take a lighter-weight approach with something like new_with_consumer_fn that takes a function (with params such as thread id) defining how to create the new consumer.
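
To make the function-based alternative concrete, here is a hedged sketch. `build_consumers` is a hypothetical helper, not part of the PR; it only shows how a closure taking the dispatcher ID can replace the factory trait while still allowing creation to fail.

```rust
use std::error::Error;

type BoxError = Box<dyn Error + Send + Sync>;

/// Hypothetical per-thread consumer used only for this illustration.
pub struct FileConsumer {
    pub path: String,
}

/// Builds one consumer per dispatcher by calling `make(dispatcher_id)`.
/// Collecting into `Result` stops at the first error and returns it,
/// instead of panicking as `.expect(...)` would.
pub fn build_consumers<C, F>(num_dispatchers: usize, make: F) -> Result<Vec<C>, BoxError>
where
    F: Fn(usize) -> Result<C, BoxError>,
{
    (0..num_dispatchers).map(make).collect()
}
```

A call site might look like `build_consumers(4, |id| Ok(FileConsumer { path: format!("fallback_{id}.avro") }))`. Unlike `clone()`, the closure receives the ID as an argument and can report a failure.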

Contributor

Seems vaguely reasonable that cloning a consumer is infallible - is there a case you can think of where it could fail in a recoverable way?

For the ID, was thinking that the clone() implementation would store a refcount of some kind, so each clone gets the unique next value. Internally this would be a refcount: Arc<AtomicUsize> + id: usize in the Consumer. If this feels weird, your idea of a consumer generator is probably fine too - so long as we can abstract it away from end users via the builder

Contributor Author

Yeah, the refcount thing makes sense. Wondering if it'd be confusing for the AvroFileConsumer's clone to not write to the same underlying file, since I think that's what the derived implementation would do.

Contributor Author

I guess people could just use Arc if they needed access to the same file

joyhe208 commented Sep 16, 2025

This stack of pull requests is managed by Graphite.

}
}

impl WriteRequestConsumer for Arc<AvroFileConsumer> {

Contributor Author

For when we don't want to create a new file for new threads

    fallback: F,
pub struct StoreAndForwardNominalCoreConsumer<A: AuthProvider> {
    core_consumer: NominalCoreConsumer<A>,
    fallback_consumer: Arc<Mutex<AvroFileConsumer>>,

Contributor Author

needs to be wrapped in a mutex to do the file rotation :/

Contributor Author

Could alternatively have a separate rotate_file method on AvroFileConsumer that switches out the file instead of cloning the consumer so we can get rid of this lock
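
A sketch of what such a rotate_file method might look like, assuming the consumer is owned by a single writer so that `&mut self` access is available. `RotatingFileConsumer` and `Segment` are hypothetical, and an in-memory buffer stands in for the real Avro file.

```rust
use std::mem;

/// Hypothetical stand-in for `AvroFileConsumer`; writes go to an in-memory
/// buffer (assumption) rather than a real Avro file.
pub struct RotatingFileConsumer {
    prefix: String,
    index: usize,
    current: Vec<u8>,
}

/// A finished segment that is ready to be queued for re-upload.
pub struct Segment {
    pub name: String,
    pub bytes: Vec<u8>,
}

impl RotatingFileConsumer {
    pub fn new(prefix: &str) -> Self {
        Self { prefix: prefix.to_string(), index: 0, current: Vec::new() }
    }

    pub fn write(&mut self, data: &[u8]) {
        self.current.extend_from_slice(data);
    }

    pub fn current_name(&self) -> String {
        format!("{}_{}.avro", self.prefix, self.index)
    }

    /// Swaps in a fresh, empty segment and returns the finished one.
    /// Because this takes `&mut self`, no lock is needed as long as
    /// the consumer has exactly one owner.
    pub fn rotate_file(&mut self) -> Segment {
        let name = self.current_name();
        let bytes = mem::take(&mut self.current);
        self.index += 1;
        Segment { name, bytes }
    }
}
```

Whether the lock can actually be dropped depends on how the consumer trait borrows `self`, which this excerpt does not show.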

@joyhe208 joyhe208 requested a review from alxhill September 23, 2025 19:02
}

impl Clone for AvroFileConsumer {
    fn clone(&self) -> Self {

2 participants