Skip to content

Commit 4d726ba

Browse files
authored
Add ConnectionPool HTTP1StateMachine (#416)
1 parent 05e570d commit 4d726ba

12 files changed

+1480
-31
lines changed

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,11 @@ extension HTTPConnectionPool {
375375
self.connections[index].lease()
376376
}
377377

378+
func parkConnection(at index: Int) -> (Connection.ID, EventLoop) {
379+
precondition(self.connections[index].isIdle)
380+
return (self.connections[index].connectionID, self.connections[index].eventLoop)
381+
}
382+
378383
/// A new HTTP/1.1 connection was released.
379384
///
380385
/// This will put the position into the idle state.
@@ -446,12 +451,13 @@ extension HTTPConnectionPool {
446451
/// This will put the position into the closed state.
447452
///
448453
/// - Parameter connectionID: The failed connection's id.
449-
/// - Returns: An index and an IdleConnectionContext to determine the next action for the now closed connection.
454+
/// - Returns: An optional index and an IdleConnectionContext to determine the next action for the closed connection.
450455
/// You must call ``removeConnection(at:)`` or ``replaceConnection(at:)`` with the
451-
/// supplied index after this.
452-
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext) {
456+
/// supplied index after this. If nil is returned the connection was closed by the state machine and was
457+
/// therefore already removed.
458+
mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
453459
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
454-
preconditionFailure("We tried to fail a new connection that we know nothing about?")
460+
return nil
455461
}
456462

457463
let use: ConnectionUse
@@ -607,22 +613,4 @@ extension HTTPConnectionPool {
607613
var connecting: Int = 0
608614
var backingOff: Int = 0
609615
}
610-
611-
/// The pool cleanup todo list.
612-
struct CleanupContext: Equatable {
613-
/// the connections to close right away. These are idle.
614-
var close: [Connection]
615-
616-
/// the connections that currently run a request that needs to be cancelled to close the connections
617-
var cancel: [Connection]
618-
619-
/// the connections that are backing off from connection creation
620-
var connectBackoff: [Connection.ID]
621-
622-
init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) {
623-
self.close = close
624-
self.cancel = cancel
625-
self.connectBackoff = connectBackoff
626-
}
627-
}
628616
}

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift

Lines changed: 462 additions & 0 deletions
Large diffs are not rendered by default.

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ extension HTTPConnectionPool {
3333
self.count == 0
3434
}
3535

