Practice 10 - Introduction to Message Broker - RabbitMQ
The purpose of this practice session is to provide an overview of message queues and the transmission of messages using the publish/subscribe mechanism. We use the open-source distributed message broker RabbitMQ. You will learn how to set up RabbitMQ in the cloud and how to create exchanges, queues, and routing. Further, you will work with distributed processing by consuming messages and publishing the output.
References
- More about RabbitMQ concepts https://www.rabbitmq.com/tutorials/amqp-concepts.html
- RabbitMQ Python library (Pika) documentation: https://pika.readthedocs.io/en/latest/intro.html
- Pika examples: https://pika.readthedocs.io/en/latest/examples.html
If you have problems, check:
- Pinned messages in the #lab10-rabbitmq Slack channel.
- Possible problems and their potential solutions at the end of this guide.
- Ask in the #lab10-rabbitmq Slack channel.
Exercise 10.1. Setting up RabbitMQ
In this task, we set up RabbitMQ on an OpenStack VM using Docker CE. We then use the RabbitMQ administration interface to manage RabbitMQ entities and to walk through simple examples of sending and listening for data. This interface also helps us check whether data arrives at the broker and what the currently active queues, connections, and bindings are.
Create an OpenStack VM instance:
- Source: instead of an Image, use a Volume Snapshot; choose Ubuntu 20 + Docker 20.10. In this Ubuntu-based snapshot, the installation of Docker (as we did in Lab 2) has already been done for us.
- Flavour: should be m2.tiny
- Don't forget to enable "Delete volume on instance deletion"!
Setup RabbitMQ:
- Create a RabbitMQ container using the docker run command:
  - Run it in background (detached) mode
  - Set the environment variables RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS
  - Forward the container ports to host ports: 5672:5672 (broker) and 80:15672 (RabbitMQ admin user interface)
  - RabbitMQ container image: rabbitmq:3-management
- In the command, replace <LAST_NAME> with your last name and set CUSTOM_PASSWORD
- The complete command looks like:
docker run -d --hostname <LAST_NAME>-rabbit -p 5672:5672 -p 80:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=CUSTOM_PASSWORD --name <LAST_NAME>rabbit rabbitmq:3-management
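You can verify that the broker started before moving on:
docker ps
The rabbitmq:3-management container should be listed with status "Up"; the management interface may take a few seconds to become reachable after the container starts.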
Accessing RabbitMQ and working with publishing and routing:
- You can access the RabbitMQ administration interface at http://<VM_IP>:80 and log in using the credentials set while creating the container.
- Familiarize yourself with the different tabs available, such as:
  - Connections: shows the connections established to the RabbitMQ server.
  - Channels: information about all current channels.
  - Exchanges: lists the available exchanges. An exchange receives messages from producers and pushes them to queues. The exchange must know exactly what to do with a message it receives.
  - Queues: shows the queues for all or one selected host.
- Now, let us create an exchange. Click on Exchanges --> Add a new exchange
  - Name: LASTNAME (your last name; it should not contain punctuation or special characters)
  - Type: topic
  - Leave other details as default.
- Let us publish a new message manually using the admin interface. Click on your newly created exchange and go to "Publish message"
  - Routing key: delta.student.your_lastname.temperature
  - Payload: {"temperature": "24"}
  - Leave other details as default.
- Create a new queue. Click on Queues --> Add a new queue
  - Name: your_lastname_queue
  - Leave other details as default.
- Create a new binding. Click on the newly created queue and then go to Bindings
  - From exchange: your (previously created) exchange name
  - Routing key: delta.student.your_lastname.* (in a topic exchange, * matches exactly one word of the routing key, while # matches zero or more words, so this binding catches delta.student.your_lastname.temperature)
  - Leave other details as default.
- The overall Exchange, Binding, and Queue setup looks like this:
- Now, let us test by manually republishing the same message you created in the exchange through the administration interface, as before. (NB! Do not do this through the Queue page but through the Exchanges page.)
- Check for the messages that arrived in the queue
- Go to Queues --> Get Messages. You should see output like the example below.
- Take a screenshot of the situation where you can see that the message was successfully retrieved from the queue. (NB! Please make sure that your IP is visible in the screenshot.)
- After that, when you try to retrieve data from the queue again, the message should no longer be in the queue unless you have added more than one in the meantime.
Exercise 10.2. Publishing and Consuming the Messages using RabbitMQ
In this task, we are going to learn to publish and consume (subscribe to) messages to/from RabbitMQ using the Python Pika library.
Publish Messages:
Create a new Python program (same as in the previous tutorial, for example using the PyCharm IDE), or create it in the VM using nano. The program will use the following instructions to publish a message. Before writing the Python program, please install the Pika library using pip (pip3 install pika), or in the PyCharm IDE follow the Practice Session 8 instructions to add a Python library.
- Import the libraries
import pika
import random
import datetime
import json
- Set up the details of the connection:
  - Broker location: broker_host = "VM_IP" (replace VM_IP with your VM's IP address)
  - Broker port: broker_port = 5672
- Configure authentication. Here, we use the username and password set while creating the RabbitMQ container to create a Pika PlainCredentials object:
username = "..."
password = "..."
credentials = pika.PlainCredentials(username, password)
- Create the connection and channel:
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=broker_host, port=broker_port, credentials=credentials))
channel = connection.channel()
  - Here, channel() creates a new channel through which we can send data to and listen for data from the RabbitMQ broker.
  - We use BlockingConnection: a blocking, or synchronous, connection to RabbitMQ. This means that our program waits for confirmation that the sent data has reached the RabbitMQ exchange. You can also use SelectConnection for asynchronous communication.
- Define a sample message to send; we use a Python dictionary:
message = {
    "device_name": "your_last_name_sensor",
    "temperature": random.randint(20, 40),
    "time": str(datetime.datetime.now())
}
- Convert the message dict to a string value using the JSON library:
message_str = json.dumps(message)
- Finally, we will publish the data to the broker:
  - First, set the routing key:
routing_key = "iotdevice.your_last_name.tempsensor"
  - Define your exchange:
exchange = "your_exchange"
(replace your_exchange with the exchange you created in the previous task)
  - Publish a message every five seconds; the program should keep sending messages until you interrupt it with the keyboard. Use the sleep function to wait 5 seconds after each publish, or try out your own ideas:
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message_str)
- At last, close the channel and connection:
channel.close()
connection.close()
- Run the Python program to publish the messages. Check for the live connection and channel in the admin web interface; it should look like the screenshot below.
Take a screenshot of the situation where you can see the live connections in the RabbitMQ web interface. (NB! Please make sure that your IP is visible in the screenshot.)
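For reference, here is the whole publisher assembled from the steps above as a minimal sketch; the broker address, credentials, exchange name, and routing key are the placeholder values from this exercise and must be replaced with your own:

import datetime
import json
import random
import time

import pika

broker_host = "VM_IP"  # replace with your VM's IP address
broker_port = 5672
credentials = pika.PlainCredentials("admin", "CUSTOM_PASSWORD")

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=broker_host, port=broker_port, credentials=credentials))
channel = connection.channel()

exchange = "your_exchange"  # the exchange created in Exercise 10.1
routing_key = "iotdevice.your_last_name.tempsensor"

try:
    while True:  # keep publishing until interrupted with Ctrl+C
        message = {
            "device_name": "your_last_name_sensor",
            "temperature": random.randint(20, 40),
            "time": str(datetime.datetime.now()),
        }
        channel.basic_publish(exchange=exchange, routing_key=routing_key,
                              body=json.dumps(message))
        print("Published:", message)
        time.sleep(5)  # wait five seconds between messages
except KeyboardInterrupt:
    channel.close()
    connection.close()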
Consume (Subscribe) Messages:
Create a Python program that listens for messages from the RabbitMQ server and prints them out.
- Make a copy of the previous Python program
- We will reuse the code up to the point where the channel object is created.
- Create a queue:
  - Name the queue:
queue_name = "last_name_queue"
  - Create the queue:
channel.queue_declare(queue=queue_name, durable=True)
Here we use a durable queue, so messages stay queued even if there is currently no listener.
- Create a routing key:
  - Let us listen to all temperature sensors under iotdevice, not just yours:
routing_key = "iotdevice.*.tempsensor"
(NB! Because students all have separate RabbitMQ servers, you don't actually see each other's data here.)
- Create a Binding between the Queue and Exchange:
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
- Create a separate Python function that will be called later for each message. The function should print the contents of the incoming message and the routing key:
def lab_callback(ch, method, properties, body):
    print("Received [%s]: %r" % (method.routing_key, body.decode()))
    ch.basic_ack(delivery_tag=method.delivery_tag)
- We set the application to listen to the RabbitMQ queue and run our lab_callback function on every incoming message:
channel.basic_consume(queue=queue_name, on_message_callback=lab_callback)
channel.start_consuming()
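Assembled, the consumer looks like the following minimal sketch (same placeholder connection values as the publisher):

import json

import pika

broker_host = "VM_IP"  # replace with your VM's IP address
broker_port = 5672
credentials = pika.PlainCredentials("admin", "CUSTOM_PASSWORD")

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=broker_host, port=broker_port, credentials=credentials))
channel = connection.channel()

exchange = "your_exchange"  # the exchange created in Exercise 10.1
queue_name = "last_name_queue"
routing_key = "iotdevice.*.tempsensor"

# durable queue: messages are kept even when no consumer is connected
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

def lab_callback(ch, method, properties, body):
    # print the routing key and message contents, then acknowledge the message
    print("Received [%s]: %r" % (method.routing_key, body.decode()))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=queue_name, on_message_callback=lab_callback)
channel.start_consuming()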
Testing publish and consume (subscribe) clients:
Now run both the publish and consume Python programs in different terminals; they should show output like the example below.
Take a screenshot of both the publisher and consumer terminals, and save both programs for submission.
Exercise 10.3. Message Processing and Routing
This task will help you explore how to subscribe to messages, process them, and publish and route them to other output queues. For this, we will use real IoT data.
Dataset description:
The IoT data used in this task comes from the Puhatu Monitoring System: wetland environmental monitoring using IoT devices with a set of sensors (such as air and water temperature, bog height measured with a distance sensor, air and water pressure, and so on) in the northern part of the Puhatu Nature Protection Area (NPA), NW Estonia, next to an open-pit oil-shale quarry.
You can download the dataset: puhatu.csv. There are eleven IoT devices, each providing five sensor readings. Four devices were test devices located in the laboratory and the other seven were on site. Data was recorded at certain time intervals and also includes metadata such as the current status of the IoT device (e.g., current battery utilization).
The devices with dev_id fipy_e1, fipy_b1, fipy_b2, fipy_b3 were indoor (located in the laboratory), and those with dev_id puhatu_b1, puhatu_b2, puhatu_b3, puhatu_c1, puhatu_c2, puhatu_c3, and puhatu_l1 were outdoor (located at the monitoring site).
Dataset attributes (column names): the original table lists the sensor data fields in the left column and a few metadata fields in the right column (see puhatu.csv for the full set of columns).
In this exercise, we will mimic the sensor behavior and publish the sensor data to the exchange. Further, we write a new Python application that listens for messages from the input queue, runs a user-defined function (to parse the raw sensor data, extract the required fields, and tag the device as either a test device or an on-bog device using the device id), and publishes the results to other output queues. The overall scenario to be designed in this exercise is shown below:
Publish the IoT data to Exchange:
Create a Python program to publish the IoT data by reading it from a CSV file, one row every 2 seconds.
- Make a copy of the previous publisher Python program
- We will add our code after the line:
channel = connection.channel()
- Let us create a queue for the Puhatu raw data and bind it to the exchange:
exchange = "your_exchange_name"
queue_name = "puhatu_queue"
output_routing_key = "puhatu.#"
# Create new queue
channel.queue_declare(queue=queue_name, durable=True)
# Bind the exchange and queue with the routing key
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=output_routing_key)
- Read the CSV using a pandas DataFrame. (Please install the pandas pip package if it is not present.)
- Convert the DataFrame rows into JSON documents:
messages = json.loads(df.to_json(orient="records"))
- Define the routing_key and exchange:
routing_key = "puhatu.last_name.raw"
exchange = "your_exchange"
(replace with the exchange created earlier)
- Write the code to publish the messages every 2 seconds (assembled in the sketch below):
  - Iterate over range(len(messages))
  - Dump each message using json, like:
message = json.dumps(messages[i])
  - Use basic_publish to publish the message
  - Write a print statement like print("Message published to the exchange", exchange)
  - Wait for 2 seconds
  - Close the channel and connection.
- After running the program, each row of the CSV is published as a JSON document to the RabbitMQ exchange
- You can see the messages in the admin interface under puhatu_queue, as shown below
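Assembled, the CSV publisher looks like the following minimal sketch (it assumes puhatu.csv is in the working directory and reuses the placeholder connection values from Exercise 10.2):

import json
import time

import pandas as pd
import pika

broker_host = "VM_IP"  # replace with your VM's IP address
credentials = pika.PlainCredentials("admin", "CUSTOM_PASSWORD")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=broker_host, port=5672, credentials=credentials))
channel = connection.channel()

exchange = "your_exchange_name"  # the exchange created in Exercise 10.1
queue_name = "puhatu_queue"
routing_key = "puhatu.last_name.raw"

# create the raw-data queue and bind it to the exchange
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key="puhatu.#")

# read the CSV and convert each row into a JSON document
df = pd.read_csv("puhatu.csv")
messages = json.loads(df.to_json(orient="records"))

for i in range(len(messages)):
    message = json.dumps(messages[i])
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)
    print("Message published to the exchange", exchange)
    time.sleep(2)  # publish one row every 2 seconds

channel.close()
connection.close()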
Take a screenshot of the publisher as shown above. (Make sure your VM IP is visible.)
Subscribe, process the messages (raw IoT data), and publish them back to the exchange:
Here, we consume the IoT data published above, process each message to extract the dev_id field, determine whether the IoT device is an indoor or outdoor device, tag the message with the field dev_location set to indoor or outdoor, and publish the data again so it arrives in another queue (tag_queue).
- Make a copy of the previous subscriber Python program from Exercise 10.2
- Update the lab_callback method with the following (see the sketch after this list):
  - Define the routing key for the output message:
output_routing_key = "tag." + str(method.routing_key)
  - Get the message and decode it:
message = json.loads(body.decode())
  - Use an if statement to check whether message['dev_id'] is found in indoor = ['fipy_e1', 'fipy_b1', 'fipy_b2', 'fipy_b3'] or else in outdoor = ['puhatu_b1', 'puhatu_b2', 'puhatu_b3', 'puhatu_c1', 'puhatu_c2', 'puhatu_c3', 'puhatu_l1'].
  - Tag the message with dev_location set to outdoor or indoor based on the result of the if, like:
parsed_message = {"message": message, "dev_location": "outdoor"}
  - Then dump the JSON document to send (outside the if):
parsed_message_string = json.dumps(parsed_message)
  - Send back the result:
ch.basic_publish(exchange=exchange, routing_key=output_routing_key, body=parsed_message_string)
- Let's change the rest of the setup code in the program:
  - We make sure a new, durable output queue (tag_queue) exists where the tagged messages will arrive:
queue_name = "puhatu_queue"
output_routing_key = "tag.#"
output_queue_name = "tag_queue"
channel.queue_declare(queue=output_queue_name, durable=True)
channel.queue_bind(exchange=exchange, queue=output_queue_name, routing_key=output_routing_key)
Testing the end-to-end system:
- Run the publisher in one terminal
- Run the tagging subscriber in another terminal
- Now, you should see the queued and printed messages in the administrative dashboard
- Under Queues --> tag_queue --> Get Messages
Take a screenshot as shown above. (Make sure your VM IP is visible.)
Exercise 10.4. Filtering and Alert
In this exercise, we will filter the messages from tag_queue based on dev_location and check for anomalies in the sensor data using fixed thresholds on the outdoor-tagged messages. If an anomaly is present, the program will print a message.
You need to implement the overall scenario as shown below:
Subscribe to the tagged data from tag_queue, process it, and publish the output:
Here, consume tag_queue and process each message to filter the data based on dev_location and send it to the indoor and outdoor queues.
- Copy and modify the Python program from Ex. 10.3 (Subscribe, process the messages (raw IoT data), and publish them back to the exchange).
- Change the logic of the lab_callback method (see the sketch after this list):
  - Get the message and decode it:
message = json.loads(body.decode())
  - Inside an if statement: if message['dev_location'] is indoor, set output_routing_key = "indoor." + str(method.routing_key), otherwise set output_routing_key = "outdoor." + str(method.routing_key)
  - Then dump the JSON document to send:
parsed_message_string = json.dumps(message)
  - Send back the result:
ch.basic_publish(exchange=exchange, routing_key=output_routing_key, body=parsed_message_string)
- Let's change the rest of the setup code in the program:
queue_name = "tag_queue"
# Indoor
channel.queue_declare(queue="indoor_queue", durable=True)
channel.queue_bind(exchange=exchange, queue="indoor_queue", routing_key="indoor.#")
# Outdoor
channel.queue_declare(queue="outdoor_queue", durable=True)
channel.queue_bind(exchange=exchange, queue="outdoor_queue", routing_key="outdoor.#")
- Now run the program (the assembled sketch is below)
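Assembled, the filtering subscriber looks like the following minimal sketch (placeholder connection values as before):

import json

import pika

broker_host = "VM_IP"  # replace with your VM's IP address
credentials = pika.PlainCredentials("admin", "CUSTOM_PASSWORD")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=broker_host, port=5672, credentials=credentials))
channel = connection.channel()

exchange = "your_exchange_name"  # the exchange created in Exercise 10.1
queue_name = "tag_queue"

# declare and bind the two output queues
channel.queue_declare(queue="indoor_queue", durable=True)
channel.queue_bind(exchange=exchange, queue="indoor_queue", routing_key="indoor.#")
channel.queue_declare(queue="outdoor_queue", durable=True)
channel.queue_bind(exchange=exchange, queue="outdoor_queue", routing_key="outdoor.#")

def lab_callback(ch, method, properties, body):
    # route each tagged message to the indoor or outdoor queue
    message = json.loads(body.decode())
    if message['dev_location'] == "indoor":
        output_routing_key = "indoor." + str(method.routing_key)
    else:
        output_routing_key = "outdoor." + str(method.routing_key)
    ch.basic_publish(exchange=exchange, routing_key=output_routing_key,
                     body=json.dumps(message))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=queue_name, on_message_callback=lab_callback)
channel.start_consuming()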
Take a screenshot of the queues in the RabbitMQ admin interface.
Consume the outdoor_queue and process the messages to find anomalies:
Here, write a subscriber program that finds anomalies in the sensor data and displays the anomalous data in the terminal.
- Consider two sensors: dist and air_Temp_float.
- Use outdoor_queue as the input queue.
- The fields can be accessed from the message as message['message']['dist']
- A dist value > 70 is considered an anomaly, as is air_Temp_float > 25.
- The final output displayed in the terminal should look like "The IoT device puhatu_b1 has anomaly data" (a sketch follows below).
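A minimal sketch of the anomaly detector is shown below; the None checks are an assumption, since rows with missing sensor readings decode as null:

import json

import pika

broker_host = "VM_IP"  # replace with your VM's IP address
credentials = pika.PlainCredentials("admin", "CUSTOM_PASSWORD")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=broker_host, port=5672, credentials=credentials))
channel = connection.channel()

queue_name = "outdoor_queue"

def lab_callback(ch, method, properties, body):
    # flag a message as anomalous if either sensor exceeds its threshold
    message = json.loads(body.decode())
    dist = message['message']['dist']
    air_temp = message['message']['air_Temp_float']
    if (dist is not None and dist > 70) or (air_temp is not None and air_temp > 25):
        print("The IoT device", message['message']['dev_id'], "has anomaly data")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=queue_name, on_message_callback=lab_callback)
channel.start_consuming()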
Take a screenshot of the anomaly data displayed in the terminal.
Exercise 10.5. Distributed system testing
Let's put all the previous components of the Puhatu IoT data use case together:
- Run the 10.3 publisher program to generate the data (generate a message every second)
  - Generated data routing key pattern: puhatu.last_name.raw
- Run the 10.3 subscribe-and-tag program (run two processes in parallel)
  - Tagged data routing key pattern: tag.puhatu.last_name.raw
- Run the 10.4 program to consume the tagged data and filter the indoor and outdoor devices (run two processes in parallel)
  - Filtered data routing key patterns: indoor.tag.puhatu.last_name.raw and outdoor.tag.puhatu.last_name.raw
- Run the anomaly identification program from 10.4 for the outdoor devices
  - The output should be displayed in the terminal
Although we currently use the same computer to run all of these programs, it doesn't really matter which computer they run on; they work exactly the same way. To increase the speed of data processing, we can scale the number of processes created in Exercises 10.3 and 10.4. Take screenshots of the final result. Try to make sure that all the started processes (and their outputs) are visible.
Deliverables:
- Terminate your VM Instances
- Screenshots from the following tasks :
- Screenshots from 10.1
- From 10.2 - 2 screenshots and code(producer and consumer).
- From 10.3 - 2 screenshots and codes.
- From 10.4 - 2 screenshots and codes.
- From 10.5 - 1 screenshot