Agent SSE 可恢复流:后端别把长连接当任务生命周期
做 Agent 聊天时,最开始很容易把 SSE 当成一条“任务管道”。
用户点发送,后端开一个流,Agent 一边跑一边往这个 HTTP response 里写数据。浏览器断开,就认为用户不需要了,于是顺手把后台任务也取消。
这套在 demo 里很顺。但一到长任务就会暴露问题。
生成视频、跑工作流、调图像模型,可能要几十秒甚至几分钟。用户网络抖一下、页面切后台、代理断一下,SSE 连接断了。但 Agent 任务本身不应该死。
真正要拆开的,是两件事:
- Agent run:后台任务生命周期
- SSE connection:前端订阅事件的传输连接
连接可以断,可以重连。run 不能因为连接断了就被杀。
架构图
flowchart LR
FE["Frontend<br/>sendMessage / resumeActiveRun<br/>POST /v1/agent/runs<br/>fetch-event-source 订阅 events<br/>最近 500 个 SSE id 去重"]
API["Agent API<br/>POST /runs 创建 run<br/>GET /runs/{run_id}/events 订阅<br/>cancel / HITL<br/>active-run 只用于页面恢复"]
Producer["Background Agent Producer<br/>执行 Agent run<br/>写 Redis Stream 事件<br/>刷新 runner lease<br/>最终历史写 MongoDB"]
Redis["Redis Runtime Layer<br/>Stream per run<br/>id + event + data<br/>active run state<br/>runner lease TTL"]
SSE["SSE Events Endpoint<br/>读取 Last-Event-ID header<br/>XREAD Redis Stream<br/>输出标准 SSE<br/>heartbeat comment"]
Mongo["MongoDB Final History<br/>完整聊天历史<br/>最终一致性持久化"]
FE -->|"create run / 409 active_run_id"| API
API -->|"start background task"| Producer
Producer -->|"XADD event"| Redis
SSE -->|"XREAD from cursor"| Redis
FE -.->|"SSE reconnect with Last-Event-ID"| SSE
SSE -.->|"id / event / data"| FE
Producer -->|"final messages"| Mongo
SSE -.->|"closes only on run.completed / run.failed / run.cancelled"| FE
这个图里最重要的箭头不是 SSE,而是 Producer -> Redis。
Agent 生成的每个事件都先落到 Redis Stream。SSE endpoint 只是一个 reader。前端断线以后,新的 SSE 连接拿着 Last-Event-ID 再读一次 Redis Stream,就能从上次位置继续。
后端分三层
后端不要把所有逻辑塞进一个 /chat 流式接口里。拆成三层会清楚很多。
第一层是 Run API。
POST /v1/agent/runs 只负责创建一次后台 run,返回 run_id。如果同一个 session 已经有 active run,不要再创建一个新的,而是返回 409 ACTIVE_RUN_EXISTS,带上 active_run_id,让前端订阅旧 run。
第二层是 Background Agent Producer。
它和前端连接无关。只要 run 创建成功,它就在后台继续执行 Agent loop、tool call、workflow,并把事件写入 Redis Stream。
第三层是 SSE Events Endpoint。
GET /v1/agent/runs/{run_id}/events 不执行 Agent,不创建任务,只做一件事:从 Redis Stream 读事件,转成标准 SSE,推给前端。
这个拆法会让很多边界自然变简单。
Redis Stream 是运行期日志,不是最终聊天记录
Redis Stream 不是拿来替代 MongoDB 的。
它更像一段短期可恢复日志,保存最近一段时间的 run events。比如保留 24 小时,或者每个 run 保留最近 2000 条事件。
写事件的代码大概长这样:
import json
def run_stream_key(run_id: str) -> str:
return f"agent:run:{run_id}:events"
async def append_run_event(redis, run_id: str, event: str, data: dict) -> str:
redis_id = await redis.xadd(
run_stream_key(run_id),
{
"event": event,
"data": json.dumps(data, ensure_ascii=False),
},
maxlen=2000,
approximate=True,
)
await redis.expire(run_stream_key(run_id), 60 * 60 * 24)
return redis_id.decode() if isinstance(redis_id, bytes) else redis_id
这里每条事件都有 Redis Stream id,比如:
1778494100696-0
这个 id 就是前端恢复的游标。它比自己发一个自增 seq 更省事,因为 Redis 已经保证了同一个 stream 内的顺序。
MongoDB 仍然保存最终聊天历史。比如用户消息、assistant 最终 parts、HITL 状态、工作流结果。Redis 只承担“断线重连时补事件”的职责。
如果 Redis 里的窗口已经过期,就不要假装能恢复。后端应该直接返回 410 RESUME_WINDOW_EXPIRED,让前端刷新历史。
标准 SSE 输出
SSE 的 wire protocol 不复杂,但最好别手拼得到处都是。
服务端统一用 formatter,保证 id、event、多行 data 都正确。
import json
def format_sse_event(event_id: str, event_name: str, data: dict) -> str:
payload = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
lines = [
f"id: {event_id}",
f"event: {event_name}",
]
for line in payload.splitlines() or [""]:
lines.append(f"data: {line}")
return "\n".join(lines) + "\n\n"
def format_sse_comment(comment: str) -> str:
return f": {comment}\n\n"
心跳不要写进 Redis Stream。
心跳只是保活连接,用 SSE comment 就行:
: heartbeat
浏览器不会把 comment 当业务事件处理,DevTools 的 EventStream 里也不一定展示它。这是好事,业务层不用关心心跳。
Events endpoint 只负责读
SSE endpoint 收 Last-Event-ID header。没有 header,就从 "0-0" 开始。
核心逻辑是 XREAD stream last_id。Redis 会返回 last id 之后的新事件。
from fastapi import Header
from fastapi.responses import StreamingResponse
RUN_TERMINAL_EVENTS = {"run.completed", "run.failed", "run.cancelled"}
@app.get("/v1/agent/runs/{run_id}/events")
async def stream_run_events(
run_id: str,
last_event_id: str | None = Header(default=None, alias="Last-Event-ID"),
):
cursor = last_event_id or "0-0"
async def event_stream():
nonlocal cursor
while True:
events = await read_run_events(run_id, cursor, block_ms=15000)
if not events:
yield format_sse_comment("heartbeat")
continue
for redis_id, event_name, data in events:
cursor = redis_id
yield format_sse_event(redis_id, event_name, data)
if event_name in RUN_TERMINAL_EVENTS:
return
return StreamingResponse(
event_stream(),
media_type="text/event-stream; charset=utf-8",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
这里有个关键点:SSE endpoint 不应该因为当前进程里没有 task,就立刻判定 run 中断。
在多副本、重启、重连打到另一台实例时,本进程当然可能没有那个 task。更稳的做法是 runner lease:
- run 启动时写
agent:run:{run_id}:lease - value 里放
instance_id - TTL 例如 30 秒
- background task 每 10 秒刷新一次
- 只有 run 仍是 running,且 lease 过期,才追加
agent.failed+run.failed
单副本可以先不做复杂 lease,但架构上要知道这个坑在哪。
agent.* 和 run.* 必须分清
这次改造里,我最想保留的一个规则是:业务状态和连接生命周期分开。
agent.* 是给 UI 看的业务事件。
比如:
agent.completed
agent.failed
agent.stopped
它表示 Agent 内容层面已经完成、失败或停止。前端可以更新消息状态,但 SSE 连接不一定马上关。
真正决定 SSE 连接关闭的是 run.*:
run.completed
run.failed
run.cancelled
事件顺序应该固定下来。
成功:
chat.message.completed
agent.completed
run.completed
失败:
agent.failed
run.failed
取消:
agent.stopped
run.cancelled
为什么要多一层 run.*?
因为 agent.completed 有时候只是“这段业务内容生成完了”。后端可能还要写最终状态、收尾清理、刷新历史。SSE endpoint 如果看到 agent.completed 就关,前端会更容易遇到半截状态。
run.* 是生命周期终态。看到它,客户端停止重连,后端也可以清 active run。
active run 不是轮询接口
active-run 很容易被滥用。
每次 React 状态变了就查一次,每次发送前也查一次,最后 Network 里全是 active-run。更糟的是,如果旧 run 还在跑,新消息又创建失败,UI 会进入很奇怪的状态。
更干净的策略是:
- 页面刷新、进入 session、历史加载完后,查一次 active run
- 如果有
active_run_id,订阅它的 events - 发送消息前不主动查 active-run
- 发送消息直接
POST /runs - 如果后端返回
409 ACTIVE_RUN_EXISTS,拿active_run_id订阅旧 run
也就是说,active-run 用来恢复页面,不用来轮询状态。
前端断线时发生什么
前端使用 @microsoft/fetch-event-source 后,传输层不用自己维护 reader、parser 和重连循环。
正常请求顺序是:
POST /v1/agent/runs
GET /v1/agent/runs/{run_id}/events
第一次 events 请求没有 Last-Event-ID。这是正常的,因为还没有收到任何事件。
服务端返回:
id: 1778494100696-0
event: chat.message.delta
data: {"text":"hello"}
库收到 id 后,会把它记下来。下一次网络断开重连时,请求头里会带:
Last-Event-ID: 1778494100696-0
后端从这个 id 后面继续 XREAD,不会重发已确认的事件。
前端仍然可以保留一个最近 500 个 SSE id 的去重窗口。但它只是防御性措施,不是 cursor 来源。真正的恢复游标应该是 Last-Event-ID。
哪些错误该重试,哪些不该
网络错误、429、5xx 可以重试。
重试间隔不要太激进。比如:
1s -> 2s -> 4s -> 8s -> 15s -> 30s
最多 10 次就够了。
这些不要重试:
400 / 401 / 403 / 404 / 410
尤其是 410 RESUME_WINDOW_EXPIRED。这不是网络抖动,而是 Redis Stream 窗口已经没了。继续重试只会浪费请求,正确处理是提示用户刷新会话历史。
还有一种也不要当网络错误重试:
agent.failed
run.failed
这是业务失败。模型失败、工具失败、工作流失败,都应该以业务错误展示,而不是把 SSE 连接重连 10 次。
为什么不用 Redis Pub/Sub
Pub/Sub 更快,但不适合这个场景。
它没有回放。订阅者断线期间发布的消息就丢了。你还得再做一层事件缓存,最后又会回到 Redis Stream 或数据库。
Agent SSE 要解决的不是“最低延迟广播”,而是:
- 长任务不断线
- 断线后能补事件
- 每个 run 的事件有序
- 前端不重复渲染
- 过期时能明确失败
Redis Stream 正好卡在这个点上。它不是最轻的,但复杂度放在合理的位置。
这套架构解决了什么
它解决的是一个产品层面的体验问题。
用户网络不稳时,前端可以断。浏览器可以重连。SSE handler 可以消失再回来。
但后台 Agent run 继续跑。
事件先进入 Redis Stream,再由 SSE endpoint 转发。前端每次用 Last-Event-ID 说明“我收到哪里了”,后端只补后面的事件。最终消息再落 MongoDB,保证刷新页面后还有完整历史。
一条长连接不再承担所有职责。
它只是订阅。
真正的任务生命周期,在 run。