Skip to content
11 changes: 11 additions & 0 deletions lib/queuetopia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,17 @@ defmodule Queuetopia do
Queuetopia.Queue.paginate_jobs(@repo, page_size, page_number, opts)
end

@spec abort_job(Job.t()) :: :ok | {:error, any}
def abort_job(%Job{} = job) do
scheduler_pid = Process.whereis(scheduler())

if is_pid(scheduler_pid) do
Queuetopia.Scheduler.abort_job(scheduler_pid, job)
else
{:error, "#{inspect(__MODULE__)} is down"}
end
end

def handle_event(:new_incoming_job) do
listen(:new_incoming_job)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/queuetopia/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Queuetopia.Migrations do
use Ecto.Migration

@initial_version 0
@current_version 5
@current_version 6
def up(opts \\ []) when is_list(opts) do
from_version = Keyword.get(opts, :from_version, @initial_version) |> Kernel.+(1)
to_version = Keyword.get(opts, :to_version, @current_version)
Expand Down
45 changes: 45 additions & 0 deletions lib/queuetopia/migrations/v6.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
defmodule Queuetopia.Migrations.V6 do
@moduledoc false

use Ecto.Migration

def up do
rename(table(:queuetopia_jobs), :done_at, to: :ended_at)

rename(index(:queuetopia_jobs, [:ended_at], name: "queuetopia_jobs_done_at_index"),
to: "queuetopia_jobs_ended_at_index"
)

alter table(:queuetopia_jobs) do
add(:end_status, :string, null: true)
end

set_end_status_to_success = """
UPDATE queuetopia_jobs
SET end_status = 'success'
WHERE ended_at IS NOT NULL AND error IS NULL;
"""

execute(set_end_status_to_success)

set_end_status_to_max_attempts_reached = """
UPDATE queuetopia_jobs
SET ended_at = attempted_at, end_status = 'max_attempts_reached'
WHERE ended_at IS NULL AND attempts >= max_attempts;
"""

execute(set_end_status_to_max_attempts_reached)
end

def down do
rename(table(:queuetopia_jobs), :ended_at, to: :done_at)

rename(index(:queuetopia_jobs, [:done_at], name: "queuetopia_jobs_ended_at_index"),
to: "queuetopia_jobs_done_at_index"
)

alter table(:queuetopia_jobs) do
remove(:end_status)
end
end
end
69 changes: 52 additions & 17 deletions lib/queuetopia/queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,30 @@ defmodule Queuetopia.Queue do
end

@doc """
Returns true if a job scheduled date is reached and the job is not done yet.
Returns true if a job scheduled date is reached and the job is not ended yet.
Otherwise, returns false.
"""
@spec processable_now?(Job.t()) :: boolean
def processable_now?(%Job{} = job) do
not done?(job) and not max_attempts_reached?(job) and scheduled_for_now?(job)
not ended?(job) and scheduled_for_now?(job)
end

@doc """
Returns true if a job is done.
Returns true if a job is successfully ended.
Otherwise, returns false.
"""
@spec done?(Job.t()) :: boolean
def done?(%Job{} = job) do
not is_nil(job.done_at)
ended?(job) and job.end_status == "success"
end

@doc """
Returns true if a job is somehow ended.
Otherwise, returns false.
"""
@spec ended?(Job.t()) :: boolean
def ended?(%Job{} = job) do
not is_nil(job.ended_at)
end

