Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,9 @@ func ProcessInChannel(wg *sync.WaitGroup, scConfig *common.SCConfiguration) {
out.Status = channel.SUCCESS
}
}
scConfig.EventOutCh <- &out
if !common.SendToChannel(scConfig.EventOutCh, &out) {
log.Warningf("EventOutCh full, dropping ack for %s", d.Address)
}
} else if d.Type == channel.STATUS && d.Status == channel.NEW {
log.Warnf("event disabled,no action taken(can't send to a destination): logging new status check %v\n", d)
out := channel.DataChan{
Expand Down
28 changes: 23 additions & 5 deletions cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,30 @@ var (
)

func storeCleanUp() {
_ = scConfig.PubSubAPI.DeleteAllPublishers()
_ = scConfig.PubSubAPI.DeleteAllSubscriptions()
if scConfig != nil && scConfig.PubSubAPI != nil {
log.Info("deleting all publishers")
_ = scConfig.PubSubAPI.DeleteAllPublishers()
}
if scConfig != nil && scConfig.SubscriberAPI != nil {
log.Info("deleting all subscription")
_, _ = scConfig.SubscriberAPI.DeleteAllSubscriptions()
}
}

func TestSidecar_MainWithHTTP(t *testing.T) {
apiPort = 8990
defer storeCleanUp()

// Create a unique temporary directory for this test run to avoid conflicts
tempDir, err := os.MkdirTemp("", "sidecar-test-*")
assert.NoError(t, err)
defer func() {
storeCleanUp()
os.RemoveAll(tempDir) // Clean up temp directory
}()

wg := &sync.WaitGroup{}
pl := plugins.Handler{Path: "../plugins"}
var storePath = "."
var storePath = tempDir
if sPath, ok := os.LookupEnv("STORE_PATH"); ok && sPath != "" {
storePath = sPath
}
Expand All @@ -55,10 +69,14 @@ func TestSidecar_MainWithHTTP(t *testing.T) {
Err: nil,
},
}

// Clean up any existing state before starting the test
storeCleanUp()

log.Infof("Configuration set to %#v", scConfig)

//start rest service
err := common.StartPubSubService(scConfig)
err = common.StartPubSubService(scConfig)
assert.Nil(t, err)

// imitate main process
Expand Down
39 changes: 32 additions & 7 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ import (
log "github.com/sirupsen/logrus"
)

const sendTimeout = 5 * time.Second

// SendToChannel sends data to a channel with a timeout.
// Returns true if the send succeeded, false if the channel was full for the
// entire timeout duration.
func SendToChannel(ch chan *channel.DataChan, d *channel.DataChan) bool {
select {
case ch <- d:
return true
case <-time.After(sendTimeout):
return false
}
}

// TransportType defines transport type supported
type TransportType int

Expand Down Expand Up @@ -324,28 +338,39 @@ func PublishEvent(scConfig *SCConfiguration, e ceevent.Event) error {
func PublishEventViaAPI(scConfig *SCConfiguration, cneEvent ceevent.Event, resourceAddress string) error {
if ceEvent, err := GetPublishingCloudEvent(scConfig, cneEvent); err == nil {
if IsV1Api(scConfig.APIVersion) {
scConfig.EventInCh <- &channel.DataChan{
d := &channel.DataChan{
Type: channel.EVENT,
Status: channel.NEW,
Data: ceEvent,
Address: ceEvent.Source(), // this is the publishing address
ClientID: scConfig.ClientID(),
}
if SendToChannel(scConfig.EventInCh, d) {
log.Debugf("event source %s sent to queue to process", ceEvent.Source())
log.Debugf("event sent %s", cneEvent.JSONString())
localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1)
} else {
log.Warningf("EventInCh full for %s, dropping event for %s", sendTimeout, ceEvent.Source())
localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.FAIL, 1)
}
} else {
// use EventOutCh instead of EventInCh to bypass http transport
scConfig.EventOutCh <- &channel.DataChan{
d := &channel.DataChan{
Type: channel.EVENT,
Status: channel.NEW,
Data: ceEvent,
Address: resourceAddress, // this is the publishing address
ClientID: scConfig.ClientID(),
}
if SendToChannel(scConfig.EventOutCh, d) {
log.Debugf("event source %s sent to queue to process", ceEvent.Source())
log.Debugf("event sent %s", cneEvent.JSONString())
localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1)
} else {
log.Warningf("EventOutCh full for %s, dropping event for %s", sendTimeout, resourceAddress)
localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.FAIL, 1)
}
}

log.Debugf("event source %s sent to queue to process", ceEvent.Source())
log.Debugf("event sent %s", cneEvent.JSONString())

localmetrics.UpdateEventPublishedCount(ceEvent.Source(), localmetrics.SUCCESS, 1)
}
return nil
}
Expand Down
47 changes: 44 additions & 3 deletions pkg/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package common_test
import (
"net"
"testing"
"time"

"github.com/redhat-cne/cloud-event-proxy/pkg/common"
"github.com/redhat-cne/sdk-go/pkg/channel"
ceevent "github.com/redhat-cne/sdk-go/pkg/event"
"github.com/redhat-cne/sdk-go/pkg/types"

v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub"
log "github.com/sirupsen/logrus"

"github.com/redhat-cne/cloud-event-proxy/pkg/common"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -61,3 +63,42 @@ func TestTransportHost_ParseTransportHost(t *testing.T) {
}
}
}

func TestPublishEventViaAPI_NonBlockingWhenChannelFull(t *testing.T) {
// Create a channel with buffer size 1 and fill it
eventOutCh := make(chan *channel.DataChan, 1)
eventOutCh <- &channel.DataChan{} // fill the buffer

pubSubAPI := v1pubsub.GetAPIInstance("/tmp/test-store")
pub, _ := pubSubAPI.CreatePublisher(v1pubsub.NewPubSub(
types.ParseURI("http://localhost/dummy"),
"/test/resource",
"1.0",
))

scConfig := &common.SCConfiguration{
EventOutCh: eventOutCh,
PubSubAPI: pubSubAPI,
}

// Create event with matching publisher ID
event := ceevent.Event{ID: pub.ID}

// PublishEventViaAPI should return after the 5s timeout (not block forever)
// even though the channel is full
done := make(chan struct{})
go func() {
_ = common.PublishEventViaAPI(scConfig, event, "/test/resource")
close(done)
}()

select {
case <-done:
// success — returned after timeout, did not block forever
case <-time.After(10 * time.Second):
t.Fatal("PublishEventViaAPI blocked on full EventOutCh — should return after 5s timeout")
}

// Channel should still have exactly 1 item (the original, not the new one)
assert.Equal(t, 1, len(eventOutCh))
}
31 changes: 31 additions & 0 deletions plugins/ptp_operator/metrics/filesystem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package metrics

import (
"os"
)

type WFiles interface {
WriteFile(name string, data []byte, perm os.FileMode) error
}

type OSFileSystem struct {
}

func (f OSFileSystem) WriteFile(name string, data []byte, perm os.FileMode) error {
return os.WriteFile(name, data, perm)
}

type MockFileSystem struct {
WriteCount int
}

func (m *MockFileSystem) WriteFile(name string, data []byte, perm os.FileMode) error {
m.WriteCount += 1
return nil
}

func (m *MockFileSystem) Clear() {
m.WriteCount = 0
}

var Filesystem WFiles = OSFileSystem{}
Loading
Loading