-
Notifications
You must be signed in to change notification settings - Fork 113
Expand file tree
/
Copy pathagent_loop.py
More file actions
96 lines (86 loc) · 4.13 KB
/
agent_loop.py
File metadata and controls
96 lines (86 loc) · 4.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import json, re
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class StepOutcome:
    """Result of a single tool step, as consumed by ``agent_runner_loop``.

    The loop reads the three fields as follows:

    * ``data`` is the tool's payload. When it is not ``None`` it is embedded
      in a ``<tool_result>`` section of the next prompt; it is also returned
      to the caller when the loop finishes on this step.
    * ``next_prompt`` is the text of the next user message. ``None`` tells
      the loop that the current task is done.
    * ``should_exit`` makes the loop stop after this step even though a
      ``next_prompt`` was supplied.
    """

    # Tool payload; any type is accepted.
    data: Any
    # Text for the next turn, or None when the task is complete.
    next_prompt: Optional[str] = None
    # True to leave the loop immediately after this step.
    should_exit: bool = False
# Built-in data containers: iterable, but they are ordinary return values and
# must not be streamed element by element.
_PLAIN_ITERABLES = (str, bytes, bytearray, dict, list, tuple, set, frozenset, range)


def try_call_generator(func, *args, **kwargs):
    """Call ``func`` and transparently stream it if it returns a generator.

    This function is itself a generator: drive it with ``yield from`` (or
    iterate it manually and read ``StopIteration.value``).

    * If ``func(*args, **kwargs)`` returns a generator or another streaming
      iterable, every item is re-yielded and the iterable's own return value
      becomes this generator's return value.
    * Otherwise nothing is yielded and the plain return value is passed
      through unchanged.

    Built-in containers (``str``, ``bytes``, ``bytearray``, ``dict``,
    ``list``, ``tuple``, ``set``, ``frozenset``, ``range``) count as plain
    values. Without that exclusion a callable returning e.g. a tuple would
    have its elements leaked into the output stream and its result replaced
    by ``None``.
    """
    ret = func(*args, **kwargs)
    if hasattr(ret, '__iter__') and not isinstance(ret, _PLAIN_ITERABLES):
        # ``yield from`` forwards the items and evaluates to the value the
        # delegated iterator returned.
        ret = yield from ret
    return ret
class BaseHandler:
    """Base class for tool handlers.

    A tool named ``xyz`` is implemented by a method ``do_xyz(args, response)``.
    Tool methods and the two callbacks may be plain functions or generators;
    both are invoked through ``try_call_generator``, so whatever they yield is
    forwarded to the caller of ``dispatch``.
    """

    def tool_before_callback(self, tool_name, args, response):
        """Hook invoked before a tool method runs; does nothing by default."""
        pass

    def tool_after_callback(self, tool_name, args, response, ret):
        """Hook invoked after a tool method, with its result in ``ret``; no-op by default."""
        pass

    def next_prompt_patcher(self, next_prompt, outcome, turn):
        """Hook to rewrite the next prompt; returns it unchanged by default."""
        return next_prompt

    def dispatch(self, tool_name, args, response):
        """Generator that runs the tool ``tool_name`` and returns its result.

        If this handler has a ``do_<tool_name>`` attribute, the before
        callback, the tool method and the after callback are run in that
        order and the tool method's result is returned. Otherwise a
        ``StepOutcome`` with no data is returned: for ``bad_json`` its prompt
        is ``args['msg']`` (default ``'bad_json'``); for any other name an
        "unknown tool" notice is yielded first and used as the prompt.
        """
        method_name = f"do_{tool_name}"
        if not hasattr(self, method_name):
            if tool_name == 'bad_json':
                return StepOutcome(None, next_prompt=args.get('msg', 'bad_json'), should_exit=False)
            yield f"未知工具: {tool_name}\n"
            return StepOutcome(None, next_prompt=f"未知工具 {tool_name}", should_exit=False)

        # The callbacks' own return values are not used.
        yield from try_call_generator(self.tool_before_callback, tool_name, args, response)
        result = yield from try_call_generator(getattr(self, method_name), args, response)
        yield from try_call_generator(self.tool_after_callback, tool_name, args, response, result)
        return result
def json_default(o):
    """Fallback for ``json.dumps(default=...)``.

    A ``set`` is converted to a ``list``; every other object is replaced by
    its ``str()`` representation.
    """
    return list(o) if isinstance(o, set) else str(o)
def exhaust(g):
    """Run the iterator ``g`` to completion, discarding everything it yields.

    Returns the value carried by the final ``StopIteration`` — for a
    generator, the value of its ``return`` statement (``None`` if it has none).
    """
    while True:
        try:
            next(g)
        except StopIteration as stop:
            return stop.value
def get_pretty_json(data):
    """Render ``data`` as indented JSON text intended for display.

    * If ``data`` is a dict with a string ``"script"`` entry, a copy is made
      and each ``"; "`` in the script is followed by a line break, so
      multi-statement scripts are shown one statement per line. The caller's
      dict is never modified.
    * A ``"script"`` entry that is not a string (``None``, a list, ...) is
      left untouched instead of raising ``AttributeError``.
    * Escaped newlines in the dump are turned into real line breaks, so the
      result is meant for humans and is not guaranteed to be valid JSON.
    """
    if isinstance(data, dict) and isinstance(data.get("script"), str):
        data = data.copy()
        data["script"] = data["script"].replace("; ", ";\n ")
    return json.dumps(data, indent=2, ensure_ascii=False).replace('\\n', '\n')
