📚 Documentation |
diff --git a/worker/database/job.go b/worker/database/job.go
index 263ee63..bd66735 100644
--- a/worker/database/job.go
+++ b/worker/database/job.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/datazip-inc/olake-helm/worker/types"
+ "github.com/datazip-inc/olake-helm/worker/utils"
"github.com/datazip-inc/olake-helm/worker/utils/logger"
"github.com/lib/pq"
)
@@ -14,6 +15,24 @@ const (
queryTimeout = 5 * time.Second
)
+// decryptJobData decrypts the Source and Destination config fields of a JobData.
+// If OLAKE_SECRET_KEY is not configured, Decrypt returns the value unchanged.
+func decryptJobData(jobData *types.JobData) error {
+ decryptedSource, err := utils.Decrypt(jobData.Source)
+ if err != nil {
+ return fmt.Errorf("failed to decrypt source config: %s", err)
+ }
+ jobData.Source = decryptedSource
+
+ decryptedDest, err := utils.Decrypt(jobData.Destination)
+ if err != nil {
+ return fmt.Errorf("failed to decrypt destination config: %s", err)
+ }
+ jobData.Destination = decryptedDest
+
+ return nil
+}
+
func (db *DB) GetJobData(ctx context.Context, jobId int) (types.JobData, error) {
log := logger.Log(ctx)
cctx, cancel := context.WithTimeout(ctx, queryTimeout)
@@ -34,6 +53,12 @@ func (db *DB) GetJobData(ctx context.Context, jobId int) (types.JobData, error)
log.Error("failed to get job data from database", "jobID", jobId, "error", err)
return types.JobData{}, fmt.Errorf("failed to scan job data: %w", err)
}
+
+ if err := decryptJobData(&jobData); err != nil {
+ log.Error("failed to decrypt job data", "jobID", jobId, "error", err)
+ return types.JobData{}, fmt.Errorf("failed to decrypt job data job_id[%d]: %s", jobId, err)
+ }
+
return jobData, nil
}
diff --git a/worker/go.mod b/worker/go.mod
index dc3fbeb..65df24b 100644
--- a/worker/go.mod
+++ b/worker/go.mod
@@ -30,6 +30,21 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/apache/arrow-go/v18 v18.2.0 // indirect
+ github.com/aws/aws-sdk-go-v2 v1.41.7 // indirect
+ github.com/aws/aws-sdk-go-v2/config v1.32.17 // indirect
+ github.com/aws/aws-sdk-go-v2/credentials v1.19.16 // indirect
+ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23 // indirect
+ github.com/aws/aws-sdk-go-v2/service/kms v1.51.1 // indirect
+ github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 // indirect
+ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sts v1.42.1 // indirect
+ github.com/aws/smithy-go v1.25.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
diff --git a/worker/go.sum b/worker/go.sum
index 6f0f29a..825fc9b 100644
--- a/worker/go.sum
+++ b/worker/go.sum
@@ -18,6 +18,36 @@ github.com/apache/spark-connect-go/v35 v35.0.0-20250317154112-ffd832059443 h1:pA
github.com/apache/spark-connect-go/v35 v35.0.0-20250317154112-ffd832059443/go.mod h1:ODlxb8YN0y/JyS7h+vhz+afnQ+beSkYTqDHYtg2T6E8=
github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
+github.com/aws/aws-sdk-go-v2 v1.41.7 h1:DWpAJt66FmnnaRIOT/8ASTucrvuDPZASqhhLey6tLY8=
+github.com/aws/aws-sdk-go-v2 v1.41.7/go.mod h1:4LAfZOPHNVNQEckOACQx60Y8pSRjIkNZQz1w92xpMJc=
+github.com/aws/aws-sdk-go-v2/config v1.32.17 h1:FpL4/758/diKwqbytU0prpuiu60fgXKUWCpDJtApclU=
+github.com/aws/aws-sdk-go-v2/config v1.32.17/go.mod h1:OXqUMzgXytfoF9JaKkhrOYsyh72t9G+MJH8mMRaexOE=
+github.com/aws/aws-sdk-go-v2/credentials v1.19.16 h1:r3RJBuU7X9ibt8RHbMjWE6y60QbKBiII6wSrXnapxSU=
+github.com/aws/aws-sdk-go-v2/credentials v1.19.16/go.mod h1:6cx7zqDENJDbBIIWX6P8s0h6hqHC8Avbjh9Dseo27ug=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 h1:UuSfcORqNSz/ey3VPRS8TcVH2Ikf0/sC+Hdj400QI6U=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23/go.mod h1:+G/OSGiOFnSOkYloKj/9M35s74LgVAdJBSD5lsFfqKg=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 h1:GpT/TrnBYuE5gan2cZbTtvP+JlHsutdmlV2YfEyNde0=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23/go.mod h1:xYWD6BS9ywC5bS3sz9Xh04whO/hzK2plt2Zkyrp4JuA=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 h1:bpd8vxhlQi2r1hiueOw02f/duEPTMK59Q4QMAoTTtTo=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23/go.mod h1:15DfR2nw+CRHIk0tqNyifu3G1YdAOy68RftkhMDDwYk=
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 h1:OQqn11BtaYv1WLUowvcA30MpzIu8Ti4pcLPIIyoKZrA=
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24/go.mod h1:X5ZJyfwVrWA96GzPmUCWFQaEARPR7gCrpq2E92PJwAE=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9 h1:FLudkZLt5ci0ozzgkVo8BJGwvqNaZbTWb3UcucAateA=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9/go.mod h1:w7wZ/s9qK7c8g4al+UyoF1Sp/Z45UwMGcqIzLWVQHWk=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23 h1:pbrxO/kuIwgEsOPLkaHu0O+m4fNgLU8B3vxQ+72jTPw=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23/go.mod h1:/CMNUqoj46HpS3MNRDEDIwcgEnrtZlKRaHNaHxIFpNA=
+github.com/aws/aws-sdk-go-v2/service/kms v1.51.1 h1:zuSf4olLKZW8cF/W9Y5wvGT+/0raY/3kVp49KsGs0QY=
+github.com/aws/aws-sdk-go-v2/service/kms v1.51.1/go.mod h1:Y0+uxvxz6ib4KktRdK0V4X45Vcs/JyYoz8H71pO8xeI=
+github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 h1:TdJ+HdzOBhU8+iVAOGUTU63VXopcumCOF1paFulHWZc=
+github.com/aws/aws-sdk-go-v2/service/signin v1.0.11/go.mod h1:R82ZRExE/nheo0N+T8zHPcLRTcH8MGsnR3BiVGX0TwI=
+github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 h1:7byT8HUWrgoRp6sXjxtZwgOKfhss5fW6SkLBtqzgRoE=
+github.com/aws/aws-sdk-go-v2/service/sso v1.30.17/go.mod h1:xNWknVi4Ezm1vg1QsB/5EWpAJURq22uqd38U8qKvOJc=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21 h1:+1Kl1zx6bWi4X7cKi3VYh29h8BvsCoHQEQ6ST9X8w7w=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21/go.mod h1:4vIRDq+CJB2xFAXZ+YgGUTiEft7oAQlhIs71xcSeuVg=
+github.com/aws/aws-sdk-go-v2/service/sts v1.42.1 h1:F/M5Y9I3nwr2IEpshZgh1GeHpOItExNM9L1euNuh/fk=
+github.com/aws/aws-sdk-go-v2/service/sts v1.42.1/go.mod h1:mTNxImtovCOEEuD65mKW7DCsL+2gjEH+RPEAexAzAio=
+github.com/aws/smithy-go v1.25.1 h1:J8ERsGSU7d+aCmdQur5Txg6bVoYelvQJgtZehD12GkI=
+github.com/aws/smithy-go v1.25.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
diff --git a/worker/utils/encryption.go b/worker/utils/encryption.go
new file mode 100644
index 0000000..87465a1
--- /dev/null
+++ b/worker/utils/encryption.go
@@ -0,0 +1,99 @@
+package utils
+
+import (
+ "context"
+ "crypto/aes"
+ "crypto/cipher"
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/service/kms"
+ "github.com/datazip-inc/olake-helm/worker/constants"
+ "github.com/spf13/viper"
+)
+
+// getSecretKey returns the AES key bytes and optionally a KMS client.
+//
+// - If OLAKE_SECRET_KEY is empty → encryption disabled (nil key, nil client)
+// - If OLAKE_SECRET_KEY starts with "arn:aws:kms:" → AWS KMS mode
+// - Otherwise → local AES-256-GCM (key is SHA-256 of the env value)
+func getSecretKey() ([]byte, *kms.Client, error) {
+ envKey := viper.GetString(constants.EnvSecretKey)
+ if strings.TrimSpace(envKey) == "" {
+ return []byte{}, nil, nil // Encryption is disabled
+ }
+
+ if strings.HasPrefix(envKey, "arn:aws:kms:") {
+ cfg, err := config.LoadDefaultConfig(context.Background())
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to load AWS config: %s", err)
+ }
+ return []byte(envKey), kms.NewFromConfig(cfg), nil
+ }
+
+ // Local AES-GCM mode — derive 256-bit key via SHA-256
+ hash := sha256.Sum256([]byte(envKey))
+ return hash[:], nil, nil
+}
+
+// Decrypt decrypts a value that was encrypted by the server's Encrypt function.
+// The stored format is a JSON-quoted, base64-encoded ciphertext (nonce prepended for AES-GCM).
+// If OLAKE_SECRET_KEY is not set, the raw encryptedText is returned unchanged.
+func Decrypt(encryptedText string) (string, error) {
+ if strings.TrimSpace(encryptedText) == "" {
+ return "", fmt.Errorf("cannot decrypt empty or whitespace-only input")
+ }
+
+ key, kmsClient, err := getSecretKey()
+ if err != nil || key == nil || len(key) == 0 {
+ return encryptedText, err
+ }
+
+ // The server stores the ciphertext as a JSON-quoted string (via fmt.Sprintf("%q", ...))
+ var decoded string
+ if err := json.Unmarshal([]byte(encryptedText), &decoded); err != nil {
+ return "", fmt.Errorf("failed to unmarshal JSON string: %s", err)
+ }
+
+ encryptedData, err := base64.StdEncoding.DecodeString(decoded)
+ if err != nil {
+ return "", fmt.Errorf("failed to decode base64 data: %s", err)
+ }
+
+ // AWS KMS path
+ if kmsClient != nil {
+ result, err := kmsClient.Decrypt(context.Background(), &kms.DecryptInput{
+ CiphertextBlob: encryptedData,
+ })
+ if err != nil {
+ return "", fmt.Errorf("failed to decrypt with KMS: %s", err)
+ }
+ return string(result.Plaintext), nil
+ }
+
+ // Local AES-256-GCM path
+ block, err := aes.NewCipher(key)
+ if err != nil {
+ return "", fmt.Errorf("failed to create cipher: %s", err)
+ }
+
+ gcm, err := cipher.NewGCM(block)
+ if err != nil {
+ return "", fmt.Errorf("failed to create GCM: %s", err)
+ }
+
+ if len(encryptedData) < gcm.NonceSize() {
+ return "", errors.New("ciphertext too short")
+ }
+
+ plaintext, err := gcm.Open(nil, encryptedData[:gcm.NonceSize()], encryptedData[gcm.NonceSize():], nil)
+ if err != nil {
+ return "", fmt.Errorf("failed to decrypt: %s", err)
+ }
+ return string(plaintext), nil
+}