diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 058a2888..3194852e 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -36,7 +36,7 @@ jobs: # We pass the list of examples here, but we can't pass an array as argument # Instead, we pass a String with a valid JSON array. # The workaround is mentioned here https://github.com/orgs/community/discussions/11692 - examples: "[ 'APIGateway', 'APIGateway+LambdaAuthorizer', 'BackgroundTasks', 'HelloJSON', 'HelloWorld', 'ResourcesPackaging', 'S3EventNotifier', 'S3_AWSSDK', 'S3_Soto', 'Streaming', 'Testing', 'Tutorial' ]" + examples: "[ 'APIGateway', 'APIGateway+LambdaAuthorizer', 'BackgroundTasks', 'HelloJSON', 'HelloWorld', 'ResourcesPackaging', 'S3EventNotifier', 'S3_AWSSDK', 'S3_Soto', 'Streaming', 'StreamingFromEvent', 'Testing', 'Tutorial' ]" archive_plugin_examples: "[ 'HelloWorld', 'ResourcesPackaging' ]" archive_plugin_enabled: true diff --git a/.gitignore b/.gitignore index b3b30ec1..3dadbf30 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ Package.resolved .vscode Makefile .devcontainer -.amazonq \ No newline at end of file +.amazonq +samconfig.toml \ No newline at end of file diff --git a/Examples/README.md b/Examples/README.md index 973df897..76156d6e 100644 --- a/Examples/README.md +++ b/Examples/README.md @@ -34,7 +34,9 @@ This directory contains example code for Lambda functions. - **[S3_Soto](S3_Soto/README.md)**: a Lambda function that uses [Soto](https://github.com/soto-project/soto) to invoke an [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) API (requires [AWS SAM](https://aws.amazon.com/serverless/sam/)). -- **[Streaming]**: create a Lambda function exposed as an URL. The Lambda function streams its response over time. (requires [AWS SAM](https://aws.amazon.com/serverless/sam/)). +- **[Streaming](Streaming/README.md)**: create a Lambda function exposed as an URL. The Lambda function streams its response over time. (requires [AWS SAM](https://aws.amazon.com/serverless/sam/)). + +- **[StreamingFromEvent](StreamingFromEvent/README.md)**: a Lambda function that combines JSON input decoding with response streaming capabilities, demonstrating the new streaming codable interface (requires [AWS SAM](https://aws.amazon.com/serverless/sam/) or the [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)). - **[Testing](Testing/README.md)**: a test suite for Lambda functions. diff --git a/Examples/Streaming/README.md b/Examples/Streaming/README.md index 86a42754..cc34b852 100644 --- a/Examples/Streaming/README.md +++ b/Examples/Streaming/README.md @@ -220,3 +220,69 @@ When done testing, you can delete the infrastructure with this command. ```bash sam delete ``` + +## Payload decoding + +The content of the input `ByteBuffer` depends on how you invoke the function: + +- when you invoke the function with the [`InvokeWithresponseStream` API](https://docs.aws.amazon.com/lambda/latest/api/API_InvokeWithResponseStream.html) to invoke the function, the function incoming payload is what you pass to the API. You can decode the `ByteBuffer` with a [`JSONDecoder.decode()`](https://developer.apple.com/documentation/foundation/jsondecoder) function call. +- when you invoke the function through a [Lambda function URL](https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html), the incoming `ByteBuffer` contains a payload that gives developer access to the underlying HTTP call. The payload contains information about the HTTP verb used, the headers received, the authentication method and so on. The [AWS documentation contains the details](https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html) of the payload. The [Swift lambda Event library](https://github.com/swift-server/swift-aws-lambda-events) contains a [FunctionURL Swift struct definition](https://github.com/swift-server/swift-aws-lambda-events/blob/main/Sources/AWSLambdaEvents/FunctionURL.swift) ready to use in your projects. + +Here is an example of Lambda function URL payload: + +``` +// This is an example of payload received when +// the function is invoked by a Lambda function URL. +// You can use the `FunctionURL`` structure provided by the Lambda Event library to decode this +// See, https://github.com/swift-server/swift-aws-lambda-events/blob/main/Sources/AWSLambdaEvents/FunctionURL.swift + +/* +{ + "version": "2.0", + "routeKey": "$default", + "rawPath": "/", + "rawQueryString": "", + "headers": { + "x-amzn-tls-cipher-suite": "TLS_AES_128_GCM_SHA256", + "x-amzn-tls-version": "TLSv1.3", + "x-amzn-trace-id": "Root=1-68762f44-4f6a87d1639e7fc356aa6f96", + "x-amz-date": "20250715T103651Z", + "x-forwarded-proto": "https", + "host": "zvnsvhpx7u5gn3l3euimg4jjou0jvbfe.lambda-url.us-east-1.on.aws", + "x-forwarded-port": "443", + "x-forwarded-for": "2a01:cb0c:6de:8300:a1be:8004:e31a:b9f", + "accept": "*/*", + "user-agent": "curl/8.7.1" + }, + "requestContext": { + "accountId": "0123456789", + "apiId": "zvnsvhpx7u5gn3l3euimg4jjou0jvbfe", + "authorizer": { + "iam": { + "accessKey": "AKIA....", + "accountId": "0123456789", + "callerId": "AIDA...", + "cognitoIdentity": null, + "principalOrgId": "o-rlrup7z3ao", + "userArn": "arn:aws:iam::0123456789:user/sst", + "userId": "AIDA..." + } + }, + "domainName": "zvnsvhpx7u5gn3l3euimg4jjou0jvbfe.lambda-url.us-east-1.on.aws", + "domainPrefix": "zvnsvhpx7u5gn3l3euimg4jjou0jvbfe", + "http": { + "method": "GET", + "path": "/", + "protocol": "HTTP/1.1", + "sourceIp": "2a01:...:b9f", + "userAgent": "curl/8.7.1" + }, + "requestId": "f942509a-283f-4c4f-94f8-0d4ccc4a00f8", + "routeKey": "$default", + "stage": "$default", + "time": "15/Jul/2025:10:36:52 +0000", + "timeEpoch": 1752575812081 + }, + "isBase64Encoded": false +} +``` \ No newline at end of file diff --git a/Examples/StreamingFromEvent/Package.swift b/Examples/StreamingFromEvent/Package.swift new file mode 100644 index 00000000..95c72779 --- /dev/null +++ b/Examples/StreamingFromEvent/Package.swift @@ -0,0 +1,50 @@ +// swift-tools-version: 6.0 + +import PackageDescription + +// needed for CI to test the local version of the library +import struct Foundation.URL + +let package = Package( + name: "StreamingFromEvent", + platforms: [.macOS(.v15)], + dependencies: [ + // during CI, the dependency on local version of swift-aws-lambda-runtime is added dynamically below + .package(url: "https://github.com/swift-server/swift-aws-lambda-runtime.git", branch: "main") + ], + targets: [ + .executableTarget( + name: "StreamingFromEvent", + dependencies: [ + .product(name: "AWSLambdaRuntime", package: "swift-aws-lambda-runtime") + ] + ) + ] +) + +if let localDepsPath = Context.environment["LAMBDA_USE_LOCAL_DEPS"], + localDepsPath != "", + let v = try? URL(fileURLWithPath: localDepsPath).resourceValues(forKeys: [.isDirectoryKey]), + v.isDirectory == true +{ + // when we use the local runtime as deps, let's remove the dependency added above + let indexToRemove = package.dependencies.firstIndex { dependency in + if case .sourceControl( + name: _, + location: "https://github.com/swift-server/swift-aws-lambda-runtime.git", + requirement: _ + ) = dependency.kind { + return true + } + return false + } + if let indexToRemove { + package.dependencies.remove(at: indexToRemove) + } + + // then we add the dependency on LAMBDA_USE_LOCAL_DEPS' path (typically ../..) + print("[INFO] Compiling against swift-aws-lambda-runtime located at \(localDepsPath)") + package.dependencies += [ + .package(name: "swift-aws-lambda-runtime", path: localDepsPath) + ] +} diff --git a/Examples/StreamingFromEvent/README.md b/Examples/StreamingFromEvent/README.md new file mode 100644 index 00000000..8d33f145 --- /dev/null +++ b/Examples/StreamingFromEvent/README.md @@ -0,0 +1,269 @@ +# Streaming Codable Lambda function + +This example demonstrates how to use the `StreamingLambdaHandlerWithEvent` protocol to create Lambda functions that: + +1. **Receive JSON input**: Automatically decode JSON events into Swift structs +2. **Stream responses**: Send data incrementally as it becomes available +3. **Execute background work**: Perform additional processing after the response is sent + +The example uses the streaming codable interface that combines the benefits of: +- Type-safe JSON input decoding (like regular `LambdaHandler`) +- Response streaming capabilities (like `StreamingLambdaHandler`) +- Background work execution after response completion + +Streaming responses incurs a cost. For more information, see [AWS Lambda Pricing](https://aws.amazon.com/lambda/pricing/). + +You can stream responses through [Lambda function URLs](https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html), the AWS SDK, or using the Lambda [InvokeWithResponseStream](https://docs.aws.amazon.com/lambda/latest/dg/API_InvokeWithResponseStream.html) API. + +## Code + +The sample code creates a `StreamingFromEventHandler` struct that conforms to the `StreamingLambdaHandlerWithEvent` protocol provided by the Swift AWS Lambda Runtime. + +The `handle(...)` method of this protocol receives incoming events as a decoded Swift struct (`StreamingRequest`) and returns the output through a `LambdaResponseStreamWriter`. + +The Lambda function expects a JSON payload with the following structure: + +```json +{ + "count": 5, + "message": "Hello from streaming Lambda!", + "delayMs": 1000 +} +``` + +Where: +- `count`: Number of messages to stream (1-100) +- `message`: The message content to repeat +- `delayMs`: Optional delay between messages in milliseconds (defaults to 500ms) + +The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function. The code calls the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data written repeatedly before finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not stream the response by calling `writeAndFinish(_:)`. + +An error is thrown if `finish()` is called multiple times or if it is called after having called `writeAndFinish(_:)`. + +The `handle(...)` method is marked as `mutating` to allow handlers to be implemented with a `struct`. + +Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane. + +Key features demonstrated: +- **JSON Input Decoding**: The function automatically parses the JSON input into a `StreamingRequest` struct +- **Input Validation**: Validates the count parameter and returns an error message if invalid +- **Progressive Streaming**: Sends messages one by one with configurable delays +- **Timestamped Output**: Each message includes an ISO8601 timestamp +- **Background Processing**: Performs cleanup and logging after the response is complete +- **Error Handling**: Gracefully handles invalid input with descriptive error messages + +## Build & Package + +To build & archive the package, type the following commands. + +```bash +swift package archive --allow-network-connections docker +``` + +If there is no error, there is a ZIP file ready to deploy. +The ZIP file is located at `.build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingFromEvent/StreamingFromEvent.zip` + +## Test locally + +You can test the function locally before deploying: + +```bash +swift run + +# In another terminal, test with curl: +curl -v \ + --header "Content-Type: application/json" \ + --data '{"count": 3, "message": "Hello World!", "delayMs": 1000}' \ + http://127.0.0.1:7000/invoke +``` + +Or simulate a call from a Lambda Function URL (where the body is encapsulated in a Lambda Function URL request): + +```bash +curl -v \ + --header "Content-Type: application/json" \ + --data @events/sample-request.json \ + http://127.0.0.1:7000/invoke + ``` + +## Deploy with the AWS CLI + +Here is how to deploy using the `aws` command line. + +### Step 1: Create the function + +```bash +# Replace with your AWS Account ID +AWS_ACCOUNT_ID=012345678901 +aws lambda create-function \ +--function-name StreamingFromEvent \ +--zip-file fileb://.build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingFromEvent/StreamingFromEvent.zip \ +--runtime provided.al2 \ +--handler provided \ +--architectures arm64 \ +--role arn:aws:iam::${AWS_ACCOUNT_ID}:role/lambda_basic_execution +``` + +> [!IMPORTANT] +> The timeout value must be bigger than the time it takes for your function to stream its output. Otherwise, the Lambda control plane will terminate the execution environment before your code has a chance to finish writing the stream. Here, the sample function stream responses during 10 seconds and we set the timeout for 15 seconds. + +The `--architectures` flag is only required when you build the binary on an Apple Silicon machine (Apple M1 or more recent). It defaults to `x64`. + +Be sure to set `AWS_ACCOUNT_ID` with your actual AWS account ID (for example: 012345678901). + +### Step 2: Give permission to invoke that function through a URL + +Anyone with a valid signature from your AWS account will have permission to invoke the function through its URL. + +```bash +aws lambda add-permission \ + --function-name StreamingFromEvent \ + --action lambda:InvokeFunctionUrl \ + --principal ${AWS_ACCOUNT_ID} \ + --function-url-auth-type AWS_IAM \ + --statement-id allowURL +``` + +### Step 3: Create the URL + +This creates [a URL with IAM authentication](https://docs.aws.amazon.com/lambda/latest/dg/urls-auth.html). Only calls with a valid signature will be authorized. + +```bash +aws lambda create-function-url-config \ + --function-name StreamingFromEvent \ + --auth-type AWS_IAM \ + --invoke-mode RESPONSE_STREAM +``` +This call returns various information, including the URL to invoke your function. + +```json +{ + "FunctionUrl": "https://ul3nf4dogmgyr7ffl5r5rs22640fwocc.lambda-url.us-east-1.on.aws/", + "FunctionArn": "arn:aws:lambda:us-east-1:012345678901:function:StreamingFromEvent", + "AuthType": "AWS_IAM", + "CreationTime": "2024-10-22T07:57:23.112599Z", + "InvokeMode": "RESPONSE_STREAM" +} +``` + +### Invoke your Lambda function + +To invoke the Lambda function, use `curl` with the AWS Sigv4 option to generate the signature. + +Read the [AWS Credentials and Signature](../README.md/#AWS-Credentials-and-Signature) section for more details about the AWS Sigv4 protocol and how to obtain AWS credentials. + +When you have the `aws` command line installed and configured, you will find the credentials in the `~/.aws/credentials` file. + +```bash +URL=https://ul3nf4dogmgyr7ffl5r5rs22640fwocc.lambda-url.us-east-1.on.aws/ +REGION=us-east-1 +ACCESS_KEY=AK... +SECRET_KEY=... +AWS_SESSION_TOKEN=... + +curl --user "${ACCESS_KEY}":"${SECRET_KEY}" \ + --aws-sigv4 "aws:amz:${REGION}:lambda" \ + -H "x-amz-security-token: ${AWS_SESSION_TOKEN}" \ + --no-buffer \ + --header "Content-Type: application/json" \ + --data '{"count": 3, "message": "Hello World!", "delayMs": 1000}' \ + "$URL" +``` + +This should output the following result, with configurable delays between each message: + +``` +[2024-07-15T05:00:00Z] Message 1/3: Hello World! +[2024-07-15T05:00:01Z] Message 2/3: Hello World! +[2024-07-15T05:00:02Z] Message 3/3: Hello World! +✅ Successfully sent 3 messages +``` + +### Undeploy + +When done testing, you can delete the Lambda function with this command. + +```bash +aws lambda delete-function --function-name StreamingFromEvent +``` + +## Deploy with AWS SAM + +Alternatively, you can use [AWS SAM](https://aws.amazon.com/serverless/sam/) to deploy the Lambda function. + +**Prerequisites** : Install the [SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html) + +### SAM Template + +The template file is provided as part of the example in the `template.yaml` file. It defines a Lambda function based on the binary ZIP file. It creates the function url with IAM authentication and sets the function timeout to 15 seconds. + +```yaml +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: SAM Template for StreamingFromEvent Example + +Resources: + # Lambda function + StreamingNumbers: + Type: AWS::Serverless::Function + Properties: + CodeUri: .build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingFromEvent/StreamingFromEvent.zip + Timeout: 15 + Handler: swift.bootstrap # ignored by the Swift runtime + Runtime: provided.al2 + MemorySize: 128 + Architectures: + - arm64 + FunctionUrlConfig: + AuthType: AWS_IAM + InvokeMode: RESPONSE_STREAM + +Outputs: + # print Lambda function URL + LambdaURL: + Description: Lambda URL + Value: !GetAtt StreamingNumbersUrl.FunctionUrl +``` + +### Deploy with SAM + +```bash +sam deploy \ +--resolve-s3 \ +--template-file template.yaml \ +--stack-name StreamingFromEvent \ +--capabilities CAPABILITY_IAM +``` + +The URL of the function is provided as part of the output. + +``` +CloudFormation outputs from deployed stack +----------------------------------------------------------------------------------------------------------------------------- +Outputs +----------------------------------------------------------------------------------------------------------------------------- +Key LambdaURL +Description Lambda URL +Value https://gaudpin2zjqizfujfnqxstnv6u0czrfu.lambda-url.us-east-1.on.aws/ +----------------------------------------------------------------------------------------------------------------------------- +``` + +Once the function is deployed, you can invoke it with `curl`, similarly to what you did when deploying with the AWS CLI. + +```bash +curl -X POST \ + --data '{"count": 3, "message": "Hello World!", "delayMs": 1000}' \ + --user "$ACCESS_KEY":"$SECRET_KEY" \ + --aws-sigv4 "aws:amz:${REGION}:lambda" \ + -H "x-amz-security-token: $AWS_SESSION_TOKEN" \ + --no-buffer \ + "$URL" +``` + +### Undeploy with SAM + +When done testing, you can delete the infrastructure with this command. + +```bash +sam delete +``` diff --git a/Examples/StreamingFromEvent/Sources/main.swift b/Examples/StreamingFromEvent/Sources/main.swift new file mode 100644 index 00000000..bf559dd8 --- /dev/null +++ b/Examples/StreamingFromEvent/Sources/main.swift @@ -0,0 +1,71 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import AWSLambdaRuntime +import NIOCore + +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + +// Define your input event structure +struct StreamingRequest: Decodable { + let count: Int + let message: String + let delayMs: Int? + + // Provide default values for optional fields + var delay: Int { + delayMs ?? 500 + } +} + +// Use the new streaming handler with JSON decoding +let runtime = LambdaRuntime { (event: StreamingRequest, responseWriter, context: LambdaContext) in + context.logger.info("Received request to send \(event.count) messages: '\(event.message)'") + + // Validate input + guard event.count > 0 && event.count <= 100 else { + let errorMessage = "Count must be between 1 and 100, got: \(event.count)" + context.logger.error("\(errorMessage)") + try await responseWriter.writeAndFinish(ByteBuffer(string: "Error: \(errorMessage)\n")) + return + } + + // Stream the messages + for i in 1...event.count { + let response = "[\(Date().ISO8601Format())] Message \(i)/\(event.count): \(event.message)\n" + try await responseWriter.write(ByteBuffer(string: response)) + + // Optional delay between messages + if event.delay > 0 { + try await Task.sleep(for: .milliseconds(event.delay)) + } + } + + // Send completion message and finish the stream + let completionMessage = "✅ Successfully sent \(event.count) messages\n" + try await responseWriter.writeAndFinish(ByteBuffer(string: completionMessage)) + + // Optional: Do background work here after response is sent + context.logger.info("Background work: cleaning up resources and logging metrics") + + // Simulate some background processing + try await Task.sleep(for: .milliseconds(100)) + context.logger.info("Background work completed") +} + +try await runtime.run() diff --git a/Examples/StreamingFromEvent/events/sample-request.json b/Examples/StreamingFromEvent/events/sample-request.json new file mode 100644 index 00000000..145cb315 --- /dev/null +++ b/Examples/StreamingFromEvent/events/sample-request.json @@ -0,0 +1,49 @@ +{ + "version": "2.0", + "routeKey": "$default", + "rawPath": "/", + "rawQueryString": "", + "body": "{\"count\": 5, \"message\": \"Hello from streaming Lambda!\", \"delayMs\": 1000}", + "headers": { + "x-amzn-tls-cipher-suite": "TLS_AES_128_GCM_SHA256", + "x-amzn-tls-version": "TLSv1.3", + "x-amzn-trace-id": "Root=1-68762f44-4f6a87d1639e7fc356aa6f96", + "x-amz-date": "20250715T103651Z", + "x-forwarded-proto": "https", + "host": "zvnsvhpx7u5gn3l3euimg4jjou0jvbfe.lambda-url.us-east-1.on.aws", + "x-forwarded-port": "443", + "x-forwarded-for": "2a01:cb0c:6de:8300:a1be:8004:e31a:b9f", + "accept": "*/*", + "user-agent": "curl/8.7.1" + }, + "requestContext": { + "accountId": "0123456789", + "apiId": "zvnsvhpx7u5gn3l3euimg4jjou0jvbfe", + "authorizer": { + "iam": { + "accessKey": "AKIA....", + "accountId": "0123456789", + "callerId": "AIDA...", + "cognitoIdentity": null, + "principalOrgId": "o-rlrup7z3ao", + "userArn": "arn:aws:iam::0123456789:user/sst", + "userId": "AIDA..." + } + }, + "domainName": "zvnsvhpx7u5gn3l3euimg4jjou0jvbfe.lambda-url.us-east-1.on.aws", + "domainPrefix": "zvnsvhpx7u5gn3l3euimg4jjou0jvbfe", + "http": { + "method": "GET", + "path": "/", + "protocol": "HTTP/1.1", + "sourceIp": "2a01:...:b9f", + "userAgent": "curl/8.7.1" + }, + "requestId": "f942509a-283f-4c4f-94f8-0d4ccc4a00f8", + "routeKey": "$default", + "stage": "$default", + "time": "15/Jul/2025:10:36:52 +0000", + "timeEpoch": 1752575812081 + }, + "isBase64Encoded": false +} \ No newline at end of file diff --git a/Examples/StreamingFromEvent/template.yaml b/Examples/StreamingFromEvent/template.yaml new file mode 100644 index 00000000..6ebb5d61 --- /dev/null +++ b/Examples/StreamingFromEvent/template.yaml @@ -0,0 +1,25 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: SAM Template for StreamingfromEvent Example + +Resources: + # Lambda function + StreamingFromEvent: + Type: AWS::Serverless::Function + Properties: + CodeUri: .build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingFromEvent/StreamingFromEvent.zip + Timeout: 15 + Handler: swift.bootstrap # ignored by the Swift runtime + Runtime: provided.al2 + MemorySize: 128 + Architectures: + - arm64 + FunctionUrlConfig: + AuthType: AWS_IAM + InvokeMode: RESPONSE_STREAM + +Outputs: + # print Lambda function URL + LambdaURL: + Description: Lambda URL + Value: !GetAtt StreamingFromEventUrl.FunctionUrl diff --git a/Sources/AWSLambdaRuntime/FoundationSupport/Vendored/FunctionURL-HTTPType.swift b/Sources/AWSLambdaRuntime/FoundationSupport/Vendored/FunctionURL-HTTPType.swift new file mode 100644 index 00000000..67f853e7 --- /dev/null +++ b/Sources/AWSLambdaRuntime/FoundationSupport/Vendored/FunctionURL-HTTPType.swift @@ -0,0 +1,110 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + +// https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html + +/// This is a simplified version of the FunctionURLRequest structure, with no dependencies on the HTTPType module. +/// This file is copied from AWS Lambda Event project at https://github.com/swift-server/swift-aws-lambda-events + +/// FunctionURLRequest contains data coming from a bare Lambda Function URL +public struct FunctionURLRequest: Codable, Sendable { + public struct Context: Codable, Sendable { + public struct Authorizer: Codable, Sendable { + public struct IAMAuthorizer: Codable, Sendable { + public let accessKey: String + + public let accountId: String + public let callerId: String + public let cognitoIdentity: String? + + public let principalOrgId: String? + + public let userArn: String + public let userId: String + } + + public let iam: IAMAuthorizer? + } + + public struct HTTP: Codable, Sendable { + public let method: String + public let path: String + public let `protocol`: String + public let sourceIp: String + public let userAgent: String + } + + public let accountId: String + public let apiId: String + public let authentication: String? + public let authorizer: Authorizer? + public let domainName: String + public let domainPrefix: String + public let http: HTTP + + public let requestId: String + public let routeKey: String + public let stage: String + + public let time: String + public let timeEpoch: Int + } + + public let version: String + + public let routeKey: String + public let rawPath: String + public let rawQueryString: String + public let cookies: [String]? + public let headers: [String: String] + public let queryStringParameters: [String: String]? + + public let requestContext: Context + + public let body: String? + public let pathParameters: [String: String]? + public let isBase64Encoded: Bool + + public let stageVariables: [String: String]? +} + +// MARK: - Response - + +public struct FunctionURLResponse: Codable, Sendable { + public var statusCode: Int + public var headers: [String: String]? + public var body: String? + public let cookies: [String]? + public var isBase64Encoded: Bool? + + public init( + statusCode: Int, + headers: [String: String]? = nil, + body: String? = nil, + cookies: [String]? = nil, + isBase64Encoded: Bool? = nil + ) { + self.statusCode = statusCode + self.headers = headers + self.body = body + self.cookies = cookies + self.isBase64Encoded = isBase64Encoded + } +} diff --git a/Sources/AWSLambdaRuntime/LambdaStreaming+Codable.swift b/Sources/AWSLambdaRuntime/LambdaStreaming+Codable.swift new file mode 100644 index 00000000..a0b9a750 --- /dev/null +++ b/Sources/AWSLambdaRuntime/LambdaStreaming+Codable.swift @@ -0,0 +1,206 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +import NIOCore + +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + +/// A streaming handler protocol that receives a decoded JSON event and can stream responses. +/// This handler protocol supports response streaming and background work execution. +/// Background work can be executed after closing the response stream by calling +/// ``LambdaResponseStreamWriter/finish()`` or ``LambdaResponseStreamWriter/writeAndFinish(_:)``. +public protocol StreamingLambdaHandlerWithEvent: _Lambda_SendableMetatype { + /// Generic input type that will be decoded from JSON. + associatedtype Event: Decodable + + /// The handler function that receives a decoded event and can stream responses. + /// - Parameters: + /// - event: The decoded event object. + /// - responseWriter: A ``LambdaResponseStreamWriter`` to write the invocation's response to. + /// If no response or error is written to `responseWriter` an error will be reported to the invoker. + /// - context: The ``LambdaContext`` containing the invocation's metadata. + /// - Throws: + /// How the thrown error will be handled by the runtime: + /// - An invocation error will be reported if the error is thrown before the first call to + /// ``LambdaResponseStreamWriter/write(_:)``. + /// - If the error is thrown after call(s) to ``LambdaResponseStreamWriter/write(_:)`` but before + /// a call to ``LambdaResponseStreamWriter/finish()``, the response stream will be closed and trailing + /// headers will be sent. + /// - If ``LambdaResponseStreamWriter/finish()`` has already been called before the error is thrown, the + /// error will be logged. + mutating func handle( + _ event: Event, + responseWriter: some LambdaResponseStreamWriter, + context: LambdaContext + ) async throws +} + +/// Adapts a ``StreamingLambdaHandlerWithEvent`` to work as a ``StreamingLambdaHandler`` +/// by handling JSON decoding of the input event. +public struct StreamingLambdaCodableAdapter< + Handler: StreamingLambdaHandlerWithEvent, + Decoder: LambdaEventDecoder +>: StreamingLambdaHandler where Handler.Event: Decodable { + @usableFromInline var handler: Handler + @usableFromInline let decoder: Decoder + + /// Initialize with a custom decoder and handler. + /// - Parameters: + /// - decoder: The decoder to use for parsing the input event. + /// - handler: The streaming handler that works with decoded events. + @inlinable + public init(decoder: sending Decoder, handler: sending Handler) { + self.decoder = decoder + self.handler = handler + } + + /// Handles the raw ByteBuffer by decoding it and passing to the underlying handler. + /// - Parameters: + /// - event: The raw ByteBuffer event to decode. + /// - responseWriter: The response writer to pass to the underlying handler. + /// - context: The Lambda context. + @inlinable + public mutating func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: LambdaContext + ) async throws { + + // try to decode the event as a FunctionURLRequest and extract its body + let urlRequestBody = bodyFromFunctionURLRequest(event) + + // decode the body or the event as user-provided JSON + let decodedEvent = try self.decoder.decode(Handler.Event.self, from: urlRequestBody ?? event) + + // and pass it to the handler + try await self.handler.handle(decodedEvent, responseWriter: responseWriter, context: context) + } + + /// Extract the body payload from a FunctionURLRequest event. + /// This function checks if the event is a valid `FunctionURLRequest` and decodes the body if it is base64 encoded. + /// If the event is not a valid `FunctionURLRequest`, it returns nil. + /// - Parameter event: The raw ByteBuffer event to check. + /// - Returns: The base64 decoded body of the FunctionURLRequest if it is a valid FunctionURLRequest, otherwise nil. + @inlinable + package func bodyFromFunctionURLRequest(_ event: ByteBuffer) -> ByteBuffer? { + do { + // try to decode as a FunctionURLRequest + let request = try self.decoder.decode(FunctionURLRequest.self, from: event) + + // if the body is encoded in base64, decode it + if request.isBase64Encoded, + let base64EncodedString = request.body, + // this is the minimal way to base64 decode without importing new dependencies + let decodedData = Data(base64Encoded: base64EncodedString), + let decodedString = String(data: decodedData, encoding: .utf8) + { + + return ByteBuffer(string: decodedString) + } else { + return ByteBuffer(string: request.body ?? "") + } + } catch { + // not a FunctionURLRequest, return nil + return nil + } + } +} + +/// A closure-based streaming handler that works with decoded JSON events. +/// Allows for a streaming handler to be defined in a clean manner, leveraging Swift's trailing closure syntax. +public struct StreamingFromEventClosureHandler: StreamingLambdaHandlerWithEvent { + let body: @Sendable (Event, LambdaResponseStreamWriter, LambdaContext) async throws -> Void + + /// Initialize with a closure that receives a decoded event. + /// - Parameter body: The handler closure that receives a decoded event, response writer, and context. + public init( + body: @Sendable @escaping (Event, LambdaResponseStreamWriter, LambdaContext) async throws -> Void + ) { + self.body = body + } + + /// Calls the provided closure with the decoded event. + /// - Parameters: + /// - event: The decoded event object. + /// - responseWriter: The response writer for streaming output. + /// - context: The Lambda context. + public func handle( + _ event: Event, + responseWriter: some LambdaResponseStreamWriter, + context: LambdaContext + ) async throws { + try await self.body(event, responseWriter, context) + } +} + +#if FoundationJSONSupport + +extension StreamingLambdaCodableAdapter { + /// Initialize with a JSON decoder and handler. + /// - Parameters: + /// - decoder: The JSON decoder to use. Defaults to `JSONDecoder()`. + /// - handler: The streaming handler that works with decoded events. + public init( + decoder: JSONDecoder = JSONDecoder(), + handler: sending Handler + ) where Decoder == LambdaJSONEventDecoder { + self.init(decoder: LambdaJSONEventDecoder(decoder), handler: handler) + } +} + +extension LambdaRuntime { + /// Initialize with a streaming handler that receives decoded JSON events. + /// - Parameters: + /// - decoder: The JSON decoder to use. Defaults to `JSONDecoder()`. + /// - logger: The logger to use. Defaults to a logger with label "LambdaRuntime". + /// - streamingBody: The handler closure that receives a decoded event. + public convenience init( + decoder: JSONDecoder = JSONDecoder(), + logger: Logger = Logger(label: "LambdaRuntime"), + streamingBody: @Sendable @escaping (Event, LambdaResponseStreamWriter, LambdaContext) async throws -> Void + ) + where + Handler == StreamingLambdaCodableAdapter< + StreamingFromEventClosureHandler, + LambdaJSONEventDecoder + > + { + let closureHandler = StreamingFromEventClosureHandler(body: streamingBody) + let adapter = StreamingLambdaCodableAdapter( + decoder: decoder, + handler: closureHandler + ) + self.init(handler: adapter, logger: logger) + } + + /// Initialize with a custom streaming handler that receives decoded events. + /// - Parameters: + /// - decoder: The decoder to use for parsing input events. + /// - handler: The streaming handler. + /// - logger: The logger to use. + public convenience init( + decoder: sending Decoder, + handler: sending StreamingHandler, + logger: Logger = Logger(label: "LambdaRuntime") + ) where Handler == StreamingLambdaCodableAdapter { + let adapter = StreamingLambdaCodableAdapter(decoder: decoder, handler: handler) + self.init(handler: adapter, logger: logger) + } +} +#endif // FoundationJSONSupport diff --git a/Tests/AWSLambdaRuntimeTests/LambdaStreamingCodableTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaStreamingCodableTests.swift new file mode 100644 index 00000000..b67ae8b1 --- /dev/null +++ b/Tests/AWSLambdaRuntimeTests/LambdaStreamingCodableTests.swift @@ -0,0 +1,340 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Logging +import NIOCore +import Synchronization +import Testing + +@testable import AWSLambdaRuntime + +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + +@Suite("Streaming Codable Lambda Handler Tests") +struct LambdaStreamingFromEventTests { + + // MARK: - Test Data Structures + + struct TestEvent: Decodable, Equatable { + let message: String + let count: Int + let delay: Int? + } + + struct SimpleEvent: Decodable, Equatable { + let value: String + } + + // MARK: - Mock Response Writer + + actor MockResponseWriter: LambdaResponseStreamWriter { + private var writtenBuffers: [ByteBuffer] = [] + private var isFinished = false + private var writeAndFinishCalled = false + + func write(_ buffer: ByteBuffer) async throws { + guard !isFinished else { + throw MockError.writeAfterFinish + } + writtenBuffers.append(buffer) + } + + func finish() async throws { + guard !isFinished else { + throw MockError.alreadyFinished + } + isFinished = true + } + + func writeAndFinish(_ buffer: ByteBuffer) async throws { + try await write(buffer) + try await finish() + writeAndFinishCalled = true + } + + // Test helpers + func getWrittenData() -> [String] { + writtenBuffers.compactMap { buffer in + buffer.getString(at: buffer.readerIndex, length: buffer.readableBytes) + } + } + + func getFinished() -> Bool { + isFinished + } + + func getWriteAndFinishCalled() -> Bool { + writeAndFinishCalled + } + } + + enum MockError: Error { + case writeAfterFinish + case alreadyFinished + case decodingFailed + case handlerError + } + + // MARK: - Test StreamingFromEventClosureHandler + + @Test("StreamingFromEventClosureHandler handles decoded events correctly") + func testStreamingFromEventClosureHandler() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let handler = StreamingFromEventClosureHandler { event, writer, context in + let message = "Received: \(event.message) (count: \(event.count))" + try await writer.writeAndFinish(ByteBuffer(string: message)) + } + + let testEvent = TestEvent(message: "Hello", count: 42, delay: nil) + + try await handler.handle(testEvent, responseWriter: responseWriter, context: context) + + let writtenData = await responseWriter.getWrittenData() + let isFinished = await responseWriter.getFinished() + + #expect(writtenData == ["Received: Hello (count: 42)"]) + #expect(isFinished == true) + } + + @Test("StreamingFromEventClosureHandler can stream multiple responses") + func testStreamingMultipleResponses() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let handler = StreamingFromEventClosureHandler { event, writer, context in + for i in 1...event.count { + try await writer.write(ByteBuffer(string: "\(i): \(event.message)\n")) + } + try await writer.finish() + } + + let testEvent = TestEvent(message: "Test", count: 3, delay: nil) + + try await handler.handle(testEvent, responseWriter: responseWriter, context: context) + + let writtenData = await responseWriter.getWrittenData() + let isFinished = await responseWriter.getFinished() + + #expect(writtenData == ["1: Test\n", "2: Test\n", "3: Test\n"]) + #expect(isFinished == true) + } + + // MARK: - Test StreamingLambdaCodableAdapter + + @Test("StreamingLambdaCodableAdapter decodes JSON and calls handler") + func testStreamingLambdaCodableAdapter() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let closureHandler = StreamingFromEventClosureHandler { event, writer, context in + try await writer.writeAndFinish(ByteBuffer(string: "Echo: \(event.value)")) + } + + var adapter = StreamingLambdaCodableAdapter( + decoder: LambdaJSONEventDecoder(JSONDecoder()), + handler: closureHandler + ) + + let jsonData = #"{"value": "test message"}"# + let inputBuffer = ByteBuffer(string: jsonData) + + try await adapter.handle(inputBuffer, responseWriter: responseWriter, context: context) + + let writtenData = await responseWriter.getWrittenData() + let isFinished = await responseWriter.getFinished() + + #expect(writtenData == ["Echo: test message"]) + #expect(isFinished == true) + } + + @Test("StreamingLambdaCodableAdapter handles JSON decoding errors") + func testStreamingLambdaCodableAdapterDecodingError() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let closureHandler = StreamingFromEventClosureHandler { event, writer, context in + try await writer.writeAndFinish(ByteBuffer(string: "Should not reach here")) + } + + var adapter = StreamingLambdaCodableAdapter( + decoder: LambdaJSONEventDecoder(JSONDecoder()), + handler: closureHandler + ) + + let invalidJsonData = #"{"invalid": "json structure"}"# + let inputBuffer = ByteBuffer(string: invalidJsonData) + + await #expect(throws: DecodingError.self) { + try await adapter.handle(inputBuffer, responseWriter: responseWriter, context: context) + } + + let writtenData = await responseWriter.getWrittenData() + #expect(writtenData.isEmpty) + } + + @Test("StreamingLambdaCodableAdapter with convenience JSON initializer") + func testStreamingLambdaCodableAdapterJSONConvenience() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let closureHandler = StreamingFromEventClosureHandler { event, writer, context in + try await writer.write(ByteBuffer(string: "Message: \(event.message)\n")) + try await writer.write(ByteBuffer(string: "Count: \(event.count)\n")) + try await writer.finish() + } + + var adapter = StreamingLambdaCodableAdapter(handler: closureHandler) + + let jsonData = #"{"message": "Hello World", "count": 5, "delay": 100}"# + let inputBuffer = ByteBuffer(string: jsonData) + + try await adapter.handle(inputBuffer, responseWriter: responseWriter, context: context) + + let writtenData = await responseWriter.getWrittenData() + let isFinished = await responseWriter.getFinished() + + #expect(writtenData == ["Message: Hello World\n", "Count: 5\n"]) + #expect(isFinished == true) + } + + // MARK: - Test Error Handling + + @Test("Handler errors are properly propagated") + func testHandlerErrorPropagation() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let closureHandler = StreamingFromEventClosureHandler { event, writer, context in + throw MockError.handlerError + } + + var adapter = StreamingLambdaCodableAdapter( + decoder: LambdaJSONEventDecoder(JSONDecoder()), + handler: closureHandler + ) + + let jsonData = #"{"value": "test"}"# + let inputBuffer = ByteBuffer(string: jsonData) + + await #expect(throws: MockError.self) { + try await adapter.handle(inputBuffer, responseWriter: responseWriter, context: context) + } + } + + // MARK: - Test Custom Handler Implementation + + struct CustomStreamingHandler: StreamingLambdaHandlerWithEvent { + typealias Event = TestEvent + + func handle( + _ event: Event, + responseWriter: some LambdaResponseStreamWriter, + context: LambdaContext + ) async throws { + context.logger.trace("Processing event with message: \(event.message)") + + let response = "Processed: \(event.message) with count \(event.count)" + try await responseWriter.writeAndFinish(ByteBuffer(string: response)) + } + } + + @Test("Custom StreamingLambdaHandlerWithEvent implementation works") + func testCustomStreamingHandler() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let handler = CustomStreamingHandler() + let testEvent = TestEvent(message: "Custom Handler Test", count: 10, delay: nil) + + try await handler.handle(testEvent, responseWriter: responseWriter, context: context) + + let writtenData = await responseWriter.getWrittenData() + let isFinished = await responseWriter.getFinished() + + #expect(writtenData == ["Processed: Custom Handler Test with count 10"]) + #expect(isFinished == true) + } + + @Test("Custom handler with adapter works end-to-end") + func testCustomHandlerWithAdapter() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let customHandler = CustomStreamingHandler() + var adapter = StreamingLambdaCodableAdapter(handler: customHandler) + + let jsonData = #"{"message": "End-to-end test", "count": 7}"# + let inputBuffer = ByteBuffer(string: jsonData) + + try await adapter.handle(inputBuffer, responseWriter: responseWriter, context: context) + + let writtenData = await responseWriter.getWrittenData() + let isFinished = await responseWriter.getFinished() + + #expect(writtenData == ["Processed: End-to-end test with count 7"]) + #expect(isFinished == true) + } + + // MARK: - Test Background Work Simulation + + @Test("Handler can perform background work after streaming") + func testBackgroundWorkAfterStreaming() async throws { + let responseWriter = MockResponseWriter() + let context = LambdaContext.makeTest() + + let backgroundWorkCompleted = Atomic(false) + + let handler = StreamingFromEventClosureHandler { event, writer, context in + // Send response first + try await writer.writeAndFinish(ByteBuffer(string: "Response: \(event.value)")) + + // Simulate background work + try await Task.sleep(for: .milliseconds(10)) + backgroundWorkCompleted.store(true, ordering: .relaxed) + } + + let testEvent = SimpleEvent(value: "background test") + + try await handler.handle(testEvent, responseWriter: responseWriter, context: context) + + let writtenData = await responseWriter.getWrittenData() + let isFinished = await responseWriter.getFinished() + let writeAndFinishCalled = await responseWriter.getWriteAndFinishCalled() + + #expect(writtenData == ["Response: background test"]) + #expect(isFinished == true) + #expect(writeAndFinishCalled == true) + #expect(backgroundWorkCompleted.load(ordering: .relaxed) == true) + } +} + +// MARK: - Test Helpers + +extension LambdaContext { + static func makeTest() -> LambdaContext { + LambdaContext.__forTestsOnly( + requestID: "test-request-id", + traceID: "test-trace-id", + invokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:test", + timeout: .seconds(30), + logger: Logger(label: "test") + ) + } +} diff --git a/readme.md b/readme.md index 37596ed2..46232ada 100644 --- a/readme.md +++ b/readme.md @@ -254,6 +254,56 @@ try await runtime.run() You can learn how to deploy and invoke this function in [the streaming example README file](Examples/Streaming/README.md). +### Lambda Streaming Response with JSON Input + +The Swift AWS Lambda Runtime also provides a convenient interface that combines the benefits of JSON input decoding with response streaming capabilities. This is ideal when you want to receive strongly-typed JSON events while maintaining the ability to stream responses and execute background work. + +Here is an example of a function that receives a JSON event and streams multiple responses: + +```swift +import AWSLambdaRuntime +import NIOCore + +// Define your input event structure +struct StreamingRequest: Decodable { + let count: Int + let message: String + let delayMs: Int? +} + +// Use the new streaming handler with JSON decoding +let runtime = LambdaRuntime { (event: StreamingRequest, responseWriter, context: LambdaContext) in + context.logger.info("Received request to send \(event.count) messages") + + // Stream the messages + for i in 1...event.count { + let response = "Message \(i)/\(event.count): \(event.message)\n" + try await responseWriter.write(ByteBuffer(string: response)) + + // Optional delay between messages + if let delay = event.delayMs, delay > 0 { + try await Task.sleep(for: .milliseconds(delay)) + } + } + + // Finish the stream + try await responseWriter.finish() + + // Optional: Execute background work after response is sent + context.logger.info("Background work: processing completed") +} + +try await runtime.run() +``` + +This interface provides: +- **Type-safe JSON input**: Automatic decoding of JSON events into Swift structs +- **Streaming responses**: Full control over when and how to stream data back to clients +- **Background work support**: Ability to execute code after the response stream is finished +- **Familiar API**: Uses the same closure-based pattern as regular Lambda handlers + +You can learn how to deploy and invoke this function in [the streaming codable example README file](Examples/StreamingFromEvent/README.md). + ### Integration with AWS Services Most Lambda functions are triggered by events originating in other AWS services such as `Amazon SNS`, `Amazon SQS` or `AWS APIGateway`.