From 9d44990ac5600dc926c5f9d935d77958df7baf25 Mon Sep 17 00:00:00 2001 From: Benaim Axelle Date: Tue, 2 Dec 2025 10:42:41 +0200 Subject: [PATCH 1/5] add job_cleaner_initial_delay to prevent deadLock --- lib/queuetopia.ex | 17 +++++-- lib/{ => queuetopia}/job_cleaner.ex | 24 +++++----- mix.exs | 2 +- test/{ => queuetopia}/job_cleaner_test.exs | 53 +++++++++++++++++----- 4 files changed, 67 insertions(+), 29 deletions(-) rename lib/{ => queuetopia}/job_cleaner.ex (63%) rename test/{ => queuetopia}/job_cleaner_test.exs (62%) diff --git a/lib/queuetopia.ex b/lib/queuetopia.ex index cf3f5d5..e8ee931 100644 --- a/lib/queuetopia.ex +++ b/lib/queuetopia.ex @@ -56,8 +56,9 @@ defmodule Queuetopia do @repo Keyword.fetch!(opts, :repo) @performer Keyword.fetch!(opts, :performer) |> to_string() @scope __MODULE__ |> to_string() - @cleanup_interval Keyword.get(opts, :cleanup_interval, nil) + @cleanup_interval Keyword.get(opts, :cleanup_interval) @job_retention Keyword.get(opts, :job_retention, {7, :day}) + @job_cleaner_initial_delay Keyword.get(opts, :job_cleaner_initial_delay, :rand.uniform(100)) @default_poll_interval 60 * 1_000 defp config(otp_app, queue) when is_atom(otp_app) and is_atom(queue) do @@ -65,7 +66,7 @@ defmodule Queuetopia do [otp_app: otp_app] ++ config end - defp to_ms({duration, unit}) do + defp to_ms({duration, unit}) when is_integer(duration) and is_atom(unit) do timestamp = DateTime.utc_now() timestamp @@ -86,9 +87,13 @@ defmodule Queuetopia do Keyword.get(config, :poll_interval) || @default_poll_interval - cleanup_interval = Keyword.get(config, :cleanup_interval) || @cleanup_interval + cleanup_interval = Keyword.get(opts, :cleanup_interval) || @cleanup_interval + cleanup_interval_ms = cleanup_interval && to_ms(cleanup_interval) + job_cleaner_initial_delay = + Keyword.get(opts, :job_cleaner_initial_delay) || @job_cleaner_initial_delay + disable? = Keyword.get(config, :disable?, false) opts = [ @@ -96,7 +101,8 @@ defmodule Queuetopia do poll_interval: poll_interval, number_of_concurrent_jobs: Keyword.get(config, :number_of_concurrent_jobs), cleanup_interval: cleanup_interval_ms, - job_retention: Keyword.get(config, :job_retention) || @job_retention + job_retention: @job_retention, + job_cleaner_initial_delay: job_cleaner_initial_delay ] if disable?, do: :ignore, else: Supervisor.start_link(__MODULE__, opts, name: __MODULE__) @@ -146,7 +152,8 @@ defmodule Queuetopia do repo: Keyword.fetch!(args, :repo), scope: @scope, cleanup_interval: Keyword.fetch!(args, :cleanup_interval), - job_retention: Keyword.fetch!(args, :job_retention) + job_retention: Keyword.fetch!(args, :job_retention), + job_cleaner_initial_delay: Keyword.fetch!(args, :job_cleaner_initial_delay) ]} end diff --git a/lib/job_cleaner.ex b/lib/queuetopia/job_cleaner.ex similarity index 63% rename from lib/job_cleaner.ex rename to lib/queuetopia/job_cleaner.ex index c8df631..99f2bb9 100644 --- a/lib/job_cleaner.ex +++ b/lib/queuetopia/job_cleaner.ex @@ -15,32 +15,32 @@ defmodule Queuetopia.JobCleaner do @impl true def init(opts) do - Process.send(self(), :cleanup, []) + job_cleaner_initial_delay = Keyword.fetch!(opts, :job_cleaner_initial_delay) + Process.send_after(self(), :cleanup, job_cleaner_initial_delay) state = %{ repo: Keyword.fetch!(opts, :repo), scope: Keyword.fetch!(opts, :scope), cleanup_interval: Keyword.fetch!(opts, :cleanup_interval), - job_retention: Keyword.get(opts, :job_retention) + job_retention: Keyword.get(opts, :job_retention), + job_cleaner_initial_delay: job_cleaner_initial_delay } {:ok, state} end @impl true - def handle_info( - :cleanup, - %{ - repo: repo, - scope: scope, - cleanup_interval: cleanup_interval, - job_retention: job_retention - } = state - ) do + def handle_info(:cleanup, state) do + %{ + repo: repo, + scope: scope, + cleanup_interval: cleanup_interval, + job_retention: job_retention + } = state + Queue.cleanup_completed_jobs(repo, scope, job_retention) Process.send_after(self(), :cleanup, cleanup_interval) - {:noreply, state} end end diff --git a/mix.exs b/mix.exs index 85c2b1e..0ecb18e 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule Queuetopia.MixProject do use Mix.Project @source_url "https://github.com/annatel/queuetopia" - @version "2.7.0" + @version "2.7.1" def project do [ diff --git a/test/job_cleaner_test.exs b/test/queuetopia/job_cleaner_test.exs similarity index 62% rename from test/job_cleaner_test.exs rename to test/queuetopia/job_cleaner_test.exs index 710817d..a4d8faf 100644 --- a/test/job_cleaner_test.exs +++ b/test/queuetopia/job_cleaner_test.exs @@ -11,18 +11,22 @@ defmodule Queuetopia.JobCleanerTest do |> DateTime.truncate(:second) end - setup do - Application.put_env(:queuetopia, TestQueuetopia, cleanup_interval: {50, :millisecond}) + # setup do + # Application.put_env(:queuetopia, TestQueuetopia, cleanup_interval: {50, :millisecond}) + # Application.put_env(:queuetopia, :job_cleaner_initial_delay, 0) - on_exit(fn -> - Application.put_env(:queuetopia, TestQueuetopia, []) - end) + # on_exit(fn -> + # Application.put_env(:queuetopia, TestQueuetopia, []) + # Application.delete_env(:queuetopia, :job_cleaner_initial_delay) + # end) - :ok - end + # :ok + # end test "removes completed jobs older than 7 days retention period during periodic cleanup" do - start_supervised!(TestQueuetopia) + start_supervised!( + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 0} + ) :timer.sleep(100) @@ -60,7 +64,10 @@ defmodule Queuetopia.JobCleanerTest do done_at: datetime_days_ago(8) ) - start_supervised!(TestQueuetopia) + start_supervised!( + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 0} + ) + :timer.sleep(100) assert is_nil(TestRepo.get(Job, our_eight_days_old_job.id)) @@ -78,9 +85,11 @@ defmodule Queuetopia.JobCleanerTest do assert TestRepo.get(Job, eight_days_old_completed_job.id) - start_supervised!(TestQueuetopia) + start_supervised!( + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 0} + ) - :timer.sleep(10) + :timer.sleep(50) assert is_nil(TestRepo.get(Job, eight_days_old_completed_job.id)) end @@ -97,4 +106,26 @@ defmodule Queuetopia.JobCleanerTest do assert TestRepo.get(Job, old_job.id) assert TestRepo.get(Job, recent_job.id) end + + test "respects job_cleaner_initial_delay before first cleanup" do + scope = TestQueuetopia.scope() + + eight_days_old_job = + insert!(:job, + scope: scope, + done_at: datetime_days_ago(8) + ) + + start_supervised!( + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 20} + ) + + :timer.sleep(10) + assert TestRepo.get(Job, eight_days_old_job.id), "Job should still exist before initial delay" + + :timer.sleep(15) + + assert is_nil(TestRepo.get(Job, eight_days_old_job.id)), + "Job should be deleted after initial delay" + end end From a554c6d63f46503d00cce829c3484b38ffd8df68 Mon Sep 17 00:00:00 2001 From: Benaim Axelle Date: Tue, 2 Dec 2025 10:45:52 +0200 Subject: [PATCH 2/5] remove unused code --- test/queuetopia/job_cleaner_test.exs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/test/queuetopia/job_cleaner_test.exs b/test/queuetopia/job_cleaner_test.exs index a4d8faf..27c28f5 100644 --- a/test/queuetopia/job_cleaner_test.exs +++ b/test/queuetopia/job_cleaner_test.exs @@ -11,18 +11,6 @@ defmodule Queuetopia.JobCleanerTest do |> DateTime.truncate(:second) end - # setup do - # Application.put_env(:queuetopia, TestQueuetopia, cleanup_interval: {50, :millisecond}) - # Application.put_env(:queuetopia, :job_cleaner_initial_delay, 0) - - # on_exit(fn -> - # Application.put_env(:queuetopia, TestQueuetopia, []) - # Application.delete_env(:queuetopia, :job_cleaner_initial_delay) - # end) - - # :ok - # end - test "removes completed jobs older than 7 days retention period during periodic cleanup" do start_supervised!( {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 0} From 4b6ec3bc8722a64fd2e5786cf96a08125ca0512e Mon Sep 17 00:00:00 2001 From: Benaim Axelle Date: Tue, 2 Dec 2025 11:54:04 +0200 Subject: [PATCH 3/5] update --- README.md | 30 +++++++++++++++++++++++++++- lib/queuetopia.ex | 10 +++++----- lib/queuetopia/job_cleaner.ex | 17 ++++++++-------- test/queuetopia/job_cleaner_test.exs | 10 +++++----- 4 files changed, 48 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index f4fa8b4..892fa86 100644 --- a/README.md +++ b/README.md @@ -90,9 +90,37 @@ defmodule MyApp.MailQueuetopia do use Queuetopia, otp_app: :my_app, performer: MyApp.MailQueuetopia.Performer, - repo: MyApp.Repo + repo: MyApp.Repo, + cleanup_interval: {1, :day}, + job_retention: {7, :day}, + job_cleaner_max_initial_delay: 100 end ``` +#### Job Cleanup Configuration + +Queuetopia provides automatic cleanup of completed jobs with the following options: + +- **`cleanup_interval`** *(optional)* - Defines how often the job cleaner runs. Must be a tuple like `{1, :day}`, `{2, :hour}`, `{30, :minute}`, etc. If not set, job cleanup is **disabled** and completed jobs will remain in the database indefinitely. + +- **`job_retention`** *(optional)* - Defines how long completed jobs are kept before being deleted. Defaults to `{7, :day}` (7 days). Must be a tuple specifying the duration. + +- **`job_cleaner_max_initial_delay`** *(optional)* - Maximum delay in milliseconds before the first cleanup runs when the JobCleaner starts. A random delay between 0 and this value is used to prevent multiple nodes from running cleanup simultaneously. If set to 0, cleanup runs immediately on startup. Defaults to a reasonable value to distribute cleanup across nodes. + +**Examples:** +```elixir +# Cleanup every hour, keep jobs for 3 days, start immediately +cleanup_interval: {1, :hour}, +job_retention: {3, :day}, +job_cleaner_max_initial_delay: 0 + +# Cleanup twice daily, keep jobs for 2 weeks, random startup delay up to 5 minutes +cleanup_interval: {12, :hour}, +job_retention: {14, :day}, +job_cleaner_max_initial_delay: 300_000 +``` +# No automatic cleanup (jobs persist forever) +# cleanup_interval not set + A Queuetopia expects a performer to exist. For example, the performer can be implemented like this: diff --git a/lib/queuetopia.ex b/lib/queuetopia.ex index e8ee931..0df1ab1 100644 --- a/lib/queuetopia.ex +++ b/lib/queuetopia.ex @@ -58,7 +58,7 @@ defmodule Queuetopia do @scope __MODULE__ |> to_string() @cleanup_interval Keyword.get(opts, :cleanup_interval) @job_retention Keyword.get(opts, :job_retention, {7, :day}) - @job_cleaner_initial_delay Keyword.get(opts, :job_cleaner_initial_delay, :rand.uniform(100)) + @job_cleaner_max_initial_delay Keyword.get(opts, :job_cleaner_max_initial_delay, 1000) @default_poll_interval 60 * 1_000 defp config(otp_app, queue) when is_atom(otp_app) and is_atom(queue) do @@ -91,8 +91,8 @@ defmodule Queuetopia do cleanup_interval_ms = cleanup_interval && to_ms(cleanup_interval) - job_cleaner_initial_delay = - Keyword.get(opts, :job_cleaner_initial_delay) || @job_cleaner_initial_delay + job_cleaner_max_initial_delay = + Keyword.get(opts, :job_cleaner_max_initial_delay) || @job_cleaner_max_initial_delay disable? = Keyword.get(config, :disable?, false) @@ -102,7 +102,7 @@ defmodule Queuetopia do number_of_concurrent_jobs: Keyword.get(config, :number_of_concurrent_jobs), cleanup_interval: cleanup_interval_ms, job_retention: @job_retention, - job_cleaner_initial_delay: job_cleaner_initial_delay + job_cleaner_max_initial_delay: job_cleaner_max_initial_delay ] if disable?, do: :ignore, else: Supervisor.start_link(__MODULE__, opts, name: __MODULE__) @@ -153,7 +153,7 @@ defmodule Queuetopia do scope: @scope, cleanup_interval: Keyword.fetch!(args, :cleanup_interval), job_retention: Keyword.fetch!(args, :job_retention), - job_cleaner_initial_delay: Keyword.fetch!(args, :job_cleaner_initial_delay) + job_cleaner_max_initial_delay: Keyword.fetch!(args, :job_cleaner_max_initial_delay) ]} end diff --git a/lib/queuetopia/job_cleaner.ex b/lib/queuetopia/job_cleaner.ex index 99f2bb9..e290711 100644 --- a/lib/queuetopia/job_cleaner.ex +++ b/lib/queuetopia/job_cleaner.ex @@ -15,16 +15,14 @@ defmodule Queuetopia.JobCleaner do @impl true def init(opts) do - job_cleaner_initial_delay = Keyword.fetch!(opts, :job_cleaner_initial_delay) + + # Add random dalay to mitigate deadlocks on starts + job_cleaner_initial_delay = + opts |> Keyword.fetch!(:job_cleaner_max_initial_delay) |> random_delay() + Process.send_after(self(), :cleanup, job_cleaner_initial_delay) - state = %{ - repo: Keyword.fetch!(opts, :repo), - scope: Keyword.fetch!(opts, :scope), - cleanup_interval: Keyword.fetch!(opts, :cleanup_interval), - job_retention: Keyword.get(opts, :job_retention), - job_cleaner_initial_delay: job_cleaner_initial_delay - } + state = opts |> Keyword.take([:repo, :scope, :cleanup_interval, :job_retention]) |> Map.new() {:ok, state} end @@ -43,4 +41,7 @@ defmodule Queuetopia.JobCleaner do Process.send_after(self(), :cleanup, cleanup_interval) {:noreply, state} end + + defp random_delay(0), do: 0 + defp random_delay(max) when max > 0, do: :rand.uniform(max) end diff --git a/test/queuetopia/job_cleaner_test.exs b/test/queuetopia/job_cleaner_test.exs index 27c28f5..33a3998 100644 --- a/test/queuetopia/job_cleaner_test.exs +++ b/test/queuetopia/job_cleaner_test.exs @@ -13,7 +13,7 @@ defmodule Queuetopia.JobCleanerTest do test "removes completed jobs older than 7 days retention period during periodic cleanup" do start_supervised!( - {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 0} + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0} ) :timer.sleep(100) @@ -53,7 +53,7 @@ defmodule Queuetopia.JobCleanerTest do ) start_supervised!( - {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 0} + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0} ) :timer.sleep(100) @@ -74,7 +74,7 @@ defmodule Queuetopia.JobCleanerTest do assert TestRepo.get(Job, eight_days_old_completed_job.id) start_supervised!( - {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 0} + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 0} ) :timer.sleep(50) @@ -95,7 +95,7 @@ defmodule Queuetopia.JobCleanerTest do assert TestRepo.get(Job, recent_job.id) end - test "respects job_cleaner_initial_delay before first cleanup" do + test "respects job_cleaner_max_initial_delay before first cleanup" do scope = TestQueuetopia.scope() eight_days_old_job = @@ -105,7 +105,7 @@ defmodule Queuetopia.JobCleanerTest do ) start_supervised!( - {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_initial_delay: 20} + {TestQueuetopia, cleanup_interval: {100, :millisecond}, job_cleaner_max_initial_delay: 20} ) :timer.sleep(10) From 2d045cada97d08039e34c5bd7a10f978f11db96d Mon Sep 17 00:00:00 2001 From: Benaim Axelle Date: Tue, 2 Dec 2025 12:09:01 +0200 Subject: [PATCH 4/5] update --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 892fa86..5464a66 100644 --- a/README.md +++ b/README.md @@ -118,8 +118,6 @@ cleanup_interval: {12, :hour}, job_retention: {14, :day}, job_cleaner_max_initial_delay: 300_000 ``` -# No automatic cleanup (jobs persist forever) -# cleanup_interval not set A Queuetopia expects a performer to exist. For example, the performer can be implemented like this: From caa76eeb2ee6ec50c11e79db303f44e179d7cb5e Mon Sep 17 00:00:00 2001 From: Joel Kociolek Date: Tue, 2 Dec 2025 12:21:06 +0200 Subject: [PATCH 5/5] mix format --- lib/queuetopia/job_cleaner.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/queuetopia/job_cleaner.ex b/lib/queuetopia/job_cleaner.ex index e290711..a606af9 100644 --- a/lib/queuetopia/job_cleaner.ex +++ b/lib/queuetopia/job_cleaner.ex @@ -15,7 +15,6 @@ defmodule Queuetopia.JobCleaner do @impl true def init(opts) do - # Add random dalay to mitigate deadlocks on starts job_cleaner_initial_delay = opts |> Keyword.fetch!(:job_cleaner_max_initial_delay) |> random_delay()