Institute of Computer Science
Cloud Computing (LTAT.06.008)

Cloud Computing 2024/25 spring

Practice 9 - Introduction to Message Broker - RabbitMQ

This practice session gives an overview of how message queues work and how messages are transmitted using the publish/subscribe mechanism. We use the open-source distributed message broker RabbitMQ. You will learn how to set up RabbitMQ in the cloud and how to create an exchange, queues, and routing rules. We will then use RabbitMQ in the message board application and process messages by sending an SMS using the Twilio API.

References

  1. More about RabbitMQ concepts: https://www.rabbitmq.com/tutorials/amqp-concepts.html
  2. RabbitMQ Python library, Pika documentation: https://pika.readthedocs.io/en/latest/intro.html
  3. Pika examples: https://pika.readthedocs.io/en/latest/examples.html

If you have problems:

  1. Check the possible problems and their potential solutions at the end of this guide.
  2. Ask in the Zulip channels.

Exercise 9.1. Setting up RabbitMQ in OpenStack

In this task, we set up RabbitMQ in an OpenStack VM using Docker CE. We then use the RabbitMQ administration interface to manage RabbitMQ entities and to publish and retrieve messages manually. This interface also helps us check whether data arrives at the broker and which queues, connections, and bindings are currently active.

Create an OpenStack VM instance:
  • Source: Ubuntu 22.04
  • Flavour: should be g4.r4c2
  • Don't forget to enable "Delete volume on instance deletion"!

Install Docker as we did in Lab 2

Setup RabbitMQ:
  • Create a RabbitMQ Container
    • We configure it to run in background mode (-d)
    • Set environment variables RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS
    • Port forward RabbitMQ container ports to host ports:
      • 5672:5672 (broker)
      • 80:15672 (RabbitMQ admin user interface)
    • RabbitMQ container image: rabbitmq:3-management
    • In the command, replace <LAST_NAME> with your last name and CUSTOM_PASSWORD with a password of your choice
    • The complete command should look like this:
      • docker run -d --hostname <LAST_NAME>-rabbit -p 5672:5672 -p 80:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=CUSTOM_PASSWORD --name <LAST_NAME>rabbit rabbitmq:3-management
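Once the container is running, a quick way to confirm that both published ports are reachable from your own machine is a plain TCP connection test. The following is only a minimal sketch: is_port_open and check_broker are hypothetical helper names, not part of the lab materials, and a successful connection only shows that something is listening on the port (and that the security group allows it), not that RabbitMQ is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts
        return False


def check_broker(host: str) -> dict:
    """Check the two host ports published by the docker run command above."""
    return {
        "broker (5672)": is_port_open(host, 5672),
        "admin UI (80)": is_port_open(host, 80),
    }
```

For example, print(check_broker("VM_IP")) with VM_IP replaced by your VM's address should report True for both ports. If a port is reported as closed, check the container status and the security group rules of the VM.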
Accessing RabbitMQ and working with message publishing and routing:
  • You can access the RabbitMQ administration interface at http://<VM_IP>:80 and log in using the credentials set while creating the container.
  • Get familiar with the different tabs available, such as:
    • Connections: It shows the connections established to the RabbitMQ server.
    • Channels: Information about all current channels between clients and the RabbitMQ server.
    • Exchanges: It lists the available exchanges. An exchange receives messages from producers. Users or applications can define bindings between exchanges and queues, which determine which messages are delivered to each queue.
    • Queues: It shows the queues which are already defined in the RabbitMQ server.
  • Now, let us create a new exchange. Click on Exchanges--> Add a new Exchange.
    • Name: <LAST_NAME> (use your last name; it should not contain punctuation or special characters)
    • Type: topic
    • Leave other details as default.
  • Let us publish a new message manually using the admin interface. For this, click on your newly created exchange and go to Publish message
    • Since no queue is bound to the exchange yet, the interface will report that the message was published but not routed.
    • Routing key: logs.warning
    • Payload: { "log_type":"Warning","Message": "Your system is low on memory ..."}
    • Leave other details as default.
  • Create a new queue. Click on Queues and Streams-->Add a new queue
    • Clients will be able to subscribe to this queue to listen to messages.
    • Name: all_logs
    • Leave other details as default.
  • Create a new Binding. Click on the newly created queue and then go to Bindings
    • Binding defines which data (based on routing key value/pattern) is routed from an exchange to a specific queue.
    • From exchange: Your (previously created) Exchange name
    • Routing key: logs.*
    • Leave other details as default.
  • The overall setup is now: your exchange (type topic), bound with routing key logs.* to the queue all_logs
  • Now, let us test by manually republishing the same message through the Exchanges page of the administration interface, as before. (NB! Do not do this through the Queues page)
  • You should NOT see a notice that the message was "not routed"
  • Check for the messages that arrived in the queue
    • Go to Queues and Streams--> all_logs-->Get Messages. You should see the message you published
  • Take a screenshot of the situation where you can see that the message was successfully retrieved from the queue. (NB! Please make sure that your RabbitMQ server IP is visible on the screenshot)
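To see why the binding key logs.* routes logs.warning to the queue, it can help to look at how topic patterns are matched. In a topic exchange, routing keys are words separated by dots, * matches exactly one word, and # matches zero or more words. The function below is only an illustrative re-implementation of that rule in Python (topic_matches is a hypothetical name); RabbitMQ performs this matching itself inside the broker.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches the topic binding pattern."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        # All pattern words consumed: match only if the key is consumed too
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more words of the routing key
            return any(match(i + 1, j2) for j2 in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            # '*' matches exactly one word; otherwise the words must be equal
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("logs.*", "logs.warning"))       # one word after "logs"
print(topic_matches("logs.*", "logs.warning.disk"))  # two words after "logs"
```

With the binding used in this exercise, logs.warning is delivered to all_logs, while a key such as logs.warning.disk would not be. A binding key of logs.# would accept both.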

Exercise 9.2. Publishing and Consuming the Messages using RabbitMQ

In this task, we learn to publish messages to RabbitMQ and consume messages from it using the Python library Pika.

Publish Messages:

Create a new Python program in the same way as in the previous practice session (for example, using the PyCharm IDE), or create it in the VM using nano. The program will follow the steps below to publish a message. Before writing the program, install the Pika library using pip (pip3 install pika) or the Python package manager in the PyCharm IDE.

  • Create a Python virtual environment and activate it.
  • You can choose producer.py as file name.
  • Install pika pip package
  • Import the libraries
    import pika
    import random
    import datetime
    import json
  • Set up the details of the connection:
    • Broker location: broker_host = "VM_IP" (replace VM_IP with your VM's IP address)
    • Broker port: broker_port = 5672
  • Configure authentication. Use the username and password you set when creating the RabbitMQ container to create a Pika PlainCredentials object:
    • username = "..."
      password = "..."
      credentials = pika.PlainCredentials(username, password)

  • Create a connection and a channel
    • connection = pika.BlockingConnection(
              pika.ConnectionParameters(host=broker_host, port=broker_port, credentials=credentials))
      channel = connection.channel()

    • Here, channel() creates a new channel through which we can send data to and receive data from the RabbitMQ broker.
    • Here, we use BlockingConnection, a blocking (synchronous) connection to RabbitMQ. This means that each call blocks until it completes. Note that basic_publish waits for the broker to confirm the message only if publisher confirms are enabled on the channel (channel.confirm_delivery()). You can also use SelectConnection for asynchronous communication.
  • Define a sample message to send using Python dictionaries
    •  message = {   
          "id": "f634f37d-218d-4f04-a5ab-8e61b14ceedd", 
          "content": "Parcel is arriving in 10min", 
          "img_path": "https://mywebsite.blob.core.windows.net/lab9/fognode.png",  
          "timestamp": "2024-04-08 09:20:57"
      }
      
    • Convert the message dictionary to a string value using the JSON library: message_str = json.dumps(message)
  • Finally, we will publish the data to the broker
    • First, set the routing key: routing_key = "messageboard.messages.urgent"
    • Define your exchange: exchange = "<your_exchange>" (replace <your_exchange> with the name of the exchange you created in the previous task)
    • Publish a message every five seconds until you interrupt the program from the keyboard. For example, call a sleep function for 5 seconds after each publish, or try out some other ideas.
channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=message_str)
  • At the end, close the channel and the connection using
channel.close()
connection.close()
  • Run the Python program to publish the messages; check for the live connection and channel in the admin web interface, and it should look like the screenshot below.
    • It may be necessary to leave the connections open while making the screenshot. Feel free to add some code (e.g., for sleep) to pause the script before you close the RabbitMQ session.
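The publishing steps above can be put together roughly as follows. This is a sketch under assumptions rather than a reference solution: build_message and publish_forever are hypothetical names, the message content is the sample from this guide, and the exchange is assumed to already exist (it was created in Exercise 9.1), so it is not declared again here. Pika is imported inside the function so that the message helper can be tried without a broker.

```python
import datetime
import json
import time
import uuid


def build_message(content: str, img_path: str) -> dict:
    """Build a message dictionary with a fresh id and the current timestamp."""
    return {
        "id": str(uuid.uuid4()),
        "content": content,
        "img_path": img_path,
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


def publish_forever(broker_host, username, password, exchange,
                    routing_key="messageboard.messages.urgent",
                    broker_port=5672, interval=5):
    """Publish one message every `interval` seconds until Ctrl+C is pressed."""
    import pika  # requires: pip3 install pika

    credentials = pika.PlainCredentials(username, password)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=broker_host, port=broker_port,
                                  credentials=credentials))
    channel = connection.channel()
    try:
        while True:
            body = json.dumps(build_message(
                "Parcel is arriving in 10min",
                "https://mywebsite.blob.core.windows.net/lab9/fognode.png"))
            channel.basic_publish(exchange=exchange,
                                  routing_key=routing_key, body=body)
            print("Published:", body)
            time.sleep(interval)
    except KeyboardInterrupt:
        print("Interrupted, closing the connection")
    finally:
        # Closing the connection also closes its channels
        if connection.is_open:
            connection.close()
```

To run it, call for example publish_forever("VM_IP", "admin", "CUSTOM_PASSWORD", "<your_exchange>") with your own values. While the loop is running, the connection and channel stay open, which is convenient for taking the screenshot.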

Take a screenshot of the situation where you can see the live connections in the RabbitMQ web interface. (NB! Please make sure that your IP is visible on the screenshot)

Subscribe to Messages:

Create a Python program that listens for messages from the RabbitMQ server and prints them out.

  • Make a copy of the previous Python program
    • Name it as consumer.py
  • Reuse the code up to the point where the channel object is created.
  • Create a queue:
    • Name the queue: queue_name = "all_messages_queue"
    • Create the queue: channel.queue_declare(queue=queue_name, durable=True). Here we use a durable queue, which survives a broker restart; messages are queued even if there is currently no listener.
  • Create a Routing key
    • Let us listen to all messages:
      • routing_key = "messageboard.messages.*"
  • Create a Binding between the Queue and Exchange:
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
  • Create a separate Python function that will be called later for each message. The function should print the contents of the incoming message and the routing key.
    def lab_callback(ch, method, properties, body):
        # Decode the JSON payload and print it together with its routing key
        message = json.loads(body.decode())
        print(method.routing_key, message)
        # Acknowledge the message so that RabbitMQ removes it from the queue
        ch.basic_ack(delivery_tag=method.delivery_tag)
  • We set the application to listen to the RabbitMQ queue and run our lab_callback function on every incoming message.
channel.basic_consume(queue = queue_name, on_message_callback = lab_callback)
channel.start_consuming()
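Put together, the consumer steps above could look like the sketch below. The names describe_delivery and consume are hypothetical and chosen only for this illustration; the queue name, routing key and callback follow the steps in this guide. Pika is imported inside consume so that the formatting helper can be tested without a broker.

```python
import json


def describe_delivery(routing_key: str, body: bytes) -> str:
    """Decode a JSON message body and format it together with its routing key."""
    message = json.loads(body.decode())
    return f"[{routing_key}] {message}"


def consume(broker_host, username, password, exchange,
            queue_name="all_messages_queue",
            routing_key="messageboard.messages.*", broker_port=5672):
    """Bind a durable queue to the exchange and print every incoming message."""
    import pika  # requires: pip3 install pika

    credentials = pika.PlainCredentials(username, password)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=broker_host, port=broker_port,
                                  credentials=credentials))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.queue_bind(exchange=exchange, queue=queue_name,
                       routing_key=routing_key)

    def lab_callback(ch, method, properties, body):
        print(describe_delivery(method.routing_key, body))
        # Acknowledge only after the message has been handled
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue=queue_name, on_message_callback=lab_callback)
    try:
        channel.start_consuming()  # blocks until interrupted
    except KeyboardInterrupt:
        channel.stop_consuming()
    finally:
        if connection.is_open:
            connection.close()
```

Because the message is acknowledged only at the end of the callback, a message whose processing raises an exception stays unacknowledged and is redelivered after the consumer disconnects.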
Testing publish and consume (subscribe) clients:

Now run both the publisher and the consumer programs in different terminals. The consumer should print the messages sent by the publisher:

Take a screenshot of both publisher and consumer terminals and save both codes for submission.

Exercise 9.3. Message Board Application and message routing

We will reuse the Message Board application developed during Practice Session 5. Here, we modify the app to filter messages and process them asynchronously when processing depends on an external API or on a compute-bound or I/O-bound task. We accept a message priority from the user, and if a message has higher priority (urgent), we notify the recipient by SMS. For this, we integrate the Twilio API to dispatch SMS notifications. Since dispatching an SMS requires a call to an external API, we use RabbitMQ worker queues to queue and handle (send an SMS for) high-priority messages.

In this task, we route high-priority messages into urgent_messages_queue and all messages into all_messages_queue. The consumer service that sends the SMS using the Twilio API is implemented in the next exercise.

  • Prepare the Message Board application for deployment on your local environment.
    • Bring the code submitted as a part of deliverables in Practice Session 05.
    • Create a storage account, a storage container, a Cosmos DB database, and its container in the Azure environment, as performed in Practice Session 05.
    • Export the environment variables CONN_KEY, STORAGE_ACCOUNT, COSMOS_URL, MasterKey.
      • PS! Don't forget to remove the APPSETTING_ prefix when accessing environment variables in app.py
      • Install pip requirements
    • Run and test the application.
  • Modify the home.html to accept the user priority of the message.
    • It can be a drop-down menu that accepts two selections (urgent, not-urgent)
    • You can use the HTML select element, for example:
       <label for="message-type">Choose message type:</label><br>
       <select name="message-type" id="message-type">
           <option value="urgent">urgent</option>
           <option value="not-urgent">not-urgent</option>
       </select><br>
  • Modify app.py to publish the messages to RabbitMQ.
    • Import the pika package
    • Read the environment variables needed to connect to the RabbitMQ broker using os.getenv()
      • BROKER_IP, BROKER_PORT, USERNAME, PASSWORD
    • Create Pika plain credentials as you did in Exercise 9.2.
    • Create a pika.BlockingConnection() as you did in Exercise 9.2.
    • Create a function publish_rabbitmq(new_message, blob_path, messagetype)
      • Create a channel: channel = connection.channel()
      • Set the exchange name: exchange = "<your_last_name>"
      • Create a JSON document as the payload for the message broker: message_to_send = {"content": new_message, "blob_path": blob_path}
      • Reuse the publish logic from the previous exercise, but choose the routing key based on the message type.
        • If messagetype == "urgent", the routing_key should be messageboard.messages.urgent
        • Otherwise, the routing_key should be messageboard.messages.noturgent
        • Use the channel.basic_publish() method to publish the message in both cases.
    • In the handleMessage() method, read the input (messagetype) from the home.html form parameter.
      • It can be messagetype = request.form['message-type']
      • Call the publish_rabbitmq(new_message, blob_path, messagetype) method.
        • It can be called after insert_cosmos method invocation.
    • Save the app.py code.
    • Export the variables (BROKER_IP, BROKER_PORT, USERNAME, PASSWORD) with their values in the terminal.
    • Start the RabbitMQ consumer created in the previous task in a new terminal.
      • Change the queue name: queue_name = "urgent_messages_queue"
      • Change the routing key: routing_key = "messageboard.messages.urgent"
    • Run the application and insert some messages.
    • Now, you should see a message arriving in the queue and in the consumer terminal.
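The routing decision in publish_rabbitmq can be sketched as below. Note one deliberate difference from the steps above, made only to keep the example self-contained: the connection and the exchange name are passed in as arguments here, whereas in app.py they would be module-level objects and the signature would stay publish_rabbitmq(new_message, blob_path, messagetype). The helper routing_key_for is a hypothetical name.

```python
import json

URGENT_KEY = "messageboard.messages.urgent"
NOT_URGENT_KEY = "messageboard.messages.noturgent"


def routing_key_for(messagetype: str) -> str:
    """Map the form value ('urgent' or 'not-urgent') to a routing key."""
    return URGENT_KEY if messagetype == "urgent" else NOT_URGENT_KEY


def publish_rabbitmq(connection, exchange, new_message, blob_path, messagetype):
    """Publish the message to the exchange with a priority-dependent routing key."""
    channel = connection.channel()
    message_to_send = {"content": new_message, "blob_path": blob_path}
    channel.basic_publish(exchange=exchange,
                          routing_key=routing_key_for(messagetype),
                          body=json.dumps(message_to_send))
    # Close the per-request channel; the connection itself stays open
    channel.close()
```

With this routing, a queue bound with messageboard.messages.urgent receives only urgent messages, while a queue bound with messageboard.messages.* receives both kinds.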

NB! Enable debugging to see error messages. If you use the flask command to start the program, add the --debug option to the command to enable it.

Take a screenshot from the RabbitMQ dashboard showing the statistics of urgent_messages_queue. You can find them by opening the page of the queue. (Make sure your IP is visible)

Exercise 9.4. Process the message by notifying it via SMS using Twilio API

The objective of this task is to consume messages that arrive in the RabbitMQ exchange and notify the recipient via SMS using the Twilio API as shown in the figure.

  • Create a Twilio free trial account and set up Twilio credentials in the console.
    • Sign up for a free Twilio trial.
    • You need a mobile phone number for this.
      • Validate your mobile number
    • Set up your account so that you get access to a Twilio virtual mobile number, user ID (TWILIO_ACCOUNT_SID), and Authentication Token (TWILIO_AUTH_TOKEN) in the Twilio console.
    • These will be needed to send automated SMS.
  • Install twilio pip package
  • Extend the consumer.py code to send the SMS using the Twilio pip package.
    • Import the Twilio client: from twilio.rest import Client
    • Read the environment variables (this requires import os)
      • account_sid = os.environ['TWILIO_ACCOUNT_SID'] and auth_token = os.environ['TWILIO_AUTH_TOKEN'].
      • Phone numbers: to_phone = os.getenv('tophone'), from_phone = os.getenv('fromphone')
        • Change the from_phone number to your virtual number offered by Twilio.
        • Change the to_phone number to your phone number, where the SMS has to arrive.
    • Send the SMS by modifying the lab_callback method. After the print statement, add the code to send the SMS. Keep the rest of the code the same.
      • Create a Twilio client: client = Client(account_sid, auth_token)
      • Create a message and send it.
# Use a separate name (sms) so the decoded RabbitMQ message is not overwritten
sms = client.messages.create(
        body="High priority message for you: " + message['content'],
        from_=from_phone,  # e.g. '+13343731130'
        to=to_phone  # e.g. '+372562XXXX'
    )
  • Print the Twilio message on the console: print(sms)
  • Export the environment variables
    • TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, tophone, fromphone
    • Broker details: username, password, broker_host
  • Run the consumer
  • Test the end-to-end tasks by inserting the messages in the Message Board Application.
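The SMS step can be isolated into small functions, which makes it easier to try out without sending real messages. The sketch below is one possible arrangement, not the required structure: build_sms_body, make_twilio_client and send_sms are hypothetical names, and the client is passed in as an argument so that a stand-in object can be used while testing. The Twilio import is placed inside make_twilio_client so that the other helpers work without the package installed.

```python
import os


def build_sms_body(message: dict) -> str:
    """Compose the SMS text from the decoded RabbitMQ message."""
    return "High priority message for you: " + message["content"]


def make_twilio_client():
    """Create a Twilio client from TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN."""
    from twilio.rest import Client  # requires: pip install twilio

    # os.environ[...] raises KeyError early if a variable was not exported
    return Client(os.environ["TWILIO_ACCOUNT_SID"],
                  os.environ["TWILIO_AUTH_TOKEN"])


def send_sms(client, message: dict, from_phone: str, to_phone: str):
    """Send the SMS through the given client and return the created message."""
    return client.messages.create(
        body=build_sms_body(message),
        from_=from_phone,
        to=to_phone,
    )
```

Inside lab_callback, this could be used as sms = send_sms(client, message, os.environ["fromphone"], os.environ["tophone"]) followed by print(sms.sid), with client created once at startup by make_twilio_client(). Acknowledging the RabbitMQ message only after send_sms returns means a failed SMS leaves the message unacknowledged.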

Take a screenshot from the RabbitMQ dashboard showing the consumer connected under queue urgent_messages_queue. (Make sure your IP is visible)

Take a screenshot of the consumer terminal that contains the printed Twilio message details.

Bonus task

PS! Make a screenshot of the message board application running with sample images shown on the web page.

Create another RabbitMQ consumer service that resizes the image whenever a new message arrives in all_messages_queue. This is similar to consumer.py with some extra functions, such as:

  1. Blob download function download_blob_to_file(blob_path)
    • The blob_path can be taken from the message (JSON document) that arrived in the queue.
    • You can get a hint on how to download a blob from here
  2. Image resize function resize_image(filename).
    • You can use Pillow, a pip package for image-related functions such as resize. One such example is here
    • Make a thumbnail of size (50,50)
  3. Blob upload function insert_blob(img_path), which uploads the blob with the new filename (for example originalfilename_small.jpg) and blob_path:
    • This is similar to the function used in the earlier exercise to upload the blob.
  4. Invoke the functions in lab_callback method
    • Get the JSON document-message
    • Get the blob_path
    • Invoke download blob function
    • Invoke resize function
    • Invoke insert blob function
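As a starting point for the resize step only, the following sketch uses Pillow's thumbnail method, which shrinks the image in place and keeps its aspect ratio. The helper small_name is a hypothetical name introduced here to derive the _small filename; downloading and uploading the blob are left out because they depend on how your application stores blob_path.

```python
import os


def small_name(filename: str) -> str:
    """Derive the thumbnail filename, e.g. photo.jpg -> photo_small.jpg."""
    root, ext = os.path.splitext(filename)
    return f"{root}_small{ext}"


def resize_image(filename: str, size=(50, 50)) -> str:
    """Save a thumbnail of `filename` next to it and return the new path."""
    from PIL import Image  # requires: pip install Pillow

    out = small_name(filename)
    with Image.open(filename) as img:
        # thumbnail() modifies the image in place and never enlarges it
        img.thumbnail(size)
        img.save(out)
    return out
```

Because thumbnail() preserves the aspect ratio, a non-square image ends up with its longer side at most 50 pixels rather than exactly 50x50.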

Bonus task deliverables:

  1. Python code for the RabbitMQ Python consumer script
  2. A screenshot that displays at least one image after it has been resized and displayed on the web page of your message board application. (Make sure your IP is visible)

Deliverables:

  • Terminate your VM Instance
  • Code and screenshots from the following tasks:
    1. From 9.1 - 1 screenshot
    2. From 9.2 - 2 screenshots and code (producer and consumer).
    3. From 9.3 - 1 screenshot
    4. From 9.4 - 2 screenshots and code.
The proprietary copyrights of educational materials belong to the University of Tartu. The use of educational materials is permitted for the purposes and under the conditions provided for in the copyright law for the free use of a work. When using educational materials, the user is obligated to give credit to the author of the educational materials.
The use of educational materials for other purposes is allowed only with the prior written consent of the University of Tartu.