diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index affe4770c..ac92e4bc8 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -279,6 +279,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { oldRequest.succeedRequest(buffer) case .failure(let error): + context.close(promise: nil) oldRequest.fail(error) } } diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift index 86707520c..66c1a48d1 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift @@ -32,6 +32,7 @@ extension HTTP1ClientChannelHandlerTests { ("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled), ("testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand", testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand), ("testWriteHTTPHeadFails", testWriteHTTPHeadFails), + ("testHandlerClosesChannelIfLastActionIsSendEndAndItFails", testHandlerClosesChannelIfLastActionIsSendEndAndItFails), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift index 4769d2c7e..f97580372 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift @@ -457,6 +457,75 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { XCTAssertEqual(embedded.isActive, false) } } + + func testHandlerClosesChannelIfLastActionIsSendEndAndItFails() { + let embedded = EmbeddedChannel() + let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5) + var maybeTestUtils: HTTP1TestTools? + XCTAssertNoThrow(maybeTestUtils = try embedded.setupHTTP1Connection()) + guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") } + + var maybeRequest: HTTPClient.Request? + XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 10) { writer in + testWriter.start(writer: writer) + })) + guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") } + + let delegate = ResponseAccumulator(request: request) + var maybeRequestBag: RequestBag? + XCTAssertNoThrow(maybeRequestBag = try RequestBag( + request: request, + eventLoopPreference: .delegate(on: embedded.eventLoop), + task: .init(eventLoop: embedded.eventLoop, logger: testUtils.logger), + redirectHandler: nil, + connectionDeadline: .now() + .seconds(30), + requestOptions: .forTests(idleReadTimeout: .milliseconds(200)), + delegate: delegate + )) + guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") } + + XCTAssertNoThrow(try embedded.pipeline.addHandler(FailEndHandler(), position: .first).wait()) + + // Execute the request and we'll receive the head. + testWriter.writabilityChanged(true) + testUtils.connection.executeRequest(requestBag) + XCTAssertNoThrow(try embedded.receiveHeadAndVerify { + XCTAssertEqual($0.method, .POST) + XCTAssertEqual($0.uri, "/") + XCTAssertEqual($0.headers.first(name: "host"), "localhost") + XCTAssertEqual($0.headers.first(name: "content-length"), "10") + }) + // We're going to immediately send the response head and end. + let responseHead = HTTPResponseHead(version: .http1_1, status: .ok) + XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.head(responseHead))) + embedded.read() + + // Send the end and confirm the connection is still live. + XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.end(nil))) + XCTAssertEqual(testUtils.connectionDelegate.hitConnectionClosed, 0) + XCTAssertEqual(testUtils.connectionDelegate.hitConnectionReleased, 0) + + // Ok, now we can process some reads. We expect 5 reads, but we do _not_ expect an .end, because + // the `FailEndHandler` is going to fail it. + embedded.embeddedEventLoop.run() + XCTAssertEqual(testWriter.written, 5) + for _ in 0..<5 { + XCTAssertNoThrow(try embedded.receiveBodyAndVerify { + XCTAssertEqual($0.readableBytes, 2) + }) + } + + embedded.embeddedEventLoop.run() + XCTAssertNil(try embedded.readOutbound(as: HTTPClientRequestPart.self)) + + // We should have seen the connection close, and the request is complete. + XCTAssertEqual(testUtils.connectionDelegate.hitConnectionClosed, 1) + XCTAssertEqual(testUtils.connectionDelegate.hitConnectionReleased, 0) + + XCTAssertThrowsError(try requestBag.task.futureResult.wait()) { error in + XCTAssertTrue(error is FailEndHandler.Error) + } + } } class TestBackpressureWriter { @@ -636,3 +705,19 @@ class ReadEventHitHandler: ChannelOutboundHandler { context.read() } } + +final class FailEndHandler: ChannelOutboundHandler { + typealias OutboundIn = HTTPClientRequestPart + typealias OutboundOut = HTTPClientRequestPart + + struct Error: Swift.Error {} + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + if case .end = self.unwrapOutboundIn(data) { + // We fail this. + promise?.fail(Self.Error()) + } else { + context.write(data, promise: promise) + } + } +}