Source: rabbitmq.js


/* global RabbitMQChannel RabbitMQConfirmChannel RabbitMQConnection SubscribeObject RabbitMQOptions */
'use strict';

const amqplib = require('amqplib');
const defaults = require('101/defaults');
const getNamespace = require('continuation-local-storage').getNamespace;
const Immutable = require('immutable');
const isFunction = require('101/is-function');
const isObject = require('101/is-object');
const isString = require('101/is-string');
const Promise = require('bluebird');
const uuid = require('uuid');

const logger = require('./logger');

/**
 * RabbitMQ model. Can be used independently for publishing or other uses.
 *
 * @author Bryan Kendall
 * @param {Object} [opts] RabbitMQ connection options.
 * @param {Object} [opts.channel] RabbitMQ channel options.
 * @param {Object} [opts.channel.prefetch] Set prefetch for each consumer in a
 *   channel.
 * @param {String} [opts.hostname=localhost] Hostname for RabbitMQ. Can be set
 *   with environment variable RABBITMQ_HOSTNAME.
 * @param {Number} [opts.port=5672] Port for RabbitMQ. Can be set with
 *   environment variable RABBITMQ_PORT.
 * @param {String} [opts.username] Username for RabbitMQ. Can be set with
 *   environment variable RABBITMQ_USERNAME.
 * @param {String} [opts.password] Username for Password. Can be set with
 *   environment variable RABBITMQ_PASSWORD.
 */
class RabbitMQ {

  constructor(opts) {
    this.name = opts.name || 'ponos';
    this.hostname = opts.hostname || process.env.RABBITMQ_HOSTNAME || 'localhost';
    this.port = opts.port || parseInt(process.env.RABBITMQ_PORT, 10) || 5672;
    this.username = opts.username || process.env.RABBITMQ_USERNAME;
    this.password = opts.password || process.env.RABBITMQ_PASSWORD;
    this.log = logger.child({
      module: 'ponos:rabbitmq',
      hostname: this.hostname,
      port: this.port,
      clientName: this.name
    });
    this.channelOpts = opts.channel || {};
    if (!this.username || !this.password) {
      this.log.warn('RabbitMQ username and password not found. See Ponos Server ' + 'constructor documentation.');
    }
    this._setCleanState();
  }

  /**
   * Connect to the RabbitMQ server.
   *
   * @return {Promise} Promise that resolves once connection is established.
   */
  connect() {
    if (this._isPartlyConnected() || this._isConnected()) {
      return Promise.reject(new Error('cannot call connect twice'));
    }
    let authString = '';
    if (this.username && this.password) {
      authString = `${ this.username }:${ this.password }@`;
    }
    const url = `amqp://${ authString }${ this.hostname }:${ this.port }`;
    this.log.info({ url: url }, 'connecting');
    return Promise.resolve(amqplib.connect(url, {})).catch(err => {
      this.log.fatal({ err: err }, 'an error occured while connecting');
      throw err;
    }).then(conn => {
      this.connection = conn;
      this.log.info('connected');
      this.connection.on('error', this._connectionErrorHandler.bind(this));

      this.log.info('creating channel');
      return Promise.resolve(this.connection.createChannel()).catch(err => {
        this.log.fatal({ err: err }, 'an error occured creating channel');
        throw err;
      });
    }).then(channel => {
      if (this.channelOpts.prefetch) {
        this.log.info('setting prefetch on channel');
        return Promise.resolve(channel.prefetch(this.channelOpts.prefetch)).return(channel);
      }
      return channel;
    }).then(channel => {
      this.log.info('created channel');
      this.channel = channel;
      this.channel.on('error', this._channelErrorHandler.bind(this));

      this.log.info('creating publishing channel');
      return Promise.resolve(this.connection.createConfirmChannel()).catch(err => {
        this.log.fatal({ err: err }, 'errored creating confirm channel');
        throw err;
      });
    }).then(channel => {
      this.log.info('created confirm channel');
      this.publishChannel = channel;
      this.publishChannel.on('error', this._channelErrorHandler.bind(this));
    });
  }

  /**
   * Takes an object representing a message and sends it to a queue.
   *
   * @deprecated
   * @param {String} queue Queue to receive the message.
   * @param {Object} content Content to send.
   * @return {Promise} Promise resolved when message is sent to queue.
   */
  publishToQueue(queue, content) {
    return Promise.try(() => {
      this.log.warn({
        method: 'publishToQueue',
        queue
      }, 'rabbitmq.publishToQueue is deprecated. use `publishTask`.');
      return this.publishTask(queue, content);
    });
  }

