From f0ea826f6aa5072956334e4ccfe837f1b4e45b1d Mon Sep 17 00:00:00 2001 From: frozenwizard <172203+frozenwizard@users.noreply.github.com> Date: Tue, 30 Dec 2025 22:30:19 -0600 Subject: [PATCH 1/7] Begin using microdot and convert things to async --- main.py | 8 +- src/gunpla/base_gundam.py | 55 +- src/gunpla/nu_gundam.py | 39 +- src/gunpla/unicorn_banshee.py | 10 +- src/pi/led_effect.py | 27 +- src/server/Microdot.py | 1554 +++++++++++++++++++++++++++++++++ src/server/Networking.py | 28 + src/server/RouteDecorator.py | 32 + src/server/Wrappers.py | 30 + src/server/__init__.py | 0 src/server/webserver.py | 102 +++ src/webserver.py | 83 -- 12 files changed, 1803 insertions(+), 165 deletions(-) create mode 100644 src/server/Microdot.py create mode 100644 src/server/Networking.py create mode 100644 src/server/RouteDecorator.py create mode 100644 src/server/Wrappers.py create mode 100644 src/server/__init__.py create mode 100644 src/server/webserver.py delete mode 100644 src/webserver.py diff --git a/main.py b/main.py index 87dd34c..a7bca5a 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,13 @@ +import asyncio + from src import settings -from src.webserver import WebServer +from src.server.webserver import WebServer def main(): webserver = WebServer(settings.webserver) - webserver.main() + asyncio.run( webserver.run()) if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/src/gunpla/base_gundam.py b/src/gunpla/base_gundam.py index a675aeb..e798085 100644 --- a/src/gunpla/base_gundam.py +++ b/src/gunpla/base_gundam.py @@ -1,10 +1,9 @@ import json -from src.phew import server -from src.phew.server import Request, Response, logging from src.pi.board_led import BoardLED from src.pi.disabled_LED import DisabledLED from src.pi.LED import LED +from src.server.Wrappers import safe_execution class BaseGundam: @@ -24,40 +23,31 @@ def get_config_file(self) -> str: """ raise Exception("Not implemented") - def led_on(self, request: Request, led_name: str) -> Response: + def led_on(self, led_name: str): """ Turns a Single LED on by name """ - logging.info(f"turning on {led_name}") - try: - led = self._get_led_from_name(led_name) - led.on() - return Response(f"{led_name}: on", 200) - except Exception as ex: - return Response(str(ex), 500) + print(f"turning on {led_name}") + led = self._get_led_from_name(led_name) + led.on() - def led_off(self, request: Request, led_name: str) -> Response: + def led_off(self, led_name: str) -> None: """ Turns a single LED off by name """ - logging.info(f"turning off {led_name}") - try: - led = self._get_led_from_name(led_name) - led.off() - return Response(f"{led_name}: off", 200) - except Exception as ex: - return Response(str(ex), 500) + print(f"turning off {led_name}") + led = self._get_led_from_name(led_name) + led.off() + + # TODO: make this not need the safe_execution and do it when we register paths - def all_on(self, request: Request) -> Response: + @safe_execution + def all_on(self) -> None: """ Turns all configured LED's on. """ - logging.info("turning on all leds") - try: - leds = self._all_leds_on() - return Response(f"All on\n {leds} ", 200) - except Exception as ex: - return Response(str(ex), 500) + print("turning on all leds") + self._all_leds_on() def _all_leds_on(self) -> str: """ @@ -74,16 +64,15 @@ def _all_leds_on(self) -> str: leds += f"{led_name}: on\n" return leds - def all_off(self, request: Request) -> Response: + # TODO: make this not need the safe_execution and do it when we register paths + + @safe_execution + def all_off(self) -> None: """ Turns all configured LED's off """ - logging.info("turning off all leds") - try: - leds = self._all_leds_off() - return Response("All off\n" + leds, 200) - except Exception as ex: - return Response(str(ex), 500) + print("turning off all leds") + self._all_leds_off() def _all_leds_off(self) -> str: """" @@ -122,7 +111,7 @@ def _get_led_from_name(self, led_name: str) -> LED: """ entry = self.__get_entry_from_name(led_name) if 'disabled' in entry and entry['disabled']: - logging.debug(f"{led_name} is disabled") + print(f"{led_name} is disabled") return DisabledLED(led_name) return LED(entry['pin'], led_name) diff --git a/src/gunpla/nu_gundam.py b/src/gunpla/nu_gundam.py index deaa052..1f86edd 100644 --- a/src/gunpla/nu_gundam.py +++ b/src/gunpla/nu_gundam.py @@ -1,9 +1,7 @@ +import asyncio import random -import time from src.gunpla.base_gundam import BaseGundam -from src.phew.server import Request, Response -from src.pi import LED from src.pi.led_effect import LEDEffects @@ -18,40 +16,27 @@ def get_config_file(self) -> str: """ return "src/config/nu_gundam.json" - def activation(self, request: Request) -> Response: + async def activation(self) -> None: """ Runs the activation lightshow this is just a sample test """ head_led = self._get_led_from_name("head") head_led.on() - time.sleep(0.1) + await asyncio.sleep(0.1) head_led.off() - time.sleep(0.5) - LEDEffects.brighten(head_led) - return Response("finished", 200) + await asyncio.sleep(0.5) + await LEDEffects.brighten(head_led) - def fire_funnels(self, request: Request) -> Response: + async def fire_funnels(self, request: Request) -> None: """ Light Show that fires fin funnels in order """ - fin1: LED = self._get_led_from_name("fin_funnel_1") - fin2: LED = self._get_led_from_name("fin_funnel_2") - fin3: LED = self._get_led_from_name("fin_funnel_3") - fin4: LED = self._get_led_from_name("fin_funnel_4") - fin5: LED = self._get_led_from_name("fin_funnel_5") - fin6: LED = self._get_led_from_name("fin_funnel_6") - - LEDEffects.fire(fin1) - LEDEffects.fire(fin2) - LEDEffects.fire(fin3) - LEDEffects.fire(fin4) - LEDEffects.fire(fin5) - LEDEffects.fire(fin6) - - return Response("finished", 200) + for i in range(1, 7): + funnel = self._get_led_from_name(f"fin_funnel_{i}") + await LEDEffects.fire(funnel) - def random_funnels(self, request: Request) -> Response: + async def random_funnels(self, request: Request) -> Response: """ Randomly fires fin funnels that are enabled for an infinite amount of time This currently does not end and needs thread management to properly be able to be halted. @@ -68,7 +53,7 @@ def random_funnels(self, request: Request) -> Response: while True: funnel = random.choice(funnels) - LEDEffects.charge_fire(funnel) - time.sleep(random.uniform(0, 3)) + await LEDEffects.charge_fire(funnel) + await asyncio.sleep(random.uniform(0, 3)) return Response("finished", 200) diff --git a/src/gunpla/unicorn_banshee.py b/src/gunpla/unicorn_banshee.py index a869bf4..e4ee1cc 100644 --- a/src/gunpla/unicorn_banshee.py +++ b/src/gunpla/unicorn_banshee.py @@ -1,7 +1,6 @@ -import time +import asyncio from src.gunpla.base_gundam import BaseGundam -from src.phew.server import Request, Response from src.pi.led_effect import LEDEffects @@ -16,11 +15,10 @@ def get_config_file(self) -> str: """ return "src/config/unicorn_banshee.json" - def glow(self, request: Request) -> Response: + async def glow(self) -> None: """ Runs the glow lightshow """ - LEDEffects.brighten_all(self.get_all_leds()) - time.sleep(3) + await LEDEffects.brighten_all(self.get_all_leds()) + await asyncio.sleep(3) self._all_leds_off() - return Response("finished", 200) diff --git a/src/pi/led_effect.py b/src/pi/led_effect.py index 60fc4e0..bbc7a73 100644 --- a/src/pi/led_effect.py +++ b/src/pi/led_effect.py @@ -1,3 +1,4 @@ +import asyncio import time from machine import PWM @@ -11,44 +12,44 @@ class LEDEffects: """ @staticmethod - def blink(led: LED) -> None: + async def blink(led: LED) -> None: """ Blinks the onboard LED twice """ led.on() time.sleep(0.5) led.off() - time.sleep(0.5) + await asyncio.sleep(0.5) led.on() - time.sleep(0.5) + await asyncio.sleep(0.5) led.off() @staticmethod - def fire(led: LED) -> None: + async def fire(led: LED) -> None: """ A simple weapon effect of firing a beam rifle, has no charging effect :param led: :return: """ led.on() - time.sleep(.5) + await asyncio.sleep(.5) led.off() @staticmethod - def charge_fire(led: LED, charge_speed: int = 1) -> None: + async def charge_fire(led: LED, charge_speed: int = 1) -> None: """ A simple charging of a shot """ - LEDEffects.brighten(led, start_percent=0, end_percent=75, speed=charge_speed) + await LEDEffects.brighten(led, start_percent=0, end_percent=75, speed=charge_speed) led.off() - time.sleep(0.5) + await asyncio.sleep(0.5) # LEDEffects.brighten(led, start_percent=75, end_percent=100, speed=1) led.on() - time.sleep(2) + await asyncio.sleep(2) led.off() @staticmethod - def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: + async def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: """ Starting from start_pct goes to end_pct over the course of speed, brightens led :param led: @@ -70,11 +71,11 @@ def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, speed: in for percent in range(start_percent, end_percent, step_rate): duty = int((percent / 100) * 65_535) pwm.duty_u16(duty) - time.sleep(sleep_time) + await asyncio.sleep(sleep_time) pwm.deinit() @staticmethod - def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: + async def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: """ The current banshee amount of leds passed in causes it to I guess stack overflow and silently crash around 30% so this method should not be used until that's addressed. @@ -96,7 +97,7 @@ def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int = 100 duty = int((percent / 100) * 65_535) for pwm in pwms: pwm.duty_u16(duty) - time.sleep(sleep_time) + await asyncio.sleep(sleep_time) for pwm in pwms: pwm.deinit() diff --git a/src/server/Microdot.py b/src/server/Microdot.py new file mode 100644 index 0000000..04da2ec --- /dev/null +++ b/src/server/Microdot.py @@ -0,0 +1,1554 @@ +""" +microdot +-------- + +The ``microdot`` module defines a few classes that help implement HTTP-based +servers for MicroPython and standard Python. +""" +import asyncio +import io +import re +import time + +try: + import orjson as json +except ImportError: + import json + +try: + from functools import partial + from inspect import iscoroutine, iscoroutinefunction + + async def invoke_handler(handler, *args, **kwargs): + """Invoke a handler and return the result. + + This method runs sync handlers in a thread pool executor. + """ + if iscoroutinefunction(handler): + ret = await handler(*args, **kwargs) + else: + ret = await asyncio.get_running_loop().run_in_executor( + None, partial(handler, *args, **kwargs)) + return ret +except ImportError: # pragma: no cover + def iscoroutine(coro): + return hasattr(coro, 'send') and hasattr(coro, 'throw') + + async def invoke_handler(handler, *args, **kwargs): + """Invoke a handler and return the result. + + This method runs sync handlers in the asyncio thread, which can + potentially cause blocking and performance issues. + """ + ret = handler(*args, **kwargs) + if iscoroutine(ret): + ret = await ret + return ret + +try: + from sys import print_exception +except ImportError: # pragma: no cover + import traceback + + def print_exception(exc): + traceback.print_exc() + +MUTED_SOCKET_ERRORS = [ + 32, # Broken pipe + 54, # Connection reset by peer + 104, # Connection reset by peer + 128, # Operation on closed socket +] + + +def urldecode(s): + if isinstance(s, str): + s = s.encode() + s = s.replace(b'+', b' ') + parts = s.split(b'%') + if len(parts) == 1: + return s.decode() + result = [parts[0]] + for item in parts[1:]: + if item == b'': + result.append(b'%') + else: + code = item[:2] + result.append(bytes([int(code, 16)])) + result.append(item[2:]) + return b''.join(result).decode() + + +def urlencode(s): + return s.replace('+', '%2B').replace(' ', '+').replace( + '%', '%25').replace('?', '%3F').replace('#', '%23').replace( + '&', '%26').replace('=', '%3D') + + +class NoCaseDict(dict): + """A subclass of dictionary that holds case-insensitive keys. + + :param initial_dict: an initial dictionary of key/value pairs to + initialize this object with. + + Example:: + + >>> d = NoCaseDict() + >>> d['Content-Type'] = 'text/html' + >>> print(d['Content-Type']) + text/html + >>> print(d['content-type']) + text/html + >>> print(d['CONTENT-TYPE']) + text/html + >>> del d['cOnTeNt-TyPe'] + >>> print(d) + {} + """ + + def __init__(self, initial_dict=None): + super().__init__(initial_dict or {}) + self.keymap = {k.lower(): k for k in self.keys() if k.lower() != k} + + def __setitem__(self, key, value): + kl = key.lower() + key = self.keymap.get(kl, key) + if kl != key: + self.keymap[kl] = key + super().__setitem__(key, value) + + def __getitem__(self, key): + kl = key.lower() + return super().__getitem__(self.keymap.get(kl, kl)) + + def __delitem__(self, key): + kl = key.lower() + super().__delitem__(self.keymap.get(kl, kl)) + + def __contains__(self, key): + kl = key.lower() + return self.keymap.get(kl, kl) in self.keys() + + def get(self, key, default=None): + kl = key.lower() + return super().get(self.keymap.get(kl, kl), default) + + def update(self, other_dict): + for key, value in other_dict.items(): + self[key] = value + + +def mro(cls): # pragma: no cover + """Return the method resolution order of a class. + + This is a helper function that returns the method resolution order of a + class. It is used by Microdot to find the best error handler to invoke for + the raised exception. + + In CPython, this function returns the ``__mro__`` attribute of the class. + In MicroPython, this function implements a recursive depth-first scanning + of the class hierarchy. + """ + if hasattr(cls, 'mro'): + return cls.__mro__ + + def _mro(cls): + m = [cls] + for base in cls.__bases__: + m += _mro(base) + return m + + mro_list = _mro(cls) + + # If a class appears multiple times (due to multiple inheritance) remove + # all but the last occurence. This matches the method resolution order + # of MicroPython, but not CPython. + mro_pruned = [] + for i in range(len(mro_list)): + base = mro_list.pop(0) + if base not in mro_list: + mro_pruned.append(base) + return mro_pruned + + +class MultiDict(dict): + """A subclass of dictionary that can hold multiple values for the same + key. It is used to hold key/value pairs decoded from query strings and + form submissions. + + :param initial_dict: an initial dictionary of key/value pairs to + initialize this object with. + + Example:: + + >>> d = MultiDict() + >>> d['sort'] = 'name' + >>> d['sort'] = 'email' + >>> print(d['sort']) + 'name' + >>> print(d.getlist('sort')) + ['name', 'email'] + """ + + def __init__(self, initial_dict=None): + super().__init__() + if initial_dict: + for key, value in initial_dict.items(): + self[key] = value + + def __setitem__(self, key, value): + if key not in self: + super().__setitem__(key, []) + super().__getitem__(key).append(value) + + def __getitem__(self, key): + return super().__getitem__(key)[0] + + def get(self, key, default=None, type=None): + """Return the value for a given key. + + :param key: The key to retrieve. + :param default: A default value to use if the key does not exist. + :param type: A type conversion callable to apply to the value. + + If the multidict contains more than one value for the requested key, + this method returns the first value only. + + Example:: + + >>> d = MultiDict() + >>> d['age'] = '42' + >>> d.get('age') + '42' + >>> d.get('age', type=int) + 42 + >>> d.get('name', default='noname') + 'noname' + """ + if key not in self: + return default + value = self[key] + if type is not None: + value = type(value) + return value + + def getlist(self, key, type=None): + """Return all the values for a given key. + + :param key: The key to retrieve. + :param type: A type conversion callable to apply to the values. + + If the requested key does not exist in the dictionary, this method + returns an empty list. + + Example:: + + >>> d = MultiDict() + >>> d.getlist('items') + [] + >>> d['items'] = '3' + >>> d.getlist('items') + ['3'] + >>> d['items'] = '56' + >>> d.getlist('items') + ['3', '56'] + >>> d.getlist('items', type=int) + [3, 56] + """ + if key not in self: + return [] + values = super().__getitem__(key) + if type is not None: + values = [type(value) for value in values] + return values + + +class AsyncBytesIO: + """An async wrapper for BytesIO.""" + + def __init__(self, data): + self.stream = io.BytesIO(data) + + async def read(self, n=-1): + return self.stream.read(n) + + async def readline(self): # pragma: no cover + return self.stream.readline() + + async def readexactly(self, n): # pragma: no cover + return self.stream.read(n) + + async def readuntil(self, separator=b'\n'): # pragma: no cover + return self.stream.readuntil(separator=separator) + + async def awrite(self, data): # pragma: no cover + return self.stream.write(data) + + async def aclose(self): # pragma: no cover + pass + + +class Request: + """An HTTP request.""" + #: Specify the maximum payload size that is accepted. Requests with larger + #: payloads will be rejected with a 413 status code. Applications can + #: change this maximum as necessary. + #: + #: Example:: + #: + #: Request.max_content_length = 1 * 1024 * 1024 # 1MB requests allowed + max_content_length = 16 * 1024 + + #: Specify the maximum payload size that can be stored in ``body``. + #: Requests with payloads that are larger than this size and up to + #: ``max_content_length`` bytes will be accepted, but the application will + #: only be able to access the body of the request by reading from + #: ``stream``. Set to 0 if you always access the body as a stream. + #: + #: Example:: + #: + #: Request.max_body_length = 4 * 1024 # up to 4KB bodies read + max_body_length = 16 * 1024 + + #: Specify the maximum length allowed for a line in the request. Requests + #: with longer lines will not be correctly interpreted. Applications can + #: change this maximum as necessary. + #: + #: Example:: + #: + #: Request.max_readline = 16 * 1024 # 16KB lines allowed + max_readline = 2 * 1024 + + class G: + pass + + def __init__(self, app, client_addr, method, url, http_version, headers, + body=None, stream=None, sock=None, url_prefix='', + subapp=None, scheme=None, route=None): + #: The application instance to which this request belongs. + self.app = app + #: The address of the client, as a tuple (host, port). + self.client_addr = client_addr + #: The HTTP method of the request. + self.method = method + #: The scheme of the request, either `http` or `https`. + self.scheme = scheme or 'http' + #: The request URL, including the path and query string, but not the + #: scheme or the host, which is available in the ``Host`` header. + self.url = url + #: The URL prefix, if the endpoint comes from a mounted + #: sub-application, or else ''. + self.url_prefix = url_prefix + #: The sub-application instance, or `None` if this isn't a mounted + #: endpoint. + self.subapp = subapp + #: The route function that handles this request. + self.route = route + #: The path portion of the URL. + self.path = url + #: The query string portion of the URL. + self.query_string = None + #: The parsed query string, as a + #: :class:`MultiDict ` object. + self.args = {} + #: A dictionary with the headers included in the request. + self.headers = headers + #: A dictionary with the cookies included in the request. + self.cookies = {} + #: The parsed ``Content-Length`` header. + self.content_length = 0 + #: The parsed ``Content-Type`` header. + self.content_type = None + #: A general purpose container for applications to store data during + #: the life of the request. + self.g = Request.G() + + self.http_version = http_version + if '?' in self.path: + self.path, self.query_string = self.path.split('?', 1) + self.args = self._parse_urlencoded(self.query_string) + + if 'Content-Length' in self.headers: + self.content_length = int(self.headers['Content-Length']) + if 'Content-Type' in self.headers: + self.content_type = self.headers['Content-Type'] + if 'Cookie' in self.headers: + for cookie in self.headers['Cookie'].split(';'): + c = cookie.strip().split('=', 1) + self.cookies[c[0]] = c[1] if len(c) > 1 else '' + + self._body = body + self.body_used = False + self._stream = stream + self.sock = sock + self._json = None + self._form = None + self._files = None + self.after_request_handlers = [] + + @staticmethod + async def create(app, client_reader, client_writer, client_addr, + scheme=None): + """Create a request object. + + :param app: The Microdot application instance. + :param client_reader: An input stream from where the request data can + be read. + :param client_writer: An output stream where the response data can be + written. + :param client_addr: The address of the client, as a tuple. + :param scheme: The scheme of the request, either 'http' or 'https'. + + This method is a coroutine. It returns a newly created ``Request`` + object. + """ + # request line + line = (await Request._safe_readline(client_reader)).strip().decode() + if not line: # pragma: no cover + return None + method, url, http_version = line.split() + http_version = http_version.split('/', 1)[1] + + # headers + headers = NoCaseDict() + content_length = 0 + while True: + line = (await Request._safe_readline( + client_reader)).strip().decode() + if line == '': + break + header, value = line.split(':', 1) + value = value.strip() + headers[header] = value + if header.lower() == 'content-length': + content_length = int(value) + + # body + body = b'' + if content_length and content_length <= Request.max_body_length: + body = await client_reader.readexactly(content_length) + stream = None + else: + body = b'' + stream = client_reader + + return Request(app, client_addr, method, url, http_version, headers, + body=body, stream=stream, + sock=(client_reader, client_writer), scheme=scheme) + + def _parse_urlencoded(self, urlencoded): + data = MultiDict() + if len(urlencoded) > 0: # pragma: no branch + if isinstance(urlencoded, str): + for kv in [pair.split('=', 1) + for pair in urlencoded.split('&') if pair]: + data[urldecode(kv[0])] = urldecode(kv[1]) \ + if len(kv) > 1 else '' + elif isinstance(urlencoded, bytes): # pragma: no branch + for kv in [pair.split(b'=', 1) + for pair in urlencoded.split(b'&') if pair]: + data[urldecode(kv[0])] = urldecode(kv[1]) \ + if len(kv) > 1 else b'' + return data + + @property + def body(self): + """The body of the request, as bytes.""" + return self._body + + @property + def stream(self): + """The body of the request, as a bytes stream.""" + if self._stream is None: + self._stream = AsyncBytesIO(self._body) + return self._stream + + @property + def json(self): + """The parsed JSON body, or ``None`` if the request does not have a + JSON body.""" + if self._json is None: + if self.content_type is None: + return None + mime_type = self.content_type.split(';')[0] + if mime_type != 'application/json': + return None + self._json = json.loads(self.body.decode()) + return self._json + + @property + def form(self): + """The parsed form submission body, as a + :class:`MultiDict ` object, or ``None`` if the + request does not have a form submission. + + Forms that are URL encoded are processed by default. For multipart + forms to be processed, the + :func:`with_form_data ` + decorator must be added to the route. + """ + if self._form is None: + if self.content_type is None: + return None + mime_type = self.content_type.split(';')[0] + if mime_type != 'application/x-www-form-urlencoded': + return None + self._form = self._parse_urlencoded(self.body) + return self._form + + @property + def files(self): + """The files uploaded in the request as a dictionary, or ``None`` if + the request does not have any files. + + The :func:`with_form_data ` + decorator must be added to the route that receives file uploads for + this property to be set. + """ + return self._files + + def after_request(self, f): + """Register a request-specific function to run after the request is + handled. Request-specific after request handlers run at the very end, + after the application's own after request handlers. The function must + take two arguments, the request and response objects. The return value + of the function must be the updated response object. + + Example:: + + @app.route('/') + def index(request): + # register a request-specific after request handler + @req.after_request + def func(request, response): + # ... + return response + + return 'Hello, World!' + + Note that the function is not called if the request handler raises an + exception and an error response is returned instead. + """ + self.after_request_handlers.append(f) + return f + + @staticmethod + async def _safe_readline(stream): + line = (await stream.readline()) + if len(line) > Request.max_readline: + raise ValueError('line too long') + return line + + +class Response: + """An HTTP response class. + + :param body: The body of the response. If a dictionary or list is given, + a JSON formatter is used to generate the body. If a file-like + object or an async generator is given, a streaming response is + used. If a string is given, it is encoded from UTF-8. Else, + the body should be a byte sequence. + :param status_code: The numeric HTTP status code of the response. The + default is 200. + :param headers: A dictionary of headers to include in the response. + :param reason: A custom reason phrase to add after the status code. The + default is "OK" for responses with a 200 status code and + "N/A" for any other status codes. + """ + types_map = { + 'css': 'text/css', + 'gif': 'image/gif', + 'html': 'text/html', + 'jpg': 'image/jpeg', + 'js': 'application/javascript', + 'json': 'application/json', + 'png': 'image/png', + 'txt': 'text/plain', + 'svg': 'image/svg+xml', + } + + send_file_buffer_size = 1024 + + #: The content type to use for responses that do not explicitly define a + #: ``Content-Type`` header. + default_content_type = 'text/plain' + + #: The default cache control max age used by :meth:`send_file`. A value + #: of ``None`` means that no ``Cache-Control`` header is added. + default_send_file_max_age = None + + #: Special response used to signal that a response does not need to be + #: written to the client. Used to exit WebSocket connections cleanly. + already_handled = None + + def __init__(self, body='', status_code=200, headers=None, reason=None): + if body is None and status_code == 200: + body = '' + status_code = 204 + self.status_code = status_code + self.headers = NoCaseDict(headers or {}) + self.reason = reason + if isinstance(body, (dict, list)): + body = json.dumps(body) + self.headers['Content-Type'] = 'application/json; charset=UTF-8' + if isinstance(body, str): + self.body = body.encode() + else: + # this applies to bytes, file-like objects or generators + self.body = body + self.is_head = False + + def set_cookie(self, cookie, value, path=None, domain=None, expires=None, + max_age=None, secure=False, http_only=False, + partitioned=False): + """Add a cookie to the response. + + :param cookie: The cookie's name. + :param value: The cookie's value. + :param path: The cookie's path. + :param domain: The cookie's domain. + :param expires: The cookie expiration time, as a ``datetime`` object + or a correctly formatted string. + :param max_age: The cookie's ``Max-Age`` value. + :param secure: The cookie's ``secure`` flag. + :param http_only: The cookie's ``HttpOnly`` flag. + :param partitioned: Whether the cookie is partitioned. + """ + http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value) + if path: + http_cookie += '; Path=' + path + if domain: + http_cookie += '; Domain=' + domain + if expires: + if isinstance(expires, str): + http_cookie += '; Expires=' + expires + else: # pragma: no cover + http_cookie += '; Expires=' + time.strftime( + '%a, %d %b %Y %H:%M:%S GMT', expires.timetuple()) + if max_age is not None: + http_cookie += '; Max-Age=' + str(max_age) + if secure: + http_cookie += '; Secure' + if http_only: + http_cookie += '; HttpOnly' + if partitioned: + http_cookie += '; Partitioned' + if 'Set-Cookie' in self.headers: + self.headers['Set-Cookie'].append(http_cookie) + else: + self.headers['Set-Cookie'] = [http_cookie] + + def delete_cookie(self, cookie, **kwargs): + """Delete a cookie. + + :param cookie: The cookie's name. + :param kwargs: Any cookie options and flags supported by + :meth:`set_cookie() `. + Values given for ``expires`` and ``max_age`` are + ignored. + """ + kwargs.pop('expires', None) + kwargs.pop('max_age', None) + self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT', + max_age=0, **kwargs) + + def complete(self): + if isinstance(self.body, bytes) and \ + 'Content-Length' not in self.headers: + self.headers['Content-Length'] = str(len(self.body)) + if 'Content-Type' not in self.headers: + self.headers['Content-Type'] = self.default_content_type + if 'charset=' not in self.headers['Content-Type']: + self.headers['Content-Type'] += '; charset=UTF-8' + + async def write(self, stream): + self.complete() + + try: + # status code + reason = self.reason if self.reason is not None else \ + ('OK' if self.status_code == 200 else 'N/A') + await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format( + status_code=self.status_code, reason=reason).encode()) + + # headers + for header, value in self.headers.items(): + values = value if isinstance(value, list) else [value] + for value in values: + await stream.awrite('{header}: {value}\r\n'.format( + header=header, value=value).encode()) + await stream.awrite(b'\r\n') + + # body + if not self.is_head: + iter = self.body_iter() + async for body in iter: + if isinstance(body, str): # pragma: no cover + body = body.encode() + try: + await stream.awrite(body) + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS or \ + exc.args[0] == 'Connection lost': + if hasattr(iter, 'aclose'): + await iter.aclose() + raise + if hasattr(iter, 'aclose'): # pragma: no branch + await iter.aclose() + + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS or \ + exc.args[0] == 'Connection lost': + pass + else: + raise + + def body_iter(self): + if hasattr(self.body, '__anext__'): + # response body is an async generator + return self.body + + response = self + + class iter: + ITER_UNKNOWN = 0 + ITER_SYNC_GEN = 1 + ITER_FILE_OBJ = 2 + ITER_NO_BODY = -1 + + def __aiter__(self): + if response.body: + self.i = self.ITER_UNKNOWN # need to determine type + else: + self.i = self.ITER_NO_BODY + return self + + async def __anext__(self): + if self.i == self.ITER_NO_BODY: + await self.aclose() + raise StopAsyncIteration + if self.i == self.ITER_UNKNOWN: + if hasattr(response.body, 'read'): + self.i = self.ITER_FILE_OBJ + elif hasattr(response.body, '__next__'): + self.i = self.ITER_SYNC_GEN + return next(response.body) + else: + self.i = self.ITER_NO_BODY + return response.body + elif self.i == self.ITER_SYNC_GEN: + try: + return next(response.body) + except StopIteration: + await self.aclose() + raise StopAsyncIteration + buf = response.body.read(response.send_file_buffer_size) + if iscoroutine(buf): # pragma: no cover + buf = await buf + if len(buf) < response.send_file_buffer_size: + self.i = self.ITER_NO_BODY + return buf + + async def aclose(self): + if hasattr(response.body, 'close'): + result = response.body.close() + if iscoroutine(result): # pragma: no cover + await result + + return iter() + + @classmethod + def redirect(cls, location, status_code=302): + """Return a redirect response. + + :param location: The URL to redirect to. + :param status_code: The 3xx status code to use for the redirect. The + default is 302. + """ + if '\x0d' in location or '\x0a' in location: + raise ValueError('invalid redirect URL') + return cls(status_code=status_code, headers={'Location': location}) + + @classmethod + def send_file(cls, filename, status_code=200, content_type=None, + stream=None, max_age=None, compressed=False, + file_extension=''): + """Send file contents in a response. + + :param filename: The filename of the file. + :param status_code: The 3xx status code to use for the redirect. The + default is 200. + :param content_type: The ``Content-Type`` header to use in the + response. If omitted, it is generated + automatically from the file extension of the + ``filename`` parameter. + :param stream: A file-like object to read the file contents from. If + a stream is given, the ``filename`` parameter is only + used when generating the ``Content-Type`` header. + :param max_age: The ``Cache-Control`` header's ``max-age`` value in + seconds. If omitted, the value of the + :attr:`Response.default_send_file_max_age` attribute is + used. + :param compressed: Whether the file is compressed. If ``True``, the + ``Content-Encoding`` header is set to ``gzip``. A + string with the header value can also be passed. + Note that when using this option the file must have + been compressed beforehand. This option only sets + the header. + :param file_extension: A file extension to append to the ``filename`` + parameter when opening the file, including the + dot. The extension given here is not considered + when generating the ``Content-Type`` header. + + Security note: The filename is assumed to be trusted. Never pass + filenames provided by the user without validating and sanitizing them + first. + """ + if content_type is None: + if compressed and filename.endswith('.gz'): + ext = filename[:-3].split('.')[-1] + else: + ext = filename.split('.')[-1] + if ext in Response.types_map: + content_type = Response.types_map[ext] + else: + content_type = 'application/octet-stream' + headers = {'Content-Type': content_type} + + if max_age is None: + max_age = cls.default_send_file_max_age + if max_age is not None: + headers['Cache-Control'] = 'max-age={}'.format(max_age) + + if compressed: + headers['Content-Encoding'] = compressed \ + if isinstance(compressed, str) else 'gzip' + + f = stream or open(filename + file_extension, 'rb') + return cls(body=f, status_code=status_code, headers=headers) + + +class URLPattern(): + """A class that represents the URL pattern for a route. + + :param url_pattern: The route URL pattern, which can include static and + dynamic path segments. Dynamic segments are enclosed in + ``<`` and ``>``. The type of the segment can be given + as a prefix, separated from the name with a colon. + Supported types are ``string`` (the default), + ``int`` and ``path``. Custom types can be registered + using the :meth:`URLPattern.register_type` method. + """ + + segment_patterns = { + 'string': '/([^/]+)', + 'int': '/(-?\\d+)', + 'path': '/(.+)', + } + segment_parsers = { + 'int': lambda value: int(value), + } + + @classmethod + def register_type(cls, type_name, pattern='[^/]+', parser=None): + """Register a new URL segment type. + + :param type_name: The name of the segment type to register. + :param pattern: The regular expression pattern to use when matching + this segment type. If not given, a default matcher for + a single path segment is used. + :param parser: A callable that will be used to parse and transform the + value of the segment. If omitted, the value is returned + as a string. + """ + cls.segment_patterns[type_name] = '/({})'.format(pattern) + cls.segment_parsers[type_name] = parser + + def __init__(self, url_pattern): + self.url_pattern = url_pattern + self.segments = [] + self.regex = None + + def compile(self): + """Generate a regular expression for the URL pattern. + + This method is automatically invoked the first time the URL pattern is + matched against a path. + """ + pattern = '' + for segment in self.url_pattern.lstrip('/').split('/'): + if segment and segment[0] == '<': + if segment[-1] != '>': + raise ValueError('invalid URL pattern') + segment = segment[1:-1] + if ':' in segment: + type_, name = segment.rsplit(':', 1) + else: + type_ = 'string' + name = segment + parser = None + if type_.startswith('re:'): + pattern += '/({pattern})'.format(pattern=type_[3:]) + else: + if type_ not in self.segment_patterns: + raise ValueError('invalid URL segment type') + pattern += self.segment_patterns[type_] + parser = self.segment_parsers.get(type_) + self.segments.append({'parser': parser, 'name': name, + 'type': type_}) + else: + pattern += '/' + segment + self.segments.append({'parser': None}) + self.regex = re.compile('^' + pattern + '$') + return self.regex + + def match(self, path): + """Match a path against the URL pattern. + + Returns a dictionary with the values of all dynamic path segments if a + matche is found, or ``None`` if the path does not match this pattern. + """ + args = {} + g = (self.regex or self.compile()).match(path) + if not g: + return + i = 1 + for segment in self.segments: + if 'name' not in segment: + continue + arg = g.group(i) + if segment['parser']: + arg = self.segment_parsers[segment['type']](arg) + if arg is None: + return + args[segment['name']] = arg + i += 1 + return args + + def __repr__(self): # pragma: no cover + return 'URLPattern: {}'.format(self.url_pattern) + + +class HTTPException(Exception): + def __init__(self, status_code, reason=None): + self.status_code = status_code + self.reason = reason or str(status_code) + ' error' + + def __repr__(self): # pragma: no cover + return 'HTTPException: {}'.format(self.status_code) + + +class Microdot: + """An HTTP application class. + + This class implements an HTTP application instance and is heavily + influenced by the ``Flask`` class of the Flask framework. It is typically + declared near the start of the main application script. + + Example:: + + from microdot import Microdot + + app = Microdot() + """ + + def __init__(self): + self.url_map = [] + self.before_request_handlers = [] + self.after_request_handlers = [] + self.after_error_request_handlers = [] + self.error_handlers = {} + self.options_handler = self.default_options_handler + self.ssl = False + self.debug = False + self.server = None + + def route(self, url_pattern, methods=None): + """Decorator that is used to register a function as a request handler + for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + :param methods: The list of HTTP methods to be handled by the + decorated function. If omitted, only ``GET`` requests + are handled. + + The URL pattern can be a static path (for example, ``/users`` or + ``/api/invoices/search``) or a path with dynamic components enclosed + in ``<`` and ``>`` (for example, ``/users/`` or + ``/invoices//products``). Dynamic path components can also + include a type prefix, separated from the name with a colon (for + example, ``/users/``). The type can be ``string`` (the + default), ``int``, ``path`` or ``re:[regular-expression]``. + + The first argument of the decorated function must be + the request object. Any path arguments that are specified in the URL + pattern are passed as keyword arguments. The return value of the + function must be a :class:`Response` instance, or the arguments to + be passed to this class. + + Example:: + + @app.route('/') + def index(request): + return 'Hello, world!' + """ + def decorated(f): + self.url_map.append( + ([m.upper() for m in (methods or ['GET'])], + URLPattern(url_pattern), f, '', None)) + return f + return decorated + + def get(self, url_pattern): + """Decorator that is used to register a function as a ``GET`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['GET']``. + + Example:: + + @app.get('/users/') + def get_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['GET']) + + def post(self, url_pattern): + """Decorator that is used to register a function as a ``POST`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the``route`` decorator with + ``methods=['POST']``. + + Example:: + + @app.post('/users') + def create_user(request): + # ... + """ + return self.route(url_pattern, methods=['POST']) + + def put(self, url_pattern): + """Decorator that is used to register a function as a ``PUT`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['PUT']``. + + Example:: + + @app.put('/users/') + def edit_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['PUT']) + + def patch(self, url_pattern): + """Decorator that is used to register a function as a ``PATCH`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['PATCH']``. + + Example:: + + @app.patch('/users/') + def edit_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['PATCH']) + + def delete(self, url_pattern): + """Decorator that is used to register a function as a ``DELETE`` + request handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['DELETE']``. + + Example:: + + @app.delete('/users/') + def delete_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['DELETE']) + + def before_request(self, f): + """Decorator to register a function to run before each request is + handled. The decorated function must take a single argument, the + request object. + + Example:: + + @app.before_request + def func(request): + # ... + """ + self.before_request_handlers.append(f) + return f + + def after_request(self, f): + """Decorator to register a function to run after each request is + handled. The decorated function must take two arguments, the request + and response objects. The return value of the function must be an + updated response object. + + Example:: + + @app.after_request + def func(request, response): + # ... + return response + """ + self.after_request_handlers.append(f) + return f + + def after_error_request(self, f): + """Decorator to register a function to run after an error response is + generated. The decorated function must take two arguments, the request + and response objects. The return value of the function must be an + updated response object. The handler is invoked for error responses + generated by Microdot, as well as those returned by application-defined + error handlers. + + Example:: + + @app.after_error_request + def func(request, response): + # ... + return response + """ + self.after_error_request_handlers.append(f) + return f + + def errorhandler(self, status_code_or_exception_class): + """Decorator to register a function as an error handler. Error handler + functions for numeric HTTP status codes must accept a single argument, + the request object. Error handler functions for Python exceptions + must accept two arguments, the request object and the exception + object. + + :param status_code_or_exception_class: The numeric HTTP status code or + Python exception class to + handle. + + Examples:: + + @app.errorhandler(404) + def not_found(request): + return 'Not found' + + @app.errorhandler(RuntimeError) + def runtime_error(request, exception): + return 'Runtime error' + """ + def decorated(f): + self.error_handlers[status_code_or_exception_class] = f + return f + return decorated + + def mount(self, subapp, url_prefix='', local=False): + """Mount a sub-application, optionally under the given URL prefix. + + :param subapp: The sub-application to mount. + :param url_prefix: The URL prefix to mount the application under. + :param local: When set to ``True``, the before, after and error request + handlers only apply to endpoints defined in the + sub-application. When ``False``, they apply to the entire + application. The default is ``False``. + """ + for methods, pattern, handler, _prefix, _subapp in subapp.url_map: + self.url_map.append( + (methods, URLPattern(url_prefix + pattern.url_pattern), + handler, url_prefix + _prefix, _subapp or subapp)) + if not local: + for handler in subapp.before_request_handlers: + self.before_request_handlers.append(handler) + subapp.before_request_handlers = [] + for handler in subapp.after_request_handlers: + self.after_request_handlers.append(handler) + subapp.after_request_handlers = [] + for handler in subapp.after_error_request_handlers: + self.after_error_request_handlers.append(handler) + subapp.after_error_request_handlers = [] + for status_code, handler in subapp.error_handlers.items(): + self.error_handlers[status_code] = handler + subapp.error_handlers = {} + + @staticmethod + def abort(status_code, reason=None): + """Abort the current request and return an error response with the + given status code. + + :param status_code: The numeric status code of the response. + :param reason: The reason for the response, which is included in the + response body. + + Example:: + + from microdot import abort + + @app.route('/users/') + def get_user(id): + user = get_user_by_id(id) + if user is None: + abort(404) + return user.to_dict() + """ + raise HTTPException(status_code, reason) + + async def start_server(self, host='0.0.0.0', port=5000, debug=False, + ssl=None): + """Start the Microdot web server as a coroutine. This coroutine does + not normally return, as the server enters an endless listening loop. + The :func:`shutdown` function provides a method for terminating the + server gracefully. + + :param host: The hostname or IP address of the network interface that + will be listening for requests. A value of ``'0.0.0.0'`` + (the default) indicates that the server should listen for + requests on all the available interfaces, and a value of + ``127.0.0.1`` indicates that the server should listen + for requests only on the internal networking interface of + the host. + :param port: The port number to listen for requests. The default is + port 5000. + :param debug: If ``True``, the server logs debugging information. The + default is ``False``. + :param ssl: An ``SSLContext`` instance or ``None`` if the server should + not use TLS. The default is ``None``. + + This method is a coroutine. + + Example:: + + import asyncio + from microdot import Microdot + + app = Microdot() + + @app.route('/') + async def index(request): + return 'Hello, world!' + + async def main(): + await app.start_server(debug=True) + + asyncio.run(main()) + """ + self.ssl = ssl + self.debug = debug + + async def serve(reader, writer): + if not hasattr(writer, 'awrite'): # pragma: no cover + # CPython provides the awrite and aclose methods in 3.8+ + async def awrite(self, data): + self.write(data) + await self.drain() + + async def aclose(self): + self.close() + await self.wait_closed() + + from types import MethodType + writer.awrite = MethodType(awrite, writer) + writer.aclose = MethodType(aclose, writer) + + await self.handle_request(reader, writer) + + if self.debug: # pragma: no cover + print('Starting async server on {host}:{port}...'.format( + host=host, port=port)) + + try: + self.server = await asyncio.start_server(serve, host, port, + ssl=ssl) + except TypeError: # pragma: no cover + self.server = await asyncio.start_server(serve, host, port) + + while True: + try: + if hasattr(self.server, 'serve_forever'): # pragma: no cover + try: + await self.server.serve_forever() + except asyncio.CancelledError: + pass + await self.server.wait_closed() + break + except AttributeError: # pragma: no cover + # the task hasn't been initialized in the server object yet + # wait a bit and try again + await asyncio.sleep(0.1) + + def run(self, host='0.0.0.0', port=5000, debug=False, ssl=None): + """Start the web server. This function does not normally return, as + the server enters an endless listening loop. The :func:`shutdown` + function provides a method for terminating the server gracefully. + + :param host: The hostname or IP address of the network interface that + will be listening for requests. A value of ``'0.0.0.0'`` + (the default) indicates that the server should listen for + requests on all the available interfaces, and a value of + ``127.0.0.1`` indicates that the server should listen + for requests only on the internal networking interface of + the host. + :param port: The port number to listen for requests. The default is + port 5000. + :param debug: If ``True``, the server logs debugging information. The + default is ``False``. + :param ssl: An ``SSLContext`` instance or ``None`` if the server should + not use TLS. The default is ``None``. + + Example:: + + from microdot import Microdot + + app = Microdot() + + @app.route('/') + async def index(request): + return 'Hello, world!' + + app.run(debug=True) + """ + asyncio.run(self.start_server(host=host, port=port, debug=debug, + ssl=ssl)) # pragma: no cover + + def shutdown(self): + """Request a server shutdown. The server will then exit its request + listening loop and the :func:`run` function will return. This function + can be safely called from a route handler, as it only schedules the + server to terminate as soon as the request completes. + + Example:: + + @app.route('/shutdown') + def shutdown(request): + request.app.shutdown() + return 'The server is shutting down...' + """ + self.server.close() + + def find_route(self, req): + method = req.method.upper() + if method == 'OPTIONS' and self.options_handler: + return self.options_handler(req), '', None + if method == 'HEAD': + method = 'GET' + f = 404 + p = '' + s = None + for route_methods, route_pattern, route_handler, url_prefix, subapp \ + in self.url_map: + req.url_args = route_pattern.match(req.path) + if req.url_args is not None: + p = url_prefix + s = subapp + if method in route_methods: + f = route_handler + break + else: + f = 405 + return f, p, s + + def default_options_handler(self, req): + allow = [] + for route_methods, route_pattern, _, _, _ in self.url_map: + if route_pattern.match(req.path) is not None: + allow.extend(route_methods) + if 'GET' in allow: + allow.append('HEAD') + allow.append('OPTIONS') + return {'Allow': ', '.join(allow)} + + async def handle_request(self, reader, writer): + req = None + try: + req = await Request.create(self, reader, writer, + writer.get_extra_info('peername')) + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS: + pass + else: + raise + except Exception as exc: # pragma: no cover + print_exception(exc) + + res = await self.dispatch_request(req) + try: + if res != Response.already_handled: # pragma: no branch + await res.write(writer) + await writer.aclose() + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS: + pass + else: + raise + if self.debug and req: # pragma: no cover + print('{method} {path} {status_code}'.format( + method=req.method, path=req.path, + status_code=res.status_code)) + + def get_request_handlers(self, req, attr, local_first=True): + handlers = getattr(self, attr + '_handlers') + local_handlers = getattr(req.subapp, attr + '_handlers') \ + if req and req.subapp else [] + return local_handlers + handlers if local_first \ + else handlers + local_handlers + + async def error_response(self, req, status_code, reason=None): + if req and req.subapp and status_code in req.subapp.error_handlers: + return await invoke_handler( + req.subapp.error_handlers[status_code], req) + elif status_code in self.error_handlers: + return await invoke_handler(self.error_handlers[status_code], req) + return reason or 'N/A', status_code + + async def dispatch_request(self, req): + after_request_handled = False + if req: + if req.content_length > req.max_content_length: + # the request body is larger than allowed + res = await self.error_response(req, 413, 'Payload too large') + else: + # find the route in the app's URL map + f, req.url_prefix, req.subapp = self.find_route(req) + + try: + res = None + if callable(f): + req.route = f + + # invoke the before request handlers + for handler in self.get_request_handlers( + req, 'before_request', False): + res = await invoke_handler(handler, req) + if res: + break + + # invoke the endpoint handler + if res is None: + res = await invoke_handler(f, req, **req.url_args) + + # process the response + if isinstance(res, int): + # an integer response is taken as a status code + # with an empty body + res = '', res + if isinstance(res, tuple): + # handle a tuple response + if isinstance(res[0], int): + # a tuple that starts with an int has an empty + # body + res = ('', res[0], + res[1] if len(res) > 1 else {}) + body = res[0] + if isinstance(res[1], int): + # extract the status code and headers (if + # available) + status_code = res[1] + headers = res[2] if len(res) > 2 else {} + else: + # if the status code is missing, assume 200 + status_code = 200 + headers = res[1] + res = Response(body, status_code, headers) + elif not isinstance(res, Response): + # any other response types are wrapped in a + # Response object + res = Response(res) + + # invoke the after request handlers + for handler in self.get_request_handlers( + req, 'after_request', True): + res = await invoke_handler( + handler, req, res) or res + for handler in req.after_request_handlers: + res = await invoke_handler( + handler, req, res) or res + after_request_handled = True + elif isinstance(f, dict): + # the response from an OPTIONS request is a dict with + # headers + res = Response(headers=f) + else: + # if the route is not found, return a 404 or 405 + # response as appropriate + res = await self.error_response(req, f, 'Not found') + except HTTPException as exc: + # an HTTP exception was raised while handling this request + res = await self.error_response(req, exc.status_code, + exc.reason) + except Exception as exc: + # an unexpected exception was raised while handling this + # request + print_exception(exc) + + # invoke the error handler for the exception class if one + # exists + handler = None + res = None + if req.subapp and exc.__class__ in \ + req.subapp.error_handlers: + handler = req.subapp.error_handlers[exc.__class__] + elif exc.__class__ in self.error_handlers: + handler = self.error_handlers[exc.__class__] + else: + # walk up the exception class hierarchy to try to find + # a handler + for c in mro(exc.__class__)[1:]: + if req.subapp and c in req.subapp.error_handlers: + handler = req.subapp.error_handlers[c] + break + elif c in self.error_handlers: + handler = self.error_handlers[c] + break + if handler: + try: + res = await invoke_handler(handler, req, exc) + except Exception as exc2: # pragma: no cover + print_exception(exc2) + if res is None: + # if there is still no response, issue a 500 error + res = await self.error_response( + req, 500, 'Internal server error') + else: + # if the request could not be parsed, issue a 400 error + res = await self.error_response(req, 400, 'Bad request') + if isinstance(res, tuple): + res = Response(*res) + elif not isinstance(res, Response): + res = Response(res) + if not after_request_handled: + # if the request did not finish due to an error, invoke the after + # error request handler + for handler in self.get_request_handlers( + req, 'after_error_request', True): + res = await invoke_handler( + handler, req, res) or res + res.is_head = (req and req.method == 'HEAD') + return res + + +Response.already_handled = Response() + +abort = Microdot.abort +redirect = Response.redirect +send_file = Response.send_file diff --git a/src/server/Networking.py b/src/server/Networking.py new file mode 100644 index 0000000..97ec8b1 --- /dev/null +++ b/src/server/Networking.py @@ -0,0 +1,28 @@ + + +async def connect_to_wifi(ssid: str, password: str, attempts=10) -> str or None: + """ + Method to connect the pico to wifi. + + :param ssid: + :param password: + :param attempts: Number of attempts to connect before halting + :return: + """ + import time + + import network + + print(f"Connecting to {ssid} with {password}") + + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + wlan.connect(ssid, password) + + for attempt in range(attempts): + if wlan.isconnected(): + print(f"Connected to {ssid}") + return wlan.ifconfig()[0] + + print("WiFi failed") + return None diff --git a/src/server/RouteDecorator.py b/src/server/RouteDecorator.py new file mode 100644 index 0000000..0e764c6 --- /dev/null +++ b/src/server/RouteDecorator.py @@ -0,0 +1,32 @@ +import asyncio + + +def lightshow_route(app, manager_attr="current_task"): + """ + A decorator factory that handles task management and + standardized HTTP responses. + """ + def decorator(func): + async def wrapper(self, request, *args, **kwargs): + # 1. Kill any existing show to prevent flickering/overlap + existing_task = getattr(self, manager_attr, None) + if existing_task and not existing_task.done(): + existing_task.cancel() + try: + await existing_task # Wait for cleanup + except asyncio.CancelledError: + pass + + # 2. Start the new show and track it + # We wrap the function call to ensure we catch the 'self' instance + task = asyncio.create_task(func(self, request, *args, **kwargs)) + setattr(self, manager_attr, task) + + # 3. Standardized Response + return { + "status": "started", + "show": func.__name__, + # "message": "Gundam sequence initiated" + }, 202 + return wrapper + return decorator diff --git a/src/server/Wrappers.py b/src/server/Wrappers.py new file mode 100644 index 0000000..c0b99ca --- /dev/null +++ b/src/server/Wrappers.py @@ -0,0 +1,30 @@ +from src.server.RouteDecorator import lightshow_route + + +def safe_execution(func): + """ + Wraps an async route handler with a try/except block. + Returns 500 on failure, passes through success. + """ + + async def wrapper(*args, **kwargs): + try: + await func(*args, **kwargs) + return {"status": "success", "action": func.__name__}, 202 + + except Exception as e: + # Log the error to console (essential for debugging) + print(f"Server Error in {func.__name__}: {e}") + return {"error": str(e)}, 500 + + return wrapper + + +def create_show_handler(func): + # note order matters for these + @lightshow_route + @safe_execution + async def show_handler(request): + return await func(request) + + return show_handler diff --git a/src/server/__init__.py b/src/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/server/webserver.py b/src/server/webserver.py new file mode 100644 index 0000000..484660a --- /dev/null +++ b/src/server/webserver.py @@ -0,0 +1,102 @@ +import asyncio +import logging +import sys + +import network +from Microdot import Microdot, send_file + +from src import settings +from src.gunpla.generic_gundam import GenericGundam +from src.pi.board_led import BoardLED +from src.pi.LED import LED +from src.pi.led_effect import LEDEffects +from src.server.Wrappers import create_show_handler, safe_execution + + +class WebServer: + """ + Webserver that manages API routes and web pages for the Gunpla + """ + + def __init__(self, configuration: dict): + self.app = Microdot() + self.settings: dict = configuration + self.gundam: GenericGundam = settings.webserver['model'] + self.board_led: LED = BoardLED() + + @safe_execution + async def index(self): + """ + Returns the root index page + """ + #Todo fix this rendering + return await send_file("src/www/index.html") + + @safe_execution + async def canary(self): + """ + Sanity check to make sure webserver is running. + """ + asyncio.create_task(LEDEffects.blink(self.board_led)) + return "chirp", 202 + + # def catchall(self, request: Request): + # """ + # Generic handler to catch any routing error + # """ + # return Response("Not found", 404) + + async def _connect_to_wifi(self): + from src.server.Networking import connect_to_wifi + ipaddress: str = connect_to_wifi(self.settings['ssid'], self.settings['password']) + if ipaddress: + print(f"Server started on {ipaddress}") + await LEDEffects.blink(self.board_led) + else: + logging.error("Server failed to connect") + sys.exit("Cannot start server") + + async def run(self): + """ + Main runner of the webserver. Loads configurations, paths, connects to wifi and runs the server + """ + network.hostname(self.settings['hostname']) + print(f"Set hostname to {network.hostname()}") + + await self._connect_to_wifi() + + self._add_routes() + + await self.app.start_server(host='0.0.0.0', port=80, debug=True) + + def _add_routes(self): + """ + Given a server adds all endpoints for Leds and lightshows + """ + self.app.route("/")(self.index) + self.app.route("/index")(self.index) + self.app.route("/canary")(self.canary) + + @self.app.route("/led//on") + @safe_execution + async def led_on_handler(request, led_name): + return self.gundam.led_on(led_name) + + @self.app.route("/led//off") + @safe_execution + async def led_off_handler(request, led_name): + return self.gundam.led_off(led_name) + + self.app.route("/all/on")(self.gundam.all_on) + self.app.route("/all/off")(self.gundam.all_off) + + for lightshow in self.gundam.config['lightshow']: + path = f"/lightshow/{lightshow['path']}" + method_func = getattr(self.gundam, lightshow['method']) + + self.app.route(path)(create_show_handler(method_func)) + + # 404 Handler + @self.app.errorhandler(404) + def not_found(request): + return "Not found", 404 diff --git a/src/webserver.py b/src/webserver.py deleted file mode 100644 index 097093d..0000000 --- a/src/webserver.py +++ /dev/null @@ -1,83 +0,0 @@ -import sys - -import network - -from src import settings -from src.gunpla.generic_gundam import GenericGundam -from src.phew import connect_to_wifi, server -from src.phew.server import Request, Response, logging -from src.phew.template import render_template -from src.pi.board_led import BoardLED -from src.pi.LED import LED -from src.pi.led_effect import LEDEffects - - -class WebServer: - """ - Webserver that manages API routes and web pages for the Gunpla - """ - - def __init__(self, configuration: dict): - self.settings: dict = configuration - self.gundam: GenericGundam = settings.webserver['model'] - self.board_led: LED = BoardLED() - - def index(self, request: Request) -> Response: - """ - Returns the root index page - """ - return await render_template("src/www/index.html", - title=self.gundam.config['name'], - all_leds=self.gundam.config['leds'], - lightshows=self.gundam.config['lightshow']) - - def canary(self, request: Request) -> Response: - """ - Sanity check to make sure webserver is running. - """ - LEDEffects.blink(self.board_led) - return Response("chirp", 200) - - def catchall(self, request: Request): - """ - Generic handler to catch any routing error - """ - return Response("Not found", 404) - - def main(self): - """ - Main runner of the webserver. Loads configurations, paths, connects to wifi and runs the server - """ - network.hostname(self.settings['hostname']) - logging.info(f"Set hostname to {network.hostname()}") - logging.info(f"Connect to {self.settings['ssid']} with {self.settings['password']}") - ipaddress: str = connect_to_wifi(self.settings['ssid'], self.settings['password']) - if ipaddress: - logging.info(f"Server started on {ipaddress}") - LEDEffects.blink(self.board_led) - else: - logging.error("Server failed to connect") - sys.exit("Cannot start server") - - server.set_callback(self.catchall) - - self._add_routes() - - server.run() - - def _add_routes(self): - """ - Given a server adds all endpoints for Leds and lightshows - """ - server.add_route("/", self.index, methods=["GET"]) - server.add_route("/index", self.index, methods=["GET"]) - server.add_route("/canary", self.canary, methods=["GET"]) - server.set_callback(self.catchall) - - server.add_route("/led//on", self.gundam.led_on, methods=["GET"]) - server.add_route("/led//off", self.gundam.led_off, methods=["GET"]) - server.add_route("/all/on", self.gundam.all_on, methods=["GET"]) - server.add_route("/all/off", self.gundam.all_off, methods=["GET"]) - for lightshow in self.gundam.config['lightshow']: - server.add_route(f"/lightshow/{lightshow['path']}", getattr(self, lightshow['method']), - methods=["GET"]) From 9bcce1b3e514512239b55229e817d468018562e2 Mon Sep 17 00:00:00 2001 From: frozenwizard <172203+frozenwizard@users.noreply.github.com> Date: Tue, 30 Dec 2025 22:56:20 -0600 Subject: [PATCH 2/7] readjusting make, fix some linting and also cleanup --- Makefile | 17 ++++++++--------- src/gunpla/base_gundam.py | 5 ++--- src/gunpla/nu_gundam.py | 9 ++------- src/pi/LED.py | 18 +++++++++--------- src/pi/led_effect.py | 3 ++- src/server/Networking.py | 6 +++--- src/server/RouteDecorator.py | 1 + src/server/Wrappers.py | 5 +++++ src/server/webserver.py | 16 +++++----------- 9 files changed, 37 insertions(+), 43 deletions(-) diff --git a/Makefile b/Makefile index 61390ae..2ed2bb2 100644 --- a/Makefile +++ b/Makefile @@ -4,22 +4,21 @@ clean: ##Nukes the target and micropython dirs rm -rf target/ rm -rf micropython/ - -rm -rf src/phew -rm -rf temp/ +.PHONY: setup-python-env +setup-python-env: ## Setups python dev environment + pyenv install $(cat .python-version) + pyenv local $(cat .python-version) + @eval "$$(pyenv init -)" && pyenv virtualenv $(cat .python-version) gunpla + @eval "$$(pyenv init -)" && pyenv activate gunpla && pip install --require-virtualenv -r requirements.txt + .PHONY: setup setup: ## Downloads and setups required dependencies mkdir micropython wget -P micropython https://micropython.org/resources/firmware/rp2-pico-w-20230426-v1.20.0.uf2 mkdir temp - wget -P temp/phew https://github.com/pimoroni/phew/archive/refs/tags/v0.0.3.zip - unzip temp/phew/v0.0.3.zip -d temp/ - mv temp/phew-0.0.3/phew/ src/ cp src/config.py.template src/settings.py - pyenv install $(cat .python-version) - pyenv local $(cat .python-version) - @eval "$$(pyenv init -)" && pyenv virtualenv $(cat .python-version) gunpla - @eval "$$(pyenv init -)" && pyenv activate gunpla && pip install --require-virtualenv -r requirements.txt .PHONY: install-micropython-ubuntu install-micropython-ubuntu: ## Installs micropython to pi board on ubuntu @@ -58,7 +57,7 @@ format: ## Format the Python code .PHONY: lint lint: ## Lints the python code and documents markdownlint --fix **/*.md - pylint src/ --ignore src/phew + pylint src/ --ignore Microdot.py help: ## Show this help. diff --git a/src/gunpla/base_gundam.py b/src/gunpla/base_gundam.py index e798085..908fac9 100644 --- a/src/gunpla/base_gundam.py +++ b/src/gunpla/base_gundam.py @@ -65,7 +65,6 @@ def _all_leds_on(self) -> str: return leds # TODO: make this not need the safe_execution and do it when we register paths - @safe_execution def all_off(self) -> None: """ @@ -89,14 +88,14 @@ def _all_leds_off(self) -> str: leds += f"{led_name}: off\n" return leds - def get_all_leds(self, filter: list[str] = []) -> list[LED]: + def get_all_leds(self, ignore_list: list[str] = []) -> list[LED]: """ Returns all LEDs configured, enabled or disabled. But not the board_led """ leds = [] for led_entry in self.config['leds']: led_name = led_entry['name'] - if led_name in filter: + if led_name in ignore_list: continue led = self._get_led_from_name(led_name) leds.append(led) diff --git a/src/gunpla/nu_gundam.py b/src/gunpla/nu_gundam.py index 1f86edd..165129c 100644 --- a/src/gunpla/nu_gundam.py +++ b/src/gunpla/nu_gundam.py @@ -28,7 +28,7 @@ async def activation(self) -> None: await asyncio.sleep(0.5) await LEDEffects.brighten(head_led) - async def fire_funnels(self, request: Request) -> None: + async def fire_funnels(self) -> None: """ Light Show that fires fin funnels in order """ @@ -36,7 +36,7 @@ async def fire_funnels(self, request: Request) -> None: funnel = self._get_led_from_name(f"fin_funnel_{i}") await LEDEffects.fire(funnel) - async def random_funnels(self, request: Request) -> Response: + async def random_funnels(self) -> None: """ Randomly fires fin funnels that are enabled for an infinite amount of time This currently does not end and needs thread management to properly be able to be halted. @@ -48,12 +48,7 @@ async def random_funnels(self, request: Request) -> Response: # Filter out funnels that are disabled. funnels = [funnel for funnel in funnels if funnel.enabled()] - if not funnels: - return Response("No funnels can be fired", 400) - while True: funnel = random.choice(funnels) await LEDEffects.charge_fire(funnel) await asyncio.sleep(random.uniform(0, 3)) - - return Response("finished", 200) diff --git a/src/pi/LED.py b/src/pi/LED.py index f0e1272..3fa4c2e 100644 --- a/src/pi/LED.py +++ b/src/pi/LED.py @@ -7,35 +7,35 @@ class LED: """ def __init__(self, pin_number: int, name: str): - self.pin: Pin = Pin(pin_number, Pin.OUT) - self.led_name = name + self._pin: Pin = Pin(pin_number, Pin.OUT) + self._led_name = name def enabled(self) -> bool: - ''' - Returns false as the LED is not connected - ''' + """ + Returns true as the LED is connected + """ return True def on(self) -> None: """ Turns on the LED light """ - self.pin.on() + self._pin.on() def off(self): """ Turns off the LED light """ - self.pin.off() + self._pin.off() def name(self) -> str: """ :return: The name of LED """ - return self.led_name + return self._led_name def pin(self) -> Pin: """ :return: The underlying Raspberry Pi Pico Pin of the LED. """ - return self.pin + return self._pin diff --git a/src/pi/led_effect.py b/src/pi/led_effect.py index bbc7a73..4cf0212 100644 --- a/src/pi/led_effect.py +++ b/src/pi/led_effect.py @@ -78,7 +78,8 @@ async def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, spe async def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: """ The current banshee amount of leds passed in causes it to I guess stack overflow and silently crash - around 30% so this method should not be used until that's addressed. + around 30% so this method should not be used until that's addressed. I also don't think i understand all there + is to PWM. """ pwms = [] for led in leds: diff --git a/src/server/Networking.py b/src/server/Networking.py index 97ec8b1..a0a146c 100644 --- a/src/server/Networking.py +++ b/src/server/Networking.py @@ -1,4 +1,6 @@ +import time +import network async def connect_to_wifi(ssid: str, password: str, attempts=10) -> str or None: """ @@ -9,9 +11,7 @@ async def connect_to_wifi(ssid: str, password: str, attempts=10) -> str or None: :param attempts: Number of attempts to connect before halting :return: """ - import time - import network print(f"Connecting to {ssid} with {password}") @@ -19,7 +19,7 @@ async def connect_to_wifi(ssid: str, password: str, attempts=10) -> str or None: wlan.active(True) wlan.connect(ssid, password) - for attempt in range(attempts): + for _ in range(attempts): if wlan.isconnected(): print(f"Connected to {ssid}") return wlan.ifconfig()[0] diff --git a/src/server/RouteDecorator.py b/src/server/RouteDecorator.py index 0e764c6..20481ad 100644 --- a/src/server/RouteDecorator.py +++ b/src/server/RouteDecorator.py @@ -13,6 +13,7 @@ async def wrapper(self, request, *args, **kwargs): if existing_task and not existing_task.done(): existing_task.cancel() try: + #TODO: turn all leds off await existing_task # Wait for cleanup except asyncio.CancelledError: pass diff --git a/src/server/Wrappers.py b/src/server/Wrappers.py index c0b99ca..e78ea2f 100644 --- a/src/server/Wrappers.py +++ b/src/server/Wrappers.py @@ -21,6 +21,11 @@ async def wrapper(*args, **kwargs): def create_show_handler(func): + """ + Given a function, wraps it as a lighthow_route and safe_execution. + :param func: + :return: + """ # note order matters for these @lightshow_route @safe_execution diff --git a/src/server/webserver.py b/src/server/webserver.py index 484660a..d876c98 100644 --- a/src/server/webserver.py +++ b/src/server/webserver.py @@ -4,6 +4,7 @@ import network from Microdot import Microdot, send_file +from Microdot import Request from src import settings from src.gunpla.generic_gundam import GenericGundam @@ -11,7 +12,7 @@ from src.pi.LED import LED from src.pi.led_effect import LEDEffects from src.server.Wrappers import create_show_handler, safe_execution - +from src.server.Networking import connect_to_wifi class WebServer: """ @@ -25,29 +26,22 @@ def __init__(self, configuration: dict): self.board_led: LED = BoardLED() @safe_execution - async def index(self): + async def index(self, request: Request): """ Returns the root index page """ - #Todo fix this rendering + # Todo fix this rendering return await send_file("src/www/index.html") @safe_execution - async def canary(self): + async def canary(self, request: Request): """ Sanity check to make sure webserver is running. """ asyncio.create_task(LEDEffects.blink(self.board_led)) return "chirp", 202 - # def catchall(self, request: Request): - # """ - # Generic handler to catch any routing error - # """ - # return Response("Not found", 404) - async def _connect_to_wifi(self): - from src.server.Networking import connect_to_wifi ipaddress: str = connect_to_wifi(self.settings['ssid'], self.settings['password']) if ipaddress: print(f"Server started on {ipaddress}") From 32322260c25acfda6019249d9ef346b79d8f5252 Mon Sep 17 00:00:00 2001 From: frozenwizard <172203+frozenwizard@users.noreply.github.com> Date: Fri, 2 Jan 2026 20:13:11 -0600 Subject: [PATCH 3/7] Added local testing. --- .gitignore | 4 +- Makefile | 2 +- main.py | 9 ++-- src/gunpla/base_gundam.py | 2 - src/gunpla/nu_gundam.py | 8 ++-- src/gunpla/unicorn_banshee.py | 4 +- src/hardware/Hardware.py | 20 +++++++++ src/hardware/Networking.py | 40 +++++++++++++++++ src/hardware/PicoHardwre.py | 30 +++++++++++++ src/hardware/VirtualHardware.py | 80 +++++++++++++++++++++++++++++++++ src/hardware/__init__.py | 14 ++++++ src/pi/LED.py | 5 ++- src/pi/board_led.py | 8 ++-- src/pi/led_effect.py | 24 +++++----- src/server/Microdot.py | 20 ++++----- src/server/Networking.py | 28 ------------ src/server/RouteDecorator.py | 22 +++++---- src/server/Wrappers.py | 16 ++++--- src/server/webserver.py | 34 ++++++-------- tests/LocalServerTest.py | 54 ++++++++++++++++++++++ tests/config/dummy_plug.json | 24 ++++++++++ 21 files changed, 343 insertions(+), 105 deletions(-) create mode 100644 src/hardware/Hardware.py create mode 100644 src/hardware/Networking.py create mode 100644 src/hardware/PicoHardwre.py create mode 100644 src/hardware/VirtualHardware.py create mode 100644 src/hardware/__init__.py delete mode 100644 src/server/Networking.py create mode 100644 tests/LocalServerTest.py create mode 100644 tests/config/dummy_plug.json diff --git a/.gitignore b/.gitignore index 183c39f..6e29362 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,7 @@ micropython/ target/ src/settings.py -src/phew +__pycache__ temp/ #OS garbage -.DS_STORE \ No newline at end of file +.DS_STORE diff --git a/Makefile b/Makefile index 2ed2bb2..3fc4448 100644 --- a/Makefile +++ b/Makefile @@ -51,7 +51,7 @@ deploy: ## Deploys the built artifacts to the pi board #python tooling .PHONY: format format: ## Format the Python code - autopep8 -i -r src/ + autopep8 -i -r src/ tests/ isort . .PHONY: lint diff --git a/main.py b/main.py index a7bca5a..0e5dafd 100644 --- a/main.py +++ b/main.py @@ -1,12 +1,15 @@ -import asyncio +import uasyncio +import src from src import settings +from src.hardware.Hardware import Hardware from src.server.webserver import WebServer def main(): - webserver = WebServer(settings.webserver) - asyncio.run( webserver.run()) + hardware: Hardware = src.hardware.get_hardware() + webserver = WebServer(settings.webserver, hardware) + uasyncio.run( webserver.run()) if __name__ == "__main__": diff --git a/src/gunpla/base_gundam.py b/src/gunpla/base_gundam.py index 908fac9..b07f5d8 100644 --- a/src/gunpla/base_gundam.py +++ b/src/gunpla/base_gundam.py @@ -1,6 +1,5 @@ import json -from src.pi.board_led import BoardLED from src.pi.disabled_LED import DisabledLED from src.pi.LED import LED from src.server.Wrappers import safe_execution @@ -10,7 +9,6 @@ class BaseGundam: """ Base Gunpla. """ - board_led = BoardLED() def __init__(self): with open(self.get_config_file()) as config_contents: diff --git a/src/gunpla/nu_gundam.py b/src/gunpla/nu_gundam.py index 165129c..7709a2a 100644 --- a/src/gunpla/nu_gundam.py +++ b/src/gunpla/nu_gundam.py @@ -1,4 +1,4 @@ -import asyncio +import uasyncio import random from src.gunpla.base_gundam import BaseGundam @@ -23,9 +23,9 @@ async def activation(self) -> None: """ head_led = self._get_led_from_name("head") head_led.on() - await asyncio.sleep(0.1) + await uasyncio.sleep(0.1) head_led.off() - await asyncio.sleep(0.5) + await uasyncio.sleep(0.5) await LEDEffects.brighten(head_led) async def fire_funnels(self) -> None: @@ -51,4 +51,4 @@ async def random_funnels(self) -> None: while True: funnel = random.choice(funnels) await LEDEffects.charge_fire(funnel) - await asyncio.sleep(random.uniform(0, 3)) + await uasyncio.sleep(random.uniform(0, 3)) diff --git a/src/gunpla/unicorn_banshee.py b/src/gunpla/unicorn_banshee.py index e4ee1cc..31879bc 100644 --- a/src/gunpla/unicorn_banshee.py +++ b/src/gunpla/unicorn_banshee.py @@ -1,4 +1,4 @@ -import asyncio +import uasyncio from src.gunpla.base_gundam import BaseGundam from src.pi.led_effect import LEDEffects @@ -20,5 +20,5 @@ async def glow(self) -> None: Runs the glow lightshow """ await LEDEffects.brighten_all(self.get_all_leds()) - await asyncio.sleep(3) + await uasyncio.sleep(3) self._all_leds_off() diff --git a/src/hardware/Hardware.py b/src/hardware/Hardware.py new file mode 100644 index 0000000..3502e29 --- /dev/null +++ b/src/hardware/Hardware.py @@ -0,0 +1,20 @@ + +from src.hardware.Networking import Networking +from src.pi.board_led import BoardLED + + +class Hardware: + def get_pin(self, pin_num, mode): + raise NotImplementedError + + def get_pwm(self, pin_obj): + raise NotImplementedError + + def reset_pin(self, pin_num): + raise NotImplementedError + + def networking(self) -> Networking: + raise NotImplementedError + + def board_led(self) -> BoardLED: + raise NotImplementedError diff --git a/src/hardware/Networking.py b/src/hardware/Networking.py new file mode 100644 index 0000000..b11c2bd --- /dev/null +++ b/src/hardware/Networking.py @@ -0,0 +1,40 @@ +import uasyncio + + +class Networking: + def __init__(self): + pass + + def configure_host(self, host_name: str): + import network + + network.hostname(host_name) + print(f"Set hostname to {network.hostname()}") + + async def connect_to_wifi(self, ssid: str, password: str, attempts=10) -> str: + """ + Method to connect the pico to wifi. + + + :param ssid: + :param password: + :param attempts: Number of attempts to connect before halting + :return: + """ + import network + + print(f"Connecting to {ssid} with {password}") + + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + wlan.connect(ssid, password) + + for _ in range(attempts): + if wlan.isconnected(): + print(f"Connected to {ssid}") + return wlan.ifconfig()[0] + #Wait to retry + await uasyncio.sleep(1) + + print("WiFi failed") + raise Exception("WiFi connection failed") diff --git a/src/hardware/PicoHardwre.py b/src/hardware/PicoHardwre.py new file mode 100644 index 0000000..17624da --- /dev/null +++ b/src/hardware/PicoHardwre.py @@ -0,0 +1,30 @@ +from src.hardware.Hardware import Hardware +from src.hardware.Networking import Networking +from src.pi.board_led import BoardLED + + +class PicoHardware(Hardware): + def __init__(self): + from machine import PWM, Pin + self.Pin = Pin + self.PWM = PWM + self.networking = Networking() + self.board_led = BoardLED() + + def networking(self): + return self.networking + + def board_led(self): + return self.board_led + + def get_pin(self, pin_num, mode="OUT"): + # machine.Pin.OUT is an integer constant + m = self.Pin.OUT if mode == "OUT" else self.Pin.IN + return self.Pin(pin_num, m) + + def get_pwm(self, pin_obj): + return self.PWM(pin_obj) + + def reset_pin(self, pin_num): + """Re-initializes the pin to clear PWM settings""" + return self.get_pin(pin_num, mode="OUT") diff --git a/src/hardware/VirtualHardware.py b/src/hardware/VirtualHardware.py new file mode 100644 index 0000000..31e5e25 --- /dev/null +++ b/src/hardware/VirtualHardware.py @@ -0,0 +1,80 @@ +import src.hardware +from src.hardware.Hardware import Hardware +from src.hardware.Networking import Networking +from src.pi.board_led import BoardLED + + +class VirtualHardware(Hardware): + """ + Virtual Hardware + Fake implementation so the webserver can run without a physical Raspberry Pi Pico connected to it. + """ + class MockPin: + """ + Partial implementation of Pico Pin, only using the currently needed methods + """ + + def __init__(self, num): + self.num = num + + def on(self): + print(f"[SIM] Pin {self.num} ON") + + def off(self): + print(f"[SIM] Pin {self.num} OFF") + + class MockPWM: + """ + Partial implementation of Pico PWM, only using the currently needed methods + """ + + def __init__(self, p): + self.p = p + + def freq(self, f): + pass + + def duty_u16(self, d): + print(f"[SIM] PWM {self.p.num} @ {d}") + + def deinit(self): + print(f"[SIM] PWM {self.p.num} De-initialized") + + class NoOpNetworking(Networking): + """ + Networking implementation that does nothing + """ + + def __init__(self): + pass + + async def connect_to_wifi(self, ssid: str, password: str, attempts=10) -> str or None: + return "123.123.123.123" + + def configure_host(self, host_name: str): + pass + + class MockBoardLED(BoardLED): + def __init__(self): + self._pin = src.hardware.VirtualHardware.MockPin(1) + self.led_name = "Mock Board LED" + + def __init__(self): + self.pin = self.MockPin + self.pwm = self.MockPWM + + def get_pin(self, pin_num, mode="OUT"): + return self.pin(pin_num) + + def get_pwm(self, pin_obj): + return self.pwm(pin_obj) + + def board_led(self) -> BoardLED: + return self.MockBoardLED() + + @property + def networking(self) -> Networking: + return self.NoOpNetworking() + + def reset_pin(self, pin_num): + print(f"[SIM] Pin {pin_num} reset to standard GPIO") diff --git a/src/hardware/__init__.py b/src/hardware/__init__.py new file mode 100644 index 0000000..29d8e73 --- /dev/null +++ b/src/hardware/__init__.py @@ -0,0 +1,14 @@ +import sys + +from src.hardware.Hardware import Hardware +from src.hardware.PicoHardwre import PicoHardware +from src.hardware.VirtualHardware import VirtualHardware + + +def get_hardware() -> Hardware: + """ + :return: the appropriate hardware + """ + if sys.platform == 'rp2': + return PicoHardware() + return VirtualHardware() diff --git a/src/pi/LED.py b/src/pi/LED.py index 3fa4c2e..54a750a 100644 --- a/src/pi/LED.py +++ b/src/pi/LED.py @@ -1,4 +1,3 @@ -from machine import Pin class LED: @@ -7,6 +6,8 @@ class LED: """ def __init__(self, pin_number: int, name: str): + from machine import Pin + self._pin: Pin = Pin(pin_number, Pin.OUT) self._led_name = name @@ -34,7 +35,7 @@ def name(self) -> str: """ return self._led_name - def pin(self) -> Pin: + def pin(self): """ :return: The underlying Raspberry Pi Pico Pin of the LED. """ diff --git a/src/pi/board_led.py b/src/pi/board_led.py index d99d5f3..068eeed 100644 --- a/src/pi/board_led.py +++ b/src/pi/board_led.py @@ -1,5 +1,3 @@ -from machine import Pin - from src.pi.LED import LED @@ -9,5 +7,7 @@ class BoardLED(LED): """ def __init__(self): # pylint # pylint: disable=(super-init-not-called - self.pin: Pin = Pin("LED", Pin.OUT) - self.led_name = "Board LED" + from machine import Pin + + self._pin: Pin = Pin("LED", Pin.OUT) + self._led_name = "Board LED" diff --git a/src/pi/led_effect.py b/src/pi/led_effect.py index 4cf0212..935ee28 100644 --- a/src/pi/led_effect.py +++ b/src/pi/led_effect.py @@ -1,8 +1,7 @@ -import asyncio +import uasyncio import time -from machine import PWM - +import src.hardware from src.pi import LED @@ -19,9 +18,9 @@ async def blink(led: LED) -> None: led.on() time.sleep(0.5) led.off() - await asyncio.sleep(0.5) + await uasyncio.sleep(0.5) led.on() - await asyncio.sleep(0.5) + await uasyncio.sleep(0.5) led.off() @staticmethod @@ -32,7 +31,7 @@ async def fire(led: LED) -> None: :return: """ led.on() - await asyncio.sleep(.5) + await uasyncio.sleep(.5) led.off() @staticmethod @@ -42,10 +41,10 @@ async def charge_fire(led: LED, charge_speed: int = 1) -> None: """ await LEDEffects.brighten(led, start_percent=0, end_percent=75, speed=charge_speed) led.off() - await asyncio.sleep(0.5) + await uasyncio.sleep(0.5) # LEDEffects.brighten(led, start_percent=75, end_percent=100, speed=1) led.on() - await asyncio.sleep(2) + await uasyncio.sleep(2) led.off() @staticmethod @@ -58,8 +57,7 @@ async def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, spe :param speed: :return: """ - - pwm = PWM(led.pin) + pwm = src.hardware.get_hardware().get_pwm(led.pin) pwm.freq(1000) step_rate = 10 @@ -71,7 +69,7 @@ async def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, spe for percent in range(start_percent, end_percent, step_rate): duty = int((percent / 100) * 65_535) pwm.duty_u16(duty) - await asyncio.sleep(sleep_time) + await uasyncio.sleep(sleep_time) pwm.deinit() @staticmethod @@ -83,7 +81,7 @@ async def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int """ pwms = [] for led in leds: - pwm = PWM(led.pin) + pwm = src.hardware.get_hardware().get_pwm(led.pin) pwm.freq(1000) pwms.append(pwm) @@ -98,7 +96,7 @@ async def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int duty = int((percent / 100) * 65_535) for pwm in pwms: pwm.duty_u16(duty) - await asyncio.sleep(sleep_time) + await uasyncio.sleep(sleep_time) for pwm in pwms: pwm.deinit() diff --git a/src/server/Microdot.py b/src/server/Microdot.py index 04da2ec..b58b234 100644 --- a/src/server/Microdot.py +++ b/src/server/Microdot.py @@ -5,7 +5,7 @@ The ``microdot`` module defines a few classes that help implement HTTP-based servers for MicroPython and standard Python. """ -import asyncio +import uasyncio import io import re import time @@ -27,7 +27,7 @@ async def invoke_handler(handler, *args, **kwargs): if iscoroutinefunction(handler): ret = await handler(*args, **kwargs) else: - ret = await asyncio.get_running_loop().run_in_executor( + ret = await uasyncio.get_running_loop().run_in_executor( None, partial(handler, *args, **kwargs)) return ret except ImportError: # pragma: no cover @@ -37,7 +37,7 @@ def iscoroutine(coro): async def invoke_handler(handler, *args, **kwargs): """Invoke a handler and return the result. - This method runs sync handlers in the asyncio thread, which can + This method runs sync handlers in the uasyncio thread, which can potentially cause blocking and performance issues. """ ret = handler(*args, **kwargs) @@ -1239,7 +1239,7 @@ async def start_server(self, host='0.0.0.0', port=5000, debug=False, Example:: - import asyncio + import uasyncio from microdot import Microdot app = Microdot() @@ -1251,7 +1251,7 @@ async def index(request): async def main(): await app.start_server(debug=True) - asyncio.run(main()) + uasyncio.run(main()) """ self.ssl = ssl self.debug = debug @@ -1278,24 +1278,24 @@ async def aclose(self): host=host, port=port)) try: - self.server = await asyncio.start_server(serve, host, port, + self.server = await uasyncio.start_server(serve, host, port, ssl=ssl) except TypeError: # pragma: no cover - self.server = await asyncio.start_server(serve, host, port) + self.server = await uasyncio.start_server(serve, host, port) while True: try: if hasattr(self.server, 'serve_forever'): # pragma: no cover try: await self.server.serve_forever() - except asyncio.CancelledError: + except uasyncio.CancelledError: pass await self.server.wait_closed() break except AttributeError: # pragma: no cover # the task hasn't been initialized in the server object yet # wait a bit and try again - await asyncio.sleep(0.1) + await uasyncio.sleep(0.1) def run(self, host='0.0.0.0', port=5000, debug=False, ssl=None): """Start the web server. This function does not normally return, as @@ -1328,7 +1328,7 @@ async def index(request): app.run(debug=True) """ - asyncio.run(self.start_server(host=host, port=port, debug=debug, + uasyncio.run(self.start_server(host=host, port=port, debug=debug, ssl=ssl)) # pragma: no cover def shutdown(self): diff --git a/src/server/Networking.py b/src/server/Networking.py deleted file mode 100644 index a0a146c..0000000 --- a/src/server/Networking.py +++ /dev/null @@ -1,28 +0,0 @@ - -import time -import network - -async def connect_to_wifi(ssid: str, password: str, attempts=10) -> str or None: - """ - Method to connect the pico to wifi. - - :param ssid: - :param password: - :param attempts: Number of attempts to connect before halting - :return: - """ - - - print(f"Connecting to {ssid} with {password}") - - wlan = network.WLAN(network.STA_IF) - wlan.active(True) - wlan.connect(ssid, password) - - for _ in range(attempts): - if wlan.isconnected(): - print(f"Connected to {ssid}") - return wlan.ifconfig()[0] - - print("WiFi failed") - return None diff --git a/src/server/RouteDecorator.py b/src/server/RouteDecorator.py index 20481ad..ea87ed8 100644 --- a/src/server/RouteDecorator.py +++ b/src/server/RouteDecorator.py @@ -1,27 +1,29 @@ -import asyncio +import uasyncio -def lightshow_route(app, manager_attr="current_task"): +def lightshow_route(gunpla, manager_attr="current_task"): """ A decorator factory that handles task management and standardized HTTP responses. """ def decorator(func): - async def wrapper(self, request, *args, **kwargs): + print("decorating route") + + async def wrapper(request, *args, **kwargs): + print("wrapping route") # 1. Kill any existing show to prevent flickering/overlap - existing_task = getattr(self, manager_attr, None) + existing_task = getattr(gunpla, manager_attr, None) if existing_task and not existing_task.done(): existing_task.cancel() try: - #TODO: turn all leds off + gunpla.all_off() await existing_task # Wait for cleanup - except asyncio.CancelledError: + except uasyncio.CancelledError: pass # 2. Start the new show and track it - # We wrap the function call to ensure we catch the 'self' instance - task = asyncio.create_task(func(self, request, *args, **kwargs)) - setattr(self, manager_attr, task) + task = uasyncio.create_task(func()) + setattr(gunpla, manager_attr, task) # 3. Standardized Response return { @@ -29,5 +31,7 @@ async def wrapper(self, request, *args, **kwargs): "show": func.__name__, # "message": "Gundam sequence initiated" }, 202 + print("wappred route") return wrapper + print("lighted route") return decorator diff --git a/src/server/Wrappers.py b/src/server/Wrappers.py index e78ea2f..fe4f583 100644 --- a/src/server/Wrappers.py +++ b/src/server/Wrappers.py @@ -7,11 +7,13 @@ def safe_execution(func): Returns 500 on failure, passes through success. """ + print("Safe execution") + async def wrapper(*args, **kwargs): try: + print("Trying to execute {}".format(func.__name__)) await func(*args, **kwargs) return {"status": "success", "action": func.__name__}, 202 - except Exception as e: # Log the error to console (essential for debugging) print(f"Server Error in {func.__name__}: {e}") @@ -20,16 +22,20 @@ async def wrapper(*args, **kwargs): return wrapper -def create_show_handler(func): +def create_show_handler(func, gundam_instance): """ Given a function, wraps it as a lighthow_route and safe_execution. + :param gundam_instance: + if needed we can add back in the request obj to show_handler and func(request) :param func: :return: """ # note order matters for these - @lightshow_route + @lightshow_route(gunpla=gundam_instance) @safe_execution - async def show_handler(request): - return await func(request) + async def show_handler(): + print("show_handler") + return await func() + print("created") return show_handler diff --git a/src/server/webserver.py b/src/server/webserver.py index d876c98..dd8f7d7 100644 --- a/src/server/webserver.py +++ b/src/server/webserver.py @@ -1,29 +1,23 @@ -import asyncio -import logging +import uasyncio import sys -import network -from Microdot import Microdot, send_file -from Microdot import Request - -from src import settings from src.gunpla.generic_gundam import GenericGundam -from src.pi.board_led import BoardLED -from src.pi.LED import LED +from src.hardware.Hardware import Hardware from src.pi.led_effect import LEDEffects +from src.server.Microdot import Microdot, Request, send_file from src.server.Wrappers import create_show_handler, safe_execution -from src.server.Networking import connect_to_wifi + class WebServer: """ Webserver that manages API routes and web pages for the Gunpla """ - def __init__(self, configuration: dict): + def __init__(self, configuration: dict, hardware: Hardware): self.app = Microdot() self.settings: dict = configuration - self.gundam: GenericGundam = settings.webserver['model'] - self.board_led: LED = BoardLED() + self.gundam: GenericGundam = configuration['model'] + self.hardware: Hardware = hardware @safe_execution async def index(self, request: Request): @@ -38,25 +32,24 @@ async def canary(self, request: Request): """ Sanity check to make sure webserver is running. """ - asyncio.create_task(LEDEffects.blink(self.board_led)) + uasyncio.create_task(LEDEffects.blink(self.hardware.board_led)) return "chirp", 202 async def _connect_to_wifi(self): - ipaddress: str = connect_to_wifi(self.settings['ssid'], self.settings['password']) + ipaddress: str = await self.hardware.networking.connect_to_wifi(self.settings['ssid'], self.settings['password']) if ipaddress: print(f"Server started on {ipaddress}") - await LEDEffects.blink(self.board_led) + await LEDEffects.blink(self.hardware.board_led) else: - logging.error("Server failed to connect") + print("Server failed to connect") sys.exit("Cannot start server") async def run(self): """ Main runner of the webserver. Loads configurations, paths, connects to wifi and runs the server """ - network.hostname(self.settings['hostname']) - print(f"Set hostname to {network.hostname()}") + self.hardware.networking.configure_host(self.settings['hostname']) await self._connect_to_wifi() self._add_routes() @@ -88,9 +81,10 @@ async def led_off_handler(request, led_name): path = f"/lightshow/{lightshow['path']}" method_func = getattr(self.gundam, lightshow['method']) - self.app.route(path)(create_show_handler(method_func)) + self.app.route(path)(create_show_handler(method_func, self.gundam)) # 404 Handler @self.app.errorhandler(404) def not_found(request): + # TODO: list all routes return "Not found", 404 diff --git a/tests/LocalServerTest.py b/tests/LocalServerTest.py new file mode 100644 index 0000000..71b723a --- /dev/null +++ b/tests/LocalServerTest.py @@ -0,0 +1,54 @@ + +import uasyncio +import json + +from src.gunpla.generic_gundam import GenericGundam +from src.hardware.VirtualHardware import VirtualHardware +from src.server.webserver import WebServer + + +class DummyPlug(GenericGundam): + + def __init__(self, model_config: json): + self.config = model_config + + def get_config_file(self) -> str: + return "tests/config/dummy_plug.json" + + # TODO: refactor light show url creation to be a decorator and also not need the request. + async def activation(self): + print("Dummy Plug activation") + return + + +def main(): + + model_config = { + "$schema": "gundam_led_config.schema.json", + + "name": "dummy plug", + "leds": [ + {"name": "head", "pin": 0, "color": "green", "disabled": True}, + ], + "lightshow": [ + { + "name": "Activate Gundam", + "path": "activation", + "method": "activation" + } + ] + } + + test_settings = { + "ssid": "wifi", + "password": 'wifi-pass', + "hostname": 'rei', + "model": DummyPlug(model_config) + } + + webserver = WebServer(test_settings, VirtualHardware()) + uasyncio.run(webserver.run()) + + +if __name__ == "__main__": + main() diff --git a/tests/config/dummy_plug.json b/tests/config/dummy_plug.json new file mode 100644 index 0000000..671d0a7 --- /dev/null +++ b/tests/config/dummy_plug.json @@ -0,0 +1,24 @@ +{ + "$schema": "gundam_led_config.schema.json", + + "name": "dummy plug", + "leds": [ + {"name": "head", "pin": 0, "color": "green", "disabled": true}, + {"name": "head_camera", "pin": 1, "color": "green", "disabled": true}, + {"name": "chest", "pin": 2, "color": "green", "disabled": true}, + {"name": "beamsaber1", "pin": 3, "color": "red", "disabled": true}, + {"name": "beamsaber2", "pin": 4, "color": "red", "disabled": true}, + {"name": "weapon1", "pin": 5, "color": "red", "disabled": true}, + {"name": "weapon1_camera", "pin": 6, "color": "green", "disabled": true}, + {"name": "weapon2", "pin": 7, "color": "red", "disabled": true}, + {"name": "weapon2_camera", "pin": 8, "color": "green", "disabled": true}, + {"name": "vernier", "pin": 10, "color": "blue", "disabled": true} + ], + "lightshow": [ + { + "name": "Activate Gundam", + "path": "activation", + "method": "activation" + } + ] +} \ No newline at end of file From 680e0159e0f53208e439279df358b801e783b97f Mon Sep 17 00:00:00 2001 From: frozenwizard <172203+frozenwizard@users.noreply.github.com> Date: Fri, 2 Jan 2026 23:14:07 -0600 Subject: [PATCH 4/7] Adding credits and cleanup --- CREDITS.md | 47 ++++++++++++++++++++ src/gunpla/nu_gundam.py | 3 +- src/hardware/Hardware.py | 5 +++ src/hardware/Networking.py | 12 +++-- src/hardware/PicoHardwre.py | 7 +++ src/hardware/VirtualHardware.py | 4 ++ src/pi/led_effect.py | 3 +- src/server/RouteDecorator.py | 12 ++--- src/server/Wrappers.py | 10 ++--- src/server/{ => microdot}/Microdot.py | 7 +-- src/server/microdot/__init__.py | 0 src/server/webserver.py | 7 ++- tests/LocalServerTest.py | 24 ++++++---- tests/config/{dummy_plug.json => virgo.json} | 2 +- 14 files changed, 107 insertions(+), 36 deletions(-) create mode 100644 CREDITS.md rename src/server/{ => microdot}/Microdot.py (99%) create mode 100644 src/server/microdot/__init__.py rename tests/config/{dummy_plug.json => virgo.json} (94%) diff --git a/CREDITS.md b/CREDITS.md new file mode 100644 index 0000000..643e8b9 --- /dev/null +++ b/CREDITS.md @@ -0,0 +1,47 @@ +# Project Credits and Licensing Notices + +This project is primarily licensed under the **GNU General Public License v3.0 (GPLv3)**. +It incorporates third-party open-source software as detailed below. + +--- + +## Microdot Web Framework + +This project uses the **Microdot** web framework for its server implementation. + +- **Author:** Miguel Grinberg +- **Source:** [https://github.com/miguelgrinberg/microdot](https://github.com/miguelgrinberg/microdot) +- **License:** MIT License + +### MIT License (Microdot) +Copyright (c) 2019 Miguel Grinberg + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +--- + +## Additional Acknowledgments + +- **utemplate:** Included within the Microdot library distribution. + - **Author:** Paul Sokolovsky + - **License:** MIT License + +--- + +*Note: While individual components remain under their respective licenses, the combined work as a whole is distributed under the terms of the GPLv3.* \ No newline at end of file diff --git a/src/gunpla/nu_gundam.py b/src/gunpla/nu_gundam.py index 7709a2a..1ac995b 100644 --- a/src/gunpla/nu_gundam.py +++ b/src/gunpla/nu_gundam.py @@ -1,6 +1,7 @@ -import uasyncio import random +import uasyncio + from src.gunpla.base_gundam import BaseGundam from src.pi.led_effect import LEDEffects diff --git a/src/hardware/Hardware.py b/src/hardware/Hardware.py index 3502e29..2163619 100644 --- a/src/hardware/Hardware.py +++ b/src/hardware/Hardware.py @@ -4,6 +4,10 @@ class Hardware: + """ + Hardware abstraction layer. + """ + def get_pin(self, pin_num, mode): raise NotImplementedError @@ -13,6 +17,7 @@ def get_pwm(self, pin_obj): def reset_pin(self, pin_num): raise NotImplementedError + @property def networking(self) -> Networking: raise NotImplementedError diff --git a/src/hardware/Networking.py b/src/hardware/Networking.py index b11c2bd..6798760 100644 --- a/src/hardware/Networking.py +++ b/src/hardware/Networking.py @@ -5,7 +5,12 @@ class Networking: def __init__(self): pass - def configure_host(self, host_name: str): + def configure_host(self, host_name: str) -> None: + """ + Configures the hostname of the Pi. + :param host_name: + :return: + """ import network network.hostname(host_name) @@ -15,11 +20,10 @@ async def connect_to_wifi(self, ssid: str, password: str, attempts=10) -> str: """ Method to connect the pico to wifi. - :param ssid: :param password: :param attempts: Number of attempts to connect before halting - :return: + :return: ip or raises exception """ import network @@ -33,7 +37,7 @@ async def connect_to_wifi(self, ssid: str, password: str, attempts=10) -> str: if wlan.isconnected(): print(f"Connected to {ssid}") return wlan.ifconfig()[0] - #Wait to retry + # Wait to retry await uasyncio.sleep(1) print("WiFi failed") diff --git a/src/hardware/PicoHardwre.py b/src/hardware/PicoHardwre.py index 17624da..8db8fb0 100644 --- a/src/hardware/PicoHardwre.py +++ b/src/hardware/PicoHardwre.py @@ -4,6 +4,10 @@ class PicoHardware(Hardware): + """ + Abstraction to access the Raspberry Pi Pico hardware. + """ + def __init__(self): from machine import PWM, Pin self.Pin = Pin @@ -12,6 +16,9 @@ def __init__(self): self.board_led = BoardLED() def networking(self): + """ + :return: networking + """ return self.networking def board_led(self): diff --git a/src/hardware/VirtualHardware.py b/src/hardware/VirtualHardware.py index 31e5e25..541103a 100644 --- a/src/hardware/VirtualHardware.py +++ b/src/hardware/VirtualHardware.py @@ -55,6 +55,10 @@ def configure_host(self, host_name: str): pass class MockBoardLED(BoardLED): + """ + Fake implementation of the onboard led. + """ + def __init__(self): self._pin = src.hardware.VirtualHardware.MockPin(1) self.led_name = "Mock Board LED" diff --git a/src/pi/led_effect.py b/src/pi/led_effect.py index 935ee28..59e1f63 100644 --- a/src/pi/led_effect.py +++ b/src/pi/led_effect.py @@ -1,6 +1,7 @@ -import uasyncio import time +import uasyncio + import src.hardware from src.pi import LED diff --git a/src/server/RouteDecorator.py b/src/server/RouteDecorator.py index ea87ed8..18c5f80 100644 --- a/src/server/RouteDecorator.py +++ b/src/server/RouteDecorator.py @@ -7,11 +7,8 @@ def lightshow_route(gunpla, manager_attr="current_task"): standardized HTTP responses. """ def decorator(func): - print("decorating route") - async def wrapper(request, *args, **kwargs): - print("wrapping route") - # 1. Kill any existing show to prevent flickering/overlap + # If any existing lightshow is running, cancel it and turn off all the LEDs. existing_task = getattr(gunpla, manager_attr, None) if existing_task and not existing_task.done(): existing_task.cancel() @@ -21,17 +18,14 @@ async def wrapper(request, *args, **kwargs): except uasyncio.CancelledError: pass - # 2. Start the new show and track it + # Start the new show and track it task = uasyncio.create_task(func()) setattr(gunpla, manager_attr, task) - # 3. Standardized Response + # Return common HTTP response that the show started. return { "status": "started", "show": func.__name__, - # "message": "Gundam sequence initiated" }, 202 - print("wappred route") return wrapper - print("lighted route") return decorator diff --git a/src/server/Wrappers.py b/src/server/Wrappers.py index fe4f583..6261f45 100644 --- a/src/server/Wrappers.py +++ b/src/server/Wrappers.py @@ -7,15 +7,13 @@ def safe_execution(func): Returns 500 on failure, passes through success. """ - print("Safe execution") - async def wrapper(*args, **kwargs): try: - print("Trying to execute {}".format(func.__name__)) + print(f"Trying to execute {func.__name__}") await func(*args, **kwargs) return {"status": "success", "action": func.__name__}, 202 except Exception as e: - # Log the error to console (essential for debugging) + # Log the error to console print(f"Server Error in {func.__name__}: {e}") return {"error": str(e)}, 500 @@ -24,7 +22,7 @@ async def wrapper(*args, **kwargs): def create_show_handler(func, gundam_instance): """ - Given a function, wraps it as a lighthow_route and safe_execution. + Helper that when given a function, wraps it as a lighthow_route and safe_execution. :param gundam_instance: if needed we can add back in the request obj to show_handler and func(request) :param func: @@ -34,8 +32,6 @@ def create_show_handler(func, gundam_instance): @lightshow_route(gunpla=gundam_instance) @safe_execution async def show_handler(): - print("show_handler") return await func() - print("created") return show_handler diff --git a/src/server/Microdot.py b/src/server/microdot/Microdot.py similarity index 99% rename from src/server/Microdot.py rename to src/server/microdot/Microdot.py index b58b234..33e402b 100644 --- a/src/server/Microdot.py +++ b/src/server/microdot/Microdot.py @@ -5,11 +5,12 @@ The ``microdot`` module defines a few classes that help implement HTTP-based servers for MicroPython and standard Python. """ -import uasyncio import io import re import time +import uasyncio + try: import orjson as json except ImportError: @@ -1279,7 +1280,7 @@ async def aclose(self): try: self.server = await uasyncio.start_server(serve, host, port, - ssl=ssl) + ssl=ssl) except TypeError: # pragma: no cover self.server = await uasyncio.start_server(serve, host, port) @@ -1329,7 +1330,7 @@ async def index(request): app.run(debug=True) """ uasyncio.run(self.start_server(host=host, port=port, debug=debug, - ssl=ssl)) # pragma: no cover + ssl=ssl)) # pragma: no cover def shutdown(self): """Request a server shutdown. The server will then exit its request diff --git a/src/server/microdot/__init__.py b/src/server/microdot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/server/webserver.py b/src/server/webserver.py index dd8f7d7..f763de9 100644 --- a/src/server/webserver.py +++ b/src/server/webserver.py @@ -1,10 +1,11 @@ -import uasyncio import sys +import uasyncio + from src.gunpla.generic_gundam import GenericGundam from src.hardware.Hardware import Hardware from src.pi.led_effect import LEDEffects -from src.server.Microdot import Microdot, Request, send_file +from src.server.microdot.Microdot import Microdot, Request, send_file from src.server.Wrappers import create_show_handler, safe_execution @@ -63,6 +64,7 @@ def _add_routes(self): self.app.route("/")(self.index) self.app.route("/index")(self.index) self.app.route("/canary")(self.canary) + # TODO add a /stop route to stop all lightshows @self.app.route("/led//on") @safe_execution @@ -77,6 +79,7 @@ async def led_off_handler(request, led_name): self.app.route("/all/on")(self.gundam.all_on) self.app.route("/all/off")(self.gundam.all_off) + # dynamically add all lightshow paths for lightshow in self.gundam.config['lightshow']: path = f"/lightshow/{lightshow['path']}" method_func = getattr(self.gundam, lightshow['method']) diff --git a/tests/LocalServerTest.py b/tests/LocalServerTest.py index 71b723a..011c137 100644 --- a/tests/LocalServerTest.py +++ b/tests/LocalServerTest.py @@ -1,23 +1,31 @@ -import uasyncio import json +import uasyncio + from src.gunpla.generic_gundam import GenericGundam from src.hardware.VirtualHardware import VirtualHardware from src.server.webserver import WebServer +""" +Sanity check class to run the webserver in local mode when a Raspberry pi is not needed. +""" + -class DummyPlug(GenericGundam): +class MobileDoll(GenericGundam): + """ + Mobile Doll + """ def __init__(self, model_config: json): self.config = model_config def get_config_file(self) -> str: - return "tests/config/dummy_plug.json" + return "tests/config/virgo.json" # TODO: refactor light show url creation to be a decorator and also not need the request. async def activation(self): - print("Dummy Plug activation") + print("Mobile Doll activation") return @@ -26,13 +34,13 @@ def main(): model_config = { "$schema": "gundam_led_config.schema.json", - "name": "dummy plug", + "name": "Virgo", "leds": [ {"name": "head", "pin": 0, "color": "green", "disabled": True}, ], "lightshow": [ { - "name": "Activate Gundam", + "name": "Activate Virgo", "path": "activation", "method": "activation" } @@ -42,8 +50,8 @@ def main(): test_settings = { "ssid": "wifi", "password": 'wifi-pass', - "hostname": 'rei', - "model": DummyPlug(model_config) + "hostname": 'virgo', + "model": MobileDoll(model_config) } webserver = WebServer(test_settings, VirtualHardware()) diff --git a/tests/config/dummy_plug.json b/tests/config/virgo.json similarity index 94% rename from tests/config/dummy_plug.json rename to tests/config/virgo.json index 671d0a7..bf852a2 100644 --- a/tests/config/dummy_plug.json +++ b/tests/config/virgo.json @@ -1,5 +1,5 @@ { - "$schema": "gundam_led_config.schema.json", + "$schema": "../../gundam_led_config.schema.json", "name": "dummy plug", "leds": [ From b3633b85c73906a804e637b3973de123abdee913 Mon Sep 17 00:00:00 2001 From: frozenwizard <172203+frozenwizard@users.noreply.github.com> Date: Fri, 2 Jan 2026 23:24:10 -0600 Subject: [PATCH 5/7] eva -> gundam --- tests/config/virgo.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/config/virgo.json b/tests/config/virgo.json index bf852a2..9753e6f 100644 --- a/tests/config/virgo.json +++ b/tests/config/virgo.json @@ -1,7 +1,7 @@ { "$schema": "../../gundam_led_config.schema.json", - "name": "dummy plug", + "name": "virgo", "leds": [ {"name": "head", "pin": 0, "color": "green", "disabled": true}, {"name": "head_camera", "pin": 1, "color": "green", "disabled": true}, From e9910be319eb42f7e14f16f3b1ead3ee3c720f2a Mon Sep 17 00:00:00 2001 From: frozenwizard <172203+frozenwizard@users.noreply.github.com> Date: Fri, 2 Jan 2026 23:33:46 -0600 Subject: [PATCH 6/7] Fixing markdown and tooling help --- CREDITS.md | 6 ++++-- Makefile | 14 ++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/CREDITS.md b/CREDITS.md index 643e8b9..6d8f8d4 100644 --- a/CREDITS.md +++ b/CREDITS.md @@ -1,6 +1,6 @@ # Project Credits and Licensing Notices -This project is primarily licensed under the **GNU General Public License v3.0 (GPLv3)**. +This project is primarily licensed under the **GNU General Public License v3.0 (GPLv3)**. It incorporates third-party open-source software as detailed below. --- @@ -14,6 +14,7 @@ This project uses the **Microdot** web framework for its server implementation. - **License:** MIT License ### MIT License (Microdot) + Copyright (c) 2019 Miguel Grinberg Permission is hereby granted, free of charge, to any person obtaining a copy @@ -44,4 +45,5 @@ SOFTWARE. --- -*Note: While individual components remain under their respective licenses, the combined work as a whole is distributed under the terms of the GPLv3.* \ No newline at end of file +*Note: While individual components remain under their respective licenses, +the combined work as a whole is distributed under the terms of the GPLv3.* diff --git a/Makefile b/Makefile index 3fc4448..05c1c3d 100644 --- a/Makefile +++ b/Makefile @@ -48,17 +48,23 @@ deploy: ## Deploys the built artifacts to the pi board rshell rm -r /pyboard/* rshell cp -r target/* /pyboard/ -#python tooling +.PHONY: format-other +format-other: ## Formats anything else + markdownlint -c .markdownlint.yaml --fix **/*.md + .PHONY: format -format: ## Format the Python code +format: format-python format-other ## Formats everything + +#python tooling +.PHONY: format-python +format-python: ## Format the Python code autopep8 -i -r src/ tests/ isort . .PHONY: lint lint: ## Lints the python code and documents - markdownlint --fix **/*.md + markdownlint -c .markdownlint.yaml **/*.md pylint src/ --ignore Microdot.py - help: ## Show this help. @fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//' From ebd54d20c589b08e73f48d74363148d0a5013839 Mon Sep 17 00:00:00 2001 From: frozenwizard <172203+frozenwizard@users.noreply.github.com> Date: Fri, 2 Jan 2026 23:36:41 -0600 Subject: [PATCH 7/7] forgot the space --- CREDITS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CREDITS.md b/CREDITS.md index 6d8f8d4..42cac9a 100644 --- a/CREDITS.md +++ b/CREDITS.md @@ -45,5 +45,5 @@ SOFTWARE. --- -*Note: While individual components remain under their respective licenses, +*Note: While individual components remain under their respective licenses, the combined work as a whole is distributed under the terms of the GPLv3.*