Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codexctl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def version_lookup(version: str | None) -> re.Match[str] | None:
if update_file:
try:
from remarkable_update_image import UpdateImage
from remarkable_update_image.cpio import UpdateImage as CPIOUpdateImage
from remarkable_update_image.image import CPIOUpdateImage

image = UpdateImage(update_file)
if isinstance(image, CPIOUpdateImage):
Expand Down
230 changes: 146 additions & 84 deletions codexctl/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import re
import shlex
import socket
import subprocess
import tempfile
Expand Down Expand Up @@ -291,11 +292,11 @@ def connect_to_device(

return client

def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]:
def _read_version_from_path(self, ftp=None, base_path: str = "") -> tuple[str, bool]:
"""Reads version from a given path (current partition or mounted backup)

Args:
ftp: SFTP client connection
ftp: SFTP client connection (None for local file access)
base_path: Base path prefix (empty for current partition, /tmp/mount_pX for backup)

Returns:
Expand All @@ -304,76 +305,133 @@ def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]:
update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf"
os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release"

def file_exists(path: str) -> bool:
try:
ftp.stat(path)
return True
except FileNotFoundError:
return False
if ftp:
def file_exists(path: str) -> bool:
try:
ftp.stat(path)
return True
except FileNotFoundError:
return False

def read_file(path: str) -> str:
with ftp.file(path) as file:
return file.read().decode("utf-8")
else:
file_exists = os.path.exists

def read_file(path: str) -> str:
with open(path, encoding="utf-8") as file:
return file.read()

if file_exists(update_conf_path):
with ftp.file(update_conf_path) as file:
contents = file.read().decode("utf-8").strip("\n")
match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents)
if match:
return match.group(), True
raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}")
contents = read_file(update_conf_path).strip("\n")
match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents)
if match:
return match.group(), True
raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}")

if file_exists(os_release_path):
with ftp.file(os_release_path) as file:
contents = file.read().decode("utf-8")
match = re.search("(?<=IMG_VERSION=).*", contents)
if match:
return match.group().strip('"'), False
raise SystemError(f"IMG_VERSION not found in {os_release_path}")
contents = read_file(os_release_path)
match = re.search("(?<=IMG_VERSION=).*", contents)
if match:
return match.group().strip('"'), False
raise SystemError(f"IMG_VERSION not found in {os_release_path}")

raise SystemError(f"Cannot read version from {base_path or 'current partition'}: no version file found")

def _get_backup_partition_version(self) -> str:
"""Gets the version installed on the backup (inactive) partition
def _get_active_device(self) -> str:
"""Gets the active root device path.

Returns:
str: Version string
str: Active device path (e.g., /dev/mmcblk2p2)

Raises:
SystemError: If backup partition version cannot be determined
SystemError: If command fails or returns no output
"""
if not self.client:
raise SystemError("Cannot get backup partition version: no SSH client connection")

ftp = self.client.open_sftp()

if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM):
_stdin, stdout, _stderr = self.client.exec_command("swupdate -g")
active_device = stdout.read().decode("utf-8").strip()
active_part = int(active_device.split('p')[-1])
inactive_part = 3 if active_part == 2 else 2
device_base = re.sub(r'p\d+$', '', active_device)
cmd = "swupdate -g"
else:
_stdin, stdout, _stderr = self.client.exec_command("rootdev")
active_device = stdout.read().decode("utf-8").strip()
active_part = int(active_device.split('p')[-1])
inactive_part = 3 if active_part == 2 else 2
device_base = re.sub(r'p\d+$', '', active_device)
cmd = "rootdev"

mount_point = f"/tmp/mount_p{inactive_part}"
if self.client:
_stdin, stdout, stderr = self.client.exec_command(cmd)
output = stdout.read().decode("utf-8").strip()
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0 or not output:
error = stderr.read().decode("utf-8", errors="ignore")
raise SystemError(f"Failed to get active device using '{cmd}': {error or 'no output'}")
return output
else:
result = subprocess.run(cmd.split(), capture_output=True, text=True)
if result.returncode != 0 or not result.stdout.strip():
raise SystemError(f"Failed to get active device using '{cmd}': {result.stderr or 'no output'}")
return result.stdout.strip()

self.client.exec_command(f"mkdir -p {mount_point}")
_stdin, stdout, _stderr = self.client.exec_command(
f"mount -o ro {device_base}p{inactive_part} {mount_point}"
)
exit_status = stdout.channel.recv_exit_status()
def _parse_partition_info(self, active_device: str) -> tuple[int, int, str]:
"""Parse partition numbers from device path.

Args:
active_device: Device path (e.g., /dev/mmcblk2p2)

Returns:
tuple: (active_part, inactive_part, device_base)
"""
active_part = int(active_device.split('p')[-1])
inactive_part = 3 if active_part == 2 else 2
device_base = re.sub(r'p\d+$', '', active_device)
return active_part, inactive_part, device_base

if exit_status != 0:
error_msg = _stderr.read().decode('utf-8')
raise SystemError(f"Failed to mount backup partition: {error_msg}")
def _get_backup_partition_version(self) -> str:
"""Gets the version installed on the backup (inactive) partition

Returns:
str: Version string (empty string for RM1/RM2 on failure)

Raises:
SystemError: If backup partition version cannot be determined (Paper Pro only)
"""
try:
version, _ = self._read_version_from_path(ftp, mount_point)
return version
finally:
self.client.exec_command(f"umount {mount_point}")
self.client.exec_command(f"rm -rf {mount_point}")
active_device = self._get_active_device()
_, inactive_part, device_base = self._parse_partition_info(active_device)
mount_point = f"/tmp/mount_p{inactive_part}"

