2 changes: 1 addition & 1 deletion .github/workflows/ci-tests.yaml
@@ -159,7 +159,7 @@ jobs:
        run: uv sync --locked --no-dev --group docs
      - name: "Build documentation and check for consistency"
        env:
-          CHECKSUM: "be2933a30a986a448b8b7dfbab602e8301744520f96abde1f8ae35539061c411"
+          CHECKSUM: "e3715d10625d439a159b0a3bbf527403692cb93f0555b454eda7d5201c008d8a"
        run: |
          cd docs
          HASH="$(uv run --no-sync make checksum | tail -n1)"
6 changes: 6 additions & 0 deletions docs/source/ext/connector.rst
@@ -48,6 +48,10 @@
    ) -> MutableMapping[str, AvailableLocation]:
        ...
+    async def get_shell(
+        self, command: MutableSequence[str], location: ExecutionLocation
+    ) -> Shell: ...
+
    async def get_stream_reader(
        self,
        command: MutableSequence[str],
@@ -86,6 +90,8 @@

The ``undeploy`` method destroys the remote execution environment, potentially cleaning up all the temporary resources instantiated during the workflow execution (e.g., intermediate results). If a ``deployment`` object is marked as ``external``, the ``undeploy`` method should not destroy it but just close all the connections opened by the ``deploy`` method.

+The ``get_shell`` method returns a ``Shell`` object, an abstraction of a persistent remote shell that can be used to execute remote commands efficiently. The ``command`` parameter specifies how to obtain a shell instance (e.g., ``["sh"]`` for a standard POSIX shell), and the ``location`` parameter identifies the remote ``ExecutionLocation`` where the shell should be instantiated.
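
A minimal usage sketch (illustrative only, not code from this PR; it assumes a ``connector`` and ``location`` obtained from the surrounding StreamFlow context, and relies on the ``Shell.execute`` signature introduced in ``streamflow/core/deployment.py`` below):

```python
async def make_scratch_dir(connector, location) -> int:
    # Reuse one persistent shell for several remote commands.
    shell = await connector.get_shell(command=["sh"], location=location)
    try:
        await shell.execute(command=["mkdir", "-p", "/tmp/scratch"])
        # With capture_output=True, execute returns an (output, exit status) tuple.
        _, status = await shell.execute(
            command=["ls", "-la", "/tmp/scratch"], capture_output=True
        )
        return status
    finally:
        await shell.close()
```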

The ``get_available_locations`` method is used in the scheduling phase to obtain the locations available for job execution, identified by their unique name (see :ref:`here <Scheduling>`). The method receives an optional input parameter to filter valid locations. The ``service`` parameter specifies a specific set of locations in a deployment, and its precise meaning differs for each deployment type (see :ref:`here <Binding steps and deployments>`).

The ``get_stream_reader`` and ``get_stream_writer`` methods return an `Asynchronous Context Manager <https://docs.python.org/3/reference/datamodel.html#async-context-managers>`_ wrapping a ``StreamWrapper`` instance, allowing it to be used inside ``async with`` statements. The ``StreamWrapper`` instance is obtained by executing the ``command`` on the ``location``, and can be used to read or write data through a stream (see :ref:`here <Streaming>`). The streams must be read and written in chunks that respect the size of the available buffer, which is defined by the ``transferBufferSize`` attribute of the ``Connector`` instance. These methods improve the performance of data copies between pairs of remote locations.
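
For illustration, a sketch of a location-to-location copy using these methods (not code from this PR; the ``read``/``write`` method names and the ``async with await`` pattern are assumptions based on the description above):

```python
async def copy_between_locations(
    src_connector, src_location, dst_connector, dst_location
) -> None:
    # Stream data between remote locations through StreamWrapper objects,
    # honoring the source connector's transferBufferSize.
    async with await src_connector.get_stream_reader(
        command=["cat", "/data/input.bin"], location=src_location
    ) as reader:
        async with await dst_connector.get_stream_writer(
            command=["tee", "/data/output.bin"], location=dst_location
        ) as writer:
            while chunk := await reader.read(src_connector.transferBufferSize):
                await writer.write(chunk)
```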
96 changes: 64 additions & 32 deletions streamflow/core/deployment.py
@@ -4,7 +4,7 @@
import os
import posixpath
import tempfile
-from abc import abstractmethod
+from abc import ABC, abstractmethod
from collections.abc import MutableMapping, MutableSequence
from typing import TYPE_CHECKING, AsyncContextManager, cast

@@ -97,7 +97,6 @@ def __hash__(self) -> int:


class BindingFilter(SchemaEntity):
-
    __slots__ = "name"

    def __init__(self, name: str) -> None:
@@ -170,6 +169,11 @@ async def run(
    @abstractmethod
    async def undeploy(self, external: bool) -> None: ...

+    @abstractmethod
+    async def get_shell(
+        self, command: MutableSequence[str], location: ExecutionLocation
+    ) -> Shell: ...
+
    @abstractmethod
    async def get_stream_reader(
        self, command: MutableSequence[str], location: ExecutionLocation
@@ -288,6 +292,41 @@ async def save(self, context: StreamFlowContext) -> None:
        )


+class FilterConfig(PersistableEntity):
+    __slots__ = ("name", "type", "config")
+
+    def __init__(self, name: str, type: str, config: MutableMapping[str, Any]):
+        super().__init__()
+        self.name: str = name
+        self.type: str = type
+        self.config: MutableMapping[str, Any] = config or {}
+
+    @classmethod
+    async def load(
+        cls,
+        context: StreamFlowContext,
+        persistent_id: int,
+        loading_context: DatabaseLoadingContext,
+    ) -> Self:
+        row = await context.database.get_filter(persistent_id)
+        obj = cls(
+            name=row["name"],
+            type=row["type"],
+            config=row["config"],
+        )
+        loading_context.add_filter(persistent_id, obj)
+        return obj
+
+    async def save(self, context: StreamFlowContext) -> None:
+        async with self.persistence_lock:
+            if not self.persistent_id:
+                self.persistent_id = await context.database.add_filter(
+                    name=self.name,
+                    type=self.type,
+                    config=self.config,
+                )
+
+
class Target(PersistableEntity):
    def __init__(
        self,
@@ -389,39 +428,32 @@ async def _load(
        return cls(workdir=row["workdir"])


-class FilterConfig(PersistableEntity):
-    __slots__ = ("name", "type", "config")
+class Shell(ABC):
+    __slots__ = ("command", "buffer_size")

-    def __init__(self, name: str, type: str, config: MutableMapping[str, Any]):
-        super().__init__()
-        self.name: str = name
-        self.type: str = type
-        self.config: MutableMapping[str, Any] = config or {}
+    def __init__(
+        self,
+        command: MutableSequence[str],
+        buffer_size: int,
+    ) -> None:
+        self.command: MutableSequence[str] = command
+        self.buffer_size: int = buffer_size

-    @classmethod
-    async def load(
-        cls,
-        context: StreamFlowContext,
-        persistent_id: int,
-        loading_context: DatabaseLoadingContext,
-    ) -> Self:
-        row = await context.database.get_filter(persistent_id)
-        obj = cls(
-            name=row["name"],
-            type=row["type"],
-            config=row["config"],
-        )
-        loading_context.add_filter(persistent_id, obj)
-        return obj
+    @abstractmethod
+    async def close(self) -> None: ...

-    async def save(self, context: StreamFlowContext) -> None:
-        async with self.persistence_lock:
-            if not self.persistent_id:
-                self.persistent_id = await context.database.add_filter(
-                    name=self.name,
-                    type=self.type,
-                    config=self.config,
-                )
+    @abstractmethod
+    async def closed(self) -> bool: ...
+
+    @abstractmethod
+    async def execute(
+        self,
+        command: MutableSequence[str],
+        environment: MutableMapping[str, str] | None = ...,
+        workdir: str | None = ...,
+        capture_output: bool = ...,
+        timeout: int | None = ...,
+    ) -> tuple[str, int] | None: ...


class WrapsConfig:
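The new `Shell` contract is small enough to sketch a toy implementation. The following is illustrative only (every name except `Shell` is hypothetical), and it cheats in one respect: it spawns a fresh local process per command instead of keeping a single shell process alive, which is what a real persistent-shell implementation would do.

```python
import asyncio
import shlex
from collections.abc import MutableMapping, MutableSequence

from streamflow.core.deployment import Shell


class LocalShell(Shell):
    """Toy Shell that runs each command through a local `sh -c`."""

    def __init__(self, command: MutableSequence[str], buffer_size: int) -> None:
        super().__init__(command, buffer_size)
        self._closed = False

    async def close(self) -> None:
        self._closed = True

    async def closed(self) -> bool:
        return self._closed

    async def execute(
        self,
        command: MutableSequence[str],
        environment: MutableMapping[str, str] | None = None,
        workdir: str | None = None,
        capture_output: bool = False,
        timeout: int | None = None,
    ) -> tuple[str, int] | None:
        # Quote the command words and hand them to the configured shell,
        # e.g., self.command == ["sh"].
        proc = await asyncio.create_subprocess_exec(
            *self.command,
            "-c",
            " ".join(shlex.quote(arg) for arg in command),
            cwd=workdir,
            env=dict(environment) if environment is not None else None,
            stdout=asyncio.subprocess.PIPE if capture_output else None,
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        if capture_output:
            return stdout.decode().strip(), proc.returncode
        return None
```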
29 changes: 28 additions & 1 deletion streamflow/core/utils.py
@@ -14,11 +14,12 @@

from streamflow.core.exception import ProcessorTypeError, WorkflowExecutionException
from streamflow.core.persistence import PersistableEntity
+from streamflow.log_handler import logger

if TYPE_CHECKING:
    from typing import TypeVar

-    from streamflow.core.deployment import Connector, ExecutionLocation
+    from streamflow.core.deployment import Connector, ExecutionLocation, Shell
    from streamflow.core.workflow import Token

T = TypeVar("T")
@@ -311,6 +312,32 @@ def random_name() -> str:
    return str(uuid.uuid4())


+async def run_in_shell(
+    shell: Shell,
+    location: ExecutionLocation,
+    command: MutableSequence[str],
+    environment: MutableMapping[str, str] | None = None,
+    workdir: str | None = None,
+    capture_output: bool = False,
+    timeout: int | None = None,
+) -> tuple[str, int] | None:
+    try:
+        return await shell.execute(
+            command=command,
+            environment=environment,
+            workdir=workdir,
+            capture_output=capture_output,
+            timeout=timeout,
+        )
+    except WorkflowExecutionException as e:
+        logger.warning(
+            f"Persistent shell failed for location {location.name} "
+            f"of deployment {location.deployment}: "
+            f"falling back to direct exec: {e}"
+        )
+        raise e
+
+
async def run_in_subprocess(
    location: ExecutionLocation,
    command: MutableSequence[str],
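Note that `run_in_shell` logs and re-raises rather than swallowing the error, leaving the fallback decision to the caller. A sketch of the intended call pattern (illustrative only; the wrapper function is hypothetical, and the `capture_output` keyword of `run_in_subprocess` is assumed from its truncated signature above):

```python
from streamflow.core.exception import WorkflowExecutionException
from streamflow.core.utils import run_in_shell, run_in_subprocess


async def run_with_fallback(shell, location, command):
    try:
        # Fast path: reuse the persistent shell attached to this location.
        return await run_in_shell(shell, location, command, capture_output=True)
    except WorkflowExecutionException:
        # run_in_shell has already logged a warning; fall back to a
        # one-shot remote execution.
        return await run_in_subprocess(location, command, capture_output=True)
```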
37 changes: 17 additions & 20 deletions streamflow/cwl/utils.py
@@ -1375,35 +1375,32 @@ async def update_file_token(
    load_contents: bool | None,
    load_listing: LoadListing | None = None,
) -> MutableMapping[str, Any]:
-    if path := get_path_from_token(token_value):
+    new_token_value = dict(token_value)
+    if path := get_path_from_token(new_token_value):
        filepath = StreamFlowPath(path, context=context, location=location)
        # Process contents
-        if get_token_class(token_value) == "File" and load_contents is not None:
-            if load_contents and "contents" not in token_value:
-                token_value |= {
+        if get_token_class(new_token_value) == "File" and load_contents is not None:
+            if load_contents and "contents" not in new_token_value:
+                new_token_value |= {
                    "contents": await _get_contents(
                        filepath,
-                        token_value["size"],
+                        new_token_value["size"],
                        cwl_version,
                    )
                }
-            elif not load_contents and "contents" in token_value:
-                token_value = {
-                    k: token_value[k] for k in token_value if k != "contents"
-                }
+            elif not load_contents and "contents" in new_token_value:
+                del new_token_value["contents"]
        # Process listings
-        if get_token_class(token_value) == "Directory" and load_listing is not None:
+        if get_token_class(new_token_value) == "Directory" and load_listing is not None:
            # If load listing is set to `no_listing`, remove the listing entries if present
-            if load_listing == LoadListing.no_listing:
-                if "listing" in token_value:
-                    token_value = {
-                        k: token_value[k] for k in token_value if k != "listing"
-                    }
+            if load_listing == LoadListing.no_listing and "listing" in new_token_value:
+                del new_token_value["listing"]
            # If listing is not present or if the token needs a deep listing, process directory contents
            elif (
-                "listing" not in token_value or load_listing == LoadListing.deep_listing
+                "listing" not in new_token_value
+                or load_listing == LoadListing.deep_listing
            ):
-                token_value |= {
+                new_token_value |= {
                    "listing": await _get_listing(
                        context=context,
                        connector=connector,
@@ -1416,13 +1413,13 @@
                }
            # If load listing is set to `shallow_listing`, remove the deep listing entries if present
            elif load_listing == LoadListing.shallow_listing:
-                token_value |= {
+                new_token_value |= {
                    "listing": [
                        {k: v[k] for k in v if k != "listing"}
-                        for v in token_value["listing"]
+                        for v in new_token_value["listing"]
                    ]
                }
-    return token_value
+    return new_token_value


async def write_remote_file(
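The point of this change is that `update_file_token` no longer mutates its `token_value` argument: it works on a shallow copy, so other holders of the same token dictionary are unaffected. A minimal demonstration of the aliasing hazard (illustrative values only; note that `dict()` is a shallow copy, so nested `listing` entries are still shared):

```python
token_value = {"class": "File", "path": "/data/a.txt", "contents": "..."}
alias = token_value  # another component holds the same mapping

updated = dict(token_value)  # what update_file_token now does first
del updated["contents"]      # e.g., load_contents is False

assert "contents" in alias   # the caller's view is unchanged
assert "contents" not in updated
```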
25 changes: 13 additions & 12 deletions streamflow/data/remotepath.py
@@ -625,18 +625,19 @@ async def glob(
        if not pattern:
            raise ValueError(f"Unacceptable pattern: {pattern!r}")
        command = [
+            "set",
+            "--",
+            f"{shlex.quote(str(self))}/{pattern}",
+            ";",
+            "test",
+            "-e",
+            '"$1"',
+            "&&",
            "printf",
-            '"%s\\0"',
-            str(self / pattern),
-            "|",
-            "xargs",
-            "-0",
-            "-I{}",
-            "sh",
-            "-c",
-            '"if [ -e \\"{}\\" ]; then echo \\"{}\\"; fi"',
-            "|",
-            "sort",
+            "'%s\\n'",
+            '"$@"',
+            ";",
+            ":",
        ]
        result, status = await self.connector.run(
            location=self.location, command=command, capture_output=True
@@ -726,7 +727,7 @@ async def rmtree(self) -> None:
        if (inner_path := await self._get_inner_path()) != self:
            await inner_path.rmtree()
        else:
-            command = ["rm", "-rf ", self.__str__()]
+            command = ["rm", "-rf", self.__str__()]
            result, status = await self.connector.run(
                location=self.location, command=command, capture_output=True
            )
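
The rewritten `glob` command delegates pattern expansion to the remote shell itself: `set -- <dir>/<pattern>` expands the glob into the positional parameters, `test -e "$1"` checks that at least one match exists (when nothing matches, `$1` holds the literal unexpanded pattern), `printf '%s\n' "$@"` prints one match per line, and the trailing `:` forces a zero exit status even when there are no matches. Since POSIX shells return pathname expansions in sorted order, the previous `printf | xargs | sort` pipeline, with its per-match `sh -c` invocation, is no longer needed. A local reproduction of the one-liner (illustrative only):

```python
import shlex
import subprocess
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.txt", "b.txt", "notes.md"):
        Path(tmp, name).touch()
    # Same structure as the command list built by StreamFlowPath.glob above.
    script = (
        f"set -- {shlex.quote(tmp)}/*.txt ; "
        'test -e "$1" && printf \'%s\\n\' "$@" ; :'
    )
    out = subprocess.run(
        ["sh", "-c", script], capture_output=True, text=True, check=True
    )
    print(out.stdout)  # the two .txt paths, one per line; empty if no match
```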