Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(
e.t.Fatalf("could not stat '%s', err: %s", filepath, err)
}

fileSizeString.WriteString(fmt.Sprint(fi.Size()))
cursorString.WriteString(fmt.Sprint(entry.Cursor.Offset))
fmt.Fprint(&fileSizeString, fi.Size())
fmt.Fprint(&cursorString, entry.Cursor.Offset)

return entry.Cursor.Offset == expectedOffset
},
Expand Down Expand Up @@ -492,9 +492,9 @@ func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
}

var missingEvents []string
for i, found := range foundEvents {
if !found {
missingEvents = append(missingEvents, events[i])
for i, ev := range events {
if !foundEvents[i] {
missingEvents = append(missingEvents, ev)
}
}

Expand Down Expand Up @@ -587,8 +587,23 @@ type mockClient struct {
published []beat.Event
ackHandler beat.EventListener
closed atomic.Bool
mtx sync.Mutex
canceler context.CancelFunc
// publishingStarted is set the first time PublishAll is called. It must
// be readable without holding mtx because PublishAll keeps mtx while
// invoking ackHandler.ACKEvents, which can block (e.g. with a blocking
// ack handler used in TestFilestreamTruncateBlockedOutput).
publishingStarted atomic.Bool
mtx sync.Mutex
// done is closed by cancel() to release a blocked ack handler. Tests call
// cancel either directly (via this client) or via
// mockPipelineConnector.cancelAllClients.
done chan struct{}
cancelOnce sync.Once
}

// cancel releases a blocked ack handler. Safe to call from multiple goroutines
// and idempotent.
func (c *mockClient) cancel() {
// sync.Once guarantees done is closed exactly once; a second close of an
// already-closed channel would panic, so all callers funnel through here.
c.cancelOnce.Do(func() { close(c.done) })
}

// GetEvents returns the published events
Expand All @@ -610,6 +625,12 @@ func (c *mockClient) PublishAll(events []beat.Event) {
defer c.mtx.Unlock()

c.publishing = append(c.publishing, events...)
if len(events) > 0 {
// Only flag as started for non-empty batches so an empty PublishAll
// does not wake waitUntilPublishingHasStarted prematurely (preserving
// the pre-fix semantics of `len(c.publishing) > 0`).
c.publishingStarted.Store(true)
}
for _, event := range events {
c.ackHandler.AddEvent(event, true)
}
Expand All @@ -619,7 +640,7 @@ func (c *mockClient) PublishAll(events []beat.Event) {
}

func (c *mockClient) waitUntilPublishingHasStarted() {
for len(c.publishing) == 0 {
for !c.publishingStarted.Load() {
time.Sleep(10 * time.Millisecond)
}
}
Expand Down Expand Up @@ -667,38 +688,43 @@ func (pc *mockPipelineConnector) ConnectWith(config beat.ClientConfig) (beat.Cli
pc.mtx.Lock()
defer pc.mtx.Unlock()

ctx, cancel := context.WithCancel(context.Background())
c := &mockClient{
canceler: cancel,
ackHandler: newMockACKHandler(ctx, pc.blocking, config),
}

c := newMockClient(pc.blocking, config)
pc.clients = append(pc.clients, c)

return c, nil
}

// newMockClient builds a mockClient whose ack handler (optionally blocking)
// is released by closing the client's done channel via cancel().
func newMockClient(blocking bool, config beat.ClientConfig) *mockClient {
	done := make(chan struct{})
	c := &mockClient{done: done}
	c.ackHandler = newMockACKHandler(done, blocking, config)
	return c
}

// cancelAllClients releases the blocked ack handler of every client that was
// handed out by ConnectWith. cancel() is idempotent, so repeated calls are safe.
func (pc *mockPipelineConnector) cancelAllClients() {
	pc.mtx.Lock()
	defer pc.mtx.Unlock()

	for _, c := range pc.clients {
		c.cancel()
	}
}

func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.EventListener {
func newMockACKHandler(done <-chan struct{}, blocking bool, config beat.ClientConfig) beat.EventListener {
if !blocking {
return config.EventListener
}

return acker.Combine(blockingACKer(starter), config.EventListener)
return acker.Combine(blockingACKer(done), config.EventListener)
}

func blockingACKer(starter context.Context) beat.EventListener {
// blockingACKer blocks the publisher's ack call until done is closed. Tests
// rely on this to hold cursorPublisher.forward in PublishAll long enough to
// observe back-pressure scenarios.
func blockingACKer(done <-chan struct{}) beat.EventListener {
return acker.EventPrivateReporter(func(acked int, private []any) {
for starter.Err() == nil {
}
<-done
})
}

Expand Down
27 changes: 25 additions & 2 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,17 @@ scanner:
defer cancel()

fw := createWatcherWithConfig(t, logptest.NewTestingLogger(t, ""), paths, cfgStr)
go fw.Run(ctx)
// Wait for the watcher goroutine to exit before the subtest returns.
// logptest.NewTestingLogger writes via t.Log, which is unsafe to call
// after the subtest finishes and triggers a data race in
// testing.(*common).destination. The deferred cancel above runs
// before t.Cleanup, so this only needs to wait for Run to return.
runDone := make(chan struct{})
go func() {
defer close(runDone)
fw.Run(ctx)
}()
t.Cleanup(func() { <-runDone })

basename := "created.log"
filename := filepath.Join(dir, basename)
Expand Down Expand Up @@ -384,7 +394,15 @@ scanner:
inMemoryLog, buff := logp.NewInMemoryLocal("", logp.JSONEncoderConfig())
fw := createWatcherWithConfig(t, inMemoryLog, paths, cfgStr)

go fw.Run(ctx)
// Wrap Run so we can wait for the watcher goroutine to exit before
// inspecting the in-memory log buffer. The buffer returned by
// logp.NewInMemoryLocal is goroutine safe for writes only — reading
// it concurrently with watcher logging triggers the race detector.
runDone := make(chan struct{})
go func() {
defer close(runDone)
fw.Run(ctx)
}()

expectedEvents := []loginp.FSEvent{
{
Expand Down Expand Up @@ -427,6 +445,11 @@ scanner:
requireEqualEvents(t, expectedEvents[i], actualEvent)
}

// Stop the watcher and wait for its goroutine to return so the buffer
// is no longer being written to before we read from it.
cancel()
<-runDone

require.NotContainsf(t, buff.String(), "WARN",
"must be no warning messages")
})
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}
env.pipeline.clients[0].waitUntilPublishingHasStarted()
env.pipeline.clients[0].canceler()
env.pipeline.clients[0].cancel()

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogName, id, len(testlines))
Expand Down
74 changes: 56 additions & 18 deletions filebeat/input/filestream/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestProspector_InitCleanIfRemoved(t *testing.T) {
t.Run(name, func(t *testing.T) {
testStore := newMockStoreUpdater(testCase.entries)
p := fileProspector{
logger: logp.L(),
logger: logp.NewNopLogger(),
identifier: mustPathIdentifier(false),
cleanRemoved: testCase.cleanRemoved,
filewatcher: newMockFileWatcherWithFiles(testCase.filesOnDisk),
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestProspector_InitUpdateIdentifiers(t *testing.T) {
t.Run(name, func(t *testing.T) {
testStore := newMockStoreUpdater(testCase.entries)
p := fileProspector{
logger: logp.L(),
logger: logp.NewNopLogger(),
identifier: mustPathIdentifier(false),
filewatcher: newMockFileWatcherWithFiles(testCase.filesOnDisk),
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestMigrateRegistryToFingerprint(t *testing.T) {
}

p := fileProspector{
logger: logp.L(),
logger: logp.NewNopLogger(),
identifier: tc.newIdentifier,
filewatcher: newMockFileWatcherWithFiles(filesOnDisk),
}
Expand Down Expand Up @@ -379,12 +379,12 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) {

t.Run(name, func(t *testing.T) {
p := fileProspector{
logger: logp.L(),
logger: logp.NewNopLogger(),
filewatcher: newMockFileWatcher(test.events, len(test.events)),
identifier: mustPathIdentifier(false),
ignoreOlder: test.ignoreOlder,
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
ctx := input.Context{Logger: logp.NewNopLogger(), Cancelation: context.Background()}
hg := newTestHarvesterGroup()

p.Run(ctx, newMockMetadataUpdater(), hg)
Expand Down Expand Up @@ -417,12 +417,12 @@ func TestProspectorHarvesterUpdateIgnoredFiles(t *testing.T) {

filewatcher := newMockFileWatcher([]loginp.FSEvent{eventCreate}, 2)
p := fileProspector{
logger: logp.L(),
logger: logp.NewNopLogger(),
filewatcher: filewatcher,
identifier: mustPathIdentifier(false),
ignoreOlder: 10 * time.Second,
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
ctx := input.Context{Logger: logp.NewNopLogger(), Cancelation: context.Background()}
hg := newTestHarvesterGroup()
testStore := newMockMetadataUpdater()
var wg sync.WaitGroup
Expand Down Expand Up @@ -482,12 +482,12 @@ func TestProspectorDeletedFile(t *testing.T) {

t.Run(name, func(t *testing.T) {
p := fileProspector{
logger: logp.L(),
logger: logp.NewNopLogger(),
filewatcher: newMockFileWatcher(test.events, len(test.events)),
identifier: mustPathIdentifier(false),
cleanRemoved: test.cleanRemoved,
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
ctx := input.Context{Logger: logp.NewNopLogger(), Cancelation: context.Background()}

testStore := newMockMetadataUpdater()
testStore.set("path::/path/to/file")
Expand Down Expand Up @@ -564,12 +564,12 @@ func TestProspectorRenamedFile(t *testing.T) {

t.Run(name, func(t *testing.T) {
p := fileProspector{
logger: logp.L(),
logger: logp.NewNopLogger(),
filewatcher: newMockFileWatcher(test.events, len(test.events)),
identifier: mustPathIdentifier(test.trackRename),
stateChangeCloser: stateChangeCloserConfig{Renamed: test.closeRenamed},
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
ctx := input.Context{Logger: logp.NewNopLogger(), Cancelation: context.Background()}

testStore := newMockMetadataUpdater()
testStore.set("path::/old/path/to/file")
Expand Down Expand Up @@ -700,8 +700,14 @@ func (m *mockFileWatcher) NotifyChan() chan loginp.HarvesterStatus {
return m.c
}

// mockMetadataUpdater is a test implementation of loginp.MetadataUpdater whose
// methods may be invoked from the prospector's goroutines while the test
// goroutine inspects the stored state (e.g. via assert.Eventually). Read paths
// dominate (assert.Eventually polls), so an RWMutex is used to allow
// concurrent reads.
type mockMetadataUpdater struct {
// mu guards table against concurrent access from prospector and test goroutines.
mu sync.RWMutex
// table maps a source id (e.g. "path::/path/to/file") to its stored metadata.
table map[string]any
}

func newMockMetadataUpdater() *mockMetadataUpdater {
Expand All @@ -710,14 +716,38 @@ func newMockMetadataUpdater() *mockMetadataUpdater {
}
}

// set marks id as present in the store with an empty placeholder value.
func (mu *mockMetadataUpdater) set(id string) {
	// Delegate to setRaw so locking lives in exactly one write path.
	mu.setRaw(id, struct{}{})
}

// setRaw stores an arbitrary value under id. Used by tests that pre-populate
// the store before running the prospector.
func (mu *mockMetadataUpdater) setRaw(id string, v any) {
	mu.mu.Lock()
	mu.table[id] = v
	mu.mu.Unlock()
}

// get returns the raw value stored under id (nil if absent). Used by tests
// that need to inspect the stored value after the prospector has run.
func (mu *mockMetadataUpdater) get(id string) any {
	mu.mu.RLock()
	v := mu.table[id]
	mu.mu.RUnlock()
	return v
}

// has reports whether any value is stored under id.
func (mu *mockMetadataUpdater) has(id string) bool {
	mu.mu.RLock()
	_, ok := mu.table[id]
	mu.mu.RUnlock()
	return ok
}

func (mu *mockMetadataUpdater) checkOffset(id string, offset int64) bool {
mu.mu.RLock()
defer mu.mu.RUnlock()
c, ok := mu.table[id]
if !ok {
return false
Expand All @@ -729,25 +759,33 @@ func (mu *mockMetadataUpdater) checkOffset(id string, offset int64) bool {
return cursor.Offset == offset
}

func (mu *mockMetadataUpdater) FindCursorMeta(s loginp.Source, v interface{}) error {
func (mu *mockMetadataUpdater) FindCursorMeta(s loginp.Source, v any) error {
mu.mu.RLock()
defer mu.mu.RUnlock()
meta, ok := mu.table[s.Name()]
if !ok {
return fmt.Errorf("no such id [%q]", s.Name())
}
return typeconv.Convert(v, meta)
}

func (mu *mockMetadataUpdater) ResetCursor(s loginp.Source, cur interface{}) error {
func (mu *mockMetadataUpdater) ResetCursor(s loginp.Source, cur any) error {
mu.mu.Lock()
defer mu.mu.Unlock()
mu.table[s.Name()] = cur
return nil
}

func (mu *mockMetadataUpdater) UpdateMetadata(s loginp.Source, v interface{}) error {
func (mu *mockMetadataUpdater) UpdateMetadata(s loginp.Source, v any) error {
mu.mu.Lock()
defer mu.mu.Unlock()
mu.table[s.Name()] = v
return nil
}

// Remove deletes any value stored for s. Deleting an absent key is a no-op,
// so Remove always succeeds.
func (mu *mockMetadataUpdater) Remove(s loginp.Source) error {
	mu.mu.Lock()
	delete(mu.table, s.Name())
	mu.mu.Unlock()
	return nil
}
Expand Down Expand Up @@ -867,13 +905,13 @@ func TestOnRenameFileIdentity(t *testing.T) {

testStore := newMockMetadataUpdater()
if tc.populateStore {
testStore.table[id] = fileMeta{Source: path, IdentifierName: expectedIdentifier}
testStore.setRaw(id, fileMeta{Source: path, IdentifierName: expectedIdentifier})
}

hg := newTestHarvesterGroup()
p.Run(ctx, testStore, hg)

got := testStore.table[id]
got := testStore.get(id)
meta := fileMeta{}
typeconv.Convert(&meta, got)

Expand Down
Loading
Loading