Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions helm/olake/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: v2
name: olake
description: A Helm chart for OLake UI - Fastest open-source tool for replicating Databases to Apache Iceberg or Data Lakehouse
type: application
version: 0.0.17
appVersion: "0.3.5"
version: 0.0.18
appVersion: "0.3.6"
home: https://github.com/datazip-inc/olake-helm
sources:
- https://github.com/datazip-inc/olake-helm
Expand Down
5 changes: 5 additions & 0 deletions helm/olake/templates/postgresql/secret.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
{{- /*
TODO: Remove the Helm hook logic in a future release (e.g. v1.0.0).
*/ -}}
{{- if .Values.postgresql.enabled }}
apiVersion: v1
kind: Secret
metadata:
name: {{ include "olake.fullname" . }}-postgresql-secret
namespace: {{ include "olake.namespace" . }}
annotations:
{{- if not .Values.useStandardResources }}
"helm.sh/hook": pre-install,pre-upgrade
"helm.sh/hook-weight": "10"
"helm.sh/hook-delete-policy": before-hook-creation
{{- end }}
labels:
app.kubernetes.io/name: postgresql
app.kubernetes.io/instance: {{ .Release.Name }}
Expand Down
4 changes: 2 additions & 2 deletions index.html
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ <h2>📦 Available Charts</h2>
<div class="chart-card">
<h3>olake</h3>
<p><strong>Description:</strong> A comprehensive Helm chart for deploying OLake UI and Worker components along with required infrastructure (PostgreSQL, Temporal, Elasticsearch, optional NFS server).</p>
<p><strong>Version:</strong> 0.0.17</p>
<p><strong>App Version:</strong> 0.3.5</p>
<p><strong>Version:</strong> 0.0.18</p>
<p><strong>App Version:</strong> 0.3.6</p>
<p><strong>Keywords:</strong> data-pipeline, elt, kubernetes, iceberg, data-lakehouse, olake</p>
<div style="margin-top: 15px;">
<a href="https://github.com/datazip-inc/olake-helm/tree/master/helm/olake/README.md" target="_blank">📚 Documentation</a> |
Expand Down
25 changes: 25 additions & 0 deletions worker/database/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/datazip-inc/olake-helm/worker/types"
"github.com/datazip-inc/olake-helm/worker/utils"
"github.com/datazip-inc/olake-helm/worker/utils/logger"
"github.com/lib/pq"
)
Expand All @@ -14,6 +15,24 @@ const (
queryTimeout = 5 * time.Second
)

// decryptJobData decrypts the Source and Destination config fields of a JobData.
// If OLAKE_SECRET_KEY is not configured, Decrypt returns the value unchanged.
func decryptJobData(jobData *types.JobData) error {
decryptedSource, err := utils.Decrypt(jobData.Source)
if err != nil {
return fmt.Errorf("failed to decrypt source config: %s", err)
}
jobData.Source = decryptedSource

decryptedDest, err := utils.Decrypt(jobData.Destination)
if err != nil {
return fmt.Errorf("failed to decrypt destination config: %s", err)
}
jobData.Destination = decryptedDest

return nil
}

func (db *DB) GetJobData(ctx context.Context, jobId int) (types.JobData, error) {
log := logger.Log(ctx)
cctx, cancel := context.WithTimeout(ctx, queryTimeout)
Expand All @@ -34,6 +53,12 @@ func (db *DB) GetJobData(ctx context.Context, jobId int) (types.JobData, error)
log.Error("failed to get job data from database", "jobID", jobId, "error", err)
return types.JobData{}, fmt.Errorf("failed to scan job data: %w", err)
}

if err := decryptJobData(&jobData); err != nil {
log.Error("failed to decrypt job data", "jobID", jobId, "error", err)
return types.JobData{}, fmt.Errorf("failed to decrypt job data job_id[%d]: %s", jobId, err)
}

return jobData, nil
}

