
TAPA: Deadlock due to interleaving Stream Management and Connection Monitoring #578

@zolug

Describe the bug
The TAPA service is experiencing a critical deadlock that prevents proper lifecycle management of Streams. The issue manifests, for example, as new Tap.Open RPC requests hanging indefinitely, along with a failure to clean up or update existing Streams, leading to general system unresponsiveness.

The core of the problem lies in a contention between two parallel critical paths:

  1. Stream Activation Path: StreamManager is designed to wait for a Stream to be confirmed by NSP before opening it. Once this confirmation is received, StreamManager starts opening the pending Streams. As part of this, each Stream.Open operation fetches the local IPs from the Conduit via Conduit.GetIPs() while holding the Stream's own mutex, s.mu.

  2. Connection Monitoring & Cleanup Path: In parallel, the Conduit monitors its underlying network connection. When it detects an event indicating a change in local IP addresses, it starts a cleanup/reconfiguration sequence. Crucially, the first step of this sequence is stopping the StreamManager (c.StreamManager.Stop()), which the Conduit does while already holding its own mutex, c.mu.

Together, these two paths form a classic circular dependency:

  • The Stream.Open worker goroutine holds s.mu and is blocked waiting for c.mu (via its call to Conduit.GetIPs()).
  • Conversely, the Conduit.monitorConnection goroutine holds c.mu (it started the cleanup sequence while holding this lock) and is blocked in streamManager.Stop, waiting on a sync.WaitGroup for the Stream.Close workers it spawned; those workers in turn need s.mu to complete.

This deadlock is highly likely to be triggered when opening a Stream via the TAP API involves establishing a Conduit connection. Specifically, the trigger is a scenario in which updating the connection with VIP addresses prompts a connection monitor event that observes a change in local IPs.
A potential underlying cause of such a local IP update is a faulty Proxy that lacks proper bridge IPs, which confuses the TAPA-side logic for determining local IPs and thus triggers spurious updates or reconfigurations.
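To make the trigger concrete: if a monitor event is treated as a local IP change whenever its set of local IPs differs from the current one, then an event carrying an empty list, as produced when the Proxy has no bridge IPs, always counts as a change and starts the cleanup path. The helper below is hypothetical; the issue does not show how TAPA actually compares local IPs, and the addresses are placeholders.

```go
package main

import "fmt"

// toSet turns a list of IPs into a set, dropping duplicates.
func toSet(ips []string) map[string]struct{} {
	set := make(map[string]struct{}, len(ips))
	for _, ip := range ips {
		set[ip] = struct{}{}
	}
	return set
}

// localIPsChanged is a hypothetical helper: it reports whether the local IPs
// observed in a connection monitor event differ from the current ones,
// ignoring order and duplicates.
func localIPsChanged(current, observed []string) bool {
	a, b := toSet(current), toSet(observed)
	if len(a) != len(b) {
		return true
	}
	for ip := range a {
		if _, ok := b[ip]; !ok {
			return true
		}
	}
	return false
}

func main() {
	current := []string{"172.16.0.2/24", "fd00::2/64"}
	fmt.Println(localIPsChanged(current, []string{"fd00::2/64", "172.16.0.2/24"})) // prints "false": same set
	fmt.Println(localIPsChanged(current, nil))                                     // prints "true": list emptied
}
```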

To Reproduce
Steps to reproduce the behavior:
I modified the Proxy code so that it never succeeds in obtaining a bridge IP. This ensures that on the TAPA side, once a VIP update takes place (for a recently established Conduit connection), the local IPs are replaced by an empty list.

  1. Deploy a Trench with 2 Conduits each with at least 1 Stream and Flow.
  2. Deploy example targets.
  3. In the example target PODs, every 30 seconds either try to open the Streams in the two Conduits or try to close all opened Streams, depending on their current status.
  4. When the problem reproduces, the TAP API close/open call hangs.

Context

  • Meridio: v1.1.4

Logs
Conduit load-balancer-b1 with Stream stream-b1
Conduit load-balancer-a1 with Streams stream-a1 and stream-a2
#578 (comment)

(pprof) traces
File: tapa
Type: goroutine
Time: 2025-06-25 13:58:39 CEST
-----------+-------------------------------------------------------
        18   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run
-----------+-------------------------------------------------------
         9   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc.newClientStreamWithParams.func4
-----------+-------------------------------------------------------
         5   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*controlBuffer).get
             google.golang.org/grpc/internal/transport.(*loopyWriter).run
             google.golang.org/grpc/internal/transport.newHTTP2Client.func6
-----------+-------------------------------------------------------
         3   runtime.gopark
             runtime.netpollblock
             internal/poll.runtime_pollWait
             internal/poll.(*pollDesc).wait
             internal/poll.(*pollDesc).waitRead (inline)
             internal/poll.(*FD).ReadMsg
             net.(*netFD).readMsg
             net.(*UnixConn).readMsg
             net.(*UnixConn).ReadMsgUnix
             github.com/edwarnicke/grpcfd.(*connWrap).Read
             crypto/tls.(*atLeastReader).Read
             bytes.(*Buffer).ReadFrom
             crypto/tls.(*Conn).readFromUntil
             crypto/tls.(*Conn).readRecordOrCCS
             crypto/tls.(*Conn).readRecord (inline)
             crypto/tls.(*Conn).Read
             bufio.(*Reader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             golang.org/x/net/http2.readFrameHeader
             golang.org/x/net/http2.(*Framer).ReadFrame
             google.golang.org/grpc/internal/transport.(*http2Client).reader
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.chanrecv
             runtime.chanrecv1
             github.com/networkservicemesh/sdk/pkg/networkservice/common/dial.(*dialer).Dial.func1
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.selectgo
             context.(*cancelCtx).propagateCancel.func2
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
             google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
             google.golang.org/grpc/internal/transport.(*transportReader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             google.golang.org/grpc/internal/transport.(*Stream).Read
             google.golang.org/grpc.(*parser).recvMsg
             google.golang.org/grpc.recvAndDecompress
             google.golang.org/grpc.recv
             google.golang.org/grpc.(*csAttempt).recvMsg
             google.golang.org/grpc.(*clientStream).RecvMsg.func1
             google.golang.org/grpc.(*clientStream).withRetry
             google.golang.org/grpc.(*clientStream).RecvMsg
             github.com/networkservicemesh/api/pkg/api/networkservice.(*monitorConnectionMonitorConnectionsClient).Recv
             github.com/networkservicemesh/sdk/pkg/networkservice/common/heal.(*clientFilter).Recv
             github.com/networkservicemesh/sdk/pkg/networkservice/common/heal.(*eventLoop).monitorCtrlPlane.func1
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
             google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
             google.golang.org/grpc/internal/transport.(*transportReader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             google.golang.org/grpc/internal/transport.(*Stream).Read
             google.golang.org/grpc.(*parser).recvMsg
             google.golang.org/grpc.recvAndDecompress
             google.golang.org/grpc.recv
             google.golang.org/grpc.(*csAttempt).recvMsg
             google.golang.org/grpc.(*clientStream).RecvMsg.func1
             google.golang.org/grpc.(*clientStream).withRetry
             google.golang.org/grpc.(*clientStream).RecvMsg
             github.com/nordix/meridio/api/nsp/v1.(*configurationManagerWatchStreamClient).Recv
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*configurationImpl).watchStreams.func1
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*configurationImpl).watchStreams
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
             google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
             google.golang.org/grpc/internal/transport.(*transportReader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             google.golang.org/grpc/internal/transport.(*Stream).Read
             google.golang.org/grpc.(*parser).recvMsg
             google.golang.org/grpc.recvAndDecompress
             google.golang.org/grpc.recv
             google.golang.org/grpc.(*csAttempt).recvMsg
             google.golang.org/grpc.(*clientStream).RecvMsg.func1
             google.golang.org/grpc.(*clientStream).withRetry
             google.golang.org/grpc.(*clientStream).RecvMsg
             github.com/nordix/meridio/api/nsp/v1.(*configurationManagerWatchFlowClient).Recv
             github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).watchVIPs.func1
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).watchVIPs
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.selectgo
             github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh.(*refreshClient).Request.func1
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.selectgo
             github.com/networkservicemesh/sdk/pkg/networkservice/common/heal.(*eventLoop).waitForEvents
             github.com/networkservicemesh/sdk/pkg/networkservice/common/heal.(*eventLoop).eventLoop
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.goparkunlock (inline)
             runtime.semacquire1
             sync.runtime_SemacquireMutex
             sync.(*Mutex).lockSlow
             sync.(*Mutex).Lock (inline)
             github.com/nordix/meridio/pkg/ambassador/tap/stream.(*Stream).Close
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Close
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamManager).Stop.func1
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.goparkunlock (inline)
             runtime.semacquire1
             sync.runtime_SemacquireMutex
             sync.(*Mutex).lockSlow
             sync.(*Mutex).Lock (inline)
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).GetIPs
             github.com/nordix/meridio/pkg/ambassador/tap/stream.(*Stream).Open
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1.1.1
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1.1
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1
-----------+-------------------------------------------------------
         2   runtime.gopark
             runtime.goparkunlock (inline)
             runtime.semacquire1
             sync.runtime_SemacquireMutex
             sync.(*Mutex).lockSlow
             sync.(*Mutex).Lock (inline)
             github.com/nordix/meridio/pkg/ambassador/tap.(*Tap).Close.func1
-----------+-------------------------------------------------------
         1   runtime.notetsleepg
             os/signal.signal_recv
             os/signal.loop
-----------+-------------------------------------------------------
         1   runtime.goroutineProfileWithLabels
             runtime/pprof.runtime_goroutineProfileWithLabels
             runtime/pprof.writeRuntimeProfile
             runtime/pprof.writeGoroutine
             runtime/pprof.(*Profile).WriteTo
             net/http/pprof.handler.ServeHTTP
             net/http/pprof.Index
             net/http.HandlerFunc.ServeHTTP
             net/http.(*ServeMux).ServeHTTP
             net/http.serverHandler.ServeHTTP
             net/http.(*conn).serve
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.chanrecv
             runtime.chanrecv1
             github.com/nordix/meridio/pkg/health.(*Checker).Start.func2
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.chanrecv
             runtime.chanrecv1
             main.main
             runtime.main
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.netpollblock
             internal/poll.runtime_pollWait
             internal/poll.(*pollDesc).wait
             internal/poll.(*pollDesc).waitRead (inline)
             internal/poll.(*FD).Read
             net.(*netFD).Read
             net.(*conn).Read
             crypto/tls.(*atLeastReader).Read
             bytes.(*Buffer).ReadFrom
             crypto/tls.(*Conn).readFromUntil
             crypto/tls.(*Conn).readRecordOrCCS
             crypto/tls.(*Conn).readRecord (inline)
             crypto/tls.(*Conn).Read
             bufio.(*Reader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             golang.org/x/net/http2.readFrameHeader
             golang.org/x/net/http2.(*Framer).ReadFrame
             google.golang.org/grpc/internal/transport.(*http2Client).reader
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.netpollblock
             internal/poll.runtime_pollWait
             internal/poll.(*pollDesc).wait
             internal/poll.(*pollDesc).waitRead (inline)
             internal/poll.(*FD).Read
             net.(*netFD).Read
             net.(*conn).Read
             bufio.(*Reader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             golang.org/x/net/http2.readFrameHeader
             golang.org/x/net/http2.(*Framer).ReadFrame
             google.golang.org/grpc/internal/transport.(*http2Client).reader
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.netpollblock
             internal/poll.runtime_pollWait
             internal/poll.(*pollDesc).wait
             internal/poll.(*pollDesc).waitRead (inline)
             internal/poll.(*FD).Read
             net.(*netFD).Read
             net.(*conn).Read
             bufio.(*Reader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             golang.org/x/net/http2.readFrameHeader
             golang.org/x/net/http2.(*Framer).ReadFrame
             google.golang.org/grpc/internal/transport.(*http2Server).HandleStreams
             google.golang.org/grpc.(*Server).serveStreams
             google.golang.org/grpc.(*Server).handleRawConn.func1
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.netpollblock
             internal/poll.runtime_pollWait
             internal/poll.(*pollDesc).wait
             internal/poll.(*pollDesc).waitRead (inline)
             internal/poll.(*FD).Read
             net.(*netFD).Read
             net.(*conn).Read
             net/http.(*connReader).backgroundRead
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.netpollblock
             internal/poll.runtime_pollWait
             internal/poll.(*pollDesc).wait
             internal/poll.(*pollDesc).waitRead (inline)
             internal/poll.(*FD).Accept
             net.(*netFD).accept
             net.(*TCPListener).accept
             net.(*TCPListener).Accept
             net/http.(*Server).Serve
             net/http.(*Server).ListenAndServe
             net/http.ListenAndServe (inline)
             main.main.func1
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.netpollblock
             internal/poll.runtime_pollWait
             internal/poll.(*pollDesc).wait
             internal/poll.(*pollDesc).waitRead (inline)
             internal/poll.(*FD).Accept
             net.(*netFD).accept
             net.(*UnixListener).accept
             net.(*UnixListener).Accept
             google.golang.org/grpc.(*Server).Serve
             github.com/nordix/meridio/pkg/health.(*Checker).Start
             github.com/nordix/meridio/pkg/health.CreateChecker.func1
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.netpollblock
             internal/poll.runtime_pollWait
             internal/poll.(*pollDesc).wait
             internal/poll.(*pollDesc).waitRead (inline)
             internal/poll.(*FD).Accept
             net.(*netFD).accept
             net.(*UnixListener).accept
             net.(*UnixListener).Accept
             google.golang.org/grpc.(*Server).Serve
             main.main.func3
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             os/signal.NotifyContext.func1
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*controlBuffer).get
             google.golang.org/grpc/internal/transport.(*loopyWriter).run
             google.golang.org/grpc/internal/transport.NewServerTransport.func2
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*http2Client).keepalive
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*http2Server).keepalive
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
             google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
             google.golang.org/grpc/internal/transport.(*transportReader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             google.golang.org/grpc/internal/transport.(*Stream).Read
             google.golang.org/grpc.(*parser).recvMsg
             google.golang.org/grpc.recvAndDecompress
             google.golang.org/grpc.recv
             google.golang.org/grpc.(*csAttempt).recvMsg
             google.golang.org/grpc.(*clientStream).RecvMsg.func1
             google.golang.org/grpc.(*clientStream).withRetry
             google.golang.org/grpc.(*clientStream).RecvMsg
             github.com/networkservicemesh/api/pkg/api/networkservice.(*monitorConnectionMonitorConnectionsClient).Recv
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).monitorConnection.func1
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).monitorConnection
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             google.golang.org/grpc/internal/transport.(*recvBufferReader).readClient
             google.golang.org/grpc/internal/transport.(*recvBufferReader).Read
             google.golang.org/grpc/internal/transport.(*transportReader).Read
             io.ReadAtLeast
             io.ReadFull (inline)
             google.golang.org/grpc/internal/transport.(*Stream).Read
             google.golang.org/grpc.(*parser).recvMsg
             google.golang.org/grpc.recvAndDecompress
             google.golang.org/grpc.recv
             google.golang.org/grpc.(*csAttempt).recvMsg
             google.golang.org/grpc.(*clientStream).RecvMsg.func1
             google.golang.org/grpc.(*clientStream).withRetry
             google.golang.org/grpc.(*clientStream).RecvMsg
             github.com/spiffe/go-spiffe/v2/proto/spiffe/workload.(*spiffeWorkloadAPIFetchX509SVIDClient).Recv
             github.com/spiffe/go-spiffe/v2/workloadapi.(*Client).watchX509Context
             github.com/spiffe/go-spiffe/v2/workloadapi.(*Client).WatchX509Context
             github.com/spiffe/go-spiffe/v2/workloadapi.newWatcher.func4
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             github.com/nordix/meridio/pkg/retry.delay.func1
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1.1
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamRetry).Open.func1
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*configurationImpl).streamHandler
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).vipHandler
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.selectgo
             github.com/nordix/meridio/pkg/health/probe.CreateAndRunGRPCHealthProbe.func2
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.goparkunlock (inline)
             runtime.semacquire1
             sync.runtime_Semacquire
             sync.(*WaitGroup).Wait
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamManager).Stop
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).monitorConnection.func1
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).monitorConnection
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.goparkunlock (inline)
             runtime.semacquire1
             sync.runtime_SemacquireMutex
             sync.(*Mutex).lockSlow
             sync.(*Mutex).Lock (inline)
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).SetVIPs
             github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).vipHandler.func1
             github.com/nordix/meridio/pkg/retry.Do
             github.com/nordix/meridio/pkg/ambassador/tap/trench.(*configurationImpl).vipHandler
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.goparkunlock (inline)
             runtime.semacquire1
             sync.runtime_SemacquireMutex
             sync.(*Mutex).lockSlow
             sync.(*Mutex).Lock (inline)
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamManager).RemoveStream
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*Conduit).RemoveStream
             github.com/nordix/meridio/pkg/ambassador/tap.(*Tap).Close.func1
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.goparkunlock (inline)
             runtime.semacquire1
             sync.runtime_SemacquireMutex
             sync.(*Mutex).lockSlow
             sync.(*Mutex).Lock (inline)
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*streamManager).SetStreams
             github.com/nordix/meridio/pkg/ambassador/tap/conduit.(*configurationImpl).streamHandler
-----------+-------------------------------------------------------
         1   runtime.gopark
             runtime.goparkunlock (inline)
             runtime.semacquire1
             sync.runtime_SemacquireMutex
             sync.(*Mutex).lockSlow
             sync.(*Mutex).Lock (inline)
             github.com/nordix/meridio/pkg/ambassador/tap.(*Tap).Open
             github.com/nordix/meridio/api/ambassador/v1._Tap_Open_Handler
             google.golang.org/grpc.(*Server).processUnaryRPC
             google.golang.org/grpc.(*Server).handleStream
             google.golang.org/grpc.(*Server).serveStreams.func2.1
-----------+-------------------------------------------------------
(pprof) 
