Skip to content

Commit 9287d56

Browse files
authored
[core] Implement Lambda streaming with custom HTTP headers (#521)
Fix #520
1 parent 412a345 commit 9287d56

16 files changed

+1292
-59
lines changed

.gitignore

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,4 @@ Package.resolved
1212
.vscode
1313
Makefile
1414
.devcontainer
15-
.amazonq
16-
samconfig.toml
15+
.amazonq

Examples/Streaming/README.md

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,55 @@ The sample code creates a `SendNumbersWithPause` struct that conforms to the `St
1313

1414
The `handle(...)` method of this protocol receives incoming events as a Swift NIO `ByteBuffer` and returns the output as a `ByteBuffer`.
1515

16-
The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function. The code calls the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly written before
17-
finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not
18-
stream the response by calling `writeAndFinish(_:)`.
16+
The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function.
17+
18+
### Setting HTTP Status Code and Headers
19+
20+
Before streaming the response body, you can set the HTTP status code and headers using the `writeStatusAndHeaders(_:)` method:
21+
22+
```swift
23+
try await responseWriter.writeStatusAndHeaders(
24+
StreamingLambdaStatusAndHeadersResponse(
25+
statusCode: 200,
26+
headers: [
27+
"Content-Type": "text/plain",
28+
"x-my-custom-header": "streaming-example"
29+
]
30+
)
31+
)
32+
```
33+
34+
The `StreamingLambdaStatusAndHeadersResponse` structure allows you to specify:
35+
- **statusCode**: HTTP status code (e.g., 200, 404, 500)
36+
- **headers**: Dictionary of single-value HTTP headers (optional)
37+
38+
### Streaming the Response Body
39+
40+
After setting headers, you can stream the response body by calling the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly before finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not stream the response by calling `writeAndFinish(_:)`.
41+
42+
```swift
43+
// Stream data in chunks
44+
for i in 1...3 {
45+
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))
46+
try await Task.sleep(for: .milliseconds(1000))
47+
}
48+
49+
// Close the response stream
50+
try await responseWriter.finish()
51+
```
1952

2053
An error is thrown if `finish()` is called multiple times or if it is called after having called `writeAndFinish(_:)`.
2154

55+
### Example Usage Patterns
56+
57+
The example includes two handler implementations:
58+
59+
1. **SendNumbersWithPause**: Demonstrates basic streaming with headers, sending numbers with delays
60+
2. **ConditionalStreamingHandler**: Shows how to handle different response scenarios, including error responses with appropriate status codes
61+
2262
The `handle(...)` method is marked as `mutating` to allow handlers to be implemented with a `struct`.
2363

24-
Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.
64+
Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.
2565

2666
## Build & Package
2767

@@ -68,7 +108,7 @@ aws lambda create-function \
68108
```
69109

70110
> [!IMPORTANT]
71-
> The timeout value must be bigger than the time it takes for your function to stream its output. Otherwise, the Lambda control plane will terminate the execution environment before your code has a chance to finish writing the stream. Here, the sample function stream responses during 10 seconds and we set the timeout for 15 seconds.
111+
> The timeout value must be bigger than the time it takes for your function to stream its output. Otherwise, the Lambda control plane will terminate the execution environment before your code has a chance to finish writing the stream. Here, the sample function stream responses during 3 seconds and we set the timeout for 5 seconds.
72112
73113
The `--architectures` flag is only required when you build the binary on an Apple Silicon machine (Apple M1 or more recent). It defaults to `x64`.
74114

