From 2ee502605fac434fe43ae52f68f09a799d56bca4 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sat, 22 Nov 2025 20:53:29 -0700 Subject: [PATCH 01/15] feat: add additional external provider, 3.22-3.23 softwares, paper pro move support. Closes #154 --- codexctl/__init__.py | 5 +++++ codexctl/device.py | 17 ++++++++++++--- codexctl/updates.py | 45 +++++++++++++++++++++++++++++--------- data/version-ids.json | 51 ++++++++++++++++++++++++++++++++++++++----- 4 files changed, 100 insertions(+), 18 deletions(-) diff --git a/codexctl/__init__.py b/codexctl/__init__.py index 9d445c8..631de8e 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -70,12 +70,17 @@ def call_func(self, function: str, args: dict) -> None: remarkable_pp_versions = "\n".join( self.updater.remarkablepp_versions.keys() ) + remarkable_ppm_versions = "\n".join( + self.updater.remarkableppm_versions.keys() + ) remarkable_2_versions = "\n".join(self.updater.remarkable2_versions.keys()) remarkable_1_versions = "\n".join(self.updater.remarkable1_versions.keys()) version_blocks = [] if remarkable_version is None or remarkable_version == HardwareType.RMPP: version_blocks.append(f"ReMarkable Paper Pro:\n{remarkable_pp_versions}") + if remarkable_version is None or remarkable_version == HardwareType.RMPPM: + version_blocks.append(f"ReMarkable Paper Pro Move:\n{remarkable_ppm_versions}") if remarkable_version is None or remarkable_version == HardwareType.RM2: version_blocks.append(f"ReMarkable 2:\n{remarkable_2_versions}") if remarkable_version is None or remarkable_version == HardwareType.RM1: diff --git a/codexctl/device.py b/codexctl/device.py index 02e4975..45d092e 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -20,17 +20,20 @@ class HardwareType(enum.Enum): RM1 = enum.auto() RM2 = enum.auto() RMPP = enum.auto() + RMPPM = enum.auto() @classmethod def parse(cls, device_type: str) -> "HardwareType": - if device_type.lower() in ("pp", "pro", "rmpp", "ferrari", "remarkable ferrari"): + if device_type.lower() in ("ppm", "rmppm", "chiappa", "remarkable chiappa"): + return cls.RMPPM + elif device_type.lower() in ("pp", "pro", "rmpp", "ferrari", "remarkable ferrari"): return cls.RMPP elif device_type.lower() in ("2", "rm2", "remarkable 2", "remarkable 2.0"): return cls.RM2 elif device_type.lower() in ("1", "rm1", "remarkable 1", "remarkable 1.0", "remarkable prototype 1"): return cls.RM1 - raise ValueError(f"Unknown hardware version: {device_type} (rm1, rm2, rmpp)") + raise ValueError(f"Unknown hardware version: {device_type} (rm1, rm2, rmpp, rmppm)") @property def old_download_hw(self): @@ -41,6 +44,8 @@ def old_download_hw(self): return "reMarkable2" case HardwareType.RMPP: raise ValueError("ReMarkable Paper Pro does not support the old update engine") + case HardwareType.RMPPM: + raise ValueError("ReMarkable Paper Pro Move does not support the old update engine") @property def new_download_hw(self): @@ -51,6 +56,8 @@ def new_download_hw(self): return "rm2" case HardwareType.RMPP: return "rmpp" + case HardwareType.RMPPM: + return "chiappa" @property def swupdate_hw(self): @@ -61,6 +68,8 @@ def swupdate_hw(self): return "reMarkable2" case HardwareType.RMPP: return "ferrari" + case HardwareType.RMPPM: + return "chiappa" @property def toltec_type(self): @@ -71,6 +80,8 @@ def toltec_type(self): return "rm2" case HardwareType.RMPP: raise ValueError("ReMarkable Paper Pro does not support toltec") + case HardwareType.RMPPM: + raise ValueError("ReMarkable Paper Pro Move does not support toltec") class DeviceManager: def __init__( @@ -438,7 +449,7 @@ def restore_previous_version(self) -> None: /sbin/fw_setenv "fallback_partition" "${OLDPART}" /sbin/fw_setenv "active_partition" "${NEWPART}\"""" - if self.hardware == HardwareType.RMPP: + if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): RESTORE_CODE = """#!/bin/bash OLDPART=$(< /sys/devices/platform/lpgpr/root_part) if [[ $OLDPART == "a" ]]; then diff --git a/codexctl/updates.py b/codexctl/updates.py index 9d20377..cc56017 100644 --- a/codexctl/updates.py +++ b/codexctl/updates.py @@ -29,16 +29,17 @@ def __init__(self, logger=None) -> None: ( self.remarkablepp_versions, + self.remarkableppm_versions, self.remarkable2_versions, self.remarkable1_versions, - self.external_provider_url, + self.external_provider_urls, ) = self.get_remarkable_versions() - def get_remarkable_versions(self) -> tuple[dict, dict, dict, str, str]: + def get_remarkable_versions(self) -> tuple[dict, dict, dict, dict, list]: """Gets the avaliable versions for the device, by checking the local version-ids.json file and then updating it if necessary Returns: - tuple: A tuple containing the version ids for the remarkablepp, remarkable2, remarkable1, toltec version and external provider (in that order) + tuple: A tuple containing the version ids for the remarkablepp, remarkableppm, remarkable2, remarkable1, and external provider urls (in that order) """ if os.path.exists("data/version-ids.json"): @@ -82,11 +83,16 @@ def get_remarkable_versions(self) -> tuple[dict, dict, dict, str, str]: self.logger.debug(f"Version ids contents are {contents}") + provider_urls = contents.get("external-provider-urls", contents.get("external-provider-url")) + if isinstance(provider_urls, str): + provider_urls = [provider_urls] + return ( contents["remarkablepp"], + contents["remarkableppm"], contents["remarkable2"], contents["remarkable1"], - contents["external-provider-url"], + provider_urls, ) def update_version_ids(self, location: str) -> None: @@ -131,6 +137,8 @@ def get_latest_version(self, hardware_type: HardwareType) -> str: versions = self.remarkable2_versions case HardwareType.RMPP: versions = self.remarkablepp_versions + case HardwareType.RMPPM: + versions = self.remarkableppm_versions return self.__max_version(versions.keys()) @@ -200,6 +208,9 @@ def download_version( case HardwareType.RMPP: version_lookup = self.remarkablepp_versions + case HardwareType.RMPPM: + version_lookup = self.remarkableppm_versions + case HardwareType.RM2: version_lookup = self.remarkable2_versions BASE_URL_V3 += "2" @@ -221,16 +232,30 @@ def download_version( if version <= (3, 11, 2, 5): file_name = f"{update_version}_{hardware_type.old_download_hw}-{version_id}.signed" file_url = f"{BASE_URL}/{update_version}/{file_name}" + self.logger.debug(f"File URL is {file_url}, File name is {file_name}") + return self.__download_version_file( + file_url, file_name, download_folder, version_checksum + ) else: - file_url = self.external_provider_url.replace("REPLACE_ID", version_id) file_name = f"remarkable-production-memfault-image-{update_version}-{hardware_type.new_download_hw}-public" - self.logger.debug(f"File URL is {file_url}, File name is {file_name}") + for provider_url in self.external_provider_urls: + file_url = provider_url.replace("REPLACE_ID", version_id) + self.logger.debug(f"Trying to download from {file_url}") - return self.__download_version_file( - file_url, file_name, download_folder, version_checksum - ) + result = self.__download_version_file( + file_url, file_name, download_folder, version_checksum + ) + + if result is not None: + self.logger.debug(f"Successfully downloaded from {provider_url}") + return result + + self.logger.debug(f"Failed to download from {provider_url}, trying next source...") + + self.logger.error(f"Failed to download {file_name} from all sources") + return None def __generate_xml_data(self) -> str: """Generates and returns XML data for the update request""" @@ -298,7 +323,7 @@ def __download_version_file( """ response = requests.get(uri, stream=True) if response.status_code != 200: - self.logger.error(f"Unable to download update file: {response.status_code}") + self.logger.debug(f"Unable to download update file: {response.status_code}") return None file_length = response.headers.get("content-length") diff --git a/data/version-ids.json b/data/version-ids.json index fbed47d..887cb01 100644 --- a/data/version-ids.json +++ b/data/version-ids.json @@ -1,5 +1,13 @@ { "remarkable1": { + "3.23.0.64": [ + "remarkable-production-image-3.23.0.64-rm1-public.swu", + "88c35f0e52b08e9246677babe7163d7917f40bf133ea5b9dfa70a7fc09f548af" + ], + "3.22.0.64": [ + "remarkable-production-image-3.22.0.64-rm1-public.swu", + "9f23e1309b801a9ba68897113dad2fda032f50d261202d5b1b4ce561204afe20" + ], "3.20.0.92": [ "remarkable-production-image-3.20.0.92-rm1-public.swu", "a7558d9ef4b2bcc41d9a9219822a1e7e34bff46851658dcfd9666856401e9728" @@ -182,9 +190,17 @@ ] }, "remarkable2": { - "3.17.0.72": [ - "remarkable-production-memfault-image-3.17.0.72-rm2-public.swu", - "508f63bcae0b43e812d7afb4fbea42d11947e57abd63bc2538adb2884cbfba78" + "3.23.0.64": [ + "remarkable-production-image-3.23.0.64-rm2-public.swu", + "6de624bc03f892b73f56e83d0fd3d49c3c69a6d63a5831509fee98416d776807" + ], + "3.22.4.2": [ + "remarkable-production-image-3.22.4.2-rm2-public.swu", + "e714f7a48362fefa2b4b801d5f1b0fadfd8a272fdac9680b059c10dde6903319" + ], + "3.22.0.64": [ + "remarkable-production-image-3.22.0.64-rm2-public.swu", + "3662e7f0e827d631c99a8447038fff7710b6d2c84049ca4d06162677f1b1aa8a" ], "3.20.0.92": [ "remarkable-production-image-3.20.0.92-rm2-public.swu", @@ -376,6 +392,18 @@ ] }, "remarkablepp": { + "3.23.0.64": [ + "remarkable-production-image-3.23.0.64-ferrari-public.swu", + "c19301055b8cf4351d530a2f59a91fd6ebc8af8c36f0b8999c6557480c8554da" + ], + "3.22.4.2": [ + "remarkable-production-image-3.22.4.2-ferrari-public.swu", + "6baa6f0e9f6a4ad949ef043c3e604021b8f6c897d624b435752c5128cfb410d6" + ], + "3.22.0.64": [ + "remarkable-production-image-3.22.0.64-ferrari-public.swu", + "74b8a32e5aa6b9f1ae8d807e6a3595fed37d0d4e524e58b3a240b48de83d7e9c" + ], "3.20.0.92": [ "remarkable-production-image-3.20.0.92-ferrari-public.swu", "a63c1573aa26ed3e1f7e5dded3eb4adc09710515b2d2b1964ff397c94243ba89" @@ -421,6 +449,19 @@ "8c92f589900e7e355697206c71e2256d909313fcd96aa2c5fd9910ff04b062f1" ] }, - "last-updated": 1751286637, - "external-provider-url": "https://storage.googleapis.com/remarkable-versions/REPLACE_ID" + "remarkableppm": { + "3.23.0.64": [ + "remarkable-production-image-3.23.0.64-chiappa-public.swu", + "0a10ba1dbd873a442f144cb346c9497b820604a98203478345343a01be0d417f" + ], + "3.22.0.65": [ + "remarkable-production-image-3.22.0.65-chiappa-public.swu", + "b494b36e6c1749bbe205ef26f2cea31728f6fe82aa5abf5057e37a09ec326ffa" + ] + }, + "last-updated": 1763843954, + "external-provider-urls": [ + "https://storage.googleapis.com/remarkable-versions/REPLACE_ID", + "https://remarkable-software.s3.us-east-2.amazonaws.com/REPLACE_ID" + ] } From 1243d872892f8aa102c3a94fcf3b1d70a9dc8da5 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sat, 22 Nov 2025 22:24:38 -0700 Subject: [PATCH 02/15] feat: enable restore for paper pro and paper pro move, including 3.20/22 boundry --- codexctl/__init__.py | 6 +- codexctl/device.py | 201 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 175 insertions(+), 32 deletions(-) diff --git a/codexctl/__init__.py b/codexctl/__init__.py index 631de8e..efd9065 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -214,14 +214,12 @@ def call_func(self, function: str, args: dict) -> None: version = self.updater.get_toltec_version(remarkable.hardware) if function == "status": - beta, prev, current, version_id = remarkable.get_device_status() + beta, prev, current, version_id, backup = remarkable.get_device_status() print( - f"\nCurrent version: {current}\nOld update engine: {prev}\nBeta active: {beta}\nVersion id: {version_id}" + f"\nCurrent version: {current}\nBackup version: {backup}\nOld update engine: {prev}\nBeta active: {beta}\nVersion id: {version_id}" ) elif function == "restore": - if remarkable.hardware == HardwareType.RMPP: - raise SystemError("Restore not available for rmpro.") remarkable.restore_previous_version() print( f"Device restored to previous version [{remarkable.get_device_status()[1]}]" diff --git a/codexctl/device.py b/codexctl/device.py index 45d092e..855f34c 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -285,11 +285,140 @@ def connect_to_device( return client - def get_device_status(self) -> tuple[str | None, str, str]: + def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]: + """Reads version from a given path (current partition or mounted backup) + + Args: + ftp: SFTP client connection + base_path: Base path prefix (empty for current partition, /tmp/mount_pX for backup) + + Returns: + tuple: (version_string, old_update_engine_boolean) + """ + old_update_engine = True + + try: + update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf" + with ftp.file(update_conf_path) as file: + version = re.search( + "(?<=REMARKABLE_RELEASE_VERSION=).*", + file.read().decode("utf-8").strip("\n"), + ).group() + except Exception: + os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" + with ftp.file(os_release_path) as file: + version = ( + re.search("(?<=IMG_VERSION=).*", file.read().decode("utf-8")) + .group() + .strip('"') + ) + old_update_engine = False + + return version, old_update_engine + + def _get_backup_partition_version(self) -> str: + """Gets the version installed on the backup (inactive) partition + + Returns: + str: Version string or "unknown" + """ + if not self.client: + return "unknown" + + try: + ftp = self.client.open_sftp() + + if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): + _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") + active_device = stdout.read().decode("utf-8").strip() + active_part = int(active_device.split('p')[-1]) + inactive_part = 3 if active_part == 2 else 2 + device_base = re.sub(r'p\d+$', '', active_device) + else: + _stdin, stdout, _stderr = self.client.exec_command("rootdev") + active_device = stdout.read().decode("utf-8").strip() + active_part = int(active_device.split('p')[-1]) + inactive_part = 3 if active_part == 2 else 2 + device_base = re.sub(r'p\d+$', '', active_device) + + mount_point = f"/tmp/mount_p{inactive_part}" + + self.client.exec_command(f"mkdir -p {mount_point}") + _stdin, stdout, _stderr = self.client.exec_command( + f"mount -o ro {device_base}p{inactive_part} {mount_point}" + ) + exit_status = stdout.channel.recv_exit_status() + + if exit_status != 0: + self.logger.debug(f"Failed to mount backup partition: {_stderr.read().decode('utf-8')}") + return "unknown" + + try: + version, _ = self._read_version_from_path(ftp, mount_point) + except Exception as e: + self.logger.debug(f"Failed to read version from backup partition: {e}") + version = "unknown" + + self.client.exec_command(f"umount {mount_point}") + self.client.exec_command(f"rm -rf {mount_point}") + + return version + + except Exception as e: + self.logger.debug(f"Error getting backup partition version: {e}") + return "unknown" + + def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, int]: + """Gets partition information for Paper Pro devices + + Args: + current_version: Current OS version string for version-aware detection + + Returns: + tuple: (current_partition, inactive_partition, next_boot_partition) + """ + if not self.client: + raise RuntimeError("SSH client required for partition detection") + + _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") + active_device = stdout.read().decode("utf-8").strip() + current_part = int(active_device.split('p')[-1]) + inactive_part = 3 if current_part == 2 else 2 + + try: + version_parts = [int(x) for x in current_version.split('.')[:2]] + is_new_version = version_parts >= [3, 22] + except Exception: + is_new_version = True + + next_boot_part = current_part + + if is_new_version: + try: + ftp = self.client.open_sftp() + with ftp.file("/sys/bus/mmc/devices/mmc0:0001/boot_part") as file: + boot_part_value = file.read().decode("utf-8").strip() + next_boot_part = 2 if boot_part_value == "1" else 3 + except Exception: + is_new_version = False + + if not is_new_version: + try: + ftp = self.client.open_sftp() + with ftp.file("/sys/devices/platform/lpgpr/root_part") as file: + root_part_value = file.read().decode("utf-8").strip() + next_boot_part = 2 if root_part_value == "a" else 3 + except Exception as e: + self.logger.debug(f"Failed to read next boot partition: {e}") + pass + + return current_part, inactive_part, next_boot_part + + def get_device_status(self) -> tuple[str | None, str, str, str, str]: """Gets the status of the device Returns: - tuple: Beta status, previous version, and current version (in that order) + tuple: Beta status, old_update_engine, current version, version_id, backup version (in that order) """ old_update_engine = True @@ -298,20 +427,7 @@ def get_device_status(self) -> tuple[str | None, str, str]: ftp = self.client.open_sftp() self.logger.debug("Connected") - try: - with ftp.file("/usr/share/remarkable/update.conf") as file: - xochitl_version = re.search( - "(?<=REMARKABLE_RELEASE_VERSION=).*", - file.read().decode("utf-8").strip("\n"), - ).group() - except Exception: - with ftp.file("/etc/os-release") as file: - xochitl_version = ( - re.search("(?<=IMG_VERSION=).*", file.read().decode("utf-8")) - .group() - .strip('"') - ) - old_update_engine = False + xochitl_version, old_update_engine = self._read_version_from_path(ftp) with ftp.file("/etc/version") as file: version_id = file.read().decode("utf-8").strip("\n") @@ -353,7 +469,9 @@ def get_device_status(self) -> tuple[str | None, str, str]: if beta_possible is not None: beta = re.search("(?<=GROUP=).*", beta_contents).group() - return beta, old_update_engine, xochitl_version, version_id + backup_version = self._get_backup_partition_version() + + return beta, old_update_engine, xochitl_version, version_id, backup_version def set_server_config(self, contents: str, server_host_name: str) -> str: """Converts the contents given to point to the given server IP and port @@ -450,17 +568,44 @@ def restore_previous_version(self) -> None: /sbin/fw_setenv "active_partition" "${NEWPART}\"""" if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): - RESTORE_CODE = """#!/bin/bash -OLDPART=$(< /sys/devices/platform/lpgpr/root_part) -if [[ $OLDPART == "a" ]]; then - NEWPART="b" -else - NEWPART="a" -fi -echo "new: ${NEWPART}" -echo "fallback: ${OLDPART}" -echo $NEWPART > /sys/devices/platform/lpgpr/root_part -""" + _, _, current_version, _, backup_version = self.get_device_status() + current_part, inactive_part, _ = self._get_paper_pro_partition_info(current_version) + + new_part_label = "a" if inactive_part == 2 else "b" + old_part_label = "a" if current_part == 2 else "b" + + try: + current_version_parts = [int(x) for x in current_version.split('.')[:2]] + current_is_new = current_version_parts >= [3, 22] + except Exception: + current_is_new = False + + try: + target_version_parts = [int(x) for x in backup_version.split('.')[:2]] + target_is_new = target_version_parts >= [3, 22] + except Exception: + target_is_new = False + + RESTORE_CODE = "#!/bin/bash\n" + RESTORE_CODE += f"echo 'Switching from partition {current_part} to partition {inactive_part}'\n" + RESTORE_CODE += f"echo 'Current version: {current_version}'\n" + RESTORE_CODE += f"echo 'Target version: {backup_version}'\n" + + # Method 1: Legacy sysfs (if current OS < 3.22) + if not current_is_new: + RESTORE_CODE += f"echo '{new_part_label}' > /sys/devices/platform/lpgpr/root_part\n" + RESTORE_CODE += "echo 'Set next boot via sysfs (legacy method)'\n" + + # Method 2: MMC bootpart (if target OS >= 3.22 OR current OS >= 3.22) + if target_is_new or current_is_new: + if inactive_part == 2: + RESTORE_CODE += "mmc bootpart enable 1 0 /dev/mmcblk0boot0\n" + else: + RESTORE_CODE += "mmc bootpart enable 2 0 /dev/mmcblk0boot1\n" + RESTORE_CODE += "echo 'Set next boot via mmc bootpart (new method)'\n" + + RESTORE_CODE += f"echo '0' > /sys/devices/platform/lpgpr/root{new_part_label}_errcnt 2>/dev/null || true\n" + RESTORE_CODE += "echo 'Partition switch complete'\n" if self.client: self.logger.debug("Connecting to FTP") From 6e558b6a7c7d06b1312d36af368a315830ea354b Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sun, 23 Nov 2025 00:35:40 -0700 Subject: [PATCH 03/15] feat: extract supports swu files, paper pro downgrades with bootloader updates --- codexctl/__init__.py | 111 +++++++++++++++++++++++++++----- codexctl/cpio.py | 150 +++++++++++++++++++++++++++++++++++++++++++ codexctl/device.py | 91 ++++++++++++++++++++++++-- codexctl/updates.py | 26 ++++++++ 4 files changed, 359 insertions(+), 19 deletions(-) create mode 100644 codexctl/cpio.py diff --git a/codexctl/__init__.py b/codexctl/__init__.py index efd9065..6a51345 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -100,24 +100,50 @@ def call_func(self, function: str, args: dict) -> None: ### Mounting functionalities elif function in ("extract", "mount"): - try: - from .analysis import get_update_image - except ImportError: - raise ImportError( - "remarkable_update_image is required for analysis. Please install it!" - ) - if function == "extract": if not args["out"]: args["out"] = os.getcwd() + "/extracted" logger.debug(f"Extracting {args['file']} to {args['out']}") - image, volume = get_update_image(args["file"]) - image.seek(0) - with open(args["out"], "wb") as f: - f.write(image.read()) + with open(args["file"], "rb") as f: + magic = f.read(6) + + if magic == b'070702': + logger.info("Detected CPIO format (3.11+ SWU file)") + from .cpio import extract_cpio_files + + if os.path.isdir(args["out"]): + output_dir = args["out"] + else: + output_dir = args["out"] + os.makedirs(output_dir, exist_ok=True) + + extract_cpio_files(args["file"], output_dir=output_dir) + logger.info(f"Extracted SWU contents to {output_dir}") + else: + logger.info("Detected old format (<3.11 .signed file)") + try: + from .analysis import get_update_image + except ImportError: + raise ImportError( + "remarkable_update_image is required for extracting old format files. Please install it!" + ) + + image, volume = get_update_image(args["file"]) + image.seek(0) + + with open(args["out"], "wb") as f: + f.write(image.read()) else: + try: + from .analysis import get_update_image + from remarkable_update_fuse import UpdateFS + except ImportError: + raise ImportError( + "remarkable_update_image and remarkable_update_fuse are required for mounting. Please install them!" + ) + if args["out"] is None: args["out"] = "/opt/remarkable/" @@ -127,8 +153,6 @@ def call_func(self, function: str, args: dict) -> None: if not os.path.exists(args["filesystem"]): raise SystemExit("Firmware file does not exist!") - from remarkable_update_fuse import UpdateFS - server = UpdateFS() server.parse( args=[args["filesystem"], args["out"]], values=server, errex=1 @@ -222,7 +246,7 @@ def call_func(self, function: str, args: dict) -> None: elif function == "restore": remarkable.restore_previous_version() print( - f"Device restored to previous version [{remarkable.get_device_status()[1]}]" + f"Device restored to previous version [{remarkable.get_device_status()[4]}]" ) remarkable.reboot_device() print("Device rebooted") @@ -281,6 +305,63 @@ def version_lookup(version: str | None) -> re.Match[str] | None: ############################################################# + bootloader_files_for_install = None + + if (device_version_uses_new_engine and + remarkable.hardware == HardwareType.RMPP and + not update_file): + + current_version = remarkable.get_device_status()[2] + + if UpdateManager.is_bootloader_boundary_downgrade(current_version, version_number): + print("\n" + "="*60) + print("WARNING: Bootloader Update Required") + print("="*60) + print(f"Current version: {current_version}") + print(f"Target version: {version_number}") + print() + print("Downgrading from 3.22+ to <3.22 requires updating the") + print("bootloader on both partitions. This process will:") + print(" 1. Download the current version's bootloader files") + print(" 2. Download the target OS version") + print(" 3. Install the target OS version") + print(" 4. Update bootloader on both partitions") + print(" 5. Reboot") + print() + + response = input("Do you want to continue? (y/n): ") + if response.lower() != 'y': + raise SystemExit("Installation cancelled by user") + + expected_swu_name = f"remarkable-production-memfault-image-{current_version}-{remarkable.hardware.new_download_hw}-public" + expected_swu_path = f"./{expected_swu_name}" + + if os.path.isfile(expected_swu_path): + print(f"\nUsing existing {expected_swu_name} for bootloader extraction...") + current_swu_path = expected_swu_path + else: + print("\nDownloading current version's SWU for bootloader extraction...") + current_swu_path = self.updater.download_version( + remarkable.hardware, + current_version, + "./" + ) + + if current_swu_path: + print("Extracting bootloader files...") + from .cpio import extract_cpio_files + bootloader_files_for_install = extract_cpio_files( + current_swu_path, + filter_files=['update-bootloader.sh', 'imx-boot'] + ) + + if not bootloader_files_for_install or len(bootloader_files_for_install) != 2: + raise SystemError("Failed to extract bootloader files from current version") + + print(f"✓ Extracted update-bootloader.sh ({len(bootloader_files_for_install['update-bootloader.sh'])} bytes)") + print(f"✓ Extracted imx-boot ({len(bootloader_files_for_install['imx-boot'])} bytes)") + print() + if not update_file_requires_new_engine: if update_file: # Check if file exists if os.path.dirname( @@ -321,7 +402,7 @@ def version_lookup(version: str | None) -> re.Match[str] | None: ) if device_version_uses_new_engine: - remarkable.install_sw_update(update_file) + remarkable.install_sw_update(update_file, bootloader_files=bootloader_files_for_install) else: remarkable.install_ohma_update(update_file) diff --git a/codexctl/cpio.py b/codexctl/cpio.py new file mode 100644 index 0000000..42bf25d --- /dev/null +++ b/codexctl/cpio.py @@ -0,0 +1,150 @@ +""" +Pure Python CPIO "newc" format parser for extracting SWU files. +Zero external dependencies - uses only Python stdlib. +""" + +import os +import stat +from pathlib import Path +from typing import Dict, Optional, List, Union + + +def _parse_cpio_header(header_data: bytes) -> Dict[str, int]: + """ + Parse a CPIO newc format header (110 bytes). + + Format: magic(6) + inode(8) + mode(8) + uid(8) + gid(8) + nlink(8) + + mtime(8) + filesize(8) + devmajor(8) + devminor(8) + + rdevmajor(8) + rdevminor(8) + namesize(8) + check(8) + + All fields except magic are 8-digit ASCII hex. + """ + if len(header_data) != 110: + raise ValueError(f"Invalid header size: {len(header_data)} (expected 110)") + + header_str = header_data.decode('ascii') + magic = header_str[0:6] + + if magic != '070702': + raise ValueError(f"Invalid CPIO magic: {magic} (expected 070702)") + + return { + 'inode': int(header_str[6:14], 16), + 'mode': int(header_str[14:22], 16), + 'uid': int(header_str[22:30], 16), + 'gid': int(header_str[30:38], 16), + 'nlink': int(header_str[38:46], 16), + 'mtime': int(header_str[46:54], 16), + 'filesize': int(header_str[54:62], 16), + 'devmajor': int(header_str[62:70], 16), + 'devminor': int(header_str[70:78], 16), + 'rdevmajor': int(header_str[78:86], 16), + 'rdevminor': int(header_str[86:94], 16), + 'namesize': int(header_str[94:102], 16), + 'check': int(header_str[102:110], 16), + } + + +def _align_to_4(offset: int) -> int: + """Calculate padding needed to align to 4-byte boundary.""" + remainder = offset % 4 + return 0 if remainder == 0 else 4 - remainder + + +def extract_cpio_files( + archive_path: Union[str, Path], + output_dir: Optional[Union[str, Path]] = None, + filter_files: Optional[List[str]] = None +) -> Optional[Dict[str, bytes]]: + """ + Extract files from a CPIO archive (SWU file). + + Args: + archive_path: Path to the .swu (CPIO) file + output_dir: Directory to extract files to (for full extraction) + filter_files: List of specific filenames to extract (selective extraction) + + Returns: + If filter_files is provided: dict mapping filename -> file data (bytes) + If output_dir is provided: None (files written to disk) + + Raises: + ValueError: If invalid CPIO format + FileNotFoundError: If archive doesn't exist + """ + archive_path = Path(archive_path) + if not archive_path.exists(): + raise FileNotFoundError(f"Archive not found: {archive_path}") + + with open(archive_path, 'rb') as f: + archive_data = f.read() + + offset = 0 + extracted = {} if filter_files else None + + while offset < len(archive_data): + if offset + 110 > len(archive_data): + break + + header_bytes = archive_data[offset:offset + 110] + offset += 110 + + try: + header = _parse_cpio_header(header_bytes) + except ValueError: + break + + namesize = header['namesize'] + if offset + namesize > len(archive_data): + break + + filename_bytes = archive_data[offset:offset + namesize] + offset += namesize + + offset += _align_to_4(offset) + + filename = filename_bytes.rstrip(b'\x00').decode('utf-8') + + if filename == 'TRAILER!!!': + break + + filesize = header['filesize'] + if offset + filesize > len(archive_data): + break + + file_data = archive_data[offset:offset + filesize] + offset += filesize + + offset += _align_to_4(offset) + + mode = header['mode'] + is_dir = stat.S_ISDIR(mode) + is_symlink = stat.S_ISLNK(mode) + is_regular = stat.S_ISREG(mode) + + if filter_files is not None: + if filename in filter_files: + extracted[filename] = file_data + if len(extracted) == len(filter_files): + return extracted + + elif output_dir is not None: + output_path = Path(output_dir) / filename + + if is_dir: + output_path.mkdir(parents=True, exist_ok=True) + elif is_symlink: + link_target = file_data.decode('utf-8') + output_path.parent.mkdir(parents=True, exist_ok=True) + if output_path.exists() or output_path.is_symlink(): + output_path.unlink() + output_path.symlink_to(link_target) + elif is_regular: + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, 'wb') as out_f: + out_f.write(file_data) + os.chmod(output_path, mode & 0o777) + + if filter_files is not None: + return extracted + return None diff --git a/codexctl/device.py b/codexctl/device.py index 855f34c..da056b5 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -6,6 +6,7 @@ import re import os import time +from typing import Optional, Dict from .server import startUpdate @@ -657,12 +658,13 @@ def reboot_device(self) -> None: self.logger.debug("Device rebooted") - def install_sw_update(self, version_file: str) -> None: + def install_sw_update(self, version_file: str, bootloader_files: Optional[Dict[str, bytes]] = None) -> None: """ Installs new version from version file path, utilising swupdate Args: version_file (str): Path to img file + bootloader_files (Optional[Dict[str, bytes]]): Bootloader files for Paper Pro downgrade Raises: SystemExit: If there was an error installing the update @@ -702,6 +704,14 @@ def install_sw_update(self, version_file: str) -> None: print("".join(_stderr.readlines())) raise SystemError("Update failed!") + if bootloader_files: + print("\nApplying bootloader update...") + self._update_paper_pro_bootloader( + bootloader_files['update-bootloader.sh'], + bootloader_files['imx-boot'] + ) + print("✓ Bootloader update completed") + print("Done! Now rebooting the device and disabling update service") #### Now disable automatic updates @@ -721,6 +731,7 @@ def install_sw_update(self, version_file: str) -> None: self.client = self.connect_to_device( remote_address=self.address, authentication=self.authentication ) + self.client.exec_command("systemctl stop swupdate memfaultd") print( @@ -761,6 +772,81 @@ def install_sw_update(self, version_file: str) -> None: print("Update complete and device rebooting") os.system("reboot") + def _update_paper_pro_bootloader(self, bootloader_script: bytes, imx_boot: bytes) -> None: + """ + Update bootloader on Paper Pro device for 3.22+ -> <3.22 downgrades. + + This method uploads the bootloader script and image to the device, + then runs the update script twice (preinst and postinst) to update + both boot partitions. + + Args: + bootloader_script: Contents of update-bootloader.sh + imx_boot: Contents of imx-boot image file + + Raises: + SystemError: If bootloader update fails + """ + import tempfile + + self.logger.info("Starting bootloader update for Paper Pro") + + if not self.client: + raise SystemError("No SSH connection to device") + + ftp_client = self.client.open_sftp() + + script_path = "/tmp/update-bootloader.sh" + boot_image_path = "/tmp/imx-boot" + + with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.sh') as tmp_script: + tmp_script.write(bootloader_script) + tmp_script_path = tmp_script.name + + with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.img') as tmp_boot: + tmp_boot.write(imx_boot) + tmp_boot_path = tmp_boot.name + + try: + self.logger.debug("Uploading bootloader script to device") + ftp_client.put(tmp_script_path, script_path) + + self.logger.debug("Uploading imx-boot image to device") + ftp_client.put(tmp_boot_path, boot_image_path) + + self.logger.debug("Making bootloader script executable") + _stdin, stdout, _stderr = self.client.exec_command(f"chmod +x {script_path}") + stdout.channel.recv_exit_status() + + self.logger.info("Running bootloader update script (preinst)") + _stdin, stdout, stderr = self.client.exec_command( + f"{script_path} preinst {boot_image_path}" + ) + exit_status = stdout.channel.recv_exit_status() + if exit_status != 0: + error_msg = "".join(stderr.readlines()) + raise SystemError(f"Bootloader preinst failed: {error_msg}") + + self.logger.info("Running bootloader update script (postinst)") + _stdin, stdout, stderr = self.client.exec_command( + f"{script_path} postinst {boot_image_path}" + ) + exit_status = stdout.channel.recv_exit_status() + if exit_status != 0: + error_msg = "".join(stderr.readlines()) + raise SystemError(f"Bootloader postinst failed: {error_msg}") + + self.logger.info("Bootloader update completed successfully") + + finally: + self.logger.debug("Cleaning up temporary bootloader files on device") + self.client.exec_command(f"rm -f {script_path} {boot_image_path}") + + self.logger.debug("Cleaning up local temporary files") + os.unlink(tmp_script_path) + os.unlink(tmp_boot_path) + ftp_client.close() + def install_ohma_update(self, version_available: dict) -> None: """Installs version from update folder on the device @@ -882,6 +968,3 @@ def output_put_progress(transferred: int, toBeTransferred: int) -> None: f"Transferring progress{int((transferred / toBeTransferred) * 100)}%", end="\r", ) - - - diff --git a/codexctl/updates.py b/codexctl/updates.py index cc56017..7a73466 100644 --- a/codexctl/updates.py +++ b/codexctl/updates.py @@ -391,3 +391,29 @@ def uses_new_update_engine(version: str) -> bool: bool: If it uses the new update engine or not """ return int(version.split(".")[0]) >= 3 and int(version.split(".")[1]) >= 11 + + @staticmethod + def is_bootloader_boundary_downgrade(current_version: str, target_version: str) -> bool: + """ + Checks if downgrade crosses the 3.22 bootloader boundary (3.22+ -> <3.22). + + Paper Pro devices require bootloader updates when downgrading from + version 3.22 or higher to any version below 3.22. + + Args: + current_version (str): Currently installed version + target_version (str): Target version to install + + Returns: + bool: True if crossing boundary downward (3.22+ -> <3.22) + """ + try: + current_parts = [int(x) for x in current_version.split('.')] + target_parts = [int(x) for x in target_version.split('.')] + + current_is_322_or_higher = current_parts >= [3, 22] + target_is_below_322 = target_parts < [3, 22] + + return current_is_322_or_higher and target_is_below_322 + except (ValueError, IndexError): + return False From b8c9a44de67c52e84469677f8d16caa94552a5df Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sun, 23 Nov 2025 13:33:33 -0700 Subject: [PATCH 04/15] fix: implement pr review recommendations --- .github/workflows/main.yml | 19 +++++ Makefile | 6 +- README.md | 15 +++- codexctl/__init__.py | 70 +++++++++++------ codexctl/analysis.py | 107 +++++++++++++++++++++++++- codexctl/cpio.py | 150 ------------------------------------- codexctl/device.py | 61 ++++++++------- codexctl/updates.py | 5 ++ requirements.txt | 2 +- tests/test_unit.py | 65 ++++++++++++++++ 10 files changed, 292 insertions(+), 208 deletions(-) delete mode 100644 codexctl/cpio.py create mode 100644 tests/test_unit.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 998ba52..92aee3f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -33,8 +33,27 @@ defaults: shell: bash jobs: + unit-tests: + name: Unit Tests + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: '3.12' + cache: 'pip' + cache-dependency-path: | + **/requirements*.txt + - name: Install dependencies + run: | + python -m pip install -r requirements.txt + - name: Run unit tests + run: python tests/test_unit.py + remote: name: Build for ${{ matrix.os }} + needs: [unit-tests] strategy: fail-fast: false matrix: diff --git a/Makefile b/Makefile index 0184d66..c0c3413 100644 --- a/Makefile +++ b/Makefile @@ -50,7 +50,11 @@ $(VENV_BIN_ACTIVATE): requirements.remote.txt requirements.txt python -m codexctl download --hardware rm2 --out .venv ${FW_VERSION} test: $(VENV_BIN_ACTIVATE) .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed - @echo "[info] Running test" + @echo "[info] Running unit tests" + @set -e; \ + . $(VENV_BIN_ACTIVATE); \ + python tests/test_unit.py + @echo "[info] Running integration tests" @set -e; \ . $(VENV_BIN_ACTIVATE); \ python tests/test.py; \ diff --git a/README.md b/README.md index 3ef821a..8ef3e3b 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,10 @@ A utility program that helps to manage the remarkable device version utilizing [ If your reMarkable device is at or above 3.11.2.5 and you want to downgrade to a version below 3.11.2.5, codexctl cannot do this currently. Please refer to https://github.com/Jayy001/codexctl/issues/95#issuecomment-2305529048 for manual instructions. +## Paper Pro bootloader updates + +When downgrading a Paper Pro device across the 3.20/3.22 firmware boundary, codexctl automatically handles bootloader updates. It will download the current version's firmware if needed and extract the necessary bootloader files (`update-bootloader.sh` and `imx-boot`) to ensure a safe downgrade. + ## Installation You can find pre-compiled binaries on the [releases](https://github.com/Jayy001/codexctl/releases/) page. This includes a build for the reMarkable itself, as well as well as builds for linux, macOS, and Windows. Alternatively, you can install directly from pypi with `pip install codexctl`. Codexctl currently only has support for a **command line interfaces** but a graphical interface is soon to come. @@ -53,6 +57,11 @@ codexctl install latest codexctl download 3.15.4.2 --hardware rmpp -o out codexctl install ./out/remarkable-ct-prototype-image-3.15.4.2-ferrari-public.swu ``` +- (Paper Pro Move) version 3.23.0.64 to a folder named `out` +``` +codexctl download 3.23.0.64 --hardware rmppm -o out +codexctl install ./out/remarkable-production-image-3.23.0.64-chiappa-public.swu +``` - Backing up all documents to the cwd ``` codexctl backup @@ -65,7 +74,7 @@ codexctl backup --incremental ``` codexctl backup -l root -r FM --no-recursion --no-overwrite ``` -- Getting the version of the device and then switching to previous version (restore only for rm1/rm2) +- Getting the version of the device and then switching to previous version ``` codexctl status codexctl restore @@ -75,3 +84,7 @@ codexctl restore codexctl download 3.8.0.1944 --hardware rm2 codexctl cat 3.8.0.1944_reMarkable2-7eGpAv7sYB.signed /etc/version ``` +- Extract a 3.11+ SWU firmware file to see its contents +``` +codexctl extract remarkable-production-image-3.22.0.64-ferrari-public.swu -o extracted +``` diff --git a/codexctl/__init__.py b/codexctl/__init__.py index 6a51345..080985f 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -106,21 +106,16 @@ def call_func(self, function: str, args: dict) -> None: logger.debug(f"Extracting {args['file']} to {args['out']}") + # Check CPIO magic to route between SWU (CPIO) and old .signed format with open(args["file"], "rb") as f: magic = f.read(6) - if magic == b'070702': + if magic in (b'070701', b'070702'): logger.info("Detected CPIO format (3.11+ SWU file)") - from .cpio import extract_cpio_files + from .analysis import extract_swu_files - if os.path.isdir(args["out"]): - output_dir = args["out"] - else: - output_dir = args["out"] - os.makedirs(output_dir, exist_ok=True) - - extract_cpio_files(args["file"], output_dir=output_dir) - logger.info(f"Extracted SWU contents to {output_dir}") + extract_swu_files(args["file"], output_dir=args["out"]) + logger.info(f"Extracted SWU contents to {args['out']}") else: logger.info("Detected old format (<3.11 .signed file)") try: @@ -263,18 +258,44 @@ def call_func(self, function: str, args: dict) -> None: def version_lookup(version: str | None) -> re.Match[str] | None: return re.search(r"\b\d+\.\d+\.\d+\.\d+\b", cast(str, version)) - version_number = version_lookup(version) + version_number = None + swu_hardware = None + + if update_file: + try: + # Quick magic check to skip expensive metadata extraction on old .signed files + with open(update_file, "rb") as f: + magic = f.read(6) + + if magic in (b'070701', b'070702'): + from .analysis import get_swu_metadata + version_number, swu_hardware = get_swu_metadata(update_file) + logger.info(f"Extracted from SWU: version={version_number}, hardware={swu_hardware.name}") + + if swu_hardware != remarkable.hardware: + raise SystemError( + f"Hardware mismatch!\n" + f"SWU file is for: {swu_hardware.name}\n" + f"Connected device is: {remarkable.hardware.name}\n" + f"Cannot install firmware for different hardware." + ) + except ValueError as e: + logger.warning(f"Could not extract metadata from SWU: {e}") if not version_number: - version_number = version_lookup( - input( - "Failed to get the version number from the filename, please enter it: " + version_match = version_lookup(version) + if not version_match: + version_match = version_lookup( + input( + "Failed to get the version number from the filename, please enter it: " + ) ) - ) - if not version_number: - raise SystemError("Invalid version!") + if not version_match: + raise SystemError("Invalid version!") - version_number = version_number.group() + version_number = version_match.group() + else: + version_number = str(version_number) update_file_requires_new_engine = UpdateManager.uses_new_update_engine( version_number @@ -308,8 +329,7 @@ def version_lookup(version: str | None) -> re.Match[str] | None: bootloader_files_for_install = None if (device_version_uses_new_engine and - remarkable.hardware == HardwareType.RMPP and - not update_file): + remarkable.hardware == HardwareType.RMPP): current_version = remarkable.get_device_status()[2] @@ -349,8 +369,8 @@ def version_lookup(version: str | None) -> re.Match[str] | None: if current_swu_path: print("Extracting bootloader files...") - from .cpio import extract_cpio_files - bootloader_files_for_install = extract_cpio_files( + from .analysis import extract_swu_files + bootloader_files_for_install = extract_swu_files( current_swu_path, filter_files=['update-bootloader.sh', 'imx-boot'] ) @@ -604,4 +624,8 @@ def main() -> None: ### Call function man = Manager(device, logger) - man.call_func(args.command, vars(args)) + try: + man.call_func(args.command, vars(args)) + except SystemError as e: + print(f"\nError: {e}", file=sys.stderr) + sys.exit(1) diff --git a/codexctl/analysis.py b/codexctl/analysis.py index 8e8fe85..ddc366d 100644 --- a/codexctl/analysis.py +++ b/codexctl/analysis.py @@ -1,9 +1,13 @@ import ext4 -import warnings +import warnings import errno +import libconf from remarkable_update_image import UpdateImage from remarkable_update_image import UpdateImageSignatureException +from remarkable_update_image.cpio import Archive +from typing import Tuple, Optional, Dict +from .device import HardwareType def get_update_image(file: str): @@ -31,3 +35,104 @@ def get_update_image(file: str): warnings.warn("Unable to open public key", RuntimeWarning) return image, volume + + +def get_swu_metadata(swu_file: str) -> Tuple[str, HardwareType]: + """ + Extract version and hardware type from an SWU file. + + Args: + swu_file: Path to the SWU file + + Returns: + Tuple of (version, hardware_type) + + Raises: + ValueError: If sw-description is missing or invalid + SystemError: If hardware type is unsupported + """ + archive = Archive(swu_file) + archive.open() + try: + if b"sw-description" not in archive.keys(): + raise ValueError(f"Not a valid SWU file: {swu_file}") + + sw_desc = archive["sw-description"].read().decode("utf-8") + info = libconf.loads(sw_desc)["software"] + + version = info.get("version") + if not version: + raise ValueError(f"No version found in sw-description: {swu_file}") + + if "reMarkable1" in info: + hardware = HardwareType.RM1 + elif "reMarkable2" in info: + hardware = HardwareType.RM2 + elif "ferrari" in info: + hardware = HardwareType.RMPP + elif "chiappa" in info: + hardware = HardwareType.RMPPM + else: + raise SystemError(f"Unsupported hardware type in SWU file: {swu_file}") + + return version, hardware + finally: + archive.close() + + +def extract_swu_files( + swu_file: str, + output_dir: Optional[str] = None, + filter_files: Optional[list] = None +) -> Optional[Dict[str, bytes]]: + """ + Extract files from an SWU (CPIO) archive. + + Args: + swu_file: Path to the SWU file + output_dir: Directory to extract files to (for full extraction to disk) + filter_files: List of filenames to extract (selective extraction) + + Returns: + If filter_files is provided: dict mapping filename -> file data (bytes) + If output_dir is provided: None (files written to disk) + """ + import os + from pathlib import Path + + archive = Archive(swu_file) + archive.open() + try: + if output_dir is not None: + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + for name in archive.keys(): + if name == b"TRAILER!!!": + continue + + filename = name.decode('utf-8') + file_path = output_path / filename + file_path.parent.mkdir(parents=True, exist_ok=True) + + with open(file_path, 'wb') as f: + f.write(archive[name].read()) + + return None + + else: + extracted = {} + + if filter_files is None: + for name in archive.keys(): + if name != b"TRAILER!!!": + extracted[name.decode('utf-8')] = archive[name].read() + else: + for filename in filter_files: + entry = archive.get(filename) + if entry: + extracted[filename] = entry.read() + + return extracted + finally: + archive.close() diff --git a/codexctl/cpio.py b/codexctl/cpio.py deleted file mode 100644 index 42bf25d..0000000 --- a/codexctl/cpio.py +++ /dev/null @@ -1,150 +0,0 @@ -""" -Pure Python CPIO "newc" format parser for extracting SWU files. -Zero external dependencies - uses only Python stdlib. -""" - -import os -import stat -from pathlib import Path -from typing import Dict, Optional, List, Union - - -def _parse_cpio_header(header_data: bytes) -> Dict[str, int]: - """ - Parse a CPIO newc format header (110 bytes). - - Format: magic(6) + inode(8) + mode(8) + uid(8) + gid(8) + nlink(8) + - mtime(8) + filesize(8) + devmajor(8) + devminor(8) + - rdevmajor(8) + rdevminor(8) + namesize(8) + check(8) - - All fields except magic are 8-digit ASCII hex. - """ - if len(header_data) != 110: - raise ValueError(f"Invalid header size: {len(header_data)} (expected 110)") - - header_str = header_data.decode('ascii') - magic = header_str[0:6] - - if magic != '070702': - raise ValueError(f"Invalid CPIO magic: {magic} (expected 070702)") - - return { - 'inode': int(header_str[6:14], 16), - 'mode': int(header_str[14:22], 16), - 'uid': int(header_str[22:30], 16), - 'gid': int(header_str[30:38], 16), - 'nlink': int(header_str[38:46], 16), - 'mtime': int(header_str[46:54], 16), - 'filesize': int(header_str[54:62], 16), - 'devmajor': int(header_str[62:70], 16), - 'devminor': int(header_str[70:78], 16), - 'rdevmajor': int(header_str[78:86], 16), - 'rdevminor': int(header_str[86:94], 16), - 'namesize': int(header_str[94:102], 16), - 'check': int(header_str[102:110], 16), - } - - -def _align_to_4(offset: int) -> int: - """Calculate padding needed to align to 4-byte boundary.""" - remainder = offset % 4 - return 0 if remainder == 0 else 4 - remainder - - -def extract_cpio_files( - archive_path: Union[str, Path], - output_dir: Optional[Union[str, Path]] = None, - filter_files: Optional[List[str]] = None -) -> Optional[Dict[str, bytes]]: - """ - Extract files from a CPIO archive (SWU file). - - Args: - archive_path: Path to the .swu (CPIO) file - output_dir: Directory to extract files to (for full extraction) - filter_files: List of specific filenames to extract (selective extraction) - - Returns: - If filter_files is provided: dict mapping filename -> file data (bytes) - If output_dir is provided: None (files written to disk) - - Raises: - ValueError: If invalid CPIO format - FileNotFoundError: If archive doesn't exist - """ - archive_path = Path(archive_path) - if not archive_path.exists(): - raise FileNotFoundError(f"Archive not found: {archive_path}") - - with open(archive_path, 'rb') as f: - archive_data = f.read() - - offset = 0 - extracted = {} if filter_files else None - - while offset < len(archive_data): - if offset + 110 > len(archive_data): - break - - header_bytes = archive_data[offset:offset + 110] - offset += 110 - - try: - header = _parse_cpio_header(header_bytes) - except ValueError: - break - - namesize = header['namesize'] - if offset + namesize > len(archive_data): - break - - filename_bytes = archive_data[offset:offset + namesize] - offset += namesize - - offset += _align_to_4(offset) - - filename = filename_bytes.rstrip(b'\x00').decode('utf-8') - - if filename == 'TRAILER!!!': - break - - filesize = header['filesize'] - if offset + filesize > len(archive_data): - break - - file_data = archive_data[offset:offset + filesize] - offset += filesize - - offset += _align_to_4(offset) - - mode = header['mode'] - is_dir = stat.S_ISDIR(mode) - is_symlink = stat.S_ISLNK(mode) - is_regular = stat.S_ISREG(mode) - - if filter_files is not None: - if filename in filter_files: - extracted[filename] = file_data - if len(extracted) == len(filter_files): - return extracted - - elif output_dir is not None: - output_path = Path(output_dir) / filename - - if is_dir: - output_path.mkdir(parents=True, exist_ok=True) - elif is_symlink: - link_target = file_data.decode('utf-8') - output_path.parent.mkdir(parents=True, exist_ok=True) - if output_path.exists() or output_path.is_symlink(): - output_path.unlink() - output_path.symlink_to(link_target) - elif is_regular: - output_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_path, 'wb') as out_f: - out_f.write(file_data) - os.chmod(output_path, mode & 0o777) - - if filter_files is not None: - return extracted - return None diff --git a/codexctl/device.py b/codexctl/device.py index da056b5..92de1ee 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -301,18 +301,19 @@ def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]: try: update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf" with ftp.file(update_conf_path) as file: - version = re.search( - "(?<=REMARKABLE_RELEASE_VERSION=).*", - file.read().decode("utf-8").strip("\n"), - ).group() - except Exception: + contents = file.read().decode("utf-8").strip("\n") + match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) + if not match: + raise ValueError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") + version = match.group() + except (IOError, OSError): os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" with ftp.file(os_release_path) as file: - version = ( - re.search("(?<=IMG_VERSION=).*", file.read().decode("utf-8")) - .group() - .strip('"') - ) + contents = file.read().decode("utf-8") + match = re.search("(?<=IMG_VERSION=).*", contents) + if not match: + raise ValueError(f"IMG_VERSION not found in {os_release_path}") + version = match.group().strip('"') old_update_engine = False return version, old_update_engine @@ -386,11 +387,11 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, current_part = int(active_device.split('p')[-1]) inactive_part = 3 if current_part == 2 else 2 - try: - version_parts = [int(x) for x in current_version.split('.')[:2]] - is_new_version = version_parts >= [3, 22] - except Exception: - is_new_version = True + parts = current_version.split('.') + if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): + is_new_version = [int(parts[0]), int(parts[1])] >= [3, 22] + else: + raise ValueError(f"Cannot detect partition scheme: unexpected version format '{current_version}'") next_boot_part = current_part @@ -400,7 +401,7 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, with ftp.file("/sys/bus/mmc/devices/mmc0:0001/boot_part") as file: boot_part_value = file.read().decode("utf-8").strip() next_boot_part = 2 if boot_part_value == "1" else 3 - except Exception: + except (IOError, OSError): is_new_version = False if not is_new_version: @@ -409,9 +410,8 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, with ftp.file("/sys/devices/platform/lpgpr/root_part") as file: root_part_value = file.read().decode("utf-8").strip() next_boot_part = 2 if root_part_value == "a" else 3 - except Exception as e: + except (IOError, OSError) as e: self.logger.debug(f"Failed to read next boot partition: {e}") - pass return current_part, inactive_part, next_boot_part @@ -573,19 +573,18 @@ def restore_previous_version(self) -> None: current_part, inactive_part, _ = self._get_paper_pro_partition_info(current_version) new_part_label = "a" if inactive_part == 2 else "b" - old_part_label = "a" if current_part == 2 else "b" - try: - current_version_parts = [int(x) for x in current_version.split('.')[:2]] - current_is_new = current_version_parts >= [3, 22] - except Exception: - current_is_new = False + parts = current_version.split('.') + if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): + current_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] + else: + raise ValueError(f"Cannot restore: unexpected current version format '{current_version}'") - try: - target_version_parts = [int(x) for x in backup_version.split('.')[:2]] - target_is_new = target_version_parts >= [3, 22] - except Exception: - target_is_new = False + parts = backup_version.split('.') + if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): + target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] + else: + raise ValueError(f"Cannot restore: unexpected backup version format '{backup_version}'") RESTORE_CODE = "#!/bin/bash\n" RESTORE_CODE += f"echo 'Switching from partition {current_part} to partition {inactive_part}'\n" @@ -658,13 +657,13 @@ def reboot_device(self) -> None: self.logger.debug("Device rebooted") - def install_sw_update(self, version_file: str, bootloader_files: Optional[Dict[str, bytes]] = None) -> None: + def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes] | None = None) -> None: """ Installs new version from version file path, utilising swupdate Args: version_file (str): Path to img file - bootloader_files (Optional[Dict[str, bytes]]): Bootloader files for Paper Pro downgrade + bootloader_files (dict[str, bytes] | None): Bootloader files for Paper Pro downgrade Raises: SystemExit: If there was an error installing the update diff --git a/codexctl/updates.py b/codexctl/updates.py index 7a73466..4d215c1 100644 --- a/codexctl/updates.py +++ b/codexctl/updates.py @@ -86,6 +86,11 @@ def get_remarkable_versions(self) -> tuple[dict, dict, dict, dict, list]: provider_urls = contents.get("external-provider-urls", contents.get("external-provider-url")) if isinstance(provider_urls, str): provider_urls = [provider_urls] + if provider_urls is None: + raise SystemError( + f"version-ids.json at {file_location} is missing external provider URLs. " + "Please delete the file and try again, or open an issue on the repo." + ) return ( contents["remarkablepp"], diff --git a/requirements.txt b/requirements.txt index b3a2f42..a907ab9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ requests==2.32.4 loguru==0.7.3 -remarkable-update-image==1.2; sys_platform != 'linux' +remarkable-update-image==1.2 remarkable-update-fuse==1.2.6; sys_platform == 'linux' diff --git a/tests/test_unit.py b/tests/test_unit.py new file mode 100644 index 0000000..477e800 --- /dev/null +++ b/tests/test_unit.py @@ -0,0 +1,65 @@ +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from codexctl.updates import UpdateManager + +FAILED = False + + +def assert_value(msg, value, expected): + global FAILED + print(f"Testing {msg}: ", end="") + if value == expected: + print("pass") + return + + FAILED = True + print("fail") + print(f" {value} != {expected}") + + +assert_value( + "boundary cross 3.23->3.20", + UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.20.0.92"), + True +) +assert_value( + "boundary cross 3.22->3.20", + UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.20.0.92"), + True +) +assert_value( + "no boundary 3.23->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.22.0.64"), + False +) +assert_value( + "no boundary 3.20->3.19", + UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.19.0.82"), + False +) +assert_value( + "upgrade 3.20->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.22.0.64"), + False +) +assert_value( + "same version 3.22->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.22.0.64"), + False +) +assert_value( + "empty string current", + UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92"), + False +) +assert_value( + "non-numeric version", + UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92"), + False +) + +if FAILED: + sys.exit(1) From 570b588e549745960b4bead6dbe3e5984e661b63 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sun, 23 Nov 2025 14:48:00 -0700 Subject: [PATCH 05/15] fix: additional coderabbit recs --- codexctl/__init__.py | 27 ++++++++++++++++----------- codexctl/analysis.py | 12 ++++++++---- codexctl/device.py | 11 ++++++++--- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/codexctl/__init__.py b/codexctl/__init__.py index 080985f..7424afd 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -367,20 +367,25 @@ def version_lookup(version: str | None) -> re.Match[str] | None: "./" ) - if current_swu_path: - print("Extracting bootloader files...") - from .analysis import extract_swu_files - bootloader_files_for_install = extract_swu_files( - current_swu_path, - filter_files=['update-bootloader.sh', 'imx-boot'] + if not current_swu_path: + raise SystemError( + f"Failed to download current version {current_version} for bootloader extraction. " + f"This is required for safe downgrade across bootloader boundary." ) - if not bootloader_files_for_install or len(bootloader_files_for_install) != 2: - raise SystemError("Failed to extract bootloader files from current version") + print("Extracting bootloader files...") + from .analysis import extract_swu_files + bootloader_files_for_install = extract_swu_files( + current_swu_path, + filter_files=['update-bootloader.sh', 'imx-boot'] + ) + + if not bootloader_files_for_install or len(bootloader_files_for_install) != 2: + raise SystemError("Failed to extract bootloader files from current version") - print(f"✓ Extracted update-bootloader.sh ({len(bootloader_files_for_install['update-bootloader.sh'])} bytes)") - print(f"✓ Extracted imx-boot ({len(bootloader_files_for_install['imx-boot'])} bytes)") - print() + print(f"✓ Extracted update-bootloader.sh ({len(bootloader_files_for_install['update-bootloader.sh'])} bytes)") + print(f"✓ Extracted imx-boot ({len(bootloader_files_for_install['imx-boot'])} bytes)") + print() if not update_file_requires_new_engine: if update_file: # Check if file exists diff --git a/codexctl/analysis.py b/codexctl/analysis.py index ddc366d..1e3e3d2 100644 --- a/codexctl/analysis.py +++ b/codexctl/analysis.py @@ -57,7 +57,7 @@ def get_swu_metadata(swu_file: str) -> Tuple[str, HardwareType]: if b"sw-description" not in archive.keys(): raise ValueError(f"Not a valid SWU file: {swu_file}") - sw_desc = archive["sw-description"].read().decode("utf-8") + sw_desc = archive[b"sw-description"].read().decode("utf-8") info = libconf.loads(sw_desc)["software"] version = info.get("version") @@ -104,7 +104,7 @@ def extract_swu_files( archive.open() try: if output_dir is not None: - output_path = Path(output_dir) + output_path = Path(output_dir).resolve() output_path.mkdir(parents=True, exist_ok=True) for name in archive.keys(): @@ -112,7 +112,11 @@ def extract_swu_files( continue filename = name.decode('utf-8') - file_path = output_path / filename + file_path = (output_path / filename).resolve() + + if not file_path.is_relative_to(output_path): + raise ValueError(f"Path traversal detected: {filename} resolves outside output directory") + file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, 'wb') as f: @@ -129,7 +133,7 @@ def extract_swu_files( extracted[name.decode('utf-8')] = archive[name].read() else: for filename in filter_files: - entry = archive.get(filename) + entry = archive.get(filename.encode('utf-8')) if entry: extracted[filename] = entry.read() diff --git a/codexctl/device.py b/codexctl/device.py index 92de1ee..5ca2051 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -380,7 +380,7 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, tuple: (current_partition, inactive_partition, next_boot_partition) """ if not self.client: - raise RuntimeError("SSH client required for partition detection") + raise SystemError("SSH client required for partition detection") _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") active_device = stdout.read().decode("utf-8").strip() @@ -578,13 +578,18 @@ def restore_previous_version(self) -> None: if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): current_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] else: - raise ValueError(f"Cannot restore: unexpected current version format '{current_version}'") + raise SystemError(f"Cannot restore: unexpected current version format '{current_version}'") + + if backup_version == "unknown": + raise SystemError( + "Cannot restore: backup partition version could not be determined. " + ) parts = backup_version.split('.') if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] else: - raise ValueError(f"Cannot restore: unexpected backup version format '{backup_version}'") + raise SystemError(f"Cannot restore: unexpected backup version format '{backup_version}'") RESTORE_CODE = "#!/bin/bash\n" RESTORE_CODE += f"echo 'Switching from partition {current_part} to partition {inactive_part}'\n" From 2ce4631f61bc0afb1858605cd2e3320559fce22b Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sun, 23 Nov 2025 15:22:13 -0700 Subject: [PATCH 06/15] fix: integrate bootloader test into existing workflow --- .github/workflows/main.yml | 19 ----------- Makefile | 6 +--- codexctl/device.py | 10 +++--- tests/test.py | 45 ++++++++++++++++++++++++-- tests/test_unit.py | 65 -------------------------------------- 5 files changed, 49 insertions(+), 96 deletions(-) delete mode 100644 tests/test_unit.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 92aee3f..998ba52 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -33,27 +33,8 @@ defaults: shell: bash jobs: - unit-tests: - name: Unit Tests - runs-on: ubuntu-latest - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: '3.12' - cache: 'pip' - cache-dependency-path: | - **/requirements*.txt - - name: Install dependencies - run: | - python -m pip install -r requirements.txt - - name: Run unit tests - run: python tests/test_unit.py - remote: name: Build for ${{ matrix.os }} - needs: [unit-tests] strategy: fail-fast: false matrix: diff --git a/Makefile b/Makefile index c0c3413..0489a05 100644 --- a/Makefile +++ b/Makefile @@ -50,11 +50,7 @@ $(VENV_BIN_ACTIVATE): requirements.remote.txt requirements.txt python -m codexctl download --hardware rm2 --out .venv ${FW_VERSION} test: $(VENV_BIN_ACTIVATE) .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed - @echo "[info] Running unit tests" - @set -e; \ - . $(VENV_BIN_ACTIVATE); \ - python tests/test_unit.py - @echo "[info] Running integration tests" + @echo "[info] Running tests" @set -e; \ . $(VENV_BIN_ACTIVATE); \ python tests/test.py; \ diff --git a/codexctl/device.py b/codexctl/device.py index 5ca2051..0bd5196 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -391,7 +391,7 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): is_new_version = [int(parts[0]), int(parts[1])] >= [3, 22] else: - raise ValueError(f"Cannot detect partition scheme: unexpected version format '{current_version}'") + raise SystemError(f"Cannot detect partition scheme: unexpected version format '{current_version}'") next_boot_part = current_part @@ -438,15 +438,15 @@ def get_device_status(self) -> tuple[str | None, str, str, str, str]: else: if os.path.exists("/usr/share/remarkable/update.conf"): - with open("/usr/share/remarkable/update.conf") as file: + with open("/usr/share/remarkable/update.conf", encoding="utf-8") as file: xochitl_version = re.search( "(?<=REMARKABLE_RELEASE_VERSION=).*", - file.read().decode("utf-8").strip("\n"), + file.read().strip("\n"), ).group() else: - with open("/etc/os-release") as file: + with open("/etc/os-release", encoding="utf-8") as file: xochitl_version = ( - re.search("(?<=IMG_VERSION=).*", file.read().decode("utf-8")) + re.search("(?<=IMG_VERSION=).*", file.read()) .group() .strip('"') ) diff --git a/tests/test.py b/tests/test.py index cdbbe05..38cb88e 100644 --- a/tests/test.py +++ b/tests/test.py @@ -198,8 +198,8 @@ def test_cat(path, expected): test_cat("/etc/version", b"20221026104022\n") -assert_value("latest rm1 version", updater.get_latest_version(HardwareType.RM1), "3.20.0.92") -assert_value("latest rm2 version", updater.get_latest_version(HardwareType.RM2), "3.20.0.92") +assert_value("latest rm1 version", updater.get_latest_version(HardwareType.RM1), "3.23.0.64") +assert_value("latest rm2 version", updater.get_latest_version(HardwareType.RM2), "3.23.0.64") # Don't think this test is needed. assert_gt( @@ -215,5 +215,46 @@ def test_cat(path, expected): with assert_raises("toltec rmpp version", SystemExit): updater.get_toltec_version(HardwareType.RMPP) +assert_value( + "boundary cross 3.23->3.20", + UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.20.0.92"), + True +) +assert_value( + "boundary cross 3.22->3.20", + UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.20.0.92"), + True +) +assert_value( + "no boundary 3.23->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.22.0.64"), + False +) +assert_value( + "no boundary 3.20->3.19", + UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.19.0.82"), + False +) +assert_value( + "upgrade 3.20->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.22.0.64"), + False +) +assert_value( + "same version 3.22->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.22.0.64"), + False +) +assert_value( + "empty string current", + UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92"), + False +) +assert_value( + "non-numeric version", + UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92"), + False +) + if FAILED: sys.exit(1) diff --git a/tests/test_unit.py b/tests/test_unit.py deleted file mode 100644 index 477e800..0000000 --- a/tests/test_unit.py +++ /dev/null @@ -1,65 +0,0 @@ -import os -import sys - -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - -from codexctl.updates import UpdateManager - -FAILED = False - - -def assert_value(msg, value, expected): - global FAILED - print(f"Testing {msg}: ", end="") - if value == expected: - print("pass") - return - - FAILED = True - print("fail") - print(f" {value} != {expected}") - - -assert_value( - "boundary cross 3.23->3.20", - UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.20.0.92"), - True -) -assert_value( - "boundary cross 3.22->3.20", - UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.20.0.92"), - True -) -assert_value( - "no boundary 3.23->3.22", - UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.22.0.64"), - False -) -assert_value( - "no boundary 3.20->3.19", - UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.19.0.82"), - False -) -assert_value( - "upgrade 3.20->3.22", - UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.22.0.64"), - False -) -assert_value( - "same version 3.22->3.22", - UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.22.0.64"), - False -) -assert_value( - "empty string current", - UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92"), - False -) -assert_value( - "non-numeric version", - UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92"), - False -) - -if FAILED: - sys.exit(1) From 9d136b4407532a5853a4305cb811948178afc375 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sun, 23 Nov 2025 15:47:44 -0700 Subject: [PATCH 07/15] fix: improve error handling in device version detection and bootloader update --- codexctl/device.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/codexctl/device.py b/codexctl/device.py index 0bd5196..31384b3 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -296,27 +296,29 @@ def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]: Returns: tuple: (version_string, old_update_engine_boolean) """ - old_update_engine = True + update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf" + os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" try: - update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf" + ftp.stat(update_conf_path) with ftp.file(update_conf_path) as file: contents = file.read().decode("utf-8").strip("\n") match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) if not match: - raise ValueError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") - version = match.group() + raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") + return match.group(), True except (IOError, OSError): - os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" + pass + + try: with ftp.file(os_release_path) as file: contents = file.read().decode("utf-8") match = re.search("(?<=IMG_VERSION=).*", contents) if not match: - raise ValueError(f"IMG_VERSION not found in {os_release_path}") - version = match.group().strip('"') - old_update_engine = False - - return version, old_update_engine + raise SystemError(f"IMG_VERSION not found in {os_release_path}") + return match.group().strip('"'), False + except (IOError, OSError) as e: + raise SystemError(f"Cannot read version from {base_path or 'current partition'}: {e}") from e def _get_backup_partition_version(self) -> str: """Gets the version installed on the backup (inactive) partition @@ -357,7 +359,7 @@ def _get_backup_partition_version(self) -> str: try: version, _ = self._read_version_from_path(ftp, mount_point) - except Exception as e: + except (IOError, OSError, SystemError) as e: self.logger.debug(f"Failed to read version from backup partition: {e}") version = "unknown" @@ -366,7 +368,7 @@ def _get_backup_partition_version(self) -> str: return version - except Exception as e: + except (IOError, OSError, SystemError, paramiko.SSHException) as e: self.logger.debug(f"Error getting backup partition version: {e}") return "unknown" @@ -798,7 +800,11 @@ def _update_paper_pro_bootloader(self, bootloader_script: bytes, imx_boot: bytes if not self.client: raise SystemError("No SSH connection to device") - ftp_client = self.client.open_sftp() + ftp_client = None + try: + ftp_client = self.client.open_sftp() + except Exception: + raise SystemError("Failed to open SFTP connection for bootloader update") script_path = "/tmp/update-bootloader.sh" boot_image_path = "/tmp/imx-boot" @@ -849,7 +855,8 @@ def _update_paper_pro_bootloader(self, bootloader_script: bytes, imx_boot: bytes self.logger.debug("Cleaning up local temporary files") os.unlink(tmp_script_path) os.unlink(tmp_boot_path) - ftp_client.close() + if ftp_client: + ftp_client.close() def install_ohma_update(self, version_available: dict) -> None: """Installs version from update folder on the device From 02bbbb4bb22b6b527ab387afe9ac54ee0e47c408 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Mon, 1 Dec 2025 11:55:45 -0700 Subject: [PATCH 08/15] fix: batch of suggestions Co-authored-by: Nathaniel van Diepen --- README.md | 4 ++-- codexctl/analysis.py | 2 +- requirements.txt | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8ef3e3b..96aa5a2 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ codexctl install latest codexctl download 3.15.4.2 --hardware rmpp -o out codexctl install ./out/remarkable-ct-prototype-image-3.15.4.2-ferrari-public.swu ``` -- (Paper Pro Move) version 3.23.0.64 to a folder named `out` +- Downloading rmppm version 3.23.0.64 to a folder named `out` and then installing it ``` codexctl download 3.23.0.64 --hardware rmppm -o out codexctl install ./out/remarkable-production-image-3.23.0.64-chiappa-public.swu @@ -84,7 +84,7 @@ codexctl restore codexctl download 3.8.0.1944 --hardware rm2 codexctl cat 3.8.0.1944_reMarkable2-7eGpAv7sYB.signed /etc/version ``` -- Extract a 3.11+ SWU firmware file to see its contents +- Extract the contents of an upgrade file to a folder named `extracted` ``` codexctl extract remarkable-production-image-3.22.0.64-ferrari-public.swu -o extracted ``` diff --git a/codexctl/analysis.py b/codexctl/analysis.py index 1e3e3d2..41bec92 100644 --- a/codexctl/analysis.py +++ b/codexctl/analysis.py @@ -37,7 +37,7 @@ def get_update_image(file: str): return image, volume -def get_swu_metadata(swu_file: str) -> Tuple[str, HardwareType]: +def get_swu_metadata(swu_file: str) -> tuple[str, HardwareType]: """ Extract version and hardware type from an SWU file. diff --git a/requirements.txt b/requirements.txt index a907ab9..b3a2f42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ requests==2.32.4 loguru==0.7.3 -remarkable-update-image==1.2 +remarkable-update-image==1.2; sys_platform != 'linux' remarkable-update-fuse==1.2.6; sys_platform == 'linux' From 9d1cffb0894e5c2c5e00703b6cc54828b729dd76 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Mon, 1 Dec 2025 15:08:33 -0700 Subject: [PATCH 09/15] fix: simplify update image handling --- Makefile | 2 +- README.md | 2 +- codexctl/__init__.py | 57 +++++++-------- codexctl/analysis.py | 100 +++----------------------- codexctl/device.py | 167 ++++++++++++++++++++++--------------------- codexctl/updates.py | 21 ++++-- tests/test.py | 20 ++---- 7 files changed, 145 insertions(+), 224 deletions(-) diff --git a/Makefile b/Makefile index 0489a05..0184d66 100644 --- a/Makefile +++ b/Makefile @@ -50,7 +50,7 @@ $(VENV_BIN_ACTIVATE): requirements.remote.txt requirements.txt python -m codexctl download --hardware rm2 --out .venv ${FW_VERSION} test: $(VENV_BIN_ACTIVATE) .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed - @echo "[info] Running tests" + @echo "[info] Running test" @set -e; \ . $(VENV_BIN_ACTIVATE); \ python tests/test.py; \ diff --git a/README.md b/README.md index 96aa5a2..3a92a2f 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ codexctl install latest codexctl download 3.15.4.2 --hardware rmpp -o out codexctl install ./out/remarkable-ct-prototype-image-3.15.4.2-ferrari-public.swu ``` -- Downloading rmppm version 3.23.0.64 to a folder named `out` and then installing it +- Downloading rmppm version 3.23.0.64 to a folder named `out` and then installing it ``` codexctl download 3.23.0.64 --hardware rmppm -o out codexctl install ./out/remarkable-production-image-3.23.0.64-chiappa-public.swu diff --git a/codexctl/__init__.py b/codexctl/__init__.py index 7424afd..80b785b 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -106,30 +106,20 @@ def call_func(self, function: str, args: dict) -> None: logger.debug(f"Extracting {args['file']} to {args['out']}") - # Check CPIO magic to route between SWU (CPIO) and old .signed format - with open(args["file"], "rb") as f: - magic = f.read(6) + try: + from remarkable_update_image import UpdateImage + except ImportError: + raise ImportError( + "remarkable_update_image is required for extraction. Please install it!" + ) from None - if magic in (b'070701', b'070702'): - logger.info("Detected CPIO format (3.11+ SWU file)") - from .analysis import extract_swu_files + image = UpdateImage(args["file"]) + image.seek(0) - extract_swu_files(args["file"], output_dir=args["out"]) - logger.info(f"Extracted SWU contents to {args['out']}") - else: - logger.info("Detected old format (<3.11 .signed file)") - try: - from .analysis import get_update_image - except ImportError: - raise ImportError( - "remarkable_update_image is required for extracting old format files. Please install it!" - ) - - image, volume = get_update_image(args["file"]) - image.seek(0) + with open(args["out"], "wb") as f: + f.write(image.read()) - with open(args["out"], "wb") as f: - f.write(image.read()) + logger.info(f"Extracted image to {args['out']}") else: try: from .analysis import get_update_image @@ -263,11 +253,11 @@ def version_lookup(version: str | None) -> re.Match[str] | None: if update_file: try: - # Quick magic check to skip expensive metadata extraction on old .signed files - with open(update_file, "rb") as f: - magic = f.read(6) + from remarkable_update_image import UpdateImage + from remarkable_update_image.cpio import UpdateImage as CPIOUpdateImage - if magic in (b'070701', b'070702'): + image = UpdateImage(update_file) + if isinstance(image, CPIOUpdateImage): from .analysis import get_swu_metadata version_number, swu_hardware = get_swu_metadata(update_file) logger.info(f"Extracted from SWU: version={version_number}, hardware={swu_hardware.name}") @@ -280,7 +270,7 @@ def version_lookup(version: str | None) -> re.Match[str] | None: f"Cannot install firmware for different hardware." ) except ValueError as e: - logger.warning(f"Could not extract metadata from SWU: {e}") + logger.warning(f"Could not extract metadata from update file: {e}") if not version_number: version_match = version_lookup(version) @@ -374,13 +364,14 @@ def version_lookup(version: str | None) -> re.Match[str] | None: ) print("Extracting bootloader files...") - from .analysis import extract_swu_files - bootloader_files_for_install = extract_swu_files( - current_swu_path, - filter_files=['update-bootloader.sh', 'imx-boot'] - ) - - if not bootloader_files_for_install or len(bootloader_files_for_install) != 2: + from remarkable_update_image import UpdateImage + swu_image = UpdateImage(current_swu_path) + bootloader_files_for_install = { + 'update-bootloader.sh': swu_image['update-bootloader.sh'].read(), + 'imx-boot': swu_image['imx-boot'].read(), + } + + if not all(bootloader_files_for_install.values()): raise SystemError("Failed to extract bootloader files from current version") print(f"✓ Extracted update-bootloader.sh ({len(bootloader_files_for_install['update-bootloader.sh'])} bytes)") diff --git a/codexctl/analysis.py b/codexctl/analysis.py index 41bec92..7d15787 100644 --- a/codexctl/analysis.py +++ b/codexctl/analysis.py @@ -1,12 +1,9 @@ import ext4 import warnings import errno -import libconf from remarkable_update_image import UpdateImage from remarkable_update_image import UpdateImageSignatureException -from remarkable_update_image.cpio import Archive -from typing import Tuple, Optional, Dict from .device import HardwareType @@ -48,95 +45,18 @@ def get_swu_metadata(swu_file: str) -> tuple[str, HardwareType]: Tuple of (version, hardware_type) Raises: - ValueError: If sw-description is missing or invalid SystemError: If hardware type is unsupported """ - archive = Archive(swu_file) - archive.open() - try: - if b"sw-description" not in archive.keys(): - raise ValueError(f"Not a valid SWU file: {swu_file}") - - sw_desc = archive[b"sw-description"].read().decode("utf-8") - info = libconf.loads(sw_desc)["software"] - - version = info.get("version") - if not version: - raise ValueError(f"No version found in sw-description: {swu_file}") - - if "reMarkable1" in info: - hardware = HardwareType.RM1 - elif "reMarkable2" in info: - hardware = HardwareType.RM2 - elif "ferrari" in info: - hardware = HardwareType.RMPP - elif "chiappa" in info: - hardware = HardwareType.RMPPM - else: - raise SystemError(f"Unsupported hardware type in SWU file: {swu_file}") - - return version, hardware - finally: - archive.close() - - -def extract_swu_files( - swu_file: str, - output_dir: Optional[str] = None, - filter_files: Optional[list] = None -) -> Optional[Dict[str, bytes]]: - """ - Extract files from an SWU (CPIO) archive. - - Args: - swu_file: Path to the SWU file - output_dir: Directory to extract files to (for full extraction to disk) - filter_files: List of filenames to extract (selective extraction) - - Returns: - If filter_files is provided: dict mapping filename -> file data (bytes) - If output_dir is provided: None (files written to disk) - """ - import os - from pathlib import Path - - archive = Archive(swu_file) - archive.open() - try: - if output_dir is not None: - output_path = Path(output_dir).resolve() - output_path.mkdir(parents=True, exist_ok=True) - - for name in archive.keys(): - if name == b"TRAILER!!!": - continue - - filename = name.decode('utf-8') - file_path = (output_path / filename).resolve() - - if not file_path.is_relative_to(output_path): - raise ValueError(f"Path traversal detected: {filename} resolves outside output directory") - - file_path.parent.mkdir(parents=True, exist_ok=True) - - with open(file_path, 'wb') as f: - f.write(archive[name].read()) - - return None + image = UpdateImage(swu_file) - else: - extracted = {} + hw_map = { + "reMarkable1": HardwareType.RM1, + "reMarkable2": HardwareType.RM2, + "ferrari": HardwareType.RMPP, + "chiappa": HardwareType.RMPPM, + } - if filter_files is None: - for name in archive.keys(): - if name != b"TRAILER!!!": - extracted[name.decode('utf-8')] = archive[name].read() - else: - for filename in filter_files: - entry = archive.get(filename.encode('utf-8')) - if entry: - extracted[filename] = entry.read() + if image.hardware_type not in hw_map: + raise SystemError(f"Unsupported hardware type in SWU file: {swu_file}") - return extracted - finally: - archive.close() + return image.version, hw_map[image.hardware_type] diff --git a/codexctl/device.py b/codexctl/device.py index 31384b3..65ac0e8 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -6,7 +6,6 @@ import re import os import time -from typing import Optional, Dict from .server import startUpdate @@ -25,16 +24,21 @@ class HardwareType(enum.Enum): @classmethod def parse(cls, device_type: str) -> "HardwareType": - if device_type.lower() in ("ppm", "rmppm", "chiappa", "remarkable chiappa"): - return cls.RMPPM - elif device_type.lower() in ("pp", "pro", "rmpp", "ferrari", "remarkable ferrari"): - return cls.RMPP - elif device_type.lower() in ("2", "rm2", "remarkable 2", "remarkable 2.0"): - return cls.RM2 - elif device_type.lower() in ("1", "rm1", "remarkable 1", "remarkable 1.0", "remarkable prototype 1"): - return cls.RM1 + match device_type.lower(): + case "ppm" | "rmppm" | "chiappa" | "remarkable chiappa": + return cls.RMPPM - raise ValueError(f"Unknown hardware version: {device_type} (rm1, rm2, rmpp, rmppm)") + case "pp" | "pro" | "rmpp" | "ferrari" | "remarkable ferrari": + return cls.RMPP + + case "2" | "rm2" | "remarkable 2" | "remarkable 2.0": + return cls.RM2 + + case "1" | "rm1" | "remarkable 1" | "remarkable 1.0" | "remarkable prototype 1": + return cls.RM1 + + case _: + raise ValueError(f"Unknown hardware version: {device_type} (rm1, rm2, rmpp, rmppm)") @property def old_download_hw(self): @@ -44,9 +48,9 @@ def old_download_hw(self): case HardwareType.RM2: return "reMarkable2" case HardwareType.RMPP: - raise ValueError("ReMarkable Paper Pro does not support the old update engine") + raise ValueError("reMarkable Paper Pro does not support the old update engine") case HardwareType.RMPPM: - raise ValueError("ReMarkable Paper Pro Move does not support the old update engine") + raise ValueError("reMarkable Paper Pro Move does not support the old update engine") @property def new_download_hw(self): @@ -58,7 +62,7 @@ def new_download_hw(self): case HardwareType.RMPP: return "rmpp" case HardwareType.RMPPM: - return "chiappa" + return "rmppm" @property def swupdate_hw(self): @@ -80,9 +84,9 @@ def toltec_type(self): case HardwareType.RM2: return "rm2" case HardwareType.RMPP: - raise ValueError("ReMarkable Paper Pro does not support toltec") + raise ValueError("reMarkable Paper Pro does not support toltec") case HardwareType.RMPPM: - raise ValueError("ReMarkable Paper Pro Move does not support toltec") + raise ValueError("reMarkable Paper Pro Move does not support toltec") class DeviceManager: def __init__( @@ -299,79 +303,77 @@ def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]: update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf" os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" - try: - ftp.stat(update_conf_path) + def file_exists(path: str) -> bool: + try: + ftp.stat(path) + return True + except FileNotFoundError: + return False + + if file_exists(update_conf_path): with ftp.file(update_conf_path) as file: contents = file.read().decode("utf-8").strip("\n") match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) - if not match: - raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") - return match.group(), True - except (IOError, OSError): - pass + if match: + return match.group(), True + raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") - try: + if file_exists(os_release_path): with ftp.file(os_release_path) as file: contents = file.read().decode("utf-8") match = re.search("(?<=IMG_VERSION=).*", contents) - if not match: - raise SystemError(f"IMG_VERSION not found in {os_release_path}") - return match.group().strip('"'), False - except (IOError, OSError) as e: - raise SystemError(f"Cannot read version from {base_path or 'current partition'}: {e}") from e + if match: + return match.group().strip('"'), False + raise SystemError(f"IMG_VERSION not found in {os_release_path}") + + raise SystemError(f"Cannot read version from {base_path or 'current partition'}: no version file found") def _get_backup_partition_version(self) -> str: """Gets the version installed on the backup (inactive) partition Returns: - str: Version string or "unknown" + str: Version string + + Raises: + SystemError: If backup partition version cannot be determined """ if not self.client: - return "unknown" + raise SystemError("Cannot get backup partition version: no SSH client connection") - try: - ftp = self.client.open_sftp() - - if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): - _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") - active_device = stdout.read().decode("utf-8").strip() - active_part = int(active_device.split('p')[-1]) - inactive_part = 3 if active_part == 2 else 2 - device_base = re.sub(r'p\d+$', '', active_device) - else: - _stdin, stdout, _stderr = self.client.exec_command("rootdev") - active_device = stdout.read().decode("utf-8").strip() - active_part = int(active_device.split('p')[-1]) - inactive_part = 3 if active_part == 2 else 2 - device_base = re.sub(r'p\d+$', '', active_device) + ftp = self.client.open_sftp() - mount_point = f"/tmp/mount_p{inactive_part}" + if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): + _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") + active_device = stdout.read().decode("utf-8").strip() + active_part = int(active_device.split('p')[-1]) + inactive_part = 3 if active_part == 2 else 2 + device_base = re.sub(r'p\d+$', '', active_device) + else: + _stdin, stdout, _stderr = self.client.exec_command("rootdev") + active_device = stdout.read().decode("utf-8").strip() + active_part = int(active_device.split('p')[-1]) + inactive_part = 3 if active_part == 2 else 2 + device_base = re.sub(r'p\d+$', '', active_device) - self.client.exec_command(f"mkdir -p {mount_point}") - _stdin, stdout, _stderr = self.client.exec_command( - f"mount -o ro {device_base}p{inactive_part} {mount_point}" - ) - exit_status = stdout.channel.recv_exit_status() + mount_point = f"/tmp/mount_p{inactive_part}" - if exit_status != 0: - self.logger.debug(f"Failed to mount backup partition: {_stderr.read().decode('utf-8')}") - return "unknown" + self.client.exec_command(f"mkdir -p {mount_point}") + _stdin, stdout, _stderr = self.client.exec_command( + f"mount -o ro {device_base}p{inactive_part} {mount_point}" + ) + exit_status = stdout.channel.recv_exit_status() - try: - version, _ = self._read_version_from_path(ftp, mount_point) - except (IOError, OSError, SystemError) as e: - self.logger.debug(f"Failed to read version from backup partition: {e}") - version = "unknown" + if exit_status != 0: + error_msg = _stderr.read().decode('utf-8') + raise SystemError(f"Failed to mount backup partition: {error_msg}") + try: + version, _ = self._read_version_from_path(ftp, mount_point) + return version + finally: self.client.exec_command(f"umount {mount_point}") self.client.exec_command(f"rm -rf {mount_point}") - return version - - except (IOError, OSError, SystemError, paramiko.SSHException) as e: - self.logger.debug(f"Error getting backup partition version: {e}") - return "unknown" - def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, int]: """Gets partition information for Paper Pro devices @@ -593,26 +595,31 @@ def restore_previous_version(self) -> None: else: raise SystemError(f"Cannot restore: unexpected backup version format '{backup_version}'") - RESTORE_CODE = "#!/bin/bash\n" - RESTORE_CODE += f"echo 'Switching from partition {current_part} to partition {inactive_part}'\n" - RESTORE_CODE += f"echo 'Current version: {current_version}'\n" - RESTORE_CODE += f"echo 'Target version: {backup_version}'\n" + code = [ + "#!/bin/bash", + f"echo 'Switching from partition {current_part} to partition {inactive_part}'", + f"echo 'Current version: {current_version}'", + f"echo 'Target version: {backup_version}'", + ] - # Method 1: Legacy sysfs (if current OS < 3.22) if not current_is_new: - RESTORE_CODE += f"echo '{new_part_label}' > /sys/devices/platform/lpgpr/root_part\n" - RESTORE_CODE += "echo 'Set next boot via sysfs (legacy method)'\n" + code.extend([ + f"echo '{new_part_label}' > /sys/devices/platform/lpgpr/root_part", + "echo 'Set next boot via sysfs (legacy method)'", + ]) - # Method 2: MMC bootpart (if target OS >= 3.22 OR current OS >= 3.22) if target_is_new or current_is_new: - if inactive_part == 2: - RESTORE_CODE += "mmc bootpart enable 1 0 /dev/mmcblk0boot0\n" - else: - RESTORE_CODE += "mmc bootpart enable 2 0 /dev/mmcblk0boot1\n" - RESTORE_CODE += "echo 'Set next boot via mmc bootpart (new method)'\n" + code.extend([ + f"mmc bootpart enable {inactive_part - 1} 0 /dev/mmcblk0boot{inactive_part - 2}", + "echo 'Set next boot via mmc bootpart (new method)'", + ]) + + code.extend([ + f"echo '0' > /sys/devices/platform/lpgpr/root{new_part_label}_errcnt 2>/dev/null || true", + "echo 'Partition switch complete'", + ]) - RESTORE_CODE += f"echo '0' > /sys/devices/platform/lpgpr/root{new_part_label}_errcnt 2>/dev/null || true\n" - RESTORE_CODE += "echo 'Partition switch complete'\n" + RESTORE_CODE = "\n".join(code) if self.client: self.logger.debug("Connecting to FTP") diff --git a/codexctl/updates.py b/codexctl/updates.py index 4d215c1..44e3c41 100644 --- a/codexctl/updates.py +++ b/codexctl/updates.py @@ -411,14 +411,25 @@ def is_bootloader_boundary_downgrade(current_version: str, target_version: str) Returns: bool: True if crossing boundary downward (3.22+ -> <3.22) + + Raises: + ValueError: If either version is empty or has invalid format """ + if not current_version or not current_version.strip(): + raise ValueError("current_version cannot be empty") + if not target_version or not target_version.strip(): + raise ValueError("target_version cannot be empty") + try: current_parts = [int(x) for x in current_version.split('.')] target_parts = [int(x) for x in target_version.split('.')] + except ValueError as e: + raise ValueError(f"Invalid version format: {e}") from e + + if len(current_parts) < 2 or len(target_parts) < 2: + raise ValueError("Version must have at least 2 components (e.g., '3.22')") - current_is_322_or_higher = current_parts >= [3, 22] - target_is_below_322 = target_parts < [3, 22] + current_is_322_or_higher = current_parts >= [3, 22] + target_is_below_322 = target_parts < [3, 22] - return current_is_322_or_higher and target_is_below_322 - except (ValueError, IndexError): - return False + return current_is_322_or_higher and target_is_below_322 diff --git a/tests/test.py b/tests/test.py index 38cb88e..3a5d0ad 100644 --- a/tests/test.py +++ b/tests/test.py @@ -198,10 +198,6 @@ def test_cat(path, expected): test_cat("/etc/version", b"20221026104022\n") -assert_value("latest rm1 version", updater.get_latest_version(HardwareType.RM1), "3.23.0.64") -assert_value("latest rm2 version", updater.get_latest_version(HardwareType.RM2), "3.23.0.64") -# Don't think this test is needed. - assert_gt( "toltec rm1 version", updater.get_toltec_version(HardwareType.RM1), @@ -214,6 +210,8 @@ def test_cat(path, expected): ) with assert_raises("toltec rmpp version", SystemExit): updater.get_toltec_version(HardwareType.RMPP) +with assert_raises("toltec rmppm version", SystemExit): + updater.get_toltec_version(HardwareType.RMPPM) assert_value( "boundary cross 3.23->3.20", @@ -245,16 +243,10 @@ def test_cat(path, expected): UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.22.0.64"), False ) -assert_value( - "empty string current", - UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92"), - False -) -assert_value( - "non-numeric version", - UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92"), - False -) +with assert_raises("empty string current", ValueError): + UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92") +with assert_raises("non-numeric version", ValueError): + UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92") if FAILED: sys.exit(1) From 469ca81d439ce30298cb92068cd092dad406547b Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Mon, 1 Dec 2025 16:20:21 -0700 Subject: [PATCH 10/15] fix: remove dead code for unknown backup version --- codexctl/device.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/codexctl/device.py b/codexctl/device.py index 65ac0e8..6c9a080 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -584,11 +584,6 @@ def restore_previous_version(self) -> None: else: raise SystemError(f"Cannot restore: unexpected current version format '{current_version}'") - if backup_version == "unknown": - raise SystemError( - "Cannot restore: backup partition version could not be determined. " - ) - parts = backup_version.split('.') if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] From 66840e4603976a3b6747d84a9d46835323f4e8a0 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Tue, 2 Dec 2025 19:09:23 -0700 Subject: [PATCH 11/15] remarkable-update-fuse and remarkable-update-image to 1.3, swu tests --- Makefile | 39 +++++++++++++++++++++++++++++++++++++-- codexctl/__init__.py | 4 ++-- codexctl/analysis.py | 3 +++ pyproject.toml | 4 ++-- requirements.txt | 4 ++-- 5 files changed, 46 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 0184d66..6b80488 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,11 @@ FW_DATA := wVbHkgKisg- IMG_SHA := fc7d145e18f14a1a3f435f2fd5ca5924fe8dfe59bf45605dc540deed59551ae4 LS_DATA := ". .. lost+found bin boot dev etc home lib media mnt postinst proc run sbin sys tmp uboot-postinst uboot-version usr var" CAT_DATA := 20221026104022 +FW_VERSION_SWU := 3.20.0.92 +FW_FILE_SWU := remarkable-production-memfault-image-$(FW_VERSION_SWU)-rm2-public +IMG_SHA_SWU := 7de74325d82d249ccd644e6a6be2ada954a225cfe434d3bf16c4fa6e1c145eb9 +LS_DATA_SWU := ". .. lost+found bin boot dev etc home lib media mnt postinst postinst-waveform proc run sbin srv sys tmp uboot-version usr var" +CAT_DATA_SWU := 20250613122401 SHELL := /bin/bash ifeq ($(OS),Windows_NT) @@ -49,7 +54,13 @@ $(VENV_BIN_ACTIVATE): requirements.remote.txt requirements.txt . $(VENV_BIN_ACTIVATE); \ python -m codexctl download --hardware rm2 --out .venv ${FW_VERSION} -test: $(VENV_BIN_ACTIVATE) .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed +.venv/$(FW_FILE_SWU): $(VENV_BIN_ACTIVATE) $(OBJ) + @echo "[info] Downloading remarkable .swu update file" + @set -e; \ + . $(VENV_BIN_ACTIVATE); \ + python -m codexctl download --hardware rm2 --out .venv $(FW_VERSION_SWU) + +test: $(VENV_BIN_ACTIVATE) .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed .venv/$(FW_FILE_SWU) @echo "[info] Running test" @set -e; \ . $(VENV_BIN_ACTIVATE); \ @@ -74,9 +85,21 @@ test: $(VENV_BIN_ACTIVATE) .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed if ! diff --color <(python -m codexctl cat ".venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed" /etc/version | tr -d "\n\r") <(echo -n ${CAT_DATA}) | cat -te; then \ echo "codexctl cat failed test"; \ exit 1; \ + fi; \ + echo "[info] Running .swu tests"; \ + python -m codexctl extract --out ".venv/$(FW_FILE_SWU).img" ".venv/$(FW_FILE_SWU)"; \ + echo "$(IMG_SHA_SWU) .venv/$(FW_FILE_SWU).img" | $(SHA256SUM) -c; \ + rm -f ".venv/$(FW_FILE_SWU).img"; \ + if ! diff --color <(python -m codexctl ls ".venv/$(FW_FILE_SWU)" / | tr -d "\n\r") <(echo -n $(LS_DATA_SWU)) | cat -te; then \ + echo "codexctl ls .swu failed test"; \ + exit 1; \ + fi; \ + if ! diff --color <(python -m codexctl cat ".venv/$(FW_FILE_SWU)" /etc/version | tr -d "\n\r") <(echo -n $(CAT_DATA_SWU)) | cat -te; then \ + echo "codexctl cat .swu failed test"; \ + exit 1; \ fi -test-executable: .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed +test-executable: .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed .venv/$(FW_FILE_SWU) @set -e; \ . $(VENV_BIN_ACTIVATE); \ dist/${CODEXCTL_BIN} extract --out ".venv/${FW_VERSION}_reMarkable2-${FW_DATA}.img" ".venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed"; \ @@ -90,6 +113,18 @@ test-executable: .venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed if ! diff --color <(dist/${CODEXCTL_BIN} cat ".venv/${FW_VERSION}_reMarkable2-${FW_DATA}.signed" /etc/version | tr -d "\n\r") <(echo -n ${CAT_DATA}) | cat -te; then \ echo "codexctl cat failed test"; \ exit 1; \ + fi; \ + echo "[info] Running .swu tests"; \ + dist/${CODEXCTL_BIN} extract --out ".venv/$(FW_FILE_SWU).img" ".venv/$(FW_FILE_SWU)"; \ + echo "$(IMG_SHA_SWU) .venv/$(FW_FILE_SWU).img" | $(SHA256SUM) -c; \ + rm -f ".venv/$(FW_FILE_SWU).img"; \ + if ! diff --color <(dist/${CODEXCTL_BIN} ls ".venv/$(FW_FILE_SWU)" / | tr -d "\n\r") <(echo -n $(LS_DATA_SWU)) | cat -te; then \ + echo "codexctl ls .swu failed test"; \ + exit 1; \ + fi; \ + if ! diff --color <(dist/${CODEXCTL_BIN} cat ".venv/$(FW_FILE_SWU)" /etc/version | tr -d "\n\r") <(echo -n $(CAT_DATA_SWU)) | cat -te; then \ + echo "codexctl cat .swu failed test"; \ + exit 1; \ fi clean: diff --git a/codexctl/__init__.py b/codexctl/__init__.py index 80b785b..a7f142a 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -367,8 +367,8 @@ def version_lookup(version: str | None) -> re.Match[str] | None: from remarkable_update_image import UpdateImage swu_image = UpdateImage(current_swu_path) bootloader_files_for_install = { - 'update-bootloader.sh': swu_image['update-bootloader.sh'].read(), - 'imx-boot': swu_image['imx-boot'].read(), + 'update-bootloader.sh': swu_image.archive[b'update-bootloader.sh'].read(), + 'imx-boot': swu_image.archive[b'imx-boot'].read(), } if not all(bootloader_files_for_install.values()): diff --git a/codexctl/analysis.py b/codexctl/analysis.py index 7d15787..0218687 100644 --- a/codexctl/analysis.py +++ b/codexctl/analysis.py @@ -59,4 +59,7 @@ def get_swu_metadata(swu_file: str) -> tuple[str, HardwareType]: if image.hardware_type not in hw_map: raise SystemError(f"Unsupported hardware type in SWU file: {swu_file}") + if image.version is None: + raise SystemError(f"Could not determine version from SWU file: {swu_file}") + return image.version, hw_map[image.hardware_type] diff --git a/pyproject.toml b/pyproject.toml index a45818a..88c3475 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,8 +12,8 @@ paramiko = "3.4.1" psutil = "6.0.0" requests = "2.32.4" loguru = "0.7.3" -remarkable-update-image = { version = "1.1.6", markers = "sys_platform != 'linux'" } -remarkable-update-fuse = { version = "1.2.4", markers = "sys_platform == 'linux'" } +remarkable-update-image = { version = "1.3", markers = "sys_platform != 'linux'" } +remarkable-update-fuse = { version = "1.3", markers = "sys_platform == 'linux'" } [build-system] requires = ["poetry-core"] diff --git a/requirements.txt b/requirements.txt index b3a2f42..78e5d89 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ requests==2.32.4 loguru==0.7.3 -remarkable-update-image==1.2; sys_platform != 'linux' -remarkable-update-fuse==1.2.6; sys_platform == 'linux' +remarkable-update-image==1.3; sys_platform != 'linux' +remarkable-update-fuse==1.3; sys_platform == 'linux' From 5eba442449796e0c7683fac472cb5efec5484065 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Tue, 2 Dec 2025 21:31:03 -0700 Subject: [PATCH 12/15] fix(ci): add disk space cleanup step to build for remarkable --- .github/workflows/main.yml | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 998ba52..3d94399 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -115,6 +115,21 @@ jobs: name: Build for remarkable runs-on: ubuntu-latest steps: + - name: Free up disk space + shell: bash + run: | + sudo rm -vrf \ + /usr/lib/jvm \ + /usr/share/dotnet \ + /usr/share/swift \ + /usr/local/.ghcup \ + /usr/local/julia* \ + /usr/local/lib/android \ + /usr/local/share/chromium \ + /opt/microsoft /opt/google \ + /opt/az \ + /usr/local/share/powershell \ + | wc -l - name: Checkout the codexctl repository uses: actions/checkout@v4 - name: Nuitka ccache @@ -210,4 +225,4 @@ jobs: tag: ${{ env.TAG }} commit: ${{ github.sha }} generateReleaseNotes: true - makeLatest: true \ No newline at end of file + makeLatest: true From 7b8b39f0517f63798ea3ed48b69c6d880c7a2181 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Wed, 3 Dec 2025 10:36:02 -0700 Subject: [PATCH 13/15] return empty dicts for devices missing from json --- codexctl/updates.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/codexctl/updates.py b/codexctl/updates.py index 44e3c41..5b65bd8 100644 --- a/codexctl/updates.py +++ b/codexctl/updates.py @@ -93,10 +93,10 @@ def get_remarkable_versions(self) -> tuple[dict, dict, dict, dict, list]: ) return ( - contents["remarkablepp"], - contents["remarkableppm"], - contents["remarkable2"], - contents["remarkable1"], + contents.get("remarkablepp", {}), + contents.get("remarkableppm", {}), + contents.get("remarkable2", {}), + contents.get("remarkable1", {}), provider_urls, ) From 3941f3927d2ae374e5e6cdae7f87495693ea5527 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Thu, 4 Dec 2025 13:58:47 -0700 Subject: [PATCH 14/15] fix: suggestions from code review Co-authored-by: Nathaniel van Diepen --- README.md | 2 +- codexctl/__init__.py | 20 ++++++++++++++++---- codexctl/analysis.py | 31 ------------------------------- codexctl/device.py | 12 ++++++------ codexctl/updates.py | 5 +++-- 5 files changed, 26 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 3a92a2f..c2584ba 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ codexctl restore codexctl download 3.8.0.1944 --hardware rm2 codexctl cat 3.8.0.1944_reMarkable2-7eGpAv7sYB.signed /etc/version ``` -- Extract the contents of an upgrade file to a folder named `extracted` +- Extract the filesystem image of an upgrade file as a file named `extracted` ``` codexctl extract remarkable-production-image-3.22.0.64-ferrari-public.swu -o extracted ``` diff --git a/codexctl/__init__.py b/codexctl/__init__.py index a7f142a..ed1f05d 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -126,7 +126,7 @@ def call_func(self, function: str, args: dict) -> None: from remarkable_update_fuse import UpdateFS except ImportError: raise ImportError( - "remarkable_update_image and remarkable_update_fuse are required for mounting. Please install them!" + "remarkable_update_fuse is required for mounting. Please install it!" ) if args["out"] is None: @@ -258,8 +258,20 @@ def version_lookup(version: str | None) -> re.Match[str] | None: image = UpdateImage(update_file) if isinstance(image, CPIOUpdateImage): - from .analysis import get_swu_metadata - version_number, swu_hardware = get_swu_metadata(update_file) + if image.version is None: + raise SystemError(f"Could not determine version from SWU file: {update_file}") + + version_number = image.version + hw_map = { + "reMarkable1": HardwareType.RM1, + "reMarkable2": HardwareType.RM2, + "ferrari": HardwareType.RMPP, + "chiappa": HardwareType.RMPPM, + } + if image.hardware_type not in hw_map: + raise SystemError(f"Unsupported hardware type in SWU file: {update_file}") + + hw_map[image.hardware_type] logger.info(f"Extracted from SWU: version={version_number}, hardware={swu_hardware.name}") if swu_hardware != remarkable.hardware: @@ -339,7 +351,7 @@ def version_lookup(version: str | None) -> re.Match[str] | None: print(" 5. Reboot") print() - response = input("Do you want to continue? (y/n): ") + response = input("Do you want to continue? (y/N): ") if response.lower() != 'y': raise SystemExit("Installation cancelled by user") diff --git a/codexctl/analysis.py b/codexctl/analysis.py index 0218687..f84c4d3 100644 --- a/codexctl/analysis.py +++ b/codexctl/analysis.py @@ -32,34 +32,3 @@ def get_update_image(file: str): warnings.warn("Unable to open public key", RuntimeWarning) return image, volume - - -def get_swu_metadata(swu_file: str) -> tuple[str, HardwareType]: - """ - Extract version and hardware type from an SWU file. - - Args: - swu_file: Path to the SWU file - - Returns: - Tuple of (version, hardware_type) - - Raises: - SystemError: If hardware type is unsupported - """ - image = UpdateImage(swu_file) - - hw_map = { - "reMarkable1": HardwareType.RM1, - "reMarkable2": HardwareType.RM2, - "ferrari": HardwareType.RMPP, - "chiappa": HardwareType.RMPPM, - } - - if image.hardware_type not in hw_map: - raise SystemError(f"Unsupported hardware type in SWU file: {swu_file}") - - if image.version is None: - raise SystemError(f"Could not determine version from SWU file: {swu_file}") - - return image.version, hw_map[image.hardware_type] diff --git a/codexctl/device.py b/codexctl/device.py index 6c9a080..c4aa899 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -579,17 +579,17 @@ def restore_previous_version(self) -> None: new_part_label = "a" if inactive_part == 2 else "b" parts = current_version.split('.') - if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): - current_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] - else: + if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit(): raise SystemError(f"Cannot restore: unexpected current version format '{current_version}'") + current_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] + parts = backup_version.split('.') - if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): - target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] - else: + if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit(): raise SystemError(f"Cannot restore: unexpected backup version format '{backup_version}'") + target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] + code = [ "#!/bin/bash", f"echo 'Switching from partition {current_part} to partition {inactive_part}'", diff --git a/codexctl/updates.py b/codexctl/updates.py index 5b65bd8..c23d253 100644 --- a/codexctl/updates.py +++ b/codexctl/updates.py @@ -84,14 +84,15 @@ def get_remarkable_versions(self) -> tuple[dict, dict, dict, dict, list]: self.logger.debug(f"Version ids contents are {contents}") provider_urls = contents.get("external-provider-urls", contents.get("external-provider-url")) - if isinstance(provider_urls, str): - provider_urls = [provider_urls] if provider_urls is None: raise SystemError( f"version-ids.json at {file_location} is missing external provider URLs. " "Please delete the file and try again, or open an issue on the repo." ) + if isinstance(provider_urls, str): + provider_urls = [provider_urls] + return ( contents.get("remarkablepp", {}), contents.get("remarkableppm", {}), From dd2951f3eb205005e0b5d621365d67160ba09335 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Thu, 4 Dec 2025 18:51:47 -0700 Subject: [PATCH 15/15] fix: assign swu_hardware variable and improve import handling --- codexctl/__init__.py | 15 +++++++++------ codexctl/device.py | 9 ++++----- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/codexctl/__init__.py b/codexctl/__init__.py index ed1f05d..871c834 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -207,6 +207,13 @@ def call_func(self, function: str, args: dict) -> None: ) remote = True + try: + from remarkable_update_image import UpdateImage + except ImportError: + raise ImportError( + "remarkable_update_image is required for install. Please install it!" + ) from None + from .device import DeviceManager from .server import get_available_version @@ -271,7 +278,7 @@ def version_lookup(version: str | None) -> re.Match[str] | None: if image.hardware_type not in hw_map: raise SystemError(f"Unsupported hardware type in SWU file: {update_file}") - hw_map[image.hardware_type] + swu_hardware = hw_map[image.hardware_type] logger.info(f"Extracted from SWU: version={version_number}, hardware={swu_hardware.name}") if swu_hardware != remarkable.hardware: @@ -632,8 +639,4 @@ def main() -> None: ### Call function man = Manager(device, logger) - try: - man.call_func(args.command, vars(args)) - except SystemError as e: - print(f"\nError: {e}", file=sys.stderr) - sys.exit(1) + man.call_func(args.command, vars(args)) diff --git a/codexctl/device.py b/codexctl/device.py index c4aa899..4cd75b0 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -1,10 +1,11 @@ import enum +import logging +import os +import re import socket import subprocess -import logging +import tempfile import threading -import re -import os import time from .server import startUpdate @@ -795,8 +796,6 @@ def _update_paper_pro_bootloader(self, bootloader_script: bytes, imx_boot: bytes Raises: SystemError: If bootloader update fails """ - import tempfile - self.logger.info("Starting bootloader update for Paper Pro") if not self.client: