diff --git a/README.md b/README.md index f4fa8b4..5464a66 100644 --- a/README.md +++ b/README.md @@ -90,9 +90,35 @@ defmodule MyApp.MailQueuetopia do use Queuetopia, otp_app: :my_app, performer: MyApp.MailQueuetopia.Performer, - repo: MyApp.Repo + repo: MyApp.Repo, + cleanup_interval: {1, :day}, + job_retention: {7, :day}, + job_cleaner_max_initial_delay: 100 end ``` +#### Job Cleanup Configuration + +Queuetopia provides automatic cleanup of completed jobs with the following options: + +- **`cleanup_interval`** *(optional)* - Defines how often the job cleaner runs. Must be a tuple like `{1, :day}`, `{2, :hour}`, `{30, :minute}`, etc. If not set, job cleanup is **disabled** and completed jobs will remain in the database indefinitely. + +- **`job_retention`** *(optional)* - Defines how long completed jobs are kept before being deleted. Defaults to `{7, :day}` (7 days). Must be a tuple specifying the duration. + +- **`job_cleaner_max_initial_delay`** *(optional)* - Maximum delay in milliseconds before the first cleanup runs when the JobCleaner starts. A random delay between 0 and this value is used to prevent multiple nodes from running cleanup simultaneously. If set to 0, cleanup runs immediately on startup. Defaults to a reasonable value to distribute cleanup across nodes. + +**Examples:** +```elixir +# Cleanup every hour, keep jobs for 3 days, start immediately +cleanup_interval: {1, :hour}, +job_retention: {3, :day}, +job_cleaner_max_initial_delay: 0 + +# Cleanup twice daily, keep jobs for 2 weeks, random startup delay up to 5 minutes +cleanup_interval: {12, :hour}, +job_retention: {14, :day}, +job_cleaner_max_initial_delay: 300_000 +``` + A Queuetopia expects a performer to exist. For example, the performer can be implemented like this: diff --git a/lib/job_cleaner.ex b/lib/job_cleaner.ex deleted file mode 100644 index c8df631..0000000 --- a/lib/job_cleaner.ex +++ /dev/null @@ -1,46 +0,0 @@ -defmodule Queuetopia.JobCleaner do - @moduledoc """ - Removes completed jobs from the queue periodically. - - This GenServer runs in the background and cleans up - old completed jobs based on the configured interval. - """ - - use GenServer - alias Queuetopia.Queue - - def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: opts[:name]) - end - - @impl true - def init(opts) do - Process.send(self(), :cleanup, []) - - state = %{ - repo: Keyword.fetch!(opts, :repo), - scope: Keyword.fetch!(opts, :scope), - cleanup_interval: Keyword.fetch!(opts, :cleanup_interval), - job_retention: Keyword.get(opts, :job_retention) - } - - {:ok, state} - end - - @impl true - def handle_info( - :cleanup, - %{ - repo: repo, - scope: scope, - cleanup_interval: cleanup_interval, - job_retention: job_retention - } = state - ) do - Queue.cleanup_completed_jobs(repo, scope, job_retention) - - Process.send_after(self(), :cleanup, cleanup_interval) - - {:noreply, state} - end -end diff --git a/lib/queuetopia.ex b/lib/queuetopia.ex index cf3f5d5..0df1ab1 100644 --- a/lib/queuetopia.ex +++ b/lib/queuetopia.ex @@ -56,8 +56,9 @@ defmodule Queuetopia do @repo Keyword.fetch!(opts, :repo) @performer Keyword.fetch!(opts, :performer) |> to_string() @scope __MODULE__ |> to_string() - @cleanup_interval Keyword.get(opts, :cleanup_interval, nil) + @cleanup_interval Keyword.get(opts, :cleanup_interval) @job_retention Keyword.get(opts, :job_retention, {7, :day}) + @job_cleaner_max_initial_delay Keyword.get(opts, :job_cleaner_max_initial_delay, 1000) @default_poll_interval 60 * 1_000 defp config(otp_app, queue) when is_atom(otp_app) and is_atom(queue) do @@ -65,7 +66,7 @@ defmodule Queuetopia do [otp_app: otp_app] ++ config end - defp to_ms({duration, unit}) do + defp to_ms({duration, unit}) when is_integer(duration) and is_atom(unit) do timestamp = DateTime.utc_now() timestamp @@ -86,9 +87,13 @@ defmodule Queuetopia do Keyword.get(config, :poll_interval) || @default_poll_interval - cleanup_interval = Keyword.get(config, :cleanup_interval) || @cleanup_interval + cleanup_interval = Keyword.get(opts, :cleanup_interval) || @cleanup_interval + cleanup_interval_ms = cleanup_interval && to_ms(cleanup_interval) + job_cleaner_max_initial_delay = + Keyword.get(opts, :job_cleaner_max_initial_delay) || @job_cleaner_max_initial_delay + disable? = Keyword.get(config, :disable?, false) opts = [ @@ -96,7 +101,8 @@ defmodule Queuetopia do poll_interval: poll_interval, number_of_concurrent_jobs: Keyword.get(config, :number_of_concurrent_jobs), cleanup_interval: cleanup_interval_ms, - job_retention: Keyword.get(config, :job_retention) || @job_retention + job_retention: @job_retention, + job_cleaner_max_initial_delay: job_cleaner_max_initial_delay ] if disable?, do: :ignore, else: Supervisor.start_link(__MODULE__, opts, name: __MODULE__) @@ -146,7 +152,8 @@ defmodule Queuetopia do repo: Keyword.fetch!(args, :repo), scope: @scope, cleanup_interval: Keyword.fetch!(args, :cleanup_interval), - job_retention: Keyword.fetch!(args, :job_retention) + job_retention: Keyword.fetch!(args, :job_retention), + job_cleaner_max_initial_delay: Keyword.fetch!(args, :job_cleaner_max_initial_delay) ]} end diff --git a/lib/queuetopia/job_cleaner.ex b/lib/queuetopia/job_cleaner.ex new file mode 100644 index 0000000..a606af9 --- /dev/null +++ b/lib/queuetopia/job_cleaner.ex @@ -0,0 +1,46 @@ +defmodule Queuetopia.JobCleaner do + @moduledoc """ + Removes completed jobs from the queue periodically. + + This GenServer runs in the background and cleans up + old completed jobs based on the configured interval. + """ + + use GenServer + alias Queuetopia.Queue + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: opts[:name]) + end + + @impl true + def init(opts) do + # Add random dalay to mitigate deadlocks on starts + job_cleaner_initial_delay = + opts |> Keyword.fetch!(:job_cleaner_max_initial_delay) |> random_delay() + + Process.send_after(self(), :cleanup, job_cleaner_initial_delay) + + state = opts |> Keyword.take([:repo, :scope, :cleanup_interval, :job_retention]) |> Map.new() + + {:ok, state} + end + + @impl true + def handle_info(:cleanup, state) do + %{ + repo: repo, + scope: scope, + cleanup_interval: cleanup_interval, + job_retention: job_retention + } = state + + Queue.cleanup_completed_jobs(repo, scope, job_retention) + + Process.send_after(self(), :cleanup, cleanup_interval) + {:noreply, state} + end + + defp random_delay(0), do: 0 + defp random_delay(max) when max > 0, do: :rand.uniform(max) +end diff --git a/mix.exs b/mix.exs index 85c2b1e..0ecb18e 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do use Mix.Project @source_url "https://github.com/annatel/queuetopia" - @version "2.7.0" + @version "2.7.1" def project do [ diff --git a/test/job_cleaner_test.exs b/test/queuetopia/job_cleaner_test.exs similarity index 69% rename from test/job_cleaner_test.exs rename to test/queuetopia/job_cleaner_test.exs index 710817d..33a3998 100644 --- a/test/job_cleaner_test.exs +++ b/test/queuetopia/job_cleaner_test.exs @@ -11,18 +11,10 @@ defmodule Queuetopia.JobCleanerTest do |> DateTime.truncate(:second) end - setup do - Application.put_env(:queuetopia, TestQueuetopia, cleanup_interval: {50, :millisecond}) - - on_exit(fn -> - Application.put_env(:queuetopia, TestQueuetopia, []) - end) - - :ok - end - test "removes completed jobs older than 7 days retention period during periodic cleanup" do - start_supervised!(TestQueuetopia) + start_supervised!( + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0} + ) :timer.sleep(100) @@ -60,7 +52,10 @@ defmodule Queuetopia.JobCleanerTest do done_at: datetime_days_ago(8) ) - start_supervised!(TestQueuetopia) + start_supervised!( + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0} + ) + :timer.sleep(100) assert is_nil(TestRepo.get(Job, our_eight_days_old_job.id)) @@ -78,9 +73,11 @@ defmodule Queuetopia.JobCleanerTest do assert TestRepo.get(Job, eight_days_old_completed_job.id) - start_supervised!(TestQueuetopia) + start_supervised!( + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0} + ) - :timer.sleep(10) + :timer.sleep(50) assert is_nil(TestRepo.get(Job, eight_days_old_completed_job.id)) end @@ -97,4 +94,26 @@ defmodule Queuetopia.JobCleanerTest do assert TestRepo.get(Job, old_job.id) assert TestRepo.get(Job, recent_job.id) end + + test "respects job_cleaner_max_initial_delay before first cleanup" do + scope = TestQueuetopia.scope() + + eight_days_old_job = + insert!(:job, + scope: scope, + done_at: datetime_days_ago(8) + ) + + start_supervised!( + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 20} + ) + + :timer.sleep(10) + assert TestRepo.get(Job, eight_days_old_job.id), "Job should still exist before initial delay" + + :timer.sleep(15) + + assert is_nil(TestRepo.get(Job, eight_days_old_job.id)), + "Job should be deleted after initial delay" + end end