  /**
   * Takes an object representing a message and sends it to an exchange using
   * a provided routing key.
   *
   * Note: Providing an empty string as the routing key is functionally the same
   * as sending the message directly to a named queue. The function
   * {@link RabbitMQ#publishToQueue} is preferred in this case.
   *
   * @deprecated
   * @param {String} queue Exchange to receive the message.
   * @param {String} routingKey Routing Key for the exchange.
   * @param {Object} content Content to send.
   * @return {Promise} Promise resolved when message is sent to the exchange.
   */
  publishToExchange(exchange, routingKey, content) {
    return Promise.try(() => {
      this.log.warn({
        method: 'publishToExchange',
        exchange
      }, 'rabbitmq.publishToExchange is deprecated. use `publishEvent`.');
      return this.publishEvent(exchange, content);
    });
  }

  /**
   * Takes an object representing a message and sends it to a task queue.
   *
   * @param {String} queue Task queue to receive the message.
   * @param {Object} content Job to send.
   * @return {Promise} Promise resolved when message is sent to queue.
   */
  publishTask(queue, content) {
    return Promise.try(() => {
      const bufferContent = this._validatePublish(queue, content);
      return Promise.resolve(this.publishChannel.sendToQueue(queue, bufferContent));
    });
  }

  /**
   * Sends an object representing a message to an exchange for the specified
   * event.
   *
   * @param {String} queue Exchange to receive the message.
   * @param {Object} content Content to send.
   * @return {Promise} Promise resolved when message is sent to the exchange.
   */
  publishEvent(exchange, content) {
    return Promise.try(() => {
      const bufferContent = this._validatePublish(exchange, content);
      // events do not need a routing key (so we send '')
      return Promise.resolve(this.publishChannel.publish(exchange, '', bufferContent));
    });
  }

  /**
   * Subscribe to a specific direct queue.
   *
   * @private
   * @param {String} queue Queue name.
   * @param {Function} handler Handler for jobs.
   * @param {Object} [queueOptions] Options for the queue.
   * @see RabbitMQ.AMQPLIB_QUEUE_DEFAULTS
   * @return {Promise} Promise that is resolved once queue is subscribed.
   */
  subscribeToQueue(queue, handler, queueOptions) {
    const log = this.log.child({
      method: 'subscribeToQueue',
      queue: queue
    });
    log.info('subscribing to queue');
    if (!this._isConnected()) {
      return Promise.reject(new Error('you must .connect() before subscribing'));
    }
    if (!isFunction(handler)) {
      log.error('handler must be a function');
      return Promise.reject(new Error(`handler for ${ queue } must be a function`));
    }
    if (this.subscribed.has(`queue:::${ queue }`)) {
      log.warn('already subscribed to queue');
      return Promise.resolve();
    }
    return Promise.resolve(this.channel.assertQueue(queue, defaults(queueOptions, RabbitMQ.AMQPLIB_QUEUE_DEFAULTS))).then(() => {
      log.info('queue asserted, binding queue');
      this.subscriptions = this.subscriptions.set(queue, handler);
      this.subscribed = this.subscribed.add(`queue:::${ queue }`);
    });
  }

  /**
   * Subcribe to fanout exchange.
   *
   * @private
   * @param {String} exchange Name of fanout exchange.
   * @param {Function} handler Handler for jobs.
   * @param {Object} [rabbitMQOptions] Options for the queues and exchanges.
   * @param {Object} [rabbitMQOptions.queueOptions] Options for the queue.
   * @see RabbitMQ.AMQPLIB_QUEUE_DEFAULTS
   * @param {Object} [rabbitMQOptions.exchangeOptions] Options for the exchange.
   * @see RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS
   * @return {Promise} Promise resolved once subscribed.
   */
  subscribeToFanoutExchange(exchange, handler, rabbitMQOptions) {
    const opts = {
      exchange: exchange,
      type: 'fanout',
      handler: handler,
      queueOptions: {},
      exchangeOptions: {}
    };
    if (rabbitMQOptions && rabbitMQOptions.queueOptions) {
      opts.queueOptions = rabbitMQOptions.queueOptions;
    }
    if (rabbitMQOptions && rabbitMQOptions.exchangeOptions) {
      opts.exchangeOptions = rabbitMQOptions.exchangeOptions;
    }
    return this._subscribeToExchange(opts);
  }

