From 84a15ca4569c056b1c91c4e76f2430ba77c5ddc3 Mon Sep 17 00:00:00 2001
From: tom doron 
Date: Fri, 1 May 2020 14:01:13 -0700
Subject: [PATCH 1/3] add debug functionality to test with mock server

motivation: allow end-to-end testing locally

changes:
* add a Lambda+LocalServer which exposes Lambda.withLocalServer, available only in DEBUG mode
* local server can receive POST requests with payloads on a configurable endpoint and send them to the Lambda
* add a "noContent" mode to Lambda runtime to allow polling
---
 Package.swift                                 |   7 +-
 .../AWSLambdaRuntime/Lambda+LocalServer.swift | 222 ++++++++++++++++++
 Sources/AWSLambdaRuntime/LambdaContext.swift  |   6 +-
 Sources/AWSLambdaRuntime/LambdaRunner.swift   |  13 +-
 Sources/AWSLambdaTesting/Lambda+Testing.swift |  20 +-
 Sources/StringSample/main.swift               |   4 +-
 docker/docker-compose.1804.53.yaml            |   2 +-
 7 files changed, 264 insertions(+), 10 deletions(-)
 create mode 100644 Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

diff --git a/Package.swift b/Package.swift
index 624cf6b5..88bb776f 100644
--- a/Package.swift
+++ b/Package.swift
@@ -36,12 +36,11 @@ let package = Package(
             .product(name: "NIO", package: "swift-nio"),
         ]),
         .testTarget(name: "AWSLambdaTestingTests", dependencies: ["AWSLambdaTesting"]),
-        // samples
-        .target(name: "StringSample", dependencies: ["AWSLambdaRuntime"]),
-        .target(name: "CodableSample", dependencies: ["AWSLambdaRuntime"]),
-        // perf tests
+        // for perf testing
         .target(name: "MockServer", dependencies: [
             .product(name: "NIOHTTP1", package: "swift-nio"),
         ]),
+        .target(name: "StringSample", dependencies: ["AWSLambdaRuntime"]),
+        .target(name: "CodableSample", dependencies: ["AWSLambdaRuntime"]),
     ]
 )

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
new file mode 100644
index 00000000..731ca597
--- /dev/null
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -0,0 +1,222 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the SwiftAWSLambdaRuntime open source project
+//
+// Copyright (c) 2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Dispatch
+import Logging
+import NIO
+import NIOConcurrencyHelpers
+import NIOHTTP1
+
+// This functionality is designed for local testing, hence it is behind a #if DEBUG flag.
+// For example:
+//
+// try Lambda.withLocalServer {
+//     Lambda.run { (context: Lambda.Context, payload: String, callback: @escaping (Result<String, Error>) -> Void) in
+//         callback(.success("Hello, \(payload)!"))
+//     }
+// }
+
+#if DEBUG
+extension Lambda {
+    /// Execute code in the context of a mock Lambda server.
+    ///
+    /// - parameters:
+    ///     - invocationEndpoint: The endpoint to post payloads to.
+    ///     - body: Code to run within the context of the mock server. Typically this would be a Lambda.run function call.
+    ///
+    /// - note: This API is designed strictly for local testing and is behind a DEBUG flag
+    public static func withLocalServer(invocationEndpoint: String? = nil, _ body: @escaping () -> Void) throws {
+        let server = LocalLambda.Server(invocationEndpoint: invocationEndpoint)
+        try server.start().wait()
+        defer { try! 
server.stop() } // FIXME:
+        body()
+    }
+}
+
+// MARK: - Local Mock Server
+
+private enum LocalLambda {
+    struct Server {
+        private let logger: Logger
+        private let group: EventLoopGroup
+        private let host: String
+        private let port: Int
+        private let invocationEndpoint: String
+
+        public init(invocationEndpoint: String?) {
+            let configuration = Lambda.Configuration()
+            var logger = Logger(label: "LocalLambdaServer")
+            logger.logLevel = configuration.general.logLevel
+            self.logger = logger
+            self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
+            self.host = configuration.runtimeEngine.ip
+            self.port = configuration.runtimeEngine.port
+            self.invocationEndpoint = invocationEndpoint ?? "/invoke"
+        }
+
+        func start() -> EventLoopFuture<Void> {
+            let bootstrap = ServerBootstrap(group: group)
+                .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
+                .childChannelInitializer { channel in
+                    channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap { _ in
+                        channel.pipeline.addHandler(HTTPHandler(logger: self.logger, invocationEndpoint: self.invocationEndpoint))
+                    }
+                }
+            return bootstrap.bind(host: self.host, port: self.port).flatMap { channel -> EventLoopFuture<Void> in
+                guard channel.localAddress != nil else {
+                    return channel.eventLoop.makeFailedFuture(ServerError.cantBind)
+                }
+                self.logger.info("LocalLambdaServer started and listening on \(self.host):\(self.port), receiving payloads on \(self.invocationEndpoint)")
+                return channel.eventLoop.makeSucceededFuture(())
+            }
+        }
+
+        func stop() throws {
+            try self.group.syncShutdownGracefully()
+        }
+    }
+
+    final class HTTPHandler: ChannelInboundHandler {
+        public typealias InboundIn = HTTPServerRequestPart
+        public typealias OutboundOut = HTTPServerResponsePart
+
+        private static let queueLock = Lock()
+        private static var queue = [String: Pending]()
+
+        private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()
+
+        private let logger: Logger
+        private let invocationEndpoint: String
+
+        init(logger: Logger, invocationEndpoint: String) {
+            self.logger = logger
+            self.invocationEndpoint = invocationEndpoint
+        }
+
+        func channelRead(context: ChannelHandlerContext, data: NIOAny) {
+            let requestPart = unwrapInboundIn(data)
+
+            switch requestPart {
+            case .head(let head):
+                self.processing.append((head: head, body: nil))
+            case .body(var buffer):
+                var request = self.processing.removeFirst()
+                if request.body == nil {
+                    request.body = buffer
+                } else {
+                    request.body!.writeBuffer(&buffer)
+                }
+                self.processing.prepend(request)
+            case .end:
+                let request = self.processing.removeFirst()
+                self.processRequest(context: context, request: request)
+            }
+        }
+
+        func processRequest(context: ChannelHandlerContext, request: (head: HTTPRequestHead, body: ByteBuffer?)) {
+            if request.head.uri.hasSuffix(self.invocationEndpoint) {
+                if let work = request.body {
+                    let requestId = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME:
+                    let promise = context.eventLoop.makePromise(of: Response.self)
+                    promise.futureResult.whenComplete { result in
+                        switch result {
+                        case .success(let response):
+                            self.writeResponse(context: context, response: response)
+                        case .failure:
+                            self.writeResponse(context: context, response: .init(status: .internalServerError))
+                        }
+                    }
+                    Self.queueLock.withLock {
+                        Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise)
+                    }
+                }
+            } else if request.head.uri.hasSuffix("/next") {
+                switch (Self.queueLock.withLock 
{ Self.queue.popFirst() }) {
+                case .none:
+                    self.writeResponse(context: context, response: .init(status: .noContent))
+                case .some(let pending):
+                    var response = Response()
+                    response.body = pending.value.request
+                    // required headers
+                    response.headers = [
+                        (AmazonHeaders.requestID, pending.key),
+                        (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
+                        (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
+                        (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
+                    ]
+                    Self.queueLock.withLock {
+                        Self.queue[pending.key] = pending.value
+                    }
+                    self.writeResponse(context: context, response: response)
+                }
+
+            } else if request.head.uri.hasSuffix("/response") {
+                let parts = request.head.uri.split(separator: "/")
+                guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
+                    return self.writeResponse(context: context, response: .init(status: .badRequest))
+                }
+                switch (Self.queueLock.withLock { Self.queue[requestId] }) {
+                case .none:
+                    self.writeResponse(context: context, response: .init(status: .badRequest))
+                case .some(let pending):
+                    pending.responsePromise.succeed(.init(status: .ok, body: request.body))
+                    self.writeResponse(context: context, response: .init(status: .accepted))
+                    Self.queueLock.withLock { Self.queue[requestId] = nil }
+                }
+            } else {
+                self.writeResponse(context: context, response: .init(status: .notFound))
+            }
+        }
+
+        func writeResponse(context: ChannelHandlerContext, response: Response) {
+            var headers = HTTPHeaders(response.headers ?? [])
+            headers.add(name: "content-length", value: "\(response.body?.readableBytes ?? 0)")
+            let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: response.status, headers: headers)
+
+            context.write(wrapOutboundOut(.head(head))).whenFailure { error in
+                self.logger.error("\(self) write error \(error)")
+            }
+
+            if let buffer = response.body {
+                context.write(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenFailure { error in
+                    self.logger.error("\(self) write error \(error)")
+                }
+            }
+
+            context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in
+                if case .failure(let error) = result {
+                    self.logger.error("\(self) write error \(error)")
+                }
+            }
+        }
+
+        struct Response {
+            var status: HTTPResponseStatus = .ok
+            var headers: [(String, String)]?
+            var body: ByteBuffer?
+        }
+
+        struct Pending {
+            let requestId: String
+            let request: ByteBuffer
+            let responsePromise: EventLoopPromise<Response>
+        }
+    }
+
+    enum ServerError: Error {
+        case notReady
+        case cantBind
+    }
+}
+#endif
diff --git a/Sources/AWSLambdaRuntime/LambdaContext.swift b/Sources/AWSLambdaRuntime/LambdaContext.swift
index ca69cbf7..fef386a4 100644
--- a/Sources/AWSLambdaRuntime/LambdaContext.swift
+++ b/Sources/AWSLambdaRuntime/LambdaContext.swift
@@ -19,7 +19,7 @@ import NIO
 extension Lambda {
     /// Lambda runtime context.
     /// The Lambda runtime generates and passes the `Context` to the Lambda handler as an argument.
-    public final class Context {
+    public final class Context: CustomDebugStringConvertible {
         /// The request ID, which identifies the request that triggered the function invocation. 
public let requestId: String
@@ -85,5 +85,9 @@ extension Lambda {
             let remaining = deadline - now
             return .milliseconds(remaining)
         }
+
+        public var debugDescription: String {
+            "\(Self.self)(requestId: \(self.requestId), traceId: \(self.traceId), invokedFunctionArn: \(self.invokedFunctionArn), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))"
+        }
     }
 }
diff --git a/Sources/AWSLambdaRuntime/LambdaRunner.swift b/Sources/AWSLambdaRuntime/LambdaRunner.swift
index 10e3bbe4..50190b7c 100644
--- a/Sources/AWSLambdaRuntime/LambdaRunner.swift
+++ b/Sources/AWSLambdaRuntime/LambdaRunner.swift
@@ -46,7 +46,10 @@ extension Lambda {
     func run(logger: Logger, handler: Handler) -> EventLoopFuture<Void> {
         logger.debug("lambda invocation sequence starting")
         // 1. request work from lambda runtime engine
-        return self.runtimeClient.requestWork(logger: logger).peekError { error in
+        return self.runtimeClient.requestWork(logger: logger).peekError { error -> Void in
+            if case RuntimeError.badStatusCode(.noContent) = error {
+                return
+            }
             logger.error("could not fetch work from lambda runtime engine: \(error)")
         }.flatMap { invocation, payload in
             // 2. send work to handler
@@ -64,7 +67,13 @@ extension Lambda {
             self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
                 logger.error("could not report results to lambda runtime engine: \(error)")
             }
-        }.always { result in
+        }.flatMapErrorThrowing { error in
+            if case RuntimeError.badStatusCode(.noContent) = error {
+                return ()
+            }
+            throw error
+        }
+        .always { result in
             // we are done!
             logger.log(level: result.successful ? .debug : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")")
         }
diff --git a/Sources/AWSLambdaTesting/Lambda+Testing.swift b/Sources/AWSLambdaTesting/Lambda+Testing.swift
index d8b2b125..0a418db9 100644
--- a/Sources/AWSLambdaTesting/Lambda+Testing.swift
+++ b/Sources/AWSLambdaTesting/Lambda+Testing.swift
@@ -12,9 +12,27 @@
 //
 //===----------------------------------------------------------------------===//
 
-// this is designed to only work for testing
+// This functionality is designed to help with Lambda unit testing with XCTest
 // #if filter required for release builds which do not support @testable import
 // @testable is used to access of internal functions
+// For example:
+//
+//     func test() {
+//         struct MyLambda: EventLoopLambdaHandler {
+//             typealias In = String
+//             typealias Out = String
+//
+//             func handle(context: Lambda.Context, payload: String) -> EventLoopFuture<String> {
+//                 return context.eventLoop.makeSucceededFuture("echo" + payload)
+//             }
+//         }
+//
+//         let input = UUID().uuidString
+//         var result: String? 
+// XCTAssertNoThrow(result = try Lambda.test(MyLambda(), with: input)) +// XCTAssertEqual(result, "echo" + input) +// } + #if DEBUG @testable import AWSLambdaRuntime import Dispatch diff --git a/Sources/StringSample/main.swift b/Sources/StringSample/main.swift index fddfa478..17bc6ade 100644 --- a/Sources/StringSample/main.swift +++ b/Sources/StringSample/main.swift @@ -26,7 +26,9 @@ struct Handler: EventLoopLambdaHandler { } } -Lambda.run(Handler()) +try Lambda.withLocalServer { + Lambda.run(Handler()) +} // MARK: - this can also be expressed as a closure: diff --git a/docker/docker-compose.1804.53.yaml b/docker/docker-compose.1804.53.yaml index e200426c..1121f2d2 100644 --- a/docker/docker-compose.1804.53.yaml +++ b/docker/docker-compose.1804.53.yaml @@ -6,7 +6,7 @@ services: image: swift-aws-lambda:18.04-5.3 build: args: - base_image: "swiftlang/swift:nightly-bionic" + base_image: "swiftlang/swift:nightly-5.3-bionic" test: image: swift-aws-lambda:18.04-5.3 From 26f6a4b243b7c74ae57c2cf8a44ab86cd26dd750 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 6 May 2020 01:42:43 +0200 Subject: [PATCH 2/3] Feature/ff local server (#70) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Don’t exit immediately * Removed locks. Just running in one EL --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 96 +++++++++++++------ 1 file changed, 68 insertions(+), 28 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 731ca597..02607680 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -59,7 +59,7 @@ private enum LocalLambda { var logger = Logger(label: "LocalLambdaServer") logger.logLevel = configuration.general.logLevel self.logger = logger - self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) self.host = configuration.runtimeEngine.ip self.port = configuration.runtimeEngine.port self.invocationEndpoint = invocationEndpoint ?? 
"/invoke" @@ -88,13 +88,20 @@ private enum LocalLambda { } final class HTTPHandler: ChannelInboundHandler { + + enum InvocationState { + case waitingForNextRequest + case idle(EventLoopPromise) + case processing(Pending) + } + public typealias InboundIn = HTTPServerRequestPart public typealias OutboundOut = HTTPServerResponsePart - private static let queueLock = Lock() - private static var queue = [String: Pending]() - private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>() + + private static var queue = [Pending]() + private static var invocationState: InvocationState = .waitingForNextRequest private let logger: Logger private let invocationEndpoint: String @@ -137,43 +144,63 @@ private enum LocalLambda { self.writeResponse(context: context, response: .init(status: .internalServerError)) } } - Self.queueLock.withLock { - Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise) + let pending = Pending(requestId: requestId, request: work, responsePromise: promise) + switch Self.invocationState { + case .idle(let promise): + promise.succeed(pending) + case .processing(_), .waitingForNextRequest: + Self.queue.append(pending) } } } else if request.head.uri.hasSuffix("/next") { - switch (Self.queueLock.withLock { Self.queue.popFirst() }) { + // check if our server is in the correct state + guard case .waitingForNextRequest = Self.invocationState else { + #warning("better error code?!") + self.writeResponse(context: context, response: .init(status: .conflict)) + return + } + + // pop the first task from the queue + switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil { case .none: - self.writeResponse(context: context, response: .init(status: .noContent)) - case .some(let pending): - var response = Response() - response.body = pending.value.request - // required headers - response.headers = [ - (AmazonHeaders.requestID, pending.key), - (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"), - (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"), - (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"), - ] - Self.queueLock.withLock { - Self.queue[pending.key] = pending.value + // if there is nothing in the queue, create a promise that we can succeed, + // when we get a new task + let promise = context.eventLoop.makePromise(of: Pending.self) + promise.futureResult.whenComplete { (result) in + switch result { + case .failure(let error): + self.writeResponse(context: context, response: .init(status: .internalServerError)) + case .success(let pending): + Self.invocationState = .processing(pending) + self.writeResponse(context: context, response: pending.toResponse()) + } } - self.writeResponse(context: context, response: response) + Self.invocationState = .idle(promise) + case .some(let pending): + // if there is a task pending, we can immediatly respond with it. + Self.invocationState = .processing(pending) + self.writeResponse(context: context, response: pending.toResponse()) } } else if request.head.uri.hasSuffix("/response") { let parts = request.head.uri.split(separator: "/") guard let requestId = parts.count > 2 ? 
String(parts[parts.count - 2]) : nil else {
+                    // the request is malformed, since we were expecting a requestId in the path
                     return self.writeResponse(context: context, response: .init(status: .badRequest))
                 }
-                switch (Self.queueLock.withLock { Self.queue[requestId] }) {
-                case .none:
-                    self.writeResponse(context: context, response: .init(status: .badRequest))
-                case .some(let pending):
-                    pending.responsePromise.succeed(.init(status: .ok, body: request.body))
-                    self.writeResponse(context: context, response: .init(status: .accepted))
-                    Self.queueLock.withLock { Self.queue[requestId] = nil }
+                guard case .processing(let pending) = Self.invocationState else {
+                    // a response was sent, but we did not expect to receive one
+                    #warning("better error code?!")
+                    return self.writeResponse(context: context, response: .init(status: .conflict))
                 }
+                guard requestId == pending.requestId else {
+                    // the request's requestId does not match the one we are expecting
+                    return self.writeResponse(context: context, response: .init(status: .badRequest))
+                }
+
+                pending.responsePromise.succeed(.init(status: .ok, body: request.body))
+                self.writeResponse(context: context, response: .init(status: .accepted))
+                Self.invocationState = .waitingForNextRequest
             } else {
                 self.writeResponse(context: context, response: .init(status: .notFound))
             }
@@ -211,6 +238,19 @@ private enum LocalLambda {
             let requestId: String
             let request: ByteBuffer
             let responsePromise: EventLoopPromise<Response>
+
+            func toResponse() -> Response {
+                var response = Response()
+                response.body = self.request
+                // required headers
+                response.headers = [
+                    (AmazonHeaders.requestID, self.requestId),
+                    (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
+                    (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
+                    (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
+                ]
+                return response
+            }
         }
     }
 
From 2cf4e1b4cc0b4dfc60e0088d8c524cbf358fb33c Mon Sep 17 00:00:00 2001
From: tom doron 
Date: Tue, 5 May 2020 17:05:41 -0700
Subject: [PATCH 3/3] fixup

---
 .../AWSLambdaRuntime/Lambda+LocalServer.swift | 142 +++++++++---------
 Sources/AWSLambdaRuntime/LambdaRunner.swift   |  13 +-
 Sources/StringSample/main.swift               |   4 +-
 3 files changed, 77 insertions(+), 82 deletions(-)

diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index 02607680..b39165b1 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -12,6 +12,7 @@
 //
 //===----------------------------------------------------------------------===//
 
+#if DEBUG
 import Dispatch
 import Logging
 import NIO
@@ -26,8 +27,6 @@ import NIOHTTP1
 //         callback(.success("Hello, \(payload)!"))
 //     }
 // }
-
-#if DEBUG
 extension Lambda {
     /// Execute code in the context of a mock Lambda server. 
///
@@ -88,20 +87,13 @@ private enum LocalLambda {
     }
 
     final class HTTPHandler: ChannelInboundHandler {
-
-        enum InvocationState {
-            case waitingForNextRequest
-            case idle(EventLoopPromise<Pending>)
-            case processing(Pending)
-        }
-
         public typealias InboundIn = HTTPServerRequestPart
         public typealias OutboundOut = HTTPServerResponsePart
 
-        private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()
-
-        private static var queue = [Pending]()
-        private static var invocationState: InvocationState = .waitingForNextRequest
+        private var pending = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()
+
+        private static var invocations = CircularBuffer<Invocation>()
+        private static var invocationState = InvocationState.waitingForLambdaRequest
 
         private let logger: Logger
         private let invocationEndpoint: String
@@ -116,92 +108,100 @@ private enum LocalLambda {
 
             switch requestPart {
             case .head(let head):
-                self.processing.append((head: head, body: nil))
+                self.pending.append((head: head, body: nil))
             case .body(var buffer):
-                var request = self.processing.removeFirst()
+                var request = self.pending.removeFirst()
                 if request.body == nil {
                     request.body = buffer
                 } else {
                     request.body!.writeBuffer(&buffer)
                 }
-                self.processing.prepend(request)
+                self.pending.prepend(request)
             case .end:
-                let request = self.processing.removeFirst()
+                let request = self.pending.removeFirst()
                 self.processRequest(context: context, request: request)
             }
         }
 
         func processRequest(context: ChannelHandlerContext, request: (head: HTTPRequestHead, body: ByteBuffer?)) {
-            if request.head.uri.hasSuffix(self.invocationEndpoint) {
-                if let work = request.body {
-                    let requestId = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME:
-                    let promise = context.eventLoop.makePromise(of: Response.self)
-                    promise.futureResult.whenComplete { result in
-                        switch result {
-                        case .success(let response):
-                            self.writeResponse(context: context, response: response)
-                        case .failure:
-                            self.writeResponse(context: context, response: .init(status: .internalServerError))
-                        }
+            switch (request.head.method, request.head.uri) {
+            // this endpoint is called by the client invoking the lambda
+            case (.POST, let url) where url.hasSuffix(self.invocationEndpoint):
+                guard let work = request.body else {
+                    return self.writeResponse(context: context, response: .init(status: .badRequest))
+                }
+                let requestID = "\(DispatchTime.now().uptimeNanoseconds)" // FIXME:
+                let promise = context.eventLoop.makePromise(of: Response.self)
+                promise.futureResult.whenComplete { result in
+                    switch result {
+                    case .failure(let error):
+                        self.logger.error("invocation error: \(error)")
+                        self.writeResponse(context: context, response: .init(status: .internalServerError))
+                    case .success(let response):
+                        self.writeResponse(context: context, response: response)
                     }
                 }
-                    let pending = Pending(requestId: requestId, request: work, responsePromise: promise)
-                    switch Self.invocationState {
-                    case .idle(let promise):
-                        promise.succeed(pending)
-                    case .processing(_), .waitingForNextRequest:
-                        Self.queue.append(pending)
+                let invocation = Invocation(requestID: requestID, request: work, responsePromise: promise)
+                switch Self.invocationState {
+                case .waitingForInvocation(let promise):
+                    promise.succeed(invocation)
+                case .waitingForLambdaRequest, .waitingForLambdaResponse:
+                    Self.invocations.append(invocation)
                 }
-                }
-            } else if request.head.uri.hasSuffix("/next") {
+            // /next endpoint is called by the lambda polling for work
+            case (.GET, let url) where url.hasSuffix(Consts.requestWorkURLSuffix):
                 // check 
if our server is in the correct state
-                guard case .waitingForNextRequest = Self.invocationState else {
-                    #warning("better error code?!")
-                    self.writeResponse(context: context, response: .init(status: .conflict))
+                guard case .waitingForLambdaRequest = Self.invocationState else {
+                    self.logger.error("invalid invocation state \(Self.invocationState)")
+                    self.writeResponse(context: context, response: .init(status: .unprocessableEntity))
                     return
                 }
-
+
                 // pop the first task from the queue
-                switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil {
+                switch Self.invocations.popFirst() {
                 case .none:
-                    // if there is nothing in the queue, create a promise that we can succeed,
-                    // when we get a new task
-                    let promise = context.eventLoop.makePromise(of: Pending.self)
-                    promise.futureResult.whenComplete { (result) in
+                    // if there is nothing in the queue,
+                    // create a promise that we can fulfill when we get a new task
+                    let promise = context.eventLoop.makePromise(of: Invocation.self)
+                    promise.futureResult.whenComplete { result in
                         switch result {
                         case .failure(let error):
+                            self.logger.error("invocation error: \(error)")
                             self.writeResponse(context: context, response: .init(status: .internalServerError))
-                        case .success(let pending):
-                            Self.invocationState = .processing(pending)
-                            self.writeResponse(context: context, response: pending.toResponse())
+                        case .success(let invocation):
+                            Self.invocationState = .waitingForLambdaResponse(invocation)
+                            self.writeResponse(context: context, response: invocation.makeResponse())
                         }
                     }
-                    Self.invocationState = .idle(promise)
-                case .some(let pending):
+                    Self.invocationState = .waitingForInvocation(promise)
+                case .some(let invocation):
                     // if there is a task pending, we can immediately respond with it.
-                    Self.invocationState = .processing(pending)
-                    self.writeResponse(context: context, response: pending.toResponse())
+                    Self.invocationState = .waitingForLambdaResponse(invocation)
+                    self.writeResponse(context: context, response: invocation.makeResponse())
                 }
-
-            } else if request.head.uri.hasSuffix("/response") {
+            // :requestID/response endpoint is called by the lambda posting the response
+            case (.POST, let url) where url.hasSuffix(Consts.postResponseURLSuffix):
                 let parts = request.head.uri.split(separator: "/")
-                guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
+                guard let requestID = parts.count > 2 ? 
String(parts[parts.count - 2]) : nil else {
                     // the request is malformed, since we were expecting a requestId in the path
                     return self.writeResponse(context: context, response: .init(status: .badRequest))
                 }
-                guard case .processing(let pending) = Self.invocationState else {
+                guard case .waitingForLambdaResponse(let invocation) = Self.invocationState else {
                     // a response was sent, but we did not expect to receive one
-                    #warning("better error code?!")
-                    return self.writeResponse(context: context, response: .init(status: .conflict))
+                    self.logger.error("invalid invocation state \(Self.invocationState)")
+                    return self.writeResponse(context: context, response: .init(status: .unprocessableEntity))
                 }
-                guard requestId == pending.requestId else {
+                guard requestID == invocation.requestID else {
                     // the request's requestId does not match the one we are expecting
+                    self.logger.error("invalid invocation state request ID \(requestID) does not match expected \(invocation.requestID)")
                     return self.writeResponse(context: context, response: .init(status: .badRequest))
                 }
-
-                pending.responsePromise.succeed(.init(status: .ok, body: request.body))
+
+                invocation.responsePromise.succeed(.init(status: .ok, body: request.body))
                 self.writeResponse(context: context, response: .init(status: .accepted))
-                Self.invocationState = .waitingForNextRequest
-            } else {
+                Self.invocationState = .waitingForLambdaRequest
+            // unknown call
+            default:
                 self.writeResponse(context: context, response: .init(status: .notFound))
             }
         }
@@ -234,17 +234,17 @@ private enum LocalLambda {
             var body: ByteBuffer?
         }
 
-        struct Pending {
-            let requestId: String
+        struct Invocation {
+            let requestID: String
             let request: ByteBuffer
             let responsePromise: EventLoopPromise<Response>
-
-            func toResponse() -> Response {
+
+            func makeResponse() -> Response {
                 var response = Response()
                 response.body = self.request
                 // required headers
                 response.headers = [
-                    (AmazonHeaders.requestID, self.requestId),
+                    (AmazonHeaders.requestID, self.requestID),
                     (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
                     (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
                     (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
@@ -252,6 +252,12 @@ private enum LocalLambda {
                 return response
             }
         }
+
+        enum InvocationState {
+            case waitingForInvocation(EventLoopPromise<Invocation>)
+            case waitingForLambdaRequest
+            case waitingForLambdaResponse(Invocation)
+        }
     }
 
     enum ServerError: Error {
diff --git a/Sources/AWSLambdaRuntime/LambdaRunner.swift b/Sources/AWSLambdaRuntime/LambdaRunner.swift
index 50190b7c..10e3bbe4 100644
--- a/Sources/AWSLambdaRuntime/LambdaRunner.swift
+++ b/Sources/AWSLambdaRuntime/LambdaRunner.swift
@@ -46,10 +46,7 @@ extension Lambda {
     func run(logger: Logger, handler: Handler) -> EventLoopFuture<Void> {
         logger.debug("lambda invocation sequence starting")
         // 1. request work from lambda runtime engine
-        return self.runtimeClient.requestWork(logger: logger).peekError { error -> Void in
-            if case RuntimeError.badStatusCode(.noContent) = error {
-                return
-            }
+        return self.runtimeClient.requestWork(logger: logger).peekError { error in
             logger.error("could not fetch work from lambda runtime engine: \(error)")
         }.flatMap { invocation, payload in
             // 2. 
send work to handler @@ -67,13 +64,7 @@ extension Lambda { self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in logger.error("could not report results to lambda runtime engine: \(error)") } - }.flatMapErrorThrowing { error in - if case RuntimeError.badStatusCode(.noContent) = error { - return () - } - throw error - } - .always { result in + }.always { result in // we are done! logger.log(level: result.successful ? .debug : .warning, "lambda invocation sequence completed \(result.successful ? "successfully" : "with failure")") } diff --git a/Sources/StringSample/main.swift b/Sources/StringSample/main.swift index 17bc6ade..fddfa478 100644 --- a/Sources/StringSample/main.swift +++ b/Sources/StringSample/main.swift @@ -26,9 +26,7 @@ struct Handler: EventLoopLambdaHandler { } } -try Lambda.withLocalServer { - Lambda.run(Handler()) -} +Lambda.run(Handler()) // MARK: - this can also be expressed as a closure:
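
With the patches above applied, the local server can be exercised end to end from a terminal. The sketch below is illustrative only: the EchoHandler name is made up, and it assumes the runtime engine's default address of 127.0.0.1:7000 (taken from Lambda.Configuration via the AWS_LAMBDA_RUNTIME_API environment variable) and the default /invoke payload endpoint described in the first commit.

    // main.swift of a sample executable target that depends on AWSLambdaRuntime
    import AWSLambdaRuntime
    import NIO

    struct EchoHandler: EventLoopLambdaHandler {
        typealias In = String
        typealias Out = String

        // echo the payload back to the caller
        func handle(context: Lambda.Context, payload: String) -> EventLoopFuture<String> {
            context.eventLoop.makeSucceededFuture("Hello, \(payload)!")
        }
    }

    // Lambda.withLocalServer is the DEBUG-only mock server added in these patches;
    // it starts the local HTTP server, then runs the lambda against it
    try Lambda.withLocalServer {
        Lambda.run(EchoHandler())
    }

Once the sample is running, a payload can be posted to the invocation endpoint, for example with curl -X POST -d "world" http://127.0.0.1:7000/invoke, and the handler's result is returned as the HTTP response body.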