Source: server.js


/* global ErrorCat */
'use strict';

const assign = require('101/assign');
const clone = require('101/clone');
const defaults = require('101/defaults');
const errorCat = require('error-cat');
const Immutable = require('immutable');
const isFunction = require('101/is-function');
const isObject = require('101/is-object');
const pick = require('101/pick');
const Promise = require('bluebird');

const logger = require('./logger');
const RabbitMQ = require('./rabbitmq');
const Worker = require('./worker');

/**
 * Ponos server class. Given a queue adapter the worker server will
 * connect to RabbitMQ, subscribe to the given queues, and begin spawning
 * workers for incoming jobs.
 *
 * The only required option is `opts.queues` which should be a non-empty flat
 * list of strings. The server uses this list to subscribe to only queues you
 * have provided.
 *
 * @author Bryan Kendall
 * @author Ryan Sandor Richards
 * @param {Object} opts Options for the server.
 * @param {ErrorCat} [opts.errorCat] An error cat instance to use for the
 *   server.
 * @param {Object<String, Function>} [opts.events] Mapping of event (fanout)
 *   exchanges which to subscribe and handlers.
 * @param {bunyan} [opts.log] A bunyan logger to use for the server.
 * @param {String} [opts.name=ponos] A name to namespace the created exchange queues.
 * @param {Object} [opts.rabbitmq] RabbitMQ connection options.
 * @param {Object} [opts.rabbitmq.channel] RabbitMQ channel options.
 * @param {Object} [opts.rabbitmq.channel.prefetch] Set prefetch for each
 *   consumer in a channel.
 * @param {String} [opts.rabbitmq.hostname=localhost] Hostname for RabbitMQ. Can
 *   be set with environment variable RABBITMQ_HOSTNAME.
 * @param {Number} [opts.rabbitmq.port=5672] Port for RabbitMQ. Can be set with
 *   environment variable RABBITMQ_PORT.
 * @param {String} [opts.rabbitmq.username] Username for RabbitMQ. Can be set
 *   with environment variable RABBITMQ_USERNAME.
 * @param {String} [opts.rabbitmq.password] Username for Password. Can be set
 *   with environment variable RABBITMQ_PASSWORD.
 * @param {Object<String, Function>} [opts.tasks] Mapping of queues to subscribe
 *   directly with handlers.
 */
class Server {

  constructor(opts) {
    this._opts = assign({}, opts);
    this.log = this._opts.log || logger.child({ module: 'ponos:server' });
    this._workerOptions = {};

    this._tasks = new Immutable.Map();
    if (this._opts.tasks) {
      this.setAllTasks(this._opts.tasks);
    }
    this._events = new Immutable.Map();
    if (this._opts.events) {
      this.setAllEvents(this._opts.events);
    }

    this.errorCat = this._opts.errorCat || errorCat;

    // add the name to RabbitMQ options
    const rabbitmqOpts = defaults(this._opts.rabbitmq || {}, { name: this._opts.name });
    this._rabbitmq = new RabbitMQ(rabbitmqOpts);
  }

  /**
   * Start consuming from the subscribed queues. This is called by `.start`.
   * This can be called after the server has been started to start consuming
   * from additional queues.
   *
   * @return {Promise} Promise resolved when consuming has started.
   */
  consume() {
    return this._rabbitmq.consume();
  }

  /**
   * Starts the worker server, connects to RabbitMQ, subscribes and consumes
   * from all the provided queues and exchanges (tasks and events).
   *
   * @return {Promise} Promise that resolves once the server is listening.
   */
  start() {
    this.log.trace('starting');
    return this._rabbitmq.connect().then(() => {
      return this._subscribeAll();
    }).then(() => {
      return this.consume();
    }).then(() => {
      this.log.trace('started');
    }).catch(err => {
      this.errorCat.report(err);
      throw err;
    });
  }

  /**
   * Stops the worker server, unsubscribing and disconnecting from RabbitMQ.
   *
   * @return {Promise} A promise that resolves when the server is stopped.
   */
  stop() {
    this.log.trace('stopping');
    return this._rabbitmq.unsubscribe().then(() => {
      return this._rabbitmq.disconnect();
    }).then(() => {
      this.log.trace('stopped');
    }).catch(err => {
      this.errorCat.report(err);
      throw err;
    });
  }

