diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 8b55048f9..cad474de7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -12,7 +12,7 @@ jobs: uses: apple/swift-nio/.github/workflows/unit_tests.yml@main with: linux_5_9_enabled: false - linux_5_10_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" + linux_5_10_arguments_override: "--explicit-target-dependency-import-check error" linux_6_0_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_6_1_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index dd914711f..f77d49ebd 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -15,7 +15,7 @@ jobs: uses: apple/swift-nio/.github/workflows/unit_tests.yml@main with: linux_5_9_enabled: false - linux_5_10_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" + linux_5_10_arguments_override: "--explicit-target-dependency-import-check error" linux_6_0_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_6_1_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 8203f07af..191517c71 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -185,7 +185,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { self.runTimeoutAction(timeoutAction, context: context) } - req.willExecuteRequest(self) + req.willExecuteRequest(self.requestExecutor) let action = self.state.runNewRequest( head: req.requestHead, @@ -323,7 +323,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { case .sendRequestEnd(let writePromise, let shouldClose): let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self) // We need to defer succeeding the old request to avoid ordering issues - writePromise.futureResult.hop(to: context.eventLoop).whenComplete { result in + writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in switch result { case .success: // If our final action was `sendRequestEnd`, that means we've already received @@ -396,7 +396,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.") let timerID = self.currentIdleReadTimeoutTimerID - self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + self.idleReadTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) { guard self.currentIdleReadTimeoutTimerID == timerID else { return } let action = self.state.idleReadTimeoutTriggered() self.run(action, context: context) @@ -409,7 +409,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { self.currentIdleReadTimeoutTimerID &+= 1 let timerID = self.currentIdleReadTimeoutTimerID - self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + self.idleReadTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) { guard self.currentIdleReadTimeoutTimerID == timerID else { return } let action = self.state.idleReadTimeoutTriggered() self.run(action, context: context) @@ -431,7 +431,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { assert(self.idleWriteTimeoutTimer == nil, "Expected there is no timeout timer so far.") let timerID = self.currentIdleWriteTimeoutTimerID - self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + self.idleWriteTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) { guard self.currentIdleWriteTimeoutTimerID == timerID else { return } let action = self.state.idleWriteTimeoutTriggered() self.run(action, context: context) @@ -443,7 +443,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { self.currentIdleWriteTimeoutTimerID &+= 1 let timerID = self.currentIdleWriteTimeoutTimerID - self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + self.idleWriteTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) { guard self.currentIdleWriteTimeoutTimerID == timerID else { return } let action = self.state.idleWriteTimeoutTriggered() self.run(action, context: context) @@ -461,8 +461,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { // MARK: Private HTTPRequestExecutor - private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise?) - { + fileprivate func writeRequestBodyPart0( + _ data: IOData, + request: HTTPExecutableRequest, + promise: EventLoopPromise? + ) { guard self.request === request, let context = self.channelContext else { // Because the HTTPExecutableRequest may run in a different thread to our eventLoop, // calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after @@ -481,7 +484,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { self.run(action, context: context) } - private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise?) { + fileprivate func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise?) { guard self.request === request, let context = self.channelContext else { // See code comment in `writeRequestBodyPart0` promise?.fail(HTTPClientError.requestStreamCancelled) @@ -492,7 +495,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { self.run(action, context: context) } - private func demandResponseBodyStream0(_ request: HTTPExecutableRequest) { + fileprivate func demandResponseBodyStream0(_ request: HTTPExecutableRequest) { guard self.request === request, let context = self.channelContext else { // See code comment in `writeRequestBodyPart0` return @@ -504,7 +507,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { self.run(action, context: context) } - private func cancelRequest0(_ request: HTTPExecutableRequest) { + fileprivate func cancelRequest0(_ request: HTTPExecutableRequest) { guard self.request === request, let context = self.channelContext else { // See code comment in `writeRequestBodyPart0` return @@ -524,43 +527,39 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { @available(*, unavailable) extension HTTP1ClientChannelHandler: Sendable {} -extension HTTP1ClientChannelHandler: HTTPRequestExecutor { - func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise?) { - if self.eventLoop.inEventLoop { - self.writeRequestBodyPart0(data, request: request, promise: promise) - } else { - self.eventLoop.execute { - self.writeRequestBodyPart0(data, request: request, promise: promise) +extension HTTP1ClientChannelHandler { + var requestExecutor: RequestExecutor { + RequestExecutor(self) + } + + struct RequestExecutor: HTTPRequestExecutor, Sendable { + private let loopBound: NIOLoopBound + + init(_ handler: HTTP1ClientChannelHandler) { + self.loopBound = NIOLoopBound(handler, eventLoop: handler.eventLoop) + } + + func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise?) { + self.loopBound.execute { + $0.writeRequestBodyPart0(data, request: request, promise: promise) } } - } - func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise?) { - if self.eventLoop.inEventLoop { - self.finishRequestBodyStream0(request, promise: promise) - } else { - self.eventLoop.execute { - self.finishRequestBodyStream0(request, promise: promise) + func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise?) { + self.loopBound.execute { + $0.finishRequestBodyStream0(request, promise: promise) } } - } - func demandResponseBodyStream(_ request: HTTPExecutableRequest) { - if self.eventLoop.inEventLoop { - self.demandResponseBodyStream0(request) - } else { - self.eventLoop.execute { - self.demandResponseBodyStream0(request) + func demandResponseBodyStream(_ request: HTTPExecutableRequest) { + self.loopBound.execute { + $0.demandResponseBodyStream0(request) } } - } - func cancelRequest(_ request: HTTPExecutableRequest) { - if self.eventLoop.inEventLoop { - self.cancelRequest0(request) - } else { - self.eventLoop.execute { - self.cancelRequest0(request) + func cancelRequest(_ request: HTTPExecutableRequest) { + self.loopBound.execute { + $0.cancelRequest0(request) } } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1Connection.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1Connection.swift index e0496f2e3..6f64e0407 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1Connection.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1Connection.swift @@ -17,9 +17,9 @@ import NIOCore import NIOHTTP1 import NIOHTTPCompression -protocol HTTP1ConnectionDelegate { - func http1ConnectionReleased(_: HTTP1Connection) - func http1ConnectionClosed(_: HTTP1Connection) +protocol HTTP1ConnectionDelegate: Sendable { + func http1ConnectionReleased(_: HTTPConnectionPool.Connection.ID) + func http1ConnectionClosed(_: HTTPConnectionPool.Connection.ID) } final class HTTP1Connection { @@ -67,32 +67,45 @@ final class HTTP1Connection { return connection } - func executeRequest(_ request: HTTPExecutableRequest) { - if self.channel.eventLoop.inEventLoop { - self.execute0(request: request) - } else { - self.channel.eventLoop.execute { - self.execute0(request: request) + var sendableView: SendableView { + SendableView(self) + } + + struct SendableView: Sendable { + private let connection: NIOLoopBound + let channel: Channel + let id: HTTPConnectionPool.Connection.ID + private var eventLoop: EventLoop { self.connection.eventLoop } + + init(_ connection: HTTP1Connection) { + self.connection = NIOLoopBound(connection, eventLoop: connection.channel.eventLoop) + self.id = connection.id + self.channel = connection.channel + } + + func executeRequest(_ request: HTTPExecutableRequest) { + self.connection.execute { + $0.execute0(request: request) } } - } - func shutdown() { - self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil) - } + func shutdown() { + self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil) + } - func close(promise: EventLoopPromise?) { - self.channel.close(mode: .all, promise: promise) - } + func close(promise: EventLoopPromise?) { + self.channel.close(mode: .all, promise: promise) + } - func close() -> EventLoopFuture { - let promise = self.channel.eventLoop.makePromise(of: Void.self) - self.close(promise: promise) - return promise.futureResult + func close() -> EventLoopFuture { + let promise = self.eventLoop.makePromise(of: Void.self) + self.close(promise: promise) + return promise.futureResult + } } func taskCompleted() { - self.delegate.http1ConnectionReleased(self) + self.delegate.http1ConnectionReleased(self.id) } private func execute0(request: HTTPExecutableRequest) { @@ -100,7 +113,7 @@ final class HTTP1Connection { return request.fail(ChannelError.ioOnClosedChannel) } - self.channel.write(request, promise: nil) + self.channel.pipeline.syncOperations.write(NIOAny(request), promise: nil) } private func start(decompression: HTTPClient.Decompression, logger: Logger) throws { @@ -111,9 +124,9 @@ final class HTTP1Connection { } self.state = .active - self.channel.closeFuture.whenComplete { _ in + self.channel.closeFuture.assumeIsolated().whenComplete { _ in self.state = .closed - self.delegate.http1ConnectionClosed(self) + self.delegate.http1ConnectionClosed(self.id) } do { @@ -150,3 +163,6 @@ final class HTTP1Connection { } } } + +@available(*, unavailable) +extension HTTP1Connection: Sendable {} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift index 61350dfd7..7c0197cdf 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift @@ -137,7 +137,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { self.runTimeoutAction(timeoutAction, context: context) } - request.willExecuteRequest(self) + request.willExecuteRequest(self.requestExecutor) let action = self.state.startRequest( head: request.requestHead, @@ -313,7 +313,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { assert(self.idleReadTimeoutTimer == nil, "Expected there is no timeout timer so far.") let timerID = self.currentIdleReadTimeoutTimerID - self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + self.idleReadTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) { guard self.currentIdleReadTimeoutTimerID == timerID else { return } let action = self.state.idleReadTimeoutTriggered() self.run(action, context: context) @@ -326,7 +326,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { self.currentIdleReadTimeoutTimerID &+= 1 let timerID = self.currentIdleReadTimeoutTimerID - self.idleReadTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + self.idleReadTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) { guard self.currentIdleReadTimeoutTimerID == timerID else { return } let action = self.state.idleReadTimeoutTriggered() self.run(action, context: context) @@ -349,7 +349,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { assert(self.idleWriteTimeoutTimer == nil, "Expected there is no timeout timer so far.") let timerID = self.currentIdleWriteTimeoutTimerID - self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + self.idleWriteTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) { guard self.currentIdleWriteTimeoutTimerID == timerID else { return } let action = self.state.idleWriteTimeoutTriggered() self.run(action, context: context) @@ -361,7 +361,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { self.currentIdleWriteTimeoutTimerID &+= 1 let timerID = self.currentIdleWriteTimeoutTimerID - self.idleWriteTimeoutTimer = self.eventLoop.scheduleTask(in: timeAmount) { + self.idleWriteTimeoutTimer = self.eventLoop.assumeIsolated().scheduleTask(in: timeAmount) { guard self.currentIdleWriteTimeoutTimerID == timerID else { return } let action = self.state.idleWriteTimeoutTriggered() self.run(action, context: context) @@ -437,43 +437,39 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler { @available(*, unavailable) extension HTTP2ClientRequestHandler: Sendable {} -extension HTTP2ClientRequestHandler: HTTPRequestExecutor { - func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise?) { - if self.eventLoop.inEventLoop { - self.writeRequestBodyPart0(data, request: request, promise: promise) - } else { - self.eventLoop.execute { - self.writeRequestBodyPart0(data, request: request, promise: promise) +extension HTTP2ClientRequestHandler { + var requestExecutor: RequestExecutor { + RequestExecutor(self) + } + + struct RequestExecutor: HTTPRequestExecutor, Sendable { + private let loopBound: NIOLoopBound + + init(_ handler: HTTP2ClientRequestHandler) { + self.loopBound = NIOLoopBound(handler, eventLoop: handler.eventLoop) + } + + func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise?) { + self.loopBound.execute { + $0.writeRequestBodyPart0(data, request: request, promise: promise) } } - } - func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise?) { - if self.eventLoop.inEventLoop { - self.finishRequestBodyStream0(request, promise: promise) - } else { - self.eventLoop.execute { - self.finishRequestBodyStream0(request, promise: promise) + func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise?) { + self.loopBound.execute { + $0.finishRequestBodyStream0(request, promise: promise) } } - } - func demandResponseBodyStream(_ request: HTTPExecutableRequest) { - if self.eventLoop.inEventLoop { - self.demandResponseBodyStream0(request) - } else { - self.eventLoop.execute { - self.demandResponseBodyStream0(request) + func demandResponseBodyStream(_ request: HTTPExecutableRequest) { + self.loopBound.execute { + $0.demandResponseBodyStream0(request) } } - } - func cancelRequest(_ request: HTTPExecutableRequest) { - if self.eventLoop.inEventLoop { - self.cancelRequest0(request) - } else { - self.eventLoop.execute { - self.cancelRequest0(request) + func cancelRequest(_ request: HTTPExecutableRequest) { + self.loopBound.execute { + $0.cancelRequest0(request) } } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift index 77e4835f6..1c24554e2 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift @@ -17,11 +17,11 @@ import NIOCore import NIOHTTP2 import NIOHTTPCompression -protocol HTTP2ConnectionDelegate { - func http2Connection(_: HTTP2Connection, newMaxStreamSetting: Int) - func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int) - func http2ConnectionGoAwayReceived(_: HTTP2Connection) - func http2ConnectionClosed(_: HTTP2Connection) +protocol HTTP2ConnectionDelegate: Sendable { + func http2Connection(_: HTTPConnectionPool.Connection.ID, newMaxStreamSetting: Int) + func http2ConnectionStreamClosed(_: HTTPConnectionPool.Connection.ID, availableStreams: Int) + func http2ConnectionGoAwayReceived(_: HTTPConnectionPool.Connection.ID) + func http2ConnectionClosed(_: HTTPConnectionPool.Connection.ID) } struct HTTP2PushNotSupportedError: Error {} @@ -135,7 +135,7 @@ final class HTTP2Connection { maximumConnectionUses: Int?, logger: Logger, streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture)? = nil - ) -> EventLoopFuture<(HTTP2Connection, Int)> { + ) -> EventLoopFuture<(HTTP2Connection, Int)>.Isolated { let connection = HTTP2Connection( channel: channel, connectionID: connectionID, @@ -145,39 +145,60 @@ final class HTTP2Connection { logger: logger, streamChannelDebugInitializer: streamChannelDebugInitializer ) - return connection._start0().map { maxStreams in (connection, maxStreams) } + + return connection._start0().assumeIsolated().map { maxStreams in + (connection, maxStreams) + } + } + + var sendableView: SendableView { + SendableView(self) } - func executeRequest(_ request: HTTPExecutableRequest) { - if self.channel.eventLoop.inEventLoop { - self.executeRequest0(request) - } else { - self.channel.eventLoop.execute { - self.executeRequest0(request) + struct SendableView: Sendable { + private let connection: NIOLoopBound + let id: HTTPConnectionPool.Connection.ID + let channel: Channel + + var eventLoop: EventLoop { + self.connection.eventLoop + } + + var closeFuture: EventLoopFuture { + self.channel.closeFuture + } + + func __forTesting_getStreamChannels() -> [Channel] { + self.connection.value.__forTesting_getStreamChannels() + } + + init(_ connection: HTTP2Connection) { + self.connection = NIOLoopBound(connection, eventLoop: connection.channel.eventLoop) + self.id = connection.id + self.channel = connection.channel + } + + func executeRequest(_ request: HTTPExecutableRequest) { + self.connection.execute { + $0.executeRequest0(request) } } - } - /// shuts down the connection by cancelling all running tasks and closing the connection once - /// all child streams/channels are closed. - func shutdown() { - if self.channel.eventLoop.inEventLoop { - self.shutdown0() - } else { - self.channel.eventLoop.execute { - self.shutdown0() + func shutdown() { + self.connection.execute { + $0.shutdown0() } } - } - func close(promise: EventLoopPromise?) { - self.channel.close(mode: .all, promise: promise) - } + func close(promise: EventLoopPromise?) { + self.channel.close(mode: .all, promise: promise) + } - func close() -> EventLoopFuture { - let promise = self.channel.eventLoop.makePromise(of: Void.self) - self.close(promise: promise) - return promise.futureResult + func close() -> EventLoopFuture { + let promise = self.eventLoop.makePromise(of: Void.self) + self.close(promise: promise) + return promise.futureResult + } } func _start0() -> EventLoopFuture { @@ -186,7 +207,7 @@ final class HTTP2Connection { let readyToAcceptConnectionsPromise = self.channel.eventLoop.makePromise(of: Int.self) self.state = .starting(readyToAcceptConnectionsPromise) - self.channel.closeFuture.whenComplete { _ in + self.channel.closeFuture.assumeIsolated().whenComplete { _ in switch self.state { case .initialized, .closed: preconditionFailure("invalid state \(self.state)") @@ -195,7 +216,7 @@ final class HTTP2Connection { readyToAcceptConnectionsPromise.fail(HTTPClientError.remoteConnectionClosed) case .active, .closing: self.state = .closed - self.delegate.http2ConnectionClosed(self) + self.delegate.http2ConnectionClosed(self.id) } } @@ -234,13 +255,18 @@ final class HTTP2Connection { case .active: let createStreamChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self) - self.multiplexer.createStreamChannel(promise: createStreamChannelPromise) { - channel -> EventLoopFuture in + let loopBoundSelf = NIOLoopBound(self, eventLoop: self.channel.eventLoop) + + self.multiplexer.createStreamChannel( + promise: createStreamChannelPromise + ) { [streamChannelDebugInitializer] channel -> EventLoopFuture in + let connection = loopBoundSelf.value + do { // the connection may have been asked to shutdown while we created the child. in // this // channel. - guard case .active = self.state else { + guard case .active = connection.state else { throw HTTPClientError.cancelled } @@ -249,7 +275,7 @@ final class HTTP2Connection { let translate = HTTP2FramePayloadToHTTP1ClientCodec(httpProtocol: .https) try channel.pipeline.syncOperations.addHandler(translate) - if case .enabled(let limit) = self.decompression { + if case .enabled(let limit) = connection.decompression { let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) try channel.pipeline.syncOperations.addHandler(decompressHandler) } @@ -261,17 +287,17 @@ final class HTTP2Connection { // request to it. In case of an error, we are sure that the channel was added // before. let box = ChannelBox(channel) - self.openStreams.insert(box) - channel.closeFuture.whenComplete { _ in - self.openStreams.remove(box) + connection.openStreams.insert(box) + channel.closeFuture.assumeIsolated().whenComplete { _ in + connection.openStreams.remove(box) } - if let streamChannelDebugInitializer = self.streamChannelDebugInitializer { + if let streamChannelDebugInitializer = streamChannelDebugInitializer { return streamChannelDebugInitializer(channel).map { _ in channel.write(request, promise: nil) } } else { - channel.write(request, promise: nil) + channel.pipeline.syncOperations.write(NIOAny(request), promise: nil) return channel.eventLoop.makeSucceededVoidFuture() } } catch { @@ -335,7 +361,7 @@ extension HTTP2Connection: HTTP2IdleHandlerDelegate { case .active: self.state = .active(maxStreams: maxStreams) - self.delegate.http2Connection(self, newMaxStreamSetting: maxStreams) + self.delegate.http2Connection(self.id, newMaxStreamSetting: maxStreams) case .closing, .closed: // ignore. we only wait for all connections to be closed anyway. @@ -356,7 +382,7 @@ extension HTTP2Connection: HTTP2IdleHandlerDelegate { case .active: self.state = .closing - self.delegate.http2ConnectionGoAwayReceived(self) + self.delegate.http2ConnectionGoAwayReceived(self.id) case .closing, .closed: // we are already closing. Nothing new @@ -367,6 +393,9 @@ extension HTTP2Connection: HTTP2IdleHandlerDelegate { func http2StreamClosed(availableStreams: Int) { self.channel.eventLoop.assertInEventLoop() - self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams) + self.delegate.http2ConnectionStreamClosed(self.id, availableStreams: availableStreams) } } + +@available(*, unavailable) +extension HTTP2Connection: Sendable {} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift index acd73c416..c896791cf 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift @@ -48,9 +48,9 @@ extension HTTPConnectionPool { } } -protocol HTTPConnectionRequester { - func http1ConnectionCreated(_: HTTP1Connection) - func http2ConnectionCreated(_: HTTP2Connection, maximumStreams: Int) +protocol HTTPConnectionRequester: Sendable { + func http1ConnectionCreated(_: HTTP1Connection.SendableView) + func http2ConnectionCreated(_: HTTP2Connection.SendableView, maximumStreams: Int) func failedToCreateHTTPConnection(_: HTTPConnectionPool.Connection.ID, error: Error) func waitingForConnectivity(_: HTTPConnectionPool.Connection.ID, error: Error) } @@ -74,7 +74,7 @@ extension HTTPConnectionPool.ConnectionFactory { deadline: deadline, eventLoop: eventLoop, logger: logger - ).whenComplete { result in + ).whenComplete { [logger] result in switch result { case .success(.http1_1(let channel)): do { @@ -87,16 +87,18 @@ extension HTTPConnectionPool.ConnectionFactory { ) if let connectionDebugInitializer = self.clientConfiguration.http1_1ConnectionDebugInitializer { - connectionDebugInitializer(channel).whenComplete { debugInitializerResult in + connectionDebugInitializer(channel).hop( + to: eventLoop + ).assumeIsolated().whenComplete { debugInitializerResult in switch debugInitializerResult { case .success: - requester.http1ConnectionCreated(connection) + requester.http1ConnectionCreated(connection.sendableView) case .failure(let error): requester.failedToCreateHTTPConnection(connectionID, error: error) } } } else { - requester.http1ConnectionCreated(connection) + requester.http1ConnectionCreated(connection.sendableView) } } catch { requester.failedToCreateHTTPConnection(connectionID, error: error) @@ -115,12 +117,12 @@ extension HTTPConnectionPool.ConnectionFactory { switch result { case .success((let connection, let maximumStreams)): if let connectionDebugInitializer = self.clientConfiguration.http2ConnectionDebugInitializer { - connectionDebugInitializer(channel).whenComplete { + connectionDebugInitializer(channel).hop(to: eventLoop).assumeIsolated().whenComplete { debugInitializerResult in switch debugInitializerResult { case .success: requester.http2ConnectionCreated( - connection, + connection.sendableView, maximumStreams: maximumStreams ) case .failure(let error): @@ -132,7 +134,7 @@ extension HTTPConnectionPool.ConnectionFactory { } } else { requester.http2ConnectionCreated( - connection, + connection.sendableView, maximumStreams: maximumStreams ) } @@ -359,7 +361,6 @@ extension HTTPConnectionPool.ConnectionFactory { case .http1Only: tlsConfig.applicationProtocols = ["http/1.1"] } - let tlsEventHandler = TLSEventsHandler(deadline: deadline) let sslServerHostname = self.key.serverNameIndicator let sslContextFuture = self.sslContextCache.sslContext( @@ -375,6 +376,7 @@ extension HTTPConnectionPool.ConnectionFactory { serverHostname: sslServerHostname ) try channel.pipeline.syncOperations.addHandler(sslHandler) + let tlsEventHandler = TLSEventsHandler(deadline: deadline) try channel.pipeline.syncOperations.addHandler(tlsEventHandler) // The tlsEstablishedFuture is set as soon as the TLSEventsHandler is in a @@ -384,8 +386,14 @@ extension HTTPConnectionPool.ConnectionFactory { return channel.eventLoop.makeFailedFuture(error) } }.flatMap { negotiated -> EventLoopFuture in - channel.pipeline.removeHandler(tlsEventHandler).flatMapThrowing { - try self.matchALPNToHTTPVersion(negotiated, channel: channel) + do { + let sync = channel.pipeline.syncOperations + let context = try sync.context(handlerType: TLSEventsHandler.self) + return sync.removeHandler(context: context).flatMapThrowing { + try self.matchALPNToHTTPVersion(negotiated, channel: channel) + } + } catch { + return channel.eventLoop.makeFailedFuture(error) } } } @@ -462,9 +470,9 @@ extension HTTPConnectionPool.ConnectionFactory { // The tlsEstablishedFuture is set as soon as the TLSEventsHandler is in a // pipeline. It is created in TLSEventsHandler's handlerAdded method. - return tlsEventHandler.tlsEstablishedFuture!.flatMap { negotiated in - channel.pipeline.removeHandler(tlsEventHandler).map { (channel, negotiated) } - } + return tlsEventHandler.tlsEstablishedFuture!.assumeIsolated().flatMap { negotiated in + channel.pipeline.syncOperations.removeHandler(tlsEventHandler).map { (channel, negotiated) } + }.nonisolated() } catch { assert( channel.isActive == false, @@ -504,9 +512,7 @@ extension HTTPConnectionPool.ConnectionFactory { } #if canImport(Network) - if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), - let tsBootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoop) - { + if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), eventLoop is QoSEventLoop { // create NIOClientTCPBootstrap with NIOTS TLS provider let bootstrapFuture = tlsConfig.getNWProtocolTLSOptions( on: eventLoop, @@ -514,7 +520,7 @@ extension HTTPConnectionPool.ConnectionFactory { ).map { options -> NIOClientTCPBootstrapProtocol in - tsBootstrap + NIOTSConnectionBootstrap(group: eventLoop) // validated above .channelOption( NIOTSChannelOptions.waitForActivity, value: self.clientConfiguration.networkFrameworkWaitForConnectivity @@ -551,29 +557,29 @@ extension HTTPConnectionPool.ConnectionFactory { logger: logger ) - let bootstrap = ClientBootstrap(group: eventLoop) - .connectTimeout(deadline - NIODeadline.now()) - .enableMPTCP(clientConfiguration.enableMultipath) - .channelInitializer { channel in - sslContextFuture.flatMap { sslContext -> EventLoopFuture in - do { - let sync = channel.pipeline.syncOperations - let sslHandler = try NIOSSLClientHandler( - context: sslContext, - serverHostname: self.key.serverNameIndicator - ) - let tlsEventHandler = TLSEventsHandler(deadline: deadline) + return eventLoop.submit { + ClientBootstrap(group: eventLoop) + .connectTimeout(deadline - NIODeadline.now()) + .enableMPTCP(clientConfiguration.enableMultipath) + .channelInitializer { channel in + sslContextFuture.flatMap { sslContext -> EventLoopFuture in + do { + let sync = channel.pipeline.syncOperations + let sslHandler = try NIOSSLClientHandler( + context: sslContext, + serverHostname: self.key.serverNameIndicator + ) + let tlsEventHandler = TLSEventsHandler(deadline: deadline) - try sync.addHandler(sslHandler) - try sync.addHandler(tlsEventHandler) - return channel.eventLoop.makeSucceededVoidFuture() - } catch { - return channel.eventLoop.makeFailedFuture(error) + try sync.addHandler(sslHandler) + try sync.addHandler(tlsEventHandler) + return channel.eventLoop.makeSucceededVoidFuture() + } catch { + return channel.eventLoop.makeFailedFuture(error) + } } } - } - - return eventLoop.makeSucceededFuture(bootstrap) + } } private func matchALPNToHTTPVersion(_ negotiated: String?, channel: Channel) throws -> NegotiatedProtocol { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index ebcecbdc5..251224ac0 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -461,7 +461,7 @@ final class HTTPConnectionPool: // MARK: - Protocol methods - extension HTTPConnectionPool: HTTPConnectionRequester { - func http1ConnectionCreated(_ connection: HTTP1Connection) { + func http1ConnectionCreated(_ connection: HTTP1Connection.SendableView) { self.logger.trace( "successfully created connection", metadata: [ @@ -474,7 +474,7 @@ extension HTTPConnectionPool: HTTPConnectionRequester { } } - func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) { + func http2ConnectionCreated(_ connection: HTTP2Connection.SendableView, maximumStreams: Int) { self.logger.trace( "successfully created connection", metadata: [ @@ -516,84 +516,84 @@ extension HTTPConnectionPool: HTTPConnectionRequester { } extension HTTPConnectionPool: HTTP1ConnectionDelegate { - func http1ConnectionClosed(_ connection: HTTP1Connection) { + func http1ConnectionClosed(_ id: HTTPConnectionPool.Connection.ID) { self.logger.debug( "connection closed", metadata: [ - "ahc-connection-id": "\(connection.id)", + "ahc-connection-id": "\(id)", "ahc-http-version": "http/1.1", ] ) self.modifyStateAndRunActions { - $0.http1ConnectionClosed(connection.id) + $0.http1ConnectionClosed(id) } } - func http1ConnectionReleased(_ connection: HTTP1Connection) { + func http1ConnectionReleased(_ id: HTTPConnectionPool.Connection.ID) { self.logger.trace( "releasing connection", metadata: [ - "ahc-connection-id": "\(connection.id)", + "ahc-connection-id": "\(id)", "ahc-http-version": "http/1.1", ] ) self.modifyStateAndRunActions { - $0.http1ConnectionReleased(connection.id) + $0.http1ConnectionReleased(id) } } } extension HTTPConnectionPool: HTTP2ConnectionDelegate { - func http2Connection(_ connection: HTTP2Connection, newMaxStreamSetting: Int) { + func http2Connection(_ id: HTTPConnectionPool.Connection.ID, newMaxStreamSetting: Int) { self.logger.debug( "new max stream setting", metadata: [ - "ahc-connection-id": "\(connection.id)", + "ahc-connection-id": "\(id)", "ahc-http-version": "http/2", "ahc-max-streams": "\(newMaxStreamSetting)", ] ) self.modifyStateAndRunActions { - $0.newHTTP2MaxConcurrentStreamsReceived(connection.id, newMaxStreams: newMaxStreamSetting) + $0.newHTTP2MaxConcurrentStreamsReceived(id, newMaxStreams: newMaxStreamSetting) } } - func http2ConnectionGoAwayReceived(_ connection: HTTP2Connection) { + func http2ConnectionGoAwayReceived(_ id: HTTPConnectionPool.Connection.ID) { self.logger.debug( "connection go away received", metadata: [ - "ahc-connection-id": "\(connection.id)", + "ahc-connection-id": "\(id)", "ahc-http-version": "http/2", ] ) self.modifyStateAndRunActions { - $0.http2ConnectionGoAwayReceived(connection.id) + $0.http2ConnectionGoAwayReceived(id) } } - func http2ConnectionClosed(_ connection: HTTP2Connection) { + func http2ConnectionClosed(_ id: HTTPConnectionPool.Connection.ID) { self.logger.debug( "connection closed", metadata: [ - "ahc-connection-id": "\(connection.id)", + "ahc-connection-id": "\(id)", "ahc-http-version": "http/2", ] ) self.modifyStateAndRunActions { - $0.http2ConnectionClosed(connection.id) + $0.http2ConnectionClosed(id) } } - func http2ConnectionStreamClosed(_ connection: HTTP2Connection, availableStreams: Int) { + func http2ConnectionStreamClosed(_ id: HTTPConnectionPool.Connection.ID, availableStreams: Int) { self.logger.trace( "stream closed", metadata: [ - "ahc-connection-id": "\(connection.id)", + "ahc-connection-id": "\(id)", "ahc-http-version": "http/2", ] ) self.modifyStateAndRunActions { - $0.http2ConnectionStreamClosed(connection.id) + $0.http2ConnectionStreamClosed(id) } } } @@ -612,18 +612,18 @@ extension HTTPConnectionPool { typealias ID = Int private enum Reference { - case http1_1(HTTP1Connection) - case http2(HTTP2Connection) + case http1_1(HTTP1Connection.SendableView) + case http2(HTTP2Connection.SendableView) case __testOnly_connection(ID, EventLoop) } private let _ref: Reference - fileprivate static func http1_1(_ conn: HTTP1Connection) -> Self { + fileprivate static func http1_1(_ conn: HTTP1Connection.SendableView) -> Self { Connection(_ref: .http1_1(conn)) } - fileprivate static func http2(_ conn: HTTP2Connection) -> Self { + fileprivate static func http2(_ conn: HTTP2Connection.SendableView) -> Self { Connection(_ref: .http2(conn)) } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift index e8c07e50f..bce55eb5b 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift @@ -176,7 +176,7 @@ protocol HTTPSchedulableRequest: HTTPExecutableRequest { /// A handle to the request executor. /// /// This protocol is implemented by the `HTTP1ClientChannelHandler`. -protocol HTTPRequestExecutor { +protocol HTTPRequestExecutor: Sendable { /// Writes a body part into the channel pipeline /// /// This method may be **called on any thread**. The executor needs to ensure thread safety. @@ -201,7 +201,7 @@ protocol HTTPRequestExecutor { func cancelRequest(_ task: HTTPExecutableRequest) } -protocol HTTPExecutableRequest: AnyObject { +protocol HTTPExecutableRequest: AnyObject, Sendable { /// The request's logger var logger: Logger { get } diff --git a/Sources/AsyncHTTPClient/NIOLoopBound+Execute.swift b/Sources/AsyncHTTPClient/NIOLoopBound+Execute.swift new file mode 100644 index 000000000..b25a0f00d --- /dev/null +++ b/Sources/AsyncHTTPClient/NIOLoopBound+Execute.swift @@ -0,0 +1,28 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2025 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +extension NIOLoopBound { + @inlinable + func execute(_ body: @Sendable @escaping (Value) -> Void) { + if self.eventLoop.inEventLoop { + body(self.value) + } else { + self.eventLoop.execute { + body(self.value) + } + } + } +} diff --git a/Tests/AsyncHTTPClientTests/EmbeddedChannel+HTTPConvenience.swift b/Tests/AsyncHTTPClientTests/EmbeddedChannel+HTTPConvenience.swift index 397d143b0..5cc35bce8 100644 --- a/Tests/AsyncHTTPClientTests/EmbeddedChannel+HTTPConvenience.swift +++ b/Tests/AsyncHTTPClientTests/EmbeddedChannel+HTTPConvenience.swift @@ -59,7 +59,7 @@ extension EmbeddedChannel { } struct HTTP1TestTools { - let connection: HTTP1Connection + let connection: HTTP1Connection.SendableView let connectionDelegate: MockConnectionDelegate let readEventHandler: ReadEventHitHandler let logger: Logger @@ -96,7 +96,7 @@ extension EmbeddedChannel { try removeEncoderFuture.wait() return .init( - connection: connection, + connection: connection.sendableView, connectionDelegate: connectionDelegate, readEventHandler: readEventHandler, logger: logger diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift index 5f980bccb..53001b64b 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift @@ -48,7 +48,7 @@ class HTTP1ConnectionTests: XCTestCase { ) XCTAssertNotNil(try embedded.pipeline.syncOperations.handler(type: NIOHTTPResponseDecompressor.self)) - XCTAssertNoThrow(try connection?.close().wait()) + XCTAssertNoThrow(try connection?.sendableView.close().wait()) embedded.embeddedEventLoop.run() XCTAssert(!embedded.isActive) } @@ -108,8 +108,7 @@ class HTTP1ConnectionTests: XCTestCase { defer { XCTAssertNoThrow(try server.stop()) } let logger = Logger(label: "test") - let delegate = MockHTTP1ConnectionDelegate() - delegate.closePromise = clientEL.makePromise(of: Void.self) + let delegate = MockHTTP1ConnectionDelegate(closePromise: clientEL.makePromise()) let connection = try! ClientBootstrap(group: clientEL) .connect(to: .init(ipAddress: "127.0.0.1", port: server.serverPort)) @@ -120,7 +119,7 @@ class HTTP1ConnectionTests: XCTestCase { delegate: delegate, decompression: .disabled, logger: logger - ) + ).sendableView } .wait() @@ -223,16 +222,16 @@ class HTTP1ConnectionTests: XCTestCase { ) let connectionDelegate = MockConnectionDelegate() let logger = Logger(label: "test") - var maybeConnection: HTTP1Connection? + var maybeConnection: HTTP1Connection.SendableView? XCTAssertNoThrow( - maybeConnection = try eventLoop.submit { + maybeConnection = try eventLoop.submit { [maybeChannel] in try HTTP1Connection.start( channel: XCTUnwrap(maybeChannel), connectionID: 0, delegate: connectionDelegate, decompression: .disabled, logger: logger - ) + ).sendableView }.wait() ) guard let connection = maybeConnection else { return XCTFail("Expected to have a connection here") } @@ -287,16 +286,16 @@ class HTTP1ConnectionTests: XCTestCase { ) let connectionDelegate = MockConnectionDelegate() let logger = Logger(label: "test") - var maybeConnection: HTTP1Connection? + var maybeConnection: HTTP1Connection.SendableView? XCTAssertNoThrow( - maybeConnection = try eventLoop.submit { + maybeConnection = try eventLoop.submit { [maybeChannel] in try HTTP1Connection.start( channel: XCTUnwrap(maybeChannel), connectionID: 0, delegate: connectionDelegate, decompression: .disabled, logger: logger - ) + ).sendableView }.wait() ) guard let connection = maybeConnection else { return XCTFail("Expected to have a connection here") } @@ -372,16 +371,16 @@ class HTTP1ConnectionTests: XCTestCase { ) let connectionDelegate = MockConnectionDelegate() let logger = Logger(label: "test") - var maybeConnection: HTTP1Connection? + var maybeConnection: HTTP1Connection.SendableView? XCTAssertNoThrow( - maybeConnection = try eventLoop.submit { + maybeConnection = try eventLoop.submit { [maybeChannel] in try HTTP1Connection.start( channel: XCTUnwrap(maybeChannel), connectionID: 0, delegate: connectionDelegate, decompression: .disabled, logger: logger - ) + ).sendableView }.wait() ) guard let connection = maybeConnection else { return XCTFail("Expected to have a connection here") } @@ -454,7 +453,7 @@ class HTTP1ConnectionTests: XCTestCase { ) guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } - connection.executeRequest(requestBag) + connection.sendableView.executeRequest(requestBag) XCTAssertNoThrow(try embedded.readOutbound(as: ByteBuffer.self)) // head XCTAssertNoThrow(try embedded.readOutbound(as: ByteBuffer.self)) // end @@ -523,7 +522,7 @@ class HTTP1ConnectionTests: XCTestCase { ) guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } - connection.executeRequest(requestBag) + connection.sendableView.executeRequest(requestBag) XCTAssertNoThrow(try embedded.readOutbound(as: ByteBuffer.self)) // head XCTAssertNoThrow(try embedded.readOutbound(as: ByteBuffer.self)) // end @@ -626,7 +625,7 @@ class HTTP1ConnectionTests: XCTestCase { ) guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } - connection.executeRequest(requestBag) + connection.sendableView.executeRequest(requestBag) let responseString = """ HTTP/1.0 200 OK\r\n\ @@ -654,31 +653,31 @@ class HTTP1ConnectionTests: XCTestCase { // bytes a ready to be read as well. This will allow us to test if subsequent reads // are waiting for backpressure promise. func testDownloadStreamingBackpressure() { - class BackpressureTestDelegate: HTTPClientResponseDelegate { + final class BackpressureTestDelegate: HTTPClientResponseDelegate { typealias Response = Void - var _reads = 0 - var _channel: Channel? + private struct State: Sendable { + var reads = 0 + var channel: Channel? + } + + private let state = NIOLockedValueBox(State()) + + var reads: Int { + self.state.withLockedValue { $0.reads } + } - let lock: NIOLock let backpressurePromise: EventLoopPromise let messageReceived: EventLoopPromise init(eventLoop: EventLoop) { - self.lock = NIOLock() self.backpressurePromise = eventLoop.makePromise() self.messageReceived = eventLoop.makePromise() } - var reads: Int { - self.lock.withLock { - self._reads - } - } - func willExecuteOnChannel(_ channel: Channel) { - self.lock.withLock { - self._channel = channel + self.state.withLockedValue { + $0.channel = channel } } @@ -688,8 +687,8 @@ class HTTP1ConnectionTests: XCTestCase { func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { // We count a number of reads received. - self.lock.withLock { - self._reads += 1 + self.state.withLockedValue { + $0.reads += 1 } // We need to notify the test when first byte of the message is arrived. self.messageReceived.succeed(()) @@ -721,8 +720,8 @@ class HTTP1ConnectionTests: XCTestCase { let buffer = context.channel.allocator.buffer(string: "1234") context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil) - self.endFuture.hop(to: context.eventLoop).whenSuccess { - context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + self.endFuture.hop(to: context.eventLoop).assumeIsolated().whenSuccess { + context.writeAndFlush(Self.wrapOutboundOut(.end(nil)), promise: nil) } } } @@ -753,7 +752,7 @@ class HTTP1ConnectionTests: XCTestCase { ) guard let channel = maybeChannel else { return XCTFail("Expected to have a channel at this point") } let connectionDelegate = MockConnectionDelegate() - var maybeConnection: HTTP1Connection? + var maybeConnection: HTTP1Connection.SendableView? XCTAssertNoThrow( maybeConnection = try channel.eventLoop.submit { try HTTP1Connection.start( @@ -762,7 +761,7 @@ class HTTP1ConnectionTests: XCTestCase { delegate: connectionDelegate, decompression: .disabled, logger: logger - ) + ).sendableView }.wait() ) guard let connection = maybeConnection else { return XCTFail("Expected to have a connection at this point") } @@ -802,15 +801,20 @@ class HTTP1ConnectionTests: XCTestCase { } } -class MockHTTP1ConnectionDelegate: HTTP1ConnectionDelegate { - var releasePromise: EventLoopPromise? - var closePromise: EventLoopPromise? +final class MockHTTP1ConnectionDelegate: HTTP1ConnectionDelegate { + let releasePromise: EventLoopPromise? + let closePromise: EventLoopPromise? - func http1ConnectionReleased(_: HTTP1Connection) { + init(releasePromise: EventLoopPromise? = nil, closePromise: EventLoopPromise? = nil) { + self.releasePromise = releasePromise + self.closePromise = closePromise + } + + func http1ConnectionReleased(_: HTTPConnectionPool.Connection.ID) { self.releasePromise?.succeed(()) } - func http1ConnectionClosed(_: HTTP1Connection) { + func http1ConnectionClosed(_: HTTPConnectionPool.Connection.ID) { self.closePromise?.succeed(()) } } @@ -875,38 +879,40 @@ class AfterRequestCloseConnectionChannelHandler: ChannelInboundHandler { context.write(self.wrapOutboundOut(.end(nil)), promise: nil) context.flush() - context.eventLoop.scheduleTask(in: .milliseconds(20)) { + context.eventLoop.assumeIsolated().scheduleTask(in: .milliseconds(20)) { context.close(promise: nil) } } } } -class MockConnectionDelegate: HTTP1ConnectionDelegate { - private var lock = NIOLock() +final class MockConnectionDelegate: HTTP1ConnectionDelegate { + private let counts = NIOLockedValueBox(Counts()) - private var _hitConnectionReleased = 0 - private var _hitConnectionClosed = 0 + private struct Counts: Sendable { + var hitConnectionReleased = 0 + var hitConnectionClosed = 0 + } var hitConnectionReleased: Int { - self.lock.withLock { self._hitConnectionReleased } + self.counts.withLockedValue { $0.hitConnectionReleased } } var hitConnectionClosed: Int { - self.lock.withLock { self._hitConnectionClosed } + self.counts.withLockedValue { $0.hitConnectionClosed } } init() {} - func http1ConnectionReleased(_: HTTP1Connection) { - self.lock.withLock { - self._hitConnectionReleased += 1 + func http1ConnectionReleased(_: HTTPConnectionPool.Connection.ID) { + self.counts.withLockedValue { + $0.hitConnectionReleased += 1 } } - func http1ConnectionClosed(_: HTTP1Connection) { - self.lock.withLock { - self._hitConnectionClosed += 1 + func http1ConnectionClosed(_: HTTPConnectionPool.Connection.ID) { + self.counts.withLockedValue { + $0.hitConnectionClosed += 1 } } } diff --git a/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift index a50f1ab54..3244e2b5a 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift @@ -44,7 +44,7 @@ class HTTP2ConnectionTests: XCTestCase { decompression: .disabled, maximumConnectionUses: nil, logger: logger - ).wait() + ).map { _ in }.nonisolated().wait() ) } @@ -70,7 +70,7 @@ class HTTP2ConnectionTests: XCTestCase { XCTAssertThrowsError(try startFuture.wait()) // should not crash - connection.shutdown() + connection.sendableView.shutdown() } func testSimpleGetRequest() { @@ -83,7 +83,7 @@ class HTTP2ConnectionTests: XCTestCase { let connectionCreator = TestConnectionCreator() let delegate = TestHTTP2ConnectionDelegate() - var maybeHTTP2Connection: HTTP2Connection? + var maybeHTTP2Connection: HTTP2Connection.SendableView? XCTAssertNoThrow( maybeHTTP2Connection = try connectionCreator.createHTTP2Connection( to: httpBin.port, @@ -142,7 +142,7 @@ class HTTP2ConnectionTests: XCTestCase { let connectionCreator = TestConnectionCreator() let delegate = TestHTTP2ConnectionDelegate() - var maybeHTTP2Connection: HTTP2Connection? + var maybeHTTP2Connection: HTTP2Connection.SendableView? XCTAssertNoThrow( maybeHTTP2Connection = try connectionCreator.createHTTP2Connection( to: httpBin.port, @@ -210,7 +210,7 @@ class HTTP2ConnectionTests: XCTestCase { let connectionCreator = TestConnectionCreator() let delegate = TestHTTP2ConnectionDelegate() - var maybeHTTP2Connection: HTTP2Connection? + var maybeHTTP2Connection: HTTP2Connection.SendableView? XCTAssertNoThrow( maybeHTTP2Connection = try connectionCreator.createHTTP2Connection( to: httpBin.port, @@ -277,7 +277,7 @@ class HTTP2ConnectionTests: XCTestCase { func channelRead(context: ChannelHandlerContext, data: NIOAny) { self.dataArrivedPromise.succeed(()) - self.triggerResponseFuture.hop(to: context.eventLoop).whenSuccess { + self.triggerResponseFuture.hop(to: context.eventLoop).assumeIsolated().whenSuccess { switch self.unwrapInboundIn(data) { case .head: context.write(self.wrapOutboundOut(.head(.init(version: .http2, status: .ok))), promise: nil) @@ -305,7 +305,7 @@ class HTTP2ConnectionTests: XCTestCase { let connectionCreator = TestConnectionCreator() let delegate = TestHTTP2ConnectionDelegate() - var maybeHTTP2Connection: HTTP2Connection? + var maybeHTTP2Connection: HTTP2Connection.SendableView? XCTAssertNoThrow( maybeHTTP2Connection = try connectionCreator.createHTTP2Connection( to: httpBin.port, @@ -385,7 +385,7 @@ class HTTP2ConnectionTests: XCTestCase { } } -class TestConnectionCreator { +final class TestConnectionCreator { enum Error: Swift.Error { case alreadyCreatingAnotherConnection case wantedHTTP2ConnectionButGotHTTP1 @@ -394,12 +394,11 @@ class TestConnectionCreator { enum State { case idle - case waitingForHTTP1Connection(EventLoopPromise) - case waitingForHTTP2Connection(EventLoopPromise) + case waitingForHTTP1Connection(EventLoopPromise) + case waitingForHTTP2Connection(EventLoopPromise) } - private var state: State = .idle - private let lock = NIOLock() + private let lock = NIOLockedValueBox(.idle) init() {} @@ -409,7 +408,7 @@ class TestConnectionCreator { connectionID: HTTPConnectionPool.Connection.ID = 0, on eventLoop: EventLoop, logger: Logger = .init(label: "test") - ) throws -> HTTP1Connection { + ) throws -> HTTP1Connection.SendableView { let request = try! HTTPClient.Request(url: "https://localhost:\(port)") var tlsConfiguration = TLSConfiguration.makeClientConfiguration() @@ -423,13 +422,13 @@ class TestConnectionCreator { sslContextCache: .init() ) - let promise = try self.lock.withLock { () -> EventLoopPromise in - guard case .idle = self.state else { + let promise = try self.lock.withLockedValue { state in + guard case .idle = state else { throw Error.alreadyCreatingAnotherConnection } - let promise = eventLoop.makePromise(of: HTTP1Connection.self) - self.state = .waitingForHTTP1Connection(promise) + let promise = eventLoop.makePromise(of: HTTP1Connection.SendableView.self) + state = .waitingForHTTP1Connection(promise) return promise } @@ -452,7 +451,7 @@ class TestConnectionCreator { connectionID: HTTPConnectionPool.Connection.ID = 0, on eventLoop: EventLoop, logger: Logger = .init(label: "test") - ) throws -> HTTP2Connection { + ) throws -> HTTP2Connection.SendableView { let request = try! HTTPClient.Request(url: "https://localhost:\(port)") var tlsConfiguration = TLSConfiguration.makeClientConfiguration() @@ -466,13 +465,13 @@ class TestConnectionCreator { sslContextCache: .init() ) - let promise = try self.lock.withLock { () -> EventLoopPromise in - guard case .idle = self.state else { + let promise = try self.lock.withLockedValue { state in + guard case .idle = state else { throw Error.alreadyCreatingAnotherConnection } - let promise = eventLoop.makePromise(of: HTTP2Connection.self) - self.state = .waitingForHTTP2Connection(promise) + let promise = eventLoop.makePromise(of: HTTP2Connection.SendableView.self) + state = .waitingForHTTP2Connection(promise) return promise } @@ -491,7 +490,7 @@ class TestConnectionCreator { } extension TestConnectionCreator: HTTPConnectionRequester { - enum EitherPromiseWrapper { + enum EitherPromiseWrapper: Sendable { case succeed(EventLoopPromise, SucceedType) case fail(EventLoopPromise, Error) @@ -505,37 +504,38 @@ extension TestConnectionCreator: HTTPConnectionRequester { } } - func http1ConnectionCreated(_ connection: HTTP1Connection) { - let wrapper = self.lock.withLock { () -> (EitherPromiseWrapper) in + func http1ConnectionCreated(_ connection: HTTP1Connection.SendableView) { + let wrapper: EitherPromiseWrapper = self.lock + .withLockedValue { state in - switch self.state { - case .waitingForHTTP1Connection(let promise): - return .succeed(promise, connection) + switch state { + case .waitingForHTTP1Connection(let promise): + return .succeed(promise, connection) - case .waitingForHTTP2Connection(let promise): - return .fail(promise, Error.wantedHTTP2ConnectionButGotHTTP1) + case .waitingForHTTP2Connection(let promise): + return .fail(promise, Error.wantedHTTP2ConnectionButGotHTTP1) - case .idle: - preconditionFailure("Invalid state: \(self.state)") + case .idle: + preconditionFailure("Invalid state: \(state)") + } } - } wrapper.complete() } - func http2ConnectionCreated(_ connection: HTTP2Connection, maximumStreams: Int) { - let wrapper = self.lock.withLock { () -> (EitherPromiseWrapper) in + func http2ConnectionCreated(_ connection: HTTP2Connection.SendableView, maximumStreams: Int) { + let wrapper: EitherPromiseWrapper = self.lock + .withLockedValue { state in + switch state { + case .waitingForHTTP1Connection(let promise): + return .fail(promise, Error.wantedHTTP1ConnectionButGotHTTP2) - switch self.state { - case .waitingForHTTP1Connection(let promise): - return .fail(promise, Error.wantedHTTP1ConnectionButGotHTTP2) + case .waitingForHTTP2Connection(let promise): + return .succeed(promise, connection) - case .waitingForHTTP2Connection(let promise): - return .succeed(promise, connection) - - case .idle: - preconditionFailure("Invalid state: \(self.state)") + case .idle: + preconditionFailure("Invalid state: \(state)") + } } - } wrapper.complete() } @@ -554,19 +554,20 @@ extension TestConnectionCreator: HTTPConnectionRequester { } func failedToCreateHTTPConnection(_: HTTPConnectionPool.Connection.ID, error: Swift.Error) { - let wrapper = self.lock.withLock { () -> (FailPromiseWrapper) in + let wrapper: FailPromiseWrapper = self.lock + .withLockedValue { state in - switch self.state { - case .waitingForHTTP1Connection(let promise): - return .type1(promise) + switch state { + case .waitingForHTTP1Connection(let promise): + return .type1(promise) - case .waitingForHTTP2Connection(let promise): - return .type2(promise) + case .waitingForHTTP2Connection(let promise): + return .type2(promise) - case .idle: - preconditionFailure("Invalid state: \(self.state)") + case .idle: + preconditionFailure("Invalid state: \(state)") + } } - } wrapper.fail(error) } @@ -575,76 +576,78 @@ extension TestConnectionCreator: HTTPConnectionRequester { } } -class TestHTTP2ConnectionDelegate: HTTP2ConnectionDelegate { +final class TestHTTP2ConnectionDelegate: HTTP2ConnectionDelegate { var hitStreamClosed: Int { - self.lock.withLock { self._hitStreamClosed } + self.lock.withLockedValue { $0.hitStreamClosed } } var hitGoAwayReceived: Int { - self.lock.withLock { self._hitGoAwayReceived } + self.lock.withLockedValue { $0.hitGoAwayReceived } } var hitConnectionClosed: Int { - self.lock.withLock { self._hitConnectionClosed } + self.lock.withLockedValue { $0.hitConnectionClosed } } var maxStreamSetting: Int { - self.lock.withLock { self._maxStreamSetting } + self.lock.withLockedValue { $0.maxStreamSetting } } - private let lock = NIOLock() - private var _hitStreamClosed: Int = 0 - private var _hitGoAwayReceived: Int = 0 - private var _hitConnectionClosed: Int = 0 - private var _maxStreamSetting: Int = 100 + private let lock = NIOLockedValueBox(.init()) + private struct Counts { + var hitStreamClosed: Int = 0 + var hitGoAwayReceived: Int = 0 + var hitConnectionClosed: Int = 0 + var maxStreamSetting: Int = 100 + } init() {} - func http2Connection(_: HTTP2Connection, newMaxStreamSetting: Int) {} + func http2Connection(_: HTTPConnectionPool.Connection.ID, newMaxStreamSetting: Int) {} - func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int) { - self.lock.withLock { - self._hitStreamClosed += 1 + func http2ConnectionStreamClosed(_: HTTPConnectionPool.Connection.ID, availableStreams: Int) { + self.lock.withLockedValue { + $0.hitStreamClosed += 1 } } - func http2ConnectionGoAwayReceived(_: HTTP2Connection) { - self.lock.withLock { - self._hitGoAwayReceived += 1 + func http2ConnectionGoAwayReceived(_: HTTPConnectionPool.Connection.ID) { + self.lock.withLockedValue { + $0.hitGoAwayReceived += 1 } } - func http2ConnectionClosed(_: HTTP2Connection) { - self.lock.withLock { - self._hitConnectionClosed += 1 + func http2ConnectionClosed(_: HTTPConnectionPool.Connection.ID) { + self.lock.withLockedValue { + $0.hitConnectionClosed += 1 } } } final class EmptyHTTP2ConnectionDelegate: HTTP2ConnectionDelegate { - func http2Connection(_: HTTP2Connection, newMaxStreamSetting: Int) { + func http2Connection(_: HTTPConnectionPool.Connection.ID, newMaxStreamSetting: Int) { preconditionFailure("Unimplemented") } - func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int) { + func http2ConnectionStreamClosed(_: HTTPConnectionPool.Connection.ID, availableStreams: Int) { preconditionFailure("Unimplemented") } - func http2ConnectionGoAwayReceived(_: HTTP2Connection) { + func http2ConnectionGoAwayReceived(_: HTTPConnectionPool.Connection.ID) { preconditionFailure("Unimplemented") } - func http2ConnectionClosed(_: HTTP2Connection) { + func http2ConnectionClosed(_: HTTPConnectionPool.Connection.ID) { preconditionFailure("Unimplemented") } } final class EmptyHTTP1ConnectionDelegate: HTTP1ConnectionDelegate { - func http1ConnectionReleased(_: HTTP1Connection) { + func http1ConnectionReleased(_: HTTPConnectionPool.Connection.ID) { preconditionFailure("Unimplemented") } - func http1ConnectionClosed(_: HTTP1Connection) { + func http1ConnectionClosed(_: HTTPConnectionPool.Connection.ID) { preconditionFailure("Unimplemented") } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+FactoryTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+FactoryTests.swift index d9dbd4cb1..a87299da1 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+FactoryTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+FactoryTests.swift @@ -194,11 +194,11 @@ class NeverrespondServerHandler: ChannelInboundHandler { /// A `HTTPConnectionRequester` that will fail a test if any of its methods are ever called. final class ExplodingRequester: HTTPConnectionRequester { - func http1ConnectionCreated(_: HTTP1Connection) { + func http1ConnectionCreated(_: HTTP1Connection.SendableView) { XCTFail("http1ConnectionCreated called unexpectedly") } - func http2ConnectionCreated(_: HTTP2Connection, maximumStreams: Int) { + func http2ConnectionCreated(_: HTTP2Connection.SendableView, maximumStreams: Int) { XCTFail("http2ConnectionCreated called unexpectedly") } diff --git a/Tests/AsyncHTTPClientTests/NWWaitingHandlerTests.swift b/Tests/AsyncHTTPClientTests/NWWaitingHandlerTests.swift index 033214ffe..63eaf649d 100644 --- a/Tests/AsyncHTTPClientTests/NWWaitingHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/NWWaitingHandlerTests.swift @@ -16,6 +16,7 @@ @testable import AsyncHTTPClient import Network import NIOCore +import NIOConcurrencyHelpers import NIOEmbedded import NIOSSL import NIOTransportServices @@ -23,21 +24,41 @@ import XCTest @available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 5.0, *) class NWWaitingHandlerTests: XCTestCase { - class MockRequester: HTTPConnectionRequester { - var waitingForConnectivityCalled = false - var connectionID: AsyncHTTPClient.HTTPConnectionPool.Connection.ID? - var transientError: NWError? + final class MockRequester: HTTPConnectionRequester { + private struct State: Sendable { + var waitingForConnectivityCalled = false + var connectionID: AsyncHTTPClient.HTTPConnectionPool.Connection.ID? + var transientError: NWError? + } + + private let state = NIOLockedValueBox(State()) + + var waitingForConnectivityCalled: Bool { + self.state.withLockedValue { $0.waitingForConnectivityCalled } + } + + var connectionID: AsyncHTTPClient.HTTPConnectionPool.Connection.ID? { + self.state.withLockedValue { $0.connectionID } + } + + var transientError: NWError? { + self.state.withLockedValue { + $0.transientError + } + } - func http1ConnectionCreated(_: AsyncHTTPClient.HTTP1Connection) {} + func http1ConnectionCreated(_: AsyncHTTPClient.HTTP1Connection.SendableView) {} - func http2ConnectionCreated(_: AsyncHTTPClient.HTTP2Connection, maximumStreams: Int) {} + func http2ConnectionCreated(_: AsyncHTTPClient.HTTP2Connection.SendableView, maximumStreams: Int) {} func failedToCreateHTTPConnection(_: AsyncHTTPClient.HTTPConnectionPool.Connection.ID, error: Error) {} func waitingForConnectivity(_ connectionID: AsyncHTTPClient.HTTPConnectionPool.Connection.ID, error: Error) { - self.waitingForConnectivityCalled = true - self.connectionID = connectionID - self.transientError = error as? NWError + self.state.withLockedValue { + $0.waitingForConnectivityCalled = true + $0.connectionID = connectionID + $0.transientError = error as? NWError + } } } diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 8609597b3..a2fa97418 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -318,7 +318,7 @@ final class TransactionTests: XCTestCase { let connectionCreator = TestConnectionCreator() let delegate = TestHTTP2ConnectionDelegate() - var maybeHTTP2Connection: HTTP2Connection? + var maybeHTTP2Connection: HTTP2Connection.SendableView? XCTAssertNoThrow( maybeHTTP2Connection = try connectionCreator.createHTTP2Connection( to: httpBin.port, @@ -522,7 +522,7 @@ final class TransactionTests: XCTestCase { let connectionCreator = TestConnectionCreator() let delegate = TestHTTP2ConnectionDelegate() - var maybeHTTP2Connection: HTTP2Connection? + var maybeHTTP2Connection: HTTP2Connection.SendableView? XCTAssertNoThrow( maybeHTTP2Connection = try connectionCreator.createHTTP2Connection( to: httpBin.port,