alxhill left a comment
One main question. I haven't taken a deep dive into the re-upload code itself, but the broad monitoring/checking approach looks good.
src/stream.rs
Outdated
```rust
        let consumer = Arc::new(consumer);

        Self::create_internal(
            opts,
            |running, unflushed_points, request_rx, dispatcher_id| {
                thread::Builder::new()
                    .name(format!("nmstream_dispatch_{dispatcher_id}"))
                    .spawn({
                        let consumer = Arc::clone(&consumer);
                        debug!("starting request dispatcher from factory");
                        move || {
                            request_dispatcher(running, unflushed_points, request_rx, consumer);
                        }
                    })
                    .unwrap();
            },
        )
    }

    pub fn new_with_consumer_factory<C: WriteRequestConsumerFactory + 'static>(
        consumer_factory: C,
        opts: NominalStreamOpts,
    ) -> Self {
        Self::create_internal(
            opts,
            |running, unflushed_points, request_rx, dispatcher_id| {
                let consumer = Arc::new(
                    consumer_factory
                        .create_consumer(dispatcher_id)
                        .expect("Failed to create consumer"),
                );

                thread::Builder::new()
                    .name(format!("nmstream_dispatch_{dispatcher_id}"))
                    .spawn({
                        debug!("starting request dispatcher from factory");
                        move || {
                            request_dispatcher(running, unflushed_points, request_rx, consumer);
                        }
                    })
                    .unwrap();
            },
        )
    }
```
In main, I added a builder so users don't have to construct the consumers manually - good to integrate this so the "defaults" are sane/easy to use
```rust
pub trait WriteRequestConsumerFactory: Send + Sync {
    type Consumer: WriteRequestConsumer;
    fn create_consumer(&self, id: usize) -> Result<Self::Consumer, Box<dyn Error + Send + Sync>>;
}
```
So I don't love that we need a whole separate "factory" trait here, feels a little unnecessary just for creating one consumer for each thread?
Two ideas on my mind are:
1. have the consumer use `thread::current().id()` to identify itself. works, but makes it hard to do a separate "init" stage for e.g. file creation
2. add `WriteRequestConsumer: Clone + ...` as a bound. then, consumers that don't need a factory are implemented on `Arc<T>` instead of `T` (i.e. `impl WriteRequestConsumer for Arc<NominalCoreConsumer>`), while the `AvroFileConsumer` can implement `clone()` manually, copy over the opts and increment an ID.
thoughts? particularly on 2?
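To make the second idea concrete, here is a minimal sketch of a `Clone` bound combined with an `impl` on `Arc<T>`. The `consume` method, `SharedConsumer`, and `spawn_dispatchers` are assumptions made up for illustration; the real `WriteRequestConsumer` method set is not shown in this thread.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for the real trait: only the `Clone` bound matters here.
trait WriteRequestConsumer: Clone + Send + 'static {
    fn consume(&self, request: &str) -> Result<(), String>;
}

// Hypothetical consumer whose clones should all write to the same sink.
struct SharedConsumer {
    sink: Mutex<Vec<String>>,
}

// Implemented on `Arc<T>` rather than `T`, so `clone()` is just a pointer copy.
impl WriteRequestConsumer for Arc<SharedConsumer> {
    fn consume(&self, request: &str) -> Result<(), String> {
        self.sink
            .lock()
            .map_err(|e| e.to_string())?
            .push(request.to_owned());
        Ok(())
    }
}

// One clone per dispatcher thread, with no factory trait involved.
fn spawn_dispatchers<C: WriteRequestConsumer>(consumer: C, n: usize) -> Vec<JoinHandle<()>> {
    (0..n)
        .map(|i| {
            let per_thread = consumer.clone();
            thread::spawn(move || {
                per_thread
                    .consume(&format!("req-{i}"))
                    .expect("consume failed");
            })
        })
        .collect()
}
```

With this shape, a consumer that needs per-thread state would implement `Clone` by hand instead of going through `Arc`.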
#2 mostly makes sense. But I'm not sure how flexible the Clone solution will be in the long run: `clone()` can't return a `Result`, which might be awkward if creating the new consumer actually fails. It also can't take any arguments (e.g. an id), not that we would really need an id; I guess we could just generate something randomly.
I do agree the trait is a bit too heavyweight, though. We could potentially do a lighter-weight approach where we have `new_with_consumer_fn` or something like that, which takes a function (with params such as thread id) that defines how to create the new consumer.
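A rough sketch of what the function-based alternative could look like, assuming a closure that receives the dispatcher id and may fail. `spawn_with_consumer_fn`, `TaggedConsumer`, and the `consume` signature are hypothetical names for illustration, not the crate's API.

```rust
use std::error::Error;
use std::thread::{self, JoinHandle};

type BoxError = Box<dyn Error + Send + Sync>;

// Hypothetical stand-in for the real consumer trait.
trait WriteRequestConsumer: Send + 'static {
    fn consume(&mut self, request: &str) -> Result<(), BoxError>;
}

// Hypothetical consumer that remembers which dispatcher it belongs to.
struct TaggedConsumer {
    id: usize,
    seen: Vec<String>,
}

impl WriteRequestConsumer for TaggedConsumer {
    fn consume(&mut self, request: &str) -> Result<(), BoxError> {
        self.seen.push(format!("consumer {} got {request}", self.id));
        Ok(())
    }
}

// A plain closure replaces the factory trait. Creation errors are returned
// to the caller instead of panicking inside the setup closure.
fn spawn_with_consumer_fn<C, F>(
    make_consumer: F,
    dispatchers: usize,
) -> Result<Vec<JoinHandle<C>>, BoxError>
where
    C: WriteRequestConsumer,
    F: Fn(usize) -> Result<C, BoxError>,
{
    let mut handles = Vec::with_capacity(dispatchers);
    for id in 0..dispatchers {
        let mut consumer = make_consumer(id)?;
        let handle = thread::Builder::new()
            .name(format!("nmstream_dispatch_{id}"))
            .spawn(move || {
                consumer.consume("hello").expect("consume failed");
                consumer // handed back so the caller can inspect it
            })?;
        handles.push(handle);
    }
    Ok(handles)
}
```

Compared with `Clone`, the closure can take arguments and return a `Result`, which addresses both concerns raised above.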
Seems vaguely reasonable that cloning a consumer is infallible - is there a case you can think of where it could fail in a recoverable way?
For the ID, was thinking that the `clone()` implementation would store a refcount of some kind, so each clone gets the unique next value. Internally this would be a `refcount: Arc<AtomicUsize>` + `id: usize` in the Consumer. If this feels weird, your idea of a consumer generator is probably fine too - so long as we can abstract it away from end users via the builder
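The counter-based `clone()` described here could look roughly like the following. `FileConsumer`, its fields, and the file-name format are hypothetical simplifications of `AvroFileConsumer`; only the ID bookkeeping is shown.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical, simplified stand-in for AvroFileConsumer.
struct FileConsumer {
    next_id: Arc<AtomicUsize>, // shared by the original and every clone
    id: usize,                 // unique per instance
    base_name: String,
}

impl FileConsumer {
    fn new(base_name: &str) -> Self {
        FileConsumer {
            next_id: Arc::new(AtomicUsize::new(1)), // the original takes id 0
            id: 0,
            base_name: base_name.to_owned(),
        }
    }

    // Each instance derives its own file name from its id.
    fn file_name(&self) -> String {
        format!("{}_{}.avro", self.base_name, self.id)
    }
}

impl Clone for FileConsumer {
    fn clone(&self) -> Self {
        // fetch_add returns the previous value, so every clone gets a fresh id
        // even when clones are made from different threads.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        FileConsumer {
            next_id: Arc::clone(&self.next_id),
            id,
            base_name: self.base_name.clone(),
        }
    }
}
```

Note that a clone made this way is deliberately not equal to its source, which is the surprise discussed in the next comment.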
Yea the refcount thing makes sense. Wondering if it'd be confusing for the AvroFileConsumer's clone to not be writing to the same underlying file? Since i think that's what the derived implementation would do.
I guess people could just use `Arc` if they needed access to the same file
```rust
    }
}

impl WriteRequestConsumer for Arc<AvroFileConsumer> {
```
For when we don't want to create a new file for new threads
```rust
    fallback: F,
pub struct StoreAndForwardNominalCoreConsumer<A: AuthProvider> {
    core_consumer: NominalCoreConsumer<A>,
    fallback_consumer: Arc<Mutex<AvroFileConsumer>>,
```
needs to be wrapped in a mutex to do the file rotation :/
Could alternatively have a separate rotate_file method on AvroFileConsumer that switches out the file instead of cloning the consumer so we can get rid of this lock
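A sketch of the `rotate_file` idea, assuming the consumer owns its file handle and is reached through `&mut self`. `RotatingFileConsumer`, its fields, and the `fallback_N.bin` naming are hypothetical. Whether the lock can actually be dropped depends on how the real consumer is shared, which this thread does not show.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

// Hypothetical stand-in for AvroFileConsumer that owns a single open file.
struct RotatingFileConsumer {
    dir: PathBuf,
    index: usize,
    current_path: PathBuf,
    file: File,
}

impl RotatingFileConsumer {
    fn new(dir: &Path) -> io::Result<Self> {
        let current_path = dir.join("fallback_0.bin");
        let file = File::create(&current_path)?;
        Ok(Self { dir: dir.to_path_buf(), index: 0, current_path, file })
    }

    fn write(&mut self, bytes: &[u8]) -> io::Result<()> {
        self.file.write_all(bytes)
    }

    /// Swaps in a fresh file in place and returns the path of the finished
    /// one, e.g. so it can be queued for re-upload.
    fn rotate_file(&mut self) -> io::Result<PathBuf> {
        self.file.flush()?;
        self.index += 1;
        let next_path = self.dir.join(format!("fallback_{}.bin", self.index));
        // Assigning drops the old handle, which closes the finished file.
        self.file = File::create(&next_path)?;
        Ok(std::mem::replace(&mut self.current_path, next_path))
    }
}
```

Because rotation mutates the consumer in place, no second consumer is created and no `Clone` is needed for this path.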
```rust
}

impl Clone for AvroFileConsumer {
    fn clone(&self) -> Self {
```

TL;DR
Added store-and-forward capability to the Nominal streaming client, enabling resilient data streaming with automatic recovery from network failures.
What changed?
- `StoreAndForwardNominalCoreConsumer` that streams data to Nominal Core and falls back to local file storage when streaming fails
- `StreamHealthMonitor` to track stream health and detect when to attempt re-uploads
- `StreamingClient` with `NominalApiClients` that includes streaming, upload, and ingest clients
- `reqwest` and `async-channel` dependencies for HTTP requests and async communication
How to test?
- Test resilience by simulating network failures (the consumer will automatically fall back to file storage)
- Verify automatic re-upload by checking logs for "File path queued for reupload" messages after connectivity is restored
Why make this change?
This change significantly improves the reliability of the Nominal streaming client by:
The store-and-forward pattern is essential for mission-critical data pipelines that cannot afford to lose data during temporary outages.
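For readers new to the pattern, the fallback half of store-and-forward can be sketched as below. This is an illustration under assumptions, not the PR's implementation: `Sink`, `StoreAndForward`, `FlakyNetwork`, and `LocalStore` are hypothetical names, and the re-upload half is omitted.

```rust
// Hypothetical minimal interface shared by the primary and fallback targets.
trait Sink {
    fn send(&mut self, batch: &[u8]) -> Result<(), String>;
}

// Tries the primary sink first and stores the batch locally if that fails.
struct StoreAndForward<P: Sink, F: Sink> {
    primary: P,
    fallback: F,
    fell_back: usize, // number of batches diverted to the fallback
}

impl<P: Sink, F: Sink> Sink for StoreAndForward<P, F> {
    fn send(&mut self, batch: &[u8]) -> Result<(), String> {
        match self.primary.send(batch) {
            Ok(()) => Ok(()),
            Err(primary_err) => {
                self.fell_back += 1;
                // Data is lost only if both sinks fail.
                self.fallback.send(batch).map_err(|fallback_err| {
                    format!("primary failed ({primary_err}); fallback failed ({fallback_err})")
                })
            }
        }
    }
}

// Hypothetical test doubles standing in for the network and the local file.
struct FlakyNetwork {
    up: bool,
    delivered: Vec<Vec<u8>>,
}

impl Sink for FlakyNetwork {
    fn send(&mut self, batch: &[u8]) -> Result<(), String> {
        if self.up {
            self.delivered.push(batch.to_vec());
            Ok(())
        } else {
            Err("network down".to_owned())
        }
    }
}

struct LocalStore {
    stored: Vec<Vec<u8>>,
}

impl Sink for LocalStore {
    fn send(&mut self, batch: &[u8]) -> Result<(), String> {
        self.stored.push(batch.to_vec());
        Ok(())
    }
}
```

Toggling `up` on the test double mirrors the "simulate network failures" step from the testing notes above.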