Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/vmcp/composer/elicitation_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestWorkflowEngine_ExecuteElicitationStep_Accept(t *testing.T) {

handler := NewDefaultElicitationHandler(mockSDK)
stateStore := NewInMemoryStateStore(1*time.Minute, 1*time.Hour)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil, nil)

workflow := &WorkflowDefinition{
Name: "deployment-workflow",
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestWorkflowEngine_ExecuteElicitationStep_Decline(t *testing.T) {

handler := NewDefaultElicitationHandler(mockSDK)
stateStore := NewInMemoryStateStore(1*time.Minute, 1*time.Hour)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil, nil)

workflow := &WorkflowDefinition{
Name: "test-workflow",
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestWorkflowEngine_ExecuteElicitationStep_Cancel(t *testing.T) {

handler := NewDefaultElicitationHandler(mockSDK)
stateStore := NewInMemoryStateStore(1*time.Minute, 1*time.Hour)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil, nil)

workflow := &WorkflowDefinition{
Name: "test-workflow",
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestWorkflowEngine_ExecuteElicitationStep_Timeout(t *testing.T) {

handler := NewDefaultElicitationHandler(mockSDK)
stateStore := NewInMemoryStateStore(1*time.Minute, 1*time.Hour)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil, nil)

workflow := &WorkflowDefinition{
Name: "test-workflow",
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestWorkflowEngine_ExecuteElicitationStep_NoHandler(t *testing.T) {

te := newTestEngine(t)
// Create engine WITHOUT elicitation handler
engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, nil)
engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, nil, nil)

workflow := &WorkflowDefinition{
Name: "test-workflow",
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestWorkflowEngine_MultiStepWithElicitation(t *testing.T) {

handler := NewDefaultElicitationHandler(mockSDK)
stateStore := NewInMemoryStateStore(1*time.Minute, 1*time.Hour)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil)
engine := NewWorkflowEngine(te.Router, te.Backend, handler, stateStore, nil, nil)

workflow := &WorkflowDefinition{
Name: "multi-step-workflow",
Expand Down
8 changes: 7 additions & 1 deletion pkg/vmcp/composer/testhelpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ func newTestEngine(t *testing.T) *testEngine {
t.Cleanup(ctrl.Finish)

mockRouter := routermocks.NewMockRouter(ctrl)
// ResolveToolName is called by getToolInputSchema on every tool step.
// For tests that use NewWorkflowEngine (no tools list), the result is
// always nil, so a pass-through AnyTimes expectation is sufficient.
mockRouter.EXPECT().ResolveToolName(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, name string) string { return name }).
AnyTimes()
mockBackend := mocks.NewMockBackendClient(ctrl)
engine := NewWorkflowEngine(mockRouter, mockBackend, nil, nil, nil) // nil elicitationHandler, stateStore, and auditor for simple tests
engine := NewWorkflowEngine(mockRouter, mockBackend, nil, nil, nil, nil) // nil elicitationHandler, stateStore, auditor, and tools for simple tests

return &testEngine{
Engine: engine,
Expand Down
10 changes: 5 additions & 5 deletions pkg/vmcp/composer/workflow_audit_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestWorkflowEngine_WithAuditor_SuccessfulWorkflow(t *testing.T) {
require.NoError(t, err)

// Create engine with auditor
engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor)
engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor, nil)

// Setup simple workflow
workflow := simpleWorkflow("audit-test",
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestWorkflowEngine_WithAuditor_FailedWorkflow(t *testing.T) {
})
require.NoError(t, err)

engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor)
engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor, nil)

workflow := simpleWorkflow("fail-test",
toolStep("step1", "tool1", map[string]any{"arg": "value"}),
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestWorkflowEngine_WithAuditor_WorkflowTimeout(t *testing.T) {
})
require.NoError(t, err)

engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor)
engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor, nil)

workflow := &WorkflowDefinition{
Name: "timeout-test",
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestWorkflowEngine_WithAuditor_StepSkipped(t *testing.T) {
})
require.NoError(t, err)

engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor)
engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor, nil)

workflow := &WorkflowDefinition{
Name: "skip-test",
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestWorkflowEngine_WithAuditor_RetryStep(t *testing.T) {
})
require.NoError(t, err)

engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor)
engine := NewWorkflowEngine(te.Router, te.Backend, nil, nil, auditor, nil)

