Skip to content

Fix thread leak in FileDownloadDelegate #614

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 51 additions & 8 deletions Sources/AsyncHTTPClient/FileDownloadDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
public typealias Response = Progress

private let filePath: String
private let io: NonBlockingFileIO
private var io: NonBlockingFileIO?
private let reportHead: ((HTTPResponseHead) -> Void)?
private let reportProgress: ((Progress) -> Void)?

Expand All @@ -47,20 +47,60 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
/// the total byte count and download byte count passed to it as arguments. The callbacks
/// will be invoked in the same threading context that the delegate itself is invoked,
/// as controlled by `EventLoopPreference`.
public init(
public convenience init(
path: String,
pool: NIOThreadPool = NIOThreadPool(numberOfThreads: 1),
pool: NIOThreadPool,
reportHead: ((HTTPResponseHead) -> Void)? = nil,
reportProgress: ((Progress) -> Void)? = nil
) throws {
pool.start()
self.io = NonBlockingFileIO(threadPool: pool)
try self.init(path: path, pool: .some(pool), reportHead: reportHead, reportProgress: reportProgress)
}

/// Initializes a new file download delegate and spawns a new thread for file I/O.
///
/// - parameters:
/// - path: Path to a file you'd like to write the download to.
/// - reportHead: A closure called when the response head is available.
/// - reportProgress: A closure called when a body chunk has been downloaded, with
/// the total byte count and download byte count passed to it as arguments. The callbacks
/// will be invoked in the same threading context that the delegate itself is invoked,
/// as controlled by `EventLoopPreference`.
public convenience init(
path: String,
reportHead: ((HTTPResponseHead) -> Void)? = nil,
reportProgress: ((Progress) -> Void)? = nil
) throws {
try self.init(path: path, pool: nil, reportHead: reportHead, reportProgress: reportProgress)
}

private init(
path: String,
pool: NIOThreadPool?,
reportHead: ((HTTPResponseHead) -> Void)? = nil,
reportProgress: ((Progress) -> Void)? = nil
) throws {
if let pool = pool {
self.io = NonBlockingFileIO(threadPool: pool)
} else {
// we should use the shared thread pool from the HTTPClient which
// we will get shortly through a call to provideSharedThreadPool(fileIOPool:)
self.io = nil
}

self.filePath = path

self.reportHead = reportHead
self.reportProgress = reportProgress
}

public func provideSharedThreadPool(fileIOPool: NIOThreadPool) {
guard self.io == nil else {
// user has provided their own thread pool
return
}
self.io = NonBlockingFileIO(threadPool: fileIOPool)
}

public func didReceiveHead(
task: HTTPClient.Task<Response>,
_ head: HTTPResponseHead
Expand All @@ -79,24 +119,27 @@ public final class FileDownloadDelegate: HTTPClientResponseDelegate {
task: HTTPClient.Task<Response>,
_ buffer: ByteBuffer
) -> EventLoopFuture<Void> {
guard let io = io else {
preconditionFailure("thread pool not provided by HTTPClient before calling \(#function)")
}
self.progress.receivedBytes += buffer.readableBytes
self.reportProgress?(self.progress)

let writeFuture: EventLoopFuture<Void>
if let fileHandleFuture = self.fileHandleFuture {
writeFuture = fileHandleFuture.flatMap {
self.io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
}
} else {
let fileHandleFuture = self.io.openFile(
let fileHandleFuture = io.openFile(
path: self.filePath,
mode: .write,
flags: .allowFileCreation(),
eventLoop: task.eventLoop
)
self.fileHandleFuture = fileHandleFuture
writeFuture = fileHandleFuture.flatMap {
self.io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
io.write(fileHandle: $0, buffer: buffer, eventLoop: task.eventLoop)
}
}

Expand Down
16 changes: 12 additions & 4 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class HTTPClient {
let eventLoopGroupProvider: EventLoopGroupProvider
let configuration: Configuration
let poolManager: HTTPConnectionPool.Manager

/// Shared thread pool used for file IO. It is given to the user through ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-6phmu``
private let fileIOThreadPool = NIOThreadPool(numberOfThreads: 1)
private var state: State
private let stateLock = Lock()

Expand All @@ -97,6 +100,7 @@ public class HTTPClient {
public required init(eventLoopGroupProvider: EventLoopGroupProvider,
configuration: Configuration = Configuration(),
backgroundActivityLogger: Logger) {
self.fileIOThreadPool.start()
self.eventLoopGroupProvider = eventLoopGroupProvider
switch self.eventLoopGroupProvider {
case .shared(let group):
Expand Down Expand Up @@ -241,10 +245,11 @@ public class HTTPClient {
let error: Error? = (requiresClean && unclean) ? HTTPClientError.uncleanShutdown : nil
return (callback, error)
}

self.shutdownEventLoop(queue: queue) { error in
let reportedError = error ?? uncleanError
callback(reportedError)
self.fileIOThreadPool.shutdownGracefully(queue: queue) { ioThreadPoolError in
self.shutdownEventLoop(queue: queue) { error in
let reportedError = error ?? ioThreadPoolError ?? uncleanError
callback(reportedError)
}
}
}
}
Expand Down Expand Up @@ -562,10 +567,13 @@ public class HTTPClient {
case .testOnly_exact(_, delegateOn: let delegateEL):
taskEL = delegateEL
}

logger.trace("selected EventLoop for task given the preference",
metadata: ["ahc-eventloop": "\(taskEL)",
"ahc-el-preference": "\(eventLoopPreference)"])

delegate.provideSharedThreadPool(fileIOPool: self.fileIOThreadPool)

let failedTask: Task<Delegate.Response>? = self.stateLock.withLock {
switch state {
case .upAndRunning:
Expand Down
15 changes: 14 additions & 1 deletion Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Logging
import NIOConcurrencyHelpers
import NIOCore
import NIOHTTP1
import NIOPosix
import NIOSSL

extension HTTPClient {
Expand Down Expand Up @@ -431,6 +432,13 @@ public class ResponseAccumulator: HTTPClientResponseDelegate {
public protocol HTTPClientResponseDelegate: AnyObject {
associatedtype Response

/// Called exactly once before any other methods of this delegate are called.
/// It is used to give access to the shared thread pool of the ``HTTPClient`` the request is executed on.
/// Use this thread pool to do file IO associated with the request e.g. to save a response to disk.
///
/// - Parameter fileIOPool: File IO Pool
func provideSharedThreadPool(fileIOPool: NIOThreadPool)

/// Called when the request head is sent. Will be called once.
///
/// - parameters:
Expand Down Expand Up @@ -502,7 +510,12 @@ public protocol HTTPClientResponseDelegate: AnyObject {
}

extension HTTPClientResponseDelegate {
/// Default implementation of ``HTTPClientResponseDelegate/didSendRequestHead(task:_:)-6khai``.
/// Default implementation of ``HTTPClientResponseDelegate/provideSharedThreadPool(fileIOPool:)-8y1b``
///
/// By default, this does nothing.
public func provideSharedThreadPool(fileIOPool: NIOThreadPool) {}

/// Default implementation of ``HTTPClientResponseDelegate/didSendRequest(task:)-9od5p``.
///
/// By default, this does nothing.
public func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {}
Expand Down