dxMap & dxQueue: 高性能线程间数据共享
简介
在 DejaOS 的多线程模型中,dxEventBus 和 dxMap/dxQueue 扮演着不同但互补的角色。理解它们的本质区别至关重要:
dxEventBus解决的是线程间的通信与通知问题。它的核心是**“事件”**,用于告诉其他线程“某件事刚刚发生了”。数据只是事件的附带信息。dxMap和dxQueue解决的是线程间的数据共享问题。它们的核心是**“数据”** 本身,提供了一个所有线程都可以访问的共享内存容器。其他线程可以随时存取最新的数据状态,而无需关心数据是何时、由谁写入的。
虽然 dxEventBus 的底层 postMessage 机制因数据克隆会带来一些性能开销,但这并非选择 dxMap/dxQueue 的唯一原因。真正的选择依据在于你的意图:你是想“发送一个通知”,还是想“共享一份数据”?
DejaOS 提供了 dxMap 和 dxQueue 这两个基于原生 C/C++ 实现的高性能、线程安全的数据共享模块,它们允许所有线程访问同一块内存,实现了**零拷贝(Zero-Copy)**的数据交换。
dxMap: 一个线程安全的键值存储(Key-Value Store),用于在多个线程间共享状态。dxQueue: 一个线程安全的先进先出队列(FIFO Queue),用于在多个线程间实现生产者-消费者模式的数据流。
dxMap: 线程安全的共享状态
dxMap 提供了一个全局的、按主题(topic)隔离的键值存储区。你可以把它想象成一个所有线程都可以同时读写的全局对象。
核心机制
dxMap 由底层 C 实现,确保了所有 put/get/del 等操作都是原子性的,从而保证 了线程安全。当你从不同的线程访问同一个 topic 的 dxMap 实例时,它们操作的都是同一个底层数据结构。
API 概览
-
获取/创建实例
dxMap.get(topic): 获取一个指定topic名称的 map 实例。如果该 topic 的 map 不存在,它会在第一次put操作时被隐式创建。
-
操作方法
map.put(key, value): 设置一个键值对。支持字符串、数字、布尔值、对象和数组。如果value为null或undefined,则会删除该key。map.get(key): 获取指定key的值。如果key不存在,返回undefined。map.has(key): 检查是否存在指定的key。map.del(key): 删除一个键值对。map.keys(): 返回该 topic 下所有key的数组。map.destroy(): 销毁整个 topic 的 map,释放资源。
使用示例
这个例子展示了主线程初始化一个状态,Worker 线程读取、修改并增加这个状态,最后主线程验证变更。
main.js
import bus from "./dxmodules/dxEventBus.js";
import map from "./dxmodules/dxMap.js";
import log from "./dxmodules/dxLogger.js";
// 1. 获取一个用于线程间共享的 map 实例
const sharedMap = map.get("app_status");
// 2. 主线程设置初始状态
log.info("[Main] Setting initial status...");
sharedMap.put("isRunning", true);
sharedMap.put("mode", "idle");
sharedMap.put("config", { version: "1.0.2" });
// 3. 启动 Worker
bus.newWorker("status_worker", "/app/code/worker.js");
// 4. 监听 Worker 完成的事件,并验证 map 的最终状态
bus.on("worker_finished", () => {
log.info("[Main] Worker finished. Verifying final status:");
log.info(`- isRunning: ${sharedMap.get("isRunning")}`); // 预期: false
log.info(`- mode: ${sharedMap.get("mode")}`); // 预期: 'finished'
log.info(`- workerId: ${sharedMap.get("workerId")}`); // 预期: 'status_worker'
sharedMap.destroy(); // 清理
});
worker.js
import bus from "./dxmodules/dxEventBus.js";
import map from "./dxmodules/dxMap.js";
import log from "./dxmodules/dxLogger.js";
// 1. 在 Worker 中获取同一个 map 实例
const sharedMap = map.get("app_status");
log.info("[Worker] Worker started.");
// 2. 读取主线程设置的状态
const initialMode = sharedMap.get("mode");
log.info(`[Worker] Read initial mode: ${initialMode}`); // 预期: 'idle'
// 3. 修改和增加状态
sharedMap.put("isRunning", false);
sharedMap.put("mode", "finished");
sharedMap.put("workerId", bus.id); // bus.id 可以获取当前worker的id
log.info("[Worker] Status updated.");
// 4. 通知主线程任务已完成
bus.fire("worker_finished");
dxQueue: 生产者-消费者数据流
dxQueue 实现了一个全局的、按名称隔离的、固定容量的先进先出队列。它是实现“生产者-消费者”模式的理想工具,一个或多个线程(生产者)向队列中放入数据,一个或多个线程(消费者)从队列中取出数据进行处理。
核心机制
与 dxMap 类似,dxQueue 也是由底层 C 实现的线程安全数据结构。它内部带有锁机制,确保 push 和 pop 操作的原子性。