From 01fa2433366dc01e51bc16e90bdacedd2be8099b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 16 Jul 2025 09:05:31 +0200 Subject: [PATCH 1/9] Add support for streaming custom HTTP headers and status --- Examples/Streaming/README.md | 52 +- Examples/Streaming/Sources/main.swift | 91 ++- .../LambdaResponseStreamWriter+Headers.swift | 100 +++ ...bdaResponseStreamWriter+HeadersTests.swift | 742 ++++++++++++++++++ readme.md | 46 ++ 5 files changed, 1024 insertions(+), 7 deletions(-) create mode 100644 Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift create mode 100644 Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift diff --git a/Examples/Streaming/README.md b/Examples/Streaming/README.md index 86a42754..03548a44 100644 --- a/Examples/Streaming/README.md +++ b/Examples/Streaming/README.md @@ -13,15 +13,59 @@ The sample code creates a `SendNumbersWithPause` struct that conforms to the `St The `handle(...)` method of this protocol receives incoming events as a Swift NIO `ByteBuffer` and returns the output as a `ByteBuffer`. -The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function. The code calls the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly written before -finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not -stream the response by calling `writeAndFinish(_:)`. +The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function. + +### Setting HTTP Status Code and Headers + +Before streaming the response body, you can set the HTTP status code and headers using the `writeStatusAndHeaders(_:)` method: + +```swift +try await responseWriter.writeStatusAndHeaders( + StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: [ + "Content-Type": "text/plain", + "x-my-custom-header": "streaming-example" + ], + multiValueHeaders: [ + "Set-Cookie": ["session=abc123", "theme=dark"] + ] + ) +) +``` + +The `StreamingLambdaStatusAndHeadersResponse` structure allows you to specify: +- **statusCode**: HTTP status code (e.g., 200, 404, 500) +- **headers**: Dictionary of single-value HTTP headers (optional) +- **multiValueHeaders**: Dictionary of multi-value HTTP headers like Set-Cookie (optional) + +### Streaming the Response Body + +After setting headers, you can stream the response body by calling the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly before finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not stream the response by calling `writeAndFinish(_:)`. + +```swift +// Stream data in chunks +for i in 1...10 { + try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n")) + try await Task.sleep(for: .milliseconds(1000)) +} + +// Close the response stream +try await responseWriter.finish() +``` An error is thrown if `finish()` is called multiple times or if it is called after having called `writeAndFinish(_:)`. +### Example Usage Patterns + +The example includes two handler implementations: + +1. **SendNumbersWithPause**: Demonstrates basic streaming with headers, sending numbers with delays +2. **ConditionalStreamingHandler**: Shows how to handle different response scenarios, including error responses with appropriate status codes + The `handle(...)` method is marked as `mutating` to allow handlers to be implemented with a `struct`. -Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane. +Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane. ## Build & Package diff --git a/Examples/Streaming/Sources/main.swift b/Examples/Streaming/Sources/main.swift index ce92560c..6391a37e 100644 --- a/Examples/Streaming/Sources/main.swift +++ b/Examples/Streaming/Sources/main.swift @@ -15,22 +15,107 @@ import AWSLambdaRuntime import NIOCore +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + struct SendNumbersWithPause: StreamingLambdaHandler { func handle( _ event: ByteBuffer, responseWriter: some LambdaResponseStreamWriter, context: LambdaContext ) async throws { + + // Send HTTP status code and headers before streaming the response body + try await responseWriter.writeStatusAndHeaders( + StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: [ + "Content-Type": "text/plain", + "x-my-custom-header": "streaming-example", + ], + multiValueHeaders: [ + "Set-Cookie": ["session=abc123", "theme=dark"] + ] + ) + ) + + // Stream numbers with pauses to demonstrate streaming functionality for i in 1...10 { // Send partial data - try await responseWriter.write(ByteBuffer(string: "\(i)\n")) - // Perform some long asynchronous work + try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n")) + // Perform some long asynchronous work to simulate processing try await Task.sleep(for: .milliseconds(1000)) } + + // Send final message + try await responseWriter.write(ByteBuffer(string: "Streaming complete!\n")) + // All data has been sent. Close off the response stream. try await responseWriter.finish() } } -let runtime = LambdaRuntime.init(handler: SendNumbersWithPause()) +// Example of a more complex streaming handler that demonstrates different response scenarios +struct ConditionalStreamingHandler: StreamingLambdaHandler { + func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: LambdaContext + ) async throws { + + // Parse the event to determine response type + let eventString = String(buffer: event) + let shouldError = eventString.contains("error") + + if shouldError { + // Send error response with appropriate status code + try await responseWriter.writeStatusAndHeaders( + StreamingLambdaStatusAndHeadersResponse( + statusCode: 400, + headers: [ + "Content-Type": "application/json", + "x-error-type": "client-error", + ] + ) + ) + + try await responseWriter.writeAndFinish( + ByteBuffer(string: #"{"error": "Bad request", "message": "Error requested in input"}"#) + ) + } else { + // Send successful response with streaming data + try await responseWriter.writeStatusAndHeaders( + StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: [ + "Content-Type": "application/json", + "Cache-Control": "no-cache", + ] + ) + ) + + // Stream JSON array elements + try await responseWriter.write(ByteBuffer(string: "[")) + + for i in 1...5 { + if i > 1 { + try await responseWriter.write(ByteBuffer(string: ",")) + } + try await responseWriter.write( + ByteBuffer(string: #"{"id": \#(i), "timestamp": "\#(Date().timeIntervalSince1970)"}"#) + ) + try await Task.sleep(for: .milliseconds(500)) + } + + try await responseWriter.write(ByteBuffer(string: "]")) + try await responseWriter.finish() + } + } +} + +// Use the simple example by default +let runtime = LambdaRuntime(handler: SendNumbersWithPause()) try await runtime.run() diff --git a/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift b/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift new file mode 100644 index 00000000..a2609c2f --- /dev/null +++ b/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift @@ -0,0 +1,100 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + +/// A response structure specifically designed for streaming Lambda responses that contains +/// HTTP status code and headers without body content. +/// +/// This structure is used with `LambdaResponseStreamWriter.writeStatusAndHeaders(_:)` to send +/// HTTP response metadata before streaming the response body. +public struct StreamingLambdaStatusAndHeadersResponse: Codable, Sendable { + /// The HTTP status code for the response (e.g., 200, 404, 500) + public let statusCode: Int + + /// Dictionary of single-value HTTP headers + public let headers: [String: String]? + + /// Dictionary of multi-value HTTP headers (e.g., Set-Cookie headers) + public let multiValueHeaders: [String: [String]]? + + /// Creates a new streaming Lambda response with status code and optional headers + /// + /// - Parameters: + /// - statusCode: The HTTP status code for the response + /// - headers: Optional dictionary of single-value HTTP headers + /// - multiValueHeaders: Optional dictionary of multi-value HTTP headers + public init( + statusCode: Int, + headers: [String: String]? = nil, + multiValueHeaders: [String: [String]]? = nil + ) { + self.statusCode = statusCode + self.headers = headers + self.multiValueHeaders = multiValueHeaders + } +} + +extension LambdaResponseStreamWriter { + /// Writes the HTTP status code and headers to the response stream. + /// + /// This method serializes the status and headers as JSON and writes them to the stream, + /// followed by eight null bytes as a separator before the response body. + /// + /// - Parameters: + /// - response: The status and headers response to write + /// - encoder: The encoder to use for serializing the response, + /// - Throws: An error if JSON serialization or writing fails + public func writeStatusAndHeaders( + _ response: StreamingLambdaStatusAndHeadersResponse, + encoder: Encoder + ) async throws where Encoder.Output == StreamingLambdaStatusAndHeadersResponse { + + // Convert Data to ByteBuffer + var buffer = ByteBuffer() + try encoder.encode(response, into: &buffer) + + // Write the JSON data + try await write(buffer) + + // Write eight null bytes as separator + var separatorBuffer = ByteBuffer() + separatorBuffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + try await write(separatorBuffer) + } +} + +extension LambdaResponseStreamWriter { + /// Writes the HTTP status code and headers to the response stream. + /// + /// This method serializes the status and headers as JSON and writes them to the stream, + /// followed by eight null bytes as a separator before the response body. + /// + /// - Parameters: + /// - response: The status and headers response to write + /// - encoder: The encoder to use for serializing the response, use JSONEncoder by default + /// - Throws: An error if JSON serialization or writing fails + public func writeStatusAndHeaders( + _ response: StreamingLambdaStatusAndHeadersResponse, + encoder: JSONEncoder = JSONEncoder() + ) async throws { + try await self.writeStatusAndHeaders(response, encoder: LambdaJSONOutputEncoder(encoder)) + } +} diff --git a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift new file mode 100644 index 00000000..3cb6095f --- /dev/null +++ b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift @@ -0,0 +1,742 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import AWSLambdaRuntime +import Logging +import NIOCore +import Testing + +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + +@Suite("LambdaResponseStreamWriter+Headers Tests") +struct LambdaResponseStreamWriterHeadersTests { + + @Test("Write status and headers with minimal response (status code only)") + func testWriteStatusAndHeadersMinimal() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + try await writer.writeStatusAndHeaders(response) + + // Verify we have exactly 2 buffers written (JSON + separator) + #expect(writer.writtenBuffers.count == 2) + + // Verify JSON content + let jsonBuffer = writer.writtenBuffers[0] + let jsonString = String(buffer: jsonBuffer) + let expectedJSON = #"{"statusCode":200}"# + #expect(jsonString == expectedJSON) + + // Verify separator (8 null bytes) + let separatorBuffer = writer.writtenBuffers[1] + let separatorBytes = separatorBuffer.getBytes(at: 0, length: separatorBuffer.readableBytes) + let expectedSeparator: [UInt8] = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + #expect(separatorBytes == expectedSeparator) + } + + @Test("Write status and headers with full response (all fields populated)") + func testWriteStatusAndHeadersFull() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse( + statusCode: 201, + headers: [ + "Content-Type": "application/json", + "Cache-Control": "no-cache", + ], + multiValueHeaders: [ + "Set-Cookie": ["session=abc123", "theme=dark"], + "X-Custom": ["value1", "value2"], + ] + ) + + try await writer.writeStatusAndHeaders(response) + + // Verify we have exactly 2 buffers written (JSON + separator) + #expect(writer.writtenBuffers.count == 2) + + // Verify JSON content structure + let jsonBuffer = writer.writtenBuffers[0] + let jsonString = String(buffer: jsonBuffer) + + // Parse JSON to verify structure + let jsonData = Data(jsonString.utf8) + let decoder = JSONDecoder() + let parsedResponse = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) + + #expect(parsedResponse.statusCode == 201) + + #expect(parsedResponse.headers?["Content-Type"] == "application/json") + #expect(parsedResponse.headers?["Cache-Control"] == "no-cache") + + #expect(parsedResponse.multiValueHeaders?["Set-Cookie"] == ["session=abc123", "theme=dark"]) + #expect(parsedResponse.multiValueHeaders?["X-Custom"] == ["value1", "value2"]) + + // Verify separator (8 null bytes) + let separatorBuffer = writer.writtenBuffers[1] + let separatorBytes = separatorBuffer.getBytes(at: 0, length: separatorBuffer.readableBytes) + let expectedSeparator: [UInt8] = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + #expect(separatorBytes == expectedSeparator) + } + + @Test("Write status and headers with custom encoder") + func testWriteStatusAndHeadersWithCustomEncoder() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse( + statusCode: 404, + headers: ["Error": "Not Found"] + ) + + // Use custom encoder with different formatting + let customEncoder = JSONEncoder() + customEncoder.outputFormatting = .sortedKeys + + try await writer.writeStatusAndHeaders(response, encoder: customEncoder) + + // Verify we have exactly 2 buffers written (JSON + separator) + #expect(writer.writtenBuffers.count == 2) + + // Verify JSON content with sorted keys + let jsonBuffer = writer.writtenBuffers[0] + let jsonString = String(buffer: jsonBuffer) + + // With sorted keys, "headers" should come before "statusCode" + #expect(jsonString.contains(#""headers":{"Error":"Not Found"}"#)) + #expect(jsonString.contains(#""statusCode":404"#)) + + // Verify separator + let separatorBuffer = writer.writtenBuffers[1] + #expect(separatorBuffer.readableBytes == 8) + } + + @Test("Write status and headers with only headers (no multiValueHeaders)") + func testWriteStatusAndHeadersOnlyHeaders() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse( + statusCode: 302, + headers: ["Location": "https://example.com"] + ) + + try await writer.writeStatusAndHeaders(response) + + // Verify JSON structure + let jsonBuffer = writer.writtenBuffers[0] + let jsonString = String(buffer: jsonBuffer) + let jsonData = Data(jsonString.utf8) + let decoder = JSONDecoder() + let parsedResponse = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) + + #expect(parsedResponse.statusCode == 302) + + #expect(parsedResponse.headers?["Location"] == "https://example.com") + + // multiValueHeaders should be null/nil in JSON + #expect(parsedResponse.multiValueHeaders == nil) + } + + @Test("Write status and headers with only multiValueHeaders (no headers)") + func testWriteStatusAndHeadersOnlyMultiValueHeaders() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + multiValueHeaders: [ + "Accept": ["application/json", "text/html"] + ] + ) + + try await writer.writeStatusAndHeaders(response) + + // Verify JSON structure + let jsonBuffer = writer.writtenBuffers[0] + let jsonString = String(buffer: jsonBuffer) + let jsonData = Data(jsonString.utf8) + let decoder = JSONDecoder() + let parsedResponse = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) + + #expect(parsedResponse.statusCode == 200) + + // headers should be null/nil in JSON + #expect(parsedResponse.headers == nil) + + #expect(parsedResponse.multiValueHeaders?["Accept"] == ["application/json", "text/html"]) + } + + @Test("Verify JSON serialization format matches expected structure") + func testJSONSerializationFormat() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse( + statusCode: 418, + headers: ["X-Tea": "Earl Grey"], + multiValueHeaders: ["X-Brew": ["hot", "strong"]] + ) + + try await writer.writeStatusAndHeaders(response) + + let jsonBuffer = writer.writtenBuffers[0] + let jsonString = String(buffer: jsonBuffer) + + // Verify it's valid JSON by decoding + let jsonData = Data(jsonString.utf8) + let decoder = JSONDecoder() + #expect(throws: Never.self) { + _ = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) + } + + // Verify specific structure + let parsedResponse = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) + + // Must have statusCode + #expect(parsedResponse.statusCode == 418) + + // Must have headers when provided + #expect(parsedResponse.headers?["X-Tea"] == "Earl Grey") + + // Must have multiValueHeaders when provided + #expect(parsedResponse.multiValueHeaders?["X-Brew"] == ["hot", "strong"]) + } + + @Test("Verify null byte separator is exactly 8 bytes") + func testNullByteSeparatorLength() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + try await writer.writeStatusAndHeaders(response) + + #expect(writer.writtenBuffers.count == 2) + + let separatorBuffer = writer.writtenBuffers[1] + #expect(separatorBuffer.readableBytes == 8) + + // Verify all bytes are 0x00 + let separatorBytes = separatorBuffer.getBytes(at: 0, length: 8)! + for byte in separatorBytes { + #expect(byte == 0x00) + } + } + + // MARK: - Error Handling Tests + + @Test("JSON serialization error propagation") + func testJSONSerializationErrorPropagation() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + // Create a failing encoder that always throws an error + let failingEncoder = FailingEncoder() + + // Verify that the encoder error is propagated + await #expect(throws: TestEncodingError.self) { + try await writer.writeStatusAndHeaders(response, encoder: failingEncoder) + } + + // Verify no data was written when encoding fails + #expect(writer.writtenBuffers.isEmpty) + } + + @Test("Write method error propagation for JSON data") + func testWriteMethodErrorPropagationForJSON() async throws { + let writer = FailingMockLambdaResponseStreamWriter(failOnWriteCall: 1) // Fail on first write (JSON) + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + // Verify that the write error is propagated + await #expect(throws: TestWriteError.self) { + try await writer.writeStatusAndHeaders(response) + } + + // Verify the writer attempted to write once (the JSON data) + #expect(writer.writeCallCount == 1) + } + + @Test("Write method error propagation for separator") + func testWriteMethodErrorPropagationForSeparator() async throws { + let writer = FailingMockLambdaResponseStreamWriter(failOnWriteCall: 2) // Fail on second write (separator) + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + // Verify that the write error is propagated + await #expect(throws: TestWriteError.self) { + try await writer.writeStatusAndHeaders(response) + } + + // Verify the writer attempted to write twice (JSON succeeded, separator failed) + #expect(writer.writeCallCount == 2) + // Verify JSON was written successfully before separator failure + #expect(writer.writtenBuffers.count == 1) + } + + @Test("Error types and messages are properly handled") + func testErrorTypesAndMessages() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + // Test with a custom encoder that throws a specific error + let customFailingEncoder = CustomFailingEncoder() + + do { + try await writer.writeStatusAndHeaders(response, encoder: customFailingEncoder) + #expect(Bool(false), "Expected error to be thrown") + } catch let error as CustomEncodingError { + // Verify the specific error type and message are preserved + #expect(error.message == "Custom encoding failed") + #expect(error.code == 42) + } catch { + #expect(Bool(false), "Expected CustomEncodingError but got \(type(of: error))") + } + } + + @Test("JSONEncoder error propagation with invalid data") + func testJSONEncoderErrorPropagation() async throws { + let writer = MockLambdaResponseStreamWriter() + + // Create a response that should encode successfully + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + // Note: It's difficult to make JSONEncoder fail with valid Codable types, + // so we'll use our custom failing encoder to simulate this scenario + let failingJSONEncoder = FailingJSONEncoder() + + await #expect(throws: TestJSONEncodingError.self) { + try await writer.writeStatusAndHeaders(response, encoder: failingJSONEncoder) + } + + // Verify no data was written when encoding fails + #expect(writer.writtenBuffers.isEmpty) + } + + // MARK: - Integration Tests + + @Test("Integration: writeStatusAndHeaders with existing streaming methods") + func testIntegrationWithExistingStreamingMethods() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: ["Content-Type": "text/plain"] + ) + + // Write headers first + try await writer.writeStatusAndHeaders(response) + + // Then use existing streaming methods + let bodyData = "Hello, World!" + var bodyBuffer = ByteBuffer() + bodyBuffer.writeString(bodyData) + + try await writer.write(bodyBuffer) + + let moreData = " Additional content." + var moreBuffer = ByteBuffer() + moreBuffer.writeString(moreData) + + try await writer.writeAndFinish(moreBuffer) + + // Verify the sequence: JSON + separator + body + more body + #expect(writer.writtenBuffers.count == 4) + #expect(writer.isFinished == true) + + // Verify JSON content + let jsonBuffer = writer.writtenBuffers[0] + let jsonString = String(buffer: jsonBuffer) + #expect(jsonString.contains(#""statusCode":200"#)) + #expect(jsonString.contains(#""Content-Type":"text\/plain""#)) + + // Verify separator + let separatorBuffer = writer.writtenBuffers[1] + #expect(separatorBuffer.readableBytes == 8) + + // Verify body content + let firstBodyBuffer = writer.writtenBuffers[2] + let firstBodyString = String(buffer: firstBodyBuffer) + #expect(firstBodyString == "Hello, World!") + + let secondBodyBuffer = writer.writtenBuffers[3] + let secondBodyString = String(buffer: secondBodyBuffer) + #expect(secondBodyString == " Additional content.") + } + + @Test("Integration: multiple header writes work correctly") + func testMultipleHeaderWrites() async throws { + let writer = MockLambdaResponseStreamWriter() + + // First header write + let firstResponse = StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: ["Content-Type": "application/json"] + ) + try await writer.writeStatusAndHeaders(firstResponse) + + // Second header write (should work - multiple headers are allowed) + let secondResponse = StreamingLambdaStatusAndHeadersResponse( + statusCode: 201, + headers: ["Location": "https://example.com/resource/123"] + ) + try await writer.writeStatusAndHeaders(secondResponse) + + // Verify both header writes were successful + #expect(writer.writtenBuffers.count == 4) // 2 JSON + 2 separators + + // Verify first header write + let firstJsonBuffer = writer.writtenBuffers[0] + let firstJsonString = String(buffer: firstJsonBuffer) + #expect(firstJsonString.contains(#""statusCode":200"#)) + #expect(firstJsonString.contains(#""Content-Type":"application\/json""#)) + + // Verify first separator + let firstSeparatorBuffer = writer.writtenBuffers[1] + #expect(firstSeparatorBuffer.readableBytes == 8) + + // Verify second header write + let secondJsonBuffer = writer.writtenBuffers[2] + let secondJsonString = String(buffer: secondJsonBuffer) + #expect(secondJsonString.contains(#""statusCode":201"#)) + #expect(secondJsonString.contains(#""Location":"https:\/\/example.com\/resource\/123""#)) + + // Verify second separator + let secondSeparatorBuffer = writer.writtenBuffers[3] + #expect(secondSeparatorBuffer.readableBytes == 8) + } + + @Test("Integration: header write followed by body streaming compatibility") + func testHeaderWriteFollowedByBodyStreaming() async throws { + let writer = MockLambdaResponseStreamWriter() + + // Write headers first + let response = StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: ["Content-Type": "application/json"], + multiValueHeaders: ["Set-Cookie": ["session=abc123", "theme=dark"]] + ) + try await writer.writeStatusAndHeaders(response) + + // Stream body content in multiple chunks + let chunks = [ + #"{"users": ["#, + #"{"id": 1, "name": "Alice"}, "#, + #"{"id": 2, "name": "Bob"}"#, + #"]}"#, + ] + + for (index, chunk) in chunks.enumerated() { + var buffer = ByteBuffer() + buffer.writeString(chunk) + + if index == chunks.count - 1 { + // Use writeAndFinish for the last chunk + try await writer.writeAndFinish(buffer) + } else { + try await writer.write(buffer) + } + } + + // Verify the complete sequence + #expect(writer.writtenBuffers.count == 6) // JSON + separator + 4 body chunks + #expect(writer.isFinished == true) + + // Verify headers were written correctly + let jsonBuffer = writer.writtenBuffers[0] + let jsonString = String(buffer: jsonBuffer) + #expect(jsonString.contains(#""statusCode":200"#)) + #expect(jsonString.contains(#""Content-Type":"application\/json""#)) + #expect(jsonString.contains(#""Set-Cookie":["session=abc123","theme=dark"]"#)) + + // Verify separator + let separatorBuffer = writer.writtenBuffers[1] + #expect(separatorBuffer.readableBytes == 8) + + // Verify body chunks + let bodyChunks = writer.writtenBuffers[2...5].map { String(buffer: $0) } + let completeBody = bodyChunks.joined() + let expectedBody = #"{"users": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}"# + #expect(completeBody == expectedBody) + } + + @Test("Integration: verify method works with different LambdaResponseStreamWriter implementations") + func testWithDifferentWriterImplementations() async throws { + // Test with basic mock implementation + let basicWriter = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + try await basicWriter.writeStatusAndHeaders(response) + #expect(basicWriter.writtenBuffers.count == 2) + + // Test with a writer that tracks additional state + let trackingWriter = TrackingLambdaResponseStreamWriter() + try await trackingWriter.writeStatusAndHeaders(response) + #expect(trackingWriter.writtenBuffers.count == 2) + #expect(trackingWriter.writeCallCount == 2) // JSON + separator + #expect(trackingWriter.finishCallCount == 0) + + // Test with a writer that has custom behavior + let customWriter = CustomBehaviorLambdaResponseStreamWriter() + try await customWriter.writeStatusAndHeaders(response) + #expect(customWriter.writtenBuffers.count == 2) + #expect(customWriter.customBehaviorTriggered == true) + } + + @Test("Integration: complex scenario with headers, streaming, and finish") + func testComplexIntegrationScenario() async throws { + let writer = MockLambdaResponseStreamWriter() + + // Step 1: Write initial headers + let initialResponse = StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: ["Content-Type": "text/event-stream", "Cache-Control": "no-cache"] + ) + try await writer.writeStatusAndHeaders(initialResponse) + + // Step 2: Write additional headers (simulating server-sent events setup) + let sseResponse = StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: ["Connection": "keep-alive"] + ) + try await writer.writeStatusAndHeaders(sseResponse) + + // Step 3: Stream event data + let events = [ + "data: Event 1\n\n", + "data: Event 2\n\n", + "data: Event 3\n\n", + ] + + for event in events { + var buffer = ByteBuffer() + buffer.writeString(event) + try await writer.write(buffer) + } + + // Step 4: Send final event and finish + var finalBuffer = ByteBuffer() + finalBuffer.writeString("data: Final event\n\n") + try await writer.writeAndFinish(finalBuffer) + + // Verify the complete sequence + // 2 header writes (JSON + separator each) + 3 events + 1 final event = 8 buffers + #expect(writer.writtenBuffers.count == 8) + #expect(writer.isFinished == true) + + // Verify headers + let firstJsonString = String(buffer: writer.writtenBuffers[0]) + #expect(firstJsonString.contains(#""Content-Type":"text\/event-stream""#)) + + let secondJsonString = String(buffer: writer.writtenBuffers[2]) + #expect(secondJsonString.contains(#""Connection":"keep-alive""#)) + + // Verify events + let eventBuffers = [ + writer.writtenBuffers[4], writer.writtenBuffers[5], writer.writtenBuffers[6], writer.writtenBuffers[7], + ] + let eventStrings = eventBuffers.map { String(buffer: $0) } + #expect(eventStrings[0] == "data: Event 1\n\n") + #expect(eventStrings[1] == "data: Event 2\n\n") + #expect(eventStrings[2] == "data: Event 3\n\n") + #expect(eventStrings[3] == "data: Final event\n\n") + } + + @Test("Integration: verify compatibility with protocol requirements") + func testProtocolCompatibility() async throws { + let writer = MockLambdaResponseStreamWriter() + let response = StreamingLambdaStatusAndHeadersResponse(statusCode: 200) + + // Verify the method can be called on any LambdaResponseStreamWriter + func testWithGenericWriter(_ writer: W) async throws { + try await writer.writeStatusAndHeaders(response) + } + + // This should compile and work without issues + try await testWithGenericWriter(writer) + #expect(writer.writtenBuffers.count == 2) + + // Verify it works with protocol existential + let protocolWriter: any LambdaResponseStreamWriter = MockLambdaResponseStreamWriter() + try await protocolWriter.writeStatusAndHeaders(response) + + if let mockWriter = protocolWriter as? MockLambdaResponseStreamWriter { + #expect(mockWriter.writtenBuffers.count == 2) + } + } +} + +// MARK: - Mock Implementation + +/// Mock implementation of LambdaResponseStreamWriter for testing +final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { + private(set) var writtenBuffers: [ByteBuffer] = [] + private(set) var isFinished = false + + func write(_ buffer: ByteBuffer) async throws { + writtenBuffers.append(buffer) + } + + func finish() async throws { + isFinished = true + } + + func writeAndFinish(_ buffer: ByteBuffer) async throws { + writtenBuffers.append(buffer) + isFinished = true + } +} + +// MARK: - Error Handling Mock Implementations + +/// Mock implementation that fails on specific write calls for testing error propagation +final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { + private(set) var writtenBuffers: [ByteBuffer] = [] + private(set) var writeCallCount = 0 + private(set) var isFinished = false + private let failOnWriteCall: Int + + init(failOnWriteCall: Int) { + self.failOnWriteCall = failOnWriteCall + } + + func write(_ buffer: ByteBuffer) async throws { + writeCallCount += 1 + + if writeCallCount == failOnWriteCall { + throw TestWriteError() + } + + writtenBuffers.append(buffer) + } + + func finish() async throws { + isFinished = true + } + + func writeAndFinish(_ buffer: ByteBuffer) async throws { + try await write(buffer) + try await finish() + } +} + +// MARK: - Test Error Types + +/// Test error for write method failures +struct TestWriteError: Error, Equatable { + let message: String + + init(message: String = "Test write error") { + self.message = message + } +} + +/// Test error for encoding failures +struct TestEncodingError: Error, Equatable { + let message: String + + init(message: String = "Test encoding error") { + self.message = message + } +} + +/// Custom test error with additional properties +struct CustomEncodingError: Error, Equatable { + let message: String + let code: Int + + init(message: String = "Custom encoding failed", code: Int = 42) { + self.message = message + self.code = code + } +} + +/// Test error for JSON encoding failures +struct TestJSONEncodingError: Error, Equatable { + let message: String + + init(message: String = "Test JSON encoding error") { + self.message = message + } +} + +// MARK: - Failing Encoder Implementations + +/// Mock encoder that always fails for testing error propagation +struct FailingEncoder: LambdaOutputEncoder { + typealias Output = StreamingLambdaStatusAndHeadersResponse + + func encode(_ value: StreamingLambdaStatusAndHeadersResponse, into buffer: inout ByteBuffer) throws { + throw TestEncodingError() + } +} + +/// Mock encoder that throws custom errors for testing specific error handling +struct CustomFailingEncoder: LambdaOutputEncoder { + typealias Output = StreamingLambdaStatusAndHeadersResponse + + func encode(_ value: StreamingLambdaStatusAndHeadersResponse, into buffer: inout ByteBuffer) throws { + throw CustomEncodingError() + } +} + +/// Mock JSON encoder that always fails for testing JSON-specific error propagation +struct FailingJSONEncoder: LambdaOutputEncoder { + typealias Output = StreamingLambdaStatusAndHeadersResponse + + func encode(_ value: StreamingLambdaStatusAndHeadersResponse, into buffer: inout ByteBuffer) throws { + throw TestJSONEncodingError() + } +} + +// MARK: - Additional Mock Implementations for Integration Tests + +/// Mock implementation that tracks additional state for integration testing +final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { + private(set) var writtenBuffers: [ByteBuffer] = [] + private(set) var writeCallCount = 0 + private(set) var finishCallCount = 0 + private(set) var writeAndFinishCallCount = 0 + private(set) var isFinished = false + + func write(_ buffer: ByteBuffer) async throws { + writeCallCount += 1 + writtenBuffers.append(buffer) + } + + func finish() async throws { + finishCallCount += 1 + isFinished = true + } + + func writeAndFinish(_ buffer: ByteBuffer) async throws { + writeAndFinishCallCount += 1 + writtenBuffers.append(buffer) + isFinished = true + } +} + +/// Mock implementation with custom behavior for integration testing +final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter { + private(set) var writtenBuffers: [ByteBuffer] = [] + private(set) var customBehaviorTriggered = false + private(set) var isFinished = false + + func write(_ buffer: ByteBuffer) async throws { + // Trigger custom behavior on any write + customBehaviorTriggered = true + writtenBuffers.append(buffer) + } + + func finish() async throws { + isFinished = true + } + + func writeAndFinish(_ buffer: ByteBuffer) async throws { + customBehaviorTriggered = true + writtenBuffers.append(buffer) + isFinished = true + } +} diff --git a/readme.md b/readme.md index 37596ed2..dbbad531 100644 --- a/readme.md +++ b/readme.md @@ -225,6 +225,8 @@ Streaming responses incurs a cost. For more information, see [AWS Lambda Pricing You can stream responses through [Lambda function URLs](https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html), the AWS SDK, or using the Lambda [InvokeWithResponseStream](https://docs.aws.amazon.com/lambda/latest/dg/API_InvokeWithResponseStream.html) API. In this example, we create an authenticated Lambda function URL. +#### Simple Streaming Example + Here is an example of a minimal function that streams 10 numbers with an interval of one second for each number. ```swift @@ -252,6 +254,50 @@ let runtime = LambdaRuntime.init(handler: SendNumbersWithPause()) try await runtime.run() ``` +#### Streaming with HTTP Headers and Status Code + +When streaming responses, you can also set HTTP status codes and headers before sending the response body. This is particularly useful when your Lambda function is invoked through API Gateway or Lambda function URLs, allowing you to control the HTTP response metadata. + +```swift +import AWSLambdaRuntime +import NIOCore + +struct StreamingWithHeaders: StreamingLambdaHandler { + func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: LambdaContext + ) async throws { + // Set HTTP status code and headers before streaming the body + let response = StreamingLambdaStatusAndHeadersResponse( + statusCode: 200, + headers: [ + "Content-Type": "text/plain", + "Cache-Control": "no-cache" + ] + ) + try await responseWriter.writeStatusAndHeaders(response) + + // Now stream the response body + for i in 1...5 { + try await responseWriter.write(ByteBuffer(string: "Chunk \(i)\n")) + try await Task.sleep(for: .milliseconds(500)) + } + + try await responseWriter.finish() + } +} + +let runtime = LambdaRuntime.init(handler: StreamingWithHeaders()) +try await runtime.run() +``` + +The `writeStatusAndHeaders` method allows you to: +- Set HTTP status codes (200, 404, 500, etc.) +- Add custom HTTP headers for content type, caching, CORS, etc. +- Control response metadata before streaming begins +- Maintain compatibility with API Gateway and Lambda function URLs + You can learn how to deploy and invoke this function in [the streaming example README file](Examples/Streaming/README.md). ### Integration with AWS Services From b6fb60cd3fc1ceb444196ac62870b79d84817bd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 16 Jul 2025 09:05:59 +0200 Subject: [PATCH 2/9] add kiro to gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index b3b30ec1..ff8b77b9 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ Package.resolved .vscode Makefile .devcontainer -.amazonq \ No newline at end of file +.amazonq +.kiro \ No newline at end of file From 83aa01825644c517c89cf79c6d488a0642d7817a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sat, 19 Jul 2025 23:14:12 +0200 Subject: [PATCH 3/9] add support for streaming lambda function + custom status code and HTTP headers --- .gitignore | 3 +- Examples/Streaming/Sources/main.swift | 67 +---------- Examples/Streaming/template.yaml | 3 + Sources/AWSLambdaRuntime/LambdaHandlers.swift | 9 ++ .../LambdaResponseStreamWriter+Headers.swift | 15 ++- .../LambdaRuntimeClient.swift | 73 +++++++++--- .../LambdaRuntimeClientProtocol.swift | 1 + .../Lambda+CodableTests.swift | 6 + ...bdaResponseStreamWriter+HeadersTests.swift | 24 ++++ .../LambdaRuntimeClientTests.swift | 56 +++++++++ .../MockLambdaClient.swift | 6 + .../MockLambdaServer.swift | 17 ++- scripts/extract_aws_credentials.sh | 109 ++++++++++++++++++ 13 files changed, 299 insertions(+), 90 deletions(-) create mode 100755 scripts/extract_aws_credentials.sh diff --git a/.gitignore b/.gitignore index ff8b77b9..3e1d4c2e 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,5 @@ Package.resolved Makefile .devcontainer .amazonq -.kiro \ No newline at end of file +.kiro +nodejs \ No newline at end of file diff --git a/Examples/Streaming/Sources/main.swift b/Examples/Streaming/Sources/main.swift index 6391a37e..d8831976 100644 --- a/Examples/Streaming/Sources/main.swift +++ b/Examples/Streaming/Sources/main.swift @@ -31,21 +31,19 @@ struct SendNumbersWithPause: StreamingLambdaHandler { // Send HTTP status code and headers before streaming the response body try await responseWriter.writeStatusAndHeaders( StreamingLambdaStatusAndHeadersResponse( - statusCode: 200, + statusCode: 418, // I'm a tea pot headers: [ "Content-Type": "text/plain", "x-my-custom-header": "streaming-example", - ], - multiValueHeaders: [ - "Set-Cookie": ["session=abc123", "theme=dark"] ] ) ) // Stream numbers with pauses to demonstrate streaming functionality - for i in 1...10 { + for i in 1...3 { // Send partial data try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n")) + // Perform some long asynchronous work to simulate processing try await Task.sleep(for: .milliseconds(1000)) } @@ -58,64 +56,5 @@ struct SendNumbersWithPause: StreamingLambdaHandler { } } -// Example of a more complex streaming handler that demonstrates different response scenarios -struct ConditionalStreamingHandler: StreamingLambdaHandler { - func handle( - _ event: ByteBuffer, - responseWriter: some LambdaResponseStreamWriter, - context: LambdaContext - ) async throws { - - // Parse the event to determine response type - let eventString = String(buffer: event) - let shouldError = eventString.contains("error") - - if shouldError { - // Send error response with appropriate status code - try await responseWriter.writeStatusAndHeaders( - StreamingLambdaStatusAndHeadersResponse( - statusCode: 400, - headers: [ - "Content-Type": "application/json", - "x-error-type": "client-error", - ] - ) - ) - - try await responseWriter.writeAndFinish( - ByteBuffer(string: #"{"error": "Bad request", "message": "Error requested in input"}"#) - ) - } else { - // Send successful response with streaming data - try await responseWriter.writeStatusAndHeaders( - StreamingLambdaStatusAndHeadersResponse( - statusCode: 200, - headers: [ - "Content-Type": "application/json", - "Cache-Control": "no-cache", - ] - ) - ) - - // Stream JSON array elements - try await responseWriter.write(ByteBuffer(string: "[")) - - for i in 1...5 { - if i > 1 { - try await responseWriter.write(ByteBuffer(string: ",")) - } - try await responseWriter.write( - ByteBuffer(string: #"{"id": \#(i), "timestamp": "\#(Date().timeIntervalSince1970)"}"#) - ) - try await Task.sleep(for: .milliseconds(500)) - } - - try await responseWriter.write(ByteBuffer(string: "]")) - try await responseWriter.finish() - } - } -} - -// Use the simple example by default let runtime = LambdaRuntime(handler: SendNumbersWithPause()) try await runtime.run() diff --git a/Examples/Streaming/template.yaml b/Examples/Streaming/template.yaml index 2cc72839..c770d329 100644 --- a/Examples/Streaming/template.yaml +++ b/Examples/Streaming/template.yaml @@ -17,6 +17,9 @@ Resources: FunctionUrlConfig: AuthType: AWS_IAM InvokeMode: RESPONSE_STREAM + Environment: + Variables: + LOG_LEVEL: trace Outputs: # print Lambda function URL diff --git a/Sources/AWSLambdaRuntime/LambdaHandlers.swift b/Sources/AWSLambdaRuntime/LambdaHandlers.swift index cc23fa4a..76050af9 100644 --- a/Sources/AWSLambdaRuntime/LambdaHandlers.swift +++ b/Sources/AWSLambdaRuntime/LambdaHandlers.swift @@ -57,6 +57,15 @@ public protocol LambdaResponseStreamWriter { /// Write a response part into the stream and then end the stream as well as the underlying HTTP response. /// - Parameter buffer: The buffer to write. func writeAndFinish(_ buffer: ByteBuffer) async throws + + /// Write a response part into the stream. + // In the context of streaming Lambda, this is used to allow the user + // to send custom headers or statusCode. + /// - Note: user should use the writeStatusAndHeaders(:StreamingLambdaStatusAndHeadersResponse) + // function to write the status code and headers + /// - Parameter buffer: The buffer corresponding to the status code and headers to write. + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws + } /// This handler protocol is intended to serve the most common use-cases. diff --git a/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift b/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift index a2609c2f..562cc8b0 100644 --- a/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift +++ b/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift @@ -44,7 +44,7 @@ public struct StreamingLambdaStatusAndHeadersResponse: Codable, Sendable { public init( statusCode: Int, headers: [String: String]? = nil, - multiValueHeaders: [String: [String]]? = nil + multiValueHeaders: [String: [String]]? = nil, ) { self.statusCode = statusCode self.headers = headers @@ -67,17 +67,15 @@ extension LambdaResponseStreamWriter { encoder: Encoder ) async throws where Encoder.Output == StreamingLambdaStatusAndHeadersResponse { - // Convert Data to ByteBuffer + // Convert JSON headers to an array of bytes in a ByteBuffer var buffer = ByteBuffer() try encoder.encode(response, into: &buffer) - // Write the JSON data - try await write(buffer) - // Write eight null bytes as separator - var separatorBuffer = ByteBuffer() - separatorBuffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) - try await write(separatorBuffer) + buffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + + // Write the JSON data and the separator + try await writeCustomHeader(buffer) } } @@ -95,6 +93,7 @@ extension LambdaResponseStreamWriter { _ response: StreamingLambdaStatusAndHeadersResponse, encoder: JSONEncoder = JSONEncoder() ) async throws { + encoder.outputFormatting = .withoutEscapingSlashes try await self.writeStatusAndHeaders(response, encoder: LambdaJSONOutputEncoder(encoder)) } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 657127d5..b117d94e 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -16,9 +16,13 @@ import Logging import NIOCore import NIOHTTP1 import NIOPosix +import Synchronization @usableFromInline final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { + @usableFromInline + var _hasStreamingCustomHeaders = false + @usableFromInline nonisolated let unownedExecutor: UnownedSerialExecutor @@ -42,6 +46,11 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { self.runtimeClient = runtimeClient } + @usableFromInline + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { + try await self.runtimeClient.writeCustomHeader(buffer) + } + @usableFromInline func write(_ buffer: NIOCore.ByteBuffer) async throws { try await self.runtimeClient.write(buffer) @@ -188,6 +197,10 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } } + private func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { + _hasStreamingCustomHeaders = true + try await self.write(buffer) + } private func write(_ buffer: NIOCore.ByteBuffer) async throws { switch self.lambdaState { case .idle, .sentResponse: @@ -210,6 +223,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } private func writeAndFinish(_ buffer: NIOCore.ByteBuffer?) async throws { + _hasStreamingCustomHeaders = false switch self.lambdaState { case .idle, .sentResponse: throw LambdaRuntimeError(code: .finishAfterFinishHasBeenSent) @@ -330,7 +344,11 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) ) try channel.pipeline.syncOperations.addHandler( - LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration) + LambdaChannelHandler( + delegate: self, + logger: self.logger, + configuration: self.configuration + ) ) return channel.eventLoop.makeSucceededFuture(()) } catch { @@ -425,13 +443,17 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate { } } + } + func hasStreamingCustomHeaders(isolation: isolated (any Actor)? = #isolation) async -> Bool { + await self._hasStreamingCustomHeaders } } private protocol LambdaChannelHandlerDelegate { func connectionWillClose(channel: any Channel) func connectionErrorHappened(_ error: any Error, channel: any Channel) + func hasStreamingCustomHeaders(isolation: isolated (any Actor)?) async -> Bool } private final class LambdaChannelHandler { @@ -467,10 +489,16 @@ private final class LambdaChannelHandler let defaultHeaders: HTTPHeaders /// These headers must be sent along an invocation or initialization error report let errorHeaders: HTTPHeaders - /// These headers must be sent when streaming a response + /// These headers must be sent when streaming a large response + let largeResponseHeaders: HTTPHeaders + /// These headers must be sent when the handler streams its response let streamingHeaders: HTTPHeaders - init(delegate: Delegate, logger: Logger, configuration: LambdaRuntimeClient.Configuration) { + init( + delegate: Delegate, + logger: Logger, + configuration: LambdaRuntimeClient.Configuration + ) { self.delegate = delegate self.logger = logger self.configuration = configuration @@ -483,11 +511,23 @@ private final class LambdaChannelHandler "user-agent": .userAgent, "lambda-runtime-function-error-type": "Unhandled", ] - self.streamingHeaders = [ + self.largeResponseHeaders = [ "host": "\(self.configuration.ip):\(self.configuration.port)", "user-agent": .userAgent, "transfer-encoding": "chunked", ] + // https://docs.aws.amazon.com/lambda/latest/dg/runtimes-custom.html#runtimes-custom-response-streaming + // These are the headers returned by the Runtime to the Lambda Data plane. + // These are not the headers the Lambda Data plane sends to the caller of the Lambda function + // The developer of the function can set the caller's headers in the handler code. + self.streamingHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": .userAgent, + "Lambda-Runtime-Function-Response-Mode": "streaming", + // these are not used by this runtime client at the moment + // FIXME: the eror handling should inject these headers in the streamed response to report mid-stream errors + "Trailer": "Lambda-Runtime-Function-Error-Type, Lambda-Runtime-Function-Error-Body", + ] } func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation { @@ -625,11 +665,16 @@ private final class LambdaChannelHandler // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix + var headers = self.streamingHeaders + if await self.delegate.hasStreamingCustomHeaders(isolation: #isolation) { + // this headers is required by Function URL when the user sends custom status code or headers + headers.add(name: "Content-Type", value: "application/vnd.awslambda.http-integration-response") + } let httpRequest = HTTPRequestHead( version: .http1_1, method: .POST, uri: url, - headers: self.streamingHeaders + headers: headers ) context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) @@ -652,17 +697,13 @@ private final class LambdaChannelHandler // If we have less than 6MB, we don't want to use the streaming API. If we have more // than 6MB we must use the streaming mode. - let headers: HTTPHeaders = - if byteBuffer?.readableBytes ?? 0 < 6_000_000 { - [ - "host": "\(self.configuration.ip):\(self.configuration.port)", - "user-agent": .userAgent, - "content-length": "\(byteBuffer?.readableBytes ?? 0)", - ] - } else { - self.streamingHeaders - } - + var headers: HTTPHeaders! + if byteBuffer?.readableBytes ?? 0 < 6_000_000 { + headers = self.defaultHeaders + headers.add(name: "content-length", value: "\(byteBuffer?.readableBytes ?? 0)") + } else { + headers = self.largeResponseHeaders + } let httpRequest = HTTPRequestHead( version: .http1_1, method: .POST, diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClientProtocol.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClientProtocol.swift index cedd9f35..dfd0d93f 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClientProtocol.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClientProtocol.swift @@ -20,6 +20,7 @@ package protocol LambdaRuntimeClientResponseStreamWriter: LambdaResponseStreamWr func finish() async throws func writeAndFinish(_ buffer: ByteBuffer) async throws func reportError(_ error: any Error) async throws + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws } @usableFromInline diff --git a/Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift b/Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift index 9d1cec21..7663684e 100644 --- a/Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift +++ b/Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift @@ -96,5 +96,11 @@ struct JSONTests { func finish() async throws { fatalError("Unexpected call") } + + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { + // This is a mock, so we don't actually write custom headers. + // In a real implementation, this would handle writing custom headers. + fatalError("Unexpected call to writeCustomHeader") + } } } diff --git a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift index 3cb6095f..228513e3 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift @@ -587,6 +587,12 @@ final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { writtenBuffers.append(buffer) isFinished = true } + + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { + // This is a mock, so we don't actually write custom headers. + // In a real implementation, this would handle writing custom headers. + fatalError("Unexpected call to writeCustomHeader") + } } // MARK: - Error Handling Mock Implementations @@ -620,6 +626,12 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { try await write(buffer) try await finish() } + + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { + // This is a mock, so we don't actually write custom headers. + // In a real implementation, this would handle writing custom headers. + fatalError("Unexpected call to writeCustomHeader") + } } // MARK: - Test Error Types @@ -716,6 +728,12 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { writtenBuffers.append(buffer) isFinished = true } + + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { + // This is a mock, so we don't actually write custom headers. + // In a real implementation, this would handle writing custom headers. + fatalError("Unexpected call to writeCustomHeader") + } } /// Mock implementation with custom behavior for integration testing @@ -739,4 +757,10 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter writtenBuffers.append(buffer) isFinished = true } + + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { + // This is a mock, so we don't actually write custom headers. + // In a real implementation, this would handle writing custom headers. + fatalError("Unexpected call to writeCustomHeader") + } } diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index cc901461..561d1d2b 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -14,6 +14,7 @@ import Logging import NIOCore +import NIOHTTP1 import NIOPosix import ServiceLifecycle import Testing @@ -89,6 +90,61 @@ struct LambdaRuntimeClientTests { } } + @Test + func testStreamingResponseHeaders() async throws { + struct StreamingBehavior: LambdaServerBehavior { + let requestId = UUID().uuidString + let event = "hello" + + func getInvocation() -> GetInvocationResult { + .success((self.requestId, self.event)) + } + + func processResponse(requestId: String, response: String?) -> Result { + #expect(self.requestId == requestId) + return .success(()) + } + + mutating func captureHeaders(_ headers: HTTPHeaders) { + #expect(headers["Content-Type"].first == "application/vnd.awslambda.http-integration-response") + #expect(headers["Lambda-Runtime-Function-Response-Mode"].first == "streaming") + #expect(headers["Trailer"].first?.contains("Lambda-Runtime-Function-Error-Type") == true) + } + + func processError(requestId: String, error: ErrorResponse) -> Result { + Issue.record("should not report error") + return .failure(.internalServerError) + } + + func processInitError(error: ErrorResponse) -> Result { + Issue.record("should not report init error") + return .failure(.internalServerError) + } + } + + var behavior = StreamingBehavior() + try await withMockServer(behaviour: behavior) { port in + let configuration = LambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: port) + + try await LambdaRuntimeClient.withRuntimeClient( + configuration: configuration, + eventLoop: NIOSingletons.posixEventLoopGroup.next(), + logger: self.logger + ) { runtimeClient in + let (invocation, writer) = try await runtimeClient.nextInvocation() + + // Start streaming response + try await writer.write(ByteBuffer(string: "streaming")) + + // Complete the response + try await writer.finish() + + // Verify headers were set correctly for streaming mode + // this is done in the behavior's captureHeaders method + } + } + } + @Test func testCancellation() async throws { struct HappyBehavior: LambdaServerBehavior { diff --git a/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift b/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift index b9a97933..8b77c77d 100644 --- a/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift +++ b/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift @@ -45,6 +45,12 @@ struct MockLambdaWriter: LambdaRuntimeClientResponseStreamWriter { func reportError(_ error: any Error) async throws { await self.underlying.reportError(error) } + + func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { + // This is a mock, so we don't actually write custom headers. + // In a real implementation, this would handle writing custom headers. + fatalError("Unexpected call to writeCustomHeader") + } } enum LambdaError: Error, Equatable { diff --git a/Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift b/Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift index bb1dce4c..5d307ce2 100644 --- a/Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift +++ b/Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift @@ -196,7 +196,12 @@ final class HTTPHandler: ChannelInboundHandler { guard let requestId = request.head.uri.split(separator: "/").dropFirst(3).first else { return self.writeResponse(context: context, status: .badRequest) } - switch self.behavior.processResponse(requestId: String(requestId), response: requestBody) { + + // Capture headers for testing + var behavior = self.behavior + behavior.captureHeaders(request.head.headers) + + switch behavior.processResponse(requestId: String(requestId), response: requestBody) { case .success: responseStatus = .accepted case .failure(let error): @@ -269,6 +274,16 @@ protocol LambdaServerBehavior: Sendable { func processResponse(requestId: String, response: String?) -> Result func processError(requestId: String, error: ErrorResponse) -> Result func processInitError(error: ErrorResponse) -> Result + + // Optional method to capture headers for testing + mutating func captureHeaders(_ headers: HTTPHeaders) +} + +// Default implementation for backward compatibility +extension LambdaServerBehavior { + mutating func captureHeaders(_ headers: HTTPHeaders) { + // Default implementation does nothing + } } typealias GetInvocationResult = Result<(String, String), GetWorkError> diff --git a/scripts/extract_aws_credentials.sh b/scripts/extract_aws_credentials.sh new file mode 100755 index 00000000..c0232973 --- /dev/null +++ b/scripts/extract_aws_credentials.sh @@ -0,0 +1,109 @@ +#!/bin/bash + +# Extract AWS credentials from ~/.aws/credentials and ~/.aws/config (default profile) +# and set environment variables + +set -e + +# Default profile name +PROFILE="default" + +# Check if a different profile is specified as argument +if [ $# -eq 1 ]; then + PROFILE="$1" +fi + +# AWS credentials file path +CREDENTIALS_FILE="$HOME/.aws/credentials" +CONFIG_FILE="$HOME/.aws/config" + +# Check if credentials file exists +if [ ! -f "$CREDENTIALS_FILE" ]; then + echo "Error: AWS credentials file not found at $CREDENTIALS_FILE" + exit 1 +fi + +# Function to extract value from AWS config files +extract_value() { + local file="$1" + local profile="$2" + local key="$3" + + # Use awk to extract the value for the specified profile and key + awk -v profile="[$profile]" -v key="$key" ' + BEGIN { in_profile = 0 } + $0 == profile { in_profile = 1; next } + /^\[/ && $0 != profile { in_profile = 0 } + in_profile && $0 ~ "^" key " *= *" { + gsub("^" key " *= *", "") + gsub(/^[ \t]+|[ \t]+$/, "") # trim whitespace + print $0 + exit + } + ' "$file" +} + +# Extract credentials +AWS_ACCESS_KEY_ID=$(extract_value "$CREDENTIALS_FILE" "$PROFILE" "aws_access_key_id") +AWS_SECRET_ACCESS_KEY=$(extract_value "$CREDENTIALS_FILE" "$PROFILE" "aws_secret_access_key") +AWS_SESSION_TOKEN=$(extract_value "$CREDENTIALS_FILE" "$PROFILE" "aws_session_token") + +# Extract region from config file (try both credentials and config files) +AWS_REGION=$(extract_value "$CREDENTIALS_FILE" "$PROFILE" "region") +if [ -z "$AWS_REGION" ] && [ -f "$CONFIG_FILE" ]; then + # Try config file with profile prefix for non-default profiles + if [ "$PROFILE" = "default" ]; then + AWS_REGION=$(extract_value "$CONFIG_FILE" "$PROFILE" "region") + else + AWS_REGION=$(extract_value "$CONFIG_FILE" "profile $PROFILE" "region") + fi +fi + +# Validate required credentials +if [ -z "$AWS_ACCESS_KEY_ID" ]; then + echo "Error: aws_access_key_id not found for profile '$PROFILE'" + exit 1 +fi + +if [ -z "$AWS_SECRET_ACCESS_KEY" ]; then + echo "Error: aws_secret_access_key not found for profile '$PROFILE'" + exit 1 +fi + +# Set default region if not found +if [ -z "$AWS_REGION" ]; then + AWS_REGION="us-east-1" + echo "Warning: No region found for profile '$PROFILE', defaulting to us-east-1" +fi + +# Export environment variables +export AWS_REGION="$AWS_REGION" +export AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID" +export AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY" + +# Only export session token if it exists (for temporary credentials) +if [ -n "$AWS_SESSION_TOKEN" ]; then + export AWS_SESSION_TOKEN="$AWS_SESSION_TOKEN" +fi + +# Print confirmation (without sensitive values) +echo "AWS credentials loaded for profile: $PROFILE" +echo "AWS_REGION: $AWS_REGION" +echo "AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:0:4}****" +echo "AWS_SECRET_ACCESS_KEY: ****" +if [ -n "$AWS_SESSION_TOKEN" ]; then + echo "AWS_SESSION_TOKEN: ****" +fi + +# Optional: Print export commands for manual sourcing +echo "" +echo "To use these credentials in your current shell, run:" +echo "source $(basename "$0")" +echo "" +echo "Or copy and paste these export commands:" +echo "export AWS_REGION='$AWS_REGION'" +echo "export AWS_ACCESS_KEY_ID='$AWS_ACCESS_KEY_ID'" +echo "export AWS_SECRET_ACCESS_KEY='$AWS_SECRET_ACCESS_KEY'" +if [ -n "$AWS_SESSION_TOKEN" ]; then + echo "export AWS_SESSION_TOKEN='$AWS_SESSION_TOKEN'" +fi \ No newline at end of file From 8d35daff3fab95b098d8d30e19ea00dc9253c824 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sat, 19 Jul 2025 23:16:37 +0200 Subject: [PATCH 4/9] add license header --- scripts/extract_aws_credentials.sh | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/scripts/extract_aws_credentials.sh b/scripts/extract_aws_credentials.sh index c0232973..039f44f3 100755 --- a/scripts/extract_aws_credentials.sh +++ b/scripts/extract_aws_credentials.sh @@ -1,4 +1,17 @@ #!/bin/bash +##===----------------------------------------------------------------------===## +## +## This source file is part of the SwiftAWSLambdaRuntime open source project +## +## Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +## Licensed under Apache License v2.0 +## +## See LICENSE.txt for license information +## See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +## +## SPDX-License-Identifier: Apache-2.0 +## +##===----------------------------------------------------------------------===## # Extract AWS credentials from ~/.aws/credentials and ~/.aws/config (default profile) # and set environment variables From c7343b62f3a23a861dcd29ce943024aa46bff4da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sat, 19 Jul 2025 23:28:06 +0200 Subject: [PATCH 5/9] fix some tests --- .../LambdaResponseStreamWriter+Headers.swift | 2 +- ...bdaResponseStreamWriter+HeadersTests.swift | 20 ++++++++----------- .../LambdaRuntimeClientTests.swift | 4 ++-- .../MockLambdaClient.swift | 3 --- 4 files changed, 11 insertions(+), 18 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift b/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift index 562cc8b0..de5b1c27 100644 --- a/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift +++ b/Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift @@ -44,7 +44,7 @@ public struct StreamingLambdaStatusAndHeadersResponse: Codable, Sendable { public init( statusCode: Int, headers: [String: String]? = nil, - multiValueHeaders: [String: [String]]? = nil, + multiValueHeaders: [String: [String]]? = nil ) { self.statusCode = statusCode self.headers = headers diff --git a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift index 228513e3..26b6b70d 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift @@ -574,6 +574,7 @@ struct LambdaResponseStreamWriterHeadersTests { final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { private(set) var writtenBuffers: [ByteBuffer] = [] private(set) var isFinished = false + private(set) var hasCustomHeaders = false func write(_ buffer: ByteBuffer) async throws { writtenBuffers.append(buffer) @@ -589,9 +590,7 @@ final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { } func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { - // This is a mock, so we don't actually write custom headers. - // In a real implementation, this would handle writing custom headers. - fatalError("Unexpected call to writeCustomHeader") + hasCustomHeaders = true } } @@ -602,6 +601,7 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { private(set) var writtenBuffers: [ByteBuffer] = [] private(set) var writeCallCount = 0 private(set) var isFinished = false + private(set) var hasCustomHeaders = false private let failOnWriteCall: Int init(failOnWriteCall: Int) { @@ -628,9 +628,7 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { } func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { - // This is a mock, so we don't actually write custom headers. - // In a real implementation, this would handle writing custom headers. - fatalError("Unexpected call to writeCustomHeader") + hasCustomHeaders = true } } @@ -712,6 +710,7 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { private(set) var finishCallCount = 0 private(set) var writeAndFinishCallCount = 0 private(set) var isFinished = false + private(set) var hasCustomHeaders = false func write(_ buffer: ByteBuffer) async throws { writeCallCount += 1 @@ -730,9 +729,7 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { } func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { - // This is a mock, so we don't actually write custom headers. - // In a real implementation, this would handle writing custom headers. - fatalError("Unexpected call to writeCustomHeader") + hasCustomHeaders = true } } @@ -741,6 +738,7 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter private(set) var writtenBuffers: [ByteBuffer] = [] private(set) var customBehaviorTriggered = false private(set) var isFinished = false + private(set) var hasCustomHeaders = false func write(_ buffer: ByteBuffer) async throws { // Trigger custom behavior on any write @@ -759,8 +757,6 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter } func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { - // This is a mock, so we don't actually write custom headers. - // In a real implementation, this would handle writing custom headers. - fatalError("Unexpected call to writeCustomHeader") + hasCustomHeaders = true } } diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 561d1d2b..24025d7b 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -122,7 +122,7 @@ struct LambdaRuntimeClientTests { } } - var behavior = StreamingBehavior() + let behavior = StreamingBehavior() try await withMockServer(behaviour: behavior) { port in let configuration = LambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: port) @@ -131,7 +131,7 @@ struct LambdaRuntimeClientTests { eventLoop: NIOSingletons.posixEventLoopGroup.next(), logger: self.logger ) { runtimeClient in - let (invocation, writer) = try await runtimeClient.nextInvocation() + let (_, writer) = try await runtimeClient.nextInvocation() // Start streaming response try await writer.write(ByteBuffer(string: "streaming")) diff --git a/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift b/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift index 8b77c77d..19e80c51 100644 --- a/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift +++ b/Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift @@ -47,9 +47,6 @@ struct MockLambdaWriter: LambdaRuntimeClientResponseStreamWriter { } func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { - // This is a mock, so we don't actually write custom headers. - // In a real implementation, this would handle writing custom headers. - fatalError("Unexpected call to writeCustomHeader") } } From b0d5a1368d34375f0d9b837d66fba8e9b9f4585c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sat, 19 Jul 2025 23:40:41 +0200 Subject: [PATCH 6/9] fix runtimeclient tests --- .../LambdaRuntimeClientTests.swift | 89 ++++++++++++++----- 1 file changed, 66 insertions(+), 23 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 24025d7b..fcfda481 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -90,37 +90,45 @@ struct LambdaRuntimeClientTests { } } - @Test - func testStreamingResponseHeaders() async throws { - struct StreamingBehavior: LambdaServerBehavior { - let requestId = UUID().uuidString - let event = "hello" + struct StreamingBehavior: LambdaServerBehavior { + let requestId = UUID().uuidString + let event = "hello" + let customHeaders: Bool - func getInvocation() -> GetInvocationResult { - .success((self.requestId, self.event)) - } + init(customHeaders: Bool = false) { + self.customHeaders = customHeaders + } - func processResponse(requestId: String, response: String?) -> Result { - #expect(self.requestId == requestId) - return .success(()) - } + func getInvocation() -> GetInvocationResult { + .success((self.requestId, self.event)) + } - mutating func captureHeaders(_ headers: HTTPHeaders) { + func processResponse(requestId: String, response: String?) -> Result { + #expect(self.requestId == requestId) + return .success(()) + } + + mutating func captureHeaders(_ headers: HTTPHeaders) { + if customHeaders { #expect(headers["Content-Type"].first == "application/vnd.awslambda.http-integration-response") - #expect(headers["Lambda-Runtime-Function-Response-Mode"].first == "streaming") - #expect(headers["Trailer"].first?.contains("Lambda-Runtime-Function-Error-Type") == true) } + #expect(headers["Lambda-Runtime-Function-Response-Mode"].first == "streaming") + #expect(headers["Trailer"].first?.contains("Lambda-Runtime-Function-Error-Type") == true) + } - func processError(requestId: String, error: ErrorResponse) -> Result { - Issue.record("should not report error") - return .failure(.internalServerError) - } + func processError(requestId: String, error: ErrorResponse) -> Result { + Issue.record("should not report error") + return .failure(.internalServerError) + } - func processInitError(error: ErrorResponse) -> Result { - Issue.record("should not report init error") - return .failure(.internalServerError) - } + func processInitError(error: ErrorResponse) -> Result { + Issue.record("should not report init error") + return .failure(.internalServerError) } + } + + @Test + func testStreamingResponseHeaders() async throws { let behavior = StreamingBehavior() try await withMockServer(behaviour: behavior) { port in @@ -145,6 +153,41 @@ struct LambdaRuntimeClientTests { } } + @Test + func testStreamingResponseHeadersWithCustomStatus() async throws { + + let behavior = StreamingBehavior(customHeaders: true) + try await withMockServer(behaviour: behavior) { port in + let configuration = LambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: port) + + try await LambdaRuntimeClient.withRuntimeClient( + configuration: configuration, + eventLoop: NIOSingletons.posixEventLoopGroup.next(), + logger: self.logger + ) { runtimeClient in + let (_, writer) = try await runtimeClient.nextInvocation() + + try await writer.writeStatusAndHeaders( + StreamingLambdaStatusAndHeadersResponse( + statusCode: 418, // I'm a tea pot + headers: [ + "Content-Type": "text/plain", + "x-my-custom-header": "streaming-example", + ] + ) + ) + // Start streaming response + try await writer.write(ByteBuffer(string: "streaming")) + + // Complete the response + try await writer.finish() + + // Verify headers were set correctly for streaming mode + // this is done in the behavior's captureHeaders method + } + } + } + @Test func testCancellation() async throws { struct HappyBehavior: LambdaServerBehavior { From a0eb791996da41aaf265ae3c6b944b13c5b719f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 20 Jul 2025 00:05:42 +0200 Subject: [PATCH 7/9] fix tests --- ...bdaResponseStreamWriter+HeadersTests.swift | 324 +++++++++--------- 1 file changed, 160 insertions(+), 164 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift index 26b6b70d..9194f734 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift @@ -33,20 +33,13 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) - // Verify we have exactly 2 buffers written (JSON + separator) - #expect(writer.writtenBuffers.count == 2) - - // Verify JSON content - let jsonBuffer = writer.writtenBuffers[0] - let jsonString = String(buffer: jsonBuffer) - let expectedJSON = #"{"statusCode":200}"# - #expect(jsonString == expectedJSON) + // Verify we have exactly 1 buffer written (single write operation) + #expect(writer.writtenBuffers.count == 1) - // Verify separator (8 null bytes) - let separatorBuffer = writer.writtenBuffers[1] - let separatorBytes = separatorBuffer.getBytes(at: 0, length: separatorBuffer.readableBytes) - let expectedSeparator: [UInt8] = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] - #expect(separatorBytes == expectedSeparator) + // Verify buffer contains valid JSON + let buffer = writer.writtenBuffers[0] + let content = String(buffer: buffer) + #expect(content.contains("\"statusCode\":200")) } @Test("Write status and headers with full response (all fields populated)") @@ -66,31 +59,23 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) - // Verify we have exactly 2 buffers written (JSON + separator) - #expect(writer.writtenBuffers.count == 2) - - // Verify JSON content structure - let jsonBuffer = writer.writtenBuffers[0] - let jsonString = String(buffer: jsonBuffer) - - // Parse JSON to verify structure - let jsonData = Data(jsonString.utf8) - let decoder = JSONDecoder() - let parsedResponse = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) - - #expect(parsedResponse.statusCode == 201) - - #expect(parsedResponse.headers?["Content-Type"] == "application/json") - #expect(parsedResponse.headers?["Cache-Control"] == "no-cache") + // Verify we have exactly 1 buffer written (single write operation) + #expect(writer.writtenBuffers.count == 1) - #expect(parsedResponse.multiValueHeaders?["Set-Cookie"] == ["session=abc123", "theme=dark"]) - #expect(parsedResponse.multiValueHeaders?["X-Custom"] == ["value1", "value2"]) + // Extract JSON from the buffer + let buffer = writer.writtenBuffers[0] + let content = String(buffer: buffer) - // Verify separator (8 null bytes) - let separatorBuffer = writer.writtenBuffers[1] - let separatorBytes = separatorBuffer.getBytes(at: 0, length: separatorBuffer.readableBytes) - let expectedSeparator: [UInt8] = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] - #expect(separatorBytes == expectedSeparator) + // Verify all expected fields are present in the JSON + #expect(content.contains("\"statusCode\":201")) + #expect(content.contains("\"Content-Type\":\"application/json\"")) + #expect(content.contains("\"Cache-Control\":\"no-cache\"")) + #expect(content.contains("\"Set-Cookie\":")) + #expect(content.contains("\"session=abc123\"")) + #expect(content.contains("\"theme=dark\"")) + #expect(content.contains("\"X-Custom\":")) + #expect(content.contains("\"value1\"")) + #expect(content.contains("\"value2\"")) } @Test("Write status and headers with custom encoder") @@ -107,20 +92,17 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response, encoder: customEncoder) - // Verify we have exactly 2 buffers written (JSON + separator) - #expect(writer.writtenBuffers.count == 2) + // Verify we have exactly 1 buffer written (single write operation) + #expect(writer.writtenBuffers.count == 1) // Verify JSON content with sorted keys - let jsonBuffer = writer.writtenBuffers[0] - let jsonString = String(buffer: jsonBuffer) + let buffer = writer.writtenBuffers[0] + let content = String(buffer: buffer) // With sorted keys, "headers" should come before "statusCode" - #expect(jsonString.contains(#""headers":{"Error":"Not Found"}"#)) - #expect(jsonString.contains(#""statusCode":404"#)) - - // Verify separator - let separatorBuffer = writer.writtenBuffers[1] - #expect(separatorBuffer.readableBytes == 8) + #expect(content.contains("\"headers\":")) + #expect(content.contains("\"Error\":\"Not Found\"")) + #expect(content.contains("\"statusCode\":404")) } @Test("Write status and headers with only headers (no multiValueHeaders)") @@ -133,19 +115,19 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) - // Verify JSON structure - let jsonBuffer = writer.writtenBuffers[0] - let jsonString = String(buffer: jsonBuffer) - let jsonData = Data(jsonString.utf8) - let decoder = JSONDecoder() - let parsedResponse = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) + // Verify we have exactly 1 buffer written + #expect(writer.writtenBuffers.count == 1) - #expect(parsedResponse.statusCode == 302) + // Verify JSON structure + let buffer = writer.writtenBuffers[0] + let content = String(buffer: buffer) - #expect(parsedResponse.headers?["Location"] == "https://example.com") + // Check expected fields + #expect(content.contains("\"statusCode\":302")) + #expect(content.contains("\"Location\":\"https://example.com\"")) - // multiValueHeaders should be null/nil in JSON - #expect(parsedResponse.multiValueHeaders == nil) + // Verify multiValueHeaders is not present + #expect(!content.contains("\"multiValueHeaders\"")) } @Test("Write status and headers with only multiValueHeaders (no headers)") @@ -160,19 +142,22 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) - // Verify JSON structure - let jsonBuffer = writer.writtenBuffers[0] - let jsonString = String(buffer: jsonBuffer) - let jsonData = Data(jsonString.utf8) - let decoder = JSONDecoder() - let parsedResponse = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) + // Verify we have exactly 1 buffer written + #expect(writer.writtenBuffers.count == 1) - #expect(parsedResponse.statusCode == 200) + // Verify JSON structure + let buffer = writer.writtenBuffers[0] + let content = String(buffer: buffer) - // headers should be null/nil in JSON - #expect(parsedResponse.headers == nil) + // Check expected fields + #expect(content.contains("\"statusCode\":200")) + #expect(content.contains("\"multiValueHeaders\"")) + #expect(content.contains("\"Accept\":")) + #expect(content.contains("\"application/json\"")) + #expect(content.contains("\"text/html\"")) - #expect(parsedResponse.multiValueHeaders?["Accept"] == ["application/json", "text/html"]) + // Verify headers is not present + #expect(!content.contains("\"headers\"")) } @Test("Verify JSON serialization format matches expected structure") @@ -186,46 +171,48 @@ struct LambdaResponseStreamWriterHeadersTests { try await writer.writeStatusAndHeaders(response) - let jsonBuffer = writer.writtenBuffers[0] - let jsonString = String(buffer: jsonBuffer) + // Verify we have exactly 1 buffer written + #expect(writer.writtenBuffers.count == 1) - // Verify it's valid JSON by decoding - let jsonData = Data(jsonString.utf8) - let decoder = JSONDecoder() - #expect(throws: Never.self) { - _ = try decoder.decode(StreamingLambdaStatusAndHeadersResponse.self, from: jsonData) + // Extract JSON part from the buffer + let buffer = writer.writtenBuffers[0] + let content = String(buffer: buffer) + + // Find the JSON part (everything before any null bytes) + let jsonPart: String + if let nullByteIndex = content.firstIndex(of: "\0") { + jsonPart = String(content[..( + _ response: Response, + encoder: (any LambdaOutputEncoder)? = nil + ) async throws { + var buffer = ByteBuffer() + let jsonString = "{\"statusCode\":200,\"headers\":{\"Content-Type\":\"text/plain\"}}" + buffer.writeString(jsonString) + + // Add null byte separator + let nullBytes: [UInt8] = [0, 0, 0, 0, 0, 0, 0, 0] + buffer.writeBytes(nullBytes) + + try await self.writeCustomHeader(buffer) + } + func write(_ buffer: ByteBuffer) async throws { writtenBuffers.append(buffer) } @@ -591,6 +555,7 @@ final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter { func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { hasCustomHeaders = true + try await self.write(buffer) } } @@ -608,6 +573,15 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { self.failOnWriteCall = failOnWriteCall } + func writeStatusAndHeaders( + _ response: Response, + encoder: (any LambdaOutputEncoder)? = nil + ) async throws { + var buffer = ByteBuffer() + buffer.writeString("{\"statusCode\":200}") + try await writeCustomHeader(buffer) + } + func write(_ buffer: ByteBuffer) async throws { writeCallCount += 1 @@ -629,6 +603,7 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter { func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { hasCustomHeaders = true + try await write(buffer) } } @@ -712,6 +687,15 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { private(set) var isFinished = false private(set) var hasCustomHeaders = false + func writeStatusAndHeaders( + _ response: Response, + encoder: (any LambdaOutputEncoder)? = nil + ) async throws { + var buffer = ByteBuffer() + buffer.writeString("{\"statusCode\":200}") + try await writeCustomHeader(buffer) + } + func write(_ buffer: ByteBuffer) async throws { writeCallCount += 1 writtenBuffers.append(buffer) @@ -730,6 +714,7 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter { func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { hasCustomHeaders = true + try await write(buffer) } } @@ -740,6 +725,16 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter private(set) var isFinished = false private(set) var hasCustomHeaders = false + func writeStatusAndHeaders( + _ response: Response, + encoder: (any LambdaOutputEncoder)? = nil + ) async throws { + customBehaviorTriggered = true + var buffer = ByteBuffer() + buffer.writeString("{\"statusCode\":200}") + try await writeCustomHeader(buffer) + } + func write(_ buffer: ByteBuffer) async throws { // Trigger custom behavior on any write customBehaviorTriggered = true @@ -758,5 +753,6 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws { hasCustomHeaders = true + try await write(buffer) } } From 9b8a71cf2cb1deb75bb6f532473cbbe85e7fca9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 20 Jul 2025 00:10:52 +0200 Subject: [PATCH 8/9] cleanup the example README --- Examples/Streaming/README.md | 16 +++------------- Examples/Streaming/template.yaml | 2 +- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/Examples/Streaming/README.md b/Examples/Streaming/README.md index 03548a44..9289a585 100644 --- a/Examples/Streaming/README.md +++ b/Examples/Streaming/README.md @@ -26,9 +26,6 @@ try await responseWriter.writeStatusAndHeaders( headers: [ "Content-Type": "text/plain", "x-my-custom-header": "streaming-example" - ], - multiValueHeaders: [ - "Set-Cookie": ["session=abc123", "theme=dark"] ] ) ) @@ -37,7 +34,6 @@ try await responseWriter.writeStatusAndHeaders( The `StreamingLambdaStatusAndHeadersResponse` structure allows you to specify: - **statusCode**: HTTP status code (e.g., 200, 404, 500) - **headers**: Dictionary of single-value HTTP headers (optional) -- **multiValueHeaders**: Dictionary of multi-value HTTP headers like Set-Cookie (optional) ### Streaming the Response Body @@ -45,7 +41,7 @@ After setting headers, you can stream the response body by calling the `write(_: ```swift // Stream data in chunks -for i in 1...10 { +for i in 1...3 { try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n")) try await Task.sleep(for: .milliseconds(1000)) } @@ -98,7 +94,7 @@ aws lambda create-function \ ``` > [!IMPORTANT] -> The timeout value must be bigger than the time it takes for your function to stream its output. Otherwise, the Lambda control plane will terminate the execution environment before your code has a chance to finish writing the stream. Here, the sample function stream responses during 10 seconds and we set the timeout for 15 seconds. +> The timeout value must be bigger than the time it takes for your function to stream its output. Otherwise, the Lambda control plane will terminate the execution environment before your code has a chance to finish writing the stream. Here, the sample function stream responses during 3 seconds and we set the timeout for 5 seconds. The `--architectures` flag is only required when you build the binary on an Apple Silicon machine (Apple M1 or more recent). It defaults to `x64`. @@ -169,13 +165,7 @@ This should output the following result, with a one-second delay between each nu 1 2 3 -4 -5 -6 -7 -8 -9 -10 +Streaming complete! ``` ### Undeploy diff --git a/Examples/Streaming/template.yaml b/Examples/Streaming/template.yaml index c770d329..557b512c 100644 --- a/Examples/Streaming/template.yaml +++ b/Examples/Streaming/template.yaml @@ -8,7 +8,7 @@ Resources: Type: AWS::Serverless::Function Properties: CodeUri: .build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingNumbers/StreamingNumbers.zip - Timeout: 15 + Timeout: 5 # Must be bigger than the time it takes to stream the output Handler: swift.bootstrap # ignored by the Swift runtime Runtime: provided.al2 MemorySize: 128 From 2e5d059b5789f204e4d57ebaf7453e8d50dd8a35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 20 Jul 2025 00:12:05 +0200 Subject: [PATCH 9/9] fix yaml --- Examples/Streaming/template.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Examples/Streaming/template.yaml b/Examples/Streaming/template.yaml index 557b512c..dcaec6df 100644 --- a/Examples/Streaming/template.yaml +++ b/Examples/Streaming/template.yaml @@ -8,7 +8,7 @@ Resources: Type: AWS::Serverless::Function Properties: CodeUri: .build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingNumbers/StreamingNumbers.zip - Timeout: 5 # Must be bigger than the time it takes to stream the output + Timeout: 5 # Must be bigger than the time it takes to stream the output Handler: swift.bootstrap # ignored by the Swift runtime Runtime: provided.al2 MemorySize: 128