Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions bin/disc
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,4 @@ require 'clap'
Clap.run ARGV,
"-r" => lambda { |file| require file }

if defined?(Celluloid)
$stdout.puts(
"[Notice] Disc running in celluloid mode! Current DISC_CONCURRENCY is\
#{ Integer(ENV.fetch('DISC_CONCURRENCY', '25')) }."
)

Disc::Worker.send(:include, Celluloid)

if defined?(Celluloid::SupervisionGroup)
# Deprecated as of Celluloid 0.17, but still supported via "backported mode"
class Disc::WorkerGroup < Celluloid::SupervisionGroup
pool Disc::Worker,
size: Integer(ENV.fetch('DISC_CONCURRENCY', '25')),
as: :worker_pool,
args: [{ run: true }]
end

Disc::WorkerGroup.run
else
Disc::Worker.pool(
size: Integer(ENV.fetch('DISC_CONCURRENCY', '25')),
args: [{ run: true }]
)
end
else
$stdout.puts("[Notice] Disc running in non-threaded mode")
Disc::Worker.run
end

Disc::Worker.run
57 changes: 23 additions & 34 deletions lib/disc.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
require 'date'
require 'disque'
require 'msgpack'
require 'connection_pool'

require_relative 'disc/version'

class Disc
attr_reader :disque,
:disque_timeout
def self.pool
@pool ||= begin
concurrency = Integer(ENV.fetch('DISC_CONCURRENCY', 25))
tolerance = Integer(ENV.fetch('DISC_TOLERANCE', disque_timeout / 1000.0))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default timeout for each thread would be the same as for disque to consider a job was not finished. Maybe this should be lower? Dunno, seemed like a decent default.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decent defaults are all we can hope for, if a reason to change it presents itself then we'll reevaluate, for now that's good enough :)

ConnectionPool.new(size: concurrency, timeout: tolerance) { true }
end
end

def self.disque
@disque ||= Disque.new(
Expand All @@ -21,7 +27,7 @@ def self.disque=(disque)
end

def self.disque_timeout
@disque_timeout ||= 100
@disque_timeout ||= Integer(ENV.fetch('DISQUE_TIMEOUT', 2000))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure why Disc.disque_timeout had a default different from DISQUE_TIMEOUT. Was this on purpose?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not.

end

def self.disque_timeout=(timeout)
Expand All @@ -44,39 +50,25 @@ def self.run

def initialize(options = {})
@disque = options.fetch(:disque, Disc.disque)
@queues = options.fetch(
:queues,
ENV.fetch('QUEUES', 'default')
).split(',')
@count = Integer(
options.fetch(
:count,
ENV.fetch('DISQUE_COUNT', '1')
)
)
@timeout = Integer(
options.fetch(
:timeout,
ENV.fetch('DISQUE_TIMEOUT', '2000')
)
)

self.run if options[:run]
self
@queues = options.fetch(:queues, ENV.fetch('QUEUES', 'default')).split(',')
@count = Integer(options.fetch(:count, ENV.fetch('DISQUE_COUNT', '1')))
@timeout = Integer(options.fetch(:timeout, Disc.disque_timeout))
end

def run
$stdout.puts("Disc::Worker listening in #{queues}")
loop do
jobs = disque.fetch(from: queues, timeout: timeout, count: count)
Array(jobs).each do |queue, msgid, serialized_job|
begin
job = MessagePack.unpack(serialized_job)
instance = Object.const_get(job['class']).new
instance.perform(*job['arguments'])
disque.call('ACKJOB', msgid)
rescue => err
Disc.on_error(err, job.update('id' => msgid, 'queue' => queue))
Disc.pool.with do
begin
job = MessagePack.unpack(serialized_job)
instance = Object.const_get(job['class']).new
instance.perform(*job['arguments'])
disque.call('ACKJOB', msgid)
rescue => err
Disc.on_error(err, job.update('id' => msgid, 'queue' => queue))
end
end
end
end
Expand All @@ -94,7 +86,7 @@ def self.included(base)

module ClassMethods
def disque
defined?(@disque) ? @disque : Disc.disque
@disque ||= Disc.disque
end

def disque=(disque)
Expand All @@ -116,10 +108,7 @@ def queue
def enqueue(args = [], at: nil, queue: nil)
disque.push(
queue || self.queue,
{
class: self.name,
arguments: Array(args)
}.to_msgpack,
{ class: self.name, arguments: Array(args) }.to_msgpack,
Disc.disque_timeout,
delay: at.nil? ? nil : (at.to_time.to_i - DateTime.now.to_time.to_i)
)
Expand Down