Expand Down
15 changes: 15 additions & 0 deletions worker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/apache/arrow-go/v18 v18.2.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.41.7 // indirect
github.com/aws/aws-sdk-go-v2/config v1.32.17 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.16 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23 // indirect
github.com/aws/aws-sdk-go-v2/service/kms v1.51.1 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1 // indirect
github.com/aws/smithy-go v1.25.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
Expand Down
30 changes: 30 additions & 0 deletions worker/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,36 @@ github.com/apache/spark-connect-go/v35 v35.0.0-20250317154112-ffd832059443 h1:pA
github.com/apache/spark-connect-go/v35 v35.0.0-20250317154112-ffd832059443/go.mod h1:ODlxb8YN0y/JyS7h+vhz+afnQ+beSkYTqDHYtg2T6E8=
github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
github.com/aws/aws-sdk-go-v2 v1.41.7 h1:DWpAJt66FmnnaRIOT/8ASTucrvuDPZASqhhLey6tLY8=
github.com/aws/aws-sdk-go-v2 v1.41.7/go.mod h1:4LAfZOPHNVNQEckOACQx60Y8pSRjIkNZQz1w92xpMJc=
github.com/aws/aws-sdk-go-v2/config v1.32.17 h1:FpL4/758/diKwqbytU0prpuiu60fgXKUWCpDJtApclU=
github.com/aws/aws-sdk-go-v2/config v1.32.17/go.mod h1:OXqUMzgXytfoF9JaKkhrOYsyh72t9G+MJH8mMRaexOE=
github.com/aws/aws-sdk-go-v2/credentials v1.19.16 h1:r3RJBuU7X9ibt8RHbMjWE6y60QbKBiII6wSrXnapxSU=
github.com/aws/aws-sdk-go-v2/credentials v1.19.16/go.mod h1:6cx7zqDENJDbBIIWX6P8s0h6hqHC8Avbjh9Dseo27ug=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 h1:UuSfcORqNSz/ey3VPRS8TcVH2Ikf0/sC+Hdj400QI6U=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23/go.mod h1:+G/OSGiOFnSOkYloKj/9M35s74LgVAdJBSD5lsFfqKg=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 h1:GpT/TrnBYuE5gan2cZbTtvP+JlHsutdmlV2YfEyNde0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23/go.mod h1:xYWD6BS9ywC5bS3sz9Xh04whO/hzK2plt2Zkyrp4JuA=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 h1:bpd8vxhlQi2r1hiueOw02f/duEPTMK59Q4QMAoTTtTo=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23/go.mod h1:15DfR2nw+CRHIk0tqNyifu3G1YdAOy68RftkhMDDwYk=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 h1:OQqn11BtaYv1WLUowvcA30MpzIu8Ti4pcLPIIyoKZrA=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24/go.mod h1:X5ZJyfwVrWA96GzPmUCWFQaEARPR7gCrpq2E92PJwAE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9 h1:FLudkZLt5ci0ozzgkVo8BJGwvqNaZbTWb3UcucAateA=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9/go.mod h1:w7wZ/s9qK7c8g4al+UyoF1Sp/Z45UwMGcqIzLWVQHWk=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23 h1:pbrxO/kuIwgEsOPLkaHu0O+m4fNgLU8B3vxQ+72jTPw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23/go.mod h1:/CMNUqoj46HpS3MNRDEDIwcgEnrtZlKRaHNaHxIFpNA=
github.com/aws/aws-sdk-go-v2/service/kms v1.51.1 h1:zuSf4olLKZW8cF/W9Y5wvGT+/0raY/3kVp49KsGs0QY=
github.com/aws/aws-sdk-go-v2/service/kms v1.51.1/go.mod h1:Y0+uxvxz6ib4KktRdK0V4X45Vcs/JyYoz8H71pO8xeI=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 h1:TdJ+HdzOBhU8+iVAOGUTU63VXopcumCOF1paFulHWZc=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.11/go.mod h1:R82ZRExE/nheo0N+T8zHPcLRTcH8MGsnR3BiVGX0TwI=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 h1:7byT8HUWrgoRp6sXjxtZwgOKfhss5fW6SkLBtqzgRoE=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.17/go.mod h1:xNWknVi4Ezm1vg1QsB/5EWpAJURq22uqd38U8qKvOJc=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21 h1:+1Kl1zx6bWi4X7cKi3VYh29h8BvsCoHQEQ6ST9X8w7w=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21/go.mod h1:4vIRDq+CJB2xFAXZ+YgGUTiEft7oAQlhIs71xcSeuVg=
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1 h1:F/M5Y9I3nwr2IEpshZgh1GeHpOItExNM9L1euNuh/fk=
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1/go.mod h1:mTNxImtovCOEEuD65mKW7DCsL+2gjEH+RPEAexAzAio=
github.com/aws/smithy-go v1.25.1 h1:J8ERsGSU7d+aCmdQur5Txg6bVoYelvQJgtZehD12GkI=
github.com/aws/smithy-go v1.25.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down
99 changes: 99 additions & 0 deletions worker/utils/encryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package utils