  /**
   * Subscribe to topic exchange.
   *
   * @private
   * @param {String} exchange Name of topic exchange.
   * @param {String} routingKey Routing key for topic exchange.
   * @param {Function} handler Handler for jobs.
   * @param {Object} [rabbitMQOptions] Options for the queues and exchanges.
   * @param {Object} [rabbitMQOptions.exchangeOptions] Options for the exchange.
   * @see RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS
   * @param {Object} [rabbitMQOptions.queueOptions] Options for the queue.
   * @see RabbitMQ.AMQPLIB_QUEUE_DEFAULTS
   * @return {Promise} Promise resolved once subscribed.
   */
  subscribeToTopicExchange(exchange, routingKey, handler, rabbitMQOptions) {
    const opts = {
      exchange: exchange,
      type: 'topic',
      routingKey: routingKey,
      handler: handler,
      queueOptions: {},
      exchangeOptions: {}
    };
    if (rabbitMQOptions && rabbitMQOptions.queueOptions) {
      opts.queueOptions = rabbitMQOptions.queueOptions;
    }
    if (rabbitMQOptions && rabbitMQOptions.exchangeOptions) {
      opts.exchangeOptions = rabbitMQOptions.exchangeOptions;
    }
    return this._subscribeToExchange(opts);
  }

  /**
   * Start consuming from subscribed queues.
   *
   * @private
   * @return {Promise} Promise resolved when all queues consuming.
   */
  consume() {
    const log = this.log.child({ method: 'consume' });
    log.info('starting to consume');
    if (!this._isConnected()) {
      return Promise.reject(new Error('you must .connect() before consuming'));
    }
    const subscriptions = this.subscriptions;
    this.subscriptions = new Immutable.Map();
    const channel = this.channel;
    return Promise.map(subscriptions.keySeq(), queue => {
      const handler = subscriptions.get(queue);
      log.info({ queue: queue }, 'consuming on queue');
      // XXX(bryan): is this valid? should I not be checking _this_.consuming?
      if (this.consuming.has(queue)) {
        log.warn({ queue: queue }, 'already consuming queue');
        return Promise.resolve();
      }
      function wrapper(msg) {
        let job;
        try {
          job = JSON.parse(msg.content);
        } catch (err) {
          // relatively safe stringifying - could be buffer, could be invalid
          log.error({ job: '' + msg.content }, 'content not valid JSON');
          return channel.ack(msg);
        }
        handler(job, () => {
          channel.ack(msg);
        });
      }
      return Promise.resolve(this.channel.consume(queue, wrapper)).then(consumeInfo => {
        this.consuming = this.consuming.set(queue, consumeInfo.consumerTag);
      });
    });
  }

  /**
   * Unsubscribe and stop consuming from all queues.
   *
   * @private
   * @return {Promise} Promise resolved when all queues canceled.
   */
  unsubscribe() {
    const consuming = this.consuming;
    return Promise.map(consuming.keySeq(), queue => {
      const consumerTag = consuming.get(queue);
      return Promise.resolve(this.channel.cancel(consumerTag)).then(() => {
        this.consuming = this.consuming.delete(queue);
      });
    });
  }

  /**
   * Disconnect from RabbitMQ.
   *
   * @return {Promise} Promise resolved when disconnected from RabbitMQ.
   */
  disconnect() {
    if (!this._isPartlyConnected()) {
      return Promise.reject(new Error('not connected. cannot disconnect.'));
    }
    return Promise.resolve(this.publishChannel.waitForConfirms()).then(() => Promise.resolve(this.connection.close())).then(() => this._setCleanState());
  }

  // Private Methods

  /**
   * Helper method to re-set the state of the model to be 'clean'.
   *
   * @private
   */
  _setCleanState() {
    delete this.channel;
    delete this.connection;
    this.subscriptions = new Immutable.Map();
    this.subscribed = new Immutable.Set();
    this.consuming = new Immutable.Map();
  }

  /**
   * Error handler for the RabbitMQ connection.
   *
   * @private
   * @throws {Error}
   * @param {object} err Error object from event.
   */
  _connectionErrorHandler(err) {
    this.log.fatal({ err: err }, 'connection has caused an error');
    throw err;
  }

  /**
   * Error handler for the RabbitMQ channel.
   *
   * @private
   * @throws {Error}
   * @param {object} err Error object from event.
   */
  _channelErrorHandler(err) {
    this.log.fatal({ err: err }, 'channel has caused an error');
    throw err;
  }

  /**
   * Check to see if model is connected.
   *
   * @private
   * @return {Boolean} True if model is connected and channel is established.
   */
  _isConnected() {
    return !!(this._isPartlyConnected() && this.channel && this.publishChannel);
  }

  /**
   * Check to see if model is _partially_ connected. This means that the
   * connection was established, but the channel was not.
   *
   * @private
   * @return {Boolean} True if connection is established.
   */
  _isPartlyConnected() {
    return !!this.connection;
  }

