事件驱动架构
Event-Driven Architecture
基于事件管理器的实时通信和状态广播机制
子问题
1.事件注册与分发
2.SSE流式推送
3.Webhook事件广播
4.队列背压控制
5.同步 put_nowait 到异步 emit 的桥接适配
6.Job 级别的队列-事件管理器-Task 三元组生命周期
7.SSE 连接的心跳保活与断连检测
各项目的解法1 solutions
Signals
横向对比
| 维度 | Langflow |
|---|---|
| 事件分发模型 | 双层架构:内层 Queue 直连 + 外层 Pub-Sub 广播,通过 ForwardingQueue 适配器桥接 |
| 背压控制 | 有界队列 maxsize=100 + 1s emit 超时丢弃 + dead queue 自动清理 |
| SSE 协议实现 | 标准 SSE text/event-stream + 30s 心跳 + request.is_disconnected 主动检测 |
| 生命周期管理 | JobQueueService 两阶段清理:60s 扫描标记 + 300s 宽限期后实际清理 |
| 事件注册方式 | on_ 前缀约定 + functools.partial 预绑定 + __getattr__ 属性式调用 |
| Webhook 集成 | WebhookForwardingQueue 鸭子类型适配,fire-and-forget create_task 转发 |
最佳实践
1.使用asyncio.Queue实现非阻塞事件传递
2.用鸭子类型 ForwardingQueue 桥接直连队列与广播系统
3.两阶段清理(标记+宽限期)避免竞态条件下的资源泄漏
4.emit 时 copy-on-read 监听器集合避免迭代中修改