Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/blosc2/dict_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ def _init_read_mode(self, dparams: blosc2.DParams | None = None):
if "embed.b2e" not in self.offsets:
raise FileNotFoundError("Embed file embed.b2e not found in store.")
estore_offset = self.offsets["embed.b2e"]["offset"]
schunk = blosc2.open(self.b2z_path, mode="r", offset=estore_offset, dparams=dparams)
schunk = blosc2.blosc2_ext.open(self.b2z_path, mode="r", offset=estore_offset, dparams=dparams)
for filepath in self.offsets:
if filepath.endswith((".b2nd", ".b2f")):
key = "/" + filepath[: -5 if filepath.endswith(".b2nd") else -4]
self.map_tree[key] = filepath
else: # .b2d
if not os.path.isdir(self.localpath):
raise FileNotFoundError(f"Directory {self.localpath} does not exist for reading.")
schunk = blosc2.open(self.estore_path, mode="r", dparams=dparams)
schunk = blosc2.blosc2_ext.open(self.estore_path, mode="r", offset=0, dparams=dparams)
self._update_map_tree()

self._estore = EmbedStore(_from_schunk=schunk)
Expand Down Expand Up @@ -267,7 +267,7 @@ def __getitem__(self, key: str) -> blosc2.NDArray | SChunk | C2Array:
filepath = self.map_tree[key]
if filepath in self.offsets:
offset = self.offsets[filepath]["offset"]
return blosc2.open(self.b2z_path, mode="r", offset=offset, dparams=self.dparams)
return blosc2.blosc2_ext.open(self.b2z_path, mode="r", offset=offset, dparams=self.dparams)
else:
urlpath = os.path.join(self.working_dir, filepath)
if os.path.exists(urlpath):
Expand Down Expand Up @@ -331,7 +331,9 @@ def values(self) -> Iterator[blosc2.NDArray | SChunk | C2Array]:
if self.is_zip_store:
if filepath in self.offsets:
offset = self.offsets[filepath]["offset"]
yield blosc2.open(self.b2z_path, mode="r", offset=offset, dparams=self.dparams)
yield blosc2.blosc2_ext.open(
self.b2z_path, mode="r", offset=offset, dparams=self.dparams
)
else:
urlpath = os.path.join(self.working_dir, filepath)
yield blosc2.open(urlpath, mode="r" if self.mode == "r" else "a", dparams=self.dparams)
Expand All @@ -350,7 +352,7 @@ def items(self) -> Iterator[tuple[str, blosc2.NDArray | SChunk | C2Array]]:
if self.is_zip_store:
if filepath in self.offsets:
offset = self.offsets[filepath]["offset"]
yield key, blosc2.open(self.b2z_path, mode="r", offset=offset)
yield key, blosc2.blosc2_ext.open(self.b2z_path, mode="r", offset=offset)
else:
urlpath = os.path.join(self.working_dir, filepath)
yield key, blosc2.open(urlpath, mode="r" if self.mode == "r" else "a")
Expand Down
11 changes: 10 additions & 1 deletion src/blosc2/embed_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __init__(
self.storage = storage

if mode in ("r", "a") and urlpath:
self._store = blosc2.open(urlpath, mode=mode)
self._store = blosc2.blosc2_ext.open(urlpath, mode=mode, offset=0)
self._load_metadata()
return

Expand Down Expand Up @@ -254,6 +254,15 @@ def to_cframe(self) -> bytes:
"""Serialize embed store to CFrame format."""
return self._store.to_cframe()

def __enter__(self):
    """Enter a ``with`` block.

    Returns
    -------
    out
        This very instance, so that it can be bound by ``with ... as store``.
    """
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave a ``with`` block without releasing any resource.

    Parameters
    ----------
    exc_type, exc_val, exc_tb
        Exception details supplied by the ``with`` statement. They are ignored.

    Returns
    -------
    out: bool
        Always ``False``, so an exception raised inside the block is not
        suppressed.
    """
    # Nothing is closed here: SChunk/NDArray handle persistence automatically.
    suppress_exception = False
    return suppress_exception


def estore_from_cframe(cframe: bytes, copy: bool = False) -> EmbedStore:
"""
Expand Down
92 changes: 60 additions & 32 deletions src/blosc2/schunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1470,11 +1470,59 @@ def __dealloc__(self):
super().__dealloc__()


def _open_special_store(urlpath, mode, offset, **kwargs):
    """Open ``urlpath`` as a DictStore or EmbedStore when its suffix asks for one.

    Parameters
    ----------
    urlpath: str
        Path to inspect. Only its suffix is examined here.
    mode: str
        Forwarded as the ``mode`` argument of the store constructor.
    offset: int
        Must be 0 whenever the path has a store suffix.
    kwargs: dict
        Forwarded untouched to the store constructor.

    Returns
    -------
    out: DictStore, EmbedStore or None
        A DictStore for ``.b2z``/``.b2d`` paths, an EmbedStore for ``.b2e``
        paths, and ``None`` for any other suffix.

    Raises
    ------
    ValueError
        If the path has a store suffix and ``offset`` is not 0.
    """
    # A single endswith() call with a tuple covers both DictStore suffixes.
    if urlpath.endswith((".b2z", ".b2d")):
        if offset != 0:
            raise ValueError("Offset must be 0 for DictStore")
        # Imported only when needed (presumably to avoid a circular import
        # at module load time -- TODO confirm).
        from blosc2.dict_store import DictStore

        return DictStore(urlpath, mode=mode, **kwargs)

    if urlpath.endswith(".b2e"):
        if offset != 0:
            raise ValueError("Offset must be 0 for EmbedStore")
        from blosc2.embed_store import EmbedStore

        return EmbedStore(urlpath, mode=mode, **kwargs)

    # Not a store path.
    return None


def _set_default_dparams(kwargs):
    """Fill in ``kwargs["dparams"]`` in place when it is missing or ``None``.

    Parameters
    ----------
    kwargs: dict
        Keyword arguments about to be passed on when opening an object.
        It is modified in place; nothing is returned.
    """
    if kwargs.get("dparams") is not None:
        # The caller supplied explicit decompression parameters: keep them.
        return

    # Use multiple threads for decompression by default, unless we are in WASM
    # (does not support threads). The only drawback for using multiple threads
    # is that access time will be slower because of the overhead of spawning
    # threads (but could be fixed in the future with more intelligent thread
    # pools).
    if not blosc2.IS_WASM:
        default_dparams = blosc2.DParams(nthreads=blosc2.nthreads)
    else:
        default_dparams = blosc2.DParams(nthreads=1)
    kwargs["dparams"] = default_dparams


def _process_opened_object(res):
    """Wrap a freshly opened object as a Proxy or LazyArray when its metadata says so.

    Parameters
    ----------
    res
        The opened object. Its metalayers are read from ``res.schunk.meta``
        when ``res`` has a ``schunk`` attribute, otherwise from ``res.meta``.

    Returns
    -------
    out
        A ``blosc2.Proxy`` caching ``res`` when a "proxy-source" metalayer
        names a local or remote source; the result of
        ``blosc2._open_lazyarray(res)`` when ``res`` is an NDArray carrying a
        "LazyArray" metalayer; otherwise ``res`` itself.

    Raises
    ------
    RuntimeError
        If a "proxy-source" metalayer names no source and its
        ``caterva2_env`` entry is falsy.
    """
    container = getattr(res, "schunk", res)
    meta = container.meta

    if "proxy-source" in meta:
        proxy_src = meta["proxy-source"]
        local_path = proxy_src["local_abspath"]
        if local_path is not None:
            # The source lives on the local filesystem.
            return blosc2.Proxy(blosc2.open(local_path), _cache=res)

        remote = proxy_src["urlpath"]
        if remote is not None:
            # The three stored entries are the positional C2Array arguments.
            remote_src = blosc2.C2Array(remote[0], remote[1], remote[2])
            return blosc2.Proxy(remote_src, _cache=res)

        if not proxy_src["caterva2_env"]:
            raise RuntimeError("Could not find the source when opening a Proxy")

    is_lazy = isinstance(res, blosc2.NDArray) and "LazyArray" in res.schunk.meta
    return blosc2._open_lazyarray(res) if is_lazy else res


def open(
urlpath: str | pathlib.Path | blosc2.URLPath, mode: str = "a", offset: int = 0, **kwargs: dict
) -> blosc2.SChunk | blosc2.NDArray | blosc2.C2Array | blosc2.LazyArray | blosc2.Proxy:
"""Open a persistent :ref:`SChunk`, :ref:`NDArray`, a remote :ref:`C2Array`
or a :ref:`Proxy`
) -> blosc2.SChunk | blosc2.NDArray | blosc2.C2Array | blosc2.LazyArray | blosc2.Proxy | Any:
"""Open a persistent :ref:`SChunk`, :ref:`NDArray`, a remote :ref:`C2Array`,
a :ref:`Proxy`, a :ref:`DictStore` or an :ref:`EmbedStore`.

See the `Notes` section for more info on opening `Proxy` objects.

Expand Down Expand Up @@ -1510,9 +1558,8 @@ def open(

Returns
-------
out: :ref:`SChunk`, :ref:`NDArray` or :ref:`C2Array`
The SChunk or NDArray (if there is a "b2nd" metalayer")
or the C2Array if :paramref:`urlpath` is a :ref:`blosc2.URLPath <URLPath>` instance.
out: :ref:`SChunk`, :ref:`NDArray`, :ref:`C2Array`, :ref:`DictStore` or :ref:`EmbedStore`
The object found in the path.

Notes
-----
Expand Down Expand Up @@ -1577,34 +1624,15 @@ def open(

if isinstance(urlpath, pathlib.PurePath):
urlpath = str(urlpath)

special = _open_special_store(urlpath, mode, offset, **kwargs)
if special is not None:
return special

if not os.path.exists(urlpath):
raise FileNotFoundError(f"No such file or directory: {urlpath}")

dparams = kwargs.get("dparams")
if dparams is None:
# Use multiple threads for decompression by default, unless we are in WASM
# (does not support threads). The only drawback for using multiple threads
# is that access time will be slower because of the overhead of spawning threads
# (but could be fixed in the future with more intelligent thread pools).
dparams = (
blosc2.DParams(nthreads=blosc2.nthreads) if not blosc2.IS_WASM else blosc2.DParams(nthreads=1)
)
kwargs["dparams"] = dparams
_set_default_dparams(kwargs)
res = blosc2_ext.open(urlpath, mode, offset, **kwargs)

meta = getattr(res, "schunk", res).meta
if "proxy-source" in meta:
proxy_src = meta["proxy-source"]
if proxy_src["local_abspath"] is not None:
src = blosc2.open(proxy_src["local_abspath"])
return blosc2.Proxy(src, _cache=res)
elif proxy_src["urlpath"] is not None:
src = blosc2.C2Array(proxy_src["urlpath"][0], proxy_src["urlpath"][1], proxy_src["urlpath"][2])
return blosc2.Proxy(src, _cache=res)
elif not proxy_src["caterva2_env"]:
raise RuntimeError("Could not find the source when opening a Proxy")

if isinstance(res, blosc2.NDArray) and "LazyArray" in res.schunk.meta:
return blosc2._open_lazyarray(res)
else:
return res
return _process_opened_object(res)
13 changes: 13 additions & 0 deletions tests/test_dict_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,16 @@ def test_get_with_different_types():
finally:
if os.path.exists(path):
os.remove(path)


def test_open_context_manager(populated_dict_store):
    """Check that blosc2.open() hands back a DictStore usable in a ``with`` block."""
    fixture_store, store_path = populated_dict_store
    # Close the fixture store first so that its data is written to disk
    fixture_store.close()

    expected = np.array([1, 2, 3])
    # Re-open through the generic entry point, as a context manager
    with blosc2.open(store_path, mode="r") as dstore:
        assert isinstance(dstore, DictStore)
        assert "/node1" in dstore
        assert np.array_equal(dstore["/node1"][:], expected)
18 changes: 17 additions & 1 deletion tests/test_embed_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def cleanup_files():
"test_estore.b2e",
"external_node3.b2nd",
]
yield
yield files
for f in files:
if os.path.exists(f):
os.remove(f)
Expand Down Expand Up @@ -201,3 +201,19 @@ def test_store_and_retrieve_schunk():
assert value.nbytes == len(data)
assert value[:] == data
assert value.vlmeta["description"] == vlmeta


def test_open_context_manager(cleanup_files):
    """Check that blosc2.open() hands back an EmbedStore usable in a ``with`` block."""
    store_path = "test_embed_open.b2e"
    # Register the file so the fixture removes it after the test
    cleanup_files.append(store_path)

    # Build an EmbedStore holding a single node
    writer = blosc2.EmbedStore(store_path, mode="w")
    writer["/node1"] = np.arange(10)

    # Re-open through the generic entry point, as a context manager
    with blosc2.open(store_path, mode="r") as estore_read:
        assert isinstance(estore_read, blosc2.EmbedStore)
        assert "/node1" in estore_read
        stored = estore_read["/node1"][:]
        assert np.array_equal(stored, np.arange(10))
Loading