@@ -112,7 +112,9 @@ object ReplicaManagerTest {
112
112
class ReplicaManagerTest {
113
113
114
114
private val topic = " test-topic"
115
+ private val topic2 = " test-topic2"
115
116
private val topicId = Uuid .fromString(" YK2ed2GaTH2JpgzUaJ8tgg" )
117
+ private val topicId2 = Uuid .randomUuid()
116
118
private val topicIds = scala.Predef .Map (" test-topic" -> topicId)
117
119
private val topicNames = topicIds.map(_.swap)
118
120
private val topicPartition = new TopicPartition (topic, 0 )
@@ -3294,38 +3296,53 @@ class ReplicaManagerTest {
3294
3296
@ ValueSource (booleans = Array (true , false ))
3295
3297
def testOffsetOutOfRangeExceptionWhenReadFromLog (isFromFollower : Boolean ): Unit = {
3296
3298
val replicaId = if (isFromFollower) 1 else - 1
3299
+ val fetchMaxBytes = 150
3300
+ val partitionMaxBytes = 100
3297
3301
val tp0 = new TopicPartition (topic, 0 )
3302
+ val tp02 = new TopicPartition (topic2, 0 )
3298
3303
val tidp0 = new TopicIdPartition (topicId, tp0)
3304
+ val tidp02 = new TopicIdPartition (topicId2, tp02)
3299
3305
// create a replicaManager with remoteLog enabled
3300
3306
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer (time), aliveBrokerIds = Seq (0 , 1 , 2 ), enableRemoteStorage = true , shouldMockLog = true , remoteFetchQuotaExceeded = Some (false ))
3301
3307
try {
3302
3308
val offsetCheckpoints = new LazyOffsetCheckpoints (replicaManager.highWatermarkCheckpoints.asJava)
3303
3309
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false , isFutureReplica = false , offsetCheckpoints, None )
3310
+ replicaManager.createPartition(tp02).createLogIfNotExists(isNew = false , isFutureReplica = false , offsetCheckpoints, None )
3304
3311
val partition0Replicas = Seq [Integer ](0 , 1 ).asJava
3305
- val topicIds = Map (tp0.topic -> topicId).asJava
3312
+ val topicIds = Map (tp0.topic -> topicId, tp02.topic -> topicId2 ).asJava
3306
3313
val leaderEpoch = 0
3307
3314
val delta = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(0 ), partition0Replicas, partition0Replicas)
3315
+ val delta2 = createLeaderDelta(topicIds.get(topic2), tp02, partition0Replicas.get(0 ), partition0Replicas, partition0Replicas)
3308
3316
val leaderMetadataImage = imageFromTopics(delta.apply())
3317
+ val leaderMetadataImage2 = imageFromTopics(delta2.apply())
3309
3318
replicaManager.applyDelta(delta, leaderMetadataImage)
3310
-
3311
- val params = new FetchParams (replicaId, 1 , 1000 , 0 , 100 , FetchIsolation .LOG_END , Optional .empty)
3312
- // when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately
3313
- val result = replicaManager.readFromLog(params, Seq (tidp0 -> new PartitionData (topicId, 1 , 0 , 100000 , Optional .of[Integer ](leaderEpoch), Optional .of[Integer ](leaderEpoch))), UNBOUNDED_QUOTA , false )
3314
-
3315
- if (isFromFollower) {
3316
- // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log
3317
- assertEquals(Errors .OFFSET_MOVED_TO_TIERED_STORAGE , result.head._2.error)
3318
- } else {
3319
- assertEquals(Errors .NONE , result.head._2.error)
3320
- }
3321
- assertEquals(startOffset, result.head._2.leaderLogStartOffset)
3322
- assertEquals(endOffset, result.head._2.leaderLogEndOffset)
3323
- assertEquals(highHW, result.head._2.highWatermark)
3324
- if (isFromFollower) {
3325
- assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
3326
- } else {
3327
- // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
3328
- assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
3319
+ replicaManager.applyDelta(delta2, leaderMetadataImage2)
3320
+
3321
+ val params = new FetchParams (replicaId, 1 , 100 , 0 , fetchMaxBytes, FetchIsolation .LOG_END , Optional .empty)
3322
+ // when reading logs from 2 partitions, they'll throw OffsetOutOfRangeException, which will be handled separately
3323
+ val results = replicaManager.readFromLog(params, Seq (
3324
+ tidp0 -> new PartitionData (topicId, 1 , 0 , partitionMaxBytes, Optional .of[Integer ](leaderEpoch), Optional .of[Integer ](leaderEpoch)),
3325
+ tidp02 -> new PartitionData (topicId2, 1 , 0 , partitionMaxBytes, Optional .of[Integer ](leaderEpoch), Optional .of[Integer ](leaderEpoch))), UNBOUNDED_QUOTA , false )
3326
+
3327
+ results.foreach { case (tidp, partitionData) =>
3328
+ assertEquals(startOffset, partitionData.leaderLogStartOffset)
3329
+ assertEquals(endOffset, partitionData.leaderLogEndOffset)
3330
+ assertEquals(highHW, partitionData.highWatermark)
3331
+ if (isFromFollower) {
3332
+ // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log
3333
+ assertEquals(Errors .OFFSET_MOVED_TO_TIERED_STORAGE , partitionData.error)
3334
+ assertFalse(partitionData.info.delayedRemoteStorageFetch.isPresent)
3335
+ } else {
3336
+ assertEquals(Errors .NONE , partitionData.error)
3337
+ // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
3338
+ assertTrue(partitionData.info.delayedRemoteStorageFetch.isPresent)
3339
+ // verify the 1st partition will set the fetchMaxBytes to partitionMaxBytes,
3340
+ // and the 2nd one will set to the remaining (fetchMaxBytes - partitionMaxBytes) to meet the "fetch.max.bytes" config.
3341
+ if (tidp.topic == topic)
3342
+ assertEquals(partitionMaxBytes, partitionData.info.delayedRemoteStorageFetch.get().fetchMaxBytes)
3343
+ else
3344
+ assertEquals(fetchMaxBytes - partitionMaxBytes, partitionData.info.delayedRemoteStorageFetch.get().fetchMaxBytes)
3345
+ }
3329
3346
}
3330
3347
} finally {
3331
3348
replicaManager.shutdown(checkpointHW = false )
@@ -3723,8 +3740,8 @@ class ReplicaManagerTest {
3723
3740
partitionDir.mkdir()
3724
3741
when(mockLog.dir).thenReturn(partitionDir)
3725
3742
when(mockLog.parentDir).thenReturn(path)
3726
- when(mockLog.topicId).thenReturn(Optional .of(topicId))
3727
- when(mockLog.topicPartition).thenReturn(new TopicPartition (topic, 0 ))
3743
+ when(mockLog.topicId).thenReturn(Optional .of(topicId)).thenReturn( Optional .of(topicId2))
3744
+ when(mockLog.topicPartition).thenReturn(new TopicPartition (topic, 0 )).thenReturn( new TopicPartition (topic2, 0 ))
3728
3745
when(mockLog.highWatermark).thenReturn(highHW)
3729
3746
when(mockLog.updateHighWatermark(anyLong())).thenReturn(0L )
3730
3747
when(mockLog.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata (10 ))
0 commit comments