From ad612c44a980d3d1d6e0805c3294ab14d0813bc0 Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Tue, 22 Apr 2025 15:41:44 +0500 Subject: [PATCH 1/9] skipping jobs due to reaching the maximum number of attempts --- lib/queuetopia/queue.ex | 15 +++++++- lib/queuetopia/queue/job_queryable.ex | 1 + test/queuetopia/queue_test.exs | 52 +++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/lib/queuetopia/queue.ex b/lib/queuetopia/queue.ex index ccfc87d..81935bd 100644 --- a/lib/queuetopia/queue.ex +++ b/lib/queuetopia/queue.ex @@ -108,7 +108,7 @@ defmodule Queuetopia.Queue do """ @spec processable_now?(Job.t()) :: boolean def processable_now?(%Job{} = job) do - not done?(job) and scheduled_for_now?(job) + not done?(job) and not skipped?(job) and scheduled_for_now?(job) end @doc """ @@ -120,6 +120,15 @@ defmodule Queuetopia.Queue do not is_nil(job.done_at) end + @doc """ + Returns true if a job is skipped. + Otherwise, returns false. + """ + @spec skipped?(Job.t()) :: boolean + def skipped?(%Job{} = job) do + job.attempts >= job.max_attempts + end + @doc """ Returns true if a job scheduled date is reached. Otherwise, returns false. @@ -165,6 +174,7 @@ defmodule Queuetopia.Queue do Job |> where([j], j.scope == ^scope) |> where([j], is_nil(j.done_at)) + |> where([j], j.attempts < j.max_attempts) |> where([j], j.queue not in subquery(locked_queues)) |> where([j], j.queue not in subquery(blocked_queues)) |> where_immediately_executable_job.() @@ -191,6 +201,7 @@ defmodule Queuetopia.Queue do |> where([j], j.queue == ^queue) |> where([j], j.scope == ^scope) |> where([j], is_nil(j.done_at)) + |> where([j], j.attempts < j.max_attempts) |> order_by(asc: :scheduled_at, asc: :sequence) |> limit(1) |> repo.one() @@ -212,10 +223,12 @@ defmodule Queuetopia.Queue do job = repo.get(Job, id) with {:done?, false} <- {:done?, done?(job)}, + {:skipped?, false} <- {:skipped?, skipped?(job)}, {:scheduled_for_now?, true} <- {:scheduled_for_now?, scheduled_for_now?(job)} do {:ok, job} else {:done?, true} -> {:error, "already done"} + {:skipped?, true} -> {:error, "skipped"} {:scheduled_for_now?, false} -> {:error, "scheduled for later"} end end) diff --git a/lib/queuetopia/queue/job_queryable.ex b/lib/queuetopia/queue/job_queryable.ex index 16342f9..24f6b1b 100644 --- a/lib/queuetopia/queue/job_queryable.ex +++ b/lib/queuetopia/queue/job_queryable.ex @@ -10,6 +10,7 @@ defmodule Queuetopia.Queue.JobQueryable do defp filter_by_field(queryable, {:available?, true}) do queryable |> where([job], is_nil(job.done_at)) + |> where([job], job.attempts < job.max_attempts) end defp filter_by_field(_queryable, {key, _value}) when key not in @filterable_fields do diff --git a/test/queuetopia/queue_test.exs b/test/queuetopia/queue_test.exs index 01bae72..680f63c 100644 --- a/test/queuetopia/queue_test.exs +++ b/test/queuetopia/queue_test.exs @@ -43,6 +43,18 @@ defmodule Queuetopia.QueueTest do assert [] = Queue.list_available_pending_queues(TestRepo, scope_1) end + test "don't list queues whom jobs are skipped due to reaching the maximum number of attempts" do + %{queue: _queue, scope: scope} = + insert!(:job, + scheduled_at: utc_now() |> add(-3600), + next_attempt_at: utc_now(), + attempts: 5, + max_attempts: 5 + ) + + assert [] = Queue.list_available_pending_queues(TestRepo, scope) + end + test "when limit is given, returns only the specified number of rows from the result set" do %{queue: queue, scope: scope} = insert!(:job) insert!(:job, queue: queue, scope: scope) @@ -146,6 +158,13 @@ defmodule Queuetopia.QueueTest do assert is_nil(Queue.get_next_pending_job(TestRepo, scope, queue)) end + + test "when job skipped due to reaching the maximum number of attempts, returns nil" do + %Job{queue: queue, scope: scope} = + insert!(:job, next_attempt_at: utc_now(), attempts: 20, max_attempts: 20) + + assert is_nil(Queue.get_next_pending_job(TestRepo, scope, queue)) + end end describe "fetch_job/2" do @@ -176,6 +195,13 @@ defmodule Queuetopia.QueueTest do assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) end + test "when the job is skipped due to reaching the maximum number of attempts" do + %{queue: queue, scope: scope} = job = insert!(:job, attempts: 20, max_attempts: 20) + + assert {:error, "skipped"} = Queue.fetch_job(TestRepo, job) + assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) + end + test "when the job is scheduled for later" do %Job{queue: queue, scope: scope} = job = insert!(:job, scheduled_at: utc_now() |> add(3600, :second)) @@ -428,6 +454,11 @@ defmodule Queuetopia.QueueTest do refute Queue.processable_now?(job) end + test "when the job is skipped due to reaching the maximum number of attempts" do + job = insert!(:job, attempts: 10, max_attempts: 10) + refute Queue.processable_now?(job) + end + test "when the job is scheduled for later" do job = insert!(:job, scheduled_at: utc_now() |> add(3600, :second)) refute Queue.processable_now?(job) @@ -446,6 +477,18 @@ defmodule Queuetopia.QueueTest do end end + describe "skipped?/1" do + test "when the job is not skipped" do + job = insert!(:job) + refute Queue.skipped?(job) + end + + test "when the maximum number of attempts reached" do + job = insert!(:job, attempts: 10, max_attempts: 10) + assert Queue.skipped?(job) + end + end + describe "scheduled_for_now?/1" do test "when the job is scheduled for now" do job = insert!(:job) @@ -519,6 +562,11 @@ defmodule Queuetopia.QueueTest do test "filters" do insert!(:job, done_at: utc_now()) + assert %{data: [], total: 0} = + Queue.paginate_jobs(TestRepo, 100, 1, filters: [available?: true]) + + insert!(:job, attempts: 3, max_attempts: 3) + assert %{data: [], total: 0} = Queue.paginate_jobs(TestRepo, 100, 1, filters: [available?: true]) @@ -582,6 +630,10 @@ defmodule Queuetopia.QueueTest do assert Queue.list_jobs(TestRepo, filters: [available?: true]) == [] + insert!(:job, attempts: 1, max_attempts: 1) + + assert Queue.list_jobs(TestRepo, filters: [available?: true]) == [] + %{id: id} = job = insert!(:job) [ From 7516417164b826f9d507567e4d7aff841b6907fc Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Thu, 24 Apr 2025 18:22:21 +0500 Subject: [PATCH 2/9] Update Job schema: rename done_at to ended_at, added end_status --- lib/queuetopia/migrations.ex | 2 +- lib/queuetopia/migrations/v6.ex | 23 +++++ lib/queuetopia/queue.ex | 51 ++++++---- lib/queuetopia/queue/job.ex | 21 +++- lib/queuetopia/queue/job_queryable.ex | 3 +- mix.exs | 2 +- test/queuetopia/queue/job_test.exs | 108 ++++++++++++++++++-- test/queuetopia/queue_test.exs | 137 ++++++++++++-------------- test/queuetopia/scheduler_test.exs | 8 +- test/support/factory.ex | 8 +- 10 files changed, 244 insertions(+), 119 deletions(-) create mode 100644 lib/queuetopia/migrations/v6.ex diff --git a/lib/queuetopia/migrations.ex b/lib/queuetopia/migrations.ex index db12d2e..9f2e2e8 100644 --- a/lib/queuetopia/migrations.ex +++ b/lib/queuetopia/migrations.ex @@ -2,7 +2,7 @@ defmodule Queuetopia.Migrations do use Ecto.Migration @initial_version 0 - @current_version 5 + @current_version 6 def up(opts \\ []) when is_list(opts) do from_version = Keyword.get(opts, :from_version, @initial_version) |> Kernel.+(1) to_version = Keyword.get(opts, :to_version, @current_version) diff --git a/lib/queuetopia/migrations/v6.ex b/lib/queuetopia/migrations/v6.ex new file mode 100644 index 0000000..2c78d58 --- /dev/null +++ b/lib/queuetopia/migrations/v6.ex @@ -0,0 +1,23 @@ +defmodule Queuetopia.Migrations.V6 do + @moduledoc false + + use Ecto.Migration + + def up do + rename(table(:queuetopia_jobs), :done_at, to: :ended_at) + rename(index(:queuetopia_jobs, [:ended_at], name: "queuetopia_jobs_done_at_index"), to: "queuetopia_jobs_ended_at_index") + + alter table(:queuetopia_jobs) do + add(:end_status, :string, null: true) + end + end + + def down do + rename(table(:queuetopia_jobs), :ended_at, to: :done_at) + rename(index(:queuetopia_jobs, [:done_at], name: "queuetopia_jobs_ended_at_index"), to: "queuetopia_jobs_done_at_index") + + alter table(:queuetopia_jobs) do + remove(:end_status) + end + end +end diff --git a/lib/queuetopia/queue.ex b/lib/queuetopia/queue.ex index 81935bd..f2714b7 100644 --- a/lib/queuetopia/queue.ex +++ b/lib/queuetopia/queue.ex @@ -103,30 +103,30 @@ defmodule Queuetopia.Queue do end @doc """ - Returns true if a job scheduled date is reached and the job is not done yet. + Returns true if a job scheduled date is reached and the job is not ended yet. Otherwise, returns false. """ @spec processable_now?(Job.t()) :: boolean def processable_now?(%Job{} = job) do - not done?(job) and not skipped?(job) and scheduled_for_now?(job) + not ended?(job) and scheduled_for_now?(job) end @doc """ - Returns true if a job is done. + Returns true if a job is successfully ended. Otherwise, returns false. """ @spec done?(Job.t()) :: boolean def done?(%Job{} = job) do - not is_nil(job.done_at) + ended?(job) and job.end_status == "success" end @doc """ - Returns true if a job is skipped. + Returns true if a job is somehow ended. Otherwise, returns false. """ - @spec skipped?(Job.t()) :: boolean - def skipped?(%Job{} = job) do - job.attempts >= job.max_attempts + @spec ended?(Job.t()) :: boolean + def ended?(%Job{} = job) do + not is_nil(job.ended_at) end @doc """ @@ -156,7 +156,7 @@ defmodule Queuetopia.Queue do Job |> select([:queue]) |> where([j], j.scope == ^scope) - |> where([j], is_nil(j.done_at)) + |> where([j], is_nil(j.ended_at)) |> where([j], j.scheduled_at <= ^utc_now and j.next_attempt_at > ^utc_now) where_immediately_executable_job = fn queryable -> @@ -173,8 +173,7 @@ defmodule Queuetopia.Queue do query = Job |> where([j], j.scope == ^scope) - |> where([j], is_nil(j.done_at)) - |> where([j], j.attempts < j.max_attempts) + |> where([j], is_nil(j.ended_at)) |> where([j], j.queue not in subquery(locked_queues)) |> where([j], j.queue not in subquery(blocked_queues)) |> where_immediately_executable_job.() @@ -200,8 +199,7 @@ defmodule Queuetopia.Queue do Job |> where([j], j.queue == ^queue) |> where([j], j.scope == ^scope) - |> where([j], is_nil(j.done_at)) - |> where([j], j.attempts < j.max_attempts) + |> where([j], is_nil(j.ended_at)) |> order_by(asc: :scheduled_at, asc: :sequence) |> limit(1) |> repo.one() @@ -222,13 +220,11 @@ defmodule Queuetopia.Queue do |> Ecto.Multi.run(:job, fn _, _ -> job = repo.get(Job, id) - with {:done?, false} <- {:done?, done?(job)}, - {:skipped?, false} <- {:skipped?, skipped?(job)}, + with {:ended?, false} <- {:ended?, ended?(job)}, {:scheduled_for_now?, true} <- {:scheduled_for_now?, scheduled_for_now?(job)} do {:ok, job} else - {:done?, true} -> {:error, "already done"} - {:skipped?, true} -> {:error, "skipped"} + {:ended?, true} -> {:error, "already ended"} {:scheduled_for_now?, false} -> {:error, "scheduled for later"} end end) @@ -259,13 +255,30 @@ defmodule Queuetopia.Queue do def persist_result!(repo, %Job{} = job, unexpected_response), do: persist_failure!(repo, job, inspect(unexpected_response)) + defp persist_failure!(repo, %Job{attempts: attempts, max_attempts: max_attempts} = job, error) + when attempts + 1 >= max_attempts do + utc_now = DateTime.utc_now() |> DateTime.truncate(:second) + performer = resolve_performer(job) + + job + |> Job.failed_job_changeset(%{ + attempts: job.attempts + 1, + attempted_at: utc_now, + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now, + error: error, + }) + |> repo.update!() + |> tap(&performer.handle_failed_job!/1) + end + defp persist_failure!(repo, %Job{} = job, error) do utc_now = DateTime.utc_now() |> DateTime.truncate(:second) performer = resolve_performer(job) backoff = performer.backoff(job) job - |> Job.failed_job_changeset(%{ + |> Job.retry_job_changeset(%{ attempts: job.attempts + 1, attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), @@ -284,7 +297,7 @@ defmodule Queuetopia.Queue do attempts: job.attempts + 1, attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), - done_at: utc_now + ended_at: utc_now }) |> repo.update!() end diff --git a/lib/queuetopia/queue/job.ex b/lib/queuetopia/queue/job.ex index 52c9b25..adcbe6d 100644 --- a/lib/queuetopia/queue/job.ex +++ b/lib/queuetopia/queue/job.ex @@ -31,7 +31,8 @@ defmodule Queuetopia.Queue.Job do field(:attempted_at, :utc_datetime) field(:attempted_by, :string) field(:next_attempt_at, :utc_datetime) - field(:done_at, :utc_datetime) + field(:ended_at, :utc_datetime) + field(:end_status, :string) field(:error, :string) timestamps(type: :utc_datetime) @@ -73,20 +74,30 @@ defmodule Queuetopia.Queue.Job do |> validate_number(:max_attempts, greater_than_or_equal_to: 0) end - @spec failed_job_changeset(Job.t(), map) :: Ecto.Changeset.t() - def failed_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do + @spec retry_job_changeset(Job.t(), map) :: Ecto.Changeset.t() + def retry_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do job |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :next_attempt_at, :error]) |> validate_required_attempt_attributes |> validate_required([:next_attempt_at, :error]) end + @spec failed_job_changeset(Job.t(), map) :: Ecto.Changeset.t() + def failed_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do + job + |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :ended_at, :error]) + |> validate_required_attempt_attributes + |> validate_required([:ended_at, :error]) + |> put_change(:end_status, "failed") + end + @spec succeeded_job_changeset(Job.t(), map) :: Ecto.Changeset.t() def succeeded_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do job - |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :done_at]) + |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :ended_at]) |> validate_required_attempt_attributes - |> validate_required([:done_at]) + |> validate_required([:ended_at]) + |> put_change(:end_status, "success") |> put_change(:error, nil) end diff --git a/lib/queuetopia/queue/job_queryable.ex b/lib/queuetopia/queue/job_queryable.ex index 24f6b1b..773840f 100644 --- a/lib/queuetopia/queue/job_queryable.ex +++ b/lib/queuetopia/queue/job_queryable.ex @@ -9,8 +9,7 @@ defmodule Queuetopia.Queue.JobQueryable do defp filter_by_field(queryable, {:available?, true}) do queryable - |> where([job], is_nil(job.done_at)) - |> where([job], job.attempts < job.max_attempts) + |> where([job], is_nil(job.ended_at)) end defp filter_by_field(_queryable, {key, _value}) when key not in @filterable_fields do diff --git a/mix.exs b/mix.exs index 7383f42..b68e873 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do use Mix.Project @source_url "https://github.com/annatel/queuetopia" - @version "2.5.2" + @version "2.6.0" def project do [ diff --git a/test/queuetopia/queue/job_test.exs b/test/queuetopia/queue/job_test.exs index 34db89b..842a9bd 100644 --- a/test/queuetopia/queue/job_test.exs +++ b/test/queuetopia/queue/job_test.exs @@ -123,7 +123,7 @@ defmodule Queuetopia.Queue.JobTest do end end - describe "failed_job_changeset/2" do + describe "retry_job_changeset/2" do test "only permitted_keys are casted" do job = insert!(:job) @@ -136,7 +136,7 @@ defmodule Queuetopia.Queue.JobTest do error: "error" ) - changeset = Job.failed_job_changeset(job, Map.merge(params, %{new_key: "value"})) + changeset = Job.retry_job_changeset(job, Map.merge(params, %{new_key: "value"})) changes_keys = changeset.changes |> Map.keys() assert :attempts in changes_keys @@ -152,7 +152,7 @@ defmodule Queuetopia.Queue.JobTest do test "when required params are missing, returns an invalid changeset" do job = insert!(:job) - changeset = Job.failed_job_changeset(job, %{attempts: nil, scheduled_at: nil}) + changeset = Job.retry_job_changeset(job, %{attempts: nil, scheduled_at: nil}) refute changeset.valid? assert %{attempts: ["can't be blank"]} = errors_on(changeset) @@ -167,7 +167,7 @@ defmodule Queuetopia.Queue.JobTest do job = insert!(:job) changeset = - Job.failed_job_changeset(job, %{ + Job.retry_job_changeset(job, %{ attempts: 1, attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), @@ -179,6 +179,77 @@ defmodule Queuetopia.Queue.JobTest do end end + describe "failed_job_changeset/2" do + test "only permitted_keys are casted" do + job = insert!(:job) + + params = + params_for(:job, + attempts: 6, + attempted_at: utc_now(), + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now(), + error: "error" + ) + + changeset = Job.failed_job_changeset(job, Map.merge(params, %{new_key: "value"})) + changes_keys = changeset.changes |> Map.keys() + + assert :attempts in changes_keys + assert :attempted_at in changes_keys + assert :attempted_by in changes_keys + assert :ended_at in changes_keys + assert :end_status in changes_keys + assert :error in changes_keys + assert Enum.count(changes_keys) == 6 + + refute :new_key in changes_keys + end + + test "when required params are missing, returns an invalid changeset" do + job = insert!(:job) + + changeset = Job.failed_job_changeset(job, %{attempts: nil, scheduled_at: nil}) + + refute changeset.valid? + assert %{attempts: ["can't be blank"]} = errors_on(changeset) + assert %{attempted_at: ["can't be blank"]} = errors_on(changeset) + assert %{attempted_by: ["can't be blank"]} = errors_on(changeset) + assert %{ended_at: ["can't be blank"]} = errors_on(changeset) + assert %{error: ["can't be blank"]} = errors_on(changeset) + end + + test "fills the end_status field by 'failed'" do + job = insert!(:job) + + changeset = + Job.failed_job_changeset(job, %{ + attempts: 1, + attempted_at: utc_now(), + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now(), + error: "error" + }) + + assert changeset.changes.end_status == "failed" + end + + test "when params are valid, return a valid changeset" do + job = insert!(:job) + + changeset = + Job.failed_job_changeset(job, %{ + attempts: 1, + attempted_at: utc_now(), + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now(), + error: "error" + }) + + assert changeset.valid? + end + end + describe "succeeded_job_changeset/2" do test "only permitted_keys are casted" do job = insert!(:job) @@ -188,7 +259,7 @@ defmodule Queuetopia.Queue.JobTest do attempts: 6, attempted_at: utc_now(), attempted_by: Atom.to_string(Node.self()), - done_at: utc_now() + ended_at: utc_now() ) changeset = Job.succeeded_job_changeset(job, Map.merge(params, %{new_key: "value"})) @@ -197,8 +268,9 @@ defmodule Queuetopia.Queue.JobTest do assert :attempts in changes_keys assert :attempted_at in changes_keys assert :attempted_by in changes_keys - assert :done_at in changes_keys - assert Enum.count(changes_keys) == 4 + assert :ended_at in changes_keys + assert :end_status in changes_keys + assert Enum.count(changes_keys) == 5 refute :new_key in changes_keys end @@ -211,7 +283,7 @@ defmodule Queuetopia.Queue.JobTest do assert %{attempts: ["can't be blank"]} = errors_on(changeset) assert %{attempted_at: ["can't be blank"]} = errors_on(changeset) assert %{attempted_by: ["can't be blank"]} = errors_on(changeset) - assert %{done_at: ["can't be blank"]} = errors_on(changeset) + assert %{ended_at: ["can't be blank"]} = errors_on(changeset) end test "nillifies error field" do @@ -222,7 +294,7 @@ defmodule Queuetopia.Queue.JobTest do attempts: 6, attempted_at: utc_now(), attempted_by: Atom.to_string(Node.self()), - done_at: utc_now() + ended_at: utc_now() ) changeset = Job.succeeded_job_changeset(job, params) @@ -230,6 +302,22 @@ defmodule Queuetopia.Queue.JobTest do assert is_nil(changeset.changes.error) end + test "fills the end_status field by 'success'" do + job = insert!(:job) + + params = + params_for(:job, + attempts: 6, + attempted_at: utc_now(), + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now() + ) + + changeset = Job.succeeded_job_changeset(job, params) + + assert changeset.changes.end_status == "success" + end + test "when params are valid, return a valid changeset" do utc_now = utc_now() job = insert!(:job) @@ -239,7 +327,7 @@ defmodule Queuetopia.Queue.JobTest do attempts: 1, attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), - done_at: utc_now + ended_at: utc_now }) assert changeset.valid? diff --git a/test/queuetopia/queue_test.exs b/test/queuetopia/queue_test.exs index 680f63c..49b3947 100644 --- a/test/queuetopia/queue_test.exs +++ b/test/queuetopia/queue_test.exs @@ -38,20 +38,8 @@ defmodule Queuetopia.QueueTest do assert [] = Queue.list_available_pending_queues(TestRepo, scope) end - test "don't list queues whom jobs are done" do - %{queue: _queue_1, scope: scope_1} = insert!(:done_job) - assert [] = Queue.list_available_pending_queues(TestRepo, scope_1) - end - - test "don't list queues whom jobs are skipped due to reaching the maximum number of attempts" do - %{queue: _queue, scope: scope} = - insert!(:job, - scheduled_at: utc_now() |> add(-3600), - next_attempt_at: utc_now(), - attempts: 5, - max_attempts: 5 - ) - + test "don't list queues whom jobs are ended" do + %{queue: _queue, scope: scope} = insert!(:ended_job) assert [] = Queue.list_available_pending_queues(TestRepo, scope) end @@ -89,7 +77,7 @@ defmodule Queuetopia.QueueTest do describe "get_next_pending_job/2" do test "returns the next pending job for a given scoped queue" do - %{queue: queue_1, scope: scope_1} = insert!(:done_job) + %{queue: queue_1, scope: scope_1} = insert!(:ended_job) %{id: id_1} = insert!(:job, queue: queue_1, scope: scope_1) %{id: id_2, queue: queue_2} = insert!(:job, scope: scope_1) @@ -158,13 +146,6 @@ defmodule Queuetopia.QueueTest do assert is_nil(Queue.get_next_pending_job(TestRepo, scope, queue)) end - - test "when job skipped due to reaching the maximum number of attempts, returns nil" do - %Job{queue: queue, scope: scope} = - insert!(:job, next_attempt_at: utc_now(), attempts: 20, max_attempts: 20) - - assert is_nil(Queue.get_next_pending_job(TestRepo, scope, queue)) - end end describe "fetch_job/2" do @@ -188,17 +169,10 @@ defmodule Queuetopia.QueueTest do assert %Lock{id: ^id} = TestRepo.get_by(Lock, scope: scope, queue: queue) end - test "when the job is done" do - %Job{queue: queue, scope: scope} = job = insert!(:done_job) - - assert {:error, "already done"} = Queue.fetch_job(TestRepo, job) - assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) - end - - test "when the job is skipped due to reaching the maximum number of attempts" do - %{queue: queue, scope: scope} = job = insert!(:job, attempts: 20, max_attempts: 20) + test "when the job is ended" do + %Job{queue: queue, scope: scope} = job = insert!(:ended_job) - assert {:error, "skipped"} = Queue.fetch_job(TestRepo, job) + assert {:error, "already ended"} = Queue.fetch_job(TestRepo, job) assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) end @@ -295,17 +269,19 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, :ok) %Job{ - done_at: done_at, + ended_at: ended_at, + end_status: end_status, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts } = TestRepo.reload(job) - refute is_nil(done_at) refute is_nil(attempted_at) + refute is_nil(ended_at) + assert end_status == "success" assert attempted_by == Atom.to_string(Node.self()) assert attempts == 1 - assert done_at == attempted_at + assert ended_at == attempted_at end test "when a job succeeded with a result, persists the job as succeeded" do @@ -314,40 +290,58 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, {:ok, :done}) %Job{ - done_at: done_at, + ended_at: ended_at, + end_status: end_status, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts } = TestRepo.reload(job) - refute is_nil(done_at) refute is_nil(attempted_at) + refute is_nil(ended_at) + assert end_status == "success" assert attempted_by == Atom.to_string(Node.self()) assert attempts == 1 - assert done_at == attempted_at + assert ended_at == attempted_at end - test "when a job failed, persists the job as failed and record the error" do + test "when a job failed and max_attempts is not reached, persists the job and record the error and next_attempt_at" do job = insert!(:failure_job) _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{} = job = TestRepo.reload(job) - assert job.done_at == nil - refute job.attempted_at == nil + refute is_nil(job.attempted_at) + refute is_nil(job.next_attempt_at) + assert is_nil(job.ended_at) assert job.attempted_by == Atom.to_string(Node.self()) assert job.attempts == 1 assert job.error == "error" end - test "when a job returns an unexpected_response, persists the job as failed and record the response" do + test "when a job failed and max_attempts is reached, persists the job as failed and record the error" do + job = insert!(:failure_job, attempts: 9, max_attempts: 10) + + _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) + + %Job{} = job = TestRepo.reload(job) + refute is_nil(job.ended_at) + refute is_nil(job.attempted_at) + assert job.attempted_by == Atom.to_string(Node.self()) + assert job.attempts == 10 + assert job.error == "error" + assert job.end_status == "failed" + end + + test "when a job returns an unexpected_response, persists the job and record the error and the response" do job = insert!(:failure_job) _ = Queue.persist_result!(TestRepo, job, "unexpected_response") %Job{} = job = TestRepo.reload(job) - assert job.done_at == nil - refute job.attempted_at == nil + refute is_nil(job.attempted_at) + refute is_nil(job.next_attempt_at) + assert is_nil(job.ended_at) assert job.attempted_by == Atom.to_string(Node.self()) assert job.attempts == 1 assert job.error == "\"unexpected_response\"" @@ -363,12 +357,12 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{} = job = TestRepo.reload(job) - assert job.done_at == nil + assert job.ended_at == nil refute job.attempted_at == nil assert job.attempted_by == Atom.to_string(Node.self()) assert job.attempts == 1 - assert_receive {:job, %Job{id: ^id, done_at: nil, attempted_at: %DateTime{}, attempts: 1}}, + assert_receive {:job, %Job{id: ^id, ended_at: nil, attempted_at: %DateTime{}, attempts: 1}}, 100 end @@ -382,7 +376,7 @@ defmodule Queuetopia.QueueTest do Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{ - done_at: nil, + ended_at: nil, attempted_at: attempted_at, next_attempt_at: next_attempt_at } = TestRepo.reload(job) @@ -424,7 +418,7 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{ - done_at: nil, + ended_at: nil, attempted_at: attempted_at, next_attempt_at: next_attempt_at } = TestRepo.reload(job) @@ -434,7 +428,7 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{ - done_at: nil, + ended_at: nil, attempted_at: attempted_at, next_attempt_at: next_attempt_at } = TestRepo.reload(job) @@ -444,23 +438,18 @@ defmodule Queuetopia.QueueTest do end describe "processable_now?/1" do - test "when the job is processable now" do + test "when the job is not ended and processable now" do job = insert!(:job) assert Queue.processable_now?(job) end - test "when the job is done" do - job = insert!(:done_job) + test "when the job is not ended and scheduled for later" do + job = insert!(:job, scheduled_at: utc_now() |> add(3600, :second)) refute Queue.processable_now?(job) end - test "when the job is skipped due to reaching the maximum number of attempts" do - job = insert!(:job, attempts: 10, max_attempts: 10) - refute Queue.processable_now?(job) - end - - test "when the job is scheduled for later" do - job = insert!(:job, scheduled_at: utc_now() |> add(3600, :second)) + test "when the job is ended" do + job = insert!(:ended_job) refute Queue.processable_now?(job) end end @@ -469,23 +458,28 @@ defmodule Queuetopia.QueueTest do test "when the job is not done" do job = insert!(:job) refute Queue.done?(job) + assert is_nil(job.ended_at) end test "when the job is done" do job = insert!(:done_job) assert Queue.done?(job) + refute is_nil(job.ended_at) + assert job.end_status == "success" end end - describe "skipped?/1" do - test "when the job is not skipped" do + describe "ended?/1" do + test "when the job is not ended" do job = insert!(:job) - refute Queue.skipped?(job) + refute Queue.ended?(job) + assert is_nil(job.ended_at) end - test "when the maximum number of attempts reached" do - job = insert!(:job, attempts: 10, max_attempts: 10) - assert Queue.skipped?(job) + test "when the job is done" do + job = insert!(:ended_job) + assert Queue.ended?(job) + refute is_nil(job.ended_at) end end @@ -560,12 +554,7 @@ defmodule Queuetopia.QueueTest do end test "filters" do - insert!(:job, done_at: utc_now()) - - assert %{data: [], total: 0} = - Queue.paginate_jobs(TestRepo, 100, 1, filters: [available?: true]) - - insert!(:job, attempts: 3, max_attempts: 3) + insert!(:job, ended_at: utc_now()) assert %{data: [], total: 0} = Queue.paginate_jobs(TestRepo, 100, 1, filters: [available?: true]) @@ -626,11 +615,7 @@ defmodule Queuetopia.QueueTest do end test "filters" do - insert!(:job, done_at: utc_now()) - - assert Queue.list_jobs(TestRepo, filters: [available?: true]) == [] - - insert!(:job, attempts: 1, max_attempts: 1) + insert!(:job, ended_at: utc_now()) assert Queue.list_jobs(TestRepo, filters: [available?: true]) == [] diff --git a/test/queuetopia/scheduler_test.exs b/test/queuetopia/scheduler_test.exs index a04e41e..2e9c0e3 100644 --- a/test/queuetopia/scheduler_test.exs +++ b/test/queuetopia/scheduler_test.exs @@ -259,7 +259,7 @@ defmodule Queuetopia.SchedulerTest do test "a failed job will be retried" do scope = TestQueuetopia.scope() - %{id: failing_job_id, queue: queue} = insert!(:failure_job, scope: scope) + %{id: failing_job_id, queue: queue} = insert!(:failure_job, scope: scope, max_attempts: 1000) start_supervised!(TestQueuetopia) @@ -334,7 +334,7 @@ defmodule Queuetopia.SchedulerTest do assert %Job{ id: ^failing_job_id, - done_at: nil, + ended_at: nil, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts, @@ -365,7 +365,7 @@ defmodule Queuetopia.SchedulerTest do assert %Job{ id: ^slow_job_id, - done_at: nil, + ended_at: nil, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts, @@ -399,7 +399,7 @@ defmodule Queuetopia.SchedulerTest do assert %Job{ id: ^raising_job_id, - done_at: nil, + ended_at: nil, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts, diff --git a/test/support/factory.ex b/test/support/factory.ex index 3994b0d..b6c560a 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -42,10 +42,16 @@ defmodule Queuetopia.Factory do |> struct!(attrs) end + def build(:ended_job, attrs) do + job = build(:job, attrs) + + %{job | ended_at: utc_now()} + end + def build(:done_job, attrs) do job = build(:job, attrs) - %{job | done_at: utc_now()} + %{job | ended_at: utc_now(), end_status: "success"} end def build(:raising_job, attrs) do From 2ff7722e5b2f07cc7acee5163a0a9da6df8293e9 Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Sun, 27 Apr 2025 14:39:23 +0500 Subject: [PATCH 3/9] added data migration to v6 migration, mix format --- lib/queuetopia/migrations/v6.ex | 32 ++++++++++++++++++++++-- lib/queuetopia/queue.ex | 4 +-- test/queuetopia/queue_test.exs | 18 ++++++++++--- test/queuetopia/scheduler_test.exs | 3 ++- test/queuetopia/test/assertions_test.exs | 28 ++++++++++----------- 5 files changed, 63 insertions(+), 22 deletions(-) diff --git a/lib/queuetopia/migrations/v6.ex b/lib/queuetopia/migrations/v6.ex index 2c78d58..2d4de44 100644 --- a/lib/queuetopia/migrations/v6.ex +++ b/lib/queuetopia/migrations/v6.ex @@ -5,16 +5,44 @@ defmodule Queuetopia.Migrations.V6 do def up do rename(table(:queuetopia_jobs), :done_at, to: :ended_at) - rename(index(:queuetopia_jobs, [:ended_at], name: "queuetopia_jobs_done_at_index"), to: "queuetopia_jobs_ended_at_index") + + rename(index(:queuetopia_jobs, [:ended_at], name: "queuetopia_jobs_done_at_index"), + to: "queuetopia_jobs_ended_at_index" + ) alter table(:queuetopia_jobs) do add(:end_status, :string, null: true) end + + max_attempts_reached = + """ + UPDATE queuetopia_jobs + SET ended_at = attempted_at + WHERE ended_at IS NULL AND attempts >= max_attempts; + """ + + execute(max_attempts_reached) + + end_status_query = + """ + UPDATE queuetopia_jobs + SET end_status = + CASE + WHEN error IS NULL THEN 'success' + ELSE 'failed' + END + WHERE ended_at IS NOT NULL; + """ + + execute(end_status_query) end def down do rename(table(:queuetopia_jobs), :ended_at, to: :done_at) - rename(index(:queuetopia_jobs, [:done_at], name: "queuetopia_jobs_ended_at_index"), to: "queuetopia_jobs_done_at_index") + + rename(index(:queuetopia_jobs, [:done_at], name: "queuetopia_jobs_ended_at_index"), + to: "queuetopia_jobs_done_at_index" + ) alter table(:queuetopia_jobs) do remove(:end_status) diff --git a/lib/queuetopia/queue.ex b/lib/queuetopia/queue.ex index f2714b7..034fa93 100644 --- a/lib/queuetopia/queue.ex +++ b/lib/queuetopia/queue.ex @@ -256,7 +256,7 @@ defmodule Queuetopia.Queue do do: persist_failure!(repo, job, inspect(unexpected_response)) defp persist_failure!(repo, %Job{attempts: attempts, max_attempts: max_attempts} = job, error) - when attempts + 1 >= max_attempts do + when attempts + 1 >= max_attempts do utc_now = DateTime.utc_now() |> DateTime.truncate(:second) performer = resolve_performer(job) @@ -266,7 +266,7 @@ defmodule Queuetopia.Queue do attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), ended_at: utc_now, - error: error, + error: error }) |> repo.update!() |> tap(&performer.handle_failed_job!/1) diff --git a/test/queuetopia/queue_test.exs b/test/queuetopia/queue_test.exs index 49b3947..dad700f 100644 --- a/test/queuetopia/queue_test.exs +++ b/test/queuetopia/queue_test.exs @@ -381,7 +381,11 @@ defmodule Queuetopia.QueueTest do next_attempt_at: next_attempt_at } = TestRepo.reload(job) - assert :eq = DateTime.compare(next_attempt_at, DateTime.add(attempted_at, backoff, :millisecond)) + assert :eq = + DateTime.compare( + next_attempt_at, + DateTime.add(attempted_at, backoff, :millisecond) + ) end) end @@ -423,7 +427,11 @@ defmodule Queuetopia.QueueTest do next_attempt_at: next_attempt_at } = TestRepo.reload(job) - assert :eq = DateTime.compare(next_attempt_at, DateTime.add(attempted_at, max_backoff, :millisecond)) + assert :eq = + DateTime.compare( + next_attempt_at, + DateTime.add(attempted_at, max_backoff, :millisecond) + ) _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) @@ -433,7 +441,11 @@ defmodule Queuetopia.QueueTest do next_attempt_at: next_attempt_at } = TestRepo.reload(job) - assert :eq = DateTime.compare(next_attempt_at, DateTime.add(attempted_at, max_backoff, :millisecond)) + assert :eq = + DateTime.compare( + next_attempt_at, + DateTime.add(attempted_at, max_backoff, :millisecond) + ) end end diff --git a/test/queuetopia/scheduler_test.exs b/test/queuetopia/scheduler_test.exs index 2e9c0e3..c18cf24 100644 --- a/test/queuetopia/scheduler_test.exs +++ b/test/queuetopia/scheduler_test.exs @@ -259,7 +259,8 @@ defmodule Queuetopia.SchedulerTest do test "a failed job will be retried" do scope = TestQueuetopia.scope() - %{id: failing_job_id, queue: queue} = insert!(:failure_job, scope: scope, max_attempts: 1000) + %{id: failing_job_id, queue: queue} = + insert!(:failure_job, scope: scope, max_attempts: 1000) start_supervised!(TestQueuetopia) diff --git a/test/queuetopia/test/assertions_test.exs b/test/queuetopia/test/assertions_test.exs index 9d38991..5f3539d 100644 --- a/test/queuetopia/test/assertions_test.exs +++ b/test/queuetopia/test/assertions_test.exs @@ -75,10 +75,10 @@ defmodule Queuetopia.Test.AssertionsTest do ) expected_attributes = %{queue: "queue", scope: job.scope} + message = %ExUnit.AssertionError{ - message: - "Expected 1 job with attributes #{inspect(expected_attributes)}, got 0." + message: "Expected 1 job with attributes #{inspect(expected_attributes)}, got 0." } |> ExUnit.AssertionError.message() @@ -87,10 +87,10 @@ defmodule Queuetopia.Test.AssertionsTest do end expected_attributes = %{action: "action", scope: job.scope} + message = %ExUnit.AssertionError{ - message: - "Expected 1 job with attributes #{inspect(expected_attributes)}, got 0." + message: "Expected 1 job with attributes #{inspect(expected_attributes)}, got 0." } |> ExUnit.AssertionError.message() @@ -107,10 +107,10 @@ defmodule Queuetopia.Test.AssertionsTest do ) expected_attributes = %{params: %{"b" => 2}, scope: job.scope} + message = %ExUnit.AssertionError{ - message: - "Expected 1 job with attributes #{inspect(expected_attributes)}, got 0." + message: "Expected 1 job with attributes #{inspect(expected_attributes)}, got 0." } |> ExUnit.AssertionError.message() @@ -132,10 +132,10 @@ defmodule Queuetopia.Test.AssertionsTest do ) expected_attributes = %{params: %{"a" => 1}, scope: job.scope} + message = %ExUnit.AssertionError{ - message: - "Expected 1 job with attributes #{inspect(expected_attributes)}, got 2." + message: "Expected 1 job with attributes #{inspect(expected_attributes)}, got 2." } |> ExUnit.AssertionError.message() @@ -174,10 +174,10 @@ defmodule Queuetopia.Test.AssertionsTest do job = insert!(:job, scope: Queuetopia.TestQueuetopia.scope(), params: %{"a" => 1}) expected_attributes = %{queue: job.queue, scope: job.scope} + message = %ExUnit.AssertionError{ - message: - "Expected 0 job with attributes #{inspect(expected_attributes)}, got 1." + message: "Expected 0 job with attributes #{inspect(expected_attributes)}, got 1." } |> ExUnit.AssertionError.message() @@ -186,10 +186,10 @@ defmodule Queuetopia.Test.AssertionsTest do end expected_attributes = %{action: job.action, scope: job.scope} + message = %ExUnit.AssertionError{ - message: - "Expected 0 job with attributes #{inspect(expected_attributes)}, got 1." + message: "Expected 0 job with attributes #{inspect(expected_attributes)}, got 1." } |> ExUnit.AssertionError.message() @@ -198,10 +198,10 @@ defmodule Queuetopia.Test.AssertionsTest do end expected_attributes = %{params: job.params, scope: job.scope} + message = %ExUnit.AssertionError{ - message: - "Expected 0 job with attributes #{inspect(expected_attributes)}, got 1." + message: "Expected 0 job with attributes #{inspect(expected_attributes)}, got 1." } |> ExUnit.AssertionError.message() From f1b1b57f9672928e0ed3009d737314cb772b7fa1 Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Sun, 27 Apr 2025 17:28:18 +0500 Subject: [PATCH 4/9] added aborted_job_changeset to abort jobs --- lib/queuetopia/migrations/v6.ex | 20 +++++------- lib/queuetopia/queue.ex | 1 + lib/queuetopia/queue/job.ex | 22 +++++++++++-- test/queuetopia/queue/job_test.exs | 51 ++++++++++++++++++++++++++++-- test/queuetopia/queue_test.exs | 2 +- 5 files changed, 78 insertions(+), 18 deletions(-) diff --git a/lib/queuetopia/migrations/v6.ex b/lib/queuetopia/migrations/v6.ex index 2d4de44..a8a6881 100644 --- a/lib/queuetopia/migrations/v6.ex +++ b/lib/queuetopia/migrations/v6.ex @@ -14,27 +14,23 @@ defmodule Queuetopia.Migrations.V6 do add(:end_status, :string, null: true) end - max_attempts_reached = + set_end_status_to_success = """ UPDATE queuetopia_jobs - SET ended_at = attempted_at - WHERE ended_at IS NULL AND attempts >= max_attempts; + SET end_status = 'success' + WHERE ended_at IS NOT NULL AND error IS NULL; """ - execute(max_attempts_reached) + execute(set_end_status_to_success) - end_status_query = + set_end_status_to_max_attempts_reached = """ UPDATE queuetopia_jobs - SET end_status = - CASE - WHEN error IS NULL THEN 'success' - ELSE 'failed' - END - WHERE ended_at IS NOT NULL; + SET ended_at = attempted_at, end_status = 'max_attempts_reached' + WHERE ended_at IS NULL AND attempts >= max_attempts; """ - execute(end_status_query) + execute(set_end_status_to_max_attempts_reached) end def down do diff --git a/lib/queuetopia/queue.ex b/lib/queuetopia/queue.ex index 034fa93..da06f94 100644 --- a/lib/queuetopia/queue.ex +++ b/lib/queuetopia/queue.ex @@ -266,6 +266,7 @@ defmodule Queuetopia.Queue do attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), ended_at: utc_now, + end_status: "max_attempts_reached", error: error }) |> repo.update!() diff --git a/lib/queuetopia/queue/job.ex b/lib/queuetopia/queue/job.ex index adcbe6d..03aedaf 100644 --- a/lib/queuetopia/queue/job.ex +++ b/lib/queuetopia/queue/job.ex @@ -2,7 +2,15 @@ defmodule Queuetopia.Queue.Job do @moduledoc false use Ecto.Schema - import Ecto.Changeset, only: [cast: 3, put_change: 3, validate_number: 3, validate_required: 2] + + import Ecto.Changeset, + only: [ + cast: 3, + put_change: 3, + validate_number: 3, + validate_required: 2, + validate_inclusion: 3 + ] @type t :: %__MODULE__{} @type option :: @@ -85,10 +93,18 @@ defmodule Queuetopia.Queue.Job do @spec failed_job_changeset(Job.t(), map) :: Ecto.Changeset.t() def failed_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do job - |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :ended_at, :error]) + |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :ended_at, :end_status, :error]) |> validate_required_attempt_attributes + |> validate_required([:ended_at, :end_status, :error]) + |> validate_inclusion(:end_status, ["failed", "max_attempts_reached"]) + end + + @spec aborted_job_changeset(Job.t(), map) :: Ecto.Changeset.t() + def aborted_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do + job + |> cast(attrs, [:ended_at, :error]) |> validate_required([:ended_at, :error]) - |> put_change(:end_status, "failed") + |> put_change(:end_status, "aborted") end @spec succeeded_job_changeset(Job.t(), map) :: Ecto.Changeset.t() diff --git a/test/queuetopia/queue/job_test.exs b/test/queuetopia/queue/job_test.exs index 842a9bd..1af7dcb 100644 --- a/test/queuetopia/queue/job_test.exs +++ b/test/queuetopia/queue/job_test.exs @@ -189,6 +189,7 @@ defmodule Queuetopia.Queue.JobTest do attempted_at: utc_now(), attempted_by: Atom.to_string(Node.self()), ended_at: utc_now(), + end_status: "failed", error: "error" ) @@ -219,7 +220,7 @@ defmodule Queuetopia.Queue.JobTest do assert %{error: ["can't be blank"]} = errors_on(changeset) end - test "fills the end_status field by 'failed'" do + test "when invalid end_status is given, returns an invalid changeset" do job = insert!(:job) changeset = @@ -228,10 +229,12 @@ defmodule Queuetopia.Queue.JobTest do attempted_at: utc_now(), attempted_by: Atom.to_string(Node.self()), ended_at: utc_now(), + end_status: "some_unsupported_invalid_status", error: "error" }) - assert changeset.changes.end_status == "failed" + refute changeset.valid? + assert %{end_status: ["is invalid"]} = errors_on(changeset) end test "when params are valid, return a valid changeset" do @@ -242,6 +245,50 @@ defmodule Queuetopia.Queue.JobTest do attempts: 1, attempted_at: utc_now(), attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now(), + end_status: "failed", + error: "error" + }) + + assert changeset.valid? + end + end + + describe "aborted_job_changeset/2" do + test "only permitted_keys are casted" do + job = insert!(:job) + + params = + params_for(:job, + ended_at: utc_now(), + error: "error" + ) + + changeset = Job.failed_job_changeset(job, Map.merge(params, %{new_key: "value"})) + changes_keys = changeset.changes |> Map.keys() + + assert :ended_at in changes_keys + assert :error in changes_keys + assert Enum.count(changes_keys) == 2 + + refute :new_key in changes_keys + end + + test "when required params are missing, returns an invalid changeset" do + job = insert!(:job) + + changeset = Job.aborted_job_changeset(job, %{attempts: nil, scheduled_at: nil}) + + refute changeset.valid? + assert %{ended_at: ["can't be blank"]} = errors_on(changeset) + assert %{error: ["can't be blank"]} = errors_on(changeset) + end + + test "when params are valid, return a valid changeset" do + job = insert!(:job) + + changeset = + Job.aborted_job_changeset(job, %{ ended_at: utc_now(), error: "error" }) diff --git a/test/queuetopia/queue_test.exs b/test/queuetopia/queue_test.exs index dad700f..d6a2f30 100644 --- a/test/queuetopia/queue_test.exs +++ b/test/queuetopia/queue_test.exs @@ -330,7 +330,7 @@ defmodule Queuetopia.QueueTest do assert job.attempted_by == Atom.to_string(Node.self()) assert job.attempts == 10 assert job.error == "error" - assert job.end_status == "failed" + assert job.end_status == "max_attempts_reached" end test "when a job returns an unexpected_response, persists the job and record the error and the response" do From 8144f823380afd060df6ebea32c1ea0bb8efa1fb Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Wed, 30 Apr 2025 11:08:24 +0500 Subject: [PATCH 5/9] Added abort_job API method --- lib/queuetopia.ex | 5 +++++ lib/queuetopia/queue.ex | 33 +++++++++++++++++++++++++++++++++ lib/queuetopia/queue/job.ex | 2 +- test/queuetopia/queue_test.exs | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 1 deletion(-) diff --git a/lib/queuetopia.ex b/lib/queuetopia.ex index 80bb99d..1357429 100644 --- a/lib/queuetopia.ex +++ b/lib/queuetopia.ex @@ -214,6 +214,11 @@ defmodule Queuetopia do Queuetopia.Queue.paginate_jobs(@repo, page_size, page_number, opts) end + @spec abort_job(Job.t(), error :: any) :: {:ok, any} | {:error, any} + def abort_job(%Job{} = job, error \\ nil) do + Queuetopia.Queue.abort_job(@repo, job, error) + end + def handle_event(:new_incoming_job) do listen(:new_incoming_job) end diff --git a/lib/queuetopia/queue.ex b/lib/queuetopia/queue.ex index da06f94..a4cfc4f 100644 --- a/lib/queuetopia/queue.ex +++ b/lib/queuetopia/queue.ex @@ -102,6 +102,28 @@ defmodule Queuetopia.Queue do end) end + @spec abort_job(module, Job.t(), any) :: {:ok, any} | {:error, any} + def abort_job(repo, %Job{id: id} = job, error \\ nil) do + Ecto.Multi.new() + |> Ecto.Multi.run(:job, fn _, _ -> + job = repo.get(Job, id) + + with {:ended?, false} <- {:ended?, ended?(job)} do + persist_abort(repo, job, error) + else + {:ended?, true} -> {:error, "already ended"} + end + end) + |> Ecto.Multi.run(:lock, fn _, _ -> + {:ok, unlock_queue(repo, job.scope, job.queue)} + end) + |> repo.transaction() + |> case do + {:ok, %{job: job}} -> {:ok, job} + {:error, :job, error, _} -> {:error, error} + end + end + @doc """ Returns true if a job scheduled date is reached and the job is not ended yet. Otherwise, returns false. @@ -303,6 +325,17 @@ defmodule Queuetopia.Queue do |> repo.update!() end + defp persist_abort(repo, %Job{} = job, error) do + utc_now = DateTime.utc_now() |> DateTime.truncate(:second) + + job + |> Job.aborted_job_changeset(%{ + ended_at: utc_now, + error: error + }) + |> repo.update() + end + defp resolve_performer(%Job{performer: performer}) do performer |> String.split(".") diff --git a/lib/queuetopia/queue/job.ex b/lib/queuetopia/queue/job.ex index 03aedaf..6e73a2e 100644 --- a/lib/queuetopia/queue/job.ex +++ b/lib/queuetopia/queue/job.ex @@ -103,7 +103,7 @@ defmodule Queuetopia.Queue.Job do def aborted_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do job |> cast(attrs, [:ended_at, :error]) - |> validate_required([:ended_at, :error]) + |> validate_required([:ended_at]) |> put_change(:end_status, "aborted") end diff --git a/test/queuetopia/queue_test.exs b/test/queuetopia/queue_test.exs index d6a2f30..331fea1 100644 --- a/test/queuetopia/queue_test.exs +++ b/test/queuetopia/queue_test.exs @@ -667,6 +667,40 @@ defmodule Queuetopia.QueueTest do end end + describe "abort_job/3" do + test "when aborts the ended job, returns error" do + job = insert!(:ended_job) + + assert {:error, "already ended"} = Queue.abort_job(TestRepo, job) + end + + test "when aborts the not ended job, aborts the job and record the ended_at and the end_status" do + job = insert!(:job) + + assert {:ok, %Job{}} = Queue.abort_job(TestRepo, job) + + %Job{} = job = TestRepo.reload(job) + refute is_nil(job.ended_at) + assert job.end_status == "aborted" + end + + test "when aborts the locked not ended job, aborts the job and removes the lock" do + %Job{id: id, queue: queue, scope: scope} = job = insert!(:job, timeout: 1_000) + + assert {:ok, %Job{id: ^id}} = Queue.fetch_job(TestRepo, job) + + assert %Lock{} = TestRepo.get_by(Lock, scope: scope, queue: queue) + + assert {:ok, %Job{end_status: "aborted"}} = Queue.abort_job(TestRepo, job) + + assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) + end + + test "when aborts the perfoming job, ..." do + # TODO + end + end + defp all_locks(scope) do Lock |> Ecto.Query.where(scope: ^scope) |> TestRepo.all() end From f41ea31b9fa1b002e109d9e5f98f97771294fe74 Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Mon, 5 May 2025 17:07:50 +0500 Subject: [PATCH 6/9] aborting job using scheduler --- lib/queuetopia.ex | 12 ++++++-- lib/queuetopia/queue.ex | 46 ++++++++-------------------- lib/queuetopia/queue/job.ex | 2 +- lib/queuetopia/scheduler.ex | 34 +++++++++++++++++---- test/queuetopia/queue/job_test.exs | 25 ++++++++------- test/queuetopia/queue_test.exs | 44 ++++++--------------------- test/queuetopia/scheduler_test.exs | 49 ++++++++++++++++++++++++++++-- test/queuetopia_test.exs | 20 ++++++++++++ 8 files changed, 139 insertions(+), 93 deletions(-) diff --git a/lib/queuetopia.ex b/lib/queuetopia.ex index 1357429..148ad6f 100644 --- a/lib/queuetopia.ex +++ b/lib/queuetopia.ex @@ -214,9 +214,15 @@ defmodule Queuetopia do Queuetopia.Queue.paginate_jobs(@repo, page_size, page_number, opts) end - @spec abort_job(Job.t(), error :: any) :: {:ok, any} | {:error, any} - def abort_job(%Job{} = job, error \\ nil) do - Queuetopia.Queue.abort_job(@repo, job, error) + @spec abort_job(Job.t()) :: :ok | {:error, any} + def abort_job(%Job{} = job) do + scheduler_pid = Process.whereis(scheduler()) + + if is_pid(scheduler_pid) do + Queuetopia.Scheduler.abort_job(scheduler_pid, job) + else + {:error, "#{inspect(__MODULE__)} is down"} + end end def handle_event(:new_incoming_job) do diff --git a/lib/queuetopia/queue.ex b/lib/queuetopia/queue.ex index a4cfc4f..7033f61 100644 --- a/lib/queuetopia/queue.ex +++ b/lib/queuetopia/queue.ex @@ -102,28 +102,6 @@ defmodule Queuetopia.Queue do end) end - @spec abort_job(module, Job.t(), any) :: {:ok, any} | {:error, any} - def abort_job(repo, %Job{id: id} = job, error \\ nil) do - Ecto.Multi.new() - |> Ecto.Multi.run(:job, fn _, _ -> - job = repo.get(Job, id) - - with {:ended?, false} <- {:ended?, ended?(job)} do - persist_abort(repo, job, error) - else - {:ended?, true} -> {:error, "already ended"} - end - end) - |> Ecto.Multi.run(:lock, fn _, _ -> - {:ok, unlock_queue(repo, job.scope, job.queue)} - end) - |> repo.transaction() - |> case do - {:ok, %{job: job}} -> {:ok, job} - {:error, :job, error, _} -> {:error, error} - end - end - @doc """ Returns true if a job scheduled date is reached and the job is not ended yet. Otherwise, returns false. @@ -266,11 +244,14 @@ defmodule Queuetopia.Queue do end @doc false - @spec persist_result!(module, Job.t(), {:error, any} | :ok | {:ok, any}) :: Job.t() + @spec persist_result!(module, Job.t(), {:error, any} | :ok | {:ok, any} | :abort) :: Job.t() def persist_result!(repo, %Job{} = job, {:ok, _res}), do: persist_success!(repo, job) def persist_result!(repo, %Job{} = job, :ok), do: persist_success!(repo, job) + def persist_result!(repo, %Job{} = job, :aborted), + do: persist_abort!(repo, job) + def persist_result!(repo, %Job{} = job, {:error, error}) when is_binary(error), do: persist_failure!(repo, job, error) @@ -312,28 +293,27 @@ defmodule Queuetopia.Queue do |> tap(&performer.handle_failed_job!/1) end - defp persist_success!(repo, %Job{} = job) do + defp persist_abort!(repo, %Job{} = job) do utc_now = DateTime.utc_now() |> DateTime.truncate(:second) job - |> Job.succeeded_job_changeset(%{ - attempts: job.attempts + 1, - attempted_at: utc_now, - attempted_by: Atom.to_string(Node.self()), + |> Job.aborted_job_changeset(%{ ended_at: utc_now }) |> repo.update!() end - defp persist_abort(repo, %Job{} = job, error) do + defp persist_success!(repo, %Job{} = job) do utc_now = DateTime.utc_now() |> DateTime.truncate(:second) job - |> Job.aborted_job_changeset(%{ - ended_at: utc_now, - error: error + |> Job.succeeded_job_changeset(%{ + attempts: job.attempts + 1, + attempted_at: utc_now, + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now }) - |> repo.update() + |> repo.update!() end defp resolve_performer(%Job{performer: performer}) do diff --git a/lib/queuetopia/queue/job.ex b/lib/queuetopia/queue/job.ex index 6e73a2e..da1e499 100644 --- a/lib/queuetopia/queue/job.ex +++ b/lib/queuetopia/queue/job.ex @@ -102,7 +102,7 @@ defmodule Queuetopia.Queue.Job do @spec aborted_job_changeset(Job.t(), map) :: Ecto.Changeset.t() def aborted_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do job - |> cast(attrs, [:ended_at, :error]) + |> cast(attrs, [:ended_at]) |> validate_required([:ended_at]) |> put_change(:end_status, "aborted") end diff --git a/lib/queuetopia/scheduler.ex b/lib/queuetopia/scheduler.ex index 4f63ca4..c86a764 100644 --- a/lib/queuetopia/scheduler.ex +++ b/lib/queuetopia/scheduler.ex @@ -18,6 +18,10 @@ defmodule Queuetopia.Scheduler do end end + def abort_job(scheduler_pid, job) do + GenServer.call(scheduler_pid, {:abort, job}) + end + defp has_poll_messages?(scheduler_pid) do {:messages, messages} = Process.info(scheduler_pid, :messages) @@ -66,30 +70,33 @@ defmodule Queuetopia.Scheduler do {:noreply, %{state | jobs: jobs}} end + @impl true def handle_info( {:DOWN, ref, :process, _pid, reason}, %{jobs: jobs, repo: repo, scope: scope} = state ) do - job = Map.get(jobs, ref) + job = Map.get(jobs, ref)[:job] :ok = handle_task_result(repo, job, {:error, inspect(reason)}) Queue.unlock_queue(repo, scope, job.queue) {:noreply, %{state | jobs: Map.delete(jobs, ref)}} end - def handle_info({:kill, task}, %{jobs: jobs, repo: repo} = state) do + @impl true + def handle_info({:timeout, task}, %{jobs: jobs, repo: repo} = state) do Task.shutdown(task, :brutal_kill) - job = Map.get(jobs, task.ref) + job = Map.get(jobs, task.ref)[:job] :ok = handle_task_result(repo, job, {:error, "job_timeout"}) {:noreply, %{state | jobs: Map.delete(jobs, task.ref)}} end + @impl true def handle_info({ref, task_result}, %{jobs: jobs, repo: repo, scope: scope} = state) do Process.demonitor(ref, [:flush]) - job = Map.get(jobs, ref) + job = Map.get(jobs, ref)[:job] :ok = handle_task_result(repo, job, task_result) @@ -100,6 +107,21 @@ defmodule Queuetopia.Scheduler do {:noreply, %{state | jobs: Map.delete(jobs, ref)}} end + @impl true + def handle_call({:abort, %Job{} = job}, _from, %{jobs: jobs, repo: repo, scope: scope} = state) do + ref = + with {ref, %{job_task: task}} <- Enum.find(jobs, &(elem(&1, 1).job.id == job.id)) do + Task.shutdown(task, :brutal_kill) + ref + end + + :ok = handle_task_result(repo, job, :aborted) + + Queue.unlock_queue(repo, scope, job.queue) + + {:reply, :ok, %{state | jobs: Map.delete(jobs, ref)}} + end + defp handle_task_result(repo, job, result) do unless is_nil(job) do Queue.persist_result!(repo, job, result) @@ -140,8 +162,8 @@ defmodule Queuetopia.Scheduler do {:ok, job} <- Queue.fetch_job(repo, job) do task = Task.Supervisor.async_nolink(task_supervisor_name, Queue, :perform, [job]) - Process.send_after(self(), {:kill, task}, job.timeout) - {task.ref, job} + Process.send_after(self(), {:timeout, task}, job.timeout) + {task.ref, %{job: job, job_task: task}} else _ -> nil end diff --git a/test/queuetopia/queue/job_test.exs b/test/queuetopia/queue/job_test.exs index 1af7dcb..f9fea64 100644 --- a/test/queuetopia/queue/job_test.exs +++ b/test/queuetopia/queue/job_test.exs @@ -258,17 +258,13 @@ defmodule Queuetopia.Queue.JobTest do test "only permitted_keys are casted" do job = insert!(:job) - params = - params_for(:job, - ended_at: utc_now(), - error: "error" - ) + params = params_for(:job, ended_at: utc_now()) - changeset = Job.failed_job_changeset(job, Map.merge(params, %{new_key: "value"})) + changeset = Job.aborted_job_changeset(job, Map.merge(params, %{new_key: "value"})) changes_keys = changeset.changes |> Map.keys() assert :ended_at in changes_keys - assert :error in changes_keys + assert :end_status in changes_keys assert Enum.count(changes_keys) == 2 refute :new_key in changes_keys @@ -281,17 +277,20 @@ defmodule Queuetopia.Queue.JobTest do refute changeset.valid? assert %{ended_at: ["can't be blank"]} = errors_on(changeset) - assert %{error: ["can't be blank"]} = errors_on(changeset) + end + + test "fills the end_status field by 'aborted'" do + job = insert!(:job) + + changeset = Job.aborted_job_changeset(job, %{ended_at: utc_now()}) + + assert changeset.changes.end_status == "aborted" end test "when params are valid, return a valid changeset" do job = insert!(:job) - changeset = - Job.aborted_job_changeset(job, %{ - ended_at: utc_now(), - error: "error" - }) + changeset = Job.aborted_job_changeset(job, %{ended_at: utc_now()}) assert changeset.valid? end diff --git a/test/queuetopia/queue_test.exs b/test/queuetopia/queue_test.exs index 331fea1..8437cd7 100644 --- a/test/queuetopia/queue_test.exs +++ b/test/queuetopia/queue_test.exs @@ -347,6 +347,16 @@ defmodule Queuetopia.QueueTest do assert job.error == "\"unexpected_response\"" end + test "when a job aborted, persists the job as aborted and record the ended_at and end_status" do + job = insert!(:job) + + _ = Queue.persist_result!(TestRepo, job, :aborted) + + %Job{} = job = TestRepo.reload(job) + refute is_nil(job.ended_at) + assert job.end_status == "aborted" + end + test "when handle_failed_job/1 is defined by the performer" do %{id: id} = job = @@ -667,40 +677,6 @@ defmodule Queuetopia.QueueTest do end end - describe "abort_job/3" do - test "when aborts the ended job, returns error" do - job = insert!(:ended_job) - - assert {:error, "already ended"} = Queue.abort_job(TestRepo, job) - end - - test "when aborts the not ended job, aborts the job and record the ended_at and the end_status" do - job = insert!(:job) - - assert {:ok, %Job{}} = Queue.abort_job(TestRepo, job) - - %Job{} = job = TestRepo.reload(job) - refute is_nil(job.ended_at) - assert job.end_status == "aborted" - end - - test "when aborts the locked not ended job, aborts the job and removes the lock" do - %Job{id: id, queue: queue, scope: scope} = job = insert!(:job, timeout: 1_000) - - assert {:ok, %Job{id: ^id}} = Queue.fetch_job(TestRepo, job) - - assert %Lock{} = TestRepo.get_by(Lock, scope: scope, queue: queue) - - assert {:ok, %Job{end_status: "aborted"}} = Queue.abort_job(TestRepo, job) - - assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) - end - - test "when aborts the perfoming job, ..." do - # TODO - end - end - defp all_locks(scope) do Lock |> Ecto.Query.where(scope: ^scope) |> TestRepo.all() end diff --git a/test/queuetopia/scheduler_test.exs b/test/queuetopia/scheduler_test.exs index c18cf24..3c4f40f 100644 --- a/test/queuetopia/scheduler_test.exs +++ b/test/queuetopia/scheduler_test.exs @@ -71,7 +71,7 @@ defmodule Queuetopia.SchedulerTest do assert_receive {_, _, :started}, 50 assert_receive {_, _, :started}, 50 %{jobs: jobs} = :sys.get_state(TestQueuetopia.Scheduler) - job_ids = jobs |> Enum.map(fn {_, job} -> job.id end) + job_ids = jobs |> Enum.map(&elem(&1, 1)[:job].id) assert job_id_1 in job_ids assert job_id_2 in job_ids @@ -79,14 +79,14 @@ defmodule Queuetopia.SchedulerTest do refute_receive {_, _, :started}, 30 %{jobs: jobs} = :sys.get_state(TestQueuetopia.Scheduler) - job_ids = jobs |> Enum.map(fn {_, job} -> job.id end) + job_ids = jobs |> Enum.map(&elem(&1, 1)[:job].id) assert job_id_1 in job_ids assert job_id_2 in job_ids assert_receive {_, _, :ok}, 100 assert_receive {_, _, :started}, 100 %{jobs: jobs} = :sys.get_state(TestQueuetopia.Scheduler) - job_ids = jobs |> Enum.map(fn {_, job} -> job.id end) + job_ids = jobs |> Enum.map(&elem(&1, 1)[:job].id) assert job_id_1 in job_ids assert job_id_3 in job_ids @@ -467,4 +467,47 @@ defmodule Queuetopia.SchedulerTest do :sys.get_state(TestQueuetopia.Scheduler) end + + describe "abort_job/2" do + test "when task for a job is not running, records the ended_at and end_status fields to the job" do + job = insert!(:job, scheduled_at: utc_now() |> DateTime.add(15, :second)) + + start_supervised!(TestQueuetopia) + scheduler_pid = Process.whereis(TestQueuetopia.Scheduler) + + assert :ok = Queuetopia.Scheduler.abort_job(scheduler_pid, job) + + job = TestRepo.get_by(Job, id: job.id) + refute is_nil(job.ended_at) + assert job.end_status == "aborted" + end + + test "when task for a job is running, terminates the task and records the ended_at and end_status fields to the job" do + scope = TestQueuetopia.scope() + + %Job{id: job_id, queue: queue} = + job = + insert!(:slow_job, + params: %{"duration" => 500}, + timeout: 10_000, + scope: scope + ) + + start_supervised!(TestQueuetopia) + scheduler_pid = Process.whereis(TestQueuetopia.Scheduler) + + assert_receive {^queue, ^job_id, :started}, 100 + assert %Lock{} = TestRepo.get_by(Lock, queue: queue, scope: scope) + + assert :ok = Queuetopia.Scheduler.abort_job(scheduler_pid, job) + + job = TestRepo.get_by(Job, id: job.id) + refute is_nil(job.ended_at) + assert job.end_status == "aborted" + + assert is_nil(TestRepo.get_by(Lock, queue: queue, scope: scope)) + + refute_receive {^queue, ^job_id, :ok}, 600 + end + end end diff --git a/test/queuetopia_test.exs b/test/queuetopia_test.exs index 3a242cf..eee97c3 100644 --- a/test/queuetopia_test.exs +++ b/test/queuetopia_test.exs @@ -156,6 +156,26 @@ defmodule QueuetopiaTest do assert %{data: [], total: 2} = TestQueuetopia.paginate_jobs(1, 3) end + test "abort_job/1" do + scope = TestQueuetopia.scope() + + %Job{id: job_id, queue: queue} = + job = + insert!(:slow_job, + params: %{"duration" => 500}, + timeout: 10_000, + scope: scope + ) + + start_supervised!(TestQueuetopia) + + assert_receive {^queue, ^job_id, :started}, 100 + + assert :ok = TestQueuetopia.abort_job(job) + + refute_receive {^queue, ^job_id, :ok}, 600 + end + describe "handle_event/1" do test "sends a poll message to the scheduler" do Application.put_env(:queuetopia, TestQueuetopia, poll_interval: 5_000) From b5973c0c0a5024e410a88911909b90182dfe7489 Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Mon, 5 May 2025 17:12:48 +0500 Subject: [PATCH 7/9] mistype fixed --- lib/queuetopia/queue.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/queuetopia/queue.ex b/lib/queuetopia/queue.ex index 7033f61..68cee7b 100644 --- a/lib/queuetopia/queue.ex +++ b/lib/queuetopia/queue.ex @@ -244,7 +244,7 @@ defmodule Queuetopia.Queue do end @doc false - @spec persist_result!(module, Job.t(), {:error, any} | :ok | {:ok, any} | :abort) :: Job.t() + @spec persist_result!(module, Job.t(), {:error, any} | :ok | {:ok, any} | :aborted) :: Job.t() def persist_result!(repo, %Job{} = job, {:ok, _res}), do: persist_success!(repo, job) def persist_result!(repo, %Job{} = job, :ok), do: persist_success!(repo, job) From eaa0d4671c9dcb83069eb4afc3e1e64887a7ede2 Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Tue, 6 May 2025 16:27:39 +0500 Subject: [PATCH 8/9] remove tests for now irrelevant max_attempts filter --- test/queuetopia/queue_test.exs | 35 ---------------------------------- 1 file changed, 35 deletions(-) diff --git a/test/queuetopia/queue_test.exs b/test/queuetopia/queue_test.exs index 0ac8379..1328cae 100644 --- a/test/queuetopia/queue_test.exs +++ b/test/queuetopia/queue_test.exs @@ -43,18 +43,6 @@ defmodule Queuetopia.QueueTest do assert [] = Queue.list_available_pending_queues(TestRepo, scope) end - test "don't list queues whom jobs reached the maximum number of attempts" do - %{queue: _queue, scope: scope} = - insert!(:job, - scheduled_at: utc_now() |> add(-3600), - next_attempt_at: utc_now(), - attempts: 5, - max_attempts: 5 - ) - - assert [] = Queue.list_available_pending_queues(TestRepo, scope) - end - test "when limit is given, returns only the specified number of rows from the result set" do %{queue: queue, scope: scope} = insert!(:job) insert!(:job, queue: queue, scope: scope) @@ -158,13 +146,6 @@ defmodule Queuetopia.QueueTest do assert is_nil(Queue.get_next_pending_job(TestRepo, scope, queue)) end - - test "when max job attempts is reached, returns nil" do - %Job{queue: queue, scope: scope} = - insert!(:job, next_attempt_at: utc_now(), attempts: 20, max_attempts: 20) - - assert is_nil(Queue.get_next_pending_job(TestRepo, scope, queue)) - end end describe "fetch_job/2" do @@ -195,13 +176,6 @@ defmodule Queuetopia.QueueTest do assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) end - test "when max job attempts is reached, returns error" do - %{queue: queue, scope: scope} = job = insert!(:job, attempts: 20, max_attempts: 20) - - assert {:error, "max attempts reached"} = Queue.fetch_job(TestRepo, job) - assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) - end - test "when the job is scheduled for later" do %Job{queue: queue, scope: scope} = job = insert!(:job, scheduled_at: utc_now() |> add(3600, :second)) @@ -616,11 +590,6 @@ defmodule Queuetopia.QueueTest do test "filters" do insert!(:job, ended_at: utc_now()) - assert %{data: [], total: 0} = - Queue.paginate_jobs(TestRepo, 100, 1, filters: [available?: true]) - - insert!(:job, attempts: 3, max_attempts: 3) - assert %{data: [], total: 0} = Queue.paginate_jobs(TestRepo, 100, 1, filters: [available?: true]) @@ -684,10 +653,6 @@ defmodule Queuetopia.QueueTest do assert Queue.list_jobs(TestRepo, filters: [available?: true]) == [] - insert!(:job, attempts: 1, max_attempts: 1) - - assert Queue.list_jobs(TestRepo, filters: [available?: true]) == [] - %{id: id} = job = insert!(:job) [ From 516835c05c865370dfde62e6e0085e99b3d72e14 Mon Sep 17 00:00:00 2001 From: Aleksandr Golubov Date: Tue, 6 May 2025 17:27:45 +0500 Subject: [PATCH 9/9] mix format (elixir 1.12.1) --- lib/queuetopia/migrations/v6.ex | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/lib/queuetopia/migrations/v6.ex b/lib/queuetopia/migrations/v6.ex index a8a6881..34533ea 100644 --- a/lib/queuetopia/migrations/v6.ex +++ b/lib/queuetopia/migrations/v6.ex @@ -14,21 +14,19 @@ defmodule Queuetopia.Migrations.V6 do add(:end_status, :string, null: true) end - set_end_status_to_success = - """ - UPDATE queuetopia_jobs - SET end_status = 'success' - WHERE ended_at IS NOT NULL AND error IS NULL; - """ + set_end_status_to_success = """ + UPDATE queuetopia_jobs + SET end_status = 'success' + WHERE ended_at IS NOT NULL AND error IS NULL; + """ execute(set_end_status_to_success) - set_end_status_to_max_attempts_reached = - """ - UPDATE queuetopia_jobs - SET ended_at = attempted_at, end_status = 'max_attempts_reached' - WHERE ended_at IS NULL AND attempts >= max_attempts; - """ + set_end_status_to_max_attempts_reached = """ + UPDATE queuetopia_jobs + SET ended_at = attempted_at, end_status = 'max_attempts_reached' + WHERE ended_at IS NULL AND attempts >= max_attempts; + """ execute(set_end_status_to_max_attempts_reached) end