diff --git a/pom.xml b/pom.xml
index 9d7c6f9..65bf1ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
org.xyro
kumulus
- 0.1.41
+ 0.1.42
kumulus
A lightweight, in-process, drop-in replacement for Apache Storm
diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt
index 5255fa3..60466bd 100644
--- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt
+++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt
@@ -44,6 +44,12 @@ class KumulusTopology(
private val atomicThreadsInUse = AtomicInteger(0)
private val atomicMaxThreadsInUse = AtomicInteger(0)
+ // Late-message dropping settings, read from the topology config when the topology is constructed.
+ @Suppress("UNCHECKED_CAST")
+ private val lateMessagesStreamsToDrop: Set = config[CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] as? Set ?: emptySet()
+ private val lateMessagesShouldDrop: Boolean = config[CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] as? Boolean ?: false
+ // Accept any Number: an Int value (e.g. `1`) fails an `as? Long` cast and would silently
+ // fall back to the 10-second default instead of the configured wait.
+ private val lateMessageMaxWaitInNanos: Long =
+     ((config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Number)?.toLong() ?: 10L) * 1_000_000_000L
+
private val scheduledExecutorPoolSize: Int =
(config[CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] as? Long ?: 5L).toInt()
private val rejectedExecutionHandler = RejectedExecutionHandler { _, _ ->
@@ -58,6 +64,7 @@ class KumulusTopology(
var onBusyBoltHook: ((String, Int, Long, Tuple) -> Unit)? = null
var onBoltPrepareFinishHook: ((String, Int, Long) -> Unit)? = null
var onReportErrorHook: ((String, Int, Throwable) -> Unit)? = null
+ var onLateMessageHook: ((String, Int, Long, Tuple) -> Unit)? = null // Receives (componentId, taskId, wait in nanoseconds, tuple) the first time a message is found late; submitted to scheduledExecutor
@Suppress("MemberVisibilityCanBePrivate", "unused")
val currentThreadsInUse: Int
@@ -92,6 +99,9 @@ class KumulusTopology(
const val CONF_BOLT_QUEUE_PUSHBACK_WAIT = "kumulus.bolt.pushback.wait"
const val CONF_SHUTDOWN_TIMEOUT_SECS = "kumulus.shutdown.timeout.secs"
const val CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE = "kumulus.executor.scheduled-executor.pool-size"
+ const val CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME = "kumulus.late-messages-dropping.streams-name" // Set of source stream ids whose tuples may be flagged as late; defaults to an empty set
+ const val CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP = "kumulus.late-messages-dropping.should-drop" // Boolean, default false; when true a late message is not re-enqueued
+ const val CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS = "kumulus.late-messages-dropping.max-wait-seconds" // Seconds (default 10) of waiting on a busy component after which messages count as late
@Suppress("unused")
@Deprecated("Use CONF_READY_POLL_SLEEP instead")
@@ -290,18 +300,56 @@ class KumulusTopology(
}
} else {
logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" }
- if (onBusyBoltHook != null && message is ExecuteMessage) {
+ // Cleared below when this message is late and late messages are configured to be dropped.
+ var shouldEnqueue = true
+ if (message is ExecuteMessage) {
+     // A positive waitStart means an earlier pass already recorded when waiting began
+     // (see the compareAndSet at the end of this block).
+     val messageWaitStartTime = c.waitStart.get()
+     if (messageWaitStartTime > 0) {
+         val delay = System.nanoTime() - messageWaitStartTime
+         val isLateOnDroppableStream = delay >= lateMessageMaxWaitInNanos &&
+             lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)
+         if (isLateOnDroppableStream) {
+             // compareAndSet flips the flag and reports whether this is the first time in a
+             // single atomic step, so the hook is submitted at most once per message.
+             if (message.isLate.compareAndSet(false, true)) {
+                 onLateMessageHook?.let { lateHook ->
+                     try {
+                         // The user callback runs on the scheduled executor, not on this thread.
+                         scheduledExecutor.submit {
+                             try {
+                                 lateHook(
+                                     message.component.componentId,
+                                     message.component.taskId,
+                                     delay,
+                                     message.tuple.kTuple
+                                 )
+                             } catch (e: Exception) {
+                                 logger.error("An exception was thrown from late-message-hook callback, ignoring", e)
+                             }
+                         }
+                     } catch (e: Exception) {
+                         logger.error("An exception was thrown by late-message-hook thread-pool submission, ignoring", e)
+                     }
+                 }
+             }
+
+             if (lateMessagesShouldDrop) {
+                 shouldEnqueue = false
+             }
+         }
+     }
+
c.waitStart.compareAndSet(0, System.nanoTime())
}
- if (queuePushbackWait <= 0L) {
- boltExecutionPool.enqueue(message)
- } else {
- scheduledExecutor.schedule(
- {
- boltExecutionPool.enqueue(message)
- },
- queuePushbackWait, TimeUnit.MILLISECONDS
- )
+
+ // Put the message back on the bolt queue unless it was marked for dropping above.
+ // A positive pushback wait defers the re-queue through the scheduled executor.
+ // NOTE(review): when shouldEnqueue is false nothing else is done with the message in this
+ // branch (no ack/fail is visible here) - confirm that is the intended drop behaviour.
+ if (shouldEnqueue) {
+     if (queuePushbackWait > 0L) {
+         scheduledExecutor.schedule(
+             { boltExecutionPool.enqueue(message) },
+             queuePushbackWait,
+             TimeUnit.MILLISECONDS
+         )
+     } else {
+         boltExecutionPool.enqueue(message)
+     }
+ }
}
}
diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt
index 0579d51..b367ed4 100644
--- a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt
+++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt
@@ -77,7 +77,7 @@ class SpoutPrepareMessage(component: KumulusComponent, collector: KumulusSpoutCo
class BoltPrepareMessage(component: KumulusComponent, collector: KumulusBoltCollector) :
PrepareMessage(component, collector)
-class ExecuteMessage(component: KumulusComponent, val tuple: KumulusTuple) :
+/**
+ * A [KumulusMessage] that carries a [tuple] for its component.
+ *
+ * [isLate] starts out false; KumulusTopology sets it once the message has been found to
+ * have waited longer than the configured late-message limit.
+ */
+class ExecuteMessage(
+    component: KumulusComponent,
+    val tuple: KumulusTuple,
+    val isLate: AtomicBoolean = AtomicBoolean(false)
+) :
KumulusMessage(component)
class AckMessage(
diff --git a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt
new file mode 100644
index 0000000..5e9868d
--- /dev/null
+++ b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt
@@ -0,0 +1,124 @@
+package org.xyro.kumulus
+
+import mu.KotlinLogging
+import org.apache.storm.Config
+import org.apache.storm.spout.SpoutOutputCollector
+import org.apache.storm.task.OutputCollector
+import org.apache.storm.task.TopologyContext
+import org.apache.storm.topology.BasicOutputCollector
+import org.apache.storm.topology.IRichBolt
+import org.apache.storm.topology.OutputFieldsDeclarer
+import org.apache.storm.tuple.Fields
+import org.apache.storm.tuple.Tuple
+import org.junit.Test
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+import kotlin.test.assertTrue
+
+/**
+ * Checks that tuples queued behind a bolt that never returns from `execute` are reported
+ * through [KumulusTopology.onLateMessageHook] once the configured wait has elapsed.
+ */
+class TestDroppingStaleMessages {
+    @Test
+    fun testLateBolt() {
+        val builder = org.apache.storm.topology.TopologyBuilder()
+        val config: MutableMap = mutableMapOf()
+        config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L
+        config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L
+        config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] = true
+        config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] = setOf("default")
+        // Long on purpose, like the other numeric settings: an Int literal could be ignored
+        // in favour of the 10-second default, which is longer than this test waits.
+        config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] = 1L
+
+        builder.setSpout("spout", LatencyDeltaSpout())
+
+        builder.setBolt("unanchoring-bolt", UnanchoringBolt())
+            .noneGrouping("spout")
+
+        builder.setBolt("delay-unanchored-bolt", StuckBolt())
+            .noneGrouping("unanchoring-bolt")
+
+        val stormTopology = checkNotNull(builder.createTopology()) { "createTopology() returned null" }
+        val kumulusTopology =
+            KumulusStormTransformer.initializeTopology(stormTopology, config, "test")
+
+        // The hook runs on a topology thread; a latch gives safe cross-thread visibility and
+        // lets the test finish as soon as the hook fires instead of always sleeping.
+        val lateHookCalled = CountDownLatch(1)
+        kumulusTopology.onLateMessageHook = { _, _, _, _ ->
+            lateHookCalled.countDown()
+        }
+
+        kumulusTopology.prepare(10, TimeUnit.SECONDS)
+        kumulusTopology.start(block = false)
+        val hookFired = try {
+            lateHookCalled.await(5, TimeUnit.SECONDS)
+        } finally {
+            kumulusTopology.stop()
+        }
+
+        logger.info { "Late-message hook fired: $hookFired" }
+        assertTrue(hookFired, "onLateMessageHook was not called within 5 seconds")
+    }
+
+    /** Spout that emits an increasing integer id on every call and tracks the gap between calls. */
+    class LatencyDeltaSpout : DummySpout({
+        it.declare(Fields("id"))
+    }) {
+        private var index: Int = 0
+
+        // System.nanoTime() of the previous nextTuple() call; null until the first call.
+        private var lastCall: Long? = null
+
+        override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) {
+            super.open(conf, context, collector)
+            this.index = 0
+            this.lastCall = null
+        }
+
+        override fun nextTuple() {
+            val now = System.nanoTime()
+            lastCall?.let { previous ->
+                val tookMillis = TimeUnit.NANOSECONDS.toMillis(now - previous)
+                sumWait.addAndGet(tookMillis)
+                if (tookMillis > 100) {
+                    logger.error { "Took $tookMillis to nextTuple" }
+                }
+                calledCount.incrementAndGet()
+            }
+            lastCall = now
+            val messageId = ++index
+            collector.emit(listOf(messageId), messageId)
+        }
+    }
+
+    /** Bolt that re-emits the input values without anchoring them and acks the input right away. */
+    class UnanchoringBolt : IRichBolt {
+        private lateinit var collector: OutputCollector
+
+        override fun execute(input: Tuple) {
+            collector.emit(input.values)
+            collector.ack(input)
+        }
+
+        override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector?) {
+            this.collector = requireNotNull(p2) { "prepare() was called without an OutputCollector" }
+        }
+
+        override fun cleanup() = Unit
+
+        override fun getComponentConfiguration(): MutableMap {
+            return mutableMapOf()
+        }
+
+        override fun declareOutputFields(p0: OutputFieldsDeclarer) {
+            p0.declare(Fields("id"))
+        }
+    }
+
+    /** Bolt whose execute never returns normally, so tuples sent to it afterwards find it busy. */
+    class StuckBolt : DummyBolt({
+        it.declare(Fields("id"))
+    }) {
+        override fun execute(input: Tuple, collector: BasicOutputCollector) {
+            logger.info { "StuckBolt: started" }
+            // Deliberately loops forever; only an exception from Thread.sleep can end it.
+            while (true) {
+                Thread.sleep(50)
+            }
+        }
+    }
+
+    companion object {
+        private val logger = KotlinLogging.logger {}
+        private val sumWait = AtomicLong(0)
+        private val calledCount = AtomicLong(0)
+    }
+}