workflow := &WorkflowDefinition{
Name: "retry-test",
Expand Down
38 changes: 21 additions & 17 deletions pkg/vmcp/composer/workflow_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/stacklok/toolhive/pkg/audit"
"github.com/stacklok/toolhive/pkg/vmcp"
"github.com/stacklok/toolhive/pkg/vmcp/conversion"
"github.com/stacklok/toolhive/pkg/vmcp/discovery"
"github.com/stacklok/toolhive/pkg/vmcp/router"
"github.com/stacklok/toolhive/pkg/vmcp/schema"
)
Expand Down Expand Up @@ -46,6 +45,10 @@ type workflowEngine struct {
// backendClient makes calls to backend MCP servers.
backendClient vmcp.BackendClient

// tools is the resolved tool list for the session, used by getToolInputSchema
// for argument type coercion. Nil means no schema-based coercion (discovery-based routing).
tools []vmcp.Tool

// templateExpander handles template expansion.
templateExpander TemplateExpander

Expand All @@ -67,19 +70,20 @@ type workflowEngine struct {

// NewWorkflowEngine creates a new workflow execution engine.
//
// The elicitationHandler parameter is optional. If nil, elicitation steps will fail.
// This allows the engine to be used without elicitation support for simple workflows.
// tools is the resolved tool list for schema-based argument type coercion. Pass nil
// when the engine is used for validation or discovery-based routing only.
//
// The elicitationHandler parameter is optional. If nil, elicitation steps will fail.
// The stateStore parameter is optional. If nil, workflow status tracking and cancellation
// will not be available. Use NewInMemoryStateStore() for basic state tracking.
//
// The auditor parameter is optional. If nil, workflow execution will not be audited.
func NewWorkflowEngine(
rtr router.Router,
backendClient vmcp.BackendClient,
elicitationHandler ElicitationProtocolHandler,
stateStore WorkflowStateStore,
auditor *audit.WorkflowAuditor,
tools []vmcp.Tool,
) Composer {
return &workflowEngine{
router: rtr,
Expand All @@ -90,6 +94,7 @@ func NewWorkflowEngine(
dagExecutor: newDAGExecutor(defaultMaxParallelSteps),
stateStore: stateStore,
auditor: auditor,
tools: tools,
}
}

Expand Down Expand Up @@ -1223,20 +1228,19 @@ func (e *workflowEngine) auditStepSkipped(
}
}

// getToolInputSchema looks up a tool's InputSchema from discovered capabilities.
// Returns nil if the tool is not found or capabilities are not in context.
func (*workflowEngine) getToolInputSchema(ctx context.Context, toolName string) map[string]any {
caps, ok := discovery.DiscoveredCapabilitiesFromContext(ctx)
if !ok || caps == nil {
return nil
}

// Search in backend tools
for i := range caps.Tools {
if caps.Tools[i].Name == toolName {
return caps.Tools[i].InputSchema
// getToolInputSchema looks up a tool's InputSchema from the session-bound tools
// list. If toolName uses the dot convention "{workloadID}.{originalCapabilityName}",
// ResolveToolName is called to translate it to the conflict-resolved key before
// lookup. Returns nil if the engine has no tools list or the tool is not found.
func (e *workflowEngine) getToolInputSchema(ctx context.Context, toolName string) map[string]any {
resolved := toolName
if e.router != nil {
resolved = e.router.ResolveToolName(ctx, toolName)
}
for i := range e.tools {
if e.tools[i].Name == resolved {
return e.tools[i].InputSchema
}
}

return nil
}
106 changes: 105 additions & 1 deletion pkg/vmcp/composer/workflow_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,12 @@ func TestWorkflowEngine_ParallelExecution(t *testing.T) {
defer ctrl.Finish()

mockRouter := routermocks.NewMockRouter(ctrl)
mockRouter.EXPECT().ResolveToolName(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, name string) string { return name }).
AnyTimes()
mockBackend := mocks.NewMockBackendClient(ctrl)
stateStore := NewInMemoryStateStore(1*time.Minute, 1*time.Hour)
engine := NewWorkflowEngine(mockRouter, mockBackend, nil, stateStore, nil)
engine := NewWorkflowEngine(mockRouter, mockBackend, nil, stateStore, nil, nil)

// Track execution timing to verify parallel execution
var executionMu sync.Mutex
Expand Down Expand Up @@ -741,3 +744,104 @@ func TestWorkflowEngine_WorkflowMetadataAvailableInTemplates(t *testing.T) {
assert.Equal(t, WorkflowStatusCompleted, result.Status)
assert.Len(t, result.Steps, 2)
}

func TestWorkflowEngine_SessionEngine_CoercesTemplateStringToTypedArg(t *testing.T) {
t.Parallel()

// Template expansion always produces strings. When the engine is created
// with a bound tool list, getToolInputSchema resolves the target tool's InputSchema
// and the schema coercion layer converts "42" → 42 before calling the backend.
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)

mockRouter := routermocks.NewMockRouter(ctrl)
mockRouter.EXPECT().ResolveToolName(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, name string) string { return name }).
AnyTimes()
mockBackend := mocks.NewMockBackendClient(ctrl)

tools := []vmcp.Tool{
{
Name: "count_items",
InputSchema: map[string]any{
"type": "object",
"properties": map[string]any{
"limit": map[string]any{"type": "integer"},
},
},
},
}

engine := NewWorkflowEngine(mockRouter, mockBackend, nil, nil, nil, tools)

target := &vmcp.BackendTarget{WorkloadID: "backend1", BaseURL: "http://backend1:8080"}
mockRouter.EXPECT().RouteTool(gomock.Any(), "count_items").Return(target, nil)

// Expect the backend to receive the coerced integer, not the string "42".
coercedArgs := map[string]any{"limit": int64(42)}
mockBackend.EXPECT().
CallTool(gomock.Any(), target, "count_items", coercedArgs, gomock.Any()).
Return(&vmcp.ToolCallResult{StructuredContent: map[string]any{"items": []any{}}, Content: []vmcp.Content{}}, nil)

workflow := &WorkflowDefinition{
Name: "coerce_test",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"n": map[string]any{"type": "string"},
},
},
Steps: []WorkflowStep{
{
ID: "step1",
Type: StepTypeTool,
Tool: "count_items",
// Template expansion produces a string; coercion must convert it to int.
Arguments: map[string]any{"limit": "{{.params.n}}"},
},
},
}

result, err := engine.ExecuteWorkflow(context.Background(), workflow, map[string]any{"n": "42"})
require.NoError(t, err)
assert.Equal(t, WorkflowStatusCompleted, result.Status)
}

func TestWorkflowEngine_SessionEngine_ToolNotInList_ReturnsNilSchema(t *testing.T) {
t.Parallel()

// When a bound tool list is provided but the requested tool is not in it,
// getToolInputSchema returns nil and coercion is a no-op.
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)

mockRouter := routermocks.NewMockRouter(ctrl)
mockRouter.EXPECT().ResolveToolName(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, name string) string { return name }).
AnyTimes()
mockBackend := mocks.NewMockBackendClient(ctrl)

