Practice 14 - Redesigning the Message Board Application using open-source tools
In this lab, you will work with two open-source storage systems: a JSON document database (MongoDB) and a blob store (MinIO). Further, we will deploy and execute time-consuming tasks in an asynchronous and resilient manner using a distributed task queue built from Celery and RabbitMQ. Using these tools, you will redesign the Message Board application in a cloud-native way.
References
- Celery: https://docs.celeryq.dev/en/stable/index.html
- RabbitMQ: https://www.rabbitmq.com/
- Flower: https://flower.readthedocs.io/en/latest/
- MongoDB: https://www.mongodb.com/
- MinIO: https://min.io/
Exercise 14.1 Setting up the Prerequisites
- Create a VM in the OpenStack cloud with:
  - Instance Snapshot: `Ubuntu22+Docker`
  - Enable "Delete Volume on Instance Delete"
  - Flavour: `m2.tiny`
  - Volume Size (GB): 20
  - Add the security group `lab142024` along with `default`.
- SSH into the VM and copy over the Message Board application code you submitted as the deliverable of Practice Session 9 - RabbitMQ.
  - You can use your own approach to modify the code on the remote machine.
  - Using Visual Studio Code makes it easier to edit and modify the code on the remote machine.
- Modify `app.py`:
  - Remove the RabbitMQ (pika) imports.
  - Remove the `publish_rabbitmq` definition and its invocation in `handle_message()`.
- Similarly, update `requirements.txt`.
Exercise 14.2 Working with a document (JSON) database: MongoDB
In this task, you will replace the Azure Cosmos DB database with MongoDB in your Message Board application. MongoDB stores data records as BSON documents; BSON is a binary representation of JSON documents.
- Run MongoDB in a Docker container:
  - Run in detached mode
  - Assign the name `mongodb`
  - Volume mount (`-v`): `-v ${HOME}/mongo-data:/data/db`
    - Create the directory `mongo-data` first
  - Port forwarding: `27017:27017`
  - Environment variables to set the username and password: `MONGO_INITDB_ROOT_USERNAME=sample-db-user`, `MONGO_INITDB_ROOT_PASSWORD=sample-password` (note: the official image expects the `MONGO_INITDB_*` names)
  - Image: `mongo:latest`
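Assembled into a single command, the run step might look like this (a sketch; adjust the credentials and data path to your setup):

```bash
# Create the host directory for MongoDB data, then start the container
mkdir -p ${HOME}/mongo-data
docker run -d --name mongodb \
  -v ${HOME}/mongo-data:/data/db \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=sample-db-user \
  -e MONGO_INITDB_ROOT_PASSWORD=sample-password \
  mongo:latest
```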
- Modify `app.py` to insert JSON documents (messages):
  - Delete the Azure Cosmos imports and all the variables related to Cosmos.
  - We will use the PyMongo pip library for this task. Import pymongo and create a Mongo client, for example:
    - MongoDB client: `mongo_client = MongoClient("localhost", 27017)`
    - Create a database: `db = mongo_client.flask_db`
    - Create a document collection: `messages = db.messages`
- Now, write a method to insert the documents, similar to `insert_cosmos()`:
  - Basically, you can rename the method.
  - The arguments remain the same.
  - You can use `messages.insert_one(new_message)` to insert a document into MongoDB.
- Similarly, modify the method `read_cosmos` to read the documents from MongoDB:
  - Get the latest 10 documents (messages). For this, you can chain the query methods (see the sketch after this list):
    - First, use `find` to get the records.
    - Use `sort` to sort by a key, e.g. `sort([('timestamp', 1)])`.
    - Limit to 10 records using `limit`.
    - Get them into a Python list using `list(db.messages...)`.
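A minimal sketch of the two PyMongo helpers, assuming the client setup above and a message dict that carries a `timestamp` field (the name `read_mongo`, the `blob_path` handling, and the exact schema are assumptions; note that a descending sort returns the newest messages first):

```python
from pymongo import MongoClient

mongo_client = MongoClient("localhost", 27017)  # containerized MongoDB from the step above
# add username/password arguments here if your MongoDB requires authentication
db = mongo_client.flask_db                      # database
messages = db.messages                          # document collection

def insert_mongo(new_message, blob_path):
    # Store the blob location alongside the message, then insert the document
    new_message["blob_path"] = blob_path
    messages.insert_one(new_message)

def read_mongo():
    # Latest 10 messages: sort by timestamp (descending -> newest first), limit to 10
    return list(messages.find().sort([("timestamp", -1)]).limit(10))
```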
- Test the Message Board application.
  - Make sure you modified the following in `app.py`:
    - Comment out the `insert_blob(img_path)` invocation in `handle_message()`, along with its declared variables and imports.
    - The `blob_path` argument could be `img_path` for now in `insert_mongo()`.
    - Call the method `insert_mongo` instead of `insert_cosmos` in `handle_message()`.
  - Create a virtual environment and activate it.
  - Install the pip requirements.
  - Run the application and insert some messages.
- You can connect to the MongoDB shell:
  - Exec into the `mongodb` container with bash.
  - Inside the container, type `mongosh`.
  - Write queries to check your data, for example (see the sketch below):
    - List the databases
    - Switch to your database
    - List the documents in the `messages` collection
    - Query the documents where a certain JSON key equals a value, for example username="Shiva".
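The session could look like this (a sketch; the credentials come from the docker run step, and the database name `flask_db` from the client setup above):

```bash
# From the VM: open a shell inside the MongoDB container
docker exec -it mongodb bash
# Inside the container: start the shell (pass -u/-p if you set root credentials)
mongosh -u sample-db-user -p sample-password
```

```javascript
show dbs                                 // list databases
use flask_db                             // switch to your database
db.messages.find()                       // list the documents in the messages collection
db.messages.find({ username: "Shiva" })  // documents where username equals "Shiva"
```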
Screenshot1: Take a screenshot of the terminal (mongodb shell) showing the output of the queries.
Exercise 14.3 Working with blob storage: MinIO
In this task, you will replace the Azure Blob Storage service with MinIO in your Message Board application. MinIO is a high-performance, S3-compatible object store, built for large-scale data lake and database workloads.
- Run a single-node MinIO server in a Docker container:
  - Run in detached mode
  - Ports: `9000:9000` and `9001:9001`
  - Environment variables: `MINIO_ROOT_USER=username` and `MINIO_ROOT_PASSWORD=password`
  - Volume mount: `${HOME}/minio/data:/data`
    - PS! Make sure to create the `${HOME}/minio/data` directories first.
  - Image and start command: `quay.io/minio/minio server /data --console-address ":9001"`
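Assembled into one command, it might look like this (a sketch; the container name `minio` is our choice, and you should pick your own credentials):

```bash
# Create the host data directory, then start a single-node MinIO server
mkdir -p ${HOME}/minio/data
docker run -d --name minio \
  -p 9000:9000 -p 9001:9001 \
  -e MINIO_ROOT_USER=username \
  -e MINIO_ROOT_PASSWORD=password \
  -v ${HOME}/minio/data:/data \
  quay.io/minio/minio server /data --console-address ":9001"
```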
- You can access the MinIO web interface at http://VM_IP:9001
  - Log in using your username and password.
- Create two buckets, `rawimages` and `processedimages`:
  - Administrator --> Buckets --> Create Bucket
- Create the access keys needed to work with MinIO storage through the Python SDK:
  - Go to User --> Access Keys --> Create Access Keys
  - Set an expiry date of your own and provide a name.
  - Then click "Download for import" to save the keys.
- You can also access MinIO through the MinIO shell client `mc`:
  - Exec into the minio container and use the `mc` client as below.
    - Provide your VM IP, accessKey, and secretKey: `docker exec -it container_name mc alias set minio http://VM_IP:9000 accessKey secretKey`
    - List the buckets: `mc ls minio`. It should list your two buckets.
    - Set anonymous access policies to allow downloading the images from both buckets, using `mc anonymous set download minio/rawimages` and similarly for `processedimages`.
- Add `minio` to requirements.txt and install the pip requirements.
- Now, let us modify `app.py` to store images in MinIO buckets. Here, we will use the `minio` pip library; you can refer to the guide here for information on the Python SDK. (A combined sketch follows this list.)
  - Delete all the azure.storage imports and their declared variables.
  - Import the MinIO library.
  - Declare the MinIO client as `minio_client`, as described in the guide:
    - Replace `"play.min.io"` with `VM_IP:9000`
    - Replace the access_key and secret_key values. PS! Don't put the values in directly; you may access them through environment variables.
    - Add one more argument: `secure=False`
- Modify the method `insert_blob(img_path)`:
  - Keep the filename variable.
  - Use `minio_client.fput_object` to upload the image to the `rawimages` bucket. (You can refer to the guide for the complete statement.)
    - bucket_name should be `"rawimages"`
    - destination_file should be `filename`
      - It should contain only the file name, for example `sample.png`, without any path.
    - source_file should be `img_path`.
- Updates in `handle_message()`:
  - The `blob_path` value should be `'http://VM_IP:9000/'+"rawimages"+"/"+img_path.split('/')[-1]`
  - Use `blob_path` as an argument in `insert_mongo()`.
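Put together, the MinIO-related parts of `app.py` might look like the following sketch (the environment variable names `MINIO_ACCESS_KEY`/`MINIO_SECRET_KEY` are our assumption; use the access keys you created above and replace VM_IP):

```python
import os
from minio import Minio

# MinIO client: plain HTTP (secure=False), credentials taken from environment variables
minio_client = Minio(
    "VM_IP:9000",
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,
)

def insert_blob(img_path):
    # The object name must be the bare file name, without any path
    filename = img_path.split('/')[-1]
    # fput_object(bucket_name, destination_file, source_file)
    minio_client.fput_object("rawimages", filename, img_path)

# In handle_message():
#   blob_path = 'http://VM_IP:9000/' + "rawimages" + "/" + img_path.split('/')[-1]
#   insert_mongo(new_message, blob_path)
```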
- Run the Flask application and insert some messages.
  - You should be able to see the images in the `rawimages` bucket in MinIO.
Screenshot2: Take a screenshot of the MinIO web page showing the list of two buckets. (Make sure your IP is visible)
Exercise 14.4 Distributed task queue management system with Celery
In Practice Session 9 - RabbitMQ, we used the Twilio API to dispatch SMS notifications for messages tagged as 'urgent'. We designed a RabbitMQ consumer to handle SMS dispatching asynchronously, preventing delays in the Message Board application. However, developers must keep an eye on the consumers and processing tasks to check their status. To enhance resilience and streamline such operations, Celery comes into play.
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
Celery employs task queues to distribute work across threads or machines. Tasks, the units of work, are placed into these queues, and dedicated worker processes continuously monitor the queues for new tasks. Celery communicates via messages, usually mediated by a broker (such as RabbitMQ) between clients and workers. Clients add messages to the queue to initiate tasks, which the broker then delivers to workers.
Celery configuration requires two endpoints: CELERY_BROKER and CELERY_RESULT_BACKEND. CELERY_BROKER is the endpoint of the message broker, while CELERY_RESULT_BACKEND is the endpoint where the Celery tasks' results are stored. In our scenario, we use RabbitMQ both as the message broker and as the storage for the tasks' results.
- Set up the RabbitMQ broker in a Docker container:
  - Keep the name `rabbitmq`
  - Run in detached mode
  - Ports: `15672:15672` and `5672:5672`
  - Environment variables: `RABBITMQ_DEFAULT_USER=rabbit_user` and `RABBITMQ_DEFAULT_PASS=rabbit_password`
  - Image: `rabbitmq:3.9.13-management`
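As a single command, this might look like:

```bash
docker run -d --name rabbitmq \
  -p 15672:15672 -p 5672:5672 \
  -e RABBITMQ_DEFAULT_USER=rabbit_user \
  -e RABBITMQ_DEFAULT_PASS=rabbit_password \
  rabbitmq:3.9.13-management
```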
- Getting ready to use Celery in `app.py`. A sample guide to start using celery in Python is here.
  - Import the celery pip library.
  - Define a CELERY_BROKER variable with the value `'pyamqp://rabbit_user:rabbit_password@VM_IP//'`, and similarly `CELERY_RESULT_BACKEND = 'rpc://rabbit_user:rabbit_password@VM_IP//'`.
    - PS! Change the VM_IP.
  - Declare a celery client that takes the two endpoints as arguments: `celery = Celery(broker=CELERY_BROKER, backend=CELERY_RESULT_BACKEND)`.
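In `app.py`, this setup amounts to a few lines (a sketch; remember to replace VM_IP):

```python
from celery import Celery

# RabbitMQ acts as both the message broker and the result backend
CELERY_BROKER = 'pyamqp://rabbit_user:rabbit_password@VM_IP//'
CELERY_RESULT_BACKEND = 'rpc://rabbit_user:rabbit_password@VM_IP//'

celery = Celery(broker=CELERY_BROKER, backend=CELERY_RESULT_BACKEND)
```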
- Let us run a Celery worker:
  - Open another terminal, move to the project directory, and activate the virtual environment.
  - Add `celery` to the pip requirements and install it through pip install.
  - Let us install watchdog: `sudo apt install python3-watchdog`.
    - We use this to watch for change events in the application files, so that the celery worker adapts to the changes. This way we don't need to restart the worker every time `app.py` changes.
  - Now run the celery worker: `watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A app.celery worker --loglevel=info`
    - Watchdog checks for changes to .py files in the directory given by `--directory=./`; this mainly catches changes in `app.py`.
    - The celery command runs the worker, registering the tasks defined in `app.py` via `app.celery`.
    - Finally, all the logs are printed to the terminal.
  - After running the command, you should see the worker running. (PS! Keep this terminal open with the command running.)
- Let us create a celery task in `app.py`. A sample guide to start using celery is here.
  - Move to `app.py`.
  - Let us write a simple `Test()` celery task:
    - Define a celery task with `@celery.task(bind=True)`.
      - This decorator should be applied to a `Test()` method.
    - Write a method `Test(self, message)` that takes two arguments. Here, `self` provides task information such as the task_id and other task status parameters.
      - This method should print the message.
      - The method should have a return statement that returns `{"success": "ok", "Task Id": self.request.id}`.
    - The code looks roughly like this (a sketch assembled from the steps above):
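```python
@celery.task(bind=True)
def Test(self, message):
    # bind=True exposes the task instance as `self`, giving access to self.request
    print(message)
    return {"success": "ok", "Task Id": self.request.id}
```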
- To invoke it as a celery task, call `Test.delay(message)`; you can add this statement in the handle_message() method. (For bound tasks, `self` is supplied by Celery, so the caller passes only `message`.) `.delay()` is a "Calling API" used by celery task instances.
- Save `app.py`, and you should now see the celery worker terminal pick up the updates from `app.py`.
- Now run the Message Board application and insert a message.
  - You should see logs in the worker terminal showing the execution of the `Test()` celery task.
- You can also check the messages in the RabbitMQ dashboard:
  - Go to http://VM_IP:15672 and use your RabbitMQ username and password.
  - Go to the Queues section --> open a queue that has some messages --> Get Message. You should see the result of the celery task, because we used RabbitMQ as the result backend (CELERY_RESULT_BACKEND).
Monitoring celery tasks and workers becomes a troublesome activity once the application grows complex. This is where the Flower tool comes into play. Flower is a web-based tool for monitoring and administrating Celery clusters. Its UI monitors all the celery workers, gives clear statistics on active and processed tasks (showing whether each task was successful or not), and also reports the load average of the tasks.
- Set up Flower as a Docker container:
  - Run in detached mode
  - Ports: `5556:5555`
  - Environment variables:
    - RabbitMQ values: `AMQP_USERNAME=rabbit_user`, `AMQP_PASSWORD=rabbit_password`, `AMQP_ADMIN_USERNAME=rabbit_user`, `AMQP_ADMIN_PASSWORD=rabbit_password`, `AMQP_HOST=VM_IP`, `AMQP_PORT=5672`, `AMQP_ADMIN_HOST=VM_IP`, `AMQP_ADMIN_PORT=15672`.
    - Flower access credentials with username and password: `FLOWER_BASIC_AUTH=username:password`.
      - Set the user name to `user`.
      - Set your pseudonym as the password.
  - Image: `gregsi/latest-celery-flower-docker:latest`
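Assembled (a sketch; the container name `flower` is our choice, and VM_IP and the pseudonym must be replaced):

```bash
docker run -d --name flower \
  -p 5556:5555 \
  -e AMQP_USERNAME=rabbit_user -e AMQP_PASSWORD=rabbit_password \
  -e AMQP_ADMIN_USERNAME=rabbit_user -e AMQP_ADMIN_PASSWORD=rabbit_password \
  -e AMQP_HOST=VM_IP -e AMQP_PORT=5672 \
  -e AMQP_ADMIN_HOST=VM_IP -e AMQP_ADMIN_PORT=15672 \
  -e FLOWER_BASIC_AUTH=user:your_pseudonym \
  gregsi/latest-celery-flower-docker:latest
```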
- Access the Flower dashboard at `http://VM_IP:5556` with the username and password.
  - You can explore by clicking on Tasks and Broker.
  - In the main dashboard, you can click a worker `celery@...` and see the details about that worker.
Screenshot3: Take a screenshot of the Flower web page showing the list of Tasks (Make sure your IP is visible)
Exercise 14.5 DIY task: Designing celery tasks for the Message Board application
In this task, you will design two celery tasks. You have to do this by yourself, based on the knowledge gained in the previous exercises and in Practice Session 9.
Task 14.5.1: Sending SMS notifications using the Twilio API
- Declare a celery task with the method `send_twilio_sms` for sending SMS using the Twilio API (see the sketch below).
  - Remember that we implemented this scenario in the RabbitMQ consumer task in PS-9 Exercise 9.4.
  - Use that knowledge and write the logic to send the SMS, similar to what you created in PS-9 Exercise 9.4.
  - The task should have a return statement: `return {"success": "ok", "Task Id": self.request.id}`
- Invoke `send_twilio_sms` in handle_message() if `messagetype == "urgent"` (as with `Test`, invoke it through `.delay(message)`; Celery supplies `self` for bound tasks).
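A sketch of what the task could look like, assuming the Twilio credentials and phone numbers live in environment variables (the variable names and the message schema are assumptions; the actual sending logic should follow your PS-9 implementation):

```python
import os
from twilio.rest import Client

@celery.task(bind=True)
def send_twilio_sms(self, message):
    # Twilio client built from credentials kept out of the source code
    client = Client(os.environ["TWILIO_ACCOUNT_SID"], os.environ["TWILIO_AUTH_TOKEN"])
    client.messages.create(
        body=str(message),                       # adapt to your message schema
        from_=os.environ["TWILIO_FROM_NUMBER"],  # your Twilio phone number
        to=os.environ["TWILIO_TO_NUMBER"],       # recipient phone number
    )
    return {"success": "ok", "Task Id": self.request.id}
```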
Screenshot4: Take a screenshot of the celery worker terminal showing the execution output.
Task 14.5.2: Image resize
- Declare a celery task with the method `resize`, which has two arguments, `self` and `blob_path`. The function logic would be (see the sketch after this list):
  - Download the image (blob) using `minio_client.fget_object("rawimages", blob_name, filename)`
    - `blob_name` can be derived from `blob_path` (it is the source object name).
    - `filename` can be the same as `blob_name` (the file name under which to save it on disk).
  - Image resize logic:
    - You can use Pillow, a pip package, for image-related functions such as resize. One such example is here.
    - Make a thumbnail of size (50, 50).
  - Upload the blob under the same filename to the other bucket using `minio_client.fput_object("processedimages", filename, filename)`
  - The task should have a return statement: `return {"success": "ok", "Task Id": self.request.id}`
- Invoke the function in the `handle_message` method whenever a new message arrives. This should be added after insert_blob().
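A minimal sketch of the task, assuming the `minio_client` from Exercise 14.3 and Pillow installed (the invocation from handle_message() could then be `resize.delay(blob_path)`):

```python
from PIL import Image

@celery.task(bind=True)
def resize(self, blob_path):
    blob_name = blob_path.split('/')[-1]  # source object name, taken from the blob URL
    filename = blob_name                  # save locally under the same name
    # Download the original image from the rawimages bucket
    minio_client.fget_object("rawimages", blob_name, filename)
    # Resize: make a 50x50 thumbnail with Pillow
    img = Image.open(filename)
    img.thumbnail((50, 50))
    img.save(filename)
    # Upload the resized image to the processedimages bucket
    minio_client.fput_object("processedimages", filename, filename)
    return {"success": "ok", "Task Id": self.request.id}
```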
Screenshot5: Take a screenshot of the celery worker terminal showing the execution output.
Screenshot6: Take a screenshot of the Flower web page showing the Tasks dashboard.
Exercise 14.6 Handling celery task operations
In this task, you will manage celery tasks according to different requirements, for example assigning deadlines (countdowns), priorities, and other operations.
- Assigning deadlines: In the earlier exercises, you invoked Celery tasks using `delay(*args, **kwargs)`; however, it doesn't support execution options such as countdown. These can be set with `apply_async(args[, kwargs[, …]])`.
  - We will use `apply_async` to execute `send_twilio_sms` 10 seconds after submission, using `send_twilio_sms.apply_async((new_message,), countdown=10)` (note that `apply_async` takes the positional arguments as a tuple).
  - Modify and save `app.py`.
  - Test the application and watch the celery worker logs.
  - Similarly, assign an expiry to the task: `send_twilio_sms.apply_async((new_message,), countdown=5, expires=10)`
- Assigning priorities: Let us create one more worker that handles the high-priority tasks, using the flag `-Q celery,priority.high`
  - In another terminal, run the command `watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A app.celery worker -n w2@lab14 -Q celery,priority.high --loglevel=info`
    - Make sure to activate the virtual environment and move to the application directory first.
  - Let us route the task to the priority queue: `send_twilio_sms.apply_async((new_message,), queue='priority.high')`
  - Save, and add some messages in the web application.
  - You should see the logs of the celery worker executing the priority task.
Screenshot7: Take a screenshot of the celery worker executing the priority task.
The following step is optional:
- Linking multiple tasks: Celery supports linking (or chaining) tasks together so that one task follows another. Linking takes another (callback) task as an argument, which will be called when the initial task finishes; the result (return value) of the initial task is passed as an argument to the callback. For example, this can be useful when you want a callback function executed after the initial task finishes.
  - For example, we have the `Test()` celery task, which accepts a message as an argument and prints it out. Let's invoke the Test() task and link it to send_twilio_sms().
  - This can be achieved using `Test.apply_async((new_message,), link=send_twilio_sms.s())` (the callback must be passed as a signature, hence `.s()`).
  - To test whether this works, replace the previous send_twilio_sms() invocation in handle_message() with the above statement.
  - Save, and add some messages in the web application.
  - You should see the task being executed on the Flower web page.
Run the following services in background mode in your VM using `nohup`, as described in Practice 1 (a sketch follows this list):
- This is needed during evaluation (the services should stay running even after you log out of the instance).
- The Message Board Python Flask application
- Celery's first worker, created in 14.4
- Celery's second worker, created in 14.6
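For example, assuming the Flask app is started with `python3 app.py` (the log file names are our choice):

```bash
# Flask application
nohup python3 app.py > flask.log 2>&1 &
# First celery worker (14.4)
nohup watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- \
  celery -A app.celery worker --loglevel=info > worker1.log 2>&1 &
# Second celery worker (14.6)
nohup watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- \
  celery -A app.celery worker -n w2@lab14 -Q celery,priority.high --loglevel=info > worker2.log 2>&1 &
```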
PS! Don't delete the VM and keep services running inside the VM.
Deliverables:
- PS! The VM with all the services should stay running.
- Screenshots from exercises: 14.2, 14.3, 14.4, 14.5 (3 screenshots), 14.6
- Link (address including IP and port) to your Flask application
- Message Board application source code
  - DO NOT include any Python virtual environment folders/files (e.g., `.env` folder content)