Skip to content

[core] Implement Lambda streaming with custom HTTP headers #521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jul 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ Package.resolved
.vscode
Makefile
.devcontainer
.amazonq
samconfig.toml
.amazonq
58 changes: 46 additions & 12 deletions Examples/Streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,55 @@ The sample code creates a `SendNumbersWithPause` struct that conforms to the `St

The `handle(...)` method of this protocol receives incoming events as a Swift NIO `ByteBuffer` and returns the output as a `ByteBuffer`.

The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function. The code calls the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly written before
finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not
stream the response by calling `writeAndFinish(_:)`.
The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function.

### Setting HTTP Status Code and Headers

Before streaming the response body, you can set the HTTP status code and headers using the `writeStatusAndHeaders(_:)` method:

```swift
try await responseWriter.writeStatusAndHeaders(
StreamingLambdaStatusAndHeadersResponse(
statusCode: 200,
headers: [
"Content-Type": "text/plain",
"x-my-custom-header": "streaming-example"
]
)
)
```

The `StreamingLambdaStatusAndHeadersResponse` structure allows you to specify:
- **statusCode**: HTTP status code (e.g., 200, 404, 500)
- **headers**: Dictionary of single-value HTTP headers (optional)

### Streaming the Response Body

After setting headers, you can stream the response body by calling the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly before finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not stream the response by calling `writeAndFinish(_:)`.

```swift
// Stream data in chunks
for i in 1...3 {
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))
try await Task.sleep(for: .milliseconds(1000))
}

// Close the response stream
try await responseWriter.finish()
```

An error is thrown if `finish()` is called multiple times or if it is called after having called `writeAndFinish(_:)`.

### Example Usage Patterns

The example includes two handler implementations:

1. **SendNumbersWithPause**: Demonstrates basic streaming with headers, sending numbers with delays
2. **ConditionalStreamingHandler**: Shows how to handle different response scenarios, including error responses with appropriate status codes

The `handle(...)` method is marked as `mutating` to allow handlers to be implemented with a `struct`.

Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.
Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.

## Build & Package

Expand Down Expand Up @@ -68,7 +108,7 @@ aws lambda create-function \
```

> [!IMPORTANT]
> The timeout value must be bigger than the time it takes for your function to stream its output. Otherwise, the Lambda control plane will terminate the execution environment before your code has a chance to finish writing the stream. Here, the sample function stream responses during 10 seconds and we set the timeout for 15 seconds.
> The timeout value must be bigger than the time it takes for your function to stream its output. Otherwise, the Lambda control plane will terminate the execution environment before your code has a chance to finish writing the stream. Here, the sample function stream responses during 3 seconds and we set the timeout for 5 seconds.

The `--architectures` flag is only required when you build the binary on an Apple Silicon machine (Apple M1 or more recent). It defaults to `x64`.

