From ac8b703aae8a732cf96bd81767175333bbd969b4 Mon Sep 17 00:00:00 2001 From: BlindSpot <127803250+blindspotbounty@users.noreply.github.com> Date: Wed, 6 Dec 2023 21:02:26 +0200 Subject: [PATCH] defer source.finish --- Sources/Kafka/KafkaProducer.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/Kafka/KafkaProducer.swift b/Sources/Kafka/KafkaProducer.swift index 4ac95995..de953312 100644 --- a/Sources/Kafka/KafkaProducer.swift +++ b/Sources/Kafka/KafkaProducer.swift @@ -238,8 +238,10 @@ public final class KafkaProducer: Service, Sendable { 0...Int(Int32.max) ~= self.configuration.flushTimeoutMilliseconds, "Flush timeout outside of valid range \(0...Int32.max)" ) + defer { // we should finish source indefinetely of exception in client.flush() + source?.finish() + } try await client.flush(timeoutMilliseconds: Int32(self.configuration.flushTimeoutMilliseconds)) - source?.finish() return case .terminatePollLoop: return