问题域/PD-396

异步任务队列

Async Task Queue

管理长时间运行的异步任务,支持状态追踪和超时保护

子问题

1.任务状态机

2.超时保护

3.队列清理

4.执行模式切换

5.双后端引擎切换(本地asyncio vs 分布式Celery)

6.取消语义区分(用户主动取消 vs 系统异常取消)

7.孤儿队列检测与回收

各项目的解法1 solutions

Signals

横向对比

维度Langflow
队列架构三层分离:API路由层 + JobService状态持久化层 + TaskService双后端执行层
状态模型6态枚举(QUEUED/IN_PROGRESS/COMPLETED/FAILED/CANCELLED/TIMED_OUT)持久化到SQL
执行后端双后端抽象:本地asyncio.Queue + CancelScope,生产Celery,配置切换
超时保护asyncio.wait_for(300s) 包装同步执行,TimeoutError映射TIMED_OUT状态
清理策略两阶段清理:60s轮询标记 + 300s宽限期后真正删除,防止竞态回收
取消语义CancelledError.args携带USER/SYSTEM标记码,区分用户取消与系统取消
事件系统EventManager绑定asyncio.Queue,9种预注册事件类型支持SSE推送

最佳实践

1.支持sync/stream/background三种执行模式按需选择

2.两阶段清理(标记+宽限期)防止消费者竞态

3.CancelledError.args携带语义标记码区分取消来源

4.后台模式先持久化Job再fire_and_forget确保状态可查