if self.client:
ftp = self.client.open_sftp()
self.client.exec_command(f"mkdir -p {mount_point}")
_stdin, stdout, _stderr = self.client.exec_command(
f"mount -o ro {device_base}p{inactive_part} {mount_point}"
)
exit_status = stdout.channel.recv_exit_status()

if exit_status != 0:
error_msg = _stderr.read().decode('utf-8')
raise SystemError(f"Failed to mount backup partition: {error_msg}")

try:
version, _ = self._read_version_from_path(ftp, mount_point)
return version
finally:
self.client.exec_command(f"umount {mount_point}")
self.client.exec_command(f"rm -rf {mount_point}")
else:
os.makedirs(mount_point, exist_ok=True)
result = subprocess.run(
["mount", "-o", "ro", f"{device_base}p{inactive_part}", mount_point],
capture_output=True, text=True
)
if result.returncode != 0:
raise SystemError(f"Failed to mount backup partition: {result.stderr}")

try:
version, _ = self._read_version_from_path(base_path=mount_point)
return version
finally:
subprocess.run(["umount", mount_point])
subprocess.run(["rm", "-rf", mount_point])
except SystemError:
if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM):
raise
return ""

def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, int]:
"""Gets partition information for Paper Pro devices
Expand All @@ -384,13 +442,8 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int,
Returns:
tuple: (current_partition, inactive_partition, next_boot_partition)
"""
if not self.client:
raise SystemError("SSH client required for partition detection")

_stdin, stdout, _stderr = self.client.exec_command("swupdate -g")
active_device = stdout.read().decode("utf-8").strip()
current_part = int(active_device.split('p')[-1])
inactive_part = 3 if current_part == 2 else 2
active_device = self._get_active_device()
current_part, inactive_part, _ = self._parse_partition_info(active_device)

parts = current_version.split('.')
if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit():
Expand All @@ -400,20 +453,42 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int,

next_boot_part = current_part

if self.client:
ftp = self.client.open_sftp()

def file_exists(path: str) -> bool:
try:
ftp.stat(path)
return True
except FileNotFoundError:
return False

def read_file(path: str) -> str:
with ftp.file(path) as file:
return file.read().decode("utf-8")
else:
file_exists = os.path.exists

def read_file(path: str) -> str:
with open(path, encoding="utf-8") as file:
return file.read()

if is_new_version:
boot_part_path = "/sys/bus/mmc/devices/mmc0:0001/boot_part"
try:
ftp = self.client.open_sftp()
with ftp.file("/sys/bus/mmc/devices/mmc0:0001/boot_part") as file:
boot_part_value = file.read().decode("utf-8").strip()
if file_exists(boot_part_path):
boot_part_value = read_file(boot_part_path).strip()
next_boot_part = 2 if boot_part_value == "1" else 3
else:
is_new_version = False
except (IOError, OSError):
is_new_version = False

if not is_new_version:
root_part_path = "/sys/devices/platform/lpgpr/root_part"
try:
ftp = self.client.open_sftp()
with ftp.file("/sys/devices/platform/lpgpr/root_part") as file:
root_part_value = file.read().decode("utf-8").strip()
if file_exists(root_part_path):
root_part_value = read_file(root_part_path).strip()
next_boot_part = 2 if root_part_value == "a" else 3
except (IOError, OSError) as e:
self.logger.debug(f"Failed to read next boot partition: {e}")
Expand Down Expand Up @@ -442,29 +517,16 @@ def get_device_status(self) -> tuple[str | None, str, str, str, str]:
beta_contents = file.read().decode("utf-8")

else:
if os.path.exists("/usr/share/remarkable/update.conf"):
with open("/usr/share/remarkable/update.conf", encoding="utf-8") as file:
xochitl_version = re.search(
"(?<=REMARKABLE_RELEASE_VERSION=).*",
file.read().strip("\n"),
).group()
else:
with open("/etc/os-release", encoding="utf-8") as file:
xochitl_version = (
re.search("(?<=IMG_VERSION=).*", file.read())
.group()
.strip('"')
)
xochitl_version, old_update_engine = self._read_version_from_path()

old_update_engine = False
if os.path.exists("/etc/version"):
with open("/etc/version") as file:
with open("/etc/version", encoding="utf-8") as file:
version_id = file.read().rstrip()
else:
version_id = ""

if os.path.exists("/home/root/.config/remarkable/xochitl.conf"):
with open("/home/root/.config/remarkable/xochitl.conf") as file:
with open("/home/root/.config/remarkable/xochitl.conf", encoding="utf-8") as file:
beta_contents = file.read().rstrip()
else:
beta_contents = ""
Expand Down Expand Up @@ -691,7 +753,7 @@ def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes

print("\nDone! Running swupdate (PLEASE BE PATIENT, ~5 MINUTES)")

command = f"/usr/sbin/swupdate-from-image-file {out_location}"
command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(out_location)}'"
self.logger.debug(command)
_stdin, stdout, _stderr = self.client.exec_command(command)

Expand Down Expand Up @@ -737,7 +799,7 @@ def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes

else:
print("Running swupdate")
command = ["/usr/sbin/swupdate-from-image-file", version_file]
command = ["bash", "-c", f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(version_file)}"]
self.logger.debug(command)

try:
Expand Down
Loading