From 8e27cbf743b0aa6402a412324baa772830b58233 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Thu, 19 Jun 2025 16:28:17 +0200 Subject: [PATCH 01/10] added: * worker to check fiber timeouts * timeout_after implementation for fiber scheduler --- lib/rage/fiber_scheduler.rb | 78 +++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index abc651ea..e7edb575 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -4,16 +4,29 @@ class Rage::FiberScheduler MAX_READ = 65536 + TIMEOUT_WORKER_INTERVAL = 100 # miliseconds def initialize @root_fiber = Fiber.current @dns_cache = {} + + @alive_fibers = {} + @timeout_mutex = Mutex.new + + start_timeout_worker end def io_wait(io, events, timeout = nil) f = Fiber.current ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } + timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout + @alive_fibers[f.__get_id] = { + fiber: f, + timeout_deadline: timeout_deadline, + exception_class: RageTimeout, + } + err = Fiber.defer(io.fileno) if err == false || (err && err < 0) err @@ -79,6 +92,30 @@ def kernel_sleep(duration = nil) # result # end + def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) + fiber = Fiber.current + timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration + + p "duration #{duration}" + p "fiber id #{fiber.__get_id}" + + @timeout_mutex.synchronize do + @alive_fibers[fiber.__get_id] = { + fiber: fiber, + timeout_deadline: timeout_deadline, + exception_class: exception_class, + exception_arguments: exception_arguments, + } + end + + begin + block.call + ensure + @timeout_mutex.synchronize do + @alive_fibers.delete(fiber.__get_id) + end + end + end def address_resolve(hostname) @dns_cache[hostname] ||= begin @@ -145,4 +182,45 @@ def fiber(&block) def close ::Iodine::Scheduler.close end + + private + + def start_timeout_worker + return unless ::Iodine.running? + + ::Iodine.run_every(Rage::FiberScheduler::TIMEOUT_WORKER_INTERVAL) do + @timeout_mutex.synchronize do + check_timeouts + end + end + end + + def check_timeouts + p @alive_fibers.count + + @alive_fibers.delete_if do |_, fiber_hash| + current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + p current_time + p "deadline #{fiber_hash[:timeout_deadline]}" + p "fiber id #{fiber_hash[:fiber].__get_id}" + + return false if current_time < fiber_hash[:timeout_deadline] + + p 'after' + + fiber = fiber_hash[:fiber] + # unblock(nil, fiber) + + # if fiber.alive? + fiber.raise(RageTimeout) + # else + # fiber.kill + # end + + true + end + end end + +class RageTimeout < StandardError; end \ No newline at end of file From a531c5554feef99dbdd267cd59348f72c744f786 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Wed, 9 Jul 2025 23:20:48 +0200 Subject: [PATCH 02/10] correctly handle nested timeouts --- lib/rage/fiber_scheduler.rb | 82 +++++++++++-------------------------- 1 file changed, 24 insertions(+), 58 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index e7edb575..69d3496d 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -10,8 +10,7 @@ def initialize @root_fiber = Fiber.current @dns_cache = {} - @alive_fibers = {} - @timeout_mutex = Mutex.new + @alive_fibers = Hash.new { |h, k| h[k] = {} } start_timeout_worker end @@ -20,13 +19,6 @@ def io_wait(io, events, timeout = nil) f = Fiber.current ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } - timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout - @alive_fibers[f.__get_id] = { - fiber: f, - timeout_deadline: timeout_deadline, - exception_class: RageTimeout, - } - err = Fiber.defer(io.fileno) if err == false || (err && err < 0) err @@ -79,41 +71,21 @@ def kernel_sleep(duration = nil) Fiber.pause if duration.nil? || duration < 1 end - # TODO: GC works a little strange with this closure; - # - # def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) - # fiber, block_status = Fiber.current, :running - # ::Iodine.run_after((duration * 1000).to_i) do - # fiber.raise(exception_class, exception_arguments) if block_status == :running - # end - - # result = block.call - # block_status = :finished - - # result - # end def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) fiber = Fiber.current timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration - p "duration #{duration}" - p "fiber id #{fiber.__get_id}" - - @timeout_mutex.synchronize do - @alive_fibers[fiber.__get_id] = { - fiber: fiber, - timeout_deadline: timeout_deadline, - exception_class: exception_class, - exception_arguments: exception_arguments, - } - end + @alive_fibers[fiber.__get_id][timeout_deadline] = { + fiber: fiber, + timeout_deadline: timeout_deadline, + exception_class: exception_class, + exception_arguments: exception_arguments + } begin block.call ensure - @timeout_mutex.synchronize do - @alive_fibers.delete(fiber.__get_id) - end + @alive_fibers[fiber.__get_id].delete(timeout_deadline) end end @@ -189,38 +161,32 @@ def start_timeout_worker return unless ::Iodine.running? ::Iodine.run_every(Rage::FiberScheduler::TIMEOUT_WORKER_INTERVAL) do - @timeout_mutex.synchronize do - check_timeouts - end + check_timeouts end end def check_timeouts - p @alive_fibers.count + @alive_fibers.delete_if do |fiber_id, timeouts| + timeouts.delete_if do |timeout_key, fiber_hash| + current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @alive_fibers.delete_if do |_, fiber_hash| - current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + return false if current_time < fiber_hash[:timeout_deadline] - p current_time - p "deadline #{fiber_hash[:timeout_deadline]}" - p "fiber id #{fiber_hash[:fiber].__get_id}" + fiber = fiber_hash[:fiber] + unblock(nil, fiber) - return false if current_time < fiber_hash[:timeout_deadline] + if fiber.alive? + fiber.raise(RageTimeout) + else + timeouts.delete(timeout_key) + end - p 'after' - - fiber = fiber_hash[:fiber] - # unblock(nil, fiber) - - # if fiber.alive? - fiber.raise(RageTimeout) - # else - # fiber.kill - # end + true + end - true + timeouts.length == 0 end end end -class RageTimeout < StandardError; end \ No newline at end of file +class RageTimeout < StandardError; end From 3355f18650c1edbe966e4b359ba717638d620918 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Sun, 13 Jul 2025 21:51:22 +0200 Subject: [PATCH 03/10] - mutex to correctly access hash with fiber timeouts (concurrency happens between fiber itself and iodine worker) - kill fiber forcefully if it still alive after error was raised --- lib/rage/fiber_scheduler.rb | 51 ++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 69d3496d..c928e60b 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -11,6 +11,7 @@ def initialize @dns_cache = {} @alive_fibers = Hash.new { |h, k| h[k] = {} } + @fibers_mutex = Mutex.new start_timeout_worker end @@ -75,17 +76,21 @@ def timeout_after(duration, exception_class = Timeout::Error, *exception_argumen fiber = Fiber.current timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration - @alive_fibers[fiber.__get_id][timeout_deadline] = { - fiber: fiber, - timeout_deadline: timeout_deadline, - exception_class: exception_class, - exception_arguments: exception_arguments - } + @fibers_mutex.synchronize do + @alive_fibers[fiber.__get_id][timeout_deadline] = { + fiber: fiber, + timeout_deadline: timeout_deadline, + exception_class: exception_class, + exception_arguments: exception_arguments + } + end begin block.call ensure - @alive_fibers[fiber.__get_id].delete(timeout_deadline) + @fibers_mutex.synchronize do + @alive_fibers[fiber.__get_id].delete(timeout_deadline) + end end end @@ -166,25 +171,31 @@ def start_timeout_worker end def check_timeouts - @alive_fibers.delete_if do |fiber_id, timeouts| - timeouts.delete_if do |timeout_key, fiber_hash| - current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @fibers_mutex.synchronize do + @alive_fibers.delete_if do |fiber_id, timeouts| + timeouts.delete_if do |timeout_key, fiber_hash| + current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + next false if current_time < fiber_hash[:timeout_deadline] - return false if current_time < fiber_hash[:timeout_deadline] + fiber = fiber_hash[:fiber] + unblock(nil, fiber) - fiber = fiber_hash[:fiber] - unblock(nil, fiber) + if fiber.alive? + fiber.raise(RageTimeout) - if fiber.alive? - fiber.raise(RageTimeout) - else - timeouts.delete(timeout_key) + ::Iodine.run_after(1000) do + fiber.kill if fiber.alive? + end + else + timeouts.delete(timeout_key) + end + + true end - true + timeouts.length == 0 end - - timeouts.length == 0 end end end From 539c1be068b1e76bc1afa4d6bce68b6be7738135 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Sun, 13 Jul 2025 22:38:53 +0200 Subject: [PATCH 04/10] refactorings --- lib/rage/fiber_scheduler.rb | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index c928e60b..4eeb1d38 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -5,6 +5,7 @@ class Rage::FiberScheduler MAX_READ = 65536 TIMEOUT_WORKER_INTERVAL = 100 # miliseconds + FIBER_KILL_DELAY = 500 # miliseconds def initialize @root_fiber = Fiber.current @@ -172,32 +173,30 @@ def start_timeout_worker def check_timeouts @fibers_mutex.synchronize do - @alive_fibers.delete_if do |fiber_id, timeouts| - timeouts.delete_if do |timeout_key, fiber_hash| + @alive_fibers.delete_if do |fiber_id, fiber_timeouts| + fiber_timeouts.delete_if do |timeout_key, fiber_context| current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - next false if current_time < fiber_hash[:timeout_deadline] + next false if current_time < fiber_context[:timeout_deadline] - fiber = fiber_hash[:fiber] + fiber = fiber_context[:fiber] unblock(nil, fiber) if fiber.alive? - fiber.raise(RageTimeout) + fiber.raise(fiber_context[:exception_class], *fiber_context[:exception_arguments]) - ::Iodine.run_after(1000) do + ::Iodine.run_after(FIBER_KILL_DELAY) do fiber.kill if fiber.alive? end else - timeouts.delete(timeout_key) + fiber_timeouts.delete(timeout_key) end true end - timeouts.length == 0 + fiber_timeouts.length == 0 end end end end - -class RageTimeout < StandardError; end From 9f661bfd593ad9ded4a40a251c6598fe38bbc90d Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Mon, 14 Jul 2025 19:14:03 +0200 Subject: [PATCH 05/10] refactored: - variable names - removed mutex, as was added by mistake - `check_timeouts` now is thinner after unnecessary code deletion --- lib/rage/fiber_scheduler.rb | 54 ++++++++++--------------------------- 1 file changed, 14 insertions(+), 40 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 4eeb1d38..89465ef9 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -5,14 +5,12 @@ class Rage::FiberScheduler MAX_READ = 65536 TIMEOUT_WORKER_INTERVAL = 100 # miliseconds - FIBER_KILL_DELAY = 500 # miliseconds def initialize @root_fiber = Fiber.current @dns_cache = {} - @alive_fibers = Hash.new { |h, k| h[k] = {} } - @fibers_mutex = Mutex.new + @fiber_timeouts = Hash.new { |h, k| h[k] = {} } start_timeout_worker end @@ -75,23 +73,18 @@ def kernel_sleep(duration = nil) def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) fiber = Fiber.current - timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration - - @fibers_mutex.synchronize do - @alive_fibers[fiber.__get_id][timeout_deadline] = { - fiber: fiber, - timeout_deadline: timeout_deadline, - exception_class: exception_class, - exception_arguments: exception_arguments - } - end + timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration + + @fiber_timeouts[fiber][timeout] = { + exception_class: exception_class, + exception_arguments: exception_arguments + } begin block.call ensure - @fibers_mutex.synchronize do - @alive_fibers[fiber.__get_id].delete(timeout_deadline) - end + @fiber_timeouts[fiber].delete(timeout) + @fiber_timeouts.delete(fiber) if @fiber_timeouts[fiber].empty? end end @@ -164,38 +157,19 @@ def close private def start_timeout_worker - return unless ::Iodine.running? - ::Iodine.run_every(Rage::FiberScheduler::TIMEOUT_WORKER_INTERVAL) do check_timeouts end end def check_timeouts - @fibers_mutex.synchronize do - @alive_fibers.delete_if do |fiber_id, fiber_timeouts| - fiber_timeouts.delete_if do |timeout_key, fiber_context| - current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - - next false if current_time < fiber_context[:timeout_deadline] - - fiber = fiber_context[:fiber] - unblock(nil, fiber) - - if fiber.alive? - fiber.raise(fiber_context[:exception_class], *fiber_context[:exception_arguments]) - - ::Iodine.run_after(FIBER_KILL_DELAY) do - fiber.kill if fiber.alive? - end - else - fiber_timeouts.delete(timeout_key) - end + @fiber_timeouts.each_pair do |fiber, timeouts| + timeouts.delete_if do |timeout, context| + next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout - true - end + fiber.raise(context[:exception_class], *context[:exception_arguments]) if fiber.alive? - fiber_timeouts.length == 0 + true end end end From 2bb212dda82e60dec9e9111a172e6d7f6b02c59e Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Mon, 14 Jul 2025 20:33:48 +0200 Subject: [PATCH 06/10] (fix) do not try to resume fiber if it's cannot be resumed --- lib/rage/fiber_scheduler.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 89465ef9..ecf427bd 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -17,7 +17,7 @@ def initialize def io_wait(io, events, timeout = nil) f = Fiber.current - ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } + ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) if f.alive? } err = Fiber.defer(io.fileno) if err == false || (err && err < 0) @@ -105,7 +105,7 @@ def block(_blocker, timeout = nil) unless fulfilled fulfilled = true ::Iodine.defer { ::Iodine.unsubscribe(channel) } - f.resume + f.resume if f.alive? end end From 213988d9069ab82bc79eba8292bd362cfe02a2e0 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Tue, 15 Jul 2025 17:33:03 +0200 Subject: [PATCH 07/10] more refactoring: * simplified `check_timeouts` method * renamed variables --- lib/rage/fiber_scheduler.rb | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index ecf427bd..d9a6a425 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -72,10 +72,10 @@ def kernel_sleep(duration = nil) end def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) - fiber = Fiber.current + f = Fiber.current timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration - @fiber_timeouts[fiber][timeout] = { + @fiber_timeouts[f][timeout] = { exception_class: exception_class, exception_arguments: exception_arguments } @@ -83,8 +83,8 @@ def timeout_after(duration, exception_class = Timeout::Error, *exception_argumen begin block.call ensure - @fiber_timeouts[fiber].delete(timeout) - @fiber_timeouts.delete(fiber) if @fiber_timeouts[fiber].empty? + @fiber_timeouts[f].delete(timeout) + @fiber_timeouts.delete(f) if @fiber_timeouts[f].empty? end end @@ -163,13 +163,11 @@ def start_timeout_worker end def check_timeouts - @fiber_timeouts.each_pair do |fiber, timeouts| - timeouts.delete_if do |timeout, context| + @fiber_timeouts.each do |fiber, timeouts| + timeouts.each do |timeout, context| next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout - fiber.raise(context[:exception_class], *context[:exception_arguments]) if fiber.alive? - - true + fiber.raise(context[:exception_class], *context[:exception_arguments]) end end end From 79506f2c446d153c390167c74531719bacff17a0 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Tue, 14 Oct 2025 19:45:59 +0200 Subject: [PATCH 08/10] * unsubscribe from channel after raising exception for fiber * names for channels moved into a separate method --- lib/rage/fiber_scheduler.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index d9a6a425..839581f7 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -168,6 +168,9 @@ def check_timeouts next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout fiber.raise(context[:exception_class], *context[:exception_arguments]) + + Iodine.unsubscribe(fiber.__block_channel) + Iodine.unsubscribe(fiber.__await_channel) end end end From d0a5f4b6d0b93d78a22907a35f9df60511923284 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Wed, 15 Oct 2025 13:34:31 +0200 Subject: [PATCH 09/10] fix for "RuntimeError (can't add a new key into hash during iteration)" --- lib/rage/fiber_scheduler.rb | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 839581f7..e3c413ae 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -163,15 +163,23 @@ def start_timeout_worker end def check_timeouts + fibers_to_raise = [] + @fiber_timeouts.each do |fiber, timeouts| timeouts.each do |timeout, context| next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout - fiber.raise(context[:exception_class], *context[:exception_arguments]) + fibers_to_raise << -> do + fiber.raise(context[:exception_class], *context[:exception_arguments]) - Iodine.unsubscribe(fiber.__block_channel) - Iodine.unsubscribe(fiber.__await_channel) + Iodine.unsubscribe(fiber.__block_channel) + Iodine.unsubscribe(fiber.__await_channel) + end end end + + fibers_to_raise.each(&:call) + + fibers_to_raise.clear end end From 86ba8253c862f2d7e5c62e4e8be075710a1bad06 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Fri, 5 Dec 2025 16:31:27 +0100 Subject: [PATCH 10/10] code's refactored in favor of `run_after` method instead of `run_every` --- lib/rage/fiber_scheduler.rb | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index e3c413ae..271751e0 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -4,15 +4,12 @@ class Rage::FiberScheduler MAX_READ = 65536 - TIMEOUT_WORKER_INTERVAL = 100 # miliseconds def initialize @root_fiber = Fiber.current @dns_cache = {} @fiber_timeouts = Hash.new { |h, k| h[k] = {} } - - start_timeout_worker end def io_wait(io, events, timeout = nil) @@ -80,6 +77,8 @@ def timeout_after(duration, exception_class = Timeout::Error, *exception_argumen exception_arguments: exception_arguments } + schedule_timeout_check + begin block.call ensure @@ -156,9 +155,25 @@ def close private - def start_timeout_worker - ::Iodine.run_every(Rage::FiberScheduler::TIMEOUT_WORKER_INTERVAL) do + def schedule_timeout_check + return if @fiber_timeouts.empty? + + closest_timeout = nil + @fiber_timeouts.each_value do |timeouts| + timeouts.each_key do |timeout| + closest_timeout = timeout if closest_timeout.nil? || timeout < closest_timeout + end + end + + return unless closest_timeout + + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delay_ms = ((closest_timeout - now) * 1000).ceil + delay_ms = 0 if delay_ms < 0 + + ::Iodine.run_after(delay_ms) do check_timeouts + schedule_timeout_check end end