diff --git a/.gitignore b/.gitignore index 183c39f..6e29362 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,7 @@ micropython/ target/ src/settings.py -src/phew +__pycache__ temp/ #OS garbage -.DS_STORE \ No newline at end of file +.DS_STORE diff --git a/CREDITS.md b/CREDITS.md new file mode 100644 index 0000000..42cac9a --- /dev/null +++ b/CREDITS.md @@ -0,0 +1,49 @@ +# Project Credits and Licensing Notices + +This project is primarily licensed under the **GNU General Public License v3.0 (GPLv3)**. +It incorporates third-party open-source software as detailed below. + +--- + +## Microdot Web Framework + +This project uses the **Microdot** web framework for its server implementation. + +- **Author:** Miguel Grinberg +- **Source:** [https://github.com/miguelgrinberg/microdot](https://github.com/miguelgrinberg/microdot) +- **License:** MIT License + +### MIT License (Microdot) + +Copyright (c) 2019 Miguel Grinberg + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +--- + +## Additional Acknowledgments + +- **utemplate:** Included within the Microdot library distribution. + - **Author:** Paul Sokolovsky + - **License:** MIT License + +--- + +*Note: While individual components remain under their respective licenses, +the combined work as a whole is distributed under the terms of the GPLv3.* diff --git a/Makefile b/Makefile index 61390ae..05c1c3d 100644 --- a/Makefile +++ b/Makefile @@ -4,22 +4,21 @@ clean: ##Nukes the target and micropython dirs rm -rf target/ rm -rf micropython/ - -rm -rf src/phew -rm -rf temp/ +.PHONY: setup-python-env +setup-python-env: ## Setups python dev environment + pyenv install $(cat .python-version) + pyenv local $(cat .python-version) + @eval "$$(pyenv init -)" && pyenv virtualenv $(cat .python-version) gunpla + @eval "$$(pyenv init -)" && pyenv activate gunpla && pip install --require-virtualenv -r requirements.txt + .PHONY: setup setup: ## Downloads and setups required dependencies mkdir micropython wget -P micropython https://micropython.org/resources/firmware/rp2-pico-w-20230426-v1.20.0.uf2 mkdir temp - wget -P temp/phew https://github.com/pimoroni/phew/archive/refs/tags/v0.0.3.zip - unzip temp/phew/v0.0.3.zip -d temp/ - mv temp/phew-0.0.3/phew/ src/ cp src/config.py.template src/settings.py - pyenv install $(cat .python-version) - pyenv local $(cat .python-version) - @eval "$$(pyenv init -)" && pyenv virtualenv $(cat .python-version) gunpla - @eval "$$(pyenv init -)" && pyenv activate gunpla && pip install --require-virtualenv -r requirements.txt .PHONY: install-micropython-ubuntu install-micropython-ubuntu: ## Installs micropython to pi board on ubuntu @@ -49,17 +48,23 @@ deploy: ## Deploys the built artifacts to the pi board rshell rm -r /pyboard/* rshell cp -r target/* /pyboard/ -#python tooling +.PHONY: format-other +format-other: ## Formats anything else + markdownlint -c .markdownlint.yaml --fix **/*.md + .PHONY: format -format: ## Format the Python code - autopep8 -i -r src/ +format: format-python format-other ## Formats everything + +#python tooling +.PHONY: format-python +format-python: ## Format the Python code + autopep8 -i -r src/ tests/ isort . .PHONY: lint lint: ## Lints the python code and documents - markdownlint --fix **/*.md - pylint src/ --ignore src/phew - + markdownlint -c .markdownlint.yaml **/*.md + pylint src/ --ignore Microdot.py help: ## Show this help. @fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//' diff --git a/main.py b/main.py index 87dd34c..0e5dafd 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,16 @@ +import uasyncio + +import src from src import settings -from src.webserver import WebServer +from src.hardware.Hardware import Hardware +from src.server.webserver import WebServer def main(): - webserver = WebServer(settings.webserver) - webserver.main() + hardware: Hardware = src.hardware.get_hardware() + webserver = WebServer(settings.webserver, hardware) + uasyncio.run( webserver.run()) if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/src/gunpla/base_gundam.py b/src/gunpla/base_gundam.py index a675aeb..b07f5d8 100644 --- a/src/gunpla/base_gundam.py +++ b/src/gunpla/base_gundam.py @@ -1,17 +1,14 @@ import json -from src.phew import server -from src.phew.server import Request, Response, logging -from src.pi.board_led import BoardLED from src.pi.disabled_LED import DisabledLED from src.pi.LED import LED +from src.server.Wrappers import safe_execution class BaseGundam: """ Base Gunpla. """ - board_led = BoardLED() def __init__(self): with open(self.get_config_file()) as config_contents: @@ -24,40 +21,31 @@ def get_config_file(self) -> str: """ raise Exception("Not implemented") - def led_on(self, request: Request, led_name: str) -> Response: + def led_on(self, led_name: str): """ Turns a Single LED on by name """ - logging.info(f"turning on {led_name}") - try: - led = self._get_led_from_name(led_name) - led.on() - return Response(f"{led_name}: on", 200) - except Exception as ex: - return Response(str(ex), 500) + print(f"turning on {led_name}") + led = self._get_led_from_name(led_name) + led.on() - def led_off(self, request: Request, led_name: str) -> Response: + def led_off(self, led_name: str) -> None: """ Turns a single LED off by name """ - logging.info(f"turning off {led_name}") - try: - led = self._get_led_from_name(led_name) - led.off() - return Response(f"{led_name}: off", 200) - except Exception as ex: - return Response(str(ex), 500) + print(f"turning off {led_name}") + led = self._get_led_from_name(led_name) + led.off() + + # TODO: make this not need the safe_execution and do it when we register paths - def all_on(self, request: Request) -> Response: + @safe_execution + def all_on(self) -> None: """ Turns all configured LED's on. """ - logging.info("turning on all leds") - try: - leds = self._all_leds_on() - return Response(f"All on\n {leds} ", 200) - except Exception as ex: - return Response(str(ex), 500) + print("turning on all leds") + self._all_leds_on() def _all_leds_on(self) -> str: """ @@ -74,16 +62,14 @@ def _all_leds_on(self) -> str: leds += f"{led_name}: on\n" return leds - def all_off(self, request: Request) -> Response: + # TODO: make this not need the safe_execution and do it when we register paths + @safe_execution + def all_off(self) -> None: """ Turns all configured LED's off """ - logging.info("turning off all leds") - try: - leds = self._all_leds_off() - return Response("All off\n" + leds, 200) - except Exception as ex: - return Response(str(ex), 500) + print("turning off all leds") + self._all_leds_off() def _all_leds_off(self) -> str: """" @@ -100,14 +86,14 @@ def _all_leds_off(self) -> str: leds += f"{led_name}: off\n" return leds - def get_all_leds(self, filter: list[str] = []) -> list[LED]: + def get_all_leds(self, ignore_list: list[str] = []) -> list[LED]: """ Returns all LEDs configured, enabled or disabled. But not the board_led """ leds = [] for led_entry in self.config['leds']: led_name = led_entry['name'] - if led_name in filter: + if led_name in ignore_list: continue led = self._get_led_from_name(led_name) leds.append(led) @@ -122,7 +108,7 @@ def _get_led_from_name(self, led_name: str) -> LED: """ entry = self.__get_entry_from_name(led_name) if 'disabled' in entry and entry['disabled']: - logging.debug(f"{led_name} is disabled") + print(f"{led_name} is disabled") return DisabledLED(led_name) return LED(entry['pin'], led_name) diff --git a/src/gunpla/nu_gundam.py b/src/gunpla/nu_gundam.py index deaa052..1ac995b 100644 --- a/src/gunpla/nu_gundam.py +++ b/src/gunpla/nu_gundam.py @@ -1,9 +1,8 @@ import random -import time + +import uasyncio from src.gunpla.base_gundam import BaseGundam -from src.phew.server import Request, Response -from src.pi import LED from src.pi.led_effect import LEDEffects @@ -18,40 +17,27 @@ def get_config_file(self) -> str: """ return "src/config/nu_gundam.json" - def activation(self, request: Request) -> Response: + async def activation(self) -> None: """ Runs the activation lightshow this is just a sample test """ head_led = self._get_led_from_name("head") head_led.on() - time.sleep(0.1) + await uasyncio.sleep(0.1) head_led.off() - time.sleep(0.5) - LEDEffects.brighten(head_led) - return Response("finished", 200) + await uasyncio.sleep(0.5) + await LEDEffects.brighten(head_led) - def fire_funnels(self, request: Request) -> Response: + async def fire_funnels(self) -> None: """ Light Show that fires fin funnels in order """ - fin1: LED = self._get_led_from_name("fin_funnel_1") - fin2: LED = self._get_led_from_name("fin_funnel_2") - fin3: LED = self._get_led_from_name("fin_funnel_3") - fin4: LED = self._get_led_from_name("fin_funnel_4") - fin5: LED = self._get_led_from_name("fin_funnel_5") - fin6: LED = self._get_led_from_name("fin_funnel_6") - - LEDEffects.fire(fin1) - LEDEffects.fire(fin2) - LEDEffects.fire(fin3) - LEDEffects.fire(fin4) - LEDEffects.fire(fin5) - LEDEffects.fire(fin6) - - return Response("finished", 200) + for i in range(1, 7): + funnel = self._get_led_from_name(f"fin_funnel_{i}") + await LEDEffects.fire(funnel) - def random_funnels(self, request: Request) -> Response: + async def random_funnels(self) -> None: """ Randomly fires fin funnels that are enabled for an infinite amount of time This currently does not end and needs thread management to properly be able to be halted. @@ -63,12 +49,7 @@ def random_funnels(self, request: Request) -> Response: # Filter out funnels that are disabled. funnels = [funnel for funnel in funnels if funnel.enabled()] - if not funnels: - return Response("No funnels can be fired", 400) - while True: funnel = random.choice(funnels) - LEDEffects.charge_fire(funnel) - time.sleep(random.uniform(0, 3)) - - return Response("finished", 200) + await LEDEffects.charge_fire(funnel) + await uasyncio.sleep(random.uniform(0, 3)) diff --git a/src/gunpla/unicorn_banshee.py b/src/gunpla/unicorn_banshee.py index a869bf4..31879bc 100644 --- a/src/gunpla/unicorn_banshee.py +++ b/src/gunpla/unicorn_banshee.py @@ -1,7 +1,6 @@ -import time +import uasyncio from src.gunpla.base_gundam import BaseGundam -from src.phew.server import Request, Response from src.pi.led_effect import LEDEffects @@ -16,11 +15,10 @@ def get_config_file(self) -> str: """ return "src/config/unicorn_banshee.json" - def glow(self, request: Request) -> Response: + async def glow(self) -> None: """ Runs the glow lightshow """ - LEDEffects.brighten_all(self.get_all_leds()) - time.sleep(3) + await LEDEffects.brighten_all(self.get_all_leds()) + await uasyncio.sleep(3) self._all_leds_off() - return Response("finished", 200) diff --git a/src/hardware/Hardware.py b/src/hardware/Hardware.py new file mode 100644 index 0000000..2163619 --- /dev/null +++ b/src/hardware/Hardware.py @@ -0,0 +1,25 @@ + +from src.hardware.Networking import Networking +from src.pi.board_led import BoardLED + + +class Hardware: + """ + Hardware abstraction layer. + """ + + def get_pin(self, pin_num, mode): + raise NotImplementedError + + def get_pwm(self, pin_obj): + raise NotImplementedError + + def reset_pin(self, pin_num): + raise NotImplementedError + + @property + def networking(self) -> Networking: + raise NotImplementedError + + def board_led(self) -> BoardLED: + raise NotImplementedError diff --git a/src/hardware/Networking.py b/src/hardware/Networking.py new file mode 100644 index 0000000..6798760 --- /dev/null +++ b/src/hardware/Networking.py @@ -0,0 +1,44 @@ +import uasyncio + + +class Networking: + def __init__(self): + pass + + def configure_host(self, host_name: str) -> None: + """ + Configures the hostname of the Pi. + :param host_name: + :return: + """ + import network + + network.hostname(host_name) + print(f"Set hostname to {network.hostname()}") + + async def connect_to_wifi(self, ssid: str, password: str, attempts=10) -> str: + """ + Method to connect the pico to wifi. + + :param ssid: + :param password: + :param attempts: Number of attempts to connect before halting + :return: ip or raises exception + """ + import network + + print(f"Connecting to {ssid} with {password}") + + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + wlan.connect(ssid, password) + + for _ in range(attempts): + if wlan.isconnected(): + print(f"Connected to {ssid}") + return wlan.ifconfig()[0] + # Wait to retry + await uasyncio.sleep(1) + + print("WiFi failed") + raise Exception("WiFi connection failed") diff --git a/src/hardware/PicoHardwre.py b/src/hardware/PicoHardwre.py new file mode 100644 index 0000000..8db8fb0 --- /dev/null +++ b/src/hardware/PicoHardwre.py @@ -0,0 +1,37 @@ +from src.hardware.Hardware import Hardware +from src.hardware.Networking import Networking +from src.pi.board_led import BoardLED + + +class PicoHardware(Hardware): + """ + Abstraction to access the Raspberry Pi Pico hardware. + """ + + def __init__(self): + from machine import PWM, Pin + self.Pin = Pin + self.PWM = PWM + self.networking = Networking() + self.board_led = BoardLED() + + def networking(self): + """ + :return: networking + """ + return self.networking + + def board_led(self): + return self.board_led + + def get_pin(self, pin_num, mode="OUT"): + # machine.Pin.OUT is an integer constant + m = self.Pin.OUT if mode == "OUT" else self.Pin.IN + return self.Pin(pin_num, m) + + def get_pwm(self, pin_obj): + return self.PWM(pin_obj) + + def reset_pin(self, pin_num): + """Re-initializes the pin to clear PWM settings""" + return self.get_pin(pin_num, mode="OUT") diff --git a/src/hardware/VirtualHardware.py b/src/hardware/VirtualHardware.py new file mode 100644 index 0000000..541103a --- /dev/null +++ b/src/hardware/VirtualHardware.py @@ -0,0 +1,84 @@ +import src.hardware +from src.hardware.Hardware import Hardware +from src.hardware.Networking import Networking +from src.pi.board_led import BoardLED + + +class VirtualHardware(Hardware): + """ + Virtual Hardware + Fake implementation so the webserver can run without a physical Raspberry Pi Pico connected to it. + """ + class MockPin: + """ + Partial implementation of Pico Pin, only using the currently needed methods + """ + + def __init__(self, num): + self.num = num + + def on(self): + print(f"[SIM] Pin {self.num} ON") + + def off(self): + print(f"[SIM] Pin {self.num} OFF") + + class MockPWM: + """ + Partial implementation of Pico PWM, only using the currently needed methods + """ + + def __init__(self, p): + self.p = p + + def freq(self, f): + pass + + def duty_u16(self, d): + print(f"[SIM] PWM {self.p.num} @ {d}") + + def deinit(self): + print(f"[SIM] PWM {self.p.num} De-initialized") + + class NoOpNetworking(Networking): + """ + Networking implementation that does nothing + """ + + def __init__(self): + pass + + async def connect_to_wifi(self, ssid: str, password: str, attempts=10) -> str or None: + return "123.123.123.123" + + def configure_host(self, host_name: str): + pass + + class MockBoardLED(BoardLED): + """ + Fake implementation of the onboard led. + """ + + def __init__(self): + self._pin = src.hardware.VirtualHardware.MockPin(1) + self.led_name = "Mock Board LED" + + def __init__(self): + self.pin = self.MockPin + self.pwm = self.MockPWM + + def get_pin(self, pin_num, mode="OUT"): + return self.pin(pin_num) + + def get_pwm(self, pin_obj): + return self.pwm(pin_obj) + + def board_led(self) -> BoardLED: + return self.MockBoardLED() + + @property + def networking(self) -> Networking: + return self.NoOpNetworking() + + def reset_pin(self, pin_num): + print(f"[SIM] Pin {pin_num} reset to standard GPIO") diff --git a/src/hardware/__init__.py b/src/hardware/__init__.py new file mode 100644 index 0000000..29d8e73 --- /dev/null +++ b/src/hardware/__init__.py @@ -0,0 +1,14 @@ +import sys + +from src.hardware.Hardware import Hardware +from src.hardware.PicoHardwre import PicoHardware +from src.hardware.VirtualHardware import VirtualHardware + + +def get_hardware() -> Hardware: + """ + :return: the appropriate hardware + """ + if sys.platform == 'rp2': + return PicoHardware() + return VirtualHardware() diff --git a/src/pi/LED.py b/src/pi/LED.py index f0e1272..54a750a 100644 --- a/src/pi/LED.py +++ b/src/pi/LED.py @@ -1,4 +1,3 @@ -from machine import Pin class LED: @@ -7,35 +6,37 @@ class LED: """ def __init__(self, pin_number: int, name: str): - self.pin: Pin = Pin(pin_number, Pin.OUT) - self.led_name = name + from machine import Pin + + self._pin: Pin = Pin(pin_number, Pin.OUT) + self._led_name = name def enabled(self) -> bool: - ''' - Returns false as the LED is not connected - ''' + """ + Returns true as the LED is connected + """ return True def on(self) -> None: """ Turns on the LED light """ - self.pin.on() + self._pin.on() def off(self): """ Turns off the LED light """ - self.pin.off() + self._pin.off() def name(self) -> str: """ :return: The name of LED """ - return self.led_name + return self._led_name - def pin(self) -> Pin: + def pin(self): """ :return: The underlying Raspberry Pi Pico Pin of the LED. """ - return self.pin + return self._pin diff --git a/src/pi/board_led.py b/src/pi/board_led.py index d99d5f3..068eeed 100644 --- a/src/pi/board_led.py +++ b/src/pi/board_led.py @@ -1,5 +1,3 @@ -from machine import Pin - from src.pi.LED import LED @@ -9,5 +7,7 @@ class BoardLED(LED): """ def __init__(self): # pylint # pylint: disable=(super-init-not-called - self.pin: Pin = Pin("LED", Pin.OUT) - self.led_name = "Board LED" + from machine import Pin + + self._pin: Pin = Pin("LED", Pin.OUT) + self._led_name = "Board LED" diff --git a/src/pi/led_effect.py b/src/pi/led_effect.py index 60fc4e0..59e1f63 100644 --- a/src/pi/led_effect.py +++ b/src/pi/led_effect.py @@ -1,7 +1,8 @@ import time -from machine import PWM +import uasyncio +import src.hardware from src.pi import LED @@ -11,44 +12,44 @@ class LEDEffects: """ @staticmethod - def blink(led: LED) -> None: + async def blink(led: LED) -> None: """ Blinks the onboard LED twice """ led.on() time.sleep(0.5) led.off() - time.sleep(0.5) + await uasyncio.sleep(0.5) led.on() - time.sleep(0.5) + await uasyncio.sleep(0.5) led.off() @staticmethod - def fire(led: LED) -> None: + async def fire(led: LED) -> None: """ A simple weapon effect of firing a beam rifle, has no charging effect :param led: :return: """ led.on() - time.sleep(.5) + await uasyncio.sleep(.5) led.off() @staticmethod - def charge_fire(led: LED, charge_speed: int = 1) -> None: + async def charge_fire(led: LED, charge_speed: int = 1) -> None: """ A simple charging of a shot """ - LEDEffects.brighten(led, start_percent=0, end_percent=75, speed=charge_speed) + await LEDEffects.brighten(led, start_percent=0, end_percent=75, speed=charge_speed) led.off() - time.sleep(0.5) + await uasyncio.sleep(0.5) # LEDEffects.brighten(led, start_percent=75, end_percent=100, speed=1) led.on() - time.sleep(2) + await uasyncio.sleep(2) led.off() @staticmethod - def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: + async def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: """ Starting from start_pct goes to end_pct over the course of speed, brightens led :param led: @@ -57,8 +58,7 @@ def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, speed: in :param speed: :return: """ - - pwm = PWM(led.pin) + pwm = src.hardware.get_hardware().get_pwm(led.pin) pwm.freq(1000) step_rate = 10 @@ -70,18 +70,19 @@ def brighten(led: LED, start_percent: int = 0, end_percent: int = 100, speed: in for percent in range(start_percent, end_percent, step_rate): duty = int((percent / 100) * 65_535) pwm.duty_u16(duty) - time.sleep(sleep_time) + await uasyncio.sleep(sleep_time) pwm.deinit() @staticmethod - def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: + async def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int = 100, speed: int = 10) -> None: """ The current banshee amount of leds passed in causes it to I guess stack overflow and silently crash - around 30% so this method should not be used until that's addressed. + around 30% so this method should not be used until that's addressed. I also don't think i understand all there + is to PWM. """ pwms = [] for led in leds: - pwm = PWM(led.pin) + pwm = src.hardware.get_hardware().get_pwm(led.pin) pwm.freq(1000) pwms.append(pwm) @@ -96,7 +97,7 @@ def brighten_all(leds: list[LED], start_percent: int = 0, end_percent: int = 100 duty = int((percent / 100) * 65_535) for pwm in pwms: pwm.duty_u16(duty) - time.sleep(sleep_time) + await uasyncio.sleep(sleep_time) for pwm in pwms: pwm.deinit() diff --git a/src/server/RouteDecorator.py b/src/server/RouteDecorator.py new file mode 100644 index 0000000..18c5f80 --- /dev/null +++ b/src/server/RouteDecorator.py @@ -0,0 +1,31 @@ +import uasyncio + + +def lightshow_route(gunpla, manager_attr="current_task"): + """ + A decorator factory that handles task management and + standardized HTTP responses. + """ + def decorator(func): + async def wrapper(request, *args, **kwargs): + # If any existing lightshow is running, cancel it and turn off all the LEDs. + existing_task = getattr(gunpla, manager_attr, None) + if existing_task and not existing_task.done(): + existing_task.cancel() + try: + gunpla.all_off() + await existing_task # Wait for cleanup + except uasyncio.CancelledError: + pass + + # Start the new show and track it + task = uasyncio.create_task(func()) + setattr(gunpla, manager_attr, task) + + # Return common HTTP response that the show started. + return { + "status": "started", + "show": func.__name__, + }, 202 + return wrapper + return decorator diff --git a/src/server/Wrappers.py b/src/server/Wrappers.py new file mode 100644 index 0000000..6261f45 --- /dev/null +++ b/src/server/Wrappers.py @@ -0,0 +1,37 @@ +from src.server.RouteDecorator import lightshow_route + + +def safe_execution(func): + """ + Wraps an async route handler with a try/except block. + Returns 500 on failure, passes through success. + """ + + async def wrapper(*args, **kwargs): + try: + print(f"Trying to execute {func.__name__}") + await func(*args, **kwargs) + return {"status": "success", "action": func.__name__}, 202 + except Exception as e: + # Log the error to console + print(f"Server Error in {func.__name__}: {e}") + return {"error": str(e)}, 500 + + return wrapper + + +def create_show_handler(func, gundam_instance): + """ + Helper that when given a function, wraps it as a lighthow_route and safe_execution. + :param gundam_instance: + if needed we can add back in the request obj to show_handler and func(request) + :param func: + :return: + """ + # note order matters for these + @lightshow_route(gunpla=gundam_instance) + @safe_execution + async def show_handler(): + return await func() + + return show_handler diff --git a/src/server/__init__.py b/src/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/server/microdot/Microdot.py b/src/server/microdot/Microdot.py new file mode 100644 index 0000000..33e402b --- /dev/null +++ b/src/server/microdot/Microdot.py @@ -0,0 +1,1555 @@ +""" +microdot +-------- + +The ``microdot`` module defines a few classes that help implement HTTP-based +servers for MicroPython and standard Python. +""" +import io +import re +import time + +import uasyncio + +try: + import orjson as json +except ImportError: + import json + +try: + from functools import partial + from inspect import iscoroutine, iscoroutinefunction + + async def invoke_handler(handler, *args, **kwargs): + """Invoke a handler and return the result. + + This method runs sync handlers in a thread pool executor. + """ + if iscoroutinefunction(handler): + ret = await handler(*args, **kwargs) + else: + ret = await uasyncio.get_running_loop().run_in_executor( + None, partial(handler, *args, **kwargs)) + return ret +except ImportError: # pragma: no cover + def iscoroutine(coro): + return hasattr(coro, 'send') and hasattr(coro, 'throw') + + async def invoke_handler(handler, *args, **kwargs): + """Invoke a handler and return the result. + + This method runs sync handlers in the uasyncio thread, which can + potentially cause blocking and performance issues. + """ + ret = handler(*args, **kwargs) + if iscoroutine(ret): + ret = await ret + return ret + +try: + from sys import print_exception +except ImportError: # pragma: no cover + import traceback + + def print_exception(exc): + traceback.print_exc() + +MUTED_SOCKET_ERRORS = [ + 32, # Broken pipe + 54, # Connection reset by peer + 104, # Connection reset by peer + 128, # Operation on closed socket +] + + +def urldecode(s): + if isinstance(s, str): + s = s.encode() + s = s.replace(b'+', b' ') + parts = s.split(b'%') + if len(parts) == 1: + return s.decode() + result = [parts[0]] + for item in parts[1:]: + if item == b'': + result.append(b'%') + else: + code = item[:2] + result.append(bytes([int(code, 16)])) + result.append(item[2:]) + return b''.join(result).decode() + + +def urlencode(s): + return s.replace('+', '%2B').replace(' ', '+').replace( + '%', '%25').replace('?', '%3F').replace('#', '%23').replace( + '&', '%26').replace('=', '%3D') + + +class NoCaseDict(dict): + """A subclass of dictionary that holds case-insensitive keys. + + :param initial_dict: an initial dictionary of key/value pairs to + initialize this object with. + + Example:: + + >>> d = NoCaseDict() + >>> d['Content-Type'] = 'text/html' + >>> print(d['Content-Type']) + text/html + >>> print(d['content-type']) + text/html + >>> print(d['CONTENT-TYPE']) + text/html + >>> del d['cOnTeNt-TyPe'] + >>> print(d) + {} + """ + + def __init__(self, initial_dict=None): + super().__init__(initial_dict or {}) + self.keymap = {k.lower(): k for k in self.keys() if k.lower() != k} + + def __setitem__(self, key, value): + kl = key.lower() + key = self.keymap.get(kl, key) + if kl != key: + self.keymap[kl] = key + super().__setitem__(key, value) + + def __getitem__(self, key): + kl = key.lower() + return super().__getitem__(self.keymap.get(kl, kl)) + + def __delitem__(self, key): + kl = key.lower() + super().__delitem__(self.keymap.get(kl, kl)) + + def __contains__(self, key): + kl = key.lower() + return self.keymap.get(kl, kl) in self.keys() + + def get(self, key, default=None): + kl = key.lower() + return super().get(self.keymap.get(kl, kl), default) + + def update(self, other_dict): + for key, value in other_dict.items(): + self[key] = value + + +def mro(cls): # pragma: no cover + """Return the method resolution order of a class. + + This is a helper function that returns the method resolution order of a + class. It is used by Microdot to find the best error handler to invoke for + the raised exception. + + In CPython, this function returns the ``__mro__`` attribute of the class. + In MicroPython, this function implements a recursive depth-first scanning + of the class hierarchy. + """ + if hasattr(cls, 'mro'): + return cls.__mro__ + + def _mro(cls): + m = [cls] + for base in cls.__bases__: + m += _mro(base) + return m + + mro_list = _mro(cls) + + # If a class appears multiple times (due to multiple inheritance) remove + # all but the last occurence. This matches the method resolution order + # of MicroPython, but not CPython. + mro_pruned = [] + for i in range(len(mro_list)): + base = mro_list.pop(0) + if base not in mro_list: + mro_pruned.append(base) + return mro_pruned + + +class MultiDict(dict): + """A subclass of dictionary that can hold multiple values for the same + key. It is used to hold key/value pairs decoded from query strings and + form submissions. + + :param initial_dict: an initial dictionary of key/value pairs to + initialize this object with. + + Example:: + + >>> d = MultiDict() + >>> d['sort'] = 'name' + >>> d['sort'] = 'email' + >>> print(d['sort']) + 'name' + >>> print(d.getlist('sort')) + ['name', 'email'] + """ + + def __init__(self, initial_dict=None): + super().__init__() + if initial_dict: + for key, value in initial_dict.items(): + self[key] = value + + def __setitem__(self, key, value): + if key not in self: + super().__setitem__(key, []) + super().__getitem__(key).append(value) + + def __getitem__(self, key): + return super().__getitem__(key)[0] + + def get(self, key, default=None, type=None): + """Return the value for a given key. + + :param key: The key to retrieve. + :param default: A default value to use if the key does not exist. + :param type: A type conversion callable to apply to the value. + + If the multidict contains more than one value for the requested key, + this method returns the first value only. + + Example:: + + >>> d = MultiDict() + >>> d['age'] = '42' + >>> d.get('age') + '42' + >>> d.get('age', type=int) + 42 + >>> d.get('name', default='noname') + 'noname' + """ + if key not in self: + return default + value = self[key] + if type is not None: + value = type(value) + return value + + def getlist(self, key, type=None): + """Return all the values for a given key. + + :param key: The key to retrieve. + :param type: A type conversion callable to apply to the values. + + If the requested key does not exist in the dictionary, this method + returns an empty list. + + Example:: + + >>> d = MultiDict() + >>> d.getlist('items') + [] + >>> d['items'] = '3' + >>> d.getlist('items') + ['3'] + >>> d['items'] = '56' + >>> d.getlist('items') + ['3', '56'] + >>> d.getlist('items', type=int) + [3, 56] + """ + if key not in self: + return [] + values = super().__getitem__(key) + if type is not None: + values = [type(value) for value in values] + return values + + +class AsyncBytesIO: + """An async wrapper for BytesIO.""" + + def __init__(self, data): + self.stream = io.BytesIO(data) + + async def read(self, n=-1): + return self.stream.read(n) + + async def readline(self): # pragma: no cover + return self.stream.readline() + + async def readexactly(self, n): # pragma: no cover + return self.stream.read(n) + + async def readuntil(self, separator=b'\n'): # pragma: no cover + return self.stream.readuntil(separator=separator) + + async def awrite(self, data): # pragma: no cover + return self.stream.write(data) + + async def aclose(self): # pragma: no cover + pass + + +class Request: + """An HTTP request.""" + #: Specify the maximum payload size that is accepted. Requests with larger + #: payloads will be rejected with a 413 status code. Applications can + #: change this maximum as necessary. + #: + #: Example:: + #: + #: Request.max_content_length = 1 * 1024 * 1024 # 1MB requests allowed + max_content_length = 16 * 1024 + + #: Specify the maximum payload size that can be stored in ``body``. + #: Requests with payloads that are larger than this size and up to + #: ``max_content_length`` bytes will be accepted, but the application will + #: only be able to access the body of the request by reading from + #: ``stream``. Set to 0 if you always access the body as a stream. + #: + #: Example:: + #: + #: Request.max_body_length = 4 * 1024 # up to 4KB bodies read + max_body_length = 16 * 1024 + + #: Specify the maximum length allowed for a line in the request. Requests + #: with longer lines will not be correctly interpreted. Applications can + #: change this maximum as necessary. + #: + #: Example:: + #: + #: Request.max_readline = 16 * 1024 # 16KB lines allowed + max_readline = 2 * 1024 + + class G: + pass + + def __init__(self, app, client_addr, method, url, http_version, headers, + body=None, stream=None, sock=None, url_prefix='', + subapp=None, scheme=None, route=None): + #: The application instance to which this request belongs. + self.app = app + #: The address of the client, as a tuple (host, port). + self.client_addr = client_addr + #: The HTTP method of the request. + self.method = method + #: The scheme of the request, either `http` or `https`. + self.scheme = scheme or 'http' + #: The request URL, including the path and query string, but not the + #: scheme or the host, which is available in the ``Host`` header. + self.url = url + #: The URL prefix, if the endpoint comes from a mounted + #: sub-application, or else ''. + self.url_prefix = url_prefix + #: The sub-application instance, or `None` if this isn't a mounted + #: endpoint. + self.subapp = subapp + #: The route function that handles this request. + self.route = route + #: The path portion of the URL. + self.path = url + #: The query string portion of the URL. + self.query_string = None + #: The parsed query string, as a + #: :class:`MultiDict ` object. + self.args = {} + #: A dictionary with the headers included in the request. + self.headers = headers + #: A dictionary with the cookies included in the request. + self.cookies = {} + #: The parsed ``Content-Length`` header. + self.content_length = 0 + #: The parsed ``Content-Type`` header. + self.content_type = None + #: A general purpose container for applications to store data during + #: the life of the request. + self.g = Request.G() + + self.http_version = http_version + if '?' in self.path: + self.path, self.query_string = self.path.split('?', 1) + self.args = self._parse_urlencoded(self.query_string) + + if 'Content-Length' in self.headers: + self.content_length = int(self.headers['Content-Length']) + if 'Content-Type' in self.headers: + self.content_type = self.headers['Content-Type'] + if 'Cookie' in self.headers: + for cookie in self.headers['Cookie'].split(';'): + c = cookie.strip().split('=', 1) + self.cookies[c[0]] = c[1] if len(c) > 1 else '' + + self._body = body + self.body_used = False + self._stream = stream + self.sock = sock + self._json = None + self._form = None + self._files = None + self.after_request_handlers = [] + + @staticmethod + async def create(app, client_reader, client_writer, client_addr, + scheme=None): + """Create a request object. + + :param app: The Microdot application instance. + :param client_reader: An input stream from where the request data can + be read. + :param client_writer: An output stream where the response data can be + written. + :param client_addr: The address of the client, as a tuple. + :param scheme: The scheme of the request, either 'http' or 'https'. + + This method is a coroutine. It returns a newly created ``Request`` + object. + """ + # request line + line = (await Request._safe_readline(client_reader)).strip().decode() + if not line: # pragma: no cover + return None + method, url, http_version = line.split() + http_version = http_version.split('/', 1)[1] + + # headers + headers = NoCaseDict() + content_length = 0 + while True: + line = (await Request._safe_readline( + client_reader)).strip().decode() + if line == '': + break + header, value = line.split(':', 1) + value = value.strip() + headers[header] = value + if header.lower() == 'content-length': + content_length = int(value) + + # body + body = b'' + if content_length and content_length <= Request.max_body_length: + body = await client_reader.readexactly(content_length) + stream = None + else: + body = b'' + stream = client_reader + + return Request(app, client_addr, method, url, http_version, headers, + body=body, stream=stream, + sock=(client_reader, client_writer), scheme=scheme) + + def _parse_urlencoded(self, urlencoded): + data = MultiDict() + if len(urlencoded) > 0: # pragma: no branch + if isinstance(urlencoded, str): + for kv in [pair.split('=', 1) + for pair in urlencoded.split('&') if pair]: + data[urldecode(kv[0])] = urldecode(kv[1]) \ + if len(kv) > 1 else '' + elif isinstance(urlencoded, bytes): # pragma: no branch + for kv in [pair.split(b'=', 1) + for pair in urlencoded.split(b'&') if pair]: + data[urldecode(kv[0])] = urldecode(kv[1]) \ + if len(kv) > 1 else b'' + return data + + @property + def body(self): + """The body of the request, as bytes.""" + return self._body + + @property + def stream(self): + """The body of the request, as a bytes stream.""" + if self._stream is None: + self._stream = AsyncBytesIO(self._body) + return self._stream + + @property + def json(self): + """The parsed JSON body, or ``None`` if the request does not have a + JSON body.""" + if self._json is None: + if self.content_type is None: + return None + mime_type = self.content_type.split(';')[0] + if mime_type != 'application/json': + return None + self._json = json.loads(self.body.decode()) + return self._json + + @property + def form(self): + """The parsed form submission body, as a + :class:`MultiDict ` object, or ``None`` if the + request does not have a form submission. + + Forms that are URL encoded are processed by default. For multipart + forms to be processed, the + :func:`with_form_data ` + decorator must be added to the route. + """ + if self._form is None: + if self.content_type is None: + return None + mime_type = self.content_type.split(';')[0] + if mime_type != 'application/x-www-form-urlencoded': + return None + self._form = self._parse_urlencoded(self.body) + return self._form + + @property + def files(self): + """The files uploaded in the request as a dictionary, or ``None`` if + the request does not have any files. + + The :func:`with_form_data ` + decorator must be added to the route that receives file uploads for + this property to be set. + """ + return self._files + + def after_request(self, f): + """Register a request-specific function to run after the request is + handled. Request-specific after request handlers run at the very end, + after the application's own after request handlers. The function must + take two arguments, the request and response objects. The return value + of the function must be the updated response object. + + Example:: + + @app.route('/') + def index(request): + # register a request-specific after request handler + @req.after_request + def func(request, response): + # ... + return response + + return 'Hello, World!' + + Note that the function is not called if the request handler raises an + exception and an error response is returned instead. + """ + self.after_request_handlers.append(f) + return f + + @staticmethod + async def _safe_readline(stream): + line = (await stream.readline()) + if len(line) > Request.max_readline: + raise ValueError('line too long') + return line + + +class Response: + """An HTTP response class. + + :param body: The body of the response. If a dictionary or list is given, + a JSON formatter is used to generate the body. If a file-like + object or an async generator is given, a streaming response is + used. If a string is given, it is encoded from UTF-8. Else, + the body should be a byte sequence. + :param status_code: The numeric HTTP status code of the response. The + default is 200. + :param headers: A dictionary of headers to include in the response. + :param reason: A custom reason phrase to add after the status code. The + default is "OK" for responses with a 200 status code and + "N/A" for any other status codes. + """ + types_map = { + 'css': 'text/css', + 'gif': 'image/gif', + 'html': 'text/html', + 'jpg': 'image/jpeg', + 'js': 'application/javascript', + 'json': 'application/json', + 'png': 'image/png', + 'txt': 'text/plain', + 'svg': 'image/svg+xml', + } + + send_file_buffer_size = 1024 + + #: The content type to use for responses that do not explicitly define a + #: ``Content-Type`` header. + default_content_type = 'text/plain' + + #: The default cache control max age used by :meth:`send_file`. A value + #: of ``None`` means that no ``Cache-Control`` header is added. + default_send_file_max_age = None + + #: Special response used to signal that a response does not need to be + #: written to the client. Used to exit WebSocket connections cleanly. + already_handled = None + + def __init__(self, body='', status_code=200, headers=None, reason=None): + if body is None and status_code == 200: + body = '' + status_code = 204 + self.status_code = status_code + self.headers = NoCaseDict(headers or {}) + self.reason = reason + if isinstance(body, (dict, list)): + body = json.dumps(body) + self.headers['Content-Type'] = 'application/json; charset=UTF-8' + if isinstance(body, str): + self.body = body.encode() + else: + # this applies to bytes, file-like objects or generators + self.body = body + self.is_head = False + + def set_cookie(self, cookie, value, path=None, domain=None, expires=None, + max_age=None, secure=False, http_only=False, + partitioned=False): + """Add a cookie to the response. + + :param cookie: The cookie's name. + :param value: The cookie's value. + :param path: The cookie's path. + :param domain: The cookie's domain. + :param expires: The cookie expiration time, as a ``datetime`` object + or a correctly formatted string. + :param max_age: The cookie's ``Max-Age`` value. + :param secure: The cookie's ``secure`` flag. + :param http_only: The cookie's ``HttpOnly`` flag. + :param partitioned: Whether the cookie is partitioned. + """ + http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value) + if path: + http_cookie += '; Path=' + path + if domain: + http_cookie += '; Domain=' + domain + if expires: + if isinstance(expires, str): + http_cookie += '; Expires=' + expires + else: # pragma: no cover + http_cookie += '; Expires=' + time.strftime( + '%a, %d %b %Y %H:%M:%S GMT', expires.timetuple()) + if max_age is not None: + http_cookie += '; Max-Age=' + str(max_age) + if secure: + http_cookie += '; Secure' + if http_only: + http_cookie += '; HttpOnly' + if partitioned: + http_cookie += '; Partitioned' + if 'Set-Cookie' in self.headers: + self.headers['Set-Cookie'].append(http_cookie) + else: + self.headers['Set-Cookie'] = [http_cookie] + + def delete_cookie(self, cookie, **kwargs): + """Delete a cookie. + + :param cookie: The cookie's name. + :param kwargs: Any cookie options and flags supported by + :meth:`set_cookie() `. + Values given for ``expires`` and ``max_age`` are + ignored. + """ + kwargs.pop('expires', None) + kwargs.pop('max_age', None) + self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT', + max_age=0, **kwargs) + + def complete(self): + if isinstance(self.body, bytes) and \ + 'Content-Length' not in self.headers: + self.headers['Content-Length'] = str(len(self.body)) + if 'Content-Type' not in self.headers: + self.headers['Content-Type'] = self.default_content_type + if 'charset=' not in self.headers['Content-Type']: + self.headers['Content-Type'] += '; charset=UTF-8' + + async def write(self, stream): + self.complete() + + try: + # status code + reason = self.reason if self.reason is not None else \ + ('OK' if self.status_code == 200 else 'N/A') + await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format( + status_code=self.status_code, reason=reason).encode()) + + # headers + for header, value in self.headers.items(): + values = value if isinstance(value, list) else [value] + for value in values: + await stream.awrite('{header}: {value}\r\n'.format( + header=header, value=value).encode()) + await stream.awrite(b'\r\n') + + # body + if not self.is_head: + iter = self.body_iter() + async for body in iter: + if isinstance(body, str): # pragma: no cover + body = body.encode() + try: + await stream.awrite(body) + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS or \ + exc.args[0] == 'Connection lost': + if hasattr(iter, 'aclose'): + await iter.aclose() + raise + if hasattr(iter, 'aclose'): # pragma: no branch + await iter.aclose() + + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS or \ + exc.args[0] == 'Connection lost': + pass + else: + raise + + def body_iter(self): + if hasattr(self.body, '__anext__'): + # response body is an async generator + return self.body + + response = self + + class iter: + ITER_UNKNOWN = 0 + ITER_SYNC_GEN = 1 + ITER_FILE_OBJ = 2 + ITER_NO_BODY = -1 + + def __aiter__(self): + if response.body: + self.i = self.ITER_UNKNOWN # need to determine type + else: + self.i = self.ITER_NO_BODY + return self + + async def __anext__(self): + if self.i == self.ITER_NO_BODY: + await self.aclose() + raise StopAsyncIteration + if self.i == self.ITER_UNKNOWN: + if hasattr(response.body, 'read'): + self.i = self.ITER_FILE_OBJ + elif hasattr(response.body, '__next__'): + self.i = self.ITER_SYNC_GEN + return next(response.body) + else: + self.i = self.ITER_NO_BODY + return response.body + elif self.i == self.ITER_SYNC_GEN: + try: + return next(response.body) + except StopIteration: + await self.aclose() + raise StopAsyncIteration + buf = response.body.read(response.send_file_buffer_size) + if iscoroutine(buf): # pragma: no cover + buf = await buf + if len(buf) < response.send_file_buffer_size: + self.i = self.ITER_NO_BODY + return buf + + async def aclose(self): + if hasattr(response.body, 'close'): + result = response.body.close() + if iscoroutine(result): # pragma: no cover + await result + + return iter() + + @classmethod + def redirect(cls, location, status_code=302): + """Return a redirect response. + + :param location: The URL to redirect to. + :param status_code: The 3xx status code to use for the redirect. The + default is 302. + """ + if '\x0d' in location or '\x0a' in location: + raise ValueError('invalid redirect URL') + return cls(status_code=status_code, headers={'Location': location}) + + @classmethod + def send_file(cls, filename, status_code=200, content_type=None, + stream=None, max_age=None, compressed=False, + file_extension=''): + """Send file contents in a response. + + :param filename: The filename of the file. + :param status_code: The 3xx status code to use for the redirect. The + default is 200. + :param content_type: The ``Content-Type`` header to use in the + response. If omitted, it is generated + automatically from the file extension of the + ``filename`` parameter. + :param stream: A file-like object to read the file contents from. If + a stream is given, the ``filename`` parameter is only + used when generating the ``Content-Type`` header. + :param max_age: The ``Cache-Control`` header's ``max-age`` value in + seconds. If omitted, the value of the + :attr:`Response.default_send_file_max_age` attribute is + used. + :param compressed: Whether the file is compressed. If ``True``, the + ``Content-Encoding`` header is set to ``gzip``. A + string with the header value can also be passed. + Note that when using this option the file must have + been compressed beforehand. This option only sets + the header. + :param file_extension: A file extension to append to the ``filename`` + parameter when opening the file, including the + dot. The extension given here is not considered + when generating the ``Content-Type`` header. + + Security note: The filename is assumed to be trusted. Never pass + filenames provided by the user without validating and sanitizing them + first. + """ + if content_type is None: + if compressed and filename.endswith('.gz'): + ext = filename[:-3].split('.')[-1] + else: + ext = filename.split('.')[-1] + if ext in Response.types_map: + content_type = Response.types_map[ext] + else: + content_type = 'application/octet-stream' + headers = {'Content-Type': content_type} + + if max_age is None: + max_age = cls.default_send_file_max_age + if max_age is not None: + headers['Cache-Control'] = 'max-age={}'.format(max_age) + + if compressed: + headers['Content-Encoding'] = compressed \ + if isinstance(compressed, str) else 'gzip' + + f = stream or open(filename + file_extension, 'rb') + return cls(body=f, status_code=status_code, headers=headers) + + +class URLPattern(): + """A class that represents the URL pattern for a route. + + :param url_pattern: The route URL pattern, which can include static and + dynamic path segments. Dynamic segments are enclosed in + ``<`` and ``>``. The type of the segment can be given + as a prefix, separated from the name with a colon. + Supported types are ``string`` (the default), + ``int`` and ``path``. Custom types can be registered + using the :meth:`URLPattern.register_type` method. + """ + + segment_patterns = { + 'string': '/([^/]+)', + 'int': '/(-?\\d+)', + 'path': '/(.+)', + } + segment_parsers = { + 'int': lambda value: int(value), + } + + @classmethod + def register_type(cls, type_name, pattern='[^/]+', parser=None): + """Register a new URL segment type. + + :param type_name: The name of the segment type to register. + :param pattern: The regular expression pattern to use when matching + this segment type. If not given, a default matcher for + a single path segment is used. + :param parser: A callable that will be used to parse and transform the + value of the segment. If omitted, the value is returned + as a string. + """ + cls.segment_patterns[type_name] = '/({})'.format(pattern) + cls.segment_parsers[type_name] = parser + + def __init__(self, url_pattern): + self.url_pattern = url_pattern + self.segments = [] + self.regex = None + + def compile(self): + """Generate a regular expression for the URL pattern. + + This method is automatically invoked the first time the URL pattern is + matched against a path. + """ + pattern = '' + for segment in self.url_pattern.lstrip('/').split('/'): + if segment and segment[0] == '<': + if segment[-1] != '>': + raise ValueError('invalid URL pattern') + segment = segment[1:-1] + if ':' in segment: + type_, name = segment.rsplit(':', 1) + else: + type_ = 'string' + name = segment + parser = None + if type_.startswith('re:'): + pattern += '/({pattern})'.format(pattern=type_[3:]) + else: + if type_ not in self.segment_patterns: + raise ValueError('invalid URL segment type') + pattern += self.segment_patterns[type_] + parser = self.segment_parsers.get(type_) + self.segments.append({'parser': parser, 'name': name, + 'type': type_}) + else: + pattern += '/' + segment + self.segments.append({'parser': None}) + self.regex = re.compile('^' + pattern + '$') + return self.regex + + def match(self, path): + """Match a path against the URL pattern. + + Returns a dictionary with the values of all dynamic path segments if a + matche is found, or ``None`` if the path does not match this pattern. + """ + args = {} + g = (self.regex or self.compile()).match(path) + if not g: + return + i = 1 + for segment in self.segments: + if 'name' not in segment: + continue + arg = g.group(i) + if segment['parser']: + arg = self.segment_parsers[segment['type']](arg) + if arg is None: + return + args[segment['name']] = arg + i += 1 + return args + + def __repr__(self): # pragma: no cover + return 'URLPattern: {}'.format(self.url_pattern) + + +class HTTPException(Exception): + def __init__(self, status_code, reason=None): + self.status_code = status_code + self.reason = reason or str(status_code) + ' error' + + def __repr__(self): # pragma: no cover + return 'HTTPException: {}'.format(self.status_code) + + +class Microdot: + """An HTTP application class. + + This class implements an HTTP application instance and is heavily + influenced by the ``Flask`` class of the Flask framework. It is typically + declared near the start of the main application script. + + Example:: + + from microdot import Microdot + + app = Microdot() + """ + + def __init__(self): + self.url_map = [] + self.before_request_handlers = [] + self.after_request_handlers = [] + self.after_error_request_handlers = [] + self.error_handlers = {} + self.options_handler = self.default_options_handler + self.ssl = False + self.debug = False + self.server = None + + def route(self, url_pattern, methods=None): + """Decorator that is used to register a function as a request handler + for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + :param methods: The list of HTTP methods to be handled by the + decorated function. If omitted, only ``GET`` requests + are handled. + + The URL pattern can be a static path (for example, ``/users`` or + ``/api/invoices/search``) or a path with dynamic components enclosed + in ``<`` and ``>`` (for example, ``/users/`` or + ``/invoices//products``). Dynamic path components can also + include a type prefix, separated from the name with a colon (for + example, ``/users/``). The type can be ``string`` (the + default), ``int``, ``path`` or ``re:[regular-expression]``. + + The first argument of the decorated function must be + the request object. Any path arguments that are specified in the URL + pattern are passed as keyword arguments. The return value of the + function must be a :class:`Response` instance, or the arguments to + be passed to this class. + + Example:: + + @app.route('/') + def index(request): + return 'Hello, world!' + """ + def decorated(f): + self.url_map.append( + ([m.upper() for m in (methods or ['GET'])], + URLPattern(url_pattern), f, '', None)) + return f + return decorated + + def get(self, url_pattern): + """Decorator that is used to register a function as a ``GET`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['GET']``. + + Example:: + + @app.get('/users/') + def get_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['GET']) + + def post(self, url_pattern): + """Decorator that is used to register a function as a ``POST`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the``route`` decorator with + ``methods=['POST']``. + + Example:: + + @app.post('/users') + def create_user(request): + # ... + """ + return self.route(url_pattern, methods=['POST']) + + def put(self, url_pattern): + """Decorator that is used to register a function as a ``PUT`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['PUT']``. + + Example:: + + @app.put('/users/') + def edit_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['PUT']) + + def patch(self, url_pattern): + """Decorator that is used to register a function as a ``PATCH`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['PATCH']``. + + Example:: + + @app.patch('/users/') + def edit_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['PATCH']) + + def delete(self, url_pattern): + """Decorator that is used to register a function as a ``DELETE`` + request handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['DELETE']``. + + Example:: + + @app.delete('/users/') + def delete_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['DELETE']) + + def before_request(self, f): + """Decorator to register a function to run before each request is + handled. The decorated function must take a single argument, the + request object. + + Example:: + + @app.before_request + def func(request): + # ... + """ + self.before_request_handlers.append(f) + return f + + def after_request(self, f): + """Decorator to register a function to run after each request is + handled. The decorated function must take two arguments, the request + and response objects. The return value of the function must be an + updated response object. + + Example:: + + @app.after_request + def func(request, response): + # ... + return response + """ + self.after_request_handlers.append(f) + return f + + def after_error_request(self, f): + """Decorator to register a function to run after an error response is + generated. The decorated function must take two arguments, the request + and response objects. The return value of the function must be an + updated response object. The handler is invoked for error responses + generated by Microdot, as well as those returned by application-defined + error handlers. + + Example:: + + @app.after_error_request + def func(request, response): + # ... + return response + """ + self.after_error_request_handlers.append(f) + return f + + def errorhandler(self, status_code_or_exception_class): + """Decorator to register a function as an error handler. Error handler + functions for numeric HTTP status codes must accept a single argument, + the request object. Error handler functions for Python exceptions + must accept two arguments, the request object and the exception + object. + + :param status_code_or_exception_class: The numeric HTTP status code or + Python exception class to + handle. + + Examples:: + + @app.errorhandler(404) + def not_found(request): + return 'Not found' + + @app.errorhandler(RuntimeError) + def runtime_error(request, exception): + return 'Runtime error' + """ + def decorated(f): + self.error_handlers[status_code_or_exception_class] = f + return f + return decorated + + def mount(self, subapp, url_prefix='', local=False): + """Mount a sub-application, optionally under the given URL prefix. + + :param subapp: The sub-application to mount. + :param url_prefix: The URL prefix to mount the application under. + :param local: When set to ``True``, the before, after and error request + handlers only apply to endpoints defined in the + sub-application. When ``False``, they apply to the entire + application. The default is ``False``. + """ + for methods, pattern, handler, _prefix, _subapp in subapp.url_map: + self.url_map.append( + (methods, URLPattern(url_prefix + pattern.url_pattern), + handler, url_prefix + _prefix, _subapp or subapp)) + if not local: + for handler in subapp.before_request_handlers: + self.before_request_handlers.append(handler) + subapp.before_request_handlers = [] + for handler in subapp.after_request_handlers: + self.after_request_handlers.append(handler) + subapp.after_request_handlers = [] + for handler in subapp.after_error_request_handlers: + self.after_error_request_handlers.append(handler) + subapp.after_error_request_handlers = [] + for status_code, handler in subapp.error_handlers.items(): + self.error_handlers[status_code] = handler + subapp.error_handlers = {} + + @staticmethod + def abort(status_code, reason=None): + """Abort the current request and return an error response with the + given status code. + + :param status_code: The numeric status code of the response. + :param reason: The reason for the response, which is included in the + response body. + + Example:: + + from microdot import abort + + @app.route('/users/') + def get_user(id): + user = get_user_by_id(id) + if user is None: + abort(404) + return user.to_dict() + """ + raise HTTPException(status_code, reason) + + async def start_server(self, host='0.0.0.0', port=5000, debug=False, + ssl=None): + """Start the Microdot web server as a coroutine. This coroutine does + not normally return, as the server enters an endless listening loop. + The :func:`shutdown` function provides a method for terminating the + server gracefully. + + :param host: The hostname or IP address of the network interface that + will be listening for requests. A value of ``'0.0.0.0'`` + (the default) indicates that the server should listen for + requests on all the available interfaces, and a value of + ``127.0.0.1`` indicates that the server should listen + for requests only on the internal networking interface of + the host. + :param port: The port number to listen for requests. The default is + port 5000. + :param debug: If ``True``, the server logs debugging information. The + default is ``False``. + :param ssl: An ``SSLContext`` instance or ``None`` if the server should + not use TLS. The default is ``None``. + + This method is a coroutine. + + Example:: + + import uasyncio + from microdot import Microdot + + app = Microdot() + + @app.route('/') + async def index(request): + return 'Hello, world!' + + async def main(): + await app.start_server(debug=True) + + uasyncio.run(main()) + """ + self.ssl = ssl + self.debug = debug + + async def serve(reader, writer): + if not hasattr(writer, 'awrite'): # pragma: no cover + # CPython provides the awrite and aclose methods in 3.8+ + async def awrite(self, data): + self.write(data) + await self.drain() + + async def aclose(self): + self.close() + await self.wait_closed() + + from types import MethodType + writer.awrite = MethodType(awrite, writer) + writer.aclose = MethodType(aclose, writer) + + await self.handle_request(reader, writer) + + if self.debug: # pragma: no cover + print('Starting async server on {host}:{port}...'.format( + host=host, port=port)) + + try: + self.server = await uasyncio.start_server(serve, host, port, + ssl=ssl) + except TypeError: # pragma: no cover + self.server = await uasyncio.start_server(serve, host, port) + + while True: + try: + if hasattr(self.server, 'serve_forever'): # pragma: no cover + try: + await self.server.serve_forever() + except uasyncio.CancelledError: + pass + await self.server.wait_closed() + break + except AttributeError: # pragma: no cover + # the task hasn't been initialized in the server object yet + # wait a bit and try again + await uasyncio.sleep(0.1) + + def run(self, host='0.0.0.0', port=5000, debug=False, ssl=None): + """Start the web server. This function does not normally return, as + the server enters an endless listening loop. The :func:`shutdown` + function provides a method for terminating the server gracefully. + + :param host: The hostname or IP address of the network interface that + will be listening for requests. A value of ``'0.0.0.0'`` + (the default) indicates that the server should listen for + requests on all the available interfaces, and a value of + ``127.0.0.1`` indicates that the server should listen + for requests only on the internal networking interface of + the host. + :param port: The port number to listen for requests. The default is + port 5000. + :param debug: If ``True``, the server logs debugging information. The + default is ``False``. + :param ssl: An ``SSLContext`` instance or ``None`` if the server should + not use TLS. The default is ``None``. + + Example:: + + from microdot import Microdot + + app = Microdot() + + @app.route('/') + async def index(request): + return 'Hello, world!' + + app.run(debug=True) + """ + uasyncio.run(self.start_server(host=host, port=port, debug=debug, + ssl=ssl)) # pragma: no cover + + def shutdown(self): + """Request a server shutdown. The server will then exit its request + listening loop and the :func:`run` function will return. This function + can be safely called from a route handler, as it only schedules the + server to terminate as soon as the request completes. + + Example:: + + @app.route('/shutdown') + def shutdown(request): + request.app.shutdown() + return 'The server is shutting down...' + """ + self.server.close() + + def find_route(self, req): + method = req.method.upper() + if method == 'OPTIONS' and self.options_handler: + return self.options_handler(req), '', None + if method == 'HEAD': + method = 'GET' + f = 404 + p = '' + s = None + for route_methods, route_pattern, route_handler, url_prefix, subapp \ + in self.url_map: + req.url_args = route_pattern.match(req.path) + if req.url_args is not None: + p = url_prefix + s = subapp + if method in route_methods: + f = route_handler + break + else: + f = 405 + return f, p, s + + def default_options_handler(self, req): + allow = [] + for route_methods, route_pattern, _, _, _ in self.url_map: + if route_pattern.match(req.path) is not None: + allow.extend(route_methods) + if 'GET' in allow: + allow.append('HEAD') + allow.append('OPTIONS') + return {'Allow': ', '.join(allow)} + + async def handle_request(self, reader, writer): + req = None + try: + req = await Request.create(self, reader, writer, + writer.get_extra_info('peername')) + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS: + pass + else: + raise + except Exception as exc: # pragma: no cover + print_exception(exc) + + res = await self.dispatch_request(req) + try: + if res != Response.already_handled: # pragma: no branch + await res.write(writer) + await writer.aclose() + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS: + pass + else: + raise + if self.debug and req: # pragma: no cover + print('{method} {path} {status_code}'.format( + method=req.method, path=req.path, + status_code=res.status_code)) + + def get_request_handlers(self, req, attr, local_first=True): + handlers = getattr(self, attr + '_handlers') + local_handlers = getattr(req.subapp, attr + '_handlers') \ + if req and req.subapp else [] + return local_handlers + handlers if local_first \ + else handlers + local_handlers + + async def error_response(self, req, status_code, reason=None): + if req and req.subapp and status_code in req.subapp.error_handlers: + return await invoke_handler( + req.subapp.error_handlers[status_code], req) + elif status_code in self.error_handlers: + return await invoke_handler(self.error_handlers[status_code], req) + return reason or 'N/A', status_code + + async def dispatch_request(self, req): + after_request_handled = False + if req: + if req.content_length > req.max_content_length: + # the request body is larger than allowed + res = await self.error_response(req, 413, 'Payload too large') + else: + # find the route in the app's URL map + f, req.url_prefix, req.subapp = self.find_route(req) + + try: + res = None + if callable(f): + req.route = f + + # invoke the before request handlers + for handler in self.get_request_handlers( + req, 'before_request', False): + res = await invoke_handler(handler, req) + if res: + break + + # invoke the endpoint handler + if res is None: + res = await invoke_handler(f, req, **req.url_args) + + # process the response + if isinstance(res, int): + # an integer response is taken as a status code + # with an empty body + res = '', res + if isinstance(res, tuple): + # handle a tuple response + if isinstance(res[0], int): + # a tuple that starts with an int has an empty + # body + res = ('', res[0], + res[1] if len(res) > 1 else {}) + body = res[0] + if isinstance(res[1], int): + # extract the status code and headers (if + # available) + status_code = res[1] + headers = res[2] if len(res) > 2 else {} + else: + # if the status code is missing, assume 200 + status_code = 200 + headers = res[1] + res = Response(body, status_code, headers) + elif not isinstance(res, Response): + # any other response types are wrapped in a + # Response object + res = Response(res) + + # invoke the after request handlers + for handler in self.get_request_handlers( + req, 'after_request', True): + res = await invoke_handler( + handler, req, res) or res + for handler in req.after_request_handlers: + res = await invoke_handler( + handler, req, res) or res + after_request_handled = True + elif isinstance(f, dict): + # the response from an OPTIONS request is a dict with + # headers + res = Response(headers=f) + else: + # if the route is not found, return a 404 or 405 + # response as appropriate + res = await self.error_response(req, f, 'Not found') + except HTTPException as exc: + # an HTTP exception was raised while handling this request + res = await self.error_response(req, exc.status_code, + exc.reason) + except Exception as exc: + # an unexpected exception was raised while handling this + # request + print_exception(exc) + + # invoke the error handler for the exception class if one + # exists + handler = None + res = None + if req.subapp and exc.__class__ in \ + req.subapp.error_handlers: + handler = req.subapp.error_handlers[exc.__class__] + elif exc.__class__ in self.error_handlers: + handler = self.error_handlers[exc.__class__] + else: + # walk up the exception class hierarchy to try to find + # a handler + for c in mro(exc.__class__)[1:]: + if req.subapp and c in req.subapp.error_handlers: + handler = req.subapp.error_handlers[c] + break + elif c in self.error_handlers: + handler = self.error_handlers[c] + break + if handler: + try: + res = await invoke_handler(handler, req, exc) + except Exception as exc2: # pragma: no cover + print_exception(exc2) + if res is None: + # if there is still no response, issue a 500 error + res = await self.error_response( + req, 500, 'Internal server error') + else: + # if the request could not be parsed, issue a 400 error + res = await self.error_response(req, 400, 'Bad request') + if isinstance(res, tuple): + res = Response(*res) + elif not isinstance(res, Response): + res = Response(res) + if not after_request_handled: + # if the request did not finish due to an error, invoke the after + # error request handler + for handler in self.get_request_handlers( + req, 'after_error_request', True): + res = await invoke_handler( + handler, req, res) or res + res.is_head = (req and req.method == 'HEAD') + return res + + +Response.already_handled = Response() + +abort = Microdot.abort +redirect = Response.redirect +send_file = Response.send_file diff --git a/src/server/microdot/__init__.py b/src/server/microdot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/server/webserver.py b/src/server/webserver.py new file mode 100644 index 0000000..f763de9 --- /dev/null +++ b/src/server/webserver.py @@ -0,0 +1,93 @@ +import sys + +import uasyncio + +from src.gunpla.generic_gundam import GenericGundam +from src.hardware.Hardware import Hardware +from src.pi.led_effect import LEDEffects +from src.server.microdot.Microdot import Microdot, Request, send_file +from src.server.Wrappers import create_show_handler, safe_execution + + +class WebServer: + """ + Webserver that manages API routes and web pages for the Gunpla + """ + + def __init__(self, configuration: dict, hardware: Hardware): + self.app = Microdot() + self.settings: dict = configuration + self.gundam: GenericGundam = configuration['model'] + self.hardware: Hardware = hardware + + @safe_execution + async def index(self, request: Request): + """ + Returns the root index page + """ + # Todo fix this rendering + return await send_file("src/www/index.html") + + @safe_execution + async def canary(self, request: Request): + """ + Sanity check to make sure webserver is running. + """ + uasyncio.create_task(LEDEffects.blink(self.hardware.board_led)) + return "chirp", 202 + + async def _connect_to_wifi(self): + ipaddress: str = await self.hardware.networking.connect_to_wifi(self.settings['ssid'], self.settings['password']) + if ipaddress: + print(f"Server started on {ipaddress}") + await LEDEffects.blink(self.hardware.board_led) + else: + print("Server failed to connect") + sys.exit("Cannot start server") + + async def run(self): + """ + Main runner of the webserver. Loads configurations, paths, connects to wifi and runs the server + """ + + self.hardware.networking.configure_host(self.settings['hostname']) + await self._connect_to_wifi() + + self._add_routes() + + await self.app.start_server(host='0.0.0.0', port=80, debug=True) + + def _add_routes(self): + """ + Given a server adds all endpoints for Leds and lightshows + """ + self.app.route("/")(self.index) + self.app.route("/index")(self.index) + self.app.route("/canary")(self.canary) + # TODO add a /stop route to stop all lightshows + + @self.app.route("/led//on") + @safe_execution + async def led_on_handler(request, led_name): + return self.gundam.led_on(led_name) + + @self.app.route("/led//off") + @safe_execution + async def led_off_handler(request, led_name): + return self.gundam.led_off(led_name) + + self.app.route("/all/on")(self.gundam.all_on) + self.app.route("/all/off")(self.gundam.all_off) + + # dynamically add all lightshow paths + for lightshow in self.gundam.config['lightshow']: + path = f"/lightshow/{lightshow['path']}" + method_func = getattr(self.gundam, lightshow['method']) + + self.app.route(path)(create_show_handler(method_func, self.gundam)) + + # 404 Handler + @self.app.errorhandler(404) + def not_found(request): + # TODO: list all routes + return "Not found", 404 diff --git a/src/webserver.py b/src/webserver.py deleted file mode 100644 index 097093d..0000000 --- a/src/webserver.py +++ /dev/null @@ -1,83 +0,0 @@ -import sys - -import network - -from src import settings -from src.gunpla.generic_gundam import GenericGundam -from src.phew import connect_to_wifi, server -from src.phew.server import Request, Response, logging -from src.phew.template import render_template -from src.pi.board_led import BoardLED -from src.pi.LED import LED -from src.pi.led_effect import LEDEffects - - -class WebServer: - """ - Webserver that manages API routes and web pages for the Gunpla - """ - - def __init__(self, configuration: dict): - self.settings: dict = configuration - self.gundam: GenericGundam = settings.webserver['model'] - self.board_led: LED = BoardLED() - - def index(self, request: Request) -> Response: - """ - Returns the root index page - """ - return await render_template("src/www/index.html", - title=self.gundam.config['name'], - all_leds=self.gundam.config['leds'], - lightshows=self.gundam.config['lightshow']) - - def canary(self, request: Request) -> Response: - """ - Sanity check to make sure webserver is running. - """ - LEDEffects.blink(self.board_led) - return Response("chirp", 200) - - def catchall(self, request: Request): - """ - Generic handler to catch any routing error - """ - return Response("Not found", 404) - - def main(self): - """ - Main runner of the webserver. Loads configurations, paths, connects to wifi and runs the server - """ - network.hostname(self.settings['hostname']) - logging.info(f"Set hostname to {network.hostname()}") - logging.info(f"Connect to {self.settings['ssid']} with {self.settings['password']}") - ipaddress: str = connect_to_wifi(self.settings['ssid'], self.settings['password']) - if ipaddress: - logging.info(f"Server started on {ipaddress}") - LEDEffects.blink(self.board_led) - else: - logging.error("Server failed to connect") - sys.exit("Cannot start server") - - server.set_callback(self.catchall) - - self._add_routes() - - server.run() - - def _add_routes(self): - """ - Given a server adds all endpoints for Leds and lightshows - """ - server.add_route("/", self.index, methods=["GET"]) - server.add_route("/index", self.index, methods=["GET"]) - server.add_route("/canary", self.canary, methods=["GET"]) - server.set_callback(self.catchall) - - server.add_route("/led//on", self.gundam.led_on, methods=["GET"]) - server.add_route("/led//off", self.gundam.led_off, methods=["GET"]) - server.add_route("/all/on", self.gundam.all_on, methods=["GET"]) - server.add_route("/all/off", self.gundam.all_off, methods=["GET"]) - for lightshow in self.gundam.config['lightshow']: - server.add_route(f"/lightshow/{lightshow['path']}", getattr(self, lightshow['method']), - methods=["GET"]) diff --git a/tests/LocalServerTest.py b/tests/LocalServerTest.py new file mode 100644 index 0000000..011c137 --- /dev/null +++ b/tests/LocalServerTest.py @@ -0,0 +1,62 @@ + +import json + +import uasyncio + +from src.gunpla.generic_gundam import GenericGundam +from src.hardware.VirtualHardware import VirtualHardware +from src.server.webserver import WebServer + +""" +Sanity check class to run the webserver in local mode when a Raspberry pi is not needed. +""" + + +class MobileDoll(GenericGundam): + """ + Mobile Doll + """ + + def __init__(self, model_config: json): + self.config = model_config + + def get_config_file(self) -> str: + return "tests/config/virgo.json" + + # TODO: refactor light show url creation to be a decorator and also not need the request. + async def activation(self): + print("Mobile Doll activation") + return + + +def main(): + + model_config = { + "$schema": "gundam_led_config.schema.json", + + "name": "Virgo", + "leds": [ + {"name": "head", "pin": 0, "color": "green", "disabled": True}, + ], + "lightshow": [ + { + "name": "Activate Virgo", + "path": "activation", + "method": "activation" + } + ] + } + + test_settings = { + "ssid": "wifi", + "password": 'wifi-pass', + "hostname": 'virgo', + "model": MobileDoll(model_config) + } + + webserver = WebServer(test_settings, VirtualHardware()) + uasyncio.run(webserver.run()) + + +if __name__ == "__main__": + main() diff --git a/tests/config/virgo.json b/tests/config/virgo.json new file mode 100644 index 0000000..9753e6f --- /dev/null +++ b/tests/config/virgo.json @@ -0,0 +1,24 @@ +{ + "$schema": "../../gundam_led_config.schema.json", + + "name": "virgo", + "leds": [ + {"name": "head", "pin": 0, "color": "green", "disabled": true}, + {"name": "head_camera", "pin": 1, "color": "green", "disabled": true}, + {"name": "chest", "pin": 2, "color": "green", "disabled": true}, + {"name": "beamsaber1", "pin": 3, "color": "red", "disabled": true}, + {"name": "beamsaber2", "pin": 4, "color": "red", "disabled": true}, + {"name": "weapon1", "pin": 5, "color": "red", "disabled": true}, + {"name": "weapon1_camera", "pin": 6, "color": "green", "disabled": true}, + {"name": "weapon2", "pin": 7, "color": "red", "disabled": true}, + {"name": "weapon2_camera", "pin": 8, "color": "green", "disabled": true}, + {"name": "vernier", "pin": 10, "color": "blue", "disabled": true} + ], + "lightshow": [ + { + "name": "Activate Gundam", + "path": "activation", + "method": "activation" + } + ] +} \ No newline at end of file