niuzj

Agent SSE 可恢复流:后端别把长连接当任务生命周期

做 Agent 聊天时,最开始很容易把 SSE 当成一条“任务管道”。

用户点发送,后端开一个流,Agent 一边跑一边往这个 HTTP response 里写数据。浏览器断开,就认为用户不需要了,于是顺手把后台任务也取消。

这套在 demo 里很顺。但一到长任务就会暴露问题。

生成视频、跑工作流、调图像模型,可能要几十秒甚至几分钟。用户网络抖一下、页面切后台、代理断一下,SSE 连接断了。但 Agent 任务本身不应该死。

真正要拆开的,是两件事:

  • Agent run:后台任务生命周期
  • SSE connection:前端订阅事件的传输连接

连接可以断,可以重连。run 不能因为连接断了就被杀。

架构图

flowchart LR
  FE["Frontend<br/>sendMessage / resumeActiveRun<br/>POST /v1/agent/runs<br/>fetch-event-source 订阅 events<br/>最近 500 个 SSE id 去重"]
  API["Agent API<br/>POST /runs 创建 run<br/>GET /runs/{run_id}/events 订阅<br/>cancel / HITL<br/>active-run 只用于页面恢复"]
  Producer["Background Agent Producer<br/>执行 Agent run<br/>写 Redis Stream 事件<br/>刷新 runner lease<br/>最终历史写 MongoDB"]
  Redis["Redis Runtime Layer<br/>Stream per run<br/>id + event + data<br/>active run state<br/>runner lease TTL"]
  SSE["SSE Events Endpoint<br/>读取 Last-Event-ID header<br/>XREAD Redis Stream<br/>输出标准 SSE<br/>heartbeat comment"]
  Mongo["MongoDB Final History<br/>完整聊天历史<br/>最终一致性持久化"]

  FE -->|"create run / 409 active_run_id"| API
  API -->|"start background task"| Producer
  Producer -->|"XADD event"| Redis
  SSE -->|"XREAD from cursor"| Redis
  FE -.->|"SSE reconnect with Last-Event-ID"| SSE
  SSE -.->|"id / event / data"| FE
  Producer -->|"final messages"| Mongo
  SSE -.->|"closes only on run.completed / run.failed / run.cancelled"| FE

这个图里最重要的箭头不是 SSE,而是 Producer -> Redis

Agent 生成的每个事件都先落到 Redis Stream。SSE endpoint 只是一个 reader。前端断线以后,新的 SSE 连接拿着 Last-Event-ID 再读一次 Redis Stream,就能从上次位置继续。

后端分三层

后端不要把所有逻辑塞进一个 /chat 流式接口里。拆成三层会清楚很多。

第一层是 Run API

POST /v1/agent/runs 只负责创建一次后台 run,返回 run_id。如果同一个 session 已经有 active run,不要再创建一个新的,而是返回 409 ACTIVE_RUN_EXISTS,带上 active_run_id,让前端订阅旧 run。

第二层是 Background Agent Producer

它和前端连接无关。只要 run 创建成功,它就在后台继续执行 Agent loop、tool call、workflow,并把事件写入 Redis Stream。

第三层是 SSE Events Endpoint

GET /v1/agent/runs/{run_id}/events 不执行 Agent,不创建任务,只做一件事:从 Redis Stream 读事件,转成标准 SSE,推给前端。

这个拆法会让很多边界自然变简单。

Redis Stream 是运行期日志,不是最终聊天记录

Redis Stream 不是拿来替代 MongoDB 的。

它更像一段短期可恢复日志,保存最近一段时间的 run events。比如保留 24 小时,或者每个 run 保留最近 2000 条事件。

写事件的代码大概长这样:

import json


def run_stream_key(run_id: str) -> str:
    return f"agent:run:{run_id}:events"


async def append_run_event(redis, run_id: str, event: str, data: dict) -> str:
    redis_id = await redis.xadd(
        run_stream_key(run_id),
        {
            "event": event,
            "data": json.dumps(data, ensure_ascii=False),
        },
        maxlen=2000,
        approximate=True,
    )
    await redis.expire(run_stream_key(run_id), 60 * 60 * 24)
    return redis_id.decode() if isinstance(redis_id, bytes) else redis_id

这里每条事件都有 Redis Stream id,比如:

1778494100696-0

这个 id 就是前端恢复的游标。它比自己发一个自增 seq 更省事,因为 Redis 已经保证了同一个 stream 内的顺序。

MongoDB 仍然保存最终聊天历史。比如用户消息、assistant 最终 parts、HITL 状态、工作流结果。Redis 只承担“断线重连时补事件”的职责。

如果 Redis 里的窗口已经过期,就不要假装能恢复。后端应该直接返回 410 RESUME_WINDOW_EXPIRED,让前端刷新历史。

标准 SSE 输出

SSE 的 wire protocol 不复杂,但最好别手拼得到处都是。

服务端统一用 formatter,保证 idevent、多行 data 都正确。

import json


def format_sse_event(event_id: str, event_name: str, data: dict) -> str:
    payload = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
    lines = [
        f"id: {event_id}",
        f"event: {event_name}",
    ]
    for line in payload.splitlines() or [""]:
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"


def format_sse_comment(comment: str) -> str:
    return f": {comment}\n\n"

心跳不要写进 Redis Stream。

心跳只是保活连接,用 SSE comment 就行:

: heartbeat

浏览器不会把 comment 当业务事件处理,DevTools 的 EventStream 里也不一定展示它。这是好事,业务层不用关心心跳。

Events endpoint 只负责读

SSE endpoint 收 Last-Event-ID header。没有 header,就从 "0-0" 开始。

