dxEventBus:线程安全的事件总线
简介
在 Worker 概述 中我们提到,DejaOS 推荐使用 dxEventBus 作为 Worker 间的标准通信方式。dxEventBus 是一个基于发布/订阅模式的全局事件总线,它构建在 QuickJS 的原生 postMessage 机制之上,但提供了更高级、更解耦的抽象,是构建复杂多线程应用的首选。
核心优势:
- 完全解耦: 通信双方无需相互持有引用,只需约定好事件名称(
topic)即可通信。 - 架构统一: 无论是主线程与
Worker,还是Worker与Worker之间,都 使用相同的bus.on/bus.fireAPI,形成统一的事件驱动架构。 - 模式灵活: 可以轻松实现一对一、一对多(广播)等多种通信模式。
核心机制:主线程转发
dxEventBus 的设计遵循了 QuickJS Worker 的底层通信机制,其核心是:主线程必须作为所有跨线程事件的中转枢纽。这是由 QuickJS 的 Worker 模型决定的,Worker 之间无法直接通信,所有消息都必须通过主线程进行收发。dxEventBus 在此基础上提供了优雅的封装。
当一个 Worker 调用 bus.fire() 时,事件并不会直接发送到另一个 Worker。相反,它会经历以下流程:
Worker A通过postMessage将事件发送给主线程。- 主线程接收到事件后,在其内部的订阅者列表中查找该事件的所有订阅者。
- 如果主线程发现
Worker B订阅了该事件,它会再次通过postMessage将事件转发给Worker B。 Worker B接收到主线程转发来的消息,并最终执行对应的回调函数。
+----------+ +-------------+ +----------+
| Worker A | | Main Thread | | Worker B |
+----------+ +-------------+ +----------+
| | |
| bus.fire('topic', data) | |
|------------------------------->| |
| (底层通过 postMessage) | |
| | 查找 'topic' 的订阅者 |
| |------------------------------>|
| | |
| | 发现 Worker B 订阅了该事件 |
| |------------------------------>|
| | |
| | 转发事件和数据 |
| |------------------------------>|
| | (底层通过 postMessage) |
| | |
| | | 执行 bus.on('topic') 回调
| | |------------------------------>|
| | |
理解这个核心机制至关重要。这意味着所有 Worker 之间的通信都会经过主线程,并产生两次 postMessage 的开销(Worker A -> Main -> Worker B)。因此,它非常适合业务逻辑的解耦和状态通知,但对于需要极低延迟、高吞吐量的原始数据交换场景,后续文档将介绍的 dxMap 可能是更好的选择。
基础 API
dxEventBus 的 API 非常简洁直观。
bus.on(topic, callback): 订阅一个事件。当任何线程触发了该topic的事件时,callback函数就会被执行。注意在同一个线程(主线程或某个 Worker)内,对同一个
topic多次调用bus.on,新的回调函数会覆盖旧的回调函数。每个topic在单个线程内只对应一个处理函数。bus.fire(topic, data): 触发一个事件。topic是事件名称,data是希望传递的数据对象 。bus.off(topic): 取消当前线程对某个事件的订阅。bus.newWorker(id, file): (仅主线程可用) 创建一个Worker并自动将其纳入dxEventBus的管理体系。idstring: Worker 的唯一标识符,必须全局唯一,不能重复。filestring: Worker 入口脚本的绝对路径。DejaOS 的Worker模型要求将代码预先写入一个 JS 文件,然后通过路径加载,无法动态创建后注入代码。
为了确保 Worker 能被事件总线正确管理,必须使用 bus.newWorker() 来创建线程,而不能使用 QuickJS 原生的 new Worker()。使用原生接口创建的 Worker 将独立于 dxEventBus 体系之外,无法接收或发送总线事件。