fix(stream_io): Finalize temporary files correctly on __exit__ #72

Merged 2 commits on Jan 30, 2024

18 changes: 17 additions & 1 deletion CHANGELOG.md
@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+## [Unreleased]
+
+### Fixed
+
+- File objects opened with `stream_io.open_stream("s3://...", "wb")` for writing
+  to object storage now correctly upload their content when closed implicitly
+  at the end of a `with` block, without requiring an explicit call to their
+  `.close()` method
+  - Since `TensorSerializer` objects already call `.close()` explicitly on
+    their output file objects, either when `TensorSerializer.close()` is invoked
+    or when the `TensorSerializer` is garbage collected, this bug mainly applies
+    to manual usage of `stream_io.open_stream()` for object storage uploads
+    not involving a `TensorSerializer`
+
+
## [2.7.1] - 2023-12-06

### Fixed
@@ -265,7 +280,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `get_gpu_name`
- `no_init_or_tensor`

-[2.7.0]: https://github.com/coreweave/tensorizer/compare/v2.7.0...v2.7.1
+[Unreleased]: https://github.com/coreweave/tensorizer/compare/v2.7.1...HEAD
+[2.7.1]: https://github.com/coreweave/tensorizer/compare/v2.7.0...v2.7.1
[2.7.0]: https://github.com/coreweave/tensorizer/compare/v2.6.0...v2.7.0
[2.6.0]: https://github.com/coreweave/tensorizer/compare/v2.5.1...v2.6.0
[2.5.1]: https://github.com/coreweave/tensorizer/compare/v2.5.0...v2.5.1
2 changes: 1 addition & 1 deletion tensorizer/_version.py
@@ -1 +1 @@
-__version__ = "2.7.1"
+__version__ = "2.7.2"
50 changes: 35 additions & 15 deletions tensorizer/stream_io.py
@@ -1053,9 +1053,9 @@ def _infer_credentials(
 def _temp_file_closer(file: io.IOBase, file_name: str, *upload_args):
     """
     Close, upload by name, and then delete the file.
-    Meant to replace .close() on a particular instance
-    of a temporary file-like wrapper object, as an unbound
-    callback to a weakref.finalize() registration on the wrapper.
+    Meant to be placed as a hook before both .close() and .__exit__()
+    on a particular instance of a temporary file-like wrapper object,
+    as a callback to a weakref.finalize() registration on the wrapper.

     The reason this implementation is necessary is really complicated.

@@ -1077,17 +1077,6 @@ def _temp_file_closer(file: io.IOBase, file_name: str, *upload_args):
     so they have to buffer it all in memory.
     """

-    if file.closed:
-        # Makes closure idempotent.
-
-        # If the file object is used as a context
-        # manager, close() is called twice (once in the
-        # serializer code, once after, when leaving the
-        # context).
-
-        # Without this check, this would trigger two
-        # separate uploads.
-        return
     try:
         file.close()
         s3_upload(file_name, *upload_args)
@@ -1281,6 +1270,9 @@ def open_stream(
             # with primitive temporary file support (e.g. Windows)
             temp_file = tempfile.NamedTemporaryFile(mode="wb+", delete=False)

+            # Attach a callback to upload the temporary file when it closes.
+            # weakref finalizers are idempotent, so this upload callback
+            # is guaranteed to run at most once.
             guaranteed_closer = weakref.finalize(
                 temp_file,
                 _temp_file_closer,
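
The added comment relies on `weakref.finalize` objects running their callback at most once. A minimal standalone sketch of that property follows, using only the standard library. The `Resource` class, the `record_upload` callback, and the `calls` list are hypothetical names chosen for illustration and are not part of tensorizer.

```python
import weakref


class Resource:
    """Hypothetical stand-in for the temporary file wrapper."""


calls = []


def record_upload(name):
    # Stand-in for the real close-and-upload callback.
    calls.append(name)
    return "done"


resource = Resource()
finalizer = weakref.finalize(resource, record_upload, "example-object")

first = finalizer()   # Runs the callback and marks the finalizer dead.
second = finalizer()  # Already dead: the callback is not run again.
```

After both calls, `calls` holds a single entry and `second` is `None`. This at-most-once behaviour is presumably what allows the diff to drop the earlier `if file.closed` guard against duplicate uploads.
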
@@ -1291,7 +1283,35 @@ def open_stream(
                 s3_secret_access_key,
                 s3_endpoint,
             )
-            temp_file.close = guaranteed_closer
+
+            # Always run the close + upload procedure
+            # before any code from Python's NamedTemporaryFile wrapper.
+            # It isn't safe to call a bound method from a weakref finalizer,
+            # but calling a weakref finalizer alongside a bound method
+            # creates no problems, other than that the code outside the
+            # finalizer is not guaranteed to be run at any point.
+            # In this case, the weakref finalizer performs all necessary
+            # cleanup itself, but the original NamedTemporaryFile methods
+            # are invoked as well, just in case.
+            wrapped_close = temp_file.close
+
+            def close_wrapper():
+                guaranteed_closer()
+                return wrapped_close()
+
+            # Python 3.12+ doesn't call NamedTemporaryFile.close() during
+            # .__exit__(), so it must be wrapped separately.
+            # Since guaranteed_closer is idempotent, it's fine to call it in
+            # both methods, even if both are called back-to-back.
+            wrapped_exit = temp_file.__exit__
+
+            def exit_wrapper(exc, value, tb):
+                guaranteed_closer()
+                return wrapped_exit(exc, value, tb)
+
+            temp_file.close = close_wrapper
+            temp_file.__exit__ = exit_wrapper
+
             return temp_file
         else:
             s3_endpoint = s3_endpoint or default_s3_read_endpoint
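
The `close_wrapper` pattern in this hunk can be tried in isolation. The sketch below is a rough approximation under stated assumptions, not the library's implementation: `fake_closer` and `uploads` are hypothetical stand-ins for `_temp_file_closer` and the S3 upload, and passing `temp_file.file` and `temp_file.name` to the finalizer is an assumption, because those arguments are elided between the hunks of the diff.

```python
import os
import tempfile
import weakref

uploads = []  # Hypothetical record of "uploaded" payloads.


def fake_closer(file, file_name):
    # Stand-in for _temp_file_closer: close, "upload" by name, then delete.
    try:
        file.close()
        with open(file_name, "rb") as source:
            uploads.append(source.read())
    finally:
        os.unlink(file_name)


temp_file = tempfile.NamedTemporaryFile(mode="wb+", delete=False)
temp_name = temp_file.name

# Assumption: the finalizer receives the underlying file object and its
# name rather than the wrapper itself, so its arguments hold no
# reference to the wrapper it is registered on.
guaranteed_closer = weakref.finalize(
    temp_file, fake_closer, temp_file.file, temp_name
)

wrapped_close = temp_file.close


def close_wrapper():
    guaranteed_closer()     # Runs fake_closer at most once.
    return wrapped_close()  # Then the original NamedTemporaryFile.close().


temp_file.close = close_wrapper

temp_file.write(b"payload")
temp_file.close()
temp_file.close()  # Second call: the finalizer is dead, nothing re-uploads.
```

Closing twice records the payload once and leaves no temporary file behind. The `__exit__` wrapper from the diff is not modelled here; the sketch covers explicit `.close()` calls only.
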
2 changes: 1 addition & 1 deletion tests/requirements.txt
@@ -1,4 +1,4 @@
transformers>=4.27.1
-moto[s3,server]>=4.1.4
+moto[s3,server]>=4.1.4,<5.0.0
redis>=5.0.0
hiredis>=2.2.0