diff --git a/.github/ISSUE_TEMPLATE/02-bug_report.yml b/.github/ISSUE_TEMPLATE/02-bug_report.yml index 88f66efd..bdfbd730 100644 --- a/.github/ISSUE_TEMPLATE/02-bug_report.yml +++ b/.github/ISSUE_TEMPLATE/02-bug_report.yml @@ -16,7 +16,7 @@ body: attributes: label: Affected Trino version description: Which version of Trino do you see this bug in? -# +# - type: textarea attributes: label: Current and expected behavior diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a870f76..b73fdf84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Support for fault-tolerant execution ([#779]). + +[#779]: https://github.com/stackabletech/trino-operator/pull/779 + ## [25.7.0] - 2025-07-23 ## [25.7.0-rc1] - 2025-07-18 diff --git a/deploy/helm/trino-operator/crds/crds.yaml b/deploy/helm/trino-operator/crds/crds.yaml index f52c41b2..19ee4127 100644 --- a/deploy/helm/trino-operator/crds/crds.yaml +++ b/deploy/helm/trino-operator/crds/crds.yaml @@ -105,6 +105,543 @@ spec: description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. type: object type: object + faultTolerantExecution: + description: Fault tolerant execution configuration. When enabled, Trino can automatically retry queries or tasks in case of failures. + nullable: true + oneOf: + - required: + - query + - required: + - task + properties: + query: + description: Query-level fault tolerant execution. Retries entire queries on failure. + properties: + exchangeDeduplicationBufferSize: + description: Data size of the coordinator's in-memory buffer used to store output of query stages. + nullable: true + type: string + exchangeManager: + description: Exchange manager configuration for spooling intermediate data during fault tolerant execution. Optional for Query retry policy, recommended for large result sets. + nullable: true + oneOf: + - required: + - s3 + - required: + - hdfs + - required: + - local + properties: + configOverrides: + additionalProperties: + type: string + default: {} + description: The `configOverrides` allow overriding arbitrary exchange manager properties. + type: object + encryptionEnabled: + description: Whether to enable encryption of spooling data. + nullable: true + type: boolean + hdfs: + description: HDFS-based exchange manager. + properties: + baseDirectories: + description: HDFS URIs for spooling data. + items: + type: string + type: array + blockSize: + description: Block data size for HDFS storage. + nullable: true + type: string + hdfs: + description: HDFS connection configuration. + properties: + configMap: + description: Name of the [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) providing information about the HDFS cluster. + type: string + required: + - configMap + type: object + skipDirectorySchemeValidation: + description: Skip directory scheme validation to support Hadoop-compatible file systems. + nullable: true + type: boolean + required: + - baseDirectories + - hdfs + type: object + local: + description: Local filesystem storage (not recommended for production). + properties: + baseDirectories: + description: Local filesystem paths for exchange storage. 
+ items: + type: string + type: array + required: + - baseDirectories + type: object + s3: + description: S3-compatible storage configuration. + properties: + baseDirectories: + description: S3 bucket URIs for spooling data (e.g., s3://bucket1,s3://bucket2). + items: + type: string + type: array + connection: + description: S3 connection configuration. Learn more about S3 configuration in the [S3 concept docs](https://docs.stackable.tech/home/nightly/concepts/s3). + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: S3 connection definition as a resource. Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + accessStyle: + default: VirtualHosted + description: Which access style to use. Defaults to virtual hosted-style as most of the data products out there. Have a look at the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html). + enum: + - Path + - VirtualHosted + type: string + credentials: + description: If the S3 uses authentication you have to specify you S3 credentials. In the most cases a [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) providing `accessKey` and `secretKey` is sufficient. + nullable: true + properties: + scope: + description: '[Scope](https://docs.stackable.tech/home/nightly/secret-operator/scope) of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass).' + nullable: true + properties: + listenerVolumes: + default: [] + description: The listener volume scope allows Node and Service scopes to be inferred from the applicable listeners. This must correspond to Volume names in the Pod that mount Listeners. + items: + type: string + type: array + node: + default: false + description: The node scope is resolved to the name of the Kubernetes Node object that the Pod is running on. This will typically be the DNS name of the node. + type: boolean + pod: + default: false + description: The pod scope is resolved to the name of the Kubernetes Pod. This allows the secret to differentiate between StatefulSet replicas. + type: boolean + services: + default: [] + description: The service scope allows Pod objects to specify custom scopes. This should typically correspond to Service objects that the Pod participates in. + items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) containing the LDAP bind credentials.' + type: string + required: + - secretClass + type: object + host: + description: 'Host of the S3 server without any protocol or port. For example: `west1.my-cloud.com`.' + type: string + port: + description: Port the S3 server listens on. If not specified the product will determine the port to use. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + region: + default: + name: us-east-1 + description: |- + Bucket region used for signing headers (sigv4). + + This defaults to `us-east-1` which is compatible with other implementations such as Minio. + + WARNING: Some products use the Hadoop S3 implementation which falls back to us-east-2. + properties: + name: + default: us-east-1 + type: string + type: object + tls: + description: Use a TLS connection. If not specified no TLS will be used. 
+ nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client. + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates. + type: object + server: + description: Use TLS and a CA certificate to verify the server. + properties: + caCert: + description: CA cert to verify the server. + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: Name of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) which will provide the CA certificate. Note that a SecretClass does not need to have a key but can also work with just a CA certificate, so if you got provided with a CA cert but don't have access to the key you can still use this method. + type: string + webPki: + description: Use TLS and the CA certificates trusted by the common web browsers to verify the server. This can be useful when you e.g. use public AWS S3 or other public available services. + type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + required: + - host + type: object + reference: + type: string + type: object + externalId: + description: External ID for the IAM role trust policy. + nullable: true + type: string + iamRole: + description: IAM role to assume for S3 access. + nullable: true + type: string + maxErrorRetries: + description: Maximum number of times the S3 client should retry a request. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + uploadPartSize: + description: Part data size for S3 multi-part upload. + nullable: true + type: string + required: + - baseDirectories + - connection + type: object + sinkBufferPoolMinSize: + description: The minimum buffer pool size for an exchange sink. The larger the buffer pool size, the larger the write parallelism and memory usage. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + sinkBuffersPerPartition: + description: The number of buffers per partition in the buffer pool. The larger the buffer pool size, the larger the write parallelism and memory usage. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + sinkMaxFileSize: + description: Max data size of files written by exchange sinks. + nullable: true + type: string + sourceConcurrentReaders: + description: Number of concurrent readers to read from spooling storage. The larger the number of concurrent readers, the larger the read parallelism and memory usage. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + type: object + retryAttempts: + description: Maximum number of times Trino may attempt to retry a query before declaring it failed. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + retryDelayScaleFactor: + description: Factor by which retry delay is increased on each query failure. + format: float + nullable: true + type: number + retryInitialDelay: + description: Minimum time that a failed query must wait before it is retried. + nullable: true + type: string + retryMaxDelay: + description: Maximum time that a failed query must wait before it is retried. + nullable: true + type: string + type: object + task: + description: Task-level fault tolerant execution. Retries individual tasks on failure (requires exchange manager). 
+ properties: + exchangeDeduplicationBufferSize: + description: Data size of the coordinator's in-memory buffer used to store output of query stages. + nullable: true + type: string + exchangeManager: + description: Exchange manager configuration for spooling intermediate data during fault tolerant execution. Required for Task retry policy. + oneOf: + - required: + - s3 + - required: + - hdfs + - required: + - local + properties: + configOverrides: + additionalProperties: + type: string + default: {} + description: The `configOverrides` allow overriding arbitrary exchange manager properties. + type: object + encryptionEnabled: + description: Whether to enable encryption of spooling data. + nullable: true + type: boolean + hdfs: + description: HDFS-based exchange manager. + properties: + baseDirectories: + description: HDFS URIs for spooling data. + items: + type: string + type: array + blockSize: + description: Block data size for HDFS storage. + nullable: true + type: string + hdfs: + description: HDFS connection configuration. + properties: + configMap: + description: Name of the [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) providing information about the HDFS cluster. + type: string + required: + - configMap + type: object + skipDirectorySchemeValidation: + description: Skip directory scheme validation to support Hadoop-compatible file systems. + nullable: true + type: boolean + required: + - baseDirectories + - hdfs + type: object + local: + description: Local filesystem storage (not recommended for production). + properties: + baseDirectories: + description: Local filesystem paths for exchange storage. + items: + type: string + type: array + required: + - baseDirectories + type: object + s3: + description: S3-compatible storage configuration. + properties: + baseDirectories: + description: S3 bucket URIs for spooling data (e.g., s3://bucket1,s3://bucket2). + items: + type: string + type: array + connection: + description: S3 connection configuration. Learn more about S3 configuration in the [S3 concept docs](https://docs.stackable.tech/home/nightly/concepts/s3). + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: S3 connection definition as a resource. Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + accessStyle: + default: VirtualHosted + description: Which access style to use. Defaults to virtual hosted-style as most of the data products out there. Have a look at the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html). + enum: + - Path + - VirtualHosted + type: string + credentials: + description: If the S3 uses authentication you have to specify you S3 credentials. In the most cases a [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) providing `accessKey` and `secretKey` is sufficient. + nullable: true + properties: + scope: + description: '[Scope](https://docs.stackable.tech/home/nightly/secret-operator/scope) of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass).' + nullable: true + properties: + listenerVolumes: + default: [] + description: The listener volume scope allows Node and Service scopes to be inferred from the applicable listeners. This must correspond to Volume names in the Pod that mount Listeners. 
+ items: + type: string + type: array + node: + default: false + description: The node scope is resolved to the name of the Kubernetes Node object that the Pod is running on. This will typically be the DNS name of the node. + type: boolean + pod: + default: false + description: The pod scope is resolved to the name of the Kubernetes Pod. This allows the secret to differentiate between StatefulSet replicas. + type: boolean + services: + default: [] + description: The service scope allows Pod objects to specify custom scopes. This should typically correspond to Service objects that the Pod participates in. + items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) containing the LDAP bind credentials.' + type: string + required: + - secretClass + type: object + host: + description: 'Host of the S3 server without any protocol or port. For example: `west1.my-cloud.com`.' + type: string + port: + description: Port the S3 server listens on. If not specified the product will determine the port to use. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + region: + default: + name: us-east-1 + description: |- + Bucket region used for signing headers (sigv4). + + This defaults to `us-east-1` which is compatible with other implementations such as Minio. + + WARNING: Some products use the Hadoop S3 implementation which falls back to us-east-2. + properties: + name: + default: us-east-1 + type: string + type: object + tls: + description: Use a TLS connection. If not specified no TLS will be used. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client. + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates. + type: object + server: + description: Use TLS and a CA certificate to verify the server. + properties: + caCert: + description: CA cert to verify the server. + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: Name of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) which will provide the CA certificate. Note that a SecretClass does not need to have a key but can also work with just a CA certificate, so if you got provided with a CA cert but don't have access to the key you can still use this method. + type: string + webPki: + description: Use TLS and the CA certificates trusted by the common web browsers to verify the server. This can be useful when you e.g. use public AWS S3 or other public available services. + type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + required: + - host + type: object + reference: + type: string + type: object + externalId: + description: External ID for the IAM role trust policy. + nullable: true + type: string + iamRole: + description: IAM role to assume for S3 access. + nullable: true + type: string + maxErrorRetries: + description: Maximum number of times the S3 client should retry a request. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + uploadPartSize: + description: Part data size for S3 multi-part upload. 
+ nullable: true + type: string + required: + - baseDirectories + - connection + type: object + sinkBufferPoolMinSize: + description: The minimum buffer pool size for an exchange sink. The larger the buffer pool size, the larger the write parallelism and memory usage. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + sinkBuffersPerPartition: + description: The number of buffers per partition in the buffer pool. The larger the buffer pool size, the larger the write parallelism and memory usage. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + sinkMaxFileSize: + description: Max data size of files written by exchange sinks. + nullable: true + type: string + sourceConcurrentReaders: + description: Number of concurrent readers to read from spooling storage. The larger the number of concurrent readers, the larger the read parallelism and memory usage. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + type: object + retryAttemptsPerTask: + description: Maximum number of times Trino may attempt to retry a single task before declaring the query failed. + format: uint32 + minimum: 0.0 + nullable: true + type: integer + retryDelayScaleFactor: + description: Factor by which retry delay is increased on each task failure. + format: float + nullable: true + type: number + retryInitialDelay: + description: Minimum time that a failed task must wait before it is retried. + nullable: true + type: string + retryMaxDelay: + description: Maximum time that a failed task must wait before it is retried. + nullable: true + type: string + required: + - exchangeManager + type: object + type: object tls: default: internalSecretClass: tls diff --git a/docs/modules/trino/examples/usage-guide/fault-tolerant-execution.yaml b/docs/modules/trino/examples/usage-guide/fault-tolerant-execution.yaml new file mode 100644 index 00000000..870406ef --- /dev/null +++ b/docs/modules/trino/examples/usage-guide/fault-tolerant-execution.yaml @@ -0,0 +1,108 @@ +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCluster +metadata: + name: trino-fault-tolerant +spec: + image: + productVersion: "476" + clusterConfig: + catalogLabelSelector: + matchLabels: + trino: trino-fault-tolerant + faultTolerantExecution: + task: + retryAttemptsPerTask: 4 + retryInitialDelay: 10s + retryMaxDelay: 60s + retryDelayScaleFactor: 2.0 + exchangeDeduplicationBufferSize: 64Mi + exchangeManager: + encryptionEnabled: true + sinkBufferPoolMinSize: 20 + sinkBuffersPerPartition: 4 + sinkMaxFileSize: 2Gi + sourceConcurrentReaders: 8 + s3: + baseDirectories: + - "s3://trino-exchange-bucket/spooling" + connection: + reference: minio-connection + maxErrorRetries: 10 + uploadPartSize: 10Mi + coordinators: + roleGroups: + default: + replicas: 1 + workers: + roleGroups: + default: + replicas: 3 +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: minio-connection +spec: + host: minio + port: 9000 + accessStyle: Path + credentials: + secretClass: minio-credentials + tls: + verification: + server: + caCert: + secretClass: minio-tls-certificates +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: minio-tls-certificates +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-tls-certificates + labels: + secrets.stackable.tech/class: minio-tls-certificates +data: + ca.crt: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQyVENDQXNHZ0F3SUJBZ0lVTmpxdUdZV3R5SjVhNnd5MjNIejJHUmNNbHdNd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2V6RUxNQWtHQTFVRUJoTUNSRVV4R3pBWkJnTlZCQWdNRWxOamFHeGxjM2RwWnkxSWIyeHpkR1ZwYmpFTwpNQXdHQTFVRUJ3d0ZWMlZrWld3eEtEQW1CZ05WQkFvTUgxTjBZV05yWVdKc1pTQlRhV2R1YVc1bklFRjFkR2h2CmNtbDBlU0JKYm1NeEZUQVRCZ05WQkFNTURITjBZV05yWVdKc1pTNWtaVEFnRncweU16QTJNVFl4TWpVeE1ESmEKR0E4eU1USXpNRFV5TXpFeU5URXdNbG93ZXpFTE1Ba0dBMVVFQmhNQ1JFVXhHekFaQmdOVkJBZ01FbE5qYUd4bApjM2RwWnkxSWIyeHpkR1ZwYmpFT01Bd0dBMVVFQnd3RlYyVmtaV3d4S0RBbUJnTlZCQW9NSDFOMFlXTnJZV0pzClpTQlRhV2R1YVc1bklFRjFkR2h2Y21sMGVTQkpibU14RlRBVEJnTlZCQU1NREhOMFlXTnJZV0pzWlM1a1pUQ0MKQVNJd0RRWUpLb1pJaHZjTkFRRUJCUUFEZ2dFUEFEQ0NBUW9DZ2dFQkFOblYvdmJ5M1JvNTdhMnF2UVJubjBqZQplS01VMitGMCtsWk5DQXZpR1VENWJtOGprOTFvUFpuazBiaFFxZXlFcm1EUzRXVDB6ZXZFUklCSkpEamZMMEQ4CjQ2QmU3UGlNS2UwZEdqb3FJM3o1Y09JZWpjOGFMUEhTSWxnTjZsVDNmSXJ1UzE2Y29RZ0c0dWFLaUhGNStlV0YKRFJVTGR1NmRzWXV6NmRLanFSaVVPaEh3RHd0VUprRHdQditFSXRxbzBIK01MRkxMWU0wK2xFSWFlN2RONUNRNQpTbzVXaEwyY3l2NVZKN2xqL0VBS0NWaUlFZ0NtekRSRGNSZ1NTald5SDRibjZ5WDIwMjZmUEl5V0pGeUVkTC82CmpBT0pBRERSMEd5aE5PWHJFZXFob2NTTW5JYlFWcXdBVDBrTWh1WFN2d3Zscm5MeVRwRzVqWm00bFVNMzRrTUMKQXdFQUFhTlRNRkV3SFFZRFZSME9CQllFRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1COEdBMVVkSXdRWQpNQmFBRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0RRWUpLb1pJCmh2Y05BUUVMQlFBRGdnRUJBSHRLUlhkRmR0VWh0VWpvZG1ZUWNlZEFEaEhaT2hCcEtpbnpvdTRicmRrNEhmaEYKTHIvV0ZsY1JlbWxWNm1Cc0xweU11SytUZDhaVUVRNkpFUkx5NmxTL2M2cE9HeG5CNGFDbEU4YXQrQytUakpBTwpWbTNXU0k2VlIxY0ZYR2VaamxkVlE2eGtRc2tNSnpPN2RmNmlNVFB0VjVSa01lSlh0TDZYYW1FaTU0ckJvZ05ICk5yYStFSkJRQmwvWmU5ME5qZVlidjIwdVFwWmFhWkZhYVNtVm9OSERwQndsYTBvdXkrTWpPYkMzU3BnT3ExSUMKUGwzTnV3TkxWOFZiT3I1SHJoUUFvS21nU05iM1A4dmFUVnV4L1gwWWZqeS9TN045a1BCYUs5bUZqNzR6d1Y5dwpxU1ExNEtsNWpPM1YzaHJHV1laRWpET2diWnJyRVgxS1hFdXN0K1E9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + tls.crt: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUR5RENDQXJDZ0F3SUJBZ0lVQ0kyUE5OcnR6cDZRbDdHa3VhRnhtRGE2VUJvd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2V6RUxNQWtHQTFVRUJoTUNSRVV4R3pBWkJnTlZCQWdNRWxOamFHeGxjM2RwWnkxSWIyeHpkR1ZwYmpFTwpNQXdHQTFVRUJ3d0ZWMlZrWld3eEtEQW1CZ05WQkFvTUgxTjBZV05yWVdKc1pTQlRhV2R1YVc1bklFRjFkR2h2CmNtbDBlU0JKYm1NeEZUQVRCZ05WQkFNTURITjBZV05yWVdKc1pTNWtaVEFnRncweU16QTJNVFl4TWpVeE1ESmEKR0E4eU1USXpNRFV5TXpFeU5URXdNbG93WGpFTE1Ba0dBMVVFQmhNQ1JFVXhHekFaQmdOVkJBZ01FbE5qYUd4bApjM2RwWnkxSWIyeHpkR1ZwYmpFT01Bd0dBMVVFQnd3RlYyVmtaV3d4RWpBUUJnTlZCQW9NQ1ZOMFlXTnJZV0pzClpURU9NQXdHQTFVRUF3d0ZiV2x1YVc4d2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUIKQVFDanluVnorWEhCOE9DWTRwc0VFWW1qb2JwZHpUbG93d2NTUU4rWURQQ2tCZW9yMFRiODdFZ0x6SksrSllidQpwb1hCbE5JSlBRYW93SkVvL1N6U2s4ZnUyWFNNeXZBWlk0RldHeEp5Mnl4SXh2UC9pYk9HT1l1aVBHWEsyNHQ2ClpjR1RVVmhhdWlaR1Nna1dyZWpXV2g3TWpGUytjMXZhWVpxQitRMXpQczVQRk1sYzhsNVYvK2I4WjdqTUppODQKbU9mSVB4amt2SXlKcjVVa2VGM1VmTHFKUzV5NExGNHR5NEZ0MmlBZDdiYmZIYW5mdlltdjZVb0RWdE1YdFdvMQpvUVBmdjNzaFdybVJMenc2ZXVJQXRiWGM1Q2pCeUlha0NiaURuQVU4cktnK0IxSjRtdlFnckx3bzNxUHJ5Smd4ClNkaWRtWjJtRVI3RXorYzVCMG0vTGlJaEFnTUJBQUdqWHpCZE1Cc0dBMVVkRVFRVU1CS0NCVzFwYm1sdmdnbHMKYjJOaGJHaHZjM1F3SFFZRFZSME9CQllFRkpRMGdENWtFdFFyK3REcERTWjdrd1o4SDVoR01COEdBMVVkSXdRWQpNQmFBRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1BMEdDU3FHU0liM0RRRUJDd1VBQTRJQkFRQmNkaGQrClI0Sm9HdnFMQms1OWRxSVVlY2N0dUZzcmRQeHNCaU9GaFlOZ1pxZWRMTTBVTDVEenlmQUhmVk8wTGZTRURkZFgKUkpMOXlMNytrTVUwVDc2Y3ZkQzlYVkFJRTZIVXdUbzlHWXNQcXN1eVpvVmpOcEVESkN3WTNDdm9ubEpWZTRkcQovZ0FiSk1ZQitUU21ZNXlEUHovSkZZL1haellhUGI3T2RlR3VqYlZUNUl4cDk3QXBTOFlJaXY3M0Mwd1ViYzZSCmgwcmNmUmJ5a1NRVWg5dmdWZFhSU1I4RFQzV0NmZHFOek5CWVh2OW1xZlc1ejRzYkdqK2wzd1VsL0kzRi9tSXcKZnlPNEN0aTRha2lHVkhsZmZFeTB3a3pWYUJ4aGNYajJJM0JVVGhCNFpxamxzc2llVmFGa3d2WG1teVJUMG9FVwo1SCtOUEhjcXVTMXpQc2NsCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + tls.key: 
LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktZd2dnU2lBZ0VBQW9JQkFRQ2p5blZ6K1hIQjhPQ1kKNHBzRUVZbWpvYnBkelRsb3d3Y1NRTitZRFBDa0Jlb3IwVGI4N0VnTHpKSytKWWJ1cG9YQmxOSUpQUWFvd0pFbwovU3pTazhmdTJYU015dkFaWTRGV0d4SnkyeXhJeHZQL2liT0dPWXVpUEdYSzI0dDZaY0dUVVZoYXVpWkdTZ2tXCnJlaldXaDdNakZTK2MxdmFZWnFCK1ExelBzNVBGTWxjOGw1Vi8rYjhaN2pNSmk4NG1PZklQeGprdkl5SnI1VWsKZUYzVWZMcUpTNXk0TEY0dHk0RnQyaUFkN2JiZkhhbmZ2WW12NlVvRFZ0TVh0V28xb1FQZnYzc2hXcm1STHp3NgpldUlBdGJYYzVDakJ5SWFrQ2JpRG5BVThyS2crQjFKNG12UWdyTHdvM3FQcnlKZ3hTZGlkbVoybUVSN0V6K2M1CkIwbS9MaUloQWdNQkFBRUNnZ0VBQWQzdDVzdUNFMjdXY0llc3NxZ3NoSFAwZHRzKyswVzF6K3h6WC8xTnhPRFkKWVhWNkJmbi9mRHJ4dFQ4aVFaZ2VVQzJORTFQaHZveXJXdWMvMm9xYXJjdEd1OUFZV29HNjJLdG9VMnpTSFdZLwpJN3VERTFXV2xOdlJZVFdOYW5DOGV4eGpRRzE4d0RKWjFpdFhTeEl0NWJEM3lrL3dUUlh0dCt1SnpyVjVqb2N1CmNoeERMd293aXUxQWo2ZFJDWk5CejlUSnh5TnI1ME5ZVzJVWEJhVC84N1hyRkZkSndNVFZUMEI3SE9uRzdSQlYKUWxLdzhtcVZiYU5lbmhjdk1qUjI5c3hUekhSK2p4SU8zQndPNk9Hai9PRmhGQllVN1RMWGVsZDFxb2UwdmIyRwpiOGhQcEd1cHRyNUF0OWx3MXc1d1EzSWdpdXRQTkg1cXlEeUNwRWw2RVFLQmdRRGNkYnNsT2ZLSmo3TzJMQXlZCkZ0a1RwaWxFMFYzajBxbVE5M0lqclY0K0RSbUxNRUIyOTk0MDdCVVlRUWoxL0RJYlFjb1oyRUVjVUI1cGRlSHMKN0RNRUQ2WExIYjJKVTEyK2E3c1d5Q05kS2VjZStUNy9JYmxJOFR0MzQwVWxIUTZ6U01TRGNqdmZjRkhWZ3YwcwpDYWpoRng3TmtMRVhUWnI4ZlQzWUloajR2UUtCZ1FDK01nWjFVbW9KdzlJQVFqMnVJVTVDeTl4aldlWURUQU8vCllhWEl6d2xnZTQzOE1jYmI0Y04yU2FOU0dEZ1Y3bnU1a3FpaWhwalBZV0lpaU9CcDlrVFJIWE9kUFc0N3N5ZUkKdDNrd3JwMnpWbFVnbGNNWlo2bW1WM1FWYUFOWmdqVTRSU3Y0ZS9WeFVMamJaYWZqUHRaUnNqWkdwSzBZVTFvdApWajhJZVE3Zk5RS0JnQ1ArWk11ekpsSW5VQ1FTRlF4UHpxbFNtN0pNckpPaHRXV2h3TlRxWFZTc050dHV5VmVqCktIaGpneDR1b0JQcFZSVDJMTlVEWmI0RnByRjVPYVhBK3FOVEdyS0s3SU1iUlZidHArSVVVeEhHNGFGQStIUVgKUVhVVFRhNUpRT1RLVmJnWHpWM1lyTVhTUk1valZNcDMyVWJHeTVTc1p2MXpBamJ2QzhYWjYxSFJBb0dBZEJjUQp2aGU1eFpBUzVEbUtjSGkvemlHa3ViZXJuNk9NUGdxYUtJSEdsVytVOExScFR0ajBkNFRtL1Rydk1PUEovVEU1CllVcUtoenBIcmhDaCtjdHBvY0k2U1dXdm5SenpLbzNpbVFaY0Y1VEFqUTBjY3F0RmI5UzlkRHR5bi9YTUNqYWUKYWlNdll5VUVVRll5TFpDelBGWnNycDNoVVpHKzN5RmZoQXB3TzJrQ2dZQkh3WWFQSWRXNld3NytCMmhpbjBvdwpqYTNjZXN2QTRqYU1Qd1NMVDhPTnRVMUdCU01md2N6TWJuUEhMclJ2Qjg3bjlnUGFSMndRR1VtckZFTzNMUFgvCmtSY09HcFlCSHBEWEVqRGhLa1dkUnVMT0ZnNEhMWmRWOEFOWmxRMFZTY0U4dTNkRERVTzg5cEdEbjA4cVRBcmwKeDlreHN1ZEVWcmtlclpiNVV4RlZxUT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: minio-credentials +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-credentials-secret + labels: + secrets.stackable.tech/class: minio-credentials +stringData: + accessKey: minio-access-key + secretKey: minio-secret-key +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: tpch + labels: + trino: trino-fault-tolerant +spec: + connector: + tpch: {} + diff --git a/docs/modules/trino/pages/usage-guide/configuration.adoc b/docs/modules/trino/pages/usage-guide/configuration.adoc index 59ddc40b..fd7c805a 100644 --- a/docs/modules/trino/pages/usage-guide/configuration.adoc +++ b/docs/modules/trino/pages/usage-guide/configuration.adoc @@ -18,6 +18,9 @@ For a role or role group, at the same level of `config`, you can specify `config For a list of possible configuration properties consult the https://trino.io/docs/current/admin/properties.html[Trino Properties Reference]. +TIP: For fault-tolerant execution configuration, use the dedicated `faultTolerantExecution` section in the cluster configuration instead of `configOverrides`. 
+See xref:usage-guide/fault-tolerant-execution.adoc[] for detailed instructions. + [source,yaml] ---- workers: diff --git a/docs/modules/trino/pages/usage-guide/fault-tolerant-execution.adoc b/docs/modules/trino/pages/usage-guide/fault-tolerant-execution.adoc new file mode 100644 index 00000000..daa88934 --- /dev/null +++ b/docs/modules/trino/pages/usage-guide/fault-tolerant-execution.adoc @@ -0,0 +1,213 @@ += Fault-tolerant execution +:description: Configure fault-tolerant execution in Trino clusters for improved query resilience and automatic retry capabilities. +:keywords: fault-tolerant execution, retry policy, exchange manager, spooling, query resilience + +Fault-tolerant execution is a mechanism in Trino that enables a cluster to mitigate query failures by retrying queries or their component tasks in the event of failure. +With fault-tolerant execution enabled, intermediate exchange data is spooled and can be re-used by another worker in the event of a worker outage or other fault during query execution. + +By default, if a Trino node lacks the resources to execute a task or otherwise fails during query execution, the query fails and must be run again manually. +The longer the runtime of a query, the more likely it is to be susceptible to such failures. + +NOTE: Fault tolerance does not apply to broken queries or other user error. +For example, Trino does not spend resources retrying a query that fails because its SQL cannot be parsed. + +Take a look at the link:https://trino.io/docs/current/admin/fault-tolerant-execution.html[Trino documentation for fault-tolerant execution {external-link-icon}^] to learn more. + +== Configuration + +Fault-tolerant execution is not enabled by default. +To enable the feature, you need to configure it in your `TrinoCluster` resource by adding a `faultTolerantExecution` section to the cluster configuration. +The configuration uses a structured approach where you choose either `query` or `task` retry policy, each with their specific configuration options. + +=== Query retry policy + +A `query` retry policy instructs Trino to automatically retry a query in the event of an error occurring on a worker node. +This policy is recommended when the majority of the Trino cluster's workload consists of many small queries. + +By default, Trino does not implement fault tolerance for queries whose result set exceeds 32Mi in size. +This limit can be increased by modifying the `exchangeDeduplicationBufferSize` configuration property to be greater than the default value of `32Mi`, but this results in higher memory usage on the coordinator. + +[source,yaml] +---- +spec: + clusterConfig: + faultTolerantExecution: + query: + retryAttempts: 3 + exchangeDeduplicationBufferSize: 64Mi # Increased from default 32Mi +---- + +=== Task retry policy + +A `task` retry policy instructs Trino to retry individual query tasks in the event of failure. +You **must** configure an exchange manager to use the task retry policy. +This policy is recommended when executing large batch queries, as the cluster can more efficiently retry smaller tasks within the query rather than retry the whole query. + +IMPORTANT: A `task` retry policy is best suited for long-running queries, but this policy can result in higher latency for short-running queries executed in high volume. +As a best practice, it is recommended to run a dedicated cluster with a `task` retry policy for large batch queries, separate from another cluster that handles short queries. 
+There are tools that can help you achieve this by automatically routing queries based on certain criteria (such as query estimates or user) to different Trino clusters. Notable mentions are link:https://github.com/stackabletech/trino-lb[trino-lb {external-link-icon}^] and link:https://github.com/trinodb/trino-gateway[trino-gateway {external-link-icon}^]. + +[source,yaml] +---- +spec: + clusterConfig: + faultTolerantExecution: + task: + retryAttemptsPerTask: 4 + exchangeManager: # Mandatory for Task retry policy + encryptionEnabled: true + s3: + baseDirectories: + - "s3://trino-exchange-bucket/spooling" + connection: + reference: my-s3-connection # <1> +---- +<1> Reference to an xref:concepts:s3.adoc[S3Connection] resource + +== Exchange manager + +Exchange spooling is responsible for storing and managing spooled data for fault-tolerant execution. +You can configure a filesystem-based exchange manager that stores spooled data in a specified location, such as AWS S3 and S3-compatible systems, HDFS, or local filesystem. + +NOTE: An exchange manager is required when using the `task` retry policy and optional for the `query` retry policy. + +=== S3-compatible storage + +You can use S3-compatible storage systems for exchange spooling, including AWS S3 and MinIO. + +[source,yaml] +---- +spec: + clusterConfig: + faultTolerantExecution: + task: + retryAttemptsPerTask: 4 + exchangeManager: + s3: + baseDirectories: # <1> + - "s3://exchange-bucket-1/trino-spooling" + connection: + reference: minio-s3-connection # <2> +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: minio-s3-connection +spec: + host: minio.default.svc.cluster.local + port: 9000 + accessStyle: Path + credentials: + secretClass: minio-secret-class + tls: + verification: + server: + caCert: + secretClass: tls +---- +<1> Multiple S3 buckets can be specified to distribute I/O load +<2> S3 connection defined as a reference to an xref:concepts:s3.adoc[S3Connection] resource + +For storage systems like Google Cloud Storage or Azure Blob Storage, you can use the S3-compatible configuration with `configOverrides` to provide the necessary exchange manager properties. + +=== HDFS storage + +You can configure HDFS as the exchange spooling destination: + +[source,yaml] +---- +spec: + clusterConfig: + faultTolerantExecution: + task: + retryAttemptsPerTask: 4 + exchangeManager: + hdfs: + baseDirectories: + - "hdfs://simple-hdfs/exchange-spooling" + hdfs: + configMap: simple-hdfs # <1> +---- +<1> ConfigMap containing HDFS configuration files (created by the HDFS operator) + +=== Local filesystem storage + +Local filesystem storage is supported but only recommended for development or single-node deployments: + +WARNING: It is only recommended to use a local filesystem for exchange in standalone, non-production clusters. +A local directory can only be used for exchange in a distributed cluster if the exchange directory is shared and accessible from all nodes. 
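+
+For reference, given the property mapping introduced in this PR's `fault_tolerant_execution.rs`, a local backend like the one in the example below should result in a generated `exchange-manager.properties` roughly like the following sketch (illustrative only, not verbatim operator output):
+
+[source,properties]
+----
+exchange-manager.name=filesystem
+exchange.base-directories=/trino-exchange
+----
+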
+[source,yaml]
+----
+spec:
+  clusterConfig:
+    faultTolerantExecution:
+      task:
+        exchangeManager:
+          local:
+            baseDirectories:
+              - "/trino-exchange"
+  coordinators:
+    roleGroups:
+      default:
+        replicas: 1
+        podOverrides:
+          spec:
+            volumes:
+              - name: trino-exchange
+                persistentVolumeClaim:
+                  claimName: trino-exchange-pvc
+            containers:
+              - name: trino
+                volumeMounts:
+                  - name: trino-exchange
+                    mountPath: /trino-exchange
+  workers:
+    roleGroups:
+      default:
+        replicas: 1
+        podOverrides:
+          spec:
+            volumes:
+              - name: trino-exchange
+                persistentVolumeClaim:
+                  claimName: trino-exchange-pvc
+            containers:
+              - name: trino
+                volumeMounts:
+                  - name: trino-exchange
+                    mountPath: /trino-exchange
+---
+kind: PersistentVolumeClaim
+apiVersion: v1
+metadata:
+  name: trino-exchange-pvc
+spec:
+  accessModes:
+    - ReadWriteOnce
+  resources:
+    requests:
+      storage: 50Gi
+----
+
+== Connector support
+
+Support for fault-tolerant execution of SQL statements varies on a per-connector basis.
+Take a look at the link:https://trino.io/docs/current/admin/fault-tolerant-execution.html#configuration[Trino documentation {external-link-icon}^] to see which connectors support fault-tolerant execution.
+
+When using connectors that do not explicitly support fault-tolerant execution, you may encounter a "This connector does not support query retries" error message.
+
+== Example
+
+Here's an example of a Trino cluster with fault-tolerant execution enabled using the `task` retry policy and MinIO-backed S3 as the exchange manager:
+
+[source,bash]
+----
+stackablectl operator install commons secret listener trino
+helm install minio oci://registry-1.docker.io/bitnamicharts/minio --version 17.0.19 \
+  --set auth.rootUser=minio-access-key \
+  --set auth.rootPassword=minio-secret-key \
+  --set tls.enabled=true \
+  --set tls.server.existingSecret=minio-tls-certificates \
+  --set tls.existingSecret=minio-tls-certificates \
+  --set tls.existingCASecret=minio-tls-certificates \
+  --set tls.autoGenerated.enabled=false \
+  --set provisioning.enabled=true \
+  --set provisioning.buckets[0].name=trino-exchange-bucket \
+  --set global.security.allowInsecureImages=true \
+  --set image.repository=bitnamilegacy/minio \
+  --set clientImage.repository=bitnamilegacy/minio-client \
+  --set defaultInitContainers.volumePermissions.image.repository=bitnamilegacy/os-shell \
+  --set console.image.repository=bitnamilegacy/minio-object-browser
+----
+
+[source,yaml]
+----
+include::example$usage-guide/fault-tolerant-execution.yaml[]
+----
diff --git a/docs/modules/trino/pages/usage-guide/operations/graceful-shutdown.adoc b/docs/modules/trino/pages/usage-guide/operations/graceful-shutdown.adoc
index f24ce39e..a82ada2e 100644
--- a/docs/modules/trino/pages/usage-guide/operations/graceful-shutdown.adoc
+++ b/docs/modules/trino/pages/usage-guide/operations/graceful-shutdown.adoc
@@ -80,10 +80,12 @@ spec:
 All queries that take less than the minimal graceful shutdown period of all roleGroups (`1` hour as a default) are guaranteed to not be disturbed by regular termination of Pods.
 They can obviously still fail when, for example, a Kubernetes node dies or gets rebooted before it is fully drained.
 
-Because of this, the operator automatically restricts the execution time of queries to the minimal graceful shutdown period of all roleGroups using the Trino configuration `query.max-execution-time=3600s`.
+Because of this, the operator automatically restricts the execution time of queries to the minimal graceful shutdown period of all roleGroups using the Trino configuration `query.max-execution-time=3600s` when xref:usage-guide/fault-tolerant-execution.adoc[fault-tolerant execution] is not configured.
 This causes all queries that take longer than 1 hour to fail with the error message `Query failed: Query exceeded the maximum execution time limit of 3600.00s`.
 
-In case you need to execute queries that take longer than the configured graceful shutdown period, you need to increase the `query.max-execution-time` property as follows:
+However, when xref:usage-guide/fault-tolerant-execution.adoc[fault-tolerant execution] is enabled, the `query.max-execution-time` restriction is not applied: failed queries can be retried automatically, so they may run longer than the graceful shutdown period without being cancelled by worker restarts.
+
+If you need to execute queries that take longer than the configured graceful shutdown period and do not want to configure fault-tolerant execution, you can increase the `query.max-execution-time` property as follows:
 
 [source,yaml]
 ----
@@ -95,8 +97,6 @@ spec:
 ----
 
 Keep in mind that queries taking longer than the graceful shutdown period are now subject to failure when a Trino worker gets shut down.
-Running into this issue can be circumvented by using https://trino.io/docs/current/admin/fault-tolerant-execution.html[Fault-tolerant execution], which is not supported natively yet.
-Until native support is added, you will have to use `configOverrides` to enable it.
 
 == Authorization requirements
diff --git a/docs/modules/trino/partials/nav.adoc b/docs/modules/trino/partials/nav.adoc
index 630f4e26..b706aa8c 100644
--- a/docs/modules/trino/partials/nav.adoc
+++ b/docs/modules/trino/partials/nav.adoc
@@ -6,6 +6,7 @@
 ** xref:trino:usage-guide/connect_to_trino.adoc[]
 ** xref:trino:usage-guide/listenerclass.adoc[]
 ** xref:trino:usage-guide/configuration.adoc[]
+** xref:trino:usage-guide/fault-tolerant-execution.adoc[]
 ** xref:trino:usage-guide/s3.adoc[]
 ** xref:trino:usage-guide/security.adoc[]
 ** xref:trino:usage-guide/monitoring.adoc[]
diff --git a/rust/operator-binary/src/command.rs b/rust/operator-binary/src/command.rs
index 6c80f5e9..4b6791d5 100644
--- a/rust/operator-binary/src/command.rs
+++ b/rust/operator-binary/src/command.rs
@@ -14,7 +14,8 @@ use crate::{
         CONFIG_DIR_NAME, Container, LOG_PROPERTIES, RW_CONFIG_DIR_NAME, STACKABLE_CLIENT_TLS_DIR,
         STACKABLE_INTERNAL_TLS_DIR, STACKABLE_MOUNT_INTERNAL_TLS_DIR,
         STACKABLE_MOUNT_SERVER_TLS_DIR, STACKABLE_SERVER_TLS_DIR, STACKABLE_TLS_STORE_PASSWORD,
-        SYSTEM_TRUST_STORE, SYSTEM_TRUST_STORE_PASSWORD, TrinoRole, v1alpha1,
+        SYSTEM_TRUST_STORE, SYSTEM_TRUST_STORE_PASSWORD, TrinoRole,
+        fault_tolerant_execution::ResolvedFaultTolerantExecutionConfig, v1alpha1,
     },
 };
 
@@ -22,6 +23,7 @@ pub fn container_prepare_args(
     trino: &v1alpha1::TrinoCluster,
     catalogs: &[CatalogConfig],
     merged_config: &v1alpha1::TrinoConfig,
+    resolved_fte_config: &Option<ResolvedFaultTolerantExecutionConfig>,
 ) -> Vec<String> {
     let mut args = vec![];
 
@@ -78,12 +80,18 @@
         args.extend_from_slice(&catalog.init_container_extra_start_commands);
     });
 
+    // Add the commands that are needed for fault tolerant execution (e.g., TLS certificates for S3)
+    if let Some(resolved_fte) = resolved_fte_config {
+        args.extend_from_slice(&resolved_fte.init_container_extra_start_commands);
+    }
+
     args
 }
 
 pub fn container_trino_args(
     authentication_config: &TrinoAuthenticationConfig,
     catalogs: &[CatalogConfig],
+    resolved_fte_config: &Option<ResolvedFaultTolerantExecutionConfig>,
 ) -> Vec<String> {
     let mut args = vec![
         // copy config files to a writeable empty folder
@@ -110,6 +118,14 @@
             args.push(format!("export {env_name}=\"$(cat {file})\""));
         }
     });
+
+    // Add fault tolerant execution environment variables from files
+    if let Some(resolved_fte) = resolved_fte_config {
+        for (env_name, file) in &resolved_fte.load_env_from_files {
+            args.push(format!("export {env_name}=\"$(cat {file})\""));
+        }
+    }
+
     args.push("set -x".to_string());
 
     // Start command
diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs
index 236cb20f..54e1400e 100644
--- a/rust/operator-binary/src/controller.rs
+++ b/rust/operator-binary/src/controller.rs
@@ -78,14 +78,16 @@ use crate::{
     command, config,
     crd::{
         ACCESS_CONTROL_PROPERTIES, APP_NAME, CONFIG_DIR_NAME, CONFIG_PROPERTIES, Container,
-        DISCOVERY_URI, ENV_INTERNAL_SECRET, HTTP_PORT, HTTP_PORT_NAME, HTTPS_PORT, HTTPS_PORT_NAME,
-        JVM_CONFIG, JVM_SECURITY_PROPERTIES, LOG_PROPERTIES, MAX_TRINO_LOG_FILES_SIZE,
-        METRICS_PORT, METRICS_PORT_NAME, NODE_PROPERTIES, RW_CONFIG_DIR_NAME,
-        STACKABLE_CLIENT_TLS_DIR, STACKABLE_INTERNAL_TLS_DIR, STACKABLE_MOUNT_INTERNAL_TLS_DIR,
-        STACKABLE_MOUNT_SERVER_TLS_DIR, STACKABLE_SERVER_TLS_DIR, TrinoRole,
+        DISCOVERY_URI, ENV_INTERNAL_SECRET, EXCHANGE_MANAGER_PROPERTIES, HTTP_PORT, HTTP_PORT_NAME,
+        HTTPS_PORT, HTTPS_PORT_NAME, JVM_CONFIG, JVM_SECURITY_PROPERTIES, LOG_PROPERTIES,
+        MAX_TRINO_LOG_FILES_SIZE, METRICS_PORT, METRICS_PORT_NAME, NODE_PROPERTIES,
+        RW_CONFIG_DIR_NAME, STACKABLE_CLIENT_TLS_DIR, STACKABLE_INTERNAL_TLS_DIR,
+        STACKABLE_MOUNT_INTERNAL_TLS_DIR, STACKABLE_MOUNT_SERVER_TLS_DIR, STACKABLE_SERVER_TLS_DIR,
+        TrinoRole,
         authentication::resolve_authentication_classes, catalog,
         discovery::{TrinoDiscovery, TrinoDiscoveryProtocol, TrinoPodRef},
+        fault_tolerant_execution::ResolvedFaultTolerantExecutionConfig,
         rolegroup_headless_service_name, v1alpha1,
     },
     listener::{
@@ -298,6 +300,11 @@ pub enum Error {
         source: crate::operations::graceful_shutdown::Error,
     },
 
+    #[snafu(display("failed to configure fault tolerant execution"))]
+    FaultTolerantExecution {
+        source: crate::crd::fault_tolerant_execution::Error,
+    },
+
     #[snafu(display("failed to get required Labels"))]
     GetRequiredLabels {
         source:
@@ -424,6 +431,20 @@
         catalogs.push(catalog_config);
     }
 
+    // Resolve fault tolerant execution configuration with S3 connections if needed
+    let resolved_fte_config = match trino.spec.cluster_config.fault_tolerant_execution.as_ref() {
+        Some(fte_config) => Some(
+            ResolvedFaultTolerantExecutionConfig::from_config(
+                fte_config,
+                Some(client),
+                &trino.namespace_r().context(ReadRoleSnafu)?,
+            )
+            .await
+            .context(FaultTolerantExecutionSnafu)?,
+        ),
+        None => None,
+    };
+
     let validated_config = validated_product_config(
         trino,
         // The Trino version is a single number like 396.
@@ -526,6 +547,7 @@
             &trino_authentication_config,
             &trino_opa_config,
             &client.kubernetes_cluster_info,
+            &resolved_fte_config,
         )?;
         let rg_catalog_configmap = build_rolegroup_catalog_config_map(
             trino,
@@ -543,6 +565,7 @@
             &trino_authentication_config,
             &catalogs,
             &rbac_sa.name_any(),
+            &resolved_fte_config,
         )?;
 
         cluster_resources
@@ -651,6 +674,7 @@ fn build_rolegroup_config_map(
     trino_authentication_config: &TrinoAuthenticationConfig,
     trino_opa_config: &Option<TrinoOpaConfig>,
     cluster_info: &KubernetesClusterInfo,
+    resolved_fte_config: &Option<ResolvedFaultTolerantExecutionConfig>,
 ) -> Result<ConfigMap> {
     let mut cm_conf_data = BTreeMap::new();
 
@@ -712,6 +736,16 @@
     dynamic_resolved_config
         .extend(graceful_shutdown_config_properties(trino, trino_role));
 
+    // Add fault tolerant execution properties from resolved configuration
+    if let Some(resolved_fte) = resolved_fte_config {
+        dynamic_resolved_config.extend(
+            resolved_fte
+                .config_properties
+                .iter()
+                .map(|(k, v)| (k.clone(), Some(v.clone()))),
+        );
+    }
+
     // Add static properties and overrides
     dynamic_resolved_config.extend(transformed_config);
 
@@ -776,6 +810,22 @@
     cm_conf_data.insert(JVM_CONFIG.to_string(), jvm_config.to_string());
 
+    // Add exchange manager properties from resolved fault tolerant execution configuration
+    if let Some(resolved_fte) = resolved_fte_config {
+        if !resolved_fte.exchange_manager_properties.is_empty() {
+            let exchange_props_with_options: BTreeMap<String, Option<String>> = resolved_fte
+                .exchange_manager_properties
+                .iter()
+                .map(|(k, v)| (k.clone(), Some(v.clone())))
+                .collect();
+            cm_conf_data.insert(
+                EXCHANGE_MANAGER_PROPERTIES.to_string(),
+                to_java_properties_string(exchange_props_with_options.iter())
+                    .with_context(|_| FailedToWriteJavaPropertiesSnafu)?,
+            );
+        }
+    }
+
     let jvm_sec_props: BTreeMap<String, Option<String>> = config
         .get(&PropertyNameKind::File(JVM_SECURITY_PROPERTIES.to_string()))
         .cloned()
@@ -884,6 +934,7 @@ fn build_rolegroup_statefulset(
     trino_authentication_config: &TrinoAuthenticationConfig,
     catalogs: &[CatalogConfig],
     sa_name: &str,
+    resolved_fte_config: &Option<ResolvedFaultTolerantExecutionConfig>,
 ) -> Result<StatefulSet> {
     let role = trino
         .role(trino_role)
@@ -974,6 +1025,7 @@
         &mut cb_trino,
         catalogs,
         &requested_secret_lifetime,
+        resolved_fte_config,
     )?;
 
     let mut prepare_args = vec![];
@@ -992,6 +1044,7 @@
             trino,
             catalogs,
             merged_config,
+            resolved_fte_config,
         ));
 
         prepare_args
@@ -1056,7 +1109,12 @@
             "-c".to_string(),
         ])
         .args(vec![
-            command::container_trino_args(trino_authentication_config, catalogs).join("\n"),
+            command::container_trino_args(
+                trino_authentication_config,
+                catalogs,
+                resolved_fte_config,
+            )
+            .join("\n"),
         ])
         .add_env_vars(env)
         .add_volume_mount("config", CONFIG_DIR_NAME)
@@ -1524,6 +1582,7 @@
         .build())
 }
 
+#[allow(clippy::too_many_arguments)]
 fn tls_volume_mounts(
     trino: &v1alpha1::TrinoCluster,
     trino_role: &TrinoRole,
@@ -1532,6 +1591,7 @@
     cb_trino: &mut ContainerBuilder,
     catalogs: &[CatalogConfig],
     requested_secret_lifetime: &Duration,
+    resolved_fte_config: &Option<ResolvedFaultTolerantExecutionConfig>,
 ) -> Result<()> {
     if let Some(server_tls) = trino.get_server_tls() {
         cb_prepare
@@ -1611,6 +1671,19 @@
             .context(AddVolumeSnafu)?;
     }
 
+    // fault tolerant execution S3 credentials and other resources
+    if let Some(resolved_fte) = resolved_fte_config {
+        cb_prepare
+            .add_volume_mounts(resolved_fte.volume_mounts.clone())
+            .context(AddVolumeMountSnafu)?;
+        cb_trino
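+            // Give the Trino container the same mounts as the prepare container, so it can
+            // read the S3 credential files and HDFS config files at runtime.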
+            .add_volume_mounts(resolved_fte.volume_mounts.clone())
+            .context(AddVolumeMountSnafu)?;
+        pod_builder
+            .add_volumes(resolved_fte.volumes.clone())
+            .context(AddVolumeSnafu)?;
+    }
+
     Ok(())
 }
 
@@ -1780,6 +1853,7 @@ mod tests {
             &trino_authentication_config,
             &trino_opa_config,
             &cluster_info,
+            &None,
         )
         .unwrap()
     }
diff --git a/rust/operator-binary/src/crd/fault_tolerant_execution.rs b/rust/operator-binary/src/crd/fault_tolerant_execution.rs
new file mode 100644
index 00000000..7db48e51
--- /dev/null
+++ b/rust/operator-binary/src/crd/fault_tolerant_execution.rs
@@ -0,0 +1,860 @@
+//! This module handles fault tolerant execution configuration for Trino.
+//!
+//! It processes the FaultTolerantExecutionConfig from the cluster configuration and
+//! generates the appropriate properties for config.properties and exchange-manager.properties.
+//!
+//! Based on the Trino documentation: <https://trino.io/docs/current/admin/fault-tolerant-execution.html>
+
+use std::collections::{BTreeMap, HashMap};
+
+use serde::{Deserialize, Serialize};
+use snafu::Snafu;
+use stackable_operator::{
+    builder::pod::volume::{VolumeBuilder, VolumeMountBuilder},
+    client::Client,
+    commons::tls_verification::{CaCert, TlsServerVerification, TlsVerification},
+    crd::s3,
+    k8s_openapi::{
+        api::core::v1::{Volume, VolumeMount},
+        apimachinery::pkg::api::resource::Quantity,
+    },
+    schemars::{self, JsonSchema},
+    time::Duration,
+};
+
+use super::catalog::commons::HdfsConnection;
+use crate::{
+    command,
+    crd::{CONFIG_DIR_NAME, STACKABLE_CLIENT_TLS_DIR},
+};
+
+#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub enum FaultTolerantExecutionConfig {
+    /// Query-level fault tolerant execution. Retries entire queries on failure.
+    Query(QueryRetryConfig),
+
+    /// Task-level fault tolerant execution. Retries individual tasks on failure (requires exchange manager).
+    Task(TaskRetryConfig),
+}
+
+#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct QueryRetryConfig {
+    /// Maximum number of times Trino may attempt to retry a query before declaring it failed.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retry_attempts: Option<u32>,
+
+    /// Minimum time that a failed query must wait before it is retried.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retry_initial_delay: Option<Duration>,
+
+    /// Maximum time that a failed query must wait before it is retried.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retry_max_delay: Option<Duration>,
+
+    /// Factor by which retry delay is increased on each query failure.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retry_delay_scale_factor: Option<f32>,
+
+    /// Data size of the coordinator's in-memory buffer used to store output of query stages.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub exchange_deduplication_buffer_size: Option<Quantity>,
+
+    /// Exchange manager configuration for spooling intermediate data during fault tolerant execution.
+    /// Optional for Query retry policy, recommended for large result sets.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub exchange_manager: Option<ExchangeManagerConfig>,
+}
+
+#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskRetryConfig {
+    /// Maximum number of times Trino may attempt to retry a single task before declaring the query failed.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retry_attempts_per_task: Option<u32>,
+
+    /// Minimum time that a failed task must wait before it is retried.
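+    /// Maps to the Trino `retry-initial-delay` configuration property.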
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retry_initial_delay: Option<Duration>,
+
+    /// Maximum time that a failed task must wait before it is retried.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retry_max_delay: Option<Duration>,
+
+    /// Factor by which retry delay is increased on each task failure.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub retry_delay_scale_factor: Option<f32>,
+
+    /// Data size of the coordinator's in-memory buffer used to store output of query stages.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub exchange_deduplication_buffer_size: Option<Quantity>,
+
+    /// Exchange manager configuration for spooling intermediate data during fault tolerant execution.
+    /// Required for Task retry policy.
+    pub exchange_manager: ExchangeManagerConfig,
+}
+
+#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ExchangeManagerConfig {
+    /// Whether to enable encryption of spooling data.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub encryption_enabled: Option<bool>,
+
+    /// The minimum buffer pool size for an exchange sink. The larger the buffer pool size,
+    /// the larger the write parallelism and memory usage.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub sink_buffer_pool_min_size: Option<u32>,
+
+    /// The number of buffers per partition in the buffer pool. The larger the buffer pool size,
+    /// the larger the write parallelism and memory usage.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub sink_buffers_per_partition: Option<u32>,
+
+    /// Max data size of files written by exchange sinks.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub sink_max_file_size: Option<Quantity>,
+
+    /// Number of concurrent readers to read from spooling storage. The larger the number of
+    /// concurrent readers, the larger the read parallelism and memory usage.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub source_concurrent_readers: Option<u32>,
+
+    /// Backend-specific configuration.
+    #[serde(flatten)]
+    pub backend: ExchangeManagerBackend,
+
+    /// The `configOverrides` allow overriding arbitrary exchange manager properties.
+    #[serde(default)]
+    pub config_overrides: HashMap<String, String>,
+}
+
+#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub enum ExchangeManagerBackend {
+    /// S3-compatible storage configuration.
+    S3(S3ExchangeConfig),
+
+    /// HDFS-based exchange manager.
+    Hdfs(HdfsExchangeConfig),
+
+    /// Local filesystem storage (not recommended for production).
+    Local(LocalExchangeConfig),
+}
+
+#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct S3ExchangeConfig {
+    /// S3 bucket URIs for spooling data (e.g., s3://bucket1,s3://bucket2).
+    pub base_directories: Vec<String>,
+
+    /// S3 connection configuration.
+    /// Learn more about S3 configuration in the [S3 concept docs](DOCS_BASE_URL_PLACEHOLDER/concepts/s3).
+    pub connection: stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference,
+
+    /// IAM role to assume for S3 access.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub iam_role: Option<String>,
+
+    /// External ID for the IAM role trust policy.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub external_id: Option<String>,
+
+    /// Maximum number of times the S3 client should retry a request.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub max_error_retries: Option<u32>,
+
+    /// Part data size for S3 multi-part upload.
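+    /// Maps to the Trino `exchange.s3.upload.part-size` exchange manager property.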
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub upload_part_size: Option<Quantity>,
+}
+
+#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct HdfsExchangeConfig {
+    /// HDFS URIs for spooling data.
+    pub base_directories: Vec<String>,
+
+    /// HDFS connection configuration.
+    pub hdfs: HdfsConnection,
+
+    /// Block data size for HDFS storage.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub block_size: Option<Quantity>,
+
+    /// Skip directory scheme validation to support Hadoop-compatible file systems.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub skip_directory_scheme_validation: Option<bool>,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalExchangeConfig {
+    /// Local filesystem paths for exchange storage.
+    pub base_directories: Vec<String>,
+}
+
+#[derive(Snafu, Debug)]
+pub enum Error {
+    #[snafu(display("Failed to resolve S3 connection"))]
+    S3Connection {
+        source: s3::v1alpha1::ConnectionError,
+    },
+
+    #[snafu(display("trino does not support disabling the TLS verification of S3 servers"))]
+    S3TlsNoVerificationNotSupported,
+
+    #[snafu(display("failed to convert data size for [{field}] to bytes"))]
+    QuantityConversion {
+        source: stackable_operator::memory::Error,
+        field: &'static str,
+    },
+}
+
+/// Fault tolerant execution configuration with external resources resolved
+pub struct ResolvedFaultTolerantExecutionConfig {
+    /// Properties to add to config.properties
+    pub config_properties: BTreeMap<String, String>,
+
+    /// Properties to add to exchange-manager.properties (if needed)
+    pub exchange_manager_properties: BTreeMap<String, String>,
+
+    /// Volumes required for the configuration (e.g., for S3 credentials)
+    pub volumes: Vec<Volume>,
+
+    /// Volume mounts required for the configuration
+    pub volume_mounts: Vec<VolumeMount>,
+
+    /// Env-Vars that should be exported from files.
+    /// You can think of it like `export <env_name>="$(cat <file>)"`
+    pub load_env_from_files: BTreeMap<String, String>,
+
+    /// Additional commands that need to be executed before starting Trino
+    pub init_container_extra_start_commands: Vec<String>,
+}
+
+impl ResolvedFaultTolerantExecutionConfig {
+    /// Helper function to insert optional values into properties map
+    fn insert_if_present<T: ToString>(
+        properties: &mut BTreeMap<String, String>,
+        key: &str,
+        value: Option<T>,
+    ) {
+        if let Some(v) = value {
+            properties.insert(key.to_string(), v.to_string());
+        }
+    }
+
+    /// Helper function to insert optional Quantity values after converting to Trino bytes string
+    fn insert_quantity_if_present(
+        properties: &mut BTreeMap<String, String>,
+        key: &'static str,
+        quantity: Option<&Quantity>,
+    ) -> Result<(), Error> {
+        if let Some(q) = quantity {
+            use snafu::ResultExt;
+            let v = crate::crd::quantity_to_trino_bytes(q)
+                .context(QuantityConversionSnafu { field: key })?;
+            properties.insert(key.to_string(), v);
+        }
+        Ok(())
+    }
+
+    /// Create a resolved fault tolerant execution configuration from the cluster config
+    pub async fn from_config(
+        config: &FaultTolerantExecutionConfig,
+        client: Option<&Client>,
+        namespace: &str,
+    ) -> Result<Self, Error> {
+        let mut config_properties = BTreeMap::new();
+
+        // Handle different retry policies and their configurations
+        let (retry_policy_str, exchange_manager_opt) = match config {
+            FaultTolerantExecutionConfig::Query(query_config) => {
+                // Set query-specific properties
+                Self::insert_if_present(
+                    &mut config_properties,
+                    "query-retry-attempts",
+                    query_config.retry_attempts,
+                );
+                Self::insert_if_present(
+                    &mut config_properties,
+                    "retry-initial-delay",
+                    query_config
+                        .retry_initial_delay
+                        .as_ref()
+                        .map(|d| format!("{}s", d.as_secs())),
+                );
+                Self::insert_if_present(
+                    &mut config_properties,
+                    "retry-max-delay",
+                    query_config
+                        .retry_max_delay
+                        .as_ref()
+                        .map(|d| format!("{}s", d.as_secs())),
+                );
+                Self::insert_if_present(
+                    &mut config_properties,
+                    "retry-delay-scale-factor",
+                    query_config.retry_delay_scale_factor.as_ref(),
+                );
+                Self::insert_quantity_if_present(
+                    &mut config_properties,
+                    "exchange.deduplication-buffer-size",
+                    query_config.exchange_deduplication_buffer_size.as_ref(),
+                )?;
+
+                ("QUERY", query_config.exchange_manager.as_ref())
+            }
+            FaultTolerantExecutionConfig::Task(task_config) => {
+                // Set task-specific properties
+                Self::insert_if_present(
+                    &mut config_properties,
+                    "task-retry-attempts-per-task",
+                    task_config.retry_attempts_per_task,
+                );
+                Self::insert_if_present(
+                    &mut config_properties,
+                    "retry-initial-delay",
+                    task_config
+                        .retry_initial_delay
+                        .as_ref()
+                        .map(|d| format!("{}s", d.as_secs())),
+                );
+                Self::insert_if_present(
+                    &mut config_properties,
+                    "retry-max-delay",
+                    task_config
+                        .retry_max_delay
+                        .as_ref()
+                        .map(|d| format!("{}s", d.as_secs())),
+                );
+                Self::insert_if_present(
+                    &mut config_properties,
+                    "retry-delay-scale-factor",
+                    task_config.retry_delay_scale_factor.as_ref(),
+                );
+                Self::insert_quantity_if_present(
+                    &mut config_properties,
+                    "exchange.deduplication-buffer-size",
+                    task_config.exchange_deduplication_buffer_size.as_ref(),
+                )?;
+
+                ("TASK", Some(&task_config.exchange_manager))
+            }
+        };
+
+        config_properties.insert("retry-policy".to_string(), retry_policy_str.to_string());
+
+        let mut exchange_manager_properties = BTreeMap::new();
+        if let Some(exchange_config) = exchange_manager_opt {
+            Self::insert_if_present(
+                &mut config_properties,
+                "fault-tolerant-execution.exchange-encryption-enabled",
+                exchange_config.encryption_enabled,
+            );
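+
+            // Note: the encryption flag above lands in config.properties, while the
+            // sink/source tuning options below are written to exchange-manager.properties.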
Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.sink-buffer-pool-min-size", + exchange_config.sink_buffer_pool_min_size, + ); + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.sink-buffers-per-partition", + exchange_config.sink_buffers_per_partition, + ); + Self::insert_quantity_if_present( + &mut exchange_manager_properties, + "exchange.sink-max-file-size", + exchange_config.sink_max_file_size.as_ref(), + )?; + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.source-concurrent-readers", + exchange_config.source_concurrent_readers, + ); + + // Add backend-specific configuration + match &exchange_config.backend { + ExchangeManagerBackend::S3(s3_config) => { + exchange_manager_properties.insert( + "exchange-manager.name".to_string(), + "filesystem".to_string(), + ); + exchange_manager_properties.insert( + "exchange.base-directories".to_string(), + s3_config.base_directories.join(","), + ); + + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.s3.iam-role", + s3_config.iam_role.as_ref(), + ); + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.s3.external-id", + s3_config.external_id.as_ref(), + ); + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.s3.max-error-retries", + s3_config.max_error_retries, + ); + Self::insert_quantity_if_present( + &mut exchange_manager_properties, + "exchange.s3.upload.part-size", + s3_config.upload_part_size.as_ref(), + )?; + } + ExchangeManagerBackend::Hdfs(hdfs_config) => { + exchange_manager_properties + .insert("exchange-manager.name".to_string(), "hdfs".to_string()); + exchange_manager_properties.insert( + "exchange.base-directories".to_string(), + hdfs_config.base_directories.join(","), + ); + + Self::insert_quantity_if_present( + &mut exchange_manager_properties, + "exchange.hdfs.block-size", + hdfs_config.block_size.as_ref(), + )?; + Self::insert_if_present( + &mut exchange_manager_properties, + "exchange.hdfs.skip-directory-scheme-validation", + hdfs_config.skip_directory_scheme_validation, + ); + + let hdfs_config_dir = format!("{CONFIG_DIR_NAME}/exchange-hdfs-config"); + exchange_manager_properties.insert( + "hdfs.config.resources".to_string(), + format!("{hdfs_config_dir}/core-site.xml,{hdfs_config_dir}/hdfs-site.xml"), + ); + } + ExchangeManagerBackend::Local(local_config) => { + exchange_manager_properties.insert( + "exchange-manager.name".to_string(), + "filesystem".to_string(), + ); + exchange_manager_properties.insert( + "exchange.base-directories".to_string(), + local_config.base_directories.join(","), + ); + } + } + + exchange_manager_properties.extend(exchange_config.config_overrides.clone()); + } + + let mut resolved_config = Self { + config_properties, + exchange_manager_properties, + volumes: Vec::new(), + volume_mounts: Vec::new(), + load_env_from_files: BTreeMap::new(), + init_container_extra_start_commands: Vec::new(), + }; + + // Resolve external resources if Kubernetes client is available + // This should always be the case, except for when this function is called during unit tests + if let (Some(client), Some(exchange_config)) = (client, exchange_manager_opt) { + match &exchange_config.backend { + ExchangeManagerBackend::S3(s3_config) => { + resolved_config + .resolve_s3_backend(s3_config, client, namespace) + .await?; + } + ExchangeManagerBackend::Hdfs(hdfs_config) => { + resolved_config.resolve_hdfs_backend(hdfs_config); + } + ExchangeManagerBackend::Local(_) => { + // Local backend 
requires no external resource resolution + } + } + } + + Ok(resolved_config) + } + + async fn resolve_s3_backend( + &mut self, + s3_config: &S3ExchangeConfig, + client: &Client, + namespace: &str, + ) -> Result<(), Error> { + use snafu::ResultExt; + + let s3_connection = s3_config + .connection + .clone() + .resolve(client, namespace) + .await + .context(S3ConnectionSnafu)?; + + let (volumes, mounts) = s3_connection + .volumes_and_mounts() + .context(S3ConnectionSnafu)?; + self.volumes.extend(volumes); + self.volume_mounts.extend(mounts); + + self.exchange_manager_properties.insert( + "exchange.s3.region".to_string(), + s3_connection.region.name.clone(), + ); + self.exchange_manager_properties.insert( + "exchange.s3.endpoint".to_string(), + s3_connection + .endpoint() + .context(S3ConnectionSnafu)? + .to_string(), + ); + self.exchange_manager_properties.insert( + "exchange.s3.path-style-access".to_string(), + (s3_connection.access_style == s3::v1alpha1::S3AccessStyle::Path).to_string(), + ); + + if let Some((access_key_path, secret_key_path)) = s3_connection.credentials_mount_paths() { + let access_key_env = "EXCHANGE_S3_AWS_ACCESS_KEY".to_string(); + let secret_key_env = "EXCHANGE_S3_AWS_SECRET_KEY".to_string(); + + self.exchange_manager_properties.insert( + "exchange.s3.aws-access-key".to_string(), + format!("${{ENV:{access_key_env}}}"), + ); + self.exchange_manager_properties.insert( + "exchange.s3.aws-secret-key".to_string(), + format!("${{ENV:{secret_key_env}}}"), + ); + + self.load_env_from_files + .insert(access_key_env, access_key_path); + self.load_env_from_files + .insert(secret_key_env, secret_key_path); + } + + if let Some(tls) = s3_connection.tls.tls.as_ref() { + match &tls.verification { + TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), + TlsVerification::Server(TlsServerVerification { + ca_cert: CaCert::WebPki {}, + }) => {} + TlsVerification::Server(TlsServerVerification { + ca_cert: CaCert::SecretClass(_), + }) => { + if let Some(ca_cert) = s3_connection.tls.tls_ca_cert_mount_path() { + self.init_container_extra_start_commands.extend( + command::add_cert_to_truststore( + &ca_cert, + STACKABLE_CLIENT_TLS_DIR, + "exchange-s3-ca-cert", + ), + ); + } + } + } + } + + Ok(()) + } + + fn resolve_hdfs_backend(&mut self, hdfs_config: &HdfsExchangeConfig) { + let hdfs_config_dir = format!("{CONFIG_DIR_NAME}/exchange-hdfs-config"); + let volume_name = "exchange-hdfs-config".to_string(); + + self.volumes.push( + VolumeBuilder::new(&volume_name) + .with_config_map(&hdfs_config.hdfs.config_map) + .build(), + ); + self.volume_mounts + .push(VolumeMountBuilder::new(&volume_name, &hdfs_config_dir).build()); + } +} + +#[cfg(test)] +mod tests { + use stackable_operator::time::Duration; + + use super::*; + + #[tokio::test] + async fn test_query_retry_policy_without_exchange_manager() { + let config = FaultTolerantExecutionConfig::Query(QueryRetryConfig { + retry_attempts: Some(5), + retry_initial_delay: Some(Duration::from_secs(15)), + retry_max_delay: Some(Duration::from_secs(90)), + retry_delay_scale_factor: Some(3.0), + exchange_deduplication_buffer_size: Some(Quantity("64Mi".to_string())), + exchange_manager: None, + }); + + let fte_config = + ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") + .await + .unwrap(); + + assert_eq!( + fte_config.config_properties.get("retry-policy"), + Some(&"QUERY".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("query-retry-attempts"), + Some(&"5".to_string()) + ); + assert_eq!( + 
fte_config.config_properties.get("retry-initial-delay"), + Some(&"15s".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-max-delay"), + Some(&"90s".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-delay-scale-factor"), + Some(&"3".to_string()) + ); + assert_eq!( + fte_config + .config_properties + .get("exchange.deduplication-buffer-size"), + Some(&"67108864B".to_string()) + ); + } + + #[tokio::test] + async fn test_query_retry_policy_with_exchange_manager() { + let config = FaultTolerantExecutionConfig::Query(QueryRetryConfig { + retry_attempts: Some(3), + retry_initial_delay: Some(Duration::from_secs(10)), + retry_max_delay: Some(Duration::from_secs(60)), + retry_delay_scale_factor: Some(2.0), + exchange_deduplication_buffer_size: Some(Quantity("100Mi".to_string())), + exchange_manager: Some(ExchangeManagerConfig { + encryption_enabled: Some(true), + sink_buffer_pool_min_size: Some(10), + sink_buffers_per_partition: Some(2), + sink_max_file_size: Some(Quantity("1Gi".to_string())), + source_concurrent_readers: Some(4), + backend: ExchangeManagerBackend::Local(LocalExchangeConfig { + base_directories: vec!["/tmp/exchange".to_string()], + }), + config_overrides: HashMap::new(), + }), + }); + + let fte_config = + ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") + .await + .unwrap(); + + assert_eq!( + fte_config.config_properties.get("retry-policy"), + Some(&"QUERY".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("query-retry-attempts"), + Some(&"3".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-initial-delay"), + Some(&"10s".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-max-delay"), + Some(&"60s".to_string()) + ); + assert_eq!( + fte_config.config_properties.get("retry-delay-scale-factor"), + Some(&"2".to_string()) + ); + + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange-manager.name"), + Some(&"filesystem".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.base-directories"), + Some(&"/tmp/exchange".to_string()) + ); + assert_eq!( + fte_config + .config_properties + .get("exchange.deduplication-buffer-size"), + Some(&"104857600B".to_string()) + ); + assert_eq!( + fte_config + .config_properties + .get("fault-tolerant-execution.exchange-encryption-enabled"), + Some(&"true".to_string()) + ); + } + + #[tokio::test] + async fn test_task_retry_policy_with_s3_exchange_manager() { + let config = FaultTolerantExecutionConfig::Task(TaskRetryConfig { + retry_attempts_per_task: Some(2), + retry_initial_delay: None, + retry_max_delay: None, + retry_delay_scale_factor: None, + exchange_deduplication_buffer_size: None, + exchange_manager: ExchangeManagerConfig { + encryption_enabled: None, + sink_buffer_pool_min_size: Some(20), + sink_buffers_per_partition: Some(4), + sink_max_file_size: Some(Quantity("2Gi".to_string())), + source_concurrent_readers: Some(8), + backend: ExchangeManagerBackend::S3(S3ExchangeConfig { + base_directories: vec!["s3://my-bucket/exchange".to_string()], + connection: stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference::Reference( + "test-s3-connection".to_string() + ), + iam_role: Some("arn:aws:iam::123456789012:role/TrinoRole".to_string()), + external_id: Some("external-id-123".to_string()), + max_error_retries: Some(5), + upload_part_size: Some(Quantity("10Mi".to_string())), + }), + config_overrides: std::collections::HashMap::new(), + }, + }); + + 
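+        // `client` is `None`, so the referenced S3Connection is not resolved here;
+        // only the statically derived exchange manager properties are asserted below.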
let fte_config = + ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") + .await + .unwrap(); + + assert_eq!( + fte_config.config_properties.get("retry-policy"), + Some(&"TASK".to_string()) + ); + assert_eq!( + fte_config + .config_properties + .get("task-retry-attempts-per-task"), + Some(&"2".to_string()) + ); + + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange-manager.name"), + Some(&"filesystem".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.base-directories"), + Some(&"s3://my-bucket/exchange".to_string()) + ); + + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.s3.iam-role"), + Some(&"arn:aws:iam::123456789012:role/TrinoRole".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.s3.external-id"), + Some(&"external-id-123".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.s3.max-error-retries"), + Some(&"5".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.s3.upload.part-size"), + Some(&"10485760B".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.sink-buffer-pool-min-size"), + Some(&"20".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.sink-buffers-per-partition"), + Some(&"4".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.sink-max-file-size"), + Some(&"2147483648B".to_string()) + ); + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.source-concurrent-readers"), + Some(&"8".to_string()) + ); + } + + #[tokio::test] + async fn test_exchange_manager_config_overrides() { + let mut config_overrides = HashMap::new(); + config_overrides.insert("custom.property".to_string(), "custom-value".to_string()); + config_overrides.insert( + "exchange.s3.upload.part-size".to_string(), + "overridden-value".to_string(), + ); + + let config = FaultTolerantExecutionConfig::Task(TaskRetryConfig { + retry_attempts_per_task: Some(2), + retry_initial_delay: None, + retry_max_delay: None, + retry_delay_scale_factor: None, + exchange_deduplication_buffer_size: None, + exchange_manager: ExchangeManagerConfig { + encryption_enabled: None, + sink_buffer_pool_min_size: None, + sink_buffers_per_partition: None, + sink_max_file_size: None, + source_concurrent_readers: None, + backend: ExchangeManagerBackend::S3(S3ExchangeConfig { + base_directories: vec!["s3://my-bucket/exchange".to_string()], + connection: stackable_operator::crd::s3::v1alpha1::InlineConnectionOrReference::Reference( + "test-s3-connection".to_string() + ), + iam_role: None, + external_id: None, + max_error_retries: None, + upload_part_size: Some(Quantity("10Mi".to_string())), + }), + config_overrides, + }, + }); + + let fte_config = + ResolvedFaultTolerantExecutionConfig::from_config(&config, None, "default") + .await + .unwrap(); + + assert_eq!( + fte_config + .exchange_manager_properties + .get("custom.property"), + Some(&"custom-value".to_string()) + ); + + assert_eq!( + fte_config + .exchange_manager_properties + .get("exchange.s3.upload.part-size"), + Some(&"overridden-value".to_string()) + ); + } +} diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 14fca469..14cab937 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -2,6 +2,7 @@ pub mod affinity; pub mod 
authentication;
 pub mod catalog;
 pub mod discovery;
+pub mod fault_tolerant_execution;
 
 use std::{collections::BTreeMap, ops::Div, str::FromStr};
 
@@ -59,6 +60,7 @@ pub const NODE_PROPERTIES: &str = "node.properties";
 pub const LOG_PROPERTIES: &str = "log.properties";
 pub const ACCESS_CONTROL_PROPERTIES: &str = "access-control.properties";
 pub const JVM_SECURITY_PROPERTIES: &str = "security.properties";
+pub const EXCHANGE_MANAGER_PROPERTIES: &str = "exchange-manager.properties";
 // node.properties
 pub const NODE_ENVIRONMENT: &str = "node.environment";
 // config.properties
@@ -135,6 +137,15 @@ pub const WORKER_SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(30);
 /// Safety buffer to guarantee the graceful shutdown works every time.
 pub const WORKER_GRACEFUL_SHUTDOWN_SAFETY_OVERHEAD: Duration = Duration::from_secs(10);
 
+/// Convert a Kubernetes `Quantity` to a Trino property string in bytes, e.g. `"65536B"`.
+pub(crate) fn quantity_to_trino_bytes(
+    q: &Quantity,
+) -> Result<String, stackable_operator::memory::Error> {
+    let in_mebi = MemoryQuantity::try_from(q)?.scale_to(BinaryMultiple::Mebi);
+    let bytes = (in_mebi.value * 1024.0 * 1024.0).round() as u64;
+    Ok(format!("{bytes}B"))
+}
+
 #[derive(Snafu, Debug)]
 pub enum Error {
     #[snafu(display("object has no namespace associated"))]
@@ -283,6 +294,12 @@ pub mod versioned {
         #[serde(default)]
         pub tls: TrinoTls,
 
+        /// Fault tolerant execution configuration.
+        /// When enabled, Trino can automatically retry queries or tasks in case of failures.
+        #[serde(skip_serializing_if = "Option::is_none")]
+        pub fault_tolerant_execution:
+            Option<fault_tolerant_execution::FaultTolerantExecutionConfig>,
+
         /// Name of the Vector aggregator [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery).
         /// It must contain the key `ADDRESS` with the address of the Vector aggregator.
         /// Follow the [logging tutorial](DOCS_BASE_URL_PLACEHOLDER/tutorials/logging-vector-aggregator)
diff --git a/rust/operator-binary/src/operations/graceful_shutdown.rs b/rust/operator-binary/src/operations/graceful_shutdown.rs
index 92c56a51..5b00bbab 100644
--- a/rust/operator-binary/src/operations/graceful_shutdown.rs
+++ b/rust/operator-binary/src/operations/graceful_shutdown.rs
@@ -27,16 +27,23 @@ pub fn graceful_shutdown_config_properties(
 ) -> BTreeMap<String, Option<String>> {
     match role {
         TrinoRole::Coordinator => {
-            let min_worker_graceful_shutdown_timeout = trino.min_worker_graceful_shutdown_timeout();
-            // We know that queries taking longer than the minimum gracefulShutdownTimeout are subject to failure.
-            // Read operator docs for reasoning.
-            BTreeMap::from([(
-                "query.max-execution-time".to_string(),
-                Some(format!(
-                    "{}s",
-                    min_worker_graceful_shutdown_timeout.as_secs()
-                )),
-            )])
+            // Only set query.max-execution-time if fault tolerant execution is not configured.
+            // With fault tolerant execution enabled, queries can be retried and run indefinitely.
+            if trino.spec.cluster_config.fault_tolerant_execution.is_none() {
+                let min_worker_graceful_shutdown_timeout =
+                    trino.min_worker_graceful_shutdown_timeout();
+                // We know that queries taking longer than the minimum gracefulShutdownTimeout are subject to failure.
+                // Read operator docs for reasoning.
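+                // Trino duration properties expect a unit suffix, hence the "{}s" formatting below.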
+ BTreeMap::from([( + "query.max-execution-time".to_string(), + Some(format!( + "{}s", + min_worker_graceful_shutdown_timeout.as_secs() + )), + )]) + } else { + BTreeMap::new() + } } TrinoRole::Worker => BTreeMap::from([( "shutdown.grace-period".to_string(), diff --git a/tests/templates/kuttl/fault-tolerant-execution/00-assert.yaml b/tests/templates/kuttl/fault-tolerant-execution/00-assert.yaml new file mode 100644 index 00000000..47bfe1ea --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/00-assert.yaml @@ -0,0 +1,19 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-credentials +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-tls-certificates +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: minio diff --git a/tests/templates/kuttl/fault-tolerant-execution/00-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/fault-tolerant-execution/00-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/00-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/fault-tolerant-execution/00-patch-ns.yaml.j2 b/tests/templates/kuttl/fault-tolerant-execution/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/fault-tolerant-execution/00-rbac.yaml.j2 b/tests/templates/kuttl/fault-tolerant-execution/00-rbac.yaml.j2 new file mode 100644 index 00000000..9cbf0351 --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/00-rbac.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +subjects: + - kind: ServiceAccount + name: integration-tests-sa +roleRef: + kind: Role + name: use-integration-tests-scc + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/fault-tolerant-execution/00-secrets.yaml b/tests/templates/kuttl/fault-tolerant-execution/00-secrets.yaml new file mode 100644 index 00000000..b9dd795f --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/00-secrets.yaml @@ -0,0 +1,62 @@ +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-credentials + labels: + secrets.stackable.tech/class: s3-credentials-class +stringData: + accessKey: minioAccessKey + secretKey: 
minioSecretKey + # The following two entries are used by the Bitnami chart for MinIO to + # set up credentials for accessing buckets managed by the MinIO tenant. + root-user: minioAccessKey + root-password: minioSecretKey +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: s3-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: minio-tls-certificates +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-tls-certificates + labels: + secrets.stackable.tech/class: minio-tls-certificates +# Have a look at the folder certs on how to create this +data: + ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQyVENDQXNHZ0F3SUJBZ0lVTmpxdUdZV3R5SjVhNnd5MjNIejJHUmNNbHdNd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2V6RUxNQWtHQTFVRUJoTUNSRVV4R3pBWkJnTlZCQWdNRWxOamFHeGxjM2RwWnkxSWIyeHpkR1ZwYmpFTwpNQXdHQTFVRUJ3d0ZWMlZrWld3eEtEQW1CZ05WQkFvTUgxTjBZV05yWVdKc1pTQlRhV2R1YVc1bklFRjFkR2h2CmNtbDBlU0JKYm1NeEZUQVRCZ05WQkFNTURITjBZV05yWVdKc1pTNWtaVEFnRncweU16QTJNVFl4TWpVeE1ESmEKR0E4eU1USXpNRFV5TXpFeU5URXdNbG93ZXpFTE1Ba0dBMVVFQmhNQ1JFVXhHekFaQmdOVkJBZ01FbE5qYUd4bApjM2RwWnkxSWIyeHpkR1ZwYmpFT01Bd0dBMVVFQnd3RlYyVmtaV3d4S0RBbUJnTlZCQW9NSDFOMFlXTnJZV0pzClpTQlRhV2R1YVc1bklFRjFkR2h2Y21sMGVTQkpibU14RlRBVEJnTlZCQU1NREhOMFlXTnJZV0pzWlM1a1pUQ0MKQVNJd0RRWUpLb1pJaHZjTkFRRUJCUUFEZ2dFUEFEQ0NBUW9DZ2dFQkFOblYvdmJ5M1JvNTdhMnF2UVJubjBqZQplS01VMitGMCtsWk5DQXZpR1VENWJtOGprOTFvUFpuazBiaFFxZXlFcm1EUzRXVDB6ZXZFUklCSkpEamZMMEQ4CjQ2QmU3UGlNS2UwZEdqb3FJM3o1Y09JZWpjOGFMUEhTSWxnTjZsVDNmSXJ1UzE2Y29RZ0c0dWFLaUhGNStlV0YKRFJVTGR1NmRzWXV6NmRLanFSaVVPaEh3RHd0VUprRHdQditFSXRxbzBIK01MRkxMWU0wK2xFSWFlN2RONUNRNQpTbzVXaEwyY3l2NVZKN2xqL0VBS0NWaUlFZ0NtekRSRGNSZ1NTald5SDRibjZ5WDIwMjZmUEl5V0pGeUVkTC82CmpBT0pBRERSMEd5aE5PWHJFZXFob2NTTW5JYlFWcXdBVDBrTWh1WFN2d3Zscm5MeVRwRzVqWm00bFVNMzRrTUMKQXdFQUFhTlRNRkV3SFFZRFZSME9CQllFRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1COEdBMVVkSXdRWQpNQmFBRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0RRWUpLb1pJCmh2Y05BUUVMQlFBRGdnRUJBSHRLUlhkRmR0VWh0VWpvZG1ZUWNlZEFEaEhaT2hCcEtpbnpvdTRicmRrNEhmaEYKTHIvV0ZsY1JlbWxWNm1Cc0xweU11SytUZDhaVUVRNkpFUkx5NmxTL2M2cE9HeG5CNGFDbEU4YXQrQytUakpBTwpWbTNXU0k2VlIxY0ZYR2VaamxkVlE2eGtRc2tNSnpPN2RmNmlNVFB0VjVSa01lSlh0TDZYYW1FaTU0ckJvZ05ICk5yYStFSkJRQmwvWmU5ME5qZVlidjIwdVFwWmFhWkZhYVNtVm9OSERwQndsYTBvdXkrTWpPYkMzU3BnT3ExSUMKUGwzTnV3TkxWOFZiT3I1SHJoUUFvS21nU05iM1A4dmFUVnV4L1gwWWZqeS9TN045a1BCYUs5bUZqNzR6d1Y5dwpxU1ExNEtsNWpPM1YzaHJHV1laRWpET2diWnJyRVgxS1hFdXN0K1E9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + tls.crt: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUR5RENDQXJDZ0F3SUJBZ0lVQ0kyUE5OcnR6cDZRbDdHa3VhRnhtRGE2VUJvd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2V6RUxNQWtHQTFVRUJoTUNSRVV4R3pBWkJnTlZCQWdNRWxOamFHeGxjM2RwWnkxSWIyeHpkR1ZwYmpFTwpNQXdHQTFVRUJ3d0ZWMlZrWld3eEtEQW1CZ05WQkFvTUgxTjBZV05yWVdKc1pTQlRhV2R1YVc1bklFRjFkR2h2CmNtbDBlU0JKYm1NeEZUQVRCZ05WQkFNTURITjBZV05yWVdKc1pTNWtaVEFnRncweU16QTJNVFl4TWpVeE1ESmEKR0E4eU1USXpNRFV5TXpFeU5URXdNbG93WGpFTE1Ba0dBMVVFQmhNQ1JFVXhHekFaQmdOVkJBZ01FbE5qYUd4bApjM2RwWnkxSWIyeHpkR1ZwYmpFT01Bd0dBMVVFQnd3RlYyVmtaV3d4RWpBUUJnTlZCQW9NQ1ZOMFlXTnJZV0pzClpURU9NQXdHQTFVRUF3d0ZiV2x1YVc4d2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUIKQVFDanluVnorWEhCOE9DWTRwc0VFWW1qb2JwZHpUbG93d2NTUU4rWURQQ2tCZW9yMFRiODdFZ0x6SksrSllidQpwb1hCbE5JSlBRYW93SkVvL1N6U2s4ZnUyWFNNeXZBWlk0RldHeEp5Mnl4SXh2UC9pYk9HT1l1aVBHWEsyNHQ2ClpjR1RVVmhhdWlaR1Nna1dyZWpXV2g3TWpGUytjMXZhWVpxQitRMXpQczVQRk1sYzhsNVYvK2I4WjdqTUppODQKbU9mSVB4amt2SXlKcjVVa2VGM1VmTHFKUzV5NExGNHR5NEZ0MmlBZDdiYmZIYW5mdlltdjZVb0RWdE1YdFdvMQpvUVBmdjNzaFdybVJMenc2ZXVJQXRiWGM1Q2pCeUlha0NiaURuQVU4cktnK0IxSjRtdlFnckx3bzNxUHJ5Smd4ClNkaWRtWjJtRVI3RXorYzVCMG0vTGlJaEFnTUJBQUdqWHpCZE1Cc0dBMVVkRVFRVU1CS0NCVzFwYm1sdmdnbHMKYjJOaGJHaHZjM1F3SFFZRFZSME9CQllFRkpRMGdENWtFdFFyK3REcERTWjdrd1o4SDVoR01COEdBMVVkSXdRWQpNQmFBRkVJM1JNTWl5aUJqeVExUlM4bmxPUkpWZDFwQk1BMEdDU3FHU0liM0RRRUJDd1VBQTRJQkFRQmNkaGQrClI0Sm9HdnFMQms1OWRxSVVlY2N0dUZzcmRQeHNCaU9GaFlOZ1pxZWRMTTBVTDVEenlmQUhmVk8wTGZTRURkZFgKUkpMOXlMNytrTVUwVDc2Y3ZkQzlYVkFJRTZIVXdUbzlHWXNQcXN1eVpvVmpOcEVESkN3WTNDdm9ubEpWZTRkcQovZ0FiSk1ZQitUU21ZNXlEUHovSkZZL1haellhUGI3T2RlR3VqYlZUNUl4cDk3QXBTOFlJaXY3M0Mwd1ViYzZSCmgwcmNmUmJ5a1NRVWg5dmdWZFhSU1I4RFQzV0NmZHFOek5CWVh2OW1xZlc1ejRzYkdqK2wzd1VsL0kzRi9tSXcKZnlPNEN0aTRha2lHVkhsZmZFeTB3a3pWYUJ4aGNYajJJM0JVVGhCNFpxamxzc2llVmFGa3d2WG1teVJUMG9FVwo1SCtOUEhjcXVTMXpQc2NsCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + tls.key: 
LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktZd2dnU2lBZ0VBQW9JQkFRQ2p5blZ6K1hIQjhPQ1kKNHBzRUVZbWpvYnBkelRsb3d3Y1NRTitZRFBDa0Jlb3IwVGI4N0VnTHpKSytKWWJ1cG9YQmxOSUpQUWFvd0pFbwovU3pTazhmdTJYU015dkFaWTRGV0d4SnkyeXhJeHZQL2liT0dPWXVpUEdYSzI0dDZaY0dUVVZoYXVpWkdTZ2tXCnJlaldXaDdNakZTK2MxdmFZWnFCK1ExelBzNVBGTWxjOGw1Vi8rYjhaN2pNSmk4NG1PZklQeGprdkl5SnI1VWsKZUYzVWZMcUpTNXk0TEY0dHk0RnQyaUFkN2JiZkhhbmZ2WW12NlVvRFZ0TVh0V28xb1FQZnYzc2hXcm1STHp3NgpldUlBdGJYYzVDakJ5SWFrQ2JpRG5BVThyS2crQjFKNG12UWdyTHdvM3FQcnlKZ3hTZGlkbVoybUVSN0V6K2M1CkIwbS9MaUloQWdNQkFBRUNnZ0VBQWQzdDVzdUNFMjdXY0llc3NxZ3NoSFAwZHRzKyswVzF6K3h6WC8xTnhPRFkKWVhWNkJmbi9mRHJ4dFQ4aVFaZ2VVQzJORTFQaHZveXJXdWMvMm9xYXJjdEd1OUFZV29HNjJLdG9VMnpTSFdZLwpJN3VERTFXV2xOdlJZVFdOYW5DOGV4eGpRRzE4d0RKWjFpdFhTeEl0NWJEM3lrL3dUUlh0dCt1SnpyVjVqb2N1CmNoeERMd293aXUxQWo2ZFJDWk5CejlUSnh5TnI1ME5ZVzJVWEJhVC84N1hyRkZkSndNVFZUMEI3SE9uRzdSQlYKUWxLdzhtcVZiYU5lbmhjdk1qUjI5c3hUekhSK2p4SU8zQndPNk9Hai9PRmhGQllVN1RMWGVsZDFxb2UwdmIyRwpiOGhQcEd1cHRyNUF0OWx3MXc1d1EzSWdpdXRQTkg1cXlEeUNwRWw2RVFLQmdRRGNkYnNsT2ZLSmo3TzJMQXlZCkZ0a1RwaWxFMFYzajBxbVE5M0lqclY0K0RSbUxNRUIyOTk0MDdCVVlRUWoxL0RJYlFjb1oyRUVjVUI1cGRlSHMKN0RNRUQ2WExIYjJKVTEyK2E3c1d5Q05kS2VjZStUNy9JYmxJOFR0MzQwVWxIUTZ6U01TRGNqdmZjRkhWZ3YwcwpDYWpoRng3TmtMRVhUWnI4ZlQzWUloajR2UUtCZ1FDK01nWjFVbW9KdzlJQVFqMnVJVTVDeTl4aldlWURUQU8vCllhWEl6d2xnZTQzOE1jYmI0Y04yU2FOU0dEZ1Y3bnU1a3FpaWhwalBZV0lpaU9CcDlrVFJIWE9kUFc0N3N5ZUkKdDNrd3JwMnpWbFVnbGNNWlo2bW1WM1FWYUFOWmdqVTRSU3Y0ZS9WeFVMamJaYWZqUHRaUnNqWkdwSzBZVTFvdApWajhJZVE3Zk5RS0JnQ1ArWk11ekpsSW5VQ1FTRlF4UHpxbFNtN0pNckpPaHRXV2h3TlRxWFZTc050dHV5VmVqCktIaGpneDR1b0JQcFZSVDJMTlVEWmI0RnByRjVPYVhBK3FOVEdyS0s3SU1iUlZidHArSVVVeEhHNGFGQStIUVgKUVhVVFRhNUpRT1RLVmJnWHpWM1lyTVhTUk1valZNcDMyVWJHeTVTc1p2MXpBamJ2QzhYWjYxSFJBb0dBZEJjUQp2aGU1eFpBUzVEbUtjSGkvemlHa3ViZXJuNk9NUGdxYUtJSEdsVytVOExScFR0ajBkNFRtL1Rydk1PUEovVEU1CllVcUtoenBIcmhDaCtjdHBvY0k2U1dXdm5SenpLbzNpbVFaY0Y1VEFqUTBjY3F0RmI5UzlkRHR5bi9YTUNqYWUKYWlNdll5VUVVRll5TFpDelBGWnNycDNoVVpHKzN5RmZoQXB3TzJrQ2dZQkh3WWFQSWRXNld3NytCMmhpbjBvdwpqYTNjZXN2QTRqYU1Qd1NMVDhPTnRVMUdCU01md2N6TWJuUEhMclJ2Qjg3bjlnUGFSMndRR1VtckZFTzNMUFgvCmtSY09HcFlCSHBEWEVqRGhLa1dkUnVMT0ZnNEhMWmRWOEFOWmxRMFZTY0U4dTNkRERVTzg5cEdEbjA4cVRBcmwKeDlreHN1ZEVWcmtlclpiNVV4RlZxUT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K +--- +apiVersion: s3.stackable.tech/v1alpha1 +kind: S3Connection +metadata: + name: minio +spec: + host: minio + port: 9000 + accessStyle: Path + credentials: + secretClass: s3-credentials-class + tls: + verification: + server: + caCert: + secretClass: minio-tls-certificates diff --git a/tests/templates/kuttl/fault-tolerant-execution/01-assert.yaml b/tests/templates/kuttl/fault-tolerant-execution/01-assert.yaml new file mode 100644 index 00000000..4d24ed7d --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/01-assert.yaml @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: minio +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: minio diff --git a/tests/templates/kuttl/fault-tolerant-execution/01-install-minio.yaml b/tests/templates/kuttl/fault-tolerant-execution/01-install-minio.yaml new file mode 100644 index 00000000..7a063b49 --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/01-install-minio.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install minio + --namespace $NAMESPACE + --version 17.0.19 + -f 
01_helm-bitnami-minio-values.yaml + oci://registry-1.docker.io/bitnamicharts/minio + timeout: 240 diff --git a/tests/templates/kuttl/fault-tolerant-execution/01_helm-bitnami-minio-values.yaml b/tests/templates/kuttl/fault-tolerant-execution/01_helm-bitnami-minio-values.yaml new file mode 100644 index 00000000..367669e8 --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/01_helm-bitnami-minio-values.yaml @@ -0,0 +1,79 @@ +--- +global: + security: + allowInsecureImages: true + +image: + repository: bitnamilegacy/minio +clientImage: + repository: bitnamilegacy/minio-client +defaultInitContainers: + volumePermissions: # volumePermissions moved under defaultInitContainers starting with Chart version 17.0.0 + enabled: false + image: + repository: bitnamilegacy/os-shell +console: + image: + repository: bitnamilegacy/minio-object-browser + +mode: standalone +disableWebUI: false +extraEnvVars: + - name: BITNAMI_DEBUG + value: "true" + - name: MINIO_LOG_LEVEL + value: DEBUG + +provisioning: + enabled: true + buckets: + - name: exchange-bucket + resources: + requests: + memory: 1Gi + cpu: "512m" + limits: + memory: "1Gi" + cpu: "1" + podSecurityContext: + enabled: false + containerSecurityContext: + enabled: false + +# volumePermissions can be removed starting with Chart version 17.0.0, moved under defaultInitContainers +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + +podSecurityContext: + enabled: false + +containerSecurityContext: + enabled: false + +persistence: + enabled: false + +resources: + requests: + memory: 1Gi + cpu: "512m" + limits: + memory: "1Gi" + cpu: "1" + +auth: + existingSecret: minio-credentials + +service: + type: NodePort + +tls: + enabled: true + autoGenerated: + enabled: false + existingCASecret: minio-tls-certificates + existingSecret: minio-tls-certificates + server: + existingSecret: minio-tls-certificates diff --git a/tests/templates/kuttl/fault-tolerant-execution/02-assert.yaml b/tests/templates/kuttl/fault-tolerant-execution/02-assert.yaml new file mode 100644 index 00000000..d4947eeb --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/02-assert.yaml @@ -0,0 +1,25 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-fte-coordinator-default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-fte-worker-default +status: + readyReplicas: 2 + replicas: 2 +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: tpch diff --git a/tests/templates/kuttl/fault-tolerant-execution/02-install-trino.yaml.j2 b/tests/templates/kuttl/fault-tolerant-execution/02-install-trino.yaml.j2 new file mode 100644 index 00000000..1cbc298e --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/02-install-trino.yaml.j2 @@ -0,0 +1,57 @@ +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCluster +metadata: + name: trino-fte +spec: + image: +{% if test_scenario['values']['trino'].find(",") > 0 %} + custom: "{{ test_scenario['values']['trino'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['trino'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['trino'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + catalogLabelSelector: + matchLabels: + trino: trino-fte + # Fault tolerant execution with S3/MinIO exchange manager + faultTolerantExecution: + task: + exchangeManager: + s3: + 
baseDirectories: + - "s3://exchange-bucket/" + connection: + reference: "minio" +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + coordinators: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 + config: {} + workers: + config: + gracefulShutdownTimeout: 5s # Let the test run faster + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 2 + config: {} +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCatalog +metadata: + name: tpch + labels: + trino: trino-fte +spec: + connector: + tpch: {} diff --git a/tests/templates/kuttl/fault-tolerant-execution/03-assert.yaml b/tests/templates/kuttl/fault-tolerant-execution/03-assert.yaml new file mode 100644 index 00000000..168d5d8f --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/03-assert.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-test-helper +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/fault-tolerant-execution/03-install-test-helper.yaml b/tests/templates/kuttl/fault-tolerant-execution/03-install-test-helper.yaml new file mode 100644 index 00000000..4980bd6f --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/03-install-test-helper.yaml @@ -0,0 +1,29 @@ +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-test-helper + labels: + app: trino-test-helper +spec: + replicas: 1 + selector: + matchLabels: + app: trino-test-helper + template: + metadata: + labels: + app: trino-test-helper + spec: + serviceAccount: integration-tests-sa + containers: + - name: trino-test-helper + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + command: ["sleep", "infinity"] + resources: + requests: + cpu: "250m" + memory: "64Mi" + limits: + cpu: "250m" + memory: "64Mi" diff --git a/tests/templates/kuttl/fault-tolerant-execution/04-copy-scripts.yaml b/tests/templates/kuttl/fault-tolerant-execution/04-copy-scripts.yaml new file mode 100644 index 00000000..c37cff38 --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/04-copy-scripts.yaml @@ -0,0 +1,5 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl cp -n $NAMESPACE 04_check-fte.py trino-test-helper-0:/tmp/ diff --git a/tests/templates/kuttl/fault-tolerant-execution/04_check-fte.py b/tests/templates/kuttl/fault-tolerant-execution/04_check-fte.py new file mode 100644 index 00000000..75ad8ec9 --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/04_check-fte.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +import trino +import argparse + + +def get_connection(coordinator): + """Create anonymous connection for basic cluster health check""" + conn = trino.dbapi.connect( + host=coordinator, + port=8443, + user="test", + http_scheme="https", + verify=False, + session_properties={"query_max_execution_time": "60s"}, + ) + return conn + + +if __name__ == "__main__": + # Construct an argument parser + all_args = argparse.ArgumentParser() + + # Add arguments to the parser + all_args.add_argument( + "-c", + "--coordinator", + required=True, + help="Trino Coordinator Host to connect to", + ) + all_args.add_argument( + "-w", + "--workers", + required=True, + help="Expected amount of workers to be present", + ) + + args = 
vars(all_args.parse_args()) + + expected_workers = args["workers"] + conn = get_connection(args["coordinator"]) + + try: + cursor = conn.cursor() + + # Check that workers are active + cursor.execute( + "SELECT COUNT(*) as nodes FROM system.runtime.nodes WHERE coordinator=false AND state='active'" + ) + (active_workers,) = cursor.fetchone() + + if int(active_workers) != int(expected_workers): + print( + "Mismatch: [expected/active] workers [" + + str(expected_workers) + + "/" + + str(active_workers) + + "]" + ) + exit(-1) + + print(f"Active workers check passed: {active_workers}/{expected_workers}") + + # Test that TPCH connector is working + cursor.execute("SELECT COUNT(*) FROM tpch.tiny.nation") + result = cursor.fetchone() + if result[0] != 25: # TPCH tiny.nation has 25 rows + print(f"TPCH test failed: expected 25 nations, got {result[0]}") + exit(-1) + + print("TPCH connector test passed") + + # Test a more complex query + cursor.execute(""" + SELECT + nation.name, + COUNT(*) AS num_cust + FROM + tpch.tiny.customer + JOIN + tpch.tiny.nation ON customer.nationkey = nation.nationkey + GROUP BY + nation.name + ORDER BY + num_cust DESC + """) + results = cursor.fetchall() + if len(results) == 0: + print("Complex query returned no results") + exit(-1) + + print("Complex query test passed") + + except Exception as e: + print(f"Test failed with error: {e}") + import traceback + + traceback.print_exc() + exit(-1) diff --git a/tests/templates/kuttl/fault-tolerant-execution/05-assert.yaml b/tests/templates/kuttl/fault-tolerant-execution/05-assert.yaml new file mode 100644 index 00000000..615e91b2 --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/05-assert.yaml @@ -0,0 +1,20 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-fte-coordinator-default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-fte-worker-default +status: + readyReplicas: 2 + replicas: 2 diff --git a/tests/templates/kuttl/fault-tolerant-execution/05-run-tests.yaml b/tests/templates/kuttl/fault-tolerant-execution/05-run-tests.yaml new file mode 100644 index 00000000..aa0e3601 --- /dev/null +++ b/tests/templates/kuttl/fault-tolerant-execution/05-run-tests.yaml @@ -0,0 +1,18 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/04_check-fte.py -c trino-fte-coordinator -w 2 + timeout: 120 + # Verify that the exchange bucket contains data + - script: | + sleep 10 + kubectl exec -n $NAMESPACE deployment/minio -- mc alias set local https://localhost:9000 minioAccessKey minioSecretKey --api S3v4 + count=$(kubectl exec -n $NAMESPACE deployment/minio -- mc stat --insecure local/exchange-bucket | awk '/Objects count:/ {print $3}') + if [ "$count" -gt 0 ]; then + echo "Objects count is $count (> 0)" + else + echo "Objects count is $count (not > 0)" + exit 1 + fi + timeout: 30 diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 92023474..6102c5ca 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -111,6 +111,10 @@ tests: - opa - keycloak - openshift + - name: fault-tolerant-execution + dimensions: + - trino + - openshift - name: listener dimensions: - trino
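
# Illustrative usage sketch (not part of this diff): a query-level retry policy
# does not require an exchange manager. The field names below assume the
# camelCase serde renaming of QueryRetryConfig shown above; the values are examples.
spec:
  clusterConfig:
    faultTolerantExecution:
      query:
        retryAttempts: 3
        retryInitialDelay: 10s
        retryMaxDelay: 60s
        retryDelayScaleFactor: 2.0
        exchangeDeduplicationBufferSize: 64Mi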