异步任务队列
Async Task Queue
管理长时间运行的异步任务,支持状态追踪和超时保护
子问题
1.任务状态机
2.超时保护
3.队列清理
4.执行模式切换
5.双后端引擎切换(本地asyncio vs 分布式Celery)
6.取消语义区分(用户主动取消 vs 系统异常取消)
7.孤儿队列检测与回收
各项目的解法1 solutions
Signals
横向对比
| 维度 | Langflow |
|---|---|
| 队列架构 | 三层分离:API路由层 + JobService状态持久化层 + TaskService双后端执行层 |
| 状态模型 | 6态枚举(QUEUED/IN_PROGRESS/COMPLETED/FAILED/CANCELLED/TIMED_OUT)持久化到SQL |
| 执行后端 | 双后端抽象:本地asyncio.Queue + CancelScope,生产Celery,配置切换 |
| 超时保护 | asyncio.wait_for(300s) 包装同步执行,TimeoutError映射TIMED_OUT状态 |
| 清理策略 | 两阶段清理:60s轮询标记 + 300s宽限期后真正删除,防止竞态回收 |
| 取消语义 | CancelledError.args携带USER/SYSTEM标记码,区分用户取消与系统取消 |
| 事件系统 | EventManager绑定asyncio.Queue,9种预注册事件类型支持SSE推送 |
最佳实践
1.支持sync/stream/background三种执行模式按需选择
2.两阶段清理(标记+宽限期)防止消费者竞态
3.CancelledError.args携带语义标记码区分取消来源
4.后台模式先持久化Job再fire_and_forget确保状态可查