Source: index.js

  1. 'use strict';
  2. require('loadenv')('cluster-man');
  3. var cluster = require('cluster');
  4. var debug = require('debug');
  5. var domain = require('domain');
  6. var os = require('os');
  7. var isFunction = require('101/is-function');
  8. var noop = require('101/noop');
  9. var defaults = require('101/defaults');
  10. var pluck = require('map-utils').pluck;
  11. /**
  12. * Extendable and easy-to-use node cluster management.
  13. * @module cluster-man
  14. * @author Ryan Sandor Richards, Anandkumar Patel
  15. */
  16. module.exports = ClusterManager;
  /**
   * Utility class for creating new server clusters.
   *
   * @example
   * var ClusterManager = require('cluster-man');
   * var server = require('./lib/server');
   *
   * // Basic usage (if you only need to handle workers)
   * new ClusterManager(server.start).start();
   *
   * @example
   * var ClusterManager = require('cluster-man');
   * var server = require('./lib/server');
   *
   * // Create a new cluster manager using options, for handling master process
   * // and worker processes with a specific number of workers.
   * var serverCluster = new ClusterManager({
   *   worker: server.start,
   *   master: masterStart,
   *   numWorkers: 4
   * });
   *
   * function masterStart(clusterManager) {
   *   // Any additional things you'd like to do after the cluster
   *   // has started...
   * }
   *
   * // Start the cluster
   * serverCluster.start();
   *
   * @author Ryan Sandor Richards.
   * @class
   * @param {object|function} opts Options for the cluster or a worker function to
   *   execute on worker processes. When an options object is given it is stored
   *   as `this.options` and missing keys are filled in on that same object.
   * @param {cluster-man~Callback} opts.worker Function to execute on the worker
   *   processes.
   * @param {cluster-man~Callback} opts.master Function to execute on the master
   *   process. Defaults to a no-op.
   * @param {Number} opts.numWorkers Number of workers to spawn. Defaults to the
   *   value in `process.env.CLUSTER_WORKERS` if present (used verbatim), and if
   *   not then the number of CPUs as reported by `os.cpus().length`.
   * @param {String} opts.debugScope Root scope for debug logging. Defaults to the
   *   value in `process.env.CLUSTER_DEBUG` if present, and if not then defaults
   *   to 'cluster-man'.
   * @param {Boolean} opts.killOnError=true Whether or not to kill the master
   *   process on an unhandled error.
   * @param {cluster-man~BeforeExit} opts.beforeExit Callback to execute before
   *   the master process exits in response to an error. A value that is not a
   *   function is replaced by a pass-through that immediately calls `done`.
   * @throws Error If `opts.worker` was not specified or was not a function.
   */
  function ClusterManager(opts) {
    if (isFunction(opts)) {
      opts = { worker: opts };
    }
    this.options = opts || {};

    // Must be captured BEFORE the defaults are merged in: afterwards
    // `options.numWorkers` is populated and the distinction is lost.
    var hasNumWorkers = Boolean(
      this.options.numWorkers || process.env.CLUSTER_WORKERS
    );

    // A before-exit hook that does nothing except let the shutdown proceed.
    var proceedImmediately = function (err, done) {
      done();
    };

    defaults(this.options, {
      debugScope: process.env.CLUSTER_DEBUG || 'cluster-man',
      master: noop,
      numWorkers: process.env.CLUSTER_WORKERS || os.cpus().length,
      killOnError: true,
      beforeExit: proceedImmediately
    });

    this._addLogger('info', [this.options.debugScope, 'info'].join(':'));
    this._addLogger('warning', [this.options.debugScope, 'warning'].join(':'));
    this._addLogger('error', [this.options.debugScope, 'error'].join(':'));

    if (!this.options.worker || !isFunction(this.options.worker)) {
      throw new Error('Cluster must be provided with a worker closure.');
    }

    // Warn only when neither the option nor the environment supplied a count.
    if (!hasNumWorkers) {
      this.log.warning('Number of workers not specified, using default.');
    }

    if (!isFunction(this.options.beforeExit)) {
      this.log.warning('Before exit callback is not a function, removing.');
      // The replacement has to invoke `done`; a plain no-op would swallow the
      // callback and the master process would never actually exit.
      this.options.beforeExit = proceedImmediately;
    }

    this.workers = [];

    // This is here to expose the cluster without having to re-require in the
    // script that uses cluster-man
    this.cluster = cluster;
  }
  100. /**
  101. * Callback for performing tasks before the master process is killed.
  102. * @callback cluster-man~BeforeExit
  103. * @param {Error} [err] Error that caused the cluster to be shut down.
  104. * @param {function} done Execute this method when you are done performing
  105. * tasks.
  106. */
  /**
   * Entry point for the cluster. Checks `this.cluster.isMaster` at call time
   * and runs the worker start-up path when the current process is not the
   * master, or the master start-up path when it is.
   */
  ClusterManager.prototype.start = function () {
    if (!this.cluster.isMaster) {
      this._startWorker();
      return;
    }
    this._startMaster();
  };
  /**
   * Registers a `debug` logger on the manager under `this.log[name]`. The
   * `this.log` container is created on first use.
   * @param {string} name Name of the logger method (key on `this.log`).
   * @param {string} label Output label handed to `debug`.
   */
  ClusterManager.prototype._addLogger = function (name, label) {
    var loggers = this.log || (this.log = {});
    loggers[name] = debug(label);
  };
  /**
   * Starts a cluster master. In order, this will:
   *
   * 1. create a domain whose `error` events are forwarded to `masterError`,
   *    and add this manager to it;
   * 2. forward the cluster's `fork`, `listening`, `exit`, `online` and
   *    `disconnect` events to the same-named methods on this manager;
   * 3. call `createWorker` once per `options.numWorkers`;
   * 4. run the `options.master` callback inside the domain, passing this
   *    manager as its only argument.
   */
  ClusterManager.prototype._startMaster = function () {
    var self = this;

    // Setup master process domain error handling
    var masterDomain = domain.create();
    masterDomain.on('error', function () {
      self.masterError.apply(self, arguments);
    });
    masterDomain.add(this);

    // Bind cluster events to this object. The handler is looked up at event
    // time, so methods overridden after start-up are still honoured.
    var eventNames = ['fork', 'listening', 'exit', 'online', 'disconnect'];
    eventNames.forEach(function (eventName) {
      self.cluster.on(eventName, function () {
        self[eventName].apply(self, arguments);
      });
    });

    // Spawn workers
    for (var i = 0; i < this.options.numWorkers; i++) {
      this.createWorker();
    }

    // Execute master callback from options. `this` inside this callback is
    // not the manager, so the manager is passed explicitly via `self`.
    masterDomain.run(function () {
      self.options.master(self);
    });
  };
  /**
   * Invokes the `options.beforeExit` hook and, once the hook calls back,
   * terminates the master process. The exit status is `1` when an error was
   * supplied and `0` otherwise.
   * @param {Error} [err] Error that caused the master process to exit.
   */
  ClusterManager.prototype._exitMaster = function (err) {
    var status = err ? 1 : 0;

    function terminate() {
      process.exit(status);
    }

    this.options.beforeExit(err, terminate);
  };
  /**
   * Starts a cluster worker by invoking the `options.worker` callback with
   * this manager as its only argument.
   */
  ClusterManager.prototype._startWorker = function () {
    var settings = this.options;
    // Called as a method of the options object, so `this` inside the worker
    // callback stays the options object.
    settings.worker(this);
  };
  /**
   * Forks a new worker through `this.cluster`, attaches it to a fresh domain
   * whose `error` handler logs the stack and kills the worker's process,
   * records the worker in `this.workers`, and returns it.
   * @return {cluster~Worker} Newly created worker.
   */
  ClusterManager.prototype.createWorker = function () {
    var manager = this;
    var forked = manager.cluster.fork();

    // Deals with unhandled worker errors.
    // NOTE(review): `kill` receives the number 1 -- presumably treated as a
    // signal number rather than an exit status; confirm against the
    // child_process documentation.
    function onUnhandledError(err) {
      manager.log.error('Unhandled worker error: ' + err.stack);
      forked.process.kill(1);
    }

    var guard = domain.create();
    guard.add(forked);
    guard.on('error', onUnhandledError);

    manager.workers.push(forked);
    manager.log.info('Created new worker: ' + forked.id);
    return forked;
  };
  /**
   * Handler for the cluster's `fork` event. Logs the id of the forked worker
   * at info level.
   * @param {cluster~Worker} worker Worker that was forked.
   */
  ClusterManager.prototype.fork = function (worker) {
    var message = 'Worker forked: ' + worker.id;
    this.log.info(message);
  };
  /**
   * Handler for the cluster's `listening` event. Logs the worker id together
   * with the `address:port` pair it is listening on.
   * @param {cluster~Worker} worker Worker that is now listening.
   * @param {object} address Address on which the worker is listening; its
   *   `address` and `port` properties are read.
   */
  ClusterManager.prototype.listening = function (worker, address) {
    var endpoint = address.address + ':' + address.port;
    var parts = ['Worker listening:', worker.id, 'on address', endpoint];
    this.log.info(parts.join(' '));
  };
  /**
   * Handles worker `exit` events. Logs the exit, removes the worker's record
   * from `this.workers`, and, if no workers remain, logs a fatal error and
   * shuts the master down through `_exitMaster`.
   * @param {cluster~Worker} worker Worker that exited.
   * @param {Number} code Exit code for the worker process.
   * @param {String} signal Signal name that caused the process to be killed.
   */
  ClusterManager.prototype.exit = function (worker, code, signal) {
    this.log.info([
      'Worker exited:', worker.id,
      '-- with status:', code,
      '-- and signal:', signal
    ].join(' '));

    // Remove the record for the exited worker. One exit event accounts for
    // exactly one record, so stop at the first id match; continuing to scan
    // after a splice would use indices that no longer line up with the list.
    for (var i = 0; i < this.workers.length; i++) {
      if (this.workers[i].id === worker.id) {
        this.workers.splice(i, 1);
        break;
      }
    }

    // If all the workers have been killed, exit the process
    if (this.workers.length === 0) {
      this.log.error('Cluster fatal: all worker have died. Master process exiting.');
      this._exitMaster(new Error('All workers have died.'));
    }
  };
  /**
   * Handler for the cluster's `online` event. Logs the id of the worker that
   * came online at info level.
   * @param {cluster~Worker} worker Worker that came online.
   */
  ClusterManager.prototype.online = function (worker) {
    var message = 'Worker online: ' + worker.id;
    this.log.info(message);
  };
  /**
   * Handles worker `disconnect` events. This indicates that the worker has
   * disconnected from communication but is not necessarily dead, so the
   * worker is killed explicitly, as the log message announces.
   * @param {cluster~Worker} worker Worker that disconnected.
   */
  ClusterManager.prototype.disconnect = function (worker) {
    this.log.info('Worker disconnected: ' + worker.id + ' -- killing');
    // Without this the handler only claimed to kill the worker; a
    // disconnected worker that stays alive would remain listed in
    // `this.workers` because its `exit` event would never arrive.
    worker.kill();
  };
  /**
   * Called when the master process domain encounters an unhandled error.
   * Always logs the error stack. When `options.killOnError` is truthy it also
   * logs that the error is fatal and shuts the master down through
   * `_exitMaster`, passing the error along.
   * @param {Error} err Unhandled error on the master process.
   */
  ClusterManager.prototype.masterError = function (err) {
    this.log.error('Unhandled master error: ' + err.stack);

    if (!this.options.killOnError) {
      return;
    }

    this.log.error('Cluster fatal: unhandled error in master process, exiting.');
    this._exitMaster(err);
  };