36-
func count(for eventLoop: EventLoop?) -> Int {
37-
if let eventLoop = eventLoop {
38-
return self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0
39-
}
40-
return self.generalPurposeQueue.count
36+
var generalPurposeCount: Int {
37+
self.generalPurposeQueue.count
38+
}
39+
40+
func count(for eventLoop: EventLoop) -> Int {
41+
self.withEventLoopQueueIfAvailable(for: eventLoop.id) { $0.count } ?? 0
4142
}
4243

4344
func isEmpty(for eventLoop: EventLoop?) -> Bool {
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import NIOHTTP1
17+
18+
extension HTTPConnectionPool {
19+
struct StateMachine {
20+
struct Action {
21+
let request: RequestAction
22+
let connection: ConnectionAction
23+
24+
init(request: RequestAction, connection: ConnectionAction) {
25+
self.request = request
26+
self.connection = connection
27+
}
28+
29+
static let none: Action = Action(request: .none, connection: .none)
30+
}
31+
32+
enum ConnectionAction {
33+
enum IsShutdown: Equatable {
34+
case yes(unclean: Bool)
35+
case no
36+
}
37+
38+
case createConnection(Connection.ID, on: EventLoop)
39+
case scheduleBackoffTimer(Connection.ID, backoff: TimeAmount, on: EventLoop)
40+
41+
case scheduleTimeoutTimer(Connection.ID, on: EventLoop)
42+
case cancelTimeoutTimer(Connection.ID)
43+
44+
case closeConnection(Connection, isShutdown: IsShutdown)
45+
case cleanupConnections(CleanupContext, isShutdown: IsShutdown)
46+
47+
case none
48+
}
49+
50+
enum RequestAction {
51+
case executeRequest(Request, Connection, cancelTimeout: Bool)
52+
case executeRequestsAndCancelTimeouts([Request], Connection)
53+
54+
case failRequest(Request, Error, cancelTimeout: Bool)
55+
case failRequestsAndCancelTimeouts([Request], Error)
56+
57+
case scheduleRequestTimeout(for: Request, on: EventLoop)
58+
case cancelRequestTimeout(Request.ID)
59+
60+
case none
61+
}
62+
63+
enum HTTPVersionState {
64+
case http1(HTTP1StateMachine)
65+
}
66+
67+
var state: HTTPVersionState
68+
var isShuttingDown: Bool = false
69+
70+
let eventLoopGroup: EventLoopGroup
71+
let maximumConcurrentHTTP1Connections: Int
72+
73+
init(eventLoopGroup: EventLoopGroup, idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int) {
74+
self.maximumConcurrentHTTP1Connections = maximumConcurrentHTTP1Connections
75+
let http1State = HTTP1StateMachine(
76+
idGenerator: idGenerator,
77+
maximumConcurrentConnections: maximumConcurrentHTTP1Connections
78+
)
79+
self.state = .http1(http1State)
80+
self.eventLoopGroup = eventLoopGroup
81+
}
82+
83+
mutating func executeRequest(_ request: Request) -> Action {
84+
switch self.state {
85+
case .http1(var http1StateMachine):
86+
let action = http1StateMachine.executeRequest(request)
87+
self.state = .http1(http1StateMachine)
88+
return action
89+
}
90+
}
91+
92+
mutating func newHTTP1ConnectionCreated(_ connection: Connection) -> Action {
93+
switch self.state {
94+
case .http1(var http1StateMachine):
95+
let action = http1StateMachine.newHTTP1ConnectionEstablished(connection)
96+
self.state = .http1(http1StateMachine)
97+
return action
98+
}
99+
}
100+
101+
mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action {
102+
switch self.state {
103+
case .http1(var http1StateMachine):
104+
let action = http1StateMachine.failedToCreateNewConnection(
105+
error,
106+
connectionID: connectionID
107+
)
108+
self.state = .http1(http1StateMachine)
109+
return action
110+
}
111+
}
112+
113+
mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
114+
switch self.state {
115+
case .http1(var http1StateMachine):
116+
let action = http1StateMachine.connectionCreationBackoffDone(connectionID)
117+
self.state = .http1(http1StateMachine)
118+
return action
119+
}
120+
}
121+
122+
/// A request has timed out.
123+
///
124+
/// This is different to a request being cancelled. If a request times out, we need to fail the
125+
/// request, but don't need to cancel the timer (it already triggered). If a request is cancelled
126+
/// we don't need to fail it but we need to cancel its timeout timer.
127+
mutating func timeoutRequest(_ requestID: Request.ID) -> Action {
128+
switch self.state {
129+
case .http1(var http1StateMachine):
130+
let action = http1StateMachine.timeoutRequest(requestID)
131+
self.state = .http1(http1StateMachine)
132+
return action
133+
}
134+
}
135+
136+
/// A request was cancelled.
137+
///
138+
/// This is different to a request timing out. If a request is cancelled we don't need to fail it but we
139+
/// need to cancel its timeout timer. If a request times out, we need to fail the request, but don't
140+
/// need to cancel the timer (it already triggered).
141+
mutating func cancelRequest(_ requestID: Request.ID) -> Action {
142+
switch self.state {
143+
case .http1(var http1StateMachine):
144+
let action = http1StateMachine.cancelRequest(requestID)
145+
self.state = .http1(http1StateMachine)
146+
return action
147+
}
148+
}
149+
150+
mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action {
151+
switch self.state {
152+
case .http1(var http1StateMachine):
153+
let action = http1StateMachine.connectionIdleTimeout(connectionID)
154+
self.state = .http1(http1StateMachine)
155+
return action
156+
}
157+
}
158+
159+
/// A connection has been closed
160+
mutating func connectionClosed(_ connectionID: Connection.ID) -> Action {
161+
switch self.state {
162+
case .http1(var http1StateMachine):
163+
let action = http1StateMachine.connectionClosed(connectionID)
164+
self.state = .http1(http1StateMachine)
165+
return action
166+
}
167+
}
168+
169+
mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action {
170+
switch self.state {
171+
case .http1(var http1StateMachine):
172+
let action = http1StateMachine.http1ConnectionReleased(connectionID)
173+
self.state = .http1(http1StateMachine)
174+
return action
175+
}
176+
}
177+
178+
mutating func shutdown() -> Action {
179+
precondition(!self.isShuttingDown, "Shutdown must only be called once")
180+
181+
self.isShuttingDown = true
182+
183+
switch self.state {
184+
case .http1(var http1StateMachine):
185+
let action = http1StateMachine.shutdown()
186+
self.state = .http1(http1StateMachine)
187+
return action
188+
}
189+
}
190+
}
191+
}
192+
193+
extension HTTPConnectionPool {
194+
/// The pool cleanup todo list.
195+
struct CleanupContext: Equatable {
196+
/// the connections to close right away. These are idle.
197+
var close: [Connection]
198+
199+
/// the connections that currently run a request that needs to be cancelled to close the connections
200+
var cancel: [Connection]
201+
202+
/// the connections that are backing off from connection creation
203+
var connectBackoff: [Connection.ID]
204+
205+
init(close: [Connection] = [], cancel: [Connection] = [], connectBackoff: [Connection.ID] = []) {
206+
self.close = close
207+
self.cancel = cancel
208+
self.connectBackoff = connectBackoff
209+
}
210+
}
211+
}
212+
213+
extension HTTPConnectionPool.StateMachine: CustomStringConvertible {
214+
var description: String {
215+
switch self.state {
216+
case .http1(let http1):
217+
return ".http1(\(http1))"
218+
}
219+
}
220+
}

Sources/AsyncHTTPClient/HTTPClient.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
925925
case tlsHandshakeTimeout
926926
case serverOfferedUnsupportedApplicationProtocol(String)
927927
case requestStreamCancelled
928+
case getConnectionFromPoolTimeout
928929
}
929930

