From 3d51b4371aa5e49141ed7f510731adcf3164dbc5 Mon Sep 17 00:00:00 2001 From: Nicolas Sanguinetti Date: Sat, 8 Aug 2015 00:43:52 +0200 Subject: [PATCH] Use connection pool as a dumb thread pool Instead of depending on Celluloid, this would _always_ make the workers threaded (set DISC_CONCURRENCY=1 if you want a single thread...) Having a single code path is good, though. It means it's simpler to test features / bugfixes as we don't need to run every test by duplicte on threaded/nonthreaded mode. Oh, and speaking about tests, did I mention this is just a WIP? :P --- bin/disc | 30 +--------------------------- lib/disc.rb | 57 +++++++++++++++++++++-------------------------------- 2 files changed, 24 insertions(+), 63 deletions(-) diff --git a/bin/disc b/bin/disc index d9e49ef..98a5236 100755 --- a/bin/disc +++ b/bin/disc @@ -6,32 +6,4 @@ require 'clap' Clap.run ARGV, "-r" => lambda { |file| require file } -if defined?(Celluloid) - $stdout.puts( - "[Notice] Disc running in celluloid mode! Current DISC_CONCURRENCY is\ - #{ Integer(ENV.fetch('DISC_CONCURRENCY', '25')) }." - ) - - Disc::Worker.send(:include, Celluloid) - - if defined?(Celluloid::SupervisionGroup) - # Deprecated as of Celluloid 0.17, but still supported via "backported mode" - class Disc::WorkerGroup < Celluloid::SupervisionGroup - pool Disc::Worker, - size: Integer(ENV.fetch('DISC_CONCURRENCY', '25')), - as: :worker_pool, - args: [{ run: true }] - end - - Disc::WorkerGroup.run - else - Disc::Worker.pool( - size: Integer(ENV.fetch('DISC_CONCURRENCY', '25')), - args: [{ run: true }] - ) - end -else - $stdout.puts("[Notice] Disc running in non-threaded mode") - Disc::Worker.run -end - +Disc::Worker.run diff --git a/lib/disc.rb b/lib/disc.rb index 2c2bd3a..b447df3 100644 --- a/lib/disc.rb +++ b/lib/disc.rb @@ -1,12 +1,18 @@ require 'date' require 'disque' require 'msgpack' +require 'connection_pool' require_relative 'disc/version' class Disc - attr_reader :disque, - :disque_timeout + def self.pool + @pool ||= begin + concurrency = Integer(ENV.fetch('DISC_CONCURRENCY', 25)) + tolerance = Integer(ENV.fetch('DISC_TOLERANCE', disque_timeout / 1000.0)) + ConnectionPool.new(size: concurrency, timeout: tolerance) { true } + end + end def self.disque @disque ||= Disque.new( @@ -21,7 +27,7 @@ def self.disque=(disque) end def self.disque_timeout - @disque_timeout ||= 100 + @disque_timeout ||= Integer(ENV.fetch('DISQUE_TIMEOUT', 2000)) end def self.disque_timeout=(timeout) @@ -44,25 +50,9 @@ def self.run def initialize(options = {}) @disque = options.fetch(:disque, Disc.disque) - @queues = options.fetch( - :queues, - ENV.fetch('QUEUES', 'default') - ).split(',') - @count = Integer( - options.fetch( - :count, - ENV.fetch('DISQUE_COUNT', '1') - ) - ) - @timeout = Integer( - options.fetch( - :timeout, - ENV.fetch('DISQUE_TIMEOUT', '2000') - ) - ) - - self.run if options[:run] - self + @queues = options.fetch(:queues, ENV.fetch('QUEUES', 'default')).split(',') + @count = Integer(options.fetch(:count, ENV.fetch('DISQUE_COUNT', '1'))) + @timeout = Integer(options.fetch(:timeout, Disc.disque_timeout)) end def run @@ -70,13 +60,15 @@ def run loop do jobs = disque.fetch(from: queues, timeout: timeout, count: count) Array(jobs).each do |queue, msgid, serialized_job| - begin - job = MessagePack.unpack(serialized_job) - instance = Object.const_get(job['class']).new - instance.perform(*job['arguments']) - disque.call('ACKJOB', msgid) - rescue => err - Disc.on_error(err, job.update('id' => msgid, 'queue' => queue)) + Disc.pool.with do + begin + job = MessagePack.unpack(serialized_job) + instance = Object.const_get(job['class']).new + instance.perform(*job['arguments']) + disque.call('ACKJOB', msgid) + rescue => err + Disc.on_error(err, job.update('id' => msgid, 'queue' => queue)) + end end end end @@ -94,7 +86,7 @@ def self.included(base) module ClassMethods def disque - defined?(@disque) ? @disque : Disc.disque + @disque ||= Disc.disque end def disque=(disque) @@ -116,10 +108,7 @@ def queue def enqueue(args = [], at: nil, queue: nil) disque.push( queue || self.queue, - { - class: self.name, - arguments: Array(args) - }.to_msgpack, + { class: self.name, arguments: Array(args) }.to_msgpack, Disc.disque_timeout, delay: at.nil? ? nil : (at.to_time.to_i - DateTime.now.to_time.to_i) )