Teine praktikum - Teadete edastus hajussüsteemides
Selle praktikumi eesmärk on anda ülevaade kuidas luua hajussüsteeme, mis kasutavad teadete edastust ja teadete järjekordasid. Selleks kasutame RabbitMQ maaklerit (broker), et luua sõnumite või tööde järjekorasid ning kolm väikest Python programmi, mis kõik töötavad iseseisvalt ning suhtlevad omavahel läbi maakleri.
Viited
- RabbitMQ Python teegi, Pika dokumentatsioon: https://pika.readthedocs.io/en/latest/intro.html
- Pika näited: https://pika.readthedocs.io/en/latest/examples.html
Probleemide korral kontrollige:
- Pinnitud sõnumid
#praktikum-2-teated
Slacki kanalis. - Võimalikud probleemid ja nende potentiaalsed lahendused juhendi lõpus.
- Küsi
#praktikum-2-teated
Slacki kanalist.
Ülesanne 2.1: RabbitMQ ligipääs
Selles ülesandes kasutame RabbitMQ administreerimise liidest, et hallata RabbitMQ olemeid ning läbi proovide lihtsamad andmete saatmise ja kuulamise sammud. See liides aitab meil ka hästi uurida, kas andmed jõuavad Maaklerisse kohale ning, mis on hetkel aktiivsed järjekorrad, ühendused ja seosed.
- Logi sisse RabbitMQ administreerimise liidesesse: http://172.17.67.66:156XX/
- Täpse pordi väärtuse, kasutajanime ja parooli saab õppejõu käest. Igal tudengil on erinev keskkond.
- See server on kätte saadav ainult Ülikooli võrgust.
- Kontrollige, et kasutate Delta majas Eduroam võrku, või VPN'i.
- UT public võrk ei ole piisav.
- Eduroam võrk väljaspoolt Delta maja ei pruugi olla piisav.
- Ülikooli VPN õpetus: https://wiki.ut.ee/pages/viewpage.action?pageId=17105590
- Kontrollige, et kasutate Delta majas Eduroam võrku, või VPN'i.
- Loo uus RabbitMQ exchange:
- Pane selle nimeks oma perekonnanimi. Ei tohiks sisaldada täpi- või spetsiaalseid tähti.
- Type: topic
- Publitseeri käsitsi läbi administreerimise liidese uus sõnum:
- Routing key: delta.tudeng.oma_prekonnanimi.temperatuur
- Payload:
{"temperature": "24"}
- Loo uus järjekord (Queue):
- Nimi: oma_perekonnanimi_queue
- Loo uus Binding:
- From exchange: oma (varasemalt loodud) Exchange nimi
- Routing key: delta.tudeng.oma_prekonnanimi.*
- Publitseeri käsitsi läbi administreerimise liidese uuesti sama sõnum oma loodud Exchange sisse, mis enne.
- NB! ära tee seda Queue lehe kaudu, vaid Echanges lehe kaudu
- Vaata sõnumit järjekorras:
- Queues -> Get Messages
- Võta uuesti sõnum järjekorrast.
- Miks see järjekorras edasi on?
- Võta sõnum uuesti, aga seekord seadista:
- Ack mode: Automatic Ack.
- Tee ekraanivaade olukorrast kus on näha, et järjekorrast õnnestus andmeid kätte saada
Peale seda, veelkord proovides järjekorrast andmeid kätte saada, ei tohiks enam sõnumit järjekorras olla, kui sa vahepeal mitut ei lisanud.
Lisategevus:
- Uuri Admin vaate funktsionaalsust. Selle kaudu saab:
- luua uusi kasutajaid ja neile paroole ning õigusi seadistada
- Luua Virtuaalseid host'e, ehk virtuaalselt eraldatud rabbitMQ keskondasid, kus igas on eraldi exhcange'd, järjekorrad, jne. Mis on kasulik, erinevate rakenduste eraldamiseks üksteisest.
Ülesanne 2.2: Sõnumite publitseerimine
Loome Python programmi, mis publitseerib meie poolt genereeritud sõnumeid RabbitMQ serverisse.
- Loome uue Python programmi (Sama moodi nagu eelmises praktikumis. Näiteks PyCharm IDE kasutades)
- Ettevalmistus:
- Impordime pika teegi, ja teised mida selles ülesandes kasutame:
import pika import random import datetime import json
- Impordime pika teegi, ja teised mida selles ülesandes kasutame:
- Paneme paika ühenduse detailid:
- Maakleri asukoht:
broker_host = 172.17.67.66
- Maakleri port:
broker_port = 56XX
(täpse pordi saate küsida õppejõu käest)
- Maakleri asukoht:
- Autentimise seadistamine
- Kasutame varasemalt saadud kasutajanime ja parooli ning loome Pika PlainCredentials objekti:
username = "..." password = "..." credentials = pika.PlainCredentials(username, password)
- Ühenduse loomine
connection = pika.BlockingConnection( pika.ConnectionParameters(host=broker_host, port=broker_port, credentials=credentials)) channel = connection.channel()
- BlockingConnection on blokkeeriv, ehk sünkroonne ühendus RabbitMQ'ga. See tähendab seda, et meie programm jääb ootama, kuni saab kinnituse, et saadetud andmed on kirjutatud RabbitMQ Exchange sisse.
- RabbitMQ pika teek toetab ka asünkroonset suhtlust, SelectConnection ühenduse tüübi kaudu.
- channel() loob uue kanali, mille kaudu saame RabbitMQ maaklerisse andmeid saata ning kuulata.
- Defineerimine sõnumi Python sõnaraamatu (dictionary) andmestructuurina:
message = { "device_name": "jakovits_sensor", "temperature": random.randint(20, 40), "time": str(datetime.datetime.now()) }
- NB! asendage seadme nimes õppejõu nimi oma perekonnanimega (vältige täpi- ja spetsiaalseid tähti)
- konverteerimie sõnumi string väärtueks, kasutades json teeki:
message_str = json.dumps(message)
- Andmete publitseerimine maaklerisse
- Marsruutimise võtme seadistamine:
my_routing_key = iotdevice.perekonnanimi.tempsensor
channel.basic_publish( exchange=exchange, routing_key=routing_key, body=message_str)
- Marsruutimise võtme seadistamine:
- Programmi lõpus paneme ühendused kinni:
channel.close() connection.close()
Iseseisev ülesanne:
- Pane programm saatma genereeritud sünteetilisi sensori sõnumeid iga 5 sekundi tagant, kuni programmi töö katkestatakse.
- Soovitused:
- Saab teha tsükkli, mis jääb püsivalt sõnumeid saatma ning mis magab sõnumite saatmise vahel piisavalt pikalt
- Tsükkli sisse on mõistlik jätta ainult sõnumi genereerimine ning publitseerimine (
channel.basic_publish(...)
)
NB! Salvesta ülesande lahendus eraldi Python skriptina.
Ülesanne 2.3: Sõnumite kuulamine
Loome Python programmi, mis kuulab sõnumeid RabbitMQ serverist, ning prindib need välja.
- Tehke koopia eelmisest programmist (eraldi Python filina)
- Jätame alles kõik kuni
channel
objekti loomiseni. - Loome järjekorra
- Järjekorra nimeks paneme perekonnanimi_queue
- Näiteks
queue_name = "jakovits_queue"
- Näiteks
channel.queue_declare(queue=queue_name, durable=True)
- Durable järab andmed püsibalt järjekorda, ka siis kui kuulajat hetkel ei ole.
- Järjekorra nimeks paneme perekonnanimi_queue
- Paneme paika marsruutimise võtme
- Kuulame kõiki temperatuuri sensoreid iotdevice all, mitte ainult endanimelist:
routing_key = "iotdevice.*.tempsensor"
- NB! Kuna tudengitel on kõigil eraldi RabbitMQ serverid, siis üksteise andmeid te tegelikult siin ei näe.
- Loome seose järjekorra ning Exchange vahel
channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key )
- Loome eraldi Python funktsiooni, mis hiljem kutsutakse välja iga sõnumi korral:
- Funktsioon peaks väla printima saabunud sõnumi sisu ning marsruutimise võtme.
def lab_callback(ch, method, properties, body): print("Saabunud sõnum: %r" % body.decode()) ch.basic_ack(delivery_tag=method.delivery_tag)
- Seadistame rakenduse kuulama RabbitMQ järjekorda, ning käivitama meie lab_callback funktsiooni iga saabuva sõnumi peal.
channel.basic_consume(queue=queue_name, on_message_callback=lab_callback) channel.start_consuming()@
NB! Salvesta ülesande lahendus eraldi Python skriptina.
Iseseisev ülesanne:
- Pane kaks koopiat sellest programmist käima, genereeri andmeid ning uuri, kas andmed jagatakse nende vahel ära.
- See on tulemus mida soovime, aga kui on vajadus, et kaks programmi saaksid oma koopia kõikidest sõnumitest, siis tuleks iga programmi jaoks seada üles oma järjekord ning seos selle ja marsruutimise võtme vahel (meie seda praegu siin ei tee).
Ülesanne 2.4: Sõnumite töötlus ja marsruutimine
Loome uue Python rakenduse, mis kuulab sõnumeid sisend-järjekorrast, käivitab kasutaja poolt defineeritud funktsiooni ning publitseerib tulemused teistesse väljund-järjekordadesse. kasutame seda funktsiooni selleks, et filtreerida ning eraldi ruutida andmed siis, kui oleks vaja tõsta alarm (näiteks liiga suure temperatuuri väärtuse korral).
- Võtame aluseks ülesande 2.3 koodi, teeme sellest koopia uue programmina.
- Modifitseerime funktsiooni (mida kutsutakse välja iga sõnumi korral), et saata uus sõnum juhul kui temperatuuri väärtus on üle 30.
- Seadistame väljund marsruutimise võtmeks
alarm.routing_key
. Näiteks:alarm.iotdevice.jakovits.tempsensor
output_routing_key = "alarm."+ str(method.routing_key)
- Kontrollime kas temperatuuri väärtus saabunud sõnumis on suurem kui 30:
message = json.loads(body.decode()) if(message['temperature'] > 30):
- loome uue Alarm sõnumi (
if
bloki sees), mis sialdab originaalset sõnumit ning ka alarmi tüüpi:alarm_message = { "message": message, "alarm_type": "High Temperature" } alarm_message_string = json.dumps(alarm_message)
- Tulemuste saatmine tagasi RabitMQ maaklerisse
if
bloki sees)ch.basic_publish(exchange = exchange , routing_key=output_routing_key, body=alarm_message_string) print("Saadetud alarm! Sõnum: %r" % body.decode())
- Seadistame väljund marsruutimise võtmeks
- Muudame ülejäänud pea koodi programmis:
- Hoolitseme selle eest, et luuakse uus väljund-järjekord, kuhu alarmid jõuavad püsivalt
output_routing_key = "alarm.#" output_queue_name = "jakovits_alarm_queue" channel.queue_declare(queue=queue_name, durable=True) channel.queue_bind(exchange=exchange, queue=output_queue_name, routing_key=output_routing_key)
- NB! asendage järjekorras õppejõu nimi oma perekonnanimega (vältige täpi- ja spetsiaalseid tähti)
- Hoolitseme selle eest, et luuakse uus väljund-järjekord, kuhu alarmid jõuavad püsivalt
Iseseisev ülesanne:
- Genereerige andmeid Ülesande 2.2 programmiga, ning testige, et alarme saadetakse (prinditakse välja, jõuavad järjekorda - saate vaadataRabbitMQ administreerimise liideses)
- RabbitMQ administreerimise liideses saab vajadusel seoseid (binding) ja järjekordasid kustutada, kui nendega midagi paigast ära läheb
NB! Salvesta ülesande lahendus eraldi Python skriptina.
Ülesanne 2.5: Hajusüsteemi testimine
Paneme nüüd kõik eelnevad komponendid koos tööle.
- Pane tööle üks ülesande 2.2 Python programm andmeid genereerima
- Genereeritud andmete marsruutimise võtmete muster:
iotdevice.perekonna_nimi.tempsensor
- Genereeritud andmete marsruutimise võtmete muster:
- Pane tööle kolm protsessi ülesandest 2.4 paralleelselt andmeid töötlema
- Filtreeritud andmete marsruutimise võtmete muster:
alarm.iotdevice.perekonna_nimi.tempsensor
- Filtreeritud andmete marsruutimise võtmete muster:
- Pane tööle ülesande üks 2.3 programm, mis prindib välja välja alarme, mis publitseeritakse ülesande 2.4 protsesside poolt.
- Programmi peaks muutma, et se nüüd kuulaks uut alarmide järjekorda, kuhu suunatakse andmed mille marsruutimise võti on:
alarm.#
- Ehk nüüd paneme selle programmi kuulame neid andmeid, mida publitseerib ülesande 2.4 programm, kui alarmid tekivad.
- Programmi peaks muutma, et se nüüd kuulaks uut alarmide järjekorda, kuhu suunatakse andmed mille marsruutimise võti on:
Kuigi me praegu kasutame sama arvutit, et kõiki neid programme jookustada, siis reaalselt ei ole vahet millises arvutis need käima pannakse, need töötaksid täpselt sama moodi. Selleks, et suurendada andmete töötlemise kiirust, saame skaleerida Ülesandes 2.4 loodud protsesside arvu.
- Tee ekraanivaade lõplikust väljundist, kus on näha, et kõigi kolme andmetöötluse protsessi tulemused jõuavad kohale.
Lahenduse esitamine
Praktikumi lahendusena tuleb esitada:
- Kood ja näidis väljund programmidest, mis on loodud ülesannetes 2.2, 2.3, 2.4
- Ekraanivaated Ülesannetest 2.1 ja 2.5.
- Failid tuleks kokku pakkida üheks Zip failiks enne üles laadimist.
Võimalikud probleemid ja nende potentiaalsed lahendused.
- Kui ei saa rabbitMQ administreerimise liidesele ligi
- Kontrollige, et administreerimise liidesele kasutate porti, mis on viie kohaline ning algab numbriga 1 (Näiteks: 15699)
- Kui ei ole saanud ligipääsu detaile, võtke õppejõuga ühendusi Slack kaudu.
- Kontrollige, et kasutate Delta majas Eduroam võrku, või VPN'i.
- Ülikooli VPN õpetus: https://wiki.ut.ee/pages/viewpage.action?pageId=17105590
- Võtke ühendust õppejõuga.
- Kui ei saa ühendusti RabbitMQ Maakleriga Python programmi seest.
- Kontrollige, et kasutate porti mis on nelja kohaline ning algab numbriga 5 (näiteks 5699).