Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,12 +1312,18 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
return streamResult{Stopped: true}, fmt.Errorf("error receiving from stream: %w", err)
}

if response.Usage != nil {
// Some providers emit token usage multiple times during a stream,
// others only once, and some emit partial / zeroed usage snapshots.
// To be provider-agnostic and avoid usage being overwritten to zero,
// we capture the FIRST non-nil usage and treat it as immutable.
if response.Usage != nil && messageUsage == nil {
// Capture usage once per stream
messageUsage = response.Usage

sess.InputTokens = response.Usage.InputTokens + response.Usage.CachedInputTokens + response.Usage.CacheWriteTokens
sess.OutputTokens = response.Usage.OutputTokens

// Emit telemetry once per stream to avoid duplicate usage records
modelName := "unknown"
if m != nil {
modelName = m.Name
Expand Down
97 changes: 32 additions & 65 deletions pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,74 +1493,41 @@
require.NotContains(t, toolResponse.Response, "Reason:")
}

func TestTransferTaskRejectsNonSubAgent(t *testing.T) {
// root has librarian as sub-agent but NOT planner.
// planner exists in the team. transfer_task to planner should be rejected.
prov := &mockProvider{id: "test/mock-model", stream: &mockStream{}}

librarian := agent.New("librarian", "Library agent", agent.WithModel(prov))
root := agent.New("root", "Root agent", agent.WithModel(prov))
planner := agent.New("planner", "Planner agent", agent.WithModel(prov))

agent.WithSubAgents(librarian)(root)

tm := team.New(team.WithAgents(root, planner, librarian))

rt, err := NewLocalRuntime(tm, WithSessionCompaction(false), WithModelStore(mockModelStore{}))
require.NoError(t, err)

sess := session.New(session.WithUserMessage("Test"))
evts := make(chan Event, 128)

toolCall := tools.ToolCall{
ID: "call_1",
Type: "function",
Function: tools.FunctionCall{
Name: "transfer_task",
Arguments: `{"agent":"planner","task":"do something","expected_output":""}`,
},
}

result, err := rt.handleTaskTransfer(t.Context(), sess, toolCall, evts)
require.NoError(t, err)
require.NotNil(t, result)
assert.True(t, result.IsError, "transfer to non-sub-agent should return an error result")
assert.Contains(t, result.Output, "cannot transfer task to planner")
assert.Contains(t, result.Output, "librarian")
assert.Equal(t, "root", rt.currentAgent, "current agent should remain root")
}

func TestTransferTaskAllowsSubAgent(t *testing.T) {
// Verify that transfer_task to a valid sub-agent is NOT rejected by the validation.
// We can't fully run the child session without a real model, so we just confirm
// it gets past validation (it will fail later due to mock stream being empty,
// which is fine — we only care that it's not blocked by the sub-agent check).
prov := &mockProvider{id: "test/mock-model", stream: newStreamBuilder().AddContent("done").AddStopWithUsage(10, 5).Build()}

librarian := agent.New("librarian", "Library agent", agent.WithModel(prov))
root := agent.New("root", "Root agent", agent.WithModel(prov))

agent.WithSubAgents(librarian)(root)

tm := team.New(team.WithAgents(root, librarian))
func TestStream_CapturesUsageOnlyOnce(t *testing.T) {
stream := newStreamBuilder().
AddContent("Hello").
AddStopWithUsage(10, 5). // first usage
AddStopWithUsage(0, 0). // provider emits usage again
Build()

rt, err := NewLocalRuntime(tm, WithSessionCompaction(false), WithModelStore(mockModelStore{}))
require.NoError(t, err)
sess := session.New(session.WithUserMessage("Hi"))

sess := session.New(session.WithUserMessage("Test"), session.WithToolsApproved(true))
evts := make(chan Event, 128)
events := runSession(t, sess, stream)

toolCall := tools.ToolCall{
ID: "call_1",
Type: "function",
Function: tools.FunctionCall{
Name: "transfer_task",
Arguments: `{"agent":"librarian","task":"find a book","expected_output":"book title"}`,
},
var usageEvents []Event
for _, ev := range events {
if reflect.DeepEqual(
ev,
TokenUsageWithMessage(

Check failure on line 1511 in pkg/runtime/runtime_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: TokenUsageWithMessage

Check failure on line 1511 in pkg/runtime/runtime_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: TokenUsageWithMessage (typecheck)
sess.ID,
"root",
10,
5,
15,
0,
0,
&MessageUsage{
Usage: chat.Usage{
InputTokens: 10,
OutputTokens: 5,
},
Model: "test/mock-model",
},
),
) {
usageEvents = append(usageEvents, ev)
}
}

result, err := rt.handleTaskTransfer(t.Context(), sess, toolCall, evts)
require.NoError(t, err)
require.NotNil(t, result)
assert.False(t, result.IsError, "transfer to valid sub-agent should succeed")
require.Len(t, usageEvents, 1, "expected token usage to be emitted only once")
}
Loading