Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
## Features

* **Production-Safe & Low Overhead**: Leverages Python's `sys.audit` hooks for minimal runtime overhead, making it safe for production use
* **Works with asyncio and uvloop**: Compatible with both standard asyncio and uvloop event loops out of the box
* **Blocking I/O Detection**: Automatically detects blocking I/O calls (file operations, network calls, subprocess, etc.) in your async code
* **Stack Trace Capture**: Captures full stack traces to pinpoint exactly where blocking calls originate
* **Severity Scoring**: Assigns severity scores to blocking events to help prioritize fixes
Expand All @@ -30,7 +31,7 @@
<img src="https://raw.githubusercontent.com/feverup/aiocop/master/docs/images/explanation_diagram.png" alt="aiocop architecture diagram">
</p>

aiocop wraps `asyncio.Handle._run` (the method that executes every task in the event loop) and uses Python's `sys.audit` hooks to detect blocking calls. When your code calls a blocking function like `open()`, the audit event is captured along with the full stack trace—letting you know exactly where the problem is.
aiocop wraps the event loop's scheduling methods (`call_soon`, `call_later`, etc.) and uses Python's `sys.audit` hooks to detect blocking calls. This approach works with both standard asyncio and uvloop. When your code calls a blocking function like `open()`, the audit event is captured along with the full stack trace—letting you know exactly where the problem is.

## Why aiocop?

Expand All @@ -47,6 +48,7 @@ aiocop was built to solve specific production constraints that existing approach
| Production Overhead | Low-Medium | High | Very Low (~13μs/task) |
| Stack Traces | Yes | No (timing only) | Yes |
| Runtime Control | Varies | Flag at startup | Dynamic on/off |
| uvloop Support | Varies | No | Yes |

## Performance

Expand Down
4 changes: 2 additions & 2 deletions aiocop/core/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _capture_context() -> dict[str, Any]:
if provider_context is not None:
context.update(provider_context)
except Exception as e:
logger.warning("Error in context provider %s: %s", provider.__name__, e)
logger.warning("Error in context provider %s: %s", getattr(provider, "__name__", repr(provider)), e)

return context

Expand All @@ -81,4 +81,4 @@ def _invoke_slow_task_callbacks(event: SlowTaskEvent) -> None:
try:
callback(event)
except Exception as e:
logger.warning("Error in slow task callback %s: %s", callback.__name__, e)
logger.warning("Error in slow task callback %s: %s", getattr(callback, "__name__", repr(callback)), e)
146 changes: 113 additions & 33 deletions aiocop/core/slow_tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
"""Slow task detection by patching asyncio Handle._run."""

"""Slow task detection by patching event loop scheduling methods.

This module patches the event loop's call_soon, call_later, call_at, and
call_soon_threadsafe methods to wrap callbacks with monitoring logic.

This approach works with both standard asyncio and uvloop, since it patches
at the loop level rather than relying on asyncio's Handle._run which uvloop
replaces with its own Cython implementation.
"""

import asyncio
import logging
from asyncio.events import Handle
from collections.abc import Callable
from dataclasses import replace
from time import perf_counter_ns
Expand All @@ -20,39 +25,34 @@
_reset_exception_flag,
is_monitoring_active,
raise_on_violations,
register_on_activate_hook,
)
from aiocop.exceptions import HighSeverityBlockingIoException
from aiocop.types.events import BlockingEventInfo, SlowTaskEvent
from aiocop.types.severity import THRESHOLD_HIGH

logger = logging.getLogger(__name__)

_detect_slow_tasks_already_applied = False
_detect_slow_tasks_configured = False
_slow_task_threshold_ns: int = 30 * 1_000_000

SlowTaskCallback = Callable[[SlowTaskEvent], None]


def _invoke_callbacks_with_context(event: SlowTaskEvent) -> None:
    """
    Enrich the event with captured context and hand it to the slow task callbacks.

    Intended to be run through self._context.run() so that the context
    providers (for example a ddtrace span) read from the Handle's own
    contextvars rather than from whatever context happens to be current.
    """
    context_snapshot = _capture_context()
    # replace() builds a copy of the event; the original instance is left untouched.
    _invoke_slow_task_callbacks(replace(event, context=context_snapshot))


