Describe the bug
The TAPA service can enter a deadlock that blocks lifecycle management of Streams. Symptoms include new Tap.Open RPC requests hanging indefinitely and existing Streams no longer being cleaned up or updated, which leaves the service generally unresponsive.
The root cause is lock contention between two code paths that run in parallel:
- Stream Activation Path: StreamManager waits for a Stream to be confirmed by NSP before opening it. Once the confirmation arrives, StreamManager starts opening the pending Streams. As part of this, each Stream.Open operation fetches the local IPs from the Conduit using Conduit.GetIPs().
- Connection Monitoring & Cleanup Path: In parallel, the Conduit monitors its underlying network connection. When it detects an event indicating an IP address change, the Conduit starts a cleanup/reconfiguration sequence. Crucially, the first step of this sequence stops the StreamManager (c.StreamManager.Stop()) while the Conduit is still holding its own mutex, c.mu.
The result is a classic circular dependency:
- The Stream.Open worker goroutine holds s.mu and is blocked waiting for c.mu (via its call to Conduit.GetIPs()).
- Conversely, the Conduit.monitorConnection goroutine holds c.mu (it started the cleanup sequence while holding this lock) and indirectly waits for s.mu (via the Stream.Close workers it spawned, which need s.mu to complete).
This deadlock is highly likely to be triggered when a Stream is opened via the TAP API and doing so involves establishing a Conduit connection. Specifically, it occurs when updating the connection with VIP addresses prompts a connection monitor event that observes a change in local IPs.
A possible underlying cause for such a local IP update is a faulty Proxy that lacks proper bridge IPs. This confuses the TAPA logic that determines local IPs and triggers spurious updates or reconfigurations.
To Reproduce
Steps to reproduce the behavior:
I modified the proxy code so that it never succeeds in obtaining a bridge IP. This ensures that on the TAPA side, once the VIP update takes place for a recently established Conduit connection, the localIPs are replaced by an empty list.
- Deploy a Trench with 2 Conduits each with at least 1 Stream and Flow.
- Deploy example targets.
- In the example target pods, every 30 seconds either try to open Streams within the two Conduits or try to close all opened Streams, depending on their current status.
- When the problem is reproduced, the open/close TAP API call hangs.
Context
Logs
Conduit load-balancer-b1 with Stream stream-b1
Conduit load-balancer-a1 with Streams stream-a1 and stream-a2
#578 (comment)
(pprof) traces
File: tapa
Type: goroutine
Time: 2025-06-25 13:58:39 CEST
-----------+-------------------------------------------------------
18 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run
-----------+-------------------------------------------------------
9 runtime.gopark
runtime.selectgo
google.golang.org/grpc.newClientStreamWithParams.func4
-----------+-------------------------------------------------------
5 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*controlBuffer).get
google.golang.org/grpc/internal/transport.(*loopyWriter).run
google.golang.org/grpc/internal/transport.newHTTP2Client.func6
-----------+-------------------------------------------------------
3 runtime.gopark
runtime.netpollblock
internal/poll.runtime_pollWait
internal/poll.(*pollDesc).wait
internal/poll.(*pollDesc).waitRead (inline)
internal/poll.(*FD).ReadMsg
net.(*netFD).readMsg
net.(*UnixConn).readMsg
net.(*UnixConn).ReadMsgUnix
github.com/edwarnicke/grpcfd.(*connWrap).Read
crypto/tls.(*atLeastReader).Read
bytes.(*Buffer).ReadFrom
crypto/tls.(*Conn).readFromUntil
crypto/tls.(*Conn).readRecordOrCCS
crypto/tls.(*Conn).readRecord (inline)
crypto/tls.(*Conn).Read
bufio.(*Reader).Read
io.ReadAtLeast
io.ReadFull (inline)
golang.org/x/net/http2.readFrameHeader
golang.org/x/net/http2.(*Framer).ReadFrame
google.golang.org/grpc/internal/transport.(*http2Client).reader
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.chanrecv
runtime.chanrecv1
github.com/networkservicemesh/sdk/pkg/networkservice/common/dial.(*dialer).Dial.func1
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.selectgo
context.(*cancelCtx).propagateCancel.func2
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
google.golang.org/grpc/internal/transport.(*transportReader).Read
io.ReadAtLeast
io.ReadFull (inline)
google.golang.org/grpc/internal/transport.(*Stream).Read
google.golang.org/grpc.(*parser).recvMsg
google.golang.org/grpc.recvAndDecompress
google.golang.org/grpc.recv
google.golang.org/grpc.(*csAttempt).recvMsg
google.golang.org/grpc.(*clientStream).RecvMsg.func1
google.golang.org/grpc.(*clientStream).withRetry
google.golang.org/grpc.(*clientStream).RecvMsg
github.com/networkservicemesh/api/pkg/api/networkservice.(*monitorConnectionMonitorConnectionsClient).Recv
github.com/networkservicemesh/sdk/pkg/networkservice/common/heal.(*clientFilter).Recv
github.com/networkservicemesh/sdk/pkg/networkservice/common/heal.(*eventLoop).monitorCtrlPlane.func1
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
google.golang.org/grpc/internal/transport.(*transportReader).Read
io.ReadAtLeast
io.ReadFull (inline)
google.golang.org/grpc/internal/transport.(*Stream).Read
google.golang.org/grpc.(*parser).recvMsg
google.golang.org/grpc.recvAndDecompress
google.golang.org/grpc.recv
google.golang.org/grpc.(*csAttempt).recvMsg
google.golang.org/grpc.(*clientStream).RecvMsg.func1
google.golang.org/grpc.(*clientStream).withRetry
google.golang.org/grpc.(*clientStream).RecvMsg
github.com/nordix/meridio/api/nsp/v1.(*configurationManagerWatchStreamClient).Recv
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*configurationImpl).watchStreams.func1
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*configurationImpl).watchStreams
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
google.golang.org/grpc/internal/transport.(*transportReader).Read
io.ReadAtLeast
io.ReadFull (inline)
google.golang.org/grpc/internal/transport.(*Stream).Read
google.golang.org/grpc.(*parser).recvMsg
google.golang.org/grpc.recvAndDecompress
google.golang.org/grpc.recv
google.golang.org/grpc.(*csAttempt).recvMsg
google.golang.org/grpc.(*clientStream).RecvMsg.func1
google.golang.org/grpc.(*clientStream).withRetry
google.golang.org/grpc.(*clientStream).RecvMsg
github.com/nordix/meridio/api/nsp/v1.(*configurationManagerWatchFlowClient).Recv
github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).watchVIPs.func1
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).watchVIPs
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.selectgo
github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh.(*refreshClient).Request.func1
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.selectgo
github.com/networkservicemesh/sdk/pkg/networkservice/common/heal.(*eventLoop).waitForEvents
github.com/networkservicemesh/sdk/pkg/networkservice/common/heal.(*eventLoop).eventLoop
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.goparkunlock (inline)
runtime.semacquire1
sync.runtime_SemacquireMutex
sync.(*Mutex).lockSlow
sync.(*Mutex).Lock (inline)
github.com/nordix/meridio/pkg/ambassador/tap/stream.(*Stream).Close
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Close
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamManager).Stop.func1
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.goparkunlock (inline)
runtime.semacquire1
sync.runtime_SemacquireMutex
sync.(*Mutex).lockSlow
sync.(*Mutex).Lock (inline)
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).GetIPs
github.com/nordix/meridio/pkg/ambassador/tap/stream.(*Stream).Open
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1.1.1
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1.1
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1
-----------+-------------------------------------------------------
2 runtime.gopark
runtime.goparkunlock (inline)
runtime.semacquire1
sync.runtime_SemacquireMutex
sync.(*Mutex).lockSlow
sync.(*Mutex).Lock (inline)
github.com/nordix/meridio/pkg/ambassador/tap.(*Tap).Close.func1
-----------+-------------------------------------------------------
1 runtime.notetsleepg
os/signal.signal_recv
os/signal.loop
-----------+-------------------------------------------------------
1 runtime.goroutineProfileWithLabels
runtime/pprof.runtime_goroutineProfileWithLabels
runtime/pprof.writeRuntimeProfile
runtime/pprof.writeGoroutine
runtime/pprof.(*Profile).WriteTo
net/http/pprof.handler.ServeHTTP
net/http/pprof.Index
net/http.HandlerFunc.ServeHTTP
net/http.(*ServeMux).ServeHTTP
net/http.serverHandler.ServeHTTP
net/http.(*conn).serve
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.chanrecv
runtime.chanrecv1
github.com/nordix/meridio/pkg/health.(*Checker).Start.func2
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.chanrecv
runtime.chanrecv1
main.main
runtime.main
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.netpollblock
internal/poll.runtime_pollWait
internal/poll.(*pollDesc).wait
internal/poll.(*pollDesc).waitRead (inline)
internal/poll.(*FD).Read
net.(*netFD).Read
net.(*conn).Read
crypto/tls.(*atLeastReader).Read
bytes.(*Buffer).ReadFrom
crypto/tls.(*Conn).readFromUntil
crypto/tls.(*Conn).readRecordOrCCS
crypto/tls.(*Conn).readRecord (inline)
crypto/tls.(*Conn).Read
bufio.(*Reader).Read
io.ReadAtLeast
io.ReadFull (inline)
golang.org/x/net/http2.readFrameHeader
golang.org/x/net/http2.(*Framer).ReadFrame
google.golang.org/grpc/internal/transport.(*http2Client).reader
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.netpollblock
internal/poll.runtime_pollWait
internal/poll.(*pollDesc).wait
internal/poll.(*pollDesc).waitRead (inline)
internal/poll.(*FD).Read
net.(*netFD).Read
net.(*conn).Read
bufio.(*Reader).Read
io.ReadAtLeast
io.ReadFull (inline)
golang.org/x/net/http2.readFrameHeader
golang.org/x/net/http2.(*Framer).ReadFrame
google.golang.org/grpc/internal/transport.(*http2Client).reader
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.netpollblock
internal/poll.runtime_pollWait
internal/poll.(*pollDesc).wait
internal/poll.(*pollDesc).waitRead (inline)
internal/poll.(*FD).Read
net.(*netFD).Read
net.(*conn).Read
bufio.(*Reader).Read
io.ReadAtLeast
io.ReadFull (inline)
golang.org/x/net/http2.readFrameHeader
golang.org/x/net/http2.(*Framer).ReadFrame
google.golang.org/grpc/internal/transport.(*http2Server).HandleStreams
google.golang.org/grpc.(*Server).serveStreams
google.golang.org/grpc.(*Server).handleRawConn.func1
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.netpollblock
internal/poll.runtime_pollWait
internal/poll.(*pollDesc).wait
internal/poll.(*pollDesc).waitRead (inline)
internal/poll.(*FD).Read
net.(*netFD).Read
net.(*conn).Read
net/http.(*connReader).backgroundRead
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.netpollblock
internal/poll.runtime_pollWait
internal/poll.(*pollDesc).wait
internal/poll.(*pollDesc).waitRead (inline)
internal/poll.(*FD).Accept
net.(*netFD).accept
net.(*TCPListener).accept
net.(*TCPListener).Accept
net/http.(*Server).Serve
net/http.(*Server).ListenAndServe
net/http.ListenAndServe (inline)
main.main.func1
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.netpollblock
internal/poll.runtime_pollWait
internal/poll.(*pollDesc).wait
internal/poll.(*pollDesc).waitRead (inline)
internal/poll.(*FD).Accept
net.(*netFD).accept
net.(*UnixListener).accept
net.(*UnixListener).Accept
google.golang.org/grpc.(*Server).Serve
github.com/nordix/meridio/pkg/health.(*Checker).Start
github.com/nordix/meridio/pkg/health.CreateChecker.func1
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.netpollblock
internal/poll.runtime_pollWait
internal/poll.(*pollDesc).wait
internal/poll.(*pollDesc).waitRead (inline)
internal/poll.(*FD).Accept
net.(*netFD).accept
net.(*UnixListener).accept
net.(*UnixListener).Accept
google.golang.org/grpc.(*Server).Serve
main.main.func3
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
os/signal.NotifyContext.func1
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*controlBuffer).get
google.golang.org/grpc/internal/transport.(*loopyWriter).run
google.golang.org/grpc/internal/transport.NewServerTransport.func2
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*http2Client).keepalive
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*http2Server).keepalive
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
google.golang.org/grpc/internal/transport.(*transportReader).Read
io.ReadAtLeast
io.ReadFull (inline)
google.golang.org/grpc/internal/transport.(*Stream).Read
google.golang.org/grpc.(*parser).recvMsg
google.golang.org/grpc.recvAndDecompress
google.golang.org/grpc.recv
google.golang.org/grpc.(*csAttempt).recvMsg
google.golang.org/grpc.(*clientStream).RecvMsg.func1
google.golang.org/grpc.(*clientStream).withRetry
google.golang.org/grpc.(*clientStream).RecvMsg
github.com/networkservicemesh/api/pkg/api/networkservice.(*monitorConnectionMonitorConnectionsClient).Recv
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).monitorConnection.func1
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).monitorConnection
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
google.golang.org/grpc/internal/transport.(*transportReader).Read
io.ReadAtLeast
io.ReadFull (inline)
google.golang.org/grpc/internal/transport.(*Stream).Read
google.golang.org/grpc.(*parser).recvMsg
google.golang.org/grpc.recvAndDecompress
google.golang.org/grpc.recv
google.golang.org/grpc.(*csAttempt).recvMsg
google.golang.org/grpc.(*clientStream).RecvMsg.func1
google.golang.org/grpc.(*clientStream).withRetry
google.golang.org/grpc.(*clientStream).RecvMsg
github.com/spiffe/go-spiffe/v2/proto/spiffe/workload.(*spiffeWorkloadAPIFetchX509SVIDClient).Recv
github.com/spiffe/go-spiffe/v2/workloadapi.(*Client).watchX509Context
github.com/spiffe/go-spiffe/v2/workloadapi.(*Client).WatchX509Context
github.com/spiffe/go-spiffe/v2/workloadapi.newWatcher.func4
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
github.com/nordix/meridio/pkg/retry.delay.func1
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1.1
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*configurationImpl).streamHandler
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).vipHandler
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.selectgo
github.com/nordix/meridio/pkg/health/probe.CreateAndRunGRPCHealthProbe.func2
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.goparkunlock (inline)
runtime.semacquire1
sync.runtime_Semacquire
sync.(*WaitGroup).Wait
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamManager).Stop
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).monitorConnection.func1
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).monitorConnection
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.goparkunlock (inline)
runtime.semacquire1
sync.runtime_SemacquireMutex
sync.(*Mutex).lockSlow
sync.(*Mutex).Lock (inline)
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).SetVIPs
github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).vipHandler.func1
github.com/nordix/meridio/pkg/retry.Do
github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).vipHandler
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.goparkunlock (inline)
runtime.semacquire1
sync.runtime_SemacquireMutex
sync.(*Mutex).lockSlow
sync.(*Mutex).Lock (inline)
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamManager).RemoveStream
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).RemoveStream
github.com/nordix/meridio/pkg/ambassador/tap.(*Tap).Close.func1
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.goparkunlock (inline)
runtime.semacquire1
sync.runtime_SemacquireMutex
sync.(*Mutex).lockSlow
sync.(*Mutex).Lock (inline)
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamManager).SetStreams
github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*configurationImpl).streamHandler
-----------+-------------------------------------------------------
1 runtime.gopark
runtime.goparkunlock (inline)
runtime.semacquire1
sync.runtime_SemacquireMutex
sync.(*Mutex).lockSlow
sync.(*Mutex).Lock (inline)
github.com/nordix/meridio/pkg/ambassador/tap.(*Tap).Open
github.com/nordix/meridio/api/ambassador/v1._Tap_Open_Handler
google.golang.org/grpc.(*Server).processUnaryRPC
google.golang.org/grpc.(*Server).handleStream
google.golang.org/grpc.(*Server).serveStreams.func2.1
-----------+-------------------------------------------------------
(pprof)
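The trace above was taken through the net/http/pprof handler. Where that endpoint is not reachable, a comparable dump could be captured in-process with the standard runtime/pprof package. The sketch below is an illustration only: goroutineDump is a hypothetical helper, and the blocked goroutine in main exists purely to give the dump something to show.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
	"sync"
	"time"
)

// goroutineDump returns the stacks of all goroutines as text.
func goroutineDump() (string, error) {
	var buf bytes.Buffer
	// debug=2 prints the goroutine stacks in the same form a Go program
	// uses when dying due to an unrecovered panic.
	if err := pprof.Lookup("goroutine").WriteTo(&buf, 2); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	var mu sync.Mutex
	mu.Lock() // never released, so the goroutine below stays blocked

	go func() {
		mu.Lock() // blocks on the mutex held by main
		defer mu.Unlock()
	}()

	// Give the goroutine time to reach the blocking Lock call.
	time.Sleep(100 * time.Millisecond)

	dump, err := goroutineDump()
	if err != nil {
		fmt.Println("dump failed:", err)
		return
	}
	fmt.Print(dump)
}
```

In such a dump, goroutines parked in a mutex Lock call, like the Stream.Close, Conduit.GetIPs and Tap.Open frames in the trace above, are the first candidates to inspect when a deadlock is suspected.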