diff --git a/envr-default b/envr-default index 15617a9..a695c36 100644 --- a/envr-default +++ b/envr-default @@ -7,5 +7,5 @@ PYTHON_VENV=.venv [ADD_TO_PATH] [ALIASES] -lint=black --check . && isort --check-only --diff . && flake8 . && pydoclint smpclient && mypy . +lint=black --check --diff . && isort --check-only --diff . && flake8 . && pydoclint smpclient && mypy . test=coverage erase && pytest --cov --maxfail=1 \ No newline at end of file diff --git a/smpclient/__init__.py b/smpclient/__init__.py index 4bba672..abfb276 100644 --- a/smpclient/__init__.py +++ b/smpclient/__init__.py @@ -37,7 +37,7 @@ from smp import message as smpmsg from typing_extensions import assert_never -from smpclient.exceptions import SMPBadSequence, SMPUploadError +from smpclient.exceptions import SMPBadSequence, SMPUploadError, SMPValidationException from smpclient.generics import SMPRequest, TEr1, TEr2, TRep, error, success from smpclient.requests.file_management import FileDownload, FileUpload from smpclient.requests.image_management import ImageUploadWrite @@ -52,6 +52,47 @@ logger = logging.getLogger(__name__) +def _hexdump_ascii(data: bytes) -> str: + """Python 3.12+ has builtin hexdump, prior to that we need to reinvent the wheel.""" + lines = [] + for i in range(0, len(data), 16): + chunk = data[i : i + 16] + hexpart = " ".join(f"{b:02x}" for b in chunk) + ascpart = "".join(chr(b) if 32 <= b <= 126 else "." for b in chunk) + lines.append(f"\t{i:04x} {hexpart:<47} {ascpart}") + return "\n".join(lines) + + +def _prettify_validation_error(exc: ValidationError) -> str: + lines: list[str] = [] + for err in exc.errors(): + err_type = err["type"] + msg = err["msg"] + loc = ".".join(str(x) for x in err["loc"]) + lines.append(f"\t\t[{err_type}] {msg}: {loc}; input: {err['input']})") + return "\n".join(lines) + + +def _smp_validation_error_message( + header: smpheader.Header, + frame: bytes, + errs: dict[type[smpmsg.Response], ValidationError], +) -> tuple[str, str]: + msg = f"\nFrame could not be parsed as any of:\n\t{[str(t.__name__) for t in errs.keys()]}\n" + + details = "" + details += f"Header:\n\t{header}\n" + details += f"Frame:\n{_hexdump_ascii(frame)}\n" + details += "Errors:\n" + for cls, exc in errs.items(): + details += ( + f"\tCould not be parsed as {cls.__name__} because {len(exc.errors())} errors:\n" + f"{_prettify_validation_error(exc)}\n" + ) + + return msg, details + + class SMPClient: """Create a client to the SMP server `address`, using `transport`. @@ -120,7 +161,7 @@ async def request( Raises: TimeoutError: if the request times out SMPBadSequence: if the response sequence does not match the request sequence - ValidationError: if the response cannot be parsed as a Response or Error + SMPValidationException: if the response cannot be parsed as a Response or Error Examples: @@ -177,23 +218,22 @@ async def request( f"Bad sequence {header.sequence}, expected {request.header.sequence}" ) + errs: dict[Type, ValidationError] = {} try: return request._Response.loads(frame) # type: ignore - except ValidationError: - pass + except ValidationError as e: + errs[request._Response] = e try: return request._ErrorV1.loads(frame) - except ValidationError: - pass + except ValidationError as e: + errs[request._ErrorV1] = e try: return request._ErrorV2.loads(frame) - except ValidationError: - error_message = ( - f"Response could not by parsed as one of {request._Response}, " - f"{request._ErrorV1}, or {request._ErrorV2}. {header=} {frame=}" - ) - logger.error(error_message) - raise ValidationError(error_message) + except ValidationError as e: + errs[request._ErrorV2] = e + msg, details = _smp_validation_error_message(header, frame, errs) + logger.error(msg + details) + raise SMPValidationException(msg, details) async def upload( self, diff --git a/smpclient/exceptions.py b/smpclient/exceptions.py index 39b973c..c6700a1 100644 --- a/smpclient/exceptions.py +++ b/smpclient/exceptions.py @@ -11,3 +11,10 @@ class SMPBadSequence(SMPClientException): class SMPUploadError(SMPClientException): ... + + +class SMPValidationException(SMPClientException): + def __init__(self, msg: str, details: str) -> None: + self.msg: str = msg + self.details: str = details + super().__init__(msg)