def detect_slow_tasks(
threshold_ms: int = 30,
on_slow_task: SlowTaskCallback | None = None,
) -> None:
"""
Patch the asyncio event loop to detect slow tasks.
Configure slow task detection for the asyncio event loop.

This patches the event loop's scheduling methods (call_soon, call_later, etc.)
to measure execution time and capture blocking IO events. Works with both
standard asyncio and uvloop.

The loop is patched either immediately (if called from an async context) or
when activate() is called.

This patches Handle._run to measure execution time and capture blocking IO events.
Callbacks are invoked for every task that has blocking events detected, with
the exceeded_threshold flag indicating if the task exceeded the threshold.

Expand All @@ -64,9 +64,9 @@ def detect_slow_tasks(

Should be called after start_blocking_io_detection().
"""
global _detect_slow_tasks_already_applied, _slow_task_threshold_ns
global _detect_slow_tasks_configured, _slow_task_threshold_ns

if _detect_slow_tasks_already_applied is True:
if _detect_slow_tasks_configured is True:
logger.warning("detect_slow_tasks called more than once, ignoring")
return

Expand All @@ -80,27 +80,99 @@ def detect_slow_tasks(
if raise_on_violations.get() is True:
logger.info("Exceptions raising on high severity IO blocking tasks enabled")

old_run = Handle._run # noqa
_detect_slow_tasks_configured = True

register_on_activate_hook(_ensure_loop_patched)

_ensure_loop_patched()


def _ensure_loop_patched() -> bool:
    """
    Patch the currently running event loop for slow task detection, at most once.

    Returns:
        True when the running loop is patched (either by this call or by an
        earlier one); False when slow task detection has not been configured
        or when no event loop is running.
    """
    if not _detect_slow_tasks_configured:
        return False

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises when called outside a running loop.
        return False

    already_patched = getattr(running_loop, "_aiocop_patched", False)
    if not already_patched:
        _patch_loop(running_loop)
        # Marker attribute on the loop instance so later calls do not re-wrap its methods.
        running_loop._aiocop_patched = True  # type: ignore[attr-defined]
        logger.info("Event loop patched for slow task detection")

    return True


def _patch_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Patch the event loop's scheduling methods to monitor callback execution.

    Replaces ``call_soon``, ``call_later``, ``call_at`` and
    ``call_soon_threadsafe`` on the given loop instance. Each replacement wraps
    the scheduled callback with ``_make_monitored_callback`` (which captures the
    positional ``args``) and then delegates to the original bound method with a
    zero-argument callable, forwarding ``context`` unchanged.

    Args:
        loop: Event loop instance whose scheduling methods are replaced in place.
    """

    def monitored(callback: Callable[..., Any], args: tuple[Any, ...]) -> Callable[[], Any]:
        # One scheduling method may be implemented on top of another patched one
        # (CPython's BaseEventLoop.call_later delegates to self.call_at). Without
        # this guard such a callback would be wrapped twice and monitored in two
        # nested wrappers, which can report the same slow task twice.
        if not args and getattr(callback, "_aiocop_monitored", False):
            return callback
        wrapped = _make_monitored_callback(callback, args)
        wrapped._aiocop_monitored = True  # type: ignore[attr-defined]
        return wrapped

    # Capture every original bound method before any attribute is replaced.
    original_call_soon = loop.call_soon
    original_call_later = loop.call_later
    original_call_at = loop.call_at
    original_call_soon_threadsafe = loop.call_soon_threadsafe

    def patched_call_soon(callback: Callable[..., Any], *args: Any, context: Any = None) -> Any:
        return original_call_soon(monitored(callback, args), context=context)

    def patched_call_later(delay: float, callback: Callable[..., Any], *args: Any, context: Any = None) -> Any:
        return original_call_later(delay, monitored(callback, args), context=context)

    def patched_call_at(when: float, callback: Callable[..., Any], *args: Any, context: Any = None) -> Any:
        return original_call_at(when, monitored(callback, args), context=context)

    def patched_call_soon_threadsafe(callback: Callable[..., Any], *args: Any, context: Any = None) -> Any:
        return original_call_soon_threadsafe(monitored(callback, args), context=context)

    loop.call_soon = patched_call_soon  # type: ignore[method-assign]
    loop.call_later = patched_call_later  # type: ignore[method-assign]
    loop.call_at = patched_call_at  # type: ignore[method-assign]
    loop.call_soon_threadsafe = patched_call_soon_threadsafe  # type: ignore[method-assign]


def _make_monitored_callback(callback: Callable[..., Any], args: tuple[Any, ...]) -> Callable[[], Any]:
"""
Create a wrapper that monitors callback execution time and blocking events.

The wrapper is called with no arguments - the original args are captured
in the closure and passed to the callback.
"""

def monitored_wrapper() -> Any:
if not is_monitoring_active():
return old_run(self)
if args:
return callback(*args)
return callback()

thread_local = _get_thread_local()
captured_events: list = []
captured_events: list[Any] = []

previous_events = getattr(thread_local, "blocking_events", None)

thread_local.blocking_events = captured_events
thread_local.should_raise_for_this_handle = False

_reset_exception_flag()
t0 = perf_counter_ns()

try:
return_value = old_run(self) # noqa
if args:
return_value = callback(*args)
else:
return_value = callback()
finally:
thread_local.blocking_events = previous_events

Expand Down Expand Up @@ -129,10 +201,10 @@ def new_run(self) -> Any:
blocking_events=formatted_events,
)

self._context.run(_invoke_callbacks_with_context, slow_task_event)
_invoke_callbacks_with_context(slow_task_event)

if exceeded_threshold is True:
self._context.run(_check_and_raise_if_needed, elapsed, formatted_events, should_raise)
_check_and_raise_if_needed(elapsed, formatted_events, should_raise)

elif exceeded_threshold is True:
slow_task_event = SlowTaskEvent(
Expand All @@ -145,7 +217,7 @@ def new_run(self) -> Any:
blocking_events=[],
)

self._context.run(_invoke_callbacks_with_context, slow_task_event)
_invoke_callbacks_with_context(slow_task_event)

except HighSeverityBlockingIoException:
raise
Expand All @@ -154,14 +226,22 @@ def new_run(self) -> Any:

return return_value

Handle._run = new_run # noqa # type: ignore[method-assign]
_detect_slow_tasks_already_applied = True
return monitored_wrapper


def _invoke_callbacks_with_context(event: SlowTaskEvent) -> None:
    """
    Attach freshly captured context to the event and dispatch it to the slow task callbacks.

    The incoming event is not modified: a copy carrying the captured context
    is created and that copy is what the callbacks receive.
    """
    _invoke_slow_task_callbacks(replace(event, context=_capture_context()))


def _check_and_raise_if_needed(
elapsed: int, blocking_events: list[BlockingEventInfo] | None, should_raise: bool
) -> None:
"""Check if high severity blocking IO should raise an exception within the Handle's context."""
"""Check if high severity blocking IO should raise an exception."""
if should_raise is True and _has_exception_been_raised() is False:
io_severity = calculate_io_severity_score(blocking_events)

Expand Down
12 changes: 12 additions & 0 deletions aiocop/core/state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Shared state management for aiocop monitoring."""

import threading
from collections.abc import Callable
from contextvars import ContextVar

_thread_local = threading.local()
Expand All @@ -11,12 +12,23 @@

_exception_raised = False

_on_activate_hooks: list[Callable[[], object]] = []


def register_on_activate_hook(hook: Callable[[], object]) -> None:
    """Add a hook to the list run on activation; registering the same hook again is a no-op."""
    if hook in _on_activate_hooks:
        return
    _on_activate_hooks.append(hook)


def activate() -> None:
    """Switch monitoring on and run every registered on-activate hook.

    Call this after startup when you want to start detecting blocking IO.
    The active flag is set first; the hooks then run in registration order.
    """
    global _monitoring_active
    _monitoring_active = True

    for on_activate in _on_activate_hooks:
        on_activate()


def deactivate() -> None:
"""Deactivate monitoring. Pauses all detection without unregistering hooks."""
Expand Down
7 changes: 6 additions & 1 deletion aiocop/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
"""Exceptions for aiocop."""

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from aiocop.types.events import BlockingEventInfo


class HighSeverityBlockingIoException(Exception):
"""Exception raised when high-severity blocking I/O is detected in async context."""
Expand All @@ -10,7 +15,7 @@ def __init__(
severity_level: str,
elapsed_ms: float,
threshold_ms: float,
events: list[dict[str, str]],
events: "list[BlockingEventInfo]",
) -> None:
self.severity_score = severity_score
self.severity_level = severity_level
Expand Down
Loading