Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,35 @@ defmodule MyApp.MailQueuetopia do
use Queuetopia,
otp_app: :my_app,
performer: MyApp.MailQueuetopia.Performer,
repo: MyApp.Repo
repo: MyApp.Repo,
cleanup_interval: {1, :day},
job_retention: {7, :day},
job_cleaner_max_initial_delay: 100
end
```
#### Job Cleanup Configuration

Queuetopia provides automatic cleanup of completed jobs with the following options:

- **`cleanup_interval`** *(optional)* - Defines how often the job cleaner runs. Must be a tuple like `{1, :day}`, `{2, :hour}`, `{30, :minute}`, etc. If not set, job cleanup is **disabled** and completed jobs will remain in the database indefinitely.

- **`job_retention`** *(optional)* - Defines how long completed jobs are kept before being deleted. Defaults to `{7, :day}` (7 days). Must be a tuple specifying the duration.

- **`job_cleaner_max_initial_delay`** *(optional)* - Maximum delay in milliseconds before the first cleanup runs when the JobCleaner starts. A random delay between 0 and this value is used to prevent multiple nodes from running cleanup simultaneously. If set to 0, cleanup runs immediately on startup. Defaults to a reasonable value to distribute cleanup across nodes.

**Examples:**
```elixir
# Cleanup every hour, keep jobs for 3 days, start immediately
cleanup_interval: {1, :hour},
job_retention: {3, :day},
job_cleaner_max_initial_delay: 0

