Tutorial, JavaScript, RabbitMQ, Development

Creating a RabbitMQ Consumer in JavaScript

Since my post on Creating a RabbitMQ Consumer in Python was such a hit, I figured I'd follow it up with a tutorial on how to create a RabbitMQ Consumer in JavaScript.

The beauty of using a message broker, such as RabbitMQ, and messaging protocols, such as AMQP or MQTT, is that you can publish a message using, say, a Python client, and then consume that message in a JavaScript app - or C# app, or C++ app, or ... you get the point.

If you've read my tutorial on creating a consumer in Python, then you will probably notice similarities here, but, of course, the code will be much different - ya know, because it's JavaScript and not Python. ;-)

SETUP

Before jumping into the actual code, you will need to ensure your machine is properly set up with Node.js and NPM. I do have a post here on how to set up Node.js on a Raspberry Pi, so if that's your environment of choice, then you can refer to that post to help get you up and running. It would make a great system for hosting a message queueing app in an IoT-like setting - just sayin'.

Otherwise, if you're using any other OS/system, hit up the Node.js website and download the appropriate installer/binaries for your OS.

Once you have Node.js and NPM up and running on your machine, fire up a terminal window and navigate to your project directory. Once there, run the following command:

npm install amqplib

This will install the amqplib package from the NPM repository into a local node_modules directory, so that it can be referenced in your application. This package contains the necessary client code for working with the AMQP protocol, which will be used to pull messages from a queue on a RabbitMQ broker.

CONSTRUCTING THE MODULE

If you don't already have your IDE or code editor of choice open, do that now, and then create a new JavaScript source file called rabbitmq-consumer.js (or whatever name makes sense to you).

We're going to be constructing a new module, which can then be pulled into your Node.js applications. Before we add and export the constructor function, though, we need to require a few things first.

I prefer to use strict mode, so if you want to enable it, make sure you have "use strict" at the top of your source file:

"use strict";

Next, we need to require a few modules:

"use strict";

var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
var util = require('util');

The first module is amqplib, which is what we just installed and will be using to pull messages off a message queue.

The next module pulled in is the Node.js events module, and from it, we're just going to use the EventEmitter class. As I'll demonstrate in a bit, this class will allow us to emit a custom received event for which applications can listen, so that they know when a message has been pulled off of a queue.

The final module require'd is the Node.js util module, which will be used to do a little bit of class inheritance momentarily.
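If you haven't used EventEmitter and util.inherits together before, here's a small standalone sketch of the pattern. The Doorbell class and the 'ring' event are made up purely for illustration and have nothing to do with RabbitMQ.

```javascript
"use strict";

var EventEmitter = require('events').EventEmitter;
var util = require('util');

// Hypothetical class used only for this illustration.
function Doorbell() {
    // Run the parent constructor so the emitter's internal state is set up.
    EventEmitter.call(this);
}

// Doorbell.prototype now inherits on(), once(), emit(), etc. from EventEmitter.
util.inherits(Doorbell, EventEmitter);

var bell = new Doorbell();
var heard = [];

// Listeners run synchronously, in registration order, when emit() is called.
bell.on('ring', function(who) {
    heard.push(who);
});

bell.emit('ring', 'delivery');
console.log(heard); // [ 'delivery' ]
```

Calling EventEmitter.call(this) in the constructor is customary, though whether you need it depends on your Node.js version and how you use the emitter.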

Next up is the constructor function. It will just take in a single argument, opts, which will be an object containing all of the configuration options (more on this later).

"use strict";

var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
var util = require('util');

function RabbitMQConsumer(opts) {
}

Since we want the RabbitMQConsumer class to be able to emit "message received" events, we'll go ahead and have it inherit from the EventEmitter class we just pulled in by using the inherits function from the util module.

"use strict";

var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
var util = require('util');

function RabbitMQConsumer(opts) {
}

util.inherits(RabbitMQConsumer, EventEmitter);

The final thing we're going to do before moving on to discuss the opts object is export the constructor function by throwing in a module.exports statement at the end of the script file:

"use strict";

var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
var util = require('util');

function RabbitMQConsumer(opts) {
}

util.inherits(RabbitMQConsumer, EventEmitter);

module.exports = RabbitMQConsumer;

OPTIONS

The "options" object passed into the opts parameter in the constructor function will contain all of the necessary information to connect to the RabbitMQ broker, as well as queue and exchange information that will be needed once a connection has been established.

The options object will look like the following:

{ 
    userName: "user",
    password: "abc123",
    host: "rabbitmq.mydomain.com",
    port: 5671,
    virtualHost: "home",
    exchangeName: "home_exchange",
    exchangeType: "topic",
    exchangeOptions: {
        durable: true,
        internal: false,
        autoDelete: false
    },
    queueName: "roku_remote_queue",
    queueOptions: {
        exclusive: false,
        durable: true,
        autoDelete: false
    },
    routingKey: "home.remote.roku"
}

I'll just quickly run through each of the options here before going any further, so that you have a better understanding of what's being passed around and why.

The userName and password properties just contain the credentials needed to connect to your RabbitMQ broker. On your server, you should have a number of accounts with different permissions, so these properties will contain the information for the account you want to use for consuming messages.

The host and port properties contain identifying information for your broker. In other words, the host property contains the hostname (or IP address) of your broker, and the port property indicates on which port the broker is listening on your server.

Typically, the port number is 5671 for AMQP connections over TLS (5672 for unencrypted ones) and 8883 for MQTT connections over TLS (1883 for unencrypted ones). Since we're talking about secure AMQP in this tutorial, I just listed 5671 in the sample object, but remember to change that if your broker listens on a different port.

The virtualHost property contains the name of the virtual host containing the exchanges and queues from which you're consuming messages. Virtual hosts can be thought of as logical groupings of various system resources, such as exchanges, queues, user permissions, etc. You can read more about them here if you'd like a deeper explanation.

Regarding the exchange data, the exchangeName property is the name of the exchange routing the messages to the various queues from which you want to consume the messages, the exchangeType property will contain one of four values (direct, fanout, topic, headers), which you can read more about here, and the exchangeOptions property is another object containing different attributes for the exchange, all of which are boolean types.

The durable property indicates whether or not the exchange should survive a restart of the message broker (a value of true preserves it, and a value of false means it gets nuked), the internal property indicates whether or not external clients (such as a JavaScript publishing app) can publish messages to the exchange (a value of false), or if publishing to this exchange is only permissible by other exchanges (a value of true), and the autoDelete property indicates whether or not the exchange should be destroyed after the final queue bound to it is unbound.

The queue properties are similar to the exchange properties, but to ensure there's no confusion, here's a quick rundown of what they all mean. The queueName property indicates the name of the queue from which you want to consume messages, and the queueOptions property is another object containing some attributes of the queue from which you're reading.

The durable property means the same thing with queues as it does with exchanges, so if you give this property a value of false, the queue will be nuked when the broker restarts, and a value of true will preserve the queue.

The autoDelete property is similar to the exchange's autoDelete property in that if you set this property to true, the queue will be deleted once its last consumer unsubscribes or disconnects (a value of false will preserve the queue after all consumers are gone).

And finally, the exclusive property is unique to queues, but essentially, if you set this property to true, then the queue can only be accessed on the connection that created it (a value of false will allow the queue to be shared across many connections).

The final property in the "options" object depicted above is the routingKey, which is a string containing a pattern that tells the exchange to which queues the published messages should be routed.
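To make the pattern idea a little more concrete, here's a rough sketch of how a topic exchange compares a binding pattern to a message's routing key: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The topicMatches function is my own illustration; the real matching happens inside the broker, not in amqplib or in your code.

```javascript
"use strict";

// Returns true when a dot-delimited routing key satisfies a topic binding pattern.
function topicMatches(pattern, routingKey) {
    var p = pattern.split('.');
    var k = routingKey.split('.');

    function match(pi, ki) {
        if (pi === p.length) {
            // Pattern used up: it's a match only if the key is used up as well.
            return ki === k.length;
        }
        if (p[pi] === '#') {
            // '#' may absorb zero or more words, so try every possible split.
            for (var next = ki; next <= k.length; next++) {
                if (match(pi + 1, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki === k.length) {
            return false;
        }
        if (p[pi] === '*' || p[pi] === k[ki]) {
            return match(pi + 1, ki + 1);
        }
        return false;
    }

    return match(0, 0);
}

console.log(topicMatches('home.remote.roku', 'home.remote.roku')); // true
console.log(topicMatches('home.remote.*', 'home.remote.roku'));    // true
console.log(topicMatches('home.#', 'home.remote.roku'));           // true
console.log(topicMatches('home.*', 'home.remote.roku'));           // false
```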

I know I kinda breezed through these properties, but I've written about them before in my "Python Consumer" tutorial, so if you want a little more information on them, be sure to check out that article. That, or you can visit the official RabbitMQ site to learn more about exchanges and queues.

Now that we know what's being passed into the constructor function, let's go ahead and add some code to store these options locally, so that they can be used later.

Since this class is being designed in a way that you can have multiple instances of it, we'll need a way to store the options for each instance.

For this, we'll use a Map, which I'll just call priv (a common notation I've come across, which is just short for "private"):

"use strict";

var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
var util = require('util');

var priv = new Map();

function RabbitMQConsumer(opts) {
}

util.inherits(RabbitMQConsumer, EventEmitter);

module.exports = RabbitMQConsumer;

A Map essentially stores key/value pairs, just like a Dictionary that you might see in other languages.

The key will be a reference to the class instance, itself, and the value will be the "options" object passed into the opts parameter in the constructor function.
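Here's a tiny standalone sketch of that idea, using a made-up Widget class in place of RabbitMQConsumer, just to show that each instance gets back its own options:

```javascript
"use strict";

var priv = new Map();

// Hypothetical class standing in for RabbitMQConsumer.
function Widget(opts) {
    // The instance itself is the key; its options are the value.
    priv.set(this, opts);
}

Widget.prototype.describe = function() {
    // Look up the options that belong to this particular instance.
    return priv.get(this).name;
};

var a = new Widget({ name: 'first' });
var b = new Widget({ name: 'second' });

console.log(a.describe()); // first
console.log(b.describe()); // second
console.log(priv.size);    // 2
```

A WeakMap accepts the same set and get calls and lets unused instances be garbage collected, so it may be worth considering if your application creates and discards a lot of consumers.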

To assign a value to a key, we'll use the set function on the Map:

priv.set(this, opts);

Placing this line inside the constructor function should result in your script file looking like the following:

"use strict";

var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
var util = require('util');

var priv = new Map();

function RabbitMQConsumer(opts) {
    priv.set(this, opts);
}

util.inherits(RabbitMQConsumer, EventEmitter);

module.exports = RabbitMQConsumer;

BUILDING THE URI

Now that we have a basic foundation laid for our consumer class, let's move on to actually connecting to the server.

I'm only going to be adding a single function, consume, to the class, which will be responsible for connecting to a RabbitMQ broker and pulling messages off a specified queue.

Since I want the consume function to be available to all objects of the RabbitMQConsumer type, I'm going to add it to the RabbitMQConsumer prototype:

RabbitMQConsumer.prototype.consume = function() {};

In order to connect to the broker, we're going to have to give the connection code a URI, which will contain the credentials, host, port, and virtual host information I discussed above.

I use secure connections for my consuming applications, so the scheme I'm going to specify in the URI is amqps. For non-secure connections, you would just use amqp (without the "s").

So, in the new consume function, I'll add a local variable, uri, and will assign it the value of "amqps://":

RabbitMQConsumer.prototype.consume = function() { 
    var uri = "amqps://"; 
};

In the URI, we need to specify the user name, password, host, port, and virtual host. Since we pulled all of that information in via the opts parameter and stored it in our priv Map in the constructor function, we can retrieve it here using the get function on the Map by supplying this function the current instance (i.e. this) as the key:

RabbitMQConsumer.prototype.consume = function() { 
    var config = priv.get(this); 
    var uri = "amqps://"; 
};

The amqp URI has the following syntax:

"amqps://user_name:password@host:port/virtual_host"

so we'll plug in the appropriate values using the config object we retrieved from our priv Map:

RabbitMQConsumer.prototype.consume = function() { 
    var config = priv.get(this); 
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
};

Plugging this new consume function into the rest of the script file should produce the following:

"use strict";

var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
var util = require('util');

var priv = new Map();

function RabbitMQConsumer(opts) {
    priv.set(this, opts);
}

util.inherits(RabbitMQConsumer, EventEmitter);

RabbitMQConsumer.prototype.consume = function() {
    var config = priv.get(this);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
};

module.exports = RabbitMQConsumer;

If you want to read more about the URI specification for RabbitMQ, you can check out this tutorial on the RabbitMQ website.

CONNECTING TO THE BROKER

Now that we have the connection string built, it's time to connect to the broker.

For this, we'll create a new function, openConnection, inside the consume function we just added, which will encompass all of the connection functionality, as well as the exchange and queue creation.

Inside the consume function, create a new, local variable called openConnection and assign it a new function, like so:

RabbitMQConsumer.prototype.consume = function() {
    var config = priv.get(this);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;

    var openConnection = function() {};
};

Next, call the connect function on the amqp object we pulled in at the start of this tutorial, and supply it an encoded version of the URI string we just built in the last section:

RabbitMQConsumer.prototype.consume = function() { 
    var config = priv.get(this); 
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;

    var openConnection = function() {
        amqp.connect(encodeURI(uri));
    };
};

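We encode the URI as a precautionary measure, so that way if there are any special characters (such as spaces) in any of the values supplied by the "options" object, they're handled appropriately. Keep in mind, though, that encodeURI leaves reserved characters like @, :, /, ?, and # untouched, so if your password or virtual host contains any of those, you'll need to encode that individual value with encodeURIComponent before placing it in the URI.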
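If your credentials might contain reserved characters, one way to handle it is to encode each piece on its own rather than the whole string. This is just a sketch; the buildUri helper and the sample password are my own inventions and aren't used elsewhere in this tutorial.

```javascript
"use strict";

// Builds an AMQP URI, percent-encoding each user-supplied part separately.
function buildUri(config) {
    return 'amqps://' +
        encodeURIComponent(config.userName) + ':' +
        encodeURIComponent(config.password) + '@' +
        config.host + ':' + config.port + '/' +
        encodeURIComponent(config.virtualHost);
}

var uri = buildUri({
    userName: 'user',
    password: 'p@ss/word', // made-up password containing reserved characters
    host: 'rabbitmq.mydomain.com',
    port: 5671,
    virtualHost: 'home'
});

console.log(uri);
// amqps://user:p%40ss%2Fword@rabbitmq.mydomain.com:5671/home

// For comparison, encodeURI leaves the '@' and '/' alone.
console.log(encodeURI('p@ss/word')); // p@ss/word
```

If you go this route, pass the result straight to amqp.connect without wrapping it in encodeURI again, or the percent signs will be encoded a second time.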

The amqplib package offers a Promise API, and that's exactly what we'll be using for the remainder of the code.

The connect function we just called returns a promise, which, when resolved, supplies us with a connection object. So, we'll use the .then() notation and supply a function for the resolve argument that takes in a connection object.

RabbitMQConsumer.prototype.consume = function() {
    var config = priv.get(this);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
    
    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                  });
    };
};

We'll just quickly tack on a .catch on the Promise returned from the connect function to handle any errors that come up during the initial connection, which will just simply be fed to a call to console.warn.

(NOTE: For simplicity, I will not be adding any reject functions other than on the initial connection Promise, so if you want proper error handling, you'll need to supply your own functions that make sense for your applications.)

RabbitMQConsumer.prototype.consume = function() {
    var config = priv.get(this);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
    
    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                  })
            .catch(console.warn);
    };
};

Now that we have a connection established, I want to use this opportunity to add a couple of code blocks for handling a couple of scenarios.

The first scenario is when the Node.js process receives a 'SIGINT' event. When this happens, we want to make sure our connection is closed, so just inside our new resolve function, we need to add an event handler to close the connection on a 'SIGINT' event:

RabbitMQConsumer.prototype.consume = function() { 
    var config = priv.get(this); 
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
    
    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                      process.once('SIGINT', function() {
                          conn.close();
                      });
                  })
            .catch(console.warn);
    };
};

Another type of scenario we might run into is an error with an established connection (e.g. maybe a network failure occurred). In this scenario, however, we want to reopen the connection, so just below the 'SIGINT' handler we just added, let's go ahead and place a connection 'error' handler function that will log the error to the console and then call back into the openConnection function we're already in:

RabbitMQConsumer.prototype.consume = function() { 
    var config = priv.get(this); 
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
    
    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                      process.once('SIGINT', function() {
                          conn.close();
                      });

                      conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                             });
                  })
            .catch(console.warn);
    };
};

Of course, this is mediocre and very minimal error handling at its finest, but I just wanted to at least bring these types of scenarios to your attention, so that you can be thinking of how you might need to handle them in your own applications.
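One refinement you might consider is waiting a bit between reconnection attempts instead of retrying immediately. Below is a rough sketch of that idea. The connectWithRetry and backoffDelay helpers are hypothetical, and fakeConnect is just a stand-in for amqp.connect so the example can run without a broker.

```javascript
"use strict";

// Delay before the given retry attempt (0-based): doubles each time, capped at maxMs.
function backoffDelay(attempt, baseMs, maxMs) {
    return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Calls connectFn (assumed to return a Promise) until it resolves or attempts run out.
function connectWithRetry(connectFn, maxAttempts, baseMs, maxMs) {
    function tryOnce(attempt) {
        return connectFn().catch(function(err) {
            if (attempt + 1 >= maxAttempts) {
                throw err; // Out of attempts, so let the caller deal with it.
            }
            var delay = backoffDelay(attempt, baseMs, maxMs);
            console.warn('connect failed (' + err.message + '), retrying in ' + delay + 'ms');
            return new Promise(function(resolve) {
                setTimeout(resolve, delay);
            }).then(function() {
                return tryOnce(attempt + 1);
            });
        });
    }
    return tryOnce(0);
}

// Stand-in for amqp.connect that fails twice before succeeding.
var calls = 0;
function fakeConnect() {
    calls += 1;
    return calls < 3
        ? Promise.reject(new Error('broker unreachable'))
        : Promise.resolve({ name: 'fake connection' });
}

var result = connectWithRetry(fakeConnect, 5, 10, 1000);
result.then(function(conn) {
    console.log('connected after ' + calls + ' attempts');
});
```

The delay values here are tiny so the example finishes quickly; in a real application you'd pick numbers that suit your network and broker.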

CREATING A CHANNEL

Using the connection object passed into our resolve function, we need to create a new channel. In short, a channel is like a session through which various operations are performed (more on this in a bit).

In order to create a channel, we'll use the createChannel function on the connection object passed into our function. This call will also return a Promise, and when it resolves, will hand us an established/constructed channel object (the ch parameter in the snippet below) to be used for other operations:

RabbitMQConsumer.prototype.consume = function() {
    var config = priv.get(this);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
    
    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                    process.once('SIGINT', function() {
                                    conn.close();
                                });
                                
                    conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                            });
                            
                    conn.createChannel()
                        .then(function(ch) {
                             });
                 })
             .catch(console.warn);
    };
};

CREATING THE EXCHANGE

Using the channel object passed into the createChannel's resolve function, we can now create the exchange, which is responsible for routing the messages to their proper destinations (i.e. queues).

To create the exchange, we call the assertExchange function on the channel object. This function "asserts an exchange into existence", which essentially means that it's first verified whether or not an exchange with the specified name already exists. If it does exist, and its properties are different than the ones supplied, then things will blow up (the broker reports an error and closes the channel). If it exists with matching properties, nothing changes. Otherwise, the exchange is created and everything is just dandy.

RabbitMQConsumer.prototype.consume = function() {
    var config = priv.get(this);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;

    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                    process.once('SIGINT', function() {
                                    conn.close();
                                });
                                
                    conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                            });
                            
                    conn.createChannel()
                        .then(function(ch) {
                                ch.assertExchange(config.exchangeName, config.exchangeType, config.exchangeOptions)
                                  .then(function() {
                                       });
                              });
                 })
             .catch(console.warn);
    };
};

As you can see, the first argument we supply to the assertExchange function is the exchange's name, which we pull from the config passed into our constructor function.

The second argument is the exchange type, which, as I mentioned before, can be a handful of different values (e.g. fanout, topic, direct, or headers).

The final argument is the exchange options object discussed above, which contains the various attributes of the exchange.

Like the other calls made thus far, the assertExchange function returns a Promise. The response from this call, which is passed to the resolve function, is just the name of the exchange echoed back to us. We don't actually need it in this case, so we'll just ignore it and not bother taking it in as an argument to our function.

CREATING THE QUEUE

Now that we have the exchange created, it's time to create the queue, which is done in a very similar fashion as creating the exchange.

The call we need to make is to a function on the channel object called assertQueue, which takes in the name of the queue and the options object containing all of the attributes of the queue:

RabbitMQConsumer.prototype.consume = function() {
    var config = priv.get(this);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;

    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                    process.once('SIGINT', function() {
                                    conn.close();
                                });
                                
                    conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                            });
                            
                    conn.createChannel()
                        .then(function(ch) {
                                ch.assertExchange(config.exchangeName, config.exchangeType, config.exchangeOptions)
                                  .then(function() {
                                      return ch.assertQueue(config.queueName, config.queueOptions);
                                  })
                                  .then(function(queueData) {
                                       });
                                  });
                })
            .catch(console.warn);
    };
};

Of course, we pull the queue's name and options from the configuration object passed into the constructor function, but you may notice that I'm doing something a little different here. Rather than tacking a .then() onto the end of this call, I'm returning the response from the assertQueue call. Why?

The reason is that there are still a number of calls that we haven't seen yet that will need to be resolved, and if I continue down the path I have been going, it would turn into a nested resolve function nightmare. This way, I can essentially just chain the resolve functions from the various promises together, which makes the code a little more readable and less Christmas tree-like.
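To see the difference side by side, here's a small sketch that runs the same three steps in both styles. The stepOne, stepTwo, and stepThree functions are made-up stand-ins for the broker calls so that the example runs on its own.

```javascript
"use strict";

// Stand-ins for asynchronous broker calls; each resolves with a small result.
function stepOne() { return Promise.resolve('exchange'); }
function stepTwo() { return Promise.resolve({ queue: 'demo_queue' }); }
function stepThree(name) { return Promise.resolve('bound ' + name); }

// Nested style: every step adds another level of indentation.
function nested() {
    return stepOne().then(function() {
        return stepTwo().then(function(queueData) {
            return stepThree(queueData.queue).then(function(result) {
                return result;
            });
        });
    });
}

// Chained style: returning a Promise from a handler makes the next .then()
// wait for it and receive its resolved value.
function chained() {
    return stepOne()
        .then(function() {
            return stepTwo();
        })
        .then(function(queueData) {
            return stepThree(queueData.queue);
        });
}

var both = Promise.all([nested(), chained()]);
both.then(function(results) {
    console.log(results); // [ 'bound demo_queue', 'bound demo_queue' ]
});
```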

The new resolve function takes in an object containing queue data from the call to assertQueue. This object contains the name of the queue, the message count, and the consumer count:

{
    queue: 'queue name',
    messageCount: 12,
    consumerCount: 3
}

Unlike with the assertExchange function, you don't have to specify a queue name. If you leave this field blank, the system will assign a name to the queue, and then return that name in the response.

If you do specify a name, which I recommend doing, then the name will just be echoed back to you in the queue field on the response object.

It's for this reason that I'm taking in the queue data as an argument to the resolve function, just in case a queue name is not passed in. This name is crucial for the next step, so we need to make sure we know what it is.

BINDING THE QUEUE

Now that both the queue and exchange have been created, we need to bind the queue to the exchange. Essentially what this does is set up a routing path from an exchange to a queue, and you can have multiple queues bound to the same exchange.

To do the binding, we call the bindQueue function on the channel object and give it the queue name (pulled from the previous call's response), exchange name (pulled from the configuration object), and routing key (also pulled from the configuration object):

RabbitMQConsumer.prototype.consume = function() {
    var config = priv.get(this);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;

    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                    process.once('SIGINT', function() {
                                    conn.close();
                                });
                                
                    conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                            });
                            
                    conn.createChannel()
                        .then(function(ch) {
                                ch.assertExchange(config.exchangeName, config.exchangeType, config.exchangeOptions)
                                  .then(function() {
                                          return ch.assertQueue(config.queueName, config.queueOptions);
                                        })
                                  .then(function(queueData) {
                                          ch.bindQueue(queueData.queue, config.exchangeName, config.routingKey);
                                          return queueData.queue;
                                       })
                                  .then(function(queueName) {
                                       });
                             });
                })
            .catch(console.warn);
    };
};

Wait, what's going on with this new resolve function? Why aren't we returning the Promise from the bindQueue call?

Again, a queue name can be blank, so to ensure every function call that needs the name gets it, I'm just passing along the name returned from the server. This name will be necessary in the next step, as you'll see here shortly, so I'm just simply returning that out of this function call. The bindQueue response has no data on it, so there wouldn't be anything from that call to pass along, anyway.
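If you'd rather wait for the bind to finish (so that a failed bind rejects the chain) and still pass the queue name along, one option is to return the bindQueue Promise and resolve it with the name. Here's a rough sketch of that alternative. The fakeChannel object is a made-up stand-in that mimics only the two channel calls used here, so don't mistake it for the real amqplib channel.

```javascript
"use strict";

// Minimal stand-in for an amqplib channel, just enough for this illustration.
var log = [];
var fakeChannel = {
    assertQueue: function(name) {
        log.push('assertQueue');
        // Mimic a server-generated name when none is supplied.
        return Promise.resolve({
            queue: name || 'amq.gen-example',
            messageCount: 0,
            consumerCount: 0
        });
    },
    bindQueue: function(queue, exchange, routingKey) {
        // Resolve a little later to imitate a round trip to the broker.
        return new Promise(function(resolve) {
            setTimeout(function() {
                log.push('bindQueue done');
                resolve({});
            }, 5);
        });
    }
};

var done = fakeChannel.assertQueue('')
    .then(function(queueData) {
        // Wait for the bind to finish, then hand the queue name to the next step.
        return fakeChannel.bindQueue(queueData.queue, 'home_exchange', 'home.remote.roku')
            .then(function() {
                return queueData.queue;
            });
    })
    .then(function(queueName) {
        log.push('ready to consume from ' + queueName);
        return queueName;
    });

done.then(function() {
    console.log(log);
});
```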

CONSUMING MESSAGES

We're now in the home stretch with just a few more pieces to go.

Now that the queue is bound to the exchange, we can start consuming messages. To do that, we call the consume function on the channel object and give it the name of the queue from which we want to consume messages, as well as a callback function that will handle our received messages.

Before making the call to the consume function, let's skip down just a bit and define a new function that will take in the message object received when a message is pulled from a queue:

RabbitMQConsumer.prototype.consume = function() {
    var self = this;
    var config = priv.get(self);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;

    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                    process.once('SIGINT', function() {
                                    conn.close();
                                });
                                
                    conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                            });
                            
                    conn.createChannel()
                        .then(function(ch) {
                                ch.assertExchange(config.exchangeName, config.exchangeType, config.exchangeOptions)
                                  .then(function() {
                                          return ch.assertQueue(config.queueName, config.queueOptions);
                                        })
                                  .then(function(queueData) {
                                          ch.bindQueue(queueData.queue, config.exchangeName, config.routingKey);
                                          return queueData.queue;
                                        })
                                  .then(function(queueName) {
                                       });
                                  
                                function consumeMessage(msg) {
                                    self.emit("received", msg.content.toString());
                                    ch.ack(msg);
                                }
                            });
                })
            .catch(console.warn);
    };
};

Okay, so there are a number of things going on here that I want to point out.

First, I placed the function where I did because I needed access to the ch channel object. Of course, I could have placed the function in a different spot and passed the ch object to it, but I only need to reference this function once, so rather than taking in an extra argument, it was just easier to place it in a scope that has access to the ch object.

Next, I added an emit call, which will emit a "received" event, so that any application using this consumer object can know when a message is received. In order to emit the event, however, I had to gain access to the current instance of RabbitMQConsumer, which inherits the emit function from EventEmitter. Simply adding this.emit would not have worked because of the way the this keyword works in JavaScript.

So, at the top of the consume function, I declared a variable called self and assigned it this, which is a reference to the current instance of the RabbitMQConsumer class.

To avoid confusion, I went ahead and replaced this in the priv.get call with the new self variable, as well, so that I didn't have two references to this. Confusing, I know, but that's JavaScript for ya :-).
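If the this business is still fuzzy, here's a small standalone sketch of the problem and two ways around it. The Counter class is made up for the example; the self approach is the one used in this tutorial, and the arrow function is an alternative that behaves the same way here.

```javascript
"use strict";

function Counter() {
    this.count = 0;
}

// Broken: inside a plain function callback, `this` is not the Counter instance.
Counter.prototype.brokenAdd = function(values) {
    var failed = false;
    values.forEach(function() {
        try {
            this.count += 1; // `this` is undefined here in strict mode
        } catch (e) {
            failed = true;
        }
    });
    return failed;
};

// Fix 1: capture the instance in a variable the callback can close over.
Counter.prototype.addWithSelf = function(values) {
    var self = this;
    values.forEach(function() {
        self.count += 1;
    });
};

// Fix 2: arrow functions take `this` from the enclosing scope.
Counter.prototype.addWithArrow = function(values) {
    values.forEach(() => {
        this.count += 1;
    });
};

var c = new Counter();
console.log(c.brokenAdd([1, 2])); // true (the callback threw a TypeError)
c.addWithSelf([1, 2]);
c.addWithArrow([1, 2, 3]);
console.log(c.count);             // 5
```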

When the received event is emitted, the content of the message object pulled from the queue is passed along, so that any application using this object only gets the text passed along in the message queue.

This post is already insanely long, so I'm not going to go into the details of what all is returned on the message object. If you want to read more about it, however, please check out the amqplib documentation on this matter.

The final piece of the new consumeMessage function is a call to ack on the channel object, which just simply acknowledges that the message was received.
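If you want the handler to be a bit more defensive, here's a sketch of one way to do it. It skips the null message that amqplib passes to the callback when the broker cancels the consumer, and it uses nack to reject a message the application couldn't process. The makeConsumeMessage helper, the handle function, and the stub channel are all hypothetical and exist only so the example can run without a broker.

```javascript
"use strict";

// Builds a message callback. `ch` is assumed to expose ack() and nack() like an
// amqplib channel; `handle` is a hypothetical application function.
function makeConsumeMessage(ch, handle) {
    return function consumeMessage(msg) {
        // amqplib invokes the callback with null if the broker cancels the consumer.
        if (msg === null) {
            return;
        }
        try {
            handle(msg.content.toString());
            ch.ack(msg);
        } catch (err) {
            // Reject without requeueing so a bad message is not redelivered forever.
            ch.nack(msg, false, false);
        }
    };
}

// Stub channel that records what was called.
var calls = [];
var stubChannel = {
    ack: function() { calls.push('ack'); },
    nack: function(msg, allUpTo, requeue) { calls.push('nack requeue=' + requeue); }
};

var onMessage = makeConsumeMessage(stubChannel, function(text) {
    if (text === 'bad') {
        throw new Error('cannot process');
    }
});

onMessage({ content: Buffer.from('good') });
onMessage({ content: Buffer.from('bad') });
onMessage(null);
console.log(calls); // [ 'ack', 'nack requeue=false' ]
```

Whether a failed message should be dropped, requeued, or routed somewhere else depends entirely on your application, so treat the nack arguments above as one possible choice.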

Now, we can go back up to the empty resolve function and plug in a call to the consume function on the channel:

RabbitMQConsumer.prototype.consume = function() {
    var self = this;
    var config = priv.get(self);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;

    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                    process.once('SIGINT', function() {
                                    conn.close();
                                });
                                
                    conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                            });
                            
                    conn.createChannel()
                        .then(function(ch) {
                                ch.assertExchange(config.exchangeName, config.exchangeType, config.exchangeOptions)
                                  .then(function() {
                                          return ch.assertQueue(config.queueName, config.queueOptions);
                                        })
                                  .then(function(queueData) {
                                          ch.bindQueue(queueData.queue, config.exchangeName, config.routingKey);
                                          return queueData.queue;
                                        })
                                  .then(function(queueName) {
                                          ch.consume(queueName, consumeMessage);
                                        });
                                        
                                function consumeMessage(msg) {
                                    self.emit("received", msg.content.toString());
                                    ch.ack(msg);
                                }
                            });
                })
            .catch(console.warn);
    };
};

WRAPPING UP

Woohoo! You made it to the end. If you're still hanging on, mad props to you.

The final piece I want to add is a callback that logs a message once everything is connected and the consumer is waiting for messages to arrive in the specified queue. Aside from that, all that's left to do is add a call to the openConnection function at the bottom of the consume prototype function, so that everything is connected and created when the consume function is called in your applications:

RabbitMQConsumer.prototype.consume = function() {
    var self = this;
    var config = priv.get(self);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
    
    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                    process.once('SIGINT', function() {
                                    conn.close();
                                });
                                
                    conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                            });
                            
                    conn.createChannel()
                        .then(function(ch) {
                                ch.assertExchange(config.exchangeName, config.exchangeType, config.exchangeOptions)
                                  .then(function() {
                                          return ch.assertQueue(config.queueName, config.queueOptions);
                                        })
                                  .then(function(queueData) {
                                          ch.bindQueue(queueData.queue, config.exchangeName, config.routingKey);
                                          return queueData.queue;
                                        })
                                  .then(function(queueName) {
                                          return ch.consume(queueName, consumeMessage);
                                        })
                                  .then(function() {
                                          console.log(' [*] Waiting for messages.');
                                        });
                                        
                                function consumeMessage(msg) {
                                    self.emit("received", msg.content.toString());
                                    ch.ack(msg);
                                }
                            });
                })
            .catch(console.warn);
    };
    
    openConnection();
};

All said and done, your full script should now look like this:

"use strict";

var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
var util = require('util');

var priv = new Map();

function RabbitMQConsumer(opts) {
    priv.set(this, opts);
}

util.inherits(RabbitMQConsumer, EventEmitter);

RabbitMQConsumer.prototype.consume = function() {
    var self = this;
    var config = priv.get(self);
    var uri = `amqps://${config.userName}:${config.password}@${config.host}:${config.port}/${config.virtualHost}`;
    
    var openConnection = function() {
        amqp.connect(encodeURI(uri))
            .then(function(conn) {
                    process.once('SIGINT', function() {
                                    conn.close();
                                });
                                
                    conn.on('error', function(err) {
                                console.warn(err.message);
                                openConnection();
                            });
                            
                    conn.createChannel()
                        .then(function(ch) {
                                ch.assertExchange(config.exchangeName, config.exchangeType, config.exchangeOptions)
                                  .then(function() {
                                          return ch.assertQueue(config.queueName, config.queueOptions);
                                        })
                                  .then(function(queueData) {
                                          ch.bindQueue(queueData.queue, config.exchangeName, config.routingKey);
                                          return queueData.queue;
                                        })
                                  .then(function(queueName) {
                                          return ch.consume(queueName, consumeMessage);
                                        })
                                  .then(function() {
                                          console.log(' [*] Waiting for messages.');
                                        });
                                        
                                function consumeMessage(msg) {
                                    self.emit("received", msg.content.toString());
                                    ch.ack(msg);
                                }
                            });
                })
            .catch(console.warn);
    };
    
    openConnection();
};

module.exports = RabbitMQConsumer;

USING THE CONSUMER

Now, in order to use your new consumer, you'll want to create a new script file and pull in the RabbitMQConsumer class we just created. From there, create a new instance with a configuration object containing all of the necessary RabbitMQ information we've talked about throughout this post, add an event handler for the received event, and call the consume function to kick it all off:

var RabbitMQConsumer = require('./rabbitmq-consumer'); //specify the name of the file here containing the code we just wrote

var configuration = {
    userName: "user",
    password: "abc123",
    host: "rabbitmq.mydomain.com",
    port: 5671,
    virtualHost: "home",
    exchangeName: "home_exchange",
    exchangeType: "topic",
    exchangeOptions: {
        durable: true,
        internal: false,
        autoDelete: false
    },
    queueName: "roku_remote_queue",
    queueOptions: {
        exclusive: false,
        durable: true,
        autoDelete: false
    },
    routingKey: "home.remote.roku"
};

var consumer = new RabbitMQConsumer(configuration);

consumer.on('received', function(messageText) {
    console.log('Message received:', messageText);
});

consumer.consume();
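One caveat on the configuration values: the module wraps the connection URI in encodeURI, which leaves reserved characters such as @, :, / and # untouched. A password or virtual host containing one of those would produce a URI that doesn't parse the way you intended. Here is a minimal sketch of one way around that, encoding each piece separately with encodeURIComponent. buildAmqpUri is a hypothetical helper, not part of amqplib, and the password shown is made up.

```javascript
"use strict";

// buildAmqpUri is a hypothetical helper; it is not part of amqplib.
// It percent-encodes the user name, password, and virtual host individually.
function buildAmqpUri(config) {
    return 'amqps://' +
        encodeURIComponent(config.userName) + ':' +
        encodeURIComponent(config.password) + '@' +
        config.host + ':' + config.port + '/' +
        encodeURIComponent(config.virtualHost);
}

var exampleUri = buildAmqpUri({
    userName: 'user',
    password: 'p@ss/word#1', // made-up password with reserved characters
    host: 'rabbitmq.mydomain.com',
    port: 5671,
    virtualHost: 'home'
});

console.log(exampleUri);
// amqps://user:p%40ss%2Fword%231@rabbitmq.mydomain.com:5671/home
```

If you go this route, pass the result to amqp.connect as-is rather than through encodeURI, since encodeURI would escape the % signs a second time.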

I hope this post was beneficial and that you learned quite a bit. As long as this tutorial was, I barely scratched the surface of what RabbitMQ and the amqplib package have to offer.

Please check out the documentation on the respective websites if you want to learn more about either.

Enjoy!

About Tony Thorsen

Father of two, husband of one, Maker of many things. Tinkerer, dreamer, pixel nudger.