build() throws IOException {
+ if (this.s3Client == null || this.bucket == null || this.key == null || this.serializer == null) {
+ throw new IllegalArgumentException("S3Client, bucket, key, and serializer must be provided");
+ }
+ OutputStream outputStream;
+ if (this.multipartUpload) {
+ S3MultipartUploader s3MultipartUploader = new S3MultipartUploader(this.s3Client, this.bucket, this.key);
+ if (this.contentType != null) {
+ s3MultipartUploader.setContentType(this.contentType);
+ }
+ if (this.partSize != null) {
+ s3MultipartUploader.setPartSize(this.partSize);
+ }
+
+ outputStream = new S3MultipartOutputStream(s3MultipartUploader);
+ }
+ else {
+ S3OutputStream s3OutputStream = new S3OutputStream(this.s3Client, this.bucket, this.key);
+ if (this.contentType != null) {
+ s3OutputStream.setContentType(this.contentType);
+ }
+ outputStream = s3OutputStream;
+ }
+
+ return new S3ItemWriter<>(outputStream, this.serializer);
+ }
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/builder/package-info.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/builder/package-info.java
new file mode 100644
index 00000000..10cab50f
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/builder/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides builders for creating and configuring the S3 item reader and writer offered
+ * by this extension.
+ *
+ * <p>Classes in this package are designed to work with the AWS SDK for Java and offer a
+ * fluent API for assembling readers and writers without handling the underlying streams
+ * directly.
+ */
+
+package org.springframework.batch.extensions.s3.builder;
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/package-info.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/package-info.java
new file mode 100644
index 00000000..9c7dbb3a
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides support for reading items from and writing items to Amazon S3 using a
+ * stream-based approach. This package includes item reader and writer implementations
+ * that operate on S3 objects and delegate (de)serialization to pluggable strategies.
+ *
+ * <p>Classes in this package are designed to work with the AWS SDK for Java and provide a
+ * convenient way to process large datasets stored in S3 without loading them entirely
+ * into memory.
+ */
+
+package org.springframework.batch.extensions.s3;
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3Deserializer.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3Deserializer.java
new file mode 100644
index 00000000..aef87dfc
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3Deserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.serializer;
+
+/**
+ * A functional interface for deserializing byte arrays read from S3 into items.
+ * Implementations should provide a way to convert a byte array into an item of type T.
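+ *
+ * <p>As a sketch, a minimal stateless implementation (assuming each buffer holds one
+ * complete item) can be written as a lambda:
+ * <pre>{@code
+ * S3Deserializer<String> deserializer = (buffer) ->
+ *     (buffer == null || buffer.length == 0) ? null : new String(buffer, StandardCharsets.UTF_8);
+ * }</pre>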
+ *
+ * @param <T> the type of items to be deserialized
+ * @author Andrea Cioni
+ */
+@FunctionalInterface
+public interface S3Deserializer<T> {
+
+ /**
+ * Deserialize a byte array into an object of type T. This method is not guaranteed
+ * to return a non-null value: the byte array may be empty, or it may not yet contain
+ * a complete object of type T. For this reason, implementations may need to be
+ * stateful and buffer partial input across calls.
+ * @param buffer the byte array to deserialize
+ * @return the deserialized object, or {@code null} if no complete object is available yet
+ */
+ T deserialize(byte[] buffer);
+
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3Serializer.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3Serializer.java
new file mode 100644
index 00000000..838bf393
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3Serializer.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.serializer;
+
+/**
+ * A functional interface for serializing items to byte arrays for S3 storage.
+ * Implementations should provide a way to convert an item of type T into a byte array.
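+ *
+ * <p>As a sketch, an implementation that mirrors {@link S3StringSerializer} can be
+ * written as a lambda:
+ * <pre>{@code
+ * S3Serializer<String> serializer = (item) -> (item + "\n").getBytes(StandardCharsets.UTF_8);
+ * }</pre>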
+ *
+ * @param <T> the type of items to be serialized
+ * @author Andrea Cioni
+ */
+@FunctionalInterface
+public interface S3Serializer<T> {
+
+ /**
+ * Serialize an item of type T into a byte array.
+ * @param item the item to serialize
+ * @return the serialized byte array
+ */
+ byte[] serialize(T item);
+
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3StringDeserializer.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3StringDeserializer.java
new file mode 100644
index 00000000..ce024d8c
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3StringDeserializer.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.serializer;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simple deserializer for String items from S3. It reads lines from a byte array,
+ * handling both {@code \n} and {@code \r\n} line endings. This is intended to be used
+ * with S3ItemReader to read text data from S3 objects.
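+ *
+ * <p>Partial input is buffered across calls until a complete line is available, for
+ * example:
+ * <pre>{@code
+ * S3StringDeserializer deserializer = new S3StringDeserializer();
+ * deserializer.deserialize("par".getBytes(StandardCharsets.UTF_8)); // returns null
+ * deserializer.deserialize("tial\n".getBytes(StandardCharsets.UTF_8)); // returns "partial"
+ * }</pre>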
+ *
+ * @author Andrea Cioni
+ */
+public class S3StringDeserializer implements S3Deserializer<String> {
+
+ private final Charset charset;
+
+ private StringBuilder stringBuilder = new StringBuilder();
+
+ public S3StringDeserializer() {
+ this.charset = StandardCharsets.UTF_8;
+ }
+
+ public S3StringDeserializer(Charset charset) {
+ this.charset = charset;
+ }
+
+ @Override
+ public String deserialize(byte[] buffer) {
+ String incoming = new String(buffer, this.charset);
+ this.stringBuilder.append(incoming);
+
+ int newlineIdx = this.stringBuilder.indexOf("\n");
+ if (newlineIdx == -1) {
+ return null;
+ }
+
+ // Handle both \n and \r\n line endings
+ int lineEnd = newlineIdx;
+ if (newlineIdx > 0 && this.stringBuilder.charAt(newlineIdx - 1) == '\r') {
+ lineEnd--;
+ }
+
+ String line = this.stringBuilder.substring(0, lineEnd);
+ this.stringBuilder = new StringBuilder(this.stringBuilder.substring(newlineIdx + 1));
+
+ return line;
+ }
+
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3StringSerializer.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3StringSerializer.java
new file mode 100644
index 00000000..6669adf9
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/S3StringSerializer.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.serializer;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Simple serializer for String items to be used with S3. This serializer takes a String
+ * item, appends a newline character, and converts it to a byte array using UTF-8
+ * encoding. This is intended to be used with S3ItemWriter to write text data to S3
+ * objects.
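+ *
+ * <p>For example:
+ * <pre>{@code
+ * byte[] bytes = new S3StringSerializer().serialize("line"); // "line\n" encoded as UTF-8
+ * }</pre>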
+ *
+ * @author Andrea Cioni
+ */
+public class S3StringSerializer implements S3Serializer<String> {
+
+ @Override
+ public byte[] serialize(String item) {
+ return (item + "\n").getBytes(StandardCharsets.UTF_8);
+ }
+
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/package-info.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/package-info.java
new file mode 100644
index 00000000..44839b25
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/serializer/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides serializers and deserializers for converting items to and from the byte
+ * arrays exchanged with Amazon S3.
+ *
+ * <p>Classes in this package are designed to work with the S3 item reader and writer,
+ * which delegate the conversion of items to these pluggable strategies.
+ */
+package org.springframework.batch.extensions.s3.serializer;
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/Defaults.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/Defaults.java
new file mode 100644
index 00000000..eea4061b
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/Defaults.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import org.springframework.util.unit.DataSize;
+
+final class Defaults {
+ static final int DEFAULT_PART_SIZE = (int) DataSize.ofMegabytes(5L).toBytes();
+
+ static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
+
+ private Defaults() { }
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3InputStream.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3InputStream.java
new file mode 100644
index 00000000..35398ac5
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3InputStream.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+
+/**
+ * An {@link InputStream} that reads data from an S3 object. It uses the AWS SDK for Java
+ * to retrieve the object from S3. It is safe to use this stream for reading large files,
+ * as it does not load the entire file into memory.
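+ *
+ * <p>A sketch of typical usage, assuming an existing {@link S3Client}:
+ * <pre>{@code
+ * try (S3InputStream in = new S3InputStream(s3Client, "my-bucket", "my-key")) {
+ *     int b;
+ *     while ((b = in.read()) != -1) {
+ *         // process each byte
+ *     }
+ * }
+ * }</pre>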
+ *
+ * @author Andrea Cioni
+ */
+public class S3InputStream extends InputStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3InputStream.class);
+
+ private final S3Client s3;
+
+ private final String bucketName;
+
+ private final String objectKey;
+
+ private InputStream inputStream;
+
+ public S3InputStream(S3Client s3, String bucketName, String objectKey) {
+ this.s3 = s3;
+ this.bucketName = bucketName;
+ this.objectKey = objectKey;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (this.inputStream == null) {
+ this.inputStream = openS3InputStream();
+ }
+ return this.inputStream.read();
+ }
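+
+ // Bulk-read override: delegates directly to the underlying S3 object stream instead
+ // of falling back to InputStream's byte-at-a-time default loop.
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (this.inputStream == null) {
+ this.inputStream = openS3InputStream();
+ }
+ return this.inputStream.read(b, off, len);
+ }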
+
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing stream");
+ if (this.inputStream != null) {
+ this.inputStream.close();
+ }
+ logger.debug("Stream closed");
+ super.close();
+ }
+
+ private InputStream openS3InputStream() {
+ GetObjectRequest getObjectRequest = GetObjectRequest.builder()
+ .bucket(this.bucketName)
+ .key(this.objectKey)
+ .build();
+ return this.s3.getObject(getObjectRequest);
+ }
+
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3MultipartOutputStream.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3MultipartOutputStream.java
new file mode 100644
index 00000000..86ccac4f
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3MultipartOutputStream.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+
+/**
+ * An {@link OutputStream} that writes data to an S3 object using multipart upload. It
+ * uses a {@link PipedInputStream} and a {@link PipedOutputStream} to allow writing data
+ * asynchronously while uploading it in parts. This stream is suitable for large file
+ * uploads.
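+ *
+ * <p>A sketch of typical usage, assuming an existing {@link S3Client}:
+ * <pre>{@code
+ * try (S3MultipartOutputStream out = new S3MultipartOutputStream(s3Client, "my-bucket", "my-key")) {
+ *     out.write(payload); // bytes are piped to a background multipart upload thread
+ * }
+ * }</pre>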
+ *
+ * @author Andrea Cioni
+ */
+public class S3MultipartOutputStream extends OutputStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3MultipartOutputStream.class);
+
+ private final PipedInputStream pipedInputStream;
+
+ private final PipedOutputStream pipedOutputStream;
+
+ private ExecutorService singleThreadExecutor;
+
+ private volatile boolean uploading;
+
+ private final S3Uploader multipartUpload;
+
+ public S3MultipartOutputStream(S3Client s3Client, String bucketName, String key) throws IOException {
+ this(new S3MultipartUploader(s3Client, bucketName, key));
+ }
+
+ public S3MultipartOutputStream(S3Uploader s3Uploader) throws IOException {
+ this.pipedInputStream = new PipedInputStream();
+ this.pipedOutputStream = new PipedOutputStream(this.pipedInputStream);
+ this.uploading = false;
+ this.multipartUpload = s3Uploader;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (!this.uploading) {
+ this.uploading = true;
+
+ startUpload();
+ }
+ this.pipedOutputStream.write(b);
+ }
+
+ private void startUpload() {
+ if (this.singleThreadExecutor == null) {
+ this.singleThreadExecutor = Executors.newSingleThreadExecutor();
+ }
+
+ this.singleThreadExecutor.execute(() -> {
+ try {
+ this.multipartUpload.upload(this.pipedInputStream);
+ }
+ catch (IOException ex) {
+ logger.error("Error during multipart upload", ex);
+ throw new RuntimeException(ex);
+ }
+ finally {
+ try {
+ this.pipedInputStream.close();
+ }
+ catch (IOException ex) {
+ logger.error("Error closing piped input stream", ex);
+ }
+ }
+ });
+ this.singleThreadExecutor.shutdown();
+ }
+
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing output stream");
+
+ this.pipedOutputStream.close();
+
+ if (this.uploading) {
+ try {
+ if (!this.singleThreadExecutor.awaitTermination(10L, TimeUnit.SECONDS)) {
+ logger.warn("Multipart upload thread did not finish in time");
+ }
+ }
+ catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ logger.error("Multipart upload thread interrupted", ex);
+ }
+ }
+
+ logger.debug("Output stream closed");
+ super.close();
+ }
+
+ public void setSingleThreadExecutor(ExecutorService singleThreadExecutor) {
+ this.singleThreadExecutor = singleThreadExecutor;
+ }
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3MultipartUploader.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3MultipartUploader.java
new file mode 100644
index 00000000..c0878829
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3MultipartUploader.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
+
+/**
+ * A utility class for performing multipart uploads to Amazon S3. It reads data from an
+ * input stream and uploads it in parts to a specified S3 bucket and key.
+ * Reference: "Uploading streams to Amazon S3 using the AWS SDK for Java 2.x" (AWS blog).
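+ *
+ * <p>A sketch of direct usage, assuming an existing {@link S3Client} and input stream:
+ * <pre>{@code
+ * S3MultipartUploader uploader = new S3MultipartUploader(s3Client, "my-bucket", "my-key");
+ * uploader.setPartSize((int) DataSize.ofMegabytes(10L).toBytes());
+ * long bytesUploaded = uploader.upload(inputStream); // closes both the stream and the client
+ * }</pre>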
+ *
+ * @author Andrea Cioni
+ */
+public class S3MultipartUploader implements S3Uploader {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploader.class);
+
+ private final S3Client s3Client;
+
+ private final String bucket;
+
+ private final String key;
+
+ private int partSize = Defaults.DEFAULT_PART_SIZE;
+
+ private String contentType = Defaults.DEFAULT_CONTENT_TYPE;
+
+ public S3MultipartUploader(S3Client s3Client, String bucket, String key) {
+ this.s3Client = s3Client;
+ this.bucket = bucket;
+ this.key = key;
+ }
+
+ /**
+ * Reads from the input stream into the buffer, attempting to fill the buffer
+ * completely or until the end of the stream is reached.
+ * @param inputStream the input stream to read from
+ * @param buffer the buffer to fill
+ * @return the number of bytes read, or -1 if the end of the stream is reached before
+ * any bytes are read
+ * @throws IOException if an I/O error occurs
+ */
+ private static int readFullyOrToEnd(InputStream inputStream, byte[] buffer) throws IOException {
+ int totalBytesRead = 0;
+ int bytesRead;
+ while (totalBytesRead < buffer.length) {
+ bytesRead = inputStream.read(buffer, totalBytesRead, buffer.length - totalBytesRead);
+ if (bytesRead == -1) {
+ break;
+ }
+ totalBytesRead += bytesRead;
+ }
+ return (totalBytesRead > 0) ? totalBytesRead : -1;
+ }
+
+ @Override
+ public long upload(InputStream inputStream) throws IOException {
+ String uploadId;
+ long totalBytesRead = 0;
+
+ try {
+ CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
+ .bucket(this.bucket)
+ .key(this.key)
+ .contentType(this.contentType)
+ .build();
+
+ CreateMultipartUploadResponse createResponse = this.s3Client
+ .createMultipartUpload(createMultipartUploadRequest);
+ uploadId = createResponse.uploadId();
+ logger.debug("Started multipart upload with ID: {}", uploadId);
+
+ List<CompletedPart> completedParts = new ArrayList<>();
+ int partNumber = 1;
+ byte[] buffer = new byte[this.partSize];
+ int bytesRead;
+
+ try {
+ while ((bytesRead = readFullyOrToEnd(inputStream, buffer)) > 0) {
+ totalBytesRead += bytesRead;
+ UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
+ .bucket(this.bucket)
+ .key(this.key)
+ .uploadId(uploadId)
+ .partNumber(partNumber)
+ .build();
+
+ RequestBody requestBody;
+ if (bytesRead < this.partSize) {
+ byte[] lastPartBuffer = new byte[bytesRead];
+ System.arraycopy(buffer, 0, lastPartBuffer, 0, bytesRead);
+ requestBody = RequestBody.fromBytes(lastPartBuffer);
+ }
+ else {
+ requestBody = RequestBody.fromBytes(buffer);
+ }
+
+ UploadPartResponse uploadPartResponse = this.s3Client.uploadPart(uploadPartRequest, requestBody);
+ CompletedPart part = CompletedPart.builder()
+ .partNumber(partNumber)
+ .eTag(uploadPartResponse.eTag())
+ .build();
+ completedParts.add(part);
+
+ logger.debug("Uploaded part {} with size {} bytes", partNumber, bytesRead);
+ partNumber++;
+ }
+
+ CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
+ .parts(completedParts)
+ .build();
+
+ CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
+ .bucket(this.bucket)
+ .key(this.key)
+ .uploadId(uploadId)
+ .multipartUpload(completedMultipartUpload)
+ .build();
+
+ CompleteMultipartUploadResponse completeResponse = this.s3Client
+ .completeMultipartUpload(completeRequest);
+ logger.debug("Multipart upload completed. Object URL: {}", completeResponse.location());
+ }
+ catch (Exception ex) {
+ logger.error("Error during multipart upload: {}", ex.getMessage(), ex);
+ if (uploadId != null) {
+ AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
+ .bucket(this.bucket)
+ .key(this.key)
+ .uploadId(uploadId)
+ .build();
+ this.s3Client.abortMultipartUpload(abortRequest);
+ logger.warn("Multipart upload aborted");
+ }
+ throw ex;
+ }
+ finally {
+ try {
+ inputStream.close();
+ }
+ catch (IOException ex) {
+ logger.error("Error closing input stream: {}", ex.getMessage(), ex);
+ }
+ }
+ }
+ finally {
+ this.s3Client.close();
+ }
+
+ return totalBytesRead;
+ }
+
+ public int getPartSize() {
+ return this.partSize;
+ }
+
+ public void setPartSize(int partSize) {
+ this.partSize = partSize;
+ }
+
+ public String getContentType() {
+ return this.contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3OutputStream.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3OutputStream.java
new file mode 100644
index 00000000..2117f24b
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3OutputStream.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.http.ContentStreamProvider;
+import software.amazon.awssdk.services.s3.S3Client;
+
+/**
+ * An {@link OutputStream} that writes data directly to an S3 object with a specified MIME
+ * type (default is application/octet-stream). This stream loads the data in memory and
+ * uploads it to S3 as it is written. It uses a {@link PipedInputStream} and a
+ * {@link PipedOutputStream} to allow writing data asynchronously while uploading it
+ * directly to S3. It is not safe to use this stream for large file uploads, as it does
+ * not handle multipart uploads or large data efficiently. For that use case, see
+ * {@link S3MultipartOutputStream}.
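+ *
+ * <p>A sketch of typical usage, assuming an existing {@link S3Client}:
+ * <pre>{@code
+ * try (S3OutputStream out = new S3OutputStream(s3Client, "my-bucket", "my-key")) {
+ *     out.write(payload); // uploaded via a single PUT request in a background thread
+ * }
+ * }</pre>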
+ *
+ * @author Andrea Cioni
+ */
+public class S3OutputStream extends OutputStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3OutputStream.class);
+
+ private final S3Client s3;
+
+ private final String bucketName;
+
+ private final String key;
+
+ private final PipedInputStream pipedInputStream;
+
+ private final PipedOutputStream pipedOutputStream;
+
+ private ExecutorService singleThreadExecutor;
+
+ private volatile boolean uploading;
+
+ private String contentType = Defaults.DEFAULT_CONTENT_TYPE;
+
+ public S3OutputStream(S3Client s3, String bucketName, String key) throws IOException {
+ this.s3 = s3;
+ this.bucketName = bucketName;
+ this.key = key;
+ this.pipedInputStream = new PipedInputStream();
+ this.pipedOutputStream = new PipedOutputStream(this.pipedInputStream);
+ this.uploading = false;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (!this.uploading) {
+ this.uploading = true;
+ runUploadThread();
+ }
+ this.pipedOutputStream.write(b);
+ }
+
+ private void runUploadThread() {
+ if (this.singleThreadExecutor == null) {
+ this.singleThreadExecutor = Executors.newSingleThreadExecutor();
+ }
+
+ this.singleThreadExecutor.execute(() -> {
+ try {
+ RequestBody body = RequestBody
+ .fromContentProvider(ContentStreamProvider.fromInputStream(this.pipedInputStream), this.contentType);
+ this.s3.putObject((builder) -> builder.bucket(this.bucketName).key(this.key), body);
+ }
+ finally {
+ try {
+ this.pipedInputStream.close();
+ }
+ catch (IOException ex) {
+ logger.error("Error closing piped input stream", ex);
+ }
+ }
+ });
+ this.singleThreadExecutor.shutdown();
+ }
+
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing output stream");
+ this.pipedOutputStream.close();
+ // Wait for the background upload to complete, mirroring S3MultipartOutputStream
+ if (this.uploading) {
+ try {
+ this.singleThreadExecutor.awaitTermination(10L, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ logger.debug("Output stream closed");
+ super.close();
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public String getContentType() {
+ return this.contentType;
+ }
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3Uploader.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3Uploader.java
new file mode 100644
index 00000000..559e6702
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/S3Uploader.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Strategy interface for uploading the content of an {@link InputStream} to Amazon S3.
+ *
+ * @author Andrea Cioni
+ */
+public interface S3Uploader {
+
+ /**
+ * Upload all data from the given input stream to S3. The stream is consumed fully
+ * and closed by implementations.
+ * @param inputStream the input stream to upload
+ * @return the total number of bytes uploaded
+ * @throws IOException if an I/O error occurs
+ */
+ long upload(InputStream inputStream) throws IOException;
+
+}
diff --git a/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/package-info.java b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/package-info.java
new file mode 100644
index 00000000..46a0d5bb
--- /dev/null
+++ b/spring-batch-s3/src/main/java/org/springframework/batch/extensions/s3/stream/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides input and output stream implementations for reading from and writing to
+ * Amazon S3 objects, including multipart upload support.
+ *
+ * <p>Classes in this package are designed to work with the AWS SDK for Java and provide a
+ * convenient way to transfer large datasets to and from S3 without loading them entirely
+ * into memory.
+ */
+package org.springframework.batch.extensions.s3.stream;
diff --git a/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/S3ItemReaderTests.java b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/S3ItemReaderTests.java
new file mode 100644
index 00000000..3e8f1350
--- /dev/null
+++ b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/S3ItemReaderTests.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.batch.extensions.s3.serializer.S3Deserializer;
+import org.springframework.batch.extensions.s3.serializer.S3StringDeserializer;
+import org.springframework.batch.extensions.s3.stream.S3InputStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.then;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+class S3ItemReaderTests {
+
+ private S3Deserializer<String> mockDeserializer;
+
+ private S3InputStream s3InputStream;
+
+ private S3StringDeserializer stringDeserializer;
+
+ @BeforeEach
+ void setUp() {
+ this.stringDeserializer = new S3StringDeserializer();
+ this.mockDeserializer = mock(S3Deserializer.class);
+ this.s3InputStream = mock(S3InputStream.class);
+ }
+
+ @Test
+ void testReadReturnsDeserializedItemWithStreamMock() throws Exception {
+ byte[] data = "test".getBytes();
+ // given
+ given(this.s3InputStream.read(any(byte[].class))).willReturn(data.length, -1);
+ given(this.mockDeserializer.deserialize(any(byte[].class))).willReturn(null, "item");
+
+ S3ItemReader<String> reader = new S3ItemReader<>(this.s3InputStream, this.mockDeserializer);
+
+ // when
+ String result = reader.read();
+
+ // then
+ assertThat(result).isEqualTo("item");
+ then(this.s3InputStream).should(times(1)).read(any(byte[].class));
+ then(this.mockDeserializer).should(times(2)).deserialize(any(byte[].class));
+ }
+
+ @Test
+ void testReadReturnsDeserializedItem() throws Exception {
+ byte[] data = "item\n".getBytes();
+
+ // given
+ given(this.s3InputStream.read(any(byte[].class))).willAnswer((invocation) -> {
+ byte[] buffer = invocation.getArgument(0);
+ System.arraycopy(data, 0, buffer, 0, data.length);
+ return data.length;
+ }).willReturn(-1);
+
+ S3ItemReader<String> reader = new S3ItemReader<>(this.s3InputStream, this.stringDeserializer);
+
+ // when
+ String result = reader.read();
+
+ // then
+ assertThat(result).isEqualTo("item");
+ then(this.s3InputStream).should(times(1)).read(any(byte[].class));
+ }
+
+ @Test
+ void testReadReturnsNullWhenNoData() throws Exception {
+ // given
+ given(this.s3InputStream.read(any(byte[].class))).willReturn(-1);
+
+ S3ItemReader<String> reader = new S3ItemReader<>(this.s3InputStream, this.mockDeserializer);
+
+ // when
+ String result = reader.read();
+
+ // then
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void testReadReturnsMultipleItems() throws Exception {
+ byte[] data1 = "item1\n".getBytes();
+ byte[] data2 = "item2\n".getBytes();
+
+ // given
+ given(this.s3InputStream.read(any(byte[].class)))
+ .willAnswer((invocation) -> {
+ byte[] buffer = invocation.getArgument(0);
+ System.arraycopy(data1, 0, buffer, 0, data1.length);
+ return data1.length;
+ });
+ given(this.mockDeserializer.deserialize(any(byte[].class)))
+ .willReturn("item1")
+ .willReturn("item2")
+ .willReturn(null); // No more items
+ given(this.s3InputStream.read(any(byte[].class)))
+ .willAnswer((invocation) -> {
+ byte[] buffer = invocation.getArgument(0);
+ System.arraycopy(data2, 0, buffer, 0, data2.length);
+ return data2.length;
+ })
+ .willReturn(-1); // End of stream
+ S3ItemReader<String> reader = new S3ItemReader<>(this.s3InputStream, this.mockDeserializer);
+
+ // when
+ String result1 = reader.read();
+ String result2 = reader.read();
+ String result3 = reader.read();
+
+ // then
+ assertThat(result1).isEqualTo("item1");
+ assertThat(result2).isEqualTo("item2");
+ assertThat(result3).isNull();
+ then(this.s3InputStream).should(times(2)).read(any(byte[].class));
+ then(this.mockDeserializer).should(times(4)).deserialize(any(byte[].class));
+ }
+
+ @Test
+ void testReadReturnsMultipleItemsInSingleDeserialization() throws Exception {
+ byte[] data = "item1\nitem2\n".getBytes();
+
+ // given
+ given(this.s3InputStream.read(any(byte[].class)))
+ .willAnswer((invocation) -> {
+ byte[] buffer = invocation.getArgument(0);
+ System.arraycopy(data, 0, buffer, 0, data.length);
+ return data.length;
+ }).willAnswer((invocation) -> -1);
+
+ given(this.mockDeserializer.deserialize(any(byte[].class)))
+ .willReturn(null) // buffer is empty
+ .willReturn("item1")
+ .willReturn("item2")
+ .willReturn(null); // End of stream
+
+ S3ItemReader<String> reader = new S3ItemReader<>(this.s3InputStream, this.mockDeserializer);
+
+ // when
+ String result1 = reader.read();
+ String result2 = reader.read();
+ String result3 = reader.read();
+
+ // then
+ assertThat(result1).isEqualTo("item1");
+ assertThat(result2).isEqualTo("item2");
+ assertThat(result3).isNull();
+ then(this.s3InputStream).should(times(2)).read(any(byte[].class));
+ then(this.mockDeserializer).should(times(4)).deserialize(any(byte[].class));
+ }
+
+}
diff --git a/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/S3ItemWriterTests.java b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/S3ItemWriterTests.java
new file mode 100644
index 00000000..744ba934
--- /dev/null
+++ b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/S3ItemWriterTests.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.batch.extensions.s3.serializer.S3Serializer;
+import org.springframework.batch.extensions.s3.stream.S3MultipartOutputStream;
+import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.ItemStreamException;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.then;
+import static org.mockito.BDDMockito.willThrow;
+import static org.mockito.Mockito.mock;
+
+class S3ItemWriterTests {
+
+ private S3Serializer<String> serializer;
+
+ private S3MultipartOutputStream outputStream;
+
+ @BeforeEach
+ void setUp() {
+ this.serializer = mock(S3Serializer.class);
+ this.outputStream = mock(S3MultipartOutputStream.class);
+ }
+
+ @Test
+ void testWrite_success() throws Exception {
+ String item = "test";
+ byte[] data = item.getBytes();
+ // given
+ given(this.serializer.serialize(item)).willReturn(data);
+
+ S3ItemWriter<String> writer = new S3ItemWriter<>(this.outputStream, this.serializer);
+ Chunk<String> chunk = Chunk.of(item);
+
+ // when
+ writer.write(chunk);
+
+ // then
+ then(this.serializer).should().serialize(item);
+ then(this.outputStream).should().write(data);
+ }
+
+ @Test
+ void testWrite_throwsOnNullOrEmpty() {
+ String item = "bad";
+ // given
+ given(this.serializer.serialize(item)).willReturn(null);
+
+ S3ItemWriter<String> writer = new S3ItemWriter<>(this.outputStream, this.serializer);
+ Chunk<String> chunk = Chunk.of(item);
+
+ // when/then
+ assertThatThrownBy(() -> writer.write(chunk))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void testClose_success() throws Exception {
+ S3ItemWriter<String> writer = new S3ItemWriter<>(this.outputStream, this.serializer);
+
+ // when
+ writer.close();
+
+ // then
+ then(this.outputStream).should().close();
+ }
+
+ @Test
+ void testClose_throwsItemStreamException() throws Exception {
+ // given
+ willThrow(new IOException("close error")).given(this.outputStream).close();
+ S3ItemWriter<String> writer = new S3ItemWriter<>(this.outputStream, this.serializer);
+
+ // when/then
+ assertThatThrownBy(writer::close)
+ .isInstanceOf(ItemStreamException.class);
+ }
+
+}
diff --git a/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/serializer/S3StringDeserializerTests.java b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/serializer/S3StringDeserializerTests.java
new file mode 100644
index 00000000..b6309f31
--- /dev/null
+++ b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/serializer/S3StringDeserializerTests.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.serializer;
+
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class S3StringDeserializerTests {
+
+ @Test
+ void testDeserializeSingleLine() {
+ S3StringDeserializer deserializer = new S3StringDeserializer();
+ String input = "testString\n";
+ String result = deserializer.deserialize(input.getBytes(StandardCharsets.UTF_8));
+ assertThat(result).isEqualTo("testString");
+ }
+
+ @Test
+ void testDeserializeMultipleLines() {
+ S3StringDeserializer deserializer = new S3StringDeserializer();
+ String input = "line1\nline2\n";
+ String result1 = deserializer.deserialize(input.getBytes(StandardCharsets.UTF_8));
+ assertThat(result1).isEqualTo("line1");
+ String result2 = deserializer.deserialize(new byte[0]);
+ assertThat(result2).isEqualTo("line2");
+ }
+
+ @Test
+ void testDeserializeWithCarriageReturn() {
+ S3StringDeserializer deserializer = new S3StringDeserializer();
+ String input = "line1\r\n";
+ String result = deserializer.deserialize(input.getBytes(StandardCharsets.UTF_8));
+ assertThat(result).isEqualTo("line1");
+ }
+
+ @Test
+ void testDeserializePartialInput() {
+ S3StringDeserializer deserializer = new S3StringDeserializer();
+ String part1 = "partial";
+ String part2 = "Line\n";
+ assertThat(deserializer.deserialize(part1.getBytes(StandardCharsets.UTF_8))).isNull();
+ String result = deserializer.deserialize(part2.getBytes(StandardCharsets.UTF_8));
+ assertThat(result).isEqualTo("partialLine");
+ }
+
+ @Test
+ void testDeserializeEmptyInput() {
+ S3StringDeserializer deserializer = new S3StringDeserializer();
+ assertThat(deserializer.deserialize("".getBytes(StandardCharsets.UTF_8))).isNull();
+ }
+
+}
diff --git a/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/serializer/S3StringSerializerTests.java b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/serializer/S3StringSerializerTests.java
new file mode 100644
index 00000000..f3a643f4
--- /dev/null
+++ b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/serializer/S3StringSerializerTests.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.serializer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+class S3StringSerializerTests {
+
+ @Test
+ void testSerialize() {
+ S3StringSerializer serializer = new S3StringSerializer();
+ String input = "testString";
+ byte[] result = serializer.serialize(input);
+
+ String expected = "testString\n";
+ assertThat(result).isEqualTo(expected.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+ @Test
+ void testSerializeEmptyString() {
+ S3StringSerializer serializer = new S3StringSerializer();
+ String input = "";
+ byte[] result = serializer.serialize(input);
+
+ String expected = "\n";
+ assertThat(result).isEqualTo(expected.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+ @Test
+ void testSerializeNull() {
+ S3StringSerializer serializer = new S3StringSerializer();
+ assertThatCode(() -> serializer.serialize(null)).doesNotThrowAnyException();
+ }
+
+ @Test
+ void testSerializeWithSpecialCharacters() {
+ S3StringSerializer serializer = new S3StringSerializer();
+ String input = "test\nstring\r\nwith special characters!@#$%^&*()";
+ byte[] result = serializer.serialize(input);
+
+ String expected = "test\nstring\r\nwith special characters!@#$%^&*()\n";
+ assertThat(result).isEqualTo(expected.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+}
diff --git a/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3InputStreamTests.java b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3InputStreamTests.java
new file mode 100644
index 00000000..cbf6541b
--- /dev/null
+++ b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3InputStreamTests.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.then;
+
+class S3InputStreamTests {
+
+ private S3Client s3Client;
+
+ private final byte[] data = { 1, 2, 3, 4 };
+
+ @BeforeEach
+ void setUp() {
+ this.s3Client = Mockito.mock(S3Client.class);
+ }
+
+ @Test
+ void testRead() throws IOException {
+ InputStream mockStream = new ByteArrayInputStream(this.data);
+ ResponseInputStream<GetObjectResponse> responseInputStream = new ResponseInputStream<>(
+ GetObjectResponse.builder().build(), mockStream);
+ // given
+ given(this.s3Client.getObject(any(GetObjectRequest.class))).willReturn(responseInputStream);
+
+ String key = "test-key";
+ String bucket = "test-bucket";
+ // when
+ try (S3InputStream s3InputStream = new S3InputStream(this.s3Client, bucket, key)) {
+ for (byte b : this.data) {
+ assertThat(s3InputStream.read()).isEqualTo(b);
+ }
+ assertThat(s3InputStream.read()).isEqualTo(-1);
+ }
+
+ // then
+ then(this.s3Client).should().getObject(any(GetObjectRequest.class));
+ }
+
+}
diff --git a/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3MultipartOutputStreamTests.java b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3MultipartOutputStreamTests.java
new file mode 100644
index 00000000..82419d5b
--- /dev/null
+++ b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3MultipartOutputStreamTests.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import org.springframework.util.unit.DataSize;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.then;
+import static org.mockito.Mockito.mock;
+
+class S3MultipartOutputStreamTests {
+
+ private S3Client s3Client;
+
+ private S3Uploader multipartUploadMock;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ this.s3Client = mock(S3Client.class);
+ this.multipartUploadMock = mock(S3Uploader.class);
+
+ given(this.multipartUploadMock.upload(any())).willAnswer((invocation) -> {
+ TimeUnit.MILLISECONDS.sleep(100); // Simulate some delay for upload
+ return 1L;
+ });
+ }
+
+ @Test
+ void testWriteSingleByteTriggersUpload() throws IOException {
+ int testByte = 42;
+
+ try (S3MultipartOutputStream out = new S3MultipartOutputStream(this.multipartUploadMock)) {
+ // when
+ out.write(testByte);
+
+ ArgumentCaptor<InputStream> captor = ArgumentCaptor.forClass(InputStream.class);
+
+ // then
+ then(this.multipartUploadMock).should().upload(captor.capture());
+ assertThat(captor.getValue().available()).as("InputStream should contain one byte").isEqualTo(1);
+ }
+ }
+
+ @Test
+ void testConstructorWithDefaultPartSize() throws IOException {
+ S3MultipartOutputStream out = new S3MultipartOutputStream(this.s3Client, "bucket", "key");
+ out.close();
+ }
+
+ @Test
+ void testConstructorWithCustomPartSize() throws IOException {
+ int customPartSize = (int) DataSize.ofMegabytes(10).toBytes();
+ var s3Uploader = new S3MultipartUploader(this.s3Client, "bucket", "key");
+ s3Uploader.setPartSize(customPartSize);
+ S3MultipartOutputStream out = new S3MultipartOutputStream(s3Uploader);
+ out.close();
+ }
+
+ @Test
+ void testConstructorWithS3UploadOutputStream() throws IOException {
+ S3MultipartOutputStream out = new S3MultipartOutputStream(this.multipartUploadMock);
+ out.close();
+ }
+
+}
diff --git a/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3MultipartUploaderTests.java b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3MultipartUploaderTests.java
new file mode 100644
index 00000000..0d4cd7f5
--- /dev/null
+++ b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3MultipartUploaderTests.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.then;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+class S3MultipartUploaderTests {
+
+ private S3Client s3Client;
+
+ private S3MultipartUploader s3MultipartUploader;
+
+ @BeforeEach
+ void setUp() {
+ this.s3Client = mock(S3Client.class);
+ var s3Uploader = new S3MultipartUploader(this.s3Client, "bucket", "key");
+ s3Uploader.setPartSize(5);
+ this.s3MultipartUploader = s3Uploader;
+ }
+
+ @Test
+ void testUpload_SuccessfulUpload() throws IOException {
+ byte[] data = "HelloWorld!".getBytes(); // 11 bytes, 3 parts, 2 of 5 bytes each and one of 1 byte
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
+
+ // given
+ given(this.s3Client.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
+ .willReturn(CreateMultipartUploadResponse.builder().uploadId("uploadId").build());
+
+ given(this.s3Client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class)))
+ .willReturn(UploadPartResponse.builder().eTag("etag1").build(),
+ UploadPartResponse.builder().eTag("etag2").build(),
+ UploadPartResponse.builder().eTag("etag3").build());
+
+ given(this.s3Client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
+ .willReturn(CompleteMultipartUploadResponse.builder().location("url").build());
+
+ // when
+ this.s3MultipartUploader.upload(inputStream);
+
+ // then
+ then(this.s3Client).should().createMultipartUpload(any(CreateMultipartUploadRequest.class));
+ then(this.s3Client).should(times(3)).uploadPart(any(UploadPartRequest.class), any(RequestBody.class));
+ then(this.s3Client).should().completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
+ then(this.s3Client).should().close();
+ }
+
+ @Test
+ void testUpload_AbortOnException() {
+ byte[] data = "HelloWorld".getBytes();
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
+
+ // given
+ given(this.s3Client.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
+ .willReturn(CreateMultipartUploadResponse.builder().uploadId("uploadId").build());
+
+ given(this.s3Client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class)))
+ .willThrow(new RuntimeException("Upload failed"));
+
+ // when/then
+ assertThatThrownBy(() -> this.s3MultipartUploader.upload(inputStream))
+ .isInstanceOf(RuntimeException.class);
+ then(this.s3Client).should().abortMultipartUpload(any(AbortMultipartUploadRequest.class));
+ then(this.s3Client).should().close();
+ }
+
+}
diff --git a/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3OutputStreamTests.java b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3OutputStreamTests.java
new file mode 100644
index 00000000..e62f6f6d
--- /dev/null
+++ b/spring-batch-s3/src/test/java/org/springframework/batch/extensions/s3/stream/S3OutputStreamTests.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.s3.stream;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+class S3OutputStreamTests {
+
+ private S3Client s3Client;
+
+ @BeforeEach
+ void setUp() {
+ this.s3Client = mock(S3Client.class);
+ }
+
+ @Test
+ void testWriteAndUpload() throws IOException {
+ byte[] data = { 10, 20, 30, 40 };
+ doReturn(null).when(this.s3Client).putObject(any(Consumer.class), any(RequestBody.class));
+
+ String bucket = "test-bucket";
+ String key = "test-key";
+ try (S3OutputStream out = new S3OutputStream(this.s3Client, bucket, key)) {
+ out.write(data);
+ }
+
+ verify(this.s3Client, timeout(200)).putObject(any(Consumer.class), any(RequestBody.class));
+ verify(this.s3Client, times(1)).putObject(any(Consumer.class), any(RequestBody.class));
+ }
+
+}