Merged
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -0,0 +1 @@
include spras/cgroup_wrapper.sh
21 changes: 16 additions & 5 deletions config/config.yaml
@@ -5,15 +5,26 @@ hash_length: 7

# Specify the container framework used by each PRM wrapper. Valid options include:
# - docker (default if not specified)
# - singularity -- Also known as apptainer, useful in HPC/HTC environments where docker isn't allowed
# - dsub -- experimental with limited support, used for running on Google Cloud with the All of Us cloud environment.
# - There is no support for other environments at the moment.
# - singularity OR apptainer -- Apptainer (formerly Singularity) is useful in HPC/HTC environments where docker isn't allowed
# - dsub -- experimental with limited support, used for running on Google Cloud
container_framework: docker

# Only used if container_framework is set to singularity, this will unpack the singularity containers
# Enabling profiling adds a file called 'usage-profile.tsv' to the output directory of each algorithm.
# The contents of this file describe the CPU utilization and peak memory consumption of the algorithm
# as seen by its runtime container.
# NOTE: Profiling is currently supported only when the container framework is set to apptainer/singularity
# and when the host system supports the 'cgroup' filesystem.
# When profiling via HTCondor, this assumes the current process is already in a two-level nested cgroup
# (introduced in HTCondor 24.8.0). To specify a minimum HTCondor version, use the following `requirements`
# expression:
#
# requirements = versionGE(split(Target.CondorVersion)[1], "24.8.0") && (isenforcingdiskusage =!= true)
enable_profiling: false

# Only used if container_framework is set to singularity/apptainer, this will unpack the containers
# to the local filesystem. This is useful when PRM containers need to run inside another container,
# such as would be the case in an HTCondor/OSPool environment.
# NOTE: This unpacks singularity containers to the local filesystem, which will take up space in a way
# NOTE: This unpacks containers to the local filesystem, which will take up space in a way
# that persists after the workflow is complete. To clean up the unpacked containers, the user must
# manually delete them. For convenience, these unpacked files will exist in the current working directory
# under `unpacked`.
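As a usage note for the new `enable_profiling` option: after a run, each algorithm's output directory contains a `usage-profile.tsv`. The snippet below is only an illustrative sketch of collecting these files; it assumes a simple tab-separated metric/value layout, and the actual column names are defined by `spras.profiling` and may differ.

from pathlib import Path

import pandas as pd

# Gather every usage-profile.tsv written under the output directory.
# The column layout is assumed for illustration; inspect the real files.
profiles = []
for profile_file in Path("output").glob("**/usage-profile.tsv"):
    df = pd.read_csv(profile_file, sep="\t")
    df["source_dir"] = profile_file.parent.name
    profiles.append(df)

if profiles:
    print(pd.concat(profiles, ignore_index=True))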
11 changes: 8 additions & 3 deletions pyproject.toml
@@ -71,8 +71,13 @@ select = [
"W292", # missing-newline-at-end-of-file
]

[tool.setuptools]
# py-modules tells setuptools which directory is our actual module
py-modules = ["spras"]
[tool.setuptools.packages.find]
where = ["."]
include = ["spras*"]

# Include non-python executables
[tool.setuptools.package-data]
"spras" = ["cgroup_wrapper.sh"]

# packages tells setuptools what the exported package is called (ie allows import spras)
packages = ["spras", "spras.analysis", "spras.config"]
6 changes: 4 additions & 2 deletions spras/allpairs.py
@@ -92,7 +92,8 @@ def run(nodetypes=None, network=None, directed_flag=None, output_file=None, cont
volumes.append(bind_path)

# Create the parent directories for the output file if needed
Path(output_file).parent.mkdir(parents=True, exist_ok=True)
out_dir = Path(output_file).parent
out_dir.mkdir(parents=True, exist_ok=True)
bind_path, mapped_out_file = prepare_volume(output_file, work_dir)
volumes.append(bind_path)

Expand All @@ -111,7 +112,8 @@ def run(nodetypes=None, network=None, directed_flag=None, output_file=None, cont
container_suffix,
command,
volumes,
work_dir)
work_dir,
out_dir)

@staticmethod
def parse_output(raw_pathway_file, standardized_pathway_file, params):
14 changes: 8 additions & 6 deletions spras/btb.py
@@ -133,12 +133,14 @@ def run(sources=None, targets=None, edges=None, output_file=None, container_fram
mapped_out_prefix]

container_suffix = "bowtiebuilder:v2"
run_container_and_log('BowTieBuilder',
container_framework,
container_suffix,
command,
volumes,
work_dir)
run_container_and_log(
'BowTieBuilder',
container_framework,
container_suffix,
command,
volumes,
work_dir,
out_dir)
# Output is already written to raw-pathway.txt file


19 changes: 19 additions & 0 deletions spras/cgroup_wrapper.sh
@@ -0,0 +1,19 @@
#!/bin/bash

# This script gets invoked by run_singularity_container when enable_profiling is set to true.
# Its arguments are <cgroup_path> <apptainer exec command and args>
# If profiling is enabled, we've already created a new, empty cgroup and launched this script as a
# separate process. To isolate the inner container's resource usage stats, we add this script's PID
# to the new cgroup and then exec the apptainer command. Since the generic Snakemake/SPRAS processes
# remain outside this cgroup, we can monitor the inner container's resource usage without conflating
# it with the overhead from SPRAS itself.

CGROUP_PATH="$1"
echo "My cgroup path is: $CGROUP_PATH"
# Pop the first argument off the list so remaining args are just the apptainer command to exec
shift
echo $$ > "$CGROUP_PATH/cgroup.procs"

# Start apptainer
echo "Executing command: $@"
exec "$@"
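For context, the `create_peer_cgroup` helper that hands this wrapper its cgroup path lives in `spras/profiling.py`, which is not part of this diff. Below is a minimal sketch of what such a helper could look like under cgroup v2, assuming the process is allowed to create a sibling cgroup next to its own; the function body and the `spras-profile` name are illustrative assumptions, not the packaged implementation.

from pathlib import Path

CGROUP_ROOT = Path("/sys/fs/cgroup")

def create_peer_cgroup(name: str = "spras-profile") -> str:
    """Create an empty sibling cgroup next to the current process's cgroup (illustrative sketch)."""
    # cgroup v2 entries in /proc/self/cgroup look like "0::/htcondor/job_x/pid_y"
    with open("/proc/self/cgroup") as f:
        v2_entries = [line for line in f if line.startswith("0::")]
    if not v2_entries:
        raise RuntimeError("No cgroup v2 entry found for this process")
    current = Path(v2_entries[0].strip().split("::", 1)[1].lstrip("/"))

    # Creating the directory requires delegation/write access to the parent cgroup
    peer = CGROUP_ROOT / current.parent / name
    peer.mkdir(parents=True, exist_ok=True)
    return str(peer)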
6 changes: 6 additions & 0 deletions spras/config/config.py
@@ -71,6 +71,8 @@ def __init__(self, raw_config: dict[str, Any]):
self.container_prefix: str = DEFAULT_CONTAINER_PREFIX
# A Boolean specifying whether to unpack singularity containers. Default is False
self.unpack_singularity = False
# A Boolean indicating whether to enable container runtime profiling (apptainer/singularity only)
self.enable_profiling = False
# A dictionary to store configured datasets against which SPRAS will be run
self.datasets = None
# A dictionary to store configured gold standard data against output of SPRAS runs
@@ -299,6 +301,10 @@ def process_config(self, raw_config: RawConfig):
if raw_config.container_registry and raw_config.container_registry.base_url != "" and raw_config.container_registry.owner != "":
self.container_prefix = raw_config.container_registry.base_url + "/" + raw_config.container_registry.owner

if raw_config.enable_profiling and raw_config.container_framework not in ["singularity", "apptainer"]:
warnings.warn("enable_profiling is set to true, but the container framework is not singularity/apptainer. This setting will have no effect.")
self.enable_profiling = raw_config.enable_profiling

self.process_datasets(raw_config)
self.process_algorithms(raw_config)
self.process_analysis(raw_config)
3 changes: 2 additions & 1 deletion spras/config/schema.py
@@ -91,8 +91,8 @@ def validate(label: str):

class ContainerFramework(CaseInsensitiveEnum):
docker = 'docker'
# TODO: add apptainer variant once #260 gets merged
singularity = 'singularity'
apptainer = 'apptainer'
dsub = 'dsub'

class ContainerRegistry(BaseModel):
@@ -152,6 +152,7 @@ class RawConfig(BaseModel):
container_framework: ContainerFramework = ContainerFramework.docker
unpack_singularity: bool = False
container_registry: ContainerRegistry
enable_profiling: bool = False

hash_length: int = DEFAULT_HASH_LENGTH
"The length of the hash used to identify a parameter combination"
78 changes: 54 additions & 24 deletions spras/containers.py
@@ -10,6 +10,7 @@

import spras.config.config as config
from spras.logging import indent
from spras.profiling import create_apptainer_container_stats, create_peer_cgroup
from spras.util import hash_filename


@@ -131,7 +132,7 @@ def env_to_items(environment: dict[str, str]) -> Iterator[str]:
# TODO consider a better default environment variable
# Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html)
# Technically the argument is an image, not a container, but we use container here.
def run_container(framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None):
def run_container(framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, environment: Optional[dict[str, str]] = None):
"""
Runs a command in the container using Singularity or Docker
@param framework: singularity or docker
Expand All @@ -140,21 +141,22 @@ def run_container(framework: str, container_suffix: str, command: List[str], vol
@param volumes: a list of volumes to mount where each item is a (source, destination) tuple
@param working_dir: the working directory in the container
@param environment: environment variables to set in the container
@param out_dir: output directory for the rule's artifacts. Only passed into run_container_singularity for the purpose of profiling.
@return: output from Singularity execute or Docker run
"""
normalized_framework = framework.casefold()

container = config.config.container_prefix + "/" + container_suffix
if normalized_framework == 'docker':
return run_container_docker(container, command, volumes, working_dir, environment)
elif normalized_framework == 'singularity':
return run_container_singularity(container, command, volumes, working_dir, environment)
elif normalized_framework == 'singularity' or normalized_framework == "apptainer":
return run_container_singularity(container, command, volumes, working_dir, out_dir, environment)
elif normalized_framework == 'dsub':
return run_container_dsub(container, command, volumes, working_dir, environment)
else:
raise ValueError(f'{framework} is not a recognized container framework. Choose "docker", "dsub", "singularity", or "apptainer".')

def run_container_and_log(name: str, framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None):
def run_container_and_log(name: str, framework: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, environment: Optional[dict[str, str]] = None):
"""
Runs a command in the container using Singularity or Docker with associated pretty printed messages.
@param name: the display name of the running container for logging purposes
Expand All @@ -171,7 +173,7 @@ def run_container_and_log(name: str, framework: str, container_suffix: str, comm

print('Running {} on container framework "{}" on env {} with command: {}'.format(name, framework, list(env_to_items(environment)), ' '.join(command)), flush=True)
try:
out = run_container(framework=framework, container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, environment=environment)
out = run_container(framework=framework, container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, out_dir=out_dir, environment=environment)
if out is not None:
if isinstance(out, list):
out = ''.join(out)
@@ -289,15 +291,15 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple
# finally:
return out


def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None):
def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str, environment: Optional[dict[str, str]] = None):
"""
Runs a command in the container using Singularity.
Only available on Linux.
@param container: name of the DockerHub container without the 'docker://' prefix
@param command: command to run in the container
@param volumes: a list of volumes to mount where each item is a (source, destination) tuple
@param working_dir: the working directory in the container
@param out_dir: output directory for the rule's artifacts -- used here to store profiling data
@param environment: environment variable to set in the container
@return: output from Singularity execute
"""
@@ -329,11 +331,14 @@ def run_container_singularity(container: str, command: List[str], volumes: List[
singularity_options.extend(['--env', ",".join(env_to_items(environment))])

# Handle unpacking singularity image if needed. Potentially needed for running nested unprivileged containers
expanded_image = None
if config.config.unpack_singularity:
# Split the string by "/"
# The incoming image string is of the format <repository>/<owner>/<image name>:<tag> e.g.
# hub.docker.com/reedcompbio/spras:latest
# Here we first produce a .sif image using the image name and tag (base_cont)
# and then expand that image into a sandbox directory. For example,
# hub.docker.com/reedcompbio/spras:latest --> spras_latest.sif --> ./spras_latest/
path_elements = container.split("/")

# Get the last element, which will indicate the base container name
base_cont = path_elements[-1]
base_cont = base_cont.replace(":", "_").split(":")[0]
sif_file = base_cont + ".sif"
Expand All @@ -348,24 +353,49 @@ def run_container_singularity(container: str, command: List[str], volumes: List[

base_cont_path = unpacked_dir / Path(base_cont)

# Check if the directory for base_cont already exists. When running concurrent jobs, it's possible
# Check whether the directory for base_cont_path already exists. When running concurrent jobs, it's possible
# something else has already pulled/unpacked the container.
# Here, we expand the sif image from `image_path` to a directory indicated by `base_cont`
# Here, we expand the sif image from `image_path` to a directory indicated by `base_cont_path`
if not base_cont_path.exists():
Client.build(recipe=image_path, image=str(base_cont_path), sandbox=True, sudo=False)

# Execute the locally unpacked container.
return Client.execute(str(base_cont_path),
command,
options=singularity_options,
bind=bind_paths)

expanded_image = base_cont_path # This is the sandbox directory

# If not using the expanded sandbox image, we still need to prepend the docker:// prefix
# so apptainer knows to pull and convert the image format from docker to apptainer.
image_to_run = expanded_image if expanded_image else "docker://" + container
if config.config.enable_profiling:
# We won't end up using the spython client if profiling is enabled because
# we need to run everything manually to set up the cgroup
# Build the apptainer run command, which gets passed to the cgroup wrapper script
singularity_cmd = [
"apptainer", "exec"
]
for bind in bind_paths:
singularity_cmd.extend(["--bind", bind])
singularity_cmd.extend(singularity_options)
singularity_cmd.append(image_to_run)
singularity_cmd.extend(command)

my_cgroup = create_peer_cgroup()
# The wrapper script is packaged with spras, and should be located in the same directory
# as `containers.py`.
wrapper = os.path.join(os.path.dirname(__file__), "cgroup_wrapper.sh")
cmd = [wrapper, my_cgroup] + singularity_cmd
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

print("Reading memory and CPU stats from cgroup")
create_apptainer_container_stats(my_cgroup, out_dir)

result = proc.stdout
else:
# Adding 'docker://' to the container indicates this is a Docker image Singularity must convert
return Client.execute('docker://' + container,
command,
options=singularity_options,
bind=bind_paths)
result = Client.execute(
image=image_to_run,
command=command,
options=singularity_options,
bind=bind_paths
)

return result


# Because this is called independently for each file, the same local path can be mounted to multiple volumes
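`create_apptainer_container_stats` is likewise defined in `spras/profiling.py` and is not shown in this diff. The sketch below is a hedged illustration of reading the cgroup counters and writing `usage-profile.tsv`; it assumes cgroup v2 files such as `memory.peak` and `cpu.stat` and a simple metric/value layout, neither of which is confirmed by this diff.

import csv
from pathlib import Path

def create_apptainer_container_stats(cgroup_path: str, out_dir: str) -> None:
    """Illustrative sketch: dump peak memory and CPU time from a cgroup v2 directory to a TSV."""
    cgroup = Path(cgroup_path)

    # memory.peak holds the memory high-water mark in bytes (cgroup v2)
    peak_memory = int((cgroup / "memory.peak").read_text().strip())

    # cpu.stat holds lines such as "usage_usec 1234567"
    cpu_stats = {}
    for line in (cgroup / "cpu.stat").read_text().splitlines():
        key, value = line.split()
        cpu_stats[key] = int(value)

    with open(Path(out_dir) / "usage-profile.tsv", "w", newline="") as f:
        writer = csv.writer(f, delimiter="\t")
        writer.writerow(["metric", "value"])
        writer.writerow(["peak_memory_bytes", peak_memory])
        writer.writerow(["cpu_usage_usec", cpu_stats.get("usage_usec", 0)])
        writer.writerow(["cpu_user_usec", cpu_stats.get("user_usec", 0)])
        writer.writerow(["cpu_system_usec", cpu_stats.get("system_usec", 0)])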
17 changes: 10 additions & 7 deletions spras/domino.py
@@ -117,7 +117,8 @@ def run(network=None, active_genes=None, output_file=None, slice_threshold=None,
container_suffix,
slicer_command,
volumes,
work_dir)
work_dir,
out_dir)

# Make the Python command to run within the container
domino_command = ['domino',
Expand All @@ -136,12 +137,14 @@ def run(network=None, active_genes=None, output_file=None, slice_threshold=None,
if module_threshold is not None:
domino_command.extend(['--module_threshold', str(module_threshold)])

run_container_and_log('DOMINO',
container_framework,
container_suffix,
domino_command,
volumes,
work_dir)
run_container_and_log(
'DOMINO',
container_framework,
container_suffix,
domino_command,
volumes,
work_dir,
out_dir)

# DOMINO creates a new folder in out_dir to output its modules HTML files into called active_genes
# The filename is determined by the input active_genes and cannot be configured
3 changes: 2 additions & 1 deletion spras/meo.py
@@ -184,7 +184,8 @@ def run(edges=None, sources=None, targets=None, output_file=None, max_path_lengt
container_suffix,
command,
volumes,
work_dir)
work_dir,
out_dir)

properties_file_local.unlink(missing_ok=True)

3 changes: 2 additions & 1 deletion spras/mincostflow.py
@@ -121,7 +121,8 @@ def run(sources=None, targets=None, edges=None, output_file=None, flow=None, cap
container_suffix,
command,
volumes,
work_dir)
work_dir,
out_dir)

# Check the output of the container
out_dir_content = sorted(out_dir.glob('*.sif'))
1 change: 1 addition & 0 deletions spras/omicsintegrator1.py
@@ -202,6 +202,7 @@ def run(edges=None, prizes=None, dummy_nodes=None, dummy_mode=None, mu_squared=N
command,
volumes,
work_dir,
out_dir,
{'TMPDIR': mapped_out_dir})

conf_file_local.unlink(missing_ok=True)
3 changes: 2 additions & 1 deletion spras/omicsintegrator2.py
@@ -134,7 +134,8 @@ def run(edges=None, prizes=None, output_file=None, w=None, b=None, g=None, noise
container_suffix,
command,
volumes,
work_dir)
work_dir,
out_dir)

# TODO do we want to retain other output files?
# TODO if deleting other output files, write them all to a tmp directory and copy
3 changes: 2 additions & 1 deletion spras/pathlinker.py
@@ -118,7 +118,8 @@ def run(nodetypes=None, network=None, output_file=None, k=None, container_framew
container_suffix,
command,
volumes,
work_dir)
work_dir,
out_dir)

# Rename the primary output file to match the desired output filename
# Currently PathLinker only writes one output file so we do not need to delete others