Skip to content

allow dropping stale messages#67

Merged
reembs merged 10 commits intoreembs:masterfrom
yoavschw:master
Nov 17, 2025
Merged

allow dropping stale messages#67
reembs merged 10 commits intoreembs:masterfrom
yoavschw:master

Conversation

@yoavschw
Copy link
Contributor

No description provided.

Copy link
Owner

@reembs reembs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked a few questions and made suggestions inline

I couldn't understand from the PR how the reporting will be done. I didn't see any callbacks to the user (i.e. the Brain). Am I missing something?

if ((System.nanoTime() - messageWaitStartTime >= lateMessageMaxWaitInNanos) &&
lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) {
if (!message.isLate.get()){
atomicNumberOfMessageThatShouldBeDropped.incrementAndGet()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

atomicNumberOfMessageThatShouldBeDropped is only used for test purposes. Can we maybe nullify it it production runs to prevent another atomic operation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I wanted to allow monitoring it on brain side first without dropping messages and than "turn on" dropping without requiring another kumulus release. what do you think?

get() = atomicNumberOfMessageThatShouldBeDropped.get()

@Suppress("MemberVisibilityCanBePrivate")
val droppedMessagesBoltsAndTaskIds: Set<String>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't being used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/forter/forter/pull/179616/files#diff-833cdee7cb1dc37e8f19e56f8ac2de9a06524f6966e0509a190a0ccab3cd54fd
should have added this for context, sorry.
Brain side has a periodic polling for this, so I added the property

lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) {
if (!message.isLate.get()){
atomicNumberOfMessageThatShouldBeDropped.incrementAndGet()
droppedMessages.add(message.component.componentId)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure where this is being read

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reembs reembs merged commit 90fabbe into reembs:master Nov 17, 2025
1 check failed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants