Source: index.js

'use strict';

require('loadenv')('cluster-man');

var cluster = require('cluster');
var debug = require('debug');
var domain = require('domain');
var os = require('os');
var isFunction = require('101/is-function');
var noop = require('101/noop');
var defaults = require('101/defaults');
var pluck = require('map-utils').pluck;

/**
 * Extendable and easy-to-use node cluster management.
 * @module cluster-man
 * @author Ryan Sandor Richards, Anandkumar Patel
 */
module.exports = ClusterManager;

/**
 * Utility class for creating new server clusters.
 *
 * @example
 * var ClusterManager = require('cluster-man');
 * var server = require('./lib/server');
 *
 * // Basic usage (if you only need to handle workers)
 * new ClusterManager(server.start).start();
 *
 * @example
 * var ClusterManager = require('cluster-man');
 * var server = require('./lib/server');
 *
 * // Create a new cluster manager using options, for handling master process
 * // and worker processes with a specific number of workers.
 * var serverCluster = new ClusterManager({
 *   worker: server.start,
 *   master: masterStart,
 *   numWorkers: 4
 * });
 *
 * function masterStart(clusterManager) {
 *   // Any additional things you'd like to do after the cluster
 *   // has started...
 * }
 *
 * // Start the cluster
 * serverCluster.start();
 *
 * @author Ryan Sandor Richards.
 * @class
 * @param {object|function} opts Options for the cluster or a worker function to
 *   execute on worker processes.
 * @param {cluster-man~Callback} opt.worker Function to execute on the worker
 *   processes.
 * @param {cluster-man~Callback} opt.master Function to execute on the master
 *   process.
 * @param {Number} opt.numWorkers Number of workers to spawn. Defaults to the
 *   value in `process.env.CLUSTER_WORKERS` if present, and if not then the
 *   number of CPUs as reported by `os.cpus().length`.
 * @param {String} opt.debugScope Root scope for debug logging. Defaults to the
 *   value in `process.env.CLUSTER_DEBUG` if present, and if not then defaults
 *   to 'cluster-man'.
 * @param {Boolean} opt.killOnError=true Whether or not to kill the master
 *   process on and unhandled error.
 * @param {cluster-man~BeforeExit} opt.beforeExit Callback to execute before the
 *   master process exits in response to an error.
 * @throws Error If a opt.worker was not specified or was not a function.
 */
function ClusterManager(opts) {
  if (isFunction(opts)) {
    opts = { worker: opts };
  }
  this.options = opts || {};

  var hasNumWorkers = !this.options.numWorkers && !process.env.CLUSTER_WORKERS;

  defaults(this.options, {
    debugScope: process.env.CLUSTER_DEBUG || 'cluster-man',
    master: noop,
    numWorkers: process.env.CLUSTER_WORKERS || os.cpus().length,
    killOnError: true,
    beforeExit: function (err, done) {
      done();
    }
  });

  this._addLogger('info', [this.options.debugScope, 'info'].join(':'));
  this._addLogger('warning', [this.options.debugScope, 'warning'].join(':'));
  this._addLogger('error', [this.options.debugScope, 'error'].join(':'));

  if (!this.options.worker || !isFunction(this.options.worker)) {
    throw new Error('Cluster must be provided with a worker closure.');
  }

  if (!hasNumWorkers) {
    this.log.warning('Number of workers not specified, using default.');
  }

  if (!isFunction(this.options.beforeExit)) {
    this.log.warning('Before exit callback is not a function, removing.');
    this.options.beforeExit = noop;
  }

  this.workers = [];

  // This is here to expose the cluster without having to re-require in the
  // script that uses cluster-man
  this.cluster = cluster;
}

/**
 * Callback for performing tasks before the master process is killed.
 * @callback cluster-man~BeforeExit
 * @param {Error} [err] Error that caused the cluster to be shut down.
 * @param {function} done Execute this method when you are done performing
 *   tasks.
 */

/**
 * Starts either a cluster master or a worker depending on the process type at
 * the time of invocation.
 */
ClusterManager.prototype.start = function () {
  if (this.cluster.isMaster) {
    this._startMaster();
  }
  else {
    this._startWorker();
  }
};

/**
 * Adds a logger debug method to the manager.
 * @param {string} name Name of the logger method.
 * @param {string} label Output label for debug.
 */
ClusterManager.prototype._addLogger = function (name, label) {
  if (!this.log) {
    this.log = {};
  }
  this.log[name] = debug(label);
};

