diff --git a/core/redis-datastore/redis.go b/core/redis-datastore/redis.go index 5eedbc5..bcbdd47 100644 --- a/core/redis-datastore/redis.go +++ b/core/redis-datastore/redis.go @@ -13,11 +13,12 @@ type RedisDataStore struct { redisClient redis.UniversalClient } -func GetRedisDataStore(redisUri string, password string) (sdk.DataStore, error) { +func GetRedisDataStore(redisUri string, password string, db int) (sdk.DataStore, error) { ds := &RedisDataStore{} client := redis.NewClient(&redis.Options{ Addr: redisUri, Password: password, + DB: db, }) err := client.Ping().Err() if err != nil { diff --git a/core/redis-statestore/redis.go b/core/redis-statestore/redis.go index a8a6e60..3098a1f 100644 --- a/core/redis-statestore/redis.go +++ b/core/redis-statestore/redis.go @@ -19,12 +19,13 @@ type Incrementer interface { Incr(key string, value int64) (int64, error) } -func GetRedisStateStore(redisUri string, password string) (sdk.StateStore, error) { +func GetRedisStateStore(redisUri string, password string, db int) (sdk.StateStore, error) { stateStore := &RedisStateStore{} client := redis.NewClient(&redis.Options{ Addr: redisUri, Password: password, + DB: db, }) err := client.Ping().Err() diff --git a/dashboard/service.go b/dashboard/service.go index 0a41c03..c322d77 100644 --- a/dashboard/service.go +++ b/dashboard/service.go @@ -2,14 +2,15 @@ package main import ( "fmt" - "github.com/rs/xid" "log" + "os" + "strconv" + "strings" + "github.com/rs/xid" lib2 "github.com/s8sg/goflow/dashboard/lib" goflow3 "github.com/s8sg/goflow/v1" redis "gopkg.in/redis.v5" - "os" - "strings" ) var rdb *redis.Client @@ -115,6 +116,7 @@ func executeFlow(flow string, data []byte) (string, error) { fs := goflow3.FlowService{ RedisURL: getRedisAddr(), RedisPassword: getRedisPassword(), + RedisDB: getRedisDB(), } requestId := getNewId() @@ -136,6 +138,7 @@ func pauseRequest(flow string, requestID string) error { fs := &goflow3.FlowService{ RedisURL: getRedisAddr(), RedisPassword: getRedisPassword(), + RedisDB: getRedisDB(), } err := fs.Pause(flow, requestID) @@ -151,6 +154,7 @@ func resumeRequest(flow string, requestID string) error { fs := &goflow3.FlowService{ RedisURL: getRedisAddr(), RedisPassword: getRedisPassword(), + RedisDB: getRedisDB(), } err := fs.Resume(flow, requestID) @@ -166,6 +170,7 @@ func stopRequest(flow string, requestID string) error { fs := &goflow3.FlowService{ RedisURL: getRedisAddr(), RedisPassword: getRedisPassword(), + RedisDB: getRedisDB(), } err := fs.Stop(flow, requestID) @@ -179,11 +184,12 @@ func stopRequest(flow string, requestID string) error { func getRDB() *redis.Client { addr := getRedisAddr() password := getRedisPassword() + db := getRedisDB() if rdb == nil { rdb = redis.NewClient(&redis.Options{ Addr: addr, Password: password, - DB: 0, + DB: db, }) } return rdb @@ -202,6 +208,19 @@ func getRedisPassword() string { return addr } +func getRedisDB() int { + dbStr := os.Getenv("REDIS_DB") + if dbStr == "" { + return 0 + } + db, err := strconv.Atoi(dbStr) + if err != nil { + log.Printf("Failed get redisDB, %v", err) + return 0 + } + return db +} + func getNewId() string { guid := xid.New() return guid.String() diff --git a/runtime/flow_runtime.go b/runtime/flow_runtime.go index 82989a8..867ef4b 100644 --- a/runtime/flow_runtime.go +++ b/runtime/flow_runtime.go @@ -28,6 +28,7 @@ type FlowRuntime struct { OpenTracingUrl string RedisURL string RedisPassword string + RedisDB int stateStore sdk.StateStore DataStore sdk.DataStore Logger sdk.Logger @@ -87,16 +88,16 @@ func (fRuntime *FlowRuntime) Init() error { fRuntime.rdb = redis.NewClient(&redis.Options{ Addr: fRuntime.RedisURL, Password: fRuntime.RedisPassword, - DB: 0, + DB: fRuntime.RedisDB, }) - fRuntime.stateStore, err = initStateStore(fRuntime.RedisURL, fRuntime.RedisPassword) + fRuntime.stateStore, err = initStateStore(fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB) if err != nil { return fmt.Errorf("failed to initialize the StateStore, %v", err) } if fRuntime.DataStore == nil { - fRuntime.DataStore, err = initDataStore(fRuntime.RedisURL, fRuntime.RedisPassword) + fRuntime.DataStore, err = initDataStore(fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB) if err != nil { return fmt.Errorf("failed to initialize the StateStore, %v", err) } @@ -225,7 +226,7 @@ func OpenConnectionV2(tag string, network string, address string, password strin func (fRuntime *FlowRuntime) Execute(flowName string, request *runtime.Request) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } @@ -251,7 +252,7 @@ func (fRuntime *FlowRuntime) Execute(flowName string, request *runtime.Request) } func (fRuntime *FlowRuntime) Pause(flowName string, request *runtime.Request) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } @@ -276,7 +277,7 @@ func (fRuntime *FlowRuntime) Pause(flowName string, request *runtime.Request) er } func (fRuntime *FlowRuntime) Stop(flowName string, request *runtime.Request) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } @@ -301,7 +302,7 @@ func (fRuntime *FlowRuntime) Stop(flowName string, request *runtime.Request) err } func (fRuntime *FlowRuntime) Resume(flowName string, request *runtime.Request) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } diff --git a/runtime/init_data_store.go b/runtime/init_data_store.go index cf6557d..395877b 100644 --- a/runtime/init_data_store.go +++ b/runtime/init_data_store.go @@ -5,7 +5,7 @@ import ( "github.com/s8sg/goflow/core/sdk" ) -func initDataStore(redisURI string, password string) (dataStore sdk.DataStore, err error) { - dataStore, err = redisDataStore.GetRedisDataStore(redisURI, password) +func initDataStore(redisURI string, password string, db int) (dataStore sdk.DataStore, err error) { + dataStore, err = redisDataStore.GetRedisDataStore(redisURI, password, db) return dataStore, err } diff --git a/runtime/init_state_store.go b/runtime/init_state_store.go index e55ee33..c7173e0 100644 --- a/runtime/init_state_store.go +++ b/runtime/init_state_store.go @@ -5,7 +5,7 @@ import ( "github.com/s8sg/goflow/core/sdk" ) -func initStateStore(redisURI string, password string) (stateStore sdk.StateStore, err error) { - stateStore, err = redisStateStore.GetRedisStateStore(redisURI, password) +func initStateStore(redisURI string, password string, db int) (stateStore sdk.StateStore, err error) { + stateStore, err = redisStateStore.GetRedisStateStore(redisURI, password, db) return stateStore, err } diff --git a/v1/goflow.go b/v1/goflow.go index 8031fe2..89c1405 100644 --- a/v1/goflow.go +++ b/v1/goflow.go @@ -13,6 +13,7 @@ type FlowService struct { Port int RedisURL string RedisPassword string + RedisDB int RequestAuthSharedSecret string RequestAuthEnabled bool WorkerConcurrency int @@ -54,6 +55,7 @@ func (fs *FlowService) Execute(flowName string, req *Request) error { fs.runtime = &runtime.FlowRuntime{ RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, RequestAuthEnabled: fs.RequestAuthEnabled, RequestAuthSharedSecret: fs.RequestAuthSharedSecret, } @@ -86,6 +88,7 @@ func (fs *FlowService) Pause(flowName string, requestId string) error { fs.runtime = &runtime.FlowRuntime{ RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, RequestAuthEnabled: fs.RequestAuthEnabled, RequestAuthSharedSecret: fs.RequestAuthSharedSecret, } @@ -115,6 +118,7 @@ func (fs *FlowService) Resume(flowName string, requestId string) error { fs.runtime = &runtime.FlowRuntime{ RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, RequestAuthEnabled: fs.RequestAuthEnabled, RequestAuthSharedSecret: fs.RequestAuthSharedSecret, } @@ -144,6 +148,7 @@ func (fs *FlowService) Stop(flowName string, requestId string) error { fs.runtime = &runtime.FlowRuntime{ RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, RequestAuthEnabled: fs.RequestAuthEnabled, RequestAuthSharedSecret: fs.RequestAuthSharedSecret, } @@ -198,7 +203,6 @@ func (fs *FlowService) Register(flowName string, handler runtime.FlowDefinitionH func (fs *FlowService) Start() error { fs.ConfigureDefault() - errorChan := make(chan error) defer close(errorChan) @@ -216,7 +220,6 @@ func (fs *FlowService) Start() error { func (fs *FlowService) StartServer() error { fs.ConfigureDefault() - errorChan := make(chan error) defer close(errorChan) if err := fs.initRuntime(errorChan); err != nil { @@ -234,7 +237,6 @@ func (fs *FlowService) StartServer() error { func (fs *FlowService) StartWorker() error { fs.ConfigureDefault() - errorChan := make(chan error) defer close(errorChan)