diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py
index a33a1236..01bd24bf 100644
--- a/arroyo/processing/processor.py
+++ b/arroyo/processing/processor.py
@@ -40,6 +40,25 @@
 F = TypeVar("F", bound=Callable[[Any], Any])
 
 
+def get_all_thread_stacks() -> str:
+    """Get stack traces from all threads without using signals."""
+    import sys
+    import threading
+    import traceback
+
+    stacks = []
+    frames = sys._current_frames()
+    threads_by_id = {t.ident: t for t in threading.enumerate()}
+
+    for thread_id, frame in frames.items():
+        thread = threads_by_id.get(thread_id)
+        thread_name = thread.name if thread else f"Unknown-{thread_id}"
+        stack = "".join(traceback.format_stack(frame))
+        stacks.append(f"Thread {thread_name} ({thread_id}):\n{stack}")
+
+    return "\n\n".join(stacks)
+
+
 def _rdkafka_callback(metrics: MetricsBuffer) -> Callable[[F], F]:
     def decorator(f: F) -> F:
         @functools.wraps(f)
@@ -86,6 +105,7 @@ class InvalidStateError(RuntimeError):
     "arroyo.consumer.pause",
     "arroyo.consumer.resume",
     "arroyo.consumer.dlq.dropped_messages",
+    "arroyo.consumer.stuck",
 ]
 
 
@@ -140,6 +160,7 @@ def __init__(
         commit_policy: CommitPolicy = ONCE_PER_SECOND,
         dlq_policy: Optional[DlqPolicy[TStrategyPayload]] = None,
         join_timeout: Optional[float] = None,
+        stuck_detector_timeout: Optional[int] = None,
     ) -> None:
         self.__consumer = consumer
         self.__processor_factory = processor_factory
@@ -164,6 +185,7 @@ def __init__(
         )
 
         self.__shutdown_requested = False
+        self.__shutdown_done = False
 
         # Buffers messages for DLQ. Messages are added when they are submitted for processing and
         # removed once the commit callback is fired as they are guaranteed to be valid at that point.
@@ -175,6 +197,11 @@ def __init__(
             DlqPolicyWrapper(dlq_policy) if dlq_policy is not None else None
         )
 
+        self.__last_run = time.time()
+
+        if stuck_detector_timeout:
+            self.stuck_detector_run(stuck_detector_timeout)
+
         def _close_strategy() -> None:
             self._close_processing_strategy()
 
@@ -355,6 +382,31 @@ def run(self) -> None:
             self.__processor_factory.shutdown()
             logger.info("Processor terminated")
             raise
+        finally:
+            self.__shutdown_done = True
+
+    def stuck_detector_run(self, stuck_detector_timeout: int) -> None:
+        import threading
+
+        def f() -> None:
+            while not self.__shutdown_done:
+                time_since_last_run = time.time() - self.__last_run
+                if time_since_last_run > stuck_detector_timeout:
+                    stack_traces = get_all_thread_stacks()
+                    logger.warning(
+                        "main thread stuck for more than %s seconds, stacks: %s",
+                        stuck_detector_timeout,
+                        stack_traces,
+                    )
+                    self.__metrics_buffer.incr_counter("arroyo.consumer.stuck", 1)
+                    self.__metrics_buffer.flush()
+                    return
+
+                time.sleep(1)
+
+        t = threading.Thread(target=f, name="stuck-detector")
+        t.daemon = True
+        t.start()
 
     def _clear_backpressure(self) -> None:
         if self.__backpressure_timestamp is not None:
@@ -405,6 +457,7 @@ def _handle_invalid_message(self, exc: InvalidMessage) -> None:
 
     def _run_once(self) -> None:
         self.__metrics_buffer.incr_counter("arroyo.consumer.run.count", 1)
+        self.__last_run = time.time()
 
         message_carried_over = self.__message is not None
 
diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py
index 8048c023..c9508cd6 100644
--- a/arroyo/utils/metric_defs.py
+++ b/arroyo/utils/metric_defs.py
@@ -93,6 +93,8 @@
     #
     # This might cause increased network usage as messages are being re-fetched.
     "arroyo.consumer.resume",
+    # Counter: Incremented when the consumer main thread is stuck and not processing messages.
+    "arroyo.consumer.stuck",
     # Gauge: Queue size of background queue that librdkafka uses to prefetch messages.
"arroyo.consumer.librdkafka.total_queue_size", # Counter: Counter metric to measure how often the healthcheck file has been touched. diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index 3e0eb57d..85ab85d8 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -763,3 +763,45 @@ def test_processor_poll_while_paused() -> None: processor._run_once() assert strategy.submit.call_args_list[-1] == mock.call(new_message) + + +def test_stuck_detector(request: pytest.FixtureRequest) -> None: + """Test that stuck detector emits a metric when strategy blocks.""" + topic = Topic("topic") + partition = Partition(topic, 0) + + consumer = mock.Mock() + consumer.tell.return_value = {} + + strategy = mock.Mock() + real_sleep = time.sleep + + factory = mock.Mock() + factory.create_with_partitions.return_value = strategy + + TestingMetricsBackend.calls.clear() + + with mock.patch("time.time", return_value=0.0) as mock_time: + with mock.patch("time.sleep", side_effect=lambda s: real_sleep(0.01)): + processor: StreamProcessor[int] = StreamProcessor( + consumer, topic, factory, IMMEDIATE, stuck_detector_timeout=2 + ) + + request.addfinalizer(processor.signal_shutdown) + + assignment_callback = consumer.subscribe.call_args.kwargs["on_assign"] + assignment_callback({partition: 0}) + + consumer.poll.return_value = BrokerValue(0, partition, 0, datetime.now()) + processor._run_once() + + mock_time.return_value = 5.0 + real_sleep(0.2) + + stuck_metrics = [ + call + for call in TestingMetricsBackend.calls + if isinstance(call, Increment) and call.name == "arroyo.consumer.stuck" + ] + assert len(stuck_metrics) == 1 + assert stuck_metrics[0].value == 1