Source: worker.js

/* global ErrorCat WorkerError DDTimer */
'use strict';

const cls = require('continuation-local-storage').createNamespace('ponos');
const clsBlueBird = require('cls-bluebird');
const defaults = require('101/defaults');
const errorCat = require('error-cat');
const exists = require('101/exists');
const isNumber = require('101/is-number');
const isObject = require('101/is-object');
const merge = require('101/put');
const monitor = require('monitor-dog');
const pick = require('101/pick');
const Promise = require('bluebird');
const uuid = require('uuid');
const WorkerStopError = require('error-cat/errors/worker-stop-error');

const TimeoutError = Promise.TimeoutError;

/**
 * Performs tasks for jobs on a given queue.
 * @author Bryan Kendall
 * @author Ryan Sandor Richards
 * @param {Object} opts Options for the worker.
 * @param {Function} opts.done Callback to execute when the job has successfully
 *   been completed.
 * @param {Object} opts.job Data for the job to process.
 * @param {bunyan} opts.log The bunyan logger to use when logging messages
 *   from the worker.
 * @param {String} opts.queue Name of the queue for the job the worker is
 *   processing.
 * @param {Function} opts.task A function to handle the tasks.
 * @param {ErrorCat} [opts.errorCat] An error-cat instance to use for the
 *   worker.
 * @param {number} [opts.msTimeout] A specific millisecond timeout for this
 *   worker.
 * @param {boolean} [opts.runNow] Whether or not to run the job immediately,
 *   defaults to `true`.
 */
class Worker {
  /**
   * Validates the options, applies defaults, copies the options onto the
   * instance and, unless `runNow` is falsy, starts the job.
   * @param {Object} opts Options for the worker.
   * @param {Function} opts.done Called once the job is finished (on success or
   *   on a fatal `WorkerStopError`).
   * @param {Object} opts.job Data for the job to process; `opts.job.tid` is
   *   reused as the worker's `tid` when present.
   * @param {Object} opts.log Logger exposing `child`, `info`, `warn`, `error`.
   * @param {String} opts.queue Name of the queue the job came from.
   * @param {Function} opts.task Function invoked with the job data.
   * @param {Object} [opts.errorCat] Error reporter, defaults to `error-cat`.
   * @param {number} [opts.msTimeout] Task timeout in milliseconds, defaults to
   *   `process.env.WORKER_TIMEOUT` or `0` (no timeout).
   * @param {boolean} [opts.runNow] Run immediately, defaults to `true`.
   * @throws {Error} If a required option is missing, or `msTimeout` is not a
   *   non-negative integer.
   */
  constructor(opts) {
    // Every one of these must be present on `opts`.
    const requiredFields = ['done', 'job', 'log', 'queue', 'task'];
    requiredFields.forEach((f) => {
      if (!exists(opts[f])) {
        throw new Error(`${f} is required for a Worker`);
      }
    });

    // Keep only the recognised options (on a copy), then fill in defaults.
    const optionalFields = ['errorCat', 'msTimeout', 'runNow'];
    opts = pick(opts, requiredFields.concat(optionalFields));
    defaults(opts, {
      // default non-required user options
      errorCat: errorCat,
      runNow: true,
      // internal state, never taken from the caller
      attempt: 0,
      msTimeout: process.env.WORKER_TIMEOUT || 0,
      retryDelay: process.env.WORKER_MIN_RETRY_DELAY || 1
    });

    this.tid = opts.job.tid || uuid();
    opts.log = opts.log.child({ tid: this.tid, module: 'ponos:worker' });
    // put all opts on this
    Object.assign(this, opts);
    this.log.info({ queue: this.queue, job: this.job }, 'Worker created');

    // Ensure that the `msTimeout` option is valid (it may arrive as a string
    // from the environment, and `parseInt` yields NaN for garbage).
    this.msTimeout = parseInt(this.msTimeout, 10);
    if (!isNumber(this.msTimeout)) {
      throw new Error('Provided `msTimeout` is not an integer');
    }
    if (this.msTimeout < 0) {
      throw new Error('Provided `msTimeout` is negative');
    }

    if (this.runNow) {
      this.run();
    }
  }

  /**
   * Factory method for creating new workers. This method exists to make it
   * easier to unit test other modules that need to instantiate new workers.
   * @see Worker
   * @param {Object} opts Options for the Worker.
   * @returns {Worker} New Worker.
   */
  static create(opts) {
    return new Worker(opts);
  }