import (
"context"
"crypto/aes"
"crypto/cipher"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kms"
"github.com/datazip-inc/olake-helm/worker/constants"
"github.com/spf13/viper"
)

// getSecretKey returns the AES key bytes and optionally a KMS client.
//
// - If OLAKE_SECRET_KEY is empty → encryption disabled (nil key, nil client)
// - If OLAKE_SECRET_KEY starts with "arn:aws:kms:" → AWS KMS mode
// - Otherwise → local AES-256-GCM (key is SHA-256 of the env value)
func getSecretKey() ([]byte, *kms.Client, error) {
envKey := viper.GetString(constants.EnvSecretKey)
if strings.TrimSpace(envKey) == "" {
return []byte{}, nil, nil // Encryption is disabled
}

if strings.HasPrefix(envKey, "arn:aws:kms:") {
cfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("failed to load AWS config: %s", err)
}
return []byte(envKey), kms.NewFromConfig(cfg), nil
}

// Local AES-GCM mode — derive 256-bit key via SHA-256
hash := sha256.Sum256([]byte(envKey))
return hash[:], nil, nil
}

// Decrypt decrypts a value that was encrypted by the server's Encrypt function.
// The stored format is a JSON-quoted, base64-encoded ciphertext (nonce prepended for AES-GCM).
// If OLAKE_SECRET_KEY is not set, the raw encryptedText is returned unchanged.
func Decrypt(encryptedText string) (string, error) {
if strings.TrimSpace(encryptedText) == "" {
return "", fmt.Errorf("cannot decrypt empty or whitespace-only input")
}

key, kmsClient, err := getSecretKey()
if err != nil || key == nil || len(key) == 0 {
return encryptedText, err
}

// The server stores the ciphertext as a JSON-quoted string (via fmt.Sprintf("%q", ...))
var decoded string
if err := json.Unmarshal([]byte(encryptedText), &decoded); err != nil {
return "", fmt.Errorf("failed to unmarshal JSON string: %s", err)
}

encryptedData, err := base64.StdEncoding.DecodeString(decoded)
if err != nil {
return "", fmt.Errorf("failed to decode base64 data: %s", err)
}

// AWS KMS path
if kmsClient != nil {
result, err := kmsClient.Decrypt(context.Background(), &kms.DecryptInput{
CiphertextBlob: encryptedData,
})
if err != nil {
return "", fmt.Errorf("failed to decrypt with KMS: %s", err)
}
return string(result.Plaintext), nil
}

// Local AES-256-GCM path
block, err := aes.NewCipher(key)
if err != nil {
return "", fmt.Errorf("failed to create cipher: %s", err)
}

gcm, err := cipher.NewGCM(block)
if err != nil {
return "", fmt.Errorf("failed to create GCM: %s", err)
}

if len(encryptedData) < gcm.NonceSize() {
return "", errors.New("ciphertext too short")
}

plaintext, err := gcm.Open(nil, encryptedData[:gcm.NonceSize()], encryptedData[gcm.NonceSize():], nil)
if err != nil {
return "", fmt.Errorf("failed to decrypt: %s", err)
}
return string(plaintext), nil
}
Loading