def _parse_tool_call(response):
    """Return ``(tool_name, args)`` for the first tool call of ``response``.

    * No tool calls at all yields ``('no_tool', {})``.
    * Arguments that cannot be decoded as JSON, or that decode to something
      other than a JSON object, yield ``('bad_json', {'msg': ...})`` so the
      handler can ask the model to retry instead of the whole loop crashing.
    """
    if not response.tool_calls:
        return 'no_tool', {}
    # Only the first tool call of a response is acted on.
    call = response.tool_calls[0]
    name = call.function.name
    try:
        args = json.loads(call.function.arguments)
    except (TypeError, ValueError) as exc:  # json.JSONDecodeError is a ValueError
        return 'bad_json', {
            'msg': f"Arguments for tool `{name}` are not valid JSON ({exc}). "
                   "Call the tool again with a valid JSON object."
        }
    if not isinstance(args, dict):
        return 'bad_json', {
            'msg': f"Arguments for tool `{name}` must be a JSON object, "
                   f"got {type(args).__name__}. Call the tool again with a valid JSON object."
        }
    return name, args


def agent_runner_loop(client, system_prompt, user_input, handler, tools_schema, max_turns=15, verbose=True, initial_user_content=None):
    """Drive the chat/tool loop as a generator of progress text.

    Each turn sends ``messages`` to ``client.chat`` (itself a generator whose
    return value is a response with ``content`` and ``tool_calls``), hands
    the first tool call to ``handler.dispatch`` and turns the resulting
    outcome into the next user message.

    Args:
        client: object with ``chat(messages=..., tools=...)`` and a writable
            ``last_tools`` attribute.
        system_prompt: content of the initial system message.
        user_input: content of the first user message.
        handler: object providing ``dispatch`` and ``next_prompt_patcher``;
            its ``current_turn`` attribute is set to the 1-based turn number
            before each dispatch.
        tools_schema: passed through unchanged as ``tools`` to ``client.chat``.
        max_turns: upper bound on the number of turns.
        verbose: when true, everything the client and the handler yield is
            forwarded, and handler output is wrapped in five-backtick fence
            lines. When false, both are run silently, only
            ``response.content`` is yielded, and the displayed tool
            arguments are cut to 200 characters.
        initial_user_content: if not ``None``, used instead of ``user_input``
            as the first user message.

    Yields:
        Progress and output text for display.

    Returns:
        (as the generator's return value) one of
        ``{'result': 'CURRENT_TASK_DONE', 'data': ...}`` when the outcome has
        no next prompt, ``{'result': 'EXITED', 'data': ...}`` when the
        outcome asks to exit, or ``{'result': 'MAX_TURNS_EXCEEDED'}``.
    """
    first_content = user_input if initial_user_content is None else initial_user_content
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": first_content},
    ]
    for turn in range(max_turns):
        turn_no = turn + 1
        yield f"**LLM Running (Turn {turn_no}) ...**\n\n"
        # Reset the tool description every 10 turns, to avoid the model
        # degrading because the context has grown too large.
        if turn_no % 10 == 0:
            client.last_tools = ''

        response_gen = client.chat(messages=messages, tools=tools_schema)
        if verbose:
            response = yield from response_gen
            yield '\n\n'
        else:
            response = exhaust(response_gen)
            yield response.content

        tool_name, args = _parse_tool_call(response)
        if tool_name != 'no_tool':
            showarg = get_pretty_json(args)
            if not verbose and len(showarg) > 200:
                showarg = showarg[:200] + ' ...'
            yield f"🛠️ **正在调用工具:** `{tool_name}` 📥**参数:**\n````text\n{showarg}\n````\n"

        handler.current_turn = turn_no
        gen = handler.dispatch(tool_name, args, response)
        if verbose:
            yield '`````\n'
            outcome = yield from gen
            yield '`````\n'
        else:
            outcome = exhaust(gen)

        if outcome.next_prompt is None:
            return {'result': 'CURRENT_TASK_DONE', 'data': outcome.data}
        if outcome.should_exit:
            return {'result': 'EXITED', 'data': outcome.data}
        # After an unknown-tool outcome the tool description is reset as well.
        if outcome.next_prompt.startswith('未知工具'):
            client.last_tools = ''

        next_prompt = ""
        if outcome.data is not None:
            # isinstance (not an exact type match) so that dict/list
            # subclasses are serialized as JSON rather than via their repr.
            if isinstance(outcome.data, (dict, list)):
                datastr = json.dumps(outcome.data, ensure_ascii=False, default=json_default)
            else:
                datastr = str(outcome.data)
            next_prompt += f"<tool_result>\n{datastr}\n</tool_result>\n\n"
        next_prompt += outcome.next_prompt
        next_prompt = handler.next_prompt_patcher(next_prompt, outcome, turn_no)
        # Only the new user message is sent on the next turn.
        # NOTE(review): presumably the client keeps the conversation history
        # itself — confirm against the client implementation.
        messages = [{"role": "user", "content": next_prompt}]
    return {'result': 'MAX_TURNS_EXCEEDED'}