diff --git a/README.md b/README.md index 50a48ac..3f185b0 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Include via maven org.xyro kumulus - 0.1.41 + 0.1.42 ``` diff --git a/pom.xml b/pom.xml index 9d7c6f9..65bf1ef 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.xyro kumulus - 0.1.41 + 0.1.42 kumulus A lightweight, in-process, drop-in replacement for Apache Storm diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index 5255fa3..f14871e 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -44,6 +44,11 @@ class KumulusTopology( private val atomicThreadsInUse = AtomicInteger(0) private val atomicMaxThreadsInUse = AtomicInteger(0) + @Suppress("UNCHECKED_CAST") + private val lateMessagesStreamsToDrop: Set = config[CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] as Set? ?: emptySet() + private val lateMessagesShouldDrop: Boolean = config[CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] as Boolean? ?: false + private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as Long? ?: 10) * 1_000_000_000L + private val scheduledExecutorPoolSize: Int = (config[CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] as? Long ?: 5L).toInt() private val rejectedExecutionHandler = RejectedExecutionHandler { _, _ -> @@ -58,6 +63,7 @@ class KumulusTopology( var onBusyBoltHook: ((String, Int, Long, Tuple) -> Unit)? = null var onBoltPrepareFinishHook: ((String, Int, Long) -> Unit)? = null var onReportErrorHook: ((String, Int, Throwable) -> Unit)? = null + var onLateMessageHook: ((String, Int, Long, Tuple) -> Unit)? = null @Suppress("MemberVisibilityCanBePrivate", "unused") val currentThreadsInUse: Int @@ -92,6 +98,9 @@ class KumulusTopology( const val CONF_BOLT_QUEUE_PUSHBACK_WAIT = "kumulus.bolt.pushback.wait" const val CONF_SHUTDOWN_TIMEOUT_SECS = "kumulus.shutdown.timeout.secs" const val CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE = "kumulus.executor.scheduled-executor.pool-size" + const val CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME = "kumulus.late-messages-dropping.streams-name" + const val CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP = "kumulus.late-messages-dropping.should-drop" + const val CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS = "kumulus.late-messages-dropping.max-wait-seconds" @Suppress("unused") @Deprecated("Use CONF_READY_POLL_SLEEP instead") @@ -290,18 +299,56 @@ class KumulusTopology( } } else { logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" } - if (onBusyBoltHook != null && message is ExecuteMessage) { + var shouldEnqueue = true + if (message is ExecuteMessage) { + val messageWaitStartTime = c.waitStart.get() + if (messageWaitStartTime > 0) { + val delay = System.nanoTime() - messageWaitStartTime + if ((delay >= lateMessageMaxWaitInNanos) && + lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId) + ) { + if (!message.isLate.get()) { + message.isLate.set(true) + onLateMessageHook?.let { onLateMessageHook -> + try { + scheduledExecutor.submit { + try { + onLateMessageHook( + message.component.componentId, + message.component.taskId, + delay, + message.tuple.kTuple + ) + } catch (e: Exception) { + logger.error("An exception was thrown from busy-hook callback, ignoring", e) + } + } + } catch (e: Exception) { + logger.error("An exception was thrown by busy-hook thread-pool submission, ignoring", e) + } + } + } + + if (lateMessagesShouldDrop) { + shouldEnqueue = false + } + } + } + c.waitStart.compareAndSet(0, System.nanoTime()) } - if (queuePushbackWait <= 0L) { - boltExecutionPool.enqueue(message) - } else { - scheduledExecutor.schedule( - { - boltExecutionPool.enqueue(message) - }, - queuePushbackWait, TimeUnit.MILLISECONDS - ) + + if (shouldEnqueue) { + if (queuePushbackWait <= 0L) { + boltExecutionPool.enqueue(message) + } else { + scheduledExecutor.schedule( + { + boltExecutionPool.enqueue(message) + }, + queuePushbackWait, TimeUnit.MILLISECONDS + ) + } } } } diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt index 0579d51..b367ed4 100644 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt @@ -77,7 +77,7 @@ class SpoutPrepareMessage(component: KumulusComponent, collector: KumulusSpoutCo class BoltPrepareMessage(component: KumulusComponent, collector: KumulusBoltCollector) : PrepareMessage(component, collector) -class ExecuteMessage(component: KumulusComponent, val tuple: KumulusTuple) : +class ExecuteMessage(component: KumulusComponent, val tuple: KumulusTuple, val isLate: AtomicBoolean = AtomicBoolean(false)) : KumulusMessage(component) class AckMessage( diff --git a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt new file mode 100644 index 0000000..9fb4fa9 --- /dev/null +++ b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt @@ -0,0 +1,123 @@ +package org.xyro.kumulus + +import mu.KotlinLogging +import org.apache.storm.Config +import org.apache.storm.spout.SpoutOutputCollector +import org.apache.storm.task.OutputCollector +import org.apache.storm.task.TopologyContext +import org.apache.storm.topology.BasicOutputCollector +import org.apache.storm.topology.IRichBolt +import org.apache.storm.topology.OutputFieldsDeclarer +import org.apache.storm.tuple.Fields +import org.apache.storm.tuple.Tuple +import org.junit.Test +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong +import kotlin.test.assertTrue + +class TestDroppingStaleMessages { + @Test + fun testLateBolt() { + val builder = org.apache.storm.topology.TopologyBuilder() + val config: MutableMap = mutableMapOf() + config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L + config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L + config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] = true + config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] = setOf("default") + config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] = 1 + + builder.setSpout("spout", LatencyDeltaSpout()) + + builder.setBolt("unanchoring-bolt", UnanchoringBolt()) + .noneGrouping("spout") + + builder.setBolt("delay-unanchored-bolt", StuckBolt()) + .noneGrouping("unanchoring-bolt") + + val stormTopology = builder.createTopology()!! + val kumulusTopology = + KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + + var lateHookCalled = false + + kumulusTopology.onLateMessageHook = { _, _, _, _ -> + lateHookCalled = true + } + + kumulusTopology.prepare(10, TimeUnit.SECONDS) + kumulusTopology.start(block = false) + Thread.sleep(5000) + kumulusTopology.stop() + + logger.info { "Dropped $lateHookCalled messages" } + assertTrue { lateHookCalled } + } + + class LatencyDeltaSpout : DummySpout({ + it.declare(Fields("id")) + }) { + private var index: Int = 0 + private var lastCall: Long? = 0 + + override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) { + super.open(conf, context, collector) + this.index = 0 + this.lastCall = null + } + + override fun nextTuple() { + val now = System.nanoTime() + if (this.lastCall != null) { + val tookMillis = TimeUnit.NANOSECONDS.toMillis(now - this.lastCall!!) + sumWait.addAndGet(tookMillis) + if (tookMillis > 100) { + logger.error { "Took $tookMillis to nextTuple" } + } + calledCount.incrementAndGet() + } + this.lastCall = now + val messageId = ++index + collector.emit(listOf(messageId), messageId) + } + } + + class UnanchoringBolt : IRichBolt { + private lateinit var collector: OutputCollector + + override fun execute(input: Tuple) { + collector.emit(input.values) + collector.ack(input) + } + + override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector?) { + this.collector = p2!! + } + + override fun cleanup() = Unit + + override fun getComponentConfiguration(): MutableMap { + return mutableMapOf() + } + + override fun declareOutputFields(p0: OutputFieldsDeclarer) { + p0.declare(Fields("id")) + } + } + + class StuckBolt : DummyBolt({ + it.declare(Fields("id")) + }) { + override fun execute(input: Tuple, collector: BasicOutputCollector) { + logger.info { "StuckBolt: started" } + while (true) { + Thread.sleep(50) + } + } + } + + companion object { + private val logger = KotlinLogging.logger {} + private val sumWait = AtomicLong(0) + private val calledCount = AtomicLong(0) + } +}