diff --git a/pkg/transport/session/manager.go b/pkg/transport/session/manager.go index 79bfce34b4..38a0a6faf6 100644 --- a/pkg/transport/session/manager.go +++ b/pkg/transport/session/manager.go @@ -219,6 +219,13 @@ func (m *Manager) Stop() error { return nil } +// SupportsRange reports whether the current storage backend supports Range(). +// Returns true only for LocalStorage; distributed backends (Redis, Valkey) return false. +func (m *Manager) SupportsRange() bool { + _, ok := m.storage.(*LocalStorage) + return ok +} + // Range calls f sequentially for each key and value present in the map. // If f returns false, range stops the iteration. // diff --git a/pkg/vmcp/server/server.go b/pkg/vmcp/server/server.go index b793534595..0a3101cfdc 100644 --- a/pkg/vmcp/server/server.go +++ b/pkg/vmcp/server/server.go @@ -67,6 +67,10 @@ const ( // defaultSessionTTL is the default session time-to-live duration. // Sessions that are inactive for this duration will be automatically cleaned up. defaultSessionTTL = 30 * time.Minute + + // defaultSessionLoggingInterval is how often active sessions are logged when + // SessionManagementV2 is enabled. 
+ defaultSessionLoggingInterval = 1 * time.Minute ) //go:generate mockgen -destination=mocks/mock_watcher.go -package=mocks -source=server.go Watcher @@ -667,6 +671,17 @@ func (s *Server) Start(ctx context.Context) error { } } + // Start periodic session logging if SessionManagementV2 is enabled + if s.vmcpSessionMgr != nil { + sessionLoggingCtx, sessionLoggingCancel := context.WithCancel(ctx) + s.vmcpSessionMgr.StartPeriodicLogging(sessionLoggingCtx, defaultSessionLoggingInterval) + slog.Info("session periodic logging started", "interval", defaultSessionLoggingInterval) + s.shutdownFuncs = append(s.shutdownFuncs, func(context.Context) error { + sessionLoggingCancel() + return nil + }) + } + // Start status reporter if configured if s.statusReporter != nil { shutdown, err := s.statusReporter.Start(ctx) diff --git a/pkg/vmcp/server/session_manager_interface.go b/pkg/vmcp/server/session_manager_interface.go index ee72f72648..9d5c80e2f9 100644 --- a/pkg/vmcp/server/session_manager_interface.go +++ b/pkg/vmcp/server/session_manager_interface.go @@ -5,6 +5,7 @@ package server import ( "context" + "time" mcpserver "github.com/mark3labs/mcp-go/server" @@ -30,4 +31,9 @@ type SessionManager interface { // handlers. This enables session-scoped routing: each tool call goes through the // session's backend connections rather than the global router. GetAdaptedTools(sessionID string) ([]mcpserver.ServerTool, error) + + // StartPeriodicLogging starts a background goroutine that periodically logs + // the active session count and per-backend session counts at the given interval. + // The goroutine stops when ctx is cancelled. 
+ StartPeriodicLogging(ctx context.Context, interval time.Duration) } diff --git a/pkg/vmcp/server/sessionmanager/session_manager.go b/pkg/vmcp/server/sessionmanager/session_manager.go index f41ec991f0..b1d9e74708 100644 --- a/pkg/vmcp/server/sessionmanager/session_manager.go +++ b/pkg/vmcp/server/sessionmanager/session_manager.go @@ -18,6 +18,8 @@ import ( "errors" "fmt" "log/slog" + "sync" + "time" "github.com/google/uuid" "github.com/mark3labs/mcp-go/mcp" @@ -68,6 +70,9 @@ type Manager struct { storage *transportsession.Manager factory vmcpsession.MultiSessionFactory backendRegistry vmcp.BackendRegistry + + // storageWarningOnce ensures we only log the distributed storage warning once + storageWarningOnce sync.Once } // New creates a Manager backed by the given transport @@ -203,9 +208,14 @@ func (sm *Manager) CreateSession( return nil, fmt.Errorf("Manager.CreateSession: failed to replace placeholder: %w", err) } - slog.Debug("Manager: created multi-session", + backendNames := make([]string, len(backends)) + for i, b := range backends { + backendNames[i] = b.Name + } + slog.Info("Manager: session created", "session_id", sessionID, - "backend_count", len(backends)) + "backend_count", len(backends), + "backends", backendNames) return sess, nil } @@ -268,6 +278,7 @@ func (sm *Manager) Terminate(sessionID string) (isNotAllowed bool, err error) { // If the session is a fully-formed MultiSession, close its backend connections. 
if multiSess, ok := sess.(vmcpsession.MultiSession); ok { + backendSessions := multiSess.BackendSessions() if closeErr := multiSess.Close(); closeErr != nil { slog.Warn("Manager.Terminate: error closing multi-session backend connections", "session_id", sessionID, "error", closeErr) @@ -276,37 +287,39 @@ func (sm *Manager) Terminate(sessionID string) (isNotAllowed bool, err error) { if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil { return false, fmt.Errorf("Manager.Terminate: failed to delete session from storage: %w", deleteErr) } - } else { - // Placeholder session (not yet upgraded to MultiSession). - // - // This handles the race condition where a client sends DELETE between - // Generate() (Phase 1) and CreateSession() (Phase 2). The two-phase - // pattern creates a window where the session exists as a placeholder: - // - // 1. Client sends initialize → Generate() creates placeholder - // 2. Client sends DELETE before OnRegisterSession hook fires - // 3. We mark the placeholder as terminated (don't delete it) - // 4. CreateSession() hook fires → sees terminated flag → fails fast - // - // Without this branch, CreateSession() would open backend HTTP connections - // for a session the client already terminated, silently resurrecting it. - // - // We mark (not delete) so Validate() can return isTerminated=true, which - // lets the SDK distinguish "actively terminated" from "never existed". - // TTL cleanup will remove the placeholder later. 
- sess.SetMetadata(MetadataKeyTerminated, MetadataValTrue) - if replaceErr := sm.storage.UpsertSession(sess); replaceErr != nil { - slog.Warn("Manager.Terminate: failed to persist terminated flag for placeholder; attempting delete fallback", - "session_id", sessionID, "error", replaceErr) - if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil { - return false, fmt.Errorf( - "Manager.Terminate: failed to persist terminated flag and delete placeholder: upsertErr=%v, deleteErr=%w", - replaceErr, deleteErr) - } + slog.Info("Manager: session terminated", "session_id", sessionID, "backend_count", len(backendSessions)) + return false, nil + } + + // Placeholder session (not yet upgraded to MultiSession). + // + // This handles the race condition where a client sends DELETE between + // Generate() (Phase 1) and CreateSession() (Phase 2). The two-phase + // pattern creates a window where the session exists as a placeholder: + // + // 1. Client sends initialize → Generate() creates placeholder + // 2. Client sends DELETE before OnRegisterSession hook fires + // 3. We mark the placeholder as terminated (don't delete it) + // 4. CreateSession() hook fires → sees terminated flag → fails fast + // + // Without this branch, CreateSession() would open backend HTTP connections + // for a session the client already terminated, silently resurrecting it. + // + // We mark (not delete) so Validate() can return isTerminated=true, which + // lets the SDK distinguish "actively terminated" from "never existed". + // TTL cleanup will remove the placeholder later. 
+ sess.SetMetadata(MetadataKeyTerminated, MetadataValTrue) + if replaceErr := sm.storage.UpsertSession(sess); replaceErr != nil { + slog.Warn("Manager.Terminate: failed to persist terminated flag for placeholder; attempting delete fallback", + "session_id", sessionID, "error", replaceErr) + if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil { + return false, fmt.Errorf( + "Manager.Terminate: failed to persist terminated flag and delete placeholder: upsertErr=%v, deleteErr=%w", + replaceErr, deleteErr) } } - slog.Info("Manager.Terminate: session terminated", "session_id", sessionID) + slog.Info("Manager: placeholder session terminated", "session_id", sessionID) return false, nil } @@ -404,3 +417,67 @@ func (sm *Manager) GetAdaptedTools(sessionID string) ([]mcpserver.ServerTool, er return sdkTools, nil } + +// StartPeriodicLogging starts a background goroutine that logs active session +// counts at the given interval until ctx is cancelled. +// +// This provides operational visibility into session load without exposing +// session identifiers over the network. +func (sm *Manager) StartPeriodicLogging(ctx context.Context, interval time.Duration) { + if interval <= 0 { + slog.Warn("Manager: StartPeriodicLogging called with non-positive interval, defaulting to 1m", "interval", interval) + interval = time.Minute + } + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + sm.logActiveSessions() + } + } + }() +} + +// logActiveSessions logs a summary of all active vMCP sessions on this node. +// +// Storage Backend Limitation: Range() only works with LocalStorage. With +// distributed backends (Redis, Valkey) enumeration is skipped and a one-time +// warning is logged instead of a misleading count of 0. This is expected: sessions +// hold in-process HTTP connections and are inherently node-local. 
+func (sm *Manager) logActiveSessions() { + if !sm.storage.SupportsRange() { + sm.storageWarningOnce.Do(func() { + slog.Warn("Manager: session enumeration unavailable with current storage backend", + "reason", "Range() only works with LocalStorage; distributed storage (Redis, Valkey) cannot enumerate sessions", + "note", "Sessions are node-local; this is expected behavior") + }) + return + } + + activeCount := 0 + backendSessionCounts := make(map[string]int) // backend ID -> session count + + sm.storage.Range(func(key, value interface{}) bool { + _, ok := key.(string) + if !ok { + return true + } + + multiSess, ok := value.(vmcpsession.MultiSession) + if !ok { + return true // placeholder, skip + } + + activeCount++ + for backendID := range multiSess.BackendSessions() { + backendSessionCounts[backendID]++ + } + return true + }) + + slog.Info("Manager: active sessions", "total_sessions", activeCount, "backend_session_counts", backendSessionCounts) +} diff --git a/pkg/vmcp/server/status_test.go b/pkg/vmcp/server/status_test.go index 7dcf7021b7..b5d78aabe6 100644 --- a/pkg/vmcp/server/status_test.go +++ b/pkg/vmcp/server/status_test.go @@ -241,3 +241,52 @@ func TestStatusEndpoint_BackendFieldMapping(t *testing.T) { assert.Equal(t, "streamable-http", b.Transport) assert.Equal(t, authtypes.StrategyTypeTokenExchange, b.AuthType) } + +func TestStatusEndpoint_NoCredentialsExposed(t *testing.T) { + t.Parallel() + + backends := []vmcp.Backend{{ + ID: "b1", + Name: "secure-backend", + BaseURL: "https://secret-internal-url.example.com:9090/mcp", + HealthStatus: vmcp.BackendHealthy, + AuthConfig: &authtypes.BackendAuthStrategy{ + Type: authtypes.StrategyTypeTokenExchange, + }, + }} + srv := createTestServerWithBackends(t, backends, "") + + resp, err := http.Get("http://" + srv.Address() + "/status") + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + + // Read raw response body to check what's exposed + var rawResponse 
map[string]interface{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(&rawResponse)) + + // Verify no sensitive fields are exposed + respBackends, ok := rawResponse["backends"].([]interface{}) + require.True(t, ok) + require.Len(t, respBackends, 1) + + backend, ok := respBackends[0].(map[string]interface{}) + require.True(t, ok, "expected backends[0] to be a JSON object") + + // Should NOT expose: BaseURL, credentials, tokens, internal URLs + _, hasBaseURL := backend["base_url"] + _, hasURL := backend["url"] + _, hasToken := backend["token"] + _, hasCredentials := backend["credentials"] + + assert.False(t, hasBaseURL, "BaseURL should not be exposed") + assert.False(t, hasURL, "URL should not be exposed") + assert.False(t, hasToken, "Token should not be exposed") + assert.False(t, hasCredentials, "Credentials should not be exposed") + + // Should expose: Name, Health, Transport, AuthType (safe metadata) + assert.Contains(t, backend, "name") + assert.Contains(t, backend, "health") + assert.Contains(t, backend, "transport") + assert.Contains(t, backend, "auth_type") +}