Thanks to @rsignell-usgs's script, I've been experimenting with netCDF-to-Zarr conversion, writing the output to S3. Is there any reference throughput data I could use to make sense of the measurements below? If anyone has transferred Zarr data to S3 or GCP before, I'd like to hear about your experience and any best practices for this kind of task, for example how to tune the Dask cluster to maximize throughput.
Dask configuration
- 1 worker
- 72 threads per worker
| Data size (GB) | Chunk size | Transfer time (s) | Throughput (Mb/s) |
|---|---|---|---|
| 5.1 | (1, 1032, 289, 288) | 285.2 | 146 |
| 5.1 | (1, 516, 289, 288) | 309.3 | 135 |
| 5.1 | (1, 258, 289, 288) | 350.7 | 119 |
| 5.1 | (1, 129, 289, 288) | 439.0 | 95 |
Dask configuration
- 2 workers on the same machine
- 72 threads per worker
| Data size (GB) | Chunk size | Transfer time (s) | Throughput (Mb/s) |
|---|---|---|---|
| 5.1 | (1, 1032, 289, 288) | 16 | 2611 |
| 5.1 | (1, 516, 289, 288) | 18 | 2321 |
| 5.1 | (1, 258, 289, 288) | 28 | 1492 |
| 5.1 | (1, 129, 289, 288) | 47 | 889 |
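For anyone checking the numbers: the throughput column in both tables can be reproduced from the data size and transfer time if "Mb" is read as megabits and 1 GB is taken as 1024 MB. That unit convention is an assumption inferred from the figures, not something stated above. A minimal sketch:

```python
# Reproduce the "Throughput (Mb/s)" column from data size and transfer time.
# Assumption: "Mb" means megabits, and 1 GB = 1024 MB.

DATA_SIZE_GB = 5.1


def throughput_mbps(size_gb, seconds):
    """Return throughput in megabits per second."""
    megabits = size_gb * 1024 * 8
    return megabits / seconds


# Transfer times (s) copied from the two tables, largest chunk first.
one_worker_times = [285.2, 309.3, 350.7, 439.0]
two_worker_times = [16, 18, 28, 47]

one_worker = [round(throughput_mbps(DATA_SIZE_GB, t)) for t in one_worker_times]
two_workers = [round(throughput_mbps(DATA_SIZE_GB, t)) for t in two_worker_times]

print(one_worker)   # [146, 135, 119, 95]
print(two_workers)  # [2611, 2321, 1492, 889]
```

Under that reading, the one-worker runs move roughly 12 to 18 MB/s and the two-worker runs roughly 110 to 330 MB/s.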
Here's my script:
import time
from pathlib import Path

import s3fs
import xarray as xr
from dask.distributed import Client

if __name__ == '__main__':
    # Local cluster in a single process: 1 worker with 72 threads
    client = Client(processes=False, n_workers=1, threads_per_worker=72)
    print(client)

    root_dir = Path("/glade/p_old/cesmLE/CESM-CAM5-BGC-LE/atm/proc/tseries/monthly/TS")
    CASE = 'b.e11.B20TRC5CNBDRD.f09_g16'
    list_1 = sorted(root_dir.glob("b.e11.B20TRC5CNBDRD.f09_g16.???.cam.h0.*"))

    # indices of special runs to remove from the original list.
    # These runs' outputs have additional variables, and/or have special time ranges
    indices = 0, 33, 34
    updated_list = [item for index, item in enumerate(list_1) if index not in indices]

    # combine='nested' is needed by recent xarray releases when concat_dim is given
    dset = xr.open_mfdataset(updated_list, combine='nested', concat_dim='ensemble')
    dset = dset.chunk({'ensemble': 1, 'time': 516})

    # Output: S3 Bucket
    f_zarr = f'zarr-test-bucket/test1/lens/{CASE}'

    # write data using xarray.to_zarr()
    fs = s3fs.S3FileSystem(anon=False)
    d = s3fs.S3Map(f_zarr, s3=fs)

    # time.clock() was removed in Python 3.8 and measured CPU time on Unix;
    # time.perf_counter() measures elapsed wall-clock time
    start = time.perf_counter()
    dset.to_zarr(store=d, mode='w')
    print(f'Time taken = {time.perf_counter() - start}')
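A note on the timing call: on Unix, `time.clock()` reported the CPU time of the current process rather than elapsed time, and it was removed in Python 3.8. For an I/O-bound upload the two can differ substantially. The sketch below records both wall-clock and CPU time around a block; `stopwatch` is a hypothetical helper name, and `time.sleep` is only a stand-in for the `dset.to_zarr(...)` call.

```python
import time
from contextlib import contextmanager


@contextmanager
def stopwatch():
    """Record wall-clock and CPU seconds spent inside the with-block."""
    result = {}
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    try:
        yield result
    finally:
        result["wall"] = time.perf_counter() - wall_start
        result["cpu"] = time.process_time() - cpu_start


with stopwatch() as elapsed:
    # Stand-in for dset.to_zarr(store=d, mode='w'): the process mostly waits.
    time.sleep(0.2)

print(f"wall = {elapsed['wall']:.2f} s, cpu = {elapsed['cpu']:.2f} s")
```

Reporting the wall-clock figure alongside the data size makes runs with different worker layouts easier to compare, since CPU time measured in the client process does not include work done in separate worker processes.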