核心逻辑是 XREAD stream last_id。Redis 会返回 last id 之后的新事件。

from fastapi import Header
from fastapi.responses import StreamingResponse


RUN_TERMINAL_EVENTS = {"run.completed", "run.failed", "run.cancelled"}


@app.get("/v1/agent/runs/{run_id}/events")
async def stream_run_events(
    run_id: str,
    last_event_id: str | None = Header(default=None, alias="Last-Event-ID"),
):
    cursor = last_event_id or "0-0"

    async def event_stream():
        nonlocal cursor

        while True:
            events = await read_run_events(run_id, cursor, block_ms=15000)

            if not events:
                yield format_sse_comment("heartbeat")
                continue

            for redis_id, event_name, data in events:
                cursor = redis_id
                yield format_sse_event(redis_id, event_name, data)

                if event_name in RUN_TERMINAL_EVENTS:
                    return

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream; charset=utf-8",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )

这里有个关键点:SSE endpoint 不应该因为当前进程里没有 task,就立刻判定 run 中断。

在多副本、重启、重连打到另一台实例时,本进程当然可能没有那个 task。更稳的做法是 runner lease:

  • run 启动时写 agent:run:{run_id}:lease
  • value 里放 instance_id
  • TTL 例如 30 秒
  • background task 每 10 秒刷新一次
  • 只有 run 仍是 running,且 lease 过期,才追加 agent.failed + run.failed

单副本可以先不做复杂 lease,但架构上要知道这个坑在哪。

agent.* 和 run.* 必须分清

这次改造里,我最想保留的一个规则是:业务状态和连接生命周期分开。

agent.* 是给 UI 看的业务事件。

比如:

agent.completed
agent.failed
agent.stopped

它表示 Agent 内容层面已经完成、失败或停止。前端可以更新消息状态,但 SSE 连接不一定马上关。

真正决定 SSE 连接关闭的是 run.*

run.completed
run.failed
run.cancelled

事件顺序应该固定下来。

成功:

chat.message.completed
agent.completed
run.completed

失败:

agent.failed
run.failed

取消:

agent.stopped
run.cancelled

为什么要多一层 run.*

因为 agent.completed 有时候只是“这段业务内容生成完了”。后端可能还要写最终状态、收尾清理、刷新历史。SSE endpoint 如果看到 agent.completed 就关,前端会更容易遇到半截状态。

run.* 是生命周期终态。看到它,客户端停止重连,后端也可以清 active run。

active run 不是轮询接口

active-run 很容易被滥用。

每次 React 状态变了就查一次,每次发送前也查一次,最后 Network 里全是 active-run。更糟的是,如果旧 run 还在跑,新消息又创建失败,UI 会进入很奇怪的状态。

更干净的策略是:

  • 页面刷新、进入 session、历史加载完后,查一次 active run
  • 如果有 active_run_id,订阅它的 events
  • 发送消息前不主动查 active-run
  • 发送消息直接 POST /runs
  • 如果后端返回 409 ACTIVE_RUN_EXISTS,拿 active_run_id 订阅旧 run

也就是说,active-run 用来恢复页面,不用来轮询状态。

前端断线时发生什么

前端使用 @microsoft/fetch-event-source 后,传输层不用自己维护 reader、parser 和重连循环。

正常请求顺序是:

POST /v1/agent/runs
GET  /v1/agent/runs/{run_id}/events

第一次 events 请求没有 Last-Event-ID。这是正常的,因为还没有收到任何事件。

服务端返回:

id: 1778494100696-0
event: chat.message.delta
data: {"text":"hello"}

库收到 id 后,会把它记下来。下一次网络断开重连时,请求头里会带:

Last-Event-ID: 1778494100696-0

后端从这个 id 后面继续 XREAD,不会重发已确认的事件。

前端仍然可以保留一个最近 500 个 SSE id 的去重窗口。但它只是防御性措施,不是 cursor 来源。真正的恢复游标应该是 Last-Event-ID

哪些错误该重试,哪些不该

网络错误、4295xx 可以重试。

重试间隔不要太激进。比如:

1s -> 2s -> 4s -> 8s -> 15s -> 30s

最多 10 次就够了。

这些不要重试:

400 / 401 / 403 / 404 / 410

尤其是 410 RESUME_WINDOW_EXPIRED。这不是网络抖动,而是 Redis Stream 窗口已经没了。继续重试只会浪费请求,正确处理是提示用户刷新会话历史。

还有一种也不要当网络错误重试:

agent.failed
run.failed

这是业务失败。模型失败、工具失败、工作流失败,都应该以业务错误展示,而不是把 SSE 连接重连 10 次。

为什么不用 Redis Pub/Sub

Pub/Sub 更快,但不适合这个场景。

它没有回放。订阅者断线期间发布的消息就丢了。你还得再做一层事件缓存,最后又会回到 Redis Stream 或数据库。

Agent SSE 要解决的不是“最低延迟广播”,而是:

  • 长任务不断线
  • 断线后能补事件
  • 每个 run 的事件有序
  • 前端不重复渲染
  • 过期时能明确失败

Redis Stream 正好卡在这个点上。它不是最轻的,但复杂度放在合理的位置。

这套架构解决了什么

它解决的是一个产品层面的体验问题。

用户网络不稳时,前端可以断。浏览器可以重连。SSE handler 可以消失再回来。

但后台 Agent run 继续跑。

事件先进入 Redis Stream,再由 SSE endpoint 转发。前端每次用 Last-Event-ID 说明“我收到哪里了”,后端只补后面的事件。最终消息再落 MongoDB,保证刷新页面后还有完整历史。

一条长连接不再承担所有职责。

它只是订阅。

真正的任务生命周期,在 run。