-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathagent_delegation.py
More file actions
180 lines (142 loc) · 5.47 KB
/
agent_delegation.py
File metadata and controls
180 lines (142 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"""
This example demonstrates agent delegation, where one agent (the orchestrator) can dynamically
invoke other agents through tools. This pattern is useful when you want to:
1. Let an agent dynamically choose which specialized agents to use
2. Allow the orchestrator to adapt its strategy based on initial responses
3. Enable flexible workflows where the sequence of agent calls isn't fixed
4. Track which agents were used and why
The example shows how to:
1. Set up a tool that allows one agent to call another
2. Structure input/output types for delegation
3. Configure the orchestrator agent with the delegation tool
4. Handle responses and track agent usage
"""
import asyncio
from typing import Optional
import pytest
from pydantic import BaseModel, Field
import workflowai
from workflowai import Model
class DelegateInput(BaseModel):
    """Input for delegating a task to a specialized agent."""

    # The class docstring and the Field descriptions are kept verbatim:
    # NOTE(review): presumably they end up in the schema shown to the
    # orchestrating model - confirm before rewording them.

    # Required: what the worker is asked to do.
    task: str = Field(description="The task to delegate")

    # Required: which model the delegated run should use.
    model: Model = Field(description="The model to use for this task")

    # Optional: extra background for the worker; None when not supplied.
    context: Optional[str] = Field(
        description="Additional context that might help the agent",
        default=None,
    )
class DelegateOutput(BaseModel):
    """Output from a delegated task."""

    # Text answer produced for the delegated task.
    response: str = Field(description="The agent's response to the task")

    # Validated to lie in the closed interval [0, 1] (ge=0, le=1).
    confidence: float = Field(
        ge=0,
        le=1,
        description="Confidence score between 0 and 1",
    )
class WorkerInput(BaseModel):
    """Input for the worker agent."""

    # Required: the task the worker has to carry out.
    task: str = Field(description="The task to perform")

    # Optional: supporting information; None when not supplied.
    context: Optional[str] = Field(
        description="Additional context that might help with the task",
        default=None,
    )
class WorkerOutput(BaseModel):
    """Output from the worker agent."""

    # The worker's answer to the task it was given.
    response: str = Field(description="The response to the task")

    # Validated to lie in the closed interval [0, 1] (ge=0, le=1).
    confidence: float = Field(
        ge=0,
        le=1,
        description="Confidence score between 0 and 1",
    )
class OrchestratorInput(BaseModel):
    """Input for the orchestrator agent."""

    # Required: the overall goal the orchestrator must reach.
    objective: str = Field(description="The high-level objective to achieve")

    # Optional: defaults to a fresh empty list per instance (default_factory).
    requirements: list[str] = Field(
        default_factory=list,
        description="List of specific requirements or constraints",
    )
class OrchestratorOutput(BaseModel):
    """Final output from the orchestrator."""

    # The synthesized answer to the objective.
    solution: str = Field(description="The final solution that meets the objective")

    # Narrative of how the orchestrator arrived at the solution.
    explanation: str = Field(description="Explanation of how the solution was derived")

    # Optional: defaults to a fresh empty list per instance (default_factory).
    agents_used: list[str] = Field(
        default_factory=list,
        description="List of agents/models used in the process",
    )
# Worker agent declaration: takes a WorkerInput and is annotated to return a
# WorkerOutput. The body is deliberately a stub (`...`); the decorator is what
# provides the `.run(...)` entry point used by `delegate_task`, which also
# chooses the model per call (no model is fixed in the decorator here).
# NOTE(review): the docstring below presumably doubles as the agent's
# instructions, so treat its wording as runtime text - confirm against the
# workflowai SDK before editing it.
@workflowai.agent()
async def worker_agent(agent_input: WorkerInput) -> WorkerOutput:
    """
    A specialized worker agent that handles specific tasks.
    Make sure to:
    1. Focus on the specific task assigned
    2. Provide detailed reasoning for your approach
    3. Include confidence level in your response
    """
    ...
async def delegate_task(agent_input: DelegateInput) -> DelegateOutput:
    """Delegate a task to a worker agent with a specific model."""
    # NOTE(review): this function is registered as a tool on the orchestrator,
    # so the docstring above is presumably shown to the model - kept verbatim.

    # Repackage the request as the worker's own input type; the model is not
    # part of WorkerInput and is handed to the run as a separate option.
    worker_request = WorkerInput(
        task=agent_input.task,
        context=agent_input.context,
    )
    worker_run = await worker_agent.run(worker_request, model=agent_input.model)

    # Copy the worker's answer field-for-field into the tool's return type.
    worker_result = worker_run.output
    return DelegateOutput(
        response=worker_result.response,
        confidence=worker_result.confidence,
    )
# Orchestrator agent declaration: registered under the id "orchestrator",
# configured with Model.GEMINI_2_0_FLASH_LATEST, and given `delegate_task` as
# its only tool. Takes an OrchestratorInput and is annotated to return an
# OrchestratorOutput. The body is deliberately a stub (`...`).
# NOTE(review): the docstring below presumably doubles as the agent's
# instructions, so treat its wording as runtime text - confirm against the
# workflowai SDK before editing it.
@workflowai.agent(
    id="orchestrator",
    model=Model.GEMINI_2_0_FLASH_LATEST,
    tools=[delegate_task],
)
async def orchestrator_agent(agent_input: OrchestratorInput) -> OrchestratorOutput:
    """
    You are an expert orchestrator that breaks down complex objectives into smaller tasks
    and delegates them to specialized agents. You can use the delegate_task tool to assign
    work to other agents.
    Your responsibilities:
    1. Break down the objective into smaller, focused tasks
    2. Choose appropriate models for each task based on its nature:
    - GPT-4O for complex reasoning or creative tasks
    - Claude for analytical or structured tasks
    - Gemini for technical or scientific tasks
    3. Use the delegate_task tool to assign work
    4. Evaluate responses and confidence levels
    5. Request additional work if needed
    6. Synthesize all responses into a cohesive solution
    7. Track which models were used and why
    Make sure the final solution:
    - Meets all specified requirements
    - Is well-reasoned and explained
    - Acknowledges any limitations or uncertainties
    - Lists all models/agents used in the process
    """
    ...
@pytest.mark.xfail(reason="Example is flaky")
async def main():
    """Run the orchestrator on a sample architecture objective and print its answer."""
    # Example: Software architecture task
    print("\nExample: Software Architecture Design")
    print("-" * 50)

    # Build the request up front so the agent call itself stays short.
    request = OrchestratorInput(
        objective="Design a scalable microservices architecture for an e-commerce platform",
        requirements=[
            "Must handle 10,000+ concurrent users",
            "Include payment processing and inventory management",
            "Ensure data consistency across services",
            "Provide real-time order tracking",
        ],
    )
    run = await orchestrator_agent.run(request)
    output = run.output

    # Report the three parts of the orchestrator's output in a fixed order.
    print("\nSolution:")
    print(output.solution)

    print("\nExplanation:")
    print(output.explanation)

    print("\nAgents Used:")
    for used_agent in output.agents_used:
        print(f"- {used_agent}")
if __name__ == "__main__":
    # Only run the example when executed as a script, not when imported.
    asyncio.run(main())