Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export QUEUETOPIA__DATABASE_TEST_URL="ecto://YOUR_USERNAME:YOUR_PASSWORD@YOUR_HOST:YOUR_PORT/YOUR_DATABASE"
export QUEUETOPIA__DATABASE_TEST_REPO_ADAPTER=
export QUEUETOPIA__DATABASE_TEST_REPO_ADAPTER=
2 changes: 2 additions & 0 deletions .env.mvno_containers_test
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
QUEUETOPIA__DATABASE_TEST_URL="ecto://root:123456@db:3306/queutopia_test"
QUEUETOPIA__DATABASE_TEST_REPO_ADAPTER=myxql
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ queuetopia-*.tar

.elixir_ls
.tool-versions

.env
.env.*
!.env.example
!.env.mvno_containers
!.env.mvno_containers_test

2 changes: 1 addition & 1 deletion Earthfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION 0.7
VERSION 0.6

elixir-base:
FROM --platform=$BUILDPLATFORM elixir:1.18.4-otp-27-alpine
Expand Down
46 changes: 46 additions & 0 deletions lib/job_cleaner.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Queuetopia.JobCleaner do
@moduledoc """
Removes completed jobs from the queue periodically.

This GenServer runs in the background and cleans up
old completed jobs based on the configured interval.
"""

use GenServer
alias Queuetopia.Queue

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end

@impl true
def init(opts) do
Process.send(self(), :cleanup, [])

state = %{
repo: Keyword.fetch!(opts, :repo),
scope: Keyword.fetch!(opts, :scope),
cleanup_interval: Keyword.fetch!(opts, :cleanup_interval),
job_retention: Keyword.get(opts, :job_retention)
}

{:ok, state}
end

@impl true
def handle_info(
:cleanup,
%{
repo: repo,
scope: scope,
cleanup_interval: cleanup_interval,
job_retention: job_retention
} = state
) do
Queue.cleanup_completed_jobs(repo, scope, job_retention)

Process.send_after(self(), :cleanup, cleanup_interval)

{:noreply, state}
end
end
73 changes: 59 additions & 14 deletions lib/queuetopia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,23 @@ defmodule Queuetopia do
@repo Keyword.fetch!(opts, :repo)
@performer Keyword.fetch!(opts, :performer) |> to_string()
@scope __MODULE__ |> to_string()

@cleanup_interval Keyword.get(opts, :cleanup_interval, nil)
@job_retention Keyword.get(opts, :job_retention, {7, :day})
@default_poll_interval 60 * 1_000

defp config(otp_app, queue) when is_atom(otp_app) and is_atom(queue) do
config = Application.get_env(otp_app, queue, [])
[otp_app: otp_app] ++ config
end

defp to_ms({duration, unit}) do
timestamp = DateTime.utc_now()

timestamp
|> DateTime.add(duration, unit)
|> DateTime.diff(timestamp, :millisecond)
end

