
distributed.comm.tests.test_ucx_config.test_ucx_config_w_env_var flaky #5229

@fjetter

Description

I have noticed that the UCX test distributed.comm.tests.test_ucx_config.test_ucx_config_w_env_var occasionally fails.

E.g. https://gpuci.gpuopenanalytics.com/job/dask/job/distributed/job/prb/job/distributed-prb/234/CUDA_VER=11.2,LINUX_VER=ubuntu18.04,PYTHON_VER=3.8,RAPIDS_VER=21.10/testReport/junit/distributed.comm.tests/test_ucx_config/test_ucx_config_w_env_var/
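
For anyone trying to reproduce this locally, here is a minimal sketch that reruns the test in fresh processes until it fails. It assumes a UCX-capable environment with distributed, ucx-py and rmm installed; the test node id is taken from the traceback below and the number of attempts is arbitrary.

```python
# Rerun the flaky test in a fresh pytest process each time until it fails.
import subprocess
import sys

TEST = "distributed/comm/tests/test_ucx_config.py::test_ucx_config_w_env_var"

for attempt in range(1, 21):  # arbitrary number of attempts
    proc = subprocess.run([sys.executable, "-m", "pytest", "-x", TEST])
    if proc.returncode != 0:
        print(f"reproduced the failure on attempt {attempt}")
        sys.exit(proc.returncode)

print("no failure reproduced in 20 attempts")
```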

Traceback
self = <Client: scheduler='ucx://172.17.0.3:13339'>

    async def _handle_report(self):
        """Listen to scheduler"""
        with log_errors():
            try:
                while True:
                    if self.scheduler_comm is None:
                        break
                    try:
>                       msgs = await self.scheduler_comm.comm.read()

distributed/client.py:1222: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <UCX Client->Scheduler local=None remote=ucx://172.17.0.3:13339>
deserializers = ('cuda', 'dask', 'pickle', 'error')

    async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
        with log_errors():
            if self.closed():
                raise CommClosedError("Endpoint is closed -- unable to read message")
    
            if deserializers is None:
                deserializers = ("cuda", "dask", "pickle", "error")
    
            try:
                # Recv meta data
    
                # Recv close flag and number of frames (_Bool, int64)
                msg = host_array(struct.calcsize("?Q"))
                await self.ep.recv(msg)
                (shutdown, nframes) = struct.unpack("?Q", msg)
    
                if shutdown:  # The writer is closing the connection
>                   raise CommClosedError("Connection closed by writer")
E                   distributed.comm.core.CommClosedError: Connection closed by writer

distributed/comm/ucx.py:255: CommClosedError

During handling of the above exception, another exception occurred:

cleanup = None
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7f872c175550>
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f87b936b2e0>

    def test_ucx_config_w_env_var(cleanup, loop, monkeypatch):
        size = "1000.00 MB"
        monkeypatch.setenv("DASK_RMM__POOL_SIZE", size)
    
        dask.config.refresh()
    
        port = "13339"
        sched_addr = f"ucx://{HOST}:{port}"
    
        with popen(
            ["dask-scheduler", "--no-dashboard", "--protocol", "ucx", "--port", port]
        ) as sched:
            with popen(
                [
                    "dask-worker",
                    sched_addr,
                    "--no-dashboard",
                    "--protocol",
                    "ucx",
                    "--no-nanny",
                ]
            ) as w:
                with Client(sched_addr, loop=loop, timeout=10) as c:
                    while not c.scheduler_info()["workers"]:
                        sleep(0.1)
    
                    # Check for RMM pool resource type
                    rmm_resource = c.run_on_scheduler(
                        rmm.mr.get_current_device_resource_type
                    )
                    assert rmm_resource == rmm.mr.PoolMemoryResource
    
                    worker_addr = list(c.scheduler_info()["workers"])[0]
                    worker_rmm_usage = c.run(rmm.mr.get_current_device_resource_type)
                    rmm_resource = worker_rmm_usage[worker_addr]
>                   assert rmm_resource == rmm.mr.PoolMemoryResource

distributed/comm/tests/test_ucx_config.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:1187: in __exit__
    self.close()
distributed/client.py:1430: in close
    sync(self.loop, self._close, fast=True, callback_timeout=timeout)
distributed/utils.py:325: in sync
    raise exc.with_traceback(tb)
distributed/utils.py:308: in f
    result[0] = yield future
/opt/conda/envs/dask/lib/python3.8/site-packages/tornado/gen.py:762: in run
    value = future.result()
/opt/conda/envs/dask/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
distributed/client.py:1339: in _close
    await asyncio.wait_for(
/opt/conda/envs/dask/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
distributed/client.py:1228: in _handle_report
    await self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Client: scheduler='ucx://172.17.0.3:13339'>

    async def _reconnect(self):
        with log_errors():
>           assert self.scheduler_comm.comm.closed()
E           AssertionError

distributed/client.py:1053: AssertionError
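
The ordering above suggests a race during client shutdown: `UCX.read` raises `CommClosedError` when the writer signals shutdown, and `Client._reconnect` then asserts `self.scheduler_comm.comm.closed()`, which can still be False if the endpoint has not yet been marked closed. A minimal, self-contained sketch of that race (`FakeComm` is hypothetical, not distributed's actual comm implementation):

```python
import asyncio


class CommClosedError(Exception):
    """Stand-in for distributed.comm.core.CommClosedError."""


class FakeComm:
    """Hypothetical comm whose read() raises before its closed flag flips."""

    def __init__(self):
        self._closed = False

    def closed(self):
        return self._closed

    async def read(self):
        # The writer signals shutdown in-band; the reader raises before the
        # endpoint itself is marked closed, mirroring the traceback above.
        raise CommClosedError("Connection closed by writer")


async def handle_report(comm):
    try:
        await comm.read()
    except CommClosedError:
        # The reconnect path assumes the comm is already closed; with the
        # ordering above the flag has not flipped yet, so this fails.
        assert comm.closed()


try:
    asyncio.run(handle_report(FakeComm()))
except AssertionError:
    print("AssertionError, as in the flaky CI run above")
```

If that is indeed what happens, the assertion fires before the close has propagated, which would explain why the failure is only intermittent.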

cc @dask/gpu
