From ac67b2238751925ddd665fade70fbd451f16b2f8 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 12 Aug 2022 15:53:46 +0200 Subject: [PATCH 1/9] Fix thread leak in `FileDownloadDelegate` --- .../FileDownloadDelegate.swift | 63 ++++++++++++++++++- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift index 75f16f52a..9189486be 100644 --- a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift +++ b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift @@ -33,6 +33,13 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { private let io: NonBlockingFileIO private let reportHead: ((HTTPResponseHead) -> Void)? private let reportProgress: ((Progress) -> Void)? + + private enum ThreadPool { + case unowned + // if we own the thread pool we also need to shut it down + case owned(NIOThreadPool) + } + private let threadPool: ThreadPool private var fileHandleFuture: EventLoopFuture? private var writeFuture: EventLoopFuture? @@ -47,12 +54,47 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { /// the total byte count and download byte count passed to it as arguments. The callbacks /// will be invoked in the same threading context that the delegate itself is invoked, /// as controlled by `EventLoopPreference`. - public init( + public convenience init( + path: String, + pool: NIOThreadPool, + reportHead: ((HTTPResponseHead) -> Void)? = nil, + reportProgress: ((Progress) -> Void)? = nil + ) throws { + try self.init(path: path, sharedThreadPool: pool, reportHead: reportHead, reportProgress: reportProgress) + } + + /// Initializes a new file download delegate and spawns a new thread for file I/O. + /// + /// - parameters: + /// - path: Path to a file you'd like to write the download to. + /// - reportHead: A closure called when the response head is available. + /// - reportProgress: A closure called when a body chunk has been downloaded, with + /// the total byte count and download byte count passed to it as arguments. The callbacks + /// will be invoked in the same threading context that the delegate itself is invoked, + /// as controlled by `EventLoopPreference`. + public convenience init( + path: String, + reportHead: ((HTTPResponseHead) -> Void)? = nil, + reportProgress: ((Progress) -> Void)? = nil + ) throws { + try self.init(path: path, sharedThreadPool: nil, reportHead: reportHead, reportProgress: reportProgress) + } + + private init( path: String, - pool: NIOThreadPool = NIOThreadPool(numberOfThreads: 1), + sharedThreadPool: NIOThreadPool?, reportHead: ((HTTPResponseHead) -> Void)? = nil, reportProgress: ((Progress) -> Void)? = nil ) throws { + let pool: NIOThreadPool + if let sharedThreadPool = sharedThreadPool { + pool = sharedThreadPool + self.threadPool = .unowned + } else { + pool = NIOThreadPool(numberOfThreads: 1) + self.threadPool = .owned(pool) + } + pool.start() self.io = NonBlockingFileIO(threadPool: pool) self.filePath = path @@ -60,6 +102,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { self.reportHead = reportHead self.reportProgress = reportProgress } + public func didReceiveHead( task: HTTPClient.Task, @@ -107,6 +150,12 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { private func close(fileHandle: NIOFileHandle) { try! fileHandle.close() self.fileHandleFuture = nil + switch threadPool { + case .unowned: + break + case .owned(let pool): + try! pool.syncShutdownGracefully() + } } private func finalize() { @@ -128,4 +177,14 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { self.finalize() return self.progress } + + deinit { + switch threadPool { + case .unowned: + break + case .owned(let pool): + // if the delegate is unused we still need to shutdown the thread pool + try! pool.syncShutdownGracefully() + } + } } From c5c0e7bed34a92d9d2aef7b7888062aa93c55589 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 12 Aug 2022 16:11:22 +0200 Subject: [PATCH 2/9] `SwiftFormat` --- Sources/AsyncHTTPClient/FileDownloadDelegate.swift | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift index 9189486be..b358af8d5 100644 --- a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift +++ b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift @@ -33,12 +33,13 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { private let io: NonBlockingFileIO private let reportHead: ((HTTPResponseHead) -> Void)? private let reportProgress: ((Progress) -> Void)? - + private enum ThreadPool { case unowned // if we own the thread pool we also need to shut it down case owned(NIOThreadPool) } + private let threadPool: ThreadPool private var fileHandleFuture: EventLoopFuture? @@ -62,7 +63,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { ) throws { try self.init(path: path, sharedThreadPool: pool, reportHead: reportHead, reportProgress: reportProgress) } - + /// Initializes a new file download delegate and spawns a new thread for file I/O. /// /// - parameters: @@ -79,7 +80,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { ) throws { try self.init(path: path, sharedThreadPool: nil, reportHead: reportHead, reportProgress: reportProgress) } - + private init( path: String, sharedThreadPool: NIOThreadPool?, @@ -94,7 +95,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { pool = NIOThreadPool(numberOfThreads: 1) self.threadPool = .owned(pool) } - + pool.start() self.io = NonBlockingFileIO(threadPool: pool) self.filePath = path @@ -102,7 +103,6 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { self.reportHead = reportHead self.reportProgress = reportProgress } - public func didReceiveHead( task: HTTPClient.Task, @@ -150,7 +150,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { private func close(fileHandle: NIOFileHandle) { try! fileHandle.close() self.fileHandleFuture = nil - switch threadPool { + switch self.threadPool { case .unowned: break case .owned(let pool): @@ -177,7 +177,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { self.finalize() return self.progress } - + deinit { switch threadPool { case .unowned: From 1afc151d644c489a233916d26b99b966d6154dee Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 15 Aug 2022 16:16:35 +0200 Subject: [PATCH 3/9] Add a shared file IO thread pool per HTTPClient --- .../FileDownloadDelegate.swift | 62 +++++++------------ Sources/AsyncHTTPClient/HTTPClient.swift | 16 +++-- Sources/AsyncHTTPClient/HTTPHandler.swift | 15 ++++- 3 files changed, 49 insertions(+), 44 deletions(-) diff --git a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift index b358af8d5..c9a5ef16c 100644 --- a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift +++ b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift @@ -30,18 +30,10 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { public typealias Response = Progress private let filePath: String - private let io: NonBlockingFileIO + private var io: NonBlockingFileIO? private let reportHead: ((HTTPResponseHead) -> Void)? private let reportProgress: ((Progress) -> Void)? - private enum ThreadPool { - case unowned - // if we own the thread pool we also need to shut it down - case owned(NIOThreadPool) - } - - private let threadPool: ThreadPool - private var fileHandleFuture: EventLoopFuture? private var writeFuture: EventLoopFuture? @@ -61,7 +53,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { reportHead: ((HTTPResponseHead) -> Void)? = nil, reportProgress: ((Progress) -> Void)? = nil ) throws { - try self.init(path: path, sharedThreadPool: pool, reportHead: reportHead, reportProgress: reportProgress) + try self.init(path: path, pool: .some(pool), reportHead: reportHead, reportProgress: reportProgress) } /// Initializes a new file download delegate and spawns a new thread for file I/O. @@ -78,32 +70,37 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { reportHead: ((HTTPResponseHead) -> Void)? = nil, reportProgress: ((Progress) -> Void)? = nil ) throws { - try self.init(path: path, sharedThreadPool: nil, reportHead: reportHead, reportProgress: reportProgress) + try self.init(path: path, pool: nil, reportHead: reportHead, reportProgress: reportProgress) } private init( path: String, - sharedThreadPool: NIOThreadPool?, + pool: NIOThreadPool?, reportHead: ((HTTPResponseHead) -> Void)? = nil, reportProgress: ((Progress) -> Void)? = nil ) throws { - let pool: NIOThreadPool - if let sharedThreadPool = sharedThreadPool { - pool = sharedThreadPool - self.threadPool = .unowned + if let pool = pool { + self.io = NonBlockingFileIO(threadPool: pool) } else { - pool = NIOThreadPool(numberOfThreads: 1) - self.threadPool = .owned(pool) + // we should use the shared thread pool from the HTTPClient which + // we will get shortly through a call to provideSharedThreadPool(fileIOPool:) + self.io = nil } - pool.start() - self.io = NonBlockingFileIO(threadPool: pool) self.filePath = path self.reportHead = reportHead self.reportProgress = reportProgress } + public func provideSharedThreadPool(fileIOPool: NIOThreadPool) { + guard self.io == nil else { + // user has provided their own thread pool + return + } + self.io = NonBlockingFileIO(threadPool: fileIOPool) + } + public func didReceiveHead( task: HTTPClient.Task, _ head: HTTPResponseHead @@ -122,16 +119,19 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { task: HTTPClient.Task, _ buffer: ByteBuffer ) -> EventLoopFuture { + guard let io = io else { + preconditionFailure("thread pool not provided by HTTPClient before calling \(#function)") + } self.progress.receivedBytes += buffer.readableBytes self.reportProgress?(self.progress) let writeFuture: EventLoopFuture if let fileHandleFuture = self.fileHandleFuture { writeFuture = fileHandleFuture.flatMap { - self.io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop) + io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop) } } else { - let fileHandleFuture = self.io.openFile( + let fileHandleFuture = io.openFile( path: self.filePath, mode: .write, flags: .allowFileCreation(), @@ -139,7 +139,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { ) self.fileHandleFuture = fileHandleFuture writeFuture = fileHandleFuture.flatMap { - self.io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop) + io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop) } } @@ -150,12 +150,6 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { private func close(fileHandle: NIOFileHandle) { try! fileHandle.close() self.fileHandleFuture = nil - switch self.threadPool { - case .unowned: - break - case .owned(let pool): - try! pool.syncShutdownGracefully() - } } private func finalize() { @@ -177,14 +171,4 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { self.finalize() return self.progress } - - deinit { - switch threadPool { - case .unowned: - break - case .owned(let pool): - // if the delegate is unused we still need to shutdown the thread pool - try! pool.syncShutdownGracefully() - } - } } diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 3fbdb1366..09bad1c84 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -72,6 +72,9 @@ public class HTTPClient { let eventLoopGroupProvider: EventLoopGroupProvider let configuration: Configuration let poolManager: HTTPConnectionPool.Manager + + /// Shared thread pool used for file IO. It is given to the user through ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-6phmu`` + private let fileIOThreadPool = NIOThreadPool(numberOfThreads: 1) private var state: State private let stateLock = Lock() @@ -97,6 +100,7 @@ public class HTTPClient { public required init(eventLoopGroupProvider: EventLoopGroupProvider, configuration: Configuration = Configuration(), backgroundActivityLogger: Logger) { + self.fileIOThreadPool.start() self.eventLoopGroupProvider = eventLoopGroupProvider switch self.eventLoopGroupProvider { case .shared(let group): @@ -241,10 +245,11 @@ public class HTTPClient { let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil return (callback, error) } - - self.shutdownEventLoop(queue: queue) { error in - let reportedError = error ?? uncleanError - callback(reportedError) + self.fileIOThreadPool.shutdownGracefully(queue: queue) { ioThreadPoolError in + self.shutdownEventLoop(queue: queue) { error in + let reportedError = error ?? ioThreadPoolError ?? uncleanError + callback(reportedError) + } } } } @@ -562,10 +567,13 @@ public class HTTPClient { case .testOnly_exact(_, delegateOn: let delegateEL): taskEL = delegateEL } + logger.trace("selected EventLoop for task given the preference", metadata: ["ahc-eventloop": "\(taskEL)", "ahc-el-preference": "\(eventLoopPreference)"]) + delegate.provideSharedThreadPool(fileIOPool: self.fileIOThreadPool) + let failedTask: Task? = self.stateLock.withLock { switch state { case .upAndRunning: diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index aeef71ba4..2e4358ad3 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -17,6 +17,7 @@ import Logging import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 +import NIOPosix import NIOSSL extension HTTPClient { @@ -431,6 +432,13 @@ public class ResponseAccumulator: HTTPClientResponseDelegate { public protocol HTTPClientResponseDelegate: AnyObject { associatedtype Response + /// Called exactly once before any other methods of this delegate are called. + /// It is used to give access to the shared thread pool of the ``HTTPClient`` the request is executed on. + /// Use this thread pool to do file IO associated with the request e.g. to save a response to disk. + /// + /// - Parameter fileIOPool: File IO Pool + func provideSharedThreadPool(fileIOPool: NIOThreadPool) + /// Called when the request head is sent. Will be called once. /// /// - parameters: @@ -502,7 +510,12 @@ public protocol HTTPClientResponseDelegate: AnyObject { } extension HTTPClientResponseDelegate { - /// Default implementation of ``HTTPClientResponseDelegate/didSendRequestHead(task:_:)-6khai``. + /// Default implementation of ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-8y1b`` + /// + /// By default, this does nothing. + public func provideSharedThreadPool(fileIOPool: NIOThreadPool) {} + + /// Default implementation of ``HTTPClientResponseDelegate/didSendRequest(task:)-9od5p``. /// /// By default, this does nothing. public func didSendRequestHead(task: HTTPClient.Task, _ head: HTTPRequestHead) {} From ac0b9f7329c938286ce8379832f849bd141e0f68 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 16 Aug 2022 14:21:08 +0200 Subject: [PATCH 4/9] User bigger thread pool and initlize lazily during first file write --- .../FileDownloadDelegate.swift | 14 +++++--- Sources/AsyncHTTPClient/HTTPClient.swift | 36 +++++++++++++++---- Sources/AsyncHTTPClient/HTTPHandler.swift | 29 ++++++++------- .../RequestBagTests.swift | 14 ++++++++ 4 files changed, 67 insertions(+), 26 deletions(-) diff --git a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift index c9a5ef16c..45611e128 100644 --- a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift +++ b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift @@ -83,7 +83,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { self.io = NonBlockingFileIO(threadPool: pool) } else { // we should use the shared thread pool from the HTTPClient which - // we will get shortly through a call to provideSharedThreadPool(fileIOPool:) + // we will get from the `HTTPClient.Task` self.io = nil } @@ -119,9 +119,15 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { task: HTTPClient.Task, _ buffer: ByteBuffer ) -> EventLoopFuture { - guard let io = io else { - preconditionFailure("thread pool not provided by HTTPClient before calling \(#function)") - } + let io: NonBlockingFileIO = { + guard let io = self.io else { + let pool = task.fileIOThreadPool + let io = NonBlockingFileIO(threadPool: pool) + self.io = io + return io + } + return io + }() self.progress.receivedBytes += buffer.readableBytes self.reportProgress?(self.progress) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 09bad1c84..e5d69b5c4 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -74,7 +74,9 @@ public class HTTPClient { let poolManager: HTTPConnectionPool.Manager /// Shared thread pool used for file IO. It is given to the user through ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-6phmu`` - private let fileIOThreadPool = NIOThreadPool(numberOfThreads: 1) + private var fileIOThreadPool: NIOThreadPool? + private var fileIOThreadPoolLock = Lock() + private var state: State private let stateLock = Lock() @@ -100,7 +102,6 @@ public class HTTPClient { public required init(eventLoopGroupProvider: EventLoopGroupProvider, configuration: Configuration = Configuration(), backgroundActivityLogger: Logger) { - self.fileIOThreadPool.start() self.eventLoopGroupProvider = eventLoopGroupProvider switch self.eventLoopGroupProvider { case .shared(let group): @@ -217,6 +218,16 @@ public class HTTPClient { } } + private func shutdownFileIOThreadPool(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + self.fileIOThreadPoolLock.withLockVoid { + guard let fileIOThreadPool = fileIOThreadPool else { + callback(nil) + return + } + fileIOThreadPool.shutdownGracefully(queue: queue, callback) + } + } + private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { do { try self.stateLock.withLock { @@ -245,7 +256,7 @@ public class HTTPClient { let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil return (callback, error) } - self.fileIOThreadPool.shutdownGracefully(queue: queue) { ioThreadPoolError in + self.shutdownFileIOThreadPool(queue: queue) { ioThreadPoolError in self.shutdownEventLoop(queue: queue) { error in let reportedError = error ?? ioThreadPoolError ?? uncleanError callback(reportedError) @@ -255,6 +266,18 @@ public class HTTPClient { } } + private func makeOrGetFileIOThreadPool() -> NIOThreadPool { + self.fileIOThreadPoolLock.withLock { + guard let fileIOThreadPool = fileIOThreadPool else { + let fileIOThreadPool = NIOThreadPool(numberOfThreads: ProcessInfo.processInfo.processorCount) + fileIOThreadPool.start() + self.fileIOThreadPool = fileIOThreadPool + return fileIOThreadPool + } + return fileIOThreadPool + } + } + /// Execute `GET` request using specified URL. /// /// - parameters: @@ -572,8 +595,6 @@ public class HTTPClient { metadata: ["ahc-eventloop": "\(taskEL)", "ahc-el-preference": "\(eventLoopPreference)"]) - delegate.provideSharedThreadPool(fileIOPool: self.fileIOThreadPool) - let failedTask: Task? = self.stateLock.withLock { switch state { case .upAndRunning: @@ -582,7 +603,8 @@ public class HTTPClient { logger.debug("client is shutting down, failing request") return Task.failedTask(eventLoop: taskEL, error: HTTPClientError.alreadyShutdown, - logger: logger) + logger: logger, + makeOrGetFileIOThreadPool: self.makeOrGetFileIOThreadPool) } } @@ -605,7 +627,7 @@ public class HTTPClient { } }() - let task = Task(eventLoop: taskEL, logger: logger) + let task = Task(eventLoop: taskEL, logger: logger, makeOrGetFileIOThreadPool: self.makeOrGetFileIOThreadPool) do { let requestBag = try RequestBag( request: request, diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 2e4358ad3..2095aaf0a 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -432,13 +432,6 @@ public class ResponseAccumulator: HTTPClientResponseDelegate { public protocol HTTPClientResponseDelegate: AnyObject { associatedtype Response - /// Called exactly once before any other methods of this delegate are called. - /// It is used to give access to the shared thread pool of the ``HTTPClient`` the request is executed on. - /// Use this thread pool to do file IO associated with the request e.g. to save a response to disk. - /// - /// - Parameter fileIOPool: File IO Pool - func provideSharedThreadPool(fileIOPool: NIOThreadPool) - /// Called when the request head is sent. Will be called once. /// /// - parameters: @@ -510,11 +503,6 @@ public protocol HTTPClientResponseDelegate: AnyObject { } extension HTTPClientResponseDelegate { - /// Default implementation of ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-8y1b`` - /// - /// By default, this does nothing. - public func provideSharedThreadPool(fileIOPool: NIOThreadPool) {} - /// Default implementation of ``HTTPClientResponseDelegate/didSendRequest(task:)-9od5p``. /// /// By default, this does nothing. @@ -635,15 +623,26 @@ extension HTTPClient { private var _isCancelled: Bool = false private var _taskDelegate: HTTPClientTaskDelegate? private let lock = Lock() + private let makeOrGetFileIOThreadPool: () -> NIOThreadPool + + public var fileIOThreadPool: NIOThreadPool { + self.makeOrGetFileIOThreadPool() + } - init(eventLoop: EventLoop, logger: Logger) { + init(eventLoop: EventLoop, logger: Logger, makeOrGetFileIOThreadPool: @escaping () -> NIOThreadPool) { self.eventLoop = eventLoop self.promise = eventLoop.makePromise() self.logger = logger + self.makeOrGetFileIOThreadPool = makeOrGetFileIOThreadPool } - static func failedTask(eventLoop: EventLoop, error: Error, logger: Logger) -> Task { - let task = self.init(eventLoop: eventLoop, logger: logger) + static func failedTask( + eventLoop: EventLoop, + error: Error, + logger: Logger, + makeOrGetFileIOThreadPool: @escaping () -> NIOThreadPool + ) -> Task { + let task = self.init(eventLoop: eventLoop, logger: logger, makeOrGetFileIOThreadPool: makeOrGetFileIOThreadPool) task.promise.fail(error) return task } diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index 9e7072c19..ebd499905 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -771,6 +771,20 @@ final class RequestBagTests: XCTestCase { } } +import NIOPosix + +extension HTTPClient.Task { + convenience init( + eventLoop: EventLoop, + logger: Logger + ) { + lazy var threadPool = NIOThreadPool(numberOfThreads: 1) + self.init(eventLoop: eventLoop, logger: logger) { + threadPool + } + } +} + class UploadCountingDelegate: HTTPClientResponseDelegate { typealias Response = Void From 33566e7c0455d07a7aac94fc6acafc0a4b8214bf Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 16 Aug 2022 15:14:30 +0200 Subject: [PATCH 5/9] thread pool is actually not used in tests --- Tests/AsyncHTTPClientTests/RequestBagTests.swift | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index ebd499905..6993c0df9 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -771,16 +771,13 @@ final class RequestBagTests: XCTestCase { } } -import NIOPosix - extension HTTPClient.Task { convenience init( eventLoop: EventLoop, logger: Logger ) { - lazy var threadPool = NIOThreadPool(numberOfThreads: 1) self.init(eventLoop: eventLoop, logger: logger) { - threadPool + preconditionFailure("thread pool not needed in tests") } } } From f8b57057d019a652cd784ac69b9a64c5b1cc02dd Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 16 Aug 2022 16:51:22 +0200 Subject: [PATCH 6/9] Update documentation --- Sources/AsyncHTTPClient/FileDownloadDelegate.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift index 45611e128..90eda2f5e 100644 --- a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift +++ b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift @@ -56,7 +56,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { try self.init(path: path, pool: .some(pool), reportHead: reportHead, reportProgress: reportProgress) } - /// Initializes a new file download delegate and spawns a new thread for file I/O. + /// Initializes a new file download delegate and uses the shared thread pool of the ``HTTPClient`` for file I/O. /// /// - parameters: /// - path: Path to a file you'd like to write the download to. From 32fac04c4ceecb0bd45b1e2c5c392501d816828f Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Wed, 17 Aug 2022 13:39:42 +0200 Subject: [PATCH 7/9] fix review comments --- Sources/AsyncHTTPClient/FileDownloadDelegate.swift | 8 -------- Sources/AsyncHTTPClient/HTTPClient.swift | 4 ++-- Sources/AsyncHTTPClient/HTTPHandler.swift | 1 + 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift index 90eda2f5e..01474197f 100644 --- a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift +++ b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift @@ -93,14 +93,6 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { self.reportProgress = reportProgress } - public func provideSharedThreadPool(fileIOPool: NIOThreadPool) { - guard self.io == nil else { - // user has provided their own thread pool - return - } - self.io = NonBlockingFileIO(threadPool: fileIOPool) - } - public func didReceiveHead( task: HTTPClient.Task, _ head: HTTPResponseHead diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 5533630ac..ab4f7815e 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -73,9 +73,9 @@ public class HTTPClient { let configuration: Configuration let poolManager: HTTPConnectionPool.Manager - /// Shared thread pool used for file IO. It is given to the user through ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-6phmu`` + /// Shared thread pool used for file IO. It is lazily created on first access of ``Task/fileIOThreadPool``. private var fileIOThreadPool: NIOThreadPool? - private var fileIOThreadPoolLock = Lock() + private let fileIOThreadPoolLock = Lock() private var state: State private let stateLock = Lock() diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 2095aaf0a..683290539 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -625,6 +625,7 @@ extension HTTPClient { private let lock = Lock() private let makeOrGetFileIOThreadPool: () -> NIOThreadPool + /// The shared thread pool of a ``HTTPClient`` used for file IO. It is lazily created on first access. public var fileIOThreadPool: NIOThreadPool { self.makeOrGetFileIOThreadPool() } From c3c68b2a27cc55c7597ddd83025b71fec7404ffc Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Wed, 17 Aug 2022 14:06:56 +0200 Subject: [PATCH 8/9] make `fileIOThreadPool` internal --- Sources/AsyncHTTPClient/HTTPHandler.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 683290539..c62c2f7d1 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -626,7 +626,7 @@ extension HTTPClient { private let makeOrGetFileIOThreadPool: () -> NIOThreadPool /// The shared thread pool of a ``HTTPClient`` used for file IO. It is lazily created on first access. - public var fileIOThreadPool: NIOThreadPool { + internal var fileIOThreadPool: NIOThreadPool { self.makeOrGetFileIOThreadPool() } From 9ee5c82cfbf732d7cf2aab533c1f35392b8d8680 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Wed, 17 Aug 2022 18:01:12 +0200 Subject: [PATCH 9/9] Add test to verify that we actually share the same thread pool across all delegates for a given HTTPClient --- .../FileDownloadDelegate.swift | 18 +++++----- .../HTTPClientInternalTests+XCTest.swift | 1 + .../HTTPClientInternalTests.swift | 35 +++++++++++++++++++ .../HTTPClientTestUtils.swift | 17 +++++++++ 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift index 01474197f..6199f33ff 100644 --- a/Sources/AsyncHTTPClient/FileDownloadDelegate.swift +++ b/Sources/AsyncHTTPClient/FileDownloadDelegate.swift @@ -30,7 +30,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { public typealias Response = Progress private let filePath: String - private var io: NonBlockingFileIO? + private(set) var fileIOThreadPool: NIOThreadPool? private let reportHead: ((HTTPResponseHead) -> Void)? private let reportProgress: ((Progress) -> Void)? @@ -80,11 +80,11 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { reportProgress: ((Progress) -> Void)? = nil ) throws { if let pool = pool { - self.io = NonBlockingFileIO(threadPool: pool) + self.fileIOThreadPool = pool } else { // we should use the shared thread pool from the HTTPClient which // we will get from the `HTTPClient.Task` - self.io = nil + self.fileIOThreadPool = nil } self.filePath = path @@ -111,15 +111,15 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate { task: HTTPClient.Task, _ buffer: ByteBuffer ) -> EventLoopFuture { - let io: NonBlockingFileIO = { - guard let io = self.io else { + let threadPool: NIOThreadPool = { + guard let pool = self.fileIOThreadPool else { let pool = task.fileIOThreadPool - let io = NonBlockingFileIO(threadPool: pool) - self.io = io - return io + self.fileIOThreadPool = pool + return pool } - return io + return pool }() + let io = NonBlockingFileIO(threadPool: threadPool) self.progress.receivedBytes += buffer.readableBytes self.reportProgress?(self.progress) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift index 3be2c79a6..9114df259 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift @@ -36,6 +36,7 @@ extension HTTPClientInternalTests { ("testConnectErrorCalloutOnCorrectEL", testConnectErrorCalloutOnCorrectEL), ("testInternalRequestURI", testInternalRequestURI), ("testHasSuffix", testHasSuffix), + ("testSharedThreadPoolIsIdenticalForAllDelegates", testSharedThreadPoolIsIdenticalForAllDelegates), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 492bb4c35..234185eb6 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -554,4 +554,39 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertFalse(elements.hasSuffix([0, 0, 0])) } } + + /// test to verify that we actually share the same thread pool across all ``FileDownloadDelegate``s for a given ``HTTPClient`` + func testSharedThreadPoolIsIdenticalForAllDelegates() throws { + let httpBin = HTTPBin() + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) + defer { + XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) + XCTAssertNoThrow(try httpBin.shutdown()) + } + var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/content-length") + request.headers.add(name: "Accept", value: "text/event-stream") + + let filePaths = (0..<10).map { _ in + TemporaryFileHelpers.makeTemporaryFilePath() + } + defer { + for filePath in filePaths { + TemporaryFileHelpers.removeTemporaryFile(at: filePath) + } + } + let delegates = try filePaths.map { + try FileDownloadDelegate(path: $0) + } + + let resultFutures = delegates.map { delegate in + httpClient.execute( + request: request, + delegate: delegate + ).futureResult + } + _ = try EventLoopFuture.whenAllSucceed(resultFutures, on: self.clientGroup.next()).wait() + let threadPools = delegates.map { $0.fileIOThreadPool } + let firstThreadPool = threadPools.first ?? nil + XCTAssert(threadPools.dropFirst().allSatisfy { $0 === firstThreadPool }) + } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 8f7d4dfce..7cd9ef83d 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -283,6 +283,23 @@ enum TemporaryFileHelpers { return try body(path) } + internal static func makeTemporaryFilePath( + directory: String = temporaryDirectory + ) -> String { + let (fd, path) = self.openTemporaryFile() + close(fd) + try! FileManager.default.removeItem(atPath: path) + return path + } + + internal static func removeTemporaryFile( + at path: String + ) { + if FileManager.default.fileExists(atPath: path) { + try? FileManager.default.removeItem(atPath: path) + } + } + internal static func fileSize(path: String) throws -> Int? { return try FileManager.default.attributesOfItem(atPath: path)[.size] as? Int }