diff --git a/app/jobs/concerns/maintenance_tasks/task_job_concern.rb b/app/jobs/concerns/maintenance_tasks/task_job_concern.rb
index f3163ea0..2442d2e0 100644
--- a/app/jobs/concerns/maintenance_tasks/task_job_concern.rb
+++ b/app/jobs/concerns/maintenance_tasks/task_job_concern.rb
@@ -1,5 +1,7 @@
 # frozen_string_literal: true
 
+require "json"
+
 module MaintenanceTasks
   # Concern that holds the behaviour of the job that runs the tasks. It is
   # included in {TaskJob} and if MaintenanceTasks.job is overridden, it must be
@@ -30,8 +32,24 @@ def retry_on(*, **)
 
     private
 
+    # Cursor value to store on the run: JSON-encoded when the run is flagged
+    # cursor_is_json, otherwise (or when there is no position) left as-is.
+    def serialized_cursor_position
+      position = cursor_position
+      return position unless position && @run.cursor_is_json
+
+      position.to_json
+    end
+
+    # Cursor to resume from: the run's stored cursor, parsed as JSON when the
+    # run is flagged cursor_is_json and a cursor is present.
+    def deserialized_run_cursor
+      stored_cursor = @run.cursor
+      stored_cursor && @run.cursor_is_json ? JSON.parse(stored_cursor) : stored_cursor
+    end
+
     def build_enumerator(_run, cursor:)
-      cursor ||= @run.cursor
+      cursor ||= deserialized_run_cursor
       self.cursor_position = cursor
       enumerator_builder = self.enumerator_builder
       @collection_enum = @task.enumerator_builder(cursor: cursor)
@@ -140,7 +158,7 @@ def on_start
 
     def on_shutdown
       @run.job_shutdown
-      @run.cursor = cursor_position
+      @run.cursor = serialized_cursor_position
       @ticker.persist
     end
 
@@ -177,7 +195,7 @@ def on_error(error)
       @ticker.persist if defined?(@ticker)
 
       if defined?(@run)
