Source: worker.js


/* global ErrorCat WorkerError DDTimer */
'use strict';

const cls = require('continuation-local-storage').createNamespace('ponos');
const clsBlueBird = require('cls-bluebird');
const defaults = require('101/defaults');
const errorCat = require('error-cat');
const exists = require('101/exists');
const isNumber = require('101/is-number');
const isObject = require('101/is-object');
const merge = require('101/put');
const monitor = require('monitor-dog');
const pick = require('101/pick');
const Promise = require('bluebird');
const uuid = require('uuid');
const WorkerStopError = require('error-cat/errors/worker-stop-error');

// Alias for bluebird's TimeoutError (the rejection type produced by
// `.timeout()` per the bluebird docs); `Worker#run` matches on it so that
// timed-out tasks are logged and counted separately.
const TimeoutError = Promise.TimeoutError;
// Hook bluebird up to the `ponos` continuation-local-storage namespace.
// Presumably this keeps values such as the `tid` set in `Worker#run` visible
// inside promise callbacks -- see the cls-bluebird documentation.
clsBlueBird(cls);

/**
 * Performs tasks for jobs on a given queue.
 *
 * @author Bryan Kendall
 * @author Ryan Sandor Richards
 * @param {Object} opts Options for the worker.
 * @param {Function} opts.done Callback to execute when the job has successfully
 *   been completed.
 * @param {Object} opts.job Data for the job to process.
 * @param {String} opts.queue Name of the queue for the job the worker is
 *   processing.
 * @param {Function} opts.task A function to handle the tasks.
 * @param {ErrorCat} [opts.errorCat] An error-cat instance to use for the
 *   worker.
 * @param {bunyan} opts.log The bunyan logger to use when logging messages
 *   from the worker. Required: the constructor throws if it is missing.
 * @param {number} [opts.msTimeout] A specific millisecond timeout for this
 *   worker.
 * @param {boolean} [opts.runNow] Whether or not to run the job immediately,
 *   defaults to `true`.
 */
class Worker {

  /**
   * Validates the supplied options, applies defaults (some of them read from
   * the environment) and, unless `opts.runNow` is `false`, starts the task
   * immediately.
   *
   * Environment variables read here:
   *  - `WORKER_TIMEOUT`: default `msTimeout` (0 disables the timeout).
   *  - `WORKER_MIN_RETRY_DELAY`: initial retry delay in ms (defaults to 1).
   *
   * @param {Object} opts Options for the worker. Only the known option keys
   *   are copied onto the instance; any other keys are ignored.
   * @throws {Error} If `done`, `job`, `log`, `queue` or `task` is missing, or
   *   if `msTimeout` is not a non-negative integer.
   */
  constructor(opts) {
    const requiredFields = ['done', 'job', 'log', 'queue', 'task'];
    const optionalFields = ['errorCat', 'msTimeout', 'runNow'];
    // Tolerate a missing options object so the caller gets the descriptive
    // "is required" error instead of an opaque TypeError.
    const given = opts || {};

    requiredFields.forEach((field) => {
      if (!exists(given[field])) {
        throw new Error(field + ' is required for a Worker');
      }
    });

    // The environment hands us strings; parse once so `retryDelay` is always
    // a number. Anything unparsable falls back to the 1ms default.
    const minRetryDelay = parseInt(process.env.WORKER_MIN_RETRY_DELAY, 10);

    // `pick` returns a copy, so the caller's object is never mutated.
    const settings = pick(given, requiredFields.concat(optionalFields));
    defaults(settings, {
      // default non-required user options
      errorCat: errorCat,
      runNow: true,
      // internal state, not user-configurable
      attempt: 0,
      msTimeout: process.env.WORKER_TIMEOUT || 0,
      retryDelay: Number.isNaN(minRetryDelay) ? 1 : minRetryDelay
    });

    this.tid = settings.job.tid || uuid();
    settings.log = settings.log.child({
      tid: this.tid,
      module: 'ponos:worker'
    });
    // put all settings on this
    Object.assign(this, settings);
    this.log.info({ queue: this.queue, job: this.job }, 'Worker created');

    // Ensure that the `msTimeout` option is valid
    this.msTimeout = parseInt(this.msTimeout, 10);
    if (!isNumber(this.msTimeout)) {
      throw new Error('Provided `msTimeout` is not an integer');
    }
    if (this.msTimeout < 0) {
      throw new Error('Provided `msTimeout` is negative');
    }

    if (this.runNow) {
      this.run();
    }
  }

  /**
   * Factory method for creating new workers. This method exists to make it
   * easier to unit test other modules that need to instantiate new workers.
   *
   * @see Worker
   * @param {Object} opts Options for the Worker.
   * @returns {Worker} New Worker.
   */
  static create(opts) {
    return new Worker(opts);
  }

