Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>org.xyro</groupId>
<artifactId>kumulus</artifactId>
<version>0.1.41</version>
<version>0.1.42</version>

<name>kumulus</name>
<description>A lightweight, in-process, drop-in replacement for Apache Storm</description>
Expand Down
68 changes: 58 additions & 10 deletions src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class KumulusTopology(
private val atomicThreadsInUse = AtomicInteger(0)
private val atomicMaxThreadsInUse = AtomicInteger(0)

@Suppress("UNCHECKED_CAST")
private val lateMessagesStreamsToDrop: Set<String> = config[CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] as? Set<String> ?: emptySet()
private val lateMessagesShouldDrop: Boolean = config[CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] as? Boolean ?: false
private val lateMessageMaxWaitInNanos: Long = (config[CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] as? Long ?: 10) * 1_000_000_000L


private val scheduledExecutorPoolSize: Int =
(config[CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] as? Long ?: 5L).toInt()
private val rejectedExecutionHandler = RejectedExecutionHandler { _, _ ->
Expand All @@ -58,6 +64,7 @@ class KumulusTopology(
var onBusyBoltHook: ((String, Int, Long, Tuple) -> Unit)? = null
var onBoltPrepareFinishHook: ((String, Int, Long) -> Unit)? = null
var onReportErrorHook: ((String, Int, Throwable) -> Unit)? = null
var onLateMessageHook: ((String, Int, Long, Tuple) -> Unit)? = null

@Suppress("MemberVisibilityCanBePrivate", "unused")
val currentThreadsInUse: Int
Expand Down Expand Up @@ -92,6 +99,9 @@ class KumulusTopology(
const val CONF_BOLT_QUEUE_PUSHBACK_WAIT = "kumulus.bolt.pushback.wait"
const val CONF_SHUTDOWN_TIMEOUT_SECS = "kumulus.shutdown.timeout.secs"
const val CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE = "kumulus.executor.scheduled-executor.pool-size"
const val CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME = "kumulus.late-messages-dropping.streams-name"
const val CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP = "kumulus.late-messages-dropping.should-drop"
const val CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS = "kumulus.late-messages-dropping.max-wait-seconds"

@Suppress("unused")
@Deprecated("Use CONF_READY_POLL_SLEEP instead")
Expand Down Expand Up @@ -290,18 +300,56 @@ class KumulusTopology(
}
} else {
logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" }
if (onBusyBoltHook != null && message is ExecuteMessage) {
var shouldEnqueue = true
if (message is ExecuteMessage) {
val messageWaitStartTime = c.waitStart.get()
if (messageWaitStartTime > 0){
val delay = System.nanoTime() - messageWaitStartTime
if ((delay >= lateMessageMaxWaitInNanos) &&
lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) {
if (!message.isLate.get()){
onLateMessageHook?.let { onLateMessageHook ->
try {
scheduledExecutor.submit {
try {
onLateMessageHook(
message.component.componentId,
message.component.taskId,
delay,
message.tuple.kTuple
)
} catch (e: Exception) {
logger.error("An exception was thrown from busy-hook callback, ignoring", e)
}
}
} catch (e: Exception) {
logger.error("An exception was thrown by busy-hook thread-pool submission, ignoring", e)
}
}

message.isLate.set(true)
}

if (lateMessagesShouldDrop){
shouldEnqueue = false
}
}
}

c.waitStart.compareAndSet(0, System.nanoTime())
}
if (queuePushbackWait <= 0L) {
boltExecutionPool.enqueue(message)
} else {
scheduledExecutor.schedule(
{
boltExecutionPool.enqueue(message)
},
queuePushbackWait, TimeUnit.MILLISECONDS
)

if (shouldEnqueue){
if (queuePushbackWait <= 0L) {
boltExecutionPool.enqueue(message)
} else {
scheduledExecutor.schedule(
{
boltExecutionPool.enqueue(message)
},
queuePushbackWait, TimeUnit.MILLISECONDS
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class SpoutPrepareMessage(component: KumulusComponent, collector: KumulusSpoutCo
class BoltPrepareMessage(component: KumulusComponent, collector: KumulusBoltCollector) :
PrepareMessage<KumulusBolt>(component, collector)

class ExecuteMessage(component: KumulusComponent, val tuple: KumulusTuple) :
class ExecuteMessage(component: KumulusComponent, val tuple: KumulusTuple, val isLate: AtomicBoolean = AtomicBoolean(false)) :
KumulusMessage(component)

class AckMessage(
Expand Down
124 changes: 124 additions & 0 deletions src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.xyro.kumulus

import mu.KotlinLogging
import org.apache.storm.Config
import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.OutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.BasicOutputCollector
import org.apache.storm.topology.IRichBolt
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.tuple.Fields
import org.apache.storm.tuple.Tuple
import org.junit.Test
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import kotlin.test.assertTrue

class TestDroppingStaleMessages {
@Test
fun testLateBolt() {
val builder = org.apache.storm.topology.TopologyBuilder()
val config: MutableMap<String, Any> = mutableMapOf()
config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L
config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] = true;
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] = setOf("default");
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] = 1;

builder.setSpout("spout", LatencyDeltaSpout())

builder.setBolt("unanchoring-bolt", UnanchoringBolt())
.noneGrouping("spout")

builder.setBolt("delay-unanchored-bolt", StuckBolt())
.noneGrouping("unanchoring-bolt")


val stormTopology = builder.createTopology()!!
val kumulusTopology =
KumulusStormTransformer.initializeTopology(stormTopology, config, "test")

var lateHookCalled = false

kumulusTopology.onLateMessageHook = { _, _, _, _ ->
lateHookCalled = true
}

kumulusTopology.prepare(10, TimeUnit.SECONDS)
kumulusTopology.start(block = false)
Thread.sleep(5000)
kumulusTopology.stop()

logger.info { "Dropped ${lateHookCalled} messages" }
assertTrue { lateHookCalled}
}

class LatencyDeltaSpout : DummySpout({
it.declare(Fields("id"))
}) {
private var index: Int = 0
private var lastCall: Long? = 0

override fun open(conf: MutableMap<Any?, Any?>?, context: TopologyContext?, collector: SpoutOutputCollector?) {
super.open(conf, context, collector)
this.index = 0
this.lastCall = null
}

override fun nextTuple() {
val now = System.nanoTime()
if (this.lastCall != null) {
val tookMillis = TimeUnit.NANOSECONDS.toMillis(now - this.lastCall!!)
sumWait.addAndGet(tookMillis)
if (tookMillis > 100) {
logger.error { "Took $tookMillis to nextTuple" }
}
calledCount.incrementAndGet()
}
this.lastCall = now
val messageId = ++index
collector.emit(listOf(messageId), messageId)
}
}

class UnanchoringBolt : IRichBolt {
private lateinit var collector: OutputCollector

override fun execute(input: Tuple) {
collector.emit(input.values)
collector.ack(input)
}

override fun prepare(p0: MutableMap<Any?, Any?>?, p1: TopologyContext?, p2: OutputCollector?) {
this.collector = p2!!
}

override fun cleanup() = Unit

override fun getComponentConfiguration(): MutableMap<String, Any> {
return mutableMapOf()
}

override fun declareOutputFields(p0: OutputFieldsDeclarer) {
p0.declare(Fields("id"))
}
}

class StuckBolt : DummyBolt({
it.declare(Fields("id"))
}) {
override fun execute(input: Tuple, collector: BasicOutputCollector) {
logger.info { "StuckBolt: started" }
while (true){
Thread.sleep(50)
}
}
}

companion object {
private val logger = KotlinLogging.logger {}
private val sumWait = AtomicLong(0)
private val calledCount = AtomicLong(0)
}
}
Loading