Skip to content

Commit 01fa243

Browse files
committed
Add support for streaming custom HTTP headers and status
1 parent dede067 commit 01fa243

File tree

5 files changed

+1024
-7
lines changed

5 files changed

+1024
-7
lines changed

Examples/Streaming/README.md

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,59 @@ The sample code creates a `SendNumbersWithPause` struct that conforms to the `St
1313

1414
The `handle(...)` method of this protocol receives incoming events as a Swift NIO `ByteBuffer` and returns the output as a `ByteBuffer`.
1515

16-
The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function. The code calls the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly written before
17-
finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not
18-
stream the response by calling `writeAndFinish(_:)`.
16+
The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function.
17+
18+
### Setting HTTP Status Code and Headers
19+
20+
Before streaming the response body, you can set the HTTP status code and headers using the `writeStatusAndHeaders(_:)` method:
21+
22+
```swift
23+
try await responseWriter.writeStatusAndHeaders(
24+
StreamingLambdaStatusAndHeadersResponse(
25+
statusCode: 200,
26+
headers: [
27+
"Content-Type": "text/plain",
28+
"x-my-custom-header": "streaming-example"
29+
],
30+
multiValueHeaders: [
31+
"Set-Cookie": ["session=abc123", "theme=dark"]
32+
]
33+
)
34+
)
35+
```
36+
37+
The `StreamingLambdaStatusAndHeadersResponse` structure allows you to specify:
38+
- **statusCode**: HTTP status code (e.g., 200, 404, 500)
39+
- **headers**: Dictionary of single-value HTTP headers (optional)
40+
- **multiValueHeaders**: Dictionary of multi-value HTTP headers like Set-Cookie (optional)
41+
42+
### Streaming the Response Body
43+
44+
After setting headers, you can stream the response body by calling the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly before finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not stream the response by calling `writeAndFinish(_:)`.
45+
46+
```swift
47+
// Stream data in chunks
48+
for i in 1...10 {
49+
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))
50+
try await Task.sleep(for: .milliseconds(1000))
51+
}
52+
53+
// Close the response stream
54+
try await responseWriter.finish()
55+
```
1956

2057
An error is thrown if `finish()` is called multiple times or if it is called after having called `writeAndFinish(_:)`.
2158

59+
### Example Usage Patterns
60+
61+
The example includes two handler implementations:
62+
63+
1. **SendNumbersWithPause**: Demonstrates basic streaming with headers, sending numbers with delays
64+
2. **ConditionalStreamingHandler**: Shows how to handle different response scenarios, including error responses with appropriate status codes
65+
2266
The `handle(...)` method is marked as `mutating` to allow handlers to be implemented with a `struct`.
2367

24-
Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.
68+
Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.
2569

2670
## Build & Package
2771

Examples/Streaming/Sources/main.swift

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,107 @@
1515
import AWSLambdaRuntime
1616
import NIOCore
1717

18+
#if canImport(FoundationEssentials)
19+
import FoundationEssentials
20+
#else
21+
import Foundation
22+
#endif
23+
1824
struct SendNumbersWithPause: StreamingLambdaHandler {
1925
func handle(
2026
_ event: ByteBuffer,
2127
responseWriter: some LambdaResponseStreamWriter,
2228
context: LambdaContext
2329
) async throws {
30+
31+
// Send HTTP status code and headers before streaming the response body
32+
try await responseWriter.writeStatusAndHeaders(
33+
StreamingLambdaStatusAndHeadersResponse(
34+
statusCode: 200,
35+
headers: [
36+
"Content-Type": "text/plain",
37+
"x-my-custom-header": "streaming-example",
38+
],
39+
multiValueHeaders: [
40+
"Set-Cookie": ["session=abc123", "theme=dark"]
41+
]
42+
)
43+
)
44+
45+
// Stream numbers with pauses to demonstrate streaming functionality
2446
for i in 1...10 {
2547
// Send partial data
26-
try await responseWriter.write(ByteBuffer(string: "\(i)\n"))
27-
// Perform some long asynchronous work
48+
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))
49+
// Perform some long asynchronous work to simulate processing
2850
try await Task.sleep(for: .milliseconds(1000))
2951
}
52+
53+
// Send final message
54+
try await responseWriter.write(ByteBuffer(string: "Streaming complete!\n"))
55+
3056
// All data has been sent. Close off the response stream.
3157
try await responseWriter.finish()
3258
}
3359
}
3460

35-
let runtime = LambdaRuntime.init(handler: SendNumbersWithPause())
61+
// Example of a more complex streaming handler that demonstrates different response scenarios
62+
struct ConditionalStreamingHandler: StreamingLambdaHandler {
63+
func handle(
64+
_ event: ByteBuffer,
65+
responseWriter: some LambdaResponseStreamWriter,
66+
context: LambdaContext
67+
) async throws {
68+
69+
// Parse the event to determine response type
70+
let eventString = String(buffer: event)
71+
let shouldError = eventString.contains("error")
72+
73+
if shouldError {
74+
// Send error response with appropriate status code
75+
try await responseWriter.writeStatusAndHeaders(
76+
StreamingLambdaStatusAndHeadersResponse(
77+
statusCode: 400,
78+
headers: [
79+
"Content-Type": "application/json",
80+
"x-error-type": "client-error",
81+
]
82+
)
83+
)
84+
85+
try await responseWriter.writeAndFinish(
86+
ByteBuffer(string: #"{"error": "Bad request", "message": "Error requested in input"}"#)
87+
)
88+
} else {
89+
// Send successful response with streaming data
90+
try await responseWriter.writeStatusAndHeaders(
91+
StreamingLambdaStatusAndHeadersResponse(
92+
statusCode: 200,
93+
headers: [
94+
"Content-Type": "application/json",
95+
"Cache-Control": "no-cache",
96+
]
97+
)
98+
)
99+
100+
// Stream JSON array elements
101+
try await responseWriter.write(ByteBuffer(string: "["))
102+
103+
for i in 1...5 {
104+
if i > 1 {
105+
try await responseWriter.write(ByteBuffer(string: ","))
106+
}
107+
try await responseWriter.write(
108+
ByteBuffer(string: #"{"id": \#(i), "timestamp": "\#(Date().timeIntervalSince1970)"}"#)
109+
)
110+
try await Task.sleep(for: .milliseconds(500))
111+
}
112+
113+
try await responseWriter.write(ByteBuffer(string: "]"))
114+
try await responseWriter.finish()
115+
}
116+
}
117+
}
118+
119+
// Use the simple example by default
120+
let runtime = LambdaRuntime(handler: SendNumbersWithPause())
36121
try await runtime.run()
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2017-2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
#if canImport(FoundationEssentials)
18+
import FoundationEssentials
19+
#else
20+
import Foundation
21+
#endif
22+
23+
/// A response structure specifically designed for streaming Lambda responses that contains
24+
/// HTTP status code and headers without body content.
25+
///
26+
/// This structure is used with `LambdaResponseStreamWriter.writeStatusAndHeaders(_:)` to send
27+
/// HTTP response metadata before streaming the response body.
28+
public struct StreamingLambdaStatusAndHeadersResponse: Codable, Sendable {
29+
/// The HTTP status code for the response (e.g., 200, 404, 500)
30+
public let statusCode: Int
31+
32+
/// Dictionary of single-value HTTP headers
33+
public let headers: [String: String]?
34+
35+
/// Dictionary of multi-value HTTP headers (e.g., Set-Cookie headers)
36+
public let multiValueHeaders: [String: [String]]?
37+
38+
/// Creates a new streaming Lambda response with status code and optional headers
39+
///
40+
/// - Parameters:
41+
/// - statusCode: The HTTP status code for the response
42+
/// - headers: Optional dictionary of single-value HTTP headers
43+
/// - multiValueHeaders: Optional dictionary of multi-value HTTP headers
44+
public init(
45+
statusCode: Int,
46+
headers: [String: String]? = nil,
47+
multiValueHeaders: [String: [String]]? = nil
48+
) {
49+
self.statusCode = statusCode
50+
self.headers = headers
51+
self.multiValueHeaders = multiValueHeaders
52+
}
53+
}
54+
55+
extension LambdaResponseStreamWriter {
56+
/// Writes the HTTP status code and headers to the response stream.
57+
///
58+
/// This method serializes the status and headers as JSON and writes them to the stream,
59+
/// followed by eight null bytes as a separator before the response body.
60+
///
61+
/// - Parameters:
62+
/// - response: The status and headers response to write
63+
/// - encoder: The encoder to use for serializing the response,
64+
/// - Throws: An error if JSON serialization or writing fails
65+
public func writeStatusAndHeaders<Encoder: LambdaOutputEncoder>(
66+
_ response: StreamingLambdaStatusAndHeadersResponse,
67+
encoder: Encoder
68+
) async throws where Encoder.Output == StreamingLambdaStatusAndHeadersResponse {
69+
70+
// Convert Data to ByteBuffer
71+
var buffer = ByteBuffer()
72+
try encoder.encode(response, into: &buffer)
73+
74+
// Write the JSON data
75+
try await write(buffer)
76+
77+
// Write eight null bytes as separator
78+
var separatorBuffer = ByteBuffer()
79+
separatorBuffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
80+
try await write(separatorBuffer)
81+
}
82+
}
83+
84+
extension LambdaResponseStreamWriter {
85+
/// Writes the HTTP status code and headers to the response stream.
86+
///
87+
/// This method serializes the status and headers as JSON and writes them to the stream,
88+
/// followed by eight null bytes as a separator before the response body.
89+
///
90+
/// - Parameters:
91+
/// - response: The status and headers response to write
92+
/// - encoder: The encoder to use for serializing the response, use JSONEncoder by default
93+
/// - Throws: An error if JSON serialization or writing fails
94+
public func writeStatusAndHeaders(
95+
_ response: StreamingLambdaStatusAndHeadersResponse,
96+
encoder: JSONEncoder = JSONEncoder()
97+
) async throws {
98+
try await self.writeStatusAndHeaders(response, encoder: LambdaJSONOutputEncoder(encoder))
99+
}
100+
}

0 commit comments

Comments
 (0)