-
Notifications
You must be signed in to change notification settings - Fork 11
Generalize ASGI Middleware used in Quart to a function #564
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a2d632e
d708e59
44b28b0
dbdd8f7
fc3c5fd
3e17be0
fa05e8d
d86a170
706bde2
bc6b241
c4d1a53
b02ecca
840b0c5
8448c56
a9818dc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| import inspect | ||
| from aikido_zen.context import Context | ||
| from aikido_zen.helpers.get_argument import get_argument | ||
| from aikido_zen.sinks import before_async, patch_function | ||
| from aikido_zen.sources.functions.request_handler import request_handler, post_response | ||
| from aikido_zen.thread.thread_cache import get_cache | ||
|
|
||
|
|
||
| class InternalASGIMiddleware: | ||
| def __init__(self, app, source: str): | ||
| self.client_app = app | ||
| self.source = source | ||
|
|
||
| async def __call__(self, scope, receive, send): | ||
| if not scope or scope.get("type") != "http": | ||
| # Zen checks requests coming into HTTP(S) server, ignore other requests (like ws) | ||
| return await self.continue_app(scope, receive, send) | ||
|
|
||
| context = Context(req=scope, source=self.source) | ||
|
|
||
| process_cache = get_cache() | ||
| if process_cache and process_cache.is_bypassed_ip(context.remote_address): | ||
| # IP address is bypassed, for simplicity we do not set a context, | ||
| # and we do not do any further handling of the request. | ||
| return await self.continue_app(scope, receive, send) | ||
|
|
||
| context.set_as_current_context() | ||
| if process_cache: | ||
| # Since this SHOULD be the highest level of the apps we wrap, this is the safest place | ||
| # to increment total hits. | ||
| process_cache.stats.increment_total_hits() | ||
|
|
||
| intercept_response = request_handler(stage="pre_response") | ||
| if intercept_response: | ||
| # The request has already been blocked (e.g. IP is on blocklist) | ||
| return await send_status_code_and_text(send, intercept_response) | ||
|
|
||
| return await self.run_with_intercepts(scope, receive, send) | ||
|
|
||
| async def run_with_intercepts(self, scope, receive, send): | ||
| # We use a skeleton class so we can use patch_function (and the logic already defined in @before_async) | ||
| class InterceptorSkeleton: | ||
| @staticmethod | ||
| async def send(*args, **kwargs): | ||
| return await send(*args, **kwargs) | ||
|
|
||
| patch_function(InterceptorSkeleton, "send", send_interceptor) | ||
|
|
||
| return await self.continue_app(scope, receive, InterceptorSkeleton.send) | ||
|
|
||
| async def continue_app(self, scope, receive, send): | ||
| client_app_parameters = len(inspect.signature(self.client_app).parameters) | ||
| if client_app_parameters == 2: | ||
| # This is possible if the app is still using ASGI v2.0 | ||
| # See https://asgi.readthedocs.io/en/latest/specs/main.html#legacy-applications | ||
| # client_app = coroutine application_instance(receive, send) | ||
| await self.client_app(receive, send) | ||
| else: | ||
| # client_app = coroutine application(scope, receive, send) | ||
| await self.client_app(scope, receive, send) | ||
|
|
||
|
|
||
| async def send_status_code_and_text(send, pre_response): | ||
| await send( | ||
| { | ||
| "type": "http.response.start", | ||
| "status": pre_response[1], | ||
| "headers": [(b"content-type", b"text/plain")], | ||
| } | ||
| ) | ||
| await send( | ||
| { | ||
| "type": "http.response.body", | ||
| "body": pre_response[0].encode("utf-8"), | ||
| "more_body": False, | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| @before_async | ||
| async def send_interceptor(func, instance, args, kwargs): | ||
| # There is no name for the send() comment in the standard, it really depends (quart uses message) | ||
| event = get_argument(args, kwargs, 0, name="message") | ||
|
|
||
| # https://asgi.readthedocs.io/en/latest/specs/www.html#response-start-send-event | ||
| if not event or "http.response.start" not in event.get("type", ""): | ||
| # If the event is not of type http.response.start it won't contain the status code. | ||
| # And this event is required before sending over a body (so even 200 status codes are intercepted). | ||
| return | ||
|
|
||
| if "status" in event: | ||
| # Handle post response logic (attack waves, route reporting, ...) | ||
| post_response(status_code=int(event.get("status"))) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,32 @@ | ||
| from aikido_zen.context import Context, get_current_context | ||
| from .functions.request_handler import request_handler | ||
| import inspect | ||
| from aikido_zen.context import get_current_context | ||
| from .functions.asgi_middleware import InternalASGIMiddleware | ||
| from ..helpers.get_argument import get_argument | ||
| from ..sinks import on_import, patch_function, before, before_async | ||
| from ..sinks import on_import, patch_function, before_async, after | ||
|
|
||
|
|
||
| @before | ||
| def _call(func, instance, args, kwargs): | ||
| async def _call_coroutine(func, instance, args, kwargs): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename _call_coroutine to a descriptive name (e.g., wrap_quart_call_coroutine) or add a docstring explaining it wraps Quart.call and delegates to InternalASGIMiddleware. Detailsβ¨ AI Reasoning π§ How do I fix it? More info - Comment |
||
| scope = get_argument(args, kwargs, 0, "scope") | ||
| if not scope or scope.get("type") != "http": | ||
| return | ||
| receive = get_argument(args, kwargs, 1, "receive") | ||
| send = get_argument(args, kwargs, 2, "send") | ||
|
|
||
| await InternalASGIMiddleware(func, "quart")(scope, receive, send) | ||
|
|
||
| new_context = Context(req=scope, source="quart") | ||
| new_context.set_as_current_context() | ||
| request_handler(stage="init") | ||
|
|
||
| @after | ||
| def _call(func, instance, args, kwargs, return_value): | ||
| """ | ||
| Legacy ASGI v2.0 | ||
| func: application(scope) | ||
| return_value: coroutine application_instance(receive, send) | ||
| """ | ||
| scope = get_argument(args, kwargs, 0, "scope") | ||
|
|
||
| async def application_instance(receive, send): | ||
| await InternalASGIMiddleware(return_value, "quart")(scope, receive, send) | ||
|
|
||
| # Modify return_value | ||
| return_value = application_instance | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The parameter 'return_value' is reassigned to application_instance. Avoid reassigning function parameters; use a new local variable (e.g., new_return_value) and return that instead. Detailsβ¨ AI Reasoning π§ How do I fix it? More info - Comment |
||
|
|
||
|
|
||
| @before_async | ||
|
|
@@ -37,60 +51,22 @@ async def _handle_request_before(func, instance, args, kwargs): | |
| context.set_as_current_context() | ||
|
|
||
|
|
||
| async def _handle_request_after(func, instance, args, kwargs): | ||
| # pylint:disable=import-outside-toplevel # We don't want to install this by default | ||
| from werkzeug.exceptions import HTTPException | ||
|
|
||
| try: | ||
| response = await func(*args, **kwargs) | ||
| if hasattr(response, "status_code"): | ||
| request_handler(stage="post_response", status_code=response.status_code) | ||
| return response | ||
| except HTTPException as e: | ||
| request_handler(stage="post_response", status_code=e.code) | ||
| raise e | ||
|
|
||
|
|
||
| async def _asgi_app(func, instance, args, kwargs): | ||
| scope = get_argument(args, kwargs, 0, "scope") | ||
| if not scope or scope.get("type") != "http": | ||
| return await func(*args, **kwargs) | ||
| send = get_argument(args, kwargs, 2, "send") | ||
| if not send: | ||
| return await func(*args, **kwargs) | ||
|
|
||
| pre_response = request_handler(stage="pre_response") | ||
| if pre_response: | ||
| return await send_status_code_and_text(send, pre_response) | ||
| return await func(*args, **kwargs) | ||
|
|
||
|
|
||
| async def send_status_code_and_text(send, pre_response): | ||
| await send( | ||
| { | ||
| "type": "http.response.start", | ||
| "status": pre_response[1], | ||
| "headers": [(b"content-type", b"text/plain")], | ||
| } | ||
| ) | ||
| await send( | ||
| { | ||
| "type": "http.response.body", | ||
| "body": pre_response[0].encode("utf-8"), | ||
| "more_body": False, | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| @on_import("quart.app", "quart") | ||
| def patch(m): | ||
| """ | ||
| patching module quart.app | ||
| - patches Quart.__call__ (creates Context) | ||
| - patches Quart.handle_request (Stores body/cookies, checks status code) | ||
| - patches Quart.asgi_app (Pre-response: puts in messages when request is blocked) | ||
| We patch Quart.__call__ instead of asgi_app, because asgi_app itself can be wrapped multiple times | ||
| And we want to be the first middleware to run. | ||
| - patches Quart.__call__ (handles internal asgi middleware) | ||
| - patches Quart.handle_request (Stores body/cookies) | ||
| """ | ||
| patch_function(m, "Quart.__call__", _call) | ||
|
|
||
| if inspect.iscoroutine(m.Quart.__call__): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using inspect.iscoroutine on Quart.call will not detect coroutine functions, making the coroutine path unreachable and always treating apps as legacy. Use inspect.iscoroutinefunction (or equivalent) for this check. Detailsβ¨ AI Reasoning π§ How do I fix it? More info - Comment |
||
| # coroutine application(scope, receive, send) | ||
| patch_function(m, "Quart.__call__", _call_coroutine) | ||
| else: | ||
| # Legacy ASGI v2.0 | ||
| # https://asgi.readthedocs.io/en/latest/specs/main.html#legacy-applications | ||
| # application(scope): coroutine application_instance(receive, send) | ||
| patch_function(m, "Quart.__call__", _call) | ||
|
|
||
| patch_function(m, "Quart.handle_request", _handle_request_before) | ||
| patch_function(m, "Quart.handle_request", _handle_request_after) | ||
| patch_function(m, "Quart.asgi_app", _asgi_app) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
process_cache.is_bypassed_ip(...) short-circuits request handling and skips context setup; avoid silent bypasses or gate them behind explicit config/audit logging.
Details
β¨ AI Reasoning
βThe change introduces a bypass: when process_cache.is_bypassed_ip(...) is true the middleware returns early and skips setting a context or running any further request handling. This is an intentional short-circuit of request processing introduced in this PR and can silently disable security/validation logic for those IPs. It reduces visibility and may leave requests unobserved by later instrumentation.
π§ How do I fix it?
Remove debugging statements like console.log, debugger, dd(), or logic bypasses like || true. Keep legitimate logging for monitoring and error handling.
More info - Comment
@AikidoSec feedback: [FEEDBACK]to get better review comments in the future.