Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion internal/outgoing/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,17 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time
// Check if store have payload
dataKey := getEventDataKey(store, e, dest)
storedData, dataExists, _, storeErr := store.Get(dataKey)

dLogger.WithFields(log.Fields{
"event": e.GetTraceInfo(),
"eventDataKey": dataKey,
}).Info("Data exists: " + strconv.FormatBool(dataExists) + ", stored data: " + storedData)

if storeErr != nil {
dLogger.WithFields(log.Fields{
"event": e.GetTraceInfo(),
"eventDataKey": dataKey,
}).Error("Get stored data error: ", storeErr)
panic(storeErr)
}

Expand All @@ -268,6 +278,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time
queueKey := getEventThrottledPayloadsKey(store, e, dest)
jsonString, jsonErr := json.Marshal(customizedPayload)
if jsonErr != nil {
dLogger.WithFields(log.Fields{
"queueKey": queueKey,
"event": e.GetTraceInfo(),
}).Error("Marshal throttled payload error")
panic(jsonErr)
}
dLogger.WithFields(log.Fields{
Expand All @@ -283,6 +297,10 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time
queueKey := getEventThrottledDocumentsKey(store, e, dest)
jsonString, jsonErr := json.Marshal(customizedDocument)
if jsonErr != nil {
dLogger.WithFields(log.Fields{
"queueKey": queueKey,
"event": e.GetTraceInfo(),
}).Error("Marshal throttled document error")
panic(jsonErr)
}
dLogger.WithFields(log.Fields{
Expand All @@ -295,27 +313,42 @@ func (d *Dispatcher) processDelayedEvent(e models.IncomingEvent, timeRemain time

jsonString, jsonErr := json.Marshal(e)
if jsonErr != nil {
dLogger.WithFields(log.Fields{
"event": e.GetTraceInfo(),
"enqueuePayload": jsonString,
}).Error("Marshal event error")
panic(jsonErr)
}

if dataExists {
// Update Value
_, updateErr := store.Update(dataKey, string(jsonString))
if updateErr != nil {
dLogger.WithFields(log.Fields{
"event": e.GetTraceInfo(),
"dataKey": dataKey,
}).Error("Update payload error: ", updateErr)
panic(updateErr)
}
} else {
// Create Value
config := dest.Config
dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": string(jsonString)}).Info("Store data for delayed event")
_, saveErr := store.Set(dataKey, string(jsonString), config.GetThrottleValue()*2)
if saveErr != nil {
dLogger.WithFields(log.Fields{
"event": e.GetTraceInfo(),
"dataKey": dataKey,
}).Error("Save config error: ", saveErr)
panic(saveErr)
}

// Schedule send event later
dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "timeRemain": timeRemain}).Info("AfterFunc")
dispatcher.TrackAfterFuncJob(timeRemain, func() {
dLogger.WithFields(log.Fields{"key": dataKey}).Debug("After event callback")
payload, _, _, _ := store.Get(dataKey)
payload, exists, ttl, err := store.Get(dataKey)
dLogger.WithFields(log.Fields{"key": dataKey, "event": e.GetTraceInfo(), "payload": payload, "key_exists": exists, "ttl": ttl, "err": err}).Info("Fetch data for delayed event")
event := models.IncomingEvent{}
json.Unmarshal([]byte(payload), &event)
d.sendEvent(event, dest, store, documentStore)
Expand Down Expand Up @@ -422,6 +455,7 @@ func (d *Dispatcher) sendEvent(evt models.IncomingEvent, destination models.Dest
_sendEvent := func() {
defer func() {
if err := recover(); err != nil {
callbackLogger.Error(fmt.Sprintf("Dispatcher Error: %s", err))
d.OnError(evt, &captin_errors.DispatcherError{
Msg: err.(error).Error(),
Destination: destination,
Expand All @@ -434,6 +468,7 @@ func (d *Dispatcher) sendEvent(evt models.IncomingEvent, destination models.Dest
event := deepcopy.Copy(evt).(models.IncomingEvent)
err := sender.SendEvent(event, destination)
if err != nil {
callbackLogger.Error(fmt.Sprintf("Send event failed, %s", err))
panic(err)
}
callbackLogger.Info(fmt.Sprintf("Event successfully sent to %s [%s]", config.GetName(), destination.GetCallbackURL()))
Expand Down