@doc """
Expand Down Expand Up @@ -156,7 +165,7 @@ defmodule Queuetopia.Queue do
Job
|> select([:queue])
|> where([j], j.scope == ^scope)
|> where([j], is_nil(j.done_at))
|> where([j], is_nil(j.ended_at))
|> where([j], j.scheduled_at <= ^utc_now and j.next_attempt_at > ^utc_now)

where_immediately_executable_job = fn queryable ->
Expand All @@ -173,8 +182,7 @@ defmodule Queuetopia.Queue do
query =
Job
|> where([j], j.scope == ^scope)
|> where([j], is_nil(j.done_at))
|> where([j], j.attempts < j.max_attempts)
|> where([j], is_nil(j.ended_at))
|> where([j], j.queue not in subquery(locked_queues))
|> where([j], j.queue not in subquery(blocked_queues))
|> where_immediately_executable_job.()
Expand All @@ -200,8 +208,7 @@ defmodule Queuetopia.Queue do
Job
|> where([j], j.queue == ^queue)
|> where([j], j.scope == ^scope)
|> where([j], is_nil(j.done_at))
|> where([j], j.attempts < j.max_attempts)
|> where([j], is_nil(j.ended_at))
|> order_by(asc: :scheduled_at, asc: :sequence)
|> limit(1)
|> repo.one()
Expand All @@ -222,14 +229,11 @@ defmodule Queuetopia.Queue do
|> Ecto.Multi.run(:job, fn _, _ ->
job = repo.get(Job, id)

with {:done?, false} <- {:done?, done?(job)},
{:max_attempts_reached?, false} <-
{:max_attempts_reached?, max_attempts_reached?(job)},
with {:ended?, false} <- {:ended?, ended?(job)},
{:scheduled_for_now?, true} <- {:scheduled_for_now?, scheduled_for_now?(job)} do
{:ok, job}
else
{:done?, true} -> {:error, "already done"}
{:max_attempts_reached?, true} -> {:error, "max attempts reached"}
{:ended?, true} -> {:error, "already ended"}
{:scheduled_for_now?, false} -> {:error, "scheduled for later"}
end
end)
Expand All @@ -249,24 +253,45 @@ defmodule Queuetopia.Queue do
end

@doc false
@spec persist_result!(module, Job.t(), {:error, any} | :ok | {:ok, any}) :: Job.t()
@spec persist_result!(module, Job.t(), {:error, any} | :ok | {:ok, any} | :aborted) :: Job.t()

def persist_result!(repo, %Job{} = job, {:ok, _res}), do: persist_success!(repo, job)
def persist_result!(repo, %Job{} = job, :ok), do: persist_success!(repo, job)

def persist_result!(repo, %Job{} = job, :aborted),
do: persist_abort!(repo, job)

def persist_result!(repo, %Job{} = job, {:error, error}) when is_binary(error),
do: persist_failure!(repo, job, error)

def persist_result!(repo, %Job{} = job, unexpected_response),
do: persist_failure!(repo, job, inspect(unexpected_response))

defp persist_failure!(repo, %Job{attempts: attempts, max_attempts: max_attempts} = job, error)
when attempts + 1 >= max_attempts do
utc_now = DateTime.utc_now() |> DateTime.truncate(:second)
performer = resolve_performer(job)

job
|> Job.failed_job_changeset(%{
attempts: job.attempts + 1,
attempted_at: utc_now,
attempted_by: Atom.to_string(Node.self()),
ended_at: utc_now,
end_status: "max_attempts_reached",
error: error
})
|> repo.update!()
|> tap(&performer.handle_failed_job!/1)
end

defp persist_failure!(repo, %Job{} = job, error) do
utc_now = DateTime.utc_now() |> DateTime.truncate(:second)
performer = resolve_performer(job)
backoff = performer.backoff(job)

job
|> Job.failed_job_changeset(%{
|> Job.retry_job_changeset(%{
attempts: job.attempts + 1,
attempted_at: utc_now,
attempted_by: Atom.to_string(Node.self()),
Expand All @@ -277,6 +302,16 @@ defmodule Queuetopia.Queue do
|> tap(&performer.handle_failed_job!/1)
end

defp persist_abort!(repo, %Job{} = job) do
utc_now = DateTime.utc_now() |> DateTime.truncate(:second)

job
|> Job.aborted_job_changeset(%{
ended_at: utc_now
})
|> repo.update!()
end

defp persist_success!(repo, %Job{} = job) do
utc_now = DateTime.utc_now() |> DateTime.truncate(:second)

Expand All @@ -285,7 +320,7 @@ defmodule Queuetopia.Queue do
attempts: job.attempts + 1,
attempted_at: utc_now,
attempted_by: Atom.to_string(Node.self()),
done_at: utc_now
ended_at: utc_now
})
|> repo.update!()
end
Expand Down
39 changes: 33 additions & 6 deletions lib/queuetopia/queue/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@ defmodule Queuetopia.Queue.Job do
@moduledoc false

use Ecto.Schema
import Ecto.Changeset, only: [cast: 3, put_change: 3, validate_number: 3, validate_required: 2]

import Ecto.Changeset,
only: [
cast: 3,
put_change: 3,
validate_number: 3,
validate_required: 2,
validate_inclusion: 3
]

@type t :: %__MODULE__{}
@type option ::
Expand Down Expand Up @@ -31,7 +39,8 @@ defmodule Queuetopia.Queue.Job do
field(:attempted_at, :utc_datetime)
field(:attempted_by, :string)
field(:next_attempt_at, :utc_datetime)
field(:done_at, :utc_datetime)
field(:ended_at, :utc_datetime)
field(:end_status, :string)
field(:error, :string)

timestamps(type: :utc_datetime)
Expand Down Expand Up @@ -73,20 +82,38 @@ defmodule Queuetopia.Queue.Job do
|> validate_number(:max_attempts, greater_than_or_equal_to: 0)
end

@spec failed_job_changeset(Job.t(), map) :: Ecto.Changeset.t()
def failed_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do
@spec retry_job_changeset(Job.t(), map) :: Ecto.Changeset.t()
def retry_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do
job
|> cast(attrs, [:attempts, :attempted_at, :attempted_by, :next_attempt_at, :error])
|> validate_required_attempt_attributes
|> validate_required([:next_attempt_at, :error])
end

@spec failed_job_changeset(Job.t(), map) :: Ecto.Changeset.t()
def failed_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do
job
|> cast(attrs, [:attempts, :attempted_at, :attempted_by, :ended_at, :end_status, :error])
|> validate_required_attempt_attributes
|> validate_required([:ended_at, :end_status, :error])
|> validate_inclusion(:end_status, ["failed", "max_attempts_reached"])
end

@spec aborted_job_changeset(Job.t(), map) :: Ecto.Changeset.t()
def aborted_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do
job
|> cast(attrs, [:ended_at])
|> validate_required([:ended_at])
|> put_change(:end_status, "aborted")
end

@spec succeeded_job_changeset(Job.t(), map) :: Ecto.Changeset.t()
def succeeded_job_changeset(%__MODULE__{} = job, attrs) when is_map(attrs) do
job
|> cast(attrs, [:attempts, :attempted_at, :attempted_by, :done_at])
|> cast(attrs, [:attempts, :attempted_at, :attempted_by, :ended_at])
|> validate_required_attempt_attributes
|> validate_required([:done_at])
|> validate_required([:ended_at])
|> put_change(:end_status, "success")
|> put_change(:error, nil)
end

Expand Down
3 changes: 1 addition & 2 deletions lib/queuetopia/queue/job_queryable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ defmodule Queuetopia.Queue.JobQueryable do

defp filter_by_field(queryable, {:available?, true}) do
queryable
|> where([job], is_nil(job.done_at))
|> where([job], job.attempts < job.max_attempts)
|> where([job], is_nil(job.ended_at))
end

defp filter_by_field(_queryable, {key, _value}) when key not in @filterable_fields do
Expand Down
34 changes: 28 additions & 6 deletions lib/queuetopia/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ defmodule Queuetopia.Scheduler do
end
end

def abort_job(scheduler_pid, job) do
GenServer.call(scheduler_pid, {:abort, job})
end

defp has_poll_messages?(scheduler_pid) do
{:messages, messages} = Process.info(scheduler_pid, :messages)

Expand Down Expand Up @@ -66,30 +70,33 @@ defmodule Queuetopia.Scheduler do
{:noreply, %{state | jobs: jobs}}
end

@impl true
def handle_info(
{:DOWN, ref, :process, _pid, reason},
%{jobs: jobs, repo: repo, scope: scope} = state
) do
job = Map.get(jobs, ref)
job = Map.get(jobs, ref)[:job]
:ok = handle_task_result(repo, job, {:error, inspect(reason)})

Queue.unlock_queue(repo, scope, job.queue)
{:noreply, %{state | jobs: Map.delete(jobs, ref)}}
end

def handle_info({:kill, task}, %{jobs: jobs, repo: repo} = state) do
@impl true
def handle_info({:timeout, task}, %{jobs: jobs, repo: repo} = state) do
Task.shutdown(task, :brutal_kill)

job = Map.get(jobs, task.ref)
job = Map.get(jobs, task.ref)[:job]
:ok = handle_task_result(repo, job, {:error, "job_timeout"})

{:noreply, %{state | jobs: Map.delete(jobs, task.ref)}}
end

@impl true
def handle_info({ref, task_result}, %{jobs: jobs, repo: repo, scope: scope} = state) do
Process.demonitor(ref, [:flush])

job = Map.get(jobs, ref)
job = Map.get(jobs, ref)[:job]

:ok = handle_task_result(repo, job, task_result)

Expand All @@ -100,6 +107,21 @@ defmodule Queuetopia.Scheduler do
{:noreply, %{state | jobs: Map.delete(jobs, ref)}}
end

@impl true
def handle_call({:abort, %Job{} = job}, _from, %{jobs: jobs, repo: repo, scope: scope} = state) do
ref =
with {ref, %{job_task: task}} <- Enum.find(jobs, &(elem(&1, 1).job.id == job.id)) do
Task.shutdown(task, :brutal_kill)
ref
end

:ok = handle_task_result(repo, job, :aborted)

Queue.unlock_queue(repo, scope, job.queue)

{:reply, :ok, %{state | jobs: Map.delete(jobs, ref)}}
end

defp handle_task_result(repo, job, result) do
unless is_nil(job) do
Queue.persist_result!(repo, job, result)
Expand Down Expand Up @@ -140,8 +162,8 @@ defmodule Queuetopia.Scheduler do
{:ok, job} <- Queue.fetch_job(repo, job) do
task = Task.Supervisor.async_nolink(task_supervisor_name, Queue, :perform, [job])

Process.send_after(self(), {:kill, task}, job.timeout)
{task.ref, job}
Process.send_after(self(), {:timeout, task}, job.timeout)
{task.ref, %{job: job, job_task: task}}
else
_ -> nil
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do
use Mix.Project

@source_url "https://github.com/annatel/queuetopia"
@version "2.5.3"
@version "2.6.0"

def project do
[
Expand Down
Loading