Practice 9 - Introduction to data pipelines using Apache NiFi
In this practice session you will work with Apache NiFi. You will learn how to set up NiFi in the cloud, how to create NiFi pipelines, and how to use them to manage data streams.
This lab's content is related to the RADON Horizon 2020 EU project, whose goal is to unlock the benefits of serverless FaaS for the European software industry using the TOSCA language and the tools developed in the project.
- G. Casale, M. Artac, W.-J. van den Heuvel, A. van Hoorn, P. Jakovits, F. Leymann, M. Long, V. Papanikolaou, D. Presenza, A. Russo, S. N. Srirama, D. A. Tamburri, M. Wurster, L. Zhu. RADON: rational decomposition and orchestration for serverless computing. SICS Software-Intensive Cyber-Physical Systems, 2019, pp. 1-11. https://doi.org/10.1007/s00450-019-00413-w
References
- Apache NiFi In Depth https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#flowfile-repository
- Apache NiFi documentation http://nifi.apache.org/docs.html
Exercise 9.1. Installation of Apache NiFi
In this task we will use OpenStack to run Apache NiFi inside an instance using Docker.
Create an OpenStack VM instance:
- Source: instead of an Image, use a Volume Snapshot; choose Ubuntu 20 + Docker 20.10
  - In this Ubuntu-based snapshot, Docker has already been installed for us (as we did in Lab 2).
- Flavour: m2.tiny
Run Apache NiFi using Docker
- Create a Docker container with the NiFi image from Docker Hub using the following command:
docker run --name nifi \
  -p 8080:8080 \
  -d \
  -e NIFI_WEB_HTTP_PORT='8080' \
  apache/nifi:latest
- NiFi will run inside the container and the NiFi web interface will be made available on port 8080.
- To access the NiFi web interface, direct your browser to the following address, after replacing the IP with your OpenStack instance's IP:
<IP of NiFi Instance>:8080/nifi
- The web interface looks something like this:
- Description of the main Web interface elements:
- Description of the main control elements:
- Now you are ready to start creating NiFi pipelines
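NiFi can take a minute or two to fully start up. If the web interface does not respond right away, here is a quick sanity check from the VM (a minimal sketch, assuming the container name nifi from the docker run command above):
# Check that the container is up; STATUS should show "Up ..."
docker ps --filter name=nifi
# Follow the NiFi logs and wait for a line reporting that the web server has started, then press Ctrl+C
docker logs -f nifi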
Exercise 9.2. Generate flow files and send to local directory
Let's create a simple pipeline which generates FlowFiles with random content and stores them as files in the filesystem. We will use two NiFi processors:
- GenerateFlowFile: This processor creates FlowFiles with random data or custom content. It is useful for load testing, configuration, and simulation.
- PutFile: This processor can be used to store incoming FlowFiles into a user-configured folder in the local filesystem.
Let's create both of these processors and connect them:
- Add GenerateFlowFile processor to the NiFi canvas
- Drag and drop the NiFi processor icon (top left on the NiFi web interface) to the canvas. NiFi will display a table of available Processors.
- Type GenerateFlowFile in the search box.
- Double click on the processor to add it to the canvas.
- Now the GenerateFlowFile processor will be added to the main canvas.
- Double click on the newly added GenerateFlowFile Processor to open the Configure Processor window.
- Configure the Scheduling tab:
  - Schedule this processor to run every 20 sec. This allows us to limit the number of FlowFiles that are generated.
  - NB! Make sure that the Run schedule is not set to 0 sec, as this would make NiFi schedule the Processor without limit and a huge number of files would be generated at once.
  - The Scheduling tab should now look like this:
- Configure the Properties tab with the following info:
  - File Size: 10B
  - The Properties tab should look like this:
Let's now add the second NiFi processor: PutFile
- Configure the PutFile processor:
  - In the Properties tab, set:
    - Directory: /tmp/NiFi_ex1/
  - The Properties tab should look like this:
- In the Relationships Tab:
- The Relationships Tab lets us configure which outgoing relationships are not used, i.e. where data is not sent; such unused outgoing connections should be marked as Automatically Terminated.
- This is very important, as every processor can have many outgoing relationships and NiFi will not allow us to start Processors when it is not clear how every outgoing relationship is used.
- You will need to configure this for every Processor, setting any unused outgoing relationships as Automatically Terminated.
- Configure this processor to mark Failure and Success relationships as Automatically Terminated.
- The Relationships Tab should look like this:
- Establish a connection between the two processors.
- Hover over the GenerateFlowFile processor and drag the arrow that appears onto the other processor to establish a relationship between them.
- NiFi usually asks which outgoing relationship to use, but there is only one option for the GenerateFlowFile processor: success, which should already be selected.
- The resulting pipeline should look like this:
Let's now start the pipeline and verify that data is being generated and stored in the filesystem:
- Right click on the GenerateFlowFile processor and select the Start menu item.
- Similarly, right click on the PutFile processor and select the Start menu item.
- You can verify the output of the data pipeline in two ways:
- Through Data Provenance:
- For this, right click on the PutFile processor.
- Select the View data provenance menu item.
- This will show the list of FlowFiles that have been handled by this processor.
- Click on the i button in the first column of each record.
- Go to the CONTENT tab.
- Click on the View button.
- The second way is to log into the VM and open a shell inside the container (we named it nifi):
docker exec -it nifi /bin/bash
then verify the files in the configured output directory:
ls /tmp/NiFi_ex1/
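As a quick extra check, you can also count the generated files from the VM without opening a shell in the container; with the 20 sec run schedule the count should grow by one roughly every 20 seconds (a sketch, assuming the container is named nifi):
# Count the files PutFile has written so far inside the container
docker exec nifi sh -c 'ls /tmp/NiFi_ex1/ | wc -l'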
- Take a screenshot which displays the created pipelines (after starting and testing them; PS! the IP of the instance should be visible in the screenshot)
- Take a screenshot of the result (either through Data Provenance view, or by checking the output folder from the command line)
PS! To check issues related to NiFi Processors, you can hover your mouse over the Error icon:
Exercise 9.3. Creating NiFi templates
NiFi templates can be used to save NiFi pipelines as reusable software artifacts that can be imported multiple times into the NiFi canvas, downloaded as XML files, shared, and uploaded into other NiFi deployments.
Let's create a NiFi template from the pipeline you created in the previous exercise.
- Select the components you want to put into the template. In this case, select all the components (GenerateFlowFile, PutFile, and the connection queue).
- You can use shift-clicking to select multiple items.
- Right click on any selected component.
- Select the "Create template" option.
- Give the template a name and, optionally, a description.
- Now click on the "Create" button, followed by the "Ok" button.
Steps to download a template
- Click on the menu icon in the top right corner.
- Select the Templates option.
- Now find/select the template you want to download.
- In the last column, click on the download icon.
Importing a template
- First, the template needs to be uploaded into NiFi:
  - Click on the search icon -> select the template file -> click on the Upload button.
- Drag and drop the Template icon onto the canvas.
- Select the template name from the drop-down list.
- Now click on the Add button.
- Make sure to save the downloaded template; it will be one of the lab submissions.
Exercise 9.4. More advanced NiFi pipeline
In this task we will create a NiFi pipeline which:
- Periodically queries a PostgreSQL database for new entries
- Parses the entries as JSON
- Sends a Slack message to notify about new messages
This pipeline represents a notification service, which listens to data from one service (PostgresDB), transforms the data, and acts on it based on its contents by invoking other services (Slack).
9.4.1 Querying a database, parsing results as JSON
We will use the Postgres database of the Message Board Flask app from Heroku labs.
- Go to https://data.heroku.com/ and locate the Postgres instance which you used for the messageboard application in Practice 3.
- Go to "Settings" -> "View Credentials" to find your detailed DB credentials; you will need these in the next step.
- Alternatively, if you no longer have the Heroku app available, start/run the Postgres version of the app as we did in Practice 2, Task 2.3.3.
Let's first configure a database connection pool service, which NiFi processors use to manage connections to the DB. Because we use Postgres, we first need to add the Postgres JDBC driver jar, as it is not bundled with NiFi by default. In the OpenStack VM:
- Download the .jar:
wget https://jdbc.postgresql.org/download/postgresql-42.3.3.jar
- Copy the jar into the container's NiFi "lib" folder:
docker cp postgresql-42.3.3.jar nifi:/opt/nifi/nifi-current/lib
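An optional check that the driver jar is now visible to NiFi inside the container (assuming the container is named nifi, as in Exercise 9.1):
# The Postgres driver jar should now be listed among NiFi's libraries
docker exec nifi ls /opt/nifi/nifi-current/lib | grep postgresql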
Now, in NiFi flow canvas:
- Right click on the background of the canvas and choose "Configure".
- Under "Controller Services", add a new service.
- Choose DBCPConnectionPool 1.16.0 and click on the ⚙ (Gear) icon to configure it. Under "Properties", set:
  - Database Connection URL: jdbc:postgresql://HOST/DATABASE, replacing "HOST" and "DATABASE" with your Postgres service credentials. (NB! This should not include any username or password info; don't use the URI field from Heroku!)
  - Database Driver Class Name: org.postgresql.Driver
  - Database Driver Location(s): lib/postgresql-42.3.3.jar
  - Database User: your Postgres user
  - Password: your Postgres password
  - NB! Be careful with whitespace when copying values! If you want to double-check the credentials outside NiFi, see the optional psql check after this list.
- Once the properties are set, hit "Apply" and click on the "⚡" (Lightning bolt) symbol to Enable the Service.
- If successful, you should see "State: Enabled".
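The optional psql check mentioned above: you can connect to the database with psql from a throwaway container to confirm the credentials and table contents (a sketch; replace USER, PASSWORD, HOST and DATABASE with your Heroku credentials, and messages with your table name if it differs):
# Run psql from a temporary container and list a few rows (confirms credentials and table name)
docker run --rm -it postgres:14 psql "postgresql://USER:PASSWORD@HOST/DATABASE" -c "SELECT * FROM messages LIMIT 5;"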
Let's return to the canvas and use the QueryDatabaseTable processor to make queries to the database.
- Add the processor to your pipeline and configure it (Properties tab):
- Database Connection Pooling Service: choose the ConnectionPool you just created
- Database Type: "PostgreSQL"
- Table name: "messages" (or whatever value you used in your case)
- Maximum-value Columns: id
  - This indicates which DB column will be used to keep track of what data has already been queried. NiFi maintains the maximum observed value of this column in its state and uses it to query only rows that have not been retrieved so far (see the illustration after this list).
- Adjust Scheduling:
- Run schedule: 15 sec
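For illustration, each run of QueryDatabaseTable effectively asks the database only for rows with an id larger than the largest value it has already seen. A rough equivalent using the same throwaway psql container as in the optional check above (42 stands in for the stored maximum id; USER, PASSWORD, HOST and DATABASE are placeholders for your credentials):
# Illustration only: roughly the query issued on each run, with 42 standing in for the stored maximum id
docker run --rm -it postgres:14 psql "postgresql://USER:PASSWORD@HOST/DATABASE" -c "SELECT * FROM messages WHERE id > 42 ORDER BY id;"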
The QueryDatabaseTable processor returns database results in Avro format. For easier data handling in this lab, let's first convert them to JSON:
- Add a "ConvertAvroToJSON" processor
- Wrap Single Record: true
- Connect "QueryDatabaseTable" "success" result relation to "ConvertAvroToJSON" by dragging on the canvas.
Since the database query probably returns multiple rows, let's split the JSON results into separate FlowFiles:
- Add a SplitJson processor.
  - Properties -> JsonPath Expression: $
    - This JsonPath tells NiFi to treat the root JSON object as the list to be split (a command-line analogy follows this list).
  - Connect the "success" relation of "ConvertAvroToJSON" to "SplitJson".
  - Update the "failure" relationship of ConvertAvroToJSON to auto-terminate.
  - Update the "failure" and "original" relationships of "SplitJson" to auto-terminate.
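For intuition, ConvertAvroToJSON (with Wrap Single Record set to true) hands SplitJson a JSON array of row objects, and SplitJson with the $ expression emits one FlowFile per array element. A rough command-line analogy using jq (assuming jq is installed on the VM; the field names are only an example):
# One JSON array in, one JSON object per line out -- similar to one FlowFile per row after SplitJson
echo '[{"id": 1, "message_text": "hello"}, {"id": 2, "message_text": "hi #urgent"}]' | jq -c '.[]'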
- Add "EvaluateJsonPath" Processor to your pipeline
- Connect the "split" flow from SplitJSon to it.
Before proceeding to configure EvaluateJsonPath, let's test if our DB integration works and inspect the structure of the incoming data.
- Right-click and "Start" the SplitJson, ConvertAvroToJSON and QueryDatabaseTable processors.
- If your database has no data, post a few messages to it in the Flask app.
- As a result of the pipeline, you should see a few messages appear in the "split" relation (output of SplitJson).
- The situation should look like this:
- Right-click on the "split" queue, then select "List queue" .
- Take a screenshot of the "List queue" view, where at least 2 items are visible
- Check the contents of one of the FlowFiles (click on the "Eye" icon). NB! Note down which JSON property has the text contents of the message.
- For instance, in this example, the message is under JSON property called "message_text":
- If you are not receiving anything in the "split" queue, troubleshoot for errors: make sure you have configured the Processors correctly, that you did not forget to set relationships to auto-terminate, and that your DB actually has content.
Now that we have the messages as individual split JSON FlowFiles, let's configure the EvaluateJsonPath processor that extracts the specific sub-part of the JSON containing the message text, and sets it as a FlowFile Attribute. We will make decisions based on the Attribute value later.
- EvaluateJsonPath Properties:
  - Click on the "+" icon at the top right and add:
    - Property Name: messageContent
    - Value: $["message_text"]
      - This is the JsonPath, i.e. the part of the whole JSON we want to extract. The part in quotation marks should match the JSON property you observed previously (see the analogy after this list).
  - Under "Properties", set Destination to flowfile-attribute.
    - As a result, the value extracted by the JsonPath will be assigned to the attribute named "messageContent".
We now have the text contents of the message in the attribute "messageContent". Let's push this message to Slack!
- Add a PutSlack Processor
- Connect the "matched" relationship of EvaluateJsonPath to it
- We will send Slack messages to a new Slack workspace; join it via the link shared in this lab's channel on the course's main Slack (check the pinned messages of #lab9-nifi)
- Configure PutSlack:
- WebHook URL: use the URL shared in the course's main Slack (a curl test of the webhook is sketched after this list)
- Webhook Text: A message has been posted: ${messageContent}
  - Feel free to customize this message.
- Run Schedule: 15 sec (change this to a higher value later, once everything is working, to reduce spam)
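If you want to test the webhook itself without NiFi, you can post a message with curl (a sketch; WEBHOOK_URL stands for the URL shared in Slack):
# Posts a test message to the Slack channel behind the webhook
curl -X POST -H 'Content-type: application/json' --data '{"text": "A message has been posted: test from curl"}' WEBHOOK_URL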
Let's test that everything works:
- Check your processors for any warnings, such as relationships which have not been terminated; make those automatically terminate.
- Now start all of your processors and add a new message in your Heroku app connected to the database.
- You should see a message appearing in the Slack channel "practice9" with your text!
- Once everything works, save the exercise solution as a template (make sure ALL elements are selected) and download the template. This will be one of the lab submissions.
Exercise 9.5. Tagging users for urgent messages
As the final modification, let's update the pipeline, so that messages which contain the string "#urgent" will also tag your Slack user. Other messages will simply be posted to Slack without tagging (like before).
- Delete the existing "matched" flow from EvaluateJsonPath to PutSlack
- You need to stop the processors for this.
- Add a RouteOnAttribute processor to your pipeline
- Connect the "matched" flow from EvaluateJsonPath to it.
- Configure properties of RouteOnAttribute:
- Click on "+" Icon to add a property:
- Name: isUrgent
- Value:
${messageContent:contains("#urgent")}
- this expression checks the messageContent Attribute whether it contains the string "#urgent".
- Routing Strategy:
Route to matched if any matches
- if any of the conditions (we have 1 condition- "isUrgent") is matched, the FlowFile is routed to relationship called "matched"
- Click on "+" Icon to add a property:
- Now, connect the "unmatched" flow from "RouteOnAttribute" to your existing PutSlack Processor.
- So messages which don't match the "#urgent" string get directly posted to Slack.
- Add a second PutSlack Processor
- Connect the "matched" relationship of RouteOnAttribute to it
- Configure PutSlack:
  - WebHook URL: same as for the other PutSlack processor
  - Webhook Text: Message for <@STUDENT_SLACK_USERID_HERE> :warning: - Urgent message posted: ${messageContent}
    - For STUDENT_SLACK_USERID_HERE, go to Slack, open your profile details -> "More" -> "Copy Member ID"; it looks something like UERMN39UY. If you wish, you can modify the Slack message further.
    - Don't remove the <@ > characters (the angle brackets and the "@" symbol).
As a result, the pipeline routes messages into 2 different PutSlack processors based on the message contents. You can also try adding more attribute checks to "RouteOnAttribute", e.g. "#warning", "#help".
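The contains() check used above is a simple substring test on the attribute value. A shell analogy of the decision RouteOnAttribute makes for each FlowFile (the messageContent value here is just an example):
messageContent='hi #urgent'   # example value of the messageContent attribute
# The same substring test as ${messageContent:contains("#urgent")}
if printf '%s' "$messageContent" | grep -q '#urgent'; then
  echo 'routed to "matched" -> PutSlack that tags your user'
else
  echo 'routed to "unmatched" -> plain PutSlack'
fi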
The whole pipeline looks like this:
- Check your processors for any warnings, such as relationships which have not been terminated; make those automatically terminate.
- Now start all of your processors.
- Add a new message containing the string "#urgent" to your Heroku app connected to the Database.
- You should see a message appearing in the Slack channel "practice9" which also tags your user. Take a screenshot of this!
- Save the exercise solution as a template (make sure ALL elements are selected) and download the template.
- Take a screenshot which displays the created pipeline (after starting and testing it; PS! the IP of the instance should be visible in the screenshot)
Deliverables:
- Screenshots from tasks 9.2 and 9.4, 9.5
- Templates from tasks 9.3 and 9.4, 9.5
- Please try importing your saved templates into the canvas yourself before submitting, to verify that you did not miss anything when saving the templates.
- Answer the following question:
- Which of the available NiFi processors looks most interesting or useful to you (one that was NOT covered in this lab)?
- Why do you find it interesting or useful?
- You can see the list of NiFi processors here: http://nifi.apache.org/docs.html
- Explain the copy-on-write paradigm that the NiFi Content Repository uses.
- Read about the NiFi content Repository here: https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#content-repository
- Why is copy-on-write useful when dealing with large amounts of streaming data?
Troubleshooting
If a Processor is not behaving as expected (e.g. RouteOnAttribute runs, but nothing is matched), one way to get extra information is to check its data provenance:
- Right-click on the Processor and choose "View data provenance".
- You will see a list of the data the processor has handled; try clicking on the "i" icon to view the details of a single item.
- Here you can find the Attributes of the FlowFile, which may contain useful information, such as the message contents in the case of this example.
- If you have started the QueryDatabaseTable processor, it keeps track of the database entry IDs so that it only queries for fresh entries. If, for testing purposes, you would like to reset the ID counter and re-test with older messages: stop the processor, right click on it, select "View State", and choose "Clear state" to reset the ID counter.