Creating a RabbitMQ Publisher in Python

I chose RabbitMQ as the messaging backbone for my personal digital assistant, T.I.T.U.S., as I wanted a common way to publish and consume messages for each application and device connected to the network.

I wrote a blog post on the overall architecture of T.I.T.U.S. a little while back. If you refer to the diagram about two-thirds of the way down that post, the “messaging backbone” I mentioned above is the vertical arrow labelled “T.I.T.U.S. Message Bus.”

Since a lot of the projects I’m going to be writing about and creating courses on down the road will tie into this message bus, I wanted to take the time to show you how the applications hook into it, starting with the publisher.

THE CONTROLLER

Currently, the Controller (depicted at the top of the blog post's diagram) is the only component that publishes messages to the bus. The reason for this is that I wanted a central place for any application or device to connect in order to kick off messages to other devices or applications on the network.

Yes, this introduces a single point of failure: if my Controller goes down, no messages can flow through the system. The alternative is to let any application publish to the bus directly, but then each app would have to know the routing details for every other application and device on the network, which seems redundant and inefficient.

However, since each application already has a connection to RabbitMQ for consuming messages, I could easily add a publishing component to each one, allowing direct publishing to the bus, if I decide to change things up later on.

But for now, I’m just going to leave the Controller as the single publisher.

Without going into too much detail, the Controller is just a big REST API built with Python, Flask and SQLAlchemy. Each application calls one of its endpoints, and the endpoint hands the data the consuming application needs to a centralized publishing component.

SETTING UP YOUR ENVIRONMENT

I used Python 3.5, but I think any 3.x version will suffice for the sake of this project, so make sure you have some version of Python 3 installed.

I find it good practice to set up a virtual environment when starting new Python projects, so if you don’t already have virtualenv installed, I would set that up next. Here are some instructions to help you out if you need them.

The next step, then, is to create a project directory and a new virtual environment.

For my project, I just created a directory called rabbitmq:

mkdir rabbitmq

and then, from inside that directory (cd rabbitmq), created a virtual environment called env:

virtualenv env

Once that’s created, you then need to activate the new environment before installing any packages, which you do by running the activate script in the virtual environment’s bin folder:

source env/bin/activate

If you’re on a Windows machine, the scripts live in a Scripts folder rather than bin, so you’ll need to call the activate batch script there:

.\env\Scripts\activate.bat

INSTALLING PIKA

If you read through the documentation for RabbitMQ, you’ll find that it supports multiple protocols for different types of applications. The one I chose for my Controller project is AMQP, as it’s the core protocol supported by RabbitMQ, is very robust, and is recommended by the RabbitMQ team.

On the RabbitMQ site, you’ll also find a listing of available client tools and libraries that you can use to communicate with the server for various programming languages that support the different protocols. The Python section currently only has three listed, so I just chose the first one, pika, as it supports AMQP.

Installing pika is really straightforward; you can bring it into your project using pip:

pip install pika

This installs pika into the virtual environment (env) only, so once you deactivate the environment or open a new shell, pika won't be importable until you activate env again, unless you also have it installed as a global package.
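If you want to double-check that the environment is active and that pika landed in it, here is a minimal sketch using only the standard library. The function names in_virtualenv and is_installed are my own, not part of pika or virtualenv.

```python
import importlib.util
import sys


def in_virtualenv():
    """Return True when the interpreter is running inside a virtual environment."""
    # venv and recent virtualenv leave sys.base_prefix pointing at the system
    # install; older virtualenv releases set sys.real_prefix instead.
    base = getattr(sys, "real_prefix", None) or getattr(sys, "base_prefix", sys.prefix)
    return base != sys.prefix


def is_installed(package_name):
    """Return True when the named top-level package can be found for import."""
    return importlib.util.find_spec(package_name) is not None


if __name__ == "__main__":
    print("virtual environment active:", in_virtualenv())
    print("pika importable:", is_installed("pika"))
```

Run it with the environment activated and both lines should report True; run it after deactivating and the second line will most likely flip to False.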

CREATING THE PUBLISHER CLASS

Using your code editor or IDE of choice (I’m hooked on PyCharm at the moment, but you do you), go ahead and create a new file called publisher.py, which will house the logic for publishing messages to a queue.

Once you have that set up, the first thing you will need to do is import the pika package:

import pika

Next, we’ll create a new class with an initialization method, so that we can spin up new instances with different configurations if need be:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config

CONFIGURATION

The new publisher class will need a handful of values that it will then pass to pika as arguments when establishing a connection to the server and publishing messages to an exchange.

You can certainly hard-code the necessary parameter values by placing them in constants or local variables, but that approach doesn’t really allow this new Publisher class to be reused for different applications.

Instead, the approach I took was to pass in a configuration object that contains a number of key/value pairs (i.e. a dictionary) containing the configuration data needed by pika.

This allows me to then store the actual configuration data in a database, JSON file, environment variables, etc., convert it to a dictionary, and then pass that into the Publisher class when it’s initialized.

This puts the responsibility on the application utilizing this Publisher class to figure out how to retrieve the configuration data and put it into a dictionary that the Publisher can then use.
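As a rough sketch of what that responsibility might look like on the application side, the helpers below build the dictionary from a JSON file or from environment variables. The key names are the ones the Publisher reads in this tutorial; the RABBITMQ_ prefix, the function names, and the validation step are my own assumptions, not something pika requires.

```python
import json
import os

# Keys the Publisher reads from its configuration dictionary in this tutorial.
REQUIRED_KEYS = ("userName", "password", "host", "port",
                 "virtualHost", "exchangeName", "routingKey")


def validate(config):
    """Raise KeyError naming any missing keys; otherwise return the config."""
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError("missing configuration keys: " + ", ".join(missing))
    return config


def config_from_json(path):
    """Load the configuration dictionary from a JSON file."""
    with open(path) as handle:
        return validate(json.load(handle))


def config_from_env(environ=os.environ, prefix="RABBITMQ_"):
    """Build the dictionary from variables such as RABBITMQ_HOST (hypothetical names)."""
    config = {}
    for key in REQUIRED_KEYS:
        name = prefix + key.upper()
        if name in environ:
            config[key] = environ[name]
    if "port" in config:
        # Environment variables are always strings; the port should be an int.
        config["port"] = int(config["port"])
    return validate(config)
```

Either function returns a plain dictionary, so the Publisher never needs to know where the values came from.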

CREATING THE CONNECTION

Before we can actually send a message to an exchange, we need to establish a connection with the RabbitMQ server.

For this, I will add a “private” method that creates and returns a new connection object that will be consumed by another method I will define here shortly:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config

    def _create_connection(self):
        pass  # placeholder; the body is filled in step by step below

The first thing we need to do is create an object containing the credentials the Publisher class will need to authenticate with the RabbitMQ server. This can be done using the PlainCredentials class in the pika package, whose constructor accepts a username and password.

I will be passing the appropriate credentials into the Publisher through the configuration dictionary, so I will retrieve the appropriate values needed for each parameter from there:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])

Next, we'll need to create an object containing the necessary connection parameters: the host, port number, virtual host, the credentials, and a flag indicating whether or not we want to use SSL.

The host parameter is the hostname or IP address of the RabbitMQ server, and the port is the port number on which that server is listening. The virtual host parameter is the name of the virtual host containing the exchanges and queues to which you want to publish messages. The credentials parameter is the credentials object we just created, and the SSL flag indicates whether or not the connection should be made over a secure channel.

I pass most of this information into the Publisher via the configuration object, except for the SSL flag, since it's always true for me. So I will pull the appropriate values from there and pass them into the constructor of the ConnectionParameters class in the pika package. (The ssl keyword shown here is accepted by pika releases before 1.0; pika 1.0 and later take an ssl_options argument instead.)

import pika

class Publisher:
    def __init__(self, config):
        self.config = config
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)

The final step, then, is to create a connection object using the parameters we just created and return it from the method, to be used by another method I'll define in the next section. This is easily done by returning a new instance of the BlockingConnection class in the pika package:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config

    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)
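If you are on pika 1.0 or later, the boolean ssl keyword is no longer accepted. Here is a minimal sketch of the same connection logic for that case, assuming the broker's certificate is signed by a CA your system already trusts. The standalone create_connection function is my own arrangement for illustration; the config keys are the ones used throughout this tutorial.

```python
import ssl


def create_connection(config):
    """Open a TLS connection the way pika 1.x expects (assumption: pika >= 1.0)."""
    import pika  # imported here so this module still loads where pika is absent

    credentials = pika.PlainCredentials(config["userName"], config["password"])
    # A default context verifies the server certificate against the system CAs.
    context = ssl.create_default_context()
    parameters = pika.ConnectionParameters(
        host=config["host"],
        port=config["port"],
        virtual_host=config["virtualHost"],
        credentials=credentials,
        ssl_options=pika.SSLOptions(context, server_hostname=config["host"]),
    )
    return pika.BlockingConnection(parameters)
```

A broker with a self-signed certificate would need that certificate loaded into the context first, which is outside the scope of this sketch.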

PUBLISHING A MESSAGE

Now that we have a way to connect to the RabbitMQ server, it's time to add the appropriate method to actually publish a message to an exchange.

The first step is to define a new instance method, publish, that will take in a message to be published:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config

    def publish(self, message):
        pass  # placeholder; the body is filled in step by step below

    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

This method will first need to create a new connection, so we'll assign the result of calling self._create_connection() to a new variable, connection:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config
        
    def publish(self, message):
        connection = self._create_connection()
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

Now that we have a connection to the server, we need to set up a communication channel by invoking the channel method on the connection object:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config
        
    def publish(self, message):
        connection = self._create_connection()
        channel = connection.channel()
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

In order for messages to be published to a queue, they need to go through an exchange. To ensure an exchange exists when a message is published, we need to declare it using the exchange_declare method on the channel object.

This method takes a number of arguments, but the only two I am going to pass in are the exchange name, whose parameter name is just exchange, and the passive flag.

I'll pull the exchange's name from the configuration dictionary, and I will just set the passive flag to True:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config
        
    def publish(self, message):
        connection = self._create_connection()
        channel = connection.channel()
        
        channel.exchange_declare(exchange=self.config['exchangeName'], passive=True)
       
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

With the passive flag set to True, all this line of code is doing is ensuring that the exchange exists before trying to publish, so this particular call is not actually creating the exchange. I put that burden on my consumers, which I'll cover in another tutorial. If the exchange already exists, then the code just moves along nicely, but if the specified exchange does NOT exist, the exchange_declare method will raise an exception.

This is ideal, because if I know the consumer is supposed to create the exchange, and the specified exchange doesn't exist, then I know that no application is expecting the message that is about to be published, so there's no point in actually publishing it at this time.
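If you would rather treat a missing exchange as a normal outcome than as a crash, the check can be wrapped in a small helper. This is only a sketch: exchange_exists is a name I made up, and it assumes pika 1.x, where the exception raised for a missing exchange carries a reply_code of 404.

```python
def exchange_exists(channel, exchange_name):
    """Return True if a passive declare succeeds, False on a 404 from the broker."""
    try:
        channel.exchange_declare(exchange=exchange_name, passive=True)
        return True
    except Exception as error:
        # Assumption: pika 1.x, whose ChannelClosedByBroker exposes reply_code.
        if getattr(error, "reply_code", None) == 404:
            # The broker has closed this channel; open a new one before reuse.
            return False
        raise  # anything else is a genuine failure for the caller to handle
```

Keep in mind that a failed passive declare closes the channel, so any later work on the same connection needs a fresh channel.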

Now that the connection and communication channels have been established and the exchange has been declared, we need to actually publish the message to the exchange, which is done using the basic_publish method on the channel object.

For this call, I am going to specify the name of the exchange to which the message will be published, the routing key, which is used for binding and routing messages from an exchange to one or more queues, and then the body, which is the actual message:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config
        
    def publish(self, message):
        connection = self._create_connection()
        channel = connection.channel()
        
        channel.exchange_declare(exchange=self.config['exchangeName'], passive=True)
        channel.basic_publish(exchange=self.config['exchangeName'], routing_key=self.config['routingKey'], body=message)
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)
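To make the role of the routing key a little more concrete, here is a toy model of how a direct exchange picks queues. It is a simplification that runs without a broker: the queue names are hypothetical, each queue has a single binding key, and other exchange types (fanout, topic, headers) match differently.

```python
def route_direct(bindings, routing_key):
    """Return the queues a direct exchange would deliver to for routing_key.

    bindings maps a queue name to the binding key that queue registered.
    """
    return sorted(queue for queue, binding_key in bindings.items()
                  if binding_key == routing_key)


# Hypothetical bindings: two queues listen for "lights", one for "climate".
bindings = {
    "kitchen-lights": "lights",
    "hall-lights": "lights",
    "thermostat": "climate",
}

print(route_direct(bindings, "lights"))
print(route_direct(bindings, "audio"))
```

The first call returns both lights queues, while the second returns an empty list, which mirrors what happens when a message is published with a key that no queue is bound to.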

And just for good measure, I'm going to add an output statement that will display the contents of the message that was just published to the exchange:

import pika

class Publisher:
    def __init__(self, config):
        self.config = config
        
    def publish(self, message):
        connection = self._create_connection()
        channel = connection.channel()
        
        channel.exchange_declare(exchange=self.config['exchangeName'], passive=True)
        channel.basic_publish(exchange=self.config['exchangeName'], routing_key=self.config['routingKey'], body=message)
        
        print(" [x] Sent message %r" % message)
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

The last thing the new publish method needs to do is close the connection after the message has been published to the exchange. This helps ensure resources are always cleaned up appropriately.

Just like with any software system, there are many different situations that could cause our publish method to error out. So, to ensure we're always closing the connection - even if an error occurs - I'm going to wrap the method body with a try/except/finally.

In the except block, I'm going to print out the error message and stack trace, and then raise the exception for the caller to handle.
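If the try/except/finally flow is new to you, this small sketch shows the guarantee it provides without needing a broker. FakeConnection, publish_with_cleanup and failing_send are hypothetical stand-ins I wrote for the demonstration; they are not part of pika.

```python
class FakeConnection:
    """Hypothetical stand-in for a broker connection; it only records close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def publish_with_cleanup(connection, send):
    """Call send(connection) and close the connection whether or not it raises."""
    try:
        send(connection)
    except Exception as error:
        print(repr(error))
        raise  # let the caller decide how to handle the failure
    finally:
        connection.close()  # runs on success and on failure alike


def failing_send(connection):
    """Simulate a publish that fails partway through."""
    raise RuntimeError("broker unreachable")
```

Calling publish_with_cleanup(FakeConnection(), failing_send) still raises the RuntimeError to the caller, but the connection's closed attribute is True by the time the exception arrives.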

The final version of this class is as follows (NOTE: Don't forget to import traceback at the top for the stack trace message!):

import pika
import traceback

class Publisher:
    def __init__(self, config):
        self.config = config
        
    def publish(self, message):
        connection = None
        
        try:
            connection = self._create_connection()
            channel = connection.channel()
            
            channel.exchange_declare(exchange=self.config['exchangeName'], passive=True)
            channel.basic_publish(exchange=self.config['exchangeName'], routing_key=self.config['routingKey'], body=message)
            
            print(" [x] Sent message %r" % message)
        except Exception as e:
            print(repr(e))
            traceback.print_exc()
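            raise  # re-raise with the original traceback intact
        finally:
            # Close only a connection that was actually opened and is still open.
            if connection is not None and connection.is_open:
                connection.close()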
        
    def _create_connection(self):
        credentials = pika.PlainCredentials(self.config['userName'], self.config['password'])
        parameters = pika.ConnectionParameters(self.config['host'], self.config['port'], self.config['virtualHost'], credentials, ssl=True)
        
        return pika.BlockingConnection(parameters)

At this point, you should have a fully functional RabbitMQ publishing class that you can use in your Python scripts or applications, and now you know a little bit of how I publish messages to the T.I.T.U.S. message bus.
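To round things off, here is a rough sketch of how a script might use the class. It assumes publisher.py from this tutorial sits next to the script and that a broker is reachable; every value in the config dictionary is a placeholder, and build_message with its command/payload layout is just one way I might shape a message, not a format RabbitMQ requires.

```python
import json


def build_message(command, payload):
    """Serialize a command and its payload to a JSON string for the message body."""
    return json.dumps({"command": command, "payload": payload}, sort_keys=True)


def main():
    # Assumes publisher.py from this tutorial is importable and that a broker
    # is reachable with the placeholder settings below.
    from publisher import Publisher

    config = {
        "userName": "guest",
        "password": "guest",
        "host": "localhost",
        "port": 5671,
        "virtualHost": "/",
        "exchangeName": "titus",
        "routingKey": "lights",
    }
    Publisher(config).publish(build_message("turn_on", {"room": "kitchen"}))
```

Calling main() publishes a single message. Serializing to JSON keeps the body a plain string, which is easy for a consumer written in any language to parse.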

Enjoy!

About Tony Thorsen

Father of two, husband of one, Maker of many things. Tinkerer, dreamer, pixel nudger.