Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions helm/olake/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,14 @@ Return the PostgreSQL secret name
{{- printf "%s-external-postgresql" (include "olake.fullname" .) }}
{{- end }}
{{- end }}

{{/*
Return the Temporal secret name
*/}}
{{- define "olake.temporal.secretName" -}}
{{- if .Values.temporal.external.existingSecret }}
{{- .Values.temporal.external.existingSecret }}
{{- else }}
{{- printf "%s-external-temporal" (include "olake.fullname" .) }}
{{- end }}
{{- end }}
19 changes: 19 additions & 0 deletions helm/olake/templates/external-temporal-secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{{- if and (not .Values.temporal.enabled) (not .Values.temporal.external.existingSecret) }}
apiVersion: v1
kind: Secret
metadata:
name: {{ include "olake.temporal.secretName" . }}
namespace: {{ include "olake.namespace" . }}
labels:
{{- include "olake.labels" . | nindent 4 }}
type: Opaque
stringData:
{{- with .Values.temporal.external.properties }}
{{ $.Values.temporal.external.secretKeys.TEMPORAL_ADDRESS | default "TEMPORAL_ADDRESS" }}: {{ .TEMPORAL_ADDRESS | quote }}
{{ $.Values.temporal.external.secretKeys.TEMPORAL_API_KEY | default "TEMPORAL_API_KEY" }}: {{ .TEMPORAL_API_KEY | quote }}
{{ $.Values.temporal.external.secretKeys.TEMPORAL_ENABLE_TLS | default "TEMPORAL_ENABLE_TLS" }}: {{ .TEMPORAL_ENABLE_TLS | quote }}
{{ $.Values.temporal.external.secretKeys.TEMPORAL_EXTERNAL | default "TEMPORAL_EXTERNAL" }}: {{ .TEMPORAL_EXTERNAL | quote }}
{{ $.Values.temporal.external.secretKeys.TEMPORAL_NAMESPACE | default "TEMPORAL_NAMESPACE" }}: {{ .TEMPORAL_NAMESPACE | quote }}
{{ $.Values.temporal.external.secretKeys.TEMPORAL_TASK_QUEUE | default "TEMPORAL_TASK_QUEUE" }}: {{ .TEMPORAL_TASK_QUEUE | quote }}
{{- end }}
{{- end }}
4 changes: 4 additions & 0 deletions helm/olake/templates/olake-ui/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ data:
OLAKE_POSTGRES_SSLMODE = ${OLAKE_POSTGRES_SSLMODE}

# Temporal Configuration
{{- if .Values.temporal.enabled }}
TEMPORAL_ADDRESS = temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233
{{- else }}
TEMPORAL_ADDRESS = ${TEMPORAL_ADDRESS}
{{- end }}
Comment thread
schitizsharma marked this conversation as resolved.

# Optimization Configuration
ENABLE_OPTIMIZATION = ${ENABLE_OPTIMIZATION||false}
Expand Down
9 changes: 9 additions & 0 deletions helm/olake/templates/olake-ui/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ spec:
{{- if (include "olake.jobServiceAccountName" .) }}
serviceAccountName: {{ include "olake.jobServiceAccountName" . }}
{{- end }}
{{- if or .Values.temporal.enabled .Values.fusion.enabled }}
initContainers:
{{- if .Values.temporal.enabled }}
- name: wait-for-temporal
image: "{{ include "olake.registryBase" . }}/library/busybox:latest"
imagePullPolicy: IfNotPresent
Expand All @@ -67,6 +69,7 @@ spec:
sleep 5
done
echo "Temporal is ready!"
{{- end }}
{{- if .Values.fusion.enabled }}
- name: wait-for-fusion
image: "{{ include "olake.registryBase" . }}/library/busybox:latest"
Expand All @@ -82,6 +85,7 @@ spec:
done
echo "Fusion REST service is ready!"
{{- end }}
{{- end }}
containers:
- name: olake-ui
image: "{{ include "olake.registryBase" . }}/{{ .Values.olakeUI.image.repository }}:{{ .Values.olakeUI.image.tag }}"
Expand Down Expand Up @@ -149,6 +153,11 @@ spec:
secretKeyRef:
name: {{ include "olake.postgresql.secretName" . }}
key: {{ if .Values.postgresql.enabled }}ssl_mode{{ else }}{{ .Values.postgresql.external.secretKeys.ssl_mode }}{{ end }}
{{- if not .Values.temporal.enabled }}
envFrom:
- secretRef:
name: {{ include "olake.temporal.secretName" . }}
{{- end }}
# - name: POSTGRES_DB
# value: "postgres://$(POSTGRES_DB_USER):$(POSTGRES_DB_PASSWORD)@$(POSTGRES_DB_HOST):$(POSTGRES_DB_PORT)/$(POSTGRES_DB_NAME)?sslmode=$(POSTGRES_DB_SSLMODE)"
volumeMounts:
Expand Down
2 changes: 2 additions & 0 deletions helm/olake/templates/olake-worker/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ metadata:
app.kubernetes.io/component: workers
data:
# Temporal Configuration
{{- if .Values.temporal.enabled }}
TEMPORAL_ADDRESS: "temporal.{{ include "olake.namespace" . }}.svc.cluster.local:7233"
{{- end }}

