diff --git a/HISTORY.md b/HISTORY.md index 17e57a6eb..15fbfa541 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,26 @@ +## 1.11.0 (2010-08-23) + +* Web UI: Group /workers page by hostnames + +## 1.10.0 (2010-08-23) + +* Support redis:// string format in `Resque.redis=` +* Using new cross-platform JSON gem. +* Added `after_enqueue` plugin hook. +* Added `shutdown?` method which can be overridden. +* Added support for the "leftright" gem when running tests. +* Grammarfix: In the README + +## 1.9.10 (2010-08-06) + +* Bugfix: before_fork should get passed the job + +## 1.9.9 (2010-07-26) + +* Depend on redis-namespace 0.8.0 +* Depend on json_pure instead of json (for JRuby compat) +* Bugfix: rails_env display in stats view + ## 1.9.8 (2010-07-20) * Bugfix: Worker.all should never return nil diff --git a/README.markdown b/README.markdown index 1c823130b..d2b20b403 100644 --- a/README.markdown +++ b/README.markdown @@ -9,8 +9,8 @@ Background jobs can be any Ruby class or module that responds to jobs or you can create new classes specifically to do work. Or, you can do both. -Resque is heavily inspired by DelayedJob (which rocks) and is -comprised of three parts: +Resque is heavily inspired by DelayedJob (which rocks) and comprises +three parts: 1. A Ruby library for creating, querying, and processing jobs 2. A Rake task for starting a worker which processes jobs @@ -30,6 +30,67 @@ not doing, what queues you're using, what's in those queues, provides general usage stats, and helps you track failures. +What did you guys do to Resque? +=============================== + +This version of Resque uses mongo as the back end. It also has some neat +new features that you may enjoy. + +Features +-------- + +It stores each queue in its own collection. + +For testing purposes, Resque.bypass_queues will call your job's +perform method AT ONCE, and not touch the queue at all. 
This can be +useful when writing unit tests against things that would normally +happen partially in your request and partially in a worker. + +If your job class has a variable called @unique_jobs = true, you +can queue unique jobs - jobs that must only be processed once. In +order to take advantage of this feature, the arguments to your enqueue +call must include a hash containing a key called _id in the first +position. + +If your job class indicates that @delayed_jobs = true, you can queue +delayed jobs. These jobs will not be popped off the queue until the +Time indicated in arg[0][:delay_until] has come. Note that you must +call Resque.enable_delay(:queue) before enqueuing any delayed jobs, to +ensure that the performance impact on other queues is minimal. + +Stern Warnings +-------------- + +Sometimes, Resque-Mongo will drop a queue collection, or create some +indexes, or otherwise manipulate its database. For this reason, it is +STRONGLY recommended that you give it its own database in mongo. + +All jobs should be queued via Resque.enqueue. All arguments passed to +this method must be BSON-encodable. Resque-Mongo does not serialize +your objects for you. Arrays, Hashes, Strings, Numbers, and Times +are all ok, so don't worry. + +Many of the new queue-level features require the first argument of +your perform method to be an options hash. In fact, if you just start +making all your perform()s take one param, that is an options hash, +you'll probably save yourself some pain. + +Resque-Mongo will not create any indexes on your queues, only on its +meta-data. You will need to create any indexes you want. Normally, +this is not a problem, because you aren't querying by keys, but you may +want to create indexes on the class key in some circumstances. If you +use the unique or delay features, you may want some additional indexes, +depending on the nature of your workload. Paranoid? Test enqueuing and +processing all your jobs, and run with --notablescans. 
Learn the profiler, +and use it often. + +Specifically, a queue with many long-delayed jobs will result in slower queue pops +for all jobs using that queue. Index delay_until in the case of +thousands of delayed jobs. + +Back to the original README +=========================== + The Blog Post ------------- @@ -764,7 +825,7 @@ To join the list simply send an email to . This will subscribe you and send you information about your subscription, including unsubscribe information. -The archive can be found at . +The archive can be found at . Meta diff --git a/Rakefile b/Rakefile index 1d4f22433..cf8a74c17 100644 --- a/Rakefile +++ b/Rakefile @@ -1,8 +1,6 @@ # # Setup # - -load 'tasks/redis.rake' require 'rake/testtask' $LOAD_PATH.unshift 'lib' @@ -23,7 +21,8 @@ desc "Run the test suite" task :test do rg = command?(:rg) Dir['test/**/*_test.rb'].each do |f| - rg ? sh("rg #{f}") : ruby(f) + #rg ? sh("rg #{f}") : ruby(f) + ruby(f) end end @@ -40,7 +39,7 @@ end # Install # -task :install => [ 'redis:install', 'dtach:install' ] +task :install => [ 'dtach:install' ] # diff --git a/bin/resque b/bin/resque index 18008db97..0f36f39eb 100755 --- a/bin/resque +++ b/bin/resque @@ -1,6 +1,7 @@ #!/usr/bin/env ruby $LOAD_PATH.unshift File.dirname(__FILE__) + '/../lib' +require 'rubygems' require 'resque' def kill(worker) @@ -34,6 +35,12 @@ def list end end +def queues + Resque.queues.each do |queue| + puts"Queue #{queue}: #{Resque.size(queue)}" + end +end + if (i = ARGV.index('-r')) && ARGV[i+1] Resque.redis = ARGV[i+1] ARGV.delete_at(i) @@ -47,6 +54,8 @@ when 'remove' remove ARGV[1] when 'list' list +when 'queues' + queues else puts "Usage: resque [-r redis_host:redis_port] COMMAND [option]" puts @@ -54,4 +63,5 @@ else puts " remove WORKER Removes a worker" puts " kill WORKER Kills a worker" puts " list Lists known workers" + puts " queues checks queue depth" end diff --git a/bin/resque-web b/bin/resque-web index c2b8832e6..737f22679 100755 --- a/bin/resque-web +++ b/bin/resque-web @@ 
-16,8 +16,8 @@ Vegas::Runner.new(Resque::Server, 'resque-web', { load path.to_s.strip if path } }) do |runner, opts, app| - opts.on('-N NAMESPACE', "--namespace NAMESPACE", "set the Redis namespace") {|namespace| - runner.logger.info "Using Redis namespace '#{namespace}'" - Resque.redis.namespace = namespace + opts.on('-N NAMESPACE', "--namespace NAMESPACE", "set the Mongo database") {|namespace| + runner.logger.info "Using Mongo database '#{namespace}'" + Resque.mongo_db= namespace } end diff --git a/docs/HOOKS.md b/docs/HOOKS.md index 1c942d479..c091a60e3 100644 --- a/docs/HOOKS.md +++ b/docs/HOOKS.md @@ -66,6 +66,9 @@ An unnamed hook (`before_perform`) will be executed first. The available hooks are: +* `after_enqueue`: Called with the job args after a job is placed on the queue. + Any exception raised propagates up to the code which queued the job. + * `before_perform`: Called with the job args before perform. If it raises `Resque::Job::DontPerform`, the job is aborted. If other exceptions are raised, they will be propagated up the the `Resque::Failure` @@ -99,6 +102,13 @@ look something like this. Modules are even better because jobs can use many of them. + module ScaledJob + def after_enqueue_scale_workers(*args) + Logger.info "Scaling worker count up" + Scaler.up! if Redis.info[:pending].to_i > 25 + end + end + module LoggedJob def before_perform_log_job(*args) Logger.info "About to perform #{self} with #{args.inspect}" @@ -115,6 +125,7 @@ Modules are even better because jobs can use many of them. class MyJob extend LoggedJob extend RetriedJob + extend ScaledJob def self.perform(*args) ... end diff --git a/lib/resque.rb b/lib/resque.rb index ec740cc21..73187bef0 100644 --- a/lib/resque.rb +++ b/lib/resque.rb @@ -1,4 +1,3 @@ -require 'redis/namespace' begin require 'yajl' @@ -6,6 +5,8 @@ require 'json' end +require 'mongo' + require 'resque/version' require 'resque/errors' @@ -22,40 +23,67 @@ module Resque include Helpers extend self - - # Accepts: - # 1. 
A 'hostname:port' string - # 2. A 'hostname:port:db' string (to select the Redis db) - # 3. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`, - # or `Redis::Namespace`. - def redis=(server) - if server.respond_to? :split - host, port, db = server.split(':') - redis = Redis.new(:host => host, :port => port, - :thread_safe => true, :db => db) - @redis = Redis::Namespace.new(:resque, :redis => redis) - elsif server.respond_to? :namespace= - @redis = server - else - @redis = Redis::Namespace.new(:resque, :redis => server) + attr_accessor :bypass_queues + @bypass_queues = false + @delay_allowed = [] + + + def mongo=(server) + if server.is_a? String + opts = server.split(':') + host = opts[0] + if opts[1] =~ /\// + opts = opts[1].split('/') + port = opts[0] + queuedb = opts[1] + else + port = opts[1] + end + conn = Mongo::Connection.new host, port + elsif server.is_a? Hash + conn = Mongo::Connection.new(options[:server], options[:port], options) + queuedb = options[:queuedb] || 'resque' + elsif server.is_a? Mongo::Connection + conn = server end + queuedb ||= 'resque' + @mongo = conn.db queuedb + initialize_mongo end - # Returns the current Redis connection. If none has been created, will + # Returns the current Mongo connection. If none has been created, will # create a new one. - def redis - return @redis if @redis - self.redis = 'localhost:6379' - self.redis + def mongo + return @mongo if @mongo + self.mongo = 'localhost:27017/resque' + self.mongo end - def redis_id - # support 1.x versions of redis-rb - if redis.respond_to?(:server) - redis.server - else - redis.client.id - end + def mongo_db=(db) + mongo.conn.db = db + end + + def mongo? 
+ return @mongo + end + + def initialize_mongo + mongo_workers.create_index :worker + mongo_stats.create_index :stat + delay_allowed = mongo_stats.find_one({ :stat => 'Delayable Queues'}, { :fields => ['value']}) + @delay_allowed = delay_allowed['value'].map{ |queue| queue.to_sym} if delay_allowed + end + + def mongo_workers + mongo['resque.workers'] + end + + def mongo_stats + mongo['resque.metrics'] + end + + def mongo_failures + mongo['resque.failures'] end # The `before_first_fork` hook will be run in the **parent** process @@ -106,9 +134,29 @@ def after_fork=(after_fork) end def to_s - "Resque Client connected to #{redis_id}" + "Resque Client connected to #{mongo.connection.host}:#{mongo.connection.port}/#{mongo.name}" end + def allows_unique_jobs(klass) + klass.instance_variable_get(:@unique_jobs) || + (klass.respond_to?(:unique_jobs) and klass.unique_jobs) + end + + def allows_delayed_jobs(klass) + klass.instance_variable_get(:@delayed_jobs) || + (klass.respond_to?(:delayed_jobs) and klass.delayed_jobs) + end + + def queue_allows_delayed(queue) + @delay_allowed.include?(queue.to_sym) || @delay_allowed.include?(queue.to_s) + end + + def enable_delay(queue) + unless queue_allows_delayed queue + @delay_allowed << queue + mongo_stats.update({:stat => 'Delayable Queues'}, { '$addToSet' => { 'value' => queue}}, { :upsert => true}) + end + end # # queue manipulation @@ -117,23 +165,52 @@ def to_s # Pushes a job onto a queue. Queue name should be a string and the # item should be any JSON-able Ruby object. def push(queue, item) - watch_queue(queue) - redis.rpush "queue:#{queue}", encode(item) + item[:resque_enqueue_timestamp] = Time.now + if item[:unique] + mongo[queue].update({'_id' => item[:_id]}, item, { :upsert => true}) + else + mongo[queue] << item + end end # Pops a job off a queue. Queue name should be a string. # # Returns a Ruby object. 
def pop(queue) - decode redis.lpop("queue:#{queue}") + query = { } + if queue_allows_delayed queue + query['delay_until'] = { '$not' => { '$gt' => Time.new}} + end + #sorting will result in significant performance penalties for large queues, you have been warned. + item = mongo[queue].find_and_modify(:query => query, :remove => true, :sort => [[:_id, :asc]] ) + rescue Mongo::OperationFailure => e + return nil if e.message =~ /No matching object/ + raise e end # Returns an integer representing the size of a queue. # Queue name should be a string. - def size(queue) - redis.llen("queue:#{queue}").to_i + def size(queue) + mongo[queue].count + end + + def delayed_size(queue) + if queue_allows_delayed queue + mongo[queue].find({'delay_until' => { '$gt' => Time.new}}).count + else + mongo[queue].count + end + end + + def ready_size(queue) + if queue_allows_delayed queue + mongo[queue].find({'delay_until' => { '$not' => { '$gt' => Time.new}}}).count + else + mongo[queue].count + end end + # Returns an array of items currently queued. Queue name should be # a string. # @@ -142,40 +219,43 @@ def size(queue) # # To get the 3rd page of a 30 item, paginatied list one would use: # Resque.peek('my_list', 59, 30) - def peek(queue, start = 0, count = 1) - list_range("queue:#{queue}", start, count) + def peek(queue, start = 0, count = 1, mode = :ready) + list_range(queue, start, count, mode) end # Does the dirty work of fetching a range of items from a Redis list # and converting them into Ruby objects. 
- def list_range(key, start = 0, count = 1) - if count == 1 - decode redis.lindex(key, start) - else - Array(redis.lrange(key, start, start+count-1)).map do |item| - decode item + def list_range(key, start = 0, count = 1, mode = :ready) + query = { } + sort = [] + if queue_allows_delayed(key) + if mode == :ready + query['delay_until'] = { '$not' => { '$gt' => Time.new}} + elsif mode == :delayed + query['delay_until'] = { '$gt' => Time.new} + elsif mode == :delayed_sorted + query['delay_until'] = { '$gt' => Time.new} + sort << ['delay_until', 1] + elsif mode == :all_sorted + query = {} + sort << ['delay_until', 1] end end + items = mongo[key].find(query, { :limit => count, :skip => start, :sort => sort}).to_a.map{ |i| i} + count > 1 ? items : items.first end # Returns an array of all known Resque queues as strings. - def queues - Array(redis.smembers(:queues)) + def queues + names = mongo.collection_names + names.delete_if{ |name| name =~ /system./ || name =~ /resque\./ } end # Given a queue name, completely deletes the queue. def remove_queue(queue) - redis.srem(:queues, queue.to_s) - redis.del("queue:#{queue}") + mongo[queue].drop end - # Used internally to keep track of which queues we've created. - # Don't call this directly. - def watch_queue(queue) - redis.sadd(:queues, queue.to_s) - end - - # # job shortcuts # @@ -194,7 +274,11 @@ def watch_queue(queue) # # This method is considered part of the `stable` API. def enqueue(klass, *args) - Job.create(queue_from_class(klass), klass, *args) + if @bypass_queues + klass.send(:perform, *args) + else + Job.create(queue_from_class(klass), klass, *args) + end end # This method can be used to conveniently remove a job from a queue. 
@@ -277,9 +361,9 @@ def info :processed => Stat[:processed], :queues => queues.size, :workers => workers.size.to_i, - :working => working.size, + :working => working.count, :failed => Stat[:failed], - :servers => [redis_id], + :servers => to_s, :environment => ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development' } end @@ -287,8 +371,11 @@ def info # Returns an array of all known Resque keys in Redis. Redis' KEYS operation # is O(N) for the keyspace, so be careful - this can be slow for big databases. def keys - redis.keys("*").map do |key| - key.sub("#{redis.namespace}:", '') - end + names = mongo.collection_names + end + + def drop + mongo.collections.each{ |collection| collection.drop unless collection.name =~ /^system./ } + @mongo = nil end end diff --git a/lib/resque/errors.rb b/lib/resque/errors.rb index 1d0dd6801..be1b99390 100644 --- a/lib/resque/errors.rb +++ b/lib/resque/errors.rb @@ -7,4 +7,7 @@ class NoClassError < RuntimeError; end # Raised when a worker was killed while processing a job. class DirtyExit < RuntimeError; end + + #raised for bad delay/unique queue integrity + class QueueError < RuntimeError; end end diff --git a/lib/resque/failure.rb b/lib/resque/failure.rb index 7b4d570fb..0d547f7ee 100644 --- a/lib/resque/failure.rb +++ b/lib/resque/failure.rb @@ -32,8 +32,8 @@ def self.backend=(backend) # back to `Resque::Failure::Redis` def self.backend return @backend if @backend - require 'resque/failure/redis' - @backend = Failure::Redis + require 'resque/failure/mongo' + @backend = Failure::Mongo end # Returns the int count of how many failures we have seen. diff --git a/lib/resque/failure/mongo.rb b/lib/resque/failure/mongo.rb new file mode 100644 index 000000000..967a951a0 --- /dev/null +++ b/lib/resque/failure/mongo.rb @@ -0,0 +1,40 @@ +module Resque + module Failure + # A Failure backend that stores exceptions in Mongo. Very simple but + # works out of the box, along with support in the Resque web app. 
+ class Mongo < Base + def save + data = { + :failed_at => Time.now.strftime("%Y/%m/%d %H:%M:%S"), + :payload => payload, + :exception => exception.class.to_s, + :error => exception.to_s, + :backtrace => Array(exception.backtrace), + :worker => worker.to_s, + :queue => queue + } + Resque.mongo_failures << data + end + + def self.count + Resque.mongo_failures.count + end + + def self.all(start = 0, count = 1) + all_failures = Resque.mongo_failures.find().skip(start.to_i).limit(count.to_i).to_a + all_failures.size == 1 ? all_failures.first : all_failures + end + + def self.clear + Resque.mongo_failures.remove + end + + def self.requeue(index) + item = all(index) + item['retried_at'] = Time.now.strftime("%Y/%m/%d %H:%M:%S") + Resque.mongo_failures.update({ :_id => item['_id']}, item) + Job.create(item['queue'], item['payload']['class'], *item['payload']['args']) + end + end + end +end diff --git a/lib/resque/failure/redis.rb b/lib/resque/failure/redis.rb index 1a91c5f46..7b0f6c371 100644 --- a/lib/resque/failure/redis.rb +++ b/lib/resque/failure/redis.rb @@ -1,6 +1,6 @@ module Resque module Failure - # A Failure backend that stores exceptions in Redis. Very simple but + # A Failure backend that stores exceptions in Mongo. Very simple but # works out of the box, along with support in the Resque web app. class Redis < Base def save @@ -13,20 +13,20 @@ def save :worker => worker.to_s, :queue => queue } - data = Resque.encode(data) - Resque.redis.rpush(:failed, data) + Resque.mongo_failures << data end def self.count - Resque.redis.llen(:failed).to_i + Resque.mongo_failures.count end def self.all(start = 0, count = 1) - Resque.list_range(:failed, start, count) + all_failures = Resque.mongo_failures.find().sort([:natural, :desc]).skip(start).limit(count).to_a + # all_failures.size == 1 ? 
all_failures.first : all_failures end def self.clear - Resque.redis.del(:failed) + Resque.mongo_failures.remove end def self.requeue(index) diff --git a/lib/resque/helpers.rb b/lib/resque/helpers.rb index 417d6b3b5..fbb57ba0f 100644 --- a/lib/resque/helpers.rb +++ b/lib/resque/helpers.rb @@ -2,8 +2,16 @@ module Resque # Methods used by various classes in Resque. module Helpers # Direct access to the Redis instance. - def redis - Resque.redis + #def mongo + # Resque.mongo + #end + + def mongo_workers + Resque.mongo_workers + end + + def mongo_stats + Resque.mongo_stats end # Given a Ruby object, returns a string suitable for storage in a @@ -15,7 +23,7 @@ def encode(object) object.to_json end end - + # Given a string, returns a Ruby object. def decode(object) return unless object diff --git a/lib/resque/job.rb b/lib/resque/job.rb index 59eb561fb..397db582c 100644 --- a/lib/resque/job.rb +++ b/lib/resque/job.rb @@ -32,6 +32,7 @@ class Job def initialize(queue, payload) @queue = queue @payload = payload + end # Creates a job by placing it on a queue. Expects a string queue @@ -48,7 +49,43 @@ def self.create(queue, klass, *args) raise NoClassError.new("Jobs must be given a class.") end - Resque.push(queue, :class => klass.to_s, :args => args) + item = { :class => klass.to_s, :args => args} + + item[:_id] = args[0][:_id] if Resque.allows_unique_jobs(klass) && args[0].is_a?(Hash) && args[0].has_key?(:_id) + item[:unique] = true if item[:_id] + + #are we trying to put a non-delayed job into a delayed queue? 
+ if Resque.queue_allows_delayed(queue) + if Resque.allows_delayed_jobs(klass) + if args[0].is_a?(Hash) && args[0].has_key?(:delay_until) + item[:delay_until] = args[0][:delay_until] + else + raise QueueError.new 'trying to insert delayed job without delay_until' + end + else + raise QueueError.new 'trying to insert non-delayed job into delayed queue' + end + else + if Resque.allows_delayed_jobs(klass) + raise QueueError.new 'trying to insert a delayed job into a non-delayed queue' + end + end + + #is it a hydra job? + heads = klass.instance_variable_get(:@hydra) + if heads + if item[:_id] + queue = (queue.to_s + (item[:_id].hash % heads).to_s).to_sym + else + queue = (queue.to_s + rand(heads).to_s).to_sym + end + end + + ret = Resque.push(queue, item) + Plugin.after_enqueue_hooks(klass).each do |hook| + klass.send(hook, *args) + end + ret end # Removes a job from a queue. Expects a string queue name, a @@ -76,20 +113,11 @@ def self.create(queue, klass, *args) # depending on the size of your queue, as it loads all jobs into # a Ruby array before processing. def self.destroy(queue, klass, *args) - klass = klass.to_s - queue = "queue:#{queue}" - destroyed = 0 - - if args.empty? - redis.lrange(queue, 0, -1).each do |string| - if decode(string)['class'] == klass - destroyed += redis.lrem(queue, 0, string).to_i - end - end - else - destroyed += redis.lrem(queue, 0, encode(:class => klass, :args => args)) - end - + collection = Resque.mongo[queue] + selector = {'class' => klass.to_s} + selector['args'] = args unless args.empty? + destroyed = collection.find(selector).count + collection.remove(selector, :safe => true) destroyed end diff --git a/lib/resque/plugin.rb b/lib/resque/plugin.rb index a19b6fed6..3152dd4bc 100644 --- a/lib/resque/plugin.rb +++ b/lib/resque/plugin.rb @@ -42,5 +42,10 @@ def after_hooks(job) def failure_hooks(job) job.methods.grep(/^on_failure/).sort end + + # Given an object, returns a list `after_enqueue` hook names. 
+ def after_enqueue_hooks(job) + job.methods.grep(/^after_enqueue/).sort + end end end diff --git a/lib/resque/server.rb b/lib/resque/server.rb index 066401ccb..c83c49ecc 100644 --- a/lib/resque/server.rb +++ b/lib/resque/server.rb @@ -46,40 +46,33 @@ def tabs Resque::Server.tabs end - def redis_get_size(key) - case Resque.redis.type(key) - when 'none' - [] - when 'list' - Resque.redis.llen(key) - when 'set' - Resque.redis.scard(key) - when 'string' - Resque.redis.get(key).length - when 'zset' - Resque.redis.zcard(key) - end + def mongo_get_size(key) + Resque.mongo[key].count end - def redis_get_value_as_array(key, start=0) - case Resque.redis.type(key) - when 'none' - [] - when 'list' - Resque.redis.lrange(key, start, start + 20) - when 'set' - Resque.redis.smembers(key)[start..(start + 20)] - when 'string' - [Resque.redis.get(key)] - when 'zset' - Resque.redis.zrange(key, start, start + 20) - end + def mongo_get_value_as_array(key, start=0) + Resque.mongo[key].find({ }, { :skip => start, :limit => 20}).to_a end def show_args(args) Array(args).map { |a| a.inspect }.join("\n") end + def worker_hosts + @worker_hosts ||= worker_hosts! + end + + def worker_hosts! + hosts = Hash.new { [] } + + Resque.workers.each do |worker| + host, _ = worker.to_s.split(':') + hosts[host] += [worker.to_s] + end + + hosts + end + def partial? @partial end @@ -110,6 +103,66 @@ def show(page, layout = true) end end + def processes_in(delay_until) + return 'Immediately' if delay_until.nil? + now = Time.now + time = distance_of_time_in_words(now, delay_until) + return "Immediately (#{time})" if now > delay_until + return time + end + + def enqueued_at(resque_enqueue_timestamp) + return 'Unknown' if resque_enqueue_timestamp.nil? 
+ now = Time.now + time = distance_of_time_in_words(now, resque_enqueue_timestamp) + return time + end + + def distance_of_time_in_words(from_time, to_time = 0, include_seconds = true, options = {}) + from_time = from_time.to_time if from_time.respond_to?(:to_time) + to_time = to_time.to_time if to_time.respond_to?(:to_time) + distance_in_minutes = (((to_time - from_time).abs)/60).round + distance_in_seconds = ((to_time - from_time).abs).round + + ago = from_time > to_time ? ' ago' : '' + + + case distance_in_minutes + when 0..1 + return distance_in_minutes == 0 ? + "less than 1 minute" + ago : + "#{distance_in_minutes} minutes" + ago unless include_seconds + + case distance_in_seconds + when 0..4 then "less than 5 seconds" + ago + when 5..9 then "less than 10 seconds" + ago + when 10..19 then "less than 20 seconds" + ago + when 20..39 then "half a minute" + ago + when 40..59 then "less than 1 minute" + ago + else "1 minute" + ago + end + + when 2..44 then "#{distance_in_minutes} minutes" + ago + when 45..89 then "about 1 hour" + ago + when 90..1439 then "about #{(distance_in_minutes.to_f / 60.0).round} hours" + ago + when 1440..2529 then "about 1 day" + ago + when 2530..43199 then "about #{(distance_in_minutes.to_f / 1440.0).round} days" + ago + when 43200..86399 then "about 1 month" + ago + when 86400..525599 then "about #{(distance_in_minutes.to_f / 43200.0).round} months" + ago + else + distance_in_years = distance_in_minutes / 525600 + minute_offset_for_leap_year = (distance_in_years / 4) * 1440 + remainder = ((distance_in_minutes - minute_offset_for_leap_year) % 525600) + if remainder < 131400 + "about #{distance_in_years} years" + ago + elsif remainder < 394200 + "over #{distance_in_years} years" + ago + else + "almost #{distance_in_years} years" + ago + end + end + end + # to make things easier on ourselves get "/?" 
do redirect url(:overview) diff --git a/lib/resque/server/public/style.css b/lib/resque/server/public/style.css index b96528bea..1dde49245 100644 --- a/lib/resque/server/public/style.css +++ b/lib/resque/server/public/style.css @@ -37,8 +37,15 @@ body { padding:0; margin:0; } #main table.queues tr.failure td { background:#ffecec; border-top:2px solid #d37474; font-size:90%; color:#d37474;} #main table.queues tr.failure td a{ color:#d37474;} -#main table.jobs td.class { font-family:Monaco, "Courier New", monospace; font-size:90%; width:50%;} -#main table.jobs td.args{ width:50%;} +#main table.jobs td.class { font-family:Monaco, "Courier New", monospace; font-size:90%; width:30%;} +#main table.jobs td.args{ width:50%;word-break:break-word;} +#main table.jobs td.enqueuedat{ width:20%;} + +#main table.delays td.class { font-family:Monaco, "Courier New", monospace; font-size:90%; width:20%;} +#main table.delays td.args{ width:40%;word-break:break-word;} +#main table.delays td.delay{ width:20%;} +#main table.delays td.enqueuedat{ width:20%;} + #main table.workers td.icon {width:1%; background:#efefef;text-align:center;} #main table.workers td.where { width:25%;} @@ -78,4 +85,4 @@ body { padding:0; margin:0; } #main form {float:right; margin-top:-10px;} -#main .time a.toggle_format {text-decoration:none;} \ No newline at end of file +#main .time a.toggle_format {text-decoration:none;} diff --git a/lib/resque/server/views/failed.erb b/lib/resque/server/views/failed.erb index 8333e7d2c..c4f316696 100644 --- a/lib/resque/server/views/failed.erb +++ b/lib/resque/server/views/failed.erb @@ -12,6 +12,7 @@

Showing <%=start%> to <%= start + 20 %> of <%= size = Resque::Failure.count %> jobs

    + <% failed = [failed] unless failed.is_a? Array %> <%for job in failed%> <% index += 1 %>
  • diff --git a/lib/resque/server/views/key_sets.erb b/lib/resque/server/views/key_sets.erb index 138a2b1b9..b0b80565c 100644 --- a/lib/resque/server/views/key_sets.erb +++ b/lib/resque/server/views/key_sets.erb @@ -1,16 +1,16 @@ <% if key = params[:key] %>

    - Showing <%= start = params[:start].to_i %> to <%= start + 20 %> of <%=size = redis_get_size(key) %> + Showing <%= start = params[:start].to_i %> to <%= start + 20 %> of <%=size = mongo_get_size(key) %>

    -

    Key "<%= key %>" is a <%= resque.redis.type key %>

    +

    Elements in Collection <%= key %>

    - <% for row in redis_get_value_as_array(key, start) %> + <% for row in mongo_get_value_as_array(key, start) %> <% end %> diff --git a/lib/resque/server/views/layout.erb b/lib/resque/server/views/layout.erb index 7ae77b3f0..1262e6b79 100644 --- a/lib/resque/server/views/layout.erb +++ b/lib/resque/server/views/layout.erb @@ -16,11 +16,9 @@ <%= tab tab_name %> <% end %> - <% if Resque.redis.namespace != :resque %> - - <%= Resque.redis.namespace %> - - <% end %> + + <%= Resque.to_s %> + <% if @subtabs %> @@ -37,8 +35,8 @@ - \ No newline at end of file + diff --git a/lib/resque/server/views/queues.erb b/lib/resque/server/views/queues.erb index 693a5ef1e..aa4a5cd65 100644 --- a/lib/resque/server/views/queues.erb +++ b/lib/resque/server/views/queues.erb @@ -1,43 +1,63 @@ -<% @subtabs = resque.queues unless partial? %> +<% @subtabs = resque.queues unless partial? || params[:id].nil? %> <% if queue = params[:id] %> - -

    Pending jobs on <%= queue %>

    +

    Jobs on <%= queue %>

    " class='remove-queue'> -

    Showing <%= start = params[:start].to_i %> to <%= start + 20 %> of <%=size = resque.size(queue)%> jobs

    -
    - <%= row %> + <%= row.inspect %>
    - - - - - <% for job in (jobs = resque.peek(queue, start, 20)) %> - - - - + <% if !resque.queue_allows_delayed queue.to_sym %> +

    Showing <%= start = params[:start].to_i %> to <%= start + 20 %> of <%=size = resque.size(queue.to_sym)%> jobs

    ClassArgs
    <%= job['class'] %><%=h job['args'].inspect %>
    + + + + + + <% for job in (jobs = resque.peek(queue.to_sym, start, 20)) %> + + + + + + <% end %> + <% else %> +

    Showing <%= start = params[:start].to_i %> to <%= start + 20 %> of <%=size = resque.size(queue.to_sym)%> jobs

    ClassArgsEnqueued At
    <%= job['class'] %><%=h job['args'].inspect %><%=h enqueued_at(job['resque_enqueue_timestamp']) %>
    + + + + + + + <% for job in (jobs = resque.peek(queue.to_sym, start, 20, :all_sorted)) %> + + + + + + + <% end %> <% end %> <% if jobs.empty? %> - - - + + + <% end %> -
    ClassArgsEnqueued AtProcesses In
    <%= job['class'] %><%=h job['args'].inspect %><%=h enqueued_at(job['resque_enqueue_timestamp']) %><%=h processes_in(job['delay_until']) %>
    There are no pending jobs in this queue
    There are no pending jobs in this queue
    - <%= partial :next_more, :start => start, :size => size %> -<% else %> + + <%= partial :next_more, :start => start, :size => size %> + <% else %>

    Queues

    The list below contains all the registered queues with the number of jobs currently in the queue. Select a queue from above to view all jobs currently pending on the queue.

    - + + <% for queue in resque.queues.sort_by { |q| q.to_s } %> - + + <% end %> "> diff --git a/lib/resque/server/views/stats.erb b/lib/resque/server/views/stats.erb index 98f5d2ce0..e9a3fa92c 100644 --- a/lib/resque/server/views/stats.erb +++ b/lib/resque/server/views/stats.erb @@ -1,8 +1,8 @@ -<% @subtabs = %w( resque redis keys ) %> +<% @subtabs = %w( resque mongo keys ) %> <% if params[:key] %> -<%= partial resque.redis.type(params[:key]).eql?("string") ? :key_string : :key_sets %> +<%= partial :key_sets %> <% elsif params[:id] == "resque" %> @@ -20,11 +20,25 @@ <% end %>
    NameJobsReady JobsDelayed Jobs
    "><%= queue %><%= resque.size queue %><%= resque.ready_size queue.to_sym %><%= resque.delayed_size(queue.to_sym) if resque.queue_allows_delayed(queue.to_sym) %>
    -<% elsif params[:id] == 'redis' %> +<% elsif params[:id] == 'mongo' %> -

    <%= resque.redis_id %>

    +

    <%= resque.to_s %>

    - <% for key, value in resque.redis.info.to_a.sort_by { |i| i[0].to_s } %> + <% resque.mongo_stats.find.to_a.each do |hash| %> + + + + + <% end %> +
    + <%= hash['stat'] %> + + <%= hash['value'].inspect %> +
    + +

    Mongo Database Health

    + + <% resque.mongo.stats.each_pair do |key, value| %>
    <%= key %> @@ -38,12 +52,10 @@ <% elsif params[:id] == 'keys' %> -

    Keys owned by <%= resque %>

    -

    (All keys are actually prefixed with "<%= Resque.redis.namespace %>:")

    +

    Collections in <%= resque.to_s %>

    - - + <% for key in resque.keys.sort %> @@ -51,8 +63,7 @@ - - + <% end %>
    keytypecollection size
    "><%= key %> <%= resque.redis.type key %><%= redis_get_size key %><%= resque.mongo[key].count %>
    diff --git a/lib/resque/server/views/workers.erb b/lib/resque/server/views/workers.erb index 3fd6fccb6..0ddb01640 100644 --- a/lib/resque/server/views/workers.erb +++ b/lib/resque/server/views/workers.erb @@ -1,3 +1,5 @@ +<% @subtabs = worker_hosts.keys.sort unless worker_hosts.size == 1 %> + <% if params[:id] && worker = Resque::Worker.find(params[:id]) %>

    Worker <%= worker %>

    @@ -34,13 +36,19 @@
    -<% elsif params[:id] %> +<% elsif params[:id] && !worker_hosts.keys.include?(params[:id]) && params[:id] != 'all' %>

    Worker doesn't exist

    -<% else %> +<% elsif worker_hosts.size == 1 || params[:id] %> -

    <%= resque.workers.size %> Workers

    + <% if worker_hosts.size == 1 || params[:id] == 'all' %> + <% workers = Resque.workers %> + <% else %> + <% workers = worker_hosts[params[:id]].map { |id| Resque::Worker.find(id) } %> + <% end %> + +

    <%= workers.size %> Workers

    The workers listed below are all registered as active on your system.

    @@ -49,7 +57,7 @@ - <% for worker in (workers = resque.workers.sort_by { |w| w.to_s }) %> + <% for worker in (workers = workers.sort_by { |w| w.to_s }) %> @@ -75,4 +83,27 @@ <% end %>
    Queues Processing
    <%= state %>
    <%=poll%> + +<% else %> + <% @subtabs = [] %> +

    Workers

    +

    The hostnames below all have registered workers. Select a hostname to view its workers, or "all" to see all workers.

    + + + + + + <% for hostname, workers in worker_hosts.sort_by { |h,w| h } %> + + + + + <% end %> + + + + +
    HostnameWorkers
    "><%= hostname %><%= workers.size %>
    ">all workers<%= Resque.workers.size %>
    + + <% end %> diff --git a/lib/resque/server/views/working.erb b/lib/resque/server/views/working.erb index ae88ccc34..db2b4ae0d 100644 --- a/lib/resque/server/views/working.erb +++ b/lib/resque/server/views/working.erb @@ -27,7 +27,7 @@ <% else %> - <% workers = resque.working %> + <% workers = resque.working.reject { |w| w.idle? } %>

    <%= workers.size %> of <%= resque.workers.size %> Workers Working

    The list below contains all workers which are currently running a job.

    @@ -45,7 +45,6 @@ <% for worker in workers.sort_by { |w| w.job['run_at'] ? w.job['run_at'] : '' } %> <% job = worker.job %> - <% next if worker.idle? %> diff --git a/lib/resque/stat.rb b/lib/resque/stat.rb index db286cc0b..dad0206fa 100644 --- a/lib/resque/stat.rb +++ b/lib/resque/stat.rb @@ -11,7 +11,8 @@ module Stat # Returns the int value of a stat, given a string stat name. def get(stat) - redis.get("stat:#{stat}").to_i + value = mongo_stats.find_one :stat => stat + value.nil? ? 0 : value['value'] end # Alias of `get` @@ -24,7 +25,7 @@ def [](stat) # Can optionally accept a second int parameter. The stat is then # incremented by that amount. def incr(stat, by = 1) - redis.incrby("stat:#{stat}", by) + mongo_stats.update({:stat => stat}, {'$inc' => {:value => by}}, :upsert => true) end # Increments a stat by one. @@ -37,7 +38,7 @@ def <<(stat) # Can optionally accept a second int parameter. The stat is then # decremented by that amount. def decr(stat, by = 1) - redis.decrby("stat:#{stat}", by) + mongo_stats.update({ :stat => stat}, { '$inc' => { :value => -by}}) end # Decrements a stat by one. @@ -47,7 +48,7 @@ def >>(stat) # Removes a stat from Redis, effectively setting it to 0. def clear(stat) - redis.del("stat:#{stat}") + mongo_stats.remove({:stat => stat}) end end end diff --git a/lib/resque/version.rb b/lib/resque/version.rb index db6158a05..ab9148a2c 100644 --- a/lib/resque/version.rb +++ b/lib/resque/version.rb @@ -1,3 +1,3 @@ module Resque - Version = VERSION = '1.9.8' + Version = VERSION = '1.12.3' end diff --git a/lib/resque/worker.rb b/lib/resque/worker.rb index 6f072aa21..80a7b0efc 100644 --- a/lib/resque/worker.rb +++ b/lib/resque/worker.rb @@ -24,18 +24,13 @@ class Worker # Returns an array of all worker objects. def self.all - Array(redis.smembers(:workers)).map { |id| find(id) }.compact + mongo_workers.distinct(:worker).map{|worker| find(worker)}.compact end # Returns an array of all worker objects currently processing # jobs. 
def self.working - names = all - return [] unless names.any? - names.map! { |name| "worker:#{name}" } - redis.mapped_mget(*names).keys.map do |key| - find key.sub("worker:", '') - end.compact + working = mongo_workers.find({ 'working_on' => { '$exists' => true}}).to_a.map{|w| find(w['worker'])} end # Returns a single worker object. Accepts a string id. @@ -58,7 +53,7 @@ def self.attach(worker_id) # Given a string worker id, return a boolean indicating whether the # worker exists def self.exists?(worker_id) - redis.sismember(:workers, worker_id) + mongo_workers.find({ :worker => worker_id.to_s}).count > 0 end # Workers should be initialized with an array of string queue @@ -108,11 +103,11 @@ def work(interval = 5, &block) startup loop do - break if @shutdown + break if shutdown? if not @paused and job = reserve log "got: #{job.inspect}" - run_hook :before_fork + run_hook :before_fork, job working_on job if @child = fork @@ -265,6 +260,11 @@ def shutdown! kill_child end + # Should this worker shutdown as soon as current job is finished? + def shutdown? + @shutdown + end + # Kills the forked child immediately, without remorse. The job it # is processing will not be completed. def kill_child @@ -293,15 +293,15 @@ def unpause_processing end # Looks for any workers which should be running on this server - # and, if they're not, removes them from Redis. + # and, if they're not, removes them from Mongo. # # This is a form of garbage collection. If a server is killed by a # hard shutdown, power failure, or something else beyond our # control, the Resque workers will not die gracefully and therefore - # will leave stale state information in Redis. + # will leave stale state information in Mongo. # - # By checking the current Redis state against the actual - # environment, we can determine if Redis is old and clean it up a bit. + # By checking the current Mongo state against the actual + # environment, we can determine if Mongo is old and clean it up a bit. 
def prune_dead_workers all_workers = Worker.all known_workers = worker_pids unless all_workers.empty? @@ -317,7 +317,7 @@ def prune_dead_workers # Registers ourself as a worker. Useful when entering the worker # lifecycle on startup. def register_worker - redis.sadd(:workers, self) + mongo_workers << { :worker => self.to_s} started! end @@ -343,30 +343,28 @@ def unregister_worker job.fail(DirtyExit.new) end - redis.srem(:workers, self) - redis.del("worker:#{self}") - redis.del("worker:#{self}:started") + mongo_workers.remove :worker => self.to_s Stat.clear("processed:#{self}") Stat.clear("failed:#{self}") end - # Given a job, tells Redis we're working on it. Useful for seeing + # Given a job, tells Mongo we're working on it. Useful for seeing # what workers are doing and when. def working_on(job) job.worker = self - data = encode \ - :queue => job.queue, + data = #encode \ + { :queue => job.queue, :run_at => Time.now.to_s, - :payload => job.payload - redis.set("worker:#{self}", data) + :payload => job.payload} + mongo_workers.update({:worker => self.to_s}, { '$set' => { 'working_on' => data}}, :upsert => true) end # Called when we are done working - clears our `working_on` state - # and tells Redis we processed a job. + # and tells Mongo we processed a job. def done_working processed! - redis.del("worker:#{self}") + mongo_workers.remove({ :worker => self.to_s}) end # How many jobs has this worker processed? Returns an int. @@ -374,7 +372,7 @@ def processed Stat["processed:#{self}"] end - # Tell Redis we've processed a job. + # Tell Mongo we've processed a job. def processed! Stat << "processed" Stat << "processed:#{self}" @@ -385,7 +383,7 @@ def failed Stat["failed:#{self}"] end - # Tells Redis we've failed a job. + # Tells Mongo we've failed a job. def failed! Stat << "failed" Stat << "failed:#{self}" @@ -393,17 +391,19 @@ def failed! # What time did this worker start? 
Returns an instance of `Time` def started - redis.get "worker:#{self}:started" + worker = mongo_workers.find_one(:worker => self.to_s) + worker.nil? ? nil : worker['started'] end - # Tell Redis we've started + # Tell Mongo we've started def started! - redis.set("worker:#{self}:started", Time.now.to_s) + mongo_workers.update({ :worker => self.to_s}, { '$set' => { :started => Time.now.to_s}}) end # Returns a hash explaining the Job we're currently processing, if any. def job - decode(redis.get("worker:#{self}")) || {} + worker = mongo_workers.find_one :worker => self.to_s + worker.nil? ? { } : worker['working_on'] #decode(worker['working_on']) end alias_method :processing, :job @@ -420,7 +420,7 @@ def idle? # Returns a symbol representing the current worker state, # which can be either :working or :idle def state - redis.exists("worker:#{self}") ? :working : :idle + mongo_workers.find_one(:worker => self.to_s) ? :working : :idle end # Is this worker the same as another worker? diff --git a/resque-igo-1.12.1.gem b/resque-igo-1.12.1.gem new file mode 100644 index 000000000..49b9cdd60 Binary files /dev/null and b/resque-igo-1.12.1.gem differ diff --git a/resque-igo.gemspec b/resque-igo.gemspec new file mode 100644 index 000000000..a8c9e105b --- /dev/null +++ b/resque-igo.gemspec @@ -0,0 +1,47 @@ +$LOAD_PATH.unshift 'lib' +require 'resque/version' + +Gem::Specification.new do |s| + s.name = "resque-igo" + s.version = Resque::Version + s.date = Time.now.strftime('%Y-%m-%d') + s.summary = "Resque-igo is a mongo-backed queueing system" + s.homepage = "http://github.com/mediocretes/resque-mongo" + s.email = "nacuff@igodigital.com" + s.authors = [ "Nathan D Acuff" ] + + s.files = %w( README.markdown Rakefile LICENSE HISTORY.md ) + s.files += Dir.glob("lib/**/*") + s.files += Dir.glob("bin/**/*") + s.files += Dir.glob("man/**/*") + s.files += Dir.glob("test/**/*") + s.files += Dir.glob("tasks/**/*") + s.executables = [ "resque", "resque-web" ] + + s.extra_rdoc_files = [ 
"LICENSE", "README.markdown" ] + s.rdoc_options = ["--charset=UTF-8"] + + s.add_dependency "vegas", "~> 0.1.2" + s.add_dependency "sinatra", ">= 0.9.2" + s.add_dependency "json", "~> 1.4.6" + s.add_dependency "mongo", ">= 1.0.7" + + s.description = < 0.7.0" + s.add_dependency "redis-namespace", "~> 0.8.0" s.add_dependency "vegas", "~> 0.1.2" s.add_dependency "sinatra", ">= 0.9.2" - s.add_dependency "json", ">= 1.1.0" + s.add_dependency "json", "~> 1.4.6" s.description = < [:about, :download, :make] do - bin_dir = '/usr/bin' - conf_dir = '/etc' - - if ENV['PREFIX'] - bin_dir = "#{ENV['PREFIX']}/bin" - sh "mkdir -p #{bin_dir}" unless File.exists?("#{bin_dir}") - - conf_dir = "#{ENV['PREFIX']}/etc" - sh "mkdir -p #{conf_dir}" unless File.exists?("#{conf_dir}") - end - - %w(redis-benchmark redis-cli redis-server).each do |bin| - sh "cp /tmp/redis/src/#{bin} #{bin_dir}" - end - - puts "Installed redis-benchmark, redis-cli and redis-server to #{bin_dir}" - - unless File.exists?("#{conf_dir}/redis.conf") - sh "cp /tmp/redis/redis.conf #{conf_dir}/redis.conf" - puts "Installed redis.conf to #{conf_dir} \n You should look at this file!" 
- end - end - - task :make do - sh "cd /tmp/redis/src && make clean" - sh "cd /tmp/redis/src && make" - end - - desc "Download package" - task :download do - sh 'rm -rf /tmp/redis/' if File.exists?("/tmp/redis/.svn") - sh 'git clone git://github.com/antirez/redis.git /tmp/redis' unless File.exists?('/tmp/redis') - sh "cd /tmp/redis && git pull" if File.exists?("/tmp/redis/.git") - end - -end - -namespace :dtach do - - desc 'About dtach' - task :about do - puts "\nSee http://dtach.sourceforge.net/ for information about dtach.\n\n" - end - - desc 'Install dtach 0.8 from source' - task :install => [:about, :download, :make] do - - bin_dir = "/usr/bin" - - if ENV['PREFIX'] - bin_dir = "#{ENV['PREFIX']}/bin" - sh "mkdir -p #{bin_dir}" unless File.exists?("#{bin_dir}") - end - - sh "cp /tmp/dtach-0.8/dtach #{bin_dir}" - end - - task :make do - sh 'cd /tmp/dtach-0.8/ && ./configure && make' - end - - desc "Download package" - task :download do - unless File.exists?('/tmp/dtach-0.8.tar.gz') - require 'net/http' - - url = 'http://downloads.sourceforge.net/project/dtach/dtach/0.8/dtach-0.8.tar.gz' - open('/tmp/dtach-0.8.tar.gz', 'wb') do |file| file.write(open(url).read) end - end - - unless File.directory?('/tmp/dtach-0.8') - sh 'cd /tmp && tar xzf dtach-0.8.tar.gz' - end - end -end - diff --git a/test/job_hooks_test.rb b/test/job_hooks_test.rb index e4368bf30..e72cc6664 100644 --- a/test/job_hooks_test.rb +++ b/test/job_hooks_test.rb @@ -228,6 +228,27 @@ def self.on_failure_record_failure(exception, history) end end +context "Resque::Job after_enqueue" do + include PerformJob + + class ::AfterEnqueueJob + def self.after_enqueue_record_history(history) + history << :after_enqueue + end + + def self.perform(history) + end + end + + test "the after enqueue hook should run" do + history = [] + @worker = Resque::Worker.new(:jobs) + Resque::Job.create(:jobs, AfterEnqueueJob, history) + @worker.work(0) + assert_equal history, [:after_enqueue], "after_enqueue was not run" + end 
+end + context "Resque::Job all hooks" do include PerformJob diff --git a/test/redis-test.conf b/test/redis-test.conf deleted file mode 100644 index 0ba01aac0..000000000 --- a/test/redis-test.conf +++ /dev/null @@ -1,115 +0,0 @@ -# Redis configuration file example - -# By default Redis does not run as a daemon. Use 'yes' if you need it. -# Note that Redis will write a pid file in /var/run/redis.pid when daemonized. -daemonize yes - -# When run as a daemon, Redis write a pid file in /var/run/redis.pid by default. -# You can specify a custom pid file location here. -pidfile ./test/redis-test.pid - -# Accept connections on the specified port, default is 6379 -port 9736 - -# If you want you can bind a single interface, if the bind option is not -# specified all the interfaces will listen for connections. -# -# bind 127.0.0.1 - -# Close the connection after a client is idle for N seconds (0 to disable) -timeout 300 - -# Save the DB on disk: -# -# save -# -# Will save the DB if both the given number of seconds and the given -# number of write operations against the DB occurred. -# -# In the example below the behaviour will be to save: -# after 900 sec (15 min) if at least 1 key changed -# after 300 sec (5 min) if at least 10 keys changed -# after 60 sec if at least 10000 keys changed -save 900 1 -save 300 10 -save 60 10000 - -# The filename where to dump the DB -dbfilename dump.rdb - -# For default save/load DB in/from the working directory -# Note that you must specify a directory not a file name. -dir ./test/ - -# Set server verbosity to 'debug' -# it can be one of: -# debug (a lot of information, useful for development/testing) -# notice (moderately verbose, what you want in production probably) -# warning (only very important / critical messages are logged) -loglevel debug - -# Specify the log file name. Also 'stdout' can be used to force -# the demon to log on the standard output. 
Note that if you use standard -# output for logging but daemonize, logs will be sent to /dev/null -logfile stdout - -# Set the number of databases. The default database is DB 0, you can select -# a different one on a per-connection basis using SELECT where -# dbid is a number between 0 and 'databases'-1 -databases 16 - -################################# REPLICATION ################################# - -# Master-Slave replication. Use slaveof to make a Redis instance a copy of -# another Redis server. Note that the configuration is local to the slave -# so for example it is possible to configure the slave to save the DB with a -# different interval, or to listen to another port, and so on. - -# slaveof - -################################## SECURITY ################################### - -# Require clients to issue AUTH before processing any other -# commands. This might be useful in environments in which you do not trust -# others with access to the host running redis-server. -# -# This should stay commented out for backward compatibility and because most -# people do not need auth (e.g. they run their own servers). - -# requirepass foobared - -################################### LIMITS #################################### - -# Set the max number of connected clients at the same time. By default there -# is no limit, and it's up to the number of file descriptors the Redis process -# is able to open. The special value '0' means no limts. -# Once the limit is reached Redis will close all the new connections sending -# an error 'max number of clients reached'. - -# maxclients 128 - -# Don't use more memory than the specified amount of bytes. -# When the memory limit is reached Redis will try to remove keys with an -# EXPIRE set. It will try to start freeing keys that are going to expire -# in little time and preserve keys with a longer time to live. -# Redis will also try to remove objects from free lists if possible. 
-# -# If all this fails, Redis will start to reply with errors to commands -# that will use more memory, like SET, LPUSH, and so on, and will continue -# to reply to most read-only commands like GET. -# -# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a -# 'state' server or cache, not as a real DB. When Redis is used as a real -# database the memory usage will grow over the weeks, it will be obvious if -# it is going to use too much memory in the long run, and you'll have the time -# to upgrade. With maxmemory after the limit is reached you'll start to get -# errors for write operations, and this may even lead to DB inconsistency. - -# maxmemory - -############################### ADVANCED CONFIG ############################### - -# Glue small output buffers together in order to send small replies in a -# single TCP packet. Uses a bit more CPU but most of the times it is a win -# in terms of number of queries per second. Use 'yes' if unsure. -glueoutputbuf yes diff --git a/test/resque_test.rb b/test/resque_test.rb index 919423c91..e83621d28 100644 --- a/test/resque_test.rb +++ b/test/resque_test.rb @@ -2,12 +2,20 @@ context "Resque" do setup do - Resque.redis.flushall - + Resque.drop + Resque.bypass_queues = false + Resque.enable_delay(:delayed) Resque.push(:people, { 'name' => 'chris' }) Resque.push(:people, { 'name' => 'bob' }) Resque.push(:people, { 'name' => 'mark' }) end + + test "can set a namespace through a url-like string" do + assert Resque.mongo + assert_equal 'resque', Resque.mongo.name + Resque.mongo = 'localhost:27017/namespace' + assert_equal 'namespace', Resque.mongo.name + end test "can put jobs on a queue" do assert Resque::Job.create(:jobs, 'SomeJob', 20, '/tmp') @@ -125,36 +133,48 @@ assert Resque.push(:people, { 'name' => 'jon' }) end + def pop_no_id(queue) + item = Resque.pop(queue) + item.delete("_id") + item + end + + test "can pull items off a queue" do - assert_equal({ 'name' => 'chris' }, Resque.pop(:people)) - 
assert_equal({ 'name' => 'bob' }, Resque.pop(:people)) - assert_equal({ 'name' => 'mark' }, Resque.pop(:people)) + assert_equal('chris', pop_no_id(:people)['name']) + assert_equal('bob', pop_no_id(:people)['name']) + assert_equal('mark', pop_no_id(:people)['name']) assert_equal nil, Resque.pop(:people) end test "knows how big a queue is" do assert_equal 3, Resque.size(:people) - assert_equal({ 'name' => 'chris' }, Resque.pop(:people)) + assert_equal('chris', pop_no_id(:people)['name']) assert_equal 2, Resque.size(:people) - assert_equal({ 'name' => 'bob' }, Resque.pop(:people)) - assert_equal({ 'name' => 'mark' }, Resque.pop(:people)) + assert_equal('bob', pop_no_id(:people)['name']) + assert_equal('mark', pop_no_id(:people)['name']) assert_equal 0, Resque.size(:people) end test "can peek at a queue" do - assert_equal({ 'name' => 'chris' }, Resque.peek(:people)) + peek = Resque.peek(:people) + peek.delete "_id" + assert_equal('chris', peek['name']) assert_equal 3, Resque.size(:people) end test "can peek multiple items on a queue" do - assert_equal({ 'name' => 'bob' }, Resque.peek(:people, 1, 1)) - - assert_equal([{ 'name' => 'bob' }, { 'name' => 'mark' }], Resque.peek(:people, 1, 2)) - assert_equal([{ 'name' => 'chris' }, { 'name' => 'bob' }], Resque.peek(:people, 0, 2)) - assert_equal([{ 'name' => 'chris' }, { 'name' => 'bob' }, { 'name' => 'mark' }], Resque.peek(:people, 0, 3)) - assert_equal({ 'name' => 'mark' }, Resque.peek(:people, 2, 1)) + assert_equal('bob', Resque.peek(:people, 1, 1)['name']) + peek = Resque.peek(:people, 1, 2).map { |hash| { 'name' => hash['name']}} + assert_equal([{ 'name' => 'bob' }, { 'name' => 'mark' }], peek) + peek = Resque.peek(:people, 0, 2).map { |hash| { 'name' => hash['name']} } + assert_equal([{ 'name' => 'chris' }, { 'name' => 'bob' }], peek) + peek = Resque.peek(:people, 0, 3).map { |hash| { 'name' => hash['name']} } + assert_equal([{ 'name' => 'chris' }, { 'name' => 'bob' }, { 'name' => 'mark' }], peek) + peek = 
Resque.peek(:people, 2, 1) + assert_equal('mark', peek['name']) assert_equal nil, Resque.peek(:people, 3) assert_equal [], Resque.peek(:people, 3, 2) end @@ -162,24 +182,24 @@ test "knows what queues it is managing" do assert_equal %w( people ), Resque.queues Resque.push(:cars, { 'make' => 'bmw' }) - assert_equal %w( cars people ), Resque.queues + assert_equal %w( cars people ), Resque.queues.sort end test "queues are always a list" do - Resque.redis.flushall + Resque.drop assert_equal [], Resque.queues end test "can delete a queue" do Resque.push(:cars, { 'make' => 'bmw' }) - assert_equal %w( cars people ), Resque.queues + assert_equal %w( cars people ), Resque.queues.sort Resque.remove_queue(:people) assert_equal %w( cars ), Resque.queues assert_equal nil, Resque.pop(:people) end test "keeps track of resque keys" do - assert_equal ["queue:people", "queues"], Resque.keys + assert Resque.keys.include? 'people' end test "badly wants a class name, too" do @@ -216,10 +236,126 @@ assert_equal 3, stats[:queues] assert_equal 3, stats[:processed] assert_equal 1, stats[:failed] - assert_equal [Resque.redis.respond_to?(:server) ? 'localhost:9736' : 'redis://localhost:9736/0'], stats[:servers] + # assert_equal [Resque.redis.respond_to?(:server) ? 'localhost:9736' : 'redis://localhost:9736/0'], stats[:servers] end test "decode bad json" do assert_nil Resque.decode("{\"error\":\"Module not found \\u002\"}") end + + test "unique jobs are unique" do + #does uniqueness work? + Resque.enqueue(UniqueJob, {:_id => 'my_id', :arg1=> 'my args1'}) + assert_equal(1, Resque.size(:unique)) + assert_equal('my args1', Resque.peek(:unique)['args'][0]['arg1']) + assert_equal('my_id', Resque.peek(:unique)['args'][0]['_id']) + Resque.enqueue(UniqueJob, {:_id => 'my_id', :arg1=> 'my args2'}) + assert_equal(1, Resque.size(:unique)) + assert_equal('my args2', Resque.peek(:unique)['args'][0]['arg1']) + + #if I enqueue unique jobs with the same key in 2 queues, do I get 2 jobs? 
+ Resque.enqueue(UniqueJob, {:_id => 'my_id3', :arg1=> 'my arg3'}) + assert_equal(2, Resque.size(:unique)) + Resque.enqueue(OtherUnique, {:_id => 'my_id3', :arg1=> 'my args4'}) + #following line fails because :unique and :unique2 are in the same collection + #\assert_equal(2, Resque.size(:unique)) + assert_equal(1, Resque.size(:unique2)) + + #can I enqueue normal jobs in the unique queue? + Resque.enqueue(NonUnique, {:arg1=> 'my args'}) + assert_equal(3, Resque.size(:unique)) + Resque.enqueue(NonUnique, {:_id => 'my_id', :_id => 'my_id', :arg1=> 'my args2'}) + assert_equal(4, Resque.size(:unique)) + + #how do unique jobs work without a given _id? + Resque.enqueue(UniqueJob, {:arg1=> 'my args3'}) + assert_equal(5, Resque.size(:unique)) + assert_equal('my args3', Resque.peek(:unique, 4)['args'][0]['arg1']) + Resque.enqueue(UniqueJob, {:arg1=> 'my args4'}) + assert_equal(6, Resque.size(:unique)) + assert_equal('my args4', Resque.peek(:unique, 5)['args'][0]['arg1']) + end + + test "Can bypass queues for testing" do + Resque.enqueue(NonUnique, 'test') + assert_equal(1, Resque.size(:unique)) + Resque.bypass_queues = true + Resque.enqueue(NonUnique, 'test') + assert_equal(1, Resque.size(:unique)) + Resque.bypass_queues = false + Resque.enqueue(NonUnique, 'test') + assert_equal(2, Resque.size(:unique)) + end + + test "delayed jobs work" do + args = { :delay_until => Time.new-1} + Resque.enqueue(DelayedJob, args) + job = Resque::Job.reserve(:delayed) + assert_equal(1, job.args[0].keys.length) + assert_equal(args[:delay_until].to_i, job.args[0]["delay_until"].to_i) + args[:delay_until] = Time.new + 2 + assert_equal(0, Resque.delayed_size(:delayed)) + Resque.enqueue(DelayedJob, args) + + assert_equal(1, Resque.delayed_size(:delayed)) + assert_nil Resque.peek(:delayed) + assert_nil Resque::Job.reserve(:delayed) + sleep 1 + assert_nil Resque::Job.reserve(:delayed) + sleep 1 + assert_equal(DelayedJob, Resque::Job.reserve(:delayed).payload_class) + end + + test "delayed unique 
jobs modify args in place" do + args = { :delay_until => Time.new + 3600, :_id => 'unique'} + Resque.enqueue(DelayedJob, args) + assert_nil(Resque.peek(:delayed)) + args[:delay_until] = Time.new - 1 + Resque.enqueue(DelayedJob, args) + assert_equal(2, Resque::Job.reserve(:delayed).args[0].keys.count) + end + + test "delayed attribute is ignored when bypassing queues" do + Resque.bypass_queues = true + args = { :delay_until => Time.new+20} + foo = Resque.enqueue(DelayedJob, args) + assert_equal(0, Resque.size(:delayed)) + assert(args[:delay_until] > Time.new) + assert(foo =~ /^delayed job executing/) + Resque.bypass_queues = false + end + + test "mixing delay and non-delay is bad" do + dargs = { :delay_until => Time.new + 3600} + + #non-delay into delay + assert_raise(Resque::QueueError) do + Resque.enqueue(NonDelayedJob, dargs) + end + + #delay into non-delay + assert_raise(Resque::QueueError) do + Resque.enqueue(MistargetedDelayedJob, dargs) + end + end + + test "hydra works" do + 20.times do + Resque.enqueue(HydraJob, { :one => 'one'}) + end + assert_equal(0, Resque.size(:hydra)) + assert_equal(20, Resque.size(:hydra1)+Resque.size(:hydra0)) + assert(0 != Resque.size(:hydra1)) + assert(0 != Resque.size(:hydra0)) + end + + test "hydra/unique hashes correctly" do + 20.times do + Resque.enqueue(UniqueHydraJob, {:_id => 'zomgz', :one => 'one'}) + end + # 'zomgz'.hash % 100 == 39 + assert_equal(1, Resque.size(:hydra39)) + Resque.enqueue(UniqueHydraJob, {:_id => '518', :one => 'one'}) + assert_equal(2, Resque.size(:hydra39)) + end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 25274b45b..142b46341 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- dir = File.dirname(File.expand_path(__FILE__)) $LOAD_PATH.unshift dir + '/../lib' $TESTING = true @@ -5,44 +6,16 @@ require 'rubygems' require 'resque' - -# -# make sure we can run redis -# - -if !system("which redis-server") - puts '', "** can't find 
`redis-server` in your path" - puts "** try running `sudo rake install`" - abort '' +begin + require 'leftright' +rescue LoadError end - # # start our own redis when the tests start, # kill it when they end # -at_exit do - next if $! - - if defined?(MiniTest) - exit_code = MiniTest::Unit.new.run(ARGV) - else - exit_code = Test::Unit::AutoRunner.run - end - - pid = `ps -A -o pid,command | grep [r]edis-test`.split(" ")[0] - puts "Killing test redis server..." - `rm -f #{dir}/dump.rdb` - Process.kill("KILL", pid.to_i) - exit exit_code -end - -puts "Starting redis for testing at localhost:9736..." -`redis-server #{dir}/redis-test.conf` -Resque.redis = 'localhost:9736' - - ## # test/spec/mini 3 # http://gist.github.com/25455 @@ -109,3 +82,103 @@ def self.perform raise SyntaxError, "Extra Bad job!" end end + +class UniqueJob + @queue = :unique + @unique_jobs = true +end + +class NonUnique + @queue = :unique + + def self.perform(data) + "I has a #{data}" + end + +end + +class OtherUnique + @queue = :unique2 + @unique_jobs = true +end + +class DelayedJob + @queue = :delayed + @delayed_jobs = true + @unique_jobs = true + def self.perform(data) + "delayed job executing #{data.inspect}" + end +end + +class MistargetedDelayedJob + @queue = :unique + @delayed_jobs = true + def self.perform(data) + " mistargeteddelayed job executing #{data.inspect}" + end +end + +class NonDelayedJob + @queue = :delayed +end + +class HydraJob + @queue = :hydra + @hydra = 2 +end + +class UniqueHydraJob + @queue = :hydra + @unique_jobs = true + @hydra = 100 +end + +#some redgreen fun +# -*- coding: utf-8 -*- +begin + require 'redgreen' + module Test + module Unit + module UI + module Console + class TestRunner + def test_started(name) + @individual_test_start_time = Time.now + output_single(name + ": ", VERBOSE) + end + + def test_finished(name) + elapsed_test_time = Time.now - @individual_test_start_time + char_to_output = elapsed_test_time > 1 ? "☻" : "." 
+ output_single(char_to_output, PROGRESS_ONLY) unless (@already_outputted) + nl(VERBOSE) + @already_outputted = false + end + end + end + end + end + end + + # -*- coding: utf-8 -*- + class Test::Unit::UI::Console::RedGreenTestRunner < Test::Unit::UI::Console::TestRunner + def output_single(something, level=NORMAL) + return unless (output?(level)) + something = case something + when '.' then Color.green('.') + when '☻' then Color.green('☻') + when 'F' then Color.red("F") + when 'E' then Color.yellow("E") + when '+' then Color.green('+') + else something + end + @io.write(something) + @io.flush + end + end +rescue LoadError + puts "consider gem install redgreen" +end + + diff --git a/test/worker_test.rb b/test/worker_test.rb index e2bad28d8..20020ef05 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -2,12 +2,10 @@ context "Resque::Worker" do setup do - Resque.redis.flushall - + Resque.drop Resque.before_first_fork = nil Resque.before_fork = nil Resque.after_fork = nil - @worker = Resque::Worker.new(:jobs) Resque::Job.create(:jobs, SomeJob, 20, '/tmp') end @@ -21,12 +19,13 @@ test "failed jobs report exception and message" do Resque::Job.create(:jobs, BadJobWithSyntaxError) @worker.work(0) - assert_equal('SyntaxError', Resque::Failure.all['exception']) - assert_equal('Extra Bad job!', Resque::Failure.all['error']) + failure = Resque::Failure.all.is_a?(Array) ? 
Resque::Failure.all.first : Resque::Failure.all + assert_equal('SyntaxError', failure['exception']) + assert_equal('Extra Bad job!', failure['error']) end test "fails uncompleted jobs on exit" do - job = Resque::Job.new(:jobs, [GoodJob, "blah"]) + job = Resque::Job.new(:jobs, ['GoodJob', "blah"]) @worker.working_on(job) @worker.unregister_worker assert_equal 1, Resque::Failure.count @@ -143,9 +142,11 @@ test "records what it is working on" do @worker.work(0) do task = @worker.job + task['payload'].delete "_id" + task['payload'].delete "resque_enqueue_timestamp" assert_equal({"args"=>[20, "/tmp"], "class"=>"SomeJob"}, task['payload']) assert task['run_at'] - assert_equal 'jobs', task['queue'] + assert_equal 'jobs', task['queue'].to_s end end @@ -213,10 +214,11 @@ end end + #this test depends on some OS-specific behavior test "sets $0 while working" do @worker.work(0) do ver = Resque::Version - assert_equal "resque-#{ver}: Processing jobs since #{Time.now.to_i}", $0 + assert_equal "resque-#{ver}: Processing jobs since #{Time.now.to_i}"[0..$0.length-1], $0 end end @@ -260,7 +262,7 @@ end test "Will call a before_first_fork hook only once" do - Resque.redis.flushall + Resque.drop $BEFORE_FORK_CALLED = 0 Resque.before_first_fork = Proc.new { $BEFORE_FORK_CALLED += 1 } workerA = Resque::Worker.new(:jobs) @@ -277,7 +279,7 @@ end test "Will call a before_fork hook before forking" do - Resque.redis.flushall + Resque.drop $BEFORE_FORK_CALLED = false Resque.before_fork = Proc.new { $BEFORE_FORK_CALLED = true } workerA = Resque::Worker.new(:jobs) @@ -289,7 +291,7 @@ end test "Will call an after_fork hook after forking" do - Resque.redis.flushall + Resque.drop $AFTER_FORK_CALLED = false Resque.after_fork = Proc.new { $AFTER_FORK_CALLED = true } workerA = Resque::Worker.new(:jobs)
    <%= state %>