Practice 10 - Introduction data pipelines using Apache NiFi
In this Practice session you will work with Apache NiFi. You will learn how to set up NiFi in cloud, how to create NiFi pipelines and how to use it to manage data streams.
This lab's content is related to the RADON Horizon 2020 EU project, the project's goal is unlocking the benefits of serverless FaaS for the European software industry using TOSCA language and developed tools.
- G. Casale, M. Artac, W.-J. van den Heuvel, A. van Hoorn, P. Jakovits, F. Leymann, M. Long, V. Papanikolaou, D. Presenza, A. Russo, S. N. Srirama, D.A. Tamburri, M. Wurster, L. Zhu RADON: rational decomposition and orchestration for serverless computing. SICS Software-Intensive Cyber-Physical Systems. 2019 Jan 1-11. https://doi.org/10.1007/s00450-019-00413-w
References
- Apache NiFi In Depth https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#flowfile-repository
- Apache NiFi documentation http://nifi.apache.org/docs.html
Exercise 12.1. Installation of Apache NiFi!
In this task we will use OpenStack to run Apache NiFi inside an instance using Docker.
Create an OpenStack VM instance:
- Source: Instead of an Image, this time we will use Instance Snapshot, choose
CC Ubuntu 18.04 with Docker 20.10.6
- in this Ubuntu-based snapshot, the installation of Docker (as we did in Lab 2) has already been done for us.
- Flavour: should be
m2.tiny
Run Apache NiFi using Docker
- Create a Docker container with the NiFi image from Docker Hub using the following command line command:
docker run --name nifi \ -p 8080:8080 \ -p 8081:8081 \ -d \ -e NIFI_WEB_HTTP_PORT='8080' \ apache/nifi:latest
- NiFi will run inside the container and the NiFi web interface will be made available on port 8080.
- To access NiFi web interface, you can direct your browser to the following address
<IP of Nifi Instance>:8080/nifi
, after replacing the IP with the OpenStack instance IP. - The web interface looks something like this:
- Description of the main Web interface elements:
- Description of the main control elements:
- Now you are ready to start creating NiFi pipelines
Exercise 12.2. Generate flow files and send to local directory
In this task, we will create a simple pipeline which generates FlowFiles with random content and stores them as files into the filesystem. We will use two NiFi processors: GenerateFlowFile and PutFile.
- GenerateFlowFile : This processor creates FlowFiles with random data or custom content. It is useful for load testing, configuration, and simulation.
- PutFile : This Processor can used to store incoming FlowFiles into user configured folder in the local filesystem.
Lets create both of these processors and connect them:
- Add GenerateFlowFile processor to the NiFi canvas
- Drag and drop the NiFi processor icon (top left on the NiFi web interface) to the canvas. Nifi will display a table of available Processors.
- Type GenerateFlowFile in the search box.
- Double click on the processor to add to the canvas.
- Now GenerateFlowFile processor will be added to the main canvas.
- Double click on the newly added GenerateFlowFile Processor to get the configure processor window.
- Configure the Scheduling tab
- Schedule this processor to run in every
20 sec
. This allow us to limit the number of FlowFiles that are generated. - NB! Make sure that the Run schedule is not set to 0 sec, as this will make NiFi to schedule the Processor without limit and huge number of files will be generated at once.
- Scheduling Tab should now look like this:
- Schedule this processor to run in every
- Configure the Properties tab with the following info:
- File size:
10B
- Properties Tab should look like this:
- File size:
Lets now add the second NiFi processor: PutFile
- Configure the PutFile processor:
- In the Properties Tab set:
- Directory:
/tmp/NiFi_ex1/
- Directory:
- The Properties tab should look like this:
- In the Properties Tab set:
- In the Setting Tab:
- Setting Tab allows us to configure which outgoing relationships are not used, meaning where data is not sent and which outgoing pipes are Automatically Terminated.
- This is very important, as every processor can have many outgoing relationships and NiFi will not allow us to start Processors when it is not clear how every outgoing relationship is used.
- You will need to configure this for every Processor, setting any unused outgoing relationships as Automatically Terminated
- Configure this processor to mark Failure and Success relationships as Automatically Terminated.
- The Setting Tab should look like this:
- Establishing connection between two processors.
- Hover over the ''GenerateFlowFile'Processor and drag the appearing arrow over to the other processor to establish relationship between them.
- NiFi may sometimes ask, which outgoing relationship to use, but there will only be one option for the ''GenerateFlowFile'processor: success, so this time it will not ask:
- The resulting pipeline should look like this:
Lets now start the pipeline and verify that data is being generated and stored to the file system:
- Right click on the ''GenerateFlowFile' processor and select Start menu item.
- Similarly right click on the PutFile processor and select Start menu item.
- You verify the output of data pipeline in two ways:
- Through Data Provenance:
- For this, right click on PutFile processor.
- Select View data provenance menu item
- This will show the list of flow files that are handled by this processor.
- Click on the i button in the first column of each record.
- Goto CONTENT tab.
- Click on View button.
- The second way could be to login VM and exec inside container
docker exec -it container-name /bin/bash
and verifying the files in the given directoryls /tmp/NiFi_ex1/
- Through Data Provenance:
- Take a screenshot which displays the created pipelines (After starting them and testing them. PS! IP of the instance should be visible in the screenshot)
- Take a screenshot of the result (either through Data Provenance view, or by checking the output folder from the command line)
PS! To check issues related to NiFi Processors, you can hover your mouse over the Error icon:
Exercise 12.3. Creating NiFi templates
NiFi templates can be used to save NiFi pipelines as re-useable software artifacts that can be imported multiple times into NiFi canvas, downloaded as XML files, shared and uploaded into other NiFi deployments.
Lets create a NiFi template from the previous pipeline you created.
- Select the components you want to put into a template. In this case select all the components (GenerateFlowFile, 'PutFile, and the connection queue'').
- You can use shift-clicking to select multiple items.
- Right click on any selected component.
- Select ''Create template' option.
- Give a template name, and the description (optional).
- Now click on ''create' button followed by 'Ok' button.
Steps to download a template
- Click on the icon present in the right top corner
- Select Template option.
- Now find/select the template you want to download.
- In the last column, click on the download icon.
Importing template
- First it is required to upload the template
- Click on search Icon -> select the file -> click on Upload button
- Drag and Drop Template Icon to the canvas
- Select the template name from the drop down list.
- Now click on Add button.
- Make sure to save the downloaded template, this will be one of the lab submissions.
Exercise 12.4. More advanced NiFi pipeline
In this task we will create a NiFi pipeline which:
- Listens for incoming data by exposing a Web server
- Recognizes whether the message contains an ERROR string
- Periodically merges multiple messages into different files, based on whether they contained an error
- Finally, stores the file into your personal Owncloud account's storage.
This kind of pipeline could be useful in a scenario where some component (microservice) in our system is configured to send status updates to our pipeline, which then reorganizes and groups the messages before pushing the results to a 3rd service, where they are later reviewed or just stored for historical purposes.
12.4.1 Listening for HTTP requests, routing flows based on content
- Use Nifi ListenHTTP processor to listen for incoming HTTP requests.
- Add the processor to your pipeline and configure it:
- Listening port:
8081
- Base Path: loglistener
- Listening port:
- Add the processor to your pipeline and configure it:
Next, let's try to distinguish different kinds of incoming data with the RouteOnContent processor. Our goal is to have separate flows for data which contain the string "ERROR" and those which do not.
- Add a RouteOnContent processor to your pipeline
- Connect to the "success" flow from ListenHTTP to it.
- Configure it's properties:
- Change the Match Requirement to:
content must contain match
- Add a new Property:
- Key:
errortext
- Value:
ERROR
- Key:
- Change the Match Requirement to:
- The result should looks something like this:
- As a result, this processor now has a new outgoing relationship (flow) labelled
errortext
which we can connect to another processor, it also has another outgoing relationship "unmatched", for files which failed the Match Requirement.
Before proceeding, let's test that ListenHTTP is working properly. Start the ListenHTTP processor (but not RouteOnContent) and make some requests. To make requests, we can use curl command (explained below). The pipeline should look like this:
- Our NiFi ListenHTTP processor is listening on port 8081 , at /loglistener URL endpoint
- You use the following curl command:
curl --data "funnytext" INSTANCE_IP:8081/loglistener
(replace with your correct IP )- This will send a HTTP POST request to the server with the string "funnytext" as the data payload.
- As a result, you should notice that processors are processing FlowFiles, and you can use Processor data provenance to investigate whether correct flow files have been moving through the pipe.
12.4.2 Merging and Grouping the FlowFiles
RouteOnContent produces 2 outgoing flows:
- One for FlowFiles which contained the text ERROR - this flow is named errortext
- Another for those which don't match - this flow is named unmatched
Let's add another processor which will 1) group the FlowFiles based on the name of the flow (errortext or unmatched) and for each group, merge multiple FlowFiles into one single line by concatenating them.
- Add a MergeContent Processor.
- MergeContent merges a batch of several FlowFiles into a single FlowFile based on what merging strategy the user defines. The various options include grouping by certain attributes similar to the Reduce operation in MapReduce.
- Connect both the errortext and unmatched relationships from RouteOnContent to MergeContent
- Configure MergeContent:
Scheduling -> Run Schedule:
60 sec - The processor executes every 60s- Merge Strategy: Bin-Packing Algorithm
- The Flow Files are merged on a per-group (bin) basis.
- Correlation Attribute Name:
RouteOnContent.Route
- This tells the processor to create separate groups (bins), based on the
RouteOnContent.Route
attribute, the value of this is set by the RouteOnContent Processor, this is how we can create separate groups for "errortext" and "unmatched".
- This tells the processor to create separate groups (bins), based on the
- Merge Format use Binary Concatenation
- When merging, concatenates the FlowFile contents into a single FlowFile (instead of e.g. compressing into a ZIP)
- Maximum Number of Entries: 1000
- Determines max. no. of items per group so if you have a lot of HTTP requests, you may get several groups
- Maximum Bin Age: 300s - forces a merged flowfile to be output after a certain amount of time.
- To make the concatenation of the files use a newline (linebreak) character as the delimiter:
- Delimiter Strategy: Text
- Demarcator: Open the field and hit shift + return keys to enter a linebreak character.
12.4.3 Saving merged FlowFiles to OwnCloud
Owncloud (aka Nextcloud) is a cloud storage service (similar to Dropbox, Microsoft OneDrive, etc). University of Tartu hosts it's own instance of this open-source software. It is accessible at https://owncloud.ut.ee, every student's UT account is linked to UT's OwnCloud.
We will use OwnCloud's HTTP API to save the merged FlowFiles to our individual accounts Owncloud folders.
- Log in to https://owncloud.ut.ee
- Create a folder "NiFiLab", preferrably at the root of your Owncloud directories.
- Inside this folder, create 2 sub-folders:
errortext
andunmatched
- Inside this folder, create 2 sub-folders:
Next, to use OwnCloud's HTTP API, we need our account's WebDAV link. WebDAV is a HTTP-based protocol used by OwnCloud
- Get your WebDAV link from the OwnCloud web interface
- Expand the left sidebar menu, and at the bottom of the sidebar, open Settings.
- Find your WebDAV URL, which looks something like this:
- Add an InvokeHTTP processor your pipeline. Connect the "merged" outgoing flow of MergeContent to InvokeHttp.
- Configure InvokeHTTP to make a PUT request to OwnCloud API:
- HTTP Method: PUT
- Remote URL:
<<YOUR_NEXTCLOUD_WEBDAV_URL>>/NiFiLab/${RouteOnContent.Route}/${now()}.txt
- This will save the file into a folder based on the RouteOnContent.Route attribute (errortext/unmatched), the filename will be a timestamp (now()).
- Basic Authentication Username: your UT account username
- Basic Authentication Password: your UT account password
- Check your processors for any warnings, such as relationships which have not been terminated, make them automatically terminate.
The pipeline is now finished! It looks like this:
Start all of your processors and test it by sending HTTP requests (e.g. with curl as shown earlier) to the /loglistener
endpoint. You should see the processors handle tasks and also see the files being created in your owncloud folders.
- Generate a few HTTP requests that don't contain "ERROR" string in the message using curl
- Generate a few HTTP requests that contain string "ERROR" in the message using curl
- Verify that data is processed by the final InvokeHttp processor
- Verify that you can see files being created in your owncloud folders.
- Save the exercise solution as a template (make sure ALL elements are selected) and download the template. This will be one of the lab submissions.%
- Take a screenshot which displays the created pipeline (After starting them and testing them. PS! IP of the instance should be visible in the screenshot)
- Take a screenshots of your owncloud folder
- One showing the list of files (e.g. in errortext folder)
- One showing the contents of a single chosen file.
Deliverables:
- Screenshots from tasks 12.2 and 12.4
- Templates from tasks 12.3 and 12.4
- Please try importing your saved templates to the canvas by yourself before submitting to verify you did not miss anything when saved the template.
- Answer the following question:
- Which of the available NiFi processors looks most interesting or useful to you (which was NOT covered in this lab)
- Why do you find it interesting or useful?
- You can see the list of NiFi processors here: http://nifi.apache.org/docs.html
- Explain what is copy-on-write paradigm that NiFi Content Repository uses.
- Read about the NiFi content Repository here: https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#content-repository
- Why is copy-on-write useful when dealing with large amount of streaming data?
- Which of the available NiFi processors looks most interesting or useful to you (which was NOT covered in this lab)
Troubleshooting
If a Processor is not behaving as expected (e.g. InvokeHTTP runs, but nothing appears in Owncloud), one way to get extra information is to check data provenance
- Right-click on Processor, choose data provenance.
- You will see a list of data the processor has handled, try clickin on the "i" icon to view details of a single item.
- Here you can find Attributes of the FlowFile, which may contain useful information such as the servers response in case of InvokeHTTP.