深入理解 Python asyncio:从事件循环到 AI Agent 并发调用
写 AI Agent 的时候,你一定遇到过这个场景:LLM 返回了两个 tool call,比如同时查天气和查日历。它们之间没有依赖关系,完全可以并发执行。但如果你用 await 一个一个等,白白浪费了时间。
这篇文章从事件循环讲起,搞清楚 await 和 asyncio.create_task 的本质区别,最后落到 Agent 开发中的实际用法。
事件循环:asyncio 的心脏
事件循环(Event Loop)是 asyncio 的核心调度器。你可以把它想象成一个单线程的任务调度中心——它维护一个任务队列,不断地检查:哪个任务可以往前推进了?
import asyncio
async def say(msg, delay):
await asyncio.sleep(delay)
print(msg)
async def main():
await say("hello", 1)
await say("world", 1)
asyncio.run(main())
# 总耗时 2 秒:hello(1s) -> world(1s),串行执行
asyncio.run(main()) 做了三件事:
- 创建一个事件循环
- 把
main()作为入口协程扔进去 - 驱动事件循环直到
main()完成
关键点:事件循环是单线程的。它不是靠多线程实现并发,而是靠"在等待 IO 的时候切换到别的任务"来实现并发。当一个协程 await asyncio.sleep(1) 的时候,事件循环知道这个任务要等 1 秒,就去执行别的任务了。
await:挂起当前协程,等结果回来
await 的语义很明确:挂起当前协程,等待目标协程完成,拿到返回值后继续往下走。
async def fetch_weather(city: str) -> dict:
await asyncio.sleep(1) # 模拟 API 调用
return {"city": city, "temp": "22°C"}
async def main():
# 串行:先查北京,等结果回来,再查上海
beijing = await fetch_weather("北京")
shanghai = await fetch_weather("上海")
print(beijing, shanghai)
# 总耗时 2 秒
这就像你在餐厅点菜,跟服务员说"先上第一道菜,等我吃完了再上第二道"。效率很低,但逻辑简单,适合有依赖关系的场景。
create_task:扔进事件循环,不等它完成
asyncio.create_task() 的语义完全不同:把协程包装成一个 Task 对象,立即注册到事件循环中开始调度,但不等它完成。
async def main():
# 并发:两个任务同时开始
task1 = asyncio.create_task(fetch_weather("北京"))
task2 = asyncio.create_task(fetch_weather("上海"))
# 两个任务已经在事件循环中跑了
# 现在 await 拿结果
beijing = await task1
shanghai = await task2
print(beijing, shanghai)
# 总耗时 1 秒
这就像你同时跟两个服务员说"一个上北京烤鸭,一个上小笼包",两道菜同时做,谁先好谁先上。
核心区别
await coroutine() |
asyncio.create_task(coroutine()) |
|
|---|---|---|
| 何时开始执行 | 立即执行,但阻塞当前协程 | 立即注册到事件循环,不阻塞 |
| 何时拿到结果 | await 返回时 | 后续 await task 时 |
| 并发能力 | 无,串行 | 有,多个 task 并发 |
| 适用场景 | 有依赖关系的调用 | 无依赖关系的调用 |
在 AI Agent 中的实际应用
现在把这些知识用到 AI Agent 开发中。一个典型的 Agent 循环长这样:
用户输入 → LLM 思考 → 返回 tool calls → 执行 tools → 结果喂回 LLM → ...
LLM 可能一次返回多个 tool call。比如用户问"北京和上海今天天气怎么样",LLM 会同时吐出两个 get_weather 调用。这两个调用之间没有依赖,应该并发执行。
串行版本(慢)
async def run_agent(user_input: str):
messages = [{"role": "user", "content": user_input}]
while True:
response = await call_llm(messages)
if not response.tool_calls:
print(response.content)
return
# 串行执行每个 tool call —— 慢!
for tool_call in response.tool_calls:
result = await execute_tool(tool_call)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result
})
# 把结果喂回 LLM 继续
如果 LLM 返回了 3 个 tool call,每个耗时 1 秒,总共要等 3 秒。
并发版本(快)
async def run_agent(user_input: str):
messages = [{"role": "user", "content": user_input}]
while True:
response = await call_llm(messages)
if not response.tool_calls:
print(response.content)
return
# 并发执行所有 tool call
tasks = [
asyncio.create_task(execute_tool(tc))
for tc in response.tool_calls
]
results = await asyncio.gather(*tasks)
for tool_call, result in zip(response.tool_calls, results):
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result
})
同样 3 个 tool call,并发执行只需要 1 秒(取决于最慢的那个)。
asyncio.gather vs create_task
上面用了 asyncio.gather,它本质上就是帮你做了 create_task + await 的组合:
# 这两种写法等价
# 写法 1:手动 create_task
task1 = asyncio.create_task(execute_tool(tc1))
task2 = asyncio.create_task(execute_tool(tc2))
result1 = await task1
result2 = await task2
# 写法 2:gather 一步到位
result1, result2 = await asyncio.gather(
execute_tool(tc1),
execute_tool(tc2)
)
gather 更简洁,适合"一批任务全部完成后再继续"的场景。手动 create_task 更灵活,适合需要在中途检查某个任务状态的场景。
一个更完整的 Agent 示例
import asyncio
import json
# 定义 tools
async def get_weather(city: str) -> str:
"""模拟天气 API 调用"""
await asyncio.sleep(1)
data = {"北京": "晴 22°C", "上海": "多云 25°C", "深圳": "雨 28°C"}
return json.dumps({"city": city, "weather": data.get(city, "未知")})
async def get_calendar(date: str) -> str:
"""模拟日历 API 调用"""
await asyncio.sleep(0.8)
return json.dumps({"date": date, "events": ["团队周会 10:00", "代码评审 14:00"]})
TOOL_MAP = {
"get_weather": get_weather,
"get_calendar": get_calendar,
}
async def execute_tool(tool_call) -> str:
"""执行单个 tool call"""
func = TOOL_MAP[tool_call.function.name]
args = json.loads(tool_call.function.arguments)
return await func(**args)
async def run_agent(user_input: str):
messages = [{"role": "user", "content": user_input}]
while True:
response = await call_llm(messages)
if not response.tool_calls:
return response.content
# 关键:并发执行所有 tool calls
results = await asyncio.gather(
*[execute_tool(tc) for tc in response.tool_calls]
)
# 组装结果
messages.append(response.message)
for tool_call, result in zip(response.tool_calls, results):
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result,
})
# 用户问 "北京天气怎么样,顺便看看我今天有什么会"
# LLM 返回两个 tool call: get_weather("北京") + get_calendar("2025-10-15")
# create_task 让它们并发执行,1 秒搞定,而不是 1.8 秒
什么时候不该并发
不是所有 tool call 都能并发。如果 tool call 之间有依赖关系,必须串行:
# 场景:先搜索文件,再读取搜索到的文件
# 这两步有依赖,必须串行
search_result = await execute_tool(search_call) # 先搜索
read_result = await execute_tool(read_call) # 再读取
在 Agent 框架中,通常的策略是:LLM 单次返回的多个 tool call 并发执行,不同轮次的 tool call 串行执行。 因为 LLM 在同一轮返回多个 tool call,说明它认为这些调用之间没有依赖。
总结
- 事件循环是 asyncio 的调度中心,单线程通过切换任务实现并发
await串行等待,适合有依赖的场景create_task把任务扔进事件循环并发跑,适合无依赖的场景- 在 Agent 开发中,LLM 同一轮返回的多个 tool call 应该用
create_task/gather并发执行,显著降低延迟