Skip to content

Commit 8046f94

Browse files
committed
Remove get_many_ordered
1 parent b9b1586 commit 8046f94

File tree

8 files changed

+61
-128
lines changed

8 files changed

+61
-128
lines changed

changes/3207.bugfix.rst

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,14 @@
22
opening Zarr v3 arrays on synchronous fsspec filesystems, by implementing a
33
fallback to sequential reads for non-concurrency-safe filesystems, ensuring
44
robust metadata retrieval without sacrificing performance for safe
5-
filesystems. Furthermore ``Store.get_many`` was modified to retrieve objects
5+
filesystems. Furthermore ``Store._get_many`` was modified to retrieve objects
66
concurrently from storage. The previous implementation was sequential,
7-
awaiting each ``self.get(*req)`` before proceeding, contrary to the docstring.
8-
- Introduced ``Store.get_many_ordered`` and ``StorePath.get_many_ordered`` to
9-
retrieve multiple metadata files in a single call, optimizing the retrieval
10-
process and reducing overhead. ``StorePath.get_many_ordered`` is used in
11-
``get_array_metadata``. ``Store._get_many_ordered`` is used in
12-
``_read_metadata_v2``.
13-
- Modified ``FsspecStore._get_many`` and ``FsspecStore._get_many_ordered``
14-
to conditionally use ``asyncio.gather`` based on the concurrency safety
15-
of the underlying file system, enhancing compatibility with
16-
synchronous file systems by avoiding deadlocks when accessing metadata
17-
concurrently. Adding tests ``LockableFileSystem`` to test with
18-
async/sync behavior.
7+
awaiting each ``self.get(*req)`` before proceeding, contrary to the
8+
docstring.
9+
- Introduced ``StorePath.get_many``, mimicking the behavior of ``StorePath.get``.
10+
- Use ``Store._get_many`` and ``StorePath.get_many`` in ``get_array_metadata``.
11+
- Implemented ``FsspecStore._get_many`` to conditionally use ``asyncio.gather``
12+
based on the concurrency safety of the underlying file system, enhancing
13+
compatibility with synchronous file systems by avoiding deadlocks when
14+
accessing metadata concurrently. Added ``LockableFileSystem`` tests to
15+
exercise the async/sync behavior.

