Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/transport/session/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,13 @@ func (m *Manager) Stop() error {
return nil
}

// SupportsRange reports whether the current storage backend supports Range().
// Returns true only for LocalStorage; distributed backends (Redis, Valkey) return false.
func (m *Manager) SupportsRange() bool {
	// Only the in-process local backend can enumerate sessions.
	switch m.storage.(type) {
	case *LocalStorage:
		return true
	default:
		return false
	}
}

// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
//
Expand Down
15 changes: 15 additions & 0 deletions pkg/vmcp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ const (
// defaultSessionTTL is the default session time-to-live duration.
// Sessions that are inactive for this duration will be automatically cleaned up.
defaultSessionTTL = 30 * time.Minute

// defaultSessionLoggingInterval is how often active sessions are logged when
// SessionManagementV2 is enabled.
defaultSessionLoggingInterval = 1 * time.Minute
)

//go:generate mockgen -destination=mocks/mock_watcher.go -package=mocks -source=server.go Watcher
Expand Down Expand Up @@ -667,6 +671,17 @@ func (s *Server) Start(ctx context.Context) error {
}
}

// Start periodic session logging if SessionManagementV2 is enabled
if s.vmcpSessionMgr != nil {
sessionLoggingCtx, sessionLoggingCancel := context.WithCancel(ctx)
s.vmcpSessionMgr.StartPeriodicLogging(sessionLoggingCtx, defaultSessionLoggingInterval)
slog.Info("session periodic logging started", "interval", defaultSessionLoggingInterval)
s.shutdownFuncs = append(s.shutdownFuncs, func(context.Context) error {
sessionLoggingCancel()
return nil
})
}

