From 6401ff9e3dd6be46554d8a53d1fe19de0c333006 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 7 Apr 2025 19:44:16 +0200 Subject: [PATCH 1/4] ref: Stuck detector Implement a background thread that dumps all stacktraces when the main thread gets stuck. variations of this are: * https://github.com/getsentry/sentry/pull/100857 -- unlike that PR, this one can run enabled in all consumers, since it only reports stacktraces when we're actually stuck. * https://github.com/getsentry/arroyo/pull/442 -- this is a previous version that only reported on the main thread, and in an overly complicated manner. We cannot use faulthandler because that one can only report to "real" files, and I want to report the stuck consumers to logging/Sentry. --- arroyo/processing/processor.py | 56 ++++++++++++++++++++++++++++++ arroyo/utils/metric_defs.py | 2 ++ tests/processing/test_processor.py | 45 ++++++++++++++++++++++++ 3 files changed, 103 insertions(+) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index a33a1236..7ca897ff 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -40,6 +40,25 @@ F = TypeVar("F", bound=Callable[[Any], Any]) +def get_all_thread_stacks() -> str: + """Get stack traces from all threads without using signals.""" + import sys + import threading + import traceback + + stacks = [] + frames = sys._current_frames() + threads_by_id = {t.ident: t for t in threading.enumerate()} + + for thread_id, frame in frames.items(): + thread = threads_by_id.get(thread_id) + thread_name = thread.name if thread else f"Unknown-{thread_id}" + stack = "".join(traceback.format_stack(frame)) + stacks.append(f"Thread {thread_name} ({thread_id}):\n{stack}") + + return "\n\n".join(stacks) + + def _rdkafka_callback(metrics: MetricsBuffer) -> Callable[[F], F]: def decorator(f: F) -> F: @functools.wraps(f) @@ -86,6 +105,7 @@ class InvalidStateError(RuntimeError): "arroyo.consumer.pause", "arroyo.consumer.resume", "arroyo.consumer.dlq.dropped_messages", + "arroyo.consumer.stuck", ] @@ -140,6 +160,7 @@ def __init__( commit_policy: CommitPolicy = ONCE_PER_SECOND, dlq_policy: Optional[DlqPolicy[TStrategyPayload]] = None, join_timeout: Optional[float] = None, + stuck_detector_timeout: Optional[int] = None, ) -> None: self.__consumer = consumer self.__processor_factory = processor_factory @@ -175,6 +196,11 @@ def __init__( DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None ) + self.__stuck = True + + if stuck_detector_timeout: + self.stuck_detector_run(stuck_detector_timeout) + def _close_strategy() -> None: self._close_processing_strategy() @@ -356,6 +382,35 @@ def run(self) -> None: logger.info("Processor terminated") raise + def stuck_detector_run(self, stuck_detector_timeout: int) -> None: + import threading + + def f() -> None: + i = 0 + while True: + if self.__stuck: + i += 1 + else: + i = 0 + self.__stuck = True + + if i >= stuck_detector_timeout: + stack_traces = get_all_thread_stacks() + logger.warning( + "main thread stuck for more than %s seconds, stacks: %s", + stuck_detector_timeout, + stack_traces, + ) + self.__metrics_buffer.incr_counter("arroyo.consumer.stuck", 1) + self.__metrics_buffer.flush() + return + + time.sleep(1) + + t = threading.Thread(target=f) + t.daemon = True + t.start() + def _clear_backpressure(self) -> None: if self.__backpressure_timestamp is not None: self.__metrics_buffer.incr_timing( @@ -405,6 +460,7 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None: def _run_once(self) -> None: self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1) + self.__stuck = False message_carried_over = self.__message is not None diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index 8048c023..c9508cd6 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -93,6 +93,8 @@ # # This might cause increased network usage as messages are being re-fetched. "arroyo.consumer.resume", + # Counter: Incremented when the consumer main thread is stuck and not processing messages. + "arroyo.consumer.stuck", # Gauge: Queue size of background queue that librdkafka uses to prefetch messages. "arroyo.consumer.librdkafka.total_queue_size", # Counter: Counter metric to measure how often the healthcheck file has been touched. diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index 3e0eb57d..556b97e8 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -763,3 +763,48 @@ def test_processor_poll_while_paused() -> None: processor._run_once() assert strategy.submit.call_args_list[-1] == mock.call(new_message) + + +def test_stuck_detector() -> None: + """Test that stuck detector emits a metric when strategy blocks.""" + import threading + + topic = Topic("topic") + partition = Partition(topic, 0) + + consumer = mock.Mock() + consumer.tell.return_value = {} + + strategy = mock.Mock() + real_sleep = time.sleep + strategy.submit.side_effect = lambda msg: real_sleep(0.5) + + factory = mock.Mock() + factory.create_with_partitions.return_value = strategy + + TestingMetricsBackend.calls.clear() + + with mock.patch("time.sleep", side_effect=lambda s: real_sleep(0.01)): + processor: StreamProcessor[int] = StreamProcessor( + consumer, topic, factory, IMMEDIATE, stuck_detector_timeout=2 + ) + + assignment_callback = consumer.subscribe.call_args.kwargs["on_assign"] + assignment_callback({partition: 0}) + + consumer.poll.return_value = BrokerValue(0, partition, 0, datetime.now()) + + run_thread = threading.Thread(target=processor._run_once) + run_thread.start() + + real_sleep(0.5) + + stuck_metrics = [ + call + for call in TestingMetricsBackend.calls + if isinstance(call, Increment) and call.name == "arroyo.consumer.stuck" + ] + assert len(stuck_metrics) == 1 + assert stuck_metrics[0].value == 1 + + run_thread.join(timeout=2) From 1b61a378df537e8e3903fb0850046be1dc471623 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Dec 2025 14:00:48 +0100 Subject: [PATCH 2/4] shut down thread to prevent leaks in tests --- arroyo/processing/processor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 7ca897ff..d5f691a3 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -185,6 +185,7 @@ def __init__( ) self.__shutdown_requested = False + self.__shutdown_done = False # Buffers messages for DLQ. Messages are added when they are submitted for processing and # removed once the commit callback is fired as they are guaranteed to be valid at that point. @@ -381,13 +382,15 @@ def run(self) -> None: self.__processor_factory.shutdown() logger.info("Processor terminated") raise + finally: + self.__shutdown_done = True def stuck_detector_run(self, stuck_detector_timeout: int) -> None: import threading def f() -> None: i = 0 - while True: + while not self.__shutdown_done: if self.__stuck: i += 1 else: From 879fa7a006ca47ecd679111def90a7a083384060 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Dec 2025 14:09:27 +0100 Subject: [PATCH 3/4] shutdown properly again in tests --- tests/processing/test_processor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index 556b97e8..92a0f77a 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -765,7 +765,7 @@ def test_processor_poll_while_paused() -> None: assert strategy.submit.call_args_list[-1] == mock.call(new_message) -def test_stuck_detector() -> None: +def test_stuck_detector(request: pytest.FixtureRequest) -> None: """Test that stuck detector emits a metric when strategy blocks.""" import threading @@ -789,6 +789,8 @@ def test_stuck_detector() -> None: consumer, topic, factory, IMMEDIATE, stuck_detector_timeout=2 ) + request.addfinalizer(processor.signal_shutdown) + assignment_callback = consumer.subscribe.call_args.kwargs["on_assign"] assignment_callback({partition: 0}) From 19f98f7e0a75e0e4580e8e0c108763ace8a3a5cc Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 2 Dec 2025 15:42:40 +0100 Subject: [PATCH 4/4] make it timestamp based, make test faster --- arroyo/processing/processor.py | 14 +++------- tests/processing/test_processor.py | 43 +++++++++++++----------------- 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index d5f691a3..01bd24bf 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -197,7 +197,7 @@ def __init__( DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None ) - self.__stuck = True + self.__last_run = time.time() if stuck_detector_timeout: self.stuck_detector_run(stuck_detector_timeout) @@ -389,15 +389,9 @@ def stuck_detector_run(self, stuck_detector_timeout: int) -> None: import threading def f() -> None: - i = 0 while not self.__shutdown_done: - if self.__stuck: - i += 1 - else: - i = 0 - self.__stuck = True - - if i >= stuck_detector_timeout: + time_since_last_run = time.time() - self.__last_run + if time_since_last_run > stuck_detector_timeout: stack_traces = get_all_thread_stacks() logger.warning( "main thread stuck for more than %s seconds, stacks: %s", @@ -463,7 +457,7 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None: def _run_once(self) -> None: self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1) - self.__stuck = False + self.__last_run = time.time() message_carried_over = self.__message is not None diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index 92a0f77a..85ab85d8 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -767,8 +767,6 @@ def test_processor_poll_while_paused() -> None: def test_stuck_detector(request: pytest.FixtureRequest) -> None: """Test that stuck detector emits a metric when strategy blocks.""" - import threading - topic = Topic("topic") partition = Partition(topic, 0) @@ -777,36 +775,33 @@ def test_stuck_detector(request: pytest.FixtureRequest) -> None: strategy = mock.Mock() real_sleep = time.sleep - strategy.submit.side_effect = lambda msg: real_sleep(0.5) factory = mock.Mock() factory.create_with_partitions.return_value = strategy TestingMetricsBackend.calls.clear() - with mock.patch("time.sleep", side_effect=lambda s: real_sleep(0.01)): - processor: StreamProcessor[int] = StreamProcessor( - consumer, topic, factory, IMMEDIATE, stuck_detector_timeout=2 - ) - - request.addfinalizer(processor.signal_shutdown) - - assignment_callback = consumer.subscribe.call_args.kwargs["on_assign"] - assignment_callback({partition: 0}) + with mock.patch("time.time", return_value=0.0) as mock_time: + with mock.patch("time.sleep", side_effect=lambda s: real_sleep(0.01)): + processor: StreamProcessor[int] = StreamProcessor( + consumer, topic, factory, IMMEDIATE, stuck_detector_timeout=2 + ) - consumer.poll.return_value = BrokerValue(0, partition, 0, datetime.now()) + request.addfinalizer(processor.signal_shutdown) - run_thread = threading.Thread(target=processor._run_once) - run_thread.start() + assignment_callback = consumer.subscribe.call_args.kwargs["on_assign"] + assignment_callback({partition: 0}) - real_sleep(0.5) + consumer.poll.return_value = BrokerValue(0, partition, 0, datetime.now()) + processor._run_once() - stuck_metrics = [ - call - for call in TestingMetricsBackend.calls - if isinstance(call, Increment) and call.name == "arroyo.consumer.stuck" - ] - assert len(stuck_metrics) == 1 - assert stuck_metrics[0].value == 1 + mock_time.return_value = 5.0 + real_sleep(0.2) - run_thread.join(timeout=2) + stuck_metrics = [ + call + for call in TestingMetricsBackend.calls + if isinstance(call, Increment) and call.name == "arroyo.consumer.stuck" + ] + assert len(stuck_metrics) == 1 + assert stuck_metrics[0].value == 1