From 3f002a79610a43341706e7dbfec5b22878ba35b7 Mon Sep 17 00:00:00 2001 From: Clayton Passmore Date: Sat, 29 Nov 2025 22:01:47 -0500 Subject: [PATCH 1/2] Encode cursors as JSON Moving forward, encode cursors as JSON for new runs. Existing runs will continue to use the old cursor encoding mechanism, which simply took the cursor value and coerced it to a string via ActiveRecord. This is accomplished by tracking how the cursor is encoded on the run model via a new boolean column, `cursor_is_json`. This value will be falsy for existing runs. This change adds support for multi-column cursors, which can occur when: * A task specifies multiple `cursor_columns`, or * A task iterates over an ActiveRecord collection that has a multi-column primary key Fixes #1226 --- .../maintenance_tasks/task_job_concern.rb | 18 +++++- app/models/maintenance_tasks/run.rb | 9 +++ ...8180556_add_cursor_is_json_flag_to_runs.rb | 5 ++ test/dummy/db/schema.rb | 11 ++-- test/jobs/maintenance_tasks/task_job_test.rb | 57 +++++++++++++++++++ test/models/maintenance_tasks/run_test.rb | 21 +++++++ 6 files changed, 113 insertions(+), 8 deletions(-) create mode 100644 db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb diff --git a/app/jobs/concerns/maintenance_tasks/task_job_concern.rb b/app/jobs/concerns/maintenance_tasks/task_job_concern.rb index f3163ea07..2442d2e08 100644 --- a/app/jobs/concerns/maintenance_tasks/task_job_concern.rb +++ b/app/jobs/concerns/maintenance_tasks/task_job_concern.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require "json" + module MaintenanceTasks # Concern that holds the behaviour of the job that runs the tasks. It is # included in {TaskJob} and if MaintenanceTasks.job is overridden, it must be @@ -30,8 +32,18 @@ def retry_on(*, **) private + def serialized_cursor_position + cursor_position && @run.cursor_is_json ? 
cursor_position.to_json : cursor_position + end + + def deserialized_run_cursor + return JSON.parse(@run.cursor) if @run.cursor && @run.cursor_is_json + + @run.cursor + end + def build_enumerator(_run, cursor:) - cursor ||= @run.cursor + cursor ||= deserialized_run_cursor self.cursor_position = cursor enumerator_builder = self.enumerator_builder @collection_enum = @task.enumerator_builder(cursor: cursor) @@ -140,7 +152,7 @@ def on_start def on_shutdown @run.job_shutdown - @run.cursor = cursor_position + @run.cursor = serialized_cursor_position @ticker.persist end @@ -177,7 +189,7 @@ def on_error(error) @ticker.persist if defined?(@ticker) if defined?(@run) - @run.cursor = cursor_position + @run.cursor = serialized_cursor_position @run.persist_error(error) task_context = { diff --git a/app/models/maintenance_tasks/run.rb b/app/models/maintenance_tasks/run.rb index 12185396d..46bc05c6e 100644 --- a/app/models/maintenance_tasks/run.rb +++ b/app/models/maintenance_tasks/run.rb @@ -36,6 +36,7 @@ class Run < ApplicationRecord enum :status, STATUSES.to_h { |status| [status, status.to_s] } after_save :instrument_status_change + after_initialize :set_cursor_is_json, if: :new_record? validate :task_name_belongs_to_a_valid_task, on: :create validate :csv_attachment_presence, on: :create @@ -523,5 +524,13 @@ def stale_object_retry_delay(retry_count) jitter = delay * 0.25 delay + (rand * 2 - 1) * jitter end + + # After initialize hook - default the cursor type to JSON if it hasn't been + # explicitly set. + def set_cursor_is_json + return unless cursor_is_json.nil? 
+ + self.cursor_is_json = true + end end end diff --git a/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb b/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb new file mode 100644 index 000000000..031c9ba2a --- /dev/null +++ b/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb @@ -0,0 +1,5 @@ +class AddCursorIsJsonFlagToRuns < ActiveRecord::Migration[7.1] + def change + add_column :maintenance_tasks_runs, :cursor_is_json, :boolean + end +end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 4032acb9d..48ca682d2 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2023_06_22_035229) do +ActiveRecord::Schema[7.1].define(version: 2025_11_28_180556) do create_table "active_storage_attachments", force: :cascade do |t| t.string "name", null: false t.string "record_type", null: false @@ -52,19 +52,20 @@ t.string "error_class" t.string "error_message" t.text "backtrace" - t.datetime "created_at", precision: 6, null: false - t.datetime "updated_at", precision: 6, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.text "arguments" t.integer "lock_version", default: 0, null: false t.text "metadata" + t.boolean "cursor_is_json" t.index ["task_name", "status", "created_at"], name: "index_maintenance_tasks_runs", order: { created_at: :desc } end create_table "posts", force: :cascade do |t| t.string "title" t.string "content" - t.datetime "created_at", precision: 6, null: false - t.datetime "updated_at", precision: 6, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false end add_foreign_key "active_storage_attachments", "active_storage_blobs", column: "blob_id" diff --git a/test/jobs/maintenance_tasks/task_job_test.rb b/test/jobs/maintenance_tasks/task_job_test.rb index 0a3fa36eb..c89717311 100644 
--- a/test/jobs/maintenance_tasks/task_job_test.rb +++ b/test/jobs/maintenance_tasks/task_job_test.rb @@ -256,6 +256,32 @@ class << self assert_equal "0", @run.reload.cursor end + # Smoke test for backward compatibility with old, non-JSON cursors. + test ".perform_now persists string cursor when run does not encode the cursor as JSON" do + run = Run.create!(task_name: "Maintenance::TestTask", cursor_is_json: false) + + Maintenance::TestTask.any_instance.expects(:process).once.with do + run.pausing! + end + + TaskJob.perform_now(run) + + assert_equal "0", run.reload.cursor + end + + # Smoke test for backward compatibility with old, non-JSON cursors. + test ".perform_now starts job from cursor position when run does not encode the cursor as JSON" do + run = Run.create!( + task_name: "Maintenance::TestTask", + cursor_is_json: false, + cursor: "0" + ) + + Maintenance::TestTask.any_instance.expects(:process).once.with(2) + + TaskJob.perform_now(run) + end + test ".perform_now persists cursor when there's an error" do run = Run.create!(task_name: "Maintenance::ErrorTask") @@ -275,6 +301,37 @@ class << self TaskJob.perform_now(@run) end + test ".perform_now serializes multi-column cursors to JSON" do + cursor_columns = [:title, :id] + Maintenance::UpdatePostsTask.any_instance.stubs(cursor_columns: cursor_columns) + + run = Run.create!(task_name: "Maintenance::UpdatePostsTask") + + TaskJob.perform_now(run) + + post = Post.order(title: :desc).first + + assert_equal [post.title, post.id], JSON.parse(run.reload.cursor) + end + + test ".perform_now starts job from multi-column cursor position on resume" do + cursor_columns = [:title, :id] + first_post = Post.order(title: :asc).first + last_post = Post.order(title: :desc).first + + Maintenance::UpdatePostsTask.any_instance.stubs(cursor_columns: cursor_columns) + Maintenance::UpdatePostsTask.any_instance.expects(:process).once.with(last_post) + + run = Run.create!( + task_name: "Maintenance::UpdatePostsTask", + cursor: 
[first_post.title, first_post.id].to_json + ) + + TaskJob.perform_now(run) + + assert_equal [last_post.title, last_post.id], JSON.parse(run.reload.cursor) + end + test ".perform_now accepts Active Record Relations as collection" do Maintenance::TestTask.any_instance.stubs(collection: Post.all) Maintenance::TestTask.any_instance.expects(:process).times(Post.count) diff --git a/test/models/maintenance_tasks/run_test.rb b/test/models/maintenance_tasks/run_test.rb index 210b90b0c..662cadab8 100644 --- a/test/models/maintenance_tasks/run_test.rb +++ b/test/models/maintenance_tasks/run_test.rb @@ -736,6 +736,27 @@ class RunTest < ActiveSupport::TestCase (Run::ACTIVE_STATUSES + Run::COMPLETED_STATUSES).sort end + test "after_initialize #cursor_is_json set to true by default" do + assert Run.new.cursor_is_json + end + + test "after_initialize #cursor_is_json respects set value" do + run = Run.new(cursor_is_json: false) + assert_equal run.cursor_is_json, false + end + + test "after_initialize does not modify value for persisted records" do + run = Run.create!( + task_name: "Maintenance::UpdatePostsTask", + status: :running, + ) + + # Unset column to simulate an old run record without a value set. 
+ run.update_columns(cursor_is_json: nil) + + assert_nil run.reload.cursor_is_json + end + private def expected_notification(run) From 0d961e8bc1b47c8937f24b0ed1bf250c9ad67e02 Mon Sep 17 00:00:00 2001 From: Clayton Passmore Date: Sat, 29 Nov 2025 22:14:13 -0500 Subject: [PATCH 2/2] Fix rubocop violations --- db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb | 4 +++- test/jobs/maintenance_tasks/task_job_test.rb | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb b/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb index 031c9ba2a..f6253b848 100644 --- a/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb +++ b/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb @@ -1,5 +1,7 @@ +# frozen_string_literal: true + class AddCursorIsJsonFlagToRuns < ActiveRecord::Migration[7.1] def change - add_column :maintenance_tasks_runs, :cursor_is_json, :boolean + add_column(:maintenance_tasks_runs, :cursor_is_json, :boolean) end end diff --git a/test/jobs/maintenance_tasks/task_job_test.rb b/test/jobs/maintenance_tasks/task_job_test.rb index c89717311..326f1b9b4 100644 --- a/test/jobs/maintenance_tasks/task_job_test.rb +++ b/test/jobs/maintenance_tasks/task_job_test.rb @@ -274,7 +274,7 @@ class << self run = Run.create!( task_name: "Maintenance::TestTask", cursor_is_json: false, - cursor: "0" + cursor: "0", ) Maintenance::TestTask.any_instance.expects(:process).once.with(2) @@ -324,7 +324,7 @@ class << self run = Run.create!( task_name: "Maintenance::UpdatePostsTask", - cursor: [first_post.title, first_post.id].to_json + cursor: [first_post.title, first_post.id].to_json, ) TaskJob.perform_now(run)