diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..72dcf4891 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include spras/cgroup_wrapper.sh diff --git a/config/config.yaml b/config/config.yaml index 1e5f1d561..aef81a767 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -5,15 +5,26 @@ hash_length: 7 # Specify the container framework used by each PRM wrapper. Valid options include: # - docker (default if not specified) -# - singularity -- Also known as apptainer, useful in HPC/HTC environments where docker isn't allowed -# - dsub -- experimental with limited support, used for running on Google Cloud with the All of Us cloud environment. -# - There is no support for other environments at the moment. +# - singularity OR apptainer -- Apptainer (formerly Singularity) is useful in HPC/HTC environments where docker isn't allowed +# - dsub -- experimental with limited support, used for running on Google Cloud container_framework: docker -# Only used if container_framework is set to singularity, this will unpack the singularity containers +# Enabling profiling adds a file called 'usage-profile.tsv' to the output directory of each algorithm. +# The contents of this file describe the CPU utilization and peak memory consumption of the algorithm +# as seen by its runtime container. +# NOTE: Profiling is currently supported only when the container framework is set to apptainer/singularity +# and when the host system supports the 'cgroup' filesystem. +# When profiling via HTCondor, this assumes the current process is already in a two-level nested cgroup +# (introduced in HTCondor 24.8.0). To specify a minimum HTCondor version, use the following `requirements` +# expression: +# +# requirements = versionGE(split(Target.CondorVersion)[1], "24.8.0") && (isenforcingdiskusage =!= true) +enable_profiling: false + +# Only used if container_framework is set to singularity/apptainer, this will unpack the containers # to the local filesystem. 
This is useful when PRM containers need to run inside another container, # such as would be the case in an HTCondor/OSPool environment. -# NOTE: This unpacks singularity containers to the local filesystem, which will take up space in a way +# NOTE: This unpacks containers to the local filesystem, which will take up space in a way # that persists after the workflow is complete. To clean up the unpacked containers, the user must # manually delete them. For convenience, these unpacked files will exist in the current working directory # under `unpacked`. diff --git a/pyproject.toml b/pyproject.toml index 7a58e8ccb..424f183cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,8 +71,13 @@ select = [ "W292", # missing-newline-at-end-of-file ] -[tool.setuptools] -# py-modules tells setuptools which directory is our actual module -py-modules = ["spras"] +[tool.setuptools.packages.find] +where = ["."] +include = ["spras*"] + +# Include non-python executables +[tool.setuptools.package-data] +"spras" = ["cgroup_wrapper.sh"] + # packages tells setuptools what the exported package is called (ie allows import spras) packages = ["spras", "spras.analysis", "spras.config"] diff --git a/spras/allpairs.py b/spras/allpairs.py index e0f28d748..7ff1ade5c 100644 --- a/spras/allpairs.py +++ b/spras/allpairs.py @@ -92,7 +92,8 @@ def run(nodetypes=None, network=None, directed_flag=None, output_file=None, cont volumes.append(bind_path) # Create the parent directories for the output file if needed - Path(output_file).parent.mkdir(parents=True, exist_ok=True) + out_dir = Path(output_file).parent + out_dir.mkdir(parents=True, exist_ok=True) bind_path, mapped_out_file = prepare_volume(output_file, work_dir) volumes.append(bind_path) @@ -111,7 +112,8 @@ def run(nodetypes=None, network=None, directed_flag=None, output_file=None, cont container_suffix, command, volumes, - work_dir) + work_dir, + out_dir) @staticmethod def parse_output(raw_pathway_file, standardized_pathway_file, params): diff 
--git a/spras/btb.py b/spras/btb.py index ced433efe..71f774858 100644 --- a/spras/btb.py +++ b/spras/btb.py @@ -133,12 +133,14 @@ def run(sources=None, targets=None, edges=None, output_file=None, container_fram mapped_out_prefix] container_suffix = "bowtiebuilder:v2" - run_container_and_log('BowTieBuilder', - container_framework, - container_suffix, - command, - volumes, - work_dir) + run_container_and_log( + 'BowTieBuilder', + container_framework, + container_suffix, + command, + volumes, + work_dir, + out_dir) # Output is already written to raw-pathway.txt file diff --git a/spras/cgroup_wrapper.sh b/spras/cgroup_wrapper.sh new file mode 100755 index 000000000..e2ba5c3b9 --- /dev/null +++ b/spras/cgroup_wrapper.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +# This script gets invoked by run_singularity_container when enable_profiling is set to true. +# Its arguments are the cgroup path followed by the apptainer command to exec. +# If profiling is enabled, we've already created a new cgroup that has no running processes and +# we've started this script with its own PID. To isolate the inner container's resource usage stats, +# we add this script's PID to the new cgroup and then run the apptainer command. Since the generic +# snakemake/spras stuff is outside this cgroup, we can monitor the inner container's resource usage +# without conflating it with the overhead from spras itself. + +CGROUP_PATH="$1" +echo "My cgroup path is: $CGROUP_PATH" +# Pop the first argument off the list so remaining args are just the apptainer command to exec +shift +echo $$ > "$CGROUP_PATH/cgroup.procs" + +# Start apptainer +echo "Executing command: $@" +exec "$@" diff --git a/spras/config/config.py b/spras/config/config.py index 605faecf4..6425a8bab 100644 --- a/spras/config/config.py +++ b/spras/config/config.py @@ -71,6 +71,8 @@ def __init__(self, raw_config: dict[str, Any]): self.container_prefix: str = DEFAULT_CONTAINER_PREFIX # A Boolean specifying whether to unpack singularity containers. 
Default is False self.unpack_singularity = False + # A Boolean indicating whether to enable container runtime profiling (apptainer/singularity only) + self.enable_profiling = False # A dictionary to store configured datasets against which SPRAS will be run self.datasets = None # A dictionary to store configured gold standard data against output of SPRAS runs @@ -299,6 +301,10 @@ def process_config(self, raw_config: RawConfig): if raw_config.container_registry and raw_config.container_registry.base_url != "" and raw_config.container_registry.owner != "": self.container_prefix = raw_config.container_registry.base_url + "/" + raw_config.container_registry.owner + if raw_config.enable_profiling and not raw_config.container_framework in ["singularity", "apptainer"]: + warnings.warn("enable_profiling is set to true, but the container framework is not singularity/apptainer. This setting will have no effect.") + self.enable_profiling = raw_config.enable_profiling + self.process_datasets(raw_config) self.process_algorithms(raw_config) self.process_analysis(raw_config) diff --git a/spras/config/schema.py b/spras/config/schema.py index c84ea4384..5cc4285ef 100644 --- a/spras/config/schema.py +++ b/spras/config/schema.py @@ -91,8 +91,8 @@ def validate(label: str): class ContainerFramework(CaseInsensitiveEnum): docker = 'docker' - # TODO: add apptainer variant once #260 gets merged singularity = 'singularity' + apptainer = 'apptainer' dsub = 'dsub' class ContainerRegistry(BaseModel): @@ -152,6 +152,7 @@ class RawConfig(BaseModel): container_framework: ContainerFramework = ContainerFramework.docker unpack_singularity: bool = False container_registry: ContainerRegistry + enable_profiling: bool = False hash_length: int = DEFAULT_HASH_LENGTH "The length of the hash used to identify a parameter combination" diff --git a/spras/containers.py b/spras/containers.py index b7711c4f6..88a02ec58 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -10,6 +10,7 @@ import 
spras.config.config as config from spras.logging import indent +from spras.profiling import create_apptainer_container_stats, create_peer_cgroup from spras.util import hash_filename @@ -131,7 +132,7 @@ def env_to_items(environment: dict[str, str]) -> Iterator[str]: # TODO consider a better default environment variable # Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html) # Technically the argument is an image, not a container, but we use container here. -def run_container(framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None): +def run_container(framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, environment: Optional[dict[str, str]] = None): """ Runs a command in the container using Singularity or Docker @param framework: singularity or docker @@ -140,6 +141,7 @@ def run_container(framework: str, container_suffix: str, command: List[str], vol @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @param environment: environment variables to set in the container + @param out_dir: output directory for the rule's artifacts. Only passed into run_container_singularity for the purpose of profiling. 
@return: output from Singularity execute or Docker run """ normalized_framework = framework.casefold() @@ -147,14 +149,14 @@ def run_container(framework: str, container_suffix: str, command: List[str], vol container = config.config.container_prefix + "/" + container_suffix if normalized_framework == 'docker': return run_container_docker(container, command, volumes, working_dir, environment) - elif normalized_framework == 'singularity': - return run_container_singularity(container, command, volumes, working_dir, environment) + elif normalized_framework == 'singularity' or normalized_framework == "apptainer": + return run_container_singularity(container, command, volumes, working_dir, out_dir, environment) elif normalized_framework == 'dsub': return run_container_dsub(container, command, volumes, working_dir, environment) else: raise ValueError(f'{framework} is not a recognized container framework. Choose "docker", "dsub", or "singularity".') -def run_container_and_log(name: str, framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None): +def run_container_and_log(name: str, framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, environment: Optional[dict[str, str]] = None): """ Runs a command in the container using Singularity or Docker with associated pretty printed messages. 
@param name: the display name of the running container for logging purposes @@ -171,7 +173,7 @@ def run_container_and_log(name: str, framework: str, container_suffix: str, comm print('Running {} on container framework "{}" on env {} with command: {}'.format(name, framework, list(env_to_items(environment)), ' '.join(command)), flush=True) try: - out = run_container(framework=framework, container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, environment=environment) + out = run_container(framework=framework, container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, out_dir=out_dir, environment=environment) if out is not None: if isinstance(out, list): out = ''.join(out) @@ -289,8 +291,7 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple # finally: return out - -def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None): +def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str, environment: Optional[dict[str, str]] = None): """ Runs a command in the container using Singularity. Only available on Linux. 
@@ -298,6 +299,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ @param command: command to run in the container @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container + @param out_dir: output directory for the rule's artifacts -- used here to store profiling data @param environment: environment variable to set in the container @return: output from Singularity execute """ @@ -329,11 +331,14 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ singularity_options.extend(['--env', ",".join(env_to_items(environment))]) # Handle unpacking singularity image if needed. Potentially needed for running nested unprivileged containers + expanded_image = None if config.config.unpack_singularity: - # Split the string by "/" + # The incoming image string is of the format //: e.g. + # hub.docker.com/reedcompbio/spras:latest + # Here we first produce a .sif image using the image name and tag (base_cont) + # and then expand that image into a sandbox directory. For example, + # hub.docker.com/reedcompbio/spras:latest --> spras_latest.sif --> ./spras_latest/ path_elements = container.split("/") - - # Get the last element, which will indicate the base container name base_cont = path_elements[-1] base_cont = base_cont.replace(":", "_").split(":")[0] sif_file = base_cont + ".sif" @@ -348,24 +353,49 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ base_cont_path = unpacked_dir / Path(base_cont) - # Check if the directory for base_cont already exists. When running concurrent jobs, it's possible + # Check whether the directory for base_cont_path already exists. When running concurrent jobs, it's possible # something else has already pulled/unpacked the container. 
- # Here, we expand the sif image from `image_path` to a directory indicated by `base_cont` + # Here, we expand the sif image from `image_path` to a directory indicated by `base_cont_path` if not base_cont_path.exists(): Client.build(recipe=image_path, image=str(base_cont_path), sandbox=True, sudo=False) - - # Execute the locally unpacked container. - return Client.execute(str(base_cont_path), - command, - options=singularity_options, - bind=bind_paths) - + expanded_image = base_cont_path # This is the sandbox directory + + # If not using the expanded sandbox image, we still need to prepend the docker:// prefix + # so apptainer knows to pull and convert the image format from docker to apptainer. + image_to_run = expanded_image if expanded_image else "docker://" + container + if config.config.enable_profiling: + # We won't end up using the spython client if profiling is enabled because + # we need to run everything manually to set up the cgroup + # Build the apptainer run command, which gets passed to the cgroup wrapper script + singularity_cmd = [ + "apptainer", "exec" + ] + for bind in bind_paths: + singularity_cmd.extend(["--bind", bind]) + singularity_cmd.extend(singularity_options) + singularity_cmd.append(image_to_run) + singularity_cmd.extend(command) + + my_cgroup = create_peer_cgroup() + # The wrapper script is packaged with spras, and should be located in the same directory + # as `containers.py`. 
+ wrapper = os.path.join(os.path.dirname(__file__), "cgroup_wrapper.sh") + cmd = [wrapper, my_cgroup] + singularity_cmd + proc = subprocess.run(cmd, capture_output=True, text=True, stderr=subprocess.STDOUT) + + print("Reading memory and CPU stats from cgroup") + create_apptainer_container_stats(my_cgroup, out_dir) + + result = proc.stdout else: - # Adding 'docker://' to the container indicates this is a Docker image Singularity must convert - return Client.execute('docker://' + container, - command, - options=singularity_options, - bind=bind_paths) + result = Client.execute( + image=image_to_run, + command=command, + options=singularity_options, + bind=bind_paths + ) + + return result # Because this is called independently for each file, the same local path can be mounted to multiple volumes diff --git a/spras/domino.py b/spras/domino.py index 449715b1f..dabf4fc6b 100644 --- a/spras/domino.py +++ b/spras/domino.py @@ -117,7 +117,8 @@ def run(network=None, active_genes=None, output_file=None, slice_threshold=None, container_suffix, slicer_command, volumes, - work_dir) + work_dir, + out_dir) # Make the Python command to run within the container domino_command = ['domino', @@ -136,12 +137,14 @@ def run(network=None, active_genes=None, output_file=None, slice_threshold=None, if module_threshold is not None: domino_command.extend(['--module_threshold', str(module_threshold)]) - run_container_and_log('DOMINO', - container_framework, - container_suffix, - domino_command, - volumes, - work_dir) + run_container_and_log( + 'DOMINO', + container_framework, + container_suffix, + domino_command, + volumes, + work_dir, + out_dir) # DOMINO creates a new folder in out_dir to output its modules HTML files into called active_genes # The filename is determined by the input active_genes and cannot be configured diff --git a/spras/meo.py b/spras/meo.py index 3ae8be9cc..3e4ca4d46 100644 --- a/spras/meo.py +++ b/spras/meo.py @@ -184,7 +184,8 @@ def run(edges=None, sources=None, 
targets=None, output_file=None, max_path_lengt container_suffix, command, volumes, - work_dir) + work_dir, + out_dir) properties_file_local.unlink(missing_ok=True) diff --git a/spras/mincostflow.py b/spras/mincostflow.py index 03898a1bd..f883afb52 100644 --- a/spras/mincostflow.py +++ b/spras/mincostflow.py @@ -121,7 +121,8 @@ def run(sources=None, targets=None, edges=None, output_file=None, flow=None, cap container_suffix, command, volumes, - work_dir) + work_dir, + out_dir) # Check the output of the container out_dir_content = sorted(out_dir.glob('*.sif')) diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index b3b7afbfe..a92d7ecea 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -202,6 +202,7 @@ def run(edges=None, prizes=None, dummy_nodes=None, dummy_mode=None, mu_squared=N command, volumes, work_dir, + out_dir, {'TMPDIR': mapped_out_dir}) conf_file_local.unlink(missing_ok=True) diff --git a/spras/omicsintegrator2.py b/spras/omicsintegrator2.py index ee133f2bd..b038baec2 100644 --- a/spras/omicsintegrator2.py +++ b/spras/omicsintegrator2.py @@ -134,7 +134,8 @@ def run(edges=None, prizes=None, output_file=None, w=None, b=None, g=None, noise container_suffix, command, volumes, - work_dir) + work_dir, + out_dir) # TODO do we want to retain other output files? 
# TODO if deleting other output files, write them all to a tmp directory and copy diff --git a/spras/pathlinker.py b/spras/pathlinker.py index a671c9b92..c7cabc97b 100644 --- a/spras/pathlinker.py +++ b/spras/pathlinker.py @@ -118,7 +118,8 @@ def run(nodetypes=None, network=None, output_file=None, k=None, container_framew container_suffix, command, volumes, - work_dir) + work_dir, + out_dir) # Rename the primary output file to match the desired output filename # Currently PathLinker only writes one output file so we do not need to delete others diff --git a/spras/profiling.py b/spras/profiling.py new file mode 100644 index 000000000..185ee4363 --- /dev/null +++ b/spras/profiling.py @@ -0,0 +1,90 @@ +import csv +import os + + +def create_peer_cgroup() -> str: + """ + A helper function that creates a new peer cgroup for the current process. + Apptainer/singularity containers are placed in this cgroup so that they + can be tracked for memory and CPU usage. + This currently assumes HTCondor runs where the current process is already + in a two-level nested cgroup (introduced in HTCondor 24.8.0). + + Returns the path to the peer cgroup, which is needed by the cgroup_wrapper.sh script + to set up the cgroup for the container. + """ + + # Get the current process's cgroup path + # This assumes the cgroup is in the unified hierarchy + with open("/proc/self/cgroup") as f: + first_line = next(f).strip() + cgroup_rel = first_line.split(":")[-1].strip() + + mycgroup = os.path.join("/sys/fs/cgroup", cgroup_rel.lstrip("/")) + peer_cgroup = os.path.join(os.path.dirname(mycgroup), f"spras-peer-{os.getpid()}") + + # Create the peer cgroup directory + try: + os.makedirs(peer_cgroup, exist_ok=True) + except Exception as e: + print(f"Failed to create cgroup: {e}") + + return peer_cgroup + + +def create_apptainer_container_stats(cgroup_path: str, out_dir: str): + """ + Reads the contents of the provided cgroup's memory.peak and cpu.stat files. 
+ This information is parsed and placed in the calling rule's output directory + as 'usage-profile.tsv'. + In particular, we capture peak memory (in bytes) and various CPU usage statistics: + - user_usec: Total user CPU time consumed in microseconds + - system_usec: Total system CPU time consumed in microseconds + - usage_usec: Total CPU time (usually but not always user + system) consumed in microseconds + @param cgroup_path: path to the cgroup directory for the container + @param out_dir: output directory for the rule's artifacts -- used here to store profiling data + """ + + profile_path = os.path.join(out_dir, "usage-profile.tsv") + + peak_mem = "N/A" + try: + with open(os.path.join(cgroup_path, "memory.peak")) as f: + peak_mem = f.read().strip() + except Exception as e: + print(f"Failed to read memory usage from cgroup: {e}") + + cpu_usage = cpu_user = cpu_system = "N/A" + try: + with open(os.path.join(cgroup_path, "cpu.stat")) as f: + # Parse out the contents of the cpu.stat file + # You can find these fields by searching "cpu.stat" in the cgroup documentation: + # https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html + for line in f: + parts = line.strip().split() + if len(parts) != 2: + continue + key, value = parts + if key == "usage_usec": + cpu_usage = value + elif key == "user_usec": + cpu_user = value + elif key == "system_usec": + cpu_system = value + except Exception as e: + print(f"Failed to read cpu.stat from cgroup: {e}") + + # Set up the header for the TSV file + header = ["peak_memory_bytes", "cpu_usage_usec", "cpu_user_usec", "cpu_system_usec"] + row = [peak_mem, cpu_usage, cpu_user, cpu_system] + + # Write the contents of the file + write_header = not os.path.exists(profile_path) or os.path.getsize(profile_path) == 0 + with open(profile_path, "a", newline="") as out_f: + writer = csv.writer(out_f, delimiter="\t") + + # Only write the header if the file was previously empty or did not exist + if write_header: + 
writer.writerow(header) + writer.writerow(row) + diff --git a/spras/responsenet.py b/spras/responsenet.py index 48dd269cc..bbc3b5255 100644 --- a/spras/responsenet.py +++ b/spras/responsenet.py @@ -105,12 +105,14 @@ def run(sources=None, targets=None, edges=None, output_file=None, gamma=10, cont container_suffix = "responsenet:v2" # constructs a docker run call - run_container_and_log('ResponseNet', - container_framework, - container_suffix, - command, - volumes, - work_dir) + run_container_and_log( + 'ResponseNet', + container_framework, + container_suffix, + command, + volumes, + work_dir, + out_dir) # Rename the primary output file to match the desired output filename out_file_suffixed.rename(output_file) diff --git a/spras/rwr.py b/spras/rwr.py index 396cf4df1..4717aa064 100644 --- a/spras/rwr.py +++ b/spras/rwr.py @@ -2,7 +2,7 @@ import pandas as pd -from spras.containers import prepare_volume, run_container +from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import reinsert_direction_col_directed from spras.prm import PRM @@ -72,13 +72,15 @@ def run(network=None, nodes=None, alpha=None, output_file=None, container_framew command.extend(['--alpha', str(alpha)]) container_suffix = 'rwr:v1' - out = run_container(container_framework, - container_suffix, - command, - volumes, - work_dir) + run_container_and_log( + "RandomWalk with Restart", + container_framework, + container_suffix, + command, + volumes, + work_dir, + out_dir) - print(out) # Rename the primary output file to match the desired output filename output_edges = Path(out_dir, 'output.txt') output_edges.rename(output_file) diff --git a/spras/strwr.py b/spras/strwr.py index c4e1df95c..65ea9f923 100644 --- a/spras/strwr.py +++ b/spras/strwr.py @@ -1,6 +1,6 @@ from pathlib import Path -from spras.containers import prepare_volume, run_container +from spras.containers import prepare_volume, run_container_and_log from spras.dataset import 
Dataset from spras.interactome import reinsert_direction_col_directed from spras.prm import PRM @@ -77,13 +77,15 @@ def run(network=None, sources=None, targets=None, alpha=None, output_file=None, command.extend(['--alpha', str(alpha)]) container_suffix = 'st-rwr:v1' - out = run_container(container_framework, - container_suffix, - command, - volumes, - work_dir) + run_container_and_log( + "Source-Target RandomWalk with Restart", + container_framework, + container_suffix, + command, + volumes, + work_dir, + out_dir) - print(out) # Rename the primary output file to match the desired output filename output_edges = Path(out_dir, 'output.txt') output_edges.rename(output_file)