From 52417b31fd34767655203c2b4acbf6e64c1577d9 Mon Sep 17 00:00:00 2001 From: saptarshi-datazip <253665537+saptarshi-datazip@users.noreply.github.com> Date: Thu, 16 Apr 2026 18:38:57 +0530 Subject: [PATCH 01/11] feat: support for external temporal --- helm/olake/templates/_helpers.tpl | 11 ++++++ .../templates/external-temporal-secret.yaml | 19 ++++++++++ helm/olake/templates/olake-ui/configmap.yaml | 4 ++ helm/olake/templates/olake-ui/deployment.yaml | 20 +++++++++- .../templates/olake-worker/configmap.yaml | 4 ++ .../templates/olake-worker/deployment.yaml | 19 +++++++++- helm/olake/templates/temporal/deployment.yaml | 4 +- helm/olake/templates/temporal/service.yaml | 4 +- .../templates/temporal/ui-deployment.yaml | 2 +- helm/olake/templates/temporal/ui-service.yaml | 2 +- helm/olake/values.yaml | 38 +++++++++++++++++++ 11 files changed, 121 insertions(+), 6 deletions(-) create mode 100644 helm/olake/templates/external-temporal-secret.yaml diff --git a/helm/olake/templates/_helpers.tpl b/helm/olake/templates/_helpers.tpl index aa07ab8..ef8141b 100644 --- a/helm/olake/templates/_helpers.tpl +++ b/helm/olake/templates/_helpers.tpl @@ -120,3 +120,14 @@ Return the PostgreSQL secret name {{- printf "%s-external-postgresql" (include "olake.fullname" .) }} {{- end }} {{- end }} + +{{/* +Return the Temporal secret name +*/}} +{{- define "olake.temporal.secretName" -}} +{{- if .Values.temporal.external.existingSecret }} +{{- .Values.temporal.external.existingSecret }} +{{- else }} +{{- printf "%s-external-temporal" (include "olake.fullname" .) }} +{{- end }} +{{- end }} diff --git a/helm/olake/templates/external-temporal-secret.yaml b/helm/olake/templates/external-temporal-secret.yaml new file mode 100644 index 0000000..f56c11c --- /dev/null +++ b/helm/olake/templates/external-temporal-secret.yaml @@ -0,0 +1,19 @@ +{{- if and (not .Values.temporal.enabled) (not .Values.temporal.external.existingSecret) }} +apiVersion: v1 +kind: Secret +metadata: + name: {{ include "olake.temporal.secretName" . }} + namespace: {{ include "olake.namespace" . }} + labels: + {{- include "olake.labels" . | nindent 4 }} +type: Opaque +stringData: + {{- with .Values.temporal.external.properties }} + {{ $.Values.temporal.external.secretKeys.TEMPORAL_ADDRESS | default "TEMPORAL_ADDRESS" }}: {{ .TEMPORAL_ADDRESS | quote }} + {{ $.Values.temporal.external.secretKeys.TEMPORAL_API_KEY | default "TEMPORAL_API_KEY" }}: {{ .TEMPORAL_API_KEY | quote }} + {{ $.Values.temporal.external.secretKeys.TEMPORAL_ENABLE_TLS | default "TEMPORAL_ENABLE_TLS" }}: {{ .TEMPORAL_ENABLE_TLS | quote }} + {{ $.Values.temporal.external.secretKeys.TEMPORAL_EXTERNAL | default "TEMPORAL_EXTERNAL" }}: {{ .TEMPORAL_EXTERNAL | quote }} + {{ $.Values.temporal.external.secretKeys.TEMPORAL_NAMESPACE | default "TEMPORAL_NAMESPACE" }}: {{ .TEMPORAL_NAMESPACE | quote }} + {{ $.Values.temporal.external.secretKeys.TEMPORAL_TASK_QUEUE | default "TEMPORAL_TASK_QUEUE" }}: {{ .TEMPORAL_TASK_QUEUE | quote }} + {{- end }} +{{- end }} diff --git a/helm/olake/templates/olake-ui/configmap.yaml b/helm/olake/templates/olake-ui/configmap.yaml index 3f1da80..c4ca2ba 100644 --- a/helm/olake/templates/olake-ui/configmap.yaml +++ b/helm/olake/templates/olake-ui/configmap.yaml @@ -30,7 +30,11 @@ data: OLAKE_POSTGRES_SSLMODE = ${OLAKE_POSTGRES_SSLMODE} # Temporal Configuration + {{- if .Values.temporal.enabled }} TEMPORAL_ADDRESS = temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233 + {{- else }} + TEMPORAL_ADDRESS = ${TEMPORAL_ADDRESS} + {{- end }} # Optimization Configuration ENABLE_OPTIMIZATION = ${ENABLE_OPTIMIZATION||false} diff --git a/helm/olake/templates/olake-ui/deployment.yaml b/helm/olake/templates/olake-ui/deployment.yaml index 113421f..4cc99da 100644 --- a/helm/olake/templates/olake-ui/deployment.yaml +++ b/helm/olake/templates/olake-ui/deployment.yaml @@ -62,11 +62,24 @@ spec: - -c - | echo "Waiting for Temporal to be ready..." - until nc -z temporal.{{ include "olake.namespace" . }}.svc.cluster.local 7233; do + HOST="$(echo "$TEMPORAL_ADDRESS" | cut -d: -f1)" + PORT="$(echo "$TEMPORAL_ADDRESS" | cut -d: -f2)" + PORT="${PORT:-7233}" + until nc -z "$HOST" "$PORT"; do echo "Temporal is not ready yet. Waiting..." sleep 5 done echo "Temporal is ready!" + env: + - name: TEMPORAL_ADDRESS + {{- if .Values.temporal.enabled }} + value: "temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233" + {{- else }} + valueFrom: + secretKeyRef: + name: {{ include "olake.temporal.secretName" . }} + key: {{ .Values.temporal.external.secretKeys.TEMPORAL_ADDRESS }} + {{- end }} {{- if .Values.fusion.enabled }} - name: wait-for-fusion image: "{{ include "olake.registryBase" . }}/library/busybox:latest" @@ -149,6 +162,11 @@ spec: secretKeyRef: name: {{ include "olake.postgresql.secretName" . }} key: {{ if .Values.postgresql.enabled }}ssl_mode{{ else }}{{ .Values.postgresql.external.secretKeys.ssl_mode }}{{ end }} + {{- if not .Values.temporal.enabled }} + envFrom: + - secretRef: + name: {{ include "olake.temporal.secretName" . }} + {{- end }} # - name: POSTGRES_DB # value: "postgres://$(POSTGRES_DB_USER):$(POSTGRES_DB_PASSWORD)@$(POSTGRES_DB_HOST):$(POSTGRES_DB_PORT)/$(POSTGRES_DB_NAME)?sslmode=$(POSTGRES_DB_SSLMODE)" volumeMounts: diff --git a/helm/olake/templates/olake-worker/configmap.yaml b/helm/olake/templates/olake-worker/configmap.yaml index 65553ed..1791e2e 100644 --- a/helm/olake/templates/olake-worker/configmap.yaml +++ b/helm/olake/templates/olake-worker/configmap.yaml @@ -9,7 +9,11 @@ metadata: app.kubernetes.io/component: workers data: # Temporal Configuration + {{- if .Values.temporal.enabled }} TEMPORAL_ADDRESS: "temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233" + {{- else }} + TEMPORAL_ADDRESS: ${TEMPORAL_ADDRESS} + {{- end }} # Kubernetes Configuration WORKER_NAMESPACE: {{ include "olake.namespace" . | quote }} diff --git a/helm/olake/templates/olake-worker/deployment.yaml b/helm/olake/templates/olake-worker/deployment.yaml index bcb2ffa..1eebbe4 100644 --- a/helm/olake/templates/olake-worker/deployment.yaml +++ b/helm/olake/templates/olake-worker/deployment.yaml @@ -59,11 +59,24 @@ spec: - -c - | echo "Waiting for Temporal to be ready..." - until nc -z temporal.{{ include "olake.namespace" . }}.svc.cluster.local 7233; do + HOST="$(echo "$TEMPORAL_ADDRESS" | cut -d: -f1)" + PORT="$(echo "$TEMPORAL_ADDRESS" | cut -d: -f2)" + PORT="${PORT:-7233}" + until nc -z "$HOST" "$PORT"; do echo "Temporal is not ready yet. Waiting..." sleep 5 done echo "Temporal is ready!" + env: + - name: TEMPORAL_ADDRESS + {{- if .Values.temporal.enabled }} + value: "temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233" + {{- else }} + valueFrom: + secretKeyRef: + name: {{ include "olake.temporal.secretName" . }} + key: {{ .Values.temporal.external.secretKeys.TEMPORAL_ADDRESS }} + {{- end }} containers: - name: olake-workers image: "{{ include "olake.registryBase" . }}/{{ .Values.olakeWorker.image.repository }}:{{ .Values.olakeWorker.image.tag }}" @@ -129,6 +142,10 @@ spec: envFrom: - configMapRef: name: olake-workers-config + {{- if not .Values.temporal.enabled }} + - secretRef: + name: {{ include "olake.temporal.secretName" . }} + {{- end }} {{- $defaultResources := dict "requests" (dict "memory" "256Mi" "cpu" "250m") }} diff --git a/helm/olake/templates/temporal/deployment.yaml b/helm/olake/templates/temporal/deployment.yaml index f921892..f9a9fff 100644 --- a/helm/olake/templates/temporal/deployment.yaml +++ b/helm/olake/templates/temporal/deployment.yaml @@ -1,3 +1,4 @@ +{{- if .Values.temporal.enabled }} apiVersion: apps/v1 kind: Deployment metadata: @@ -173,4 +174,5 @@ spec: }} {{- $livenessProbe := mergeOverwrite $defaultLivenessProbe (.Values.temporal.server.livenessProbe | default dict) }} livenessProbe: - {{- toYaml $livenessProbe | nindent 10 }} \ No newline at end of file + {{- toYaml $livenessProbe | nindent 10 }} +{{- end }} \ No newline at end of file diff --git a/helm/olake/templates/temporal/service.yaml b/helm/olake/templates/temporal/service.yaml index eb0dd76..ad3ecef 100644 --- a/helm/olake/templates/temporal/service.yaml +++ b/helm/olake/templates/temporal/service.yaml @@ -1,3 +1,4 @@ +{{- if .Values.temporal.enabled }} apiVersion: v1 kind: Service metadata: @@ -22,4 +23,5 @@ spec: selector: app.kubernetes.io/name: temporal app.kubernetes.io/instance: {{ .Release.Name }} - app.kubernetes.io/component: workflow-engine \ No newline at end of file + app.kubernetes.io/component: workflow-engine +{{- end }} \ No newline at end of file diff --git a/helm/olake/templates/temporal/ui-deployment.yaml b/helm/olake/templates/temporal/ui-deployment.yaml index 8054ffd..ddb6993 100644 --- a/helm/olake/templates/temporal/ui-deployment.yaml +++ b/helm/olake/templates/temporal/ui-deployment.yaml @@ -1,4 +1,4 @@ -{{- if .Values.temporal.ui.enabled }} +{{- if and .Values.temporal.enabled .Values.temporal.ui.enabled }} apiVersion: apps/v1 kind: Deployment metadata: diff --git a/helm/olake/templates/temporal/ui-service.yaml b/helm/olake/templates/temporal/ui-service.yaml index fa737a5..8be69b3 100644 --- a/helm/olake/templates/temporal/ui-service.yaml +++ b/helm/olake/templates/temporal/ui-service.yaml @@ -1,4 +1,4 @@ -{{- if .Values.temporal.ui.enabled }} +{{- if and .Values.temporal.enabled .Values.temporal.ui.enabled }} apiVersion: v1 kind: Service metadata: diff --git a/helm/olake/values.yaml b/helm/olake/values.yaml index 080e7d2..e18257b 100644 --- a/helm/olake/values.yaml +++ b/helm/olake/values.yaml @@ -369,6 +369,44 @@ nfsServer: # Temporal configuration for workflow orchestration temporal: + # -- Enable internal Temporal deployment + # Set to false to use an external Temporal Cloud/server + enabled: true + + # -- External Temporal configuration (used when enabled: false) + # Provide Kubernetes secret containing Temporal credentials/configuration + external: + # -- Name of the Kubernetes secret containing external Temporal configuration + # The secret must contain keys: + # TEMPORAL_ADDRESS, TEMPORAL_API_KEY, TEMPORAL_ENABLE_TLS, TEMPORAL_EXTERNAL, TEMPORAL_NAMESPACE, TEMPORAL_TASK_QUEUE + # Example secret creation: + # kubectl create secret generic my-temporal-secret \ + # --from-literal=TEMPORAL_ADDRESS=..api.temporal.io:7233 \ + # --from-literal=TEMPORAL_API_KEY=ldjflskjdglsed.sidhfksdhf.sdlkjglksjdgljsd-sldjglksjdgljs-sd;kg;sldjg;ljsd-sdklhglksdhglskjd \ + # --from-literal=TEMPORAL_ENABLE_TLS=true \ + # --from-literal=TEMPORAL_EXTERNAL=true \ + # --from-literal=TEMPORAL_NAMESPACE= \ + # --from-literal=TEMPORAL_TASK_QUEUE= + existingSecret: "" + + # -- Explicit Temporal properties (used if existingSecret is not set) + properties: + TEMPORAL_ADDRESS: "" + TEMPORAL_API_KEY: "" + TEMPORAL_ENABLE_TLS: "true" + TEMPORAL_EXTERNAL: "true" + TEMPORAL_NAMESPACE: "" + TEMPORAL_TASK_QUEUE: "" + + # -- Map the keys from existing secret to the required fields + secretKeys: + TEMPORAL_ADDRESS: "TEMPORAL_ADDRESS" + TEMPORAL_API_KEY: "TEMPORAL_API_KEY" + TEMPORAL_ENABLE_TLS: "TEMPORAL_ENABLE_TLS" + TEMPORAL_EXTERNAL: "TEMPORAL_EXTERNAL" + TEMPORAL_NAMESPACE: "TEMPORAL_NAMESPACE" + TEMPORAL_TASK_QUEUE: "TEMPORAL_TASK_QUEUE" + # -- Temporal server configuration server: # -- Temporal server image configuration From 85c55149b376a71ac324b21daa69fcd697b8b7b9 Mon Sep 17 00:00:00 2001 From: saptarshi-datazip <253665537+saptarshi-datazip@users.noreply.github.com> Date: Tue, 21 Apr 2026 15:49:07 +0530 Subject: [PATCH 02/11] fix: resolved comments --- helm/olake/templates/olake-ui/deployment.yaml | 19 +++++-------------- .../templates/olake-worker/configmap.yaml | 2 -- .../templates/olake-worker/deployment.yaml | 17 +++-------------- 3 files changed, 8 insertions(+), 30 deletions(-) diff --git a/helm/olake/templates/olake-ui/deployment.yaml b/helm/olake/templates/olake-ui/deployment.yaml index 4cc99da..c93cfa8 100644 --- a/helm/olake/templates/olake-ui/deployment.yaml +++ b/helm/olake/templates/olake-ui/deployment.yaml @@ -53,7 +53,9 @@ spec: {{- if (include "olake.jobServiceAccountName" .) }} serviceAccountName: {{ include "olake.jobServiceAccountName" . }} {{- end }} + {{- if or .Values.temporal.enabled .Values.fusion.enabled }} initContainers: + {{- if .Values.temporal.enabled }} - name: wait-for-temporal image: "{{ include "olake.registryBase" . }}/library/busybox:latest" imagePullPolicy: IfNotPresent @@ -62,24 +64,12 @@ spec: - -c - | echo "Waiting for Temporal to be ready..." - HOST="$(echo "$TEMPORAL_ADDRESS" | cut -d: -f1)" - PORT="$(echo "$TEMPORAL_ADDRESS" | cut -d: -f2)" - PORT="${PORT:-7233}" - until nc -z "$HOST" "$PORT"; do + until nc -z temporal.{{ include "olake.namespace" . }}.svc.cluster.local 7233; do echo "Temporal is not ready yet. Waiting..." sleep 5 done echo "Temporal is ready!" - env: - - name: TEMPORAL_ADDRESS - {{- if .Values.temporal.enabled }} - value: "temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233" - {{- else }} - valueFrom: - secretKeyRef: - name: {{ include "olake.temporal.secretName" . }} - key: {{ .Values.temporal.external.secretKeys.TEMPORAL_ADDRESS }} - {{- end }} + {{- end }} {{- if .Values.fusion.enabled }} - name: wait-for-fusion image: "{{ include "olake.registryBase" . }}/library/busybox:latest" @@ -95,6 +85,7 @@ spec: done echo "Fusion REST service is ready!" {{- end }} + {{- end }} containers: - name: olake-ui image: "{{ include "olake.registryBase" . }}/{{ .Values.olakeUI.image.repository }}:{{ .Values.olakeUI.image.tag }}" diff --git a/helm/olake/templates/olake-worker/configmap.yaml b/helm/olake/templates/olake-worker/configmap.yaml index 1791e2e..0c5dc79 100644 --- a/helm/olake/templates/olake-worker/configmap.yaml +++ b/helm/olake/templates/olake-worker/configmap.yaml @@ -11,8 +11,6 @@ data: # Temporal Configuration {{- if .Values.temporal.enabled }} TEMPORAL_ADDRESS: "temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233" - {{- else }} - TEMPORAL_ADDRESS: ${TEMPORAL_ADDRESS} {{- end }} # Kubernetes Configuration diff --git a/helm/olake/templates/olake-worker/deployment.yaml b/helm/olake/templates/olake-worker/deployment.yaml index 1eebbe4..dd00ff7 100644 --- a/helm/olake/templates/olake-worker/deployment.yaml +++ b/helm/olake/templates/olake-worker/deployment.yaml @@ -50,6 +50,7 @@ spec: {{- toYaml . | nindent 8 }} {{- end }} serviceAccountName: {{ include "olake.workerServiceAccountName" . }} + {{- if .Values.temporal.enabled }} initContainers: - name: wait-for-temporal image: "{{ include "olake.registryBase" . }}/library/busybox:latest" @@ -59,24 +60,12 @@ spec: - -c - | echo "Waiting for Temporal to be ready..." - HOST="$(echo "$TEMPORAL_ADDRESS" | cut -d: -f1)" - PORT="$(echo "$TEMPORAL_ADDRESS" | cut -d: -f2)" - PORT="${PORT:-7233}" - until nc -z "$HOST" "$PORT"; do + until nc -z temporal.{{ include "olake.namespace" . }}.svc.cluster.local 7233; do echo "Temporal is not ready yet. Waiting..." sleep 5 done echo "Temporal is ready!" - env: - - name: TEMPORAL_ADDRESS - {{- if .Values.temporal.enabled }} - value: "temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233" - {{- else }} - valueFrom: - secretKeyRef: - name: {{ include "olake.temporal.secretName" . }} - key: {{ .Values.temporal.external.secretKeys.TEMPORAL_ADDRESS }} - {{- end }} + {{- end }} containers: - name: olake-workers image: "{{ include "olake.registryBase" . }}/{{ .Values.olakeWorker.image.repository }}:{{ .Values.olakeWorker.image.tag }}" From 6ad104608596d7b287e846b7f9e85e958ddb8506 Mon Sep 17 00:00:00 2001 From: Schitiz Sharma Date: Thu, 14 May 2026 16:08:15 +0530 Subject: [PATCH 03/11] feat: add support for external temporal --- worker/constants/env.go | 4 + worker/go.mod | 1 + worker/go.sum | 2 + worker/temporal/client.go | 55 ++++++++-- worker/temporal/external.go | 194 ++++++++++++++++++++++++++++++++++++ worker/temporal/worker.go | 38 +++++-- 6 files changed, 276 insertions(+), 18 deletions(-) create mode 100644 worker/temporal/external.go diff --git a/worker/constants/env.go b/worker/constants/env.go index 6617872..a59e558 100644 --- a/worker/constants/env.go +++ b/worker/constants/env.go @@ -17,6 +17,10 @@ const ( // temporal EnvTemporalAddress = "TEMPORAL_ADDRESS" EnvTemporalRetentionPeriod = "TEMPORAL_RETENTION_PERIOD" + EnvTemporalExternal = "TEMPORAL_EXTERNAL" + EnvTemporalAPIKey = "TEMPORAL_API_KEY" + EnvTemporalNamespace = "TEMPORAL_NAMESPACE" + EnvTemporalEnableTLS = "TEMPORAL_ENABLE_TLS" // registry ContainerRegistryBase = "CONTAINER_REGISTRY_BASE" diff --git a/worker/go.mod b/worker/go.mod index 0cac0a2..5f99a08 100644 --- a/worker/go.mod +++ b/worker/go.mod @@ -14,6 +14,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.40.0 go.temporal.io/api v1.51.0 + go.temporal.io/cloud-sdk v0.8.0 go.temporal.io/sdk v1.36.0 google.golang.org/grpc v1.79.3 google.golang.org/protobuf v1.36.10 diff --git a/worker/go.sum b/worker/go.sum index bdfa2e7..bafb137 100644 --- a/worker/go.sum +++ b/worker/go.sum @@ -276,6 +276,8 @@ go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOV go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= go.temporal.io/api v1.51.0 h1:9+e14GrIa7nWoWoudqj/PSwm33yYjV+u8TAR9If7s/g= go.temporal.io/api v1.51.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/cloud-sdk v0.8.0 h1:+MRcD1EEdZLWT/xFn5pG5/FHmYok/6Jif2FsZxZByV8= +go.temporal.io/cloud-sdk v0.8.0/go.mod h1:W2O9t9tvo3Q/LhGgYdj8JijWbN5C84os+cz/BadIHYI= go.temporal.io/sdk v1.36.0 h1:WO9zetpybBNK7xsQth4Z+3Zzw1zSaM9MOUGrnnUjZMo= go.temporal.io/sdk v1.36.0/go.mod h1:8BxGRF0LcQlfQrLLGkgVajbsKUp/PY7280XTdcKc18Y= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/worker/temporal/client.go b/worker/temporal/client.go index 2bfcf15..f26d657 100644 --- a/worker/temporal/client.go +++ b/worker/temporal/client.go @@ -2,6 +2,7 @@ package temporal import ( "context" + "crypto/tls" "fmt" "time" @@ -24,12 +25,29 @@ type Temporal struct { func NewClient() (*Temporal, error) { var temporalClient *Temporal + namespace := viper.GetString(constants.EnvTemporalNamespace) + if namespace == "" { + namespace = constants.DefaultTemporalNamespace + } + err := utils.RetryWithBackoff(func() error { - client, err := client.Dial(client.Options{ - HostPort: viper.GetString(constants.EnvTemporalAddress), - Logger: logger.Log(context.Background()), - Namespace: constants.DefaultTemporalNamespace, - }) + opts := client.Options{ + HostPort: viper.GetString(constants.EnvTemporalAddress), + Logger: logger.Log(context.Background()), + Namespace: namespace, + } + + if viper.GetBool(constants.EnvTemporalEnableTLS) { + opts.ConnectionOptions = client.ConnectionOptions{ + TLS: &tls.Config{}, + } + } + + if apiKey := viper.GetString(constants.EnvTemporalAPIKey); apiKey != "" { + opts.Credentials = client.NewAPIKeyStaticCredentials(apiKey) + } + + client, err := client.Dial(opts) if err != nil { return err } @@ -55,7 +73,7 @@ func (t *Temporal) GetClient() client.Client { return t.client } -// SetWorkflowRetentionPeriod sets the workflow execution retention period for the default namespace. +// SetWorkflowRetentionPeriod sets the workflow execution retention period for the namespace. // This ensures workflow history is available for debugging (defaults to 7 days). // Handles both fresh installs and upgrades from shorter retention periods. // Fatal: worker fails to start if this fails. @@ -65,16 +83,35 @@ func (t *Temporal) SetWorkflowRetentionPeriod(ctx context.Context) error { return fmt.Errorf("failed to parse retention string: %s", err) } + namespace := viper.GetString(constants.EnvTemporalNamespace) + if namespace == "" { + namespace = constants.DefaultTemporalNamespace + } + + if viper.GetBool(constants.EnvTemporalExternal) { + externalClient, err := NewExternalClient() + if err != nil { + return fmt.Errorf("failed to create external Temporal client: %w", err) + } + defer externalClient.Close() + + days := int32(retentionPeriod.Hours() / 24) + if days < 1 { + days = 1 + } + return externalClient.SetNamespaceRetention(ctx, namespace, days) + } + _, err = t.client.WorkflowService().UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ - Namespace: constants.DefaultTemporalNamespace, + Namespace: namespace, Config: &namespacepb.NamespaceConfig{ WorkflowExecutionRetentionTtl: durationpb.New(retentionPeriod), }, }) if err != nil { - return fmt.Errorf("failed to update namespace %s retention: %s", constants.DefaultTemporalNamespace, err) + return fmt.Errorf("failed to update namespace %s retention: %s", namespace, err) } - logger.Infof("namespace %s retention set to %s", constants.DefaultTemporalNamespace, retentionPeriod) + logger.Infof("namespace %s retention set to %s", namespace, retentionPeriod) return nil } diff --git a/worker/temporal/external.go b/worker/temporal/external.go new file mode 100644 index 0000000..decb25e --- /dev/null +++ b/worker/temporal/external.go @@ -0,0 +1,194 @@ +package temporal + +import ( + "context" + "fmt" + "time" + + "github.com/datazip-inc/olake-helm/worker/constants" + "github.com/datazip-inc/olake-helm/worker/utils/logger" + "github.com/spf13/viper" + enums "go.temporal.io/api/enums/v1" + cloudservice "go.temporal.io/cloud-sdk/api/cloudservice/v1" + namespacepb "go.temporal.io/cloud-sdk/api/namespace/v1" + operationpb "go.temporal.io/cloud-sdk/api/operation/v1" + "go.temporal.io/cloud-sdk/cloudclient" +) + +// ExternalClient wraps the Temporal Cloud API client +type ExternalClient struct { + client *cloudclient.Client +} + +// NewExternalClient creates a new Temporal Cloud API client +func NewExternalClient() (*ExternalClient, error) { + apiKey := viper.GetString(constants.EnvTemporalAPIKey) + if apiKey == "" { + return nil, fmt.Errorf("TEMPORAL_API_KEY is required for external Temporal") + } + + client, err := cloudclient.New(cloudclient.Options{ + APIKey: apiKey, + }) + if err != nil { + return nil, fmt.Errorf("failed to create external Temporal client: %w", err) + } + + return &ExternalClient{ + client: client, + }, nil +} + +// Close closes the external Temporal API client +func (c *ExternalClient) Close() { + if c.client != nil { + c.client.Close() + } +} + +// waitForAsyncOperation polls the async operation until it completes +func (c *ExternalClient) waitForAsyncOperation(ctx context.Context, opID string) error { + service := c.client.CloudService() + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + resp, err := service.GetAsyncOperation(ctx, &cloudservice.GetAsyncOperationRequest{ + AsyncOperationId: opID, + }) + if err != nil { + return fmt.Errorf("failed to get async operation status: %w", err) + } + + switch state := resp.GetAsyncOperation().GetState(); state { + case operationpb.AsyncOperation_STATE_FULFILLED: + return nil + case operationpb.AsyncOperation_STATE_FAILED: + return fmt.Errorf("async operation failed: %s", resp.GetAsyncOperation().GetFailureReason()) + case operationpb.AsyncOperation_STATE_CANCELLED: + return fmt.Errorf("async operation cancelled") + } + // Keep waiting for STATE_PENDING or STATE_IN_PROGRESS + } + } +} + +// SetNamespaceRetention sets the workflow execution retention period via external Temporal API +func (c *ExternalClient) SetNamespaceRetention(ctx context.Context, namespace string, retentionDays int32) error { + service := c.client.CloudService() + + getResp, err := service.GetNamespace(ctx, &cloudservice.GetNamespaceRequest{ + Namespace: namespace, + }) + if err != nil { + return fmt.Errorf("failed to get namespace %s: %w", namespace, err) + } + + ns := getResp.GetNamespace() + spec := ns.GetSpec() + if spec == nil { + return fmt.Errorf("namespace %s has no spec", namespace) + } + + if spec.GetRetentionDays() == retentionDays { + logger.Infof("namespace %s retention is already %d days", namespace, retentionDays) + return nil + } + + spec.RetentionDays = retentionDays + updateResp, err := service.UpdateNamespace(ctx, &cloudservice.UpdateNamespaceRequest{ + Namespace: namespace, + Spec: spec, + ResourceVersion: ns.GetResourceVersion(), + }) + if err != nil { + return fmt.Errorf("failed to update namespace %s: %w", namespace, err) + } + + opID := updateResp.GetAsyncOperation().GetId() + logger.Infof("waiting for namespace %s update operation (id: %s) to complete", namespace, opID) + if err := c.waitForAsyncOperation(ctx, opID); err != nil { + return fmt.Errorf("namespace update operation failed: %w", err) + } + + logger.Infof("namespace %s retention successfully set to %d days in external Temporal", namespace, retentionDays) + return nil +} + +// AddSearchAttributes adds custom search attributes via external Temporal API +func (c *ExternalClient) AddSearchAttributes(ctx context.Context, namespace string, searchAttributes map[string]enums.IndexedValueType) error { + service := c.client.CloudService() + + getResp, err := service.GetNamespace(ctx, &cloudservice.GetNamespaceRequest{ + Namespace: namespace, + }) + if err != nil { + return fmt.Errorf("failed to get namespace %s: %w", namespace, err) + } + + ns := getResp.GetNamespace() + spec := ns.GetSpec() + if spec == nil { + return fmt.Errorf("namespace %s has no spec", namespace) + } + + existingAttributes := spec.GetSearchAttributes() + if existingAttributes == nil { + existingAttributes = make(map[string]namespacepb.NamespaceSpec_SearchAttributeType) + } + + needsUpdate := false + + for k, v := range searchAttributes { + if _, exists := existingAttributes[k]; !exists { + var externalType namespacepb.NamespaceSpec_SearchAttributeType + switch v { + case enums.INDEXED_VALUE_TYPE_TEXT: + externalType = namespacepb.NamespaceSpec_SEARCH_ATTRIBUTE_TYPE_TEXT + case enums.INDEXED_VALUE_TYPE_KEYWORD: + externalType = namespacepb.NamespaceSpec_SEARCH_ATTRIBUTE_TYPE_KEYWORD + case enums.INDEXED_VALUE_TYPE_INT: + externalType = namespacepb.NamespaceSpec_SEARCH_ATTRIBUTE_TYPE_INT + case enums.INDEXED_VALUE_TYPE_DOUBLE: + externalType = namespacepb.NamespaceSpec_SEARCH_ATTRIBUTE_TYPE_DOUBLE + case enums.INDEXED_VALUE_TYPE_BOOL: + externalType = namespacepb.NamespaceSpec_SEARCH_ATTRIBUTE_TYPE_BOOL + case enums.INDEXED_VALUE_TYPE_DATETIME: + externalType = namespacepb.NamespaceSpec_SEARCH_ATTRIBUTE_TYPE_DATETIME + case enums.INDEXED_VALUE_TYPE_KEYWORD_LIST: + externalType = namespacepb.NamespaceSpec_SEARCH_ATTRIBUTE_TYPE_KEYWORD_LIST + default: + return fmt.Errorf("unsupported search attribute type for key %s: %v", k, v) + } + existingAttributes[k] = externalType + needsUpdate = true + } + } + + if !needsUpdate { + logger.Infof("search attributes already exist in namespace %s", namespace) + return nil + } + + updateResp, err := service.UpdateNamespace(ctx, &cloudservice.UpdateNamespaceRequest{ + Namespace: namespace, + Spec: spec, + ResourceVersion: ns.GetResourceVersion(), + }) + if err != nil { + return fmt.Errorf("failed to update namespace %s: %w", namespace, err) + } + + opID := updateResp.GetAsyncOperation().GetId() + logger.Infof("waiting for namespace %s search attributes update operation (id: %s) to complete", namespace, opID) + if err := c.waitForAsyncOperation(ctx, opID); err != nil { + return fmt.Errorf("namespace update operation failed: %w", err) + } + + logger.Infof("custom search attributes successfully added to namespace %s in external Temporal", namespace) + return nil +} diff --git a/worker/temporal/worker.go b/worker/temporal/worker.go index 06f30ce..fe736d7 100644 --- a/worker/temporal/worker.go +++ b/worker/temporal/worker.go @@ -2,10 +2,12 @@ package temporal import ( "context" + "fmt" "github.com/datazip-inc/olake-helm/worker/constants" "github.com/datazip-inc/olake-helm/worker/database" "github.com/datazip-inc/olake-helm/worker/utils/logger" + "github.com/spf13/viper" "github.com/datazip-inc/olake-helm/worker/executor" enums "go.temporal.io/api/enums/v1" @@ -45,17 +47,35 @@ func NewWorker(ctx context.Context, t *Temporal, e *executor.AbstractExecutor, d w.RegisterActivity(activitiesInstance.PostClearActivity) w.RegisterActivity(activitiesInstance.SendWebhookNotificationActivity) - // Register search attributes - // Namespace is required for SQL/Postgres visibility store, optional for Elasticsearch - _, err := t.GetClient().OperatorService().AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ - SearchAttributes: map[string]enums.IndexedValueType{constants.OperationTypeKey: enums.INDEXED_VALUE_TYPE_KEYWORD}, - Namespace: constants.DefaultTemporalNamespace, - }) - if err != nil && serviceerror.ToStatus(err).Code() != codes.AlreadyExists { - return nil, err + searchAttributes := map[string]enums.IndexedValueType{constants.OperationTypeKey: enums.INDEXED_VALUE_TYPE_KEYWORD} + + namespace := viper.GetString(constants.EnvTemporalNamespace) + if namespace == "" { + namespace = constants.DefaultTemporalNamespace + } + + if viper.GetBool(constants.EnvTemporalExternal) { + externalClient, err := NewExternalClient() + if err != nil { + return nil, fmt.Errorf("failed to create external Temporal client: %w", err) + } + defer externalClient.Close() + + if err := externalClient.AddSearchAttributes(ctx, namespace, searchAttributes); err != nil { + return nil, fmt.Errorf("failed to add search attributes: %w", err) + } + } else { + // Namespace is required for SQL/Postgres visibility store, optional for Elasticsearch + _, err := t.GetClient().OperatorService().AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: searchAttributes, + Namespace: namespace, + }) + if err != nil && serviceerror.ToStatus(err).Code() != codes.AlreadyExists { + return nil, err + } } - logger.Infof("worker client created successfully") + logger.Infof("worker client created successfully") return &Worker{ worker: w, From 2793510d044506401571324c3b638fab628d42de Mon Sep 17 00:00:00 2001 From: Schitiz Sharma Date: Thu, 14 May 2026 16:30:02 +0530 Subject: [PATCH 04/11] feat: separate cloud and true external temporal --- worker/temporal/client.go | 2 +- worker/temporal/worker.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/temporal/client.go b/worker/temporal/client.go index f26d657..2e61d25 100644 --- a/worker/temporal/client.go +++ b/worker/temporal/client.go @@ -88,7 +88,7 @@ func (t *Temporal) SetWorkflowRetentionPeriod(ctx context.Context) error { namespace = constants.DefaultTemporalNamespace } - if viper.GetBool(constants.EnvTemporalExternal) { + if viper.GetBool(constants.EnvTemporalExternal) && viper.GetString(constants.EnvTemporalAPIKey) != "" { externalClient, err := NewExternalClient() if err != nil { return fmt.Errorf("failed to create external Temporal client: %w", err) diff --git a/worker/temporal/worker.go b/worker/temporal/worker.go index fe736d7..b8703c3 100644 --- a/worker/temporal/worker.go +++ b/worker/temporal/worker.go @@ -54,7 +54,7 @@ func NewWorker(ctx context.Context, t *Temporal, e *executor.AbstractExecutor, d namespace = constants.DefaultTemporalNamespace } - if viper.GetBool(constants.EnvTemporalExternal) { + if viper.GetBool(constants.EnvTemporalExternal) && viper.GetString(constants.EnvTemporalAPIKey) != "" { externalClient, err := NewExternalClient() if err != nil { return nil, fmt.Errorf("failed to create external Temporal client: %w", err) From 5c8e69e8d3168be174ccf1b6f24a9398f52b2b87 Mon Sep 17 00:00:00 2001 From: Schitiz Sharma Date: Mon, 18 May 2026 13:12:39 +0530 Subject: [PATCH 05/11] fix: pr review fixes --- worker/constants/env.go | 1 + worker/temporal/activity.go | 8 +++++- worker/temporal/client.go | 21 ++++++--------- worker/temporal/external.go | 53 +++++++++++++++++-------------------- worker/temporal/worker.go | 16 ++++++----- worker/utils/utils.go | 18 +++++++++++++ 6 files changed, 68 insertions(+), 49 deletions(-) diff --git a/worker/constants/env.go b/worker/constants/env.go index a59e558..8a8c6c0 100644 --- a/worker/constants/env.go +++ b/worker/constants/env.go @@ -21,6 +21,7 @@ const ( EnvTemporalAPIKey = "TEMPORAL_API_KEY" EnvTemporalNamespace = "TEMPORAL_NAMESPACE" EnvTemporalEnableTLS = "TEMPORAL_ENABLE_TLS" + EnvTemporalTaskQueue = "TEMPORAL_TASK_QUEUE" // registry ContainerRegistryBase = "CONTAINER_REGISTRY_BASE" diff --git a/worker/temporal/activity.go b/worker/temporal/activity.go index 438b621..6fad305 100644 --- a/worker/temporal/activity.go +++ b/worker/temporal/activity.go @@ -13,6 +13,7 @@ import ( "github.com/datazip-inc/olake-helm/worker/utils/logger" "github.com/datazip-inc/olake-helm/worker/utils/notifications" "github.com/datazip-inc/olake-helm/worker/utils/telemetry" + "github.com/spf13/viper" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" @@ -155,13 +156,18 @@ func (a *Activity) PostClearActivity(ctx context.Context, req *types.ExecutionRe scheduleID := fmt.Sprintf("schedule-%s", workflowID) handle := a.tempClient.ScheduleClient().GetHandle(ctx, scheduleID) + taskQueue := constants.TaskQueue + if viper.GetBool(constants.EnvTemporalExternal) { + taskQueue = utils.GetTemporalTaskQueue() + } + err := handle.Update(ctx, client.ScheduleUpdateOptions{ DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { input.Description.Schedule.Action = &client.ScheduleWorkflowAction{ ID: workflowID, Workflow: RunSyncWorkflow, Args: []any{req}, - TaskQueue: constants.TaskQueue, + TaskQueue: taskQueue, } if input.Description.Schedule.State != nil { diff --git a/worker/temporal/client.go b/worker/temporal/client.go index 2e61d25..5d912e4 100644 --- a/worker/temporal/client.go +++ b/worker/temporal/client.go @@ -25,10 +25,7 @@ type Temporal struct { func NewClient() (*Temporal, error) { var temporalClient *Temporal - namespace := viper.GetString(constants.EnvTemporalNamespace) - if namespace == "" { - namespace = constants.DefaultTemporalNamespace - } + namespace := utils.GetTemporalNamespace() err := utils.RetryWithBackoff(func() error { opts := client.Options{ @@ -83,23 +80,21 @@ func (t *Temporal) SetWorkflowRetentionPeriod(ctx context.Context) error { return fmt.Errorf("failed to parse retention string: %s", err) } - namespace := viper.GetString(constants.EnvTemporalNamespace) - if namespace == "" { - namespace = constants.DefaultTemporalNamespace - } + namespace := constants.DefaultTemporalNamespace - if viper.GetBool(constants.EnvTemporalExternal) && viper.GetString(constants.EnvTemporalAPIKey) != "" { + if utils.IsTemporalCloud() { externalClient, err := NewExternalClient() if err != nil { return fmt.Errorf("failed to create external Temporal client: %w", err) } defer externalClient.Close() - days := int32(retentionPeriod.Hours() / 24) - if days < 1 { - days = 1 + namespace = utils.GetTemporalNamespace() + retentionDays := int32(retentionPeriod.Hours() / 24) + if retentionDays < 1 { + retentionDays = 1 } - return externalClient.SetNamespaceRetention(ctx, namespace, days) + return externalClient.SetNamespaceRetention(ctx, namespace, retentionDays) } _, err = t.client.WorkflowService().UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ diff --git a/worker/temporal/external.go b/worker/temporal/external.go index decb25e..31eef87 100644 --- a/worker/temporal/external.go +++ b/worker/temporal/external.go @@ -79,19 +79,9 @@ func (c *ExternalClient) waitForAsyncOperation(ctx context.Context, opID string) // SetNamespaceRetention sets the workflow execution retention period via external Temporal API func (c *ExternalClient) SetNamespaceRetention(ctx context.Context, namespace string, retentionDays int32) error { - service := c.client.CloudService() - - getResp, err := service.GetNamespace(ctx, &cloudservice.GetNamespaceRequest{ - Namespace: namespace, - }) + ns, spec, err := c.getNamespaceAndSpec(ctx, namespace) if err != nil { - return fmt.Errorf("failed to get namespace %s: %w", namespace, err) - } - - ns := getResp.GetNamespace() - spec := ns.GetSpec() - if spec == nil { - return fmt.Errorf("namespace %s has no spec", namespace) + return err } if spec.GetRetentionDays() == retentionDays { @@ -100,7 +90,7 @@ func (c *ExternalClient) SetNamespaceRetention(ctx context.Context, namespace st } spec.RetentionDays = retentionDays - updateResp, err := service.UpdateNamespace(ctx, &cloudservice.UpdateNamespaceRequest{ + updateResp, err := c.client.CloudService().UpdateNamespace(ctx, &cloudservice.UpdateNamespaceRequest{ Namespace: namespace, Spec: spec, ResourceVersion: ns.GetResourceVersion(), @@ -121,25 +111,15 @@ func (c *ExternalClient) SetNamespaceRetention(ctx context.Context, namespace st // AddSearchAttributes adds custom search attributes via external Temporal API func (c *ExternalClient) AddSearchAttributes(ctx context.Context, namespace string, searchAttributes map[string]enums.IndexedValueType) error { - service := c.client.CloudService() - - getResp, err := service.GetNamespace(ctx, &cloudservice.GetNamespaceRequest{ - Namespace: namespace, - }) + ns, spec, err := c.getNamespaceAndSpec(ctx, namespace) if err != nil { - return fmt.Errorf("failed to get namespace %s: %w", namespace, err) + return err } - ns := getResp.GetNamespace() - spec := ns.GetSpec() - if spec == nil { - return fmt.Errorf("namespace %s has no spec", namespace) - } - - existingAttributes := spec.GetSearchAttributes() - if existingAttributes == nil { - existingAttributes = make(map[string]namespacepb.NamespaceSpec_SearchAttributeType) + if spec.SearchAttributes == nil { + spec.SearchAttributes = make(map[string]namespacepb.NamespaceSpec_SearchAttributeType) } + existingAttributes := spec.SearchAttributes needsUpdate := false @@ -174,7 +154,7 @@ func (c *ExternalClient) AddSearchAttributes(ctx context.Context, namespace stri return nil } - updateResp, err := service.UpdateNamespace(ctx, &cloudservice.UpdateNamespaceRequest{ + updateResp, err := c.client.CloudService().UpdateNamespace(ctx, &cloudservice.UpdateNamespaceRequest{ Namespace: namespace, Spec: spec, ResourceVersion: ns.GetResourceVersion(), @@ -192,3 +172,18 @@ func (c *ExternalClient) AddSearchAttributes(ctx context.Context, namespace stri logger.Infof("custom search attributes successfully added to namespace %s in external Temporal", namespace) return nil } + +func (c *ExternalClient) getNamespaceAndSpec(ctx context.Context, namespace string) (*namespacepb.Namespace, *namespacepb.NamespaceSpec, error) { + namespaceResp, err := c.client.CloudService().GetNamespace(ctx, &cloudservice.GetNamespaceRequest{ + Namespace: namespace, + }) + if err != nil { + return nil, nil, fmt.Errorf("failed to get namespace %s: %w", namespace, err) + } + ns := namespaceResp.GetNamespace() + spec := ns.GetSpec() + if spec == nil { + return nil, nil, fmt.Errorf("namespace %s has no spec", namespace) + } + return ns, spec, nil +} diff --git a/worker/temporal/worker.go b/worker/temporal/worker.go index b8703c3..41e7526 100644 --- a/worker/temporal/worker.go +++ b/worker/temporal/worker.go @@ -6,6 +6,7 @@ import ( "github.com/datazip-inc/olake-helm/worker/constants" "github.com/datazip-inc/olake-helm/worker/database" + "github.com/datazip-inc/olake-helm/worker/utils" "github.com/datazip-inc/olake-helm/worker/utils/logger" "github.com/spf13/viper" @@ -32,7 +33,12 @@ func NewWorker(ctx context.Context, t *Temporal, e *executor.AbstractExecutor, d NewLoggingInterceptor(), }, } - w := worker.New(t.GetClient(), constants.TaskQueue, workerOptions) + var w worker.Worker + if viper.GetBool(constants.EnvTemporalExternal) { + w = worker.New(t.GetClient(), utils.GetTemporalTaskQueue(), workerOptions) + } else { + w = worker.New(t.GetClient(), constants.TaskQueue, workerOptions) + } // regsiter workflows w.RegisterWorkflow(RunSyncWorkflow) @@ -49,18 +55,16 @@ func NewWorker(ctx context.Context, t *Temporal, e *executor.AbstractExecutor, d searchAttributes := map[string]enums.IndexedValueType{constants.OperationTypeKey: enums.INDEXED_VALUE_TYPE_KEYWORD} - namespace := viper.GetString(constants.EnvTemporalNamespace) - if namespace == "" { - namespace = constants.DefaultTemporalNamespace - } + namespace := constants.DefaultTemporalNamespace - if viper.GetBool(constants.EnvTemporalExternal) && viper.GetString(constants.EnvTemporalAPIKey) != "" { + if utils.IsTemporalCloud() { externalClient, err := NewExternalClient() if err != nil { return nil, fmt.Errorf("failed to create external Temporal client: %w", err) } defer externalClient.Close() + namespace = utils.GetTemporalNamespace() if err := externalClient.AddSearchAttributes(ctx, namespace, searchAttributes); err != nil { return nil, fmt.Errorf("failed to add search attributes: %w", err) } diff --git a/worker/utils/utils.go b/worker/utils/utils.go index 6534db0..f7d3d3a 100644 --- a/worker/utils/utils.go +++ b/worker/utils/utils.go @@ -239,6 +239,24 @@ func WorkflowHash(workflowID string) string { return fmt.Sprintf("%x", sha256.Sum256([]byte(workflowID))) } +func GetTemporalNamespace() string { + if ns := viper.GetString(constants.EnvTemporalNamespace); ns != "" { + return ns + } + return constants.DefaultTemporalNamespace +} + +func GetTemporalTaskQueue() string { + if queue := viper.GetString(constants.EnvTemporalTaskQueue); queue != "" { + return queue + } + return constants.TaskQueue +} + +func IsTemporalCloud() bool { + return viper.GetBool(constants.EnvTemporalExternal) && viper.GetString(constants.EnvTemporalAPIKey) != "" +} + func GetExecutorEnvironment() string { if viper.GetString(constants.EnvKubernetesServiceHost) != "" { return string(types.Kubernetes) From e969310913928e144d8ba4dca64cf223b0f585b8 Mon Sep 17 00:00:00 2001 From: Schitiz Sharma Date: Mon, 18 May 2026 13:26:12 +0530 Subject: [PATCH 06/11] chore: add if-clause for client creation --- worker/temporal/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/worker/temporal/client.go b/worker/temporal/client.go index 5d912e4..557edd2 100644 --- a/worker/temporal/client.go +++ b/worker/temporal/client.go @@ -25,7 +25,10 @@ type Temporal struct { func NewClient() (*Temporal, error) { var temporalClient *Temporal - namespace := utils.GetTemporalNamespace() + namespace := constants.DefaultTemporalNamespace + if utils.IsTemporalCloud() { + namespace = utils.GetTemporalNamespace() + } err := utils.RetryWithBackoff(func() error { opts := client.Options{ From 6354cf632a18bda39893e37403e4622db729cb84 Mon Sep 17 00:00:00 2001 From: vishalm0509 Date: Tue, 19 May 2026 01:58:11 +0530 Subject: [PATCH 07/11] chore: refactor code --- worker/temporal/activity.go | 6 +----- worker/temporal/client.go | 8 ++------ worker/temporal/worker.go | 11 ++--------- worker/utils/utils.go | 21 +++++++++++++++++---- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/worker/temporal/activity.go b/worker/temporal/activity.go index 6fad305..a391cee 100644 --- a/worker/temporal/activity.go +++ b/worker/temporal/activity.go @@ -13,7 +13,6 @@ import ( "github.com/datazip-inc/olake-helm/worker/utils/logger" "github.com/datazip-inc/olake-helm/worker/utils/notifications" "github.com/datazip-inc/olake-helm/worker/utils/telemetry" - "github.com/spf13/viper" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" @@ -156,10 +155,7 @@ func (a *Activity) PostClearActivity(ctx context.Context, req *types.ExecutionRe scheduleID := fmt.Sprintf("schedule-%s", workflowID) handle := a.tempClient.ScheduleClient().GetHandle(ctx, scheduleID) - taskQueue := constants.TaskQueue - if viper.GetBool(constants.EnvTemporalExternal) { - taskQueue = utils.GetTemporalTaskQueue() - } + taskQueue := utils.GetTemporalTaskQueue() err := handle.Update(ctx, client.ScheduleUpdateOptions{ DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { diff --git a/worker/temporal/client.go b/worker/temporal/client.go index 557edd2..cbbc4af 100644 --- a/worker/temporal/client.go +++ b/worker/temporal/client.go @@ -25,10 +25,7 @@ type Temporal struct { func NewClient() (*Temporal, error) { var temporalClient *Temporal - namespace := constants.DefaultTemporalNamespace - if utils.IsTemporalCloud() { - namespace = utils.GetTemporalNamespace() - } + namespace := utils.GetTemporalNamespace() err := utils.RetryWithBackoff(func() error { opts := client.Options{ @@ -83,7 +80,7 @@ func (t *Temporal) SetWorkflowRetentionPeriod(ctx context.Context) error { return fmt.Errorf("failed to parse retention string: %s", err) } - namespace := constants.DefaultTemporalNamespace + namespace := utils.GetTemporalNamespace() if utils.IsTemporalCloud() { externalClient, err := NewExternalClient() @@ -92,7 +89,6 @@ func (t *Temporal) SetWorkflowRetentionPeriod(ctx context.Context) error { } defer externalClient.Close() - namespace = utils.GetTemporalNamespace() retentionDays := int32(retentionPeriod.Hours() / 24) if retentionDays < 1 { retentionDays = 1 diff --git a/worker/temporal/worker.go b/worker/temporal/worker.go index 41e7526..500a37f 100644 --- a/worker/temporal/worker.go +++ b/worker/temporal/worker.go @@ -8,7 +8,6 @@ import ( "github.com/datazip-inc/olake-helm/worker/database" "github.com/datazip-inc/olake-helm/worker/utils" "github.com/datazip-inc/olake-helm/worker/utils/logger" - "github.com/spf13/viper" "github.com/datazip-inc/olake-helm/worker/executor" enums "go.temporal.io/api/enums/v1" @@ -33,12 +32,7 @@ func NewWorker(ctx context.Context, t *Temporal, e *executor.AbstractExecutor, d NewLoggingInterceptor(), }, } - var w worker.Worker - if viper.GetBool(constants.EnvTemporalExternal) { - w = worker.New(t.GetClient(), utils.GetTemporalTaskQueue(), workerOptions) - } else { - w = worker.New(t.GetClient(), constants.TaskQueue, workerOptions) - } + w := worker.New(t.GetClient(), utils.GetTemporalTaskQueue(), workerOptions) // regsiter workflows w.RegisterWorkflow(RunSyncWorkflow) @@ -55,7 +49,7 @@ func NewWorker(ctx context.Context, t *Temporal, e *executor.AbstractExecutor, d searchAttributes := map[string]enums.IndexedValueType{constants.OperationTypeKey: enums.INDEXED_VALUE_TYPE_KEYWORD} - namespace := constants.DefaultTemporalNamespace + namespace := utils.GetTemporalNamespace() if utils.IsTemporalCloud() { externalClient, err := NewExternalClient() @@ -64,7 +58,6 @@ func NewWorker(ctx context.Context, t *Temporal, e *executor.AbstractExecutor, d } defer externalClient.Close() - namespace = utils.GetTemporalNamespace() if err := externalClient.AddSearchAttributes(ctx, namespace, searchAttributes); err != nil { return nil, fmt.Errorf("failed to add search attributes: %w", err) } diff --git a/worker/utils/utils.go b/worker/utils/utils.go index f7d3d3a..a2002d3 100644 --- a/worker/utils/utils.go +++ b/worker/utils/utils.go @@ -84,6 +84,11 @@ func GetWorkerEnvVars() map[string]string { "PERSISTENT_DIR": nil, "CONTAINER_REGISTRY_BASE": nil, "TEMPORAL_ADDRESS": nil, + "TEMPORAL_API_KEY": nil, + "TEMPORAL_EXTERNAL": nil, + "TEMPORAL_ENABLE_TLS": nil, + "TEMPORAL_NAMESPACE": nil, + "TEMPORAL_TASK_QUEUE": nil, "OLAKE_SECRET_KEY": nil, "_": nil, } @@ -239,16 +244,24 @@ func WorkflowHash(workflowID string) string { return fmt.Sprintf("%x", sha256.Sum256([]byte(workflowID))) } +// GetTemporalNamespace returns the configured namespace when TEMPORAL_EXTERNAL is true, +// otherwise returns the default namespace. func GetTemporalNamespace() string { - if ns := viper.GetString(constants.EnvTemporalNamespace); ns != "" { - return ns + if viper.GetBool(constants.EnvTemporalExternal) { + if ns := viper.GetString(constants.EnvTemporalNamespace); ns != "" { + return ns + } } return constants.DefaultTemporalNamespace } +// GetTemporalTaskQueue returns the configured task queue when TEMPORAL_EXTERNAL is true, +// otherwise returns the default task queue. func GetTemporalTaskQueue() string { - if queue := viper.GetString(constants.EnvTemporalTaskQueue); queue != "" { - return queue + if viper.GetBool(constants.EnvTemporalExternal) { + if queue := viper.GetString(constants.EnvTemporalTaskQueue); queue != "" { + return queue + } } return constants.TaskQueue } From 9bd8bb1a05daf43ae162e04ce4769bd523e80076 Mon Sep 17 00:00:00 2001 From: vishalm0509 Date: Tue, 19 May 2026 02:04:31 +0530 Subject: [PATCH 08/11] chore: add timeout in waitForAsyncOperation method --- worker/temporal/external.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/worker/temporal/external.go b/worker/temporal/external.go index 31eef87..aa685cc 100644 --- a/worker/temporal/external.go +++ b/worker/temporal/external.go @@ -46,8 +46,11 @@ func (c *ExternalClient) Close() { } } -// waitForAsyncOperation polls the async operation until it completes +// waitForAsyncOperation polls the async operation until it completes or times out. func (c *ExternalClient) waitForAsyncOperation(ctx context.Context, opID string) error { + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + service := c.client.CloudService() ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() From b080c59036c506a4e51dd0c03780ad57c7145bed Mon Sep 17 00:00:00 2001 From: vishalm0509 Date: Tue, 19 May 2026 02:36:30 +0530 Subject: [PATCH 09/11] chore: init client once --- worker/temporal/client.go | 29 ++++++++++++++++++----------- worker/temporal/external.go | 24 ++++++++++++------------ worker/temporal/worker.go | 10 ++-------- 3 files changed, 32 insertions(+), 31 deletions(-) diff --git a/worker/temporal/client.go b/worker/temporal/client.go index cbbc4af..4de4609 100644 --- a/worker/temporal/client.go +++ b/worker/temporal/client.go @@ -18,10 +18,11 @@ import ( // Temporal provides methods to interact with Temporal type Temporal struct { - client client.Client + client client.Client + cloudClient *CloudClient // non-nil only when IsTemporalCloud() is true } -// NewClient creates a new Temporal client +// NewClient creates a new Temporal client. func NewClient() (*Temporal, error) { var temporalClient *Temporal @@ -57,11 +58,23 @@ func NewClient() (*Temporal, error) { return nil, fmt.Errorf("failed to create Temporal client: %s", err) } + if utils.IsTemporalCloud() { + cloudClient, err := NewCloudClient() + if err != nil { + temporalClient.Close() + return nil, fmt.Errorf("failed to create Temporal Cloud client: %w", err) + } + temporalClient.cloudClient = cloudClient + } + return temporalClient, nil } -// Close closes the Temporal client +// Close closes the Temporal client and, if initialised, the cloud management client. func (t *Temporal) Close() { + if t.cloudClient != nil { + t.cloudClient.Close() + } if t.client != nil { t.client.Close() } @@ -82,18 +95,12 @@ func (t *Temporal) SetWorkflowRetentionPeriod(ctx context.Context) error { namespace := utils.GetTemporalNamespace() - if utils.IsTemporalCloud() { - externalClient, err := NewExternalClient() - if err != nil { - return fmt.Errorf("failed to create external Temporal client: %w", err) - } - defer externalClient.Close() - + if t.cloudClient != nil { retentionDays := int32(retentionPeriod.Hours() / 24) if retentionDays < 1 { retentionDays = 1 } - return externalClient.SetNamespaceRetention(ctx, namespace, retentionDays) + return t.cloudClient.SetNamespaceRetention(ctx, namespace, retentionDays) } _, err = t.client.WorkflowService().UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{ diff --git a/worker/temporal/external.go b/worker/temporal/external.go index aa685cc..aaac90b 100644 --- a/worker/temporal/external.go +++ b/worker/temporal/external.go @@ -15,13 +15,13 @@ import ( "go.temporal.io/cloud-sdk/cloudclient" ) -// ExternalClient wraps the Temporal Cloud API client -type ExternalClient struct { +// CloudClient wraps the Temporal Cloud API client +type CloudClient struct { client *cloudclient.Client } -// NewExternalClient creates a new Temporal Cloud API client -func NewExternalClient() (*ExternalClient, error) { +// NewCloudClient creates a new Temporal Cloud management API client +func NewCloudClient() (*CloudClient, error) { apiKey := viper.GetString(constants.EnvTemporalAPIKey) if apiKey == "" { return nil, fmt.Errorf("TEMPORAL_API_KEY is required for external Temporal") @@ -31,23 +31,23 @@ func NewExternalClient() (*ExternalClient, error) { APIKey: apiKey, }) if err != nil { - return nil, fmt.Errorf("failed to create external Temporal client: %w", err) + return nil, fmt.Errorf("failed to create Temporal Cloud client: %w", err) } - return &ExternalClient{ + return &CloudClient{ client: client, }, nil } -// Close closes the external Temporal API client -func (c *ExternalClient) Close() { +// Close closes the Temporal Cloud management API client +func (c *CloudClient) Close() { if c.client != nil { c.client.Close() } } // waitForAsyncOperation polls the async operation until it completes or times out. -func (c *ExternalClient) waitForAsyncOperation(ctx context.Context, opID string) error { +func (c *CloudClient) waitForAsyncOperation(ctx context.Context, opID string) error { ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() @@ -81,7 +81,7 @@ func (c *ExternalClient) waitForAsyncOperation(ctx context.Context, opID string) } // SetNamespaceRetention sets the workflow execution retention period via external Temporal API -func (c *ExternalClient) SetNamespaceRetention(ctx context.Context, namespace string, retentionDays int32) error { +func (c *CloudClient) SetNamespaceRetention(ctx context.Context, namespace string, retentionDays int32) error { ns, spec, err := c.getNamespaceAndSpec(ctx, namespace) if err != nil { return err @@ -113,7 +113,7 @@ func (c *ExternalClient) SetNamespaceRetention(ctx context.Context, namespace st } // AddSearchAttributes adds custom search attributes via external Temporal API -func (c *ExternalClient) AddSearchAttributes(ctx context.Context, namespace string, searchAttributes map[string]enums.IndexedValueType) error { +func (c *CloudClient) AddSearchAttributes(ctx context.Context, namespace string, searchAttributes map[string]enums.IndexedValueType) error { ns, spec, err := c.getNamespaceAndSpec(ctx, namespace) if err != nil { return err @@ -176,7 +176,7 @@ func (c *ExternalClient) AddSearchAttributes(ctx context.Context, namespace stri return nil } -func (c *ExternalClient) getNamespaceAndSpec(ctx context.Context, namespace string) (*namespacepb.Namespace, *namespacepb.NamespaceSpec, error) { +func (c *CloudClient) getNamespaceAndSpec(ctx context.Context, namespace string) (*namespacepb.Namespace, *namespacepb.NamespaceSpec, error) { namespaceResp, err := c.client.CloudService().GetNamespace(ctx, &cloudservice.GetNamespaceRequest{ Namespace: namespace, }) diff --git a/worker/temporal/worker.go b/worker/temporal/worker.go index 500a37f..b877d5c 100644 --- a/worker/temporal/worker.go +++ b/worker/temporal/worker.go @@ -51,14 +51,8 @@ func NewWorker(ctx context.Context, t *Temporal, e *executor.AbstractExecutor, d namespace := utils.GetTemporalNamespace() - if utils.IsTemporalCloud() { - externalClient, err := NewExternalClient() - if err != nil { - return nil, fmt.Errorf("failed to create external Temporal client: %w", err) - } - defer externalClient.Close() - - if err := externalClient.AddSearchAttributes(ctx, namespace, searchAttributes); err != nil { + if t.cloudClient != nil { + if err := t.cloudClient.AddSearchAttributes(ctx, namespace, searchAttributes); err != nil { return nil, fmt.Errorf("failed to add search attributes: %w", err) } } else { From eb7436ee1a525b12cc6e71189dc08901cc1ef951 Mon Sep 17 00:00:00 2001 From: Schitiz Sharma Date: Tue, 19 May 2026 11:59:55 +0530 Subject: [PATCH 10/11] chore: supress gosec warning because handled internally by temporal --- worker/temporal/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/temporal/client.go b/worker/temporal/client.go index 557edd2..38b6ba0 100644 --- a/worker/temporal/client.go +++ b/worker/temporal/client.go @@ -39,7 +39,7 @@ func NewClient() (*Temporal, error) { if viper.GetBool(constants.EnvTemporalEnableTLS) { opts.ConnectionOptions = client.ConnectionOptions{ - TLS: &tls.Config{}, + TLS: &tls.Config{}, // #nosec G402 -- Temporal SDK handles TLS negotiation internally } } From 453ab3fe924973b693d33700c1fd4b70f6453f3c Mon Sep 17 00:00:00 2001 From: Schitiz Sharma Date: Mon, 25 May 2026 13:09:52 +0530 Subject: [PATCH 11/11] feat: set minVersion for TLS --- worker/temporal/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/worker/temporal/client.go b/worker/temporal/client.go index e94bd15..64791bb 100644 --- a/worker/temporal/client.go +++ b/worker/temporal/client.go @@ -37,7 +37,9 @@ func NewClient() (*Temporal, error) { if viper.GetBool(constants.EnvTemporalEnableTLS) { opts.ConnectionOptions = client.ConnectionOptions{ - TLS: &tls.Config{}, // #nosec G402 -- Temporal SDK handles TLS negotiation internally + TLS: &tls.Config{ + MinVersion: tls.VersionTLS12, + }, } }