From f20e3497fb857e98b999987f9ff48f562ab30e75 Mon Sep 17 00:00:00 2001 From: k9ert Date: Sun, 8 Jun 2025 14:03:28 +0200 Subject: [PATCH 1/4] chore: refactoring huge file --- serial-reader.py | 653 +------------------------------------------- src/__init__.py | 15 + src/base_scanner.py | 130 +++++++++ src/gm65_scanner.py | 207 ++++++++++++++ src/m3yw_scanner.py | 135 +++++++++ src/main.py | 164 +++++++++++ src/utils.py | 54 ++++ 7 files changed, 718 insertions(+), 640 deletions(-) create mode 100644 src/__init__.py create mode 100644 src/base_scanner.py create mode 100644 src/gm65_scanner.py create mode 100644 src/m3yw_scanner.py create mode 100644 src/main.py create mode 100644 src/utils.py diff --git a/serial-reader.py b/serial-reader.py index aeb50c5..f245eda 100644 --- a/serial-reader.py +++ b/serial-reader.py @@ -1,645 +1,18 @@ -import serial -import time -import argparse -import struct -import binascii -from abc import ABC, abstractmethod +#!/usr/bin/env python3 +""" +SerialBarcodeReaderTools - Refactored Version -# ------------------------ -# Useful constants -# ------------------------ -# Not exhaustive, but supported by both the M3Y and GM65 -common_baud_rates = [ - '9600', - '14400', - '19200', - '38400', - '57600', - '115200', -] +This file has been refactored into the src/ directory for better organization. +You can still use this file as before, or use: python src/main.py [arguments] +""" -# ------------------------ -# Utility Functions -# ------------------------ -def compute_crc16_xmodem(data: bytes) -> bytes: - crc = 0 - for byte in data: - for i in range(7, -1, -1): - crc *= 2 - if (crc & 0x10000) != 0: - crc ^= 0x11021 - if (byte & (1 << i)) != 0: - crc ^= 0x1021 - crc &= 0xFFFF - return crc.to_bytes(2, byteorder='big') +import sys +import os -def check_crc16_xmodem(data_with_crc: bytes) -> bool: - if len(data_with_crc) < 3: - return False - data = data_with_crc[:-2] - received_crc = data_with_crc[-2:] - calculated_crc = compute_crc16_xmodem(data) - return received_crc == calculated_crc +# Add src directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) -def compute_bcc(data: bytes) -> bytes: - bcc = 0 - for byte in data: - bcc ^= byte - return bytes([bcc]) +from main import main -def check_bcc(data_with_bcc: bytes) -> bool: - if len(data_with_bcc) < 2: - return False - data = data_with_bcc[:-1] - received_bcc = data_with_bcc[-1:] - calculated_bcc = compute_bcc(data) - return received_bcc == calculated_bcc - -def set_bit(val, bit): return val | (1 << bit) -def clear_bit(val, bit): return val & ~(1 << bit) -def toggle_bit(val, bit): return val ^ (1 << bit) -def check_bit(val, bit): return (val >> bit) & 1 - -# ------------------------ -# Abstract Scanner Class -# ------------------------ -class BaseScanner(ABC): - def __init__(self, serial_port): - self.serial_port = serial_port - self.commands = {} - - @abstractmethod - def tx_header(self) -> bytes: - pass - - @abstractmethod - def compute_checksum(self, data: bytes) -> bytes: - pass - - @abstractmethod - def check_checksum(self, data: bytes) -> bool: - pass - - @abstractmethod - def header_ok(self) -> bytes: - pass - - @abstractmethod - def rx_struct_fmt(self) -> str: - pass - - @abstractmethod - def create_tx(self, command: bytes, value: bytes = b'') -> bytes: - pass - - @abstractmethod - def parse_rx(self, data: bytes): - pass - - @abstractmethod - def cmd_send_raw(self, value: str = ''): - pass - - @abstractmethod - def cmd_set_baudrate(self, value: int = 9600): - pass - - @abstractmethod - def get_safe_for_binaryqr(self): - pass - - def etx_bytes(self) -> bytes: - pass - - def send_and_parse(self, tx_data): - print("Sent (Raw):", tx_data, "AsHex:", binascii.hexlify(tx_data)) - self.serial_port.write(tx_data) - rx_data = self.serial_port.read(1024) - print("Got (Raw):", rx_data, "AsHex:", binascii.hexlify(rx_data)) - reply, extra = self.parse_rx(rx_data) - if reply: - print("Reply:", reply, "AsHex:", binascii.hexlify(reply)) - print("Extra:", extra, "AsHex:", binascii.hexlify(extra)) - return reply, extra - - # Placeholder command methods - def cmd_get_hw_version(self): - raise NotImplementedError("cmd_get_hw_version not implemented for this reader") - - def cmd_get_sw_version(self): - raise NotImplementedError("cmd_get_sw_version not implemented for this reader") - - def cmd_get_sw_year(self): - raise NotImplementedError("cmd_get_sw_year not implemented for this reader") - - def cmd_get_settings(self): - raise NotImplementedError("cmd_get_settings not implemented for this reader") - - def cmd_set_settings(self, value: bytes = b''): - raise NotImplementedError("cmd_set_settings not implemented for this reader") - - def cmd_save_settings(self): - raise NotImplementedError("cmd_save_settings not implemented for this reader") - - def cmd_set_continuous_mode(self): - raise NotImplementedError("cmd_set_continuous_mode not implemented for this reader") - - def cmd_set_command_mode(self): - raise NotImplementedError("cmd_set_command_mode not implemented for this reader") - - def cmd_set_illumination(self, value: int = 0): - raise NotImplementedError("cmd_set_illumination not implemented for this reader") - - def cmd_set_aimer(self, value: int = 0): - raise NotImplementedError("cmd_set_aimer not implemented for this reader") - - def cmd_set_beeper(self, value: int = 0): - raise NotImplementedError("cmd_set_beeper not implemented for this reader") - - def cmd_set_read_interval(self, value: float = 0): - raise NotImplementedError("cmd_set_read_interval not implemented for this reader") - - def cmd_set_same_barcode_delay(self, value: float = 0): - raise NotImplementedError("cmd_set_same_barcode_delay not implemented for this reader") - - def test_baudrates(self): - for baudrate in common_baud_rates + list(reversed(common_baud_rates)): - works, _ = self.cmd_set_baudrate(int(baudrate)) - if works: - print(baudrate, "Success") - else: - print(baudrate, "Failed") - - def find_baudrate(self): - for baudrate in common_baud_rates: - print("Checking at...", baudrate) - self.serial_port.baudrate = int(baudrate) - reply, _ = self.cmd_get_sw_version() - if reply: - return baudrate - - return None - -# ------------------------ -# GM65 Scanner -# ------------------------ -class GM65Scanner(BaseScanner): - def __init__(self, serial_port): - super().__init__(serial_port) - - def tx_header(self): - return binascii.unhexlify('7e00') - - def compute_checksum(self, data: bytes) -> bytes: - return compute_crc16_xmodem(data) - - def check_checksum(self, data: bytes) -> bool: - return check_crc16_xmodem(data) - - def header_ok(self): - return b'020000' - - def rx_struct_fmt(self): - return "3sB" - - def create_tx(self, command: bytes, value: bytes = b'') -> bytes: - raw_data = self.tx_header() + command + value - checksum = self.compute_checksum(command + value) - return raw_data + checksum - - def parse_rx(self, data: bytes): - header_len = struct.calcsize(self.rx_struct_fmt()) - try: - header, data_len = struct.unpack(self.rx_struct_fmt(), data[:header_len]) - except struct.error: - return None, b'' - - if header.hex() in self.header_ok().decode(): - if self.check_checksum(data[1:header_len + data_len + 2]): - return data[header_len:header_len + data_len], data[header_len + data_len + 2:] - return None, b'' - - # Command functions for GM65 that directly create the command and send - def cmd_get_hw_version(self): - command = binascii.unhexlify('070100e101') - return self.send_and_parse(self.create_tx(command)) - - def cmd_get_sw_version(self): - command = binascii.unhexlify('070100e201') - return self.send_and_parse(self.create_tx(command)) - - def cmd_get_sw_year(self): - command = binascii.unhexlify('070100e301') - return self.send_and_parse(self.create_tx(command)) - - def cmd_get_settings(self): - command = binascii.unhexlify('0701000001') - return self.send_and_parse(self.create_tx(command)) - - def cmd_set_settings(self, value: bytes = b''): - command = binascii.unhexlify('08010000') - return self.send_and_parse(self.create_tx(command, value)) - - def cmd_get_address(self, address: bytes): - command = binascii.unhexlify(b'0701' + address + b'01') - return self.send_and_parse(self.create_tx(command)) - - def cmd_set_address(self, address: bytes, value: bytes): - command = binascii.unhexlify(b'0801' + address) - return self.send_and_parse(self.create_tx(command, binascii.unhexlify(value))) - - def cmd_save_address(self, address: bytes): - command = binascii.unhexlify(b'0901' + address + b'00') - return self.send_and_parse(self.create_tx(command)) - - def cmd_save_settings(self): - command = binascii.unhexlify('0901000000') - return self.send_and_parse(self.create_tx(command)) - - def cmd_set_continuous_mode(self): - settings, extra = self.cmd_get_settings() - settings_int = settings[0] - settings_int = set_bit(settings_int, 1) - settings_int = clear_bit(settings_int, 0) - self.cmd_set_settings(bytes([settings_int])) - self.cmd_save_settings() - return True, None - - def cmd_set_command_mode(self): - settings, extra = self.cmd_get_settings() - settings_int = settings[0] - settings_int = set_bit(settings_int, 0) - settings_int = clear_bit(settings_int, 1) - self.cmd_set_settings(bytes([settings_int])) - self.cmd_save_settings() - return True, None - - # Always Off (Value = -1) - # Normal Mode (Value = 0) - # Always On (Value = 1) - def cmd_set_illumination(self, value: int = 0): - settings, extra = self.cmd_get_settings() - settings_int = settings[0] - if value < 0: - settings_int = clear_bit(settings_int, 3) - settings_int = clear_bit(settings_int, 2) - elif value == 0: - settings_int = set_bit(settings_int, 2) - settings_int = clear_bit(settings_int, 3) - elif value > 0: - settings_int = set_bit(settings_int, 3) - settings_int = set_bit(settings_int, 2) - self.cmd_set_settings(bytes([settings_int])) - self.cmd_save_settings() - return True, None - - # Always Off (Value = -1) - # Normal Mode (Value = 0) - # Always On (Value = 1) - def cmd_set_aimer(self, value: int = 0): - settings, extra = self.cmd_get_settings() - settings_int = settings[0] - if value < 0: - settings_int = clear_bit(settings_int, 5) - settings_int = clear_bit(settings_int, 4) - elif value == 0: - settings_int = set_bit(settings_int, 4) - settings_int = clear_bit(settings_int, 5) - elif value > 0: - settings_int = set_bit(settings_int, 5) - settings_int = set_bit(settings_int, 4) - self.cmd_set_settings(bytes([settings_int])) - self.cmd_save_settings() - return True, None - - # Muted (Value = -1) - # On (Value = 1) - def cmd_set_beeper(self, value: int = 0): - settings, extra = self.cmd_get_settings() - settings_int = settings[0] - if value < 0: - settings_int = clear_bit(settings_int, 6) - elif value == 0: - raise NotImplementedError - elif value > 0: - settings_int = set_bit(settings_int, 6) - self.cmd_set_settings(bytes([settings_int])) - self.cmd_save_settings() - return True, None - - def cmd_set_read_interval(self, value: float = 0): - command = binascii.unhexlify('08010005') - value = (round(value * 10)).to_bytes(1) - return self.send_and_parse(self.create_tx(command, value)) - - def cmd_set_same_barcode_delay(self, value: float = 0): - if value > 12.7: # Value needs to be below 12.7s as bit7 is used for enable/disable the feature - raise ValueError - command = binascii.unhexlify('08010013') - value = (round(value * 10)).to_bytes(1) - value = set_bit(value[0], 7) - return self.send_and_parse(self.create_tx(command, bytes([value]))) - - def cmd_send_raw(self, value: str = ''): - command = binascii.unhexlify(value) - return self.send_and_parse(self.create_tx(command)) - - - def cmd_set_baudrate(self, value: int = 9600): - baudvalues = { # Note that byte order here is reversed compared to what is in the datasheet... - 9600: b'3901', - 14400: b'd000', - 19200: b'9c00', - 38400: b'4e00', - 57600: b'3400', - 115200: b'1a00' - } - command = binascii.unhexlify('0802002A') - reply, extra = self.send_and_parse(self.create_tx(command, binascii.unhexlify(baudvalues[value]))) - self.serial_port.baudrate = value - # Test to see if everything worked... - reply, extra = self.cmd_get_sw_version() - if reply: - return True, None - else: - return False, None - - def get_safe_for_binaryqr(self): - gm65_known_bad_sw_versions = [b'69'] # Versions known to scan binaryQR codes unreliably - gm65_known_good_sw_version = [b'87', b'af'] # Versions known to be work correctly - version, extra = self.cmd_get_sw_version() - version = binascii.hexlify(version) - print("Got Software Version:", version, "Checking...") - if version in gm65_known_bad_sw_versions: - return False - elif version in gm65_known_good_sw_version: - return True - else: - return None - -# ------------------------ -# M3Y-W Scanner -# ------------------------ -class M3YWScanner(BaseScanner): - def __init__(self, serial_port): - super().__init__(serial_port) - - def tx_header(self): - return binascii.unhexlify('5a00') - - def compute_checksum(self, data: bytes) -> bytes: - return compute_bcc(data) - - def check_checksum(self, data: bytes) -> bool: - return check_bcc(data) - - def header_ok(self): - return b'5a01' - - def rx_struct_fmt(self): - return ">2sH" - - def etx_bytes(self): - return binascii.unhexlify('a5') - - def create_tx(self, command: bytes, value: bytes = b'') -> bytes: - # Value not used here in this reader - command_len = len(command).to_bytes(2, byteorder='big') - raw_data = self.tx_header() + command_len + command - checksum = self.compute_checksum(command_len + command) - return raw_data + checksum + self.etx_bytes() - - def parse_rx(self, data: bytes): - header_len = struct.calcsize(self.rx_struct_fmt()) - try: - header, data_len = struct.unpack(self.rx_struct_fmt(), data[:header_len]) - except struct.error: - return None, b'' - - if header.hex() in self.header_ok().decode(): - if self.check_checksum(data[header_len - 3:header_len + data_len + 1]): - return data[header_len:header_len + data_len], data[header_len + data_len + 2:] - return None, b'' - - # Command functions for M3YW that directly create the command and send - def cmd_get_sw_version(self): - command = b'T_OUT_CVER' - return self.send_and_parse(self.create_tx(command)) - - def cmd_set_continuous_mode(self): - command = b'S_CMD_020E' - return self.send_and_parse(self.create_tx(command)) - - def cmd_set_command_mode(self): - command = b'S_CMD_020D' - return self.send_and_parse(self.create_tx(command)) - - # Command functions for M3YW that directly create the command and send - # Always Off S_CMD_03L0 (Value = -1) - # Normal Mode S_CMD_03L2 (Value = 0) - # Always On S_CMD_03L1 (Value = 1) - def cmd_set_illumination(self, value: int = 0): - if value < 0: - command = b'S_CMD_03L0' - elif value == 0: - command = b'S_CMD_03L2' - elif value > 0: - command = b'S_CMD_03L1' - return self.send_and_parse(self.create_tx(command)) - - # Always Off S_CMD_03A0 (Value = -1) - # Normal Mode S_CMD_03A2 (Value = 0) - # Always On S_CMD_03A1 (Value = 1) - def cmd_set_aimer(self, value: int = 0): - if value < 0: - command = b'S_CMD_03A0' - elif value == 0: - command = b'S_CMD_03A2' - elif value > 0: - command = b'S_CMD_03A1' - - return self.send_and_parse(self.create_tx(command)) - - # Mute all S_CMD_04F0 (Value = -1) - # Unmute all S_CMD_04F1 (Value = 1) - def cmd_set_beeper(self, value: int = 0): - if value < 0: - command = b'S_CMD_04F0' - elif value == 0: - raise NotImplementedError - elif value > 0: - command = b'S_CMD_04F1' - return self.send_and_parse(self.create_tx(command)) - - def cmd_set_read_interval(self, value: float = 0): - command = b'S_CMD_MARR' + str(round(value*1000)).encode() - return self.send_and_parse(self.create_tx(command)) - - def cmd_set_same_barcode_delay(self, value: float = 0): - command = b'S_CMD_MA31' - self.send_and_parse(self.create_tx(command)) - command = b'S_CMD_MA41' - self.send_and_parse(self.create_tx(command)) - command = b'S_CMD_MARI' + str(round(value*1000)).encode() - return self.send_and_parse(self.create_tx(command)) - - def cmd_send_raw(self, value: str = ''): - command = value.encode() - return self.send_and_parse(self.create_tx(command)) - - def cmd_set_baudrate(self, value: int = 9600): - command = b'S_CMD_H3BR' + str(value).encode() - reply, extra = self.send_and_parse(self.create_tx(command, value)) - self.serial_port.baudrate = value - # Test to see if everything worked... - reply, extra = self.cmd_get_sw_version() - if reply: - return True, None - else: - return False, None - - def get_safe_for_binaryqr(self): - return True - -# ------------------------ -# Scanner Factory -# ------------------------ -def detect_scanner(serial_port) -> BaseScanner: - """ - Identify the scanner by sending a request for its software version. - """ - for scanner in [GM65Scanner, M3YWScanner]: - scanner = scanner(serial_port) - print("Trying", scanner.__class__.__name__) - foundbaud = scanner.find_baudrate() - if foundbaud: - print("Identified Scanner:", scanner.__class__.__name__, "at baudrate:", foundbaud) - return scanner - else: - print(scanner.__class__.__name__, "not detected") - - raise RuntimeError("No supported scanner found") - -# ------------------------ -# Main Script -# ------------------------ -parser = argparse.ArgumentParser(description="Scanner Interface") -parser.add_argument("port", help="Serial port to use") -parser.add_argument("--scanner", type=str, help="Scanner type (gm65 or m3y)") -parser.add_argument("--hw-version", action='store_true', help="Query the device for the hardware version") -parser.add_argument("--sw-version", action='store_true', help="Query the device for the software version") -parser.add_argument("--sw-year", action='store_true', help="Query the device for the software year") -parser.add_argument("--get-settings", action='store_true', help="Get the common (aim light, illumination, beeper) settings zone (GM65) and represent as hex") -parser.add_argument("--get-safe-for-binary-qr", help="Check if the connected reader is know to be safe for binary QR scanning") -parser.add_argument("--set-settings", help="Save the supplied byte to the common settings (aim light, illumination, beeper) zone (GM65)") -parser.add_argument("--get-address", help="Query a given memory address and return the result as a byte") -parser.add_argument("--set-address", nargs=2, help="Update a given memory address with a byte (Format ") -parser.add_argument("--save-address", help="Save a memory address so that the current setting is preserved across reboots.") -parser.add_argument("--set-illumination", type=int, help="Adjust the illumination light. -1 = always off, 0 = On while scanning, 1 = always on") -parser.add_argument("--set-aimer", type=int, help="Adjust the aiming light. -1 = always off, 0 = On while scanning, 1 = always on") -parser.add_argument("--set-beeper", type=int, help="Adjust the beeper. -1 = muted, 1 = on") -parser.add_argument("--set-read-interval", type=float, help="Adjust the minimum time between QR code reads") -parser.add_argument("--set-same-barcode-delay", type=float, help="Adjust the minimumt ime between re-reading the same QR code") -parser.add_argument("--send-raw-cmd", type=str, help="Send a raw command to the reader") -parser.add_argument("--save-settings", action='store_true', help="Save settings to EEPROM (Required for GM65 to persist settings across reboots)") -parser.add_argument("--set-continuous-mode", action='store_true', help="Put scanner in continious mode") -parser.add_argument("--set-command-mode", action='store_true', help="Put scanner in command mode (will stop continious mode)") -parser.add_argument("--set-baudrate", choices=common_baud_rates, help="Changes the scanners baudrate and checks if this was successful") -parser.add_argument("--baudrate", choices=common_baud_rates, help="Sets the baudrate that this tool will use (Default 9600)") -parser.add_argument("--test-baudrates", action='store_true', help="Runs through a list of common baud rates to see what your device supports (Or finds out what BAUD it is currently using)") - -args = parser.parse_args() - -baudrate = 9600 -if args.baudrate: - baudrate = args.baudrate - -ser = serial.Serial(args.port, baudrate, timeout=1) - -if args.scanner: - if "gm65" in args.scanner.lower(): - scanner = GM65Scanner(ser) - elif "m3y" in args.scanner.lower(): - scanner = M3YWScanner(ser) -else: - scanner = detect_scanner(ser) - -scan_duration = 1 -if args.hw_version: - reply, extra = scanner.cmd_get_hw_version() -elif args.sw_version: - reply, extra = scanner.cmd_get_sw_version() -elif args.sw_year: - reply, extra = scanner.cmd_get_sw_year() -elif args.get_settings: - reply, extra = scanner.cmd_get_settings() -elif args.set_settings: - reply, extra = scanner.cmd_set_settings(args.set_settings.encode()) -elif args.get_address: - reply, extra = scanner.cmd_get_address(args.get_address.encode()) -elif args.set_address: - reply, extra = scanner.cmd_set_address(args.set_address[0].encode(), args.set_address[1].encode()) -elif args.save_address: - reply, extra = scanner.cmd_save_address(args.save_address.encode()) -elif args.save_settings: - reply, extra = scanner.cmd_save_settings() -elif args.set_illumination is not None: - print("Setting Illumination") - reply, extra = scanner.cmd_set_illumination(args.set_illumination) -elif args.set_aimer is not None: - print("Setting Aimer") - reply, extra = scanner.cmd_set_aimer(args.set_aimer) -elif args.set_beeper is not None: - print("Setting Beeper") - reply, extra = scanner.cmd_set_beeper(args.set_beeper) -elif args.set_read_interval is not None: - print("Setting Read Interval") - reply, extra = scanner.cmd_set_read_interval(args.set_read_interval) -elif args.set_same_barcode_delay is not None: - print("Setting Same Barcode Delay") - reply, extra = scanner.cmd_set_same_barcode_delay(args.set_same_barcode_delay) -elif args.send_raw_cmd: - print("Sending raw command") - reply, extra = scanner.cmd_send_raw(args.send_raw_cmd) -elif args.set_continuous_mode: - print("Setting Continuous Mode") - reply, extra = scanner.cmd_set_continuous_mode() -elif args.set_command_mode: - print("Setting Command Mode") - reply, extra = scanner.cmd_set_command_mode() -elif args.set_baudrate is not None: - print("Setting Baud Rate") - reply, extra = scanner.cmd_set_baudrate(int(args.set_baudrate)) - if reply: - print("Baudrate Changed Successfully!") - else: - print("Baudrate Change Failed...") -elif args.test_baudrates: - scanner.test_baudrates() -elif args.get_safe_for_binary_qr: - safe = scanner.get_safe_for_binaryqr() - if safe is not None: - if safe: - print("Good News: Safe to use") - else: - print("WARNING: Known to be unsafe for binary QR scanning") - else: - print("Unsure... Unable to match software version as known-good or known-bad...") - -else: - print("Setting Continuous Mode") - reply, extra = scanner.cmd_set_continuous_mode() - - print("Scanning for 10 Seconds") - scan_duration = 10 - # Keep scanning - start = time.time() - rx_data = b'' - while (time.time() - start) <= scan_duration: - rx_data += ser.read(1024) - - print("Setting Command Mode") - reply, extra = scanner.cmd_set_command_mode() - print("Got:", rx_data, "AsHex:", binascii.hexlify(rx_data)) - -ser.close() +if __name__ == "__main__": + main() diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..9fbc2f4 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,15 @@ +# SerialBarcodeReaderTools package +from .base_scanner import BaseScanner +from .gm65_scanner import GM65Scanner +from .m3yw_scanner import M3YWScanner +from .main import main, detect_scanner +from .utils import common_baud_rates + +__all__ = [ + 'BaseScanner', + 'GM65Scanner', + 'M3YWScanner', + 'main', + 'detect_scanner', + 'common_baud_rates' +] diff --git a/src/base_scanner.py b/src/base_scanner.py new file mode 100644 index 0000000..45b9002 --- /dev/null +++ b/src/base_scanner.py @@ -0,0 +1,130 @@ +import binascii +import sys +import os +from abc import ABC, abstractmethod + +# Add current directory to path for imports +sys.path.insert(0, os.path.dirname(__file__)) + +from utils import common_baud_rates + + +# ------------------------ +# Abstract Scanner Class +# ------------------------ +class BaseScanner(ABC): + def __init__(self, serial_port): + self.serial_port = serial_port + self.commands = {} + + @abstractmethod + def tx_header(self) -> bytes: + pass + + @abstractmethod + def compute_checksum(self, data: bytes) -> bytes: + pass + + @abstractmethod + def check_checksum(self, data: bytes) -> bool: + pass + + @abstractmethod + def header_ok(self) -> bytes: + pass + + @abstractmethod + def rx_struct_fmt(self) -> str: + pass + + @abstractmethod + def create_tx(self, command: bytes, value: bytes = b'') -> bytes: + pass + + @abstractmethod + def parse_rx(self, data: bytes): + pass + + @abstractmethod + def cmd_send_raw(self, value: str = ''): + pass + + @abstractmethod + def cmd_set_baudrate(self, value: int = 9600): + pass + + @abstractmethod + def get_safe_for_binaryqr(self): + pass + + def etx_bytes(self) -> bytes: + pass + + def send_and_parse(self, tx_data): + print("Sent (Raw):", tx_data, "AsHex:", binascii.hexlify(tx_data)) + self.serial_port.write(tx_data) + rx_data = self.serial_port.read(1024) + print("Got (Raw):", rx_data, "AsHex:", binascii.hexlify(rx_data)) + reply, extra = self.parse_rx(rx_data) + if reply: + print("Reply:", reply, "AsHex:", binascii.hexlify(reply)) + print("Extra:", extra, "AsHex:", binascii.hexlify(extra)) + return reply, extra + + # Placeholder command methods + def cmd_get_hw_version(self): + raise NotImplementedError("cmd_get_hw_version not implemented for this reader") + + def cmd_get_sw_version(self): + raise NotImplementedError("cmd_get_sw_version not implemented for this reader") + + def cmd_get_sw_year(self): + raise NotImplementedError("cmd_get_sw_year not implemented for this reader") + + def cmd_get_settings(self): + raise NotImplementedError("cmd_get_settings not implemented for this reader") + + def cmd_set_settings(self, value: bytes = b''): + raise NotImplementedError("cmd_set_settings not implemented for this reader") + + def cmd_save_settings(self): + raise NotImplementedError("cmd_save_settings not implemented for this reader") + + def cmd_set_continuous_mode(self): + raise NotImplementedError("cmd_set_continuous_mode not implemented for this reader") + + def cmd_set_command_mode(self): + raise NotImplementedError("cmd_set_command_mode not implemented for this reader") + + def cmd_set_illumination(self, value: int = 0): + raise NotImplementedError("cmd_set_illumination not implemented for this reader") + + def cmd_set_aimer(self, value: int = 0): + raise NotImplementedError("cmd_set_aimer not implemented for this reader") + + def cmd_set_beeper(self, value: int = 0): + raise NotImplementedError("cmd_set_beeper not implemented for this reader") + + def cmd_set_read_interval(self, value: float = 0): + raise NotImplementedError("cmd_set_read_interval not implemented for this reader") + + def cmd_set_same_barcode_delay(self, value: float = 0): + raise NotImplementedError("cmd_set_same_barcode_delay not implemented for this reader") + + def test_baudrates(self): + for baudrate in common_baud_rates + list(reversed(common_baud_rates)): + works, _ = self.cmd_set_baudrate(int(baudrate)) + if works: + print(baudrate, "Success") + else: + print(baudrate, "Failed") + + def find_baudrate(self): + for baudrate in common_baud_rates: + print("Checking at...", baudrate) + self.serial_port.baudrate = int(baudrate) + reply, _ = self.cmd_get_sw_version() + if reply: + return baudrate + + return None diff --git a/src/gm65_scanner.py b/src/gm65_scanner.py new file mode 100644 index 0000000..02e9c16 --- /dev/null +++ b/src/gm65_scanner.py @@ -0,0 +1,207 @@ +import struct +import binascii +import sys +import os + +# Add current directory to path for imports +sys.path.insert(0, os.path.dirname(__file__)) + +from base_scanner import BaseScanner +from utils import compute_crc16_xmodem, check_crc16_xmodem, set_bit, clear_bit + + +# ------------------------ +# GM65 Scanner +# ------------------------ +class GM65Scanner(BaseScanner): + def __init__(self, serial_port): + super().__init__(serial_port) + + def tx_header(self): + return binascii.unhexlify('7e00') + + def compute_checksum(self, data: bytes) -> bytes: + return compute_crc16_xmodem(data) + + def check_checksum(self, data: bytes) -> bool: + return check_crc16_xmodem(data) + + def header_ok(self): + return b'020000' + + def rx_struct_fmt(self): + return "3sB" + + def create_tx(self, command: bytes, value: bytes = b'') -> bytes: + raw_data = self.tx_header() + command + value + checksum = self.compute_checksum(command + value) + return raw_data + checksum + + def parse_rx(self, data: bytes): + header_len = struct.calcsize(self.rx_struct_fmt()) + try: + header, data_len = struct.unpack(self.rx_struct_fmt(), data[:header_len]) + except struct.error: + return None, b'' + + if header.hex() in self.header_ok().decode(): + if self.check_checksum(data[1:header_len + data_len + 2]): + return data[header_len:header_len + data_len], data[header_len + data_len + 2:] + return None, b'' + + # Command functions for GM65 that directly create the command and send + def cmd_get_hw_version(self): + command = binascii.unhexlify('070100e101') + return self.send_and_parse(self.create_tx(command)) + + def cmd_get_sw_version(self): + command = binascii.unhexlify('070100e201') + return self.send_and_parse(self.create_tx(command)) + + def cmd_get_sw_year(self): + command = binascii.unhexlify('070100e301') + return self.send_and_parse(self.create_tx(command)) + + def cmd_get_settings(self): + command = binascii.unhexlify('0701000001') + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_settings(self, value: bytes = b''): + command = binascii.unhexlify('08010000') + return self.send_and_parse(self.create_tx(command, value)) + + def cmd_get_address(self, address: bytes): + command = binascii.unhexlify(b'0701' + address + b'01') + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_address(self, address: bytes, value: bytes): + command = binascii.unhexlify(b'0801' + address) + return self.send_and_parse(self.create_tx(command, binascii.unhexlify(value))) + + def cmd_save_address(self, address: bytes): + command = binascii.unhexlify(b'0901' + address + b'00') + return self.send_and_parse(self.create_tx(command)) + + def cmd_save_settings(self): + command = binascii.unhexlify('0901000000') + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_continuous_mode(self): + settings, extra = self.cmd_get_settings() + settings_int = settings[0] + settings_int = set_bit(settings_int, 1) + settings_int = clear_bit(settings_int, 0) + self.cmd_set_settings(bytes([settings_int])) + self.cmd_save_settings() + return True, None + + def cmd_set_command_mode(self): + settings, extra = self.cmd_get_settings() + settings_int = settings[0] + settings_int = set_bit(settings_int, 0) + settings_int = clear_bit(settings_int, 1) + self.cmd_set_settings(bytes([settings_int])) + self.cmd_save_settings() + return True, None + + # Always Off (Value = -1) + # Normal Mode (Value = 0) + # Always On (Value = 1) + def cmd_set_illumination(self, value: int = 0): + settings, extra = self.cmd_get_settings() + settings_int = settings[0] + if value < 0: + settings_int = clear_bit(settings_int, 3) + settings_int = clear_bit(settings_int, 2) + elif value == 0: + settings_int = set_bit(settings_int, 2) + settings_int = clear_bit(settings_int, 3) + elif value > 0: + settings_int = set_bit(settings_int, 3) + settings_int = set_bit(settings_int, 2) + self.cmd_set_settings(bytes([settings_int])) + self.cmd_save_settings() + return True, None + + # Always Off (Value = -1) + # Normal Mode (Value = 0) + # Always On (Value = 1) + def cmd_set_aimer(self, value: int = 0): + settings, extra = self.cmd_get_settings() + settings_int = settings[0] + if value < 0: + settings_int = clear_bit(settings_int, 5) + settings_int = clear_bit(settings_int, 4) + elif value == 0: + settings_int = set_bit(settings_int, 4) + settings_int = clear_bit(settings_int, 5) + elif value > 0: + settings_int = set_bit(settings_int, 5) + settings_int = set_bit(settings_int, 4) + self.cmd_set_settings(bytes([settings_int])) + self.cmd_save_settings() + return True, None + + # Muted (Value = -1) + # On (Value = 1) + def cmd_set_beeper(self, value: int = 0): + settings, extra = self.cmd_get_settings() + settings_int = settings[0] + if value < 0: + settings_int = clear_bit(settings_int, 6) + elif value == 0: + raise NotImplementedError + elif value > 0: + settings_int = set_bit(settings_int, 6) + self.cmd_set_settings(bytes([settings_int])) + self.cmd_save_settings() + return True, None + + def cmd_set_read_interval(self, value: float = 0): + command = binascii.unhexlify('08010005') + value = (round(value * 10)).to_bytes(1) + return self.send_and_parse(self.create_tx(command, value)) + + def cmd_set_same_barcode_delay(self, value: float = 0): + if value > 12.7: # Value needs to be below 12.7s as bit7 is used for enable/disable the feature + raise ValueError + command = binascii.unhexlify('08010013') + value = (round(value * 10)).to_bytes(1) + value = set_bit(value[0], 7) + return self.send_and_parse(self.create_tx(command, bytes([value]))) + + def cmd_send_raw(self, value: str = ''): + command = binascii.unhexlify(value) + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_baudrate(self, value: int = 9600): + baudvalues = { # Note that byte order here is reversed compared to what is in the datasheet... + 9600: b'3901', + 14400: b'd000', + 19200: b'9c00', + 38400: b'4e00', + 57600: b'3400', + 115200: b'1a00' + } + command = binascii.unhexlify('0802002A') + reply, extra = self.send_and_parse(self.create_tx(command, binascii.unhexlify(baudvalues[value]))) + self.serial_port.baudrate = value + # Test to see if everything worked... + reply, extra = self.cmd_get_sw_version() + if reply: + return True, None + else: + return False, None + + def get_safe_for_binaryqr(self): + gm65_known_bad_sw_versions = [b'69'] # Versions known to scan binaryQR codes unreliably + gm65_known_good_sw_version = [b'87', b'af'] # Versions known to be work correctly + version, extra = self.cmd_get_sw_version() + version = binascii.hexlify(version) + print("Got Software Version:", version, "Checking...") + if version in gm65_known_bad_sw_versions: + return False + elif version in gm65_known_good_sw_version: + return True + else: + return None diff --git a/src/m3yw_scanner.py b/src/m3yw_scanner.py new file mode 100644 index 0000000..bd82943 --- /dev/null +++ b/src/m3yw_scanner.py @@ -0,0 +1,135 @@ +import struct +import binascii +import sys +import os + +# Add current directory to path for imports +sys.path.insert(0, os.path.dirname(__file__)) + +from base_scanner import BaseScanner +from utils import compute_bcc, check_bcc + + +# ------------------------ +# M3Y-W Scanner +# ------------------------ +class M3YWScanner(BaseScanner): + def __init__(self, serial_port): + super().__init__(serial_port) + + def tx_header(self): + return binascii.unhexlify('5a00') + + def compute_checksum(self, data: bytes) -> bytes: + return compute_bcc(data) + + def check_checksum(self, data: bytes) -> bool: + return check_bcc(data) + + def header_ok(self): + return b'5a01' + + def rx_struct_fmt(self): + return ">2sH" + + def etx_bytes(self): + return binascii.unhexlify('a5') + + def create_tx(self, command: bytes, value: bytes = b'') -> bytes: + # Value not used here in this reader + command_len = len(command).to_bytes(2, byteorder='big') + raw_data = self.tx_header() + command_len + command + checksum = self.compute_checksum(command_len + command) + return raw_data + checksum + self.etx_bytes() + + def parse_rx(self, data: bytes): + header_len = struct.calcsize(self.rx_struct_fmt()) + try: + header, data_len = struct.unpack(self.rx_struct_fmt(), data[:header_len]) + except struct.error: + return None, b'' + + if header.hex() in self.header_ok().decode(): + if self.check_checksum(data[header_len - 3:header_len + data_len + 1]): + return data[header_len:header_len + data_len], data[header_len + data_len + 2:] + return None, b'' + + # Command functions for M3YW that directly create the command and send + def cmd_get_sw_version(self): + command = b'T_OUT_CVER' + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_continuous_mode(self): + command = b'S_CMD_020E' + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_command_mode(self): + command = b'S_CMD_020D' + return self.send_and_parse(self.create_tx(command)) + + # Command functions for M3YW that directly create the command and send + # Always Off S_CMD_03L0 (Value = -1) + # Normal Mode S_CMD_03L2 (Value = 0) + # Always On S_CMD_03L1 (Value = 1) + def cmd_set_illumination(self, value: int = 0): + if value < 0: + command = b'S_CMD_03L0' + elif value == 0: + command = b'S_CMD_03L2' + elif value > 0: + command = b'S_CMD_03L1' + return self.send_and_parse(self.create_tx(command)) + + # Always Off S_CMD_03A0 (Value = -1) + # Normal Mode S_CMD_03A2 (Value = 0) + # Always On S_CMD_03A1 (Value = 1) + def cmd_set_aimer(self, value: int = 0): + if value < 0: + command = b'S_CMD_03A0' + elif value == 0: + command = b'S_CMD_03A2' + elif value > 0: + command = b'S_CMD_03A1' + + return self.send_and_parse(self.create_tx(command)) + + # Mute all S_CMD_04F0 (Value = -1) + # Unmute all S_CMD_04F1 (Value = 1) + def cmd_set_beeper(self, value: int = 0): + if value < 0: + command = b'S_CMD_04F0' + elif value == 0: + raise NotImplementedError + elif value > 0: + command = b'S_CMD_04F1' + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_read_interval(self, value: float = 0): + command = b'S_CMD_MARR' + str(round(value*1000)).encode() + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_same_barcode_delay(self, value: float = 0): + command = b'S_CMD_MA31' + self.send_and_parse(self.create_tx(command)) + command = b'S_CMD_MA41' + self.send_and_parse(self.create_tx(command)) + command = b'S_CMD_MARI' + str(round(value*1000)).encode() + return self.send_and_parse(self.create_tx(command)) + + def cmd_send_raw(self, value: str = ''): + command = value.encode() + return self.send_and_parse(self.create_tx(command)) + + def cmd_set_baudrate(self, value: int = 9600): + command = b'S_CMD_H3BR' + str(value).encode() + reply, extra = self.send_and_parse(self.create_tx(command, value)) + self.serial_port.baudrate = value + # Test to see if everything worked... + reply, extra = self.cmd_get_sw_version() + if reply: + return True, None + else: + return False, None + + def get_safe_for_binaryqr(self): + return True diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..f958de5 --- /dev/null +++ b/src/main.py @@ -0,0 +1,164 @@ +import serial +import time +import argparse +import binascii +import sys +import os + +# Add current directory to path for imports +sys.path.insert(0, os.path.dirname(__file__)) + +from base_scanner import BaseScanner +from gm65_scanner import GM65Scanner +from m3yw_scanner import M3YWScanner +from utils import common_baud_rates + + +# ------------------------ +# Scanner Factory +# ------------------------ +def detect_scanner(serial_port) -> BaseScanner: + """ + Identify the scanner by sending a request for its software version. + """ + for scanner in [GM65Scanner, M3YWScanner]: + scanner = scanner(serial_port) + print("Trying", scanner.__class__.__name__) + foundbaud = scanner.find_baudrate() + if foundbaud: + print("Identified Scanner:", scanner.__class__.__name__, "at baudrate:", foundbaud) + return scanner + else: + print(scanner.__class__.__name__, "not detected") + + raise RuntimeError("No supported scanner found") + + +# ------------------------ +# Main Function +# ------------------------ +def main(): + parser = argparse.ArgumentParser(description="Scanner Interface") + parser.add_argument("port", help="Serial port to use") + parser.add_argument("--scanner", type=str, help="Scanner type (gm65 or m3y)") + parser.add_argument("--hw-version", action='store_true', help="Query the device for the hardware version") + parser.add_argument("--sw-version", action='store_true', help="Query the device for the software version") + parser.add_argument("--sw-year", action='store_true', help="Query the device for the software year") + parser.add_argument("--get-settings", action='store_true', help="Get the common (aim light, illumination, beeper) settings zone (GM65) and represent as hex") + parser.add_argument("--get-safe-for-binary-qr", help="Check if the connected reader is know to be safe for binary QR scanning") + parser.add_argument("--set-settings", help="Save the supplied byte to the common settings (aim light, illumination, beeper) zone (GM65)") + parser.add_argument("--get-address", help="Query a given memory address and return the result as a byte") + parser.add_argument("--set-address", nargs=2, help="Update a given memory address with a byte (Format ") + parser.add_argument("--save-address", help="Save a memory address so that the current setting is preserved across reboots.") + parser.add_argument("--set-illumination", type=int, help="Adjust the illumination light. -1 = always off, 0 = On while scanning, 1 = always on") + parser.add_argument("--set-aimer", type=int, help="Adjust the aiming light. -1 = always off, 0 = On while scanning, 1 = always on") + parser.add_argument("--set-beeper", type=int, help="Adjust the beeper. -1 = muted, 1 = on") + parser.add_argument("--set-read-interval", type=float, help="Adjust the minimum time between QR code reads") + parser.add_argument("--set-same-barcode-delay", type=float, help="Adjust the minimumt ime between re-reading the same QR code") + parser.add_argument("--send-raw-cmd", type=str, help="Send a raw command to the reader") + parser.add_argument("--save-settings", action='store_true', help="Save settings to EEPROM (Required for GM65 to persist settings across reboots)") + parser.add_argument("--set-continuous-mode", action='store_true', help="Put scanner in continious mode") + parser.add_argument("--set-command-mode", action='store_true', help="Put scanner in command mode (will stop continious mode)") + parser.add_argument("--set-baudrate", choices=common_baud_rates, help="Changes the scanners baudrate and checks if this was successful") + parser.add_argument("--baudrate", choices=common_baud_rates, help="Sets the baudrate that this tool will use (Default 9600)") + parser.add_argument("--test-baudrates", action='store_true', help="Runs through a list of common baud rates to see what your device supports (Or finds out what BAUD it is currently using)") + + args = parser.parse_args() + + baudrate = 9600 + if args.baudrate: + baudrate = args.baudrate + + ser = serial.Serial(args.port, baudrate, timeout=1) + + if args.scanner: + if "gm65" in args.scanner.lower(): + scanner = GM65Scanner(ser) + elif "m3y" in args.scanner.lower(): + scanner = M3YWScanner(ser) + else: + scanner = detect_scanner(ser) + + scan_duration = 1 + if args.hw_version: + reply, extra = scanner.cmd_get_hw_version() + elif args.sw_version: + reply, extra = scanner.cmd_get_sw_version() + elif args.sw_year: + reply, extra = scanner.cmd_get_sw_year() + elif args.get_settings: + reply, extra = scanner.cmd_get_settings() + elif args.set_settings: + reply, extra = scanner.cmd_set_settings(args.set_settings.encode()) + elif args.get_address: + reply, extra = scanner.cmd_get_address(args.get_address.encode()) + elif args.set_address: + reply, extra = scanner.cmd_set_address(args.set_address[0].encode(), args.set_address[1].encode()) + elif args.save_address: + reply, extra = scanner.cmd_save_address(args.save_address.encode()) + elif args.save_settings: + reply, extra = scanner.cmd_save_settings() + elif args.set_illumination is not None: + print("Setting Illumination") + reply, extra = scanner.cmd_set_illumination(args.set_illumination) + elif args.set_aimer is not None: + print("Setting Aimer") + reply, extra = scanner.cmd_set_aimer(args.set_aimer) + elif args.set_beeper is not None: + print("Setting Beeper") + reply, extra = scanner.cmd_set_beeper(args.set_beeper) + elif args.set_read_interval is not None: + print("Setting Read Interval") + reply, extra = scanner.cmd_set_read_interval(args.set_read_interval) + elif args.set_same_barcode_delay is not None: + print("Setting Same Barcode Delay") + reply, extra = scanner.cmd_set_same_barcode_delay(args.set_same_barcode_delay) + elif args.send_raw_cmd: + print("Sending raw command") + reply, extra = scanner.cmd_send_raw(args.send_raw_cmd) + elif args.set_continuous_mode: + print("Setting Continuous Mode") + reply, extra = scanner.cmd_set_continuous_mode() + elif args.set_command_mode: + print("Setting Command Mode") + reply, extra = scanner.cmd_set_command_mode() + elif args.set_baudrate is not None: + print("Setting Baud Rate") + reply, extra = scanner.cmd_set_baudrate(int(args.set_baudrate)) + if reply: + print("Baudrate Changed Successfully!") + else: + print("Baudrate Change Failed...") + elif args.test_baudrates: + scanner.test_baudrates() + elif args.get_safe_for_binary_qr: + safe = scanner.get_safe_for_binaryqr() + if safe is not None: + if safe: + print("Good News: Safe to use") + else: + print("WARNING: Known to be unsafe for binary QR scanning") + else: + print("Unsure... Unable to match software version as known-good or known-bad...") + + else: + print("Setting Continuous Mode") + reply, extra = scanner.cmd_set_continuous_mode() + + print("Scanning for 10 Seconds") + scan_duration = 10 + # Keep scanning + start = time.time() + rx_data = b'' + while (time.time() - start) <= scan_duration: + rx_data += ser.read(1024) + + print("Setting Command Mode") + reply, extra = scanner.cmd_set_command_mode() + print("Got:", rx_data, "AsHex:", binascii.hexlify(rx_data)) + + ser.close() + + +if __name__ == "__main__": + main() diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..3ae3c7c --- /dev/null +++ b/src/utils.py @@ -0,0 +1,54 @@ +# ------------------------ +# Useful constants +# ------------------------ +# Not exhaustive, but supported by both the M3Y and GM65 +common_baud_rates = [ + '9600', + '14400', + '19200', + '38400', + '57600', + '115200', +] + +# ------------------------ +# Utility Functions +# ------------------------ +def compute_crc16_xmodem(data: bytes) -> bytes: + crc = 0 + for byte in data: + for i in range(7, -1, -1): + crc *= 2 + if (crc & 0x10000) != 0: + crc ^= 0x11021 + if (byte & (1 << i)) != 0: + crc ^= 0x1021 + crc &= 0xFFFF + return crc.to_bytes(2, byteorder='big') + +def check_crc16_xmodem(data_with_crc: bytes) -> bool: + if len(data_with_crc) < 3: + return False + data = data_with_crc[:-2] + received_crc = data_with_crc[-2:] + calculated_crc = compute_crc16_xmodem(data) + return received_crc == calculated_crc + +def compute_bcc(data: bytes) -> bytes: + bcc = 0 + for byte in data: + bcc ^= byte + return bytes([bcc]) + +def check_bcc(data_with_bcc: bytes) -> bool: + if len(data_with_bcc) < 2: + return False + data = data_with_bcc[:-1] + received_bcc = data_with_bcc[-1:] + calculated_bcc = compute_bcc(data) + return received_bcc == calculated_bcc + +def set_bit(val, bit): return val | (1 << bit) +def clear_bit(val, bit): return val & ~(1 << bit) +def toggle_bit(val, bit): return val ^ (1 << bit) +def check_bit(val, bit): return (val >> bit) & 1 From 09988beeed624e9b3c2393d683b00ba8888724e6 Mon Sep 17 00:00:00 2001 From: k9ert Date: Sun, 8 Jun 2025 14:37:19 +0200 Subject: [PATCH 2/4] chore: tests also for gm65 --- pytest.ini | 14 +++ requirements.txt | 5 + run_tests.py | 49 +++++++++ tests/__init__.py | 1 + tests/conftest.py | 26 +++++ tests/test_gm65_scanner.py | 83 +++++++++++++++ tests/test_utils.py | 204 +++++++++++++++++++++++++++++++++++++ 7 files changed, 382 insertions(+) create mode 100644 pytest.ini create mode 100644 requirements.txt create mode 100755 run_tests.py create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_gm65_scanner.py create mode 100644 tests/test_utils.py diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..fc45895 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,14 @@ +[tool:pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +addopts = + -v + --tb=short + --strict-markers + --disable-warnings +markers = + integration: marks tests as integration tests (require real hardware) + unit: marks tests as unit tests (no hardware required) + slow: marks tests as slow running diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2a9f528 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +# Core dependencies +pyserial>=3.5 + +# Testing dependencies +pytest>=7.0.0 diff --git a/run_tests.py b/run_tests.py new file mode 100755 index 0000000..8277e4f --- /dev/null +++ b/run_tests.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +""" +Test runner script for SerialBarcodeReaderTools + +Usage: + python run_tests.py # Run unit tests only + python run_tests.py --integration # Run integration tests (requires hardware) + python run_tests.py --all # Run all tests +""" + +import sys +import subprocess +import argparse + + +def run_command(cmd): + """Run a command and return the result.""" + print(f"Running: {' '.join(cmd)}") + result = subprocess.run(cmd) + return result.returncode + + +def main(): + parser = argparse.ArgumentParser(description="Run tests for SerialBarcodeReaderTools") + parser.add_argument("--integration", action="store_true", + help="Run integration tests (requires real GM65 hardware on /dev/ttyACM0)") + parser.add_argument("--all", action="store_true", + help="Run all tests including integration") + + args = parser.parse_args() + + # Base pytest command + cmd = ["python", "-m", "pytest", "-v"] + + if args.integration: + cmd.extend(["-m", "integration"]) + print("Running integration tests (requires GM65 scanner on /dev/ttyACM0)...") + elif args.all: + print("Running all tests...") + else: + # Run only unit tests (exclude integration) + cmd.extend(["-m", "not integration"]) + print("Running unit tests only...") + + return run_command(cmd) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d8c2d78 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# Test package for SerialBarcodeReaderTools diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..1957fd7 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,26 @@ +import pytest +import sys +import os + +# Add src directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + +from gm65_scanner import GM65Scanner + + +@pytest.fixture +def real_serial_port(): + """Fixture for real serial port testing (requires actual hardware).""" + import serial + try: + port = serial.Serial('/dev/ttyACM0', 9600, timeout=1) + yield port + port.close() + except (serial.SerialException, FileNotFoundError): + pytest.skip("Real serial port /dev/ttyACM0 not available") + + +@pytest.fixture +def real_gm65_scanner(real_serial_port): + """Create a GM65Scanner instance with real serial port.""" + return GM65Scanner(real_serial_port) diff --git a/tests/test_gm65_scanner.py b/tests/test_gm65_scanner.py new file mode 100644 index 0000000..f9ec462 --- /dev/null +++ b/tests/test_gm65_scanner.py @@ -0,0 +1,83 @@ +import pytest +import binascii +import sys +import os + +# Add src directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + +from gm65_scanner import GM65Scanner + + +class TestGM65ScannerIntegration: + """Integration tests with real hardware (requires actual GM65 scanner).""" + + @pytest.mark.integration + def test_real_sw_version(self, real_gm65_scanner): + """Test software version query with real hardware.""" + reply, extra = real_gm65_scanner.cmd_get_sw_version() + + assert reply is not None + assert len(reply) >= 1 + print(f"Software version: {binascii.hexlify(reply)}") + + @pytest.mark.integration + def test_real_sw_year(self, real_gm65_scanner): + """Test software year query with real hardware.""" + reply, extra = real_gm65_scanner.cmd_get_sw_year() + + assert reply is not None + assert len(reply) >= 1 + print(f"Software year: {binascii.hexlify(reply)}") + + @pytest.mark.integration + def test_real_get_settings(self, real_gm65_scanner): + """Test get settings with real hardware.""" + reply, extra = real_gm65_scanner.cmd_get_settings() + + assert reply is not None + assert len(reply) >= 1 + print(f"Current settings: {binascii.hexlify(reply)}") + + @pytest.mark.integration + def test_real_raw_command(self, real_gm65_scanner): + """Test raw command with real hardware.""" + reply, extra = real_gm65_scanner.cmd_send_raw('070100e201') + + assert reply is not None + assert len(reply) >= 1 + print(f"Raw command response: {binascii.hexlify(reply)}") + + @pytest.mark.integration + def test_real_illumination_control(self, real_gm65_scanner): + """Test illumination control with real hardware.""" + # Get current settings first + original_settings, _ = real_gm65_scanner.cmd_get_settings() + + # Test setting illumination to always on + success, _ = real_gm65_scanner.cmd_set_illumination(1) + assert success is True + + # Get settings after change + new_settings, _ = real_gm65_scanner.cmd_get_settings() + + print(f"Original settings: {binascii.hexlify(original_settings)}") + print(f"New settings: {binascii.hexlify(new_settings)}") + + # The command should execute successfully even if settings don't change + # (illumination might already be set to the requested value) + assert new_settings is not None + assert len(new_settings) >= 1 + + @pytest.mark.integration + def test_real_baudrate_detection(self, real_gm65_scanner): + """Test baudrate detection with real hardware.""" + baudrate = real_gm65_scanner.find_baudrate() + + assert baudrate is not None + assert baudrate in ['9600', '14400', '19200', '38400', '57600', '115200'] + print(f"Detected baudrate: {baudrate}") + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..25bc309 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,204 @@ +import pytest +import binascii +import sys +import os + +# Add src directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + +from utils import ( + compute_crc16_xmodem, + check_crc16_xmodem, + compute_bcc, + check_bcc, + set_bit, + clear_bit, + toggle_bit, + check_bit, + common_baud_rates +) + + +class TestCRC16Functions: + """Test CRC16-XMODEM functions.""" + + def test_compute_crc16_xmodem(self): + """Test CRC16-XMODEM computation.""" + # Test with known data + data = binascii.unhexlify('070100e201') + crc = compute_crc16_xmodem(data) + + assert len(crc) == 2 + assert isinstance(crc, bytes) + + # Test with empty data + empty_crc = compute_crc16_xmodem(b'') + assert len(empty_crc) == 2 + + def test_check_crc16_xmodem_valid(self): + """Test CRC16 validation with valid data.""" + # Known good data with valid CRC + data_with_crc = binascii.unhexlify('070100e2017791') + assert check_crc16_xmodem(data_with_crc) is True + + def test_check_crc16_xmodem_invalid(self): + """Test CRC16 validation with invalid data.""" + # Data with wrong CRC + data_with_bad_crc = binascii.unhexlify('070100e201ffff') + assert check_crc16_xmodem(data_with_bad_crc) is False + + def test_check_crc16_xmodem_too_short(self): + """Test CRC16 validation with too short data.""" + short_data = b'ab' + assert check_crc16_xmodem(short_data) is False + + def test_crc16_roundtrip(self): + """Test CRC16 computation and validation roundtrip.""" + test_data = b'Hello, World!' + crc = compute_crc16_xmodem(test_data) + data_with_crc = test_data + crc + + assert check_crc16_xmodem(data_with_crc) is True + + +class TestBCCFunctions: + """Test Block Check Character (BCC) functions.""" + + def test_compute_bcc(self): + """Test BCC computation.""" + data = b'test' + bcc = compute_bcc(data) + + assert len(bcc) == 1 + assert isinstance(bcc, bytes) + + # Manual calculation: t(0x74) ^ e(0x65) ^ s(0x73) ^ t(0x74) = 0x06 + expected = bytes([0x74 ^ 0x65 ^ 0x73 ^ 0x74]) + assert bcc == expected + + def test_compute_bcc_empty(self): + """Test BCC computation with empty data.""" + bcc = compute_bcc(b'') + assert bcc == bytes([0]) + + def test_check_bcc_valid(self): + """Test BCC validation with valid data.""" + data = b'test' + bcc = compute_bcc(data) + data_with_bcc = data + bcc + + assert check_bcc(data_with_bcc) is True + + def test_check_bcc_invalid(self): + """Test BCC validation with invalid data.""" + data_with_bad_bcc = b'test\xff' + assert check_bcc(data_with_bad_bcc) is False + + def test_check_bcc_too_short(self): + """Test BCC validation with too short data.""" + short_data = b'a' + assert check_bcc(short_data) is False + + def test_bcc_roundtrip(self): + """Test BCC computation and validation roundtrip.""" + test_data = b'Hello, BCC!' + bcc = compute_bcc(test_data) + data_with_bcc = test_data + bcc + + assert check_bcc(data_with_bcc) is True + + +class TestBitOperations: + """Test bit manipulation functions.""" + + def test_set_bit(self): + """Test setting bits.""" + value = 0b00000000 + + # Set bit 0 + result = set_bit(value, 0) + assert result == 0b00000001 + + # Set bit 7 + result = set_bit(value, 7) + assert result == 0b10000000 + + # Set bit that's already set + value = 0b00000001 + result = set_bit(value, 0) + assert result == 0b00000001 + + def test_clear_bit(self): + """Test clearing bits.""" + value = 0b11111111 + + # Clear bit 0 + result = clear_bit(value, 0) + assert result == 0b11111110 + + # Clear bit 7 + result = clear_bit(value, 7) + assert result == 0b01111111 + + # Clear bit that's already clear + value = 0b11111110 + result = clear_bit(value, 0) + assert result == 0b11111110 + + def test_toggle_bit(self): + """Test toggling bits.""" + value = 0b10101010 + + # Toggle bit 0 (currently 0) + result = toggle_bit(value, 0) + assert result == 0b10101011 + + # Toggle bit 1 (currently 1) + result = toggle_bit(value, 1) + assert result == 0b10101000 + + def test_check_bit(self): + """Test checking bit values.""" + value = 0b10101010 + + # Check set bits + assert check_bit(value, 1) == 1 + assert check_bit(value, 3) == 1 + assert check_bit(value, 5) == 1 + assert check_bit(value, 7) == 1 + + # Check clear bits + assert check_bit(value, 0) == 0 + assert check_bit(value, 2) == 0 + assert check_bit(value, 4) == 0 + assert check_bit(value, 6) == 0 + + def test_bit_operations_edge_cases(self): + """Test bit operations with edge cases.""" + # Test with 0 + assert set_bit(0, 0) == 1 + assert clear_bit(0, 0) == 0 + assert toggle_bit(0, 0) == 1 + assert check_bit(0, 0) == 0 + + # Test with 255 (all bits set) + assert set_bit(255, 0) == 255 + assert clear_bit(255, 0) == 254 + assert toggle_bit(255, 0) == 254 + assert check_bit(255, 0) == 1 + + +class TestConstants: + """Test utility constants.""" + + def test_common_baud_rates(self): + """Test common baud rates constant.""" + expected_rates = ['9600', '14400', '19200', '38400', '57600', '115200'] + + assert common_baud_rates == expected_rates + assert len(common_baud_rates) == 6 + assert all(isinstance(rate, str) for rate in common_baud_rates) + + +if __name__ == "__main__": + pytest.main([__file__]) From 7ad92bb9451e30a6da35730fded668216a53fa99 Mon Sep 17 00:00:00 2001 From: k9ert Date: Sun, 8 Jun 2025 15:22:10 +0200 Subject: [PATCH 3/4] chore: tests for m3y --- README.md | 36 +++++- src/base_scanner.py | 18 +++ src/m3yw_scanner.py | 48 ++++++- tests/conftest.py | 7 + tests/test_m3yw_scanner.py | 253 +++++++++++++++++++++++++++++++++++++ 5 files changed, 357 insertions(+), 5 deletions(-) create mode 100644 tests/test_m3yw_scanner.py diff --git a/README.md b/README.md index 25dd878..bcd62bb 100644 --- a/README.md +++ b/README.md @@ -6,14 +6,46 @@ This script is quite chatty in the console, to let you see exactly what is going It is not supposed to be exhaustive, but rather to demonstrate a basic set of functionality to get you started with the device. Supported Readers: -* GM65 (and GM65S) +* GM65 (and GM65S) * GM805 * M3Y-W - + _All of the commands below first attempt to auto-detect the reader you are using..._ There are also some manuals for these devices in the manuals folder of this repository. +## Command Support Matrix + +The following table shows which commands are supported by each scanner implementation: + +| Command | GM65/GM805 | M3Y-W | Notes | +|---------|------------|-------|-------| +| **Query Commands** | | | | +| `--hw-version` | ✅ Supported | ❌ Not Supported | M3Y-W only has firmware version | +| `--sw-version` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--sw-year` | ✅ Supported | ❌ Not Supported | M3Y-W doesn't separate year | +| `--get-settings` | ✅ Supported | ❌ Not Supported | M3Y-W uses individual commands | +| **Configuration Commands** | | | | +| `--set-settings` | ✅ Supported | ❌ Not Supported | M3Y-W uses individual commands | +| `--save-settings` | ✅ Supported | ❌ Not Supported | M3Y-W auto-saves settings | +| `--set-illumination` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--set-aimer` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--set-beeper` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--set-read-interval` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--set-same-barcode-delay` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--set-continuous-mode` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--set-command-mode` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--set-baudrate` | ✅ Supported | ✅ Supported | Available on all scanners | +| **Utility Commands** | | | | +| `--send-raw-cmd` | ✅ Supported | ✅ Supported | Available on all scanners | +| `--test-baudrates` | ✅ Supported | ✅ Supported | Available on all scanners | + +**Legend:** +- ✅ **Supported**: Command is implemented and works with this scanner +- ❌ **Not Supported**: Command is not available for this scanner type (returns "Not Supported" message) + +**Note**: When a command is not supported by a scanner, the tool will display a clear "Not Supported" message rather than failing silently. + # Setup ## Requirements This module uses pyserial, so that needs to be installed via pip diff --git a/src/base_scanner.py b/src/base_scanner.py index 45b9002..0bfe4e2 100644 --- a/src/base_scanner.py +++ b/src/base_scanner.py @@ -9,6 +9,12 @@ from utils import common_baud_rates +# Custom exceptions +class NotSupportedError(Exception): + """Raised when a feature is not supported by the specific scanner model.""" + pass + + # ------------------------ # Abstract Scanner Class # ------------------------ @@ -63,12 +69,24 @@ def etx_bytes(self) -> bytes: def send_and_parse(self, tx_data): print("Sent (Raw):", tx_data, "AsHex:", binascii.hexlify(tx_data)) self.serial_port.write(tx_data) + + # Give device time to respond + import time + time.sleep(0.1) + rx_data = self.serial_port.read(1024) print("Got (Raw):", rx_data, "AsHex:", binascii.hexlify(rx_data)) + print(f"Received {len(rx_data)} bytes") + + if len(rx_data) > 0: + print("Raw bytes:", [hex(b) for b in rx_data]) + reply, extra = self.parse_rx(rx_data) if reply: print("Reply:", reply, "AsHex:", binascii.hexlify(reply)) print("Extra:", extra, "AsHex:", binascii.hexlify(extra)) + else: + print("Parse failed - no valid reply extracted") return reply, extra # Placeholder command methods diff --git a/src/m3yw_scanner.py b/src/m3yw_scanner.py index bd82943..4353e2c 100644 --- a/src/m3yw_scanner.py +++ b/src/m3yw_scanner.py @@ -6,7 +6,7 @@ # Add current directory to path for imports sys.path.insert(0, os.path.dirname(__file__)) -from base_scanner import BaseScanner +from base_scanner import BaseScanner, NotSupportedError from utils import compute_bcc, check_bcc @@ -43,6 +43,9 @@ def create_tx(self, command: bytes, value: bytes = b'') -> bytes: return raw_data + checksum + self.etx_bytes() def parse_rx(self, data: bytes): + if len(data) == 0: + return None, b'' + header_len = struct.calcsize(self.rx_struct_fmt()) try: header, data_len = struct.unpack(self.rx_struct_fmt(), data[:header_len]) @@ -51,14 +54,53 @@ def parse_rx(self, data: bytes): if header.hex() in self.header_ok().decode(): if self.check_checksum(data[header_len - 3:header_len + data_len + 1]): - return data[header_len:header_len + data_len], data[header_len + data_len + 2:] + response_data = data[header_len:header_len + data_len] + + # Check for M3Y-W status codes + if len(response_data) == 2: + status_hex = response_data.hex().upper() + if status_hex == '9000': + print(f"M3Y-W Status: SUCCESS (9000)") + return response_data, data[header_len + data_len + 2:] + elif status_hex == '6A89': + print(f"M3Y-W Status: FAILURE (6A89)") + return response_data, data[header_len + data_len + 2:] + + return response_data, data[header_len + data_len + 2:] return None, b'' # Command functions for M3YW that directly create the command and send + def cmd_enable_configuration(self): + """Enable configuration mode - required before sending config commands""" + command = b'S_CMD_0001' + return self.send_and_parse(self.create_tx(command)) + def cmd_get_sw_version(self): + # Try enabling configuration first + self.cmd_enable_configuration() command = b'T_OUT_CVER' return self.send_and_parse(self.create_tx(command)) + def cmd_get_hw_version(self): + """M3Y-W does not have a separate hardware version command.""" + raise NotSupportedError("M3Y-W scanner does not support hardware version query") + + def cmd_get_sw_year(self): + """M3Y-W does not have a separate software year command.""" + raise NotSupportedError("M3Y-W scanner does not support software year query") + + def cmd_get_settings(self): + """M3Y-W does not have a single 'get settings' command like GM65.""" + raise NotSupportedError("M3Y-W scanner does not support bulk settings query") + + def cmd_set_settings(self, value: bytes = b''): + """M3Y-W does not have a single 'set settings' command like GM65.""" + raise NotSupportedError("M3Y-W scanner does not support bulk settings modification") + + def cmd_save_settings(self): + """M3Y-W does not require explicit settings save like GM65.""" + raise NotSupportedError("M3Y-W scanner does not support/require explicit settings save") + def cmd_set_continuous_mode(self): command = b'S_CMD_020E' return self.send_and_parse(self.create_tx(command)) @@ -122,7 +164,7 @@ def cmd_send_raw(self, value: str = ''): def cmd_set_baudrate(self, value: int = 9600): command = b'S_CMD_H3BR' + str(value).encode() - reply, extra = self.send_and_parse(self.create_tx(command, value)) + reply, extra = self.send_and_parse(self.create_tx(command)) self.serial_port.baudrate = value # Test to see if everything worked... reply, extra = self.cmd_get_sw_version() diff --git a/tests/conftest.py b/tests/conftest.py index 1957fd7..12320f9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,6 +6,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) from gm65_scanner import GM65Scanner +from m3yw_scanner import M3YWScanner @pytest.fixture @@ -24,3 +25,9 @@ def real_serial_port(): def real_gm65_scanner(real_serial_port): """Create a GM65Scanner instance with real serial port.""" return GM65Scanner(real_serial_port) + + +@pytest.fixture +def real_m3yw_scanner(real_serial_port): + """Create a M3YWScanner instance with real serial port.""" + return M3YWScanner(real_serial_port) diff --git a/tests/test_m3yw_scanner.py b/tests/test_m3yw_scanner.py new file mode 100644 index 0000000..9189c3d --- /dev/null +++ b/tests/test_m3yw_scanner.py @@ -0,0 +1,253 @@ +import pytest +import binascii +import sys +import os + +# Add src directory to path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + +from m3yw_scanner import M3YWScanner +from base_scanner import NotSupportedError + + +class TestM3YWScannerIntegration: + """Integration tests with real M3Y-W hardware (requires actual M3Y-W scanner).""" + + @pytest.mark.integration + def test_real_sw_version(self, real_m3yw_scanner): + """Test software version query with real M3Y-W hardware.""" + reply, extra = real_m3yw_scanner.cmd_get_sw_version() + + # This should not be None if the method is implemented + assert reply is not None, "cmd_get_sw_version returned None - method may not be implemented" + assert len(reply) >= 1, "Software version response should contain at least 1 byte" + print(f"M3Y-W Software version: {reply}") + print(f"M3Y-W Software version (hex): {binascii.hexlify(reply)}") + + @pytest.mark.integration + def test_real_hw_version(self, real_m3yw_scanner): + """Test hardware version query with real M3Y-W hardware.""" + with pytest.raises(NotSupportedError): + real_m3yw_scanner.cmd_get_hw_version() + print("✓ M3Y-W correctly reports hardware version query as not supported") + + @pytest.mark.integration + def test_real_sw_year(self, real_m3yw_scanner): + """Test software year query with real M3Y-W hardware.""" + with pytest.raises(NotSupportedError): + real_m3yw_scanner.cmd_get_sw_year() + print("✓ M3Y-W correctly reports software year query as not supported") + + @pytest.mark.integration + def test_real_get_settings(self, real_m3yw_scanner): + """Test get settings with real M3Y-W hardware.""" + with pytest.raises(NotSupportedError): + real_m3yw_scanner.cmd_get_settings() + print("✓ M3Y-W correctly reports bulk settings query as not supported") + + @pytest.mark.integration + def test_real_set_settings(self, real_m3yw_scanner): + """Test set settings with real M3Y-W hardware.""" + with pytest.raises(NotSupportedError): + real_m3yw_scanner.cmd_set_settings(b'\x00') + print("✓ M3Y-W correctly reports bulk settings modification as not supported") + + @pytest.mark.integration + def test_real_save_settings(self, real_m3yw_scanner): + """Test save settings with real M3Y-W hardware.""" + with pytest.raises(NotSupportedError): + real_m3yw_scanner.cmd_save_settings() + print("✓ M3Y-W correctly reports explicit settings save as not supported") + + @pytest.mark.integration + def test_real_continuous_mode(self, real_m3yw_scanner): + """Test setting continuous mode with real M3Y-W hardware.""" + try: + reply, extra = real_m3yw_scanner.cmd_set_continuous_mode() + + # For M3Y-W, this should return some response or at least not None + print(f"M3Y-W Set continuous mode response: {reply}") + if reply is not None: + print(f"M3Y-W Set continuous mode (hex): {binascii.hexlify(reply)}") + + # The command should execute without raising NotImplementedError + assert True, "cmd_set_continuous_mode executed successfully" + + except NotImplementedError: + pytest.fail("cmd_set_continuous_mode is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_command_mode(self, real_m3yw_scanner): + """Test setting command mode with real M3Y-W hardware.""" + try: + reply, extra = real_m3yw_scanner.cmd_set_command_mode() + + print(f"M3Y-W Set command mode response: {reply}") + if reply is not None: + print(f"M3Y-W Set command mode (hex): {binascii.hexlify(reply)}") + + # The command should execute without raising NotImplementedError + assert True, "cmd_set_command_mode executed successfully" + + except NotImplementedError: + pytest.fail("cmd_set_command_mode is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_illumination_control(self, real_m3yw_scanner): + """Test illumination control with real M3Y-W hardware.""" + try: + # Test setting illumination to always on + reply, extra = real_m3yw_scanner.cmd_set_illumination(1) + + print(f"M3Y-W Set illumination (always on) response: {reply}") + if reply is not None: + print(f"M3Y-W Set illumination (hex): {binascii.hexlify(reply)}") + + # Test setting illumination to normal mode + reply2, extra2 = real_m3yw_scanner.cmd_set_illumination(0) + + print(f"M3Y-W Set illumination (normal) response: {reply2}") + if reply2 is not None: + print(f"M3Y-W Set illumination normal (hex): {binascii.hexlify(reply2)}") + + # The command should execute without raising NotImplementedError + assert True, "cmd_set_illumination executed successfully" + + except NotImplementedError: + pytest.fail("cmd_set_illumination is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_aimer_control(self, real_m3yw_scanner): + """Test aimer control with real M3Y-W hardware.""" + try: + # Test setting aimer to always on + reply, extra = real_m3yw_scanner.cmd_set_aimer(1) + + print(f"M3Y-W Set aimer (always on) response: {reply}") + if reply is not None: + print(f"M3Y-W Set aimer (hex): {binascii.hexlify(reply)}") + + # The command should execute without raising NotImplementedError + assert True, "cmd_set_aimer executed successfully" + + except NotImplementedError: + pytest.fail("cmd_set_aimer is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_beeper_control(self, real_m3yw_scanner): + """Test beeper control with real M3Y-W hardware.""" + try: + # Test unmuting beeper + reply, extra = real_m3yw_scanner.cmd_set_beeper(1) + + print(f"M3Y-W Set beeper (unmute) response: {reply}") + if reply is not None: + print(f"M3Y-W Set beeper (hex): {binascii.hexlify(reply)}") + + # The command should execute without raising NotImplementedError + assert True, "cmd_set_beeper executed successfully" + + except NotImplementedError: + pytest.fail("cmd_set_beeper is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_read_interval(self, real_m3yw_scanner): + """Test read interval setting with real M3Y-W hardware.""" + try: + # Test setting read interval to 1 second + reply, extra = real_m3yw_scanner.cmd_set_read_interval(1.0) + + print(f"M3Y-W Set read interval response: {reply}") + if reply is not None: + print(f"M3Y-W Set read interval (hex): {binascii.hexlify(reply)}") + + # The command should execute without raising NotImplementedError + assert True, "cmd_set_read_interval executed successfully" + + except NotImplementedError: + pytest.fail("cmd_set_read_interval is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_same_barcode_delay(self, real_m3yw_scanner): + """Test same barcode delay setting with real M3Y-W hardware.""" + try: + # Test setting same barcode delay to 2 seconds + reply, extra = real_m3yw_scanner.cmd_set_same_barcode_delay(2.0) + + print(f"M3Y-W Set same barcode delay response: {reply}") + if reply is not None: + print(f"M3Y-W Set same barcode delay (hex): {binascii.hexlify(reply)}") + + # The command should execute without raising NotImplementedError + assert True, "cmd_set_same_barcode_delay executed successfully" + + except NotImplementedError: + pytest.fail("cmd_set_same_barcode_delay is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_raw_command(self, real_m3yw_scanner): + """Test raw command sending with real M3Y-W hardware.""" + try: + # Test sending the software version command as raw + reply, extra = real_m3yw_scanner.cmd_send_raw('T_OUT_CVER') + + assert reply is not None, "cmd_send_raw returned None - method may not be implemented" + assert len(reply) >= 1, "Raw command response should contain at least 1 byte" + print(f"M3Y-W Raw command response: {reply}") + print(f"M3Y-W Raw command (hex): {binascii.hexlify(reply)}") + + except NotImplementedError: + pytest.fail("cmd_send_raw is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_baudrate_control(self, real_m3yw_scanner): + """Test baudrate control with real M3Y-W hardware.""" + try: + # Test setting baudrate (this might change the connection) + # We'll test with the current baudrate to avoid breaking the connection + current_baudrate = real_m3yw_scanner.serial_port.baudrate + + reply, extra = real_m3yw_scanner.cmd_set_baudrate(current_baudrate) + + print(f"M3Y-W Set baudrate response: {reply}") + if reply is not None: + print(f"M3Y-W Set baudrate (hex): {binascii.hexlify(reply)}") + + # The command should execute without raising NotImplementedError + assert True, "cmd_set_baudrate executed successfully" + + except NotImplementedError: + pytest.fail("cmd_set_baudrate is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_baudrate_detection(self, real_m3yw_scanner): + """Test baudrate detection with real M3Y-W hardware.""" + try: + baudrate = real_m3yw_scanner.find_baudrate() + + if baudrate is not None: + assert baudrate in ['9600', '14400', '19200', '38400', '57600', '115200'] + print(f"M3Y-W Detected baudrate: {baudrate}") + else: + print("M3Y-W Baudrate detection returned None") + + except NotImplementedError: + pytest.fail("find_baudrate is not implemented for M3Y-W scanner") + + @pytest.mark.integration + def test_real_safe_for_binary_qr(self, real_m3yw_scanner): + """Test binary QR safety check with real M3Y-W hardware.""" + try: + safe = real_m3yw_scanner.get_safe_for_binaryqr() + + print(f"M3Y-W Safe for binary QR: {safe}") + + # Should return True, False, or None + assert safe in [True, False, None], "get_safe_for_binaryqr should return True, False, or None" + + except NotImplementedError: + pytest.fail("get_safe_for_binaryqr is not implemented for M3Y-W scanner") + + +if __name__ == "__main__": + pytest.main([__file__]) From 014d3ac06d0c1577487c4e9929662ae9bd42282a Mon Sep 17 00:00:00 2001 From: k9ert Date: Sun, 8 Jun 2025 15:32:51 +0200 Subject: [PATCH 4/4] feat: proper handling of not supported in the main.py --- src/base_scanner.py | 12 ---- src/main.py | 155 +++++++++++++++++++++++--------------------- 2 files changed, 81 insertions(+), 86 deletions(-) diff --git a/src/base_scanner.py b/src/base_scanner.py index 0bfe4e2..f920352 100644 --- a/src/base_scanner.py +++ b/src/base_scanner.py @@ -69,24 +69,12 @@ def etx_bytes(self) -> bytes: def send_and_parse(self, tx_data): print("Sent (Raw):", tx_data, "AsHex:", binascii.hexlify(tx_data)) self.serial_port.write(tx_data) - - # Give device time to respond - import time - time.sleep(0.1) - rx_data = self.serial_port.read(1024) print("Got (Raw):", rx_data, "AsHex:", binascii.hexlify(rx_data)) - print(f"Received {len(rx_data)} bytes") - - if len(rx_data) > 0: - print("Raw bytes:", [hex(b) for b in rx_data]) - reply, extra = self.parse_rx(rx_data) if reply: print("Reply:", reply, "AsHex:", binascii.hexlify(reply)) print("Extra:", extra, "AsHex:", binascii.hexlify(extra)) - else: - print("Parse failed - no valid reply extracted") return reply, extra # Placeholder command methods diff --git a/src/main.py b/src/main.py index f958de5..2bb2bfa 100644 --- a/src/main.py +++ b/src/main.py @@ -11,6 +11,7 @@ from base_scanner import BaseScanner from gm65_scanner import GM65Scanner from m3yw_scanner import M3YWScanner +from base_scanner import NotSupportedError from utils import common_baud_rates @@ -80,82 +81,88 @@ def main(): scanner = detect_scanner(ser) scan_duration = 1 - if args.hw_version: - reply, extra = scanner.cmd_get_hw_version() - elif args.sw_version: - reply, extra = scanner.cmd_get_sw_version() - elif args.sw_year: - reply, extra = scanner.cmd_get_sw_year() - elif args.get_settings: - reply, extra = scanner.cmd_get_settings() - elif args.set_settings: - reply, extra = scanner.cmd_set_settings(args.set_settings.encode()) - elif args.get_address: - reply, extra = scanner.cmd_get_address(args.get_address.encode()) - elif args.set_address: - reply, extra = scanner.cmd_set_address(args.set_address[0].encode(), args.set_address[1].encode()) - elif args.save_address: - reply, extra = scanner.cmd_save_address(args.save_address.encode()) - elif args.save_settings: - reply, extra = scanner.cmd_save_settings() - elif args.set_illumination is not None: - print("Setting Illumination") - reply, extra = scanner.cmd_set_illumination(args.set_illumination) - elif args.set_aimer is not None: - print("Setting Aimer") - reply, extra = scanner.cmd_set_aimer(args.set_aimer) - elif args.set_beeper is not None: - print("Setting Beeper") - reply, extra = scanner.cmd_set_beeper(args.set_beeper) - elif args.set_read_interval is not None: - print("Setting Read Interval") - reply, extra = scanner.cmd_set_read_interval(args.set_read_interval) - elif args.set_same_barcode_delay is not None: - print("Setting Same Barcode Delay") - reply, extra = scanner.cmd_set_same_barcode_delay(args.set_same_barcode_delay) - elif args.send_raw_cmd: - print("Sending raw command") - reply, extra = scanner.cmd_send_raw(args.send_raw_cmd) - elif args.set_continuous_mode: - print("Setting Continuous Mode") - reply, extra = scanner.cmd_set_continuous_mode() - elif args.set_command_mode: - print("Setting Command Mode") - reply, extra = scanner.cmd_set_command_mode() - elif args.set_baudrate is not None: - print("Setting Baud Rate") - reply, extra = scanner.cmd_set_baudrate(int(args.set_baudrate)) - if reply: - print("Baudrate Changed Successfully!") - else: - print("Baudrate Change Failed...") - elif args.test_baudrates: - scanner.test_baudrates() - elif args.get_safe_for_binary_qr: - safe = scanner.get_safe_for_binaryqr() - if safe is not None: - if safe: - print("Good News: Safe to use") + + try: + if args.hw_version: + reply, extra = scanner.cmd_get_hw_version() + elif args.sw_version: + reply, extra = scanner.cmd_get_sw_version() + elif args.sw_year: + reply, extra = scanner.cmd_get_sw_year() + elif args.get_settings: + reply, extra = scanner.cmd_get_settings() + elif args.set_settings: + reply, extra = scanner.cmd_set_settings(args.set_settings.encode()) + elif args.get_address: + reply, extra = scanner.cmd_get_address(args.get_address.encode()) + elif args.set_address: + reply, extra = scanner.cmd_set_address(args.set_address[0].encode(), args.set_address[1].encode()) + elif args.save_address: + reply, extra = scanner.cmd_save_address(args.save_address.encode()) + elif args.save_settings: + reply, extra = scanner.cmd_save_settings() + elif args.set_illumination is not None: + print("Setting Illumination") + reply, extra = scanner.cmd_set_illumination(args.set_illumination) + elif args.set_aimer is not None: + print("Setting Aimer") + reply, extra = scanner.cmd_set_aimer(args.set_aimer) + elif args.set_beeper is not None: + print("Setting Beeper") + reply, extra = scanner.cmd_set_beeper(args.set_beeper) + elif args.set_read_interval is not None: + print("Setting Read Interval") + reply, extra = scanner.cmd_set_read_interval(args.set_read_interval) + elif args.set_same_barcode_delay is not None: + print("Setting Same Barcode Delay") + reply, extra = scanner.cmd_set_same_barcode_delay(args.set_same_barcode_delay) + elif args.send_raw_cmd: + print("Sending raw command") + reply, extra = scanner.cmd_send_raw(args.send_raw_cmd) + elif args.set_continuous_mode: + print("Setting Continuous Mode") + reply, extra = scanner.cmd_set_continuous_mode() + elif args.set_command_mode: + print("Setting Command Mode") + reply, extra = scanner.cmd_set_command_mode() + elif args.set_baudrate is not None: + print("Setting Baud Rate") + reply, extra = scanner.cmd_set_baudrate(int(args.set_baudrate)) + if reply: + print("Baudrate Changed Successfully!") + else: + print("Baudrate Change Failed...") + elif args.test_baudrates: + scanner.test_baudrates() + elif args.get_safe_for_binary_qr: + safe = scanner.get_safe_for_binaryqr() + if safe is not None: + if safe: + print("Good News: Safe to use") + else: + print("WARNING: Known to be unsafe for binary QR scanning") else: - print("WARNING: Known to be unsafe for binary QR scanning") + print("Unsure... Unable to match software version as known-good or known-bad...") else: - print("Unsure... Unable to match software version as known-good or known-bad...") - - else: - print("Setting Continuous Mode") - reply, extra = scanner.cmd_set_continuous_mode() - - print("Scanning for 10 Seconds") - scan_duration = 10 - # Keep scanning - start = time.time() - rx_data = b'' - while (time.time() - start) <= scan_duration: - rx_data += ser.read(1024) - - print("Setting Command Mode") - reply, extra = scanner.cmd_set_command_mode() - print("Got:", rx_data, "AsHex:", binascii.hexlify(rx_data)) + print("Setting Continuous Mode") + reply, extra = scanner.cmd_set_continuous_mode() + + print("Scanning for 10 Seconds") + scan_duration = 10 + # Keep scanning + start = time.time() + rx_data = b'' + while (time.time() - start) <= scan_duration: + rx_data += ser.read(1024) + + print("Setting Command Mode") + reply, extra = scanner.cmd_set_command_mode() + print("Got:", rx_data, "AsHex:", binascii.hexlify(rx_data)) + + except NotSupportedError as e: + print(f"❌ Not Supported: {e}") + print(f"This command is not available for {scanner.__class__.__name__} scanners.") + print("See the README.md for a complete command support matrix.") ser.close()