Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions aikido_zen/sources/functions/asgi_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import inspect
from aikido_zen.context import Context
from aikido_zen.helpers.get_argument import get_argument
from aikido_zen.sinks import before_async, patch_function
from aikido_zen.sources.functions.request_handler import request_handler, post_response
from aikido_zen.thread.thread_cache import get_cache


class InternalASGIMiddleware:
def __init__(self, app, source: str):
self.client_app = app
self.source = source

async def __call__(self, scope, receive, send):
if not scope or scope.get("type") != "http":
# Zen checks requests coming into HTTP(S) server, ignore other requests (like ws)
return await self.continue_app(scope, receive, send)

context = Context(req=scope, source=self.source)

process_cache = get_cache()
if process_cache and process_cache.is_bypassed_ip(context.remote_address):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

process_cache.is_bypassed_ip(...) short-circuits request handling and skips context setup; avoid silent bypasses or gate them behind explicit config/audit logging.

Details

✨ AI Reasoning
​The change introduces a bypass: when process_cache.is_bypassed_ip(...) is true the middleware returns early and skips setting a context or running any further request handling. This is an intentional short-circuit of request processing introduced in this PR and can silently disable security/validation logic for those IPs. It reduces visibility and may leave requests unobserved by later instrumentation.

πŸ”§ How do I fix it?
Remove debugging statements like console.log, debugger, dd(), or logic bypasses like || true. Keep legitimate logging for monitoring and error handling.

More info - Comment @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.

# IP address is bypassed, for simplicity we do not set a context,
# and we do not do any further handling of the request.
return await self.continue_app(scope, receive, send)

context.set_as_current_context()
if process_cache:
# Since this SHOULD be the highest level of the apps we wrap, this is the safest place
# to increment total hits.
process_cache.stats.increment_total_hits()

intercept_response = request_handler(stage="pre_response")
if intercept_response:
# The request has already been blocked (e.g. IP is on blocklist)
return await send_status_code_and_text(send, intercept_response)

return await self.run_with_intercepts(scope, receive, send)

async def run_with_intercepts(self, scope, receive, send):
# We use a skeleton class so we can use patch_function (and the logic already defined in @before_async)
class InterceptorSkeleton:
@staticmethod
async def send(*args, **kwargs):
return await send(*args, **kwargs)

patch_function(InterceptorSkeleton, "send", send_interceptor)

return await self.continue_app(scope, receive, InterceptorSkeleton.send)

async def continue_app(self, scope, receive, send):
client_app_parameters = len(inspect.signature(self.client_app).parameters)
if client_app_parameters == 2:
# This is possible if the app is still using ASGI v2.0
# See https://asgi.readthedocs.io/en/latest/specs/main.html#legacy-applications
# client_app = coroutine application_instance(receive, send)
await self.client_app(receive, send)
else:
# client_app = coroutine application(scope, receive, send)
await self.client_app(scope, receive, send)


async def send_status_code_and_text(send, pre_response):
await send(
{
"type": "http.response.start",
"status": pre_response[1],
"headers": [(b"content-type", b"text/plain")],
}
)
await send(
{
"type": "http.response.body",
"body": pre_response[0].encode("utf-8"),
"more_body": False,
}
)


@before_async
async def send_interceptor(func, instance, args, kwargs):
# There is no name for the send() comment in the standard, it really depends (quart uses message)
event = get_argument(args, kwargs, 0, name="message")

# https://asgi.readthedocs.io/en/latest/specs/www.html#response-start-send-event
if not event or "http.response.start" not in event.get("type", ""):
# If the event is not of type http.response.start it won't contain the status code.
# And this event is required before sending over a body (so even 200 status codes are intercepted).
return

if "status" in event:
# Handle post response logic (attack waves, route reporting, ...)
post_response(status_code=int(event.get("status")))
100 changes: 38 additions & 62 deletions aikido_zen/sources/quart.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
from aikido_zen.context import Context, get_current_context
from .functions.request_handler import request_handler
import inspect
from aikido_zen.context import get_current_context
from .functions.asgi_middleware import InternalASGIMiddleware
from ..helpers.get_argument import get_argument
from ..sinks import on_import, patch_function, before, before_async
from ..sinks import on_import, patch_function, before_async, after


@before
def _call(func, instance, args, kwargs):
async def _call_coroutine(func, instance, args, kwargs):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename _call_coroutine to a descriptive name (e.g., wrap_quart_call_coroutine) or add a docstring explaining it wraps Quart.call and delegates to InternalASGIMiddleware.

Details

✨ AI Reasoning
​A newly introduced function named _call_coroutine is intended to wrap/replace Quart.call for coroutine-style ASGI apps, but the name doesn't describe its role (middleware wrapper, context creation, delegation). A clearer name or docstring would make intent obvious and aid maintenance.

