Introduction
This tutorial shows you how to use Apache NiFi to pull a CSV file from cloud storage, format the data and send it to a messaging queue (Kafka), consume from that queue to ingest the data into an operational database (HBase), and then retrieve the data with SQL syntax using Phoenix, all within Cloudera Data Platform - Public Cloud (CDP-PC).
Prerequisites
- Have administrator access to an existing environment on Cloudera Data Platform (CDP) Public Cloud
- Have created a CDP workload user
- Basic AWS CLI skills
Download Assets
You have two (2) options to get the assets needed for this tutorial:
- The first option contains only the files used in this tutorial. Unzip tutorial-files.zip and remember its location.
- The second option provides assets used in this and other tutorials, organized by tutorial title.
Using the AWS CLI, copy the file corvette-data-june2020.csv to the S3 location s3a://<storage.location>/tutorial-opdb/, where <storage.location> is your environment's value for the property storage.location.base.
Note: You may need to ask your environment's administrator for the value of storage.location.base.
In this example, storage.location.base has the value s3a://usermarketing-cdp-demo. The AWS CLI addresses the same bucket with the s3:// scheme, so copy the file using the command:
aws s3 cp corvette-data-june2020.csv s3://usermarketing-cdp-demo/tutorial-opdb/corvette-data-june2020.csv
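
The destination can also be derived from the property value rather than typed by hand. The following is a minimal sketch, assuming the example bucket above; the variable names are illustrative, and the command is only printed so that it can be reviewed before it is run:

```shell
# Assumed value of storage.location.base (taken from the example above).
storage_base="s3a://usermarketing-cdp-demo"
csv_file="corvette-data-june2020.csv"

# Strip the s3a:// prefix and rebuild the URI with the s3:// scheme used by the AWS CLI.
bucket_path="${storage_base#s3a://}"
destination="s3://${bucket_path}/tutorial-opdb/${csv_file}"

# Print the command instead of running it; remove "echo" to perform the copy.
echo aws s3 cp "$csv_file" "$destination"
```

With your own storage.location.base value substituted, the printed command should match the form of the one shown above.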

This tutorial requires that we provision three (3) data hub clusters named:
1. um-nifi-demo, using cluster definition 7.2.0 - Flow Management Light Duty for AWS
2. um-streaming-demo, using cluster definition 7.2.0 - Streams Messaging Light Duty for AWS
3. um-opdb-demo, using cluster definition 7.2.0 - Operational Database with SQL for AWS

Note: Your CDP environment may have different cluster definitions.
Refer to How to Create a Data Hub on Cloudera Data Platform for details on how to provision a Data Hub.
In this section, we’ll configure our Data Hub Clusters to communicate with one another.
1. Download HBase configuration file, hbase-clientconfig.zip. Unzip the file and remember its location.
Begin from Operational Database Data Hub:
um-opdb-demo > CM-UI > Clusters > HBase > Actions > Download Client Configuration

2. Copy hbase-site.xml file into every NiFi worker node.
Note: There are other files in the client configuration file downloaded - we only need to copy hbase-site.xml.
a. Determine NiFi worker node(s) public IP addresses
Begin from Flow Management Data Hub: um-nifi-demo > Hardware > NiFi > Public IP

b. From the command line, run the following commands once for every NiFi worker node. They copy the HBase configuration file, hbase-site.xml, to the node and set its permissions:
Note: Replace <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of each NiFi worker node.
scp hbase-conf/hbase-site.xml <USERNAME>@<PUBLIC_IP>:/tmp/hbase-site.xml
ssh <USERNAME>@<PUBLIC_IP>
chmod 777 /tmp/hbase-site.xml && exit
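
When there are several NiFi worker nodes, the same steps can be repeated in a loop. The sketch below is one possible way to do it, not part of the original tutorial assets: the variables cdp_user and nifi_nodes and the function print_copy_commands are hypothetical names, and the interactive SSH session is replaced by a single remote command. The commands are printed rather than executed so they can be checked first:

```shell
# Hypothetical values: replace with your workload user name and the
# public IP addresses listed under um-nifi-demo > Hardware > NiFi.
cdp_user="<USERNAME>"
nifi_nodes="<PUBLIC_IP_1> <PUBLIC_IP_2>"

# Print the copy and permission commands for one node.
print_copy_commands() {
  target="$1"
  echo "scp hbase-conf/hbase-site.xml ${cdp_user}@${target}:/tmp/hbase-site.xml"
  echo "ssh ${cdp_user}@${target} 'chmod 777 /tmp/hbase-site.xml'"
}

# Repeat for every NiFi worker node.
for node in $nifi_nodes; do
  print_copy_commands "$node"
done
```

Once the printed commands look right, they can be pasted into the terminal or the echo calls can be replaced with the commands themselves.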

We need to create an HBase table and column family - this is where our data will finally be stored.
1. Determine HBase worker node(s) public IP addresses
Begin from Operational Database Data Hub:
um-opdb-demo > Hardware > Worker > Public IP

2. From the command prompt, SSH into any HBase worker node and run the following commands to create the table and column family:
Note: Replace <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of any (one) HBase worker node.
ssh <USERNAME>@<PUBLIC_IP>
hbase shell
create 'corvette_demo','corvette_data'
exit
exit
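
The HBase shell can also read its commands from a file, which makes the step repeatable. The sketch below writes the same create statement to a script file and adds a describe call to confirm that the table and column family exist. The file name is arbitrary, and the final hbase command is only printed here because it has to be run on the worker node:

```shell
# Write the HBase shell commands to a script file (the file name is arbitrary).
script_file="create_corvette_table.txt"
cat > "$script_file" <<'EOF'
create 'corvette_demo','corvette_data'
describe 'corvette_demo'
exit
EOF

# Show the script, then print the command that would run it on the worker node.
cat "$script_file"
echo "hbase shell $script_file"
```

The trailing exit in the script closes the HBase shell once the commands have run.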

1. Add new schema
Begin from Streams Messaging Data Hub:
um-streaming-demo > Schema Registry
Click the add button to add a new schema:
- Name: corvette_schema
- Description: corvette sensor data
- Type: Avro schema provider
- Schema Group: Kafka
- Compatibility: Backward
- Select checkbox EVOLVE
- Drag and drop file corvette-schema.json into SCHEMA TEXT
- Save

2. Add new TOPIC:
Begin from Streams Messaging Data Hub:
um-streaming-demo > Streams Messaging Manager

- Select Topics
- Click on Add New
- TOPIC NAME: corvette_stream
- PARTITIONS: 5
- Availability: Maximum
- Cleanup.Policy: delete
- Save

Instead of building the data flow from scratch, we will use the template provided in the downloaded assets. If you want to learn how to build a data flow from scratch, take a look at Importing RDBMS Data into Hive.
Begin from Flow Management Data Hub:
um-nifi-demo > NiFi
1. From the files downloaded, let’s import data flow template, corvette-dataflow-template.xml, into NiFi:
- Click the Upload Template icon to upload the template
- Click and drag the Template icon into the canvas
2. Create variables that the data flow requires:
Right-click anywhere on the canvas and select Variables.

- Name: cdpuser, Value: <use your CDP user id>
- Name: kafkabrokers, Value: <list of Kafka broker addresses, separated by commas>
  To find the Kafka broker addresses, begin from Streams Messaging Data Hub:
  um-streaming-demo > Streams Messaging Manager > Brokers

  In this example, the value would be:
  um-streaming-demo-broker2.usermark.a465-9q4k.cloudera.site:9093,um-streaming-demo-broker1.usermark.a465-9q4k.cloudera.site:9093
- Name: schemaurl, Value: Schema Registry URL using the format https://<FQDN>:7790/api/v1
  To find the FQDN, begin from Streams Messaging Data Hub:
  um-streaming-demo > Hardware > Master > FQDN

  In this example, the value would be:
  https://um-streaming-demo-master0.usermark.a465-9q4k.cloudera.site:7790/api/v1
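
Both values follow a fixed pattern, so they can be assembled from the host names to avoid typing mistakes. The following is a small sketch using the example host names above; the helper functions join_brokers and schema_url are hypothetical names introduced only for illustration, and the ports 9093 and 7790 are the ones used in this tutorial's examples:

```shell
# Join broker host names into the comma-separated host:port list expected by kafkabrokers.
join_brokers() {
  port="$1"; shift
  result=""
  for host in "$@"; do
    # Add a comma only when the list already has an entry.
    result="${result:+$result,}${host}:${port}"
  done
  echo "$result"
}

# Build the Schema Registry URL from the master node FQDN.
schema_url() {
  echo "https://$1:7790/api/v1"
}

kafkabrokers="$(join_brokers 9093 \
  um-streaming-demo-broker2.usermark.a465-9q4k.cloudera.site \
  um-streaming-demo-broker1.usermark.a465-9q4k.cloudera.site)"
schemaurl="$(schema_url um-streaming-demo-master0.usermark.a465-9q4k.cloudera.site)"

echo "kafkabrokers=$kafkabrokers"
echo "schemaurl=$schemaurl"
```

Replace the example host names with the ones shown for your own Streams Messaging Data Hub, then paste the printed values into the NiFi variables.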

3. Update Processors and Controller Services:
Processors:
- Fetch_from_S3 > Properties
  - Kerberos Password: <your-cdp-password>
- Publish_Kafka_Topic > Properties
  - SSL Context Service: select Default NiFi SSL Context Service
  - Password: <your-cdp-password>
- ConsumeKafkaRecord_2_0 > Properties
  - SSL Context Service: select Default NiFi SSL Context Service
  - Password: <your-cdp-password>
Controller Services:
- HortonworksSchemaRegistry > Properties
  - Kerberos Password: <your-cdp-password>
  - SSL Context Service: select Default NiFi SSL Context Service
- HBase_2_ClientService > Properties
  - Kerberos Password: <your-cdp-password>
4. Enable All Controller Services
- JsonTreeReader
- JsonRecordSetWriter
- HortonworksSchemaRegistry
- HBase_2_ClientService
- CorvetteAvroReader
- CSVReader
- AvroRecordSetWriter
Note: If you see two (2) services named Default NiFi SSL Context Service, delete the duplicate service marked Invalid.
Let's run the data flow you have just configured. You can run the entire process group or one processor at a time. For general debugging and diagnostics, it is recommended to run one processor at a time.
Let's run all processors at once by clicking the Start button in the Operate menu.
After a few seconds, you will see the data flow through all the processors. Click the Stop button in the Operate menu to stop all processors at once.

Let’s take a look at some data throughput metrics on the Kafka Topic, corvette_stream, we created:
Begin from Streams Messaging Data Hub:
1. um-streaming-demo > Streams Messaging Manager > Topics
2. Search for corvette_stream
3. Click the profile icon next to the topic to view its profile

4. Streamed data throughput metrics for our Kafka topic:

We will use Apache Phoenix to create a view against our HBase data and use SQL-like statements to see our data.
In the command prompt, SSH into any (one) HBase worker node, just like you did in Operational Database Configuration:
ssh <USERNAME>@<PUBLIC_IP>
phoenix-sqlline
- Create a view over the HBase table:
create view "corvette_demo" (
  scanID VARCHAR PRIMARY KEY,
  "corvette_data"."Offset" VARCHAR,
  "corvette_data"."IntakeManifoldAbsolutePressure" VARCHAR,
  "corvette_data"."ShortTermFuelTrimBank2" VARCHAR,
  "corvette_data"."ShortTermFuelTrimBank1" VARCHAR,
  "corvette_data"."FuelSystemStatus" VARCHAR,
  "corvette_data"."MassAirflow" VARCHAR,
  "corvette_data"."O2VoltageB1S1" VARCHAR,
  "corvette_data"."O2VoltageB2S1" VARCHAR,
  "corvette_data"."TimingAdvance" VARCHAR,
  "corvette_data"."EquivalenceRatioCommanded" VARCHAR,
  "corvette_data"."EngineRPM" VARCHAR,
  "corvette_data"."CommandedThrottleActuator" VARCHAR,
  "corvette_data"."ThrottlePosition" VARCHAR,
  "corvette_data"."AbsoluteLoad" VARCHAR,
  "corvette_data"."MassAirflowSensor" VARCHAR,
  "corvette_data"."VehicleSpeed" VARCHAR,
  "corvette_data"."IntakeAirTemp" VARCHAR,
  "corvette_data"."EngineCoolantTemp" VARCHAR,
  "corvette_data"."Wideband02Sensor" VARCHAR
);
- Run a query to find abnormal data, where FuelSystemStatus is not equal to 'CL - Normal':
SELECT "IntakeManifoldAbsolutePressure" , "MassAirflow" , "EngineRPM" , "MassAirflowSensor" , "FuelSystemStatus" FROM "corvette_demo" WHERE "FuelSystemStatus" NOT IN ('CL - Normal') ;
- Exit phoenix-sqlline:
!quit

Congratulations on completing the tutorial.
As you've now experienced, it takes a small amount of effort to configure multiple Data Hub clusters to communicate with one another using Cloudera Data Platform - Public Cloud (CDP-PC).
NiFi’s flexible processors make it simple to extract, transform and load data into HBase - hopefully this tutorial sparks your imagination and inspires other creative solutions.
Videos
- Collections-Data Hub library of videos
- Collections-Data Flow / Streaming library of videos
Blogs
- Building a Scalable Process Using NiFi, Kafka and HBase on CDP
- Overview of the Operational Database performance in CDP
Other
- CDP Users Page - other CDP resources, including video, tutorials, blogs and events
- Have a question? Join Cloudera Community
- Cloudera Data Hub documentation
- Operational Database product information