From 79281d18a6c24a799cae520e73e99878b9a2cbd5 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sat, 6 Dec 2025 18:23:43 -0700 Subject: [PATCH 1/3] fix: on-device installs, rmpp 3.20 downgrade regression. Fixes #158 --- codexctl/__init__.py | 2 +- codexctl/device.py | 213 ++++++++++++++++++++++++++----------------- 2 files changed, 128 insertions(+), 87 deletions(-) diff --git a/codexctl/__init__.py b/codexctl/__init__.py index 871c834..b68f8d0 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -261,7 +261,7 @@ def version_lookup(version: str | None) -> re.Match[str] | None: if update_file: try: from remarkable_update_image import UpdateImage - from remarkable_update_image.cpio import UpdateImage as CPIOUpdateImage + from remarkable_update_image.image import CPIOUpdateImage image = UpdateImage(update_file) if isinstance(image, CPIOUpdateImage): diff --git a/codexctl/device.py b/codexctl/device.py index 74464c7..d71ee18 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -291,11 +291,11 @@ def connect_to_device( return client - def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]: + def _read_version_from_path(self, ftp=None, base_path: str = "") -> tuple[str, bool]: """Reads version from a given path (current partition or mounted backup) Args: - ftp: SFTP client connection + ftp: SFTP client connection (None for local file access) base_path: Base path prefix (empty for current partition, /tmp/mount_pX for backup) Returns: @@ -305,75 +305,120 @@ def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]: os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" def file_exists(path: str) -> bool: - try: - ftp.stat(path) - return True - except FileNotFoundError: - return False + if ftp: + try: + ftp.stat(path) + return True + except FileNotFoundError: + return False + return os.path.exists(path) + + def read_file(path: str) -> str: + if ftp: + with ftp.file(path) as file: + return file.read().decode("utf-8") + with open(path, encoding="utf-8") as file: + return file.read() if file_exists(update_conf_path): - with ftp.file(update_conf_path) as file: - contents = file.read().decode("utf-8").strip("\n") - match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) - if match: - return match.group(), True - raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") + contents = read_file(update_conf_path).strip("\n") + match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) + if match: + return match.group(), True + raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") if file_exists(os_release_path): - with ftp.file(os_release_path) as file: - contents = file.read().decode("utf-8") - match = re.search("(?<=IMG_VERSION=).*", contents) - if match: - return match.group().strip('"'), False - raise SystemError(f"IMG_VERSION not found in {os_release_path}") + contents = read_file(os_release_path) + match = re.search("(?<=IMG_VERSION=).*", contents) + if match: + return match.group().strip('"'), False + raise SystemError(f"IMG_VERSION not found in {os_release_path}") raise SystemError(f"Cannot read version from {base_path or 'current partition'}: no version file found") + def _get_active_device(self) -> str: + """Gets the active root device path. + + Returns: + str: Active device path (e.g., /dev/mmcblk2p2) + """ + if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): + cmd = "swupdate -g" + else: + cmd = "rootdev" + + if self.client: + _stdin, stdout, _stderr = self.client.exec_command(cmd) + return stdout.read().decode("utf-8").strip() + else: + result = subprocess.run(cmd.split(), capture_output=True, text=True) + return result.stdout.strip() + + def _parse_partition_info(self, active_device: str) -> tuple[int, int, str]: + """Parse partition numbers from device path. + + Args: + active_device: Device path (e.g., /dev/mmcblk2p2) + + Returns: + tuple: (active_part, inactive_part, device_base) + """ + active_part = int(active_device.split('p')[-1]) + inactive_part = 3 if active_part == 2 else 2 + device_base = re.sub(r'p\d+$', '', active_device) + return active_part, inactive_part, device_base + def _get_backup_partition_version(self) -> str: """Gets the version installed on the backup (inactive) partition Returns: - str: Version string + str: Version string (empty string for RM1/RM2 on failure) Raises: - SystemError: If backup partition version cannot be determined + SystemError: If backup partition version cannot be determined (Paper Pro only) """ - if not self.client: - raise SystemError("Cannot get backup partition version: no SSH client connection") - - ftp = self.client.open_sftp() - - if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): - _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") - active_device = stdout.read().decode("utf-8").strip() - active_part = int(active_device.split('p')[-1]) - inactive_part = 3 if active_part == 2 else 2 - device_base = re.sub(r'p\d+$', '', active_device) - else: - _stdin, stdout, _stderr = self.client.exec_command("rootdev") - active_device = stdout.read().decode("utf-8").strip() - active_part = int(active_device.split('p')[-1]) - inactive_part = 3 if active_part == 2 else 2 - device_base = re.sub(r'p\d+$', '', active_device) + try: + active_device = self._get_active_device() + _, inactive_part, device_base = self._parse_partition_info(active_device) + mount_point = f"/tmp/mount_p{inactive_part}" - mount_point = f"/tmp/mount_p{inactive_part}" + if self.client: + ftp = self.client.open_sftp() + self.client.exec_command(f"mkdir -p {mount_point}") + _stdin, stdout, _stderr = self.client.exec_command( + f"mount -o ro {device_base}p{inactive_part} {mount_point}" + ) + exit_status = stdout.channel.recv_exit_status() - self.client.exec_command(f"mkdir -p {mount_point}") - _stdin, stdout, _stderr = self.client.exec_command( - f"mount -o ro {device_base}p{inactive_part} {mount_point}" - ) - exit_status = stdout.channel.recv_exit_status() + if exit_status != 0: + error_msg = _stderr.read().decode('utf-8') + raise SystemError(f"Failed to mount backup partition: {error_msg}") - if exit_status != 0: - error_msg = _stderr.read().decode('utf-8') - raise SystemError(f"Failed to mount backup partition: {error_msg}") + try: + version, _ = self._read_version_from_path(ftp, mount_point) + return version + finally: + self.client.exec_command(f"umount {mount_point}") + self.client.exec_command(f"rm -rf {mount_point}") + else: + os.makedirs(mount_point, exist_ok=True) + result = subprocess.run( + ["mount", "-o", "ro", f"{device_base}p{inactive_part}", mount_point], + capture_output=True, text=True + ) + if result.returncode != 0: + raise SystemError(f"Failed to mount backup partition: {result.stderr}") - try: - version, _ = self._read_version_from_path(ftp, mount_point) - return version - finally: - self.client.exec_command(f"umount {mount_point}") - self.client.exec_command(f"rm -rf {mount_point}") + try: + version, _ = self._read_version_from_path(base_path=mount_point) + return version + finally: + subprocess.run(["umount", mount_point]) + subprocess.run(["rm", "-rf", mount_point]) + except SystemError: + if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): + raise + return "" def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, int]: """Gets partition information for Paper Pro devices @@ -384,13 +429,8 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, Returns: tuple: (current_partition, inactive_partition, next_boot_partition) """ - if not self.client: - raise SystemError("SSH client required for partition detection") - - _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") - active_device = stdout.read().decode("utf-8").strip() - current_part = int(active_device.split('p')[-1]) - inactive_part = 3 if current_part == 2 else 2 + active_device = self._get_active_device() + current_part, inactive_part, _ = self._parse_partition_info(active_device) parts = current_version.split('.') if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): @@ -401,20 +441,34 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, next_boot_part = current_part if is_new_version: + boot_part_path = "/sys/bus/mmc/devices/mmc0:0001/boot_part" try: - ftp = self.client.open_sftp() - with ftp.file("/sys/bus/mmc/devices/mmc0:0001/boot_part") as file: - boot_part_value = file.read().decode("utf-8").strip() - next_boot_part = 2 if boot_part_value == "1" else 3 + if self.client: + ftp = self.client.open_sftp() + with ftp.file(boot_part_path) as file: + boot_part_value = file.read().decode("utf-8").strip() + next_boot_part = 2 if boot_part_value == "1" else 3 + elif os.path.exists(boot_part_path): + with open(boot_part_path, encoding="utf-8") as file: + boot_part_value = file.read().strip() + next_boot_part = 2 if boot_part_value == "1" else 3 + else: + is_new_version = False except (IOError, OSError): is_new_version = False if not is_new_version: + root_part_path = "/sys/devices/platform/lpgpr/root_part" try: - ftp = self.client.open_sftp() - with ftp.file("/sys/devices/platform/lpgpr/root_part") as file: - root_part_value = file.read().decode("utf-8").strip() - next_boot_part = 2 if root_part_value == "a" else 3 + if self.client: + ftp = self.client.open_sftp() + with ftp.file(root_part_path) as file: + root_part_value = file.read().decode("utf-8").strip() + next_boot_part = 2 if root_part_value == "a" else 3 + elif os.path.exists(root_part_path): + with open(root_part_path, encoding="utf-8") as file: + root_part_value = file.read().strip() + next_boot_part = 2 if root_part_value == "a" else 3 except (IOError, OSError) as e: self.logger.debug(f"Failed to read next boot partition: {e}") @@ -442,29 +496,16 @@ def get_device_status(self) -> tuple[str | None, str, str, str, str]: beta_contents = file.read().decode("utf-8") else: - if os.path.exists("/usr/share/remarkable/update.conf"): - with open("/usr/share/remarkable/update.conf", encoding="utf-8") as file: - xochitl_version = re.search( - "(?<=REMARKABLE_RELEASE_VERSION=).*", - file.read().strip("\n"), - ).group() - else: - with open("/etc/os-release", encoding="utf-8") as file: - xochitl_version = ( - re.search("(?<=IMG_VERSION=).*", file.read()) - .group() - .strip('"') - ) + xochitl_version, old_update_engine = self._read_version_from_path() - old_update_engine = False if os.path.exists("/etc/version"): - with open("/etc/version") as file: + with open("/etc/version", encoding="utf-8") as file: version_id = file.read().rstrip() else: version_id = "" if os.path.exists("/home/root/.config/remarkable/xochitl.conf"): - with open("/home/root/.config/remarkable/xochitl.conf") as file: + with open("/home/root/.config/remarkable/xochitl.conf", encoding="utf-8") as file: beta_contents = file.read().rstrip() else: beta_contents = "" @@ -691,7 +732,7 @@ def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes print("\nDone! Running swupdate (PLEASE BE PATIENT, ~5 MINUTES)") - command = f"/usr/sbin/swupdate-from-image-file {out_location}" + command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {out_location}'" self.logger.debug(command) _stdin, stdout, _stderr = self.client.exec_command(command) @@ -737,7 +778,7 @@ def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes else: print("Running swupdate") - command = ["/usr/sbin/swupdate-from-image-file", version_file] + command = ["bash", "-c", f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {version_file}"] self.logger.debug(command) try: From 04d24c59713e269769434b161272dd32b58b954e Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sat, 6 Dec 2025 20:28:58 -0700 Subject: [PATCH 2/3] fix: file access helpers and escape shell argument --- codexctl/device.py | 65 +++++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/codexctl/device.py b/codexctl/device.py index d71ee18..30c865d 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -2,6 +2,7 @@ import logging import os import re +import shlex import socket import subprocess import tempfile @@ -304,21 +305,23 @@ def _read_version_from_path(self, ftp=None, base_path: str = "") -> tuple[str, b update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf" os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" - def file_exists(path: str) -> bool: - if ftp: + if ftp: + def file_exists(path: str) -> bool: try: ftp.stat(path) return True except FileNotFoundError: return False - return os.path.exists(path) - def read_file(path: str) -> str: - if ftp: + def read_file(path: str) -> str: with ftp.file(path) as file: return file.read().decode("utf-8") - with open(path, encoding="utf-8") as file: - return file.read() + else: + file_exists = os.path.exists + + def read_file(path: str) -> str: + with open(path, encoding="utf-8") as file: + return file.read() if file_exists(update_conf_path): contents = read_file(update_conf_path).strip("\n") @@ -440,18 +443,32 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, next_boot_part = current_part + if self.client: + ftp = self.client.open_sftp() + + def file_exists(path: str) -> bool: + try: + ftp.stat(path) + return True + except FileNotFoundError: + return False + + def read_file(path: str) -> str: + with ftp.file(path) as file: + return file.read().decode("utf-8") + else: + file_exists = os.path.exists + + def read_file(path: str) -> str: + with open(path, encoding="utf-8") as file: + return file.read() + if is_new_version: boot_part_path = "/sys/bus/mmc/devices/mmc0:0001/boot_part" try: - if self.client: - ftp = self.client.open_sftp() - with ftp.file(boot_part_path) as file: - boot_part_value = file.read().decode("utf-8").strip() - next_boot_part = 2 if boot_part_value == "1" else 3 - elif os.path.exists(boot_part_path): - with open(boot_part_path, encoding="utf-8") as file: - boot_part_value = file.read().strip() - next_boot_part = 2 if boot_part_value == "1" else 3 + if file_exists(boot_part_path): + boot_part_value = read_file(boot_part_path).strip() + next_boot_part = 2 if boot_part_value == "1" else 3 else: is_new_version = False except (IOError, OSError): @@ -460,15 +477,9 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, if not is_new_version: root_part_path = "/sys/devices/platform/lpgpr/root_part" try: - if self.client: - ftp = self.client.open_sftp() - with ftp.file(root_part_path) as file: - root_part_value = file.read().decode("utf-8").strip() - next_boot_part = 2 if root_part_value == "a" else 3 - elif os.path.exists(root_part_path): - with open(root_part_path, encoding="utf-8") as file: - root_part_value = file.read().strip() - next_boot_part = 2 if root_part_value == "a" else 3 + if file_exists(root_part_path): + root_part_value = read_file(root_part_path).strip() + next_boot_part = 2 if root_part_value == "a" else 3 except (IOError, OSError) as e: self.logger.debug(f"Failed to read next boot partition: {e}") @@ -732,7 +743,7 @@ def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes print("\nDone! Running swupdate (PLEASE BE PATIENT, ~5 MINUTES)") - command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {out_location}'" + command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(out_location)}'" self.logger.debug(command) _stdin, stdout, _stderr = self.client.exec_command(command) @@ -778,7 +789,7 @@ def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes else: print("Running swupdate") - command = ["bash", "-c", f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {version_file}"] + command = ["bash", "-c", f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(version_file)}"] self.logger.debug(command) try: From 87ded47f3c1b566b10232ec68039006888ee6016 Mon Sep 17 00:00:00 2001 From: Mitchell Scott <10804314+rmitchellscott@users.noreply.github.com> Date: Sat, 6 Dec 2025 20:48:44 -0700 Subject: [PATCH 3/3] fix: check exit status when determining active root device --- codexctl/device.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/codexctl/device.py b/codexctl/device.py index 30c865d..97481e2 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -344,6 +344,9 @@ def _get_active_device(self) -> str: Returns: str: Active device path (e.g., /dev/mmcblk2p2) + + Raises: + SystemError: If command fails or returns no output """ if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): cmd = "swupdate -g" @@ -351,10 +354,17 @@ def _get_active_device(self) -> str: cmd = "rootdev" if self.client: - _stdin, stdout, _stderr = self.client.exec_command(cmd) - return stdout.read().decode("utf-8").strip() + _stdin, stdout, stderr = self.client.exec_command(cmd) + output = stdout.read().decode("utf-8").strip() + exit_status = stdout.channel.recv_exit_status() + if exit_status != 0 or not output: + error = stderr.read().decode("utf-8", errors="ignore") + raise SystemError(f"Failed to get active device using '{cmd}': {error or 'no output'}") + return output else: result = subprocess.run(cmd.split(), capture_output=True, text=True) + if result.returncode != 0 or not result.stdout.strip(): + raise SystemError(f"Failed to get active device using '{cmd}': {result.stderr or 'no output'}") return result.stdout.strip() def _parse_partition_info(self, active_device: str) -> tuple[int, int, str]: