-
Notifications
You must be signed in to change notification settings - Fork 41
Expand file tree
/
Copy path65_parallel_with_tools.py
More file actions
102 lines (81 loc) · 3.26 KB
/
Copy path65_parallel_with_tools.py
File metadata and controls
102 lines (81 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright (c) 2025 Agentspan
# Licensed under the MIT License. See LICENSE file in the project root for details.
"""Parallel Agents with Tools — each branch has its own tools.
Extends the basic parallel pattern (example 07) by giving each parallel
branch its own domain tools. All branches run concurrently and each
independently calls its tools.
Architecture:
parallel_analysis
├── financial_analyst (tools: [check_balance])
└── order_analyst (tools: [lookup_order])
Both analysts run at the same time on the same input. Their results
are aggregated by the parent.
Requirements:
- Conductor server with LLM support
- AGENTSPAN_SERVER_URL=http://localhost:6767/api in .env or environment
- AGENTSPAN_LLM_MODEL=openai/gpt-4o-mini in .env or environment
"""
from conductor.ai.agents import Agent, AgentRuntime, Strategy, tool
from settings import settings
# ── Domain tools ────────────────────────────────────────────────────
@tool
def check_balance(account_id: str) -> dict:
"""Check the balance of a bank account."""
return {"account_id": account_id, "balance": 5432.10, "currency": "USD"}
@tool
def lookup_order(order_id: str) -> dict:
"""Look up the status of an order."""
return {"order_id": order_id, "status": "shipped", "eta": "2 days"}
# ── Parallel agents with tools ─────────────────────────────────────
financial_analyst = Agent(
name="financial_analyst",
model=settings.llm_model,
instructions=(
"You are a financial analyst. Use check_balance to look up the "
"account mentioned. Report the balance and any financial observations."
),
tools=[check_balance],
)
order_analyst = Agent(
name="order_analyst",
model=settings.llm_model,
instructions=(
"You are an order analyst. Use lookup_order to check the order "
"mentioned. Report the status and delivery timeline."
),
tools=[lookup_order],
)
# Both analysts run concurrently
analysis = Agent(
name="parallel_analysis",
model=settings.llm_model,
agents=[financial_analyst, order_analyst],
strategy=Strategy.PARALLEL,
)
if __name__ == "__main__":
with AgentRuntime() as runtime:
result = runtime.run(
analysis,
"Check account ACC-200 balance and look up order ORD-300 status.",
)
result.print_result()
output = str(result.output)
checks = []
if "5432" in output:
checks.append("[OK] Financial analyst retrieved balance")
else:
checks.append("[WARN] Expected balance in output")
if "shipped" in output.lower():
checks.append("[OK] Order analyst retrieved order status")
else:
checks.append("[WARN] Expected order status in output")
for c in checks:
print(c)
# Production pattern:
# 1. Deploy once during CI/CD:
# runtime.deploy(analysis)
# CLI alternative:
# agentspan deploy --package examples.65_parallel_with_tools
#
# 2. In a separate long-lived worker process:
# runtime.serve(analysis)