diff --git a/src/mdio/__init__.py b/src/mdio/__init__.py index 5fed389c..aea44383 100644 --- a/src/mdio/__init__.py +++ b/src/mdio/__init__.py @@ -12,11 +12,14 @@ except metadata.PackageNotFoundError: __version__ = "unknown" +# Import numpy_to_mdio after __version__ is set to avoid circular import +from mdio.converters.numpy import numpy_to_mdio __all__ = [ "__version__", "open_mdio", "to_mdio", "mdio_to_segy", + "numpy_to_mdio", "segy_to_mdio", ] diff --git a/src/mdio/converters/numpy.py b/src/mdio/converters/numpy.py new file mode 100644 index 00000000..f4d21368 --- /dev/null +++ b/src/mdio/converters/numpy.py @@ -0,0 +1,170 @@ +"""Conversion from Numpy to MDIO v1 format.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np + +from mdio.api.io import _normalize_path +from mdio.api.io import to_mdio +from mdio.builder.xarray_builder import to_xarray_dataset +from mdio.converters.type_converter import to_scalar_type + +if TYPE_CHECKING: + from pathlib import Path + + from numpy.typing import DTypeLike + from numpy.typing import NDArray + from upath import UPath + from xarray import Dataset as xr_Dataset + + from mdio.builder.schemas import Dataset + from mdio.builder.templates.base import AbstractDatasetTemplate + + +def _build_dataset( + array: NDArray, + mdio_template: AbstractDatasetTemplate, + header_dtype: DTypeLike | None, +) -> Dataset: + """Build the dataset for the numpy array using the provided template.""" + # Convert numpy dtype to MDIO ScalarType + data_type = to_scalar_type(array.dtype) + + # Build dataset using template (which defines chunking) + mdio_ds: Dataset = mdio_template.build_dataset( + name=mdio_template.name, + sizes=array.shape, + header_dtype=header_dtype, + ) + + # Update the default variable with correct dtype + var_index = next( + (i for i, v in enumerate(mdio_ds.variables) if v.name == mdio_template.default_variable_name), None + ) + if var_index is not None: + mdio_ds.variables[var_index].data_type = data_type + + return mdio_ds + + +def _validate_coordinates( + index_coords: dict[str, NDArray], + mdio_template: AbstractDatasetTemplate, + array: NDArray, +) -> None: + """Validate user-provided coordinates against template and array dimensions. + + Args: + index_coords: Dictionary mapping dimension names to coordinate arrays. + mdio_template: The MDIO template defining expected dimensions. + array: The numpy array being converted. + + Raises: + ValueError: If coordinate names or sizes don't match requirements. + """ + # Validate that coordinate names match template dimension names + for coord_name in index_coords: + if coord_name not in mdio_template.dimension_names: + available_dims = sorted(mdio_template.dimension_names) + err = ( + f"Coordinate name '{coord_name}' not found in template dimensions. " + f"Available dimensions: {available_dims}" + ) + raise ValueError(err) + + # Validate coordinate array sizes match array dimensions + for dim_name, coord_array in index_coords.items(): + expected_size = array.shape[mdio_template.dimension_names.index(dim_name)] + if coord_array.size != expected_size: + err = ( + f"Size of coordinate '{dim_name}' ({coord_array.size}) does not match " + f"array dimension size ({expected_size})" + ) + raise ValueError(err) + + +def _populate_coordinates_and_write( + xr_dataset: xr_Dataset, + index_coords: dict[str, NDArray], + output_path: UPath | Path, + array: NDArray, +) -> None: + """Populate coordinates and write data to MDIO.""" + # Populate dimension coordinates + for name, coords in index_coords.items(): + xr_dataset[name].data[:] = coords + + # Set trace mask to all True (no missing data for numpy arrays) + xr_dataset.trace_mask.data[:] = True + + # Set the data + data_var_name = xr_dataset.attrs.get("defaultVariableName", "amplitude") + xr_dataset[data_var_name].data[:] = array + + # Write everything at once + to_mdio(xr_dataset, output_path=output_path, mode="w", compute=True) + + +def numpy_to_mdio( # noqa: PLR0913 + array: NDArray, + mdio_template: AbstractDatasetTemplate, + output_path: UPath | Path | str, + index_coords: dict[str, NDArray] | None = None, + header_dtype: DTypeLike | None = None, + overwrite: bool = False, +) -> None: + """Convert a NumPy array to MDIO v1 format. + + This function converts a NumPy array into the MDIO format following the same + interface pattern as SEG-Y to MDIO conversion. + + Args: + array: Input NumPy array to be converted to MDIO format. + mdio_template: The MDIO template to use for the conversion. The template defines + the dataset structure including compression and chunking settings. + output_path: The universal path for the output MDIO v1 file. + index_coords: Dictionary mapping dimension names to their coordinate arrays. If not + provided, defaults to sequential integers (0 to size-1) for each dimension. + header_dtype: Data type for trace headers, if applicable. Defaults to None. + overwrite: If True, overwrites existing MDIO file at the specified path. + + Raises: + FileExistsError: If the output location already exists and overwrite is False. + """ + # Prepare coordinates - create defaults if not provided + if index_coords is None: + index_coords = {} + for name, size in zip(mdio_template.dimension_names, array.shape, strict=True): + index_coords[name] = np.arange(size, dtype=np.int32) + + # Normalize path + output_path = _normalize_path(output_path) + + # Check if output exists + if not overwrite and output_path.exists(): + err = f"Output location '{output_path.as_posix()}' exists. Set `overwrite=True` if intended." + raise FileExistsError(err) + + # Validate coordinates if provided + if index_coords: + _validate_coordinates(index_coords, mdio_template, array) + + # Build dataset + mdio_ds = _build_dataset( + array=array, + mdio_template=mdio_template, + header_dtype=header_dtype, + ) + + # Convert to xarray dataset + xr_dataset: xr_Dataset = to_xarray_dataset(mdio_ds=mdio_ds) + + # Populate coordinates and write data + _populate_coordinates_and_write( + xr_dataset=xr_dataset, + index_coords=index_coords, + output_path=output_path, + array=array, + ) diff --git a/tests/integration/test_import_numpy.py b/tests/integration/test_import_numpy.py new file mode 100644 index 00000000..bb86fb82 --- /dev/null +++ b/tests/integration/test_import_numpy.py @@ -0,0 +1,167 @@ +"""Module for testing NumPy to MDIO conversion functionality. + +This module contains tests for the `numpy_to_mdio` function, ensuring proper conversion +of NumPy arrays to MDIO format using templates. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np +import numpy.testing as npt +import pytest + +from mdio.api.io import open_mdio +from mdio.builder.templates.base import AbstractDatasetTemplate +from mdio.builder.templates.seismic_3d_poststack import Seismic3DPostStackTemplate +from mdio.converters.numpy import numpy_to_mdio + +if TYPE_CHECKING: + from numpy.typing import NDArray + + +class MockTemplate(AbstractDatasetTemplate): + """Mock template for testing.""" + + def __init__(self, dim_names: tuple[str, ...], data_domain: str = "time"): + super().__init__(data_domain=data_domain) + self._dim_names = dim_names + self._physical_coord_names = () + self._logical_coord_names = () + self._var_chunk_shape = (8, 8, 8) + + @property + def _name(self) -> str: + return "MockTemplate" + + def _load_dataset_attributes(self) -> dict: + return {"dataType": "numpy_array", "surveyType": "generic"} + + +@pytest.fixture +def mock_array() -> NDArray: + """Make a mock array.""" + rng = np.random.default_rng() + return rng.uniform(size=(15, 10, 20)).astype("float32") + + +@pytest.fixture +def mock_template() -> AbstractDatasetTemplate: + """Make a mock template.""" + return MockTemplate(("inline", "crossline", "time")) + + +CHUNK_SIZE = (8, 8, 8) + + +def test_npy_to_mdio_basic(mock_array: NDArray, mock_template: AbstractDatasetTemplate) -> None: + """Test basic NumPy to MDIO conversion using a template.""" + numpy_to_mdio(mock_array, mock_template, "memory://npy.mdio") + ds = open_mdio("memory://npy.mdio") + + # Check data + data_var = ds.attrs.get("defaultVariableName", "amplitude") + npt.assert_array_equal(ds[data_var].values, mock_array) + + # Check dimensions + assert list(ds.sizes.keys()) == ["inline", "crossline", "time"] + assert ds[data_var].shape == mock_array.shape + + +def test_npy_to_mdio_with_coords(mock_array: NDArray, mock_template: AbstractDatasetTemplate) -> None: + """Test NumPy to MDIO conversion with custom coordinates.""" + index_coords = { + "inline": np.arange(100, 115), + "crossline": np.arange(200, 210), + "time": np.arange(0, 20), + } + numpy_to_mdio( + mock_array, + mock_template, + "memory://npy_coord.mdio", + index_coords, + ) + ds = open_mdio("memory://npy_coord.mdio") + + # Check data + data_var = ds.attrs.get("defaultVariableName", "amplitude") + npt.assert_array_equal(ds[data_var].values, mock_array) + assert ds[data_var].shape == mock_array.shape + + # Check coordinates + npt.assert_array_equal(ds["inline"].values, index_coords["inline"]) + npt.assert_array_equal(ds["crossline"].values, index_coords["crossline"]) + npt.assert_array_equal(ds["time"].values, index_coords["time"]) + + +def test_npy_to_mdio_default_chunksize(mock_array: NDArray, mock_template: AbstractDatasetTemplate) -> None: + """Test NumPy to MDIO conversion using template's default chunk size.""" + numpy_to_mdio(mock_array, mock_template, "memory://npy_default.mdio") + ds = open_mdio("memory://npy_default.mdio") + + # Check data + data_var = ds.attrs.get("defaultVariableName", "amplitude") + npt.assert_array_equal(ds[data_var].values, mock_array) + assert list(ds.sizes.keys()) == ["inline", "crossline", "time"] + assert ds[data_var].shape == mock_array.shape + + +def test_npy_to_mdio_seismic_template(mock_array: NDArray) -> None: + """Test NumPy to MDIO conversion using a seismic template.""" + template = Seismic3DPostStackTemplate(data_domain="time") + numpy_to_mdio(mock_array, template, "memory://npy_seismic.mdio") + ds = open_mdio("memory://npy_seismic.mdio") + + # Check data + data_var = ds.attrs.get("defaultVariableName", "amplitude") + npt.assert_array_equal(ds[data_var].values, mock_array) + assert list(ds.sizes.keys()) == ["inline", "crossline", "time"] + assert ds[data_var].shape == mock_array.shape + + +def test_npy_to_mdio_invalid_coordinate_names(mock_array: NDArray, mock_template: AbstractDatasetTemplate) -> None: + """Test NumPy to MDIO conversion with invalid coordinate names.""" + index_coords = { + "inline": np.arange(15), # valid + "crossline": np.arange(10), # valid + "invalid_dim": np.arange(20), # invalid dimension name + } + + with pytest.raises(ValueError, match=r"Coordinate name 'invalid_dim' not found in template dimensions"): + numpy_to_mdio(mock_array, mock_template, "memory://npy_invalid.mdio", index_coords=index_coords) + + +def test_npy_to_mdio_invalid_coordinate_sizes(mock_array: NDArray, mock_template: AbstractDatasetTemplate) -> None: + """Test NumPy to MDIO conversion with invalid coordinate array sizes.""" + index_coords = { + "inline": np.arange(10), # wrong size (should be 15) + "crossline": np.arange(10), # valid + "time": np.arange(20), # valid + } + + with pytest.raises( + ValueError, match=r"Size of coordinate 'inline' \(10\) does not match array dimension size \(15\)" + ): + numpy_to_mdio(mock_array, mock_template, "memory://npy_wrong_size.mdio", index_coords=index_coords) + + +def test_npy_to_mdio_valid_custom_coordinates(mock_array: NDArray, mock_template: AbstractDatasetTemplate) -> None: + """Test NumPy to MDIO conversion with valid custom coordinates.""" + index_coords = { + "inline": np.arange(100, 115), # valid size (15) + "crossline": np.arange(200, 210), # valid size (10) + "time": np.arange(0, 20), # valid size (20) + } + + numpy_to_mdio(mock_array, mock_template, "memory://npy_valid_custom.mdio", index_coords=index_coords) + ds = open_mdio("memory://npy_valid_custom.mdio") + + # Check data + data_var = ds.attrs.get("defaultVariableName", "amplitude") + npt.assert_array_equal(ds[data_var].values, mock_array) + + # Check coordinates are correctly set + npt.assert_array_equal(ds["inline"].values, index_coords["inline"]) + npt.assert_array_equal(ds["crossline"].values, index_coords["crossline"]) + npt.assert_array_equal(ds["time"].values, index_coords["time"])