diff --git a/internal/outgoing/dispatcher.go b/internal/outgoing/dispatcher.go index 715e87c..2673581 100644 --- a/internal/outgoing/dispatcher.go +++ b/internal/outgoing/dispatcher.go @@ -243,7 +243,17 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time // Check if store have payload dataKey := getEventDataKey(store, e, dest) storedData, dataExists, _, storeErr := store.Get(dataKey) + + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "eventDataKey": dataKey, + }).Info("Data exists: " + strconv.FormatBool(dataExists) + ", stored data: " + storedData) + if storeErr != nil { + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "eventDataKey": dataKey, + }).Error("Get stored data error: ", storeErr) panic(storeErr) } @@ -268,6 +278,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time queueKey := getEventThrottledPayloadsKey(store, e, dest) jsonString, jsonErr := json.Marshal(customizedPayload) if jsonErr != nil { + dLogger.WithFields(log.Fields{ + "queueKey": queueKey, + "event": e.GetTraceInfo(), + }).Error("Marshal throttled payload error") panic(jsonErr) } dLogger.WithFields(log.Fields{ @@ -283,6 +297,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time queueKey := getEventThrottledDocumentsKey(store, e, dest) jsonString, jsonErr := json.Marshal(customizedDocument) if jsonErr != nil { + dLogger.WithFields(log.Fields{ + "queueKey": queueKey, + "event": e.GetTraceInfo(), + }).Error("Marshal throttled document error") panic(jsonErr) } dLogger.WithFields(log.Fields{ @@ -295,6 +313,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time jsonString, jsonErr := json.Marshal(e) if jsonErr != nil { + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "enqueuePayload": jsonString, + }).Error("Marshal event error") panic(jsonErr) } @@ -302,20 +324,31 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time // Update Value _, updateErr := store.Update(dataKey, string(jsonString)) if updateErr != nil { + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "dataKey": dataKey, + }).Error("Update payload error: ", updateErr) panic(updateErr) } } else { // Create Value config := dest.Config + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": string(jsonString)}).Info("Store data for delayed event") _, saveErr := store.Set(dataKey, string(jsonString), config.GetThrottleValue()*2) if saveErr != nil { + dLogger.WithFields(log.Fields{ + "event": e.GetTraceInfo(), + "dataKey": dataKey, + }).Error("Save config error: ", saveErr) panic(saveErr) } // Schedule send event later + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "timeRemain": timeRemain}).Info("AfterFunc") dispatcher.TrackAfterFuncJob(timeRemain, func() { dLogger.WithFields(log.Fields{"key": dataKey}).Debug("After event callback") - payload, _, _, _ := store.Get(dataKey) + payload, exists, ttl, err := store.Get(dataKey) + dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": payload, "key_exists": exists, "ttl": ttl, "err": err}).Info("Fetch data for delayed event") event := models.IncomingEvent{} json.Unmarshal([]byte(payload), &event) d.sendEvent(event, dest, store, documentStore) @@ -422,6 +455,7 @@ func (d *Dispatcher) sendEvent(evt models.IncomingEvent, destination models.Dest _sendEvent := func() { defer func() { if err := recover(); err != nil { + callbackLogger.Error(fmt.Sprintf("Dispatcher Error: %s", err)) d.OnError(evt, &captin_errors.DispatcherError{ Msg: err.(error).Error(), Destination: destination, @@ -434,6 +468,7 @@ func (d *Dispatcher) sendEvent(evt models.IncomingEvent, destination models.Dest event := deepcopy.Copy(evt).(models.IncomingEvent) err := sender.SendEvent(event, destination) if err != nil { + callbackLogger.Error(fmt.Sprintf("Send event failed, %s", err)) panic(err) } callbackLogger.Info(fmt.Sprintf("Event successfully sent to %s [%s]", config.GetName(), destination.GetCallbackURL()))