# Cleanup twice daily, keep jobs for 2 weeks, random startup delay up to 5 minutes
cleanup_interval: {12, :hour},
job_retention: {14, :day},
job_cleaner_max_initial_delay: 300_000
```

A Queuetopia expects a performer to exist.
For example, the performer can be implemented like this:

Expand Down
46 changes: 0 additions & 46 deletions lib/job_cleaner.ex

This file was deleted.

17 changes: 12 additions & 5 deletions lib/queuetopia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,17 @@ defmodule Queuetopia do
@repo Keyword.fetch!(opts, :repo)
@performer Keyword.fetch!(opts, :performer) |> to_string()
@scope __MODULE__ |> to_string()
@cleanup_interval Keyword.get(opts, :cleanup_interval, nil)
@cleanup_interval Keyword.get(opts, :cleanup_interval)
@job_retention Keyword.get(opts, :job_retention, {7, :day})
@job_cleaner_max_initial_delay Keyword.get(opts, :job_cleaner_max_initial_delay, 1000)
@default_poll_interval 60 * 1_000

defp config(otp_app, queue) when is_atom(otp_app) and is_atom(queue) do
config = Application.get_env(otp_app, queue, [])
[otp_app: otp_app] ++ config
end

defp to_ms({duration, unit}) do
defp to_ms({duration, unit}) when is_integer(duration) and is_atom(unit) do
timestamp = DateTime.utc_now()

timestamp
Expand All @@ -86,17 +87,22 @@ defmodule Queuetopia do
Keyword.get(config, :poll_interval) ||
@default_poll_interval

cleanup_interval = Keyword.get(config, :cleanup_interval) || @cleanup_interval
cleanup_interval = Keyword.get(opts, :cleanup_interval) || @cleanup_interval

cleanup_interval_ms = cleanup_interval && to_ms(cleanup_interval)

job_cleaner_max_initial_delay =
Keyword.get(opts, :job_cleaner_max_initial_delay) || @job_cleaner_max_initial_delay

disable? = Keyword.get(config, :disable?, false)

opts = [
repo: @repo,
poll_interval: poll_interval,
number_of_concurrent_jobs: Keyword.get(config, :number_of_concurrent_jobs),
cleanup_interval: cleanup_interval_ms,
job_retention: Keyword.get(config, :job_retention) || @job_retention
job_retention: @job_retention,
job_cleaner_max_initial_delay: job_cleaner_max_initial_delay
]

if disable?, do: :ignore, else: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
Expand Down Expand Up @@ -146,7 +152,8 @@ defmodule Queuetopia do
repo: Keyword.fetch!(args, :repo),
scope: @scope,
cleanup_interval: Keyword.fetch!(args, :cleanup_interval),
job_retention: Keyword.fetch!(args, :job_retention)
job_retention: Keyword.fetch!(args, :job_retention),
job_cleaner_max_initial_delay: Keyword.fetch!(args, :job_cleaner_max_initial_delay)
]}
end

Expand Down
46 changes: 46 additions & 0 deletions lib/queuetopia/job_cleaner.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Queuetopia.JobCleaner do
@moduledoc """
Removes completed jobs from the queue periodically.

This GenServer runs in the background and cleans up
old completed jobs based on the configured interval.
"""

use GenServer
alias Queuetopia.Queue

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end

@impl true
def init(opts) do
# Add random dalay to mitigate deadlocks on starts
job_cleaner_initial_delay =
opts |> Keyword.fetch!(:job_cleaner_max_initial_delay) |> random_delay()

Process.send_after(self(), :cleanup, job_cleaner_initial_delay)

state = opts |> Keyword.take([:repo, :scope, :cleanup_interval, :job_retention]) |> Map.new()

{:ok, state}
end

@impl true
def handle_info(:cleanup, state) do
%{
repo: repo,
scope: scope,
cleanup_interval: cleanup_interval,
job_retention: job_retention
} = state

Queue.cleanup_completed_jobs(repo, scope, job_retention)

Process.send_after(self(), :cleanup, cleanup_interval)
{:noreply, state}
end

defp random_delay(0), do: 0
defp random_delay(max) when max > 0, do: :rand.uniform(max)
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do
use Mix.Project

@source_url "https://github.com/annatel/queuetopia"
@version "2.7.0"
@version "2.7.1"

def project do
[
Expand Down
47 changes: 33 additions & 14 deletions test/job_cleaner_test.exs → test/queuetopia/job_cleaner_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,10 @@ defmodule Queuetopia.JobCleanerTest do
|> DateTime.truncate(:second)
end

setup do
Application.put_env(:queuetopia, TestQueuetopia, cleanup_interval: {50, :millisecond})

on_exit(fn ->
Application.put_env(:queuetopia, TestQueuetopia, [])
end)

:ok
end

test "removes completed jobs older than 7 days retention period during periodic cleanup" do
start_supervised!(TestQueuetopia)
start_supervised!(
{TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0}
)

:timer.sleep(100)

Expand Down Expand Up @@ -60,7 +52,10 @@ defmodule Queuetopia.JobCleanerTest do
done_at: datetime_days_ago(8)
)

start_supervised!(TestQueuetopia)
start_supervised!(
{TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0}
)

:timer.sleep(100)

assert is_nil(TestRepo.get(Job, our_eight_days_old_job.id))
Expand All @@ -78,9 +73,11 @@ defmodule Queuetopia.JobCleanerTest do

assert TestRepo.get(Job, eight_days_old_completed_job.id)

start_supervised!(TestQueuetopia)
start_supervised!(
{TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0}
)

:timer.sleep(10)
:timer.sleep(50)

assert is_nil(TestRepo.get(Job, eight_days_old_completed_job.id))
end
Expand All @@ -97,4 +94,26 @@ defmodule Queuetopia.JobCleanerTest do
assert TestRepo.get(Job, old_job.id)
assert TestRepo.get(Job, recent_job.id)
end

test "respects job_cleaner_max_initial_delay before first cleanup" do
scope = TestQueuetopia.scope()

eight_days_old_job =
insert!(:job,
scope: scope,
done_at: datetime_days_ago(8)
)

start_supervised!(
{TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 20}
)

:timer.sleep(10)
assert TestRepo.get(Job, eight_days_old_job.id), "Job should still exist before initial delay"

:timer.sleep(15)

assert is_nil(TestRepo.get(Job, eight_days_old_job.id)),
"Job should be deleted after initial delay"
end
end