# Kubernetes Configuration
WORKER_NAMESPACE: {{ include "olake.namespace" . | quote }}
Expand Down
6 changes: 6 additions & 0 deletions helm/olake/templates/olake-worker/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spec:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "olake.workerServiceAccountName" . }}
{{- if .Values.temporal.enabled }}
initContainers:
- name: wait-for-temporal
image: "{{ include "olake.registryBase" . }}/library/busybox:latest"
Expand All @@ -64,6 +65,7 @@ spec:
sleep 5
done
echo "Temporal is ready!"
{{- end }}
containers:
- name: olake-workers
image: "{{ include "olake.registryBase" . }}/{{ .Values.olakeWorker.image.repository }}:{{ .Values.olakeWorker.image.tag }}"
Expand Down Expand Up @@ -129,6 +131,10 @@ spec:
envFrom:
- configMapRef:
name: olake-workers-config
{{- if not .Values.temporal.enabled }}
- secretRef:
name: {{ include "olake.temporal.secretName" . }}
{{- end }}
{{- $defaultResources := dict
"requests" (dict "memory" "256Mi" "cpu" "250m")
}}
Expand Down
4 changes: 3 additions & 1 deletion helm/olake/templates/temporal/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{- if .Values.temporal.enabled }}
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down Expand Up @@ -173,4 +174,5 @@ spec:
}}
{{- $livenessProbe := mergeOverwrite $defaultLivenessProbe (.Values.temporal.server.livenessProbe | default dict) }}
livenessProbe:
{{- toYaml $livenessProbe | nindent 10 }}
{{- toYaml $livenessProbe | nindent 10 }}
{{- end }}
4 changes: 3 additions & 1 deletion helm/olake/templates/temporal/service.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{- if .Values.temporal.enabled }}
apiVersion: v1
kind: Service
metadata:
Expand All @@ -22,4 +23,5 @@ spec:
selector:
app.kubernetes.io/name: temporal
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/component: workflow-engine
app.kubernetes.io/component: workflow-engine
{{- end }}
2 changes: 1 addition & 1 deletion helm/olake/templates/temporal/ui-deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{{- if .Values.temporal.ui.enabled }}
{{- if and .Values.temporal.enabled .Values.temporal.ui.enabled }}
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down
2 changes: 1 addition & 1 deletion helm/olake/templates/temporal/ui-service.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{{- if .Values.temporal.ui.enabled }}
{{- if and .Values.temporal.enabled .Values.temporal.ui.enabled }}
apiVersion: v1
kind: Service
metadata:
Expand Down
38 changes: 38 additions & 0 deletions helm/olake/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,44 @@ nfsServer:

# Temporal configuration for workflow orchestration
temporal:
# -- Enable internal Temporal deployment
# Set to false to use an external Temporal Cloud/server
enabled: true

