Skip to content
This repository was archived by the owner on Apr 23, 2021. It is now read-only.

Avoid trouble with shutting down HTTPClient #45

Merged
merged 1 commit into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion UseCases/Sources/HTTPEndToEnd/InstrumentedHTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,36 @@ import NIO
import NIOHTTP1

struct InstrumentedHTTPClient {
private let client = HTTPClient(eventLoopGroupProvider: .createNew)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we were ignoring the init's passed in value here; so fixed this

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, how did I miss that? 🤦‍♂️😁

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happens :)

private let client: HTTPClient
private let instrument: Instrument<HTTPHeaders, HTTPHeaders>

init<I>(instrument: I, eventLoopGroupProvider: HTTPClient.EventLoopGroupProvider)
where
I: InstrumentProtocol,
I.InjectInto == HTTPHeaders,
I.ExtractFrom == HTTPHeaders {
self.client = HTTPClient(eventLoopGroupProvider: eventLoopGroupProvider)
self.instrument = Instrument(instrument)
}

// TODO: deadline: NIODeadline? would move into baggage?
public func get(url: String, baggage: BaggageContext = .init()) -> EventLoopFuture<HTTPClient.Response> {
do {
let request = try HTTPClient.Request(url: url, method: .GET)
return self.execute(request: request, baggage: baggage)
} catch {
return self.client.eventLoopGroup.next().makeFailedFuture(error)
}
}

func execute(request: HTTPClient.Request, baggage: BaggageContext) -> EventLoopFuture<HTTPClient.Response> {
var request = request
self.instrument.inject(from: baggage, into: &request.headers)
baggage.logger.info("AsyncHTTPClient: Execute request")
return self.client.execute(request: request)
}

func syncShutdown() throws {
try self.client.syncShutdown()
}
}
15 changes: 7 additions & 8 deletions UseCases/Sources/HTTPEndToEnd/Services/OrderServiceHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ final class OrderServiceHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

private let httpClient: InstrumentedHTTPClient
private let instrument: Instrument<HTTPHeaders, HTTPHeaders>

init<I>(instrument: I)
init<I>(httpClient: InstrumentedHTTPClient, instrument: I)
where I: InstrumentProtocol,
I.InjectInto == HTTPHeaders,
I.ExtractFrom == HTTPHeaders {

self.httpClient = httpClient
self.instrument = Instrument(instrument)
}

Expand All @@ -30,20 +33,16 @@ final class OrderServiceHandler: ChannelInboundHandler {

baggage.logger.info("Handling order service request")

let client = InstrumentedHTTPClient(
instrument: self.instrument,
eventLoopGroupProvider: .shared(context.eventLoop)
)

let request = try! HTTPClient.Request(url: "http://localhost:8081")
client.execute(request: request, baggage: baggage).whenComplete { _ in
httpClient.execute(request: request, baggage: baggage).whenComplete { _ in
let responseHead = HTTPResponseHead(version: requestHead.version, status: .ok)
context.eventLoop.execute {
context.channel.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
context.channel.write(self.wrapOutboundOut(.end(nil)), promise: nil)
context.channel.flush()
}
}

// TODO: Shut down client before deinit
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ final class StorageServiceHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

let logger = Logger(label: "StorageService")

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
guard case .head(let requestHead) = self.unwrapInboundIn(data) else { return }

var baggage = BaggageContext()
let logger = Logger(label: "StorageService")
baggage[BaggageContext.BaseLoggerKey.self] = logger
baggage.logger.info("Handling storage service request")

Expand Down
10 changes: 5 additions & 5 deletions UseCases/Sources/HTTPEndToEnd/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import NIOHTTP1
import NIO

let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
let threadPool = NIOThreadPool(numberOfThreads: 6)
threadPool.start()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think we'll need one?


let httpClient = InstrumentedHTTPClient(instrument: FakeTracer(), eventLoopGroupProvider: .createNew)

let orderServiceBootstrap = ServerBootstrap(group: eventLoopGroup)
.serverChannelOption(ChannelOptions.backlog, value: 256)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline().flatMap {
channel.pipeline.addHandler(OrderServiceHandler(instrument: FakeTracer()))
channel.pipeline.addHandler(OrderServiceHandler(httpClient: httpClient, instrument: FakeTracer()))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
Expand All @@ -44,7 +44,6 @@ logger.info("Order service listening on ::1:8080")
let storageServiceChannel = try storageServiceBootstrap.bind(host: "localhost", port: 8081).wait()
logger.info("Storage service listening on ::1:8081")

let httpClient = HTTPClient(eventLoopGroupProvider: .createNew)
httpClient.get(url: "http://localhost:8080").whenComplete { result in
switch result {
case .success(let response):
Expand All @@ -57,8 +56,9 @@ httpClient.get(url: "http://localhost:8080").whenComplete { result in
sleep(20)

try httpClient.syncShutdown()
try orderServiceChannel.close().wait()
try storageServiceChannel.close().wait()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't really have to right now since we kill the EL, but this would be a good way if we had any cleanup to do in channelRemoved -- but we don't... So consider this optional here I guess

try eventLoopGroup.syncShutdownGracefully()
try threadPool.syncShutdownGracefully()

// MARK: - Fake Tracer

Expand Down