From adc5b9c9a49300aac325ed2d9735e1e550a24e99 Mon Sep 17 00:00:00 2001 From: Edouard127 <46357922+Edouard127@users.noreply.github.com> Date: Sun, 12 Jan 2025 17:18:46 -0500 Subject: [PATCH] feat: timer --- .../main/kotlin/com/lambda/event/EventFlow.kt | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/common/src/main/kotlin/com/lambda/event/EventFlow.kt b/common/src/main/kotlin/com/lambda/event/EventFlow.kt index 894b6795c..bd0c4ef94 100644 --- a/common/src/main/kotlin/com/lambda/event/EventFlow.kt +++ b/common/src/main/kotlin/com/lambda/event/EventFlow.kt @@ -94,6 +94,32 @@ object EventFlow { } } + /** + * Registers a timer that invokes the specified block at regular intervals + * + * This function runs a timer that emits a signal at the specified interval, and invokes the + * provided block each time the signal is emitted. The interval is specified in milliseconds, + * and the block is a suspend function that is called each time the timer triggers + * + * **Note**: This function uses a flow to emit signals at the specified interval, and uses `conflate()` + * to prevent overloading the block execution in case the block takes longer than the interval + * + * @param interval The time interval (in milliseconds) between each invocation of the block + * @param block The suspend function that will be called each time the timer triggers + * + * @return A [Job] representing the running timer. The timer can be canceled by invoking `cancel()` on the returned job + */ + inline fun timer(interval: Long, crossinline block: suspend () -> Unit) = + runConcurrent { + // This allows for 18,446,744,073,709,551,615 events + // Don't worry, a 10 ms timer will end after 5.849 billion years + (Long.MAX_VALUE downTo Long.MIN_VALUE) + .asFlow() + .onEach { delay(interval) } + .conflate() + .collect { block() } + } + /** * Suspends until an event of type [E] is received that satisfies the given [predicate]. * @@ -123,7 +149,7 @@ object EventFlow { noinline predicate: (E) -> Boolean = { true }, ) = runBlocking { withTimeout(timeout) { - concurrentFlow.filterIsInstance().first(predicate) + this@EventFlow.concurrentFlow.filterIsInstance().first(predicate) } } @@ -150,7 +176,7 @@ object EventFlow { suspend inline fun collectEvents( crossinline predicate: (E) -> Boolean = { true }, ): Flow = flow { - concurrentFlow + this@EventFlow.concurrentFlow .filterIsInstance() .filter { predicate(it) } .collect { @@ -175,7 +201,7 @@ object EventFlow { */ @JvmStatic fun E.post(): E { - concurrentFlow.tryEmit(this) + this@EventFlow.concurrentFlow.tryEmit(this) executeListenerSynchronous() return this@post }