Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions app/jobs/concerns/maintenance_tasks/task_job_concern.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require "json"

module MaintenanceTasks
# Concern that holds the behaviour of the job that runs the tasks. It is
# included in {TaskJob} and if MaintenanceTasks.job is overridden, it must be
Expand Down Expand Up @@ -30,8 +32,18 @@ def retry_on(*, **)

private

def serialized_cursor_position
cursor_position && @run.cursor_is_json ? cursor_position.to_json : cursor_position
end

def deserialized_run_cursor
return JSON.parse(@run.cursor) if @run.cursor && @run.cursor_is_json

@run.cursor
end

def build_enumerator(_run, cursor:)
cursor ||= @run.cursor
cursor ||= deserialized_run_cursor
self.cursor_position = cursor
enumerator_builder = self.enumerator_builder
@collection_enum = @task.enumerator_builder(cursor: cursor)
Expand Down Expand Up @@ -140,7 +152,7 @@ def on_start

def on_shutdown
@run.job_shutdown
@run.cursor = cursor_position
@run.cursor = serialized_cursor_position
@ticker.persist
end

Expand Down Expand Up @@ -177,7 +189,7 @@ def on_error(error)
@ticker.persist if defined?(@ticker)

if defined?(@run)
@run.cursor = cursor_position
@run.cursor = serialized_cursor_position
@run.persist_error(error)

task_context = {
Expand Down
9 changes: 9 additions & 0 deletions app/models/maintenance_tasks/run.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Run < ApplicationRecord
enum :status, STATUSES.to_h { |status| [status, status.to_s] }

after_save :instrument_status_change
after_initialize :set_cursor_is_json, if: :new_record?

validate :task_name_belongs_to_a_valid_task, on: :create
validate :csv_attachment_presence, on: :create
Expand Down Expand Up @@ -523,5 +524,13 @@ def stale_object_retry_delay(retry_count)
jitter = delay * 0.25
delay + (rand * 2 - 1) * jitter
end

# After initialize hook - default the cursor type to JSON if it hasn't been
# explicitly set.
def set_cursor_is_json
return unless cursor_is_json.nil?

self.cursor_is_json = true
end
end
end
7 changes: 7 additions & 0 deletions db/migrate/20251128180556_add_cursor_is_json_flag_to_runs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class AddCursorIsJsonFlagToRuns < ActiveRecord::Migration[7.1]
def change
add_column(:maintenance_tasks_runs, :cursor_is_json, :boolean)
end
end
11 changes: 6 additions & 5 deletions test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2023_06_22_035229) do
ActiveRecord::Schema[7.1].define(version: 2025_11_28_180556) do
create_table "active_storage_attachments", force: :cascade do |t|
t.string "name", null: false
t.string "record_type", null: false
Expand Down Expand Up @@ -52,19 +52,20 @@
t.string "error_class"
t.string "error_message"
t.text "backtrace"
t.datetime "created_at", precision: 6, null: false
t.datetime "updated_at", precision: 6, null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.text "arguments"
t.integer "lock_version", default: 0, null: false
t.text "metadata"
t.boolean "cursor_is_json"
t.index ["task_name", "status", "created_at"], name: "index_maintenance_tasks_runs", order: { created_at: :desc }
end

create_table "posts", force: :cascade do |t|
t.string "title"
t.string "content"
t.datetime "created_at", precision: 6, null: false
t.datetime "updated_at", precision: 6, null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
end

add_foreign_key "active_storage_attachments", "active_storage_blobs", column: "blob_id"
Expand Down
57 changes: 57 additions & 0 deletions test/jobs/maintenance_tasks/task_job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,32 @@ class << self
assert_equal "0", @run.reload.cursor
end

# Smoke test for backward compatibility with old, non-JSON cursors.
test ".perform_now persists string cursor when run does not encode the cursor as JSON" do
run = Run.create!(task_name: "Maintenance::TestTask", cursor_is_json: false)

Maintenance::TestTask.any_instance.expects(:process).once.with do
run.pausing!
end

TaskJob.perform_now(run)

assert_equal "0", run.reload.cursor
end

# Smoke test for backward compatibility with old, non-JSON cursors.
test ".perform_now starts job from cursor position when run does not encode the cursor as JSON" do
run = Run.create!(
task_name: "Maintenance::TestTask",
cursor_is_json: false,
cursor: "0",
)

Maintenance::TestTask.any_instance.expects(:process).once.with(2)

TaskJob.perform_now(run)
end

test ".perform_now persists cursor when there's an error" do
run = Run.create!(task_name: "Maintenance::ErrorTask")

Expand All @@ -275,6 +301,37 @@ class << self
TaskJob.perform_now(@run)
end

test ".perform_now serializes multi-column cursors to JSON" do
cursor_columns = [:title, :id]
Maintenance::UpdatePostsTask.any_instance.stubs(cursor_columns: cursor_columns)

run = Run.create!(task_name: "Maintenance::UpdatePostsTask")

TaskJob.perform_now(run)

post = Post.order(title: :desc).first

assert_equal [post.title, post.id], JSON.parse(run.reload.cursor)
end

test ".perform_now starts job from multi-column cursor position on resume" do
cursor_columns = [:title, :id]
first_post = Post.order(title: :asc).first
last_post = Post.order(title: :desc).first

Maintenance::UpdatePostsTask.any_instance.stubs(cursor_columns: cursor_columns)
Maintenance::UpdatePostsTask.any_instance.expects(:process).once.with(last_post)

run = Run.create!(
task_name: "Maintenance::UpdatePostsTask",
cursor: [first_post.title, first_post.id].to_json,
)

TaskJob.perform_now(run)

assert_equal [last_post.title, last_post.id], JSON.parse(run.reload.cursor)
end

test ".perform_now accepts Active Record Relations as collection" do
Maintenance::TestTask.any_instance.stubs(collection: Post.all)
Maintenance::TestTask.any_instance.expects(:process).times(Post.count)
Expand Down
21 changes: 21 additions & 0 deletions test/models/maintenance_tasks/run_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,27 @@ class RunTest < ActiveSupport::TestCase
(Run::ACTIVE_STATUSES + Run::COMPLETED_STATUSES).sort
end

test "after_initialize #cursor_is_json set to true by default" do
assert Run.new.cursor_is_json
end

test "after_initialize #cursor_is_json respects set value" do
run = Run.new(cursor_is_json: false)
assert_equal run.cursor_is_json, false
end

test "after_initialize does not modify value for persisted records" do
run = Run.create!(
task_name: "Maintenance::UpdatePostsTask",
status: :running,
)

# Unset column to simulate an old run record without a value set.
run.update_columns(cursor_is_json: nil)

assert_nil run.reload.cursor_is_json
end

private

def expected_notification(run)
Expand Down