From d5c819813324262c5d992663b502503446af7544 Mon Sep 17 00:00:00 2001 From: Dzhamaludin Osmanov Date: Mon, 8 Dec 2025 14:17:02 +0300 Subject: [PATCH 1/2] Fix: Handle missing worker processes on shutdown Added exception handling for ProcessLookupError when killing worker processes during shutdown. Now logs if a process is already terminated, improving robustness and clarity in process management. --- taskiq/cli/worker/process_manager.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index 2763261..dee60d0 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -275,7 +275,19 @@ def start(self) -> int | None: # noqa: C901 logger.debug("Process manager closed, killing workers.") for worker in self.workers: if worker.pid: - os.kill(worker.pid, signal.SIGINT) + try: + os.kill(worker.pid, signal.SIGINT) + logger.info( + "Stopped process %s with pid %s", + worker.name, + worker.pid, + ) + except ProcessLookupError: + logger.info( + "Process %s (pid %s) already terminated", + worker.name, + worker.pid, + ) return None for worker_num, worker in enumerate(self.workers): From e3c6666e9205975a8d5fb3ea4e9ae42fd340db92 Mon Sep 17 00:00:00 2001 From: Dzhamaludin Osmanov Date: Mon, 8 Dec 2025 15:17:08 +0300 Subject: [PATCH 2/2] Fix(ci): Refactor worker shutdown logic into helper method Moved the worker shutdown code into a new _shutdown_workers() method in ProcessManager to improve code reuse and readability. The start() method now calls this helper when handling ShutdownAction. --- taskiq/cli/worker/process_manager.py | 33 +++++++++++++++------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index dee60d0..c34090c 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -214,6 +214,23 @@ def prepare_workers(self) -> None: for worker, event in zip(self.workers, events, strict=True): _wait_for_worker_startup(worker, event) + def _shutdown_workers(self) -> None: + for worker in self.workers: + if worker.pid: + try: + os.kill(worker.pid, signal.SIGINT) + logger.info( + "Stopped process %s with pid %s", + worker.name, + worker.pid, + ) + except ProcessLookupError: + logger.info( + "Process %s (pid %s) already terminated", + worker.name, + worker.pid, + ) + def start(self) -> int | None: # noqa: C901 """ Start managing child processes. @@ -273,21 +290,7 @@ def start(self) -> int | None: # noqa: C901 reloaded_workers.add(action.worker_num) elif isinstance(action, ShutdownAction): logger.debug("Process manager closed, killing workers.") - for worker in self.workers: - if worker.pid: - try: - os.kill(worker.pid, signal.SIGINT) - logger.info( - "Stopped process %s with pid %s", - worker.name, - worker.pid, - ) - except ProcessLookupError: - logger.info( - "Process %s (pid %s) already terminated", - worker.name, - worker.pid, - ) + self._shutdown_workers() return None for worker_num, worker in enumerate(self.workers):