Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions fme/core/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,14 @@ def test_ZarrWriter_already_initialized(tmp_path):
chunks={"time": 2},
)

def _mock_initialize_zarr(**kwargs):
# The logic of initialize_store requires _initialize_zarr to at least
# create a dummy directory at the path, so we mock that aspect here.
os.makedirs(kwargs["path"])

with patch("fme.core.writer._initialize_zarr") as mock_initialize:
mock_initialize.side_effect = _mock_initialize_zarr

# Initialize the store for the first time
writer.initialize_store(data_dtype="f4", data_vars=["var"])

Expand Down Expand Up @@ -443,3 +450,12 @@ def test_ZarrWriter_initialize_error_when_no_data_vars(tmp_path):

with pytest.raises(ValueError, match="data_vars must be provided"):
writer_no_vars.initialize_store(data_dtype="f4")


def test_ZarrWriter_raises_FileExistsError_when_path_exists_and_mode_w_minus(tmp_path):
"""ZarrWriter raises FileExistsError if path exists and mode is 'w-'."""
path = os.path.join(tmp_path, "test.zarr")
os.makedirs(path)

with pytest.raises(FileExistsError, match="already exists"):
_create_writer(path=path, n_times=4, chunks={"time": 2}, mode="w-")
46 changes: 30 additions & 16 deletions fme/core/writer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import logging
import os
import tempfile
from collections.abc import Mapping
from typing import Literal

import cftime
import fsspec
import fsspec.generic
import numpy as np
import xarray as xr
import zarr
Expand Down Expand Up @@ -282,6 +285,11 @@ def __init__(

if mode == "a" or mode == "r+":
self._store_initialized = True if self._path_exists() else False
elif self._dist.is_root() and mode == "w-" and self._path_exists():
raise FileExistsError(
f"Path {self._path!r} already exists and cannot be overwritten "
f"since mode is {mode!r}."
)
else:
self._store_initialized = False

Expand Down Expand Up @@ -365,22 +373,28 @@ def initialize_store(
"data_vars must be provided either to ZarrWriter or to "
"initialize()"
)
_initialize_zarr(
path=self._path,
vars=data_vars,
dim_sizes=dim_sizes,
chunks=self._chunks,
shards=self._shards,
coords=self._coords,
dim_names=self._dims,
dtype=data_dtype,
time_units=self._time_units,
time_calendar=self._time_calendar,
nondim_coords=self._nondim_coords,
array_attributes=self._array_attributes,
group_attributes=self._group_attributes,
mode=self._mode,
)
# Initialize zarr store locally in a temporary directory and then
# immediately rsync it to the final path. This avoids the need for
# users to have delete access on the destination filesystem.
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = os.path.join(temp_dir, "temp.zarr")
_initialize_zarr(
path=temp_path,
vars=data_vars,
dim_sizes=dim_sizes,
chunks=self._chunks,
shards=self._shards,
coords=self._coords,
dim_names=self._dims,
dtype=data_dtype,
time_units=self._time_units,
time_calendar=self._time_calendar,
nondim_coords=self._nondim_coords,
array_attributes=self._array_attributes,
group_attributes=self._group_attributes,
mode=self._mode,
)
fsspec.generic.rsync(temp_path, self._path)
self._store_initialized = True
self._dist.barrier()
else:
Expand Down