
Commit 0359f17

ENH: use native filesystem (if available) for read_parquet with pyarrow engine (#51609)
Squashed commit messages:

* ENH: Add filesystem to read_parquet/to_parquet
* Add to to_parquet
* Bump fsspec
* fix import
* Mock gcs to local for parquet
* Fix condition, add whatsnew
* address tests, bump gcsfs
* bump s3fs
* Fix doc issues
* Try without fsspec wrapper
* Revert "Try without fsspec wrapper" (this reverts commit 7ec7d75)
* Returns a tuple
* Don't wrap in fsspec, undo deps bump
* Fix whatsnew
* Add validations for filesystem
* Validate that mock filesystem is used
* Undo install.rst
* Try this
* Make global again?
* Try this
* Address review
* Fix test
* Use localfilesystem correctly
* use absolute
1 parent fb754d7 commit 0359f17
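
For orientation, a minimal usage sketch of the new keyword (not part of the commit; the temporary file location and DataFrame contents are illustrative):

# Sketch only (file location and data are illustrative): with engine="pyarrow",
# the new ``filesystem`` keyword accepts a pyarrow.fs or fsspec filesystem object.
import os
import tempfile

import pandas as pd
import pyarrow.fs as pa_fs

df = pd.DataFrame({"A": [0, 1], "B": [1, 0]})
path = os.path.join(tempfile.mkdtemp(), "example.parquet")

# Write and read through an explicitly supplied pyarrow filesystem.
# Note: storage_options cannot be combined with a pyarrow filesystem.
fs = pa_fs.LocalFileSystem()
df.to_parquet(path, engine="pyarrow", filesystem=fs)
result = pd.read_parquet(path, engine="pyarrow", filesystem=fs)

Without the keyword, fsspec-style URIs such as "s3://" are now routed through pyarrow.fs first when no storage_options are given, which is where the performance improvement noted in the whatsnew entry comes from.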

File tree

doc/source/whatsnew/v2.1.0.rst
pandas/io/parquet.py
pandas/tests/io/test_gcs.py
pandas/tests/io/test_parquet.py

4 files changed: +168 -16 lines changed

doc/source/whatsnew/v2.1.0.rst

Lines changed: 2 additions & 1 deletion

@@ -113,7 +113,8 @@ Performance improvements
 - Performance improvement in :meth:`DataFrame.clip` and :meth:`Series.clip` (:issue:`51472`)
 - Performance improvement in :meth:`DataFrame.first_valid_index` and :meth:`DataFrame.last_valid_index` for extension array dtypes (:issue:`51549`)
 - Performance improvement in :meth:`DataFrame.where` when ``cond`` is backed by an extension dtype (:issue:`51574`)
-- Performance improvement in :meth:`read_orc` when reading a remote URI file path. (:issue:`51609`)
+- Performance improvement in :func:`read_orc` when reading a remote URI file path. (:issue:`51609`)
+- Performance improvement in :func:`read_parquet` and :meth:`DataFrame.to_parquet` when reading a remote file with ``engine="pyarrow"`` (:issue:`51609`)
 - Performance improvement in :meth:`MultiIndex.sortlevel` when ``ascending`` is a list (:issue:`51612`)
 - Performance improvement in :meth:`~arrays.ArrowExtensionArray.isna` when array has zero nulls or is all nulls (:issue:`51630`)
 - Performance improvement when parsing strings to ``boolean[pyarrow]`` dtype (:issue:`51730`)

pandas/io/parquet.py

Lines changed: 88 additions & 11 deletions

@@ -90,12 +90,39 @@ def _get_path_or_handle(
 ]:
     """File handling for PyArrow."""
     path_or_handle = stringify_path(path)
+    if fs is not None:
+        pa_fs = import_optional_dependency("pyarrow.fs", errors="ignore")
+        fsspec = import_optional_dependency("fsspec", errors="ignore")
+        if pa_fs is None and fsspec is None:
+            raise ValueError(
+                f"filesystem must be a pyarrow or fsspec FileSystem, "
+                f"not a {type(fs).__name__}"
+            )
+        elif (pa_fs is not None and not isinstance(fs, pa_fs.FileSystem)) and (
+            fsspec is not None and not isinstance(fs, fsspec.spec.AbstractFileSystem)
+        ):
+            raise ValueError(
+                f"filesystem must be a pyarrow or fsspec FileSystem, "
+                f"not a {type(fs).__name__}"
+            )
+        elif pa_fs is not None and isinstance(fs, pa_fs.FileSystem) and storage_options:
+            raise NotImplementedError(
+                "storage_options not supported with a pyarrow FileSystem."
+            )
     if is_fsspec_url(path_or_handle) and fs is None:
-        fsspec = import_optional_dependency("fsspec")
+        if storage_options is None:
+            pa = import_optional_dependency("pyarrow")
+            pa_fs = import_optional_dependency("pyarrow.fs")
 
-        fs, path_or_handle = fsspec.core.url_to_fs(
-            path_or_handle, **(storage_options or {})
-        )
+            try:
+                fs, path_or_handle = pa_fs.FileSystem.from_uri(path)
+            except (TypeError, pa.ArrowInvalid):
+                pass
+        if fs is None:
+            fsspec = import_optional_dependency("fsspec")
+            fs, path_or_handle = fsspec.core.url_to_fs(
+                path_or_handle, **(storage_options or {})
+            )
     elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
         # can't write to a remote url
         # without making use of fsspec at the moment
@@ -173,6 +200,7 @@ def write(
         index: bool | None = None,
         storage_options: StorageOptions = None,
         partition_cols: list[str] | None = None,
+        filesystem=None,
         **kwargs,
     ) -> None:
         self.validate_dataframe(df)
@@ -183,9 +211,9 @@ def write(
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
 
-        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
+        path_or_handle, handles, filesystem = _get_path_or_handle(
             path,
-            kwargs.pop("filesystem", None),
+            filesystem,
             storage_options=storage_options,
             mode="wb",
             is_dir=partition_cols is not None,
@@ -207,12 +235,17 @@ def write(
                 path_or_handle,
                 compression=compression,
                 partition_cols=partition_cols,
+                filesystem=filesystem,
                 **kwargs,
             )
         else:
             # write to single output file
             self.api.parquet.write_table(
-                table, path_or_handle, compression=compression, **kwargs
+                table,
+                path_or_handle,
+                compression=compression,
+                filesystem=filesystem,
+                **kwargs,
             )
         finally:
             if handles is not None:
@@ -225,6 +258,7 @@ def read(
         use_nullable_dtypes: bool = False,
         dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
         storage_options: StorageOptions = None,
+        filesystem=None,
         **kwargs,
     ) -> DataFrame:
         kwargs["use_pandas_metadata"] = True
@@ -242,15 +276,15 @@ def read(
         if manager == "array":
             to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]
 
-        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
+        path_or_handle, handles, filesystem = _get_path_or_handle(
             path,
-            kwargs.pop("filesystem", None),
+            filesystem,
             storage_options=storage_options,
             mode="rb",
         )
         try:
             pa_table = self.api.parquet.read_table(
-                path_or_handle, columns=columns, **kwargs
+                path_or_handle, columns=columns, filesystem=filesystem, **kwargs
             )
             result = pa_table.to_pandas(**to_pandas_kwargs)
 
@@ -279,6 +313,7 @@ def write(
         index=None,
         partition_cols=None,
         storage_options: StorageOptions = None,
+        filesystem=None,
         **kwargs,
     ) -> None:
         self.validate_dataframe(df)
@@ -294,6 +329,11 @@ def write(
         if partition_cols is not None:
             kwargs["file_scheme"] = "hive"
 
+        if filesystem is not None:
+            raise NotImplementedError(
+                "filesystem is not implemented for the fastparquet engine."
+            )
+
         # cannot use get_handle as write() does not accept file buffers
         path = stringify_path(path)
         if is_fsspec_url(path):
@@ -319,7 +359,12 @@ def write(
         )
 
     def read(
-        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
+        self,
+        path,
+        columns=None,
+        storage_options: StorageOptions = None,
+        filesystem=None,
+        **kwargs,
     ) -> DataFrame:
         parquet_kwargs: dict[str, Any] = {}
         use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
@@ -337,6 +382,10 @@ def read(
                 "The 'dtype_backend' argument is not supported for the "
                 "fastparquet engine"
             )
+        if filesystem is not None:
+            raise NotImplementedError(
+                "filesystem is not implemented for the fastparquet engine."
+            )
         path = stringify_path(path)
         handles = None
         if is_fsspec_url(path):
@@ -376,6 +425,7 @@ def to_parquet(
     index: bool | None = None,
     storage_options: StorageOptions = None,
    partition_cols: list[str] | None = None,
+    filesystem: Any = None,
     **kwargs,
 ) -> bytes | None:
     """
@@ -398,6 +448,12 @@ def to_parquet(
         ``io.parquet.engine`` is used. The default ``io.parquet.engine``
         behavior is to try 'pyarrow', falling back to 'fastparquet' if
         'pyarrow' is unavailable.
+
+        When using the ``'pyarrow'`` engine and no storage options are provided
+        and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
+        (e.g. "s3://"), then the ``pyarrow.fs`` filesystem is attempted first.
+        Use the filesystem keyword with an instantiated fsspec filesystem
+        if you wish to use its implementation.
     compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
         default 'snappy'. Name of the compression to use. Use ``None``
         for no compression. The supported compression methods actually
@@ -420,6 +476,12 @@ def to_parquet(
 
         .. versionadded:: 1.2.0
 
+    filesystem : fsspec or pyarrow filesystem, default None
+        Filesystem object to use when reading the parquet file. Only implemented
+        for ``engine="pyarrow"``.
+
+        .. versionadded:: 2.1.0
+
     kwargs
         Additional keyword arguments passed to the engine
 
@@ -440,6 +502,7 @@ def to_parquet(
         index=index,
         partition_cols=partition_cols,
         storage_options=storage_options,
+        filesystem=filesystem,
        **kwargs,
     )
 
@@ -458,6 +521,7 @@ def read_parquet(
     storage_options: StorageOptions = None,
     use_nullable_dtypes: bool | lib.NoDefault = lib.no_default,
     dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
+    filesystem: Any = None,
     **kwargs,
 ) -> DataFrame:
     """
@@ -480,6 +544,12 @@ def read_parquet(
         ``io.parquet.engine`` is used. The default ``io.parquet.engine``
         behavior is to try 'pyarrow', falling back to 'fastparquet' if
         'pyarrow' is unavailable.
+
+        When using the ``'pyarrow'`` engine and no storage options are provided
+        and a filesystem is implemented by both ``pyarrow.fs`` and ``fsspec``
+        (e.g. "s3://"), then the ``pyarrow.fs`` filesystem is attempted first.
+        Use the filesystem keyword with an instantiated fsspec filesystem
+        if you wish to use its implementation.
     columns : list, default=None
         If not None, only these columns will be read from the file.
 
@@ -508,6 +578,12 @@ def read_parquet(
 
         .. versionadded:: 2.0
 
+    filesystem : fsspec or pyarrow filesystem, default None
+        Filesystem object to use when reading the parquet file. Only implemented
+        for ``engine="pyarrow"``.
+
+        .. versionadded:: 2.1.0
+
     **kwargs
         Any additional kwargs are passed to the engine.
 
@@ -537,5 +613,6 @@ def read_parquet(
         storage_options=storage_options,
         use_nullable_dtypes=use_nullable_dtypes,
         dtype_backend=dtype_backend,
+        filesystem=filesystem,
         **kwargs,
     )
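
For readers skimming the hunk above, this is the resolution order `_get_path_or_handle` now applies to fsspec-style URLs when no filesystem is passed: try pyarrow's native filesystem inference first, then fall back to fsspec. A standalone sketch under the assumption that both pyarrow and fsspec are installed (the helper name `resolve_filesystem` and the example URI are hypothetical, not pandas API):

# Sketch of the new resolution order (not pandas' actual helper).
import pyarrow as pa
import pyarrow.fs as pa_fs
import fsspec


def resolve_filesystem(path: str, storage_options=None):
    fs = None
    path_or_handle = path
    if storage_options is None:
        try:
            # pyarrow can infer e.g. an S3 filesystem from "s3://bucket/key"
            fs, path_or_handle = pa_fs.FileSystem.from_uri(path)
        except (TypeError, pa.ArrowInvalid):
            pass
    if fs is None:
        # fall back to fsspec, which honours storage_options
        fs, path_or_handle = fsspec.core.url_to_fs(path, **(storage_options or {}))
    return fs, path_or_handle


# e.g. resolve_filesystem("s3://my-bucket/data.parquet")  # hypothetical bucket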

pandas/tests/io/test_gcs.py

Lines changed: 18 additions & 4 deletions

@@ -1,5 +1,6 @@
 from io import BytesIO
 import os
+import pathlib
 import tarfile
 import zipfile
 
@@ -20,7 +21,7 @@
 
 
 @pytest.fixture
-def gcs_buffer(monkeypatch):
+def gcs_buffer():
     """Emulate GCS using a binary buffer."""
     import fsspec
 
@@ -45,7 +46,7 @@ def ls(self, path, **kwargs):
 
 @td.skip_if_no("gcsfs")
 @pytest.mark.parametrize("format", ["csv", "json", "parquet", "excel", "markdown"])
-def test_to_read_gcs(gcs_buffer, format):
+def test_to_read_gcs(gcs_buffer, format, monkeypatch, capsys):
     """
     Test that many to/read functions support GCS.
 
@@ -75,8 +76,21 @@ def test_to_read_gcs(gcs_buffer, format):
         df2 = read_json(path, convert_dates=["dt"])
     elif format == "parquet":
         pytest.importorskip("pyarrow")
-        df1.to_parquet(path)
-        df2 = read_parquet(path)
+        pa_fs = pytest.importorskip("pyarrow.fs")
+
+        class MockFileSystem(pa_fs.FileSystem):
+            @staticmethod
+            def from_uri(path):
+                print("Using pyarrow filesystem")
+                to_local = pathlib.Path(path.replace("gs://", "")).absolute().as_uri()
+                return pa_fs.LocalFileSystem(to_local)
+
+        with monkeypatch.context() as m:
+            m.setattr(pa_fs, "FileSystem", MockFileSystem)
+            df1.to_parquet(path)
+            df2 = read_parquet(path)
+        captured = capsys.readouterr()
+        assert captured.out == "Using pyarrow filesystem\nUsing pyarrow filesystem\n"
     elif format == "markdown":
         pytest.importorskip("tabulate")
         df1.to_markdown(path)
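
As the new docstrings note, passing an instantiated fsspec filesystem opts out of the pyarrow.fs URI inference exercised by the test above. A hedged sketch (assumes gcsfs is installed and credentials are configured; the bucket and object name are hypothetical):

# Illustrative only: force the fsspec (gcsfs) implementation for a GCS URI by
# passing the filesystem explicitly instead of relying on URI inference.
import gcsfs
import pandas as pd

fs = gcsfs.GCSFileSystem()  # assumes default credentials are available
df = pd.read_parquet(
    "gs://my-bucket/data.parquet",  # hypothetical bucket and object
    engine="pyarrow",
    filesystem=fs,
)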

pandas/tests/io/test_parquet.py

Lines changed: 60 additions & 0 deletions

@@ -1211,6 +1211,66 @@ def test_bytes_file_name(self, engine):
             result = read_parquet(path, engine=engine)
         tm.assert_frame_equal(result, df)
 
+    def test_filesystem_notimplemented(self):
+        pytest.importorskip("fastparquet")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                NotImplementedError, match="filesystem is not implemented"
+            ):
+                df.to_parquet(path, engine="fastparquet", filesystem="foo")
+
+        with tm.ensure_clean() as path:
+            pathlib.Path(path).write_bytes(b"foo")
+            with pytest.raises(
+                NotImplementedError, match="filesystem is not implemented"
+            ):
+                read_parquet(path, engine="fastparquet", filesystem="foo")
+
+    def test_invalid_filesystem(self):
+        pytest.importorskip("pyarrow")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
+            ):
+                df.to_parquet(path, engine="pyarrow", filesystem="foo")
+
+        with tm.ensure_clean() as path:
+            pathlib.Path(path).write_bytes(b"foo")
+            with pytest.raises(
+                ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
+            ):
+                read_parquet(path, engine="pyarrow", filesystem="foo")
+
+    def test_unsupported_pa_filesystem_storage_options(self):
+        pa_fs = pytest.importorskip("pyarrow.fs")
+        df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
+        with tm.ensure_clean() as path:
+            with pytest.raises(
+                NotImplementedError,
+                match="storage_options not supported with a pyarrow FileSystem.",
+            ):
+                df.to_parquet(
+                    path,
+                    engine="pyarrow",
+                    filesystem=pa_fs.LocalFileSystem(),
+                    storage_options={"foo": "bar"},
+                )
+
+        with tm.ensure_clean() as path:
+            pathlib.Path(path).write_bytes(b"foo")
+            with pytest.raises(
+                NotImplementedError,
+                match="storage_options not supported with a pyarrow FileSystem.",
+            ):
+                read_parquet(
+                    path,
+                    engine="pyarrow",
+                    filesystem=pa_fs.LocalFileSystem(),
+                    storage_options={"foo": "bar"},
+                )
+
     def test_invalid_dtype_backend(self, engine):
         msg = (
             "dtype_backend numpy is invalid, only 'numpy_nullable' and "
