dxWorkerPool
1. Overview
dxWorkerPool
module is part of the official system module library of dejaOS, used for managing a pool of worker threads to handle concurrent tasks, preventing performance bottlenecks in multi-threaded applications.
This module provides a thread-safe, task-queuing system for worker threads. It listens for events on specified topics from dxEventBus
and distributes the tasks to a pool of available workers for processing.
Key Features:
- Manages a pool of worker threads to handle concurrent tasks.
- Automatically distributes tasks from a shared queue to available workers.
- Buffers incoming tasks in a queue when all workers are busy.
- Designed as a global singleton; should be initialized only once.
- Event-driven task dispatching for high efficiency and low idle CPU usage.
- Simple and intuitive API for both main thread initialization and worker thread implementation.
2. Files
dxWorkerPool.js
- JavaScript module wrapper and implementation.
Ensure this file is included in the
dxmodules
subdirectory under your project root directory.
3. Dependencies
dxStd.js
dxLogger.js
dxEventBus.js
(must be passed during initialization)
4. Compatible Devices
Compatible with all devices running dejaOS v2.0+
5. API Reference
pool.init(file, bus, topics, count, maxsize)
Initializes the worker pool. This is the core function of the module and must be called once from the main thread before any tasks can be processed.
file
{string}
: (Required) The absolute path to the worker script file (e.g., '/app/code/src/worker.js').bus
{Object}
: (Required) ThedxEventBus
instance for task distribution.topics
{string[]}
: (Required) An array of event bus topics to subscribe to. Tasks from these topics will be processed by the pool. Must not be an empty array.count
{number}
: [Optional] The number of worker threads in the pool. Defaults to2
. Must be a positive number.maxsize
{number}
: [Optional] The maximum size of the task queue. If the queue is full, the oldest task is discarded. Defaults to100
.- Throws:
Error
if called from a worker thread, initialized more than once, or if parameters are invalid.
pool.callback(callbackFunction)
Registers the task handler function for a worker thread. This must be called from within the worker script.
callbackFunction
{function(Object): void}
: (Required) The callback function to execute for each task. The task object passed to the function contains{ topic: string, data: * }
.- Throws:
Error
if the callback is not a function, or if this method is called from the main thread.
pool.getWorkerId()
Returns the unique ID of the current worker thread, or 'main' if called from the main thread. This is useful for logging and debugging within worker scripts.
- Returns:
{string}
The worker's unique ID (e.g., 'pool__id0') or 'main'.
6. Usage Examples
Main Thread (main.js
)
import pool from 'dxmodules/dxWorkerPool.js';
import eventBus from 'dxmodules/dxEventBus.js';
import log from 'dxmodules/dxLogger.js';
import std from 'dxmodules/dxStd.js';
// 1. Initialize the worker pool
const workerFile = '/app/code/src/worker.js';
const topics = ['image.process', 'data.upload'];
const workerCount = 4; // Use 4 workers
const queueSize = 50;
log.info(`Initializing pool with ${workerCount} workers...`);
pool.init(workerFile, eventBus, topics, workerCount, queueSize);
log.info("Worker pool initialized.");
// 2. Fire events to be processed by the pool
log.info("Firing 10 tasks to the pool...");
for (let i = 1; i <= 10; i++) {
const topic = i % 2 === 0 ? topics[0] : topics[1];
const payload = { taskId: i, timestamp: Date.now() };
eventBus.fire(topic, payload);
}
log.info("All tasks fired. Workers will process them concurrently.");
Worker Script (worker.js
)
import pool from 'dxmodules/dxWorkerPool.js';
import log from 'dxmodules/dxLogger.js';
import std from 'dxmodules/dxStd.js';
// 1. Register the callback function to handle tasks.
pool.callback(function(task) {
const workerId = pool.getWorkerId();
log.info(`[${workerId}] Started task on topic '${task.topic}'.`);
log.info(`[${workerId}] Data: ${JSON.stringify(task.data)}`);
// 2. Simulate a long-running task.
const workTime = Math.floor(Math.random() * 500) + 100; // 100-600ms
std.sleep(workTime);
log.info(`[${workerId}] Finished task in ${workTime}ms.`);
});
7. Thread Safety and Task Distribution
The module is designed to be thread-safe and manages task distribution automatically.
- The
init
function can only be called from the main thread. - The
callback
function can only be called from a worker thread. - When an event is fired on a subscribed topic, the task is added to an internal queue.
- The module's event-driven dispatcher assigns tasks from the queue to the next available (idle) worker.
- This ensures that tasks are processed concurrently up to the maximum number of workers in the pool, and subsequent tasks are queued efficiently without busy-waiting or polling.
8. Demo
None