// Start status reporter if configured
if s.statusReporter != nil {
shutdown, err := s.statusReporter.Start(ctx)
Expand Down
6 changes: 6 additions & 0 deletions pkg/vmcp/server/session_manager_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package server

import (
"context"
"time"

mcpserver "github.com/mark3labs/mcp-go/server"

Expand All @@ -30,4 +31,9 @@ type SessionManager interface {
// handlers. This enables session-scoped routing: each tool call goes through the
// session's backend connections rather than the global router.
GetAdaptedTools(sessionID string) ([]mcpserver.ServerTool, error)

// StartPeriodicLogging starts a background goroutine that periodically logs
// the active session count and per-backend session counts at the given interval.
// The goroutine stops when ctx is cancelled.
StartPeriodicLogging(ctx context.Context, interval time.Duration)
}
137 changes: 107 additions & 30 deletions pkg/vmcp/server/sessionmanager/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/google/uuid"
"github.com/mark3labs/mcp-go/mcp"
Expand Down Expand Up @@ -68,6 +70,9 @@ type Manager struct {
storage *transportsession.Manager
factory vmcpsession.MultiSessionFactory
backendRegistry vmcp.BackendRegistry

// storageWarningOnce ensures we only log the distributed storage warning once
storageWarningOnce sync.Once
}

// New creates a Manager backed by the given transport
Expand Down Expand Up @@ -203,9 +208,14 @@ func (sm *Manager) CreateSession(
return nil, fmt.Errorf("Manager.CreateSession: failed to replace placeholder: %w", err)
}

slog.Debug("Manager: created multi-session",
backendNames := make([]string, len(backends))
for i, b := range backends {
backendNames[i] = b.Name
}
slog.Info("Manager: session created",
"session_id", sessionID,
"backend_count", len(backends))
"backend_count", len(backends),
"backends", backendNames)
return sess, nil
}

Expand Down Expand Up @@ -268,6 +278,7 @@ func (sm *Manager) Terminate(sessionID string) (isNotAllowed bool, err error) {

// If the session is a fully-formed MultiSession, close its backend connections.
if multiSess, ok := sess.(vmcpsession.MultiSession); ok {
backendSessions := multiSess.BackendSessions()
if closeErr := multiSess.Close(); closeErr != nil {
slog.Warn("Manager.Terminate: error closing multi-session backend connections",
"session_id", sessionID, "error", closeErr)
Expand All @@ -276,37 +287,39 @@ func (sm *Manager) Terminate(sessionID string) (isNotAllowed bool, err error) {
if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil {
return false, fmt.Errorf("Manager.Terminate: failed to delete session from storage: %w", deleteErr)
}
} else {
// Placeholder session (not yet upgraded to MultiSession).
//
// This handles the race condition where a client sends DELETE between
// Generate() (Phase 1) and CreateSession() (Phase 2). The two-phase
// pattern creates a window where the session exists as a placeholder:
//
// 1. Client sends initialize → Generate() creates placeholder
// 2. Client sends DELETE before OnRegisterSession hook fires
// 3. We mark the placeholder as terminated (don't delete it)
// 4. CreateSession() hook fires → sees terminated flag → fails fast
//
// Without this branch, CreateSession() would open backend HTTP connections
// for a session the client already terminated, silently resurrecting it.
//
// We mark (not delete) so Validate() can return isTerminated=true, which
// lets the SDK distinguish "actively terminated" from "never existed".
// TTL cleanup will remove the placeholder later.
sess.SetMetadata(MetadataKeyTerminated, MetadataValTrue)
if replaceErr := sm.storage.UpsertSession(sess); replaceErr != nil {
slog.Warn("Manager.Terminate: failed to persist terminated flag for placeholder; attempting delete fallback",
"session_id", sessionID, "error", replaceErr)
if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil {
return false, fmt.Errorf(
"Manager.Terminate: failed to persist terminated flag and delete placeholder: upsertErr=%v, deleteErr=%w",
replaceErr, deleteErr)
}
slog.Info("Manager: session terminated", "session_id", sessionID, "backend_count", len(backendSessions))
return false, nil
}

// Placeholder session (not yet upgraded to MultiSession).
//
// This handles the race condition where a client sends DELETE between
// Generate() (Phase 1) and CreateSession() (Phase 2). The two-phase
// pattern creates a window where the session exists as a placeholder:
//
// 1. Client sends initialize → Generate() creates placeholder
// 2. Client sends DELETE before OnRegisterSession hook fires
// 3. We mark the placeholder as terminated (don't delete it)
// 4. CreateSession() hook fires → sees terminated flag → fails fast
//
// Without this branch, CreateSession() would open backend HTTP connections
// for a session the client already terminated, silently resurrecting it.
//
// We mark (not delete) so Validate() can return isTerminated=true, which
// lets the SDK distinguish "actively terminated" from "never existed".
// TTL cleanup will remove the placeholder later.
sess.SetMetadata(MetadataKeyTerminated, MetadataValTrue)
if replaceErr := sm.storage.UpsertSession(sess); replaceErr != nil {
slog.Warn("Manager.Terminate: failed to persist terminated flag for placeholder; attempting delete fallback",
"session_id", sessionID, "error", replaceErr)
if deleteErr := sm.storage.Delete(sessionID); deleteErr != nil {
return false, fmt.Errorf(
"Manager.Terminate: failed to persist terminated flag and delete placeholder: upsertErr=%v, deleteErr=%w",
replaceErr, deleteErr)
}
}

slog.Info("Manager.Terminate: session terminated", "session_id", sessionID)
slog.Info("Manager: placeholder session terminated", "session_id", sessionID)
return false, nil
}

Expand Down Expand Up @@ -404,3 +417,67 @@ func (sm *Manager) GetAdaptedTools(sessionID string) ([]mcpserver.ServerTool, er

return sdkTools, nil
}

// StartPeriodicLogging starts a background goroutine that logs active session
// counts at the given interval until ctx is cancelled.
//
// This provides operational visibility into session load without exposing
// session identifiers over the network.
func (sm *Manager) StartPeriodicLogging(ctx context.Context, interval time.Duration) {
	if interval <= 0 {
		slog.Warn("Manager: StartPeriodicLogging called with non-positive interval, defaulting to 1m", "interval", interval)
		interval = time.Minute
	}
	go sm.periodicLoggingLoop(ctx, interval)
}

// periodicLoggingLoop ticks at the given interval, logging a session summary
// on every tick until ctx is cancelled.
func (sm *Manager) periodicLoggingLoop(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			sm.logActiveSessions()
		case <-ctx.Done():
			return
		}
	}
}

// logActiveSessions logs a summary of all active vMCP sessions on this node.
//
// Storage Backend Limitation: Range() only works with LocalStorage. With
// distributed backends (Redis, Valkey) it is a no-op and 0 sessions will be
// reported even if sessions exist in the cluster. This is expected: sessions
// hold in-process HTTP connections and are inherently node-local.
func (sm *Manager) logActiveSessions() {
	if !sm.storage.SupportsRange() {
		// Log the limitation exactly once so we don't spam on every tick.
		sm.storageWarningOnce.Do(func() {
			slog.Warn("Manager: session enumeration unavailable with current storage backend",
				"reason", "Range() only works with LocalStorage; distributed storage (Redis, Valkey) cannot enumerate sessions",
				"note", "Sessions are node-local; this is expected behavior")
		})
		return
	}

	var total int
	perBackend := map[string]int{} // backend ID -> session count

	sm.storage.Range(func(key, value interface{}) bool {
		if _, isString := key.(string); !isString {
			return true
		}
		ms, isMulti := value.(vmcpsession.MultiSession)
		if !isMulti {
			return true // placeholder, skip
		}
		total++
		for backendID := range ms.BackendSessions() {
			perBackend[backendID]++
		}
		return true
	})

	slog.Info("Manager: active sessions", "total_sessions", total, "backend_session_counts", perBackend)
}
49 changes: 49 additions & 0 deletions pkg/vmcp/server/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,52 @@ func TestStatusEndpoint_BackendFieldMapping(t *testing.T) {
assert.Equal(t, "streamable-http", b.Transport)
assert.Equal(t, authtypes.StrategyTypeTokenExchange, b.AuthType)
}

func TestStatusEndpoint_NoCredentialsExposed(t *testing.T) {
t.Parallel()

backends := []vmcp.Backend{{
ID: "b1",
Name: "secure-backend",
BaseURL: "https://secret-internal-url.example.com:9090/mcp",
HealthStatus: vmcp.BackendHealthy,
AuthConfig: &authtypes.BackendAuthStrategy{
Type: authtypes.StrategyTypeTokenExchange,
},
}}
srv := createTestServerWithBackends(t, backends, "")

resp, err := http.Get("http://" + srv.Address() + "/status")
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

// Read raw response body to check what's exposed
var rawResponse map[string]interface{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(&rawResponse))

// Verify no sensitive fields are exposed
respBackends, ok := rawResponse["backends"].([]interface{})
require.True(t, ok)
require.Len(t, respBackends, 1)

backend, ok := respBackends[0].(map[string]interface{})
require.True(t, ok, "expected backends[0] to be a JSON object")

// Should NOT expose: BaseURL, credentials, tokens, internal URLs
_, hasBaseURL := backend["base_url"]
_, hasURL := backend["url"]
_, hasToken := backend["token"]
_, hasCredentials := backend["credentials"]

assert.False(t, hasBaseURL, "BaseURL should not be exposed")
assert.False(t, hasURL, "URL should not be exposed")
assert.False(t, hasToken, "Token should not be exposed")
assert.False(t, hasCredentials, "Credentials should not be exposed")

// Should expose: Name, Health, Transport, AuthType (safe metadata)
assert.Contains(t, backend, "name")
assert.Contains(t, backend, "health")
assert.Contains(t, backend, "transport")
assert.Contains(t, backend, "auth_type")
}
Loading