Creating a RabbitMQ Consumer in Python

In my last tutorial, I showed you how I put messages on a RabbitMQ message queue for my home automation system using pika. I figured since you now know how to put messages on the queue, you'd probably like to know how to pull them off, right? Right.

Let's get to it.

In this tutorial, I'm going to show you how to create a RabbitMQ Consumer class that pulls messages off a queue. I use this component heavily in my home automation projects, so hopefully you will find it just as useful.

SETUP

Before jumping right in, I encourage you to check out the "Setting up Your Environment" section in the previous tutorial on publishing messages to the queue. I will be using the same setup for this tutorial, so rather than writing it all over again, I'm just going to direct you back to that section for the 4-1-1 on how to get going.

Once you have your environment set up, go ahead and fire up your code editor of choice, and then create a new file called consumer.py to house the code for the consumer class.

Just like with the publishing class, the first thing you'll need to do is import the pika package:

import pika

Then, create a new class called Consumer with an initialization method that takes in a configuration object for connection, exchange, and queue settings:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

CREATING THE CONNECTION

Before we can actually consume a message, we need to establish a connection with the RabbitMQ server.

For this, I will add a new “private” method that creates and returns a new connection object that will be consumed by another method I will define here shortly:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def _create_connection(self):
        pass  # body filled in step by step below

The first thing we need to do is create an object containing the credentials the Consumer class will need to authenticate with the RabbitMQ server.

This can be done using the PlainCredentials class in the pika package, which accepts a username and password. I will be passing the appropriate credentials into the Consumer through the configuration dictionary, so I will retrieve the values needed for each parameter from there:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])

Next, we'll need to create an object containing the necessary connection parameters, such as the host, port number, virtual host, the credentials, and a flag indicating whether or not we want to use SSL.

The host parameter is the hostname or IP address of the RabbitMQ server, the port is the port number on which the server is listening, the virtual host parameter is the name of the virtual host containing the exchanges and queues from which you want to consume messages, the credentials parameter is the credentials object we just created, and the SSL flag indicates whether or not the connection should be made over a secure channel.

I pass most of this information into the Consumer via the configuration object, except for the SSL flag since it's always true for me, so I will pull the appropriate values for each of the arguments from there and pass them into the ConnectionParameters class in the pika package:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)

The final step, then, is to create a connection object using the parameters we just created, and return that from the method to be used by another method I'll define in the next section. This can easily be done by returning a new instance of the BlockingConnection class in the pika package:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

CREATING A CONTEXT MANAGER

If you recall from the Publisher tutorial, the connection is a resource that needs to be cleaned up. In that tutorial, I wrapped the body of the publish method with a try...except...finally block to ensure the connection would be closed at the tail-end of the method, even if an exception occurs in the process.

I could take a similar approach with the Consumer class here, but it's not really ideal to open a connection, check for any messages, close the connection, and repeat.

Instead, I'd like to open the connection when the Consumer class is instantiated, and then close it when the instance is no longer needed, so that any application utilizing this class can receive messages on the same open connection so long as it is running.

To satisfy this requirement, I'm going to turn the Consumer class into a Context Manager that opens the connection in an __enter__() method and closes the connection in an __exit__() method.

If you're not familiar with Context Managers, check out this excellent post by Jeff Knupp to get a better understanding.

First, I'll add a new method, __enter__(), that will be responsible for calling the _create_connection method, assigning the result to an instance variable that can be accessed throughout the rest of the class, and then returning a reference to self for the consuming application to use:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

At this point, the connection is opened when the Consumer class is instantiated within a with statement. Now we need to ensure the connection is closed when the with block ends, which is done by calling the close method on the connection object in an __exit__() method:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)
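
To see the protocol in isolation, here is a minimal sketch that swaps the real pika connection for a stand-in. FakeConnection, DemoConsumer, and run_demo are hypothetical names used only for illustration. The point is that Python calls __enter__ when the with block starts and __exit__ when it ends, even if the body of the block raises:

```python
class FakeConnection:
    """Hypothetical stand-in for a pika connection; it only tracks its state."""

    def __init__(self):
        self.is_open = True

    def close(self):
        self.is_open = False


class DemoConsumer:
    """Mirrors the shape of the Consumer class, minus the broker."""

    def __enter__(self):
        # Called once when the with block starts.
        self.connection = FakeConnection()
        return self

    def __exit__(self, *args):
        # Called when the with block ends, whether or not it raised.
        self.connection.close()


def run_demo():
    # Normal path: open inside the block, closed right after it.
    with DemoConsumer() as consumer:
        opened = consumer.connection.is_open
    closed_after_block = not consumer.connection.is_open

    # Error path: the exception still propagates, but close() runs first.
    try:
        with DemoConsumer() as failing:
            raise RuntimeError("handler blew up")
    except RuntimeError:
        pass
    closed_after_error = not failing.connection.is_open

    return opened, closed_after_block, closed_after_error
```

Calling run_demo() should report that the connection was open inside the block and closed afterwards in both cases, which is exactly the cleanup guarantee the Consumer class relies on.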

CREATING THE EXCHANGE

As I mentioned in the tutorial on creating the Publisher class, I always put the burden of creating the exchanges and queues on the consuming applications. That way, I don't have a bunch of messages hanging out in RabbitMQ land with no place to go. That being the case, I need to make sure the Consumer properly creates the exchange and queue it needs before trying to consume any messages.

Starting with the exchange, I'll create a new private method, _create_exchange, that will take in the communication channel as an argument needed to actually declare the exchange:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()

    def _create_exchange(self, channel):
        pass  # body added in the next step

    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

Just like with the Publisher, I need to call the exchange_declare method on the channel to actually create the exchange on the RabbitMQ server. In the case of the Publisher, however, I didn't want pika to tell RabbitMQ to create a new exchange, and just merely wanted to check for its existence.

Being that I want the Consumer to actually create the exchange, I will need to pass in more arguments to make this happen:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def _create_exchange(self, channel):
        exchange_options = self.config['exchangeOptions']
        channel.exchange_declare(exchange=self.config['exchangeName'], exchange_type=self.config['exchangeType'], passive=exchange_options['passive'], durable=exchange_options['durable'], auto_delete=exchange_options['autoDelete'], internal=exchange_options['internal'])
     
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

Each of the argument values is pulled from the configuration object passed into the class, so that the consuming application can dictate all of the parameters. This ensures the exchange is created exactly how the application needs.

Two of the values are pulled directly from the root of the config, exchangeName and exchangeType, along with a third configuration object, exchangeOptions, which is just a simple object containing a number of boolean values, as I'll explain next.

The exchange parameter is just the name of the exchange that is going to be created, and the exchange_type parameter indicates whether the exchange should be a direct, topic, fanout, or headers exchange. You can read more about the different exchange types here, if you're interested.

The last four arguments, passive, durable, auto_delete, and internal, are boolean flags that dictate the behavior of the exchange as well as the declaration call.

We saw the passive argument in the Publisher class, but to recap, this just simply states whether or not the declaration call is a simple check for the existence of the specified exchange - in which case this value needs to be True - versus actually telling RabbitMQ to create the exchange - in which case the value should be False.

The durable flag indicates whether or not the exchange should be destroyed when the message broker restarts. A value of True tells the broker to hold on to the exchange through a reboot, and a value of False tells the broker to destroy it.

The auto_delete parameter tells the broker whether or not the exchange should be deleted when the final queue bound to it is unbound. We'll talk about queues next, but to shed a little more light here, exchanges route messages to the queues that are bound to them. That being said, if you pass in True for the auto_delete parameter, the exchange will be removed when the final queue is unbound from it. A value of False will allow the exchange to hang around, so that queues can be bound to it again at a later point without having to first redeclare the exchange.

The final parameter you see here is internal, and it indicates whether or not only other exchanges can publish messages to the new exchange. So, if you give it a value of True, then the exchange will not be accessible by outside, publishing applications, and can only be accessed by other exchanges. A value of False, of course, will allow outside applications to publish messages to it.

There are a handful of other arguments you can pass into the exchange_declare method, which you can read about in the pika channel docs, but these arguments have been sufficient for my projects.

CREATING THE QUEUE

As I alluded to just a bit ago, exchanges route messages to queues, and the consumers then pick up the messages directly from the queues, not the exchanges.

In order for an exchange to route the messages appropriately, queues have to be bound to exchanges. I'll touch on the binding piece in the next section, but for now, let's focus on getting a new queue spun up from which the Consumer will read messages.

The queue declaration is done in the same manner as the exchange declaration and involves similar arguments, so I'm just going to add a new method, _create_queue, that will take in a channel and call its queue_declare method:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def _create_exchange(self, channel):
        exchange_options = self.config['exchangeOptions']
        channel.exchange_declare(exchange=self.config['exchangeName'], exchange_type=self.config['exchangeType'], passive=exchange_options['passive'], durable=exchange_options['durable'], auto_delete=exchange_options['autoDelete'], internal=exchange_options['internal'])
        
    def _create_queue(self, channel):
        queue_options = self.config['queueOptions']
        channel.queue_declare(queue=self.config['queueName'], passive=queue_options['passive'], durable=queue_options['durable'], exclusive=queue_options['exclusive'], auto_delete=queue_options['autoDelete'])

    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

Of course, each of the values for the various parameters is pulled from the class's configuration object, with the first one being queueName, which is assigned to the queue parameter of the channel's queue_declare method.

Just like with the exchange options, I have structured the configuration to include a queueOptions object, which contains a handful of boolean properties for the remaining arguments I want to send to the declaration method.

The passive parameter works the exact same way with queues as it does with exchanges, so if this is set to True, the broker will just merely check for the existence of a queue with the specified name and raise an error if it doesn't exist. Passing in a value of False for this parameter will tell the broker to create the queue if it doesn't already exist.

The durable parameter works the same way with queues as it does with exchanges, so if a value of True is passed in, the queue will remain intact during server reboots. Otherwise, if this parameter is False, the queue will be wiped out whenever a reboot occurs.

Next up is the exclusive parameter, which we didn't see before with the exchange declaration, but this parameter tells the server whether or not the queue can be shared across connections.

In other words, if this parameter is set to True, then only the connection on which the queue was created can access the queue. Once the connection is closed, the queue is then deleted. On the flip side, if this parameter is False, then the queue is shared across many connections and remains intact when the original connection that created the queue is closed.

The final parameter, auto_delete, tells the server whether or not the queue should be deleted when all consumers disconnect from it. A value of True will cause the queue to be nuked when there are no consumers retrieving messages from it, and a value of False will keep the queue around, even if nothing is consuming messages from it.

There are a few other arguments you can pass to the queue_declare method that I haven't needed to use as of yet, but I encourage you to check out the pika documentation on the queue_declare method for more information.
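
Pulling together the keys the snippets so far read from self.config, a configuration dictionary for this class might look like the sketch below. The key names come from the code in this tutorial. Every value (host, credentials, exchange and queue names, the routing key) is a placeholder I made up for illustration, and 5671 is only the conventional AMQP-over-TLS port, so substitute whatever your broker actually uses:

```python
# Hypothetical example values; only the key names are taken from the tutorial code.
config = {
    # Connection settings read by _create_connection
    "userName": "guest",
    "password": "guest",
    "host": "rabbitmq.example.local",
    "port": 5671,
    "virtualHost": "/",

    # Exchange settings read by _create_exchange
    "exchangeName": "home.automation",
    "exchangeType": "topic",
    "exchangeOptions": {
        "passive": False,     # actually create the exchange, don't just check for it
        "durable": True,      # survive a broker restart
        "autoDelete": False,  # keep the exchange when the last queue is unbound
        "internal": False,    # allow publishing applications to use it
    },

    # Queue settings read by _create_queue
    "queueName": "temperature.readings",
    "queueOptions": {
        "passive": False,     # create the queue if it doesn't exist
        "durable": True,      # survive a broker restart
        "exclusive": False,   # allow other connections to use the queue
        "autoDelete": False,  # keep the queue when no consumers are attached
    },

    # Read when the queue is bound to the exchange in the next section
    "routingKey": "home.*.temperature",
}
```

Keeping the boolean flags grouped under exchangeOptions and queueOptions means each consuming application can declare its exchange and queue differently without touching the Consumer class itself.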

BINDING THE QUEUE

With our methods defined to declare an exchange and queue, we now need to actually bind the queue to the exchange, so that messages can be routed properly.

I'm going to do the exchange and queue declarations, as well as the queue binding, as part of the message consumption workflow. To that end, I'm going to add a new instance method, consume, that takes in a callback function as an argument so that applications using this Consumer class can handle received messages as they see fit.

More on this later, but for now, I'll just have the consume method assign the callback function argument to an instance variable that can be accessed by other methods later:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def consume(self, message_received_callback):
        self.message_received_callback = message_received_callback
        
    def _create_exchange(self, channel):
        exchange_options = self.config['exchangeOptions']
        channel.exchange_declare(exchange=self.config['exchangeName'], exchange_type=self.config['exchangeType'], passive=exchange_options['passive'], durable=exchange_options['durable'], auto_delete=exchange_options['autoDelete'], internal=exchange_options['internal'])
        
    def _create_queue(self, channel):
        queue_options = self.config['queueOptions']
        channel.queue_declare(queue=self.config['queueName'], passive=queue_options['passive'], durable=queue_options['durable'], exclusive=queue_options['exclusive'], auto_delete=queue_options['autoDelete'])
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

Just like with the Publisher, we need to gain access to the communication channel on the connection object, which will be utilized in a number of different places, so I'll just assign the result of calling the channel() method on the connection object to a local variable called channel:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def consume(self, message_received_callback):
        self.message_received_callback = message_received_callback
        channel = self.connection.channel()
        
    def _create_exchange(self, channel):
        exchange_options = self.config['exchangeOptions']
        channel.exchange_declare(exchange=self.config['exchangeName'], exchange_type=self.config['exchangeType'], passive=exchange_options['passive'], durable=exchange_options['durable'], auto_delete=exchange_options['autoDelete'], internal=exchange_options['internal'])
        
    def _create_queue(self, channel):
        queue_options = self.config['queueOptions']
        channel.queue_declare(queue=self.config['queueName'], passive=queue_options['passive'], durable=queue_options['durable'], exclusive=queue_options['exclusive'], auto_delete=queue_options['autoDelete'])
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

Before actually consuming any messages, we need to call the _create_exchange and _create_queue methods we just defined to ensure the exchange and queue exist, and then bind the queue to the exchange by calling the queue_bind method on the channel object, so that the broker can properly route messages from the exchange to the queue:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def consume(self, message_received_callback):
        self.message_received_callback = message_received_callback
        channel = self.connection.channel()
        
        self._create_exchange(channel)
        self._create_queue(channel)
        
        channel.queue_bind(queue=self.config['queueName'], exchange=self.config['exchangeName'], routing_key=self.config['routingKey'])

    def _create_exchange(self, channel):
        exchange_options = self.config['exchangeOptions']
        channel.exchange_declare(exchange=self.config['exchangeName'], exchange_type=self.config['exchangeType'], passive=exchange_options['passive'], durable=exchange_options['durable'], auto_delete=exchange_options['autoDelete'], internal=exchange_options['internal'])
        
    def _create_queue(self, channel):
        queue_options = self.config['queueOptions']
        channel.queue_declare(queue=self.config['queueName'], passive=queue_options['passive'], durable=queue_options['durable'], exclusive=queue_options['exclusive'], auto_delete=queue_options['autoDelete'])

    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

The queue_bind method takes the names of the queue and exchange you want bound together, as well as a routing_key argument, which is a string containing a pattern that tells the exchange which queues the published messages should be routed to.

The Publisher class uses the same routing key when dropping a message on the exchange to ensure the message is placed in the appropriate queue(s).

If you think of an exchange as a post office and queues as individual mailboxes, then the routing key would be the equivalent of someone's address. Of course, routing keys aren't always used, depending on how the exchange is configured, but that's the closest analogy I can think of that might clarify the concepts a bit.
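
To make the "pattern" part concrete: on a topic exchange, routing keys are dot-separated words, and a binding key may use * to match exactly one word and # to match zero or more words. The function below is only a local sketch of that matching rule so you can experiment with keys. The broker does the real matching, and the name topic_matches and the sample keys are made up for illustration:

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key satisfies binding_key under topic-exchange rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        # p indexes into the binding pattern, w into the routing key's words.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more words, so try every possible split.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            # '*' stands for exactly one word; a literal must match exactly.
            return match(p + 1, w + 1)
        return False

    return match(0, 0)
```

For example, topic_matches("home.*.temperature", "home.kitchen.temperature") is True, while the same binding key does not match "home.kitchen.humidity". On a direct exchange there are no wildcards, and the routing key simply has to equal the binding key.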

CONSUMING MESSAGES

Now comes the final piece where we actually tell the Consumer class to pick a message up off the queue. W00t!

There are two calls I need to make to accomplish this, with the first call being made to the basic_consume method on the channel. This method tells the broker to spin up a consumer process, which checks for messages on a specified queue, and then registers a callback function that should be executed in the Consumer class when a message is available and has been delivered to the client:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def consume(self, message_received_callback):
        self.message_received_callback = message_received_callback
        channel = self.connection.channel()
        
        self._create_exchange(channel)
        self._create_queue(channel)
        
        channel.queue_bind(queue=self.config['queueName'], exchange=self.config['exchangeName'], routing_key=self.config['routingKey'])
        channel.basic_consume(self._consume_message, queue=self.config['queueName'])
        
    def _create_exchange(self, channel):
        exchange_options = self.config['exchangeOptions']
        channel.exchange_declare(exchange=self.config['exchangeName'], exchange_type=self.config['exchangeType'], passive=exchange_options['passive'], durable=exchange_options['durable'], auto_delete=exchange_options['autoDelete'], internal=exchange_options['internal'])
        
    def _create_queue(self, channel):
        queue_options = self.config['queueOptions']
        channel.queue_declare(queue=self.config['queueName'], passive=queue_options['passive'], durable=queue_options['durable'], exclusive=queue_options['exclusive'], auto_delete=queue_options['autoDelete'])

    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

If you look at the two arguments I'm passing into the basic_consume method, you'll see that the first one is a non-existent callback function, and the second is the name of the queue from which I want to receive, or consume, messages.

So, why did I specify a callback function that doesn't exist (yet), instead of just using the message_received_callback argument passed into the consume method?

The reason is that pika will pass multiple arguments to the registered callback function, but my applications really only care about the message body, or content. So, rather than asking each application that utilizes this Consumer class to set up a callback function that takes in data it probably doesn't need, I will create a new private method in the Consumer class, _consume_message, to filter out the unused data, and then pass only the message body along to the message_received_callback function sent into the consume method:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def consume(self, message_received_callback):
        self.message_received_callback = message_received_callback
        channel = self.connection.channel()
        
        self._create_exchange(channel)
        self._create_queue(channel)
        
        channel.queue_bind(queue=self.config['queueName'], exchange=self.config['exchangeName'], routing_key=self.config['routingKey'])
        channel.basic_consume(self._consume_message, queue=self.config['queueName'])
        
    def _create_exchange(self, channel):
        exchange_options = self.config['exchangeOptions']
        channel.exchange_declare(exchange=self.config['exchangeName'], exchange_type=self.config['exchangeType'], passive=exchange_options['passive'], durable=exchange_options['durable'], auto_delete=exchange_options['autoDelete'], internal=exchange_options['internal'])
        
    def _create_queue(self, channel):
        queue_options = self.config['queueOptions']
        channel.queue_declare(queue=self.config['queueName'], passive=queue_options['passive'], durable=queue_options['durable'], exclusive=queue_options['exclusive'], auto_delete=queue_options['autoDelete'])
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)
        
    def _consume_message(self, channel, method, properties, body):
        self.message_received_callback(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)

You can see that pika will pass along four different arguments to the registered callback function when a message is received (you can read more about these here, if you're interested), but the only one with which I'm concerned is the body, which contains the actual message content.

I simply just pass this along to the message_received_callback function that was passed into the consume method above, and then call the basic_ack method on the channel to acknowledge that the message was actually received.
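
If you'd like to poke at that wrapper without a broker running, the sketch below exercises the same four-argument method against stand-ins. StubChannel and CallbackAdapter are hypothetical names invented for this example, and SimpleNamespace is just a quick way to fake the method argument with a delivery_tag attribute:

```python
from types import SimpleNamespace


class StubChannel:
    """Hypothetical stand-in for a pika channel; records acknowledged tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class CallbackAdapter:
    """Holds the application callback and exposes the four-argument wrapper."""

    def __init__(self, message_received_callback):
        self.message_received_callback = message_received_callback

    def _consume_message(self, channel, method, properties, body):
        # Hand only the body to the application...
        self.message_received_callback(body)
        # ...then acknowledge. If the callback raises, this line never runs
        # and the message stays unacknowledged.
        channel.basic_ack(delivery_tag=method.delivery_tag)


received = []
channel = StubChannel()
adapter = CallbackAdapter(received.append)

# Simulate one delivery the way the client library would invoke the callback.
adapter._consume_message(channel, SimpleNamespace(delivery_tag=7), None, b"lights on")
```

After the simulated delivery, received holds just the body and channel.acked holds the delivery tag. Acknowledging after the application callback, rather than before, is a deliberate ordering: a message whose handler fails is not marked as handled.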

Now, to put a bow on this tutorial, the final call I need to make in order to set everything in motion is to the start_consuming method on the channel object, which I will add just under the call to channel.basic_consume in the consume method:

import pika

class Consumer:
    def __init__(self, config):
        self.config = config

    def __enter__(self):
        self.connection = self._create_connection()
        return self

    def __exit__(self, *args):
        self.connection.close()
        
    def consume(self, message_received_callback):
        self.message_received_callback = message_received_callback
        channel = self.connection.channel()
        
        self._create_exchange(channel)
        self._create_queue(channel)
        
        channel.queue_bind(queue=self.config['queueName'], exchange=self.config['exchangeName'], routing_key=self.config['routingKey'])
        channel.basic_consume(self._consume_message, queue=self.config['queueName'])
        channel.start_consuming()
        
    def _create_exchange(self, channel):
        exchange_options = self.config['exchangeOptions']
        channel.exchange_declare(exchange=self.config['exchangeName'], exchange_type=self.config['exchangeType'], passive=exchange_options['passive'], durable=exchange_options['durable'], auto_delete=exchange_options['autoDelete'], internal=exchange_options['internal'])
        
    def _create_queue(self, channel):
        queue_options = self.config['queueOptions']
        channel.queue_declare(queue=self.config['queueName'], passive=queue_options['passive'], durable=queue_options['durable'], exclusive=queue_options['exclusive'], auto_delete=queue_options['autoDelete'])
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)
        
    def _consume_message(self, channel, method, properties, body):
        self.message_received_callback(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)

Since I am using a BlockingConnection in the _create_connection method, the start_consuming method needs to be called on the channel object to kick off the continuous, blocking loop for constant message consumption.

And with that, you can now add this Consumer class to your own applications to retrieve any published messages on your RabbitMQ server.
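
As a rough idea of what the calling side might look like, here is a small sketch. It assumes your messages are JSON, which is my assumption and not something this class requires, and handle_message and run are hypothetical names. The consumer class is passed in as a parameter so the snippet stands on its own:

```python
import json


def handle_message(body):
    """Decode one message body; pika delivers it as bytes."""
    payload = json.loads(body.decode("utf-8"))
    print("Received:", payload)
    return payload


def run(consumer_cls, config):
    """Open the consumer in a with block and consume until interrupted."""
    with consumer_cls(config) as consumer:
        try:
            # Blocks inside start_consuming() until something stops it.
            consumer.consume(handle_message)
        except KeyboardInterrupt:
            # Ctrl+C lands here; leaving the with block closes the connection.
            pass
```

With the class from this tutorial saved in consumer.py, the call would be something like `from consumer import Consumer` followed by `run(Consumer, config)`, where config is the dictionary your application builds.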

Enjoy!

About Tony Thorsen

Father of two, husband of one, Maker of many things. Tinkerer, dreamer, pixel nudger.