/**
 * Starts a cluster master. Specifically this will bind worker events to
 * specific handlers on this manager instance, fork all worker process, setup a
 * domain to catch unhandled errors on the master process and execute the master
 * process callback (as specified in the constructor).
 */
ClusterManager.prototype._startMaster = function() {
  var self = this;

  // Setup master process domain error handling
  var masterDomain = domain.create();
  masterDomain.on('error', function() {
    self.masterError.apply(self, arguments);
  });
  masterDomain.add(this);

  // Bind cluster events to this object.
  var eventNames = ['fork', 'listening', 'exit', 'online', 'disconnect'];
  eventNames.forEach(function (eventName) {
    self.cluster.on(eventName, function() {
      self[eventName].apply(self, arguments);
    });
  });

  // Spawn workers
  for (var i = 0; i < this.options.numWorkers; i++) {
    this.createWorker();
  }

  // Execute master callback from options
  masterDomain.run(function () {
    self.options.master(this);
  });
};

/**
 * Runs before exit callback and exits the master process.
 * @param {Error} [err] Error that caused the master process to exit.
 */
ClusterManager.prototype._exitMaster = function (err) {
  this.options.beforeExit(err, function () {
    process.exit(err ? 1 : 0);
  });
};

/**
 * Starts a cluster worker. Simply executes the provided worker callback.
 */
ClusterManager.prototype._startWorker = function() {
  this.options.worker(this);
};

/**
 * Creates a new worker. Specifically it forks a new worker, sets a domain error
 * handler for the worker, and returns it.
 * @return {cluster~Worker} Newly created worker.
 */
ClusterManager.prototype.createWorker = function () {
  var self = this;
  var worker = this.cluster.fork();

  // Deals with unhandled worker errors
  var workerDomain = domain.create();
  workerDomain.add(worker);
  workerDomain.on('error', function (err) {
    self.log.error('Unhandled worker error: ' + err.stack);
    worker.process.kill(1);
  });

  this.workers.push(worker);
  this.log.info('Created new worker: ' + worker.id);
  return worker;
};

/**
 * Handles worker `fork` events. This event is emitted when a worker is forked
 * off the master cluster.
 * @param {cluster~Worker} Worker that was forked.
 */
ClusterManager.prototype.fork = function (worker) {
  this.log.info('Worker forked: ' + worker.id);
};

/**
 * Handles worker `listening` events. Indicates to the master that a particular
 * worker is listening.
 * @param {cluster~Worker} Worker that is now listening.
 * @param address Address on which the worker is listening.
 */
ClusterManager.prototype.listening = function (worker, address) {
  this.log.info([
    'Worker listening:', worker.id,
    'on address', (address.address+':'+address.port)
  ].join(' '));
};

/**
 * Handles worker `exit` events.
 * @param {cluster~Worker} worker Worker that exited.
 * @param {Number} code Exit code for the worker process.
 * @param {String} signal Signal name that caused the process to be killed.
 */
ClusterManager.prototype.exit = function (worker, code, signal) {
  this.log.info([
    'Worker exited:', worker.id,
    '-- with status:', code,
    '-- and signal:', signal
  ].join(' '));

  var self = this;
  this.workers.map(pluck('id')).some(function (workerId, i) {
    if (workerId === worker.id) {
      self.workers.splice(i, 1);
    }
  });

  // If all the workers have been killed, exit the process
  if (this.workers.length === 0) {
    this.log.error('Cluster fatal: all worker have died. Master process exiting.');
    this._exitMaster(new Error('All workers have died.'));
  }
};

/**
 * Handles worker `online` events. This indicates to the cluster that a worker
 * process has successfully spawned a process and is running.
 * @param {cluster~Worker} worker Worker that came online.
 */
ClusterManager.prototype.online = function (worker) {
  this.log.info('Worker online: ' + worker.id);
};

/**
 * Handles worker `disconnect` events. This indicates that the worker has
 * disconnected from communication but is not nessessarily dead.
 * @param {cluster~Worker} worker Worker that disconnected.
 */
ClusterManager.prototype.disconnect = function (worker) {
  this.log.info('Worker disconnected: ' + worker.id + ' -- killing');
};

/**
 * Called when master process domain encounters an unhandled error. By default
 * this method will log the error stack, indicate that the error is fatal, and
 * kill the process with a status code `1`.
 * @param {Error} err Unhandled error on the master process.
 */
ClusterManager.prototype.masterError = function(err) {
  this.log.error('Unhandled master error: ' + err.stack);
  if (this.options.killOnError) {
    this.log.error('Cluster fatal: unhandled error in master process, exiting.');
    this._exitMaster(err);
  }
};