diff --git a/asv_bench/benchmarks/dataset_io.py b/asv_bench/benchmarks/dataset_io.py
index 0956be67..38c66d5d 100644
--- a/asv_bench/benchmarks/dataset_io.py
+++ b/asv_bench/benchmarks/dataset_io.py
@@ -761,3 +761,197 @@ class IOReadCustomEngine:
Test with and without dask.
"""
xr.open_dataset(None, engine=self.engine, chunks=chunks)
+
+
+# Zarr I/O benchmarks
+class IOZarr(IOSingleNetCDF):
+ """
+    Benchmarks for reading and writing zarr stores with xarray.
+ These benchmarks cover various aspects of zarr operations, including:
+ - Basic I/O
+ - Metadata consolidation
+ - Group operations
+ - Rechunking
+ - Parallel writing and reading
+ - Compression (writing and reading)
+ - Simulated cloud storage operations
+
+ Dependencies:
+ - xarray
+ - zarr
+ - dask
+ - numpy
+    - pandas
+    - rechunker (for the rechunking benchmark)
+    - fsspec (for cloud storage simulation)
+
+ To run these benchmarks:
+ 1. Ensure all dependencies are installed.
+ 2. Use the Airspeed Velocity (asv) tool: `asv run`
+ 3. To run a specific benchmark: `asv run --bench IOZarr`
+
+    Note: These benchmarks may consume significant system resources,
+ especially memory and disk space. Ensure you have sufficient
+ resources available before running.
+ """
+
+ def setup(self):
+ """
+ Set up the benchmark environment by creating datasets and zarr stores.
+ """
+ requires_dask()
+ self.make_ds()
+ self.make_large_ds()
+ self.zarr_path = "test_zarr_store"
+ self.large_zarr_path = "test_large_zarr_store"
+
+    def teardown(self):
+        """
+        Clean up all zarr stores created by the benchmarks, including
+        derived stores such as *_rechunked and *_compressed_*.
+        """
+        import glob
+        import shutil
+
+        for path in glob.glob(f"{self.zarr_path}*") + glob.glob(f"{self.large_zarr_path}*"):
+            shutil.rmtree(path, ignore_errors=True)
+
+ def make_large_ds(self):
+ """
+ Create a larger dataset for more realistic benchmarks.
+ """
+ self.large_ds = xr.Dataset(
+ {
+ "temp": (("time", "lat", "lon"), np.random.rand(1000, 180, 360)),
+ "precip": (("time", "lat", "lon"), np.random.rand(1000, 180, 360)),
+ },
+ coords={
+ "time": pd.date_range("2000-01-01", periods=1000),
+ "lat": np.linspace(-90, 90, 180),
+ "lon": np.linspace(-180, 180, 360),
+ },
+ )
+
+ def time_zarr_consolidate_metadata(self):
+ """Benchmark zarr metadata consolidation."""
+ import zarr
+        # Write the dataset (mode="w" so repeated asv iterations can overwrite)
+        self.large_ds.to_zarr(self.large_zarr_path, mode="w")
+
+ # Consolidate metadata
+ zarr.consolidate_metadata(self.large_zarr_path)
+
+ # Read the consolidated metadata
+ _ = zarr.open_consolidated(self.large_zarr_path)
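+
+        # Note: to_zarr(..., consolidated=True) would perform the write and
+        # the consolidation in one call; the explicit
+        # zarr.consolidate_metadata step is kept here so the zarr-level API
+        # is exercised directly.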
+
+ def time_zarr_group_operations(self):
+ """Benchmark zarr group operations."""
+ import zarr
+
+        # Create (or reset) the root zarr group
+        zarr.open_group(self.zarr_path, mode="w")
+
+        # Write datasets to different groups within the same store
+        # (to_zarr takes a store path plus the ``group`` keyword; it does
+        # not accept a zarr.Group object directly)
+        self.ds.to_zarr(self.zarr_path, group="group1", mode="w")
+        self.large_ds.to_zarr(self.zarr_path, group="group2", mode="w")
+
+        # Read each group back
+        _ = xr.open_zarr(self.zarr_path, group="group1")
+        _ = xr.open_zarr(self.zarr_path, group="group2")
+
+ def time_zarr_rechunk(self):
+ """Benchmark zarr rechunking operations."""
+        import shutil
+
+        import zarr
+        from rechunker import rechunk
+
+        # Write the dataset with initial chunking
+        self.large_ds.chunk({"time": 100, "lat": 45, "lon": 90}).to_zarr(self.large_zarr_path, mode="w")
+
+        # Remove stores left over from a previous iteration, then rechunk.
+        # rechunker takes per-variable target chunks, a memory budget, and
+        # target/temp stores; ``None`` copies an array (here the
+        # coordinates) without rechunking.
+        shutil.rmtree(f"{self.large_zarr_path}_rechunked", ignore_errors=True)
+        shutil.rmtree(f"{self.large_zarr_path}_rechunk_tmp", ignore_errors=True)
+        chunks = {"time": 250, "lat": 30, "lon": 60}
+        rechunked = rechunk(
+            zarr.open(self.large_zarr_path),
+            target_chunks={"temp": chunks, "precip": chunks, "time": None, "lat": None, "lon": None},
+            max_mem="256MB",
+            target_store=f"{self.large_zarr_path}_rechunked",
+            temp_store=f"{self.large_zarr_path}_rechunk_tmp",
+        )
+        rechunked.execute()
+
+        # Read the rechunked dataset
+        _ = xr.open_zarr(f"{self.large_zarr_path}_rechunked")
+
+ def time_zarr_parallel_write(self):
+ """Benchmark parallel writing to zarr using dask."""
+        import dask
+        import dask.array
+
+        # Wrap the in-memory values in a chunked dask array
+        arr = dask.array.from_array(self.large_ds["temp"].values, chunks=(100, 45, 90))
+
+        # Create a new dataset backed by the dask array
+        ds = xr.Dataset({"temp": (("time", "lat", "lon"), arr)}, coords=self.large_ds.coords)
+
+        # Write to zarr in parallel using the threaded scheduler
+        with dask.config.set(scheduler="threads", num_workers=4):
+            ds.to_zarr(f"{self.zarr_path}_parallel", mode="w", compute=True)
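+
+        # An equivalent, more idiomatic construction is
+        # self.large_ds[["temp"]].chunk({"time": 100, "lat": 45, "lon": 90}),
+        # which builds the dask-backed dataset without using dask.array directly.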
+
+ def time_zarr_parallel_read(self):
+ """Benchmark parallel reading from zarr using dask."""
+ import dask
+
+        # Write the large dataset to zarr
+        self.large_ds.to_zarr(f"{self.zarr_path}_for_parallel_read", mode="w")
+
+        # Read from zarr in parallel
+        with dask.config.set(scheduler="threads", num_workers=4):
+            ds = xr.open_zarr(
+                f"{self.zarr_path}_for_parallel_read",
+                chunks={"time": 100, "lat": 45, "lon": 90},
+            )
+            _ = ds.compute()
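+
+        # ``chunks=`` makes open_zarr return dask-backed variables (one
+        # task per chunk); .compute() then loads every chunk, using up to
+        # four threads under the scheduler configured above.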
+
+    @parameterized(["compressor"], [None, "blosc", "zlib"])
+    def time_zarr_write_with_compression(self, compressor):
+        """Benchmark writing to zarr with different compression settings."""
+        import zarr
+
+        if compressor == "blosc":
+            codec = zarr.Blosc(cname="zstd", clevel=3, shuffle=2)  # 2 = bit-shuffle
+        elif compressor == "zlib":
+            codec = zarr.Zlib(level=3)
+        else:
+            codec = None
+
+        # Compression is configured per variable through ``encoding``;
+        # to_zarr has no top-level ``compressor`` argument.
+        encoding = {var: {"compressor": codec} for var in self.large_ds.data_vars}
+        self.large_ds.to_zarr(
+            f"{self.zarr_path}_compressed_{compressor}", mode="w", encoding=encoding
+        )
+
+    @parameterized(["compressor"], [None, "blosc", "zlib"])
+    def time_zarr_read_with_compression(self, compressor):
+        """Benchmark reading from zarr with different compression settings."""
+        import zarr
+
+        if compressor == "blosc":
+            codec = zarr.Blosc(cname="zstd", clevel=3, shuffle=2)  # 2 = bit-shuffle
+        elif compressor == "zlib":
+            codec = zarr.Zlib(level=3)
+        else:
+            codec = None
+
+        # First, write the data with the specified compressor (note that
+        # the write is included in the timed region)
+        encoding = {var: {"compressor": codec} for var in self.large_ds.data_vars}
+        self.large_ds.to_zarr(
+            f"{self.zarr_path}_compressed_{compressor}", mode="w", encoding=encoding
+        )
+
+        # Then, read it back
+        _ = xr.open_zarr(f"{self.zarr_path}_compressed_{compressor}").load()
+
+ def time_zarr_cloud_storage(self):
+ """Benchmark zarr operations with simulated cloud storage."""
+ try:
+ import fsspec
+ except ImportError:
+ # Skip this benchmark if fsspec is not installed
+ return
+
+        # Create an in-memory filesystem to simulate cloud storage and wrap
+        # a path in it as a mutable mapping that zarr can use as a store
+        fs = fsspec.filesystem("memory")
+        store = fs.get_mapper(f"{self.zarr_path}_cloud")
+
+        # Write to simulated cloud storage
+        self.large_ds.to_zarr(store, mode="w")
+
+        # Read from simulated cloud storage
+        _ = xr.open_zarr(store).load()
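+
+        # Note: the "memory" filesystem lives in process memory, so this
+        # measures zarr/fsspec overhead without the network latency a real
+        # cloud store would add.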