diff --git a/lib/queuetopia.ex b/lib/queuetopia.ex index 80bb99d..148ad6f 100644 --- a/lib/queuetopia.ex +++ b/lib/queuetopia.ex @@ -214,6 +214,17 @@ defmodule Queuetopia do Queuetopia.Queue.paginate_jobs(@repo, page_size, page_number, opts) end + @spec abort_job(Job.t()) :: :ok | {:error, any} + def abort_job(%Job{} = job) do + scheduler_pid = Process.whereis(scheduler()) + + if is_pid(scheduler_pid) do + Queuetopia.Scheduler.abort_job(scheduler_pid, job) + else + {:error, "#{inspect(__MODULE__)} is down"} + end + end + def handle_event(:new_incoming_job) do listen(:new_incoming_job) end diff --git a/lib/queuetopia/migrations.ex b/lib/queuetopia/migrations.ex index db12d2e..9f2e2e8 100644 --- a/lib/queuetopia/migrations.ex +++ b/lib/queuetopia/migrations.ex @@ -2,7 +2,7 @@ defmodule Queuetopia.Migrations do use Ecto.Migration @initial_version 0 - @current_version 5 + @current_version 6 def up(opts \\ []) when is_list(opts) do from_version = Keyword.get(opts, :from_version, @initial_version) |> Kernel.+(1) to_version = Keyword.get(opts, :to_version, @current_version) diff --git a/lib/queuetopia/migrations/v6.ex b/lib/queuetopia/migrations/v6.ex new file mode 100644 index 0000000..34533ea --- /dev/null +++ b/lib/queuetopia/migrations/v6.ex @@ -0,0 +1,45 @@ +defmodule Queuetopia.Migrations.V6 do + @moduledoc false + + use Ecto.Migration + + def up do + rename(table(:queuetopia_jobs), :done_at, to: :ended_at) + + rename(index(:queuetopia_jobs, [:ended_at], name: "queuetopia_jobs_done_at_index"), + to: "queuetopia_jobs_ended_at_index" + ) + + alter table(:queuetopia_jobs) do + add(:end_status, :string, null: true) + end + + set_end_status_to_success = """ + UPDATE queuetopia_jobs + SET end_status = 'success' + WHERE ended_at IS NOT NULL AND error IS NULL; + """ + + execute(set_end_status_to_success) + + set_end_status_to_max_attempts_reached = """ + UPDATE queuetopia_jobs + SET ended_at = attempted_at, end_status = 'max_attempts_reached' + WHERE ended_at IS NULL AND attempts >= max_attempts; + """ + + execute(set_end_status_to_max_attempts_reached) + end + + def down do + rename(table(:queuetopia_jobs), :ended_at, to: :done_at) + + rename(index(:queuetopia_jobs, [:done_at], name: "queuetopia_jobs_ended_at_index"), + to: "queuetopia_jobs_done_at_index" + ) + + alter table(:queuetopia_jobs) do + remove(:end_status) + end + end +end diff --git a/lib/queuetopia/queue.ex b/lib/queuetopia/queue.ex index 555ea94..7b3d24a 100644 --- a/lib/queuetopia/queue.ex +++ b/lib/queuetopia/queue.ex @@ -103,21 +103,30 @@ defmodule Queuetopia.Queue do end @doc """ - Returns true if a job scheduled date is reached and the job is not done yet. + Returns true if a job scheduled date is reached and the job is not ended yet. Otherwise, returns false. """ @spec processable_now?(Job.t()) :: boolean def processable_now?(%Job{} = job) do - not done?(job) and not max_attempts_reached?(job) and scheduled_for_now?(job) + not ended?(job) and scheduled_for_now?(job) end @doc """ - Returns true if a job is done. + Returns true if a job is successfully ended. Otherwise, returns false. """ @spec done?(Job.t()) :: boolean def done?(%Job{} = job) do - not is_nil(job.done_at) + ended?(job) and job.end_status == "success" + end + + @doc """ + Returns true if a job is somehow ended. + Otherwise, returns false. + """ + @spec ended?(Job.t()) :: boolean + def ended?(%Job{} = job) do + not is_nil(job.ended_at) end @doc """ @@ -156,7 +165,7 @@ defmodule Queuetopia.Queue do Job |> select([:queue]) |> where([j], j.scope == ^scope) - |> where([j], is_nil(j.done_at)) + |> where([j], is_nil(j.ended_at)) |> where([j], j.scheduled_at <= ^utc_now and j.next_attempt_at > ^utc_now) where_immediately_executable_job = fn queryable -> @@ -173,8 +182,7 @@ defmodule Queuetopia.Queue do query = Job |> where([j], j.scope == ^scope) - |> where([j], is_nil(j.done_at)) - |> where([j], j.attempts < j.max_attempts) + |> where([j], is_nil(j.ended_at)) |> where([j], j.queue not in subquery(locked_queues)) |> where([j], j.queue not in subquery(blocked_queues)) |> where_immediately_executable_job.() @@ -200,8 +208,7 @@ defmodule Queuetopia.Queue do Job |> where([j], j.queue == ^queue) |> where([j], j.scope == ^scope) - |> where([j], is_nil(j.done_at)) - |> where([j], j.attempts < j.max_attempts) + |> where([j], is_nil(j.ended_at)) |> order_by(asc: :scheduled_at, asc: :sequence) |> limit(1) |> repo.one() @@ -222,14 +229,11 @@ defmodule Queuetopia.Queue do |> Ecto.Multi.run(:job, fn _, _ -> job = repo.get(Job, id) - with {:done?, false} <- {:done?, done?(job)}, - {:max_attempts_reached?, false} <- - {:max_attempts_reached?, max_attempts_reached?(job)}, + with {:ended?, false} <- {:ended?, ended?(job)}, {:scheduled_for_now?, true} <- {:scheduled_for_now?, scheduled_for_now?(job)} do {:ok, job} else - {:done?, true} -> {:error, "already done"} - {:max_attempts_reached?, true} -> {:error, "max attempts reached"} + {:ended?, true} -> {:error, "already ended"} {:scheduled_for_now?, false} -> {:error, "scheduled for later"} end end) @@ -249,24 +253,45 @@ defmodule Queuetopia.Queue do end @doc false - @spec persist_result!(module, Job.t(), {:error, any} | :ok | {:ok, any}) :: Job.t() + @spec persist_result!(module, Job.t(), {:error, any} | :ok | {:ok, any} | :aborted) :: Job.t() def persist_result!(repo, %Job{} = job, {:ok, _res}), do: persist_success!(repo, job) def persist_result!(repo, %Job{} = job, :ok), do: persist_success!(repo, job) + def persist_result!(repo, %Job{} = job, :aborted), + do: persist_abort!(repo, job) + def persist_result!(repo, %Job{} = job, {:error, error}) when is_binary(error), do: persist_failure!(repo, job, error) def persist_result!(repo, %Job{} = job, unexpected_response), do: persist_failure!(repo, job, inspect(unexpected_response)) + defp persist_failure!(repo, %Job{attempts: attempts, max_attempts: max_attempts} = job, error) + when attempts + 1 >= max_attempts do + utc_now = DateTime.utc_now() |> DateTime.truncate(:second) + performer = resolve_performer(job) + + job + |> Job.failed_job_changeset(%{ + attempts: job.attempts + 1, + attempted_at: utc_now, + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now, + end_status: "max_attempts_reached", + error: error + }) + |> repo.update!() + |> tap(&performer.handle_failed_job!/1) + end + defp persist_failure!(repo, %Job{} = job, error) do utc_now = DateTime.utc_now() |> DateTime.truncate(:second) performer = resolve_performer(job) backoff = performer.backoff(job) job - |> Job.failed_job_changeset(%{ + |> Job.retry_job_changeset(%{ attempts: job.attempts + 1, attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), @@ -277,6 +302,16 @@ defmodule Queuetopia.Queue do |> tap(&performer.handle_failed_job!/1) end + defp persist_abort!(repo, %Job{} = job) do + utc_now = DateTime.utc_now() |> DateTime.truncate(:second) + + job + |> Job.aborted_job_changeset(%{ + ended_at: utc_now + }) + |> repo.update!() + end + defp persist_success!(repo, %Job{} = job) do utc_now = DateTime.utc_now() |> DateTime.truncate(:second) @@ -285,7 +320,7 @@ defmodule Queuetopia.Queue do attempts: job.attempts + 1, attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), - done_at: utc_now + ended_at: utc_now }) |> repo.update!() end diff --git a/lib/queuetopia/queue/job.ex b/lib/queuetopia/queue/job.ex index 52c9b25..da1e499 100644 --- a/lib/queuetopia/queue/job.ex +++ b/lib/queuetopia/queue/job.ex @@ -2,7 +2,15 @@ defmodule Queuetopia.Queue.Job do @moduledoc false use Ecto.Schema - import Ecto.Changeset, only: [cast: 3, put_change: 3, validate_number: 3, validate_required: 2] + + import Ecto.Changeset, + only: [ + cast: 3, + put_change: 3, + validate_number: 3, + validate_required: 2, + validate_inclusion: 3 + ] @type t :: %__MODULE__{} @type option :: @@ -31,7 +39,8 @@ defmodule Queuetopia.Queue.Job do field(:attempted_at, :utc_datetime) field(:attempted_by, :string) field(:next_attempt_at, :utc_datetime) - field(:done_at, :utc_datetime) + field(:ended_at, :utc_datetime) + field(:end_status, :string) field(:error, :string) timestamps(type: :utc_datetime) @@ -73,20 +82,38 @@ defmodule Queuetopia.Queue.Job do |> validate_number(:max_attempts, greater_than_or_equal_to: 0) end - @spec failed_job_changeset(Job.t(), map) :: Ecto.Changeset.t() - def failed_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do + @spec retry_job_changeset(Job.t(), map) :: Ecto.Changeset.t() + def retry_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do job |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :next_attempt_at, :error]) |> validate_required_attempt_attributes |> validate_required([:next_attempt_at, :error]) end + @spec failed_job_changeset(Job.t(), map) :: Ecto.Changeset.t() + def failed_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do + job + |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :ended_at, :end_status, :error]) + |> validate_required_attempt_attributes + |> validate_required([:ended_at, :end_status, :error]) + |> validate_inclusion(:end_status, ["failed", "max_attempts_reached"]) + end + + @spec aborted_job_changeset(Job.t(), map) :: Ecto.Changeset.t() + def aborted_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do + job + |> cast(attrs, [:ended_at]) + |> validate_required([:ended_at]) + |> put_change(:end_status, "aborted") + end + @spec succeeded_job_changeset(Job.t(), map) :: Ecto.Changeset.t() def succeeded_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do job - |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :done_at]) + |> cast(attrs, [:attempts, :attempted_at, :attempted_by, :ended_at]) |> validate_required_attempt_attributes - |> validate_required([:done_at]) + |> validate_required([:ended_at]) + |> put_change(:end_status, "success") |> put_change(:error, nil) end diff --git a/lib/queuetopia/queue/job_queryable.ex b/lib/queuetopia/queue/job_queryable.ex index 24f6b1b..773840f 100644 --- a/lib/queuetopia/queue/job_queryable.ex +++ b/lib/queuetopia/queue/job_queryable.ex @@ -9,8 +9,7 @@ defmodule Queuetopia.Queue.JobQueryable do defp filter_by_field(queryable, {:available?, true}) do queryable - |> where([job], is_nil(job.done_at)) - |> where([job], job.attempts < job.max_attempts) + |> where([job], is_nil(job.ended_at)) end defp filter_by_field(_queryable, {key, _value}) when key not in @filterable_fields do diff --git a/lib/queuetopia/scheduler.ex b/lib/queuetopia/scheduler.ex index 4f63ca4..c86a764 100644 --- a/lib/queuetopia/scheduler.ex +++ b/lib/queuetopia/scheduler.ex @@ -18,6 +18,10 @@ defmodule Queuetopia.Scheduler do end end + def abort_job(scheduler_pid, job) do + GenServer.call(scheduler_pid, {:abort, job}) + end + defp has_poll_messages?(scheduler_pid) do {:messages, messages} = Process.info(scheduler_pid, :messages) @@ -66,30 +70,33 @@ defmodule Queuetopia.Scheduler do {:noreply, %{state | jobs: jobs}} end + @impl true def handle_info( {:DOWN, ref, :process, _pid, reason}, %{jobs: jobs, repo: repo, scope: scope} = state ) do - job = Map.get(jobs, ref) + job = Map.get(jobs, ref)[:job] :ok = handle_task_result(repo, job, {:error, inspect(reason)}) Queue.unlock_queue(repo, scope, job.queue) {:noreply, %{state | jobs: Map.delete(jobs, ref)}} end - def handle_info({:kill, task}, %{jobs: jobs, repo: repo} = state) do + @impl true + def handle_info({:timeout, task}, %{jobs: jobs, repo: repo} = state) do Task.shutdown(task, :brutal_kill) - job = Map.get(jobs, task.ref) + job = Map.get(jobs, task.ref)[:job] :ok = handle_task_result(repo, job, {:error, "job_timeout"}) {:noreply, %{state | jobs: Map.delete(jobs, task.ref)}} end + @impl true def handle_info({ref, task_result}, %{jobs: jobs, repo: repo, scope: scope} = state) do Process.demonitor(ref, [:flush]) - job = Map.get(jobs, ref) + job = Map.get(jobs, ref)[:job] :ok = handle_task_result(repo, job, task_result) @@ -100,6 +107,21 @@ defmodule Queuetopia.Scheduler do {:noreply, %{state | jobs: Map.delete(jobs, ref)}} end + @impl true + def handle_call({:abort, %Job{} = job}, _from, %{jobs: jobs, repo: repo, scope: scope} = state) do + ref = + with {ref, %{job_task: task}} <- Enum.find(jobs, &(elem(&1, 1).job.id == job.id)) do + Task.shutdown(task, :brutal_kill) + ref + end + + :ok = handle_task_result(repo, job, :aborted) + + Queue.unlock_queue(repo, scope, job.queue) + + {:reply, :ok, %{state | jobs: Map.delete(jobs, ref)}} + end + defp handle_task_result(repo, job, result) do unless is_nil(job) do Queue.persist_result!(repo, job, result) @@ -140,8 +162,8 @@ defmodule Queuetopia.Scheduler do {:ok, job} <- Queue.fetch_job(repo, job) do task = Task.Supervisor.async_nolink(task_supervisor_name, Queue, :perform, [job]) - Process.send_after(self(), {:kill, task}, job.timeout) - {task.ref, job} + Process.send_after(self(), {:timeout, task}, job.timeout) + {task.ref, %{job: job, job_task: task}} else _ -> nil end diff --git a/mix.exs b/mix.exs index fe85e19..b68e873 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do use Mix.Project @source_url "https://github.com/annatel/queuetopia" - @version "2.5.3" + @version "2.6.0" def project do [ diff --git a/test/queuetopia/queue/job_test.exs b/test/queuetopia/queue/job_test.exs index 34db89b..f9fea64 100644 --- a/test/queuetopia/queue/job_test.exs +++ b/test/queuetopia/queue/job_test.exs @@ -123,7 +123,7 @@ defmodule Queuetopia.Queue.JobTest do end end - describe "failed_job_changeset/2" do + describe "retry_job_changeset/2" do test "only permitted_keys are casted" do job = insert!(:job) @@ -136,7 +136,7 @@ defmodule Queuetopia.Queue.JobTest do error: "error" ) - changeset = Job.failed_job_changeset(job, Map.merge(params, %{new_key: "value"})) + changeset = Job.retry_job_changeset(job, Map.merge(params, %{new_key: "value"})) changes_keys = changeset.changes |> Map.keys() assert :attempts in changes_keys @@ -152,7 +152,7 @@ defmodule Queuetopia.Queue.JobTest do test "when required params are missing, returns an invalid changeset" do job = insert!(:job) - changeset = Job.failed_job_changeset(job, %{attempts: nil, scheduled_at: nil}) + changeset = Job.retry_job_changeset(job, %{attempts: nil, scheduled_at: nil}) refute changeset.valid? assert %{attempts: ["can't be blank"]} = errors_on(changeset) @@ -167,7 +167,7 @@ defmodule Queuetopia.Queue.JobTest do job = insert!(:job) changeset = - Job.failed_job_changeset(job, %{ + Job.retry_job_changeset(job, %{ attempts: 1, attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), @@ -179,6 +179,123 @@ defmodule Queuetopia.Queue.JobTest do end end + describe "failed_job_changeset/2" do + test "only permitted_keys are casted" do + job = insert!(:job) + + params = + params_for(:job, + attempts: 6, + attempted_at: utc_now(), + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now(), + end_status: "failed", + error: "error" + ) + + changeset = Job.failed_job_changeset(job, Map.merge(params, %{new_key: "value"})) + changes_keys = changeset.changes |> Map.keys() + + assert :attempts in changes_keys + assert :attempted_at in changes_keys + assert :attempted_by in changes_keys + assert :ended_at in changes_keys + assert :end_status in changes_keys + assert :error in changes_keys + assert Enum.count(changes_keys) == 6 + + refute :new_key in changes_keys + end + + test "when required params are missing, returns an invalid changeset" do + job = insert!(:job) + + changeset = Job.failed_job_changeset(job, %{attempts: nil, scheduled_at: nil}) + + refute changeset.valid? + assert %{attempts: ["can't be blank"]} = errors_on(changeset) + assert %{attempted_at: ["can't be blank"]} = errors_on(changeset) + assert %{attempted_by: ["can't be blank"]} = errors_on(changeset) + assert %{ended_at: ["can't be blank"]} = errors_on(changeset) + assert %{error: ["can't be blank"]} = errors_on(changeset) + end + + test "when invalid end_status is given, returns an invalid changeset" do + job = insert!(:job) + + changeset = + Job.failed_job_changeset(job, %{ + attempts: 1, + attempted_at: utc_now(), + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now(), + end_status: "some_unsupported_invalid_status", + error: "error" + }) + + refute changeset.valid? + assert %{end_status: ["is invalid"]} = errors_on(changeset) + end + + test "when params are valid, return a valid changeset" do + job = insert!(:job) + + changeset = + Job.failed_job_changeset(job, %{ + attempts: 1, + attempted_at: utc_now(), + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now(), + end_status: "failed", + error: "error" + }) + + assert changeset.valid? + end + end + + describe "aborted_job_changeset/2" do + test "only permitted_keys are casted" do + job = insert!(:job) + + params = params_for(:job, ended_at: utc_now()) + + changeset = Job.aborted_job_changeset(job, Map.merge(params, %{new_key: "value"})) + changes_keys = changeset.changes |> Map.keys() + + assert :ended_at in changes_keys + assert :end_status in changes_keys + assert Enum.count(changes_keys) == 2 + + refute :new_key in changes_keys + end + + test "when required params are missing, returns an invalid changeset" do + job = insert!(:job) + + changeset = Job.aborted_job_changeset(job, %{attempts: nil, scheduled_at: nil}) + + refute changeset.valid? + assert %{ended_at: ["can't be blank"]} = errors_on(changeset) + end + + test "fills the end_status field by 'aborted'" do + job = insert!(:job) + + changeset = Job.aborted_job_changeset(job, %{ended_at: utc_now()}) + + assert changeset.changes.end_status == "aborted" + end + + test "when params are valid, return a valid changeset" do + job = insert!(:job) + + changeset = Job.aborted_job_changeset(job, %{ended_at: utc_now()}) + + assert changeset.valid? + end + end + describe "succeeded_job_changeset/2" do test "only permitted_keys are casted" do job = insert!(:job) @@ -188,7 +305,7 @@ defmodule Queuetopia.Queue.JobTest do attempts: 6, attempted_at: utc_now(), attempted_by: Atom.to_string(Node.self()), - done_at: utc_now() + ended_at: utc_now() ) changeset = Job.succeeded_job_changeset(job, Map.merge(params, %{new_key: "value"})) @@ -197,8 +314,9 @@ defmodule Queuetopia.Queue.JobTest do assert :attempts in changes_keys assert :attempted_at in changes_keys assert :attempted_by in changes_keys - assert :done_at in changes_keys - assert Enum.count(changes_keys) == 4 + assert :ended_at in changes_keys + assert :end_status in changes_keys + assert Enum.count(changes_keys) == 5 refute :new_key in changes_keys end @@ -211,7 +329,7 @@ defmodule Queuetopia.Queue.JobTest do assert %{attempts: ["can't be blank"]} = errors_on(changeset) assert %{attempted_at: ["can't be blank"]} = errors_on(changeset) assert %{attempted_by: ["can't be blank"]} = errors_on(changeset) - assert %{done_at: ["can't be blank"]} = errors_on(changeset) + assert %{ended_at: ["can't be blank"]} = errors_on(changeset) end test "nillifies error field" do @@ -222,7 +340,7 @@ defmodule Queuetopia.Queue.JobTest do attempts: 6, attempted_at: utc_now(), attempted_by: Atom.to_string(Node.self()), - done_at: utc_now() + ended_at: utc_now() ) changeset = Job.succeeded_job_changeset(job, params) @@ -230,6 +348,22 @@ defmodule Queuetopia.Queue.JobTest do assert is_nil(changeset.changes.error) end + test "fills the end_status field by 'success'" do + job = insert!(:job) + + params = + params_for(:job, + attempts: 6, + attempted_at: utc_now(), + attempted_by: Atom.to_string(Node.self()), + ended_at: utc_now() + ) + + changeset = Job.succeeded_job_changeset(job, params) + + assert changeset.changes.end_status == "success" + end + test "when params are valid, return a valid changeset" do utc_now = utc_now() job = insert!(:job) @@ -239,7 +373,7 @@ defmodule Queuetopia.Queue.JobTest do attempts: 1, attempted_at: utc_now, attempted_by: Atom.to_string(Node.self()), - done_at: utc_now + ended_at: utc_now }) assert changeset.valid? diff --git a/test/queuetopia/queue_test.exs b/test/queuetopia/queue_test.exs index d2df70a..1328cae 100644 --- a/test/queuetopia/queue_test.exs +++ b/test/queuetopia/queue_test.exs @@ -38,20 +38,8 @@ defmodule Queuetopia.QueueTest do assert [] = Queue.list_available_pending_queues(TestRepo, scope) end - test "don't list queues whom jobs are done" do - %{queue: _queue_1, scope: scope_1} = insert!(:done_job) - assert [] = Queue.list_available_pending_queues(TestRepo, scope_1) - end - - test "don't list queues whom jobs reached the maximum number of attempts" do - %{queue: _queue, scope: scope} = - insert!(:job, - scheduled_at: utc_now() |> add(-3600), - next_attempt_at: utc_now(), - attempts: 5, - max_attempts: 5 - ) - + test "don't list queues whom jobs are ended" do + %{queue: _queue, scope: scope} = insert!(:ended_job) assert [] = Queue.list_available_pending_queues(TestRepo, scope) end @@ -89,7 +77,7 @@ defmodule Queuetopia.QueueTest do describe "get_next_pending_job/2" do test "returns the next pending job for a given scoped queue" do - %{queue: queue_1, scope: scope_1} = insert!(:done_job) + %{queue: queue_1, scope: scope_1} = insert!(:ended_job) %{id: id_1} = insert!(:job, queue: queue_1, scope: scope_1) %{id: id_2, queue: queue_2} = insert!(:job, scope: scope_1) @@ -158,13 +146,6 @@ defmodule Queuetopia.QueueTest do assert is_nil(Queue.get_next_pending_job(TestRepo, scope, queue)) end - - test "when max job attempts is reached, returns nil" do - %Job{queue: queue, scope: scope} = - insert!(:job, next_attempt_at: utc_now(), attempts: 20, max_attempts: 20) - - assert is_nil(Queue.get_next_pending_job(TestRepo, scope, queue)) - end end describe "fetch_job/2" do @@ -188,17 +169,10 @@ defmodule Queuetopia.QueueTest do assert %Lock{id: ^id} = TestRepo.get_by(Lock, scope: scope, queue: queue) end - test "when the job is done" do - %Job{queue: queue, scope: scope} = job = insert!(:done_job) - - assert {:error, "already done"} = Queue.fetch_job(TestRepo, job) - assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) - end - - test "when max job attempts is reached, returns error" do - %{queue: queue, scope: scope} = job = insert!(:job, attempts: 20, max_attempts: 20) + test "when the job is ended" do + %Job{queue: queue, scope: scope} = job = insert!(:ended_job) - assert {:error, "max attempts reached"} = Queue.fetch_job(TestRepo, job) + assert {:error, "already ended"} = Queue.fetch_job(TestRepo, job) assert is_nil(TestRepo.get_by(Lock, scope: scope, queue: queue)) end @@ -295,17 +269,19 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, :ok) %Job{ - done_at: done_at, + ended_at: ended_at, + end_status: end_status, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts } = TestRepo.reload(job) - refute is_nil(done_at) refute is_nil(attempted_at) + refute is_nil(ended_at) + assert end_status == "success" assert attempted_by == Atom.to_string(Node.self()) assert attempts == 1 - assert done_at == attempted_at + assert ended_at == attempted_at end test "when a job succeeded with a result, persists the job as succeeded" do @@ -314,45 +290,73 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, {:ok, :done}) %Job{ - done_at: done_at, + ended_at: ended_at, + end_status: end_status, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts } = TestRepo.reload(job) - refute is_nil(done_at) refute is_nil(attempted_at) + refute is_nil(ended_at) + assert end_status == "success" assert attempted_by == Atom.to_string(Node.self()) assert attempts == 1 - assert done_at == attempted_at + assert ended_at == attempted_at end - test "when a job failed, persists the job as failed and record the error" do + test "when a job failed and max_attempts is not reached, persists the job and record the error and next_attempt_at" do job = insert!(:failure_job) _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{} = job = TestRepo.reload(job) - assert job.done_at == nil - refute job.attempted_at == nil + refute is_nil(job.attempted_at) + refute is_nil(job.next_attempt_at) + assert is_nil(job.ended_at) assert job.attempted_by == Atom.to_string(Node.self()) assert job.attempts == 1 assert job.error == "error" end - test "when a job returns an unexpected_response, persists the job as failed and record the response" do + test "when a job failed and max_attempts is reached, persists the job as failed and record the error" do + job = insert!(:failure_job, attempts: 9, max_attempts: 10) + + _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) + + %Job{} = job = TestRepo.reload(job) + refute is_nil(job.ended_at) + refute is_nil(job.attempted_at) + assert job.attempted_by == Atom.to_string(Node.self()) + assert job.attempts == 10 + assert job.error == "error" + assert job.end_status == "max_attempts_reached" + end + + test "when a job returns an unexpected_response, persists the job and record the error and the response" do job = insert!(:failure_job) _ = Queue.persist_result!(TestRepo, job, "unexpected_response") %Job{} = job = TestRepo.reload(job) - assert job.done_at == nil - refute job.attempted_at == nil + refute is_nil(job.attempted_at) + refute is_nil(job.next_attempt_at) + assert is_nil(job.ended_at) assert job.attempted_by == Atom.to_string(Node.self()) assert job.attempts == 1 assert job.error == "\"unexpected_response\"" end + test "when a job aborted, persists the job as aborted and record the ended_at and end_status" do + job = insert!(:job) + + _ = Queue.persist_result!(TestRepo, job, :aborted) + + %Job{} = job = TestRepo.reload(job) + refute is_nil(job.ended_at) + assert job.end_status == "aborted" + end + test "when handle_failed_job/1 is defined by the performer" do %{id: id} = job = @@ -363,12 +367,12 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{} = job = TestRepo.reload(job) - assert job.done_at == nil + assert job.ended_at == nil refute job.attempted_at == nil assert job.attempted_by == Atom.to_string(Node.self()) assert job.attempts == 1 - assert_receive {:job, %Job{id: ^id, done_at: nil, attempted_at: %DateTime{}, attempts: 1}}, + assert_receive {:job, %Job{id: ^id, ended_at: nil, attempted_at: %DateTime{}, attempts: 1}}, 100 end @@ -382,7 +386,7 @@ defmodule Queuetopia.QueueTest do Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{ - done_at: nil, + ended_at: nil, attempted_at: attempted_at, next_attempt_at: next_attempt_at } = TestRepo.reload(job) @@ -428,7 +432,7 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{ - done_at: nil, + ended_at: nil, attempted_at: attempted_at, next_attempt_at: next_attempt_at } = TestRepo.reload(job) @@ -442,7 +446,7 @@ defmodule Queuetopia.QueueTest do _ = Queue.persist_result!(TestRepo, job, {:error, "error"}) %Job{ - done_at: nil, + ended_at: nil, attempted_at: attempted_at, next_attempt_at: next_attempt_at } = TestRepo.reload(job) @@ -456,23 +460,18 @@ defmodule Queuetopia.QueueTest do end describe "processable_now?/1" do - test "when the job is processable now" do + test "when the job is not ended and processable now" do job = insert!(:job) assert Queue.processable_now?(job) end - test "when the job is done" do - job = insert!(:done_job) - refute Queue.processable_now?(job) - end - - test "when max job attempts is reached, returns false" do - job = insert!(:job, attempts: 10, max_attempts: 10) + test "when the job is not ended and scheduled for later" do + job = insert!(:job, scheduled_at: utc_now() |> add(3600, :second)) refute Queue.processable_now?(job) end - test "when the job is scheduled for later" do - job = insert!(:job, scheduled_at: utc_now() |> add(3600, :second)) + test "when the job is ended" do + job = insert!(:ended_job) refute Queue.processable_now?(job) end end @@ -481,11 +480,28 @@ defmodule Queuetopia.QueueTest do test "when the job is not done" do job = insert!(:job) refute Queue.done?(job) + assert is_nil(job.ended_at) end test "when the job is done" do job = insert!(:done_job) assert Queue.done?(job) + refute is_nil(job.ended_at) + assert job.end_status == "success" + end + end + + describe "ended?/1" do + test "when the job is not ended" do + job = insert!(:job) + refute Queue.ended?(job) + assert is_nil(job.ended_at) + end + + test "when the job is done" do + job = insert!(:ended_job) + assert Queue.ended?(job) + refute is_nil(job.ended_at) end end @@ -572,12 +588,7 @@ defmodule Queuetopia.QueueTest do end test "filters" do - insert!(:job, done_at: utc_now()) - - assert %{data: [], total: 0} = - Queue.paginate_jobs(TestRepo, 100, 1, filters: [available?: true]) - - insert!(:job, attempts: 3, max_attempts: 3) + insert!(:job, ended_at: utc_now()) assert %{data: [], total: 0} = Queue.paginate_jobs(TestRepo, 100, 1, filters: [available?: true]) @@ -638,11 +649,7 @@ defmodule Queuetopia.QueueTest do end test "filters" do - insert!(:job, done_at: utc_now()) - - assert Queue.list_jobs(TestRepo, filters: [available?: true]) == [] - - insert!(:job, attempts: 1, max_attempts: 1) + insert!(:job, ended_at: utc_now()) assert Queue.list_jobs(TestRepo, filters: [available?: true]) == [] diff --git a/test/queuetopia/scheduler_test.exs b/test/queuetopia/scheduler_test.exs index 6d75fd8..3c4f40f 100644 --- a/test/queuetopia/scheduler_test.exs +++ b/test/queuetopia/scheduler_test.exs @@ -71,7 +71,7 @@ defmodule Queuetopia.SchedulerTest do assert_receive {_, _, :started}, 50 assert_receive {_, _, :started}, 50 %{jobs: jobs} = :sys.get_state(TestQueuetopia.Scheduler) - job_ids = jobs |> Enum.map(fn {_, job} -> job.id end) + job_ids = jobs |> Enum.map(&elem(&1, 1)[:job].id) assert job_id_1 in job_ids assert job_id_2 in job_ids @@ -79,14 +79,14 @@ defmodule Queuetopia.SchedulerTest do refute_receive {_, _, :started}, 30 %{jobs: jobs} = :sys.get_state(TestQueuetopia.Scheduler) - job_ids = jobs |> Enum.map(fn {_, job} -> job.id end) + job_ids = jobs |> Enum.map(&elem(&1, 1)[:job].id) assert job_id_1 in job_ids assert job_id_2 in job_ids assert_receive {_, _, :ok}, 100 assert_receive {_, _, :started}, 100 %{jobs: jobs} = :sys.get_state(TestQueuetopia.Scheduler) - job_ids = jobs |> Enum.map(fn {_, job} -> job.id end) + job_ids = jobs |> Enum.map(&elem(&1, 1)[:job].id) assert job_id_1 in job_ids assert job_id_3 in job_ids @@ -260,7 +260,7 @@ defmodule Queuetopia.SchedulerTest do scope = TestQueuetopia.scope() %{id: failing_job_id, queue: queue} = - insert!(:failure_job, scope: scope, max_attempts: 10000) + insert!(:failure_job, scope: scope, max_attempts: 1000) start_supervised!(TestQueuetopia) @@ -335,7 +335,7 @@ defmodule Queuetopia.SchedulerTest do assert %Job{ id: ^failing_job_id, - done_at: nil, + ended_at: nil, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts, @@ -366,7 +366,7 @@ defmodule Queuetopia.SchedulerTest do assert %Job{ id: ^slow_job_id, - done_at: nil, + ended_at: nil, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts, @@ -400,7 +400,7 @@ defmodule Queuetopia.SchedulerTest do assert %Job{ id: ^raising_job_id, - done_at: nil, + ended_at: nil, attempted_at: attempted_at, attempted_by: attempted_by, attempts: attempts, @@ -467,4 +467,47 @@ defmodule Queuetopia.SchedulerTest do :sys.get_state(TestQueuetopia.Scheduler) end + + describe "abort_job/2" do + test "when task for a job is not running, records the ended_at and end_status fields to the job" do + job = insert!(:job, scheduled_at: utc_now() |> DateTime.add(15, :second)) + + start_supervised!(TestQueuetopia) + scheduler_pid = Process.whereis(TestQueuetopia.Scheduler) + + assert :ok = Queuetopia.Scheduler.abort_job(scheduler_pid, job) + + job = TestRepo.get_by(Job, id: job.id) + refute is_nil(job.ended_at) + assert job.end_status == "aborted" + end + + test "when task for a job is running, terminates the task and records the ended_at and end_status fields to the job" do + scope = TestQueuetopia.scope() + + %Job{id: job_id, queue: queue} = + job = + insert!(:slow_job, + params: %{"duration" => 500}, + timeout: 10_000, + scope: scope + ) + + start_supervised!(TestQueuetopia) + scheduler_pid = Process.whereis(TestQueuetopia.Scheduler) + + assert_receive {^queue, ^job_id, :started}, 100 + assert %Lock{} = TestRepo.get_by(Lock, queue: queue, scope: scope) + + assert :ok = Queuetopia.Scheduler.abort_job(scheduler_pid, job) + + job = TestRepo.get_by(Job, id: job.id) + refute is_nil(job.ended_at) + assert job.end_status == "aborted" + + assert is_nil(TestRepo.get_by(Lock, queue: queue, scope: scope)) + + refute_receive {^queue, ^job_id, :ok}, 600 + end + end end diff --git a/test/queuetopia_test.exs b/test/queuetopia_test.exs index 3a242cf..eee97c3 100644 --- a/test/queuetopia_test.exs +++ b/test/queuetopia_test.exs @@ -156,6 +156,26 @@ defmodule QueuetopiaTest do assert %{data: [], total: 2} = TestQueuetopia.paginate_jobs(1, 3) end + test "abort_job/1" do + scope = TestQueuetopia.scope() + + %Job{id: job_id, queue: queue} = + job = + insert!(:slow_job, + params: %{"duration" => 500}, + timeout: 10_000, + scope: scope + ) + + start_supervised!(TestQueuetopia) + + assert_receive {^queue, ^job_id, :started}, 100 + + assert :ok = TestQueuetopia.abort_job(job) + + refute_receive {^queue, ^job_id, :ok}, 600 + end + describe "handle_event/1" do test "sends a poll message to the scheduler" do Application.put_env(:queuetopia, TestQueuetopia, poll_interval: 5_000) diff --git a/test/support/factory.ex b/test/support/factory.ex index 3994b0d..b6c560a 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -42,10 +42,16 @@ defmodule Queuetopia.Factory do |> struct!(attrs) end + def build(:ended_job, attrs) do + job = build(:job, attrs) + + %{job | ended_at: utc_now()} + end + def build(:done_job, attrs) do job = build(:job, attrs) - %{job | done_at: utc_now()} + %{job | ended_at: utc_now(), end_status: "success"} end def build(:raising_job, attrs) do