
Cloud Computing 2023/24 spring


Practice 14 - Redesigning the Message-Board Application using open-source tools.

In this lab, you will work with two open-source data stores: a JSON document database (MongoDB) and a blob store (MinIO). Further, we will deploy and execute time-consuming tasks in an asynchronous and resilient manner using a distributed task queue built on Celery and RabbitMQ. Using these tools, you will redesign the Message Board application in a cloud-native way.

References

  • Celery: https://docs.celeryq.dev/en/stable/index.html
  • RabbitMQ: https://www.rabbitmq.com/
  • Flower: https://flower.readthedocs.io/en/latest/
  • MongoDB: https://www.mongodb.com/
  • MinIO: https://min.io/

Exercise 14.1 Setting the Prerequisites

  • Create a VM in OpenStack Cloud with
    • Choose Instance Snapshot Ubuntu22+Docker
    • Enable Delete Volume on Instance Delete
    • Flavour: m2.tiny
    • Volume Size (GB): 20GB
    • Add security group lab142024 along with default.
  • SSH to the VM and copy the Message Board application code you submitted as the deliverable of Practice Session 9 - RabbitMQ
    • You can use your own approach to modify the code on the remote machine.
    • Using Visual Studio Code makes it easier to edit and modify the code on the remote machine.
      • To set up Visual Studio Code on Windows, Mac, and Linux
      • Connecting (SSH) to the remote machine in Visual Studio Code and accessing the files.
  • Modify app.py
    • Remove the RabbitMQ (pika) imports.
    • Remove the publish_rabbitmq definition and its invocation in handle_message().
    • Similarly, update requirements.txt

Exercise 14.2 Working with a document (JSON) based database: MongoDB

In this task, you will replace the Azure Cosmos DB database with MongoDB in your message board application. MongoDB stores data records as BSON documents; BSON is a binary representation of JSON documents.

  • Run MongoDB using a Docker container (a full command sketch follows this list)
    • Run in detached mode
    • Assign the name mongodb
    • Volume mount (-v): -v ${HOME}/mongo-data:/data/db
      • Create the mongo-data directory first
    • Port forwarding 27017:27017
    • Environment variables to set the root username and password: MONGO_INITDB_ROOT_USERNAME=sample-db-user, MONGO_INITDB_ROOT_PASSWORD=sample-password (note: the official mongo image expects the MONGO_ prefix, not MONGODB_)
    • Image mongo:latest
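
For reference, a minimal docker run sketch that combines the options above (paths and credentials are the lab defaults; adjust to your setup):

    # create the host directory used by the volume mount
    mkdir -p ${HOME}/mongo-data
    # -d: detached, -v: persist data on the host, -e: root credentials
    docker run -d --name mongodb \
        -v ${HOME}/mongo-data:/data/db \
        -p 27017:27017 \
        -e MONGO_INITDB_ROOT_USERNAME=sample-db-user \
        -e MONGO_INITDB_ROOT_PASSWORD=sample-password \
        mongo:latest
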
  • Modify app.py to insert JSON documents (Messages)
    • Delete the Azure Cosmos imports and all variables related to Cosmos.
    • We will use the PyMongo pip library for this task. Import pymongo and create a Mongo client, for example as shown here (a combined sketch also follows this list).
      • MongoDB client mongo_client=MongoClient("localhost", 27017).
      • Create a db db = mongo_client.flask_db.
      • Create a document collection messages=db.messages
    • Now, write a method to insert the documents similar to insert_cosmos().
      • Basically, you can rename the method.
      • Arguments remain the same.
      • You can use messages.insert_one(new_message) to insert document to MongoDB.
    • Similarly, modify the method read_cosmos to read the documents from MongoDB.
      • Get the latest 10 documents (messages). For this, you can use an approach similar to this.
        • First, use find to get the records
        • Use sort to sort by id or timestamp, e.g. sort([('timestamp', -1)]) (descending, so that the limit below keeps the latest)
        • Limit to 10 records using limit
        • Collect them into a Python list using list(db.messages...)
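
A minimal sketch of the PyMongo pieces described above (the helper names are suggestions; the credentials are assumed to be the ones set on the container, which PyMongo needs once root auth is enabled):

    from pymongo import MongoClient

    # Connect with the root credentials set when starting the container
    mongo_client = MongoClient("localhost", 27017,
                               username="sample-db-user",
                               password="sample-password")
    db = mongo_client.flask_db     # database is created on first insert
    messages = db.messages         # document collection

    def insert_mongo(new_message):
        # Same arguments as the old insert_cosmos()
        messages.insert_one(new_message)

    def read_mongo():
        # Latest 10 messages: sort by timestamp descending, then limit
        return list(messages.find().sort([('timestamp', -1)]).limit(10))
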
  • Test the message board application.
    • Make sure you modify the following in app.py:
      • Comment out the insert_blob(img_path) invocation in handle_message(), along with its related variables and imports.
      • The blob_path argument can be img_path for now in insert_mongo().
    • Call the method insert_mongo instead of insert_cosmos in handle_message().
    • Create a virtual environment and activate it.
    • Install pip requirements
    • Run the application and insert some messages.
  • You can connect to the MongoDB shell
    • Exec into the mongodb container with bash
    • Inside the container, type mongosh
    • Write queries to check your data; some example queries are shown after this list.
      • List databases
      • Use your db
      • List the documents under messages collection
      • Query the list of documents where a certain JSON key equals a value, for example username="Shiva".
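
For example, in mongosh (assuming the database and collection names used above):

    show dbs                                  // list databases
    use flask_db                              // switch to your db
    db.messages.find()                        // list documents in the messages collection
    db.messages.find({ username: "Shiva" })   // filter by a JSON key
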

Screenshot1: Take a screenshot of the terminal (mongodb shell) showing the output of the queries.

Exercise 14.3 Working with blob storage: MinIO

In this task, you will replace the Azure Blob Storage service with MinIO in your message board application. MinIO is a high-performance, S3-compatible object store, built for large-scale data lake and database workloads.

  • Running a single-node MinIO server in a Docker container (a full command sketch follows this list)
    • Run in detached mode
    • Ports 9000:9000 and 9001:9001
    • Environment vars MINIO_ROOT_USER=username and MINIO_ROOT_PASSWORD=password
    • Volume mount ${HOME}/minio/data:/data
      • PS! make sure to create ${HOME}/minio/data directories
    • Image and starting command quay.io/minio/minio server /data --console-address ":9001"
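
For reference, a minimal docker run sketch combining the options above (the container name minio is an assumption; pick your own credentials):

    # create the host directory used by the volume mount
    mkdir -p ${HOME}/minio/data
    docker run -d --name minio \
        -p 9000:9000 -p 9001:9001 \
        -e MINIO_ROOT_USER=username \
        -e MINIO_ROOT_PASSWORD=password \
        -v ${HOME}/minio/data:/data \
        quay.io/minio/minio server /data --console-address ":9001"
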
  • You can access the MinIO web interface using http://VM_IP:9001
    • Login using your username and password.
  • Create two buckets: rawimages and processedimages
    • Administrator-->Buckets-->Create Bucket
  • Create the access keys needed to work with MinIO storage using the Python SDK.
    • Go to User-->Access Keys-->Create Access Keys
      • Set an expiry date of your choosing and provide a name
      • Then click Download for import to save the keys.
  • You can also access MinIO through the MinIO command-line client mc
    • Exec into the minio container and use the mc client as below:
      • Provide your VM IP, accessKey and secretKey: docker exec -it container_name mc alias set minio http://VM_IP:9000 accessKey secretKey
      • List the buckets mc ls minio. It should list your two buckets.
      • Set anonymous access policies to download the images from both the buckets using mc anonymous set download minio/rawimages and similarly for processedimages
  • Add minio to requirements.txt and install the pip requirements.
  • Now, let us modify app.py to store images in MinIO buckets. Here, we will use the MinIO pip library; you can refer to the guide here for information on the Python SDK. A client sketch follows this list.
    • Delete all the azure.storage imports and their declared variables.
    • Import the MinIO library
    • Declare the MinIO client as minio_client, as described in the guide.
      • Replace "play.min.io" with VM_IP:9000
      • Replace the access_key and secret_key values. PS! Don't hard-code the values; read them from environment variables instead.
      • Add one more argument: secure=False
    • Modify the method insert_blob(img_path)
      • Keep filename variable
      • Use minio_client.fput_object to upload the image to rawimages bucket.(You can refer to the guide for the complete statement)
        • bucket_name should be "rawimages"
        • destination_file should be filename
          • It should contain only the bare filename, for example sample.png, without any path.
        • source_file should be img_path.
    • Updates in handle_message()
      • blob_path value should be 'http://VM_IP:9000/'+"rawimages"+"/"+img_path.split('/')[-1]
      • Use blob_path as an argument in insert_mongo().
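
A minimal sketch of the MinIO client and upload described above (the environment variable names are placeholders; use whatever you exported your downloaded access keys as):

    import os
    from minio import Minio

    # secure=False because the lab endpoint has no TLS;
    # replace VM_IP with your VM's address
    minio_client = Minio("VM_IP:9000",
                         access_key=os.environ["MINIO_ACCESS_KEY"],
                         secret_key=os.environ["MINIO_SECRET_KEY"],
                         secure=False)

    def insert_blob(img_path):
        # Object name must be the bare filename, e.g. sample.png
        filename = img_path.split('/')[-1]
        minio_client.fput_object("rawimages", filename, img_path)
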
  • Run the Flask application and insert some messages.
  • You should be able to see the images under rawimages bucket in MinIO.

Screenshot2: Take a screenshot of the MinIO web page showing the list of two buckets. (Make sure your IP is visible)

Exercise 14.4 Distributed task queue management system with Celery

In Practice Session 9 - RabbitMQ, we used the Twilio API to dispatch SMS notifications for messages tagged as 'urgent'. We designed a RabbitMQ consumer to handle SMS dispatching asynchronously, preventing delays in the message board application. However, developers must keep an eye on consumers and processing tasks to check their status. To enhance resilience and streamline such operations, Celery comes into play.


Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

Celery employs task queues to distribute work across threads or machines. Tasks, units of work, are input into these queues. Dedicated worker processes continuously monitor these queues for new tasks. Celery communicates via messages, usually mediated by a broker (such as RabbitMQ) between clients and workers. Clients add messages to the queue to initiate tasks, which are then delivered to workers by the broker.

Celery configuration requires two endpoints: CELERY_BROKER and CELERY_RESULT_BACKEND. CELERY_BROKER is the endpoint of the message broker, while CELERY_RESULT_BACKEND stores the endpoint where Celery task results are kept. In our scenario, we use RabbitMQ as both the message broker and the storage for task results.

  • Set up the RabbitMQ broker using a Docker container (a full command sketch follows this list)
    • Keep the name as rabbitmq
    • Run in detached mode
    • Ports 15672:15672 and 5672:5672
    • Environment variables RABBITMQ_DEFAULT_USER=rabbit_user and RABBITMQ_DEFAULT_PASS=rabbit_password
    • Image as rabbitmq:3.9.13-management
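
For reference, a minimal docker run sketch combining the options above:

    docker run -d --name rabbitmq \
        -p 15672:15672 -p 5672:5672 \
        -e RABBITMQ_DEFAULT_USER=rabbit_user \
        -e RABBITMQ_DEFAULT_PASS=rabbit_password \
        rabbitmq:3.9.13-management
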
  • Getting ready to use Celery in app.py. A sample guide to start using Celery in Python is here.
    • Import the celery pip library
    • Define a CELERY_BROKER variable with the value 'pyamqp://rabbit_user:rabbit_password@VM_IP//', and similarly CELERY_RESULT_BACKEND = 'rpc://rabbit_user:rabbit_password@VM_IP//'.
      • PS! Change the VM_IP
    • Declare the Celery client, which takes two arguments: celery=Celery(broker=CELERY_BROKER, backend=CELERY_RESULT_BACKEND). The combined snippet is shown after this list.
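
Put together, the Celery setup in app.py looks roughly like this (replace VM_IP with your VM's address):

    from celery import Celery

    # RabbitMQ serves as both the broker and the result backend
    CELERY_BROKER = 'pyamqp://rabbit_user:rabbit_password@VM_IP//'
    CELERY_RESULT_BACKEND = 'rpc://rabbit_user:rabbit_password@VM_IP//'

    celery = Celery(broker=CELERY_BROKER, backend=CELERY_RESULT_BACKEND)
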
  • Let us run a Celery worker
    • Open another terminal, move to the project directory and activate the virtual environment.
    • Add celery to the pip requirements and install it through pip install.
    • Let us install watchdog: sudo apt install python3-watchdog.
      • We use this to watch for file-change events in the application files so that the Celery worker picks up changes automatically; we don't need to restart the worker every time app.py changes.
    • Now run the celery worker watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A app.celery worker --loglevel=info
      • Watchdog will watch for changes to .py files in the directory path --directory=./; this mainly catches changes to app.py.
      • The celery command runs the worker, registering the tasks defined in app.py via app.celery.
      • Finally, all logs are printed to the terminal.
    • After running the command, you should see the worker running. (PS! Keep this terminal open with the command running.)
  • Let us create a Celery task in app.py. A sample guide to get started with Celery is here.
    • Move to app.py
    • Let us write a simple Test() Celery task
      • Define a Celery task with the decorator @celery.task(bind=True).
        • The decorator should be applied to the Test() method.
      • Write a method Test(self, message) that takes two arguments. Here, self provides task information such as the task_id and other task status parameters.
        • This method will print the message.
        • The method should have a return statement that returns {"success": "ok", "Task Id": self.request.id}
      • The code, assembled from the steps above, looks roughly like this:
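
    # a sketch assembling the steps above
    @celery.task(bind=True)
    def Test(self, message):
        # self.request carries task metadata, e.g. the task id
        print(message)
        return {"success": "ok", "Task Id": self.request.id}
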
  • To invoke it as a Celery task, call Test.delay(message); you can add this statement in the handle_message() method. (With bind=True, self is supplied by Celery, so don't pass it yourself.)
    • .delay() is the shortcut "Calling API" used to invoke Celery task instances.
  • Save app.py and now you should see the changes in celery worker terminal showing updates from app.py.
  • Now run the message board application and insert a message.
  • You should see logs in the worker terminal showing the Test() Celery task execution.
  • You can also check the messages in the RabbitMQ dashboard
    • Go to http://VM_IP:15672 and log in with your RabbitMQ username and password.
    • Go to the Queues section-->open the queue that has some messages-->Get Message(s). You should see the result of the Celery task there, because we used RabbitMQ as the result backend (CELERY_RESULT_BACKEND).

It is troublesome to monitor Celery tasks and workers when the application is complex; this is where the Flower tool comes in. Flower is a web-based tool for monitoring and administering Celery clusters. It provides a UI that monitors all the Celery workers and gives clear statistics on active and processed tasks, showing whether each task was successful, as well as the load average of the tasks.

  • Set up Flower as a Docker container (a full command sketch follows this list).
    • Run in detached mode
    • Ports 5556:5555
    • Environment vars
      • RabbitMQ values: AMQP_USERNAME=rabbit_user, AMQP_PASSWORD=rabbit_password, AMQP_ADMIN_USERNAME=rabbit_user, AMQP_ADMIN_PASSWORD=rabbit_password, AMQP_HOST=VM_IP, AMQP_PORT=5672, AMQP_ADMIN_HOST=VM_IP, AMQP_ADMIN_PORT=15672.
      • Flower access credentials: FLOWER_BASIC_AUTH=username:password.
        • Set the username to user
        • Set your pseudonym as the password.
    • Image as gregsi/latest-celery-flower-docker:latest
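
For reference, a minimal docker run sketch combining the options above (the container name flower is an assumption; replace VM_IP and the pseudonym with your own values):

    docker run -d --name flower \
        -p 5556:5555 \
        -e AMQP_USERNAME=rabbit_user -e AMQP_PASSWORD=rabbit_password \
        -e AMQP_ADMIN_USERNAME=rabbit_user -e AMQP_ADMIN_PASSWORD=rabbit_password \
        -e AMQP_HOST=VM_IP -e AMQP_PORT=5672 \
        -e AMQP_ADMIN_HOST=VM_IP -e AMQP_ADMIN_PORT=15672 \
        -e FLOWER_BASIC_AUTH=user:your_pseudonym \
        gregsi/latest-celery-flower-docker:latest
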
  • Access Flower dashboard at http://VM_IP:5556 with username and password.
    • You can explore by clicking on Tasks and Broker
    • In the main dashboard, you can click worker celery@... and see the details about the worker.

Screenshot3: Take a screenshot of the Flower web page showing the list of Tasks (Make sure your IP is visible)

Exercise 14.5 DIY task: Designing Celery tasks for the Message Board application.

In this task, you will design two Celery tasks. You have to do this by yourself, based on the knowledge gained in the previous exercises and Practice Session 9.

Task 14.5.1: Sending SMS notifications using the Twilio API:
  • Declare a Celery task with the method send_twilio_sms for sending SMS using the Twilio API. (A skeleton is sketched after this list.)
    • Remember that we implemented this scenario in the RabbitMQ consumer task in PS-9 Exercise 9.4.
    • Use that knowledge and write the logic to send an SMS, similar to what you created in PS-9 Exercise 9.4.
    • It should have a return statement: return {"success": "ok", "Task Id": self.request.id}
  • Invoke the task, e.g. send_twilio_sms.delay(message), in handle_message() if messagetype == "urgent" (again, self is supplied by Celery).
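
Since this is a DIY task, only a skeleton is sketched here; the environment variable names are placeholders for the Twilio credentials and phone numbers you used in PS-9:

    import os
    from twilio.rest import Client

    @celery.task(bind=True)
    def send_twilio_sms(self, message):
        # Placeholder env vars: set them to your PS-9 Twilio credentials
        client = Client(os.environ["TWILIO_ACCOUNT_SID"],
                        os.environ["TWILIO_AUTH_TOKEN"])
        client.messages.create(body=message,
                               from_=os.environ["TWILIO_FROM_NUMBER"],
                               to=os.environ["TWILIO_TO_NUMBER"])
        return {"success": "ok", "Task Id": self.request.id}
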

Screenshot4: Take a screenshot of the celery worker terminal showing the execution output.

Task 14.5.2: Image resize
  • Declare a Celery task with the method resize, which has two arguments, self and blob_path. The function logic (sketched after this list) would be:
    • Download the image (blob) using minio_client.fget_object("rawimages", blob_name, filename)
      • blob_name can be derived from blob_path (it is the source object name)
      • filename can be the same as blob_name (the file name to save to disk)
    • Image resize logic
      • You can use Pillow, a pip package, for image-related functions such as resize. One such example is here.
      • Make a thumbnail of size (50,50)
    • Upload the blob with the filename to a different bucket using minio_client.fput_object("processedimages", filename, filename)
    • It should have a return statement: return {"success": "ok", "Task Id": self.request.id}
  • Invoke the task in the handle_message() method whenever a new message arrives, e.g. resize.delay(blob_path); add this after insert_blob().
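
A minimal sketch of the resize task under the assumptions above (minio_client and the bucket names come from Exercise 14.3; Pillow is imported as PIL):

    from PIL import Image

    @celery.task(bind=True)
    def resize(self, blob_path):
        blob_name = blob_path.split('/')[-1]   # source object name
        filename = blob_name                   # local file name on disk
        # Download the original image from the rawimages bucket
        minio_client.fget_object("rawimages", blob_name, filename)
        # Resize to a 50x50 thumbnail in place
        img = Image.open(filename)
        img.thumbnail((50, 50))
        img.save(filename)
        # Upload the thumbnail to the processedimages bucket
        minio_client.fput_object("processedimages", filename, filename)
        return {"success": "ok", "Task Id": self.request.id}
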

Screenshot5: Take a screenshot of the celery worker terminal showing the execution output.

Screenshot6: Take a screenshot of the Flower web page showing the Tasks dashboard.

Exercise 14.6 Handling Celery task operations.

In this task, you will handle Celery tasks according to requirements such as assigning deadlines or countdowns, priorities, and other operations.

  • Assigning deadlines: In earlier exercises, you invoked Celery tasks using delay(*args, **kwargs); however, it doesn't support execution options such as countdown. These can be set using apply_async(args[, kwargs[, …]]), where positional task arguments are passed as a tuple. (The variants are collected in a sketch after this list.)
    • We will use apply_async to execute send_twilio_sms 10 seconds after submission, using send_twilio_sms.apply_async((new_message,), countdown=10)
    • Modify and save app.py
    • Test the application and watch the Celery worker logs.
    • Similarly, assign an expiry to the task: send_twilio_sms.apply_async((new_message,), countdown=5, expires=10)
  • Assigning priorities: Let us create one more worker that handles the high-priority tasks using the flag -Q celery,priority.high
    • In another terminal, run the command watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A app.celery worker -n w2@lab14 -Q celery,priority.high --loglevel=info
      • Make sure to activate the venv and move to the application directory
    • Let us route the task to the priority queue: send_twilio_sms.apply_async((new_message,), queue='priority.high')
    • Save and add some messages in the web application.
    • You should see the logs of the celery worker executing the priority task.
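
For reference, the three invocation variants from this exercise side by side (note that positional task arguments must be wrapped in a tuple):

    # Run ~10 seconds after submission
    send_twilio_sms.apply_async((new_message,), countdown=10)
    # Additionally, discard the task if it has not run within 10 seconds
    send_twilio_sms.apply_async((new_message,), countdown=5, expires=10)
    # Route to the priority queue consumed by the second worker
    send_twilio_sms.apply_async((new_message,), queue='priority.high')
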

Screenshot7: Take a screenshot of the celery worker executing the priority task.

The following step is optional:

  • Linking multiple tasks: Celery supports linking (or chaining) tasks together so that one task follows another. Linking takes another (callback) task signature as an argument, which will be called when the initial task finishes. The result (return value) of the initial task is provided as an argument to the callback. For example, this can be useful when you want to run a callback function after the initial task finishes.
    • For example, we have the Test() Celery task that accepts a message as an argument and prints it out. Let's invoke the Test() task and link it to send_twilio_sms().
    • This can be achieved using Test.apply_async((new_message,), link=send_twilio_sms.s()) (link expects a task signature, hence the .s()).
    • To test whether this works, replace the previous send_twilio_sms() invocation in handle_message() with the above statement.
    • Save and add some messages in the web application.
    • You should see the task executing on the Flower web page.

Run the following services in background mode in your VM using nohup, as described in Practice 1 (a sketch follows the list below).

  • This is needed during evaluation (the services should stay running even after you log out of the instance):
    1. Message board Python Flask application
    2. Celery's first worker created in 14.4
    3. Celery's second worker created in 14.6
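
A sketch of how the three services could be started with nohup (log file names are arbitrary; activate your virtual environment first, and adjust the first line if your app is not started with python3 app.py):

    # 1. Flask application
    nohup python3 app.py > flask.log 2>&1 &
    # 2. First Celery worker (from 14.4)
    nohup watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- \
        celery -A app.celery worker --loglevel=info > worker1.log 2>&1 &
    # 3. Second, priority-queue worker (from 14.6)
    nohup watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- \
        celery -A app.celery worker -n w2@lab14 -Q celery,priority.high --loglevel=info > worker2.log 2>&1 &
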

PS! Don't delete the VM and keep services running inside the VM.

Deliverables:

  1. PS! VM with all the services should stay running.
  2. Screenshots from exercises: 14.2, 14.3, 14.4, 14.5(3), 14.6
  3. Link (Address including IP and port) to your Flask application
  4. Message board application source code
    • DO NOT include any Python virtual environment folders/files (e.g., .env folder content)