@doc """
Starts the Queuetopia supervisor process.
The :poll_interval can also be given in order to config the polling interval of the scheduler.
Expand All @@ -77,33 +86,68 @@ defmodule Queuetopia do
Keyword.get(config, :poll_interval) ||
@default_poll_interval

cleanup_interval = Keyword.get(config, :cleanup_interval) || @cleanup_interval
cleanup_interval_ms = cleanup_interval && to_ms(cleanup_interval)

disable? = Keyword.get(config, :disable?, false)

opts = [
repo: @repo,
poll_interval: poll_interval,
number_of_concurrent_jobs: Keyword.get(config, :number_of_concurrent_jobs)
number_of_concurrent_jobs: Keyword.get(config, :number_of_concurrent_jobs),
cleanup_interval: cleanup_interval_ms,
job_retention: Keyword.get(config, :job_retention) || @job_retention
]

if disable?, do: :ignore, else: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl true
def init(args) do
children = [
{Task.Supervisor, name: task_supervisor()},
{Queuetopia.Scheduler,
[
name: scheduler(),
task_supervisor_name: task_supervisor(),
repo: Keyword.fetch!(args, :repo),
scope: @scope,
poll_interval: Keyword.fetch!(args, :poll_interval),
number_of_concurrent_jobs: Keyword.fetch!(args, :number_of_concurrent_jobs)
]}
build_children(args)
|> Supervisor.init(strategy: :one_for_one)
end

defp build_children(args) do
[
task_supervisor_spec(),
scheduler_spec(args)
]
|> maybe_add_cleanup(args)
end

defp task_supervisor_spec do
{Task.Supervisor, name: task_supervisor()}
end

defp scheduler_spec(args) do
{Queuetopia.Scheduler,
[
name: scheduler(),
task_supervisor_name: task_supervisor(),
repo: Keyword.fetch!(args, :repo),
scope: @scope,
poll_interval: Keyword.fetch!(args, :poll_interval),
number_of_concurrent_jobs: Keyword.fetch!(args, :number_of_concurrent_jobs)
]}
end

defp maybe_add_cleanup(children, args) do
case Keyword.get(args, :cleanup_interval) do
nil -> children
_interval -> children ++ [job_cleaner_spec(args)]
end
end

Supervisor.init(children, strategy: :one_for_one)
defp job_cleaner_spec(args) do
{Queuetopia.JobCleaner,
[
name: job_cleaner(),
repo: Keyword.fetch!(args, :repo),
scope: @scope,
cleanup_interval: Keyword.fetch!(args, :cleanup_interval),
job_retention: Keyword.fetch!(args, :job_retention)
]}
end

defp child_name(child) do
Expand Down Expand Up @@ -238,6 +282,7 @@ defmodule Queuetopia do

defp scheduler(), do: child_name("Scheduler")
defp task_supervisor(), do: child_name("TaskSupervisor")
defp job_cleaner(), do: child_name("JobCleaner")
end
end
end
13 changes: 13 additions & 0 deletions lib/queuetopia/queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,17 @@ defmodule Queuetopia.Queue do
|> where([lock], lock.queue == ^queue)
|> repo.delete_all()
end

@doc false
def cleanup_completed_jobs(repo, scope, job_retention \\ {7, :day}) do
{duration, unit} = job_retention
cutoff_date = DateTime.utc_now() |> DateTime.add(-duration, unit)

from(j in Job,
where: j.scope == ^scope,
where: not is_nil(j.done_at),
where: j.done_at < ^cutoff_date
)
|> repo.delete_all()
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do
use Mix.Project

@source_url "https://github.com/annatel/queuetopia"
@version "2.6.1"
@version "2.7.0"

def project do
[
Expand Down
100 changes: 100 additions & 0 deletions test/job_cleaner_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
defmodule Queuetopia.JobCleanerTest do
use Queuetopia.DataCase, async: false

alias Queuetopia.Queue.Job
alias Queuetopia.TestRepo
alias Queuetopia.TestQueuetopia

defp datetime_days_ago(days) do
DateTime.utc_now()
|> DateTime.add(-days, :day)
|> DateTime.truncate(:second)
end

setup do
Application.put_env(:queuetopia, TestQueuetopia, cleanup_interval: {50, :millisecond})

on_exit(fn ->
Application.put_env(:queuetopia, TestQueuetopia, [])
end)

:ok
end

test "removes completed jobs older than 7 days retention period during periodic cleanup" do
start_supervised!(TestQueuetopia)

:timer.sleep(100)

eight_days_old_job =
insert!(:job,
scope: TestQueuetopia.scope(),
done_at: datetime_days_ago(8)
)

six_days_old_job =
insert!(:job,
scope: TestQueuetopia.scope(),
done_at: datetime_days_ago(6)
)

:timer.sleep(60)

assert is_nil(TestRepo.get(Job, eight_days_old_job.id))
assert TestRepo.get(Job, six_days_old_job.id)
end

test "only removes jobs from its own scope" do
our_queue_scope = TestQueuetopia.scope()
other_queue_scope = "other_queue"

our_eight_days_old_job =
insert!(:job,
scope: our_queue_scope,
done_at: datetime_days_ago(8)
)

other_eight_days_old_job =
insert!(:job,
scope: other_queue_scope,
done_at: datetime_days_ago(8)
)

start_supervised!(TestQueuetopia)
:timer.sleep(100)

assert is_nil(TestRepo.get(Job, our_eight_days_old_job.id))
assert TestRepo.get(Job, other_eight_days_old_job.id)
end

test "starts cleanup immediately when JobCleaner starts" do
scope = TestQueuetopia.scope()

eight_days_old_completed_job =
insert!(:job,
scope: scope,
done_at: datetime_days_ago(8)
)

assert TestRepo.get(Job, eight_days_old_completed_job.id)

start_supervised!(TestQueuetopia)

:timer.sleep(10)

assert is_nil(TestRepo.get(Job, eight_days_old_completed_job.id))
end

test "cleanup is disabled by default" do
old_job = insert!(:job, scope: "default_scope", done_at: datetime_days_ago(30))
recent_job = insert!(:job, scope: "default_scope", done_at: datetime_days_ago(1))

Application.put_env(:queuetopia, TestQueuetopia, [])
start_supervised!(TestQueuetopia)

:timer.sleep(100)

assert TestRepo.get(Job, old_job.id)
assert TestRepo.get(Job, recent_job.id)
end
end
Loading