18 changes: 12 additions & 6 deletions distributed/tests/test_client.py
@@ -2939,27 +2939,33 @@ async def test_rebalance(c, s, *_):
     """Test Client.rebalance(). These are just to test the Client wrapper around
     Scheduler.rebalance(); for more thorough tests on the latter see test_scheduler.py.
     """
+    np = pytest.importorskip("numpy")
     # We used nannies to have separate processes for each worker
     a, b = s.workers
 
-    # Generate 10 buffers worth 512 MiB total on worker a. This sends its memory
+    # Generate 100 buffers worth 512 MiB total on worker a. This sends its memory
     # utilisation slightly above 50% (after counting unmanaged) which is above the
     # distributed.worker.memory.rebalance.sender-min threshold.
-    futures = c.map(lambda _: "x" * (2 ** 29 // 10), range(10), workers=[a])
+    # NOTE: we use NumPy arrays instead of strings to get zero-copy data transfer,
+    # which prevents worker memory spikes during the rebalance. We use many small
+    # pieces of data for the same reason.
+    futures = c.map(
+        lambda _: np.full(2 ** 29 // 100, 1, dtype="uint8"), range(100), workers=[a]
+    )
     await wait(futures)
     # Wait for heartbeats
     while s.memory.process < 2 ** 29:
         await asyncio.sleep(0.1)
 
-    assert await c.run(lambda dask_worker: len(dask_worker.data)) == {a: 10, b: 0}
+    assert await c.run(lambda dask_worker: len(dask_worker.data)) == {a: 100, b: 0}
 
     await c.rebalance()
 
     ndata = await c.run(lambda dask_worker: len(dask_worker.data))
     # Allow for some uncertainty as the unmanaged memory is not stable
-    assert sum(ndata.values()) == 10
-    assert 3 <= ndata[a] <= 7
-    assert 3 <= ndata[b] <= 7
+    assert sum(ndata.values()) == 100
+    assert 30 <= ndata[a] <= 70
+    assert 30 <= ndata[b] <= 70
 
 
 @gen_cluster(
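The NOTE comment above carries the main idea of this change. As a rough illustration (an assumption on my part, not code from this PR): distributed's serialization can hand a NumPy array's buffer straight to the transport as a frame, while a `str` must first be re-encoded into a fresh `bytes` object. 2 ** 29 bytes is 512 MiB, so each of the 100 buffers is about 5.1 MiB:

```python
# Illustrative sketch only -- not part of this PR's diff. Shows the
# zero-copy property the NOTE above relies on, assuming the usual
# (header, frames) return shape of distributed.protocol.serialize.
import numpy as np

from distributed.protocol import serialize

data = np.full(2 ** 29 // 100, 1, dtype="uint8")
header, frames = serialize(data)
# The payload frames are views over the array's own memory, so sending
# them does not duplicate the ~5 MiB buffer the way encoding a str would.
assert sum(memoryview(f).nbytes for f in frames) == data.nbytes
```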
16 changes: 11 additions & 5 deletions distributed/tests/test_scheduler.py
@@ -2676,24 +2676,30 @@ async def assert_ndata(client, by_addr, total=None):
     config={"distributed.worker.memory.rebalance.sender-min": 0.3},
 )
 async def test_rebalance(c, s, *_):
+    np = pytest.importorskip("numpy")
     # We used nannies to have separate processes for each worker
     a, b = s.workers
 
-    # Generate 10 buffers worth 512 MiB total on worker a. This sends its memory
+    # Generate 100 buffers worth 512 MiB total on worker a. This sends its memory
     # utilisation slightly above 50% (after counting unmanaged) which is above the
     # distributed.worker.memory.rebalance.sender-min threshold.
-    futures = c.map(lambda _: "x" * (2 ** 29 // 10), range(10), workers=[a])
+    # NOTE: we use NumPy arrays instead of strings to get zero-copy data transfer,
+    # which prevents worker memory spikes during the rebalance. We use many small
+    # pieces of data for the same reason.
+    futures = c.map(
+        lambda _: np.full(2 ** 29 // 100, 1, dtype="uint8"), range(100), workers=[a]
+    )
     await wait(futures)
     # Wait for heartbeats
     await assert_memory(s, "process", 512, 1024)
-    await assert_ndata(c, {a: 10, b: 0})
+    await assert_ndata(c, {a: 100, b: 0})
     await s.rebalance()
     # Allow for some uncertainty as the unmanaged memory is not stable
-    await assert_ndata(c, {a: (3, 7), b: (3, 7)}, total=10)
+    await assert_ndata(c, {a: (30, 70), b: (30, 70)}, total=100)
 
     # rebalance() when there is nothing to do
     await s.rebalance()
-    await assert_ndata(c, {a: (3, 7), b: (3, 7)}, total=10)
+    await assert_ndata(c, {a: (30, 70), b: (30, 70)}, total=100)
 
 
 @gen_cluster(
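For readers following along without the full file: `assert_ndata` is defined earlier in test_scheduler.py (its signature shows in the hunk header above), and its body is not part of this diff. A hypothetical reconstruction of what it checks, based purely on how it is called here:

```python
# Hypothetical reconstruction -- the real helper lives outside this diff's
# context lines. It counts the keys held by each worker and accepts either
# an exact count or an inclusive (min, max) range, absorbing the
# unmanaged-memory jitter the comments mention.
async def assert_ndata(client, by_addr, total=None):
    ndata = await client.run(lambda dask_worker: len(dask_worker.data))
    for addr, expected in by_addr.items():
        if isinstance(expected, tuple):
            assert expected[0] <= ndata[addr] <= expected[1], ndata
        else:
            assert ndata[addr] == expected, ndata
    if total is not None:
        assert sum(ndata.values()) == total, ndata
```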
20 changes: 14 additions & 6 deletions distributed/tests/test_tls_functional.py
@@ -4,6 +4,8 @@
 """
 import asyncio
 
+import pytest
+
 from distributed import Client, Nanny, Queue, Scheduler, Worker, wait, worker_client
 from distributed.core import Status
 from distributed.metrics import time
@@ -104,29 +106,35 @@ async def test_nanny(c, s, a, b):
     config={"distributed.worker.memory.rebalance.sender-min": 0.3},
 )
 async def test_rebalance(c, s, *_):
+    np = pytest.importorskip("numpy")
     # We used nannies to have separate processes for each worker
     a, b = s.workers
     assert a.startswith("tls://")
 
-    # Generate 10 buffers worth 512 MiB total on worker a. This sends its memory
+    # Generate 100 buffers worth 512 MiB total on worker a. This sends its memory
     # utilisation slightly above 50% (after counting unmanaged) which is above the
     # distributed.worker.memory.rebalance.sender-min threshold.
-    futures = c.map(lambda _: "x" * (2 ** 29 // 10), range(10), workers=[a])
+    # NOTE: we use NumPy arrays instead of strings to get zero-copy data transfer,
+    # which prevents worker memory spikes during the rebalance. We use many small
+    # pieces of data for the same reason.
+    futures = c.map(
+        lambda _: np.full(2 ** 29 // 100, 1, dtype="uint8"), range(100), workers=[a]
+    )
     await wait(futures)
 
     # Wait for heartbeats
     while s.memory.process < 2 ** 29:
         await asyncio.sleep(0.1)
 
-    assert await c.run(lambda dask_worker: len(dask_worker.data)) == {a: 10, b: 0}
+    assert await c.run(lambda dask_worker: len(dask_worker.data)) == {a: 100, b: 0}
 
     await c.rebalance()
 
     ndata = await c.run(lambda dask_worker: len(dask_worker.data))
     # Allow for some uncertainty as the unmanaged memory is not stable
-    assert sum(ndata.values()) == 10
-    assert 3 <= ndata[a] <= 7
-    assert 3 <= ndata[b] <= 7
+    assert sum(ndata.values()) == 100
+    assert 30 <= ndata[a] <= 70
+    assert 30 <= ndata[b] <= 70
 
 
 @gen_tls_cluster(client=True, nthreads=[("tls://127.0.0.1", 2)] * 2)
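A side note on the bare polling loop (`while s.memory.process < 2 ** 29`) kept in test_client.py and test_tls_functional.py: it works because the scheduler's view of process memory is only refreshed when worker heartbeats arrive, hence the "Wait for heartbeats" comment. A sketch of the same wait with a timeout, so a missed heartbeat fails fast instead of hanging the test (`wait_for_process_memory` is a hypothetical helper, not distributed API):

```python
import asyncio

from distributed.metrics import time


async def wait_for_process_memory(s, nbytes, timeout=30):
    # Poll the scheduler's view of process memory, which only updates when
    # worker heartbeats arrive, and bail out if they never do.
    deadline = time() + timeout
    while s.memory.process < nbytes:
        assert time() < deadline, "expected memory never reported"
        await asyncio.sleep(0.1)
```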