From facda31444f6d55826b64e46c8cd5b912a05896e Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Wed, 12 Nov 2025 11:24:11 +0200 Subject: [PATCH 01/10] add kumulus topology changes --- .../org/xyro/kumulus/KumulusTopology.kt | 65 +++++-- .../org/xyro/kumulus/component/KumulusBolt.kt | 40 ---- .../kumulus/component/KumulusComponent.kt | 89 --------- .../KumulusFailureNotificationSpout.kt | 13 -- .../xyro/kumulus/component/KumulusSpout.kt | 121 ------------ .../KumulusTimeoutNotificationSpout.kt | 14 -- .../org/xyro/kumulus/component/TupleImpl.kt | 179 ------------------ 7 files changed, 54 insertions(+), 467 deletions(-) delete mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt delete mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt delete mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt delete mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt delete mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt delete mode 100644 src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index 5255fa3..fc6de46 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -15,6 +15,7 @@ import org.xyro.kumulus.component.KumulusMessage import org.xyro.kumulus.component.KumulusSpout import org.xyro.kumulus.component.PrepareMessage import org.xyro.kumulus.component.SpoutPrepareMessage +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ScheduledThreadPoolExecutor @@ -43,6 +44,13 @@ class KumulusTopology( .toMap() private val atomicThreadsInUse = AtomicInteger(0) private val atomicMaxThreadsInUse = AtomicInteger(0) + private val atomicNumberOfMessageThatShouldBeDropped = AtomicInteger(0) + + @Suppress("UNCHECKED_CAST") + private val lateMessagesStreamsToDrop: Set = config[CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] as? Set ?: emptySet() + private val lateMessagesShouldDrop: Boolean = config[CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] as? Boolean ?: false + private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Long ?: 10) * 1_000_000_000L + private val droppedMessages: MutableSet = ConcurrentHashMap.newKeySet() private val scheduledExecutorPoolSize: Int = (config[CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] as? Long ?: 5L).toInt() @@ -67,6 +75,14 @@ class KumulusTopology( val maxThreadsInUse: Int get() = atomicMaxThreadsInUse.get() + @Suppress("MemberVisibilityCanBePrivate") + val numOfMessagesToDrop: Int + get() = atomicNumberOfMessageThatShouldBeDropped.get() + + @Suppress("MemberVisibilityCanBePrivate") + val droppedMessagesBoltsAndTaskIds: Set + get() = droppedMessages.toMutableSet() + @Suppress("MemberVisibilityCanBePrivate") val maxQueueSize: Int get() = boltExecutionPool.maxSize.get() @@ -92,6 +108,9 @@ class KumulusTopology( const val CONF_BOLT_QUEUE_PUSHBACK_WAIT = "kumulus.bolt.pushback.wait" const val CONF_SHUTDOWN_TIMEOUT_SECS = "kumulus.shutdown.timeout.secs" const val CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE = "kumulus.executor.scheduled-executor.pool-size" + const val CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME = "kumulus.late-messages-dropping.streams-name" + const val CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP = "kumulus.late-messages-dropping.should-drop" + const val CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS = "kumulus.late-messages-dropping.max-wait-seconds" @Suppress("unused") @Deprecated("Use CONF_READY_POLL_SLEEP instead") @@ -290,19 +309,41 @@ class KumulusTopology( } } else { logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" } - if (onBusyBoltHook != null && message is ExecuteMessage) { + var shouldEnqueue = true + if (message is ExecuteMessage) { + val messageWaitStartTime = c.waitStart.get() + + if (messageWaitStartTime > 0){ + if ((System.nanoTime() - messageWaitStartTime >= lateMessageMaxWaitInNanos) && + lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) { + if (!message.isLate.get()){ + atomicNumberOfMessageThatShouldBeDropped.incrementAndGet() + droppedMessages.add(message.component.componentId) + message.isLate.set(true) + } + + if (lateMessagesShouldDrop){ + shouldEnqueue = false + } + } + } + c.waitStart.compareAndSet(0, System.nanoTime()) } - if (queuePushbackWait <= 0L) { - boltExecutionPool.enqueue(message) - } else { - scheduledExecutor.schedule( - { - boltExecutionPool.enqueue(message) - }, - queuePushbackWait, TimeUnit.MILLISECONDS - ) + + if (shouldEnqueue){ + if (queuePushbackWait <= 0L) { + boltExecutionPool.enqueue(message) + } else { + scheduledExecutor.schedule( + { + boltExecutionPool.enqueue(message) + }, + queuePushbackWait, TimeUnit.MILLISECONDS + ) + } } + } } @@ -310,6 +351,8 @@ class KumulusTopology( fun resetMetrics() { this.atomicMaxThreadsInUse.set(0) this.boltExecutionPool.maxSize.set(0) + this.atomicNumberOfMessageThatShouldBeDropped.set(0) + this.droppedMessages.clear() } private fun stopInternal() { @@ -358,4 +401,4 @@ class KumulusTopology( "Kumulus topology had crashed due to an uncaught exception", exception ) -} +} \ No newline at end of file diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt deleted file mode 100644 index 733f9fb..0000000 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt +++ /dev/null @@ -1,40 +0,0 @@ -package org.xyro.kumulus.component - -import mu.KotlinLogging -import org.apache.storm.generated.ComponentCommon -import org.apache.storm.generated.GlobalStreamId -import org.apache.storm.generated.Grouping -import org.apache.storm.task.OutputCollector -import org.apache.storm.task.TopologyContext -import org.apache.storm.topology.IRichBolt -import org.xyro.kumulus.KumulusTuple -import org.xyro.kumulus.collector.KumulusBoltCollector - -class KumulusBolt( - config: Map, - context: TopologyContext, - componentInstance: IRichBolt, - common: ComponentCommon? -) : KumulusComponent(config, context) { - companion object { - private val logger = KotlinLogging.logger {} - } - - val inputs: Map = common?._inputs?.toMap() ?: mapOf() - val streams = common?._streams?.toMap() ?: mapOf() - - var tickSecs: Number? = null - - private val bolt: IRichBolt = componentInstance - - fun prepare(collector: KumulusBoltCollector) { - logger.info { "Created bolt '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Object hashcode: ${this.hashCode()}" } - bolt.prepare(config, context, OutputCollector(collector)) - super.prepare() - } - - fun execute(tuple: KumulusTuple) { - logger.debug { "Executing bolt '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Input: ${tuple.kTuple}" } - bolt.execute(tuple.kTuple) - } -} diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt deleted file mode 100644 index 0579d51..0000000 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt +++ /dev/null @@ -1,89 +0,0 @@ -package org.xyro.kumulus.component - -import org.apache.storm.generated.GlobalStreamId -import org.apache.storm.grouping.CustomStreamGrouping -import org.apache.storm.grouping.ShuffleGrouping -import org.apache.storm.task.TopologyContext -import org.apache.storm.utils.Utils -import org.xyro.kumulus.KumulusTuple -import org.xyro.kumulus.collector.KumulusBoltCollector -import org.xyro.kumulus.collector.KumulusCollector -import org.xyro.kumulus.collector.KumulusSpoutCollector -import org.xyro.kumulus.grouping.AllGrouping -import org.xyro.kumulus.grouping.FieldsGrouping -import java.io.Serializable -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicLong - -abstract class KumulusComponent( - protected val config: Map, - val context: TopologyContext -) { - val inUse = AtomicBoolean(false) - val isReady = AtomicBoolean(false) - - /** - * stream -> (component -> grouping) - */ - lateinit var groupingStateMap: Map> - - val waitStart = AtomicLong(0) - val prepareStart = AtomicLong(0) - - val componentId = context.thisComponentId!! - val taskId = context.thisTaskId - - fun prepare() { - val groupingStateMap: MutableMap> = mutableMapOf() - context.thisTargets.forEach { stream, groupings -> - groupings.forEach { component, grouping -> - val kGrouping = if (grouping.is_set_all) { - AllGrouping() - } else if (grouping.is_set_none || grouping.is_set_shuffle || grouping.is_set_local_or_shuffle) { - ShuffleGrouping() - } else if (grouping.is_set_fields) { - FieldsGrouping(grouping._fields, context.thisOutputFieldsForStreams[stream]!!) - } else if (grouping.is_set_custom_serialized) { - val customGrouping = Utils.javaDeserialize(grouping._custom_serialized, Serializable::class.java)!! - customGrouping as CustomStreamGrouping - } else { - throw UnsupportedOperationException("Grouping type $grouping isn't currently supported by Kumulus") - } - kGrouping.prepare(this.context, GlobalStreamId(component, stream), context.getComponentTasks(component)) - groupingStateMap[stream] = (groupingStateMap[stream] ?: mutableMapOf()).also { - it[component] = kGrouping - } - } - } - this.groupingStateMap = groupingStateMap - isReady.set(true) - } - - override fun toString(): String { - return "[Component $componentId->$taskId]" - } -} - -abstract class KumulusMessage(val component: KumulusComponent) - -abstract class PrepareMessage( - component: KumulusComponent, - val collector: KumulusCollector -) : KumulusMessage(component) - -class SpoutPrepareMessage(component: KumulusComponent, collector: KumulusSpoutCollector) : - PrepareMessage(component, collector) - -class BoltPrepareMessage(component: KumulusComponent, collector: KumulusBoltCollector) : - PrepareMessage(component, collector) - -class ExecuteMessage(component: KumulusComponent, val tuple: KumulusTuple) : - KumulusMessage(component) - -class AckMessage( - spout: KumulusSpout, - val spoutMessageId: Any?, - val ack: Boolean, - val timeoutComponents: List, - val failedComponents: List -) : KumulusMessage(spout) diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt deleted file mode 100644 index 9dfcc95..0000000 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt +++ /dev/null @@ -1,13 +0,0 @@ -package org.xyro.kumulus.component - -/** - * Implementing this interface by a spout means that the spout will be notified on spout message failures. - * Report includes the failed bolts. This interface is mutually exclusive with KumulusTimeoutNotificationSpout - * @see KumulusTimeoutNotificationSpout - */ -interface KumulusFailureNotificationSpout { - /** - * Spout fail hook that includes a list of failed bolts (bolts that did not failed the msgId instead of acking) - */ - fun messageIdFailure(msgId: Any?, failedComponents: List) -} diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt deleted file mode 100644 index 3699c3a..0000000 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt +++ /dev/null @@ -1,121 +0,0 @@ -package org.xyro.kumulus.component - -import mu.KotlinLogging -import org.apache.storm.spout.SpoutOutputCollector -import org.apache.storm.task.TopologyContext -import org.apache.storm.topology.IRichSpout -import org.xyro.kumulus.KumulusAcker -import org.xyro.kumulus.KumulusTopology -import org.xyro.kumulus.collector.KumulusSpoutCollector -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.atomic.AtomicBoolean - -class KumulusSpout( - config: Map, - context: TopologyContext, - componentInstance: IRichSpout -) : KumulusComponent(config, context) { - companion object { - private val logger = KotlinLogging.logger {} - } - - val spout: IRichSpout = componentInstance - - private val deactivationLock = Any() - private val deactivated = AtomicBoolean(false) - - val queue = LinkedBlockingQueue() - - fun prepare(collector: KumulusSpoutCollector) { - logger.debug { "Created spout '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Object hashcode: ${this.hashCode()}" } - spout.open(config, context, SpoutOutputCollector(collector)) - super.prepare() - } - - private fun nextTuple() { - spout.nextTuple() - } - - private fun ack(msgId: Any?) { - spout.ack(msgId) - } - - private fun fail( - msgId: Any?, - timeoutComponents: List, - failedComponents: List - ) { - if (spout is KumulusTimeoutNotificationSpout) { - spout.messageIdFailure(msgId, failedComponents, timeoutComponents) - } else if (spout is KumulusFailureNotificationSpout) { - spout.messageIdFailure(msgId, failedComponents + timeoutComponents) - } - spout.fail(msgId) - } - - fun activate() { - spout.activate() - } - - fun deactivate() { - if (!deactivated.get()) { - synchronized(deactivationLock) { - if (!deactivated.get()) { - deactivated.set(true) - spout.deactivate() - } - } - } - } - - fun start(topology: KumulusTopology) { - Thread { - try { - while (true) { - if (isReady.get()) { - activate() - break - } - Thread.sleep(topology.readyPollSleepTime) - } - while (true) { - mainLoopMethod(topology.acker) - if (!isReady.get()) { - spout.deactivate() - return@Thread - } - } - } catch (e: Exception) { - logger.error("An uncaught exception in spout '$componentId' (taskId: $taskId) had forced a Kumulus shutdown", e) - topology.throwException(e) - } - }.apply { - isDaemon = true - start() - } - } - - private fun mainLoopMethod(acker: KumulusAcker) { - queue.poll()?.also { ackMessage -> - if (ackMessage.ack) { - ack(ackMessage.spoutMessageId) - } else { - fail(ackMessage.spoutMessageId, ackMessage.timeoutComponents, ackMessage.failedComponents) - } - }.let { - if (it == null && isReady.get()) { - if (acker.waitForSpoutAvailability()) { - if (inUse.compareAndSet(false, true)) { - try { - if (isReady.get()) { - nextTuple() - } - } finally { - inUse.set(false) - } - } - } - } - } - } -} diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt deleted file mode 100644 index c506e43..0000000 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt +++ /dev/null @@ -1,14 +0,0 @@ -package org.xyro.kumulus.component - -/** - * Implementing this interface by a spout means that the spout will be notified on spout message timeouts. - * Report includes the timeout bolts. - * This interface is mutually exclusive with KumulusFailureNotificationSpout - * @see KumulusFailureNotificationSpout - */ -interface KumulusTimeoutNotificationSpout { - /** - * Spout fail hook that includes a list of timeout bolts (bolts that did not ack/fail the msgId) - */ - fun messageIdFailure(msgId: Any?, failedComponents: List, timeoutComponents: List) -} diff --git a/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt b/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt deleted file mode 100644 index 57c1902..0000000 --- a/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt +++ /dev/null @@ -1,179 +0,0 @@ -package org.xyro.kumulus.component - -import org.apache.storm.generated.GlobalStreamId -import org.apache.storm.task.GeneralTopologyContext -import org.apache.storm.tuple.Fields -import org.apache.storm.tuple.MessageId -import org.apache.storm.tuple.Tuple - -open class TupleImpl : Tuple { - val spoutMessageId: Any? - - private val context: GeneralTopologyContext - private val values: List - private val taskId: Int - private val streamId: String - private val id: MessageId - - constructor(context: GeneralTopologyContext, values: List, taskId: Int, streamId: String, id: MessageId, spoutMessageId: Any?) { - this.context = context - this.values = values - this.taskId = taskId - this.streamId = streamId - this.id = id - this.spoutMessageId = spoutMessageId - val componentId = context.getComponentId(taskId) - val schema = context.getComponentOutputFields(componentId, streamId) - if (values.size != schema.size()) { - throw IllegalArgumentException( - "Tuple created with wrong number of fields. " + - "Expected " + schema.size() + " fields but got " + - values.size + " fields" - ) - } - } - - constructor(context: GeneralTopologyContext, values: List, taskId: Int, streamId: String) : - this(context, values, taskId, streamId, MessageId.makeUnanchored(), null) - - override fun size(): Int { - return values.size - } - - override fun fieldIndex(field: String): Int { - return fields.fieldIndex(field) - } - - override operator fun contains(field: String): Boolean { - return fields.contains(field) - } - - override fun getValue(i: Int): Any { - return values[i] - } - - override fun getString(i: Int): String { - return values[i] as String - } - - override fun getInteger(i: Int): Int? { - return values[i] as Int - } - - override fun getLong(i: Int): Long? { - return values[i] as Long - } - - override fun getBoolean(i: Int): Boolean? { - return values[i] as Boolean - } - - override fun getShort(i: Int): Short? { - return values[i] as Short - } - - override fun getByte(i: Int): Byte? { - return values[i] as Byte - } - - override fun getDouble(i: Int): Double? { - return values[i] as Double - } - - override fun getFloat(i: Int): Float? { - return values[i] as Float - } - - override fun getBinary(i: Int): ByteArray { - return values[i] as ByteArray - } - - override fun getValueByField(field: String): Any { - return values[fieldIndex(field)] - } - - override fun getStringByField(field: String): String { - return values[fieldIndex(field)] as String - } - - override fun getIntegerByField(field: String): Int? { - return values[fieldIndex(field)] as Int - } - - override fun getLongByField(field: String): Long? { - return values[fieldIndex(field)] as Long - } - - override fun getBooleanByField(field: String): Boolean? { - return values[fieldIndex(field)] as Boolean - } - - override fun getShortByField(field: String): Short? { - return values[fieldIndex(field)] as Short - } - - override fun getByteByField(field: String): Byte? { - return values[fieldIndex(field)] as Byte - } - - override fun getDoubleByField(field: String): Double? { - return values[fieldIndex(field)] as Double - } - - override fun getFloatByField(field: String): Float? { - return values[fieldIndex(field)] as Float - } - - override fun getBinaryByField(field: String): ByteArray { - return values[fieldIndex(field)] as ByteArray - } - - override fun getValues(): List { - return values - } - - override fun getFields(): Fields { - return context.getComponentOutputFields(sourceComponent, sourceStreamId) - } - - override fun select(selector: Fields): List { - return fields.select(selector, values) - } - - @Deprecated("", ReplaceWith("sourceGlobalStreamId")) - override fun getSourceGlobalStreamid(): GlobalStreamId { - return sourceGlobalStreamId - } - - override fun getSourceGlobalStreamId(): GlobalStreamId { - return GlobalStreamId(sourceComponent, streamId) - } - - override fun getSourceComponent(): String { - return context.getComponentId(taskId) - } - - override fun getSourceTask(): Int { - return taskId - } - - override fun getSourceStreamId(): String { - return streamId - } - - override fun getMessageId(): MessageId { - return id - } - - override fun toString(): String { - return """source: $sourceComponent:$taskId, stream: $streamId, id: $id, $values [spoutMessageId: $spoutMessageId]""" - } - - override fun equals(other: Any?): Boolean { - return this === other - } - - override fun hashCode(): Int { - return System.identityHashCode(this) - } -} From 27652739466cac54998e47a884cd92b989f20e27 Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Wed, 12 Nov 2025 11:25:14 +0200 Subject: [PATCH 02/10] compy component changes --- .../org/xyro/kumulus/component/KumulusBolt.kt | 40 ++++ .../kumulus/component/KumulusComponent.kt | 89 +++++++++ .../KumulusFailureNotificationSpout.kt | 13 ++ .../xyro/kumulus/component/KumulusSpout.kt | 121 ++++++++++++ .../KumulusTimeoutNotificationSpout.kt | 14 ++ .../org/xyro/kumulus/component/TupleImpl.kt | 179 ++++++++++++++++++ 6 files changed, 456 insertions(+) create mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt create mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt create mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt create mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt create mode 100644 src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt create mode 100644 src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt new file mode 100644 index 0000000..733f9fb --- /dev/null +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt @@ -0,0 +1,40 @@ +package org.xyro.kumulus.component + +import mu.KotlinLogging +import org.apache.storm.generated.ComponentCommon +import org.apache.storm.generated.GlobalStreamId +import org.apache.storm.generated.Grouping +import org.apache.storm.task.OutputCollector +import org.apache.storm.task.TopologyContext +import org.apache.storm.topology.IRichBolt +import org.xyro.kumulus.KumulusTuple +import org.xyro.kumulus.collector.KumulusBoltCollector + +class KumulusBolt( + config: Map, + context: TopologyContext, + componentInstance: IRichBolt, + common: ComponentCommon? +) : KumulusComponent(config, context) { + companion object { + private val logger = KotlinLogging.logger {} + } + + val inputs: Map = common?._inputs?.toMap() ?: mapOf() + val streams = common?._streams?.toMap() ?: mapOf() + + var tickSecs: Number? = null + + private val bolt: IRichBolt = componentInstance + + fun prepare(collector: KumulusBoltCollector) { + logger.info { "Created bolt '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Object hashcode: ${this.hashCode()}" } + bolt.prepare(config, context, OutputCollector(collector)) + super.prepare() + } + + fun execute(tuple: KumulusTuple) { + logger.debug { "Executing bolt '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Input: ${tuple.kTuple}" } + bolt.execute(tuple.kTuple) + } +} diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt new file mode 100644 index 0000000..b367ed4 --- /dev/null +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt @@ -0,0 +1,89 @@ +package org.xyro.kumulus.component + +import org.apache.storm.generated.GlobalStreamId +import org.apache.storm.grouping.CustomStreamGrouping +import org.apache.storm.grouping.ShuffleGrouping +import org.apache.storm.task.TopologyContext +import org.apache.storm.utils.Utils +import org.xyro.kumulus.KumulusTuple +import org.xyro.kumulus.collector.KumulusBoltCollector +import org.xyro.kumulus.collector.KumulusCollector +import org.xyro.kumulus.collector.KumulusSpoutCollector +import org.xyro.kumulus.grouping.AllGrouping +import org.xyro.kumulus.grouping.FieldsGrouping +import java.io.Serializable +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong + +abstract class KumulusComponent( + protected val config: Map, + val context: TopologyContext +) { + val inUse = AtomicBoolean(false) + val isReady = AtomicBoolean(false) + + /** + * stream -> (component -> grouping) + */ + lateinit var groupingStateMap: Map> + + val waitStart = AtomicLong(0) + val prepareStart = AtomicLong(0) + + val componentId = context.thisComponentId!! + val taskId = context.thisTaskId + + fun prepare() { + val groupingStateMap: MutableMap> = mutableMapOf() + context.thisTargets.forEach { stream, groupings -> + groupings.forEach { component, grouping -> + val kGrouping = if (grouping.is_set_all) { + AllGrouping() + } else if (grouping.is_set_none || grouping.is_set_shuffle || grouping.is_set_local_or_shuffle) { + ShuffleGrouping() + } else if (grouping.is_set_fields) { + FieldsGrouping(grouping._fields, context.thisOutputFieldsForStreams[stream]!!) + } else if (grouping.is_set_custom_serialized) { + val customGrouping = Utils.javaDeserialize(grouping._custom_serialized, Serializable::class.java)!! + customGrouping as CustomStreamGrouping + } else { + throw UnsupportedOperationException("Grouping type $grouping isn't currently supported by Kumulus") + } + kGrouping.prepare(this.context, GlobalStreamId(component, stream), context.getComponentTasks(component)) + groupingStateMap[stream] = (groupingStateMap[stream] ?: mutableMapOf()).also { + it[component] = kGrouping + } + } + } + this.groupingStateMap = groupingStateMap + isReady.set(true) + } + + override fun toString(): String { + return "[Component $componentId->$taskId]" + } +} + +abstract class KumulusMessage(val component: KumulusComponent) + +abstract class PrepareMessage( + component: KumulusComponent, + val collector: KumulusCollector +) : KumulusMessage(component) + +class SpoutPrepareMessage(component: KumulusComponent, collector: KumulusSpoutCollector) : + PrepareMessage(component, collector) + +class BoltPrepareMessage(component: KumulusComponent, collector: KumulusBoltCollector) : + PrepareMessage(component, collector) + +class ExecuteMessage(component: KumulusComponent, val tuple: KumulusTuple, val isLate: AtomicBoolean = AtomicBoolean(false)) : + KumulusMessage(component) + +class AckMessage( + spout: KumulusSpout, + val spoutMessageId: Any?, + val ack: Boolean, + val timeoutComponents: List, + val failedComponents: List +) : KumulusMessage(spout) diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt new file mode 100644 index 0000000..9dfcc95 --- /dev/null +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt @@ -0,0 +1,13 @@ +package org.xyro.kumulus.component + +/** + * Implementing this interface by a spout means that the spout will be notified on spout message failures. + * Report includes the failed bolts. This interface is mutually exclusive with KumulusTimeoutNotificationSpout + * @see KumulusTimeoutNotificationSpout + */ +interface KumulusFailureNotificationSpout { + /** + * Spout fail hook that includes a list of failed bolts (bolts that did not failed the msgId instead of acking) + */ + fun messageIdFailure(msgId: Any?, failedComponents: List) +} diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt new file mode 100644 index 0000000..3699c3a --- /dev/null +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt @@ -0,0 +1,121 @@ +package org.xyro.kumulus.component + +import mu.KotlinLogging +import org.apache.storm.spout.SpoutOutputCollector +import org.apache.storm.task.TopologyContext +import org.apache.storm.topology.IRichSpout +import org.xyro.kumulus.KumulusAcker +import org.xyro.kumulus.KumulusTopology +import org.xyro.kumulus.collector.KumulusSpoutCollector +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +class KumulusSpout( + config: Map, + context: TopologyContext, + componentInstance: IRichSpout +) : KumulusComponent(config, context) { + companion object { + private val logger = KotlinLogging.logger {} + } + + val spout: IRichSpout = componentInstance + + private val deactivationLock = Any() + private val deactivated = AtomicBoolean(false) + + val queue = LinkedBlockingQueue() + + fun prepare(collector: KumulusSpoutCollector) { + logger.debug { "Created spout '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Object hashcode: ${this.hashCode()}" } + spout.open(config, context, SpoutOutputCollector(collector)) + super.prepare() + } + + private fun nextTuple() { + spout.nextTuple() + } + + private fun ack(msgId: Any?) { + spout.ack(msgId) + } + + private fun fail( + msgId: Any?, + timeoutComponents: List, + failedComponents: List + ) { + if (spout is KumulusTimeoutNotificationSpout) { + spout.messageIdFailure(msgId, failedComponents, timeoutComponents) + } else if (spout is KumulusFailureNotificationSpout) { + spout.messageIdFailure(msgId, failedComponents + timeoutComponents) + } + spout.fail(msgId) + } + + fun activate() { + spout.activate() + } + + fun deactivate() { + if (!deactivated.get()) { + synchronized(deactivationLock) { + if (!deactivated.get()) { + deactivated.set(true) + spout.deactivate() + } + } + } + } + + fun start(topology: KumulusTopology) { + Thread { + try { + while (true) { + if (isReady.get()) { + activate() + break + } + Thread.sleep(topology.readyPollSleepTime) + } + while (true) { + mainLoopMethod(topology.acker) + if (!isReady.get()) { + spout.deactivate() + return@Thread + } + } + } catch (e: Exception) { + logger.error("An uncaught exception in spout '$componentId' (taskId: $taskId) had forced a Kumulus shutdown", e) + topology.throwException(e) + } + }.apply { + isDaemon = true + start() + } + } + + private fun mainLoopMethod(acker: KumulusAcker) { + queue.poll()?.also { ackMessage -> + if (ackMessage.ack) { + ack(ackMessage.spoutMessageId) + } else { + fail(ackMessage.spoutMessageId, ackMessage.timeoutComponents, ackMessage.failedComponents) + } + }.let { + if (it == null && isReady.get()) { + if (acker.waitForSpoutAvailability()) { + if (inUse.compareAndSet(false, true)) { + try { + if (isReady.get()) { + nextTuple() + } + } finally { + inUse.set(false) + } + } + } + } + } + } +} diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt new file mode 100644 index 0000000..c506e43 --- /dev/null +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt @@ -0,0 +1,14 @@ +package org.xyro.kumulus.component + +/** + * Implementing this interface by a spout means that the spout will be notified on spout message timeouts. + * Report includes the timeout bolts. + * This interface is mutually exclusive with KumulusFailureNotificationSpout + * @see KumulusFailureNotificationSpout + */ +interface KumulusTimeoutNotificationSpout { + /** + * Spout fail hook that includes a list of timeout bolts (bolts that did not ack/fail the msgId) + */ + fun messageIdFailure(msgId: Any?, failedComponents: List, timeoutComponents: List) +} diff --git a/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt b/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt new file mode 100644 index 0000000..57c1902 --- /dev/null +++ b/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt @@ -0,0 +1,179 @@ +package org.xyro.kumulus.component + +import org.apache.storm.generated.GlobalStreamId +import org.apache.storm.task.GeneralTopologyContext +import org.apache.storm.tuple.Fields +import org.apache.storm.tuple.MessageId +import org.apache.storm.tuple.Tuple + +open class TupleImpl : Tuple { + val spoutMessageId: Any? + + private val context: GeneralTopologyContext + private val values: List + private val taskId: Int + private val streamId: String + private val id: MessageId + + constructor(context: GeneralTopologyContext, values: List, taskId: Int, streamId: String, id: MessageId, spoutMessageId: Any?) { + this.context = context + this.values = values + this.taskId = taskId + this.streamId = streamId + this.id = id + this.spoutMessageId = spoutMessageId + val componentId = context.getComponentId(taskId) + val schema = context.getComponentOutputFields(componentId, streamId) + if (values.size != schema.size()) { + throw IllegalArgumentException( + "Tuple created with wrong number of fields. " + + "Expected " + schema.size() + " fields but got " + + values.size + " fields" + ) + } + } + + constructor(context: GeneralTopologyContext, values: List, taskId: Int, streamId: String) : + this(context, values, taskId, streamId, MessageId.makeUnanchored(), null) + + override fun size(): Int { + return values.size + } + + override fun fieldIndex(field: String): Int { + return fields.fieldIndex(field) + } + + override operator fun contains(field: String): Boolean { + return fields.contains(field) + } + + override fun getValue(i: Int): Any { + return values[i] + } + + override fun getString(i: Int): String { + return values[i] as String + } + + override fun getInteger(i: Int): Int? { + return values[i] as Int + } + + override fun getLong(i: Int): Long? { + return values[i] as Long + } + + override fun getBoolean(i: Int): Boolean? { + return values[i] as Boolean + } + + override fun getShort(i: Int): Short? { + return values[i] as Short + } + + override fun getByte(i: Int): Byte? { + return values[i] as Byte + } + + override fun getDouble(i: Int): Double? { + return values[i] as Double + } + + override fun getFloat(i: Int): Float? { + return values[i] as Float + } + + override fun getBinary(i: Int): ByteArray { + return values[i] as ByteArray + } + + override fun getValueByField(field: String): Any { + return values[fieldIndex(field)] + } + + override fun getStringByField(field: String): String { + return values[fieldIndex(field)] as String + } + + override fun getIntegerByField(field: String): Int? { + return values[fieldIndex(field)] as Int + } + + override fun getLongByField(field: String): Long? { + return values[fieldIndex(field)] as Long + } + + override fun getBooleanByField(field: String): Boolean? { + return values[fieldIndex(field)] as Boolean + } + + override fun getShortByField(field: String): Short? { + return values[fieldIndex(field)] as Short + } + + override fun getByteByField(field: String): Byte? { + return values[fieldIndex(field)] as Byte + } + + override fun getDoubleByField(field: String): Double? { + return values[fieldIndex(field)] as Double + } + + override fun getFloatByField(field: String): Float? { + return values[fieldIndex(field)] as Float + } + + override fun getBinaryByField(field: String): ByteArray { + return values[fieldIndex(field)] as ByteArray + } + + override fun getValues(): List { + return values + } + + override fun getFields(): Fields { + return context.getComponentOutputFields(sourceComponent, sourceStreamId) + } + + override fun select(selector: Fields): List { + return fields.select(selector, values) + } + + @Deprecated("", ReplaceWith("sourceGlobalStreamId")) + override fun getSourceGlobalStreamid(): GlobalStreamId { + return sourceGlobalStreamId + } + + override fun getSourceGlobalStreamId(): GlobalStreamId { + return GlobalStreamId(sourceComponent, streamId) + } + + override fun getSourceComponent(): String { + return context.getComponentId(taskId) + } + + override fun getSourceTask(): Int { + return taskId + } + + override fun getSourceStreamId(): String { + return streamId + } + + override fun getMessageId(): MessageId { + return id + } + + override fun toString(): String { + return """source: $sourceComponent:$taskId, stream: $streamId, id: $id, $values [spoutMessageId: $spoutMessageId]""" + } + + override fun equals(other: Any?): Boolean { + return this === other + } + + override fun hashCode(): Int { + return System.identityHashCode(this) + } +} From 087fc9896ad6dad25b24549118225f4151f18953 Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Wed, 12 Nov 2025 11:26:33 +0200 Subject: [PATCH 03/10] add test --- .../xyro/kumulus/TestDroppingStaleMessages.kt | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt diff --git a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt new file mode 100644 index 0000000..8b22c5e --- /dev/null +++ b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt @@ -0,0 +1,119 @@ +package org.xyro.kumulus + +import mu.KotlinLogging +import org.apache.storm.Config +import org.apache.storm.spout.SpoutOutputCollector +import org.apache.storm.task.OutputCollector +import org.apache.storm.task.TopologyContext +import org.apache.storm.topology.BasicOutputCollector +import org.apache.storm.topology.IRichBolt +import org.apache.storm.topology.OutputFieldsDeclarer +import org.apache.storm.tuple.Fields +import org.apache.storm.tuple.Tuple +import org.junit.Test +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong +import kotlin.test.assertTrue + +class TestDroppingStaleMessages { + @Test + fun testLatentUnanchoredBolt() { + val builder = org.apache.storm.topology.TopologyBuilder() + val config: MutableMap = mutableMapOf() + config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L + config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L + config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] = true; + config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] = setOf("default"); + config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] = 1; + + builder.setSpout("spout", LatencyDeltaSpout()) + + builder.setBolt("unanchoring-bolt", UnanchoringBolt()) + .noneGrouping("spout") + + builder.setBolt("delay-unanchored-bolt", StuckBolt()) + .noneGrouping("unanchoring-bolt") + + + val stormTopology = builder.createTopology()!! + val kumulusTopology = + KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + + kumulusTopology.prepare(10, TimeUnit.SECONDS) + kumulusTopology.start(block = false) + Thread.sleep(5000) + kumulusTopology.stop() + + val dropped = kumulusTopology.numOfMessagesToDrop + logger.info { "Dropped ${dropped} messages" } + assertTrue { dropped > 0} + } + + class LatencyDeltaSpout : DummySpout({ + it.declare(Fields("id")) + }) { + private var index: Int = 0 + private var lastCall: Long? = 0 + + override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) { + super.open(conf, context, collector) + this.index = 0 + this.lastCall = null + } + + override fun nextTuple() { + val now = System.nanoTime() + if (this.lastCall != null) { + val tookMillis = TimeUnit.NANOSECONDS.toMillis(now - this.lastCall!!) + sumWait.addAndGet(tookMillis) + if (tookMillis > 100) { + logger.error { "Took $tookMillis to nextTuple" } + } + calledCount.incrementAndGet() + } + this.lastCall = now + val messageId = ++index + collector.emit(listOf(messageId), messageId) + } + } + + class UnanchoringBolt : IRichBolt { + private lateinit var collector: OutputCollector + + override fun execute(input: Tuple) { + collector.emit(input.values) + collector.ack(input) + } + + override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector?) { + this.collector = p2!! + } + + override fun cleanup() = Unit + + override fun getComponentConfiguration(): MutableMap { + return mutableMapOf() + } + + override fun declareOutputFields(p0: OutputFieldsDeclarer) { + p0.declare(Fields("id")) + } + } + + class StuckBolt : DummyBolt({ + it.declare(Fields("id")) + }) { + override fun execute(input: Tuple, collector: BasicOutputCollector) { + logger.info { "StuckBolt: started" } + while (true){ + Thread.sleep(50) + } + } + } + + companion object { + private val logger = KotlinLogging.logger {} + private val sumWait = AtomicLong(0) + private val calledCount = AtomicLong(0) + } +} From 680af3ef2679490cc2f3699abadaa861fdcc8659 Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Wed, 12 Nov 2025 11:30:17 +0200 Subject: [PATCH 04/10] update pom.xml version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 9d7c6f9..65bf1ef 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.xyro kumulus - 0.1.41 + 0.1.42 kumulus A lightweight, in-process, drop-in replacement for Apache Storm From e2f6374d833d055eba12fb3d2cb4c6a196835ae1 Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Sun, 16 Nov 2025 21:35:42 +0200 Subject: [PATCH 05/10] move monitoring to hook --- .../org/xyro/kumulus/KumulusTopology.kt | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index fc6de46..31eb0db 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -15,7 +15,6 @@ import org.xyro.kumulus.component.KumulusMessage import org.xyro.kumulus.component.KumulusSpout import org.xyro.kumulus.component.PrepareMessage import org.xyro.kumulus.component.SpoutPrepareMessage -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ScheduledThreadPoolExecutor @@ -44,13 +43,12 @@ class KumulusTopology( .toMap() private val atomicThreadsInUse = AtomicInteger(0) private val atomicMaxThreadsInUse = AtomicInteger(0) - private val atomicNumberOfMessageThatShouldBeDropped = AtomicInteger(0) @Suppress("UNCHECKED_CAST") private val lateMessagesStreamsToDrop: Set = config[CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] as? Set ?: emptySet() private val lateMessagesShouldDrop: Boolean = config[CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] as? Boolean ?: false - private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Long ?: 10) * 1_000_000_000L - private val droppedMessages: MutableSet = ConcurrentHashMap.newKeySet() + private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Int ?: 10) * 1_000_000_000L + private val scheduledExecutorPoolSize: Int = (config[CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] as? Long ?: 5L).toInt() @@ -66,6 +64,7 @@ class KumulusTopology( var onBusyBoltHook: ((String, Int, Long, Tuple) -> Unit)? = null var onBoltPrepareFinishHook: ((String, Int, Long) -> Unit)? = null var onReportErrorHook: ((String, Int, Throwable) -> Unit)? = null + var onLateMessageHook: ((String, Int, Long, Tuple) -> Unit)? = null @Suppress("MemberVisibilityCanBePrivate", "unused") val currentThreadsInUse: Int @@ -75,13 +74,6 @@ class KumulusTopology( val maxThreadsInUse: Int get() = atomicMaxThreadsInUse.get() - @Suppress("MemberVisibilityCanBePrivate") - val numOfMessagesToDrop: Int - get() = atomicNumberOfMessageThatShouldBeDropped.get() - - @Suppress("MemberVisibilityCanBePrivate") - val droppedMessagesBoltsAndTaskIds: Set - get() = droppedMessages.toMutableSet() @Suppress("MemberVisibilityCanBePrivate") val maxQueueSize: Int @@ -311,14 +303,32 @@ class KumulusTopology( logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" } var shouldEnqueue = true if (message is ExecuteMessage) { - val messageWaitStartTime = c.waitStart.get() + val messageWaitStartTime = c.waitStart.get() if (messageWaitStartTime > 0){ - if ((System.nanoTime() - messageWaitStartTime >= lateMessageMaxWaitInNanos) && + val delay = System.nanoTime() - messageWaitStartTime + if ((delay >= lateMessageMaxWaitInNanos) && lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) { if (!message.isLate.get()){ - atomicNumberOfMessageThatShouldBeDropped.incrementAndGet() - droppedMessages.add(message.component.componentId) + onLateMessageHook?.let { onLateMessageHook -> + try { + scheduledExecutor.submit { + try { + onLateMessageHook( + message.component.componentId, + message.component.taskId, + delay, + message.tuple.kTuple + ) + } catch (e: Exception) { + logger.error("An exception was thrown from busy-hook callback, ignoring", e) + } + } + } catch (e: Exception) { + logger.error("An exception was thrown by busy-hook thread-pool submission, ignoring", e) + } + } + message.isLate.set(true) } @@ -351,8 +361,6 @@ class KumulusTopology( fun resetMetrics() { this.atomicMaxThreadsInUse.set(0) this.boltExecutionPool.maxSize.set(0) - this.atomicNumberOfMessageThatShouldBeDropped.set(0) - this.droppedMessages.clear() } private fun stopInternal() { From c43b7a960ff257a0dffd03ba9da829c00bfbc6f0 Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Sun, 16 Nov 2025 21:55:26 +0200 Subject: [PATCH 06/10] Revert "move monitoring to hook" This reverts commit e2f6374d833d055eba12fb3d2cb4c6a196835ae1. --- .../org/xyro/kumulus/KumulusTopology.kt | 42 ++++++++----------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index 31eb0db..fc6de46 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -15,6 +15,7 @@ import org.xyro.kumulus.component.KumulusMessage import org.xyro.kumulus.component.KumulusSpout import org.xyro.kumulus.component.PrepareMessage import org.xyro.kumulus.component.SpoutPrepareMessage +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ScheduledThreadPoolExecutor @@ -43,12 +44,13 @@ class KumulusTopology( .toMap() private val atomicThreadsInUse = AtomicInteger(0) private val atomicMaxThreadsInUse = AtomicInteger(0) + private val atomicNumberOfMessageThatShouldBeDropped = AtomicInteger(0) @Suppress("UNCHECKED_CAST") private val lateMessagesStreamsToDrop: Set = config[CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] as? Set ?: emptySet() private val lateMessagesShouldDrop: Boolean = config[CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] as? Boolean ?: false - private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Int ?: 10) * 1_000_000_000L - + private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Long ?: 10) * 1_000_000_000L + private val droppedMessages: MutableSet = ConcurrentHashMap.newKeySet() private val scheduledExecutorPoolSize: Int = (config[CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] as? Long ?: 5L).toInt() @@ -64,7 +66,6 @@ class KumulusTopology( var onBusyBoltHook: ((String, Int, Long, Tuple) -> Unit)? = null var onBoltPrepareFinishHook: ((String, Int, Long) -> Unit)? = null var onReportErrorHook: ((String, Int, Throwable) -> Unit)? = null - var onLateMessageHook: ((String, Int, Long, Tuple) -> Unit)? = null @Suppress("MemberVisibilityCanBePrivate", "unused") val currentThreadsInUse: Int @@ -74,6 +75,13 @@ class KumulusTopology( val maxThreadsInUse: Int get() = atomicMaxThreadsInUse.get() + @Suppress("MemberVisibilityCanBePrivate") + val numOfMessagesToDrop: Int + get() = atomicNumberOfMessageThatShouldBeDropped.get() + + @Suppress("MemberVisibilityCanBePrivate") + val droppedMessagesBoltsAndTaskIds: Set + get() = droppedMessages.toMutableSet() @Suppress("MemberVisibilityCanBePrivate") val maxQueueSize: Int @@ -303,32 +311,14 @@ class KumulusTopology( logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" } var shouldEnqueue = true if (message is ExecuteMessage) { - val messageWaitStartTime = c.waitStart.get() + if (messageWaitStartTime > 0){ - val delay = System.nanoTime() - messageWaitStartTime - if ((delay >= lateMessageMaxWaitInNanos) && + if ((System.nanoTime() - messageWaitStartTime >= lateMessageMaxWaitInNanos) && lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) { if (!message.isLate.get()){ - onLateMessageHook?.let { onLateMessageHook -> - try { - scheduledExecutor.submit { - try { - onLateMessageHook( - message.component.componentId, - message.component.taskId, - delay, - message.tuple.kTuple - ) - } catch (e: Exception) { - logger.error("An exception was thrown from busy-hook callback, ignoring", e) - } - } - } catch (e: Exception) { - logger.error("An exception was thrown by busy-hook thread-pool submission, ignoring", e) - } - } - + atomicNumberOfMessageThatShouldBeDropped.incrementAndGet() + droppedMessages.add(message.component.componentId) message.isLate.set(true) } @@ -361,6 +351,8 @@ class KumulusTopology( fun resetMetrics() { this.atomicMaxThreadsInUse.set(0) this.boltExecutionPool.maxSize.set(0) + this.atomicNumberOfMessageThatShouldBeDropped.set(0) + this.droppedMessages.clear() } private fun stopInternal() { From a646aa48182a26eae9837915a509dedea7fc4801 Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Sun, 16 Nov 2025 21:56:31 +0200 Subject: [PATCH 07/10] Revert "Revert "move monitoring to hook"" This reverts commit c43b7a960ff257a0dffd03ba9da829c00bfbc6f0. --- .../org/xyro/kumulus/KumulusTopology.kt | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index fc6de46..31eb0db 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -15,7 +15,6 @@ import org.xyro.kumulus.component.KumulusMessage import org.xyro.kumulus.component.KumulusSpout import org.xyro.kumulus.component.PrepareMessage import org.xyro.kumulus.component.SpoutPrepareMessage -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ScheduledThreadPoolExecutor @@ -44,13 +43,12 @@ class KumulusTopology( .toMap() private val atomicThreadsInUse = AtomicInteger(0) private val atomicMaxThreadsInUse = AtomicInteger(0) - private val atomicNumberOfMessageThatShouldBeDropped = AtomicInteger(0) @Suppress("UNCHECKED_CAST") private val lateMessagesStreamsToDrop: Set = config[CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] as? Set ?: emptySet() private val lateMessagesShouldDrop: Boolean = config[CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] as? Boolean ?: false - private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Long ?: 10) * 1_000_000_000L - private val droppedMessages: MutableSet = ConcurrentHashMap.newKeySet() + private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Int ?: 10) * 1_000_000_000L + private val scheduledExecutorPoolSize: Int = (config[CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] as? Long ?: 5L).toInt() @@ -66,6 +64,7 @@ class KumulusTopology( var onBusyBoltHook: ((String, Int, Long, Tuple) -> Unit)? = null var onBoltPrepareFinishHook: ((String, Int, Long) -> Unit)? = null var onReportErrorHook: ((String, Int, Throwable) -> Unit)? = null + var onLateMessageHook: ((String, Int, Long, Tuple) -> Unit)? = null @Suppress("MemberVisibilityCanBePrivate", "unused") val currentThreadsInUse: Int @@ -75,13 +74,6 @@ class KumulusTopology( val maxThreadsInUse: Int get() = atomicMaxThreadsInUse.get() - @Suppress("MemberVisibilityCanBePrivate") - val numOfMessagesToDrop: Int - get() = atomicNumberOfMessageThatShouldBeDropped.get() - - @Suppress("MemberVisibilityCanBePrivate") - val droppedMessagesBoltsAndTaskIds: Set - get() = droppedMessages.toMutableSet() @Suppress("MemberVisibilityCanBePrivate") val maxQueueSize: Int @@ -311,14 +303,32 @@ class KumulusTopology( logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" } var shouldEnqueue = true if (message is ExecuteMessage) { - val messageWaitStartTime = c.waitStart.get() + val messageWaitStartTime = c.waitStart.get() if (messageWaitStartTime > 0){ - if ((System.nanoTime() - messageWaitStartTime >= lateMessageMaxWaitInNanos) && + val delay = System.nanoTime() - messageWaitStartTime + if ((delay >= lateMessageMaxWaitInNanos) && lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) { if (!message.isLate.get()){ - atomicNumberOfMessageThatShouldBeDropped.incrementAndGet() - droppedMessages.add(message.component.componentId) + onLateMessageHook?.let { onLateMessageHook -> + try { + scheduledExecutor.submit { + try { + onLateMessageHook( + message.component.componentId, + message.component.taskId, + delay, + message.tuple.kTuple + ) + } catch (e: Exception) { + logger.error("An exception was thrown from busy-hook callback, ignoring", e) + } + } + } catch (e: Exception) { + logger.error("An exception was thrown by busy-hook thread-pool submission, ignoring", e) + } + } + message.isLate.set(true) } @@ -351,8 +361,6 @@ class KumulusTopology( fun resetMetrics() { this.atomicMaxThreadsInUse.set(0) this.boltExecutionPool.maxSize.set(0) - this.atomicNumberOfMessageThatShouldBeDropped.set(0) - this.droppedMessages.clear() } private fun stopInternal() { From 45bf806b6f1f6a24ad018eb1e0cb46ac6f95d7c5 Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Sun, 16 Nov 2025 22:01:05 +0200 Subject: [PATCH 08/10] update test --- .../kotlin/org/xyro/kumulus/KumulusTopology.kt | 2 +- .../org/xyro/kumulus/TestDroppingStaleMessages.kt | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index 31eb0db..3ac2b4b 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -47,7 +47,7 @@ class KumulusTopology( @Suppress("UNCHECKED_CAST") private val lateMessagesStreamsToDrop: Set = config[CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] as? Set ?: emptySet() private val lateMessagesShouldDrop: Boolean = config[CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] as? Boolean ?: false - private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Int ?: 10) * 1_000_000_000L + private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Long ?: 10) * 1_000_000_000L private val scheduledExecutorPoolSize: Int = diff --git a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt index 8b22c5e..fa93169 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt @@ -17,11 +17,12 @@ import kotlin.test.assertTrue class TestDroppingStaleMessages { @Test - fun testLatentUnanchoredBolt() { + fun testLateBolt() { val builder = org.apache.storm.topology.TopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L +// config[KumulusTopology.CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] = 5; config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] = true; config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] = setOf("default"); config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] = 1; @@ -39,14 +40,19 @@ class TestDroppingStaleMessages { val kumulusTopology = KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + var lateHookCalled = false + + kumulusTopology.onLateMessageHook = { _, _, _, _ -> + lateHookCalled = true + } + kumulusTopology.prepare(10, TimeUnit.SECONDS) kumulusTopology.start(block = false) Thread.sleep(5000) kumulusTopology.stop() - val dropped = kumulusTopology.numOfMessagesToDrop - logger.info { "Dropped ${dropped} messages" } - assertTrue { dropped > 0} + logger.info { "Dropped ${lateHookCalled} messages" } + assertTrue { lateHookCalled} } class LatencyDeltaSpout : DummySpout({ From e6051f7bd5431f755948bc2a2eaad8648ea499bd Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Sun, 16 Nov 2025 22:02:38 +0200 Subject: [PATCH 09/10] clean --- src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt | 5 ++--- .../kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index 3ac2b4b..df34fdc 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -73,8 +73,7 @@ class KumulusTopology( @Suppress("MemberVisibilityCanBePrivate") val maxThreadsInUse: Int get() = atomicMaxThreadsInUse.get() - - + @Suppress("MemberVisibilityCanBePrivate") val maxQueueSize: Int get() = boltExecutionPool.maxSize.get() @@ -409,4 +408,4 @@ class KumulusTopology( "Kumulus topology had crashed due to an uncaught exception", exception ) -} \ No newline at end of file +} diff --git a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt index fa93169..5e9868d 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt @@ -22,7 +22,6 @@ class TestDroppingStaleMessages { val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L -// config[KumulusTopology.CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] = 5; config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] = true; config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] = setOf("default"); config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] = 1; From 90fabbedea3b94446d8be732c4a529ef6f76816d Mon Sep 17 00:00:00 2001 From: Yoav Schwartz Date: Sun, 16 Nov 2025 22:04:12 +0200 Subject: [PATCH 10/10] whitespace --- src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index df34fdc..60466bd 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -73,7 +73,7 @@ class KumulusTopology( @Suppress("MemberVisibilityCanBePrivate") val maxThreadsInUse: Int get() = atomicMaxThreadsInUse.get() - + @Suppress("MemberVisibilityCanBePrivate") val maxQueueSize: Int get() = boltExecutionPool.maxSize.get() @@ -302,7 +302,6 @@ class KumulusTopology( logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" } var shouldEnqueue = true if (message is ExecuteMessage) { - val messageWaitStartTime = c.waitStart.get() if (messageWaitStartTime > 0){ val delay = System.nanoTime() - messageWaitStartTime @@ -327,7 +326,7 @@ class KumulusTopology( logger.error("An exception was thrown by busy-hook thread-pool submission, ignoring", e) } } - + message.isLate.set(true) } @@ -352,7 +351,6 @@ class KumulusTopology( ) } } - } }