From 0c38d33708a515239dfe1fbd1b541a112bf33095 Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 16 Feb 2022 12:48:19 -0500 Subject: [PATCH 1/9] add requirements.txt --- requirements.txt | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ae2677a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,15 @@ +apache-airflow==2.1.4 +apache-airflow-providers-ftp==2.0.1 +apache-airflow-providers-http==2.0.1 +apache-airflow-providers-imap==2.0.1 +apache-airflow-providers-sqlite==2.0.1 +apache-airflow-providers-ssh==2.2.0 +coverage==5.3.1 +flake8==3.8.4 +pytest==6.2.5 +pytest-cov==2.11.0 +pytest-forked==1.3.0 +pytest-mock==3.6.1 +pytest-xdist==2.2.0 +Sphinx==3.4.3 +toml==0.10.2 From 080fcfe988482620ed87277735e732a3ff9dd0c9 Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 16 Feb 2022 12:50:02 -0500 Subject: [PATCH 2/9] ENH Add patched variants of DRMAA classes so that integer values are correctly encoded as bytes --- drmaa_patches.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 drmaa_patches.py diff --git a/drmaa_patches.py b/drmaa_patches.py new file mode 100644 index 0000000..5118f8c --- /dev/null +++ b/drmaa_patches.py @@ -0,0 +1,42 @@ +''' +Patches on DRMAA-python module +''' + +from drmaa import JobTemplate, Session +from drmaa.helpers import Attribute, IntConverter + + +class PatchedIntConverter(): + ''' + Helper class to correctly encode Integer values + as little-endian bytes for Python 3 + ''' + @staticmethod + def to_drmaa(value: int) -> bytes: + return value.to_bytes(8, byteorder="little") + + @staticmethod + def from_drmaa(value: int) -> bytes: + return int.from_bytes(value, byteorder="little") + + +class PatchedJobTemplate(JobTemplate): + def __init__(self): + ''' + Dynamically patch IntConverter attributes + ''' + super(PatchedJobTemplate, self).__init__() + for attr, 
value in vars(JobTemplate).items(): + if isinstance(value, Attribute): + if value.converter is IntConverter: + setattr(value, "converter", PatchedIntConverter) + + +class PatchedSession(Session): + ''' + Override createJobTemplate method to return + Patched version + ''' + @staticmethod + def createJobTemplate(self) -> PatchedJobTemplate: + return PatchedJobTemplate() From 66ea90c237f6c54b4e7299ddab5d2a20830f23cd Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 16 Feb 2022 12:51:01 -0500 Subject: [PATCH 3/9] add adapter class to transform DRM-specific config to DRMAA config --- config_adapters.py | 209 ++++++++++++++++++++++++++++++++++ tests/test_config_adapters.py | 67 +++++++++++ 2 files changed, 276 insertions(+) create mode 100644 config_adapters.py create mode 100644 tests/test_config_adapters.py diff --git a/config_adapters.py b/config_adapters.py new file mode 100644 index 0000000..b887463 --- /dev/null +++ b/config_adapters.py @@ -0,0 +1,209 @@ +""" +Configuration adapters for mapping native specifications from DRM to DRMAA API +""" + +from __future__ import annotations +from typing import (Dict, List, ClassVar, Union, Any, Optional, TYPE_CHECKING) + +from dataclasses import dataclass, asdict, fields, InitVar +from abc import ABC, abstractmethod + +if TYPE_CHECKING: + from drmaa import JobTemplate + +# DRMAA specific fields, anything else should be put into native spec +DRMAA_FIELDS = [ + "email", "deadlineTime", "errorPath", "hardRunDurationLimit", + "hardWallclockTimeLimit", "inputPath", "outputPath", "jobCategory", + "jobName", "outputPath", "workingDirectory", "transferFiles", + "remoteCommand", "args", "jobName", "jobCategory", "blockEmail" +] + + +@dataclass +class DRMAACompatible(ABC): + ''' + Abstract dataclass for mapping DRM specific configuration to a + DRMAA compatible specification + + Properties: + _mapped_fields: List of DRM specific keys to re-map onto + the DRMAA specification if used. 
Preferably users will + use the DRMAA variant of these specifications rather than + the corresponding native specification + ''' + + _mapped_fields: ClassVar[Dict[str, Any]] + + def __str__(self): + ''' + Display formatted configuration for executor + ''' + attrs = asdict(self) + drmaa_fields = "\n".join([ + f"{field}:\t{attrs.get(field)}" for field in DRMAA_FIELDS + if attrs.get(field) is not None + ]) + + drm_fields = "\n".join([ + f"{field}:\t{attrs.get(field)}" for field in self._native_fields() + if attrs.get(field) is not None + ]) + + return ("DRMAA Config:\n" + drmaa_fields + "\nNative Specification\n" + + drm_fields) + + def get_drmaa_config(self, jt: JobTemplate) -> JobTemplate: + ''' + Apply settings onto DRMAA JobTemplate + ''' + + for field in DRMAA_FIELDS: + value = getattr(self, field, None) + if value is not None: + setattr(jt, field, value) + + jt.nativeSpecification = self.drm2drmaa() + return jt + + @abstractmethod + def drm2drmaa(self) -> str: + ''' + Build native specification from DRM-specific fields + ''' + + def _map_fields(self, **drm_kwargs: Dict[str, Any]): + ''' + Transform fields in `_mapped_fields` to + DRMAA-compliant specification. Adds + DRM-specific attributes to `self` + + Arguments: + drm_kwargs: DRM-specific key-value pairs + ''' + for drm_name, value in drm_kwargs.items(): + try: + drmaa_name = self._mapped_fields[drm_name] + except KeyError: + raise AttributeError( + "Malformed adapter class! 
Cannot map field" + f"{drm_name} to a DRMAA-compliant field") + + setattr(self, drmaa_name, value) + + def __post_init__(self, **kwargs): + self._map_fields(**kwargs) + + def _native_fields(self): + return [ + f for f in asdict(self).keys() + if (f not in self._mapped_fields.keys()) and ( + f not in DRMAA_FIELDS) + ] + + +@dataclass +class SlurmConfig(DRMAACompatible): + ''' + Transform SLURM resource specification into DRMAA-compliant inputs + + References: + See https://github.com/natefoo/slurm-drmaa for native specification + details + ''' + + _mapped_fields: ClassVar[Dict[str, Any]] = { + "error": "errorPath", + "output": "outputPath", + "job_name": "jobName", + "time": "hardWallclockTimeLimit" + } + + job_name: InitVar[str] + time: InitVar[str] + error: InitVar[str] = None + output: InitVar[str] = None + + account: Optional[str] = None + acctg_freq: Optional[str] = None + comment: Optional[str] = None + constraint: Optional[List] = None + cpus_per_task: Optional[int] = None + contiguous: Optional[bool] = None + dependency: Optional[List] = None + exclusive: Optional[bool] = None + gres: Optional[Union[List[str], str]] = None + no_kill: Optional[bool] = None + licenses: Optional[List[str]] = None + clusters: Optional[Union[List[str], str]] = None + mail_type: Optional[str] = None + mem: Optional[int] = None + mincpus: Optional[int] = None + nodes: Optional[int] = None + ntasks: Optional[int] = None + no_requeue: Optional[bool] = None + ntasks_per_node: Optional[int] = None + partition: Optional[int] = None + qos: Optional[str] = None + requeue: Optional[bool] = None + reservation: Optional[str] = None + share: Optional[bool] = None + tmp: Optional[str] = None + nodelist: Optional[Union[List[str], str]] = None + exclude: Optional[Union[List[str], str]] = None + + def __post_init__(self, job_name, time, error, output): + ''' + Transform Union[List[str]] --> comma-delimited str + In addition map time to seconds + ''' + + super().__post_init__(job_name=job_name, + 
time=_timestr_to_sec(time), + error=error, + output=output) + + for field in fields(self): + value = getattr(self, field.name) + if field.type == Union[List[str], str] and isinstance(value, list): + setattr(self, field.name, ",".join(value)) + + def drm2drmaa(self) -> str: + return self._transform_attrs() + + def _transform_attrs(self) -> str: + ''' + Remap named attributes to "-" form, excludes renaming + DRMAA-compliant fields (set in __post_init__()) then join + attributes into a nativeSpecification string + ''' + + out = [] + for field in self._native_fields(): + + value = getattr(self, field) + if value is None: + continue + + field_fmtd = field.replace("_", "-") + if isinstance(value, bool): + out.append(f"--{field_fmtd}") + else: + out.append(f"--{field_fmtd}={value}") + return " ".join(out) + + +def _timestr_to_sec(timestr: str) -> int: + ''' + Transform a time-string (D-HH:MM:SS) --> seconds + ''' + + days = 0 + if "-" in timestr: + days, timestr = timestr.split('-') + + seconds = (24 * days) * (60**2) + for exp, unit in enumerate(reversed(timestr.split(":"))): + seconds += int(unit) * (60**exp) + + return seconds diff --git a/tests/test_config_adapters.py b/tests/test_config_adapters.py new file mode 100644 index 0000000..3b3140b --- /dev/null +++ b/tests/test_config_adapters.py @@ -0,0 +1,67 @@ +""" +Tests for config_adapters.py to ensure that mapping +from DRM-specific configuration to DRMAA spec works +correctly +""" + +import pytest +from drmaa_executor_plugin.drmaa_patches import (PatchedJobTemplate as + JobTemplate) +from drmaa_executor_plugin.config_adapters import SlurmConfig + + +@pytest.fixture() +def job_template(): + jt = JobTemplate() + yield jt + jt.delete() + + +def test_slurm_config_transforms_to_drmaa(job_template): + ''' + Check whether SLURM adapter class correctly + transforms SLURM specs to DRMAA attributes + ''' + + error = "TEST_VALUE" + output = "TEST_VALUE" + time = "10:00:00" # must test as seconds + job_name = "FAKE_JOB" + + 
expected_drmaa_attrs = { + "errorPath": error, + "outputPath": output, + "hardWallclockTimeLimit": 36000, + "jobName": job_name + } + + slurm_config = SlurmConfig(error=error, + output=output, + time=time, + job_name=job_name) + + jt = slurm_config.get_drmaa_config(job_template) + + # Test attributes matches what is expected + for k, v in expected_drmaa_attrs.items(): + assert getattr(jt, k) == v + + +def test_slurm_config_native_spec_transforms_correctly(job_template): + ''' + Test whether scheduler-specific configuration is transformed + into nativeSpecification correctly + ''' + + job_name = "TEST" + time = "1:00" + account = "TEST" + cpus_per_task = 5 + slurm_config = SlurmConfig(job_name=job_name, + time=time, + account=account, + cpus_per_task=cpus_per_task) + + jt = slurm_config.get_drmaa_config(job_template) + for spec in ['account=TEST', 'cpus-per-task=5']: + assert spec in jt.nativeSpecification From 8222de0cfb007bd607596395c06f03ebce5957bc Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 16 Feb 2022 13:04:09 -0500 Subject: [PATCH 4/9] add DRMAAConfig class --- config_adapters.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/config_adapters.py b/config_adapters.py index b887463..1e00853 100644 --- a/config_adapters.py +++ b/config_adapters.py @@ -102,6 +102,12 @@ def _native_fields(self): ] +@dataclass +class DRMAAConfig(DRMAACompatible): + def drm2drmaa(self): + return + + @dataclass class SlurmConfig(DRMAACompatible): ''' From b06b4b28a89467e3cb49e2a21ee8f3a7e76fe290 Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 16 Feb 2022 13:06:52 -0500 Subject: [PATCH 5/9] add docstring to PatchedIntConverter --- drmaa_patches.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/drmaa_patches.py b/drmaa_patches.py index 5118f8c..17416f7 100644 --- a/drmaa_patches.py +++ b/drmaa_patches.py @@ -10,6 +10,11 @@ class PatchedIntConverter(): ''' Helper class to correctly encode Integer values as little-endian bytes 
for Python 3 + + Info: + The standard IntConverter class attempts to convert + integer values to bytes using `bytes(value)` which + results in a zero'd byte-array of length `value`. ''' @staticmethod def to_drmaa(value: int) -> bytes: @@ -23,7 +28,7 @@ def from_drmaa(value: int) -> bytes: class PatchedJobTemplate(JobTemplate): def __init__(self): ''' - Dynamically patch IntConverter attributes + Dynamically patch attributes using IntConverter ''' super(PatchedJobTemplate, self).__init__() for attr, value in vars(JobTemplate).items(): From 89db9f516f545468fd81e2cdc53ef9b063dfc051 Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 16 Feb 2022 13:33:56 -0500 Subject: [PATCH 6/9] MAINT: make typesafe --- config_adapters.py | 3 ++- drmaa_patches.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/config_adapters.py b/config_adapters.py index 1e00853..ddefa55 100644 --- a/config_adapters.py +++ b/config_adapters.py @@ -206,7 +206,8 @@ def _timestr_to_sec(timestr: str) -> int: days = 0 if "-" in timestr: - days, timestr = timestr.split('-') + day_str, timestr = timestr.split('-') + days = int(day_str) seconds = (24 * days) * (60**2) for exp, unit in enumerate(reversed(timestr.split(":"))): diff --git a/drmaa_patches.py b/drmaa_patches.py index 17416f7..c63468e 100644 --- a/drmaa_patches.py +++ b/drmaa_patches.py @@ -21,7 +21,7 @@ def to_drmaa(value: int) -> bytes: return value.to_bytes(8, byteorder="little") @staticmethod - def from_drmaa(value: int) -> bytes: + def from_drmaa(value: bytes) -> int: return int.from_bytes(value, byteorder="little") From 43c0d13f2f9a14cf9d5955bd72e145483a0da477 Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 30 Mar 2022 10:28:41 -0400 Subject: [PATCH 7/9] FIX: remove converter on hardWallclockTimeLimit --- drmaa_patches.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/drmaa_patches.py b/drmaa_patches.py index c63468e..3943b8d 100644 --- a/drmaa_patches.py +++ 
b/drmaa_patches.py @@ -6,6 +6,13 @@ from drmaa.helpers import Attribute, IntConverter +#TODO: Make sure this is actually correct? +# Works for SLURM +CORRECT_TO_STRING = [ + "hardWallclockTimeLimit" +] + + class PatchedIntConverter(): ''' Helper class to correctly encode Integer values @@ -33,7 +40,9 @@ def __init__(self): super(PatchedJobTemplate, self).__init__() for attr, value in vars(JobTemplate).items(): if isinstance(value, Attribute): - if value.converter is IntConverter: + if attr in CORRECT_TO_STRING: + setattr(value, "converter", None) + elif value.converter is IntConverter: setattr(value, "converter", PatchedIntConverter) From 727a6742276e9d4fcb3f2f0579100684fe8d8493 Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 30 Mar 2022 12:30:38 -0400 Subject: [PATCH 8/9] passing tests with simplifications to config adapter internals --- config_adapters.py | 82 ++++++++++++++++++++--------------- tests/test_config_adapters.py | 6 +-- 2 files changed, 49 insertions(+), 39 deletions(-) diff --git a/config_adapters.py b/config_adapters.py index ddefa55..d900053 100644 --- a/config_adapters.py +++ b/config_adapters.py @@ -3,10 +3,11 @@ """ from __future__ import annotations -from typing import (Dict, List, ClassVar, Union, Any, Optional, TYPE_CHECKING) +from typing import (List, ClassVar, Union, Optional, TYPE_CHECKING) from dataclasses import dataclass, asdict, fields, InitVar from abc import ABC, abstractmethod +import re if TYPE_CHECKING: from drmaa import JobTemplate @@ -19,6 +20,8 @@ "remoteCommand", "args", "jobName", "jobCategory", "blockEmail" ] +TIMESTR_VALIDATE = re.compile("^(\\d+:)?[0-9][0-9]:[0-9][0-9]$") + @dataclass class DRMAACompatible(ABC): @@ -33,7 +36,7 @@ class DRMAACompatible(ABC): the corresponding native specification ''' - _mapped_fields: ClassVar[Dict[str, Any]] + _mapped_fields: ClassVar[List[str]] def __str__(self): ''' @@ -72,35 +75,21 @@ def drm2drmaa(self) -> str: Build native specification from DRM-specific fields ''' - def 
_map_fields(self, **drm_kwargs: Dict[str, Any]): - ''' - Transform fields in `_mapped_fields` to - DRMAA-compliant specification. Adds - DRM-specific attributes to `self` - - Arguments: - drm_kwargs: DRM-specific key-value pairs - ''' - for drm_name, value in drm_kwargs.items(): - try: - drmaa_name = self._mapped_fields[drm_name] - except KeyError: - raise AttributeError( - "Malformed adapter class! Cannot map field" - f"{drm_name} to a DRMAA-compliant field") - - setattr(self, drmaa_name, value) - - def __post_init__(self, **kwargs): - self._map_fields(**kwargs) - def _native_fields(self): return [ f for f in asdict(self).keys() - if (f not in self._mapped_fields.keys()) and ( - f not in DRMAA_FIELDS) + if (f not in self._mapped_fields) and (f not in DRMAA_FIELDS) ] + def set_fields(self, **drmaa_kwargs): + for field, value in drmaa_kwargs.items(): + if field not in DRMAA_FIELDS: + raise AttributeError( + "Malformed adapter class! Cannot map field" + f" {field} to a DRMAA-compliant field") + + setattr(self, field, value) + @dataclass class DRMAAConfig(DRMAACompatible): @@ -118,11 +107,8 @@ class SlurmConfig(DRMAACompatible): details ''' - _mapped_fields: ClassVar[Dict[str, Any]] = { - "error": "errorPath", - "output": "outputPath", - "job_name": "jobName", - "time": "hardWallclockTimeLimit" + _mapped_fields: ClassVar[List[str]] = { + "error", "output", "job_name", "time" } job_name: InitVar[str] @@ -161,13 +147,18 @@ class SlurmConfig(DRMAACompatible): def __post_init__(self, job_name, time, error, output): ''' Transform Union[List[str]] --> comma-delimited str - In addition map time to seconds ''' - super().__post_init__(job_name=job_name, - time=_timestr_to_sec(time), - error=error, - output=output) + _validate_timestr(time, "time") + super().set_fields(jobName=job_name, + hardWallclockTimeLimit=time, + errorPath=error, + outputPath=output) + + self.job_name = job_name + self.time = time + self.error = error + self.output = output for field in fields(self): 
value = getattr(self, field.name) @@ -214,3 +205,22 @@ def _timestr_to_sec(timestr: str) -> int: seconds += int(unit) * (60**exp) return seconds + + +def _validate_timestr(timestr: str, field_name: str) -> str: + ''' + Validate timestring to make sure it meets + expected format. + ''' + + if not isinstance(timestr, str): + raise TypeError(f"Expected {field_name} to be of type string " + f"but received {type(timestr)}!") + + result = TIMESTR_VALIDATE.match(timestr) + if not result: + raise ValueError(f"Expected {field_name} to be of format " + "X...XX:XX:XX or XX:XX! " + f"but received {timestr}") + + return timestr diff --git a/tests/test_config_adapters.py b/tests/test_config_adapters.py index 3b3140b..6d03df4 100644 --- a/tests/test_config_adapters.py +++ b/tests/test_config_adapters.py @@ -25,13 +25,13 @@ def test_slurm_config_transforms_to_drmaa(job_template): error = "TEST_VALUE" output = "TEST_VALUE" - time = "10:00:00" # must test as seconds + time = "10:00:00" job_name = "FAKE_JOB" expected_drmaa_attrs = { "errorPath": error, "outputPath": output, - "hardWallclockTimeLimit": 36000, + "hardWallclockTimeLimit": "10:00:00", "jobName": job_name } @@ -54,7 +54,7 @@ def test_slurm_config_native_spec_transforms_correctly(job_template): ''' job_name = "TEST" - time = "1:00" + time = "01:00" account = "TEST" cpus_per_task = 5 slurm_config = SlurmConfig(job_name=job_name, From 5490b56844e4c2586844eeebc5bb20ffebcf87a4 Mon Sep 17 00:00:00 2001 From: Jerry Jeyachandra Date: Wed, 30 Mar 2022 12:33:44 -0400 Subject: [PATCH 9/9] add tests for validating timestring --- tests/test_config_adapters.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_config_adapters.py b/tests/test_config_adapters.py index 6d03df4..31969b7 100644 --- a/tests/test_config_adapters.py +++ b/tests/test_config_adapters.py @@ -65,3 +65,29 @@ def test_slurm_config_native_spec_transforms_correctly(job_template): jt = slurm_config.get_drmaa_config(job_template) 
for spec in ['account=TEST', 'cpus-per-task=5']: assert spec in jt.nativeSpecification + + +def test_invalid_timestr_fails(): + job_name = "TEST" + time = "FAILURE" + account = "TEST" + cpus_per_task = 10 + + with pytest.raises(ValueError): + SlurmConfig(job_name=job_name, + time=time, + account=account, + cpus_per_task=cpus_per_task) + + +def test_timestr_not_string_fails(): + job_name = "TEST" + time = 10 + account = "TEST" + cpus_per_task = 10 + + with pytest.raises(TypeError): + SlurmConfig(job_name=job_name, + time=time, + account=account, + cpus_per_task=cpus_per_task)