@@ -139,13 +179,7 @@ This should output the following result, with a one-second delay between each nu
139179
1
140180
2
141181
3
142-
4
143-
5
144-
6
145-
7
146-
8
147-
9
148-
10
182+
Streaming complete!
149183
```
150184

151185
### Undeploy

Examples/Streaming/Sources/main.swift

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,46 @@
1515
import AWSLambdaRuntime
1616
import NIOCore
1717

18+
#if canImport(FoundationEssentials)
19+
import FoundationEssentials
20+
#else
21+
import Foundation
22+
#endif
23+
1824
struct SendNumbersWithPause: StreamingLambdaHandler {
1925
func handle(
2026
_ event: ByteBuffer,
2127
responseWriter: some LambdaResponseStreamWriter,
2228
context: LambdaContext
2329
) async throws {
24-
for i in 1...10 {
30+
31+
// Send HTTP status code and headers before streaming the response body
32+
try await responseWriter.writeStatusAndHeaders(
33+
StreamingLambdaStatusAndHeadersResponse(
34+
statusCode: 418, // I'm a tea pot
35+
headers: [
36+
"Content-Type": "text/plain",
37+
"x-my-custom-header": "streaming-example",
38+
]
39+
)
40+
)
41+
42+
// Stream numbers with pauses to demonstrate streaming functionality
43+
for i in 1...3 {
2544
// Send partial data
26-
try await responseWriter.write(ByteBuffer(string: "\(i)\n"))
27-
// Perform some long asynchronous work
45+
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))
46+
47+
// Perform some long asynchronous work to simulate processing
2848
try await Task.sleep(for: .milliseconds(1000))
2949
}
50+
51+
// Send final message
52+
try await responseWriter.write(ByteBuffer(string: "Streaming complete!\n"))
53+
3054
// All data has been sent. Close off the response stream.
3155
try await responseWriter.finish()
3256
}
3357
}
3458

35-
let runtime = LambdaRuntime.init(handler: SendNumbersWithPause())
59+
let runtime = LambdaRuntime(handler: SendNumbersWithPause())
3660
try await runtime.run()

Examples/Streaming/template.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Resources:
88
Type: AWS::Serverless::Function
99
Properties:
1010
CodeUri: .build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingNumbers/StreamingNumbers.zip
11-
Timeout: 15
11+
Timeout: 5 # Must be bigger than the time it takes to stream the output
1212
Handler: swift.bootstrap # ignored by the Swift runtime
1313
Runtime: provided.al2
1414
MemorySize: 128
@@ -17,6 +17,9 @@ Resources:
1717
FunctionUrlConfig:
1818
AuthType: AWS_IAM
1919
InvokeMode: RESPONSE_STREAM
20+
Environment:
21+
Variables:
22+
LOG_LEVEL: trace
2023

2124
Outputs:
2225
# print Lambda function URL

Sources/AWSLambdaRuntime/LambdaHandlers.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public protocol StreamingLambdaHandler: _Lambda_SendableMetatype {
4949
public protocol LambdaResponseStreamWriter {
5050
/// Write a response part into the stream. Bytes written are streamed continually.
5151
/// - Parameter buffer: The buffer to write.
52-
func write(_ buffer: ByteBuffer) async throws
52+
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool) async throws
5353

5454
/// End the response stream and the underlying HTTP response.
5555
func finish() async throws
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2017-2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
#if canImport(FoundationEssentials)
18+
import FoundationEssentials
19+
#else
20+
import Foundation
21+
#endif
22+
23+
/// A response structure specifically designed for streaming Lambda responses that contains
24+
/// HTTP status code and headers without body content.
25+
///
26+
/// This structure is used with `LambdaResponseStreamWriter.writeStatusAndHeaders(_:)` to send
27+
/// HTTP response metadata before streaming the response body.
28+
public struct StreamingLambdaStatusAndHeadersResponse: Codable, Sendable {
29+
/// The HTTP status code for the response (e.g., 200, 404, 500)
30+
public let statusCode: Int
31+
32+
/// Dictionary of single-value HTTP headers
33+
public let headers: [String: String]?
34+
35+
/// Dictionary of multi-value HTTP headers (e.g., Set-Cookie headers)
36+
public let multiValueHeaders: [String: [String]]?
37+
38+
/// Creates a new streaming Lambda response with status code and optional headers
39+
///
40+
/// - Parameters:
41+
/// - statusCode: The HTTP status code for the response
42+
/// - headers: Optional dictionary of single-value HTTP headers
43+
/// - multiValueHeaders: Optional dictionary of multi-value HTTP headers
44+
public init(
45+
statusCode: Int,
46+
headers: [String: String]? = nil,
47+
multiValueHeaders: [String: [String]]? = nil
48+
) {
49+
self.statusCode = statusCode
50+
self.headers = headers
51+
self.multiValueHeaders = multiValueHeaders
52+
}
53+
}
54+
55+
extension LambdaResponseStreamWriter {
56+
/// Writes the HTTP status code and headers to the response stream.
57+
///
58+
/// This method serializes the status and headers as JSON and writes them to the stream,
59+
/// followed by eight null bytes as a separator before the response body.
60+
///
61+
/// - Parameters:
62+
/// - response: The status and headers response to write
63+
/// - encoder: The encoder to use for serializing the response,
64+
/// - Throws: An error if JSON serialization or writing fails
65+
public func writeStatusAndHeaders<Encoder: LambdaOutputEncoder>(
66+
_ response: StreamingLambdaStatusAndHeadersResponse,
67+
encoder: Encoder
68+
) async throws where Encoder.Output == StreamingLambdaStatusAndHeadersResponse {
69+
70+
// Convert JSON headers to an array of bytes in a ByteBuffer
71+
var buffer = ByteBuffer()
72+
try encoder.encode(response, into: &buffer)
73+
74+
// Write eight null bytes as separator
75+
buffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
76+
77+
// Write the JSON data and the separator
78+
try await self.write(buffer, hasCustomHeaders: true)
79+
}
80+
81+
/// Write a response part into the stream. Bytes written are streamed continually.
82+
/// This implementation avoids having to modify all the tests and other part of the code that use this function signature
83+
/// - Parameter buffer: The buffer to write.
84+
public func write(_ buffer: ByteBuffer) async throws {
85+
// Write the buffer to the response stream
86+
try await self.write(buffer, hasCustomHeaders: false)
87+
}
88+
}
89+
90+
extension LambdaResponseStreamWriter {
91+
/// Writes the HTTP status code and headers to the response stream.
92+
///
93+
/// This method serializes the status and headers as JSON and writes them to the stream,
94+
/// followed by eight null bytes as a separator before the response body.
95+
///
96+
/// - Parameters:
97+
/// - response: The status and headers response to write
98+
/// - encoder: The encoder to use for serializing the response, use JSONEncoder by default
99+
/// - Throws: An error if JSON serialization or writing fails
100+
public func writeStatusAndHeaders(
101+
_ response: StreamingLambdaStatusAndHeadersResponse,
102+
encoder: JSONEncoder = JSONEncoder()
103+
) async throws {
104+
encoder.outputFormatting = .withoutEscapingSlashes
105+
try await self.writeStatusAndHeaders(response, encoder: LambdaJSONOutputEncoder(encoder))
106+
}
107+
}

0 commit comments

Comments
 (0)