930931
private var code: Code
@@ -997,4 +998,11 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
997998
/// The remote server responded with a status code >= 300, before the full request was sent. The request stream
998999
/// was therefore cancelled
9991000
public static let requestStreamCancelled = HTTPClientError(code: .requestStreamCancelled)
1001+
1002+
/// Aquiring a HTTP connection from the connection pool timed out.
1003+
///
1004+
/// This can have multiple reasons:
1005+
/// - A connection could not be created within the timout period.
1006+
/// - Tasks are not processed fast enough on the existing connections, to process all waiters in time
1007+
public static let getConnectionFromPoolTimeout = HTTPClientError(code: .getConnectionFromPoolTimeout)
10001008
}

Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP1ConnectionsTest.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
6868
let backoff1EL = connections.backoffNextConnectionAttempt(conn1ID)
6969
XCTAssert(backoff1EL === el1)
7070
// backoff done. 2. decide what's next
71-
let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID)
71+
guard let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID) else {
72+
return XCTFail("Expected that the connection is remembered")
73+
}
7274
XCTAssert(conn1FailContext.eventLoop === el1)
7375
XCTAssertEqual(conn1FailContext.use, .generalPurpose)
7476
XCTAssertEqual(conn1FailContext.connectionsStartingForUseCase, 0)
@@ -83,7 +85,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
8385
XCTAssertEqual(connections.startingEventLoopConnections(on: el2), 1)
8486
let backoff2EL = connections.backoffNextConnectionAttempt(conn2ID)
8587
XCTAssert(backoff2EL === el2)
86-
let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID)
88+
guard let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID) else {
89+
return XCTFail("Expected that the connection is remembered")
90+
}
8791
XCTAssert(conn2FailContext.eventLoop === el2)
8892
XCTAssertEqual(conn2FailContext.use, .eventLoop(el2))
8993
XCTAssertEqual(conn2FailContext.connectionsStartingForUseCase, 0)
@@ -329,7 +333,9 @@ class HTTPConnectionPool_HTTP1ConnectionsTests: XCTestCase {
329333
XCTAssertEqual(connections.closeConnection(at: releaseIndex), lease)
330334
XCTAssertFalse(connections.isEmpty)
331335

332-
let (failIndex, _) = connections.failConnection(startingID)
336+
guard let (failIndex, _) = connections.failConnection(startingID) else {
337+
return XCTFail("Expected that the connection is remembered")
338+
}
333339
connections.removeConnection(at: failIndex)
334340
XCTAssertTrue(connections.isEmpty)
335341
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
//
15+
// HTTPConnectionPool+HTTP1StateTests+XCTest.swift
16+
//
17+
import XCTest
18+
19+
///
20+
/// NOTE: This file was generated by generate_linux_tests.rb
21+
///
22+
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
23+
///
24+
25+
extension HTTPConnectionPool_HTTP1StateMachineTests {
26+
static var allTests: [(String, (HTTPConnectionPool_HTTP1StateMachineTests) -> () throws -> Void)] {
27+
return [
28+
("testCreatingAndFailingConnections", testCreatingAndFailingConnections),
29+
("testConnectionFailureBackoff", testConnectionFailureBackoff),
30+
("testCancelRequestWorks", testCancelRequestWorks),
31+
("testExecuteOnShuttingDownPool", testExecuteOnShuttingDownPool),
32+
("testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder", testRequestsAreQueuedIfAllConnectionsAreInUseAndRequestsAreDequeuedInOrder),
33+
("testBestConnectionIsPicked", testBestConnectionIsPicked),
34+
("testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests", testConnectionAbortIsIgnoredIfThereAreNoQueuedRequests),
35+
("testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests", testConnectionCloseLeadsToTumbleWeedIfThereNoQueuedRequests),
36+
("testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests", testConnectionAbortLeadsToNewConnectionsIfThereAreQueuedRequests),
37+
("testParkedConnectionTimesOut", testParkedConnectionTimesOut),
38+
("testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately", testConnectionPoolFullOfParkedConnectionsIsShutdownImmediately),
39+
("testParkedConnectionTimesOutButIsAlsoClosedByRemote", testParkedConnectionTimesOutButIsAlsoClosedByRemote),
40+
]
41+
}
42+
}

0 commit comments

Comments
 (0)