  /**
   * Runs the worker. If the task for the job fails, then this method will retry
   * the task (with an exponential backoff) as set by the environment
   * (`WORKER_MAX_RETRY_DELAY` caps the doubling of the retry delay).
   * A `WorkerStopError` is treated as fatal: the job is marked done and not
   * retried.
   * @returns {Promise} Promise that is resolved once the task succeeds or
   *   fails.
   */
  run() {
    const timer = this._createTimer();
    const log = this.log.child({
      method: 'run',
      queue: this.queue,
      job: this.job
    });

    return Promise.fromCallback((cb) => {
      // Run the task inside the `ponos` namespace so `tid` is available to it.
      cls.run(() => {
        cls.set('tid', this.tid);
        Promise.try(() => {
          const attemptData = {
            attempt: this.attempt++,
            timeout: this.msTimeout
          };
          log.info(attemptData, 'running task');
          let taskPromise = Promise.try(() => {
            return this.task(this.job);
          });

          // A timeout of 0 means "no timeout".
          if (this.msTimeout) {
            taskPromise = taskPromise.timeout(this.msTimeout);
          }
          return taskPromise;
        }).asCallback(cb);
      });
    })
      .then((result) => {
        log.info({ result: result }, 'Task complete');
        this._incMonitor('ponos.finish', { result: 'success' });
        return this.done();
      })
      // if the type is TimeoutError, we will log and retry
      .catch(TimeoutError, (err) => {
        log.warn({ err: err }, 'Task timed out');
        this._incMonitor('ponos.finish', { result: 'timeout-error' });
        // by throwing this type of error, we will retry :)
        throw err;
      })
      .catch((err) => {
        // Prefer the underlying cause when the error carries one (presumably
        // the wrapper bluebird adds for callback errors -- confirm).
        if (err.cause) {
          err = err.cause;
        }
        // Attach queue and job to the error data without clobbering values
        // that are already there.
        if (!isObject(err.data)) {
          err.data = {};
        }
        if (!err.data.queue) {
          err.data.queue = this.queue;
        }
        if (!err.data.job) {
          err.data.job = this.job;
        }
        throw err;
      })
      // if it's a WorkerStopError, we can't accomplish the task
      .catch(WorkerStopError, (err) => {
        log.error({ err: err }, 'Worker task fatally errored');
        this._incMonitor('ponos.finish', { result: 'fatal-error' });
        this._reportError(err);
        // If we encounter a fatal error we should no longer try to schedule
        // the job.
        return this.done();
      })
      .catch((err) => {
        const attemptData = {
          err: err,
          nextAttemptDelay: this.retryDelay
        };
        log.warn(attemptData, 'Task failed, retrying');
        this._incMonitor('ponos.finish', { result: 'task-error' });
        this._reportError(err);

        // Try again after a delay
        return Promise.delay(this.retryDelay).then(() => {
          // Exponentially increase the retry delay until it reaches the cap
          const maxRetryDelay =
            parseInt(process.env.WORKER_MAX_RETRY_DELAY, 10) || 0;
          if (this.retryDelay < maxRetryDelay) {
            this.retryDelay *= 2;
          }
          return this.run();
        });
      })
      .finally(() => {
        // No timer exists when monitoring is disabled.
        if (timer) {
          timer.stop();
        }
      });
  }

  // Private Methods

  /**
   * Helper function for reporting errors via the worker's error-cat instance.
   * @private
   * @param {Error} err Error to report.
   */
  _reportError(err) {
    this.errorCat.report(err);
  }

  /**
   * Helper function for creating monitor-dog event tags. `queue` is always
   * set to the full queue name. In addition, one `tokenN` tag is created per
   * `.`-delimited segment of the queue name, built up from the right. For the
   * queue `api.github.push` the result is:
   * {
   *   token0: 'push',
   *   token1: 'github.push',
   *   token2: 'api.github.push',
   *   queue: 'api.github.push'
   * }
   * @private
   * @returns {Object} Tags keyed by `token0..tokenN` plus `queue`.
   */
  _eventTags() {
    const tokens = this.queue.split('.').reverse();
    let lastToken = '';
    const tags = tokens.reduce((acc, currentValue, currentIndex) => {
      const newToken = currentIndex === 0
        ? currentValue
        : `${currentValue}.${lastToken}`;
      acc[`token${currentIndex}`] = newToken;
      lastToken = newToken;
      return acc;
    }, {});
    tags.queue = this.queue;
    return tags;
  }

  /**
   * Helper function calling `monitor.increment`. Monitor won't be called if
   * `process.env.WORKER_MONITOR_DISABLED` is set.
   * @private
   * @param {String} eventName Name to be reported into the datadog.
   * @param {Object} [extraTags] Extra tags to be sent with the event.
   */
  _incMonitor(eventName, extraTags) {
    if (process.env.WORKER_MONITOR_DISABLED) {
      return;
    }
    let tags = this._eventTags();
    if (extraTags) {
      tags = merge(tags, extraTags);
    }
    monitor.increment(eventName, tags);
  }

  /**
   * Helper function calling `monitor.timer`. Timer won't be created if
   * `process.env.WORKER_MONITOR_DISABLED` is set.
   * @private
   * @return {Object} New timer, or `null` when monitoring is disabled.
   */
  _createTimer() {
    if (process.env.WORKER_MONITOR_DISABLED) {
      return null;
    }
    return monitor.timer('ponos.timer', true, this._eventTags());
  }
}

/**
 * Worker class.
 * @module ponos/lib/worker
 * @see Worker
 */
module.exports = Worker;