Teine praktikum - Teadete edastus hajussüsteemides
Selle praktikumi eesmärk on anda ülevaade kuidas luua hajussüsteeme, mis kasutavad teadete edastust ja teadete järjekordasid. Selleks kasutame RabbitMQ maaklerit (broker), et luua sõnumite või tööde järjekorasid ning kolm väikest Python programmi, mis kõik töötavad iseseisvalt ning suhtlevad omavahel läbi maakleri.
Viited
- RabbitMQ Python teegi, Pika dokumentatsioon: https://pika.readthedocs.io/en/latest/intro.html
- Pika näited: https://pika.readthedocs.io/en/latest/examples.html
Probleemide korral kontrollige:
- Sõnumid
#praktikum-2
Zulip teemas. - Võimalikud probleemid ja nende potentiaalsed lahendused juhendi lõpus.
- Küsi
#praktikum-2
Zulip teemas.
Ülesanne 2.1: RabbitMQ ligipääs
Selles ülesandes kasutame RabbitMQ administreerimise liidest, et hallata RabbitMQ olemeid ning läbi proovide lihtsamad andmete saatmise ja kuulamise sammud. See liides aitab meil ka hästi uurida, kas andmed jõuavad RabbitMQ Maaklerisse kohale ning, mis on hetkel aktiivsed järjekorrad, ühendused ja seosed.
Kasutame õppejõu poolt üles pandud RabbitMQ Docker konteinereid, mis on üle interneti kätte saadavad. Aga kui soovite ise Dockerit üles sead siis kasutage järgmise käsku:
[ Dockeri käsk: ]
docker run -d --hostname <LAST_NAME>-rabbit -p 5672:5672 -p 80:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=CUSTOM_PASSWORD --name <LAST_NAME>rabbit rabbitmq:3-management
Tegevused:
- Logi sisse RabbitMQ administreerimise liidesesse: http://172.17.66.250:156XX/
- Täpse pordi väärtuse, kasutajanime ja parooli saab õppejõu käest. Igal tudengil on erinev keskkond.
- See server on kätte saadav ainult Ülikooli võrgust.
- Kontrollige, et kasutate Delta majas Eduroam võrku, või VPN'i.
- UT public võrk ei ole piisav.
- Eduroam võrk väljaspoolt Delta maja ei pruugi olla piisav.
- Ülikooli VPN õpetus: https://wiki.ut.ee/pages/viewpage.action?pageId=17105590
- Kontrollige, et kasutate Delta majas Eduroam võrku, või VPN'i.
- Tutvuge RabbitMQ erinevate vahekaartidega, näiteks:
- Vahenduskeskonnad (Exchanges): Seal näeb olemasolevaid Vahenduskeskondasid. Andmete maakler võtab vastu sõnumeid andmete publitseerijatelt. Rakendused/kasutajad saavad määratleda ja järjekordade vahelisi seoseid, mis määravad, millised andmed millistesse järjekordadesse edastatakse.
- Järjekorrad (Queues): see näitab järjekordi, mis on RabbitMQ serveris juba loodud. Andmete tellijad saavad Järjekordade kaudu andmeid kuulata.
- Ühendused (Connections): See näitab RabbitMQ serveriga loodud aktiivseid ühendusi. Ühendused luuakse Kliendi RabbitMQ vahel ühenduse loomiseks.
- Kanalid (Channels): Kõigi kanalite nimekiri klientide ja RabbitMQ serveri vahel. Kanalid luuakse Kliendi ja Järjekorra vahel andmete aktiivseks edastamiseks.
- Klientrakenduse ja RabbitMQ vahel on tavaliselt üks ühendus aga mitu kanalit (virtuaalset ühendust).
- Loo uus RabbitMQ exchange:
- Pane selle nimeks oma perekonnanimi. Ei tohiks sisaldada täpi- või spetsiaalseid tähti.
- Type: topic
- Publitseeri käsitsi läbi administreerimise liidese uus sõnum:
- Routing key:
delta.tudeng.oma_perekonnanimi.temperatuur
- Payload:
{"temperature": "24"}
- Routing key:
- Loo uus järjekord (Queue):
- Nimi: oma_perekonnanimi_queue
- Loo uus Binding:
- From exchange: oma (varasemalt loodud) Exchange nimi
- Routing key:
delta.tudeng.oma_perekonnanimi.*
- Publitseeri käsitsi läbi administreerimise liidese uuesti sama sõnum oma loodud Exchange sisse, mis enne.
- NB! ära tee seda Queue lehe kaudu, vaid Echanges lehe kaudu
- Vaata sõnumit järjekorras:
- Queues -> Get Messages
- Võta uuesti sõnum järjekorrast.
- Miks see järjekorras edasi on?
- Võta sõnum uuesti, aga seekord seadista:
- Ack mode: Automatic Ack.
- Tee ekraanivaade olukorrast kus on näha, et järjekorrast õnnestus andmeid kätte saada
Peale seda, veelkord proovides järjekorrast andmeid kätte saada, ei tohiks enam sõnumit järjekorras olla, kui sa vahepeal mitut ei lisanud.
Lisategevus:
- Uuri Admin vaate funktsionaalsust. Selle kaudu saab:
- luua uusi kasutajaid ja neile paroole ning õigusi seadistada
- Luua Virtuaalseid host'e, ehk virtuaalselt eraldatud rabbitMQ keskondasid, kus igas on eraldi exhcange'd, järjekorrad, jne. Mis on kasulik, erinevate rakenduste eraldamiseks üksteisest.
Ülesanne 2.2: Sõnumite publitseerimine
Loome Python programmi, mis publitseerib meie poolt genereeritud sõnumeid RabbitMQ serverisse.
- Loome uue Python programmi (Sama moodi nagu eelmises praktikumis. Näiteks PyCharm IDE kasutades)
- Ettevalmistus:
- Impordime pika teegi, ja teised mida selles ülesandes kasutame:
import pika import random import datetime import json
- Impordime pika teegi, ja teised mida selles ülesandes kasutame:
- Paneme paika ühenduse detailid:
- Maakleri asukoht:
broker_host = 172.17.66.250
- Maakleri port:
broker_port = 56XX
(täpse pordi saate küsida õppejõu käest)
- Maakleri asukoht:
- Autentimise seadistamine
- Kasutame varasemalt saadud kasutajanime ja parooli ning loome Pika
PlainCredentials
objekti: username = "..." password = "..." credentials = pika.PlainCredentials(username, password)
- Kasutame varasemalt saadud kasutajanime ja parooli ning loome Pika
- Ühenduse loomine
connection = pika.BlockingConnection( pika.ConnectionParameters(host=broker_host, port=broker_port, credentials=credentials)) channel = connection.channel()
- BlockingConnection on blokkeeriv, ehk sünkroonne ühendus RabbitMQ'ga. See tähendab seda, et meie programm jääb ootama, kuni saab kinnituse, et saadetud andmed on kirjutatud RabbitMQ Exchange sisse.
- RabbitMQ pika teek toetab ka asünkroonset suhtlust, SelectConnection ühenduse tüübi kaudu.
- channel() loob uue kanali, mille kaudu saame RabbitMQ maaklerisse andmeid saata ning kuulata.
- Defineerimine sõnumi Python sõnaraamatu (dictionary) andmestructuurina:
message = { "device_name": "jakovits_sensor", "temperature": random.randint(20, 40), "time": str(datetime.datetime.now()) }
- NB! asendage seadme nimes õppejõu nimi oma perekonnanimega (vältige täpi- ja spetsiaalseid tähti ja märke)
- konverteerimie sõnumi string väärtueks, kasutades json teeki:
message_str = json.dumps(message)
- Andmete publitseerimine maaklerisse
- Marsruutimise võtme seadistamine:
my_routing_key = iotdevice.perekonnanimi.tempsensor
- Asenda perekonnanimi enda perekonnanimega
channel.basic_publish( exchange=exchange, routing_key=routing_key, body=message_str)
- Marsruutimise võtme seadistamine:
- Programmi lõpus paneme ühendused kinni:
channel.close() connection.close()
Iseseisev ülesanne:
- Pane programm saatma genereeritud sünteetilisi sensori sõnumeid iga 5 sekundi tagant, kuni programmi töö katkestatakse.
- Soovitused:
- Saab teha tsükkli, mis jääb püsivalt sõnumeid saatma ning mis magab sõnumite saatmise vahel piisavalt pikalt
- Tsükkli sisse on mõistlik jätta ainult sõnumi genereerimine ning publitseerimine (
channel.basic_publish(...)
)
NB! Salvesta ülesande lahendus eraldi Python skriptina.
Ülesanne 2.3: Sõnumite kuulamine
Loome eraldi Python programmi, mis kuulab sõnumeid RabbitMQ serverist, ning prindib need välja.
Selle ja eelneva ülesande tulemuse illustratsioon:
- Tehke koopia eelmisest programmist (eraldi Python filina)
- Jätame alles kõik kuni
channel
objekti loomiseni. - Loome järjekorra
- Järjekorra nimeks paneme iot_temperatures
channel.queue_declare(queue=queue_name, durable=True)
- Durable jätab andmed püsivalt järjekorda, ka siis kui kuulajat hetkel ei ole.
- Järjekorra nimeks paneme iot_temperatures
- Paneme paika marsruutimise võtme
- Kuulame kõiki temperatuuri sensoreid iotdevice all, mitte ainult endanimelist:
routing_key = "iotdevice.*.tempsensor"
- NB! Kuna tudengitel on kõigil eraldi RabbitMQ serverid, siis üksteise andmeid te tegelikult siin ei näe.
- Loome seose järjekorra ning Exchange vahel
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key )
- Loome eraldi Python funktsiooni, mis hiljem kutsutakse välja iga sõnumi korral:
- Funktsioon peaks välja printima saabunud sõnumi sisu ning marsruutimise võtme.
def lab_callback(ch, method, properties, body): print("Saabunud sõnum: %r" % body.decode()) ch.basic_ack(delivery_tag=method.delivery_tag)
- Seadistame rakenduse kuulama RabbitMQ järjekorda, ning käivitama meie lab_callback funktsiooni iga saabuva sõnumi peal.
channel.basic_consume(queue=queue_name, on_message_callback=lab_callback) channel.start_consuming()
NB! Salvesta ülesande lahendus eraldi Python skriptina.
Iseseisev ülesanne:
- Pane kaks koopiat sellest programmist käima, genereeri andmeid ning uuri, kas andmed jagatakse nende vahel ära.
- See on tulemus mida soovime, aga kui on vajadus, et kaks programmi saaksid oma koopia kõikidest sõnumitest, siis tuleks iga programmi jaoks seada üles oma järjekord ning seos selle ja marsruutimise võtme vahel (meie seda praegu siin ei tee).
Ülesanne 2.4: Sõnumite töötlus ja marsruutimine
Loome uue Python rakenduse, mis kuulab sõnumeid sisend-järjekorrast, käivitab kasutaja poolt defineeritud funktsiooni ning publitseerib tulemused teistesse väljund-järjekordadesse. kasutame seda funktsiooni selleks, et filtreerida ning eraldi ruutida andmed siis, kui oleks vaja tõsta alarm (näiteks liiga suure temperatuuri väärtuse korral).
- Võtame aluseks ülesande 2.3 koodi, teeme sellest koopia uue programmina.
- Modifitseerime
lab_callback
funktsiooni (mida kutsutakse välja iga sõnumi korral), et saata uus sõnum juhul kui temperatuuri väärtus on üle 30.- Seadistame väljund marsruutimise võtmeks
alarm.routing_key
. Näiteks:alarm.iotdevice.jakovits.tempsensor
output_routing_key = "alarm."+ str(method.routing_key)
- Kontrollime kas temperatuuri väärtus saabunud sõnumis on suurem kui 30:
message = json.loads(body.decode()) if(message['temperature'] > 30):
- loome uue Alarm sõnumi (
if
bloki sees), mis sialdab originaalset sõnumit ning ka alarmi tüüpi:alarm_message = { "message": message, "alarm_type": "High Temperature" } alarm_message_string = json.dumps(alarm_message)
- Tulemuste saatmine tagasi RabitMQ maaklerisse
if
bloki sees)ch.basic_publish(exchange = exchange , routing_key=output_routing_key, body=alarm_message_string) print("Saadetud alarm! Sõnum: %r" % body.decode())
- Seadistame väljund marsruutimise võtmeks
Iseseisev ülesanne:
- Genereerige andmeid Ülesande 2.2 programmiga, ning testige, et alarme saadetakse (prinditakse välja, jõuavad järjekorda - saate vaadata RabbitMQ administreerimise liideses)
- RabbitMQ administreerimise liideses saab vajadusel seoseid (binding) ja järjekordasid kustutada, kui nendega midagi paigast ära läheb
NB! Salvesta ülesande lahendus eraldi Python skriptina.
Ülesanne 2.5: Hajusüsteemi testimine
Paneme nüüd kõik eelnevad komponendid koos tööle.
- Pane tööle üks ülesande 2.2 Python programm andmeid genereerima
- Genereeritud andmete marsruutimise võtmete muster:
iotdevice.perekonna_nimi.tempsensor
- Genereeritud andmete marsruutimise võtmete muster:
- Pane tööle kolm protsessi ülesandest 2.4 paralleelselt andmeid töötlema
- Filtreeritud andmete marsruutimise võtmete muster:
alarm.iotdevice.perekonna_nimi.tempsensor
- Filtreeritud andmete marsruutimise võtmete muster:
- Pane tööle ülesande üks 2.3 programm, mis prindib välja välja alarme, mis publitseeritakse ülesande 2.4 protsesside poolt.
- Programmi peab muutma nii et:
- Tehaks uus alarmide järjekord:
perekonnanimi_alarm_queue
- Järjekorra marsruutimise peaks nüüd olema:
alarm.#
(ehk kuulame kõiki alarme)
- Tehaks uus alarmide järjekord:
- Ehk nüüd paneme selle programmi nüüd kuulama neid andmeid, mida publitseerib ülesande 2.4 programm, kui alarmid tekivad.
- NB! Programmis peab muutma ära nii järjekorra nime kui ka routing_key!
- Muidu võib juhtuda, et kogemata hakatakse alarme saatma ka iot_temperatures järjekorda (kui programm loob bindingu valesse järjekorda)
- Programmi peab muutma nii et:
Kuigi me praegu kasutame sama arvutit, et kõiki neid programme jookustada, siis reaalselt ei ole vahet millises arvutis need käima pannakse, need töötaksid täpselt sama moodi. Selleks, et suurendada andmete töötlemise kiirust, saame skaleerida Ülesandes 2.4 loodud protsesside arvu.
- Tee ekraanivaade lõplikust väljundist, kus on näha, et andmetöötluse protsesside tulemused jõuavad kohale alertidena.
Lahenduse esitamine
Praktikumi lahendusena tuleb esitada:
- Kood ja näidis väljund programmidest, mis on loodud ülesannetes 2.2, 2.3, 2.4
- Ekraanivaated Ülesannetest 2.1 ja 2.5.
- Failid tuleks kokku pakkida üheks Zip failiks enne üles laadimist.
- Vältige
env
,.env
kaustade lisamist!
- Vältige
Võimalikud probleemid ja nende potentiaalsed lahendused.
- Kui ei saa rabbitMQ administreerimise liidesele ligi
- Kontrollige, et administreerimise liidesele kasutate porti, mis on viie kohaline ning algab numbriga 1 (Näiteks: 15699)
- Kui ei ole saanud ligipääsu detaile, võtke õppejõuga ühendusi Zulip'i kaudu.
- Kontrollige, et kasutate Delta majas Eduroam võrku, või VPN'i.
- Ülikooli VPN õpetus: https://wiki.ut.ee/pages/viewpage.action?pageId=17105590
- Võtke ühendust õppejõuga.
- Kui ei saa ühendusti RabbitMQ Maakleriga Python programmi seest.
- Kontrollige, et kasutate porti mis on nelja kohaline ning algab numbriga 5 (näiteks 5699).
- Kui saate vea brauseris: cant connect to host 172.17.6X.YYY: No route to host
- Check you are using Eduroam Wifi in Delta, or VPN from elsewhere.
- IF you have Linux on your computer AND have Docker installed:
- Then there is a possibility of network address conflict between docker network and university network that VPN uses. It is likely that packets you try to send to some of the 172.17.* networks are sent to your local docker network instead.
- One solution would be to reconfigure what networks your local Docker uses. The steps are:
- Create a directory in the virtual machine in the path:
sudo mkdir /etc/docker
- Create a file in the docker directory:
sudo nano /etc/docker/daemon.json
- Copy the following entry into the file:
{ "default-address-pools": [{"base":"172.80.0.0/16","size":24}] }
- Restart docker service using:
sudo systemctl restart docker
- Create a directory in the virtual machine in the path:
- IF you have Windows and have Docker installed:
- Add
"default-address-pools": [{ "base":"172.80.0.0/16","size":24 }]
to the Docker Engine configuration.- You will find it inside the Docker Desktop app settings:
Settings -> Docker Engine
- You will find it inside the Docker Desktop app settings:
- This will reconfigure what networks the Docker engine will use/create.
- Add