From 221ab87f0e870ccb4b6cce2b45e64b857e9a60f9 Mon Sep 17 00:00:00 2001 From: wood Date: Thu, 14 Apr 2022 15:25:06 +0800 Subject: [PATCH 1/6] chore: add logs for bug trace --- internal/outgoing/dispatcher.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/outgoing/dispatcher.go b/internal/outgoing/dispatcher.go index 04e0f5f..46998de 100644 --- a/internal/outgoing/dispatcher.go +++ b/internal/outgoing/dispatcher.go @@ -6,12 +6,12 @@ import ( "strconv" "time" + destination_filters "github.com/shoplineapp/captin/destinations/filters" captin_errors "github.com/shoplineapp/captin/errors" interfaces "github.com/shoplineapp/captin/interfaces" - destination_filters "github.com/shoplineapp/captin/destinations/filters" - models "github.com/shoplineapp/captin/models" documentStores "github.com/shoplineapp/captin/internal/document_stores" helpers "github.com/shoplineapp/captin/internal/helpers" + models "github.com/shoplineapp/captin/models" log "github.com/sirupsen/logrus" ) @@ -269,6 +269,7 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time } else { // Create Value config := dest.Config + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": string(jsonString)}).Info("Store data for delayed event") _, saveErr := store.Set(dataKey, string(jsonString), config.GetThrottleValue()*2) if saveErr != nil { panic(saveErr) @@ -278,6 +279,7 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time time.AfterFunc(timeRemain, func() { dLogger.WithFields(log.Fields{"key": dataKey}).Debug("After event callback") payload, _, _, _ := store.Get(dataKey) + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": payload}).Info("Fetch data for delayed event") event := models.IncomingEvent{} json.Unmarshal([]byte(payload), &event) d.sendEvent(event, dest, store, documentStore) From 53405a45d42c5e9b5afb383fde4a0971a09b824a Mon Sep 17 00:00:00 2001 From: wood Date: Thu, 14 Apr 2022 19:08:56 +0800 Subject: [PATCH 2/6] chore: log delay send event time remain --- internal/outgoing/dispatcher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/outgoing/dispatcher.go b/internal/outgoing/dispatcher.go index 46998de..75634a2 100644 --- a/internal/outgoing/dispatcher.go +++ b/internal/outgoing/dispatcher.go @@ -269,7 +269,7 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time } else { // Create Value config := dest.Config - dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": string(jsonString)}).Info("Store data for delayed event") + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": string(jsonString), "timeRemain": timeRemain}).Info("Store data for delayed event") _, saveErr := store.Set(dataKey, string(jsonString), config.GetThrottleValue()*2) if saveErr != nil { panic(saveErr) @@ -279,7 +279,7 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time time.AfterFunc(timeRemain, func() { dLogger.WithFields(log.Fields{"key": dataKey}).Debug("After event callback") payload, _, _, _ := store.Get(dataKey) - dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": payload}).Info("Fetch data for delayed event") + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": payload, "timeRemain": timeRemain}).Info("Fetch data for delayed event") event := models.IncomingEvent{} json.Unmarshal([]byte(payload), &event) d.sendEvent(event, dest, store, documentStore) From aff75210a4d924530c0bbcb57dd1de5711a5c814 Mon Sep 17 00:00:00 2001 From: Terence Wu Date: Thu, 14 Apr 2022 19:09:05 +0800 Subject: [PATCH 3/6] chore(dispatcher): add log --- internal/outgoing/dispatcher.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/internal/outgoing/dispatcher.go b/internal/outgoing/dispatcher.go index 75634a2..fc6881f 100644 --- a/internal/outgoing/dispatcher.go +++ b/internal/outgoing/dispatcher.go @@ -180,7 +180,7 @@ func (d *Dispatcher) customizeDocument(e *models.IncomingEvent, destination mode } } -func (d *Dispatcher) customizePayload(e models.IncomingEvent, destination interfaces.DestinationInterface) map[string]interface{} { +func (d *Dispatcher) customizePayload(e models.IncomingEvent, destination interfaces.DestinationInterface) map[string]interface{} { config := destination.(models.Destination).Config if len(config.GetIncludePayloadAttrs()) >= 1 { return helpers.IncludeFields(e.Payload, config.GetIncludePayloadAttrs()).(map[string]interface{}) @@ -233,8 +233,8 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time panic(jsonErr) } dLogger.WithFields(log.Fields{ - "queueKey": queueKey, - "event": e.GetTraceInfo(), + "queueKey": queueKey, + "event": e.GetTraceInfo(), "enqueuePayload": jsonString, }).Debug("Storing throttled payload") store.Enqueue(queueKey, string(jsonString), dest.Config.GetThrottleValue()*2) @@ -248,8 +248,8 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time panic(jsonErr) } dLogger.WithFields(log.Fields{ - "queueKey": queueKey, - "event": e.GetTraceInfo(), + "queueKey": queueKey, + "event": e.GetTraceInfo(), "enqueueDocument": jsonString, }).Debug("Storing throttled document") store.Enqueue(queueKey, string(jsonString), dest.Config.GetThrottleValue()*2) @@ -269,17 +269,18 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time } else { // Create Value config := dest.Config - dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": string(jsonString), "timeRemain": timeRemain}).Info("Store data for delayed event") + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": string(jsonString)}).Info("Store data for delayed event") _, saveErr := store.Set(dataKey, string(jsonString), config.GetThrottleValue()*2) if saveErr != nil { panic(saveErr) } // Schedule send event later + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "timeRemain": timeRemain}).Info("AfterFunc") time.AfterFunc(timeRemain, func() { dLogger.WithFields(log.Fields{"key": dataKey}).Debug("After event callback") - payload, _, _, _ := store.Get(dataKey) - dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": payload, "timeRemain": timeRemain}).Info("Fetch data for delayed event") + payload, exists, ttl, err := store.Get(dataKey) + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": payload, "key_exists": exists, "ttl": ttl, "err": err}).Info("Fetch data for delayed event") event := models.IncomingEvent{} json.Unmarshal([]byte(payload), &event) d.sendEvent(event, dest, store, documentStore) From ecdf1ea4e5896fcf9a0ed86ad143774c29a2fd7d Mon Sep 17 00:00:00 2001 From: Ken Lee Date: Mon, 16 May 2022 16:56:13 +0800 Subject: [PATCH 4/6] feat(add debug messages): add log for error cases --- internal/outgoing/dispatcher.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/internal/outgoing/dispatcher.go b/internal/outgoing/dispatcher.go index fc6881f..6070ebd 100644 --- a/internal/outgoing/dispatcher.go +++ b/internal/outgoing/dispatcher.go @@ -96,6 +96,7 @@ func (d *Dispatcher) Dispatch( responses <- 0 d.processDelayedEvent(e, timeRemain, destination, store, documentStore) } else { + dLogger.WithFields(log.Fields{"event": e.GetTraceInfo(), "destination": destination}).Debug("Cannot trigger send event") responses <- 0 } } @@ -206,6 +207,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time dataKey := getEventDataKey(store, e, dest) storedData, dataExists, _, storeErr := store.Get(dataKey) if storeErr != nil { + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "eventDataKey": dataKey, + }).Error("Get stored data error: ", storeErr) panic(storeErr) } @@ -230,6 +235,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time queueKey := getEventThrottledPayloadsKey(store, e, dest) jsonString, jsonErr := json.Marshal(customizedPayload) if jsonErr != nil { + dLogger.WithFields(log.Fields{ + "queueKey": queueKey, + "event": e.GetTraceInfo(), + }).Error("Marshal throttled payload error") panic(jsonErr) } dLogger.WithFields(log.Fields{ @@ -245,6 +254,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time queueKey := getEventThrottledDocumentsKey(store, e, dest) jsonString, jsonErr := json.Marshal(customizedDocument) if jsonErr != nil { + dLogger.WithFields(log.Fields{ + "queueKey": queueKey, + "event": e.GetTraceInfo(), + }).Error("Marshal throttled document error") panic(jsonErr) } dLogger.WithFields(log.Fields{ @@ -257,6 +270,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time jsonString, jsonErr := json.Marshal(e) if jsonErr != nil { + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "enqueuePayload": jsonString, + }).Error("Marshal event error") panic(jsonErr) } @@ -264,6 +281,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time // Update Value _, updateErr := store.Update(dataKey, string(jsonString)) if updateErr != nil { + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "dataKey": dataKey, + }).Error("Update payload error: ", updateErr) panic(updateErr) } } else { @@ -272,6 +293,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": string(jsonString)}).Info("Store data for delayed event") _, saveErr := store.Set(dataKey, string(jsonString), config.GetThrottleValue()*2) if saveErr != nil { + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "dataKey": dataKey, + }).Error("Save config error: ", saveErr) panic(saveErr) } @@ -388,6 +413,7 @@ func (d *Dispatcher) sendEvent(evt models.IncomingEvent, destination models.Dest _sendEvent := func() { defer func() { if err := recover(); err != nil { + callbackLogger.Error(fmt.Sprintf("Dispatcher Error: %s", err)) d.Errors = append(d.Errors, &captin_errors.DispatcherError{ Msg: fmt.Sprintf("%+v", err), Destination: destination, @@ -399,6 +425,7 @@ func (d *Dispatcher) sendEvent(evt models.IncomingEvent, destination models.Dest err := sender.SendEvent(evt, destination) if err != nil { + callbackLogger.Error(fmt.Sprintf("Send event failed, %s", err)) panic(err) return } From 272aee9589a1d9c1c5ecee34532f5525e19fe859 Mon Sep 17 00:00:00 2001 From: Ken Lee Date: Fri, 20 May 2022 12:49:04 +0800 Subject: [PATCH 5/6] feat(add debug messages): add log before error and time remaining before resend --- internal/outgoing/dispatcher.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/outgoing/dispatcher.go b/internal/outgoing/dispatcher.go index 6070ebd..631bd83 100644 --- a/internal/outgoing/dispatcher.go +++ b/internal/outgoing/dispatcher.go @@ -206,6 +206,12 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time // Check if store have payload dataKey := getEventDataKey(store, e, dest) storedData, dataExists, _, storeErr := store.Get(dataKey) + + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "eventDataKey": dataKey, + }).Info("Data exists: " + strconv.FormatBool(dataExists) + ", stored data: " + storedData) + if storeErr != nil { dLogger.WithFields(log.Fields{ "event": e.GetTraceInfo(), From 3f8b0f3b094977be6ca5dda3153f5eb539ee10eb Mon Sep 17 00:00:00 2001 From: Ken Lee Date: Fri, 20 May 2022 12:49:50 +0800 Subject: [PATCH 6/6] feat(add debug messages): update logging level --- internal/outgoing/dispatcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/outgoing/dispatcher.go b/internal/outgoing/dispatcher.go index 631bd83..70e6691 100644 --- a/internal/outgoing/dispatcher.go +++ b/internal/outgoing/dispatcher.go @@ -96,7 +96,7 @@ func (d *Dispatcher) Dispatch( responses <- 0 d.processDelayedEvent(e, timeRemain, destination, store, documentStore) } else { - dLogger.WithFields(log.Fields{"event": e.GetTraceInfo(), "destination": destination}).Debug("Cannot trigger send event") + dLogger.WithFields(log.Fields{"event": e.GetTraceInfo(), "destination": destination}).Info("Cannot trigger send event") responses <- 0 } }