Add OpenPBS tests to the CI/CD #873

Draft
GlassOfWhiskey wants to merge 1 commit into master from add-pbs-tests

Conversation

@GlassOfWhiskey
Member

This commit adds a Docker-based OpenPBS cluster to the set of target architectures for CI tests, ensuring that the PBSConnector is properly tested.
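For reference, the new target would slot into the parametrized connector tests. A minimal sketch of the idea, assuming a hypothetical deployment fixture (the names below are illustrative, not StreamFlow's actual test helpers):

```python
import pytest

# Illustrative sketch: how an "openpbs" case might join the connector test
# matrix. DEPLOYMENTS and the fixture below are assumptions for exposition,
# not StreamFlow's real test fixtures.
DEPLOYMENTS = ["local", "docker", "openpbs"]

@pytest.fixture(params=DEPLOYMENTS)
def deployment_type(request) -> str:
    return request.param

def test_deployment_is_known(deployment_type: str) -> None:
    # In the real suite, each parametrized case would deploy the matching
    # backend, e.g. the Docker-based OpenPBS cluster for "openpbs".
    assert deployment_type in DEPLOYMENTS
```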

@GlassOfWhiskey force-pushed the add-pbs-tests branch 2 times, most recently from 7021dbc to b84fa6a on November 20, 2025 at 21:02
@codecov

codecov bot commented Nov 20, 2025

❌ 2 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
|---|---|---|---|
| 1864 | 2 | 1862 | 9 |
View the top 2 failed test(s) by shortest run time
tests/test_connector.py::test_connector_run_command_fails[openpbs]
Stack Traces | 0.066s run time
curr_connector = <streamflow.deployment.connector.queue_manager.PBSConnector object at 0x7f47a2b4ecf0>
curr_location = <streamflow.core.deployment.ExecutionLocation object at 0x7f47a13e3b50>

    @pytest.mark.asyncio
    async def test_connector_run_command_fails(
        curr_connector: Connector, curr_location: ExecutionLocation
    ) -> None:
        """Test connector run method on a job with an invalid command"""
>       _, returncode = await curr_connector.run(
            curr_location, ["ls -2"], capture_output=True, job_name="job_test"
        )

tests/test_connector.py:86: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../deployment/connector/queue_manager.py:603: in run
    job_id = await self._run_batch_command(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <streamflow.deployment.connector.queue_manager.PBSConnector object at 0x7f47a2b4ecf0>
command = '#!/bin/sh\n\necho bHMgLTIgMj4mMQ== | base64 -d | sh'
environment = {}, job_name = 'job_test'
location = <streamflow.core.deployment.ExecutionLocation object at 0x7f47a13e3b50>
workdir = None, stdin = None, stdout = 'qsub: Bad UID for job execution'
stderr = -2, timeout = None

    async def _run_batch_command(
        self,
        command: str,
        environment: MutableMapping[str, str] | None,
        job_name: str,
        location: ExecutionLocation,
        workdir: str | None = None,
        stdin: int | str | None = None,
        stdout: int | str = asyncio.subprocess.STDOUT,
        stderr: int | str = asyncio.subprocess.STDOUT,
        timeout: int | None = None,
    ) -> str:
        batch_command = ["sh", "-c"]
        if workdir is not None:
            batch_command.extend(["cd", workdir, "&&"])
        resources = (
            {"walltime": utils.format_seconds_to_hhmmss(timeout)} if timeout else {}
        )
        batch_command.extend(
            [
                "echo",
                base64.b64encode(command.encode("utf-8")).decode("utf-8"),
                "|",
                "base64",
                "-d",
                "|",
            ]
        )
        if environment:
            batch_command.extend([f"{k}={v}" for k, v in environment.items()])
        batch_command.extend(
            [
                "qsub",
                get_option(
                    "o",
                    (
                        stdout
                        if stdout != asyncio.subprocess.STDOUT
                        else utils.random_name()
                    ),
                ),
            ]
        )
        if stdin is not None and stdin != asyncio.subprocess.DEVNULL:
            batch_command.append(get_option("i", stdin))
        if stderr != asyncio.subprocess.STDOUT and stderr != stdout:
            batch_command.append(get_option("e", self._format_stream(stderr)))
        if stderr == stdout:
            batch_command.append(get_option("j", "oe"))
        if service := cast(PBSService, self.services.get(location.service)):
            batch_command.extend(
                [
                    get_option("a", service.begin),
                    get_option("A", service.account),
                    get_option("c", service.checkpoint),
                    get_option("C", service.prefix),
                    get_option("m", service.mailOptions),
                    get_option("N", service.jobName),
                    get_option("p", service.priority),
                    get_option("q", service.destination),
                    get_option("r", "y" if service.rerunnable else "n"),
                    get_option("S", service.shellList),
                    get_option("u", service.userList),
                    get_option("v", service.variableList),
                    get_option("V", service.exportAllVariables),
                    get_option(
                        "W",
                        (
                            ",".join(
                                [
                                    f"{k}={v}"
                                    for k, v in service.additionalAttributes.items()
                                ]
                            )
                            if service.additionalAttributes
                            else None
                        ),
                    ),
                ]
            )
        if resources := cast(dict[str, str], service.resources) | resources:
            batch_command.append(
                get_option("l", ",".join([f"{k}={v}" for k, v in resources.items()]))
            )
        batch_command.append("-")
        stdout, returncode = await super().run(
            location=location, command=batch_command, capture_output=True
        )
        if returncode == 0:
            return stdout.strip()
        else:
>           raise WorkflowExecutionException(
                f"Error submitting job {job_name} to PBS: {stdout.strip()}"
            )
E           streamflow.core.exception.WorkflowExecutionException: Error submitting job job_test to PBS: qsub: Bad UID for job execution

.../deployment/connector/queue_manager.py:1078: WorkflowExecutionException
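A note for readers following the trace: the connector base64-encodes the job script and pipes the decoded text into `qsub ... -`, which tells PBS to read the job script from stdin. The payload in the failing frame round-trips cleanly, so the encoding is not the problem; only the submission itself is rejected. A quick standalone check, using only the values visible in the traceback:

```python
import base64

# The script in the failing frame decodes its payload before running it.
# Round-trip the payload from the traceback to confirm what would execute:
payload = base64.b64decode("bHMgLTIgMj4mMQ==").decode("utf-8")
assert payload == "ls -2 2>&1"  # the test's invalid command, stderr merged

# The outer layer does the same to the whole '#!/bin/sh' script before
# piping the decoded text into `qsub ... -`:
script = "#!/bin/sh\n\necho bHMgLTIgMj4mMQ== | base64 -d | sh"
encoded = base64.b64encode(script.encode("utf-8")).decode("utf-8")
assert base64.b64decode(encoded).decode("utf-8") == script
```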
tests/test_connector.py::test_connector_run_command[openpbs]
Stack Traces | 50.5s run time
context = <streamflow.core.context.StreamFlowContext object at 0x7f47a1a47620>
curr_connector = <streamflow.deployment.connector.queue_manager.PBSConnector object at 0x7f47a2b4ecf0>
curr_location = <streamflow.core.deployment.ExecutionLocation object at 0x7f47a13e3b50>

    @pytest.mark.asyncio
    async def test_connector_run_command(
        context: StreamFlowContext,
        curr_connector: Connector,
        curr_location: ExecutionLocation,
    ) -> None:
        """Test connector run method"""
>       _, returncode = await curr_connector.run(
            location=curr_location, command=["ls"], capture_output=True, job_name="job_test"
        )

tests/test_connector.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../deployment/connector/queue_manager.py:603: in run
    job_id = await self._run_batch_command(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <streamflow.deployment.connector.queue_manager.PBSConnector object at 0x7f47a2b4ecf0>
command = '#!/bin/sh\n\necho bHMgMj4mMQ== | base64 -d | sh', environment = {}
job_name = 'job_test'
location = <streamflow.core.deployment.ExecutionLocation object at 0x7f47a13e3b50>
workdir = None, stdin = None, stdout = 'qsub: Bad UID for job execution'
stderr = -2, timeout = None

    [ `_run_batch_command` body identical to the traceback above; it fails at
      the same `raise WorkflowExecutionException` in queue_manager.py ]
E           streamflow.core.exception.WorkflowExecutionException: Error submitting job job_test to PBS: qsub: Bad UID for job execution

.../deployment/connector/queue_manager.py:1078: WorkflowExecutionException
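Triage note: `qsub: Bad UID for job execution` is PBS's stock rejection when the submitting user is not allowed to run jobs. The usual culprits are submitting as root inside a container (PBS refuses root submissions unless the server's `acl_roots` attribute permits them, e.g. `qmgr -c 'set server acl_roots += root'`) or a UID that does not exist on the execution host. A simplified, illustrative model of the gate (not OpenPBS source code):

```python
# Simplified model of the server-side check behind
# "qsub: Bad UID for job execution"; illustrative only, not OpenPBS source.
def pbs_accepts_uid(username: str, uid: int, acl_roots: frozenset[str]) -> bool:
    if uid == 0:
        # Root submissions are refused unless listed in the server's
        # acl_roots attribute (qmgr: set server acl_roots += root).
        return username in acl_roots
    return True

# A containerized CI runner typically submits as root, hence the failure:
print(pbs_accepts_uid("root", 0, frozenset()))          # False -> Bad UID
print(pbs_accepts_uid("root", 0, frozenset({"root"})))  # True after qmgr fix
```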


@GlassOfWhiskey force-pushed the add-pbs-tests branch 10 times, most recently from bcfa6ac to 6b012a3 on November 22, 2025 at 22:38
This commit adds a Docker-based OpenPBS cluster to the set of target
architectures for CI tests, ensuring that the `PBSConnector` is properly
tested.
@GlassOfWhiskey force-pushed the master branch 7 times, most recently from b21f362 to e7820f1 on January 12, 2026 at 08:25
@GlassOfWhiskey force-pushed the master branch 4 times, most recently from 8c05dac to 4a6edc9 on January 24, 2026 at 13:16