Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions lib/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,30 @@ class TimeoutError < ::Timeout::Error; end
# - :auto_reload_after_fork - automatically drop all connections after fork, defaults to true
#
class ConnectionPool
def self.wrap(**, &)
Wrapper.new(**, &)
def self.wrap(**kwargs, &block)
Wrapper.new(**kwargs, &block)
end

attr_reader :size

def initialize(timeout: 5, size: 5, auto_reload_after_fork: true, name: nil, &)
def initialize(timeout: 5, size: 5, auto_reload_after_fork: true, name: nil, &block)
raise ArgumentError, "Connection pool requires a block" unless block_given?

@size = Integer(size)
@timeout = Float(timeout)
@available = TimedStack.new(size: @size, &)
@available = TimedStack.new(size: @size, &block)
@key = :"pool-#{@available.object_id}"
@key_count = :"pool-#{@available.object_id}-count"
@discard_key = :"pool-#{@available.object_id}-discard"
INSTANCES[self] = self if auto_reload_after_fork && INSTANCES
end

def with(**)
def with(**kwargs)
# We need to manage exception handling manually here in order
# to work correctly with `Timeout.timeout` and `Thread#raise`.
# Otherwise an interrupted Thread can leak connections.
Thread.handle_interrupt(Exception => :never) do
conn = checkout(**)
conn = checkout(**kwargs)
begin
Thread.handle_interrupt(Exception => :immediate) do
yield conn
Expand Down Expand Up @@ -108,12 +108,12 @@ def discard_current_connection(&block)
::Thread.current[@discard_key] = block || proc { |conn| conn }
end

def checkout(timeout: @timeout, **)
def checkout(timeout: @timeout, **kwargs)
if ::Thread.current[@key]
::Thread.current[@key_count] += 1
::Thread.current[@key]
else
conn = @available.pop(timeout:, **)
conn = @available.pop(timeout:, **kwargs)
::Thread.current[@key] = conn
::Thread.current[@key_count] = 1
conn
Expand Down Expand Up @@ -151,22 +151,22 @@ def checkin(force: false)
# Shuts down the ConnectionPool by passing each connection to +block+ and
# then removing it from the pool. Attempting to checkout a connection after
# shutdown will raise +ConnectionPool::PoolShuttingDownError+.
def shutdown(&)
@available.shutdown(&)
def shutdown(&block)
@available.shutdown(&block)
end

##
# Reloads the ConnectionPool by passing each connection to +block+ and then
# removing it the pool. Subsequent checkouts will create new connections as
# needed.
def reload(&)
@available.shutdown(reload: true, &)
def reload(&block)
@available.shutdown(reload: true, &block)
end

## Reaps idle connections that have been idle for over +idle_seconds+.
# +idle_seconds+ defaults to 60.
def reap(idle_seconds: 60, &)
@available.reap(idle_seconds:, &)
def reap(idle_seconds: 60, &block)
@available.reap(idle_seconds:, &block)
end

# Number of pool entries available for checkout at this instant.
Expand Down
26 changes: 13 additions & 13 deletions lib/connection_pool/timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ def initialize(size: 0, &block)
##
# Returns +obj+ to the stack. Additional kwargs are ignored in TimedStack but may be
# used by subclasses that extend TimedStack.
def push(obj, **)
def push(obj, **kwargs)
@mutex.synchronize do
if @shutdown_block
@created -= 1 unless @created == 0
@shutdown_block.call(obj)
else
store_connection obj, **
store_connection obj, **kwargs
end

@resource.broadcast
Expand All @@ -59,16 +59,16 @@ def push(obj, **)
# if an entry was not available within the timeout period. Use `exception: false` to return nil.
#
# Other options may be used by subclasses that extend TimedStack.
def pop(timeout: 0.5, exception: ConnectionPool::TimeoutError, **)
def pop(timeout: 0.5, exception: ConnectionPool::TimeoutError, **kwargs)
deadline = current_time + timeout
@mutex.synchronize do
loop do
raise ConnectionPool::PoolShuttingDownError if @shutdown_block
if (conn = try_fetch_connection(**))
if (conn = try_fetch_connection(**kwargs))
return conn
end

connection = try_create(**)
connection = try_create(**kwargs)
return connection if connection

to_wait = deadline - current_time
Expand Down Expand Up @@ -156,32 +156,32 @@ def current_time
# This method must returns a connection from the stack if one exists. Allows
# subclasses with expensive match/search algorithms to avoid double-handling
# their stack.
def try_fetch_connection(**)
connection_stored?(**) && fetch_connection(**)
def try_fetch_connection(**kwargs)
connection_stored?(**kwargs) && fetch_connection(**kwargs)
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must returns true if a connection is available on the stack.
def connection_stored?(**)
def connection_stored?(**_kwargs)
!@que.empty?
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must return a connection from the stack.
def fetch_connection(**)
def fetch_connection(**_kwargs)
@que.pop&.first
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must shut down all connections on the stack.
def shutdown_connections(**)
while (conn = try_fetch_connection(**))
def shutdown_connections(**kwargs)
while (conn = try_fetch_connection(**kwargs))
@created -= 1 unless @created == 0
@shutdown_block.call(conn)
end
Expand Down Expand Up @@ -217,7 +217,7 @@ def idle_connections?(idle_seconds)
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must return +obj+ to the stack.
def store_connection(obj, **)
def store_connection(obj, **_kwargs)
@que.push [obj, current_time]
end

Expand All @@ -226,7 +226,7 @@ def store_connection(obj, **)
#
# This method must create a connection if and only if the total number of
# connections allowed has not been met.
def try_create(**)
def try_create(**_kwargs)
unless @created == @max
object = @create_block.call
@created += 1
Expand Down
24 changes: 12 additions & 12 deletions lib/connection_pool/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ class ConnectionPool
class Wrapper < ::BasicObject
METHODS = [:with, :pool_shutdown, :wrapped_pool]

def initialize(**options, &)
@pool = options.fetch(:pool) { ::ConnectionPool.new(**options, &) }
def initialize(**options, &block)
@pool = options.fetch(:pool) { ::ConnectionPool.new(**options, &block) }
end

def wrapped_pool
@pool
end

def with(**, &)
@pool.with(**, &)
def with(**kwargs, &block)
@pool.with(**kwargs, &block)
end

def pool_shutdown(&)
@pool.shutdown(&)
def pool_shutdown(&block)
@pool.shutdown(&block)
end

def pool_size
Expand All @@ -26,17 +26,17 @@ def pool_available
@pool.available
end

def respond_to?(id, *, **)
METHODS.include?(id) || with { |c| c.respond_to?(id, *, **) }
def respond_to?(id, *args, **kwargs)
METHODS.include?(id) || with { |c| c.respond_to?(id, *args, **kwargs) }
end

def respond_to_missing?(id, *, **)
with { |c| c.respond_to?(id, *, **) }
def respond_to_missing?(id, *args, **kwargs)
with { |c| c.respond_to?(id, *args, **kwargs) }
end

def method_missing(name, *, **, &)
def method_missing(name, *args, **kwargs, &block)
with do |connection|
connection.send(name, *, **, &)
connection.send(name, *args, **kwargs, &block)
end
end
end
Expand Down