diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 61ad22a4..439b3b8b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,18 +3,16 @@ name: tests on: push: -jobs: +jobs: rubocop: runs-on: ubuntu-latest - env: - BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }} steps: - uses: actions/checkout@v4 - name: Set up Ruby uses: ruby/setup-ruby@v1 with: bundler-cache: true - ruby-version: "3.0" + ruby-version: "3.3" - name: Run rubocop run: | bundle exec rubocop --extra-details --display-style-guide --no-server --parallel @@ -23,7 +21,7 @@ jobs: strategy: fail-fast: false matrix: - ruby_version: ["3.0", "3.1", "3.2", "3.3"] + ruby_version: ["3.3", "3.4"] rack_version: ["2.2.5", "3.1"] runs-on: ubuntu-latest services: @@ -34,45 +32,20 @@ jobs: POSTGRES_USER: ubuntu POSTGRES_PASSWORD: password ports: - - 5432:5432 + - 5435:5432 options: >- --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 10 - env: - PGDATABASE: que-test - PGUSER: ubuntu - PGPASSWORD: password - PGHOST: localhost - BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }} - RACK_VERSION: "${{ matrix.rack_version }}" - steps: - - uses: actions/checkout@v4 - - name: Set up Ruby - uses: ruby/setup-ruby@v1 - with: - bundler-cache: true - ruby-version: "${{ matrix.ruby_version }}" - - name: Start bin/que - run: | - bundle exec bin/que ./lib/que.rb --metrics-port=8080 --ci - - rspec: - strategy: - fail-fast: false - matrix: - ruby_version: ["3.0", "3.1", "3.2", "3.3"] - runs-on: ubuntu-latest - services: - postgres: + lock_database: image: postgres:14.2 env: - POSTGRES_DB: que-test + POSTGRES_DB: lock-test POSTGRES_USER: ubuntu POSTGRES_PASSWORD: password ports: - - 5432:5432 + - 5436:5432 options: >- --health-cmd pg_isready --health-interval 10s @@ -83,23 +56,30 @@ jobs: PGUSER: ubuntu PGPASSWORD: password PGHOST: localhost - BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }} + PGPORT: 5435 + LOCK_PGDATABASE: lock-test + LOCK_PGUSER: ubuntu + LOCK_PGPASSWORD: password + LOCK_PGHOST: localhost + LOCK_PGPORT: 5436 + RACK_VERSION: "${{ matrix.rack_version }}" steps: - uses: actions/checkout@v4 - name: Set up Ruby uses: ruby/setup-ruby@v1 with: bundler-cache: true - ruby-version: "${{ matrix.ruby-version }}" - - name: Run specs + ruby-version: "${{ matrix.ruby_version }}" + - name: Run smoke tests run: | - bundle exec rspec + bundle exec rspec spec/smoke_tests/ - active_record_with_lock_adapter_rspec: + rspec: + timeout-minutes: 5 strategy: fail-fast: false matrix: - ruby_version: ["3.0", "3.1", "3.2", "3.3"] + ruby_version: ["3.3", "3.4"] runs-on: ubuntu-latest services: postgres: @@ -109,7 +89,7 @@ jobs: POSTGRES_USER: ubuntu POSTGRES_PASSWORD: password ports: - - 5432:5432 + - 5435:5432 options: >- --health-cmd pg_isready --health-interval 10s @@ -122,23 +102,23 @@ jobs: POSTGRES_USER: ubuntu POSTGRES_PASSWORD: password ports: - - 5434:5432 + - 5436:5432 options: >- --health-cmd pg_isready --health-interval 10s --health-timeout 5s - --health-retries 10 + --health-retries 10 env: PGDATABASE: que-test PGUSER: ubuntu PGPASSWORD: password PGHOST: localhost - BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }} + PGPORT: 5435 LOCK_PGDATABASE: lock-test LOCK_PGUSER: ubuntu LOCK_PGPASSWORD: password LOCK_PGHOST: localhost - ADAPTER: ActiveRecordWithLock + LOCK_PGPORT: 5436 steps: - uses: actions/checkout@v4 - name: Set up Ruby @@ -147,5 +127,4 @@ jobs: bundler-cache: true ruby-version: "${{ matrix.ruby-version }}" - name: Run Specs With ActiveRecordWithLock Adapter - run: bundle exec rspec - + run: bundle exec rspec --exclude-pattern "spec/smoke_tests/**/*" diff --git a/.rubocop.yml b/.rubocop.yml index 9b4d4643..79c18f09 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,14 +1,17 @@ inherit_gem: gc_ruboconfig: rubocop.yml -require: +plugins: + - rubocop-rspec - rubocop-performance - rubocop-rake - - rubocop-rspec + +require: - rubocop-sequel AllCops: NewCops: enable + TargetRubyVersion: 3.3 Exclude: - "vendor/**/*" - "legacy_spec/**/*" diff --git a/.ruby-version b/.ruby-version index bea438e9..86fb6504 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -3.3.1 +3.3.7 diff --git a/Dockerfile.benchmark b/Dockerfile.benchmark index dffba8a5..4561b68e 100644 --- a/Dockerfile.benchmark +++ b/Dockerfile.benchmark @@ -1,4 +1,4 @@ -FROM ruby:3.0.2 +FROM ruby:3.3 COPY . /que WORKDIR /que/benchmark RUN bundle install diff --git a/Gemfile b/Gemfile index e735f2d2..d031a83e 100644 --- a/Gemfile +++ b/Gemfile @@ -2,6 +2,8 @@ source 'https://rubygems.org' +gemspec + group :development, :test do gem 'rake' @@ -11,11 +13,11 @@ group :development, :test do gem 'pg', require: nil, platform: :ruby gem 'pg_jruby', require: nil, platform: :jruby gem 'pond', require: nil - gem 'rubocop', '~> 1.64.1' - gem 'rubocop-performance', '~> 1.21.1' - gem 'rubocop-rake', '~> 0.6.0' - gem 'rubocop-rspec', '~> 3.0.2' - gem 'rubocop-sequel', '~> 0.3.4' + gem 'rubocop', '~> 1.72.2' + gem 'rubocop-performance', '~> 1.24.0' + gem 'rubocop-rake', '~> 0.7.1' + gem 'rubocop-rspec', '~> 3.5.0' + gem 'rubocop-sequel', '~> 0.3.8' gem 'sequel', require: nil rack_version = ENV.fetch('RACK_VERSION', "3.1") @@ -32,9 +34,3 @@ group :test do gem 'pry-byebug' gem 'rspec', '~> 3.9' end - -gem 'prometheus-client', '~> 1.0' -source "https://rubygems.pkg.github.com/gocardless" do - gem "prometheus_gcstat" -end -gemspec diff --git a/README.md b/README.md index 4e7de8a1..7181aa2e 100644 --- a/README.md +++ b/README.md @@ -139,16 +139,7 @@ Note that running specs requires a running Postgres instance. To start the server in a docker container perform the following: ``` -docker run -p5432:5432 --env POSTGRES_USER=ubuntu --env POSTGRES_PASSWORD=password --env POSTGRES_DB=que-test postgres:11.2 -``` - -Now inform the test suite where Postgres is running using environment variables - -``` -export PGDATABASE=que-test -export PGUSER=ubuntu -export PGPASSWORD=password -export PGHOST=localhost +docker compose up -d --force-recreate ``` A note on running specs - Que's worker system is multithreaded and therefore prone to race conditions (especially on interpreters without a global lock, like Rubinius or JRuby). As such, if you've touched that code, a single spec run passing isn't a guarantee that any changes you've made haven't introduced bugs. One thing I like to do before pushing changes is rerun the specs many times and watching for hangs. You can do this from the command line with something like: diff --git a/bin/que b/bin/que index dddc7b18..b4a0f169 100755 --- a/bin/que +++ b/bin/que @@ -5,11 +5,15 @@ require "logger" require "optparse" require "ostruct" require "prometheus/middleware/exporter" -require "prometheus_gcstat" +begin + require "prometheus_gcstat" +rescue LoadError + # If the gem is not available, we'll just skip it +end require "puma" require "que" require "rack" -USE_RACKUP = Rack.release.split(".")[0].to_i >= 3 +USE_RACKUP = Gem::Version.new(Rack.release) >= Gem::Version.new("3.0.0") if USE_RACKUP require "rackup" end @@ -73,10 +77,6 @@ OptionParser.new do |opts| $stdout.puts opts exit 0 end - - opts.on("--ci", "Don't wait for sigterm exit after boot") do - options.ci = true - end end.parse!(ARGV) # rubocop:enable Layout/LineLength @@ -106,21 +106,6 @@ secondary_queues = options.secondary_queues || [] Que.logger ||= Logger.new($stdout) -if options.ci - require "active_record" - - ActiveRecord::Base.establish_connection( - adapter: "postgresql", - host: ENV.fetch("PGHOST", "localhost"), - user: ENV.fetch("PGUSER", "postgres"), - password: ENV.fetch("PGPASSWORD", ""), - database: ENV.fetch("PGDATABASE", "que-test"), - ) - - Que.connection = ActiveRecord - Que.migrate! -end - begin Que.logger.level = Logger.const_get(log_level.upcase) if log_level rescue NameError @@ -155,8 +140,10 @@ if options.metrics_port health_check = ->(_) { [200, {}, ["healthy"]] } - Prometheus::MemoryStats. - new(Prometheus::Client.registry).start(interval: 10.seconds, delay: 10.seconds) + if defined?(Prometheus::MemoryStats) + Prometheus::MemoryStats. + new(Prometheus::Client.registry).start(interval: 10.seconds, delay: 10.seconds) + end app = Rack::URLMap.new( "/" => Rack::Builder.new do @@ -175,32 +162,24 @@ if options.metrics_port end, ) - host = "0.0.0.0" - - handler = - if USE_RACKUP - Rackup::Handler::Puma - else - Rack::Handler::Puma - end - - pidfile_opts = Dir.exist?("./tmp/pids/") ? {} : { pidfile: nil } - - handler.run( - app, - Host: host, - Port: options.metrics_port, - Silent: false, - AccessLog: [], - **pidfile_opts, - ) + # Uses the Handler::Puma class to build the config and then we call Server directly + # instead of going through the Handler::Puma.run method. This is to prevent the Handler + # from trapping signals and which would prevent Que exiting cleanly. + # https://github.com/puma/puma/blob/master/lib/rack/handler/puma.rb + server_options = { Host: "0.0.0.0", Port: options.metrics_port, Silent: false, AccessLog: [] } + handler = USE_RACKUP ? Rackup::Handler::Puma : Rack::Handler::Puma + puma_config = handler.config(app, server_options) + puma_config.clamp + + log_writer = Puma::LogWriter.stdio + puma_config.options[:log_writer] = log_writer + + Puma::Server.new(app, nil, puma_config.options).tap do |s| + s.binder.parse(puma_config.options[:binds], s.log_writer) + end.run.join end end -# For a basic CI check we just want to ensure the app boots so don't want to -# block the main thread, so this will just exit immediately. -unless options.ci - wait_for_signals("INT", "TERM") -end +wait_for_signals("INT", "TERM") worker_group.stop(timeout) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..9da3c114 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,18 @@ +services: + default_db: + image: postgres:14.2 + environment: + POSTGRES_DB: que-test + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + ports: + - 5435:5432 + + lock_db: + image: postgres:14.2 + environment: + POSTGRES_DB: que-test-lock + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + ports: + - 5436:5432 diff --git a/lib/que/adapters/base.rb b/lib/que/adapters/base.rb index 2a06412f..6123d569 100644 --- a/lib/que/adapters/base.rb +++ b/lib/que/adapters/base.rb @@ -16,6 +16,34 @@ class UnavailableConnection < StandardError; end class Base attr_reader :instrumenter + CAST_PROCS = { + # booleans + 16 => ->(value) { + case value + when String then value == "t" + else !value.nil? + end + }, + # bigint + 20 => proc(&:to_i), + # smallint + 21 => proc(&:to_i), + # integer + 23 => proc(&:to_i), + # json + 114 => ->(value) { JSON_MODULE.load(value, create_additions: false) }, + # float + 701 => proc(&:to_f), + # timestamp with time zone + 1184 => ->(value) { + case value + when Time then value + when String then Time.parse(value) + else raise "Unexpected time class: #{value.class} (#{value.inspect})" + end + }, + }.freeze + def initialize(_thing = nil) @prepared_statements = {} @instrumenter = nil @@ -51,11 +79,12 @@ def execute(command, params = []) end end - cast_result \ + cast_result( case command when Symbol then execute_prepared(command, params) when String then execute_sql(command, params) - end + end, + ) end def in_transaction? @@ -130,34 +159,6 @@ def log(sql, conn, binds = [], type_casted_binds = [], name = "SQL", statement_n end # rubocop:enable Metrics/ParameterLists - CAST_PROCS = { - # booleans - 16 => ->(value) { - case value - when String then value == "t" - else !value.nil? - end - }, - # bigint - 20 => proc(&:to_i), - # smallint - 21 => proc(&:to_i), - # integer - 23 => proc(&:to_i), - # json - 114 => ->(value) { JSON_MODULE.load(value, create_additions: false) }, - # float - 701 => proc(&:to_f), - # timestamp with time zone - 1184 => ->(value) { - case value - when Time then value - when String then Time.parse(value) - else raise "Unexpected time class: #{value.class} (#{value.inspect})" - end - }, - }.freeze - def cast_result(result) output = result.to_a diff --git a/lib/que/leaky_bucket.rb b/lib/que/leaky_bucket.rb index c226d0f2..98e9f8a2 100644 --- a/lib/que/leaky_bucket.rb +++ b/lib/que/leaky_bucket.rb @@ -29,12 +29,10 @@ def self.sleep(duration) # remains in the bucket, and should only be called after the bucket has been refilled. def observe start = @clock.now - result = yield + yield ensure duration = @clock.now - start @remaining -= duration - - result end # Wait for the bucket to be refilled, given the time that has elapsed since the last diff --git a/que.gemspec b/que.gemspec index bc5ffc55..bfb880c0 100644 --- a/que.gemspec +++ b/que.gemspec @@ -15,22 +15,20 @@ Gem::Specification.new do |spec| spec.homepage = "https://github.com/chanks/que" spec.license = "MIT" - spec.required_ruby_version = ">= 3.0" + spec.required_ruby_version = ">= 3.3" spec.files = `git ls-files`.split($INPUT_RECORD_SEPARATOR) spec.executables = ["que"] spec.require_paths = ["lib"] - # We're pointing to our own branch of the Prometheus Client. - # Ideally we'd do this in the `gemspec`, but you can't do that. - # Instead, we remove the version restriction from `gemspec` and add it to the `Gemfile` - # instead, and in any other clients of `Que`. - # This is highly non ideal, but unless we properly fork, we have to do this for now. + spec.add_dependency "activesupport" + spec.add_dependency "ostruct" spec.add_dependency "prometheus-client" - spec.add_dependency "puma" spec.add_dependency "rack", ">= 2", "< 4" spec.add_dependency "rackup" - spec.add_runtime_dependency "activesupport" spec.metadata["rubygems_mfa_required"] = "true" + + # This is a fork of the Original Que gem, so we don't want to push it to rubygems + spec.metadata["allowed_push_host"] = "" end diff --git a/spec/active_record_with_lock_spec_helper.rb b/spec/active_record_with_lock_spec_helper.rb index 3c10fd0c..c51f85ce 100644 --- a/spec/active_record_with_lock_spec_helper.rb +++ b/spec/active_record_with_lock_spec_helper.rb @@ -1,25 +1,13 @@ # frozen_string_literal: true class LockDatabaseRecord < ActiveRecord::Base - establish_connection( - adapter: "postgresql", - host: ENV.fetch("LOCK_PGHOST", "localhost"), - user: ENV.fetch("LOCK_PGUSER", "postgres"), - password: ENV.fetch("LOCK_PGPASSWORD", "password"), - database: ENV.fetch("LOCK_PGDATABASE", "lock-test"), - port: ENV.fetch("LOCK_PGPORT", 5434), - pool: 5, - ) + self.abstract_class = true + connects_to database: { writing: :lock, reading: :lock } end class JobRecord < ActiveRecord::Base - establish_connection( - adapter: "postgresql", - host: ENV.fetch("PGHOST", "localhost"), - user: ENV.fetch("PGUSER", "ubuntu"), - password: ENV.fetch("PGPASSWORD", "password"), - database: ENV.fetch("PGDATABASE", "que-test"), - ) + self.abstract_class = true + connects_to database: { writing: :default, reading: :default } end def active_record_with_lock_adapter_connection diff --git a/spec/lib/que/adapters/active_record_with_lock_spec.rb b/spec/lib/que/adapters/active_record_with_lock_spec.rb index cb37b5c4..a2ec6695 100644 --- a/spec/lib/que/adapters/active_record_with_lock_spec.rb +++ b/spec/lib/que/adapters/active_record_with_lock_spec.rb @@ -20,6 +20,8 @@ end it "sets correct metric values" do + expect(JobRecord.connection_pool.db_config.env_name).to eq("default") + expect(LockDatabaseRecord.connection_pool.db_config.env_name).to eq("lock") expect(QueJob.count).to eq(10) with_workers(5) { wait_for_jobs_to_be_worked } expect(QueJob.count).to eq(0) diff --git a/spec/lib/que/job_spec.rb b/spec/lib/que/job_spec.rb index dafed510..a833980b 100644 --- a/spec/lib/que/job_spec.rb +++ b/spec/lib/que/job_spec.rb @@ -131,7 +131,7 @@ let(:args) { arg_keys.zip(fake_args.values_at(*arg_keys)).to_h } it "handles them properly" do - described_class.enqueue(1, true, "foo", **job_args.merge(args)) + described_class.enqueue(1, true, "foo", **job_args, **args) job = QueJob.last arg_keys.each do |key| diff --git a/spec/smoke_tests/exits_cleanly_spec.rb b/spec/smoke_tests/exits_cleanly_spec.rb new file mode 100644 index 00000000..79ad5635 --- /dev/null +++ b/spec/smoke_tests/exits_cleanly_spec.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +require "English" +require "open3" +require "net/http" + +RSpec.describe "Exits cleanly", :smoke_test do # rubocop:disable RSpec/DescribeClass + it "exits cleanly when the process receives a SIGINT" do + pid = Process.spawn("bundle exec bin/que ./tasks/smoke_test.rb --metrics-port=8080") + sleep 3 + + response = Net::HTTP.get_response(URI("http://0.0.0.0:8080/metrics")) + + expect(response.code).to eq("200") + expect(response.body).to include("que_locker_acquire_seconds_total") + + Process.kill("INT", pid) + Process.wait(pid) + process_status = $CHILD_STATUS + + expect(process_status.exitstatus).to eq(0) + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 8baa994f..daa4e224 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -6,6 +6,36 @@ require "rspec" require "active_record" +ActiveRecord::Base.configurations = { + "default" => { + adapter: "postgresql", + host: ENV.fetch("PGHOST", "localhost"), + user: ENV.fetch("PGUSER", "postgres"), + password: ENV.fetch("PGPASSWORD", "password"), + database: ENV.fetch("PGDATABASE", "que-test"), + port: ENV.fetch("PGPORT", 5435), + }, + "lock" => { + adapter: "postgresql", + host: ENV.fetch("LOCK_PGHOST", "localhost"), + user: ENV.fetch("LOCK_PGUSER", "postgres"), + password: ENV.fetch("LOCK_PGPASSWORD", "password"), + database: ENV.fetch("LOCK_PGDATABASE", "que-test-lock"), + port: ENV.fetch("LOCK_PGPORT", 5436), + pool: 5, + }, +} + +ActiveRecord::Base.configurations.configs_for(env_name: "default").each do |config| + ActiveRecord::Base.establish_connection(config) +end + +ActiveRecord::Base.configurations.configs_for(env_name: "lock").each do |config| + ActiveRecord::Base.establish_connection(config) +end + +ActiveRecord::Base.connects_to(database: { writing: :default, reading: :default }) + require_relative "helpers/create_user" require_relative "helpers/exceptional_job" require_relative "helpers/fake_job" @@ -15,48 +45,16 @@ require_relative "helpers/user" require_relative "active_record_with_lock_spec_helper" -def postgres_now - ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"] -end - -def establish_database_connection - ActiveRecord::Base.establish_connection( - adapter: "postgresql", - host: ENV.fetch("PGHOST", "localhost"), - user: ENV.fetch("PGUSER", "ubuntu"), - password: ENV.fetch("PGPASSWORD", "password"), - database: ENV.fetch("PGDATABASE", "que-test"), - ) -end - -establish_database_connection - # Make sure our test database is prepared to run Que -Que.connection = - case ENV["ADAPTER"] - when "ActiveRecordWithLock" then active_record_with_lock_adapter_connection - else ActiveRecord - end +Que.connection = ActiveRecord +default_adapter = Que.adapter Que.migrate! # Ensure we have a logger, so that we can test the code paths that log -Que.logger = Logger.new("/dev/null") +Que.logger = Logger.new(File::NULL) RSpec.configure do |config| - # Run only specific adapter files based on the adapter class - spec_dir = "./spec/lib" - # Construct the path for the adapter spec file - adapter_spec_class_path = File.join(spec_dir, "#{Que.adapter.class.to_s.underscore}_spec.rb") - - # Exclude patterns for tests in the que/adapters directory - config.exclude_pattern = "**/que/adapters/*.rb" - - # Require the adapter spec file if it exists - if File.exist?(adapter_spec_class_path) - require adapter_spec_class_path - end - config.before do QueJob.delete_all FakeJob.log = [] @@ -69,6 +67,14 @@ def establish_database_connection # configurations. Prometheus::Client.registry.instance_eval { @metrics.clear } end + + config.before do |example| + Que.adapter = if example.metadata[:active_record_with_lock] + active_record_with_lock_adapter_connection + else + default_adapter + end + end end def with_workers(num, stop_timeout: 5, secondary_queues: [], &block) @@ -88,3 +94,7 @@ def wait_for_jobs_to_be_worked(timeout: 10) sleep 0.1 end end + +def postgres_now + ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"] +end diff --git a/tasks/smoke_test.rb b/tasks/smoke_test.rb new file mode 100644 index 00000000..d1ddd3cc --- /dev/null +++ b/tasks/smoke_test.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require "active_record" +require_relative "../lib/que" + +ActiveRecord::Base.establish_connection( + adapter: "postgresql", + host: ENV.fetch("PGHOST", "localhost"), + user: ENV.fetch("PGUSER", "postgres"), + password: ENV.fetch("PGPASSWORD", ""), + database: ENV.fetch("PGDATABASE", "que-test"), +) + +Que.connection = ActiveRecord +Que.migrate!