Practice 12 - Introduction to data pipelines using Apache NiFi
In this practice session you will work with Apache NiFi. You will learn how to set up NiFi in the cloud, how to create NiFi pipelines, and how to use NiFi to manage data streams.
The lab content is related to the RADON Horizon 2020 EU project (http://radon-h2020.eu/), whose goal is to unlock the benefits of serverless FaaS for the European software industry using the TOSCA language and the tools developed in the project.
- G. Casale, M. Artac, W.-J. van den Heuvel, A. van Hoorn, P. Jakovits, F. Leymann, M. Long, V. Papanikolaou, D. Presenza, A. Russo, S. N. Srirama, D. A. Tamburri, M. Wurster, L. Zhu. RADON: rational decomposition and orchestration for serverless computing. SICS Software-Intensive Cyber-Physical Systems, 2019, pp. 1-11. https://doi.org/10.1007/s00450-019-00413-w
References
- Apache NiFi In Depth https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#flowfile-repository
- Apache NiFi documentation http://nifi.apache.org/docs.html
Exercise 12.1. Installation of Apache NiFi
In this task we will use OpenStack to run Apache NiFi inside an instance using Docker.
- Create an OpenStack instance
- Flavour should be m2.tiny
- Install Docker on the instance, just like we did in Lab 3 (Containers: Working with Docker)
- Make sure to follow all the steps of installing Docker in the guide, including pre-configuring the network addresses before installing Docker.
- Create a Docker container with the Apache NiFi image from Docker Hub using the following command:
sudo docker run --name nifi \
  -p 8080:8080 \
  -p 8081:8081 \
  -d \
  -e NIFI_WEB_HTTP_PORT='8080' \
  apache/nifi:latest
- NiFi will run inside the Docker container, and the NiFi web interface port 8080 will be made available through the instance.
- To access the NiFi web interface, direct your browser to the following address: <IP of NiFi Instance>:8080/nifi, after replacing the IP with the OpenStack instance IP. (A quick command-line check is also sketched at the end of this exercise.)
- The web interface should look like this:
- Description of the main Web interface elements:
- Description of the main control elements:
- Now you are ready to start creating NiFi pipelines.
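Optionally, before moving on, you can verify from the command line that the container is up and the web interface responds. A minimal sketch, assuming you kept the container name nifi from the run command above (NiFi may need a minute or two before the UI is available):
- Check that the container is listed as Up:
sudo docker ps --filter name=nifi
- Check the NiFi logs to see whether the UI has started:
sudo docker logs nifi | tail -n 20
- Check that the web interface responds (this should eventually print 200):
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080/nifi/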
Exercise 12.2. Generate flow files and send to local directory
In this task, we will create a simple pipeline which generates FlowFiles with random content and stores them as files into the filesystem. We will use two NiFi processors: GenerateFlowFile and PutFile.
- GenerateFlowFile : This processor creates FlowFiles with random data or custom content. It is useful for load testing, configuration, and simulation.
- PutFile : This Processor can be used to store incoming FlowFiles into a user-configured folder in the local filesystem.
Let's create both of these processors and connect them:
- Add GenerateFlowFile processor to the NiFi canvas
- Drag and drop the NiFi processor icon (top left of the NiFi web interface) onto the canvas. NiFi will display a table of available Processors.
- Type GenerateFlowFile in the search box.
- Double click on the processor to add it to the canvas.
- The GenerateFlowFile processor will now be added to the main canvas.
- Double click on the newly added GenerateFlowFile Processor to open the Configure Processor window.
- Configure the Scheduling tab
- Schedule this processor to run every 20 sec. This allows us to limit the number of FlowFiles that are generated.
- NB! Make sure that the Run Schedule is not set to 0 sec, as this would make NiFi schedule the Processor without limit and a huge number of files would be generated at once.
- The Scheduling tab should now look like this:
- Configure the Properties tab with the following info:
- File Size: 10B
- The Properties tab should look like this:
Let's now add the second NiFi processor: PutFile
- Configure the PutFile processor:
- In the Properties tab set:
- Directory: /tmp/NiFi_ex1/
- The Properties tab should look like this:
- In the Settings tab:
- The Settings tab allows us to configure which outgoing relationships are not used, meaning where data is not sent, by marking those relationships as Automatically Terminated.
- This is very important, as every processor can have many outgoing relationships, and NiFi will not allow us to start a Processor when it is not clear how every outgoing relationship is used.
- You will need to configure this for every Processor, setting any unused outgoing relationships as Automatically Terminated.
- Configure this processor to mark the Failure and Success relationships as Automatically Terminated.
- The Settings tab should look like this:
- Establishing a connection between the two processors:
- Hover over the GenerateFlowFile Processor and drag the arrow that appears over to the other processor to establish a relationship between them.
- NiFi may sometimes ask which outgoing relationship to use, but there is only one option for the GenerateFlowFile processor (success), so this time it will not ask:
- The resulting pipeline should look like this:
Let's now start the pipeline and verify that data is being generated and stored to the filesystem:
- Right click on the GenerateFlowFile processor and select the Start menu item.
- Similarly, right click on the PutFile processor and select the Start menu item.
- You can verify the output of the data pipeline in two ways:
- Through Data Provenance:
- For this, right click on the PutFile processor.
- Select the View data provenance menu item.
- This will show the list of FlowFiles that were handled by this processor.
- Click on the i button in the first column of each record.
- Go to the CONTENT tab.
- Click on the View button.
- The second way is to log into the VM and exec into the container:
docker exec -it container-name /bin/bash
and then verify the files in the configured directory:
ls /tmp/NiFi_ex1/
(A one-line variant of this check is sketched at the end of this exercise.)
- Take a screenshot which displays the created pipelines (After starting them and testing them. PS! IP of the instance should be visible in the screenshot)
- Take a screenshot of the result (either through Data Provenance view, or by checking the output folder from the command line)
PS! To check issues related to NiFi Processors, you can hover your mouse over the Error icon:
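A one-line variant of the command-line check, assuming the container is named nifi and you used the /tmp/NiFi_ex1/ directory, would be something like:
sudo docker exec nifi ls -l /tmp/NiFi_ex1/
sudo docker exec nifi sh -c 'ls /tmp/NiFi_ex1/ | wc -l'
The file count should grow by one roughly every 20 seconds while the pipeline is running.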
Exercise 12.3. Creating NiFi templates
NiFi templates can be used to save NiFi pipelines as reusable software artifacts that can be imported multiple times into the NiFi canvas, downloaded as XML files, shared, and uploaded into other NiFi deployments.
Let's create a NiFi template from the pipeline you created in the previous exercise.
- Select the components you want to put into a template. In this case select all the components (GenerateFlowFile, PutFile, and the connection queue).
- Right click on any selected component.
- Select the Create template option.
- Give the template a name and, optionally, a description.
- Now click on the Create button, followed by the Ok button.
Steps to download a template
- Click on the icon in the top right corner
- Select the Templates option.
- Now find/select the template you want to download.
- In the last column, click on the download icon.
Importing a template
- First, it is required to upload the template
- Click on the search icon -> select the file -> click on the Upload button
- Drag and drop the Template icon onto the canvas
- Select the template name from the drop-down list.
- Now click on the Add button.
- Make sure to save the downloaded template, as it will be one of the lab submissions.
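As an optional aside, templates can also be listed and downloaded over the NiFi REST API, which can be handy if you only have command-line access to the instance. A rough sketch; the endpoint paths below are taken from the NiFi 1.x REST API and may differ between versions, and TEMPLATE_ID is a placeholder you need to take from the listing response:
curl -s http://INSTANCE_IP:8080/nifi-api/flow/templates
curl -s http://INSTANCE_IP:8080/nifi-api/templates/TEMPLATE_ID/download -o my_template.xml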
Exercise 12.4. A more advanced NiFi pipeline
In this task we are going to create two NiFi data pipelines that interact with an external service (an MQTT broker) and expose a web server endpoint to listen for incoming data.
The first pipeline listens for incoming REST POST requests and publishes the data into an MQTT broker. The second pipeline listens for data coming from the MQTT broker, checks whether the message contains an ERROR string, and stores the data into different output folders depending on whether it contained the match or not.
The first pipeline listens for incoming data sent through REST on port 8081 and sends the data to the MQTT broker:
- Install the Mosquitto MQTT server on the instance
sudo apt-get install mosquitto
sudo apt-get install mosquitto-clients
- Test that the MQTT server is working using two terminals inside the OpenStack instance
- In the first terminal, subscribe to the topic restdata:
mosquitto_sub -t restdata
- In the second terminal, publish data to the topic restdata:
mosquitto_pub -h localhost -t restdata -m "Hello MQTT"
- You should see the message appearing in the first terminal.
- Use the NiFi ListenHTTP processor to listen for incoming REST POST requests.
- Listening Port: 8081
- Use the NiFi PublishMQTT processor to publish the data coming in through POST requests into MQTT
- Broker URI: tcp://INSTANCE_IP:1883 (NB! change INSTANCE_IP to match the actual IP of your instance)
- Retain Message: False
- Topic: restdata
- Client ID: (you can freely pick a client id string)
- Quality of Service: 1
- Start the pipeline and test that it works
- To generate data into the REST interface behind port 8081, we can use the curl command inside the instance.
- Our NiFi ListenHTTP processor is listening on port 8081 and the /contentListener endpoint.
- You can send data to the Docker container on port 8081 using the following curl command:
curl --data "funnytext" INSTANCE_IP:8081/contentListener
- This will send a REST POST request to the server and submit the string "funnytext" as the data payload.
- Replace the instance IP with the correct IP (do NOT use localhost).
- As a result, you should notice that the processors are processing FlowFiles, and you can use Processor data provenance to investigate whether the correct FlowFiles have been moving through the pipeline. (A small loop for sending several test requests is sketched below the pipeline figure.)
- The pipeline should look like this:
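To make testing a bit more convenient, you can also send several requests in a row and watch them arrive on the MQTT side. A small sketch, assuming the same restdata topic and INSTANCE_IP replaced as before:
- In one terminal, subscribe to the topic the pipeline publishes to:
mosquitto_sub -t restdata -v
- In another terminal, send a few test messages through the ListenHTTP endpoint:
for i in 1 2 3; do curl --data "funnytext $i" INSTANCE_IP:8081/contentListener; done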
Now let's create another pipeline, which listens to the messages from the MQTT server.
- Use the NiFi ConsumeMQTT processor to listen to the MQTT topic
- Broker URI: tcp://INSTANCE_IP:1883 (NB! change INSTANCE_IP to match the actual IP of your instance)
- Client ID: (you can freely pick another client id string, but it should be different from the previous one)
- Topic Filter: restdata
- Max Queue Size: 1000
- NB! change the Run Schedule value to 0.1 sec (otherwise the processor will use too much CPU)
- Route the data coming from MQTT based on its content using the RouteOnContent processor
- Let's create a routing rule for FlowFiles that contain the string ERROR
- Change the Match Requirement to: content must contain match
- Add a new Property:
- Key: errortext
- Value: ERROR
- The result should look something like this:
- As a result, this processor now has a new outgoing relationship labelled errortext, which we can connect to another processor
- Create a new PutFile processor that receives data from the errortext outgoing relationship of RouteOnContent. We will write the incoming FlowFiles into the /tmp/errodata folder.
- Directory: /tmp/errodata
- Create a new PutFile processor that receives data from the unmatched outgoing relationship of RouteOnContent. This processor will receive all data that did not match any user-provided routing rule. We will write the incoming FlowFiles into the /tmp/data folder.
- Directory: /tmp/data
- The result should look something like this:
- Start the pipelines and verify that the data is being received and processed by both of them.
- Generate data that does not contain the "ERROR" string in the message using curl (just like at the start of the exercise); the first pipeline will forward it to the MQTT broker.
- Generate data that contains the string "ERROR" in the message using curl.
- Verify that data is processed by both of the final PutFile processors (a command-line version of this test is sketched after this list).
- Save the exercise solution as a template (make sure ALL elements are selected) and download the template. This will be one of the lab submissions.
- Take a screenshot which displays the two created pipelines (After starting them and testing them. PS! IP of the instance should be visible in the screenshot)
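A minimal command-line version of this test, assuming the NiFi container is named nifi and INSTANCE_IP is replaced with the actual instance IP:
- Send one message without the ERROR string and one with it:
curl --data "all good here" INSTANCE_IP:8081/contentListener
curl --data "something broke: ERROR" INSTANCE_IP:8081/contentListener
- After a few seconds, each message should appear in the corresponding folder inside the NiFi container:
sudo docker exec nifi ls -l /tmp/data
sudo docker exec nifi ls -l /tmp/errodata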
Deliverables:
- Screenshots from tasks 12.2 and 12.4
- Templates from tasks 12.3 and 12.4
- Answer the following questions:
- Which of the available NiFi processors (NOT covered in this lab) looks most interesting or useful to you?
- Why do you find it interesting or useful?
- You can see the list of NiFi processors here: http://nifi.apache.org/docs.html
- Explain the copy-on-write paradigm that the NiFi Content Repository uses.
- Read about the NiFi Content Repository here: https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#content-repository
- Why is copy-on-write useful when dealing with large amounts of streaming data?