Skip to content

Commit 61d446e

Browse files
florianhoffmbenhassine
authored andcommitted
Update the "write skip count" when items are removed from chunks during write
Resolves #4514 Signed-off-by: Florian Hof <[email protected]>
1 parent 16914c8 commit 61d446e

File tree

6 files changed

+64
-6
lines changed

6 files changed

+64
-6
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/StepContribution.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2023 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -157,6 +157,15 @@ public void incrementWriteSkipCount() {
157157
writeSkipCount++;
158158
}
159159

160+
/**
161+
* Increment the write skip count for this contribution.
162+
* @param count The {@code long} amount to increment by.
163+
* @since 6.0.0
164+
*/
165+
public void incrementWriteSkipCount(long count) {
166+
writeSkipCount += count;
167+
}
168+
160169
/**
161170
*
162171
*/

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2024 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -347,6 +347,7 @@ protected void write(final StepContribution contribution, final Chunk<I> inputs,
347347
stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
348348
}
349349
contribution.incrementWriteCount(outputs.size());
350+
contribution.incrementWriteSkipCount(outputs.getSkipsSize());
350351
}
351352
else {
352353
scan(contribution, inputs, outputs, chunkMonitor, false);

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> ou
309309
stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
310310
}
311311
contribution.incrementWriteCount(outputs.size());
312+
contribution.incrementWriteSkipCount(outputs.getSkipsSize());
312313
}
313314

314315
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2008-2024 the original author or authors.
2+
* Copyright 2008-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -230,6 +230,25 @@ void testWriteSkipOnException() throws Exception {
230230
assertEquals(0, contribution.getFilterCount());
231231
}
232232

233+
@Test
234+
void testWriteSkipOnIteratorRemove() throws Exception {
235+
processor.setItemWriter(chunk -> {
236+
Chunk<? extends String>.ChunkIterator iterator = chunk.iterator();
237+
while (iterator.hasNext()) {
238+
String item = iterator.next();
239+
if (item.equals("skip")) {
240+
iterator.remove((Exception) null);
241+
}
242+
}
243+
});
244+
Chunk<String> inputs = new Chunk<>(Arrays.asList("3", "skip", "2"));
245+
processor.process(contribution, inputs);
246+
assertEquals(1, contribution.getSkipCount());
247+
assertEquals(2, contribution.getWriteCount());
248+
assertEquals(1, contribution.getWriteSkipCount());
249+
assertEquals(0, contribution.getFilterCount());
250+
}
251+
233252
@Test
234253
void testWriteSkipOnExceptionWithTrivialChunk() throws Exception {
235254
processor.setWriteSkipPolicy(new AlwaysSkipItemSkipPolicy());

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2008-2024 the original author or authors.
2+
* Copyright 2008-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,7 +51,16 @@ public void write(Chunk<? extends String> chunk) throws Exception {
5151
if (chunk.getItems().contains("fail")) {
5252
throw new RuntimeException("Planned failure!");
5353
}
54-
list.addAll(chunk.getItems());
54+
Chunk<? extends String>.ChunkIterator iterator = chunk.iterator();
55+
while (iterator.hasNext()) {
56+
String item = iterator.next();
57+
if (item.equals("skip")) {
58+
iterator.remove((Exception) null);
59+
}
60+
else {
61+
list.add(item);
62+
}
63+
}
5564
}
5665
});
5766

@@ -88,4 +97,15 @@ void testTransform() throws Exception {
8897
assertTrue(outputs.isEnd());
8998
}
9099

100+
@Test
101+
void testWriteWithSkip() throws Exception {
102+
Chunk<String> inputs = new Chunk<>();
103+
inputs.add("foo");
104+
inputs.add("skip");
105+
inputs.add("bar");
106+
processor.process(contribution, inputs);
107+
assertEquals(2, contribution.getWriteCount());
108+
assertEquals(1, contribution.getWriteSkipCount());
109+
}
110+
91111
}

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/Chunk.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2024 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -152,6 +152,14 @@ public int size() {
152152
return items.size();
153153
}
154154

155+
/**
156+
* @return the number of skipped items
157+
* @since 6.0.0
158+
*/
159+
public int getSkipsSize() {
160+
return skips.size();
161+
}
162+
155163
/**
156164
* Flag to indicate if the source data is exhausted.
157165
*

0 commit comments

Comments
 (0)