# -- External Temporal configuration (used when enabled: false)
# Provide Kubernetes secret containing Temporal credentials/configuration
external:
# -- Name of the Kubernetes secret containing external Temporal configuration
# The secret must contain keys:
# TEMPORAL_ADDRESS, TEMPORAL_API_KEY, TEMPORAL_ENABLE_TLS, TEMPORAL_EXTERNAL, TEMPORAL_NAMESPACE, TEMPORAL_TASK_QUEUE
# Example secret creation:
# kubectl create secret generic my-temporal-secret \
# --from-literal=TEMPORAL_ADDRESS=<region>.<cloud>.api.temporal.io:7233 \
# --from-literal=TEMPORAL_API_KEY=ldjflskjdglsed.sidhfksdhf.sdlkjglksjdgljsd-sldjglksjdgljs-sd;kg;sldjg;ljsd-sdklhglksdhglskjd \
# --from-literal=TEMPORAL_ENABLE_TLS=true \
# --from-literal=TEMPORAL_EXTERNAL=true \
# --from-literal=TEMPORAL_NAMESPACE=<namespace> \
# --from-literal=TEMPORAL_TASK_QUEUE=<queue_name>
existingSecret: ""

# -- Explicit Temporal properties (used if existingSecret is not set)
properties:
TEMPORAL_ADDRESS: ""
TEMPORAL_API_KEY: ""
TEMPORAL_ENABLE_TLS: "true"
TEMPORAL_EXTERNAL: "true"
TEMPORAL_NAMESPACE: ""
TEMPORAL_TASK_QUEUE: ""

# -- Map the keys from existing secret to the required fields
secretKeys:
TEMPORAL_ADDRESS: "TEMPORAL_ADDRESS"
TEMPORAL_API_KEY: "TEMPORAL_API_KEY"
TEMPORAL_ENABLE_TLS: "TEMPORAL_ENABLE_TLS"
TEMPORAL_EXTERNAL: "TEMPORAL_EXTERNAL"
TEMPORAL_NAMESPACE: "TEMPORAL_NAMESPACE"
TEMPORAL_TASK_QUEUE: "TEMPORAL_TASK_QUEUE"