  /**
   * Runs the worker. If the task for the job fails, then this method will retry
   * the task (with an exponential backoff) as set by the environment.
   *
   * Outcomes:
   *  - task resolves: `done` is called.
   *  - task rejects with a `WorkerStopError`: the error is reported and `done`
   *    is called (no retry).
   *  - any other rejection (including a timeout): the error is reported and
   *    the task is run again after `retryDelay` milliseconds.
   *
   * @returns {Promise} Promise that is resolved once the task succeeds or
   *   fails.
   */
  run() {
    this._incMonitor('ponos');
    const timer = this._createTimer();
    const log = this.log.child({
      method: 'run',
      queue: this.queue,
      job: this.job
    });

    return Promise.fromCallback(cb => {
      // Run the task inside the CLS namespace with this worker's `tid` set.
      cls.run(() => {
        cls.set('tid', this.tid);
        Promise.try(() => {
          const attemptData = {
            attempt: this.attempt++,
            timeout: this.msTimeout
          };
          log.info(attemptData, 'running task');
          let taskPromise = Promise.try(() => {
            return this.task(this.job);
          });

          // A timeout of 0 means "no timeout".
          if (this.msTimeout) {
            taskPromise = taskPromise.timeout(this.msTimeout);
          }
          return taskPromise;
        }).asCallback(cb);
      });
    }).then(result => {
      log.info({ result: result }, 'Task complete');
      this._incMonitor('ponos.finish', { result: 'success' });
      return this.done();
    })
    // if the type is TimeoutError, we will log and retry
    .catch(TimeoutError, err => {
      log.warn({ err: err }, 'Task timed out');
      this._incMonitor('ponos.finish', { result: 'timeout-error' });
      // by throwing this type of error, we will retry :)
      // NOTE: the final catch below also increments `ponos.finish` (with
      // `task-error`) for this same failure.
      throw err;
    }).catch(err => {
      // Unwrap errors that carry the original failure in `cause`, then make
      // sure the error is annotated with the queue and job it belongs to.
      if (err.cause) {
        err = err.cause;
      }
      if (!isObject(err.data)) {
        err.data = {};
      }
      if (!err.data.queue) {
        err.data.queue = this.queue;
      }
      if (!err.data.job) {
        err.data.job = this.job;
      }
      throw err;
    })
    // if it's a WorkerStopError, we can't accomplish the task
    .catch(WorkerStopError, err => {
      log.error({ err: err }, 'Worker task fatally errored');
      this._incMonitor('ponos.finish', { result: 'fatal-error' });
      this._reportError(err);
      // If we encounter a fatal error we should no longer try to schedule
      // the job.
      return this.done();
    }).catch(err => {
      const attemptData = {
        err: err,
        nextAttemptDelay: this.retryDelay
      };
      log.warn(attemptData, 'Task failed, retrying');
      this._incMonitor('ponos.finish', { result: 'task-error' });
      this._reportError(err);

      // Try again after a delay
      return Promise.delay(this.retryDelay).then(() => {
        // Exponentially increase the retry delay (bounded by the maximum)
        this.retryDelay = this._nextRetryDelay();
        return this.run();
      });
    }).finally(() => {
      // `timer` is null when monitoring is disabled.
      if (timer) {
        timer.stop();
      }
    });
  }

  // Private Methods

  /**
   * Computes the delay to use for the retry after the upcoming one: the
   * current delay doubled, but never more than `WORKER_MAX_RETRY_DELAY`. If
   * the current delay has already reached the maximum (or no maximum is
   * configured) the delay is returned unchanged.
   *
   * @private
   * @returns {number} Next retry delay in milliseconds.
   */
  _nextRetryDelay() {
    const maxRetryDelay =
      parseInt(process.env.WORKER_MAX_RETRY_DELAY, 10) || 0;
    if (this.retryDelay < maxRetryDelay) {
      return Math.min(this.retryDelay * 2, maxRetryDelay);
    }
    return this.retryDelay;
  }

  /**
   * Helper function for reporting errors to rollbar via error-cat.
   *
   * @private
   * @param {Error} err Error to report.
   */
  _reportError(err) {
    this.errorCat.report(err);
  }

  /**
   * Helper function for creating monitor-dog events tags. `queue` is the only
   * mandatory tag. The queue name is split on every `.` and one cumulative
   * `tokenN` tag is produced per segment, built up from the right. For the
   * queue `api.github.push` the result is:
   * {
   *   token0: 'push',
   *   token1: 'github.push',
   *   token2: 'api.github.push',
   *   queue: 'api.github.push'
   * }
   * Note that dots inside e.g. an IP address are delimiters too, so
   * `10.0.0.20.api.github.push` yields `token0` through `token6`.
   *
   * @private
   * @returns {Object} tags as Object { queue: 'docker.event.publish' }.
   */
  _eventTags() {
    const tags = {};
    let suffix = '';
    this.queue.split('.').reverse().forEach((segment, index) => {
      suffix = index === 0 ? segment : `${segment}.${suffix}`;
      tags[`token${index}`] = suffix;
    });
    tags.queue = this.queue;
    return tags;
  }

  /**
   * Helper function calling `monitor.increment`. Monitor won't be called if
   * `WORKER_MONITOR_DISABLED` is set.
   *
   * @private
   * @param {String} eventName Name to be reported into the datadog.
   * @param {Object} [extraTags] Extra tags to be send with the event.
   */
  _incMonitor(eventName, extraTags) {
    if (process.env.WORKER_MONITOR_DISABLED) {
      return;
    }
    const baseTags = this._eventTags();
    const tags = extraTags ? merge(baseTags, extraTags) : baseTags;
    monitor.increment(eventName, tags);
  }

  /**
   * Helper function calling `monitor.timer`. Timer won't be created if
   * `WORKER_MONITOR_DISABLED` is set.
   *
   * @return {Object} New timer, or `null` when monitoring is disabled.
   * @private
   */
  _createTimer() {
    // Check the flag first so no tags are built when monitoring is off.
    if (process.env.WORKER_MONITOR_DISABLED) {
      return null;
    }
    return monitor.timer('ponos.timer', true, this._eventTags());
  }
}

/**
 * Worker class.
 *
 * The class itself is the module's export; create instances with
 * `new Worker(opts)` or the `Worker.create(opts)` factory.
 *
 * @module ponos/lib/worker
 * @see Worker
 */
module.exports = Worker;