diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index fc1b661..356d079 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,7 +18,6 @@ on: jobs: test: - runs-on: windows-latest strategy: fail-fast: false @@ -27,13 +26,39 @@ jobs: steps: - uses: actions/checkout@v4 + + - name: Restore cached test binaries + id: restore-cache-test + uses: actions/cache/restore@v4 + with: + path: tests/manual/test_inject/target + key: ${{ runner.os }}-${{ hashFiles('tests/manual/test_inject/src/**') }} + + - name: Setup rust + if: steps.restore-cache-test.outputs.cache-hit != 'true' + uses: actions-rust-lang/setup-rust-toolchain@v1 + + - name: Build test binaries + if: steps.restore-cache-test.outputs.cache-hit != 'true' + run: cargo build --release --manifest-path tests/manual/test_inject/Cargo.toml + + - name: Cache test binaries + if: steps.restore-cache-test.outputs.cache-hit != 'true' + uses: actions/cache/save@v4 + with: + path: tests/manual/test_inject/target + key: ${{ runner.os }}-${{ hashFiles('tests/manual/test_inject/src/**') }} + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Install uv uses: astral-sh/setup-uv@v6 + - name: Install dependencies - run: uv sync --all-groups + run: uv sync + - name: Run tests run: uv run pytest diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..7c743bd --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,155 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "syn" +version = "2.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "test_inject" +version = "0.1.0" +dependencies = [ + "windows", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core", +] + +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core", + "windows-link", +] + +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..38a77d6 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,4 @@ +[workspace] +members = [ + "tests/manual/test_inject" +] \ No newline at end of file diff --git a/README.md b/README.md index f79eb18..ced0521 100644 --- a/README.md +++ b/README.md @@ -43,8 +43,9 @@ This project uses uv as the build backend and package manager. - isort . && black . Optional: A Nix flake provides a dev shell with Python 3.11, just, black, isort, and more: + - nix develop ## Support -discord: https://discord.gg/wcftyYm6qe +discord: diff --git a/flake.nix b/flake.nix index 8e0ebe9..b747bfc 100644 --- a/flake.nix +++ b/flake.nix @@ -31,6 +31,8 @@ python just alejandra + rustc + cargo python.pkgs.black python.pkgs.isort python.pkgs.vulture diff --git a/justfile b/justfile index bb45ff0..d7a61fd 100644 --- a/justfile +++ b/justfile @@ -46,3 +46,18 @@ format: isort . black . alejandra . + +# build test dll +build-test-dll: + if (!(Test-Path "tests/manual/test_inject/target/release/test_inject.dll")) { cargo build --release --manifest-path tests/manual/test_inject/Cargo.toml } + +# build test exe +build-test-exe: + if (!(Test-Path "tests/manual/test_inject/target/release/inject_target.dll")) { cargo build --release --manifest-path tests/manual/test_inject/Cargo.toml --bin inject_target } + +# run manual tests +manual-test: build-test-dll build-test-exe + uv run pytest -rs --run-manual tests/manual/ + +# run all tests +all-tests: test manual-test diff --git a/memobj/__init__.py b/memobj/__init__.py index 117cd42..a79d8e6 100644 --- a/memobj/__init__.py +++ b/memobj/__init__.py @@ -1,9 +1,9 @@ +import logging + +from .allocation import Allocation, Allocator +from .object import MemoryObject from .process import Process, WindowsProcess from .property import * -from .object import MemoryObject -from .allocation import Allocator, Allocation - -import logging logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) diff --git a/memobj/allocation.py b/memobj/allocation.py index ab02621..c6be126 100644 --- a/memobj/allocation.py +++ b/memobj/allocation.py @@ -1,14 +1,16 @@ -from typing import TYPE_CHECKING, Self +from typing import TYPE_CHECKING, Any, Self +from memobj.utils import Type, get_type_size if TYPE_CHECKING: from memobj.process import Process class Allocation: - def __init__(self, address: int, process: "Process"): + def __init__(self, address: int, process: "Process", size: int): self.address = address self.process = process + self.size = size self._is_closed: bool = False @@ -32,6 +34,22 @@ def free(self): self.process.free_memory(self.address) self._is_closed = True + def read_typed(self, read_type: Type) -> Any: + if (type_size := get_type_size(read_type)) != self.size: + raise ValueError( + f"{read_type} size ({type_size}) does not match allocation size ({self.size})" + ) + + return self.process.read_typed(self.address, read_type) + + def write_typed(self, write_type: Type, value: Any) -> None: + if (type_size := get_type_size(write_type)) != self.size: + raise ValueError( + f"Write type ({type_size}) does not match allocation size ({self.size})" + ) + + return self.process.write_typed(self.address, write_type, value) + class Allocator: """ @@ -61,12 +79,34 @@ def closed(self) -> bool: return self._is_closed def allocate(self, size: int) -> Allocation: + """ + Allocates a block of memory for the process. + + Allocates a specified block of memory for the associated process, keeping + track of the allocation for management purposes. The allocated memory + is represented as an `Allocation` object and is appended to the list of + current allocations. + + Args: + size (int): The size of the memory block to allocate in bytes. + + Returns: + Allocation: An object representing the allocated memory block. + """ address = self.process.allocate_memory(size) - allocation = Allocation(address, self.process) + allocation = Allocation(address, self.process, size) self.allocations.append(allocation) return allocation def close(self): + """ + Closes the allocator, ensuring all current allocations are properly freed and the allocator + is set to a closed state. This method prevents further use of the allocator by marking it + as closed. If the allocator is already closed, an error will be raised. + + Raises: + ValueError: If the allocator is already closed before the invocation of this method. + """ if self._is_closed: raise ValueError("Cannot close an already closed allocator") diff --git a/memobj/hook.py b/memobj/hook.py index bf614dc..369656a 100644 --- a/memobj/hook.py +++ b/memobj/hook.py @@ -1,23 +1,16 @@ import time from dataclasses import dataclass -from typing import TYPE_CHECKING, Callable, Any, Literal, Self -from logging import getLogger from functools import cached_property +from logging import getLogger +from typing import TYPE_CHECKING, Any, Literal, Self, ClassVar +from collections.abc import Callable import regex -from iced_x86 import ( - Instruction, - Decoder, - Code, - MemoryOperand, - Register, - BlockEncoder, - FlowControl, -) +from iced_x86 import (BlockEncoder, Code, Decoder, FlowControl, Instruction, + MemoryOperand, Register) from iced_x86._iced_x86_py import Register as RegisterType -from memobj.allocation import Allocator, Allocation - +from memobj.allocation import Allocation, Allocator if TYPE_CHECKING: from memobj.process import Process @@ -42,12 +35,14 @@ def _debug_print_disassembly( callback("bytes=" + " ".join(map(hex, code))) for instruction in decoder: - callback(f"{instruction.ip:016X} {instruction}; {instruction.code=}") + callback( + f"{instruction.ip:016X} {instruction}; {instruction.code=} | ({instruction.op_code()})" + ) -def _add_instruction_label(label_id: int, instruction: Instruction) -> Instruction: - instruction.ip = label_id - return instruction +# def _add_instruction_label(label_id: int, instruction: Instruction) -> Instruction: +# instruction.ip = label_id +# return instruction def instructions_to_code( @@ -85,27 +80,50 @@ def __exit__(self, *_): self.deactivate() def pre_hook(self): + """Called before the hook is activated""" pass def post_hook(self): + """Called after the hook is activated""" pass def hook(self) -> Any: + """Called when the hook is activated""" raise NotImplementedError() def unhook(self): + """Called when the hook is deactivated""" pass @property def active(self) -> bool: + """Whether the hook is active""" return self._active def get_code( self, ) -> list[Instruction]: + """Called when the hook is activated to get the code to put in the hook""" raise NotImplementedError() def allocate_variable(self, name: str, size: int) -> Allocation: + """ + Allocate a variable of the specified size for use in the hook, retrievable with get_variable. + + Args: + name: str + The name of the variable to allocate. + size: int + The size of the memory block to allocate. + + Returns: + Allocation + An Allocation object representing the allocated memory block. + + Raises: + ValueError + If a variable with the specified name has already been allocated. + """ if self._variables.get("name") is not None: raise ValueError(f"Variable {name} is already allocated") @@ -114,12 +132,33 @@ def allocate_variable(self, name: str, size: int) -> Allocation: return allocation def get_variable(self, name: str) -> Allocation: + """ + Retrieves the allocated variable by its name. + + This method attempts to fetch the variable associated with the provided + name from the internal allocation storage. If the requested variable + does not exist, an error is raised to indicate that it has not been + allocated. + + Args: + name (str): The name of the variable to retrieve. + + Returns: + Allocation + The allocated variable corresponding to the given name. + + Raises: + ValueError + If the variable with the specified name does not exist in the + allocation storage. + """ try: return self._variables[name] except KeyError: raise ValueError(f"Variable {name} has not been allocated") def activate(self) -> dict[str, Allocation]: + """Activate the hook""" if self.active: raise ValueError(f"Cannot activate active hook {self.__class__.__name__}") @@ -134,6 +173,7 @@ def activate(self) -> dict[str, Allocation]: return self._variables def deactivate(self, *, close_allocator: bool = True): + """Deactivate the hook""" self.unhook() if close_allocator: @@ -144,18 +184,36 @@ def deactivate(self, *, close_allocator: bool = True): class JmpHook(Hook): - PATTERN: regex.Pattern | bytes | None = None - MODULE: str | None = None - PRESERVE_RAX: bool = True + PATTERN: ClassVar[regex.Pattern[bytes] | bytes | None] = None + MODULE: ClassVar[str | None] = None + PRESERVE_RAX: ClassVar[bool] = True - def __init__(self, process: "Process"): + def __init__( + self, + process: "Process", + *, + special_deallocate: bool = True, + delayed_close_allocator_seconds: float | None = 0.5, + ): + """ + Initializes an instance of the specified class with provided parameters. + + Args: + process (Process): The process to hook. + special_deallocate (bool): If we should remove the entry jump before deallocating. Default is True. + delayed_close_allocator_seconds (float | None): How many seconds to delay the deallocating. Default is 0.5. + """ + # TODO: make this better if not process.process_64_bit: - self.PRESERVE_RAX = False + self.__class__.PRESERVE_RAX = False super().__init__(process) # (address, code) self._original_code: tuple[int, bytes] | None = None + self.close_allocator = special_deallocate + self.delayed_close_allocator_seconds = delayed_close_allocator_seconds + def activate(self) -> dict[str, Allocation]: if self.PATTERN is None: raise ValueError(f"PATTERN not set on {self.__class__.__name__}") @@ -168,30 +226,27 @@ def activate(self) -> dict[str, Allocation]: return super().activate() # TODO: move the delayed dealloc stuff to Hook class? - def deactivate( - self, - *, - close_allocator: bool = True, - delayed_close_allocator_seconds: float | None = None, - ): - """Deactivates the hook - - Args: - close_allocator (bool, optional): If the body allocator should be closed. Defaults to True. - delayed_close_allocator_seconds (float | None, optional): how many second to delay body deallocation. Defaults to None. - """ - if close_allocator is False and delayed_close_allocator_seconds is not None: + def deactivate(self, *, close_allocator: bool = True): + """Deactivates the hook""" + if ( + self.close_allocator is False + and self.delayed_close_allocator_seconds is not None + ): raise ValueError( "close_allocator cannot be False with a delayed number of seconds" ) - if close_allocator is True and delayed_close_allocator_seconds is not None: + if ( + self.close_allocator is True + and self.delayed_close_allocator_seconds is not None + ): # this will write over the outside jmp so the body is no longer entered super().deactivate(close_allocator=False) # this wait gives the process time to exit the hook body code, should only need ~1 second - time.sleep(delayed_close_allocator_seconds) + time.sleep(self.delayed_close_allocator_seconds) # finally deallocate the body self.allocator.close() + return None else: return super().deactivate(close_allocator=close_allocator) @@ -271,8 +326,6 @@ def get_hook_tail(self, jump_address: int) -> tuple[list[Instruction], int]: # TODO: what does this 10 mean? is it just a general guess and what else we might need? search_bytes = self.process.read_memory(jump_address, self._jump_needed + 10) - # logger.debug(f"{search_bytes=}") - if self.process.process_64_bit: bitness = 64 else: @@ -449,7 +502,7 @@ class RegisterCaptureSettings: # NOTE: this registertype is kinda mid, we may need to provide our own def create_capture_hook( - pattern: regex.Pattern | bytes, + pattern: regex.Pattern[bytes] | bytes, module: str, bitness: Literal[32] | Literal[64], *, @@ -458,7 +511,7 @@ def create_capture_hook( """Create a capture hook class Args: - pattern (regex.Pattern | bytes): Pattern to hook at + pattern (regex.Pattern[bytes] | bytes): Pattern to hook at module (str): Module to search in bitness (int): What bitness of hook to create register_captures (list[RegisterCaptureSettings]): Registers to capture @@ -474,7 +527,7 @@ def create_capture_hook( def _create_capture_hook_32bit( - pattern: regex.Pattern | bytes, + pattern: regex.Pattern[bytes] | bytes, module: str, register_captures: list[RegisterCaptureSettings], ): @@ -531,7 +584,7 @@ def get_code(self) -> list[Instruction]: def _create_capture_hook_64bit( - pattern: regex.Pattern | bytes, + pattern: regex.Pattern[bytes] | bytes, module: str, register_captures: list[RegisterCaptureSettings], ): diff --git a/memobj/object.py b/memobj/object.py index 1311dc2..acf006a 100644 --- a/memobj/object.py +++ b/memobj/object.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Union, Callable +from typing import TYPE_CHECKING, Callable, Union from memobj.property import MemoryProperty, Pointer diff --git a/memobj/process/__init__.py b/memobj/process/__init__.py index 304dd6b..3454fd4 100644 --- a/memobj/process/__init__.py +++ b/memobj/process/__init__.py @@ -1,4 +1,4 @@ from .base import Process from .module import Module -from .windows.process import WindowsProcess from .windows.module import WindowsModule +from .windows.process import WindowsProcess diff --git a/memobj/process/base.py b/memobj/process/base.py index 753e22d..f489621 100644 --- a/memobj/process/base.py +++ b/memobj/process/base.py @@ -2,25 +2,14 @@ import functools import platform import struct -import typing from pathlib import Path -from typing import Any, Self, Literal, TypeAlias, Iterator +from typing import Any, Self, assert_never import regex -from memobj.utils import ProcessEndianess, TypeFormat +from memobj.utils import ProcessEndianness, Type -# TODO: switch to type statements when we drop 3.11 (TypeAlias is depreciated) -FormatStringInt: TypeAlias = Literal[ - "b", "B", "h", "H", "i", "I", "l", "L", "q", "Q", "n", "N", "P" -] -FormatStringFloat: TypeAlias = Literal["e", "f", "d"] -FormatStringBool: TypeAlias = Literal["?"] -FormatStringBytes: TypeAlias = Literal["c", "s", "p"] - - -# TODO: add non-keyword to args where needed (/) class Process: """A connected process""" @@ -148,13 +137,13 @@ def write_memory(self, address: int, value: bytes): raise NotImplementedError() def scan_memory( - self, pattern: regex.Pattern | bytes, *, module: str | None = None + self, pattern: regex.Pattern[bytes] | bytes, *, module: str | None = None ) -> list[int]: """ Scan memory for a regex pattern Args: - pattern: A regex.Pattern or a byte pattern + pattern: A regex.Pattern[bytes] or a byte pattern module: Name of a module to exclusively search Returns: @@ -163,13 +152,13 @@ def scan_memory( raise NotImplementedError() def scan_one( - self, pattern: regex.Pattern | bytes, *, module: str | None = None + self, pattern: regex.Pattern[bytes] | bytes, *, module: str | None = None ) -> int: """ Scan memory for a regex pattern and error if one address was not found Args: - pattern: A regex.Pattern or a byte pattern + pattern: A regex.Pattern[bytes] or a byte pattern module: Name of a module to exclusively search Returns: @@ -206,75 +195,35 @@ def read_formatted(self, address: int, format_string: str) -> tuple[Any] | Any: return formatted - @typing.overload - def read_formatted_single( - self, - address: int, - format_string: FormatStringInt, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> int: ... - - @typing.overload - def read_formatted_single( - self, - address: int, - format_string: FormatStringBool, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> bool: ... - - @typing.overload - def read_formatted_single( - self, - address: int, - format_string: FormatStringBytes, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> bytes: ... - - @typing.overload - def read_formatted_single( + def read_typed( self, address: int, - format_string: FormatStringFloat, + read_type: Type, *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> float: ... + endianness: ProcessEndianness = ProcessEndianness.native, + ) -> Any: + """ + Read a single typed value from memory using utils.Type and optional endianness - # NOTE: the order of these matters (the any default needs to be last) - @typing.overload - def read_formatted_single( - self, - address: int, - format_string: str, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> Any: ... + Args: + address: The address to read from + read_type: The Type enumeration indicating the value to read (e.g., Type.s4) + endianness: The endianness to use when reading (defaults to native "=") - def read_formatted_single( - self, - address: int, - format_string: str, - *, - # TODO: should the default be native? - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> Any: - if len(format_string) != 1: - raise ValueError( - f"format_string should be a single character not {format_string}" - ) - - # we don't just include endianess in the format string to make typing easier - match endianess: - case ProcessEndianess.native: - endianess_string = "=" - case ProcessEndianess.little: - endianess_string = "<" - case ProcessEndianess.big: - endianess_string = ">" - - combined_format = endianess_string + format_string + Returns: + The formatted value as the corresponding Python type + """ + match endianness: + case ProcessEndianness.native: + endianness_string = "=" + case ProcessEndianness.little: + endianness_string = "<" + case ProcessEndianness.big: + endianness_string = ">" + case _: + assert_never(endianness) + + combined_format = endianness_string + read_type.value # type: ignore (it doesn't believe me about exhaustiveness) return self.read_formatted(address, combined_format) @@ -292,81 +241,25 @@ def write_formatted( packed_data = struct.pack(format_string, value) self.write_memory(address, packed_data) - @typing.overload - def write_formatted_single( - self, - address: int, - format_string: FormatStringInt, - value: int, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> None: ... - - @typing.overload - def write_formatted_single( - self, - address: int, - format_string: FormatStringBool, - value: bool, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> None: ... - - @typing.overload - def write_formatted_single( - self, - address: int, - format_string: FormatStringFloat, - value: float, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> None: ... - - @typing.overload - def write_formatted_single( - self, - address: int, - format_string: FormatStringBytes, - value: bytes, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> None: ... - - # TODO: this overrides the other impls into making value Any when we'd like it to error - # i.e. write_formatted_single(1, "?", 100) would be valid when it shouldn't be - @typing.overload - def write_formatted_single( - self, - address: int, - format_string: str, - value: Any, - *, - endianess: ProcessEndianess = ProcessEndianess.native, - ) -> None: ... - - def write_formatted_single( + def write_typed( self, address: int, - format_string: str, + write_type: Type, value: Any, *, - endianess: ProcessEndianess = ProcessEndianess.native, + endianness: ProcessEndianness = ProcessEndianness.native, ) -> None: - if len(format_string) != 1: - raise ValueError( - f"format_string should be a single character not {format_string}" - ) - - # we don't just include endianess in the format string to make typing easier - match endianess: - case ProcessEndianess.native: - endianess_string = "=" - case ProcessEndianess.little: - endianess_string = "<" - case ProcessEndianess.big: - endianess_string = ">" - - combined_format = endianess_string + format_string + match endianness: + case ProcessEndianness.native: + endianness_string = "=" + case ProcessEndianness.little: + endianness_string = "<" + case ProcessEndianness.big: + endianness_string = ">" + case _: + assert_never(endianness) + + combined_format = endianness_string + write_type.value return self.write_formatted(address, combined_format, value) diff --git a/memobj/process/windows/module.py b/memobj/process/windows/module.py index 1874e13..86ee71f 100644 --- a/memobj/process/windows/module.py +++ b/memobj/process/windows/module.py @@ -17,11 +17,11 @@ """ import ctypes +from typing import TYPE_CHECKING, Iterator, Self -from typing import Self, Iterator, TYPE_CHECKING from memobj.process import Module -from .utils import ModuleEntry32, CheckWindowsOsError +from .utils import CheckWindowsOsError, ModuleEntry32 if TYPE_CHECKING: from .process import WindowsProcess @@ -34,7 +34,7 @@ class WindowsModule(Module): _symbols: dict[str, int] | None = None - # TODO: make a user facing iterface to this copying the object so it isnt changed while they're using it + # TODO: make a user facing iterface to this copying the object so it isn't changed while they're using it # TODO: get wide character variants working # adapted to python from https://learn.microsoft.com/en-us/windows/win32/toolhelp/traversing-the-module-list @staticmethod @@ -104,6 +104,25 @@ def from_name( raise ValueError(f"No modules named {name}") + @classmethod + def get_all_modules(cls, process: "WindowsProcess") -> list[Self]: + modules: list[Self] = [] + + for module in cls._iter_modules(process): + module_name = module.szModule.decode() + + modules.append( + cls( + name=module_name, + base_address=module.modBaseAddr, + executable_path=module.szExePath.decode(), + size=module.modBaseSize, + process=process, + ) + ) + + return modules + def get_symbol_with_name(self, name: str) -> int: try: return self.get_symbols()[name] diff --git a/memobj/process/windows/process.py b/memobj/process/windows/process.py index 93608a8..6cc05a6 100644 --- a/memobj/process/windows/process.py +++ b/memobj/process/windows/process.py @@ -8,19 +8,16 @@ import regex from memobj.allocation import Allocator -from memobj.process.windows.module import WindowsModule -from memobj.process.windows.utils import ( - CheckWindowsOsError, - WindowsModuleInfo, - WindowsMemoryProtection, # TODO: what was this going to be used for? - WindowsMemoryBasicInformation, - LUID, - LUID_AND_ATTRIBUTES, - SingleLUIDAndAttributes, - TOKEN_PRIVILEGES, - PROCESSENTRY32, -) from memobj.process import Process +from memobj.process.windows.module import WindowsModule +from memobj.process.windows.utils import \ + WindowsMemoryProtection # TODO: what was this going to be used for? +from memobj.process.windows.utils import (LUID, LUID_AND_ATTRIBUTES, + PROCESSENTRY32, TOKEN_PRIVILEGES, + CheckWindowsOsError, + SingleLUIDAndAttributes, + WindowsMemoryBasicInformation, + WindowsModuleInfo) # TODO: update everything that uses modules to use the new WindowsModule @@ -302,17 +299,17 @@ def write_memory(self, address: int, value: bytes): def scan_memory( self, - pattern: regex.Pattern | bytes, + pattern: regex.Pattern[bytes] | bytes, *, - module: Union[str, WindowsModuleInfo, bool, None] = None, + module: str | WindowsModule | bool | None = None, ) -> list[int]: """ Scan memory for a regex pattern Args: - pattern: A regex.Pattern or a byte pattern + pattern: A regex.Pattern[bytes] or a byte pattern module: Name of a module to exclusively search or a module to search for - (True is shortcut for base module) + (True is a shortcut for base module) Returns: A list of addresses that matched @@ -332,14 +329,14 @@ def scan_memory( elif module is False: raise ValueError("module can only be True") - elif isinstance(module, WindowsModuleInfo): + elif isinstance(module, WindowsModule): pass else: module = self.get_module_named(module) - region_start = module.lpBaseOfDll - max_size = region_start + module.SizeOfImage + region_start = module.base_address + max_size = region_start + module.size matches: list[int] = [] while region_start < max_size: @@ -399,99 +396,42 @@ def virtual_query(self, address: int = 0) -> WindowsMemoryBasicInformation: @typing.overload def get_modules( self, base_only: typing.Literal[False] = False - ) -> list[WindowsModuleInfo]: ... + ) -> list[WindowsModule]: ... @typing.overload - def get_modules(self, base_only: typing.Literal[True]) -> WindowsModuleInfo: ... + def get_modules(self, base_only: typing.Literal[True]) -> WindowsModule: ... # TODO: check if you actually can't get modules on linux # note: platform dependent def get_modules( self, base_only: bool = False - ) -> list[WindowsModuleInfo] | WindowsModuleInfo: - # TODO: for some reason EnumProcessModulesEx always sets LastError? - # with CheckWindowsOsError(): - # TODO: is it always the psapi dll? check requirments section - # https://learn.microsoft.com/en-us/windows/win32/api/psapi/nf-psapi-enumprocessmodulesex - lpcb_needed = ctypes.wintypes.DWORD() - - success = ctypes.windll.psapi.EnumProcessModulesEx( - self.process_handle, - 0, - 0, - ctypes.byref(lpcb_needed), - 0x3, - ) - - if success == 0: - raise RuntimeError("EnumProcessModulesEx (get size) failed") - - # with CheckWindowsOsError(): - module_handles_type = ctypes.wintypes.HMODULE * ( - lpcb_needed.value // ctypes.sizeof(ctypes.wintypes.HMODULE) - ) - module_handles = module_handles_type() - - success = ctypes.windll.psapi.EnumProcessModulesEx( - self.process_handle, - ctypes.byref(module_handles), - lpcb_needed, - ctypes.byref(ctypes.wintypes.DWORD()), - 0x3, - ) - - if success == 0: - raise RuntimeError("EnumProcessModulesEx failed") - - with CheckWindowsOsError(): - modules = [] - for module_handle in module_handles: - # https://learn.microsoft.com/en-us/windows/win32/api/psapi/nf-psapi-getmoduleinformation - module_info = WindowsModuleInfo() - - success = ctypes.windll.psapi.GetModuleInformation( - self.process_handle, - ctypes.wintypes.HMODULE(module_handle), - ctypes.byref(module_info), - ctypes.sizeof(module_info), - ) - - if success == 0: - raise ValueError( - f"GetModuleInformation failed for handle {module_handle}" - ) + ) -> list[WindowsModule] | WindowsModule: + if base_only: + return WindowsModule.from_name(self, self.executable_path.name) + else: + return WindowsModule.get_all_modules(self) - if base_only: - return module_info + # def get_module_name(self, module: WindowsModuleInfo) -> str: + # with CheckWindowsOsError(): + # # https://learn.microsoft.com/en-us/windows/win32/api/psapi/nf-psapi-getmodulebasenamew + # # I just assume MAX_PATH is good enough + # name_buffer = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) - modules.append(module_info) + # success = ctypes.windll.psapi.GetModuleBaseNameW( + # self.process_handle, + # ctypes.c_void_p(module.lpBaseOfDll), + # ctypes.byref(name_buffer), + # ctypes.wintypes.MAX_PATH, + # ) - return modules + # if success == 0: + # raise ValueError(f"GetModuleBaseNameW failed for {module}") - def get_module_name(self, module: WindowsModuleInfo) -> str: - with CheckWindowsOsError(): - # https://learn.microsoft.com/en-us/windows/win32/api/psapi/nf-psapi-getmodulebasenamew - # I just assume MAX_PATH is good enough - name_buffer = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) + # return name_buffer.value - success = ctypes.windll.psapi.GetModuleBaseNameW( - self.process_handle, - ctypes.c_void_p(module.lpBaseOfDll), - ctypes.byref(name_buffer), - ctypes.wintypes.MAX_PATH, - ) - - if success == 0: - raise ValueError(f"GetModuleBaseNameW failed for {module}") - - return name_buffer.value - - def get_module_named(self, name: str) -> WindowsModuleInfo: - for module in self.get_modules(): - if self.get_module_name(module) == name: - return module - - raise ValueError(f"No modules named {name}") + # TODO: return module here + def get_module_named(self, name: str) -> WindowsModule: + return WindowsModule.from_name(self, name) # note: platform dependent def create_remote_thread( @@ -555,7 +495,6 @@ def inject_dll(self, path: Path | str) -> WindowsModule: Returns: WindowsModule: The injected module """ - if isinstance(path, str): path = Path(path) @@ -574,4 +513,5 @@ def inject_dll(self, path: Path | str) -> WindowsModule: thread_wait_time=-1, ) - return WindowsModule.from_name(self, path.name) + # thread_wait_time -1 should wait for the library to be loaded + return WindowsModule.from_name(self, path.name) diff --git a/memobj/property/__init__.py b/memobj/property/__init__.py index 1874226..09be17e 100644 --- a/memobj/property/__init__.py +++ b/memobj/property/__init__.py @@ -1,4 +1,4 @@ from .base import MemoryProperty -from .pointer import Pointer, Void, DereffedPointer +from .pointer import DereffedPointer, Pointer, Void from .simple import * from .string import NullTerminatedString diff --git a/memobj/property/array.py b/memobj/property/array.py index 159c58e..135b8f2 100644 --- a/memobj/property/array.py +++ b/memobj/property/array.py @@ -1,7 +1,7 @@ from typing import Any -from . import MemoryProperty from .. import MemoryObject +from . import MemoryProperty class Array(MemoryProperty): diff --git a/memobj/property/base.py b/memobj/property/base.py index c79d76e..1a3ab95 100644 --- a/memobj/property/base.py +++ b/memobj/property/base.py @@ -1,5 +1,4 @@ -from typing import TYPE_CHECKING, Optional, Any, Union - +from typing import TYPE_CHECKING, Any, Optional, Union if TYPE_CHECKING: from memobj.object import MemoryObject diff --git a/memobj/property/pointer.py b/memobj/property/pointer.py index 7eb1688..2e05ebe 100644 --- a/memobj/property/pointer.py +++ b/memobj/property/pointer.py @@ -137,9 +137,7 @@ def to_memory_deref(self, value: Any): instance = self._pointed_type(address=addr, process=self.process) for attribute_name in self._pointed_type.__memory_properties__.keys(): - setattr( - instance, attribute_name, getattr(value, attribute_name) - ) + setattr(instance, attribute_name, getattr(value, attribute_name)) elif isinstance(self._pointed_type, str): # noinspection PyProtectedMember diff --git a/memobj/utils.py b/memobj/utils.py index 209e2bf..1913bab 100644 --- a/memobj/utils.py +++ b/memobj/utils.py @@ -1,21 +1,20 @@ -import time -import struct import operator +import struct +import time from enum import Enum from typing import Callable, Generic, TypeVar - # TODO: remove when dropping 3.11 support T = TypeVar("T") class ValueWaiter(Generic[T]): - def __init__(self, callback: Callable[[], T]): - """A utility class to wait for changes from a callable + """A utility class to wait for changes from a callable - Args: - callback (Callable[[], T]): the callable to wait for changes from - """ + Args: + callback (Callable[[], T]): the callable to wait for changes from + """ + def __init__(self, callback: Callable[[], T]): self.callback = callback def wait_for_value( @@ -96,35 +95,38 @@ def yield_changes( results += 1 -class ProcessEndianess(Enum): +class ProcessEndianness(Enum): native = 0 little = 1 big = 2 -# TODO: rework read_formated_single to use this -class TypeFormat(Enum): +class Type(Enum): """ Byte sized based types """ # struct calls this char but they're trolling byte = "c" - s1 = "b" - u1 = "B" bool = "?" - s2 = "h" - u2 = "H" - s4 = "i" - u4 = "I" - s8 = "l" - u8 = "L" - ssize = "n" - usize = "N" + signed1 = "b" + unsigned1 = "B" + signed2 = "h" + unsigned2 = "H" + signed4 = "i" + unsigned4 = "I" + signed8 = "l" + unsigned8 = "L" + signed_size = "n" + unsigned_size = "N" float = "f" double = "d" +def get_type_size(type_: Type) -> int: + return struct.calcsize(type_.value) + + def align_up(value: int, align: int) -> int: return align_down(value + (align - 1), align) diff --git a/pyproject.toml b/pyproject.toml index 51c32bd..797b618 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,8 @@ tests = [ "types-regex>=2025.9.18.20250921", ] +[tool.uv] +default-groups = "all" [tool.uv.build-backend] module-root = "" diff --git a/tests/conftest.py b/tests/conftest.py index 795e8e9..c5af0d1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,15 @@ -import sys import os +import sys import pytest from memobj import Process, WindowsProcess +def pytest_addoption(parser): + parser.addoption("--run-manual", action="store_true", default=False, help="run manual tests") + + @pytest.fixture(scope="session") def process() -> Process: """ diff --git a/tests/manual/test_dll_injection.py b/tests/manual/test_dll_injection.py new file mode 100644 index 0000000..1290ec3 --- /dev/null +++ b/tests/manual/test_dll_injection.py @@ -0,0 +1,27 @@ +import subprocess +import pytest +from pathlib import Path + +import memobj + +import pytest + + +def test_dll_injection(): + dll_path = (Path(__file__).parent / "test_inject/target/release/test_inject.dll").resolve() + exe_path = (Path(__file__).parent / "test_inject/target/release/inject_target.exe").resolve() + + if not dll_path.exists(): + pytest.skip(f"Test DLL not found at {dll_path}") + if not exe_path.exists(): + pytest.skip(f"Test EXE not found at {exe_path}") + + proc = subprocess.Popen([str(exe_path)]) + try: + process = memobj.WindowsProcess.from_id(proc.pid) + assert process.inject_dll(dll_path), "DLL injection failed" + + module = process.get_module_named(dll_path.name) + assert module is not None, "Failed to find injected DLL in remote process" + finally: + proc.terminate() diff --git a/tests/manual/test_inject/Cargo.lock b/tests/manual/test_inject/Cargo.lock new file mode 100644 index 0000000..7c743bd --- /dev/null +++ b/tests/manual/test_inject/Cargo.lock @@ -0,0 +1,155 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "syn" +version = "2.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "test_inject" +version = "0.1.0" +dependencies = [ + "windows", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core", +] + +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core", + "windows-link", +] + +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link", +] diff --git a/tests/manual/test_inject/Cargo.toml b/tests/manual/test_inject/Cargo.toml new file mode 100644 index 0000000..6edc05e --- /dev/null +++ b/tests/manual/test_inject/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "test_inject" +version = "0.1.0" +edition = "2021" + +[workspace] +resolver = "2" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +windows = { version = "0.62.2", features = ["Win32_System_Console", "Win32_Foundation", "Win32_System_SystemServices"] } + +[[bin]] +name = "inject_target" +path = "src/bin/inject_target.rs" \ No newline at end of file diff --git a/tests/manual/test_inject/src/bin/inject_target.rs b/tests/manual/test_inject/src/bin/inject_target.rs new file mode 100644 index 0000000..3a7119b --- /dev/null +++ b/tests/manual/test_inject/src/bin/inject_target.rs @@ -0,0 +1,4 @@ +fn main() { + // This process just waits, so it can be injected into. + std::thread::sleep(std::time::Duration::from_secs(60 * 5)); +} \ No newline at end of file diff --git a/tests/manual/test_inject/src/lib.rs b/tests/manual/test_inject/src/lib.rs new file mode 100644 index 0000000..4840707 --- /dev/null +++ b/tests/manual/test_inject/src/lib.rs @@ -0,0 +1,45 @@ +use std::ffi::CStr; +use std::fs::File; +use std::io::Write; +use std::os::raw::c_char; +use windows::Win32::System::SystemServices::DLL_PROCESS_ATTACH; +use windows::Win32::System::Console::AllocConsole; + + +// #[no_mangle] +// extern "system" fn DllMain(_: *const u8, _: u32, _: *const u8) -> u32 { 1 } + +#[no_mangle] +extern "system" fn DllMain( + _hinst_dll: *const u8, + _fdw_reason: u32, + _lpv_reserved: *const u8, +) -> bool { + match _fdw_reason { + DLL_PROCESS_ATTACH => unsafe { AllocConsole() }.unwrap_or(()), + _ => () + } + + // return True on successful attach + true +} + + +#[no_mangle] +pub extern "C" fn create_file_at_path(path: *const c_char) -> i32 { + if path.is_null() { + return 0; + } + let c_str = unsafe { CStr::from_ptr(path) }; + match c_str.to_str() { + Ok(path_str) => { + if let Ok(mut file) = File::create(path_str) { + let _ = file.write_all(b"Injection succeeded!"); + 1 + } else { + 0 + } + } + Err(_) => 0, + } +} \ No newline at end of file diff --git a/tests/test_memory_object.py b/tests/test_memory_object.py index f375f20..67cf27e 100644 --- a/tests/test_memory_object.py +++ b/tests/test_memory_object.py @@ -1,7 +1,7 @@ import ctypes -from memobj.property import * from memobj import MemoryObject +from memobj.property import * # TODO: why do I use this instead of ctypes.addressof? diff --git a/tests/test_process.py b/tests/test_process.py index 740056b..34083ed 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -1,8 +1,6 @@ def test_get_module_name(process): base_module = process.get_modules(True) - base_module_name = process.get_module_name(base_module) - - assert base_module_name == "python.exe" + assert base_module.name == "python.exe" def test_get_module_named(process):