Skip to main content

dxWorkerPool

1. Overview

dxWorkerPool module is part of the official system module library of dejaOS, used for managing a pool of worker threads to handle concurrent tasks, preventing performance bottlenecks in multi-threaded applications.

This module provides a thread-safe, task-queuing system for worker threads. It listens for events on specified topics from dxEventBus and distributes the tasks to a pool of available workers for processing.

Key Features:

  • Manages a pool of worker threads to handle concurrent tasks.
  • Automatically distributes tasks from a shared queue to available workers.
  • Buffers incoming tasks in a queue when all workers are busy.
  • Designed as a global singleton; should be initialized only once.
  • Event-driven task dispatching for high efficiency and low idle CPU usage.
  • Simple and intuitive API for both main thread initialization and worker thread implementation.

2. Files

  • dxWorkerPool.js - JavaScript module wrapper and implementation.

Ensure this file is included in the dxmodules subdirectory under your project root directory.

3. Dependencies

  • dxStd.js
  • dxLogger.js
  • dxEventBus.js (must be passed during initialization)

4. Compatible Devices

Compatible with all devices running dejaOS v2.0+

5. API Reference

pool.init(file, bus, topics, count, maxsize)

Initializes the worker pool. This is the core function of the module and must be called once from the main thread before any tasks can be processed.

  • file {string}: (Required) The absolute path to the worker script file (e.g., '/app/code/src/worker.js').
  • bus {Object}: (Required) The dxEventBus instance for task distribution.
  • topics {string[]}: (Required) An array of event bus topics to subscribe to. Tasks from these topics will be processed by the pool. Must not be an empty array.
  • count {number}: [Optional] The number of worker threads in the pool. Defaults to 2. Must be a positive number.
  • maxsize {number}: [Optional] The maximum size of the task queue. If the queue is full, the oldest task is discarded. Defaults to 100.
  • Throws: Error if called from a worker thread, initialized more than once, or if parameters are invalid.

pool.callback(callbackFunction)

Registers the task handler function for a worker thread. This must be called from within the worker script.

  • callbackFunction {function(Object): void}: (Required) The callback function to execute for each task. The task object passed to the function contains { topic: string, data: * }.
  • Throws: Error if the callback is not a function, or if this method is called from the main thread.

pool.getWorkerId()

Returns the unique ID of the current worker thread, or 'main' if called from the main thread. This is useful for logging and debugging within worker scripts.

  • Returns: {string} The worker's unique ID (e.g., 'pool__id0') or 'main'.

6. Usage Examples

Main Thread (main.js)

import pool from 'dxmodules/dxWorkerPool.js';
import eventBus from 'dxmodules/dxEventBus.js';
import log from 'dxmodules/dxLogger.js';
import std from 'dxmodules/dxStd.js';

// 1. Initialize the worker pool
const workerFile = '/app/code/src/worker.js';
const topics = ['image.process', 'data.upload'];
const workerCount = 4; // Use 4 workers
const queueSize = 50;

log.info(`Initializing pool with ${workerCount} workers...`);
pool.init(workerFile, eventBus, topics, workerCount, queueSize);
log.info("Worker pool initialized.");

// 2. Fire events to be processed by the pool
log.info("Firing 10 tasks to the pool...");
for (let i = 1; i <= 10; i++) {
const topic = i % 2 === 0 ? topics[0] : topics[1];
const payload = { taskId: i, timestamp: Date.now() };
eventBus.fire(topic, payload);
}
log.info("All tasks fired. Workers will process them concurrently.");

Worker Script (worker.js)

import pool from 'dxmodules/dxWorkerPool.js';
import log from 'dxmodules/dxLogger.js';
import std from 'dxmodules/dxStd.js';

// 1. Register the callback function to handle tasks.
pool.callback(function(task) {
const workerId = pool.getWorkerId();
log.info(`[${workerId}] Started task on topic '${task.topic}'.`);
log.info(`[${workerId}] Data: ${JSON.stringify(task.data)}`);

// 2. Simulate a long-running task.
const workTime = Math.floor(Math.random() * 500) + 100; // 100-600ms
std.sleep(workTime);

log.info(`[${workerId}] Finished task in ${workTime}ms.`);
});

7. Thread Safety and Task Distribution

The module is designed to be thread-safe and manages task distribution automatically.

  • The init function can only be called from the main thread.
  • The callback function can only be called from a worker thread.
  • When an event is fired on a subscribed topic, the task is added to an internal queue.
  • The module's event-driven dispatcher assigns tasks from the queue to the next available (idle) worker.
  • This ensures that tasks are processed concurrently up to the maximum number of workers in the pool, and subsequent tasks are queued efficiently without busy-waiting or polling.

8. Demo

None