From 32c4e7f3cc7d81f7be82eeaacb6cdccab3544904 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Wed, 13 Oct 2021 15:57:36 +0200 Subject: [PATCH 1/6] integrate http2 state machine --- .../HTTPConnectionPool+HTTP1Connections.swift | 5 +- .../HTTPConnectionPool+HTTP2Connections.swift | 32 ++- ...HTTPConnectionPool+HTTP2StateMachine.swift | 46 ++- .../HTTPConnectionPool+StateMachine.swift | 192 +++++++++---- ...tionPool+HTTP2ConnectionsTest+XCTest.swift | 1 + ...PConnectionPool+HTTP2ConnectionsTest.swift | 65 +++++ ...onPool+HTTP2StateMachineTests+XCTest.swift | 3 + ...onnectionPool+HTTP2StateMachineTests.swift | 262 ++++++++++++++++++ 8 files changed, 540 insertions(+), 66 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index 446720140..ce638ac29 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -618,9 +618,10 @@ extension HTTPConnectionPool { // TODO: improve algorithm to create connections uniformly across all preferred event loops // while paying attention to the number of queued request per event loop // Currently we start by creating new connections on the event loop with the most queued - // requests. If we have create a enough connections to cover all requests for the given + // requests. If we have created enough connections to cover all requests for the first // event loop we will continue with the event loop with the second most queued requests - // and so on and so forth. We do not need to sort the array because + // and so on and so forth. The `generalPurposeRequestCountGroupedByPreferredEventLoop` + // array is already ordered so we can just iterate over it without sorting by request count. let newGeneralPurposeConnections: [(Connection.ID, EventLoop)] = generalPurposeRequestCountGroupedByPreferredEventLoop // we do not want to allocated intermediate arrays. .lazy diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 0a6cdbb16..66a2d69ee 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -404,9 +404,9 @@ extension HTTPConnectionPool { self.connections.removeAll { connection in switch connection.migrateToHTTP1(context: &context) { case .removeConnection: - return false - case .keepConnection: return true + case .keepConnection: + return false } } return context @@ -419,7 +419,7 @@ extension HTTPConnectionPool { self.connections.contains { $0.isActive } } - /// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one + /// used in general purpose connection scenarios to check if at least one connection is starting or active for the given `eventLoop` var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool { self.connections.contains { $0.canOrWillBeAbleToExecuteRequests } } @@ -433,6 +433,32 @@ extension HTTPConnectionPool { } } + func hasActiveConnection(for eventLoop: EventLoop) -> Bool { + self.connections.contains { + $0.eventLoop === eventLoop && $0.isActive + } + } + + /// used after backoff is done to determine if we need to create a new connection + func hasStartingOrActiveConnection() -> Bool { + self.connections.contains { connection in + connection.canOrWillBeAbleToExecuteRequests + } + } + + /// used after backoff is done to determine if we need to create a new connection + /// - Parameters: + /// - eventLoop: connection `EventLoop` to search for + /// - Returns: true if at least one connection is starting or active for the given `eventLoop` + func hasStartingOrActiveConnection( + for eventLoop: EventLoop + ) -> Bool { + self.connections.contains { connection in + connection.eventLoop === eventLoop && + connection.canOrWillBeAbleToExecuteRequests + } + } + mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID { assert( !self.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop), diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 1d5a4f68d..7d3cfdf28 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -228,11 +228,22 @@ extension HTTPConnectionPool { private mutating func _newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> EstablishedAction { self.failedConsecutiveConnectionAttempts = 0 self.lastConnectFailure = nil - let (index, context) = self.connections.newHTTP2ConnectionEstablished( - connection, - maxConcurrentStreams: maxConcurrentStreams - ) - return self.nextActionForAvailableConnection(at: index, context: context) + if self.connections.hasActiveConnection(for: connection.eventLoop) { + guard let (index, _) = self.connections.failConnection(connection.id) else { + preconditionFailure("we connection to a connection which we no nothing about") + } + self.connections.removeConnection(at: index) + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + } else { + let (index, context) = self.connections.newHTTP2ConnectionEstablished( + connection, + maxConcurrentStreams: maxConcurrentStreams + ) + return self.nextActionForAvailableConnection(at: index, context: context) + } } private mutating func nextActionForAvailableConnection( @@ -318,8 +329,28 @@ extension HTTPConnectionPool { private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action { switch self.state { case .running: - let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) - guard hasPendingRequest else { + // we do not know if we have created this connection for a request with a required + // event loop or not. However, we do not need this information and can infer + // if we need to create a new connection because we will only ever create one connection + // per event loop for required event loop requests and only need one connection for + // general purpose requests. + + // we need to start a new on connection in two cases: + let needGeneralPurposeConnection = + // 1. if we have general purpose requests + !self.requests.isEmpty(for: nil) && + // and no connection starting or active + !self.connections.hasStartingOrActiveConnection() + + let needRequiredEventLoopConnection = + // 2. or if we have requests for a required event loop + !self.requests.isEmpty(for: eventLoop) && + // and no connection starting or active for the given event loop + !self.connections.hasStartingOrActiveConnection(for: eventLoop) + + guard needGeneralPurposeConnection || needRequiredEventLoopConnection else { + // otherwise we can remove the connection + self.connections.removeConnection(at: index) return .none } @@ -330,6 +361,7 @@ extension HTTPConnectionPool { request: .none, connection: .createConnection(newConnectionID, on: eventLoop) ) + case .shuttingDown(let unclean): assert(self.requests.isEmpty) self.connections.removeConnection(at: index) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift index dab1354c9..8453ff459 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -68,6 +68,23 @@ extension HTTPConnectionPool { enum HTTPVersionState { case http1(HTTP1StateMachine) + case http2(HTTP2StateMachine) + + mutating func modify( + http1: (inout HTTP1StateMachine) -> ReturnValue, + http2: (inout HTTP2StateMachine) -> ReturnValue + ) -> ReturnValue { + let returnValue: ReturnValue + switch self { + case .http1(var http1State): + returnValue = http1(&http1State) + self = .http1(http1State) + case .http2(var http2State): + returnValue = http2(&http2State) + self = .http2(http2State) + } + return returnValue + } } var state: HTTPVersionState @@ -87,12 +104,11 @@ extension HTTPConnectionPool { } mutating func executeRequest(_ request: Request) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.executeRequest(request) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.executeRequest(request) + }, http2: { http2 in + http2.executeRequest(request) + }) } mutating func newHTTP1ConnectionCreated(_ connection: Connection) -> Action { @@ -101,28 +117,100 @@ extension HTTPConnectionPool { let action = http1StateMachine.newHTTP1ConnectionEstablished(connection) self.state = .http1(http1StateMachine) return action + + case .http2(let http2StateMachine): + var http1StateMachine = HTTP1StateMachine( + idGenerator: self.idGenerator, + maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections + ) + + let newConnectionAction = http1StateMachine.migrateFromHTTP2( + http2State: http2StateMachine, + newHTTP1Connection: connection + ) + self.state = .http1(http1StateMachine) + return newConnectionAction } } - mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { + mutating func newHTTP2ConnectionCreated(_ connection: Connection, maxConcurrentStreams: Int) -> Action { switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.failedToCreateNewConnection( + case .http1(let http1StateMachine): + + var http2StateMachine = HTTP2StateMachine( + idGenerator: self.idGenerator + ) + let migrationAction = http2StateMachine.migrateFromHTTP1( + http1State: http1StateMachine, + newHTTP2Connection: connection, + maxConcurrentStreams: maxConcurrentStreams + ) + + self.state = .http2(http2StateMachine) + return migrationAction + + case .http2(var http2StateMachine): + let newConnectionAction = http2StateMachine.newHTTP2ConnectionEstablished( + connection, + maxConcurrentStreams: maxConcurrentStreams + ) + self.state = .http2(http2StateMachine) + return newConnectionAction + } + } + + mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { + self.state.modify(http1: { http1 in + http1.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + }, http2: { http2 in + http2.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + }) + } + + mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action { + self.state.modify(http1: { http1 in + http1.http2ConnectionGoAwayReceived(connectionID) + }, http2: { http2 in + http2.http2ConnectionGoAwayReceived(connectionID) + }) + } + + mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action { + self.state.modify(http1: { http1 in + http1.http2ConnectionClosed(connectionID) + }, http2: { http2 in + http2.http2ConnectionClosed(connectionID) + }) + } + + mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { + self.state.modify(http1: { http1 in + http1.http2ConnectionStreamClosed(connectionID) + }, http2: { http2 in + http2.http2ConnectionStreamClosed(connectionID) + }) + } + + mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { + self.state.modify(http1: { http1 in + http1.failedToCreateNewConnection( error, connectionID: connectionID ) - self.state = .http1(http1StateMachine) - return action - } + }, http2: { http2 in + http2.failedToCreateNewConnection( + error, + connectionID: connectionID + ) + }) } mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.connectionCreationBackoffDone(connectionID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.connectionCreationBackoffDone(connectionID) + }, http2: { http2 in + http2.connectionCreationBackoffDone(connectionID) + }) } /// A request has timed out. @@ -131,12 +219,11 @@ extension HTTPConnectionPool { /// request, but don't need to cancel the timer (it already triggered). If a request is cancelled /// we don't need to fail it but we need to cancel its timeout timer. mutating func timeoutRequest(_ requestID: Request.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.timeoutRequest(requestID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.timeoutRequest(requestID) + }, http2: { http2 in + http2.timeoutRequest(requestID) + }) } /// A request was cancelled. @@ -145,40 +232,36 @@ extension HTTPConnectionPool { /// need to cancel its timeout timer. If a request times out, we need to fail the request, but don't /// need to cancel the timer (it already triggered). mutating func cancelRequest(_ requestID: Request.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.cancelRequest(requestID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.cancelRequest(requestID) + }, http2: { http2 in + http2.cancelRequest(requestID) + }) } mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.connectionIdleTimeout(connectionID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.connectionIdleTimeout(connectionID) + }, http2: { http2 in + http2.connectionIdleTimeout(connectionID) + }) } /// A connection has been closed mutating func http1ConnectionClosed(_ connectionID: Connection.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.http1ConnectionClosed(connectionID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.http1ConnectionClosed(connectionID) + }, http2: { http2 in + http2.http1ConnectionClosed(connectionID) + }) } mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action { - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.http1ConnectionReleased(connectionID) - self.state = .http1(http1StateMachine) - return action - } + self.state.modify(http1: { http1 in + http1.http1ConnectionReleased(connectionID) + }, http2: { http2 in + http2.http1ConnectionReleased(connectionID) + }) } mutating func shutdown() -> Action { @@ -186,12 +269,11 @@ extension HTTPConnectionPool { self.isShuttingDown = true - switch self.state { - case .http1(var http1StateMachine): - let action = http1StateMachine.shutdown() - self.state = .http1(http1StateMachine) - return action - } + return self.state.modify(http1: { http1 in + http1.shutdown() + }, http2: { http2 in + http2.shutdown() + }) } } } @@ -221,6 +303,8 @@ extension HTTPConnectionPool.StateMachine: CustomStringConvertible { switch self.state { case .http1(let http1): return ".http1(\(http1))" + case .http2(let http2): + return ".http2(\(http2))" } } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift index 2ad98202a..f9afa713e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift @@ -40,6 +40,7 @@ extension HTTPConnectionPool_HTTP2ConnectionsTests { ("testNewMaxConcurrentStreamsSetting", testNewMaxConcurrentStreamsSetting), ("testLeaseOnPreferredEventLoopWithoutAnyAvailable", testLeaseOnPreferredEventLoopWithoutAnyAvailable), ("testMigrationFromHTTP1", testMigrationFromHTTP1), + ("testMigrationToHTTP1", testMigrationToHTTP1), ("testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop", testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop), ("testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection", testMigrationFromHTTP1WithAlreadyEstablishedHTTP2Connection), ] diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift index 0d3f45ad0..804707959 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift @@ -556,6 +556,71 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { ) } + func testMigrationToHTTP1() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let generator = HTTPConnectionPool.Connection.ID.Generator() + var connections = HTTPConnectionPool.HTTP2Connections(generator: generator) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + + let conn1ID = generator.next() + let conn2ID = generator.next() + let conn3ID = generator.next() + let conn4ID = generator.next() + + connections.migrateFromHTTP1( + starting: [(conn1ID, el1), (conn2ID, el2), (conn3ID, el3)], + backingOff: [(conn4ID, el4)] + ) + + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + + let (leasedConn1, leasdConnContext1) = connections.leaseStreams(at: conn1Index, count: 2) + XCTAssertEqual(leasedConn1, conn1) + XCTAssertEqual(leasdConnContext1.wasIdle, true) + + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el2) + let (_, conn2CreatedContext) = connections.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 100) + XCTAssertEqual(conn2CreatedContext.availableStreams, 100) + + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 1, + backingOffConnections: 1, + idleConnections: 1, + availableConnections: 2, + drainingConnections: 0, + leasedStreams: 2, + availableStreams: 198 + ) + ) + + let migrationContext = connections.migrateToHTTP1() + XCTAssertEqual(migrationContext.close, [conn2]) + XCTAssertEqual(migrationContext.starting.map { $0.0 }, [conn3ID]) + XCTAssertEqual(migrationContext.starting.map { $0.1.id }, [el3.id]) + XCTAssertEqual(migrationContext.backingOff.map { $0.0 }, [conn4ID]) + XCTAssertEqual(migrationContext.backingOff.map { $0.1.id }, [el4.id]) + + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 0, + backingOffConnections: 0, + idleConnections: 0, + availableConnections: 1, + drainingConnections: 0, + leasedStreams: 2, + availableStreams: 98 + ) + ) + } + func testMigrationFromHTTP1WithPendingRequestsWithRequiredEventLoop() { let elg = EmbeddedEventLoopGroup(loops: 4) let generator = HTTPConnectionPool.Connection.ID.Generator() diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index a54d8c578..dbb489503 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -36,6 +36,9 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testGoAwayOnIdleConnection", testGoAwayOnIdleConnection), ("testGoAwayWithLeasedStream", testGoAwayWithLeasedStream), ("testGoAwayWithPendingRequestsStartsNewConnection", testGoAwayWithPendingRequestsStartsNewConnection), + ("testMigrationFromHTTP1ToHTTP2", testMigrationFromHTTP1ToHTTP2), + ("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections), + ("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index ce728147f..612e5221c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -577,4 +577,266 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(closeStream2Action.request, .none) XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn2ID, on: el1)) } + + func testMigrationFromHTTP1ToHTTP2() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + /// first 8 request should create a new connection + var connectionIDs: [HTTPConnectionPool.Connection.ID] = [] + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let connID, let eventLoop) = action.connection else { + return XCTFail("Unexpected connection action \(action.connection)") + } + connectionIDs.append(connID) + XCTAssertTrue(eventLoop === el1) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(connID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + guard let conn1ID = connectionIDs.first else { + return XCTFail("could not create connection") + } + + /// after we reached the `maximumConcurrentHTTP1Connections`, we will not create new connections + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + /// first new HTTP2 connection should migrate from HTTP1 to HTTP2 and execute requests + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let migrationAction = state.newHTTP2ConnectionCreated(conn1, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, let conn) = migrationAction.request else { + return XCTFail("unexpected request action \(migrationAction.request)") + } + XCTAssertEqual(conn, conn1) + XCTAssertEqual(requests.count, 10) + + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + } + + XCTAssertEqual(migrationAction.connection, .migration( + createConnections: [], + closeConnections: [], + scheduleTimeout: nil + )) + + /// remaining connections should be closed immediately without executing any request + for connID in connectionIDs.dropFirst() { + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + let action = state.newHTTP2ConnectionCreated(conn, maxConcurrentStreams: 10) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .closeConnection(conn, isShutdown: .no)) + } + + /// closing a stream while we have requests queued should result in one request execution action + for _ in 0..<6 { + let action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(action.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else { + return XCTFail("Unexpected request action \(action.request)") + } + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.cancel(request.id)) + } + } + XCTAssertTrue(queuer.isEmpty) + } + + func testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + /// first 8 request should create a new connection + var connectionIDs: [HTTPConnectionPool.Connection.ID] = [] + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let connID, let eventLoop) = action.connection else { + return XCTFail("Unexpected connection action \(action.connection)") + } + connectionIDs.append(connID) + XCTAssertTrue(eventLoop === el1) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(connID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + /// after we reached the `maximumConcurrentHTTP1Connections`, we will not create new connections + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + let http1ConnIDs = connectionIDs.prefix(4) + let succesfullHTTP1ConnIDs = http1ConnIDs.prefix(2) + let failedHTTP1ConnIDs = http1ConnIDs.dropFirst(2) + + /// new http1 connection should execute 1 request + for connID in succesfullHTTP1ConnIDs { + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + let action = state.newHTTP1ConnectionCreated(conn) + guard case .executeRequest(let request, conn, cancelTimeout: true) = action.request else { + return XCTFail("unexpected request action \(action.request)") + } + XCTAssertEqual(action.connection, .none) + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + } + + /// failing connection should backoff connection + for connID in failedHTTP1ConnIDs { + struct SomeError: Error {} + let action = state.failedToCreateNewConnection(SomeError(), connectionID: connID) + guard case .scheduleBackoffTimer(connID, backoff: _, let el) = action.connection else { + return XCTFail("unexpected connection action \(action.connection)") + } + XCTAssertEqual(action.request, .none) + XCTAssertTrue(el === el1) + } + + let http2ConnectionIDs = Array(connectionIDs.dropFirst(4)) + + guard let firstHTTP2ConnID = http2ConnectionIDs.first else { + return XCTFail("could not create connection") + } + + /// first new HTTP2 connection should migrate from HTTP1 to HTTP2 and execute requests + let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: firstHTTP2ConnID, eventLoop: el1) + let migrationAction = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, let conn) = migrationAction.request else { + return XCTFail("unexpected request action \(migrationAction.request)") + } + XCTAssertEqual(migrationAction.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) + + XCTAssertEqual(conn, http2Conn) + XCTAssertEqual(requests.count, 10) + + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + } + + /// remaining connections should be closed immediately without executing any request + for connID in http2ConnectionIDs.dropFirst() { + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + let action = state.newHTTP2ConnectionCreated(conn, maxConcurrentStreams: 10) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .closeConnection(conn, isShutdown: .no)) + } + + /// after a request has finished on a http1 connection, the connection should be closed + /// because we are now in http/2 mode + for http1ConnectionID in succesfullHTTP1ConnIDs { + let action = state.http1ConnectionReleased(http1ConnectionID) + XCTAssertEqual(action.request, .none) + guard case .closeConnection(let conn, isShutdown: .no) = action.connection else { + return XCTFail("unexpected connection action \(migrationAction.connection)") + } + XCTAssertEqual(conn.id, http1ConnectionID) + } + + /// if a backoff timer fires for an old http1 connection we should not start a new connection + /// because we are already in http2 mode + for http1ConnectionID in failedHTTP1ConnIDs { + let action = state.connectionCreationBackoffDone(http1ConnectionID) + XCTAssertEqual(action, .none) + } + + /// closing a stream while we have requests queued should result in one request execution action + for _ in 0..<4 { + let action = state.http2ConnectionStreamClosed(http2Conn.id) + XCTAssertEqual(action.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = action.request else { + return XCTFail("Unexpected request action \(action.request)") + } + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.cancel(request.id)) + } + } + + XCTAssertTrue(queuer.isEmpty) + } + + func testHTTP2toHTTP1Migration() { + let elg = EmbeddedEventLoopGroup(loops: 2) + let el1 = elg.next() + let el2 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + // create http2 connection + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action1 = state.executeRequest(request) + guard case .createConnection(let http2ConnID, let eventLoop) = action1.connection else { + return XCTFail("Unexpected connection action \(action1.connection)") + } + XCTAssertTrue(eventLoop === el1) + XCTAssertEqual(action1.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(http2ConnID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http2ConnID, eventLoop: el1) + let migrationAction1 = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = migrationAction1.request else { + return XCTFail("unexpected request action \(migrationAction1.request)") + } + XCTAssertEqual(migrationAction1.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + } + + // a request with new required event loop should create a new connection + let mockRequestWithRequiredEventLoop = MockHTTPRequest(eventLoop: el2, requiresEventLoopForChannel: true) + let requestWithRequiredEventLoop = HTTPConnectionPool.Request(mockRequestWithRequiredEventLoop) + let action2 = state.executeRequest(requestWithRequiredEventLoop) + guard case .createConnection(let http1ConnId, let eventLoop) = action2.connection else { + return XCTFail("Unexpected connection action \(action2.connection)") + } + XCTAssertTrue(eventLoop === el2) + XCTAssertEqual(action2.request, .scheduleRequestTimeout(for: requestWithRequiredEventLoop, on: mockRequestWithRequiredEventLoop.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(http1ConnId, on: el2)) + XCTAssertNoThrow(try queuer.queue(mockRequestWithRequiredEventLoop, id: requestWithRequiredEventLoop.id)) + + // if we established a new http/1 connection we should migrate back to http/1 + let http1Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http1ConnId, eventLoop: el2) + let migrationAction2 = state.newHTTP1ConnectionCreated(http1Conn) + guard case .executeRequest(let request, http1Conn, cancelTimeout: true) = migrationAction2.request else { + return XCTFail("unexpected request action \(migrationAction2.request)") + } + guard case .migration(let createConnections, closeConnections: [], scheduleTimeout: nil) = migrationAction2.connection else { + return XCTFail("unexpected connection action \(migrationAction2.connection)") + } + XCTAssertEqual(createConnections.map { $0.1.id }, [el2.id]) + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + + // in http/1 state, we should close idle http2 connections + let releaseAction = state.http2ConnectionStreamClosed(http2Conn.id) + XCTAssertEqual(releaseAction.connection, .closeConnection(http2Conn, isShutdown: .no)) + XCTAssertEqual(releaseAction.request, .none) + } } From 0d0375ae445781157c37c7ae89adb52ec82175d4 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 28 Oct 2021 22:24:49 +0200 Subject: [PATCH 2/6] fix compilation for Swift 5.2 & 5.3 --- ...onnectionPool+HTTP2StateMachineTests.swift | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 612e5221c..7a80c97ef 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -790,15 +790,15 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { // create http2 connection let mockRequest = MockHTTPRequest(eventLoop: el1) - let request = HTTPConnectionPool.Request(mockRequest) - let action1 = state.executeRequest(request) - guard case .createConnection(let http2ConnID, let eventLoop) = action1.connection else { + let request1 = HTTPConnectionPool.Request(mockRequest) + let action1 = state.executeRequest(request1) + guard case .createConnection(let http2ConnID, let http2EventLoop) = action1.connection else { return XCTFail("Unexpected connection action \(action1.connection)") } - XCTAssertTrue(eventLoop === el1) - XCTAssertEqual(action1.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertTrue(http2EventLoop === el1) + XCTAssertEqual(action1.request, .scheduleRequestTimeout(for: request1, on: mockRequest.eventLoop)) XCTAssertNoThrow(try connections.createConnection(http2ConnID, on: el1)) - XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request1.id)) let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http2ConnID, eventLoop: el1) let migrationAction1 = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = migrationAction1.request else { @@ -814,10 +814,10 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let mockRequestWithRequiredEventLoop = MockHTTPRequest(eventLoop: el2, requiresEventLoopForChannel: true) let requestWithRequiredEventLoop = HTTPConnectionPool.Request(mockRequestWithRequiredEventLoop) let action2 = state.executeRequest(requestWithRequiredEventLoop) - guard case .createConnection(let http1ConnId, let eventLoop) = action2.connection else { + guard case .createConnection(let http1ConnId, let http1EventLoop) = action2.connection else { return XCTFail("Unexpected connection action \(action2.connection)") } - XCTAssertTrue(eventLoop === el2) + XCTAssertTrue(http1EventLoop === el2) XCTAssertEqual(action2.request, .scheduleRequestTimeout(for: requestWithRequiredEventLoop, on: mockRequestWithRequiredEventLoop.eventLoop)) XCTAssertNoThrow(try connections.createConnection(http1ConnId, on: el2)) XCTAssertNoThrow(try queuer.queue(mockRequestWithRequiredEventLoop, id: requestWithRequiredEventLoop.id)) @@ -825,14 +825,14 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { // if we established a new http/1 connection we should migrate back to http/1 let http1Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http1ConnId, eventLoop: el2) let migrationAction2 = state.newHTTP1ConnectionCreated(http1Conn) - guard case .executeRequest(let request, http1Conn, cancelTimeout: true) = migrationAction2.request else { + guard case .executeRequest(let request2, http1Conn, cancelTimeout: true) = migrationAction2.request else { return XCTFail("unexpected request action \(migrationAction2.request)") } guard case .migration(let createConnections, closeConnections: [], scheduleTimeout: nil) = migrationAction2.connection else { return XCTFail("unexpected connection action \(migrationAction2.connection)") } XCTAssertEqual(createConnections.map { $0.1.id }, [el2.id]) - XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try queuer.get(request2.id, request: request2.__testOnly_wrapped_request())) // in http/1 state, we should close idle http2 connections let releaseAction = state.http2ConnectionStreamClosed(http2Conn.id) From 75763bf26b5dc68e90964d6939eb764954196a91 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 29 Oct 2021 08:52:42 +0200 Subject: [PATCH 3/6] fix review comments - fix code formatting style - remove migration convience overload --- .../HTTPConnectionPool+HTTP1StateMachine.swift | 12 ------------ .../HTTPConnectionPool+HTTP2StateMachine.swift | 14 -------------- .../HTTPConnectionPool+StateMachine.swift | 18 ++++++++---------- ...ConnectionPool+HTTP2StateMachineTests.swift | 8 +++++++- 4 files changed, 15 insertions(+), 37 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index b7505ed33..2fefd0420 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -45,18 +45,6 @@ extension HTTPConnectionPool { self.requests = RequestQueue() } - mutating func migrateFromHTTP2( - http2State: HTTP2StateMachine, - newHTTP1Connection: Connection - ) -> Action { - self.migrateFromHTTP2( - http1Connections: http2State.http1Connections, - http2Connections: http2State.connections, - requests: http2State.requests, - newHTTP1Connection: newHTTP1Connection - ) - } - mutating func migrateFromHTTP2( http1Connections: HTTP1Connections? = nil, http2Connections: HTTP2Connections, diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 7d3cfdf28..972d513c8 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -49,20 +49,6 @@ extension HTTPConnectionPool { self.connections = HTTP2Connections(generator: idGenerator) } - mutating func migrateFromHTTP1( - http1State: HTTP1StateMachine, - newHTTP2Connection: Connection, - maxConcurrentStreams: Int - ) -> Action { - self.migrateFromHTTP1( - http1Connections: http1State.connections, - http2Connections: http1State.http2Connections, - requests: http1State.requests, - newHTTP2Connection: newHTTP2Connection, - maxConcurrentStreams: maxConcurrentStreams - ) - } - mutating func migrateFromHTTP1( http1Connections: HTTP1Connections, http2Connections: HTTP2Connections? = nil, diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift index 8453ff459..c2b621841 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -125,7 +125,9 @@ extension HTTPConnectionPool { ) let newConnectionAction = http1StateMachine.migrateFromHTTP2( - http2State: http2StateMachine, + http1Connections: http2StateMachine.http1Connections, + http2Connections: http2StateMachine.connections, + requests: http2StateMachine.requests, newHTTP1Connection: connection ) self.state = .http1(http1StateMachine) @@ -141,7 +143,9 @@ extension HTTPConnectionPool { idGenerator: self.idGenerator ) let migrationAction = http2StateMachine.migrateFromHTTP1( - http1State: http1StateMachine, + http1Connections: http1StateMachine.connections, + http2Connections: http1StateMachine.http2Connections, + requests: http1StateMachine.requests, newHTTP2Connection: connection, maxConcurrentStreams: maxConcurrentStreams ) @@ -193,15 +197,9 @@ extension HTTPConnectionPool { mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { self.state.modify(http1: { http1 in - http1.failedToCreateNewConnection( - error, - connectionID: connectionID - ) + http1.failedToCreateNewConnection(error, connectionID: connectionID) }, http2: { http2 in - http2.failedToCreateNewConnection( - error, - connectionID: connectionID - ) + http2.failedToCreateNewConnection(error, connectionID: connectionID) }) } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 7a80c97ef..191801ee2 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -312,7 +312,13 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let http2ConnectAction = http2State.migrateFromHTTP1(http1State: http1State, newHTTP2Connection: conn2, maxConcurrentStreams: 100) + let http2ConnectAction = http2State.migrateFromHTTP1( + http1Connections: http1State.connections, + http2Connections: http1State.http2Connections, + requests: http1State.requests, + newHTTP2Connection: conn2, + maxConcurrentStreams: 100 + ) XCTAssertEqual(http2ConnectAction.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) guard case .executeRequestsAndCancelTimeouts([request2], conn2) = http2ConnectAction.request else { return XCTFail("Unexpected request action \(http2ConnectAction.request)") From 82d02bbb07b15d0819b1c7653006c4783bcdb572 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 29 Oct 2021 12:24:39 +0200 Subject: [PATCH 4/6] use `MockConnectionPool` in tests --- ...onnectionPool+HTTP2StateMachineTests.swift | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 191801ee2..767f2f032 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -624,6 +624,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { /// first new HTTP2 connection should migrate from HTTP1 to HTTP2 and execute requests let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(conn1ID, maxConcurrentStreams: 10)) let migrationAction = state.newHTTP2ConnectionCreated(conn1, maxConcurrentStreams: 10) guard case .executeRequestsAndCancelTimeouts(let requests, let conn) = migrationAction.request else { return XCTFail("unexpected request action \(migrationAction.request)") @@ -633,6 +634,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { for request in requests { XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: conn1)) } XCTAssertEqual(migrationAction.connection, .migration( @@ -644,13 +646,16 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { /// remaining connections should be closed immediately without executing any request for connID in connectionIDs.dropFirst() { let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 10)) let action = state.newHTTP2ConnectionCreated(conn, maxConcurrentStreams: 10) XCTAssertEqual(action.request, .none) XCTAssertEqual(action.connection, .closeConnection(conn, isShutdown: .no)) + XCTAssertNoThrow(try connections.closeConnection(conn)) } /// closing a stream while we have requests queued should result in one request execution action for _ in 0..<6 { + XCTAssertNoThrow(try connections.finishExecution(conn1ID)) let action = state.http2ConnectionStreamClosed(conn1ID) XCTAssertEqual(action.connection, .none) guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else { @@ -659,6 +664,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(requests.count, 1) for request in requests { XCTAssertNoThrow(try queuer.cancel(request.id)) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: conn1)) } } XCTAssertTrue(queuer.isEmpty) @@ -704,16 +710,19 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { /// new http1 connection should execute 1 request for connID in succesfullHTTP1ConnIDs { let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP1(connID)) let action = state.newHTTP1ConnectionCreated(conn) guard case .executeRequest(let request, conn, cancelTimeout: true) = action.request else { return XCTFail("unexpected request action \(action.request)") } XCTAssertEqual(action.connection, .none) XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: conn)) } /// failing connection should backoff connection for connID in failedHTTP1ConnIDs { + XCTAssertNoThrow(try connections.failConnectionCreation(connID)) struct SomeError: Error {} let action = state.failedToCreateNewConnection(SomeError(), connectionID: connID) guard case .scheduleBackoffTimer(connID, backoff: _, let el) = action.connection else { @@ -721,6 +730,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } XCTAssertEqual(action.request, .none) XCTAssertTrue(el === el1) + XCTAssertNoThrow(try connections.startConnectionBackoffTimer(connID)) } let http2ConnectionIDs = Array(connectionIDs.dropFirst(4)) @@ -731,6 +741,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { /// first new HTTP2 connection should migrate from HTTP1 to HTTP2 and execute requests let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: firstHTTP2ConnID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(firstHTTP2ConnID, maxConcurrentStreams: 10)) let migrationAction = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) guard case .executeRequestsAndCancelTimeouts(let requests, let conn) = migrationAction.request else { return XCTFail("unexpected request action \(migrationAction.request)") @@ -742,19 +753,23 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { for request in requests { XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn)) } /// remaining connections should be closed immediately without executing any request for connID in http2ConnectionIDs.dropFirst() { let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 10)) let action = state.newHTTP2ConnectionCreated(conn, maxConcurrentStreams: 10) XCTAssertEqual(action.request, .none) XCTAssertEqual(action.connection, .closeConnection(conn, isShutdown: .no)) + XCTAssertNoThrow(try connections.closeConnection(conn)) } /// after a request has finished on a http1 connection, the connection should be closed /// because we are now in http/2 mode for http1ConnectionID in succesfullHTTP1ConnIDs { + XCTAssertNoThrow(try connections.finishExecution(http1ConnectionID)) let action = state.http1ConnectionReleased(http1ConnectionID) XCTAssertEqual(action.request, .none) guard case .closeConnection(let conn, isShutdown: .no) = action.connection else { @@ -766,12 +781,14 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { /// if a backoff timer fires for an old http1 connection we should not start a new connection /// because we are already in http2 mode for http1ConnectionID in failedHTTP1ConnIDs { + XCTAssertNoThrow(try connections.connectionBackoffTimerDone(http1ConnectionID)) let action = state.connectionCreationBackoffDone(http1ConnectionID) XCTAssertEqual(action, .none) } /// closing a stream while we have requests queued should result in one request execution action for _ in 0..<4 { + XCTAssertNoThrow(try connections.finishExecution(http2Conn.id)) let action = state.http2ConnectionStreamClosed(http2Conn.id) XCTAssertEqual(action.connection, .none) guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = action.request else { @@ -779,7 +796,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } XCTAssertEqual(requests.count, 1) for request in requests { - XCTAssertNoThrow(try queuer.cancel(request.id)) + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn)) } } @@ -806,6 +824,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertNoThrow(try connections.createConnection(http2ConnID, on: el1)) XCTAssertNoThrow(try queuer.queue(mockRequest, id: request1.id)) let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http2ConnID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(http2ConnID, maxConcurrentStreams: 10)) let migrationAction1 = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = migrationAction1.request else { return XCTFail("unexpected request action \(migrationAction1.request)") @@ -814,6 +833,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(requests.count, 1) for request in requests { XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn)) } // a request with new required event loop should create a new connection @@ -830,6 +850,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { // if we established a new http/1 connection we should migrate back to http/1 let http1Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http1ConnId, eventLoop: el2) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP1(http1ConnId)) let migrationAction2 = state.newHTTP1ConnectionCreated(http1Conn) guard case .executeRequest(let request2, http1Conn, cancelTimeout: true) = migrationAction2.request else { return XCTFail("unexpected request action \(migrationAction2.request)") @@ -839,10 +860,13 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } XCTAssertEqual(createConnections.map { $0.1.id }, [el2.id]) XCTAssertNoThrow(try queuer.get(request2.id, request: request2.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request2.__testOnly_wrapped_request(), on: http1Conn)) // in http/1 state, we should close idle http2 connections + XCTAssertNoThrow(try connections.finishExecution(http2Conn.id)) let releaseAction = state.http2ConnectionStreamClosed(http2Conn.id) XCTAssertEqual(releaseAction.connection, .closeConnection(http2Conn, isShutdown: .no)) XCTAssertEqual(releaseAction.request, .none) + XCTAssertNoThrow(try connections.closeConnection(http2Conn)) } } From d7eba81c979a5f0743a9bb83fafea937b7358230 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 29 Oct 2021 13:17:21 +0200 Subject: [PATCH 5/6] fix documentation --- .../State Machine/HTTPConnectionPool+HTTP2Connections.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 66a2d69ee..560d5ff0f 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -419,14 +419,14 @@ extension HTTPConnectionPool { self.connections.contains { $0.isActive } } - /// used in general purpose connection scenarios to check if at least one connection is starting or active for the given `eventLoop` + /// used in general purpose connection scenarios to check if at least one connection is starting, backing off or active var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool { self.connections.contains { $0.canOrWillBeAbleToExecuteRequests } } /// used in eventLoop scenarios. does at least one connection exist for this eventLoop, or should we create a new one? /// - Parameter eventLoop: connection `EventLoop` to search for - /// - Returns: true if at least one connection is starting or active for the given `eventLoop` + /// - Returns: true if at least one connection is starting, backing off or active for the given `eventLoop` func hasConnectionThatCanOrWillBeAbleToExecuteRequests(for eventLoop: EventLoop) -> Bool { self.connections.contains { $0.eventLoop === eventLoop && $0.canOrWillBeAbleToExecuteRequests From 420ef148230f0c64b1bfd515eff98fe9a0b849fa Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 2 Nov 2021 11:42:41 +0000 Subject: [PATCH 6/6] fix precondition message --- .../State Machine/HTTPConnectionPool+HTTP2StateMachine.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 972d513c8..94ae3edda 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -216,7 +216,7 @@ extension HTTPConnectionPool { self.lastConnectFailure = nil if self.connections.hasActiveConnection(for: connection.eventLoop) { guard let (index, _) = self.connections.failConnection(connection.id) else { - preconditionFailure("we connection to a connection which we no nothing about") + preconditionFailure("we have established a new connection that we know nothing about?") } self.connections.removeConnection(at: index) return .init(