From 338d0be2e50b5458ef019ec8ce7377bda3097ef1 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 24 Jan 2022 14:59:59 -0700 Subject: [PATCH 1/5] Defer pandas import on worker in P2P shuffle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In #5688, we discovered that #5520 was increasing worker unmanaged memory usage by ~10MiB at startup (on my mac). I suspect that this came from importing pandas. The shuffle extension already worked if pandas wasn't available: it just wouldn't install itself on the worker. However, to check if pandas was available, it was importing pandas—so if pandas _was_ available, every worker would have to spend the time and memory to import it at startup, even if it wasn't used at all. With this PR, all use of pandas is deferred. There's also now an explicit test that importing the shuffle does not import pandas. --- distributed/shuffle/__init__.py | 13 ++------ distributed/shuffle/shuffle.py | 5 ++- distributed/shuffle/shuffle_extension.py | 6 ++-- .../shuffle/tests/test_no_pandas_import.py | 32 +++++++++++++++++++ distributed/worker.py | 7 ++-- 5 files changed, 45 insertions(+), 18 deletions(-) create mode 100644 distributed/shuffle/tests/test_no_pandas_import.py diff --git a/distributed/shuffle/__init__.py b/distributed/shuffle/__init__.py index a431c5bddfd..d530f9b679d 100644 --- a/distributed/shuffle/__init__.py +++ b/distributed/shuffle/__init__.py @@ -1,16 +1,7 @@ -try: - import pandas -except ImportError: - SHUFFLE_AVAILABLE = False -else: - del pandas - SHUFFLE_AVAILABLE = True - - from .shuffle import rearrange_by_column_p2p - from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension +from .shuffle import rearrange_by_column_p2p +from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension __all__ = [ - "SHUFFLE_AVAILABLE", "rearrange_by_column_p2p", "ShuffleId", "ShuffleMetadata", diff --git a/distributed/shuffle/shuffle.py b/distributed/shuffle/shuffle.py index e30ddde746c..c99b4ca8ba6 100644 --- a/distributed/shuffle/shuffle.py +++ b/distributed/shuffle/shuffle.py @@ -3,7 +3,6 @@ from typing import TYPE_CHECKING from dask.base import tokenize -from dask.dataframe import DataFrame from dask.delayed import Delayed, delayed from dask.highlevelgraph import HighLevelGraph @@ -12,6 +11,8 @@ if TYPE_CHECKING: import pandas as pd + from dask.dataframe import DataFrame + def get_ext() -> ShuffleWorkerExtension: from distributed import get_worker @@ -53,6 +54,8 @@ def rearrange_by_column_p2p( column: str, npartitions: int | None = None, ): + from dask.dataframe import DataFrame + npartitions = npartitions or df.npartitions token = tokenize(df, column, npartitions) diff --git a/distributed/shuffle/shuffle_extension.py b/distributed/shuffle/shuffle_extension.py index e5e0baaf7bc..8f13480b91d 100644 --- a/distributed/shuffle/shuffle_extension.py +++ b/distributed/shuffle/shuffle_extension.py @@ -6,12 +6,12 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, NewType -import pandas as pd - from distributed.protocol import to_serialize from distributed.utils import sync if TYPE_CHECKING: + import pandas as pd + from distributed.worker import Worker ShuffleId = NewType("ShuffleId", str) @@ -103,6 +103,8 @@ async def add_partition(self, data: pd.DataFrame) -> None: await asyncio.gather(*tasks) def get_output_partition(self, i: int) -> pd.DataFrame: + import pandas as pd + assert self.transferred, "`get_output_partition` called before barrier task" assert self.metadata.worker_for(i) == self.worker.address, ( diff --git a/distributed/shuffle/tests/test_no_pandas_import.py b/distributed/shuffle/tests/test_no_pandas_import.py new file mode 100644 index 00000000000..a321d273df1 --- /dev/null +++ b/distributed/shuffle/tests/test_no_pandas_import.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import subprocess +import sys + +import_check_code = """ +import sys + +current_pandas_modules = [m for m in sys.modules if m.startswith("pandas")] +assert ( + not current_pandas_modules +), "pandas is already imported at startup: " + "\\n".join(current_pandas_modules) + +# Make pandas un-importable +sys.modules["pandas"] = None +# "if the value is None, then a ModuleNotFoundError is raised" +# https://docs.python.org/3.6/reference/import.html#the-module-cache + +import distributed.shuffle # noqa: F401 +""" + + +def test_import_no_pandas(): + p = subprocess.run( + [sys.executable], + input=import_check_code, + text=True, + timeout=10, + ) + assert ( + p.returncode == 0 + ), "Importing the shuffle extension without pandas failed. See logs for details." diff --git a/distributed/worker.py b/distributed/worker.py index 72d3c93f5e3..19adedfcf0a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -44,7 +44,7 @@ typename, ) -from . import comm, preloading, profile, shuffle, system, utils +from . import comm, preloading, profile, system, utils from .batched import BatchedSend from .comm import Comm, connect, get_address_host from .comm.addressing import address_from_user_args, parse_address @@ -67,6 +67,7 @@ from .protocol import pickle, to_serialize from .pubsub import PubSubWorkerExtension from .security import Security +from .shuffle import ShuffleWorkerExtension from .sizeof import safe_sizeof as sizeof from .threadpoolexecutor import ThreadPoolExecutor from .threadpoolexecutor import secede as tpe_secede @@ -115,9 +116,7 @@ # Worker.status subsets RUNNING = {Status.running, Status.paused, Status.closing_gracefully} -DEFAULT_EXTENSIONS: list[type] = [PubSubWorkerExtension] -if shuffle.SHUFFLE_AVAILABLE: - DEFAULT_EXTENSIONS.append(shuffle.ShuffleWorkerExtension) +DEFAULT_EXTENSIONS: list[type] = [PubSubWorkerExtension, ShuffleWorkerExtension] DEFAULT_METRICS: dict[str, Callable[[Worker], Any]] = {} From ce8e9d24f3cb3b1ff2a3c8ebef9d46d657db5aa7 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 27 Jan 2022 18:22:16 -0700 Subject: [PATCH 2/5] Test unnecessary imports in nanny tests xfail'd until https://github.com/dask/distributed/pull/5724 --- .../shuffle/tests/test_no_pandas_import.py | 32 ------------------- distributed/tests/test_nanny.py | 19 +++++++++++ 2 files changed, 19 insertions(+), 32 deletions(-) delete mode 100644 distributed/shuffle/tests/test_no_pandas_import.py diff --git a/distributed/shuffle/tests/test_no_pandas_import.py b/distributed/shuffle/tests/test_no_pandas_import.py deleted file mode 100644 index a321d273df1..00000000000 --- a/distributed/shuffle/tests/test_no_pandas_import.py +++ /dev/null @@ -1,32 +0,0 @@ -from __future__ import annotations - -import subprocess -import sys - -import_check_code = """ -import sys - -current_pandas_modules = [m for m in sys.modules if m.startswith("pandas")] -assert ( - not current_pandas_modules -), "pandas is already imported at startup: " + "\\n".join(current_pandas_modules) - -# Make pandas un-importable -sys.modules["pandas"] = None -# "if the value is None, then a ModuleNotFoundError is raised" -# https://docs.python.org/3.6/reference/import.html#the-module-cache - -import distributed.shuffle # noqa: F401 -""" - - -def test_import_no_pandas(): - p = subprocess.run( - [sys.executable], - input=import_check_code, - text=True, - timeout=10, - ) - assert ( - p.returncode == 0 - ), "Importing the shuffle extension without pandas failed. See logs for details." diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index dc3eba86e58..85b30204090 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -4,6 +4,7 @@ import multiprocessing as mp import os import random +import sys from contextlib import suppress from time import sleep from unittest import mock @@ -610,3 +611,21 @@ async def test_environ_plugin(c, s, a, b): assert results[a.worker_address] == "123" assert results[b.worker_address] == "123" assert results[n.worker_address] == "123" + + +@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5723") +@gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)]) +async def test_no_unnecessary_imports_on_worker(c, s, a): + """ + Regression test against accidentally importing unnecessary modules at worker startup. + + Importing modules like pandas slows down worker startup, especially if workers are + loading their software environment from NFS or other non-local filesystems. + It also slightly increases memory footprint. + """ + + def assert_no_pandas(dask_worker): + assert "pandas" not in sys.modules + + await c.wait_for_workers(1) + await c.run(assert_no_pandas) From 6a6fc6473dc65d6f8cb0490bc31503c8a97ddc6f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 28 Jan 2022 11:00:09 +0000 Subject: [PATCH 3/5] Update distributed/tests/test_nanny.py --- distributed/tests/test_nanny.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 85b30204090..8581441ef61 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -614,8 +614,9 @@ async def test_environ_plugin(c, s, a, b): @pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5723") +@pytest.mark.parametrize("modname", ["numpy", "pandas"]) @gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)]) -async def test_no_unnecessary_imports_on_worker(c, s, a): +async def test_no_unnecessary_imports_on_worker(c, s, a, modname): """ Regression test against accidentally importing unnecessary modules at worker startup. From 31c33c6414d1806098687e5680b2d618ef3486d4 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 28 Jan 2022 11:00:15 +0000 Subject: [PATCH 4/5] Update distributed/tests/test_nanny.py --- distributed/tests/test_nanny.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 8581441ef61..5dd7ff21c45 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -625,8 +625,8 @@ async def test_no_unnecessary_imports_on_worker(c, s, a, modname): It also slightly increases memory footprint. """ - def assert_no_pandas(dask_worker): - assert "pandas" not in sys.modules + def assert_no_import(dask_worker): + assert modname not in sys.modules await c.wait_for_workers(1) - await c.run(assert_no_pandas) + await c.run(assert_no_import) From 9d0cd6537f05d26b63d359f7147f9c7130c064bb Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 28 Jan 2022 16:12:13 +0000 Subject: [PATCH 5/5] Update distributed/tests/test_nanny.py --- distributed/tests/test_nanny.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 5dd7ff21c45..6dcf72c5214 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -613,8 +613,17 @@ async def test_environ_plugin(c, s, a, b): assert results[n.worker_address] == "123" -@pytest.mark.xfail(reason="https://github.com/dask/distributed/issues/5723") -@pytest.mark.parametrize("modname", ["numpy", "pandas"]) +@pytest.mark.parametrize( + "modname", + [ + pytest.param( + "numpy", + marks=pytest.mark.xfail(reason="distributed#5723, distributed#5729"), + ), + "scipy", + pytest.param("pandas", marks=pytest.mark.xfail(reason="distributed#5723")), + ], +) @gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)]) async def test_no_unnecessary_imports_on_worker(c, s, a, modname): """