Skip to content

Commit 183bc96

Browse files
committed
Fix KafkaItemReader ExecutionContext deserialization error when using Jackson2ExecutionContextStringSerializer
Signed-off-by: Hyunwoo Jung <[email protected]>
1 parent f90e965 commit 183bc96

File tree

2 files changed: +21 additions, −12 deletions

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2023 the original author or authors.
+ * Copyright 2019-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
  *
  * @author Mathieu Ouellet
  * @author Mahmoud Ben Hassine
+ * @author Hyunwoo Jung
  * @since 4.2
  */
 public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
@@ -56,6 +57,8 @@ public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {

 	private static final long DEFAULT_POLL_TIMEOUT = 30L;

+	private final String topicName;
+
 	private final List<TopicPartition> topicPartitions;

 	private Map<TopicPartition, Long> partitionOffsets;
@@ -110,6 +113,7 @@ public KafkaItemReader(Properties consumerProperties, String topicName, List<Integer> partitions) {
 				ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " property must be provided");
 		this.consumerProperties = consumerProperties;
 		Assert.hasLength(topicName, "Topic name must not be null or empty");
+		this.topicName = topicName;
 		Assert.isTrue(!partitions.isEmpty(), "At least one partition must be provided");
 		this.topicPartitions = new ArrayList<>();
 		for (Integer partition : partitions) {
@@ -174,10 +178,10 @@ public void open(ExecutionContext executionContext) {
 			}
 		}
 		if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
-			Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext
-				.get(TOPIC_PARTITION_OFFSETS);
-			for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
-				this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
+			Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
+			for (Map.Entry<String, Long> entry : offsets.entrySet()) {
+				this.partitionOffsets.put(new TopicPartition(this.topicName, Integer.parseInt(entry.getKey())),
+						entry.getValue() == 0 ? 0 : entry.getValue() + 1);
 			}
 		}
 		this.kafkaConsumer.assign(this.topicPartitions);
@@ -203,7 +207,11 @@ public V read() {
 	@Override
 	public void update(ExecutionContext executionContext) {
 		if (this.saveState) {
-			executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
+			Map<String, Long> offsets = new HashMap<>();
+			for (Map.Entry<TopicPartition, Long> entry : this.partitionOffsets.entrySet()) {
+				offsets.put(String.valueOf(entry.getKey().partition()), entry.getValue());
+			}
+			executionContext.put(TOPIC_PARTITION_OFFSETS, offsets);
 		}
 		this.kafkaConsumer.commitSync();
 	}

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderIntegrationTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2023 the original author or authors.
+ * Copyright 2019-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -59,6 +59,7 @@
  * @author Mahmoud Ben Hassine
  * @author François Martin
  * @author Patrick Baumgartner
+ * @author Hyunwoo Jung
  */
 @Testcontainers(disabledWithoutDocker = true)
 @ExtendWith(SpringExtension.class)
@@ -266,8 +267,8 @@ void testReadFromSinglePartitionAfterRestart() throws ExecutionException, InterruptedException {
 			future.get();
 		}
 		ExecutionContext executionContext = new ExecutionContext();
-		Map<TopicPartition, Long> offsets = new HashMap<>();
-		offsets.put(new TopicPartition("topic3", 0), 1L);
+		Map<String, Long> offsets = new HashMap<>();
+		offsets.put("0", 1L);
 		executionContext.put("topic.partition.offsets", offsets);

 		// topic3-0: val0, val1, val2, val3, val4
@@ -307,9 +308,9 @@ void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, InterruptedException {
 		}

 		ExecutionContext executionContext = new ExecutionContext();
-		Map<TopicPartition, Long> offsets = new HashMap<>();
-		offsets.put(new TopicPartition("topic4", 0), 1L);
-		offsets.put(new TopicPartition("topic4", 1), 2L);
+		Map<String, Long> offsets = new HashMap<>();
+		offsets.put("0", 1L);
+		offsets.put("1", 2L);
 		executionContext.put("topic.partition.offsets", offsets);

 		// topic4-0: val0, val2, val4, val6
0 commit comments

Comments (0)