  /**
   * Takes a map of queues and task handlers and sets them all.
   *
   * @param {Object<String, Function>} map A map of queue names and task
   *   handlers.
   * @param {String} map.key Queue name.
   * @param {Object} map.value Object with a handler and additional options for
   *   the worker (must have a `.task` handler function)
   * @param {Function} map.value Handler function to take a job.
   * @returns {Server} The server.
   */
  setAllTasks(map) {
    if (!isObject(map)) {
      throw new Error('ponos.server: setAllTasks must be called with an object');
    }
    Object.keys(map).forEach(key => {
      const value = map[key];
      if (isObject(value)) {
        if (!isFunction(value.task)) {
          this.log.warn({ key: key }, 'no task function defined for key');
          return;
        }
        this.setTask(key, value.task, value);
      } else {
        this.setTask(key, map[key]);
      }
    });
    return this;
  }

  /**
   * Takes a map of event exchanges and handlers and subscribes to them all.
   *
   * @param {Object<String, Function>} map A map of exchanges and task handlers.
   * @param {String} map.key Exchange name.
   * @param {Object} map.value Object with handler and additional options for
   *   the worker (must have a `.task` handler function)
   * @param {Function} map.value Handler function to take a job.
   * @returns {Server} The server.
   */
  setAllEvents(map) {
    if (!isObject(map)) {
      throw new Error('ponos.server: setAllEvents must be called with an object');
    }
    Object.keys(map).forEach(key => {
      const value = map[key];
      if (isObject(value)) {
        if (!isFunction(value.task)) {
          this.log.warn({ key: key }, 'no task function defined for key');
          return;
        }
        this.setEvent(key, value.task, value);
      } else {
        this.setEvent(key, map[key]);
      }
    });
    return this;
  }

  /**
   * Assigns a task to a queue.
   *
   * @param {String} queueName Queue name.
   * @param {Function} task Function to take a job and return a promise.
   * @param {Object} [opts] Options for the worker that performs the task.
   * @returns {Server} The server.
   */
  setTask(queueName, task, opts) {
    this.log.trace({
      queue: queueName,
      method: 'setTask'
    }, 'setting task for queue');
    if (!isFunction(task)) {
      throw new Error('ponos.server: setTask task handler must be a function');
    }
    this._tasks = this._tasks.set(queueName, task);
    this._workerOptions[queueName] = opts && isObject(opts) ? pick(opts, 'msTimeout') : {};
    return this;
  }

  /**
   * Assigns a task to an exchange.
   *
   * @param {String} exchangeName Exchange name.
   * @param {Function} task Function to take a job and return a promise.
   * @param {Object} [opts] Options for the worker that performs the task.
   * @returns {Server} The server.
   */
  setEvent(exchangeName, task, opts) {
    this.log.trace({
      exchange: exchangeName,
      method: 'setEvent'
    }, 'setting task for queue');
    if (!isFunction(task)) {
      throw new Error('ponos.server: setEvent task handler must be a function');
    }
    this._events = this._events.set(exchangeName, task);
    this._workerOptions[exchangeName] = opts && isObject(opts) ? pick(opts, 'msTimeout') : {};
    return this;
  }

  // Private Methods

  /**
   * Helper function to subscribe to all queues.
   *
   * @private
   * @return {Promise} Promise that resolves when queues are all subscribed.
   */
  _subscribeAll() {
    this.log.trace('_subscribeAll');
    const tasks = this._tasks;
    const events = this._events;
    return Promise.map(tasks.keySeq(), queue => {
      return this._rabbitmq.subscribeToQueue(queue, (job, done) => {
        this._runWorker(queue, tasks.get(queue), job, done);
      });
    }).then(() => {
      return Promise.map(events.keySeq(), exchange => {
        return this._rabbitmq.subscribeToFanoutExchange(exchange, (job, done) => {
          this._runWorker(exchange, events.get(exchange), job, done);
        });
      });
    });
  }

  /**
   * Runs a worker for the given queue name, job, and acknowledgement callback.
   *
   * @private
   * @param {String} queueName Name of the queue.
   * @param {Function} handler Handler to perform the work.
   * @param {Object} job Job for the worker to perform.
   * @param {Function} done RabbitMQ acknowledgement callback.
   */
  _runWorker(queueName, handler, job, done) {
    this.log.trace({
      queue: queueName,
      job: job,
      method: '_runWorker'
    }, 'running worker');
    const opts = clone(this._workerOptions[queueName]);
    defaults(opts, {
      queue: queueName,
      job: job,
      task: handler,
      done: done,
      log: this.log,
      errorCat: this.errorCat
    });
    Worker.create(opts);
  }
}

/**
 * Server class.
 * @module ponos/lib/server
 * @see Server
 */
module.exports = Server;