diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index 3d316cee2d9..d7da1fd613b 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -2939,27 +2939,33 @@ async def test_rebalance(c, s, *_):
     """Test Client.rebalance(). These are just to test the Client wrapper around
     Scheduler.rebalance(); for more thorough tests on the latter see
     test_scheduler.py.
     """
+    np = pytest.importorskip("numpy")
     # We used nannies to have separate processes for each worker
     a, b = s.workers
 
-    # Generate 10 buffers worth 512 MiB total on worker a. This sends its memory
+    # Generate 100 buffers worth 512 MiB total on worker a. This sends its memory
     # utilisation slightly above 50% (after counting unmanaged) which is above the
     # distributed.worker.memory.rebalance.sender-min threshold.
-    futures = c.map(lambda _: "x" * (2 ** 29 // 10), range(10), workers=[a])
+    # NOTE: we use NumPy arrays instead of strings to get zero-copy data transfer,
+    # which prevents worker memory spikes during the rebalance. We use many small
+    # pieces of data for the same reason.
+    futures = c.map(
+        lambda _: np.full(2 ** 29 // 100, 1, dtype="uint8"), range(100), workers=[a]
+    )
     await wait(futures)
 
     # Wait for heartbeats
     while s.memory.process < 2 ** 29:
         await asyncio.sleep(0.1)
 
-    assert await c.run(lambda dask_worker: len(dask_worker.data)) == {a: 10, b: 0}
+    assert await c.run(lambda dask_worker: len(dask_worker.data)) == {a: 100, b: 0}
     await c.rebalance()
     ndata = await c.run(lambda dask_worker: len(dask_worker.data))
     # Allow for some uncertainty as the unmanaged memory is not stable
-    assert sum(ndata.values()) == 10
-    assert 3 <= ndata[a] <= 7
-    assert 3 <= ndata[b] <= 7
+    assert sum(ndata.values()) == 100
+    assert 30 <= ndata[a] <= 70
+    assert 30 <= ndata[b] <= 70
 
 
 @gen_cluster(
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 5fcc3bb1f15..a0e234a3548 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -2676,24 +2676,30 @@ async def assert_ndata(client, by_addr, total=None):
     config={"distributed.worker.memory.rebalance.sender-min": 0.3},
 )
 async def test_rebalance(c, s, *_):
+    np = pytest.importorskip("numpy")
     # We used nannies to have separate processes for each worker
     a, b = s.workers
 
-    # Generate 10 buffers worth 512 MiB total on worker a. This sends its memory
+    # Generate 100 buffers worth 512 MiB total on worker a. This sends its memory
     # utilisation slightly above 50% (after counting unmanaged) which is above the
     # distributed.worker.memory.rebalance.sender-min threshold.
-    futures = c.map(lambda _: "x" * (2 ** 29 // 10), range(10), workers=[a])
+    # NOTE: we use NumPy arrays instead of strings to get zero-copy data transfer,
+    # which prevents worker memory spikes during the rebalance. We use many small
+    # pieces of data for the same reason.
+    futures = c.map(
+        lambda _: np.full(2 ** 29 // 100, 1, dtype="uint8"), range(100), workers=[a]
+    )
     await wait(futures)
 
     # Wait for heartbeats
     await assert_memory(s, "process", 512, 1024)
-    await assert_ndata(c, {a: 10, b: 0})
+    await assert_ndata(c, {a: 100, b: 0})
     await s.rebalance()
     # Allow for some uncertainty as the unmanaged memory is not stable
-    await assert_ndata(c, {a: (3, 7), b: (3, 7)}, total=10)
+    await assert_ndata(c, {a: (30, 70), b: (30, 70)}, total=100)
     # rebalance() when there is nothing to do
     await s.rebalance()
-    await assert_ndata(c, {a: (3, 7), b: (3, 7)}, total=10)
+    await assert_ndata(c, {a: (30, 70), b: (30, 70)}, total=100)
 
 
 @gen_cluster(
diff --git a/distributed/tests/test_tls_functional.py b/distributed/tests/test_tls_functional.py
index afd7ad96e41..b9a96e0eeec 100644
--- a/distributed/tests/test_tls_functional.py
+++ b/distributed/tests/test_tls_functional.py
@@ -4,6 +4,8 @@
 """
 import asyncio
 
+import pytest
+
 from distributed import Client, Nanny, Queue, Scheduler, Worker, wait, worker_client
 from distributed.core import Status
 from distributed.metrics import time
@@ -104,29 +106,35 @@ async def test_nanny(c, s, a, b):
     config={"distributed.worker.memory.rebalance.sender-min": 0.3},
 )
 async def test_rebalance(c, s, *_):
+    np = pytest.importorskip("numpy")
     # We used nannies to have separate processes for each worker
    a, b = s.workers
     assert a.startswith("tls://")
 
-    # Generate 10 buffers worth 512 MiB total on worker a. This sends its memory
+    # Generate 100 buffers worth 512 MiB total on worker a. This sends its memory
     # utilisation slightly above 50% (after counting unmanaged) which is above the
     # distributed.worker.memory.rebalance.sender-min threshold.
-    futures = c.map(lambda _: "x" * (2 ** 29 // 10), range(10), workers=[a])
+    # NOTE: we use NumPy arrays instead of strings to get zero-copy data transfer,
+    # which prevents worker memory spikes during the rebalance. We use many small
+    # pieces of data for the same reason.
+    futures = c.map(
+        lambda _: np.full(2 ** 29 // 100, 1, dtype="uint8"), range(100), workers=[a]
+    )
     await wait(futures)
 
     # Wait for heartbeats
     while s.memory.process < 2 ** 29:
         await asyncio.sleep(0.1)
 
-    assert await c.run(lambda dask_worker: len(dask_worker.data)) == {a: 10, b: 0}
+    assert await c.run(lambda dask_worker: len(dask_worker.data)) == {a: 100, b: 0}
 
     await c.rebalance()
     ndata = await c.run(lambda dask_worker: len(dask_worker.data))
     # Allow for some uncertainty as the unmanaged memory is not stable
-    assert sum(ndata.values()) == 10
-    assert 3 <= ndata[a] <= 7
-    assert 3 <= ndata[b] <= 7
+    assert sum(ndata.values()) == 100
+    assert 30 <= ndata[a] <= 70
+    assert 30 <= ndata[b] <= 70
 
 
 @gen_tls_cluster(client=True, nthreads=[("tls://127.0.0.1", 2)] * 2)
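A note on the zero-copy rationale in the new NOTE comments: the sketch below is not part of the patch, but it illustrates the underlying mechanism with plain stdlib pickle. NumPy arrays expose the buffer protocol, so under pickle protocol 5 their bytes can travel out-of-band by reference, while a `str` is always copied into the pickle stream. Distributed's own serialization exploits the same property when building message frames, so this is an analogy rather than the library's actual code path; all variable names are illustrative.

```python
import pickle

import numpy as np

# One of the test's buffers: ~5 MiB of uint8 ones.
buf = np.full(2 ** 29 // 100, 1, dtype="uint8")

# Protocol-5 pickling lets buffer-protocol objects travel out-of-band:
# the in-band payload holds only metadata, and the array's memory is
# referenced through PickleBuffer rather than copied.
frames = []
meta = pickle.dumps(buf, protocol=5, buffer_callback=frames.append)
assert len(meta) < 1024                                   # metadata only
assert sum(f.raw().nbytes for f in frames) == buf.nbytes  # data by reference

# A str has no buffer protocol: even with protocol 5, the full payload is
# duplicated into the pickle stream, so memory spikes while it is in flight.
s = "x" * buf.nbytes
blob = pickle.dumps(s, protocol=5, buffer_callback=frames.append)
assert len(frames) == 1          # the callback was never invoked for the str
assert len(blob) > buf.nbytes    # the whole string was copied in-band
```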
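And a back-of-envelope check of the sizing described in the "Generate 100 buffers" comments, given the test's configuration (nannies with a 1 GiB `memory_limit` and a `sender-min` of 0.3). The variable names here are mine, not the library's:

```python
GiB = 2 ** 30

memory_limit = 1 * GiB            # worker_kwargs={"memory_limit": "1 GiB"}
managed = 100 * (2 ** 29 // 100)  # ~512 MiB of task outputs parked on worker a
sender_min = 0.3                  # distributed.worker.memory.rebalance.sender-min

# Managed memory alone puts worker a at ~50% utilisation, comfortably above
# the 30% sender threshold even before unmanaged memory is counted on top.
assert managed / memory_limit >= sender_min
print(f"{managed / memory_limit:.0%} of limit")  # -> 50%
```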