// Tools list does not include "other_tool".
tools := []vmcp.Tool{{Name: "known_tool", InputSchema: map[string]any{"type": "object"}}}
engine := NewWorkflowEngine(mockRouter, mockBackend, nil, nil, nil, tools)

target := &vmcp.BackendTarget{WorkloadID: "backend1", BaseURL: "http://backend1:8080"}
mockRouter.EXPECT().RouteTool(gomock.Any(), "other_tool").Return(target, nil)

// Args pass through unmodified (string stays a string).
rawArgs := map[string]any{"value": "hello"}
mockBackend.EXPECT().
CallTool(gomock.Any(), target, "other_tool", rawArgs, gomock.Any()).
Return(&vmcp.ToolCallResult{StructuredContent: map[string]any{"ok": true}, Content: []vmcp.Content{}}, nil)

workflow := &WorkflowDefinition{
Name: "no_schema_test",
Steps: []WorkflowStep{
{ID: "s1", Type: StepTypeTool, Tool: "other_tool", Arguments: rawArgs},
},
}

result, err := engine.ExecuteWorkflow(context.Background(), workflow, nil)
require.NoError(t, err)
assert.Equal(t, WorkflowStatusCompleted, result.Status)
}
10 changes: 6 additions & 4 deletions pkg/vmcp/discovery/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,12 @@ func handleSubsequentRequest(
return ctx, fmt.Errorf("session not found: %s", sessionID)
}

// Backend tool calls are routed by session-scoped handlers registered with the SDK.
// However, composite tool workflow steps go through the shared router which requires
// DiscoveredCapabilities in the context. Inject capabilities built from the session's
// routing table so composite workflows can route backend tool calls correctly.
// Backend tool handlers (created by DefaultHandlerFactory) resolve their backend
// target by calling router.RouteTool(ctx, name), which reads DiscoveredCapabilities
// from the request context. Inject capabilities built from the session's routing
// table so these handlers can route correctly on subsequent requests.
// Note: composite tool workflow engines are created per-session and route via
// SessionRouter directly, so they no longer depend on this context value.
multiSess, isMulti := rawSess.(vmcpsession.MultiSession)
if !isMulti {
// The session is still a StreamableSession placeholder — Phase 2
Expand Down
7 changes: 7 additions & 0 deletions pkg/vmcp/router/default_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ func (*defaultRouter) RouteTool(ctx context.Context, toolName string) (*vmcp.Bac
)
}

// ResolveToolName returns toolName unchanged. The defaultRouter has no static
// routing table, so dot-convention resolution is not available; the caller
// should already be using resolved names when working with this router.
func (*defaultRouter) ResolveToolName(_ context.Context, toolName string) string {
return toolName
}

// RouteResource resolves a resource URI to its backend target.
// With lazy discovery, this method gets capabilities from the request context
// instead of using a cached routing table.
Expand Down
14 changes: 14 additions & 0 deletions pkg/vmcp/router/mocks/mock_router.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/vmcp/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ type Router interface {
// Returns ErrToolNotFound if the tool doesn't exist in any backend.
RouteTool(ctx context.Context, toolName string) (*vmcp.BackendTarget, error)

// ResolveToolName translates a tool name (which may use the dot-convention
// "{workloadID}.{originalCapabilityName}") to the conflict-resolved routing
// table key used in the session tools list. Returns toolName unchanged when
// the name cannot be resolved or the router has no static routing table —
// pass-through semantics so callers can use the result directly without
// special-casing the unresolvable case.
ResolveToolName(ctx context.Context, toolName string) string

// RouteResource resolves a resource URI to its backend target.
// Returns ErrResourceNotFound if the resource doesn't exist in any backend.
RouteResource(ctx context.Context, uri string) (*vmcp.BackendTarget, error)
Expand Down
Loading
Loading