Expand Down Expand Up @@ -139,13 +179,7 @@ This should output the following result, with a one-second delay between each nu
1
2
3
4
5
6
7
8
9
10
Streaming complete!
```

### Undeploy
Expand Down
32 changes: 28 additions & 4 deletions Examples/Streaming/Sources/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,46 @@
import AWSLambdaRuntime
import NIOCore

#if canImport(FoundationEssentials)
import FoundationEssentials
#else
import Foundation
#endif

struct SendNumbersWithPause: StreamingLambdaHandler {
func handle(
_ event: ByteBuffer,
responseWriter: some LambdaResponseStreamWriter,
context: LambdaContext
) async throws {
for i in 1...10 {

// Send HTTP status code and headers before streaming the response body
try await responseWriter.writeStatusAndHeaders(
StreamingLambdaStatusAndHeadersResponse(
statusCode: 418, // I'm a tea pot
headers: [
"Content-Type": "text/plain",
"x-my-custom-header": "streaming-example",
]
)
)

// Stream numbers with pauses to demonstrate streaming functionality
for i in 1...3 {
// Send partial data
try await responseWriter.write(ByteBuffer(string: "\(i)\n"))
// Perform some long asynchronous work
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))

// Perform some long asynchronous work to simulate processing
try await Task.sleep(for: .milliseconds(1000))
}

// Send final message
try await responseWriter.write(ByteBuffer(string: "Streaming complete!\n"))

// All data has been sent. Close off the response stream.
try await responseWriter.finish()
}
}

let runtime = LambdaRuntime.init(handler: SendNumbersWithPause())
let runtime = LambdaRuntime(handler: SendNumbersWithPause())
try await runtime.run()
5 changes: 4 additions & 1 deletion Examples/Streaming/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Resources:
Type: AWS::Serverless::Function
Properties:
CodeUri: .build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingNumbers/StreamingNumbers.zip
Timeout: 15
Timeout: 5 # Must be bigger than the time it takes to stream the output
Handler: swift.bootstrap # ignored by the Swift runtime
Runtime: provided.al2
MemorySize: 128
Expand All @@ -17,6 +17,9 @@ Resources:
FunctionUrlConfig:
AuthType: AWS_IAM
InvokeMode: RESPONSE_STREAM
Environment:
Variables:
LOG_LEVEL: trace

Outputs:
# print Lambda function URL
Expand Down
2 changes: 1 addition & 1 deletion Sources/AWSLambdaRuntime/LambdaHandlers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public protocol StreamingLambdaHandler: _Lambda_SendableMetatype {
public protocol LambdaResponseStreamWriter {
/// Write a response part into the stream. Bytes written are streamed continually.
/// - Parameter buffer: The buffer to write.
func write(_ buffer: ByteBuffer) async throws
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool) async throws

/// End the response stream and the underlying HTTP response.
func finish() async throws
Expand Down
107 changes: 107 additions & 0 deletions Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

#if canImport(FoundationEssentials)
import FoundationEssentials
#else
import Foundation
#endif

/// A response structure specifically designed for streaming Lambda responses that contains
/// HTTP status code and headers without body content.
///
/// This structure is used with `LambdaResponseStreamWriter.writeStatusAndHeaders(_:)` to send
/// HTTP response metadata before streaming the response body.
public struct StreamingLambdaStatusAndHeadersResponse: Codable, Sendable {
/// The HTTP status code for the response (e.g., 200, 404, 500)
public let statusCode: Int

/// Dictionary of single-value HTTP headers
public let headers: [String: String]?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we aren't use swift-http-types here?

Copy link
Contributor Author

@sebsto sebsto Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there is no dependency on HTTPTypes in this library and I did not want to pull a new dependency just for this. The data format is well controlled. It's between the Swift runtime and the Lambda data plane.


/// Dictionary of multi-value HTTP headers (e.g., Set-Cookie headers)
public let multiValueHeaders: [String: [String]]?

/// Creates a new streaming Lambda response with status code and optional headers
///
/// - Parameters:
/// - statusCode: The HTTP status code for the response
/// - headers: Optional dictionary of single-value HTTP headers
/// - multiValueHeaders: Optional dictionary of multi-value HTTP headers
public init(
statusCode: Int,
headers: [String: String]? = nil,
multiValueHeaders: [String: [String]]? = nil
) {
self.statusCode = statusCode
self.headers = headers
self.multiValueHeaders = multiValueHeaders
}
}

extension LambdaResponseStreamWriter {
/// Writes the HTTP status code and headers to the response stream.
///
/// This method serializes the status and headers as JSON and writes them to the stream,
/// followed by eight null bytes as a separator before the response body.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any documentation anywhere we can link to that provides a reason for writing 8 null bytes

Copy link
Contributor Author

@sebsto sebsto Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is undocumented at the moment I received the information from an SDM in the Lambda service team and I have confirmed by looking at the NodeJS runtime implementation.
https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/main/src/HttpResponseStream.js

The value of the "magic" header is here
https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a5ae1c2a92708e81c9df4949c60fd9e1e6e46bed/src/HttpResponseStream.js#L17

The 8 x 0 bytes are defined here
https://github.com/aws/aws-lambda-nodejs-runtime-interface-client/blob/a5ae1c2a92708e81c9df4949c60fd9e1e6e46bed/src/HttpResponseStream.js#L26

BTW, at the moment, the NodeJS runtime is the only runtime supporting this capability. When we will release the Swift runtime, it will be second to offer this possibility, before all the AWS managed runtime

///
/// - Parameters:
/// - response: The status and headers response to write
/// - encoder: The encoder to use for serializing the response,
/// - Throws: An error if JSON serialization or writing fails
public func writeStatusAndHeaders<Encoder: LambdaOutputEncoder>(
_ response: StreamingLambdaStatusAndHeadersResponse,
encoder: Encoder
) async throws where Encoder.Output == StreamingLambdaStatusAndHeadersResponse {

// Convert JSON headers to an array of bytes in a ByteBuffer
var buffer = ByteBuffer()
try encoder.encode(response, into: &buffer)

// Write eight null bytes as separator
buffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

// Write the JSON data and the separator
try await self.write(buffer, hasCustomHeaders: true)
}

/// Write a response part into the stream. Bytes written are streamed continually.
/// This implementation avoids having to modify all the tests and other part of the code that use this function signature
/// - Parameter buffer: The buffer to write.
public func write(_ buffer: ByteBuffer) async throws {
// Write the buffer to the response stream
try await self.write(buffer, hasCustomHeaders: false)
}
}

extension LambdaResponseStreamWriter {
/// Writes the HTTP status code and headers to the response stream.
///
/// This method serializes the status and headers as JSON and writes them to the stream,
/// followed by eight null bytes as a separator before the response body.
///
/// - Parameters:
/// - response: The status and headers response to write
/// - encoder: The encoder to use for serializing the response, use JSONEncoder by default
/// - Throws: An error if JSON serialization or writing fails
public func writeStatusAndHeaders(
_ response: StreamingLambdaStatusAndHeadersResponse,
encoder: JSONEncoder = JSONEncoder()
) async throws {
encoder.outputFormatting = .withoutEscapingSlashes
try await self.writeStatusAndHeaders(response, encoder: LambdaJSONOutputEncoder(encoder))
}
}
Loading
Loading