# -- Temporal server configuration
server:
# -- Temporal server image configuration
Expand Down
5 changes: 5 additions & 0 deletions worker/constants/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ const (
// temporal
EnvTemporalAddress = "TEMPORAL_ADDRESS"
EnvTemporalRetentionPeriod = "TEMPORAL_RETENTION_PERIOD"
EnvTemporalExternal = "TEMPORAL_EXTERNAL"
EnvTemporalAPIKey = "TEMPORAL_API_KEY"
EnvTemporalNamespace = "TEMPORAL_NAMESPACE"
EnvTemporalEnableTLS = "TEMPORAL_ENABLE_TLS"
EnvTemporalTaskQueue = "TEMPORAL_TASK_QUEUE"

// registry
ContainerRegistryBase = "CONTAINER_REGISTRY_BASE"
Expand Down
1 change: 1 addition & 0 deletions worker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.42.0
go.temporal.io/api v1.51.0
go.temporal.io/cloud-sdk v0.8.0
go.temporal.io/sdk v1.36.0
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
Expand Down
2 changes: 2 additions & 0 deletions worker/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
go.temporal.io/api v1.51.0 h1:9+e14GrIa7nWoWoudqj/PSwm33yYjV+u8TAR9If7s/g=
go.temporal.io/api v1.51.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/cloud-sdk v0.8.0 h1:+MRcD1EEdZLWT/xFn5pG5/FHmYok/6Jif2FsZxZByV8=
go.temporal.io/cloud-sdk v0.8.0/go.mod h1:W2O9t9tvo3Q/LhGgYdj8JijWbN5C84os+cz/BadIHYI=
go.temporal.io/sdk v1.36.0 h1:WO9zetpybBNK7xsQth4Z+3Zzw1zSaM9MOUGrnnUjZMo=
go.temporal.io/sdk v1.36.0/go.mod h1:8BxGRF0LcQlfQrLLGkgVajbsKUp/PY7280XTdcKc18Y=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
4 changes: 3 additions & 1 deletion worker/temporal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,15 @@ func (a *Activity) PostClearActivity(ctx context.Context, req *types.ExecutionRe
scheduleID := fmt.Sprintf("schedule-%s", workflowID)
handle := a.tempClient.ScheduleClient().GetHandle(ctx, scheduleID)

taskQueue := utils.GetTemporalTaskQueue()

err := handle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
input.Description.Schedule.Action = &client.ScheduleWorkflowAction{
ID: workflowID,
Workflow: RunSyncWorkflow,
Args: []any{req},
TaskQueue: constants.TaskQueue,
TaskQueue: taskQueue,
}

if input.Description.Schedule.State != nil {
Expand Down
64 changes: 52 additions & 12 deletions worker/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package temporal

import (
"context"
"crypto/tls"
"fmt"
"time"

Expand All @@ -17,19 +18,36 @@ import (

// Temporal provides methods to interact with Temporal
type Temporal struct {
client client.Client
client client.Client
cloudClient *CloudClient // non-nil only when IsTemporalCloud() is true
}

// NewClient creates a new Temporal client
// NewClient creates a new Temporal client.
func NewClient() (*Temporal, error) {
var temporalClient *Temporal

namespace := utils.GetTemporalNamespace()

err := utils.RetryWithBackoff(func() error {
client, err := client.Dial(client.Options{
HostPort: viper.GetString(constants.EnvTemporalAddress),
Logger: logger.Log(context.Background()),
Namespace: constants.DefaultTemporalNamespace,
})
opts := client.Options{
HostPort: viper.GetString(constants.EnvTemporalAddress),
Logger: logger.Log(context.Background()),
Namespace: namespace,
}

if viper.GetBool(constants.EnvTemporalEnableTLS) {
opts.ConnectionOptions = client.ConnectionOptions{
TLS: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
}

if apiKey := viper.GetString(constants.EnvTemporalAPIKey); apiKey != "" {
opts.Credentials = client.NewAPIKeyStaticCredentials(apiKey)
}

client, err := client.Dial(opts)
if err != nil {
return err
}
Expand All @@ -42,11 +60,23 @@ func NewClient() (*Temporal, error) {
return nil, fmt.Errorf("failed to create Temporal client: %s", err)
}

if utils.IsTemporalCloud() {
cloudClient, err := NewCloudClient()
if err != nil {
temporalClient.Close()
return nil, fmt.Errorf("failed to create Temporal Cloud client: %w", err)
}
temporalClient.cloudClient = cloudClient
}

return temporalClient, nil
}

// Close closes the Temporal client
// Close closes the Temporal client and, if initialised, the cloud management client.
func (t *Temporal) Close() {
if t.cloudClient != nil {
t.cloudClient.Close()
}
if t.client != nil {
t.client.Close()
}
Expand All @@ -55,7 +85,7 @@ func (t *Temporal) GetClient() client.Client {
return t.client
}

// SetWorkflowRetentionPeriod sets the workflow execution retention period for the default namespace.
// SetWorkflowRetentionPeriod sets the workflow execution retention period for the namespace.
// This ensures workflow history is available for debugging (defaults to 7 days).
// Handles both fresh installs and upgrades from shorter retention periods.
// Fatal: worker fails to start if this fails.
Expand All @@ -65,16 +95,26 @@ func (t *Temporal) SetWorkflowRetentionPeriod(ctx context.Context) error {
return fmt.Errorf("failed to parse retention string: %s", err)
}

namespace := utils.GetTemporalNamespace()

if t.cloudClient != nil {
retentionDays := int32(retentionPeriod.Hours() / 24)
if retentionDays < 1 {
retentionDays = 1
}
return t.cloudClient.SetNamespaceRetention(ctx, namespace, retentionDays)
}

_, err = t.client.WorkflowService().UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: constants.DefaultTemporalNamespace,
Namespace: namespace,
Config: &namespacepb.NamespaceConfig{
WorkflowExecutionRetentionTtl: durationpb.New(retentionPeriod),
},
})
if err != nil {
return fmt.Errorf("failed to update namespace %s retention: %s", constants.DefaultTemporalNamespace, err)
return fmt.Errorf("failed to update namespace %s retention: %s", namespace, err)
}

logger.Infof("namespace %s retention set to %s", constants.DefaultTemporalNamespace, retentionPeriod)
logger.Infof("namespace %s retention set to %s", namespace, retentionPeriod)
return nil
}
Loading