Practice 9 - Introduction to Message Broker - RabbitMQ
The purpose of this practice session is to give an overview of message queues and of how messages are transmitted using the publish/subscribe mechanism. We use the open-source distributed message broker RabbitMQ. You will learn how to set up RabbitMQ in the cloud and how to create an exchange, create queues, and set up routing rules. We will then use RabbitMQ in the message board application and process messages by sending an SMS through the Twilio API.
References
- More about RabbitMQ concepts https://www.rabbitmq.com/tutorials/amqp-concepts.html
- RabbitMQ Python library (Pika) documentation: https://pika.readthedocs.io/en/latest/intro.html
- Pika examples: https://pika.readthedocs.io/en/latest/examples.html
If you have problems:
- Check the possible problems and their potential solutions at the end of this guide.
- Ask in the Zulip channels.
Exercise 9.1. Setting up RabbitMQ in OpenStack
In this task, we set up RabbitMQ in an OpenStack VM using Docker CE. We then use the RabbitMQ administration interface to manage RabbitMQ entities and to carry out simple steps for publishing and subscribing to data streams. This interface also helps us check whether data arrives at the broker and which queues, connections, and relationships are currently active.
Create an OpenStack VM instance:
- Source: Instead of an Image, use Volume Snapshot and choose Ubuntu22+Docker
- In this Ubuntu VM snapshot, the installation of Docker (as we did in Lab 2) has already been done for us.
- Flavour: should be m3.nano
- Don't forget to enable "Delete volume on instance deletion"!
Setup RabbitMQ:
- Create a RabbitMQ Container
- We configure it to run in background mode (-d)
- Set environment variables RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS
- Port forward RabbitMQ container ports to host ports:
- 5672:5672 (broker)
- 80:15672 (RabbitMQ admin user interface)
- RabbitMQ container image: rabbitmq:3-management
- In the command, replace <LAST_NAME> with your last name and CUSTOM_PASSWORD with a password of your choice
- The complete command should look like this:
docker run -d --hostname <LAST_NAME>-rabbit -p 5672:5672 -p 80:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=CUSTOM_PASSWORD --name <LAST_NAME>rabbit rabbitmq:3-management
Accessing RabbitMQ and working with message publishing and routing:
- You can access the RabbitMQ administration interface at http://<VM_IP>:80 and log in using the credentials set while creating the container.
- Get familiar with the different tabs available, such as:
- Connections: It shows the connections established to the RabbitMQ server.
- Channels: Information about all current channels between clients and the RabbitMQ server.
- Exchanges: It lists the available exchanges. An exchange receives messages from producers. Users or applications can define bindings between exchanges and queues, and these bindings determine which data is delivered to which queue.
- Queues: It shows the queues which are already defined in the RabbitMQ server.
- Now, let us create a new exchange. Click on Exchanges --> Add a new exchange.
- Name: LASTNAME (use your last name; it should not contain punctuation or special letters)
- Type: topic
- Leave other details as default.
- Let us publish a new message manually using the admin interface. For this, click on your newly created exchange and go to publish a message.
- Routing key: logs.warning
- Payload: { "log_type":"Warning","Message": "Your system is low on memory ..."}
- Leave other details as default.
- Create a new queue. Click on Queues-->Add a new queue
- Clients will be able to subscribe to this queue to listen to messages.
- Name: all_logs
- Leave other details as default.
- Create a new Binding. Click on the newly created queue and then go to Bindings
- Binding defines which data (based on routing key value/pattern) is routed from an exchange to a specific queue.
- From exchange: Your (previously created) Exchange name
- Routing key: logs.*
- Leave other details as default.
- The overall exchange, binding and queue setup should now look like this: messages published to your exchange with a routing key matching logs.* are delivered to the all_logs queue.
- Now, let us test by manually republishing the same message to your exchange through the administration interface as before. (NB! Do this through the Exchanges page, not through the Queues page.)
- This time you should NOT see a notice that the message was "not routed".
- Check for the messages that arrived in the queue.
- Go to Queues --> Get Messages. You should see output as shown below.
- Take a screenshot of the situation where you can see that the message was successfully retrieved from the queue. (NB! Please make sure that your RabbitMQ server IP is visible on the screenshot)
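To build some intuition for why the binding key logs.* catches logs.warning, the matching rule of a topic exchange can be imitated in a few lines of Python. This is only an illustrative sketch, not RabbitMQ's own implementation. The function name topic_matches is hypothetical. The rule it encodes is the documented AMQP topic rule: `*` stands for exactly one dot-separated word and `#` for zero or more words.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    '*' matches exactly one dot-separated word, '#' matches zero or more.
    Hypothetical helper for illustration only.
    """
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(pi: int, ki: int) -> bool:
        if pi == len(p_words):
            # Pattern is used up: it matches only if the key is used up too.
            return ki == len(k_words)
        if p_words[pi] == "#":
            # '#' may absorb zero or more words of the routing key.
            return any(match(pi + 1, k) for k in range(ki, len(k_words) + 1))
        if ki == len(k_words):
            return False
        if p_words[pi] in ("*", k_words[ki]):
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)


print(topic_matches("logs.*", "logs.warning"))    # True
print(topic_matches("logs.*", "logs.disk.full"))  # False
print(topic_matches("logs.#", "logs.disk.full"))  # True
```

With the binding from this exercise, a message published with routing key logs.warning reaches all_logs, while a key with two words after logs would not be routed unless the binding used `#`.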
Exercise 9.2. Publishing and Consuming the Messages using RabbitMQ
In this task, we are going to learn to publish and subscribe to messages to/from RabbitMQ using the Python library (Pika).
Publish Messages:
We create a new Python program (as in the previous practice session, for example using the PyCharm IDE), or you can create it in the VM using nano. The program will use the following instructions to publish a message. Before writing the Python program, install the Pika library using pip (pip3 install pika) or the Python package manager in the PyCharm IDE.
- Create a Python virtual environment and activate it.
- You can choose producer.py as the file name.
- Install the pika pip package.
- Import the libraries:
    import pika
    import random
    import datetime
    import json
- Set up the details of the connection:
- Broker location: broker_host = "VM_IP" (replace VM_IP with your VM IP address)
- Broker port: broker_port = 5672
- Configure authentication. Here, we use the username and password that were set when creating the RabbitMQ container to create a Pika PlainCredentials object:
    username = "..."
    password = "..."
    credentials = pika.PlainCredentials(username, password)
- Create a connection and a channel:
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=broker_host, port=broker_port, credentials=credentials))
    channel = connection.channel()
- Here, channel() creates a new channel through which we can send data to and receive data from the RabbitMQ broker.
- Here, we use BlockingConnection, a blocking (synchronous) connection to RabbitMQ. This means each call, such as basic_publish, returns only after the client has finished the operation. On its own this does not confirm that the broker accepted the message; for that, publisher confirms can be enabled with channel.confirm_delivery(). You can also use SelectConnection for asynchronous communication.
- Define a sample message to send using a Python dictionary:
    message = {
        "id": "f634f37d-218d-4f04-a5ab-8e61b14ceedd",
        "content": "Parcel is arriving in 10min",
        "img_path": "https://mywebsite.blob.core.windows.net/lab9/fognode.png",
        "timestamp": "2024-04-08 09:20:57"
    }
- Convert the message dictionary to a string value using the JSON library:
message_str = json.dumps(message)
- Finally, we will publish the data to the broker.
- First of all, set the routing key:
    routing_key = "messageboard.messages.urgent"
- Define your exchange (replace <your_exchange> with the name of the exchange that you created in the previous task):
    exchange = "<your_exchange>"
- Publish a message every five seconds, and keep sending messages until you interrupt the program with the keyboard. Use either a sleep function for 5 seconds after each publish or try out some other ideas.
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message_str)
- Finally, close the channel and the connection:
    channel.close()
    connection.close()
- Run the Python program to publish the messages; check for the live connection and channel in the admin web interface, and it should look like the screenshot below.
- It may be necessary to leave the connections open while making the screenshot. Feel free to add some code (e.g., for sleep) to pause the script before you close the RabbitMQ session.
Take a screenshot of the situation where you can see the live connections in the RabbitMQ web interface. (NB! Please make sure that your IP is visible on the screenshot)
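One possible way to put the publishing steps together is sketched below. It is not the reference solution. The helper names build_message, publish_loop and main, and the max_messages parameter, are assumptions made for illustration. The values VM_IP, CUSTOM_PASSWORD and <your_exchange> are placeholders that you must replace. Pika is imported inside main() so that the helper functions can be tried out without a broker.

```python
import datetime
import json
import time
import uuid


def build_message(content: str, img_path: str) -> str:
    """Build one message as a JSON string with a fresh id and timestamp."""
    message = {
        "id": str(uuid.uuid4()),
        "content": content,
        "img_path": img_path,
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    return json.dumps(message)


def publish_loop(channel, exchange, routing_key, interval=5.0, max_messages=None):
    """Publish every `interval` seconds until Ctrl+C or until max_messages is reached.

    Returns the number of messages handed to the channel.
    """
    sent = 0
    try:
        while max_messages is None or sent < max_messages:
            body = build_message(
                "Parcel is arriving in 10min",
                "https://mywebsite.blob.core.windows.net/lab9/fognode.png",
            )
            channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
            sent += 1
            time.sleep(interval)
    except KeyboardInterrupt:
        pass  # Ctrl+C ends the loop so the connection can be closed cleanly
    return sent


def main():
    import pika  # imported here so the helpers above work without the library

    # Placeholders: replace with your own VM IP, password and exchange name.
    credentials = pika.PlainCredentials("admin", "CUSTOM_PASSWORD")
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="VM_IP", port=5672, credentials=credentials)
    )
    channel = connection.channel()
    try:
        publish_loop(channel, "<your_exchange>", "messageboard.messages.urgent")
    finally:
        connection.close()
```

Calling main() at the bottom of producer.py starts the loop. Because the connection stays open while the loop runs, the live connection and channel remain visible in the admin interface for the screenshot.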
Subscribe to Messages:
Create a Python program that listens for messages from the RabbitMQ server and prints them out.
- Make a copy of the previous Python program.
- Name it consumer.py.
- We will reuse the code up to the point where the channel object is created.
- Create a queue:
- Name the queue:
    queue_name = "all_messages_queue"
- Create the queue:
    channel.queue_declare(queue=queue_name, durable=True)
- Here we use a durable queue, which means the queue definition survives a broker restart. Messages routed to the queue wait there even if there is currently no listener.
- Create a routing key.
- Let us listen to all messages:
    routing_key = "messageboard.messages.*"
- NB! Because students all have separate RabbitMQ servers, you don't actually see each other's data here.
- Create a Binding between the Queue and Exchange:
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
- Create a separate Python function that will be called later for each message. The function should print the contents of the incoming message and the routing key.
    def lab_callback(ch, method, properties, body):
        message = json.loads(body.decode())
        print(method.routing_key, message)
        ch.basic_ack(delivery_tag=method.delivery_tag)
- We set the application to listen to the RabbitMQ queue and run our lab_callback function on every incoming message.
    channel.basic_consume(queue=queue_name, on_message_callback=lab_callback)
    channel.start_consuming()
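The callback assumes that every message body is valid JSON. As an optional hardening step, the sketch below shows one way the callback could reject a malformed body instead of crashing the consumer. Rejecting without requeueing is a design choice assumed here for illustration, not a requirement of the exercise. basic_nack is the Pika channel method for a negative acknowledgement.

```python
import json


def lab_callback(ch, method, properties, body):
    """Print the routing key and the decoded message, then acknowledge it."""
    try:
        message = json.loads(body.decode())
    except (UnicodeDecodeError, json.JSONDecodeError):
        # The body is not valid JSON: reject it without requeueing,
        # otherwise the broker would redeliver the same bad message again.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return
    print(method.routing_key, message)
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

The function is registered in the same way as before, through channel.basic_consume(queue=queue_name, on_message_callback=lab_callback).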
Testing publish and consume (subscribe) clients:
Now run both the publish and consume programs in different terminals. The consumer terminal should print each message that the publisher sends, similar to the output shown below:
Take a screenshot of both publisher and consumer terminals and save both codes for submission.
Exercise 9.3. Message Board Application and message routing
We will reuse the Message Board application developed during Practice Session 5. Here, we modify the app to filter the messages and to process them asynchronously when handling them depends on an external API or on a compute-bound or IO-bound task. We accept a message priority from the user, and if a message is of high priority (urgent), we send a notification by SMS. For this, we integrate the Twilio API to dispatch SMS notifications. Because dispatching an SMS requires a call to an external API, we employ RabbitMQ worker queues for queuing and handling (sending SMS) high-priority messages.
In this task, we route high-priority messages into urgent_messages_queue and all the messages into all_messages_queue, as shown below. The consumer service that sends SMS through the Twilio API is implemented in the next exercise.
- Prepare the Message Board application for deployment on your local environment.
- Bring the code submitted as a part of deliverables in Practice Session 05.
- Create the storage account, storage container, COSMOS database, and its container in the Azure environment as performed in Practice Session 05.
- Export the environment variables CONN_KEY, STORAGE_ACCOUNT, COSMOS_URL, MasterKey.
- PS! Don't forget to remove the APPSETTING_ prefix when accessing environment variables in app.py.
- Install pip requirements.
- Run and test the application.
- Modify home.html to accept the user priority of the message.
- It can be a drop-down menu that accepts two selections (urgent, not-urgent).
- You can use the HTML select element, for example:
    <label for="message-type">Choose message type :</label><br>
    <select name="message-type" id="message-type">
      <option value="urgent">urgent</option>
      <option value="not-urgent">not-urgent</option>
    </select><br>
- Modify app.py to publish the messages to RabbitMQ.
- Import the pika package.
- Read the environment variables needed to connect to the RabbitMQ broker using os.getenv(' '):
- BROKER_IP, BROKER_PORT, USERNAME, PASSWORD
- Create pika plain credentials as you created in Exercise 9.2.
- Create a pika.BlockingConnection() as you created in Exercise 9.2.
- Create a function publish_rabbitmq(new_message,blob_path,messagetype)
- Create a channel: channel = connection.channel()
- Declare the exchange: exchange=<your_last_name>
- Create a JSON document as a payload to the message broker: message_to_send = {"content": new_message,"blob_path": blob_path}
- Reuse the publish logic from the previous exercise, but choose the routing key by checking a condition.
- If the condition messagetype == "urgent" holds true, then routing_key should be messageboard.messages.urgent
- If the condition doesn't hold true, then routing_key should be messageboard.messages.noturgent
- Use the channel.basic_publish() method to publish the message in both cases.
- In the handleMessage() method, read the input (messagetype) from the home.html form parameter.
- It can be messagetype = request.form['message-type']
- Call the publish_rabbitmq(new_message,blob_path,messagetype) method.
- It can be called after the insert_cosmos method invocation.
- Save the app.py code.
- Export the variables (BROKER_IP, BROKER_PORT, USERNAME, PASSWORD) with their values in the terminal.
- Start the RabbitMQ consumer created in the previous task in a new terminal.
- Change the queue name: queue_name = "urgent_messages_queue"
- Change the routing key: routing_key="messageboard.messages.urgent"
- Run the application and insert some messages.
- Now, you should see a message arriving in the queue and in the consumer terminal.
NB! Enable debugging to see error messages. If you use the flask command to start the program, add the --debug option to the command to enable it.
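The routing decision in publish_rabbitmq can be sketched as follows. This is a minimal illustration under stated assumptions, not the expected solution. Unlike the signature given in the steps, this version takes the channel and the exchange name as explicit extra parameters so that it can be tried without a live broker. The helper routing_key_for is a hypothetical name.

```python
import json

URGENT_KEY = "messageboard.messages.urgent"
NOT_URGENT_KEY = "messageboard.messages.noturgent"


def routing_key_for(messagetype: str) -> str:
    """Map the form value to a routing key; anything but 'urgent' is not urgent."""
    return URGENT_KEY if messagetype == "urgent" else NOT_URGENT_KEY


def publish_rabbitmq(new_message, blob_path, messagetype, channel, exchange):
    """Publish one message board entry and return the routing key that was used.

    `channel` and `exchange` are extra parameters assumed for this sketch.
    """
    message_to_send = {"content": new_message, "blob_path": blob_path}
    routing_key = routing_key_for(messagetype)
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=json.dumps(message_to_send),
    )
    return routing_key
```

Both keys match the binding messageboard.messages.* of all_messages_queue, while only the urgent key matches the binding of urgent_messages_queue, which gives the routing described at the start of this exercise.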
Take a screenshot from the RabbitMQ dashboard showing the statistics of urgent_messages_queue as shown below. You can reach this view by clicking on the queue name in the Queues tab. (Make sure your IP is visible)
Exercise 9.4. Process the message by notifying it via SMS using Twilio API
The objective of this task is to consume messages that arrive in the RabbitMQ exchange and notify the recipient via SMS using the Twilio API as shown in the figure.
- Create a Twilio Free trial account and set up Twilio credentials at console.
- Sign up for a free Twilio trial.
- You need a mobile phone number for this.
- Validate your mobile number
- Set up your account so that you get access to a Twilio virtual mobile number, user ID (TWILIO_ACCOUNT_SID), and Authentication Token (TWILIO_AUTH_TOKEN) in the Twilio console.
- These will be needed to send automated SMS.
- Install the twilio pip package.
- Extend the consumer.py code to send the SMS using the Twilio pip package.
- Import the Twilio client: from twilio.rest import Client
- Read the environment variables:
- account_sid = os.environ['TWILIO_ACCOUNT_SID'] and auth_token = os.environ['TWILIO_AUTH_TOKEN']
- Phone numbers: to_phone = os.getenv('tophone'), from_phone = os.getenv('fromphone')
- Set from_phone to the virtual number offered by Twilio.
- Set to_phone to your phone number, where the SMS has to arrive.
- Send the SMS by modifying the lab_callback method. After the print statement, add the code to send the SMS. Keep the rest of the code the same.
- Create a Twilio client: client = Client(account_sid, auth_token)
- Create a message and send it. The result is stored in a separate variable (sms) so that it does not overwrite the received message:
    sms = client.messages.create(
        body="High priority message for you: " + message['content'],
        from_=from_phone,  # '+13343731130'
        to=to_phone  # '+372562XXXX'
    )
- Print the Twilio message on the console: print(sms)
- Export the environment variables:
- TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, tophone, fromphone
- Broker details: username, password, broker_host
- Run the consumer
- Test the end-to-end tasks by inserting the messages in the Message Board Application.
Take a screenshot from the RabbitMQ dashboard showing the consumer connected under queue urgent_messages_queue. (Make sure your IP is visible)
Take a screenshot of the consumer terminal that contains the printed Twilio message details.
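The SMS step can be kept testable by passing the Twilio client into the callback instead of creating it inside. The sketch below illustrates this idea. The factory name make_sms_callback is hypothetical and the structure is a suggestion, not the required solution. It assumes that the object returned by client.messages.create has a sid attribute, which is the case for Twilio's message resource.

```python
import json


def make_sms_callback(client, from_phone, to_phone):
    """Return a Pika callback that sends one SMS per consumed message.

    `client` is expected to be a twilio.rest.Client instance, created with
    Client(account_sid, auth_token) as described in the steps above.
    """

    def lab_callback(ch, method, properties, body):
        message = json.loads(body.decode())
        print(message)
        sms = client.messages.create(
            body="High priority message for you: " + message["content"],
            from_=from_phone,
            to=to_phone,
        )
        print(sms.sid)  # identifier of the SMS created by Twilio
        # Acknowledge only after the SMS request has been accepted.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return lab_callback
```

In consumer.py the callback would then be registered with channel.basic_consume(queue=queue_name, on_message_callback=make_sms_callback(client, from_phone, to_phone)).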
Bonus task
PS! Make a screenshot of the message board application running with sample images shown on the web page.
Create another RabbitMQ consumer service that resizes the image whenever a new message arrives in all_messages_queue. This is similar to consumer.py, with some extra functions such as:
- A blob download function download_blob_to_file(blob_path):
- The blob_path can be taken from the message (JSON document) that arrived in the queue.
- You can get a hint for downloading the blob from here
- An image resize function resize_image(filename):
- You can use Pillow, a pip package for image-related functions such as resize. One such example is here
- Make a thumbnail of size (50,50).
- A blob upload function insert_blob(img_path) that uploads the blob with the new filename (for example originalfilename_small.jpg) and blob_path:
- This is similar to the function used in the earlier exercise to upload the blob.
- Invoke the functions in the lab_callback method:
- Get the JSON document (message)
- Get the blob_path
- Invoke download blob function
- Invoke resize function
- Invoke insert blob function
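The resize step could look roughly like the sketch below. It assumes that the image has already been downloaded to a local file. The helper small_name is a hypothetical name for building the _small filename. Pillow is imported inside resize_image, so small_name can be tried without Pillow installed. Image.thumbnail shrinks the image in place and keeps its aspect ratio, so the result fits within 50x50 rather than being exactly that size.

```python
import os


def small_name(filename: str) -> str:
    """Return the thumbnail filename, e.g. 'fognode.png' -> 'fognode_small.png'."""
    root, ext = os.path.splitext(filename)
    return f"{root}_small{ext}"


def resize_image(filename: str, size=(50, 50)) -> str:
    """Save a thumbnail next to the original file and return its filename."""
    from PIL import Image  # Pillow; imported here so small_name works without it

    out = small_name(filename)
    with Image.open(filename) as img:
        img.thumbnail(size)  # in place, keeps the aspect ratio
        img.save(out)
    return out
```

The returned filename can then be passed on to the upload function.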
Bonus task deliverables:
- Python code for the RabbitMQ Python consumer script
- A screenshot that shows at least one resized image displayed on the web page of your message board application. (Make sure your IP is visible)
Deliverables:
- Terminate your VM Instance
- Codes and screenshots from the following tasks:
- From 9.1 - 1 screenshot
- From 9.2 - 2 screenshots and code (producer and consumer).
- From 9.3 - 1 screenshot
- From 9.4 - 2 screenshots and code.