Conversation
reembs
left a comment
There was a problem hiding this comment.
I asked a few questions and made suggestions inline
I couldn't understand from the PR how the reporting will be done. I didn't see any callbacks to the user (i.e. the Brain). Am I missing something?
| if ((System.nanoTime() - messageWaitStartTime >= lateMessageMaxWaitInNanos) && | ||
| lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) { | ||
| if (!message.isLate.get()){ | ||
| atomicNumberOfMessageThatShouldBeDropped.incrementAndGet() |
There was a problem hiding this comment.
atomicNumberOfMessageThatShouldBeDropped is only used for test purposes. Can we maybe nullify it it production runs to prevent another atomic operation?
There was a problem hiding this comment.
hmm, I wanted to allow monitoring it on brain side first without dropping messages and than "turn on" dropping without requiring another kumulus release. what do you think?
| get() = atomicNumberOfMessageThatShouldBeDropped.get() | ||
|
|
||
| @Suppress("MemberVisibilityCanBePrivate") | ||
| val droppedMessagesBoltsAndTaskIds: Set<String> |
There was a problem hiding this comment.
https://github.com/forter/forter/pull/179616/files#diff-833cdee7cb1dc37e8f19e56f8ac2de9a06524f6966e0509a190a0ccab3cd54fd
should have added this for context, sorry.
Brain side has a periodic polling for this, so I added the property
| lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) { | ||
| if (!message.isLate.get()){ | ||
| atomicNumberOfMessageThatShouldBeDropped.incrementAndGet() | ||
| droppedMessages.add(message.component.componentId) |
There was a problem hiding this comment.
Not sure where this is being read
There was a problem hiding this comment.
see Brain side changes in a test PR on forter/forter (BrainTopology)
https://github.com/forter/forter/pull/179616/files#diff-833cdee7cb1dc37e8f19e56f8ac2de9a06524f6966e0509a190a0ccab3cd54fd
No description provided.