From 3357fa34fe5f7c2d867d724ac0c35a3e2e6c669a Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 09:05:19 -0600 Subject: [PATCH 01/27] suppress Zarr V3 numcodecs warnings --- src/mdio/api/io.py | 18 ++++++++++-------- src/mdio/builder/xarray_builder.py | 5 ++++- src/mdio/core/zarr_io.py | 12 ++++++++++++ 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/mdio/api/io.py b/src/mdio/api/io.py index 2654be315..be7d00702 100644 --- a/src/mdio/api/io.py +++ b/src/mdio/api/io.py @@ -13,6 +13,7 @@ from xarray.backends.writers import to_zarr as xr_to_zarr from mdio.constants import ZarrFormat +from mdio.core.zarr_io import zarr_warnings_suppress_unstable_numcodecs_v3 from mdio.core.zarr_io import zarr_warnings_suppress_unstable_structs_v3 if TYPE_CHECKING: @@ -53,13 +54,14 @@ def open_mdio(input_path: UPath | Path | str, chunks: T_Chunks = None) -> xr_Dat storage_options = _normalize_storage_options(input_path) zarr_format = zarr.config.get("default_zarr_format") - return xr_open_zarr( - input_path.as_posix(), - chunks=chunks, - storage_options=storage_options, - mask_and_scale=zarr_format == ZarrFormat.V3, # off for v2, on for v3 - consolidated=zarr_format == ZarrFormat.V2, # on for v2, off for v3 - ) + with zarr_warnings_suppress_unstable_numcodecs_v3(): + return xr_open_zarr( + input_path.as_posix(), + chunks=chunks, + storage_options=storage_options, + mask_and_scale=zarr_format == ZarrFormat.V3, # off for v2, on for v3 + consolidated=zarr_format == ZarrFormat.V2, # on for v2, off for v3 + ) def to_mdio( # noqa: PLR0913 @@ -90,7 +92,7 @@ def to_mdio( # noqa: PLR0913 storage_options = _normalize_storage_options(output_path) zarr_format = zarr.config.get("default_zarr_format") - with zarr_warnings_suppress_unstable_structs_v3(): + with zarr_warnings_suppress_unstable_structs_v3() and zarr_warnings_suppress_unstable_numcodecs_v3(): xr_to_zarr( dataset, store=output_path.as_posix(), # xarray doesn't like URI when file:// is protocol diff --git a/src/mdio/builder/xarray_builder.py b/src/mdio/builder/xarray_builder.py index 58501cba9..1cb2b9095 100644 --- a/src/mdio/builder/xarray_builder.py +++ b/src/mdio/builder/xarray_builder.py @@ -10,6 +10,7 @@ from zarr.codecs import BloscCodec from mdio.converters.type_converter import to_numpy_dtype +from mdio.core.zarr_io import zarr_warnings_suppress_unstable_numcodecs_v3 try: # zfpy is an optional dependency for ZFP compression @@ -150,7 +151,9 @@ def _compressor_to_encoding( kwargs["mode"] = compressor.mode.int_code if is_v2: return {"compressors": zfpy_ZFPY(**kwargs)} - return {"serializer": zarr_ZFPY(**kwargs), "compressors": None} + with zarr_warnings_suppress_unstable_numcodecs_v3(): + serializer = zarr_ZFPY(**kwargs) + return {"serializer": serializer, "compressors": None} def _get_fill_value(data_type: ScalarType | StructuredType | str) -> any: diff --git a/src/mdio/core/zarr_io.py b/src/mdio/core/zarr_io.py index 844ce761f..c3ecfb556 100644 --- a/src/mdio/core/zarr_io.py +++ b/src/mdio/core/zarr_io.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING from zarr.errors import UnstableSpecificationWarning +from zarr.errors import ZarrUserWarning if TYPE_CHECKING: from collections.abc import Generator @@ -21,3 +22,14 @@ def zarr_warnings_suppress_unstable_structs_v3() -> Generator[None, None, None]: yield finally: pass + + +@contextmanager +def zarr_warnings_suppress_unstable_numcodecs_v3() -> Generator[None, None, None]: + """Context manager to suppress Zarr V3 unstable 
numcodecs warning.""" + warn = r"Numcodecs codecs are not in the Zarr version 3 specification" + warnings.filterwarnings("ignore", message=warn, category=ZarrUserWarning) + try: + yield + finally: + pass From 2001434f581f52ede9957ecdd938b95ce3ace752 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 09:06:04 -0600 Subject: [PATCH 02/27] add statistical properties (mean, variance, std) to stats schema --- src/mdio/builder/schemas/v1/stats.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/mdio/builder/schemas/v1/stats.py b/src/mdio/builder/schemas/v1/stats.py index ccd12fc79..f4ec7d0b8 100644 --- a/src/mdio/builder/schemas/v1/stats.py +++ b/src/mdio/builder/schemas/v1/stats.py @@ -56,3 +56,18 @@ class SummaryStatistics(CamelCaseStrictModel): min: float = Field(..., description="The smallest value in the variable.") max: float = Field(..., description="The largest value in the variable.") histogram: Histogram = Field(..., description="Binned frequency distribution.") + + @property + def mean(self) -> float: + """Returns the mean of the data.""" + return self.sum / self.count + + @property + def variance(self) -> float: + """Returns the variance of the data.""" + return (self.sum_squares / self.count) - (self.mean**2) + + @property + def std(self) -> float: + """Returns the standard deviation of the data.""" + return self.variance**0.5 From 824bbe3e400a9cfde358360fb4a242e970210ff4 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 10:51:14 -0600 Subject: [PATCH 03/27] monkey-patch ZFP codec to fix bug temporarily --- src/mdio/optimize/patch.py | 42 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 src/mdio/optimize/patch.py diff --git a/src/mdio/optimize/patch.py b/src/mdio/optimize/patch.py new file mode 100644 index 000000000..e572d335c --- /dev/null +++ b/src/mdio/optimize/patch.py @@ -0,0 +1,42 @@ +"""Dask worker plugins for monkey patching ZFP due to bug. 
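+The upstream codec appears to mishandle chunks that are not C-contiguous
+(e.g. transposed views of a chunked array); the patched codec below therefore
+copies such chunks to contiguous memory before encoding.
+
+A minimal registration sketch (assuming the optional ``distributed`` package
+is installed):
+
+    >>> from distributed import Client
+    >>> from mdio.optimize.patch import MonkeyPatchZfpDaskPlugin
+    >>> client = Client(n_workers=1, threads_per_worker=1)
+    >>> client.register_plugin(MonkeyPatchZfpDaskPlugin())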
+ +We can remove this once the fix is upstreamed: +https://github.com/zarr-developers/numcodecs/issues/812 +https://github.com/zarr-developers/numcodecs/pull/811 +""" + +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +import numpy as np +from dask.distributed import Worker +from dask.distributed import WorkerPlugin +from numcodecs import blosc +from zarr.codecs import numcodecs + +if TYPE_CHECKING: + from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import Buffer + from zarr.core.buffer import NDBuffer + + +class ZFPY(numcodecs._NumcodecsArrayBytesCodec, codec_name="zfpy"): + """Monkey patch ZFP codec to make input array contiguous before encoding.""" + + async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer: + chunk_ndarray = chunk_data.as_ndarray_like() + if not chunk_ndarray.flags.c_contiguous: + chunk_ndarray = np.ascontiguousarray(chunk_ndarray) + out = await asyncio.to_thread(self._codec.encode, chunk_ndarray) + return chunk_spec.prototype.buffer.from_bytes(out) + + +class MonkeyPatchZfpDaskPlugin(WorkerPlugin): + """Monkey patch ZFP codec and disable Blosc threading for Dask workers.""" + + def setup(self, worker: Worker) -> None: # noqa: ARG002 + """Monkey patch ZFP codec and disable Blosc threading.""" + numcodecs._codecs.ZFPY = ZFPY + blosc.set_nthreads(1) From 816fe3eaecfab932e790038d34c1227480b8a246 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 10:52:46 -0600 Subject: [PATCH 04/27] add ZFP-based access pattern optimization utilities with Dask support --- src/mdio/__init__.py | 8 ++ src/mdio/optimize/__init__.py | 1 + src/mdio/optimize/access_pattern.py | 112 ++++++++++++++++++++++++++++ src/mdio/optimize/common.py | 86 +++++++++++++++++++++ 4 files changed, 207 insertions(+) create mode 100644 src/mdio/optimize/__init__.py create mode 100644 src/mdio/optimize/access_pattern.py create mode 100644 src/mdio/optimize/common.py diff --git a/src/mdio/__init__.py b/src/mdio/__init__.py index 5fed389c8..b01d3608f 100644 --- a/src/mdio/__init__.py +++ b/src/mdio/__init__.py @@ -1,11 +1,16 @@ """MDIO library.""" +from __future__ import annotations + from importlib import metadata from mdio.api.io import open_mdio from mdio.api.io import to_mdio from mdio.converters import mdio_to_segy from mdio.converters import segy_to_mdio +from mdio.optimize.access_pattern import OptimizedAccessPatternConfig +from mdio.optimize.access_pattern import optimize_access_patterns +from mdio.optimize.common import ZfpQuality try: __version__ = metadata.version("multidimio") @@ -19,4 +24,7 @@ "to_mdio", "mdio_to_segy", "segy_to_mdio", + "OptimizedAccessPatternConfig", + "optimize_access_patterns", + "ZfpQuality", ] diff --git a/src/mdio/optimize/__init__.py b/src/mdio/optimize/__init__.py new file mode 100644 index 000000000..2ae26b9eb --- /dev/null +++ b/src/mdio/optimize/__init__.py @@ -0,0 +1 @@ +"""Module for optimizing datasets for various access patterns / LOD etc.""" diff --git a/src/mdio/optimize/access_pattern.py b/src/mdio/optimize/access_pattern.py new file mode 100644 index 000000000..86846e0ed --- /dev/null +++ b/src/mdio/optimize/access_pattern.py @@ -0,0 +1,112 @@ +"""Optimize MDIO seismic datasets for fast access patterns using ZFP compression and Dask. + +This module provides tools to create compressed, rechunked transpose views of seismic data for efficient +access along dataset dimensions. 
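+In practice this writes additional ``fast_*`` variables (for example
+``fast_inline``) alongside the original variable, each chunked so that a
+single cross-section touches only a few chunks.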
It uses configurable ZFP compression based on data statistics and
+supports parallel processing with Dask Distributed.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+from pydantic import BaseModel
+from pydantic import Field
+from xarray import Dataset as xr_Dataset
+
+from mdio import to_mdio
+from mdio.builder.schemas.v1.stats import SummaryStatistics
+from mdio.optimize.common import apply_zfp_encoding
+from mdio.optimize.common import get_or_create_client
+from mdio.optimize.common import get_zfp_encoding
+from mdio.optimize.patch import MonkeyPatchZfpDaskPlugin
+
+if TYPE_CHECKING:
+    from mdio.optimize.common import ZfpQuality
+
+
+logger = logging.getLogger(__name__)
+
+
+class OptimizedAccessPatternConfig(BaseModel):
+    """Configuration for fast access pattern optimization."""
+
+    quality: ZfpQuality = Field(..., description="Compression quality.")
+    access_patterns: dict[str, dict[str, tuple[int, ...]]] = Field(..., description="New variables and chunk sizes.")
+    processing_chunks: dict[str, int] = Field(..., description="Chunk sizes for processing the original variable.")
+
+
+def optimize_access_patterns(
+    dataset: xr_Dataset,
+    config: OptimizedAccessPatternConfig,
+    n_workers: int = 1,
+    threads_per_worker: int = 1,
+) -> None:
+    """Optimize an MDIO dataset for fast access along chosen dimensions.
+
+    Optimize an MDIO dataset by creating compressed, rechunked views for fast access along
+    configurable dimensions, then append them to the existing MDIO file.
+
+    This uses ZFP compression with a tolerance based on the data standard deviation and the provided
+    quality level. Requires Dask Distributed for parallel execution. It will try to grab an existing
+    distributed.Client or create its own. An existing Client will be kept running after optimization.
+
+    Args:
+        dataset: MDIO Dataset containing the seismic data.
+        config: Configuration object with quality, access patterns, and processing chunks.
+        n_workers: Number of Dask workers. Default is 1.
+        threads_per_worker: Threads per Dask worker. Default is 1.
+
+    Raises:
+        ValueError: If required attrs/stats are missing or the dataset is invalid.
+
+    Examples:
+        For Post-Stack 3D seismic data, we can optimize the inline, crossline, and time dimensions.
+
+        >>> from mdio import optimize_access_patterns, OptimizedAccessPatternConfig, ZfpQuality
+        >>> from mdio import open_mdio
+        >>>
+        >>> conf = OptimizedAccessPatternConfig(
+        >>>     quality=ZfpQuality.LOW,
+        >>>     access_patterns={
+        >>>         "fast_inline": {"chunks": (4, 512, 512)},
+        >>>         "fast_crossline": {"chunks": (512, 4, 512)},
+        >>>         "fast_time": {"chunks": (512, 512, 4)},
+        >>>     },
+        >>>     processing_chunks={"inline": 512, "crossline": 512, "time": 512},
+        >>> )
+        >>>
+        >>> ds = open_mdio("/path/to/seismic.mdio")
+        >>> optimize_access_patterns(ds, conf, n_workers=4)
+    """
+    # Extract and validate key attrs
+    attrs = dataset.attrs.get("attributes", {})
+    var_name = attrs.get("defaultVariableName")
+    if not var_name:
+        msg = "Default variable name is missing from dataset attributes."
+        raise ValueError(msg)
+
+    variable = dataset[var_name]
+    chunked_var = variable.chunk(**config.processing_chunks, inline_array=True)
+
+    if "statsV1" not in variable.attrs:
+        msg = "Statistics are missing from data. Std. dev. is required for compression."
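+        # The ZFP tolerance is derived from the recorded standard deviation (statsV1
+        # is expected to be populated at ingestion time), so we fail fast without it.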
+ raise ValueError(msg) + + stats = SummaryStatistics.model_validate_json(variable.attrs["statsV1"]) + zfp_encoding = get_zfp_encoding(stats, config.quality) + + optimized_variables = {} + for pattern_name, pattern_config in config.access_patterns.items(): + optimized_var = apply_zfp_encoding(chunked_var, pattern_config["chunks"], zfp_encoding) + optimized_var.name = pattern_name + optimized_variables[pattern_name] = optimized_var + + optimized_dataset = xr_Dataset(optimized_variables, attrs=dataset.attrs) + source_path = dataset.encoding["source"] + + with get_or_create_client(n_workers=n_workers, threads_per_worker=threads_per_worker) as client: + client.register_plugin(MonkeyPatchZfpDaskPlugin()) + logger.info("Starting optimization with quality %s.", config.quality.name) + to_mdio(optimized_dataset, source_path, mode="a") + logger.info("Optimization completed successfully.") diff --git a/src/mdio/optimize/common.py b/src/mdio/optimize/common.py new file mode 100644 index 000000000..eb61c377f --- /dev/null +++ b/src/mdio/optimize/common.py @@ -0,0 +1,86 @@ +"""Common optimization utilities.""" + +from __future__ import annotations + +import logging +from contextlib import contextmanager +from enum import Enum +from typing import TYPE_CHECKING +from typing import Any + +from mdio.builder.schemas.compressors import ZFP as MDIO_ZFP +from mdio.builder.schemas.compressors import ZFPMode +from mdio.builder.xarray_builder import _compressor_to_encoding + +if TYPE_CHECKING: + from collections.abc import Generator + + from xarray import DataArray + + from mdio.builder.schemas.v1.stats import SummaryStatistics + + +try: + import distributed +except ImportError: + distributed = None + + +logger = logging.getLogger(__name__) + + +class ZfpQuality(float, Enum): + """Config options for ZFP compression.""" + + VERY_LOW = 6 + LOW = 3 + MEDIUM = 1 + HIGH = 0.1 + VERY_HIGH = 0.01 + ULTRA = 0.001 + + +def get_zfp_encoding( + stats: SummaryStatistics, + quality: ZfpQuality, +) -> dict[str, Any]: + """Compute ZFP encoding based on data statistics and quality level.""" + if stats.std is None or stats.std <= 0: + msg = "Standard deviation must be positive for tolerance calculation." + raise ValueError(msg) + + tolerance = quality.value * stats.std + zfp_schema = MDIO_ZFP(mode=ZFPMode.FIXED_ACCURACY, tolerance=tolerance) + logger.info("Computed ZFP tolerance: %s (quality: %s, std: %s)", tolerance, quality.name, stats.std) + return _compressor_to_encoding(zfp_schema) + + +def apply_zfp_encoding(data_array: DataArray, chunks: tuple[int, ...], zfp_encoding: dict[str, Any]) -> DataArray: + """Apply ZFP encoding and custom chunks to a DataArray copy.""" + # Drop coordinates to avoid re-writing them and avoid rechunking issues in views + data_array = data_array.copy().reset_coords(drop=True) + data_array.encoding.update(zfp_encoding) + data_array.encoding["chunks"] = chunks + return data_array + + +@contextmanager +def get_or_create_client(n_workers: int, threads_per_worker: int) -> Generator[distributed.Client, None, None]: + """Get or create a Dask Distributed Client.""" + if distributed is None: + msg = "The 'distributed' package is required for parallel processing. Install: 'pip install mdio[distributed]'." + raise ImportError(msg) + + created = False + try: + client = distributed.Client.current() + logger.info("Using existing Dask client: %s", client) + except ValueError: + logger.info("No active Dask client found. 
Creating a new one.") + client = distributed.Client(n_workers=n_workers, threads_per_worker=threads_per_worker) + created = True + try: + yield client + finally: + if created: + client.close() From aaadd142dad6e53194ba5a79741f9c73187d05b6 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 11:00:40 -0600 Subject: [PATCH 05/27] refactor ZFP monkey patch to handle optional Dask dependency --- src/mdio/optimize/patch.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/mdio/optimize/patch.py b/src/mdio/optimize/patch.py index e572d335c..fc06971c9 100644 --- a/src/mdio/optimize/patch.py +++ b/src/mdio/optimize/patch.py @@ -11,8 +11,6 @@ from typing import TYPE_CHECKING import numpy as np -from dask.distributed import Worker -from dask.distributed import WorkerPlugin from numcodecs import blosc from zarr.codecs import numcodecs @@ -22,7 +20,13 @@ from zarr.core.buffer import NDBuffer -class ZFPY(numcodecs._NumcodecsArrayBytesCodec, codec_name="zfpy"): +try: + import distributed +except ImportError: + distributed = None + + +class ZFPY(numcodecs.ZFPY, codec_name="zfpy"): """Monkey patch ZFP codec to make input array contiguous before encoding.""" async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> Buffer: @@ -33,10 +37,10 @@ async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> B return chunk_spec.prototype.buffer.from_bytes(out) -class MonkeyPatchZfpDaskPlugin(WorkerPlugin): +class MonkeyPatchZfpDaskPlugin(distributed.WorkerPlugin): """Monkey patch ZFP codec and disable Blosc threading for Dask workers.""" - def setup(self, worker: Worker) -> None: # noqa: ARG002 + def setup(self, worker: distributed.Worker) -> None: # noqa: ARG002 """Monkey patch ZFP codec and disable Blosc threading.""" numcodecs._codecs.ZFPY = ZFPY blosc.set_nthreads(1) From 3e09631da16603be2edf8c330cd9b4fe51ab3e2a Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 11:11:07 -0600 Subject: [PATCH 06/27] refactor Dask integration to simplify ZFP plugin setup and import handling --- src/mdio/optimize/access_pattern.py | 2 ++ src/mdio/optimize/patch.py | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/mdio/optimize/access_pattern.py b/src/mdio/optimize/access_pattern.py index 86846e0ed..485037c9f 100644 --- a/src/mdio/optimize/access_pattern.py +++ b/src/mdio/optimize/access_pattern.py @@ -106,6 +106,8 @@ def optimize_access_patterns( source_path = dataset.encoding["source"] with get_or_create_client(n_workers=n_workers, threads_per_worker=threads_per_worker) as client: + # The context manager ensures distributed is installed so we can try to register the plugin + # safely. 
The plugin is conditionally imported based on the installation status of distributed
         client.register_plugin(MonkeyPatchZfpDaskPlugin())
         logger.info("Starting optimization with quality %s.", config.quality.name)
         to_mdio(optimized_dataset, source_path, mode="a")
diff --git a/src/mdio/optimize/patch.py b/src/mdio/optimize/patch.py
index fc06971c9..c1775af31 100644
--- a/src/mdio/optimize/patch.py
+++ b/src/mdio/optimize/patch.py
@@ -10,6 +10,7 @@
 import asyncio
 from typing import TYPE_CHECKING
 
+import distributed
 import numpy as np
 from numcodecs import blosc
 from zarr.codecs import numcodecs
@@ -37,10 +38,16 @@ async def _encode_single(self, chunk_data: NDBuffer, chunk_spec: ArraySpec) -> B
     return chunk_spec.prototype.buffer.from_bytes(out)
 
 
-class MonkeyPatchZfpDaskPlugin(distributed.WorkerPlugin):
-    """Monkey patch ZFP codec and disable Blosc threading for Dask workers."""
+if distributed is not None:
 
-    def setup(self, worker: distributed.Worker) -> None:  # noqa: ARG002
-        """Monkey patch ZFP codec and disable Blosc threading."""
-        numcodecs._codecs.ZFPY = ZFPY
-        blosc.set_nthreads(1)
+    class MonkeyPatchZfpDaskPlugin(distributed.WorkerPlugin):
+        """Monkey patch ZFP codec and disable Blosc threading for Dask workers.
+
+        Note that this class is only importable if distributed is installed. However, in the caller
+        function we have a context manager that checks if distributed is installed, so it is safe (for now).
+        """
+
+        def setup(self, worker: distributed.Worker) -> None:  # noqa: ARG002
+            """Monkey patch ZFP codec and disable Blosc threading."""
+            numcodecs._codecs.ZFPY = ZFPY
+            blosc.set_nthreads(1)

From aa1848b5db00d02f3a8f4d09940f29429ddbc433 Mon Sep 17 00:00:00 2001
From: Altay Sansal <13684161+tasansal@users.noreply.github.com>
Date: Thu, 18 Dec 2025 11:18:27 -0600
Subject: [PATCH 07/27] refactor Dask integration to simplify ZFP plugin setup and import handling

---
 src/mdio/optimize/patch.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/mdio/optimize/patch.py b/src/mdio/optimize/patch.py
index c1775af31..b3781f700 100644
--- a/src/mdio/optimize/patch.py
+++ b/src/mdio/optimize/patch.py
@@ -10,7 +10,6 @@
 import asyncio
 from typing import TYPE_CHECKING
 
-import distributed
 import numpy as np
 from numcodecs import blosc
 from zarr.codecs import numcodecs
@@ -23,7 +22,7 @@
 
 try:
     import distributed
-except ImportError:
+except ModuleNotFoundError:
     distributed = None

From c402b958e0d2845104733302dc943f31cb22a622 Mon Sep 17 00:00:00 2001
From: Altay Sansal <13684161+tasansal@users.noreply.github.com>
Date: Thu, 18 Dec 2025 11:20:25 -0600
Subject: [PATCH 08/27] fix ZFP plugin import by switching to local scoped import

---
 src/mdio/optimize/access_pattern.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/mdio/optimize/access_pattern.py b/src/mdio/optimize/access_pattern.py
index 485037c9f..4eb4866cd 100644
--- a/src/mdio/optimize/access_pattern.py
+++ b/src/mdio/optimize/access_pattern.py
@@ -19,7 +19,6 @@
 from mdio.optimize.common import apply_zfp_encoding
 from mdio.optimize.common import get_or_create_client
 from mdio.optimize.common import get_zfp_encoding
-from mdio.optimize.patch import MonkeyPatchZfpDaskPlugin
 
 if TYPE_CHECKING:
     from mdio.optimize.common import ZfpQuality
@@ -106,6 +106,8 @@ def optimize_access_patterns(
     source_path = dataset.encoding["source"]
 
     with get_or_create_client(n_workers=n_workers, threads_per_worker=threads_per_worker) as client:
         # The context manager ensures distributed is installed so we can try to register the plugin
-        # safely. The plugin is conditionally imported based on the installation status of distributed
+        # safely. 
The plugin is conditionally imported based on the installation status of distributed + # safely. The plugin is conditionally created based on the installation status of distributed + from mdio.optimize.patch import MonkeyPatchZfpDaskPlugin # noqa: PLC0415 + client.register_plugin(MonkeyPatchZfpDaskPlugin()) logger.info("Starting optimization with quality %s.", config.quality.name) to_mdio(optimized_dataset, source_path, mode="a") From e665514051e5c21967718206e3d438d0cc1b61f2 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 11:35:41 -0600 Subject: [PATCH 09/27] refactor access pattern logic to rename `access_patterns` to `optimize_dimensions` and update related usage --- src/mdio/optimize/access_pattern.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/mdio/optimize/access_pattern.py b/src/mdio/optimize/access_pattern.py index 4eb4866cd..69fe7c79d 100644 --- a/src/mdio/optimize/access_pattern.py +++ b/src/mdio/optimize/access_pattern.py @@ -31,7 +31,7 @@ class OptimizedAccessPatternConfig(BaseModel): """Configuration for fast access pattern optimization.""" quality: ZfpQuality = Field(..., description="Compression quality.") - access_patterns: dict[str, dict[str, tuple[int, ...]]] = Field(..., description="New variables and chunk sizes.") + optimize_dimensions: dict[str, tuple[int, ...]] = Field(..., description="Optimize dims and desired chunks.") processing_chunks: dict[str, int] = Field(..., description="Chunk sizes for processing the original variable.") @@ -67,10 +67,10 @@ def optimize_access_patterns( >>> >>> conf = OptimizedAccessPatternConfig( >>> quality=MdioZfpQuality.LOW, - >>> access_patterns={ - >>> "fast_inline": {"chunks": (4, 512, 512)}, - >>> "fast_crossline": {"chunks": (512, 4, 512)}, - >>> "fast_time": {"chunks": (512, 512, 4)}, + >>> optimize_dimensions={ + >>> "inline": (4, 512, 512), + >>> "crossline": (512, 4, 512), + >>> "time": (512, 512, 4), >>> }, >>> processing_chunks= {"inline": 512, "crossline": 512, "time": 512} >>> ) @@ -96,10 +96,13 @@ def optimize_access_patterns( zfp_encoding = get_zfp_encoding(stats, config.quality) optimized_variables = {} - for pattern_name, pattern_config in config.access_patterns.items(): - optimized_var = apply_zfp_encoding(chunked_var, pattern_config["chunks"], zfp_encoding) - optimized_var.name = pattern_name - optimized_variables[pattern_name] = optimized_var + for dim_name, dim_new_chunks in config.optimize_dimensions.items(): + if dim_name not in chunked_var.dims: + msg = f"Dimension to optimize '{dim_name}' not found in original dataset dims: {chunked_var.dims}." 
+ raise ValueError(msg) + optimized_var = apply_zfp_encoding(chunked_var, dim_new_chunks, zfp_encoding) + optimized_var.name = f"fast_{dim_name}" + optimized_variables[optimized_var.name] = optimized_var optimized_dataset = xr_Dataset(optimized_variables, attrs=dataset.attrs) source_path = dataset.encoding["source"] From e8a718cc4302d51682226197fdc5d7db6d864417 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 14:13:47 -0600 Subject: [PATCH 10/27] fix syntax error on warning ignore --- src/mdio/api/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mdio/api/io.py b/src/mdio/api/io.py index be7d00702..ea5b22dc3 100644 --- a/src/mdio/api/io.py +++ b/src/mdio/api/io.py @@ -92,7 +92,7 @@ def to_mdio( # noqa: PLR0913 storage_options = _normalize_storage_options(output_path) zarr_format = zarr.config.get("default_zarr_format") - with zarr_warnings_suppress_unstable_structs_v3() and zarr_warnings_suppress_unstable_numcodecs_v3(): + with zarr_warnings_suppress_unstable_structs_v3(), zarr_warnings_suppress_unstable_numcodecs_v3(): xr_to_zarr( dataset, store=output_path.as_posix(), # xarray doesn't like URI when file:// is protocol From 402665fd9a2a27f6ef0378f6aea7c72e78dbc129 Mon Sep 17 00:00:00 2001 From: Altay Sansal <13684161+tasansal@users.noreply.github.com> Date: Thu, 18 Dec 2025 14:40:45 -0600 Subject: [PATCH 11/27] update rechunking tutorial to v1 --- docs/tutorials/rechunking.ipynb | 1623 ++++++++++++++++++++++++------- 1 file changed, 1282 insertions(+), 341 deletions(-) diff --git a/docs/tutorials/rechunking.ipynb b/docs/tutorials/rechunking.ipynb index f9d4b12da..79b9af66e 100644 --- a/docs/tutorials/rechunking.ipynb +++ b/docs/tutorials/rechunking.ipynb @@ -17,257 +17,1329 @@ "## Introduction\n", "\n", "In this page we will be showing how we can take an existing MDIO and add\n", - "fast access, lossy, versions of the data in X/Y/Z cross-sections (slices).\n", + "fast access, lossy, versions of the data in IL/XL/TWT cross-sections (slices).\n", "\n", "We can re-use the MDIO dataset we created in the [Quickstart](#quickstart) page.\n", "Please run it first.\n", "\n", - "We will define our compression levels first. We will use this to adjust the quality\n", - "of the lossy compression." + "Let's open the original MDIO first." ] }, { "cell_type": "code", - "execution_count": 1, - "id": "initial_id", - "metadata": { - "ExecuteTime": { - "end_time": "2025-04-16T18:38:02.462276Z", - "start_time": "2025-04-16T18:38:02.459882Z" - } - }, - "outputs": [], - "source": [ - "from enum import Enum\n", - "\n", - "\n", - "class MdioZfpQuality(float, Enum):\n", - " \"\"\"Config options for ZFP compression.\"\"\"\n", - "\n", - " VERY_LOW = 6\n", - " LOW = 3\n", - " MEDIUM = 1\n", - " HIGH = 0.1\n", - " VERY_HIGH = 0.01\n", - " ULTRA = 0.001" - ] - }, - { - "cell_type": "markdown", - "id": "c2a09a89-b453-4c3e-b879-14caaedd29de", - "metadata": {}, - "source": [ - "We will use the lower level `MDIOAccessor` to open the existing file in write mode that\n", - "allows us to modify its raw metadata. This can be dangerous, we recommend using only provided\n", - "tools to avoid data corruption.\n", - "\n", - "We specify the original access pattern of the source data `\"012\"` with some parameters like\n", - "caching. For the rechunking, we recommend using the single threaded `\"zarr\"` backend to avoid\n", - "race conditions.\n", - "\n", - "We also define a `dict` for common arguments in rechunking." 
- ] - }, - { - "cell_type": "code", - "execution_count": 3, + "execution_count": null, "id": "45558306-ab9c-46aa-a299-8758a911b373", - "metadata": { - "ExecuteTime": { - "end_time": "2025-04-16T18:38:04.107696Z", - "start_time": "2025-04-16T18:38:04.101239Z" + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
<xarray.Dataset> Size: 2GB\n",
+ "Dimensions: (inline: 345, crossline: 188, time: 1501)\n",
+ "Coordinates:\n",
+ " * inline (inline) int32 1kB 1 2 3 4 5 6 ... 340 341 342 343 344 345\n",
+ " * crossline (crossline) int32 752B 1 2 3 4 5 6 ... 184 185 186 187 188\n",
+ " * time (time) int32 6kB 0 2 4 6 8 10 ... 2992 2994 2996 2998 3000\n",
+ " cdp_x (inline, crossline) float64 519kB ...\n",
+ " cdp_y (inline, crossline) float64 519kB ...\n",
+ "Data variables:\n",
+ " fast_crossline (inline, crossline, time) float32 389MB ...\n",
+ " fast_inline (inline, crossline, time) float32 389MB ...\n",
+ " trace_mask (inline, crossline) bool 65kB ...\n",
+ " amplitude (inline, crossline, time) float32 389MB ...\n",
+ " segy_file_header <U1 4B ...\n",
+ " fast_time (inline, crossline, time) float32 389MB ...\n",
+ " headers (inline, crossline) [('trace_seq_num_line', '<i4'), ('trace_seq_num_reel', '<i4'), ('orig_field_record_num', '<i4'), ('trace_num_orig_record', '<i4'), ('energy_source_point_num', '<i4'), ('ensemble_num', '<i4'), ('trace_num_ensemble', '<i4'), ('trace_id_code', '<i2'), ('vertically_summed_traces', '<i2'), ('horizontally_stacked_traces', '<i2'), ('data_use', '<i2'), ('source_to_receiver_distance', '<i4'), ('receiver_group_elevation', '<i4'), ('source_surface_elevation', '<i4'), ('source_depth_below_surface', '<i4'), ('receiver_datum_elevation', '<i4'), ('source_datum_elevation', '<i4'), ('source_water_depth', '<i4'), ('receiver_water_depth', '<i4'), ('elevation_depth_scalar', '<i2'), ('coordinate_scalar', '<i2'), ('source_coord_x', '<i4'), ('source_coord_y', '<i4'), ('group_coord_x', '<i4'), ('group_coord_y', '<i4'), ('coordinate_unit', '<i2'), ('weathering_velocity', '<i2'), ('subweathering_velocity', '<i2'), ('source_uphole_time', '<i2'), ('group_uphole_time', '<i2'), ('source_static_correction', '<i2'), ('receiver_static_correction', '<i2'), ('total_static_applied', '<i2'), ('lag_time_a', '<i2'), ('lag_time_b', '<i2'), ('delay_recording_time', '<i2'), ('mute_time_start', '<i2'), ('mute_time_end', '<i2'), ('samples_per_trace', '<i2'), ('sample_interval', '<i2'), ('instrument_gain_type', '<i2'), ('instrument_gain_const', '<i2'), ('instrument_gain_initial', '<i2'), ('correlated_data', '<i2'), ('sweep_freq_start', '<i2'), ('sweep_freq_end', '<i2'), ('sweep_length', '<i2'), ('sweep_type', '<i2'), ('sweep_taper_start', '<i2'), ('sweep_taper_end', '<i2'), ('taper_type', '<i2'), ('alias_filter_freq', '<i2'), ('alias_filter_slope', '<i2'), ('notch_filter_freq', '<i2'), ('notch_filter_slope', '<i2'), ('low_cut_freq', '<i2'), ('high_cut_freq', '<i2'), ('low_cut_slope', '<i2'), ('high_cut_slope', '<i2'), ('year_recorded', '<i2'), ('day_of_year', '<i2'), ('hour_of_day', '<i2'), ('minute_of_hour', '<i2'), ('second_of_minute', '<i2'), ('time_basis_code', '<i2'), ('trace_weighting_factor', '<i2'), ('group_num_roll_switch', '<i2'), ('group_num_first_trace', '<i2'), ('group_num_last_trace', '<i2'), ('gap_size', '<i2'), ('taper_overtravel', '<i2'), ('inline', '<i4'), ('crossline', '<i4'), ('cdp_x', '<i4'), ('cdp_y', '<i4')] 13MB ...\n",
+ "Attributes:\n",
+ " apiVersion: 1.1.1\n",
+ " createdOn: 2025-12-18 20:12:34.106580+00:00\n",
+ " name: PostStack3DTime\n",
+ " attributes: {'surveyType': '3D', 'gatherType': 'stacked', 'defaultVariab...<xarray.Dataset> Size: 2GB\n",
+ "Dimensions: (inline: 345, crossline: 188, time: 1501)\n",
+ "Coordinates:\n",
+ " * inline (inline) int32 1kB 1 2 3 4 5 6 ... 340 341 342 343 344 345\n",
+ " * crossline (crossline) int32 752B 1 2 3 4 5 6 ... 184 185 186 187 188\n",
+ " * time (time) int32 6kB 0 2 4 6 8 10 ... 2992 2994 2996 2998 3000\n",
+ " cdp_x (inline, crossline) float64 519kB ...\n",
+ " cdp_y (inline, crossline) float64 519kB ...\n",
+ "Data variables:\n",
+ " fast_crossline (inline, crossline, time) float32 389MB ...\n",
+ " amplitude (inline, crossline, time) float32 389MB ...\n",
+ " fast_inline (inline, crossline, time) float32 389MB ...\n",
+ " trace_mask (inline, crossline) bool 65kB ...\n",
+ " headers (inline, crossline) [('trace_seq_num_line', '<i4'), ('trace_seq_num_reel', '<i4'), ('orig_field_record_num', '<i4'), ('trace_num_orig_record', '<i4'), ('energy_source_point_num', '<i4'), ('ensemble_num', '<i4'), ('trace_num_ensemble', '<i4'), ('trace_id_code', '<i2'), ('vertically_summed_traces', '<i2'), ('horizontally_stacked_traces', '<i2'), ('data_use', '<i2'), ('source_to_receiver_distance', '<i4'), ('receiver_group_elevation', '<i4'), ('source_surface_elevation', '<i4'), ('source_depth_below_surface', '<i4'), ('receiver_datum_elevation', '<i4'), ('source_datum_elevation', '<i4'), ('source_water_depth', '<i4'), ('receiver_water_depth', '<i4'), ('elevation_depth_scalar', '<i2'), ('coordinate_scalar', '<i2'), ('source_coord_x', '<i4'), ('source_coord_y', '<i4'), ('group_coord_x', '<i4'), ('group_coord_y', '<i4'), ('coordinate_unit', '<i2'), ('weathering_velocity', '<i2'), ('subweathering_velocity', '<i2'), ('source_uphole_time', '<i2'), ('group_uphole_time', '<i2'), ('source_static_correction', '<i2'), ('receiver_static_correction', '<i2'), ('total_static_applied', '<i2'), ('lag_time_a', '<i2'), ('lag_time_b', '<i2'), ('delay_recording_time', '<i2'), ('mute_time_start', '<i2'), ('mute_time_end', '<i2'), ('samples_per_trace', '<i2'), ('sample_interval', '<i2'), ('instrument_gain_type', '<i2'), ('instrument_gain_const', '<i2'), ('instrument_gain_initial', '<i2'), ('correlated_data', '<i2'), ('sweep_freq_start', '<i2'), ('sweep_freq_end', '<i2'), ('sweep_length', '<i2'), ('sweep_type', '<i2'), ('sweep_taper_start', '<i2'), ('sweep_taper_end', '<i2'), ('taper_type', '<i2'), ('alias_filter_freq', '<i2'), ('alias_filter_slope', '<i2'), ('notch_filter_freq', '<i2'), ('notch_filter_slope', '<i2'), ('low_cut_freq', '<i2'), ('high_cut_freq', '<i2'), ('low_cut_slope', '<i2'), ('high_cut_slope', '<i2'), ('year_recorded', '<i2'), ('day_of_year', '<i2'), ('hour_of_day', '<i2'), ('minute_of_hour', '<i2'), ('second_of_minute', '<i2'), ('time_basis_code', '<i2'), ('trace_weighting_factor', '<i2'), ('group_num_roll_switch', '<i2'), ('group_num_first_trace', '<i2'), ('group_num_last_trace', '<i2'), ('gap_size', '<i2'), ('taper_overtravel', '<i2'), ('inline', '<i4'), ('crossline', '<i4'), ('cdp_x', '<i4'), ('cdp_y', '<i4')] 13MB ...\n",
+ " segy_file_header <U1 4B ...\n",
+ " fast_time (inline, crossline, time) float32 389MB ...\n",
+ "Attributes:\n",
+ " apiVersion: 1.1.1\n",
+ " createdOn: 2025-12-18 20:12:34.106580+00:00\n",
+ " name: PostStack3DTime\n",
+ " attributes: {'surveyType': '3D', 'gatherType': 'stacked', 'defaultVariab..."
+ ],
+ "text/plain": [
+ "<xarray.Dataset> Size: 2GB\n",
+ "<xarray.Dataset> Size: 403MB\n",
"Dimensions: (inline: 345, crossline: 188, time: 1501)\n",
"Coordinates:\n",
" * inline (inline) int32 1kB 1 2 3 4 5 6 ... 340 341 342 343 344 345\n",
" * crossline (crossline) int32 752B 1 2 3 4 5 6 ... 184 185 186 187 188\n",
" * time (time) int32 6kB 0 2 4 6 8 10 ... 2992 2994 2996 2998 3000\n",
- " cdp_x (inline, crossline) float64 519kB ...\n",
" cdp_y (inline, crossline) float64 519kB ...\n",
+ " cdp_x (inline, crossline) float64 519kB ...\n",
"Data variables:\n",
- " fast_crossline (inline, crossline, time) float32 389MB ...\n",
- " fast_inline (inline, crossline, time) float32 389MB ...\n",
- " trace_mask (inline, crossline) bool 65kB ...\n",
" amplitude (inline, crossline, time) float32 389MB ...\n",
- " segy_file_header <U1 4B ...\n",
- " fast_time (inline, crossline, time) float32 389MB ...\n",
" headers (inline, crossline) [('trace_seq_num_line', '<i4'), ('trace_seq_num_reel', '<i4'), ('orig_field_record_num', '<i4'), ('trace_num_orig_record', '<i4'), ('energy_source_point_num', '<i4'), ('ensemble_num', '<i4'), ('trace_num_ensemble', '<i4'), ('trace_id_code', '<i2'), ('vertically_summed_traces', '<i2'), ('horizontally_stacked_traces', '<i2'), ('data_use', '<i2'), ('source_to_receiver_distance', '<i4'), ('receiver_group_elevation', '<i4'), ('source_surface_elevation', '<i4'), ('source_depth_below_surface', '<i4'), ('receiver_datum_elevation', '<i4'), ('source_datum_elevation', '<i4'), ('source_water_depth', '<i4'), ('receiver_water_depth', '<i4'), ('elevation_depth_scalar', '<i2'), ('coordinate_scalar', '<i2'), ('source_coord_x', '<i4'), ('source_coord_y', '<i4'), ('group_coord_x', '<i4'), ('group_coord_y', '<i4'), ('coordinate_unit', '<i2'), ('weathering_velocity', '<i2'), ('subweathering_velocity', '<i2'), ('source_uphole_time', '<i2'), ('group_uphole_time', '<i2'), ('source_static_correction', '<i2'), ('receiver_static_correction', '<i2'), ('total_static_applied', '<i2'), ('lag_time_a', '<i2'), ('lag_time_b', '<i2'), ('delay_recording_time', '<i2'), ('mute_time_start', '<i2'), ('mute_time_end', '<i2'), ('samples_per_trace', '<i2'), ('sample_interval', '<i2'), ('instrument_gain_type', '<i2'), ('instrument_gain_const', '<i2'), ('instrument_gain_initial', '<i2'), ('correlated_data', '<i2'), ('sweep_freq_start', '<i2'), ('sweep_freq_end', '<i2'), ('sweep_length', '<i2'), ('sweep_type', '<i2'), ('sweep_taper_start', '<i2'), ('sweep_taper_end', '<i2'), ('taper_type', '<i2'), ('alias_filter_freq', '<i2'), ('alias_filter_slope', '<i2'), ('notch_filter_freq', '<i2'), ('notch_filter_slope', '<i2'), ('low_cut_freq', '<i2'), ('high_cut_freq', '<i2'), ('low_cut_slope', '<i2'), ('high_cut_slope', '<i2'), ('year_recorded', '<i2'), ('day_of_year', '<i2'), ('hour_of_day', '<i2'), ('minute_of_hour', '<i2'), ('second_of_minute', '<i2'), ('time_basis_code', '<i2'), ('trace_weighting_factor', '<i2'), ('group_num_roll_switch', '<i2'), ('group_num_first_trace', '<i2'), ('group_num_last_trace', '<i2'), ('gap_size', '<i2'), ('taper_overtravel', '<i2'), ('inline', '<i4'), ('crossline', '<i4'), ('cdp_x', '<i4'), ('cdp_y', '<i4')] 13MB ...\n",
+ " segy_file_header <U1 4B ...\n",
+ " trace_mask (inline, crossline) bool 65kB ...\n",
"Attributes:\n",
" apiVersion: 1.1.1\n",
- " createdOn: 2025-12-18 20:12:34.106580+00:00\n",
+ " createdOn: 2025-12-19 16:05:58.230520+00:00\n",
" name: PostStack3DTime\n",
- " attributes: {'surveyType': '3D', 'gatherType': 'stacked', 'defaultVariab...