diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 22f904ddf96..ee2d2e21416 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -122,7 +122,7 @@ def add_policy(self, policy: ActiveMemoryManagerPolicy) -> None:
 
     def run_once(self) -> None:
         """Run all policies once and asynchronously (fire and forget) enact their
-        recommendations to replicate/drop keys
+        recommendations to replicate/drop tasks
         """
         with log_errors():
             # This should never fail since this is a synchronous method
@@ -198,6 +198,9 @@ def _find_recipient(
             drops), or None if no eligible candidates are available.
         """
         if ts.state != "memory":
+            logger.debug(
+                "(replicate, %s, %s) rejected: ts.state = %s", ts, candidates, ts.state
+            )
             return None
         if candidates is None:
             candidates = self.scheduler.running.copy()
@@ -207,6 +210,9 @@ def _find_recipient(
         candidates -= ts.who_has
         candidates -= pending_repl
         if not candidates:
+            logger.debug(
+                "(replicate, %s, %s) rejected: no valid candidates", ts, candidates
+            )
             return None
 
         # Select candidate with the lowest memory usage
@@ -230,6 +236,9 @@ def _find_dropper(
             drops), or None if no eligible candidates are available.
         """
         if len(ts.who_has) - len(pending_drop) < 2:
+            logger.debug(
+                "(drop, %s, %s) rejected: less than 2 replicas exist", ts, candidates
+            )
             return None
         if candidates is None:
             candidates = ts.who_has.copy()
@@ -238,6 +247,7 @@ def _find_dropper(
         candidates -= pending_drop
         candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
         if not candidates:
+            logger.debug("(drop, %s, %s) rejected: no valid candidates", ts, candidates)
             return None
 
         # Select candidate with the highest memory usage.
diff --git a/distributed/tests/test_active_memory_manager.py b/distributed/tests/test_active_memory_manager.py
index 80942cf0c19..d742850f87c 100644
--- a/distributed/tests/test_active_memory_manager.py
+++ b/distributed/tests/test_active_memory_manager.py
@@ -70,7 +70,7 @@ async def test_no_policies(c, s, a, b):
     s.extensions["amm"].run_once()
 
 
-@gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("drop"))
+@gen_cluster(nthreads=[("", 1)] * 4, client=True, config=demo_config("drop", n=5))
 async def test_drop(c, s, *workers):
     with captured_amm_logger() as logs:
         s.extensions["amm"].run_once()
@@ -82,7 +82,11 @@ async def test_drop(c, s, *workers):
     # Also test the extension handler
     with captured_amm_logger() as logs:
         s.extensions["amm"].run_once()
-    assert logs.getvalue() == "Enacting suggestions for 1 tasks\n"
+    assert logs.getvalue() == (
+        "(drop, <TaskState 'x' memory>, None) rejected: less than 2 replicas exist\n"
+        "(drop, <TaskState 'x' memory>, None) rejected: less than 2 replicas exist\n"
+        "Enacting suggestions for 1 tasks\n"
+    )
     while len(s.tasks["x"].who_has) > 1:
         await asyncio.sleep(0.01)
     # The last copy is never dropped even if the policy asks so
@@ -570,7 +574,7 @@ def run(self):
         self.manager.policies.remove(self)
 
 
-async def _tensordot_stress(c):
+async def tensordot_stress(c):
     da = pytest.importorskip("dask.array")
 
     rng = da.random.RandomState(0)
@@ -580,7 +584,7 @@
 
 
 @pytest.mark.slow
-@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371")
+@pytest.mark.avoid_ci(reason="distributed#5371")
 @gen_cluster(
     client=True,
     nthreads=[("", 1)] * 4,
@@ -592,7 +596,6 @@
             {"class": "distributed.tests.test_active_memory_manager.DropEverything"},
         ],
     },
-    timeout=120,
 )
 async def test_drop_stress(c, s, *nannies):
     """A policy which suggests dropping everything won't break a running computation,
@@ -600,11 +603,11 @@ async def test_drop_stress(c, s, *nannies):
 
     See also: test_ReduceReplicas_stress
     """
-    await _tensordot_stress(c)
+    await tensordot_stress(c)
 
 
 @pytest.mark.slow
-@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5371")
+@pytest.mark.avoid_ci(reason="distributed#5371")
 @gen_cluster(
     client=True,
     nthreads=[("", 1)] * 4,
@@ -616,11 +619,10 @@ async def test_drop_stress(c, s, *nannies):
             {"class": "distributed.active_memory_manager.ReduceReplicas"},
         ],
     },
-    timeout=120,
 )
 async def test_ReduceReplicas_stress(c, s, *nannies):
     """Running ReduceReplicas compulsively won't break a running computation. Unlike
     test_drop_stress above, this test does not stop running after a few seconds - the
     policy must not disrupt the computation too much.
     """
-    await _tensordot_stress(c)
+    await tensordot_stress(c)
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index a2f43e83e55..4b938854f49 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -1568,17 +1568,18 @@ async def test_worker_listens_on_same_interface_by_default(cleanup, Worker):
 
 
 @gen_cluster(client=True)
 async def test_close_gracefully(c, s, a, b):
     futures = c.map(slowinc, range(200), delay=0.1)
-    while not b.data:
-        await asyncio.sleep(0.1)
+    while not b.data:
+        await asyncio.sleep(0.01)
     mem = set(b.data)
-    proc = [ts for ts in b.tasks.values() if ts.state == "executing"]
+    proc = {ts for ts in b.tasks.values() if ts.state == "executing"}
+    assert proc
 
     await b.close_gracefully()
 
     assert b.status == Status.closed
     assert b.address not in s.workers
-    assert mem.issubset(set(a.data))
+    assert mem.issubset(a.data.keys())
     for ts in proc:
         assert ts.state in ("executing", "memory")
diff --git a/distributed/worker.py b/distributed/worker.py
index 2d02454598d..90904a28472 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1847,7 +1847,7 @@ def handle_remove_replicas(self, keys, stimulus_id):
                 {"op": "add-keys", "keys": rejected, "stimulus_id": stimulus_id}
             )
 
-        self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
+        self.transitions(recommendations, stimulus_id=stimulus_id)
 
         return "OK"
 
@@ -1889,7 +1889,6 @@ def handle_acquire_replicas(
         self, comm=None, keys=None, priorities=None, who_has=None, stimulus_id=None
     ):
         recommendations = {}
-        scheduler_msgs = []
         for k in keys:
             ts = self.ensure_task_exists(
                 k,
@@ -1900,9 +1899,6 @@ def handle_acquire_replicas(
             recommendations[ts] = "fetch"
 
         self.update_who_has(who_has, stimulus_id=stimulus_id)
-
-        for msg in scheduler_msgs:
-            self.batched_stream.send(msg)
         self.transitions(recommendations, stimulus_id=stimulus_id)
 
     def ensure_task_exists(
@@ -2680,7 +2676,7 @@ def ensure_communicating(self):
             self.comm_nbytes += total_nbytes
             self.in_flight_workers[worker] = to_gather
             recommendations = {self.tasks[d]: ("flight", worker) for d in to_gather}
-            self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
+            self.transitions(recommendations, stimulus_id=stimulus_id)
 
             self.loop.add_callback(
                 self.gather_dep,
@@ -3030,9 +3026,7 @@ async def gather_dep(
                         )
                         recommendations[ts] = "fetch"
                 del data, response
-                self.transitions(
-                    recommendations=recommendations, stimulus_id=stimulus_id
-                )
+                self.transitions(recommendations, stimulus_id=stimulus_id)
                 self.ensure_computing()
 
             if not busy:
@@ -3104,7 +3098,7 @@ def update_who_has(self, who_has, *, stimulus_id):
                         self.has_what[worker].add(dep)
                         self.pending_data_per_worker[worker].append(dep_ts.key)
 
-            self.transitions(recommendations=recommendations, stimulus_id=stimulus_id)
+            self.transitions(recommendations, stimulus_id=stimulus_id)
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
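
Reviewer note: the rejection messages added in active_memory_manager.py are emitted at
DEBUG level, so they stay invisible under the default logging configuration. A minimal
sketch for surfacing them in a local session, assuming the module defines its logger
with the usual logging.getLogger(__name__) convention (so its name would be
"distributed.active_memory_manager"):

    import logging

    # Install a root handler if none is configured yet, then lower the threshold
    # of the AMM logger only. The logger name is an assumption derived from the
    # module path distributed/active_memory_manager.py.
    logging.basicConfig()
    logging.getLogger("distributed.active_memory_manager").setLevel(logging.DEBUG)

With this in place, each AMM iteration should print lines in the same format the
updated test_drop asserts on, e.g.
(drop, <TaskState 'x' memory>, None) rejected: less than 2 replicas exist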