Practice 11 - Advanced Apache NiFi and Edge computing with MiNiFi
In this lab we will continue working with Apache NiFi. You will learn some more advanced features of NiFi and how to set up NiFi data pipelines across multiple devices. We will set up 2 instances: one NiFi server for central data management and a smaller instance running MiNiFi.
MiNiFi, as its name indicates, is a mini version of NiFi, designed to be deployed on more resource-constrained devices, such as Edge devices. MiNiFi has no graphical interface and supports fewer processors than NiFi. There are two versions of MiNiFi: Java and C++. The C++ version supports fewer processors than the Java version.
You can read more about MiNiFi in the MiNiFi documentation.
The lab consists of 2 parts:
- Part 1: Setting up NiFi, MiNiFi and C2 (Command and Control)
- Part 2: Building a pipeline with MiNiFi, NiFi, InfluxDB and Slack integration
Before continuing with this lab you should be already familiar with general usage of NiFi, as covered in lab 10!
Part 1 - Setting up NiFi, MiNiFi and Command and Control (C2)
Exercise 11.1.1 Run Apache NiFi using Docker
Launch an OpenStack VM for Machine #1:
- Use your last name as a part of the instance name!
- Source: Use Volume Snapshot, choose Ubuntu 20 + Docker 20.10
  - In this Ubuntu-based snapshot, the installation of Docker (as we did in Lab 2) has already been done for us.
- Enable "Delete Volume on Instance Delete"
- Flavour: should be m2.tiny
- Security Groups: default, NiFi Remote Group S2S and Graphana InfluxDB
- Create a Docker container with the NiFi image version 1.9.2 from Docker Hub using the following CLI command:
docker run --name nifi \
  -p 8080:8080 \
  -p 8081:8081 \
  -p 10000:10000 \
  -d \
  -e NIFI_WEB_HTTP_PORT='8080' \
  apache/nifi:1.9.2
- You can access the NiFi web interface in your browser at http://<<IP_OF_OPENSTACK_VM>>:8080/nifi (replacing the IP with the OpenStack VM IP).
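Before moving on, you can sanity-check from the VM's terminal that the container is up and the UI is being served (a quick sketch; the exact log message text may vary by NiFi version):
docker ps --filter name=nifi
docker logs nifi 2>&1 | tail -n 20   # watch for messages indicating the web server has started
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080/nifi/   # prints 200 once the UI is up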
Exercise 11.1.2 Creating a basic NiFi flow
The goal of this exercise is to design a pipeline which integrates data flow between multiple instances: a MiNiFi instance and a NiFi instance, both of them having certain sub-parts of the pipeline running. One NiFi/MiNiFi pipeline can forward data to other pipelines using Remote Process Groups, which we will be using. It enables directing FlowFiles to different NiFi/MiniFi instances using Site-to-Site protocol.
Our pipeline will have 2 parts: Data Producer and Data Consumer. The Data Producer will run on MiNiFi, while the Data Consumer will run on NiFi.
However, MiNiFi provides no graphical UI! So, in order to establish this flow, we first need to design both parts in the NiFi GUI, and then save/move certain subparts (templates) of our pipeline from NiFi to MiNiFi.
Data Producer:
- Add a GenerateFlowFile Processor, with the configuration:
- File Size: 2B
- Scheduling - Run Schedule 10s
- This will generate random data every 10 seconds.
- Add a Remote Process Group
  - Configuration - URL: http://YOUR_NIFI_INSTANCE_IP:8080/nifi
- Try to connect the GenerateFlowFile output to the Remote Process Group.
- You will get an error about missing an Input Port, because our NiFi canvas has no Input Ports that would take care of data sent to it via Remote Process Groups. We will take care of this in a moment.
This "data producer" pipeline will later be moved to run on MiNiFi. It simply generates some random data and sends it to NiFi. Before moving it, for testing purposes, we will first make sure it works on NiFi as well.
Let's add the "data consumer" pipeline alongside it.
Data Consumer:
- Add an "Input Port"
- Give it some name, e.g. "From Sensor"
- Whenever somebody sends FlowFiles through a Remote Process Group that targets this NiFi instance, they will be received by the "Input Port". In this example, we are sending the data to ourselves.
- Add a "Process group"
- Give it some name, like "Sensor Data Handling"
- A process group is just a container for a nested NiFi flow.
- Try connecting the Input Port to the Process Group. Again, you will get an error, because the process group has not defined that it can handle any incoming data.
- Double-click on the process group to open it
- Add an Input Port, give it some name.
- Close the Process Group (Right-click on canvas -> "Leave Group")
- Connect the Input Port to the Process Group.
- You can now also connect GenerateFlowFile to Remote Process Group.
Our minimal 2-part pipeline is complete; it should look like the image below. Let's verify that it works by running all parts on NiFi:
- Start the GenerateFlowFile and Input Port components.
- For the Remote Process Group, right click and select "Enable Transmission"
You should see that the Input Port is able to receive data and that it is getting queued up right before the Process Group (in the screenshot below, 8 FlowFiles have been received and are queued):
Once you've verified that it works, stop the GenerateFlowFile processor, but leave the Input Port enabled.
Exercise 11.1.4 Setting up C2
We want to migrate the Data Producer pipeline to run on MiNiFi instead. We could save the pipeline as a template, and manually move the file over to a MiNiFi instance, but this becomes hard to manage in IoT scenarios, where there may be hundreds of edge devices running MiNiFi.
MiNiFi Command and Control (C2) is a service that automates the deployment of NiFi templates to MiNiFi; let's set it up. It will run on the same VM as NiFi (Machine #1).
- Download the configuration file for C2 into your NiFi VM (e.g. using wget) : minifi-c2-context.xml
- C2 supports different options for what to use as the sources of templates (e.g. S3 storage, or some NiFi instance). Our configuration will use our NiFi instance to act as the source of templates available to MiNiFi instances.
- Edit the XML: find the NiFiRestConfigurationProvider under configService.
  - Find the child tag <value>http://localhost:8080/nifi-api</value>; it indicates the address of the NiFi API to use as the source. Replace localhost with the IP of the NiFi VM! (A sed one-liner for this edit is sketched below.)
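If you prefer to script the edit instead of doing it by hand in nano, a one-liner like the following should work (a sketch; adjust the file path and replace <NIFI_VM_IP> with your NiFi VM's IP):
sed -i 's|http://localhost:8080/nifi-api|http://<NIFI_VM_IP>:8080/nifi-api|' /home/ubuntu/minifi-c2-context.xml
grep -n 'nifi-api' /home/ubuntu/minifi-c2-context.xml   # verify the value was updated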
- Run MiNiFi C2 using Docker
- When running, we provide our updated XML configuration file as a volume, overwriting the default file defined in the C2 Docker image:
docker run --name nifi-c2 \
  -d \
  -p 80:10080 \
  -v /home/ubuntu/minifi-c2-context.xml:/opt/minifi-c2/minifi-c2-0.5.0/conf/minifi-c2-context.xml \
  apache/nifi-minifi-c2:0.5.0
  - Make sure you use the correct absolute path for your edited XML file! (e.g. /home/ubuntu/minifi-c2-context.xml)
- C2 should be running now.
  - You can check if there were any errors with docker logs nifi-c2, for example.
Let's verify that C2 is able to find templates from our NiFi instance.
- In the NiFi interface, select the Data Producer flow and save it as a template (a similar approach to the one from Practice Session 9, Exercise 9.3).
  - Name it sensor-device.v1
    - The 1st part of the template name identifies the device class (remember this name for later!)
    - The 2nd part identifies the version of this template. If you later wish to update the template, you should use a newer version number.
- Test that C2 is able to identify the saved template:
  - In a browser, access http://YOUR_VM_IP:80/c2/config?class=sensor-device
    - If successful, the server responds with a NiFi template converted to MiNiFi format (a config file of type text/yml).
    - Firefox users: if you get "This address is restricted" because of non-standard ports, try using Chrome (or changing the Firefox configuration).
  - Deliverable: The C2 server response template file you got in this step.
- You can also inspect the logs of C2 to see when a client is requesting templates: docker logs nifi-c2
  - You should see something like: Handling request from Client 172.17.65.112 with parameters {class=[sensor-device]}
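Alternatively, you can fetch the same response from the command line, which also sidesteps the Firefox port restriction (a sketch; replace the IP with your VM's):
curl -v "http://YOUR_VM_IP:80/c2/config?class=sensor-device" -o sensor-device.yml
head sensor-device.yml   # should show the template converted to MiNiFi-style YAML config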
Exercise 11.1.5 Starting MiNiFi
In this task we will use OpenStack to run the Java version of Apache MiNiFi on another instance (Machine #2). As a result, we will have 2 instances running.
- Start a 2nd Openstack VM Instance
- Use your last name as a part of the instance name!
- Flavor: m3.nano
- Image: Ubuntu 18.04
- Enable "Delete Volume on Instance Delete"
- Install Java
sudo apt-get update
sudo apt install openjdk-8-jdk
- Install the MiNiFi service:
wget https://archive.apache.org/dist/nifi/minifi/0.5.0/minifi-0.5.0-bin.tar.gz
tar -xf minifi-0.5.0-bin.tar.gz   # unpack the archive
cd minifi-0.5.0/
sudo bin/minifi.sh install
- Modify the hosts file on the MiNiFi instance, to define what IP matches the hostname of the NiFi Docker container:
sudo nano /etc/hosts
- NB! Don't miss this step or you will have issues debugging later.
- NB! Also make sure you use the NiFi Docker container ID and not the nifi-c2 container ID!
- Add a new line inside the hosts file, which contains the following values:
- NIFI_INSTANCE_IP NIFI_DOCKER_CONTAINER_ID
- You can check the Docker container ID by running the Docker command that lists all running containers (on Machine #1): docker ps
- The result should look something like this (with your own container ID and NiFi instance IP):
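As a sketch, the hosts entry can also be appended with a one-liner; both values below are placeholders, so substitute your NiFi VM IP and the container ID shown by docker ps:
echo "192.168.42.10 a1b2c3d4e5f6" | sudo tee -a /etc/hosts   # NIFI_INSTANCE_IP NIFI_DOCKER_CONTAINER_ID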
Before starting the MiNiFi service, let's configure it so that it can fetch flow templates from C2.
- In the MiNiFi directory, open the bootstrap configuration with nano conf/bootstrap.conf and uncomment the line:
nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor
- Under the section "Pull HTTP change notifier configuration", uncomment and configure:
nifi.minifi.notifier.ingestors.pull.http.hostname=<<YOUR_NIFI_VM_IP>>
nifi.minifi.notifier.ingestors.pull.http.port=80
nifi.minifi.notifier.ingestors.pull.http.path=/c2/config
nifi.minifi.notifier.ingestors.pull.http.query=class=sensor-device
  - The class name property (sensor-device in this case) identifies our MiNiFi instance to C2. Note: we'll need this name later! For example, you may have different kinds of sensors/devices, each using different templates, so you would use the class name to distinguish them.
nifi.minifi.notifier.ingestors.pull.http.period.ms=30000
  - How often MiNiFi will pull updates from C2 (every 30 s). A quick check of the final configuration is sketched below.
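Before starting the service, a grep can confirm that the ingestor lines are uncommented and filled in as intended (assuming MiNiFi was unpacked into the home directory):
grep 'nifi.minifi.notifier' ~/minifi-0.5.0/conf/bootstrap.conf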
Let's now start MiNiFi
- Starting MiNiFi service:
~/minifi-0.5.0/bin/minifi.sh start
- If you need to Stop MiNiFi service (do not run this command unless you need to stop or restart MiNiFi):
~/minifi-0.5.0/bin/minifi.sh stop
- This is useful when you get an error about MiNiFi already running. Then you can first stop MiNiFi and start it again.
- You can check any issues with MiNiFi from the MiNiFi logs at
minifi-0.5.0/logs/minifi-app.log
If MiNiFi is running, it fetches the template and automatically starts it. If your NiFi pipeline Input Port is enabled, you should see new FlowFiles coming into the queue.
- Check that MiNiFi is successfully sending data; the minifi-app.log should contain messages similar to:
[....] RemoteGroupPort[name=from MiNiFi,targets=http://172.17.66.123:8080/nifi] Successfully sent [StandardFlowFileRecord[....]] (5 bytes) to nifi://fe3f69f3dab9:10000 [...]
  - In the above example, [....] means some parts have been left out.
- Deliverable: Take a screenshot of your terminal showing successful output from minifi-app.log
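To find such messages quickly without scrolling through the whole log, you can filter for them (a sketch based on the log line shown above):
grep "Successfully sent" ~/minifi-0.5.0/logs/minifi-app.log | tail -n 5
tail -f ~/minifi-0.5.0/logs/minifi-app.log   # or follow the log live (Ctrl+C to exit)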
Part 2: Advanced pipeline with InfluxDB and Slack
In this task, we will create a more advanced pipeline using the Puhatu data from the previous lab; the figure below gives an overview of the use case scenario.
IoT device: Here we use a sensor.py script which mimics the behavior of the Puhatu IoT device: it reads each row of data from puhatu.csv and stores it as a .json document. Download sensor.py and puhatu.csv to Machine #2 (the MiNiFi machine). When running the program, provide two arguments: dev_id ("puhatu_c1") and sleep_interval (2). Follow the commands below:
- Install the necessary packages, pip and pandas:
sudo apt install python3-pip
pip install pandas
- Create a directory in the home directory to store the JSON documents:
mkdir /home/ubuntu/puhatu
- Download sensor.py and puhatu.csv into the home directory (/home/ubuntu).
- Run the Python script: python3 sensor.py "puhatu_c1" 2
  - It creates a JSON document from each row of the CSV and stores it with the current timestamp as the filename.
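To confirm the script is producing data, list the output directory and inspect the newest document (a sketch; exact field names depend on puhatu.csv, and wat_Temp_float is the one we will use later):
ls -lt /home/ubuntu/puhatu | head
cat "/home/ubuntu/puhatu/$(ls -t /home/ubuntu/puhatu | head -n 1)"   # print the most recent JSON file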
Gateway device: Here, we use MiNiFi to collect the data from the IoT device, preprocess it, and forward it on to Apache NiFi.
Data processing, storage and visualization: Here, we use Apache NiFi for data processing and store the processed data in a time-series database, InfluxDB.
For this part of the lab, you should just modify your existing flows.
11.2.1 MiNiFi pipeline
- Remove the connection between the RemoteProcessGroup and GenerateFlowFile
  - (you may have to clear the queue by right-clicking on the connection, and stop transmission of the Remote Process Group, also via right-click)
Let's start collecting the IoT data (the .json documents created by sensor.py on the IoT device).
- Add a GetFile processor:
  - Input Directory: /home/ubuntu/puhatu
  - Scheduling - Run Schedule 3s
- Create an EvaluateJsonPath processor for extracting needed values from the generated JSON document as NiFi attributes.
  - Let's create a new tempAttribute by reading the temperature value from the FlowFile.
  - Change the following property:
    - Destination: flowfile-attribute
      - This specifies that the EvaluateJsonPath processor should update the attributes and not the content of the FlowFile (JSON).
  - Add a new property to the processor with the following key and value:
    - tempAttribute: $.wat_Temp_float
      - The JSONPath lookup $.wat_Temp_float will look for the JSON element named wat_Temp_float in each FlowFile.
- Connect GetFile output to EvaluateJsonPath input
- Create an UpdateAttribute processor for tracking whether a new FlowFile's tempAttribute value is different from the last one.
  - Properties:
    - Store State: Store state locally
    - Stateful Variables Initial Value: 0
  - Add 2 new properties:
    - 1) name hasValueChanged and value ${getStateValue("lastValue"):equals(${tempAttribute}):not()}
    - 2) name lastValue and value ${tempAttribute}
  - The lastValue stateful property stores the last seen value of tempAttribute, while hasValueChanged checks whether the newly seen tempAttribute value differs from the last seen one (lastValue); the result is stored as a boolean.
- Connect the EvaluateJsonPath matched output to the UpdateAttribute input (auto-terminate the other outputs)
- Create a RouteOnAttribute processor for defining how to filter which data is sent to the NiFi server.
  - We will send data to NiFi only if the new value is different from the last value (using the hasValueChanged attribute created by UpdateAttribute).
  - Add a new property to the processor to define the new routing rule:
    - valueChange: ${hasValueChanged:equals(true)}
- Connect the UpdateAttribute success output to the RouteOnAttribute input
- Connect the valueChange output of RouteOnAttribute to the RemoteProcessGroup
Finally, to complete this pipeline, let's also save all temperature values (regardless of whether the value changed) to file as well.
- Create a PutFile processor for storing the original generated files into the filesystem, which is useful for checking that the MiNiFi flow is working properly (see the sketch after this list).
  - Change the processor to output FlowFiles into the /tmp/nifi folder.
    - Later, when running the pipeline on MiNiFi, you can check that new files are being created in this folder to confirm that the pipeline was properly started.
- Connect GetFile output to PutFile input
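As a quick sketch for that later verification on the MiNiFi instance: once the flow is running, new files should keep appearing in the PutFile directory:
ls -lt /tmp/nifi | head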
The completed result should look something like this:
On your MiNiFi instance, stop MiNiFi using the command ~/minifi-0.5.0/bin/minifi.sh stop.
On your NiFi instance where you designed the flow, first test that it works on NiFi by starting it; you should see some values arriving at the "Data Consumer" part of your canvas. To perform this test, you have to repeat the instructions from the IoT device part on the NiFi machine (so that sensor.py generates data there too).
- If it works, let's save it as a template for our "sensor-device" MiNiFi instance.
  - Remember, in order for MiNiFi to fetch the new version of the template, we should update the version number!
  - For example, save it as sensor-device.v2.
Since we already set up MiNiFi with C2, MiNiFi should automatically load the new version of the template.
- Stop the template in your NiFi canvas, and start MiNiFi again, verify that data is arriving at the Input Port.
11.2.2 NiFi pipeline
In the same canvas, let's now add functionality to the NiFi (Data Consumer) part.
- Open the Process Group we defined earlier.
- Create a RouteOnAttribute processor to act on temperature values that are too high.
  - Add a new property to the processor to define the new routing rule:
    - highTemp: ${tempAttribute:gt(20)}
- Create a new PutSlack processor to send high-temperature alerts to Slack.
  - We will send Slack messages to a new Slack channel #practice11.
  - Configure PutSlack:
    - WebHook URL: Use the URL shared in the Slack channel #lab11-minifi
    - Webhook Text: Message from <STUDENT_NAME_HERE>: :warning: Water Temperature value was high! value: ${tempAttribute}! :warning:
      - Customize this text so that it contains your name; if you wish, you can modify it further.
    - Run Schedule: 15 sec (change this to a higher value later, once everything is working, for less spam)
- Connect output of RouteOnAttribute "highTemp" relationship to PutSlack
- Try starting the processor and see if you can see messages in Slack.
- Deliverable: Take a screenshot of your Slack alert message appearing in the #practice11 channel in the workspace you joined in Lab 9.
11.2.3 Saving data to InfluxDB
As the final part, we also want to store all the data in a time-series database; for this we will use InfluxDB.
In NiFi VM instance:
- In your home directory, create 2 directories:
mkdir /home/ubuntu/influxdb
mkdir /home/ubuntu/grafana
- Run InfluxDB and Grafana using Docker:
docker run -d \
  --name docker-influxdb-grafana \
  -p 3003:3003 \
  -p 3004:8083 \
  -p 8086:8086 \
  -v /home/ubuntu/influxdb:/var/lib/influxdb \
  -v /home/ubuntu/grafana:/var/lib/grafana \
  philhawthorne/docker-influxdb-grafana:latest
- Open InfluxDB's web management interface: YOUR_VM_IP:3004
- Get Started -> Add Connection (keep default settings) -> Skip (Dashboards) -> Skip (Kapacitor) -> View All Connections
- Now our web interface is connected to the underlying InfluxDB service.
- Let's create a database. From the left side-menu, choose "InfluxDB Admin" (2nd icon from bottom)
- Create Database
- Name: "weather"
The InfluxDB database is created; we can now send data to it from NiFi. A quick way to sanity-check the database from the command line is sketched below.
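You can write one test point by hand using the InfluxDB 1.x HTTP write endpoint and line protocol (assuming the bundled InfluxDB exposes the 1.x API on port 8086; an HTTP 204 response means the write succeeded):
curl -i -XPOST 'http://localhost:8086/write?db=weather' --data-binary 'weather,source=test temperature=21.5'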
- Add a "ReplaceText" Processor to the Nifi Canvas (Data Consumer Pipeline). We use ReplaceText to transform data to the right format for InfluxDB.
- Properties:
- Replacement Value:
weather,source=nifi temperature=${tempAttribute}
- Replacement Strategy:
Always Replace
- Replacement Value:
- We configured ReplaceText to always replace the entire body of the FLowFile with the template
weather,source=nifi temperature=${tempAttribute}
, which is formatted according to InfluxDB Line Protocol
- Properties:
- Connect BOTH "highTemp" and "unmatched" relationships from RouteOnAttribute to ReplaceText
- Auto-terminate "failure" relationship for ReplaceText
- Add a PutInfluxDB processor
  - Properties:
    - Database Name: "weather"
    - InfluxDB connection URL: http://<<YOUR_NIFI_VM_IP>>:8086
- Connect the "success" output from ReplaceText to PutInfluxDB
The full pipeline will look like this:
Try running your pipeline. Inspect the Data Provenance of the PutInfluxDB processor to see if there are any failures. You can also verify directly against InfluxDB that points are arriving, as sketched below.
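A query sketch using the InfluxDB 1.x HTTP query API (run on the NiFi VM; it should return the most recent points written by the flow):
curl -G 'http://localhost:8086/query' --data-urlencode 'db=weather' --data-urlencode 'q=SELECT * FROM "weather" ORDER BY time DESC LIMIT 5'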
11.2.4 Visualizing the data in InfluxDB
The Docker image we are using bundles InfluxDB together with Grafana, which has powerful visualization and querying features for data. However, for this exercise we will use the InfluxDB web interface itself to see some simpler graphs.
Let's see if data is arriving in InfluxDB using its web interface:
- Choose "Explore" from left side-menu of InfluxDB web interface.
- Select "weather.autogen" as the DB, choose the "weather" measurements and source: nifi, and select the "temperature field":
- You should be able to see your data being visualized.
- Try changing the timeframe to last 15 minutes, from top-right of the interface
- Deliverable: Take a screenshot of your data visualization, showing data from the last 15 minutes.
Deliverables:
NB! Terminate your instances!
- Part 1:
- Template file produced by the C2 server (task 11.1.4)
  - NB! NOT the one produced from the NiFi web GUI!
- 11.1.5 Screenshot of successful MiNiFi log output from minifi-app.log
  - NB! Make sure the terminal hostname part (instance name) is visible in the screenshot!
- Part 2
- Templates from tasks (you can export them from the NiFi web interface)
  - MiNiFi pipeline template (11.2.1)
  - NiFi pipeline template (after finishing both 11.2.2 and 11.2.3)
- Screenshots
  - 11.2.2 Screenshot of your alert message appearing in the #practice11 channel in Slack, containing your name
  - 11.2.4 Screenshot of your InfluxDB data visualization (last 15 mins)
NB! Don't forget to delete the created instances (and their volumes!) after finishing the lab!
Lab 11 Troubleshooting
In case of issues, consider:
- Are all NiFi processors error-free (no "warning" icon)?
  - If not, have you connected or auto-terminated all relationships for each processor?
- Check Data Provenance of a NiFi processor to find error messages regarding processors
- Checking the C2 docker container logs:
docker logs nifi-c2
- Checking the MiNiFi logs
~/minifi-0.5.0/logs/minifi-app.log
~/minifi-0.5.0/logs/minifi-bootstrap.log
- If, after saving a template for MiNiFi inside NiFi, the update doesn't seem to reach MiNiFi:
  - Did you use a newer version number in the template name?
- Try restarting MiNiFi
This lab's content is related to the RADON Horizon 2020 EU project, whose goal is to unlock the benefits of serverless FaaS for the European software industry using the TOSCA language and the tools developed in the project.
- G. Casale, M. Artac, W.-J. van den Heuvel, A. van Hoorn, P. Jakovits, F. Leymann, M. Long, V. Papanikolaou, D. Presenza, A. Russo, S. N. Srirama, D. A. Tamburri, M. Wurster, L. Zhu. RADON: rational decomposition and orchestration for serverless computing. SICS Software-Intensive Cyber-Physical Systems, 2019, pp. 1-11. https://doi.org/10.1007/s00450-019-00413-w