-        @run.cursor = cursor_position
+        @run.cursor = serialized_cursor_position
         @run.persist_error(error)
 
         task_context = {
diff --git a/app/models/maintenance_tasks/run.rb b/app/models/maintenance_tasks/run.rb
index 12185396..46bc05c6 100644
--- a/app/models/maintenance_tasks/run.rb
+++ b/app/models/maintenance_tasks/run.rb
@@ -36,6 +36,7 @@ class Run < ApplicationRecord
     enum :status, STATUSES.to_h { |status| [status, status.to_s] }
 
     after_save :instrument_status_change
+    after_initialize :set_cursor_is_json, if: :new_record?
 
     validate :task_name_belongs_to_a_valid_task, on: :create
     validate :csv_attachment_presence, on: :create
@@ -523,5 +524,11 @@ def stale_object_retry_delay(retry_count)
       jitter = delay * 0.25
       delay + (rand * 2 - 1) * jitter
     end
+
+    # after_initialize callback for new records: defaults cursor_is_json to
+    # true when it is still nil, leaving an explicit true/false untouched.
+    def set_cursor_is_json
+      self.cursor_is_json = true if cursor_is_json.nil?
+    end
   end
 end
diff --git a/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb b/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb
new file mode 100644
index 00000000..f6253b84
--- /dev/null
+++ b/db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb
@@ -0,0 +1,8 @@
+# frozen_string_literal: true
+
+# Adds the cursor_is_json boolean column to the maintenance_tasks_runs table.
+class AddCursorIsJsonFlagToRuns < ActiveRecord::Migration[7.1]
+  def change
+    add_column(:maintenance_tasks_runs, :cursor_is_json, :boolean)
+  end
+end
diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb
index 4032acb9..48ca682d 100644
--- a/test/dummy/db/schema.rb
+++ b/test/dummy/db/schema.rb
@@ -10,7 +10,7 @@
 #
 # It's strongly recommended that you check this file into your version control system.
 
-ActiveRecord::Schema.define(version: 2023_06_22_035229) do
+ActiveRecord::Schema[7.1].define(version: 2025_11_28_180556) do
   create_table "active_storage_attachments", force: :cascade do |t|
     t.string "name", null: false
     t.string "record_type", null: false
@@ -52,19 +52,20 @@
     t.string "error_class"
     t.string "error_message"
     t.text "backtrace"
-    t.datetime "created_at", precision: 6, null: false
-    t.datetime "updated_at", precision: 6, null: false
+    t.datetime "created_at", null: false
+    t.datetime "updated_at", null: false
     t.text "arguments"
     t.integer "lock_version", default: 0, null: false
     t.text "metadata"
+    t.boolean "cursor_is_json"
     t.index ["task_name", "status", "created_at"], name: "index_maintenance_tasks_runs", order: { created_at: :desc }
   end
 
   create_table "posts", force: :cascade do |t|
     t.string "title"
     t.string "content"
-    t.datetime "created_at", precision: 6, null: false
-    t.datetime "updated_at", precision: 6, null: false
+    t.datetime "created_at", null: false
+    t.datetime "updated_at", null: false
   end
 
   add_foreign_key "active_storage_attachments", "active_storage_blobs", column: "blob_id"
diff --git a/test/jobs/maintenance_tasks/task_job_test.rb b/test/jobs/maintenance_tasks/task_job_test.rb
index 0a3fa36e..326f1b9b 100644
--- a/test/jobs/maintenance_tasks/task_job_test.rb
+++ b/test/jobs/maintenance_tasks/task_job_test.rb
@@ -256,6 +256,32 @@ class << self
       assert_equal "0", @run.reload.cursor
     end
 
+    # Smoke test: a run flagged cursor_is_json: false (old, non-JSON cursor) still persists its cursor.
+    test ".perform_now persists string cursor when run does not encode the cursor as JSON" do
+      run = Run.create!(task_name: "Maintenance::TestTask", cursor_is_json: false)
+
+      Maintenance::TestTask.any_instance.expects(:process).once.with do
+        run.pausing!
+      end
+
+      TaskJob.perform_now(run)
+
+      assert_equal "0", run.reload.cursor
+    end
+
+    # Smoke test: a run flagged cursor_is_json: false (old, non-JSON cursor) still resumes from its cursor.
+    test ".perform_now starts job from cursor position when run does not encode the cursor as JSON" do
+      run = Run.create!(
+        task_name: "Maintenance::TestTask",
+        cursor_is_json: false,
+        cursor: "0",
+      )
+
+      Maintenance::TestTask.any_instance.expects(:process).once.with(2)
+
+      TaskJob.perform_now(run)
+    end
+
     test ".perform_now persists cursor when there's an error" do
       run = Run.create!(task_name: "Maintenance::ErrorTask")
 
@@ -275,6 +301,37 @@ class << self
       TaskJob.perform_now(@run)
     end
 
+    test ".perform_now serializes multi-column cursors to JSON" do
+      cursor_columns = [:title, :id]
+      Maintenance::UpdatePostsTask.any_instance.stubs(cursor_columns: cursor_columns)
+
+      run = Run.create!(task_name: "Maintenance::UpdatePostsTask")
+
+      TaskJob.perform_now(run)
+
+      post = Post.order(title: :desc).first
+
+      assert_equal [post.title, post.id], JSON.parse(run.reload.cursor)
+    end
+
+    test ".perform_now starts job from multi-column cursor position on resume" do
+      cursor_columns = [:title, :id]
+      first_post = Post.order(title: :asc).first
+      last_post = Post.order(title: :desc).first
+
+      Maintenance::UpdatePostsTask.any_instance.stubs(cursor_columns: cursor_columns)
+      Maintenance::UpdatePostsTask.any_instance.expects(:process).once.with(last_post)
+
+      run = Run.create!(
+        task_name: "Maintenance::UpdatePostsTask",
+        cursor: [first_post.title, first_post.id].to_json,
+      )
+
+      TaskJob.perform_now(run)
+
+      assert_equal [last_post.title, last_post.id], JSON.parse(run.reload.cursor)
+    end
+
     test ".perform_now accepts Active Record Relations as collection" do
       Maintenance::TestTask.any_instance.stubs(collection: Post.all)
       Maintenance::TestTask.any_instance.expects(:process).times(Post.count)
diff --git a/test/models/maintenance_tasks/run_test.rb b/test/models/maintenance_tasks/run_test.rb
index 210b90b0..662cadab 100644
--- a/test/models/maintenance_tasks/run_test.rb
+++ b/test/models/maintenance_tasks/run_test.rb
@@ -736,6 +736,28 @@ class RunTest < ActiveSupport::TestCase
         (Run::ACTIVE_STATUSES + Run::COMPLETED_STATUSES).sort
     end
 
+    test "after_initialize #cursor_is_json set to true by default" do
+      assert Run.new.cursor_is_json
+    end
+
+    test "after_initialize #cursor_is_json respects set value" do
+      run = Run.new(cursor_is_json: false)
+      # Compare against false exactly: refute would also pass for nil.
+      assert_equal false, run.cursor_is_json
+    end
+
+    test "after_initialize does not modify value for persisted records" do
+      run = Run.create!(
+        task_name: "Maintenance::UpdatePostsTask",
+        status: :running,
+      )
+
+      # Unset column to simulate an old run record without a value set.
+      run.update_columns(cursor_is_json: nil)
+
+      assert_nil run.reload.cursor_is_json
+    end
+
     private
 
     def expected_notification(run)