  /**
   * Helper function to consolidate logic for subscribing to queues. Stores
   * information about what is subscribed and is responsible for asserting
   * exchanges and queues into existance.
   *
   * @private
   * @param {Object} opts Object describing the exchange connection.
   * @param {String} opts.exchange Name of exchange.
   * @param {String} opts.handler Handler of jobs.
   * @param {String} opts.type Type of exchange: 'fanout' or 'topic'.
   * @param {Object} [opts.exchangeOptions] Options for the exchange.
   * @see RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS
   * @param {Object} [opts.queueOptions] Options for the queue.
   * @see RabbitMQ.AMQPLIB_QUEUE_DEFAULTS
   * @param {String} [opts.routingKey] Routing key for a topic exchange.
   * @return {Promise} Promise resolved when subcribed to exchange.
   */
  _subscribeToExchange(opts) {
    const log = this.log.child({
      method: '_subscribeToExchange',
      opts: opts
    });
    log.info('subscribing to exchange');
    if (!this._isConnected()) {
      return Promise.reject(new Error('must .connect() before subscribing'));
    }
    if (opts.type === 'topic' && !opts.routingKey) {
      return Promise.reject(new Error('routingKey required for topic exchange'));
    }
    let subscribedKey = `${ opts.type }:::${ opts.exchange }`;
    if (opts.type === 'topic') {
      subscribedKey = `${ subscribedKey }:::${ opts.routingKey }`;
    }
    if (this.subscribed.has(subscribedKey)) {
      log.warn(`already subscribed to ${ opts.type } exchange`);
      return Promise.resolve();
    }
    return Promise.resolve(this.channel.assertExchange(opts.exchange, opts.type, defaults(opts.exchangeOptions, RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS))).then(() => {
      log.info('exchange asserted');
      let queueName = `${ this.name }.${ opts.exchange }`;
      if (opts.type === 'topic') {
        queueName = `${ queueName }.${ opts.routingKey }`;
      }
      return Promise.resolve(this.channel.assertQueue(queueName, defaults(opts.queueOptions, RabbitMQ.AMQPLIB_QUEUE_DEFAULTS)));
    }).then(queueInfo => {
      const queue = queueInfo.queue;
      log.info({ queue: queue }, 'queue asserted');
      log.info('binding queue');
      if (!opts.routingKey) {
        opts.routingKey = '';
      }
      return Promise.resolve(this.channel.bindQueue(queue, opts.exchange, opts.routingKey)).return(queue);
    }).then(queue => {
      log.info('bound queue');
      this.subscriptions = this.subscriptions.set(queue, opts.handler);
      this.subscribed = this.subscribed.add(subscribedKey);
    });
  }

  /**
   * Validate publish params. Adds a TID to the job it does not already have
   * one.
   * @private
   * @param {String} name Name of queue or exchange.
   * @param {Object} content Content to send.
   * @throws {Error} Must be connected to RabbitMQ.
   * @throws {Error} Name must be a non-empty string.
   * @throws {Error} Object must be an Object.
   * @return {Buffer} Content to send as job.
   */
  _validatePublish(name, content) {
    if (!this._isConnected()) {
      throw new Error('you must call .connect() before publishing');
    }
    // flowtype does not prevent users from using this function incorrectly.
    if (!isString(name) || name === '') {
      throw new Error('name must be a string');
    }
    if (!isObject(content)) {
      throw new Error('content must be an object');
    }
    // add tid to message if one does not exist
    if (!content.tid) {
      const ns = getNamespace('ponos');
      const tid = ns && ns.get('tid');
      content.tid = tid || uuid();
    }
    const stringContent = JSON.stringify(content);
    return new Buffer(stringContent);
  }
}

/**
 * Default options provided for asserted queues.
 *
 * Reference the [amqplib docs]{@link
 * http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue}
 * for more information.
 *
 * @typedef AMQPLIB_QUEUE_DEFAULTS
 * @const {Object}
 * @property {Boolean} autoDelete=false Delete queue when it has 0 consumers.
 * @property {Boolean} durable=true Queue survives broker restarts.
 * @property {Boolean} exclusive=false Scopes the queue to the connection.
 */
RabbitMQ.AMQPLIB_QUEUE_DEFAULTS = {
  exclusive: false,
  durable: true,
  autoDelete: false
};

/**
 * Default options provided for asserted exchanges.
 *
 * Reference the [amqplib docs]{@link
 * http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange}
 * for more information.
 *
 * @typedef AMQPLIB_EXCHANGE_DEFAULTS
 * @const {Object}
 * @property {Boolean} autoDelete=false Delete exchange when it has 0 bindings.
 * @property {Boolean} durable=true Queue survives broker restarts.
 * @property {Boolean} internal=false Messages cannot be published directly to
 *   the exchange.
 */
RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS = {
  durable: true,
  internal: false,
  autoDelete: false
};

/**
 * RabbitMQ model.
 *
 * @module ponos/lib/rabbitmq
 * @see RabbitMQ
 */
module.exports = RabbitMQ;