From bf7b490d41e30b47096f21d963568b519c9c3233 Mon Sep 17 00:00:00 2001 From: taskbot Date: Thu, 5 Mar 2026 16:27:13 +0100 Subject: [PATCH 1/2] feat(vmcp): expose active backend sessions on /status endpoint Add session visibility to the /status endpoint for operational debugging and compliance auditing when sessionManagementV2 is enabled. Closes: #3876 --- pkg/vmcp/server/session_manager_interface.go | 6 + .../sessionmanager/list_sessions_test.go | 261 ++++++++++++++++++ .../server/sessionmanager/session_manager.go | 106 +++++++ pkg/vmcp/server/status.go | 32 ++- pkg/vmcp/server/status_test.go | 208 ++++++++++++++ 5 files changed, 612 insertions(+), 1 deletion(-) create mode 100644 pkg/vmcp/server/sessionmanager/list_sessions_test.go diff --git a/pkg/vmcp/server/session_manager_interface.go b/pkg/vmcp/server/session_manager_interface.go index ee72f72648..5646e3597c 100644 --- a/pkg/vmcp/server/session_manager_interface.go +++ b/pkg/vmcp/server/session_manager_interface.go @@ -8,6 +8,7 @@ import ( mcpserver "github.com/mark3labs/mcp-go/server" + "github.com/stacklok/toolhive/pkg/vmcp/server/sessionmanager" vmcpsession "github.com/stacklok/toolhive/pkg/vmcp/session" ) @@ -30,4 +31,9 @@ type SessionManager interface { // handlers. This enables session-scoped routing: each tool call goes through the // session's backend connections rather than the global router. GetAdaptedTools(sessionID string) ([]mcpserver.ServerTool, error) + + // ListActiveSessions returns backend-centric usage statistics for all active vMCP sessions. + // Used by the /status endpoint to expose operational visibility without leaking session identifiers. + // Returns (activeCount, backendUsage map). Returns (0, empty map) if no sessions exist. 
+ ListActiveSessions() (int, map[string]sessionmanager.BackendUsage) } diff --git a/pkg/vmcp/server/sessionmanager/list_sessions_test.go b/pkg/vmcp/server/sessionmanager/list_sessions_test.go new file mode 100644 index 0000000000..192686f7b6 --- /dev/null +++ b/pkg/vmcp/server/sessionmanager/list_sessions_test.go @@ -0,0 +1,261 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package sessionmanager_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stacklok/toolhive/pkg/auth" + transportsession "github.com/stacklok/toolhive/pkg/transport/session" + "github.com/stacklok/toolhive/pkg/vmcp" + vmcpauth "github.com/stacklok/toolhive/pkg/vmcp/auth" + "github.com/stacklok/toolhive/pkg/vmcp/auth/strategies" + authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" + "github.com/stacklok/toolhive/pkg/vmcp/server/sessionmanager" + vmcpsession "github.com/stacklok/toolhive/pkg/vmcp/session" +) + +// createTestFactory creates a MultiSessionFactory for testing +func createTestFactory(t *testing.T) vmcpsession.MultiSessionFactory { + t.Helper() + + authReg := vmcpauth.NewDefaultOutgoingAuthRegistry() + require.NoError(t, authReg.RegisterStrategy( + authtypes.StrategyTypeUnauthenticated, + strategies.NewUnauthenticatedStrategy(), + )) + + return vmcpsession.NewSessionFactory(authReg) +} + +func TestManager_ListActiveSessions_EmptyStore(t *testing.T) { + t.Parallel() + + // Create a manager with empty session store + storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) + defer storage.Stop() + + factory := createTestFactory(t) + mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) + + // List sessions from empty store + activeCount, backendUsage := mgr.ListActiveSessions() + + assert.Equal(t, 0, activeCount, "Should have 0 active sessions") + 
assert.NotNil(t, backendUsage, "Should return non-nil map") + assert.Empty(t, backendUsage, "Should return empty map for empty store") +} + +func TestManager_ListActiveSessions_WithPlaceholderSessions(t *testing.T) { + t.Parallel() + + // Create a manager with session store + storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) + defer storage.Stop() + + factory := createTestFactory(t) + mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) + + // Generate some placeholder sessions (Phase 1 only, not fully initialized) + sessionID1 := mgr.Generate() + sessionID2 := mgr.Generate() + + assert.NotEmpty(t, sessionID1) + assert.NotEmpty(t, sessionID2) + + // List sessions - placeholders should be skipped + activeCount, backendUsage := mgr.ListActiveSessions() + + // Placeholder sessions (not yet MultiSession) should not be included + assert.Equal(t, 0, activeCount, "Placeholder sessions should not be counted") + assert.Empty(t, backendUsage, "Placeholder sessions should not contribute to backend usage") +} + +func TestManager_ListActiveSessions_ReturnsEmptyMapNotNil(t *testing.T) { + t.Parallel() + + storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) + defer storage.Stop() + + factory := createTestFactory(t) + mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) + + // Even with no sessions, should return non-nil empty map + activeCount, backendUsage := mgr.ListActiveSessions() + + assert.Equal(t, 0, activeCount, "Should have 0 active sessions") + assert.NotNil(t, backendUsage, "Should return non-nil map") + assert.Len(t, backendUsage, 0, "Should be empty") +} + +// TestManager_ListActiveSessions_SkipsNonMultiSession verifies that +// only MultiSession instances are included in the results. 
+func TestManager_ListActiveSessions_SkipsNonMultiSession(t *testing.T) { + t.Parallel() + + storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) + defer storage.Stop() + + factory := createTestFactory(t) + mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) + + // Generate placeholders - these are ProxySession, not MultiSession + _ = mgr.Generate() + _ = mgr.Generate() + + // Also manually add a non-MultiSession session to storage + plainSession := transportsession.NewProxySession("plain-session") + err := storage.AddSession(plainSession) + require.NoError(t, err) + + // List should skip all non-MultiSession types + activeCount, backendUsage := mgr.ListActiveSessions() + + assert.Equal(t, 0, activeCount, "Should skip non-MultiSession instances") + assert.Empty(t, backendUsage, "Should have no backend usage from non-MultiSession") +} + +// TestManager_ListActiveSessions_BackendUsageStructure verifies the +// structure of BackendUsage returned by ListActiveSessions. +func TestManager_ListActiveSessions_BackendUsageStructure(t *testing.T) { + t.Parallel() + + // This test verifies that BackendUsage has the expected fields by + // creating an instance and checking it can be constructed. + usage := sessionmanager.BackendUsage{ + SessionCount: 5, + HealthyCount: 4, + FailedCount: 1, + } + + assert.Equal(t, 5, usage.SessionCount) + assert.Equal(t, 4, usage.HealthyCount) + assert.Equal(t, 1, usage.FailedCount) +} + +// mockMultiSession is a minimal mock that implements vmcpsession.MultiSession +// for testing purposes. It embeds a real ProxySession for base functionality +// and adds the MultiSession-specific methods. 
+type mockMultiSession struct { + *transportsession.ProxySession + backendSessions map[string]string + tools []vmcp.Tool + resources []vmcp.Resource + prompts []vmcp.Prompt +} + +func newMockMultiSession(id string, backendSessions map[string]string) *mockMultiSession { + return &mockMultiSession{ + ProxySession: transportsession.NewProxySession(id), + backendSessions: backendSessions, + tools: []vmcp.Tool{}, + resources: []vmcp.Resource{}, + prompts: []vmcp.Prompt{}, + } +} + +func (m *mockMultiSession) BackendSessions() map[string]string { + return m.backendSessions +} + +func (m *mockMultiSession) Tools() []vmcp.Tool { + return m.tools +} + +func (m *mockMultiSession) Resources() []vmcp.Resource { + return m.resources +} + +func (m *mockMultiSession) Prompts() []vmcp.Prompt { + return m.prompts +} + +// Implement Caller interface methods (required but not used in these tests) +func (*mockMultiSession) CallTool(_ context.Context, _ *auth.Identity, _ string, _, _ map[string]any) (*vmcp.ToolCallResult, error) { + return nil, nil +} + +func (*mockMultiSession) ReadResource(_ context.Context, _ *auth.Identity, _ string) (*vmcp.ResourceReadResult, error) { + return nil, nil +} + +func (*mockMultiSession) GetPrompt(_ context.Context, _ *auth.Identity, _ string, _ map[string]any) (*vmcp.PromptGetResult, error) { + return nil, nil +} + +func (*mockMultiSession) Close() error { + return nil +} + +// Compile-time check that mockMultiSession implements MultiSession +var _ vmcpsession.MultiSession = (*mockMultiSession)(nil) + +// TestManager_ListActiveSessions_WithMockMultiSession tests the iteration +// logic and backend aggregation by manually storing mock MultiSession instances. 
+func TestManager_ListActiveSessions_WithMockMultiSession(t *testing.T) { + t.Parallel() + + storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) + defer storage.Stop() + + factory := createTestFactory(t) + mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) + + // Manually add mock MultiSessions to storage + session1 := newMockMultiSession("session-1", map[string]string{ + "backend1": "backend1-session-id", + "backend2": "backend2-session-id", + }) + session2 := newMockMultiSession("session-2", map[string]string{ + "backend1": "backend1-session-id-2", // backend1 used by 2 sessions + "backend3": "backend3-session-id", + }) + + err := storage.AddSession(session1) + require.NoError(t, err) + err = storage.AddSession(session2) + require.NoError(t, err) + + // Also add a placeholder to verify it's skipped + placeholder := transportsession.NewProxySession("placeholder-session") + err = storage.AddSession(placeholder) + require.NoError(t, err) + + // Verify storage has the sessions + t.Logf("Storage count before listing: %d", storage.Count()) + + // List sessions + activeCount, backendUsage := mgr.ListActiveSessions() + + // Should have 2 active sessions (placeholder should be skipped) + t.Logf("ListActiveSessions returned %d sessions, %d backends", activeCount, len(backendUsage)) + assert.Equal(t, 2, activeCount, "Should have 2 active MultiSession instances") + assert.Len(t, backendUsage, 3, "Should have 3 unique backends") + + // Verify backend1 (used by 2 sessions) + backend1Usage, ok := backendUsage["backend1"] + require.True(t, ok, "Should include backend1") + assert.Equal(t, 2, backend1Usage.SessionCount, "backend1 should be used by 2 sessions") + assert.Equal(t, 2, backend1Usage.HealthyCount, "backend1 should have 2 healthy connections") + assert.Equal(t, 0, backend1Usage.FailedCount, "backend1 should have 0 failed connections") + + // Verify backend2 (used by 1 session) + backend2Usage, 
ok := backendUsage["backend2"] + require.True(t, ok, "Should include backend2") + assert.Equal(t, 1, backend2Usage.SessionCount, "backend2 should be used by 1 session") + assert.Equal(t, 1, backend2Usage.HealthyCount, "backend2 should have 1 healthy connection") + assert.Equal(t, 0, backend2Usage.FailedCount, "backend2 should have 0 failed connections") + + // Verify backend3 (used by 1 session) + backend3Usage, ok := backendUsage["backend3"] + require.True(t, ok, "Should include backend3") + assert.Equal(t, 1, backend3Usage.SessionCount, "backend3 should be used by 1 session") + assert.Equal(t, 1, backend3Usage.HealthyCount, "backend3 should have 1 healthy connection") + assert.Equal(t, 0, backend3Usage.FailedCount, "backend3 should have 0 failed connections") +} diff --git a/pkg/vmcp/server/sessionmanager/session_manager.go b/pkg/vmcp/server/sessionmanager/session_manager.go index f41ec991f0..072aa10bee 100644 --- a/pkg/vmcp/server/sessionmanager/session_manager.go +++ b/pkg/vmcp/server/sessionmanager/session_manager.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "log/slog" + "sync" "github.com/google/uuid" "github.com/mark3labs/mcp-go/mcp" @@ -41,6 +42,15 @@ const ( MetadataValTrue = "true" ) +// BackendUsage tracks session usage statistics for a specific backend. +// This provides operational visibility into backend load and health without +// exposing session identifiers that could be used for session hijacking. +type BackendUsage struct { + SessionCount int `json:"session_count"` // Number of sessions using this backend + HealthyCount int `json:"healthy_count"` // Number of sessions with healthy connections + FailedCount int `json:"failed_count"` // Number of sessions with failed connections +} + // Manager bridges the domain session lifecycle (MultiSession / MultiSessionFactory) // to the mark3labs SDK's SessionIdManager interface. 
// @@ -68,6 +78,9 @@ type Manager struct { storage *transportsession.Manager factory vmcpsession.MultiSessionFactory backendRegistry vmcp.BackendRegistry + + // storageWarningOnce ensures we only log the distributed storage warning once + storageWarningOnce sync.Once } // New creates a Manager backed by the given transport @@ -404,3 +417,96 @@ func (sm *Manager) GetAdaptedTools(sessionID string) ([]mcpserver.ServerTool, er return sdkTools, nil } + +// ListActiveSessions returns backend-centric usage statistics for all active vMCP sessions. +// Used by the /status endpoint to expose operational visibility without leaking session identifiers. +// +// Security Note: This method intentionally does NOT expose session IDs or backend session IDs, +// as these could be used as bearer tokens for session hijacking attacks via the Mcp-Session-Id +// header. Instead, it aggregates session data by backend to provide operational insights +// (load distribution, health status) without security risk. +// +// Storage Backend Limitation: This method uses transportsession.Manager.Range(), which only +// works with LocalStorage. With distributed storage backends (Redis, Valkey), Range is a no-op +// and this method will return (0, empty map). This is acceptable because MultiSessions are +// node-local (they hold in-process HTTP connections and routing state that cannot be serialized). +// Even with distributed storage, a session can only be served by the process that created it, +// so reporting only local sessions provides accurate operational visibility for this node. +// +// Returns: +// - activeCount: Total number of active sessions on this node +// - backendUsage: Map of backend_name -> usage statistics (session count, healthy, failed) +// +// Returns (0, empty map) if no sessions exist, all sessions are placeholders, or using +// non-local storage backend. 
+func (sm *Manager) ListActiveSessions() (int, map[string]BackendUsage) { + backendUsage := make(map[string]BackendUsage) + activeCount := 0 + rangeExecuted := false + + // Iterate over all sessions in storage using the Range() helper. + // Note: Range only works with LocalStorage backend. With distributed storage + // (Redis, Valkey), this is a no-op and returns empty results. This is expected + // because sessions are node-local and cannot be transferred between processes. + sm.storage.Range(func(key, value interface{}) bool { + rangeExecuted = true + sessionID, ok := key.(string) + if !ok { + slog.Warn("Manager.ListActiveSessions: non-string session key", "key", key) + return true // continue iteration + } + + // Type assert to MultiSession to access session-specific methods + multiSess, ok := value.(vmcpsession.MultiSession) + if !ok { + // Session is a placeholder (not yet fully initialized) or different type + slog.Debug("Manager.ListActiveSessions: skipping non-MultiSession", + "session_id", sessionID, + "type", fmt.Sprintf("%T", value)) + return true // continue iteration + } + + activeCount++ + + // Get backend session mappings (backendID -> backendSessionID) + backendSessions := multiSess.BackendSessions() + + // Aggregate statistics by backend name. + // + // Limitation: Currently all backends with sessions are counted as healthy. + // This doesn't account for partial failures where some backends failed during + // session initialization. To properly track failed backends, we would need: + // 1. MultiSession to expose which backends were attempted vs succeeded + // 2. Session metadata storing the original backend list at creation time + // 3. Comparison between attempted and successful backends to compute failures + // + // For now, BackendSessions() only returns successfully connected backends, + // so we cannot distinguish between "backend not attempted" and "backend failed". 
+ // This means FailedCount will always be 0 until proper failure tracking is added. + for backendID := range backendSessions { + usage := backendUsage[backendID] + usage.SessionCount++ + usage.HealthyCount++ // Only successfully connected backends appear in BackendSessions() + // usage.FailedCount stays 0 - we don't currently track which backends failed + backendUsage[backendID] = usage + } + + return true // continue iteration + }) + + // Log a one-time warning if Range was never executed (distributed storage) + if !rangeExecuted && activeCount == 0 { + sm.storageWarningOnce.Do(func() { + slog.Warn("Session enumeration not supported with current storage backend", + "reason", "Range() only works with LocalStorage; distributed storage (Redis, Valkey) cannot enumerate sessions", + "impact", "/status endpoint will show 0 sessions even if sessions exist in the cluster", + "note", "Sessions are node-local - each node can only serve sessions it created; this is expected behavior") + }) + } + + slog.Debug("Manager.ListActiveSessions: completed", + "total_sessions", activeCount, + "backends", len(backendUsage)) + + return activeCount, backendUsage +} diff --git a/pkg/vmcp/server/status.go b/pkg/vmcp/server/status.go index 484ee5bd88..bae1a2a2ff 100644 --- a/pkg/vmcp/server/status.go +++ b/pkg/vmcp/server/status.go @@ -12,16 +12,28 @@ import ( "github.com/stacklok/toolhive/pkg/versions" "github.com/stacklok/toolhive/pkg/vmcp" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" + "github.com/stacklok/toolhive/pkg/vmcp/server/sessionmanager" ) // StatusResponse represents the vMCP server's operational status. type StatusResponse struct { Backends []BackendStatus `json:"backends"` + Sessions *SessionsStatus `json:"sessions,omitempty"` // Only present when sessionManagementV2 enabled Healthy bool `json:"healthy"` Version string `json:"version"` GroupRef string `json:"group_ref"` } +// SessionsStatus contains information about active vMCP sessions. 
+type SessionsStatus struct { + ActiveCount int `json:"active_count"` + BackendUsage map[string]sessionmanager.BackendUsage `json:"backend_usage"` // backend_name -> usage stats +} + +// BackendUsage tracks session usage statistics for a specific backend. +// This provides operational visibility without exposing session identifiers. +type BackendUsage = sessionmanager.BackendUsage + // BackendStatus represents the status of a single backend MCP server. type BackendStatus struct { Name string `json:"name"` @@ -81,12 +93,30 @@ func (s *Server) buildStatusResponse(ctx context.Context) StatusResponse { // Healthy = true if at least one backend is healthy AND there's at least one backend healthy := len(backends) > 0 && hasHealthyBackend - return StatusResponse{ + response := StatusResponse{ Backends: backendStatuses, Healthy: healthy, Version: versions.GetVersionInfo().Version, GroupRef: s.config.GroupRef, } + + // Add session information if sessionManagementV2 is enabled + // This provides operational visibility into backend usage patterns without + // exposing session identifiers that could be used for session hijacking. + if s.config.SessionManagementV2 && s.vmcpSessionMgr != nil { + activeCount, backendUsage := s.vmcpSessionMgr.ListActiveSessions() + response.Sessions = &SessionsStatus{ + ActiveCount: activeCount, + BackendUsage: backendUsage, + } + // #nosec G706 -- structured logging with slog is safe from injection + slog.Debug("buildStatusResponse: included session info", + "active_sessions", activeCount, + "backends", len(backendUsage), + "v2_enabled", true) + } + + return response } // getAuthType returns the auth type string from the backend auth strategy. 
diff --git a/pkg/vmcp/server/status_test.go b/pkg/vmcp/server/status_test.go index 7dcf7021b7..30d63cbc31 100644 --- a/pkg/vmcp/server/status_test.go +++ b/pkg/vmcp/server/status_test.go @@ -17,21 +17,39 @@ import ( "github.com/stacklok/toolhive/pkg/networking" "github.com/stacklok/toolhive/pkg/vmcp" "github.com/stacklok/toolhive/pkg/vmcp/aggregator" + vmcpauth "github.com/stacklok/toolhive/pkg/vmcp/auth" + "github.com/stacklok/toolhive/pkg/vmcp/auth/strategies" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" discoveryMocks "github.com/stacklok/toolhive/pkg/vmcp/discovery/mocks" "github.com/stacklok/toolhive/pkg/vmcp/mocks" "github.com/stacklok/toolhive/pkg/vmcp/router" "github.com/stacklok/toolhive/pkg/vmcp/server" + vmcpsession "github.com/stacklok/toolhive/pkg/vmcp/session" ) // StatusResponse mirrors the server's status response structure for test deserialization. type StatusResponse struct { Backends []BackendStatus `json:"backends"` + Sessions *SessionsStatus `json:"sessions,omitempty"` Healthy bool `json:"healthy"` Version string `json:"version"` GroupRef string `json:"group_ref"` } +// SessionsStatus mirrors the server's sessions status structure for test deserialization. +type SessionsStatus struct { + ActiveCount int `json:"active_count"` + BackendUsage map[string]BackendUsage `json:"backend_usage"` +} + +// BackendUsage mirrors the server's backend usage structure for test deserialization. +// This provides operational visibility without exposing session identifiers. +type BackendUsage struct { + SessionCount int `json:"session_count"` // Number of sessions using this backend + HealthyCount int `json:"healthy_count"` // Number of sessions with healthy connections + FailedCount int `json:"failed_count"` // Number of sessions with failed connections +} + // BackendStatus mirrors the server's backend status structure for test deserialization. 
type BackendStatus struct { Name string `json:"name"` @@ -241,3 +259,193 @@ func TestStatusEndpoint_BackendFieldMapping(t *testing.T) { assert.Equal(t, "streamable-http", b.Transport) assert.Equal(t, authtypes.StrategyTypeTokenExchange, b.AuthType) } + +func TestStatusEndpoint_SessionsNotIncludedWhenV2Disabled(t *testing.T) { + t.Parallel() + + // Default server configuration has SessionManagementV2 = false + backends := []vmcp.Backend{{ + ID: "b1", Name: "test-backend", HealthStatus: vmcp.BackendHealthy, + }} + srv := createTestServerWithBackends(t, backends, "") + + resp, err := http.Get("http://" + srv.Address() + "/status") + require.NoError(t, err) + defer resp.Body.Close() + + var status StatusResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) + + // Sessions field should be nil when v2 is disabled + assert.Nil(t, status.Sessions, "Sessions should not be present when SessionManagementV2 is disabled") + assert.Len(t, status.Backends, 1) + assert.True(t, status.Healthy) +} + +func TestStatusEndpoint_SessionsIncludedWhenV2Enabled(t *testing.T) { + t.Parallel() + + // This test uses the sessionmanager package directly rather than mocking + // to verify the full integration of backend usage statistics in /status. + // Since we cannot easily create real MultiSessions without starting backend + // servers, we verify the empty case (no active sessions) and the structure. + + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + mockBackendClient := mocks.NewMockBackendClient(ctrl) + mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) + rt := router.NewDefaultRouter() + + port := networking.FindAvailable() + require.NotZero(t, port, "Failed to find available port") + + mockDiscoveryMgr.EXPECT(). + Discover(gomock.Any(), gomock.Any()). 
+ Return(&aggregator.AggregatedCapabilities{ + Tools: []vmcp.Tool{}, + Resources: []vmcp.Resource{}, + Prompts: []vmcp.Prompt{}, + RoutingTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + Metadata: &aggregator.AggregationMetadata{}, + }, nil). + AnyTimes() + mockDiscoveryMgr.EXPECT().Stop().AnyTimes() + + backends := []vmcp.Backend{ + {ID: "backend1", Name: "backend1", HealthStatus: vmcp.BackendHealthy}, + } + + // Create a real session factory (required for SessionManagementV2) + authReg := vmcpauth.NewDefaultOutgoingAuthRegistry() + require.NoError(t, authReg.RegisterStrategy( + authtypes.StrategyTypeUnauthenticated, + strategies.NewUnauthenticatedStrategy(), + )) + factory := vmcpsession.NewSessionFactory(authReg) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + srv, err := server.New(ctx, &server.Config{ + Name: "test-vmcp-v2", + Version: "1.0.0", + Host: "127.0.0.1", + Port: port, + GroupRef: "test-group", + SessionTTL: 5 * time.Minute, + SessionManagementV2: true, + SessionFactory: factory, + }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewImmutableRegistry(backends), nil) + require.NoError(t, err) + + errCh := make(chan error, 1) + go func() { + if err := srv.Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-srv.Ready(): + case err := <-errCh: + t.Fatalf("Server failed to start: %v", err) + case <-time.After(5 * time.Second): + t.Fatalf("Server did not become ready within 5s") + } + + time.Sleep(10 * time.Millisecond) + + // Make request to /status endpoint + resp, err := http.Get("http://" + srv.Address() + "/status") + require.NoError(t, err) + defer resp.Body.Close() + + var status StatusResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) + + // Verify sessions section is included (even if empty) + require.NotNil(t, status.Sessions, "Sessions should be 
present when SessionManagementV2 is enabled") + assert.Equal(t, 0, status.Sessions.ActiveCount, "Should report 0 active sessions (no sessions created yet)") + assert.NotNil(t, status.Sessions.BackendUsage, "BackendUsage should be non-nil map") + assert.Empty(t, status.Sessions.BackendUsage, "BackendUsage should be empty (no sessions created yet)") + + // Verify the response structure doesn't expose session IDs + resp2, err := http.Get("http://" + srv.Address() + "/status") + require.NoError(t, err) + defer resp2.Body.Close() + + var rawResponse map[string]interface{} + require.NoError(t, json.NewDecoder(resp2.Body).Decode(&rawResponse)) + + sessions, ok := rawResponse["sessions"].(map[string]interface{}) + require.True(t, ok, "sessions should be a map") + + // Ensure no session_id field exists + _, hasSessionID := sessions["session_id"] + assert.False(t, hasSessionID, "Should not expose session_id field") + + // Ensure no sessions array with individual session data + _, hasSessions := sessions["sessions"] + assert.False(t, hasSessions, "Should not have sessions array with individual session data") + + // Should only have active_count and backend_usage + assert.Contains(t, sessions, "active_count") + assert.Contains(t, sessions, "backend_usage") + + // Verify backend_usage is a map, not an array + backendUsage, ok := sessions["backend_usage"].(map[string]interface{}) + require.True(t, ok, "backend_usage should be a map") + assert.Empty(t, backendUsage, "backend_usage should be empty (no sessions yet)") +} + +func TestStatusEndpoint_NoCredentialsExposed(t *testing.T) { + t.Parallel() + + backends := []vmcp.Backend{{ + ID: "b1", + Name: "secure-backend", + BaseURL: "https://secret-internal-url.example.com:9090/mcp", + HealthStatus: vmcp.BackendHealthy, + AuthConfig: &authtypes.BackendAuthStrategy{ + Type: authtypes.StrategyTypeTokenExchange, + }, + }} + srv := createTestServerWithBackends(t, backends, "") + + resp, err := http.Get("http://" + srv.Address() + 
"/status") + require.NoError(t, err) + defer resp.Body.Close() + + // Read raw response body to check what's exposed + var rawResponse map[string]interface{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(&rawResponse)) + + // Verify no sensitive fields are exposed + respBackends, ok := rawResponse["backends"].([]interface{}) + require.True(t, ok) + require.Len(t, respBackends, 1) + + backend := respBackends[0].(map[string]interface{}) + + // Should NOT expose: BaseURL, credentials, tokens, internal URLs + _, hasBaseURL := backend["base_url"] + _, hasURL := backend["url"] + _, hasToken := backend["token"] + _, hasCredentials := backend["credentials"] + + assert.False(t, hasBaseURL, "BaseURL should not be exposed") + assert.False(t, hasURL, "URL should not be exposed") + assert.False(t, hasToken, "Token should not be exposed") + assert.False(t, hasCredentials, "Credentials should not be exposed") + + // Should expose: Name, Health, Transport, AuthType (safe metadata) + assert.Contains(t, backend, "name") + assert.Contains(t, backend, "health") + assert.Contains(t, backend, "transport") + assert.Contains(t, backend, "auth_type") +} From 3ad3eae7521b6921f5f36d04eeaea8590bcba0ce Mon Sep 17 00:00:00 2001 From: taskbot Date: Mon, 9 Mar 2026 11:43:24 +0100 Subject: [PATCH 2/2] feat(vmcp): log active sessions on creation, expiry, and periodically Replace the /status endpoint session fields with structured logging: - CreateSession logs Info with session ID, backend count, and names - Terminate logs Info with session ID and backend count on expiry - StartPeriodicLogging starts a background goroutine that logs active session counts every minute via logActiveSessions() - Server.Start() wires up periodic logging when SessionManagementV2 is on Remove ListActiveSessions() from the SessionManager interface and drop the /status session fields (Sessions, SessionsStatus, BackendUsage). 
Closes: #3876 Co-Authored-By: Claude Sonnet 4.6 fix(vmcp): drop else after return in Terminate (revive lint) Co-Authored-By: Claude Sonnet 4.6 --- pkg/transport/session/manager.go | 7 + pkg/vmcp/server/server.go | 15 + pkg/vmcp/server/session_manager_interface.go | 10 +- .../sessionmanager/list_sessions_test.go | 261 ------------------ .../server/sessionmanager/session_manager.go | 199 ++++++------- pkg/vmcp/server/status.go | 32 +-- pkg/vmcp/server/status_test.go | 165 +---------- 7 files changed, 116 insertions(+), 573 deletions(-) delete mode 100644 pkg/vmcp/server/sessionmanager/list_sessions_test.go diff --git a/pkg/transport/session/manager.go b/pkg/transport/session/manager.go index 79bfce34b4..38a0a6faf6 100644 --- a/pkg/transport/session/manager.go +++ b/pkg/transport/session/manager.go @@ -219,6 +219,13 @@ func (m *Manager) Stop() error { return nil } +// SupportsRange reports whether the current storage backend supports Range(). +// Returns true only for LocalStorage; distributed backends (Redis, Valkey) return false. +func (m *Manager) SupportsRange() bool { + _, ok := m.storage.(*LocalStorage) + return ok +} + // Range calls f sequentially for each key and value present in the map. // If f returns false, range stops the iteration. // diff --git a/pkg/vmcp/server/server.go b/pkg/vmcp/server/server.go index b793534595..0a3101cfdc 100644 --- a/pkg/vmcp/server/server.go +++ b/pkg/vmcp/server/server.go @@ -67,6 +67,10 @@ const ( // defaultSessionTTL is the default session time-to-live duration. // Sessions that are inactive for this duration will be automatically cleaned up. defaultSessionTTL = 30 * time.Minute + + // defaultSessionLoggingInterval is how often active sessions are logged when + // SessionManagementV2 is enabled. 
+ defaultSessionLoggingInterval = 1 * time.Minute ) //go:generate mockgen -destination=mocks/mock_watcher.go -package=mocks -source=server.go Watcher @@ -667,6 +671,17 @@ func (s *Server) Start(ctx context.Context) error { } } + // Start periodic session logging if SessionManagementV2 is enabled + if s.vmcpSessionMgr != nil { + sessionLoggingCtx, sessionLoggingCancel := context.WithCancel(ctx) + s.vmcpSessionMgr.StartPeriodicLogging(sessionLoggingCtx, defaultSessionLoggingInterval) + slog.Info("session periodic logging started", "interval", defaultSessionLoggingInterval) + s.shutdownFuncs = append(s.shutdownFuncs, func(context.Context) error { + sessionLoggingCancel() + return nil + }) + } + // Start status reporter if configured if s.statusReporter != nil { shutdown, err := s.statusReporter.Start(ctx) diff --git a/pkg/vmcp/server/session_manager_interface.go b/pkg/vmcp/server/session_manager_interface.go index 5646e3597c..9d5c80e2f9 100644 --- a/pkg/vmcp/server/session_manager_interface.go +++ b/pkg/vmcp/server/session_manager_interface.go @@ -5,10 +5,10 @@ package server import ( "context" + "time" mcpserver "github.com/mark3labs/mcp-go/server" - "github.com/stacklok/toolhive/pkg/vmcp/server/sessionmanager" vmcpsession "github.com/stacklok/toolhive/pkg/vmcp/session" ) @@ -32,8 +32,8 @@ type SessionManager interface { // session's backend connections rather than the global router. GetAdaptedTools(sessionID string) ([]mcpserver.ServerTool, error) - // ListActiveSessions returns backend-centric usage statistics for all active vMCP sessions. - // Used by the /status endpoint to expose operational visibility without leaking session identifiers. - // Returns (activeCount, backendUsage map). Returns (0, empty map) if no sessions exist. - ListActiveSessions() (int, map[string]sessionmanager.BackendUsage) + // StartPeriodicLogging starts a background goroutine that periodically logs + // the active session count and per-backend session counts at the given interval. 
+ // The goroutine stops when ctx is cancelled. + StartPeriodicLogging(ctx context.Context, interval time.Duration) } diff --git a/pkg/vmcp/server/sessionmanager/list_sessions_test.go b/pkg/vmcp/server/sessionmanager/list_sessions_test.go deleted file mode 100644 index 192686f7b6..0000000000 --- a/pkg/vmcp/server/sessionmanager/list_sessions_test.go +++ /dev/null @@ -1,261 +0,0 @@ -// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. -// SPDX-License-Identifier: Apache-2.0 - -package sessionmanager_test - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/stacklok/toolhive/pkg/auth" - transportsession "github.com/stacklok/toolhive/pkg/transport/session" - "github.com/stacklok/toolhive/pkg/vmcp" - vmcpauth "github.com/stacklok/toolhive/pkg/vmcp/auth" - "github.com/stacklok/toolhive/pkg/vmcp/auth/strategies" - authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" - "github.com/stacklok/toolhive/pkg/vmcp/server/sessionmanager" - vmcpsession "github.com/stacklok/toolhive/pkg/vmcp/session" -) - -// createTestFactory creates a MultiSessionFactory for testing -func createTestFactory(t *testing.T) vmcpsession.MultiSessionFactory { - t.Helper() - - authReg := vmcpauth.NewDefaultOutgoingAuthRegistry() - require.NoError(t, authReg.RegisterStrategy( - authtypes.StrategyTypeUnauthenticated, - strategies.NewUnauthenticatedStrategy(), - )) - - return vmcpsession.NewSessionFactory(authReg) -} - -func TestManager_ListActiveSessions_EmptyStore(t *testing.T) { - t.Parallel() - - // Create a manager with empty session store - storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) - defer storage.Stop() - - factory := createTestFactory(t) - mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) - - // List sessions from empty store - activeCount, backendUsage := mgr.ListActiveSessions() - - assert.Equal(t, 0, 
activeCount, "Should have 0 active sessions") - assert.NotNil(t, backendUsage, "Should return non-nil map") - assert.Empty(t, backendUsage, "Should return empty map for empty store") -} - -func TestManager_ListActiveSessions_WithPlaceholderSessions(t *testing.T) { - t.Parallel() - - // Create a manager with session store - storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) - defer storage.Stop() - - factory := createTestFactory(t) - mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) - - // Generate some placeholder sessions (Phase 1 only, not fully initialized) - sessionID1 := mgr.Generate() - sessionID2 := mgr.Generate() - - assert.NotEmpty(t, sessionID1) - assert.NotEmpty(t, sessionID2) - - // List sessions - placeholders should be skipped - activeCount, backendUsage := mgr.ListActiveSessions() - - // Placeholder sessions (not yet MultiSession) should not be included - assert.Equal(t, 0, activeCount, "Placeholder sessions should not be counted") - assert.Empty(t, backendUsage, "Placeholder sessions should not contribute to backend usage") -} - -func TestManager_ListActiveSessions_ReturnsEmptyMapNotNil(t *testing.T) { - t.Parallel() - - storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) - defer storage.Stop() - - factory := createTestFactory(t) - mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) - - // Even with no sessions, should return non-nil empty map - activeCount, backendUsage := mgr.ListActiveSessions() - - assert.Equal(t, 0, activeCount, "Should have 0 active sessions") - assert.NotNil(t, backendUsage, "Should return non-nil map") - assert.Len(t, backendUsage, 0, "Should be empty") -} - -// TestManager_ListActiveSessions_SkipsNonMultiSession verifies that -// only MultiSession instances are included in the results. 
-func TestManager_ListActiveSessions_SkipsNonMultiSession(t *testing.T) { - t.Parallel() - - storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) - defer storage.Stop() - - factory := createTestFactory(t) - mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) - - // Generate placeholders - these are ProxySession, not MultiSession - _ = mgr.Generate() - _ = mgr.Generate() - - // Also manually add a non-MultiSession session to storage - plainSession := transportsession.NewProxySession("plain-session") - err := storage.AddSession(plainSession) - require.NoError(t, err) - - // List should skip all non-MultiSession types - activeCount, backendUsage := mgr.ListActiveSessions() - - assert.Equal(t, 0, activeCount, "Should skip non-MultiSession instances") - assert.Empty(t, backendUsage, "Should have no backend usage from non-MultiSession") -} - -// TestManager_ListActiveSessions_BackendUsageStructure verifies the -// structure of BackendUsage returned by ListActiveSessions. -func TestManager_ListActiveSessions_BackendUsageStructure(t *testing.T) { - t.Parallel() - - // This test verifies that BackendUsage has the expected fields by - // creating an instance and checking it can be constructed. - usage := sessionmanager.BackendUsage{ - SessionCount: 5, - HealthyCount: 4, - FailedCount: 1, - } - - assert.Equal(t, 5, usage.SessionCount) - assert.Equal(t, 4, usage.HealthyCount) - assert.Equal(t, 1, usage.FailedCount) -} - -// mockMultiSession is a minimal mock that implements vmcpsession.MultiSession -// for testing purposes. It embeds a real ProxySession for base functionality -// and adds the MultiSession-specific methods. 
-type mockMultiSession struct { - *transportsession.ProxySession - backendSessions map[string]string - tools []vmcp.Tool - resources []vmcp.Resource - prompts []vmcp.Prompt -} - -func newMockMultiSession(id string, backendSessions map[string]string) *mockMultiSession { - return &mockMultiSession{ - ProxySession: transportsession.NewProxySession(id), - backendSessions: backendSessions, - tools: []vmcp.Tool{}, - resources: []vmcp.Resource{}, - prompts: []vmcp.Prompt{}, - } -} - -func (m *mockMultiSession) BackendSessions() map[string]string { - return m.backendSessions -} - -func (m *mockMultiSession) Tools() []vmcp.Tool { - return m.tools -} - -func (m *mockMultiSession) Resources() []vmcp.Resource { - return m.resources -} - -func (m *mockMultiSession) Prompts() []vmcp.Prompt { - return m.prompts -} - -// Implement Caller interface methods (required but not used in these tests) -func (*mockMultiSession) CallTool(_ context.Context, _ *auth.Identity, _ string, _, _ map[string]any) (*vmcp.ToolCallResult, error) { - return nil, nil -} - -func (*mockMultiSession) ReadResource(_ context.Context, _ *auth.Identity, _ string) (*vmcp.ResourceReadResult, error) { - return nil, nil -} - -func (*mockMultiSession) GetPrompt(_ context.Context, _ *auth.Identity, _ string, _ map[string]any) (*vmcp.PromptGetResult, error) { - return nil, nil -} - -func (*mockMultiSession) Close() error { - return nil -} - -// Compile-time check that mockMultiSession implements MultiSession -var _ vmcpsession.MultiSession = (*mockMultiSession)(nil) - -// TestManager_ListActiveSessions_WithMockMultiSession tests the iteration -// logic and backend aggregation by manually storing mock MultiSession instances. 
-func TestManager_ListActiveSessions_WithMockMultiSession(t *testing.T) { - t.Parallel() - - storage := transportsession.NewTypedManager(30*time.Minute, transportsession.SessionTypeMCP) - defer storage.Stop() - - factory := createTestFactory(t) - mgr := sessionmanager.New(storage, factory, vmcp.NewImmutableRegistry([]vmcp.Backend{})) - - // Manually add mock MultiSessions to storage - session1 := newMockMultiSession("session-1", map[string]string{ - "backend1": "backend1-session-id", - "backend2": "backend2-session-id", - }) - session2 := newMockMultiSession("session-2", map[string]string{ - "backend1": "backend1-session-id-2", // backend1 used by 2 sessions - "backend3": "backend3-session-id", - }) - - err := storage.AddSession(session1) - require.NoError(t, err) - err = storage.AddSession(session2) - require.NoError(t, err) - - // Also add a placeholder to verify it's skipped - placeholder := transportsession.NewProxySession("placeholder-session") - err = storage.AddSession(placeholder) - require.NoError(t, err) - - // Verify storage has the sessions - t.Logf("Storage count before listing: %d", storage.Count()) - - // List sessions - activeCount, backendUsage := mgr.ListActiveSessions() - - // Should have 2 active sessions (placeholder should be skipped) - t.Logf("ListActiveSessions returned %d sessions, %d backends", activeCount, len(backendUsage)) - assert.Equal(t, 2, activeCount, "Should have 2 active MultiSession instances") - assert.Len(t, backendUsage, 3, "Should have 3 unique backends") - - // Verify backend1 (used by 2 sessions) - backend1Usage, ok := backendUsage["backend1"] - require.True(t, ok, "Should include backend1") - assert.Equal(t, 2, backend1Usage.SessionCount, "backend1 should be used by 2 sessions") - assert.Equal(t, 2, backend1Usage.HealthyCount, "backend1 should have 2 healthy connections") - assert.Equal(t, 0, backend1Usage.FailedCount, "backend1 should have 0 failed connections") - - // Verify backend2 (used by 1 session) - backend2Usage, 
ok := backendUsage["backend2"] - require.True(t, ok, "Should include backend2") - assert.Equal(t, 1, backend2Usage.SessionCount, "backend2 should be used by 1 session") - assert.Equal(t, 1, backend2Usage.HealthyCount, "backend2 should have 1 healthy connection") - assert.Equal(t, 0, backend2Usage.FailedCount, "backend2 should have 0 failed connections") - - // Verify backend3 (used by 1 session) - backend3Usage, ok := backendUsage["backend3"] - require.True(t, ok, "Should include backend3") - assert.Equal(t, 1, backend3Usage.SessionCount, "backend3 should be used by 1 session") - assert.Equal(t, 1, backend3Usage.HealthyCount, "backend3 should have 1 healthy connection") - assert.Equal(t, 0, backend3Usage.FailedCount, "backend3 should have 0 failed connections") -} diff --git a/pkg/vmcp/server/sessionmanager/session_manager.go b/pkg/vmcp/server/sessionmanager/session_manager.go index 072aa10bee..b1d9e74708 100644 --- a/pkg/vmcp/server/sessionmanager/session_manager.go +++ b/pkg/vmcp/server/sessionmanager/session_manager.go @@ -19,6 +19,7 @@ import ( "fmt" "log/slog" "sync" + "time" "github.com/google/uuid" "github.com/mark3labs/mcp-go/mcp" @@ -42,15 +43,6 @@ const ( MetadataValTrue = "true" ) -// BackendUsage tracks session usage statistics for a specific backend. -// This provides operational visibility into backend load and health without -// exposing session identifiers that could be used for session hijacking. -type BackendUsage struct { - SessionCount int `json:"session_count"` // Number of sessions using this backend - HealthyCount int `json:"healthy_count"` // Number of sessions with healthy connections - FailedCount int `json:"failed_count"` // Number of sessions with failed connections -} - // Manager bridges the domain session lifecycle (MultiSession / MultiSessionFactory) // to the mark3labs SDK's SessionIdManager interface. 
// @@ -216,9 +208,14 @@ func (sm *Manager) CreateSession( return nil, fmt.Errorf("Manager.CreateSession: failed to replace placeholder: %w", err) } - slog.Debug("Manager: created multi-session", + backendNames := make([]string, len(backends)) + for i, b := range backends { + backendNames[i] = b.Name + } + slog.Info("Manager: session created", "session_id", sessionID, - "backend_count", len(backends)) + "backend_count", len(backends), + "backends", backendNames) return sess, nil } @@ -281,6 +278,7 @@ func (sm *Manager) Terminate(sessionID string) (isNotAllowed bool, err error) { // If the session is a fully-formed MultiSession, close its backend connections. if multiSess, ok := sess.(vmcpsession.MultiSession); ok { + backendSessions := multiSess.BackendSessions() if closeErr := multiSess.Close(); closeErr != nil { slog.Warn("Manager.Terminate: error closing multi-session backend connections", "session_id", sessionID, "error", closeErr) @@ -289,37 +287,39 @@ func (sm *Manager) Terminate(sessionID string) (isNotAllowed bool, err error) { if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil { return false, fmt.Errorf("Manager.Terminate: failed to delete session from storage: %w", deleteErr) } - } else { - // Placeholder session (not yet upgraded to MultiSession). - // - // This handles the race condition where a client sends DELETE between - // Generate() (Phase 1) and CreateSession() (Phase 2). The two-phase - // pattern creates a window where the session exists as a placeholder: - // - // 1. Client sends initialize → Generate() creates placeholder - // 2. Client sends DELETE before OnRegisterSession hook fires - // 3. We mark the placeholder as terminated (don't delete it) - // 4. CreateSession() hook fires → sees terminated flag → fails fast - // - // Without this branch, CreateSession() would open backend HTTP connections - // for a session the client already terminated, silently resurrecting it. 
- // - // We mark (not delete) so Validate() can return isTerminated=true, which - // lets the SDK distinguish "actively terminated" from "never existed". - // TTL cleanup will remove the placeholder later. - sess.SetMetadata(MetadataKeyTerminated, MetadataValTrue) - if replaceErr := sm.storage.UpsertSession(sess); replaceErr != nil { - slog.Warn("Manager.Terminate: failed to persist terminated flag for placeholder; attempting delete fallback", - "session_id", sessionID, "error", replaceErr) - if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil { - return false, fmt.Errorf( - "Manager.Terminate: failed to persist terminated flag and delete placeholder: upsertErr=%v, deleteErr=%w", - replaceErr, deleteErr) - } + slog.Info("Manager: session terminated", "session_id", sessionID, "backend_count", len(backendSessions)) + return false, nil + } + + // Placeholder session (not yet upgraded to MultiSession). + // + // This handles the race condition where a client sends DELETE between + // Generate() (Phase 1) and CreateSession() (Phase 2). The two-phase + // pattern creates a window where the session exists as a placeholder: + // + // 1. Client sends initialize → Generate() creates placeholder + // 2. Client sends DELETE before OnRegisterSession hook fires + // 3. We mark the placeholder as terminated (don't delete it) + // 4. CreateSession() hook fires → sees terminated flag → fails fast + // + // Without this branch, CreateSession() would open backend HTTP connections + // for a session the client already terminated, silently resurrecting it. + // + // We mark (not delete) so Validate() can return isTerminated=true, which + // lets the SDK distinguish "actively terminated" from "never existed". + // TTL cleanup will remove the placeholder later. 
+ sess.SetMetadata(MetadataKeyTerminated, MetadataValTrue) + if replaceErr := sm.storage.UpsertSession(sess); replaceErr != nil { + slog.Warn("Manager.Terminate: failed to persist terminated flag for placeholder; attempting delete fallback", + "session_id", sessionID, "error", replaceErr) + if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil { + return false, fmt.Errorf( + "Manager.Terminate: failed to persist terminated flag and delete placeholder: upsertErr=%v, deleteErr=%w", + replaceErr, deleteErr) } } - slog.Info("Manager.Terminate: session terminated", "session_id", sessionID) + slog.Info("Manager: placeholder session terminated", "session_id", sessionID) return false, nil } @@ -418,95 +418,66 @@ func (sm *Manager) GetAdaptedTools(sessionID string) ([]mcpserver.ServerTool, er return sdkTools, nil } -// ListActiveSessions returns backend-centric usage statistics for all active vMCP sessions. -// Used by the /status endpoint to expose operational visibility without leaking session identifiers. -// -// Security Note: This method intentionally does NOT expose session IDs or backend session IDs, -// as these could be used as bearer tokens for session hijacking attacks via the Mcp-Session-Id -// header. Instead, it aggregates session data by backend to provide operational insights -// (load distribution, health status) without security risk. +// StartPeriodicLogging starts a background goroutine that logs active session +// counts at the given interval until ctx is cancelled. // -// Storage Backend Limitation: This method uses transportsession.Manager.Range(), which only -// works with LocalStorage. With distributed storage backends (Redis, Valkey), Range is a no-op -// and this method will return (0, empty map). This is acceptable because MultiSessions are -// node-local (they hold in-process HTTP connections and routing state that cannot be serialized). 
-// Even with distributed storage, a session can only be served by the process that created it, -// so reporting only local sessions provides accurate operational visibility for this node. -// -// Returns: -// - activeCount: Total number of active sessions on this node -// - backendUsage: Map of backend_name -> usage statistics (session count, healthy, failed) +// This provides operational visibility into session load without exposing +// session identifiers over the network. +func (sm *Manager) StartPeriodicLogging(ctx context.Context, interval time.Duration) { + if interval <= 0 { + slog.Warn("Manager: StartPeriodicLogging called with non-positive interval, defaulting to 1m", "interval", interval) + interval = time.Minute + } + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + sm.logActiveSessions() + } + } + }() +} + +// logActiveSessions logs a summary of all active vMCP sessions on this node. // -// Returns (0, empty map) if no sessions exist, all sessions are placeholders, or using -// non-local storage backend. -func (sm *Manager) ListActiveSessions() (int, map[string]BackendUsage) { - backendUsage := make(map[string]BackendUsage) +// Storage Backend Limitation: Range() only works with LocalStorage. With +// distributed backends (Redis, Valkey) enumeration is skipped: a one-time warning +// is logged and no summary is emitted, even if sessions exist in the cluster. +// This is expected: sessions hold in-process HTTP connections and are +// inherently node-local.
+func (sm *Manager) logActiveSessions() { + if !sm.storage.SupportsRange() { + sm.storageWarningOnce.Do(func() { + slog.Warn("Manager: session enumeration unavailable with current storage backend", + "reason", "Range() only works with LocalStorage; distributed storage (Redis, Valkey) cannot enumerate sessions", + "note", "Sessions are node-local; this is expected behavior") + }) + return + } + activeCount := 0 - rangeExecuted := false + backendSessionCounts := make(map[string]int) // backend ID -> session count - // Iterate over all sessions in storage using the Range() helper. - // Note: Range only works with LocalStorage backend. With distributed storage - // (Redis, Valkey), this is a no-op and returns empty results. This is expected - // because sessions are node-local and cannot be transferred between processes. sm.storage.Range(func(key, value interface{}) bool { - rangeExecuted = true - sessionID, ok := key.(string) + _, ok := key.(string) if !ok { - slog.Warn("Manager.ListActiveSessions: non-string session key", "key", key) - return true // continue iteration + return true } - // Type assert to MultiSession to access session-specific methods multiSess, ok := value.(vmcpsession.MultiSession) if !ok { - // Session is a placeholder (not yet fully initialized) or different type - slog.Debug("Manager.ListActiveSessions: skipping non-MultiSession", - "session_id", sessionID, - "type", fmt.Sprintf("%T", value)) - return true // continue iteration + return true // placeholder, skip } activeCount++ - - // Get backend session mappings (backendID -> backendSessionID) - backendSessions := multiSess.BackendSessions() - - // Aggregate statistics by backend name. - // - // Limitation: Currently all backends with sessions are counted as healthy. - // This doesn't account for partial failures where some backends failed during - // session initialization. To properly track failed backends, we would need: - // 1. 
MultiSession to expose which backends were attempted vs succeeded - // 2. Session metadata storing the original backend list at creation time - // 3. Comparison between attempted and successful backends to compute failures - // - // For now, BackendSessions() only returns successfully connected backends, - // so we cannot distinguish between "backend not attempted" and "backend failed". - // This means FailedCount will always be 0 until proper failure tracking is added. - for backendID := range backendSessions { - usage := backendUsage[backendID] - usage.SessionCount++ - usage.HealthyCount++ // Only successfully connected backends appear in BackendSessions() - // usage.FailedCount stays 0 - we don't currently track which backends failed - backendUsage[backendID] = usage + for backendID := range multiSess.BackendSessions() { + backendSessionCounts[backendID]++ } - - return true // continue iteration + return true }) - // Log a one-time warning if Range was never executed (distributed storage) - if !rangeExecuted && activeCount == 0 { - sm.storageWarningOnce.Do(func() { - slog.Warn("Session enumeration not supported with current storage backend", - "reason", "Range() only works with LocalStorage; distributed storage (Redis, Valkey) cannot enumerate sessions", - "impact", "/status endpoint will show 0 sessions even if sessions exist in the cluster", - "note", "Sessions are node-local - each node can only serve sessions it created; this is expected behavior") - }) - } - - slog.Debug("Manager.ListActiveSessions: completed", - "total_sessions", activeCount, - "backends", len(backendUsage)) - - return activeCount, backendUsage + slog.Info("Manager: active sessions", "total_sessions", activeCount, "backend_session_counts", backendSessionCounts) } diff --git a/pkg/vmcp/server/status.go b/pkg/vmcp/server/status.go index bae1a2a2ff..484ee5bd88 100644 --- a/pkg/vmcp/server/status.go +++ b/pkg/vmcp/server/status.go @@ -12,28 +12,16 @@ import ( 
"github.com/stacklok/toolhive/pkg/versions" "github.com/stacklok/toolhive/pkg/vmcp" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" - "github.com/stacklok/toolhive/pkg/vmcp/server/sessionmanager" ) // StatusResponse represents the vMCP server's operational status. type StatusResponse struct { Backends []BackendStatus `json:"backends"` - Sessions *SessionsStatus `json:"sessions,omitempty"` // Only present when sessionManagementV2 enabled Healthy bool `json:"healthy"` Version string `json:"version"` GroupRef string `json:"group_ref"` } -// SessionsStatus contains information about active vMCP sessions. -type SessionsStatus struct { - ActiveCount int `json:"active_count"` - BackendUsage map[string]sessionmanager.BackendUsage `json:"backend_usage"` // backend_name -> usage stats -} - -// BackendUsage tracks session usage statistics for a specific backend. -// This provides operational visibility without exposing session identifiers. -type BackendUsage = sessionmanager.BackendUsage - // BackendStatus represents the status of a single backend MCP server. type BackendStatus struct { Name string `json:"name"` @@ -93,30 +81,12 @@ func (s *Server) buildStatusResponse(ctx context.Context) StatusResponse { // Healthy = true if at least one backend is healthy AND there's at least one backend healthy := len(backends) > 0 && hasHealthyBackend - response := StatusResponse{ + return StatusResponse{ Backends: backendStatuses, Healthy: healthy, Version: versions.GetVersionInfo().Version, GroupRef: s.config.GroupRef, } - - // Add session information if sessionManagementV2 is enabled - // This provides operational visibility into backend usage patterns without - // exposing session identifiers that could be used for session hijacking. 
- if s.config.SessionManagementV2 && s.vmcpSessionMgr != nil { - activeCount, backendUsage := s.vmcpSessionMgr.ListActiveSessions() - response.Sessions = &SessionsStatus{ - ActiveCount: activeCount, - BackendUsage: backendUsage, - } - // #nosec G706 -- structured logging with slog is safe from injection - slog.Debug("buildStatusResponse: included session info", - "active_sessions", activeCount, - "backends", len(backendUsage), - "v2_enabled", true) - } - - return response } // getAuthType returns the auth type string from the backend auth strategy. diff --git a/pkg/vmcp/server/status_test.go b/pkg/vmcp/server/status_test.go index 30d63cbc31..b5d78aabe6 100644 --- a/pkg/vmcp/server/status_test.go +++ b/pkg/vmcp/server/status_test.go @@ -17,39 +17,21 @@ import ( "github.com/stacklok/toolhive/pkg/networking" "github.com/stacklok/toolhive/pkg/vmcp" "github.com/stacklok/toolhive/pkg/vmcp/aggregator" - vmcpauth "github.com/stacklok/toolhive/pkg/vmcp/auth" - "github.com/stacklok/toolhive/pkg/vmcp/auth/strategies" authtypes "github.com/stacklok/toolhive/pkg/vmcp/auth/types" discoveryMocks "github.com/stacklok/toolhive/pkg/vmcp/discovery/mocks" "github.com/stacklok/toolhive/pkg/vmcp/mocks" "github.com/stacklok/toolhive/pkg/vmcp/router" "github.com/stacklok/toolhive/pkg/vmcp/server" - vmcpsession "github.com/stacklok/toolhive/pkg/vmcp/session" ) // StatusResponse mirrors the server's status response structure for test deserialization. type StatusResponse struct { Backends []BackendStatus `json:"backends"` - Sessions *SessionsStatus `json:"sessions,omitempty"` Healthy bool `json:"healthy"` Version string `json:"version"` GroupRef string `json:"group_ref"` } -// SessionsStatus mirrors the server's sessions status structure for test deserialization. -type SessionsStatus struct { - ActiveCount int `json:"active_count"` - BackendUsage map[string]BackendUsage `json:"backend_usage"` -} - -// BackendUsage mirrors the server's backend usage structure for test deserialization. 
-// This provides operational visibility without exposing session identifiers. -type BackendUsage struct { - SessionCount int `json:"session_count"` // Number of sessions using this backend - HealthyCount int `json:"healthy_count"` // Number of sessions with healthy connections - FailedCount int `json:"failed_count"` // Number of sessions with failed connections -} - // BackendStatus mirrors the server's backend status structure for test deserialization. type BackendStatus struct { Name string `json:"name"` @@ -260,149 +242,6 @@ func TestStatusEndpoint_BackendFieldMapping(t *testing.T) { assert.Equal(t, authtypes.StrategyTypeTokenExchange, b.AuthType) } -func TestStatusEndpoint_SessionsNotIncludedWhenV2Disabled(t *testing.T) { - t.Parallel() - - // Default server configuration has SessionManagementV2 = false - backends := []vmcp.Backend{{ - ID: "b1", Name: "test-backend", HealthStatus: vmcp.BackendHealthy, - }} - srv := createTestServerWithBackends(t, backends, "") - - resp, err := http.Get("http://" + srv.Address() + "/status") - require.NoError(t, err) - defer resp.Body.Close() - - var status StatusResponse - require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) - - // Sessions field should be nil when v2 is disabled - assert.Nil(t, status.Sessions, "Sessions should not be present when SessionManagementV2 is disabled") - assert.Len(t, status.Backends, 1) - assert.True(t, status.Healthy) -} - -func TestStatusEndpoint_SessionsIncludedWhenV2Enabled(t *testing.T) { - t.Parallel() - - // This test uses the sessionmanager package directly rather than mocking - // to verify the full integration of backend usage statistics in /status. - // Since we cannot easily create real MultiSessions without starting backend - // servers, we verify the empty case (no active sessions) and the structure. 
- - ctrl := gomock.NewController(t) - t.Cleanup(ctrl.Finish) - - mockBackendClient := mocks.NewMockBackendClient(ctrl) - mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) - rt := router.NewDefaultRouter() - - port := networking.FindAvailable() - require.NotZero(t, port, "Failed to find available port") - - mockDiscoveryMgr.EXPECT(). - Discover(gomock.Any(), gomock.Any()). - Return(&aggregator.AggregatedCapabilities{ - Tools: []vmcp.Tool{}, - Resources: []vmcp.Resource{}, - Prompts: []vmcp.Prompt{}, - RoutingTable: &vmcp.RoutingTable{ - Tools: make(map[string]*vmcp.BackendTarget), - Resources: make(map[string]*vmcp.BackendTarget), - Prompts: make(map[string]*vmcp.BackendTarget), - }, - Metadata: &aggregator.AggregationMetadata{}, - }, nil). - AnyTimes() - mockDiscoveryMgr.EXPECT().Stop().AnyTimes() - - backends := []vmcp.Backend{ - {ID: "backend1", Name: "backend1", HealthStatus: vmcp.BackendHealthy}, - } - - // Create a real session factory (required for SessionManagementV2) - authReg := vmcpauth.NewDefaultOutgoingAuthRegistry() - require.NoError(t, authReg.RegisterStrategy( - authtypes.StrategyTypeUnauthenticated, - strategies.NewUnauthenticatedStrategy(), - )) - factory := vmcpsession.NewSessionFactory(authReg) - - ctx, cancel := context.WithCancel(t.Context()) - t.Cleanup(cancel) - - srv, err := server.New(ctx, &server.Config{ - Name: "test-vmcp-v2", - Version: "1.0.0", - Host: "127.0.0.1", - Port: port, - GroupRef: "test-group", - SessionTTL: 5 * time.Minute, - SessionManagementV2: true, - SessionFactory: factory, - }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewImmutableRegistry(backends), nil) - require.NoError(t, err) - - errCh := make(chan error, 1) - go func() { - if err := srv.Start(ctx); err != nil { - errCh <- err - } - }() - - select { - case <-srv.Ready(): - case err := <-errCh: - t.Fatalf("Server failed to start: %v", err) - case <-time.After(5 * time.Second): - t.Fatalf("Server did not become ready within 5s") - } - - time.Sleep(10 * 
time.Millisecond) - - // Make request to /status endpoint - resp, err := http.Get("http://" + srv.Address() + "/status") - require.NoError(t, err) - defer resp.Body.Close() - - var status StatusResponse - require.NoError(t, json.NewDecoder(resp.Body).Decode(&status)) - - // Verify sessions section is included (even if empty) - require.NotNil(t, status.Sessions, "Sessions should be present when SessionManagementV2 is enabled") - assert.Equal(t, 0, status.Sessions.ActiveCount, "Should report 0 active sessions (no sessions created yet)") - assert.NotNil(t, status.Sessions.BackendUsage, "BackendUsage should be non-nil map") - assert.Empty(t, status.Sessions.BackendUsage, "BackendUsage should be empty (no sessions created yet)") - - // Verify the response structure doesn't expose session IDs - resp2, err := http.Get("http://" + srv.Address() + "/status") - require.NoError(t, err) - defer resp2.Body.Close() - - var rawResponse map[string]interface{} - require.NoError(t, json.NewDecoder(resp2.Body).Decode(&rawResponse)) - - sessions, ok := rawResponse["sessions"].(map[string]interface{}) - require.True(t, ok, "sessions should be a map") - - // Ensure no session_id field exists - _, hasSessionID := sessions["session_id"] - assert.False(t, hasSessionID, "Should not expose session_id field") - - // Ensure no sessions array with individual session data - _, hasSessions := sessions["sessions"] - assert.False(t, hasSessions, "Should not have sessions array with individual session data") - - // Should only have active_count and backend_usage - assert.Contains(t, sessions, "active_count") - assert.Contains(t, sessions, "backend_usage") - - // Verify backend_usage is a map, not an array - backendUsage, ok := sessions["backend_usage"].(map[string]interface{}) - require.True(t, ok, "backend_usage should be a map") - assert.Empty(t, backendUsage, "backend_usage should be empty (no sessions yet)") -} - func TestStatusEndpoint_NoCredentialsExposed(t *testing.T) { t.Parallel() @@ -420,6 
+259,7 @@ func TestStatusEndpoint_NoCredentialsExposed(t *testing.T) { resp, err := http.Get("http://" + srv.Address() + "/status") require.NoError(t, err) defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) // Read raw response body to check what's exposed var rawResponse map[string]interface{} @@ -430,7 +270,8 @@ func TestStatusEndpoint_NoCredentialsExposed(t *testing.T) { require.True(t, ok) require.Len(t, respBackends, 1) - backend := respBackends[0].(map[string]interface{}) + backend, ok := respBackends[0].(map[string]interface{}) + require.True(t, ok, "expected backends[0] to be a JSON object") // Should NOT expose: BaseURL, credentials, tokens, internal URLs _, hasBaseURL := backend["base_url"]