src/zarr/abc/store.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -426,16 +426,6 @@ async def _get_with_name(
426426
task = await completed
427427
yield task
428428

429-
async def _get_many_ordered(
430-
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
431-
) -> tuple[Buffer | None, ...]:
432-
"""
433-
Retrieve a collection of objects from storage in the order they were requested.
434-
"""
435-
tasks = [self.get(*req) for req in requests]
436-
437-
return tuple(await gather(*tasks))
438-
439429
async def getsize(self, key: str) -> int:
440430
"""
441431
Return the size, in bytes, of a value in a Store.

src/zarr/core/array.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -211,24 +211,25 @@ async def get_array_metadata(
211211
store_path: StorePath, zarr_format: ZarrFormat | None = 3
212212
) -> dict[str, JSON]:
213213
if zarr_format == 2:
214-
zarray_bytes, zattrs_bytes = await store_path.get_many_ordered(
215-
ZARRAY_JSON,
216-
ZATTRS_JSON,
217-
prototype=cpu_buffer_prototype,
218-
)
214+
requests = [(key, default_buffer_prototype(), None) for key in [ZARRAY_JSON, ZATTRS_JSON]]
215+
retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
216+
zarray_bytes, zattrs_bytes = tuple(retrieved_buffers.get(req[0]) for req in requests)
217+
219218
if zarray_bytes is None:
220219
raise FileNotFoundError(store_path)
221220
elif zarr_format == 3:
222221
zarr_json_bytes = await (store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype)
223222
if zarr_json_bytes is None:
224223
raise FileNotFoundError(store_path)
225224
elif zarr_format is None:
226-
zarr_json_bytes, zarray_bytes, zattrs_bytes = await store_path.get_many_ordered(
227-
ZARR_JSON,
228-
ZARRAY_JSON,
229-
ZATTRS_JSON,
230-
prototype=cpu_buffer_prototype,
225+
requests = [
226+
(key, default_buffer_prototype(), None) for key in [ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON]
227+
]
228+
retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
229+
zarr_json_bytes, zarray_bytes, zattrs_bytes = tuple(
230+
retrieved_buffers.get(req[0]) for req in requests
231231
)
232+
232233
if zarr_json_bytes is not None and zarray_bytes is not None:
233234
# warn and favor v3
234235
msg = f"Both zarr.json (Zarr format 3) and .zarray (Zarr format 2) metadata objects exist at {store_path}. Zarr v3 will be used."

src/zarr/core/group.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -513,19 +513,23 @@ async def open(
513513
consolidated_key = use_consolidated
514514

515515
if zarr_format == 2:
516-
paths = [store_path / ZGROUP_JSON, store_path / ZATTRS_JSON]
516+
requests = [
517+
(key, default_buffer_prototype(), None) for key in [ZGROUP_JSON, ZATTRS_JSON]
518+
]
517519
if use_consolidated or use_consolidated is None:
518-
paths.append(store_path / consolidated_key)
520+
requests.append((consolidated_key, default_buffer_prototype(), None))
519521

520-
zgroup_bytes, zattrs_bytes, *rest = await asyncio.gather(
521-
*[path.get() for path in paths]
522+
retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
523+
zgroup_bytes, zattrs_bytes = (
524+
retrieved_buffers[ZGROUP_JSON],
525+
retrieved_buffers[ZATTRS_JSON],
522526
)
527+
523528
if zgroup_bytes is None:
524529
raise FileNotFoundError(store_path)
525530

526531
if use_consolidated or use_consolidated is None:
527-
maybe_consolidated_metadata_bytes = rest[0]
528-
532+
maybe_consolidated_metadata_bytes = retrieved_buffers[consolidated_key]
529533
else:
530534
maybe_consolidated_metadata_bytes = None
531535

@@ -534,17 +538,18 @@ async def open(
534538
if zarr_json_bytes is None:
535539
raise FileNotFoundError(store_path)
536540
elif zarr_format is None:
541+
requests = [
542+
(key, default_buffer_prototype(), None)
543+
for key in [ZARR_JSON, ZGROUP_JSON, ZATTRS_JSON, consolidated_key]
544+
]
545+
retrieved_buffers = {key: value async for key, value in store_path.get_many(requests)}
537546
(
538547
zarr_json_bytes,
539548
zgroup_bytes,
540549
zattrs_bytes,
541550
maybe_consolidated_metadata_bytes,
542-
) = await store_path.get_many_ordered(
543-
ZARR_JSON,
544-
ZGROUP_JSON,
545-
ZATTRS_JSON,
546-
consolidated_key,
547-
)
551+
) = tuple(retrieved_buffers.get(req[0]) for req in requests)
552+
548553
if zarr_json_bytes is not None and zgroup_bytes is not None:
549554
# warn and favor v3
550555
msg = f"Both zarr.json (Zarr format 3) and .zgroup (Zarr format 2) metadata objects exist at {store_path}. Zarr format 3 will be used."
@@ -3476,12 +3481,14 @@ async def _read_metadata_v2(store: Store, path: str) -> ArrayV2Metadata | GroupM
34763481
"""
34773482
# TODO: consider first fetching array metadata, and only fetching group metadata when we don't
34783483
# find an array
3479-
zarray_bytes, zgroup_bytes, zattrs_bytes = await store._get_many_ordered(
3480-
[
3481-
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
3482-
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
3483-
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
3484-
]
3484+
requests = [
3485+
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
3486+
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
3487+
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
3488+
]
3489+
retrieved_buffers = {key: value async for key, value in store._get_many(requests)}
3490+
zarray_bytes, zgroup_bytes, zattrs_bytes = tuple(
3491+
retrieved_buffers.get(req[0]) for req in requests
34853492
)
34863493

34873494
if zattrs_bytes is None:

src/zarr/storage/_common.py

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
FSMap = None
2929

3030
if TYPE_CHECKING:
31+
from collections.abc import AsyncGenerator, Iterable
32+
3133
from zarr.core.buffer import BufferPrototype
3234

3335

@@ -164,34 +166,24 @@ async def get(
164166
prototype = default_buffer_prototype()
165167
return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
166168

167-
async def get_many_ordered(
168-
self,
169-
*path_components: str,
170-
prototype: BufferPrototype | None = None,
171-
byte_range: ByteRequest | None = None,
172-
) -> tuple[Buffer | None, ...]:
169+
async def get_many(
170+
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
171+
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
173172
"""
174173
Read multiple entries from the store for the given requests, yielding results as they complete (order is not guaranteed).
175174
176175
Parameters
177176
----------
178-
path_components : str
179-
Components to append to the store path.
180-
prototype : BufferPrototype, optional
181-
The buffer prototype to use when reading the bytes.
182-
byte_range : ByteRequest, optional
183-
The range of bytes to read.
177+
requests : Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
184178
185-
Returns
179+
Yields
186180
-------
187-
tuple[Buffer | None, ...]
188-
A tuple of buffers read from the store, in the order of the provided path_components.
181+
tuple[str, Buffer | None]
189182
"""
190-
if prototype is None:
191-
prototype = default_buffer_prototype()
192-
193-
tasks = [(self / component).path for component in path_components]
194-
return await self.store._get_many_ordered([(task, prototype, byte_range) for task in tasks])
183+
path_component_dict = {(self / req[0]).path: req[0] for req in requests}
184+
complete_requests = [((self / req[0]).path, *req[1:]) for req in requests]
185+
async for result in self.store._get_many(complete_requests):
186+
yield (path_component_dict[result[0]], *result[1:])
195187

196188
async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None:
197189
"""

src/zarr/storage/_fsspec.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -331,24 +331,12 @@ async def _get_many(
331331
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
332332
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
333333
if getattr(self.fs, "asynchronous", True):
334-
async for key, value in super()._get_many(requests):
335-
yield (key, value)
334+
async for result in super()._get_many(requests):
335+
yield result
336336
else:
337337
for key, prototype, byte_range in requests:
338338
value = await self.get(key, prototype, byte_range)
339339
yield (key, value)
340-
341-
async def _get_many_ordered(
342-
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
343-
) -> tuple[Buffer | None, ...]:
344-
if getattr(self.fs, "asynchronous", True):
345-
return await super()._get_many_ordered(requests)
346-
else:
347-
results = []
348-
for key, prototype, byte_range in requests:
349-
value = await self.get(key, prototype, byte_range)
350-
results.append(value)
351-
return tuple(results)
352340

353341
async def set(
354342
self,

src/zarr/testing/store.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -265,30 +265,6 @@ async def test_get_many(self, store: S) -> None:
265265
expected_kvs = sorted(((k, b) for k, b in zip(keys, values, strict=False)))
266266
assert observed_kvs == expected_kvs
267267

268-
async def test_get_many_ordered(self, store: S) -> None:
269-
"""
270-
Ensure that multiple keys can be retrieved at once with the _get_many method,
271-
preserving the order of keys.
272-
"""
273-
keys = tuple(map(str, range(10)))
274-
values = tuple(f"{k}".encode() for k in keys)
275-
276-
for k, v in zip(keys, values, strict=False):
277-
await self.set(store, k, self.buffer_cls.from_bytes(v))
278-
279-
observed_buffers = await store._get_many_ordered(
280-
tuple(
281-
zip(
282-
keys,
283-
(default_buffer_prototype(),) * len(keys),
284-
(None,) * len(keys),
285-
strict=False,
286-
)
287-
)
288-
)
289-
observed_bytes = tuple(b.to_bytes() for b in observed_buffers if b is not None)
290-
assert observed_bytes == values, f"Expected {values}, got {observed_bytes}"
291-
292268
@pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
293269
@pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
294270
async def test_getsize(self, store: S, key: str, data: bytes) -> None:

tests/test_store/test_fsspec.py

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -532,25 +532,10 @@ async def test_get_many_synchronous_fs(self):
532532
results = []
533533
async for k, v in self.store_sync._get_many(requests):
534534
results.append((k, v.to_bytes() if v else None))
535-
# Results should already be in the same order as requests
535+
# In the synchronous case, results should be in the same order as requests
536536

537537
assert results == true_results
538538

539-
async def test_get_many_ordered_synchronous_fs(self):
540-
requests, true_results = self.get_requests_and_true_results()
541-
542-
results = await self.store_sync._get_many_ordered(requests)
543-
results = [value.to_bytes() if value else None for value in results]
544-
545-
assert results == [value[1] for value in true_results]
546-
547-
async def test_get_many_ordered_asynchronous_fs(self):
548-
requests, true_results = self.get_requests_and_true_results()
549-
550-
results = await self.store_async._get_many_ordered(requests)
551-
results = [value.to_bytes() if value else None for value in results]
552-
553-
assert results == [value[1] for value in true_results]
554539

555540
async def test_asynchronous_locked_fs_raises(self):
556541
store = LockableFileSystem(asynchronous=True, lock=True).get_store(path="root")
@@ -559,6 +544,3 @@ async def test_asynchronous_locked_fs_raises(self):
559544
with pytest.raises(RuntimeError, match="Concurrent requests!"):
560545
async for _, _ in store._get_many(requests):
561546
pass
562-
563-
with pytest.raises(RuntimeError, match="Concurrent requests!"):
564-
await store._get_many_ordered(requests)

0 commit comments

Comments
 (0)