πŸ”§ How do I fix it?
Use descriptive verb-noun function names, add docstrings explaining the function's purpose, or provide meaningful return type hints.

More info - Comment @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.

scope = get_argument(args, kwargs, 0, "scope")
if not scope or scope.get("type") != "http":
return
receive = get_argument(args, kwargs, 1, "receive")
send = get_argument(args, kwargs, 2, "send")

await InternalASGIMiddleware(func, "quart")(scope, receive, send)

new_context = Context(req=scope, source="quart")
new_context.set_as_current_context()
request_handler(stage="init")

@after
def _call(func, instance, args, kwargs, return_value):
"""
Legacy ASGI v2.0
func: application(scope)
return_value: coroutine application_instance(receive, send)
"""
scope = get_argument(args, kwargs, 0, "scope")

async def application_instance(receive, send):
await InternalASGIMiddleware(return_value, "quart")(scope, receive, send)

# Modify return_value
return_value = application_instance

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter 'return_value' is reassigned to application_instance. Avoid reassigning function parameters; use a new local variable (e.g., new_return_value) and return that instead.

Details

✨ AI Reasoning
​An after-advice wrapper function receives the original function's return value as a parameter and then assigns a new value to that parameter before returning it. Reassigning an input parameter can confuse readers about the original return value and make reasoning about the wrapper harder. This change introduced the reassignment where the wrapper replaced the incoming return value with a new callable wrapping the original behavior.

πŸ”§ How do I fix it?
Create new local variables instead of reassigning parameters. Use different variable names to clearly distinguish between input and modified values.

More info - Comment @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.



@before_async
Expand All @@ -37,60 +51,22 @@ async def _handle_request_before(func, instance, args, kwargs):
context.set_as_current_context()


async def _handle_request_after(func, instance, args, kwargs):
# pylint:disable=import-outside-toplevel # We don't want to install this by default
from werkzeug.exceptions import HTTPException

try:
response = await func(*args, **kwargs)
if hasattr(response, "status_code"):
request_handler(stage="post_response", status_code=response.status_code)
return response
except HTTPException as e:
request_handler(stage="post_response", status_code=e.code)
raise e


async def _asgi_app(func, instance, args, kwargs):
scope = get_argument(args, kwargs, 0, "scope")
if not scope or scope.get("type") != "http":
return await func(*args, **kwargs)
send = get_argument(args, kwargs, 2, "send")
if not send:
return await func(*args, **kwargs)

pre_response = request_handler(stage="pre_response")
if pre_response:
return await send_status_code_and_text(send, pre_response)
return await func(*args, **kwargs)


async def send_status_code_and_text(send, pre_response):
await send(
{
"type": "http.response.start",
"status": pre_response[1],
"headers": [(b"content-type", b"text/plain")],
}
)
await send(
{
"type": "http.response.body",
"body": pre_response[0].encode("utf-8"),
"more_body": False,
}
)


@on_import("quart.app", "quart")
def patch(m):
"""
patching module quart.app
- patches Quart.__call__ (creates Context)
- patches Quart.handle_request (Stores body/cookies, checks status code)
- patches Quart.asgi_app (Pre-response: puts in messages when request is blocked)
We patch Quart.__call__ instead of asgi_app, because asgi_app itself can be wrapped multiple times
And we want to be the first middleware to run.
- patches Quart.__call__ (handles internal asgi middleware)
- patches Quart.handle_request (Stores body/cookies)
"""
patch_function(m, "Quart.__call__", _call)

if inspect.iscoroutine(m.Quart.__call__):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using inspect.iscoroutine on Quart.call will not detect coroutine functions, making the coroutine path unreachable and always treating apps as legacy. Use inspect.iscoroutinefunction (or equivalent) for this check.

Details

✨ AI Reasoning
​The conditional that decides whether Quart.call is treated as a coroutine application uses an API that checks coroutine instances instead of coroutine functions. Since the target is a function, this branch will not be taken in normal operation, so the logic for non-legacy ASGI apps will never run and everything will be treated as legacy. This is a clear control-flow bug where the intended branch is unreachable.

πŸ”§ How do I fix it?
Trace execution paths carefully. Ensure precondition checks happen before using values, validate ranges before checking impossible conditions, and don't check for states that the code has already ruled out.

More info - Comment @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.

# coroutine application(scope, receive, send)
patch_function(m, "Quart.__call__", _call_coroutine)
else:
# Legacy ASGI v2.0
# https://asgi.readthedocs.io/en/latest/specs/main.html#legacy-applications
# application(scope): coroutine application_instance(receive, send)
patch_function(m, "Quart.__call__", _call)

patch_function(m, "Quart.handle_request", _handle_request_before)
patch_function(m, "Quart.handle_request", _handle_request_after)
patch_function(m, "Quart.asgi_app", _asgi_app)
Loading