Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ blocks:
value: "1"
prologue:
commands:
- sem-version go 1.15
- sem-version go 1.22
# Go project boiler plate
- export "SEMAPHORE_GIT_DIR=/tmp/${SEMAPHORE_PROJECT_NAME}"
- export "PATH=$(go env GOPATH)/bin:${PATH}"
Expand All @@ -29,7 +29,7 @@ blocks:
task:
prologue:
commands:
- sem-version go 1.15
- sem-version go 1.22
# Go project boiler plate
- export "SEMAPHORE_GIT_DIR=/tmp/${SEMAPHORE_PROJECT_NAME}"
- export "PATH=$(go env GOPATH)/bin:${PATH}"
Expand All @@ -45,4 +45,4 @@ blocks:
jobs:
- name: Suite
commands:
- make test
- make test
7 changes: 4 additions & 3 deletions cmd/captin/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package main

import (
"context"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"

core "github.com/shoplineapp/captin/core"
models "github.com/shoplineapp/captin/models"
core "github.com/shoplineapp/captin/v2/core"
models "github.com/shoplineapp/captin/v2/models"
log "github.com/sirupsen/logrus"
)

Expand All @@ -31,7 +32,7 @@ func main() {

go func() {
for enabled && captin.IsRunning() != true {
captin.Execute(models.IncomingEvent{
captin.Execute(context.Background(), models.IncomingEvent{
Key: "product.update",
Source: "core",
Payload: map[string]interface{}{"field1": 1},
Expand Down
29 changes: 15 additions & 14 deletions core/captin.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package core

import (
"context"
"fmt"

destination_filters "github.com/shoplineapp/captin/destinations/filters"
d "github.com/shoplineapp/captin/dispatcher"
interfaces "github.com/shoplineapp/captin/interfaces"
outgoing "github.com/shoplineapp/captin/internal/outgoing"
models "github.com/shoplineapp/captin/models"
senders "github.com/shoplineapp/captin/senders"

captin_errors "github.com/shoplineapp/captin/errors"
documentStores "github.com/shoplineapp/captin/internal/document_stores"
stores "github.com/shoplineapp/captin/internal/stores"
throttles "github.com/shoplineapp/captin/internal/throttles"
destination_filters "github.com/shoplineapp/captin/v2/destinations/filters"
d "github.com/shoplineapp/captin/v2/dispatcher"
interfaces "github.com/shoplineapp/captin/v2/interfaces"
outgoing "github.com/shoplineapp/captin/v2/internal/outgoing"
models "github.com/shoplineapp/captin/v2/models"
senders "github.com/shoplineapp/captin/v2/senders"

captin_errors "github.com/shoplineapp/captin/v2/errors"
documentStores "github.com/shoplineapp/captin/v2/internal/document_stores"
stores "github.com/shoplineapp/captin/v2/internal/stores"
throttles "github.com/shoplineapp/captin/v2/internal/throttles"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -116,7 +117,7 @@ func (c Captin) IsRunning() bool {
}

// Execute - Execute for events
func (c *Captin) Execute(ie interfaces.IncomingEventInterface) (bool, []interfaces.ErrorInterface) {
func (c *Captin) Execute(ctx context.Context, ie interfaces.IncomingEventInterface) (bool, []interfaces.ErrorInterface) {
c.Status = STATUS_RUNNING

e := ie.(models.IncomingEvent)
Expand All @@ -131,7 +132,7 @@ func (c *Captin) Execute(ie interfaces.IncomingEventInterface) (bool, []interfac
destinations = append(destinations, models.Destination{Config: config})
}

destinations = outgoing.Custom{}.Sift(&e, destinations, c.filters, c.middlewares)
destinations = outgoing.Custom{}.Sift(ctx, &e, destinations, c.filters, c.middlewares)
cLogger.WithFields(log.Fields{
"event": e,
"destinations": destinations,
Expand All @@ -143,7 +144,7 @@ func (c *Captin) Execute(ie interfaces.IncomingEventInterface) (bool, []interfac
dispatcher.SetMiddlewares(c.dispatchMiddlewares)
dispatcher.SetErrorHandler(c.dispatchErrorHandler)
dispatcher.SetDelayer(c.dispatchDelayer)
dispatcher.Dispatch(e, c.store, c.throttler, c.DocumentStoreMapping)
dispatcher.Dispatch(ctx, e, c.store, c.throttler, c.DocumentStoreMapping)

errors := dispatcher.GetErrors()

Expand Down
10 changes: 6 additions & 4 deletions destinations/filters/base.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package destination_filters

import (
models "github.com/shoplineapp/captin/models"
"context"

models "github.com/shoplineapp/captin/v2/models"
)

// DestinationMiddleware - Interface for third-party application to add extra handling on destinations
type DestinationMiddlewareInterface interface {
Apply(e *models.IncomingEvent, d []models.Destination) []models.Destination
Apply(ctx context.Context, e *models.IncomingEvent, d []models.Destination) []models.Destination
}

// DestinationFilter - Interface for third-party application to filter destination by event
type DestinationFilterInterface interface {
Run(e models.IncomingEvent, c models.Destination) (bool, error)
Applicable(e models.IncomingEvent, c models.Destination) bool
Run(ctx context.Context, e models.IncomingEvent, c models.Destination) (bool, error)
Applicable(ctx context.Context, e models.IncomingEvent, c models.Destination) bool
}
14 changes: 8 additions & 6 deletions destinations/filters/desired_hook.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package destination_filters

import (
models "github.com/shoplineapp/captin/models"
"context"

models "github.com/shoplineapp/captin/v2/models"
)

func isPresent(str string, list []string) bool {
Expand All @@ -21,13 +23,13 @@ func stringList(list []interface{}) []string {
return sList
}

var _ DestinationFilterInterface = DesiredHookFilter{}

// DesiredHookFilter - Filter destination if given event has desired destination
type DesiredHookFilter struct {
DestinationFilterInterface
}
type DesiredHookFilter struct{}

// Run - Get desired hooks in control and filter out exclusion
func (f DesiredHookFilter) Run(e models.IncomingEvent, d models.Destination) (bool, error) {
func (f DesiredHookFilter) Run(ctx context.Context, e models.IncomingEvent, d models.Destination) (bool, error) {
hook := d.Config.GetName()
list := e.Control["desired_hooks"]
switch list.(type) {
Expand All @@ -42,6 +44,6 @@ func (f DesiredHookFilter) Run(e models.IncomingEvent, d models.Destination) (bo
}

// Applicable - Check if desired hooks is present
func (f DesiredHookFilter) Applicable(e models.IncomingEvent, d models.Destination) bool {
func (f DesiredHookFilter) Applicable(ctx context.Context, e models.IncomingEvent, d models.Destination) bool {
return e.Control["desired_hooks"] != nil
}
14 changes: 8 additions & 6 deletions destinations/filters/environment.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package destination_filters

import (
models "github.com/shoplineapp/captin/models"
"context"

models "github.com/shoplineapp/captin/v2/models"
log "github.com/sirupsen/logrus"
)

var eLogger = log.WithFields(log.Fields{"class": "EnvironmentFilter"})

type EnvironmentFilter struct {
DestinationFilterInterface
}
var _ DestinationFilterInterface = EnvironmentFilter{}

type EnvironmentFilter struct{}

// Destination needs to be enabled by ENV Variable {Config Name}_ENABLED, e.g, WAPOS_SYNC_ENABLED
func (f EnvironmentFilter) Run(e models.IncomingEvent, d models.Destination) (bool, error) {
func (f EnvironmentFilter) Run(ctx context.Context, e models.IncomingEvent, d models.Destination) (bool, error) {
variableName, value := d.Config.GetByEnv("enabled")
isEnabled := value != "false"

Expand All @@ -23,6 +25,6 @@ func (f EnvironmentFilter) Run(e models.IncomingEvent, d models.Destination) (bo
return isEnabled, nil
}

func (f EnvironmentFilter) Applicable(e models.IncomingEvent, d models.Destination) bool {
func (f EnvironmentFilter) Applicable(ctx context.Context, e models.IncomingEvent, d models.Destination) bool {
return true
}
14 changes: 8 additions & 6 deletions destinations/filters/source.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package destination_filters

import (
models "github.com/shoplineapp/captin/models"
"context"

models "github.com/shoplineapp/captin/v2/models"
)

type SourceFilter struct {
DestinationFilterInterface
}
var _ DestinationFilterInterface = SourceFilter{}

type SourceFilter struct{}

func (f SourceFilter) Run(e models.IncomingEvent, d models.Destination) (bool, error) {
func (f SourceFilter) Run(ctx context.Context, e models.IncomingEvent, d models.Destination) (bool, error) {
return e.Source != d.Config.GetSource(), nil
}

func (f SourceFilter) Applicable(e models.IncomingEvent, d models.Destination) bool {
func (f SourceFilter) Applicable(ctx context.Context, e models.IncomingEvent, d models.Destination) bool {
return d.Config.GetAllowLoopback() == false
}
11 changes: 7 additions & 4 deletions destinations/filters/validate.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package destination_filters

import (
"context"
"encoding/json"
"fmt"

"github.com/robertkrimen/otto"
models "github.com/shoplineapp/captin/models"
models "github.com/shoplineapp/captin/v2/models"
log "github.com/sirupsen/logrus"
)

var vLogger = log.WithFields(log.Fields{"class": "ValidateFilter"})

var _ DestinationFilterInterface = ValidateFilter{}

type ValidateFilter struct {
DestinationFilterInterface
}

func (f ValidateFilter) Run(e models.IncomingEvent, d models.Destination) (bool, error) {
func (f ValidateFilter) Run(ctx context.Context, e models.IncomingEvent, d models.Destination) (bool, error) {
payloadJson, _ := json.Marshal(e.Payload)
configJson, _ := json.Marshal(d.Config)
template := fmt.Sprintf(
Expand All @@ -39,6 +42,6 @@ func (f ValidateFilter) Run(e models.IncomingEvent, d models.Destination) (bool,
return valid, err
}

func (f ValidateFilter) Applicable(e models.IncomingEvent, d models.Destination) bool {
func (f ValidateFilter) Applicable(ctx context.Context, e models.IncomingEvent, d models.Destination) bool {
return (d.Config.GetValidate()) != ""
}
15 changes: 8 additions & 7 deletions dispatcher/delayers/goroutine.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package dispatcher_delayers

import (
"context"
"fmt"

"time"

"github.com/shoplineapp/captin/dispatcher"
"github.com/shoplineapp/captin/interfaces"
"github.com/shoplineapp/captin/models"
"github.com/shoplineapp/captin/v2/dispatcher"
"github.com/shoplineapp/captin/v2/interfaces"
"github.com/shoplineapp/captin/v2/models"
log "github.com/sirupsen/logrus"
)

type GoroutineDelayer struct {
interfaces.DispatchDelayerInterface
}
var _ interfaces.DispatchDelayerInterface = GoroutineDelayer{}

type GoroutineDelayer struct{}

var dLogger = log.WithFields(log.Fields{"class": "Goroutine"})

func (d GoroutineDelayer) Execute(evt interfaces.IncomingEventInterface, dest interfaces.DestinationInterface, exec func()) {
func (d GoroutineDelayer) Execute(ctx context.Context, evt interfaces.IncomingEventInterface, dest interfaces.DestinationInterface, exec func()) {
event := d.TapDelayedEvent(evt.(models.IncomingEvent), dest.(models.Destination))
config := dest.GetConfig()

Expand Down
9 changes: 5 additions & 4 deletions errors/dispatcher_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package errors

import (
"fmt"
interfaces "github.com/shoplineapp/captin/interfaces"
models "github.com/shoplineapp/captin/models"

interfaces "github.com/shoplineapp/captin/v2/interfaces"
models "github.com/shoplineapp/captin/v2/models"
)

var _ interfaces.ErrorInterface = &DispatcherError{}

// DispatcherError - Error when send events
type DispatcherError struct {
interfaces.ErrorInterface

Msg string
Event models.IncomingEvent
Destination models.Destination
Expand Down
2 changes: 1 addition & 1 deletion errors/execution_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package errors

import (
"fmt"
interfaces "github.com/shoplineapp/captin/interfaces"
interfaces "github.com/shoplineapp/captin/v2/interfaces"
)

// ExecutionError - Error on executing events
Expand Down
4 changes: 2 additions & 2 deletions errors/unretryable_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package errors
import (
"fmt"

interfaces "github.com/shoplineapp/captin/interfaces"
models "github.com/shoplineapp/captin/models"
interfaces "github.com/shoplineapp/captin/v2/interfaces"
models "github.com/shoplineapp/captin/v2/models"
)

type UnretryableError struct {
Expand Down
8 changes: 4 additions & 4 deletions examples/api/incoming/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package incoming

import (
"github.com/gin-gonic/gin"
interfaces "github.com/shoplineapp/captin/interfaces"
models "github.com/shoplineapp/captin/models"
interfaces "github.com/shoplineapp/captin/v2/interfaces"
models "github.com/shoplineapp/captin/v2/models"
"net/http"
)

Expand All @@ -18,7 +18,7 @@ func (h *HttpEventHandler) Setup(c interfaces.CaptinInterface) {

func (h HttpEventHandler) SetRoutes(router *gin.Engine) {
router.GET("/", func(c *gin.Context) {
c.String(200, "github.com/shoplineapp/captin aboard")
c.String(200, "github.com/shoplineapp/captin/v2 aboard")
})
router.POST("/api/events", func(c *gin.Context) {
h.HandleEventCreation(c)
Expand All @@ -35,7 +35,7 @@ func (h HttpEventHandler) HandleEventCreation(c *gin.Context) {
return
}

_, errors := h.captin.Execute(event)
_, errors := h.captin.Execute(c, event)
if len(errors) > 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "Error occurred when handling event"})
return
Expand Down
8 changes: 4 additions & 4 deletions examples/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (

"github.com/gin-gonic/gin"

core "github.com/shoplineapp/captin/core"
incoming "github.com/shoplineapp/captin/incoming"
stores "github.com/shoplineapp/captin/internal/stores"
models "github.com/shoplineapp/captin/models"
core "github.com/shoplineapp/captin/v2/core"
incoming "github.com/shoplineapp/captin/v2/incoming"
stores "github.com/shoplineapp/captin/v2/internal/stores"
models "github.com/shoplineapp/captin/v2/models"
)

func main() {
Expand Down
6 changes: 3 additions & 3 deletions examples/api/test/incoming/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"net/http/httptest"
"testing"

. "github.com/shoplineapp/captin/incoming"
interfaces "github.com/shoplineapp/captin/interfaces"
models "github.com/shoplineapp/captin/models"
. "github.com/shoplineapp/captin/v2/incoming"
interfaces "github.com/shoplineapp/captin/v2/interfaces"
models "github.com/shoplineapp/captin/v2/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down
Loading