src_utils_thread.js

/**
 * Thread Pool.
 * Highly inspired from {@link http://www.smartjava.org/content/html5-easily-parallelize-jobs-using-web-workers-and-threadpool}.
 */
export class ThreadPool {

  /**
   * @param {number} poolSize The size of the pool.
   */
  constructor(poolSize) {
    this.poolSize = poolSize;
    // task queue
    this.taskQueue = [];
    // lsit of available threads
    this.freeThreads = [];
    // create 'poolSize' number of worker threads
    for (let i = 0; i < poolSize; ++i) {
      this.freeThreads.push(new WorkerThread(this));
    }
    // list of running threads (unsed in abort)
    this.runningThreads = [];
  }

  /**
   * Add a worker task to the queue.
   * Will be run when a thread is made available.
   *
   * @param {object} workerTask The task to add to the queue.
   */
  addWorkerTask(workerTask) {
    // send work start if first task
    if (this.freeThreads.length === this.poolSize) {
      this.onworkstart({type: 'work-start'});
    }
    // launch task or queue
    if (this.freeThreads.length > 0) {
      // get the first free worker thread
      const workerThread = this.freeThreads.shift();
      // add the thread to the runnning list
      this.runningThreads.push(workerThread);
      // run the input task
      workerThread.run(workerTask);
    } else {
      // no free thread, add task to queue
      this.taskQueue.push(workerTask);
    }
  }

  /**
   * Abort all threads.
   */
  abort() {
    // stop all threads
    this.#stop();
    // callback
    this.onabort({type: 'work-abort'});
    this.onworkend({type: 'work-end'});
  }

  /**
   * Handle a task end.
   *
   * @param {object} workerThread The thread to free.
   */
  onTaskEnd(workerThread) {
    // launch next task in queue or finish
    if (this.taskQueue.length > 0) {
      // get waiting task
      const workerTask = this.taskQueue.shift();
      // use input thread to run the waiting task
      workerThread.run(workerTask);
    } else {
      // stop the worker
      workerThread.stop();
      // no task to run, add to free list
      this.freeThreads.push(workerThread);
      // remove from running list
      for (let i = 0; i < this.runningThreads.length; ++i) {
        if (this.runningThreads[i].getId() === workerThread.getId()) {
          this.runningThreads.splice(i, 1);
        }
      }
      // the work is done when the queue is back to its initial size
      if (this.freeThreads.length === this.poolSize) {
        this.onwork({type: 'work'});
        this.onworkend({type: 'work-end'});
      }
    }
  }

  /**
   * Handle an error message from a worker.
   *
   * @param {object} event The error event.
   */
  handleWorkerError = (event) => {
    // stop all threads
    this.#stop();
    // callback
    this.onerror({error: event});
    this.onworkend({type: 'work-end'});
  };

  // private ----------------------------------------------------------------

  /**
   * Stop the pool: stop all running threads.
   *
   */
  #stop() {
    // clear tasks
    this.taskQueue = [];
    // cancel running workers
    for (let i = 0; i < this.runningThreads.length; ++i) {
      this.runningThreads[i].stop();
    }
    this.runningThreads = [];
  }


  /**
   * Handle a work start event.
   * Default does nothing.
   *
   * @param {object} _event The work start event.
   */
  onworkstart(_event) {}

  /**
   * Handle a work item event.
   * Default does nothing.
   *
   * @param {object} _event The work item event fired
   *   when a work item ended successfully.
   */
  onworkitem(_event) {}

  /**
   * Handle a work event.
   * Default does nothing.
   *
   * @param {object} _event The work event fired
   *   when a work ended successfully.
   */
  onwork(_event) {}

  /**
   * Handle a work end event.
   * Default does nothing.
   *
   * @param {object} _event The work end event fired
   *  when a work has completed, successfully or not.
   */
  onworkend(_event) {}

  /**
   * Handle an error event.
   * Default does nothing.
   *
   * @param {object} _event The error event.
   */
  onerror(_event) {}

  /**
   * Handle an abort event.
   * Default does nothing.
   *
   * @param {object} _event The abort event.
   */
  onabort(_event) {}

} // ThreadPool

/**
 * Worker background task.
 *
 * @external Worker
 * @see https://developer.mozilla.org/en-US/docs/Web/API/Worker
 */

/**
 * Worker thread.
 */
class WorkerThread {

  /**
   * @param {object} parentPool The parent pool.
   */
  constructor(parentPool) {
    this.parentPool = parentPool;
    // thread ID
    this.id = Math.random().toString(36).substring(2, 15);
    // running task
    this.runningTask = null;
    // worker used to run task
    this.worker;
  }

  /**
   * Get the thread ID.
   *
   * @returns {string} The thread ID (alphanumeric).
   */
  getId() {
    return this.id;
  }

  /**
   * Run a worker task
   *
   * @param {object} workerTask The task to run.
   */
  run(workerTask) {
    // store task
    this.runningTask = workerTask;
    // create a new web worker if not done yet
    if (typeof this.worker === 'undefined') {
      this.worker = new Worker(this.runningTask.script);
      // set callbacks
      this.worker.onmessage = this.onmessage;
      this.worker.onerror = this.onerror;
    }
    // launch the worker
    this.worker.postMessage(this.runningTask.startMessage);
  }

  /**
   * Finish a task and tell the parent.
   */
  stop() {
    // stop the worker
    if (typeof this.worker !== 'undefined') {
      this.worker.terminate();
      // force create at next run
      this.worker = undefined;
    }
  }

  /**
   * Message event handler.
   * For now assume we only get a single callback from a worker
   * which also indicates the end of this worker.
   *
   * @param {object} event The message event.
   */
  onmessage = (event) => {
    // augment event
    event.itemNumber = this.runningTask.info.itemNumber;
    event.numberOfItems = this.runningTask.info.numberOfItems;
    event.dataIndex = this.runningTask.info.dataIndex;
    // send event
    this.parentPool.onworkitem(event);
    // tell the parent pool the task is done
    this.parentPool.onTaskEnd(this);
  };

  /**
   * Error event handler.
   *
   * @param {object} event The error event.
   */
  onerror = (event) => {
    // augment event
    event.itemNumber = this.runningTask.info.itemNumber;
    event.numberOfItems = this.runningTask.info.numberOfItems;
    event.dataIndex = this.runningTask.info.dataIndex;
    // pass to parent
    this.parentPool.handleWorkerError(event);
    // stop the worker and free the thread
    this.stop();
  };
} // class WorkerThread

/**
 * Worker task.
 */
export class WorkerTask {
  /**
   * @param {string} script The worker script.
   * @param {object} message The data to pass to the worker.
   * @param {object} info Information object about the input data.
   */
  constructor(script, message, info) {
    // worker script
    this.script = script;
    // worker start message
    this.startMessage = message;
    // information about the work data
    this.info = info;
  }
}