Introduction
In this tutorial, we will use Cloudera Data Platform Public Cloud (CDP-PC) to build an Apache NiFi data flow that extracts data from an external relational database and loads it into the CDP data warehouse, Apache Hive.
Prerequisites
- Have access to Cloudera Data Platform (CDP) Public Cloud with a Data Lake running
- Have created a CDP workload User
- Basic AWS CLI skills
- Have SQLLine installed
Setup External Relational Database
Generally, you don't need to set up an external relational database, because you most likely already have one. To see this tutorial through, we will create an Oracle database in the cloud using AWS. Let's get started!
Using the command line, move into the folder where you downloaded the assets. Issue the following AWS CLI command to create an Oracle database instance:
aws rds create-db-instance \
  --db-name ORCL \
  --db-instance-identifier sampledb-orcl \
  --allocated-storage 20 \
  --db-instance-class db.t3.small \
  --engine oracle-se1 \
  --master-username admin \
  --master-user-password tut0rial \
  --publicly-accessible \
  --backup-retention-period 0 \
  --license-model license-included \
  --db-subnet-group-name <MODIFY_MY_DB_SUBNET_GROUP>
Note: You need to modify the command above to use your own DB subnet group. Refer to AWS documentation for more information.
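If you don't yet have a DB subnet group, you can list existing ones or create one with the AWS CLI. This is a minimal sketch; the group name, description, and subnet IDs below are placeholders you must replace with values from your own VPC:
aws rds describe-db-subnet-groups --query 'DBSubnetGroups[].DBSubnetGroupName'
aws rds create-db-subnet-group \
  --db-subnet-group-name my-db-subnet-group \
  --db-subnet-group-description "Subnets for sampledb-orcl" \
  --subnet-ids subnet-xxxxxxxx subnet-yyyyyyyy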

When the Oracle database, sampledb-orcl, is successfully created and available, you need two pieces of information: Endpoint and Port. You can get this information by executing the following AWS CLI commands. Refer to AWS documentation for more information.
aws rds describe-db-instances --db-instance-identifier sampledb-orcl | grep Address
aws rds describe-db-instances --db-instance-identifier sampledb-orcl | grep \"Port\"
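Alternatively, the AWS CLI's built-in --query option can return just the endpoint and port in a single call:
aws rds describe-db-instances \
  --db-instance-identifier sampledb-orcl \
  --query 'DBInstances[0].Endpoint.[Address,Port]' \
  --output text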

Using the Endpoint and Port information we just gathered, let's execute the following SQLLine commands to:
- Create a user (cdc):
./sqlline -u jdbc:oracle:thin:@ENDPOINT:1521:ORCL -n admin -p tut0rial -f create_orcl_user.sql
- Create tables:
./sqlline -u jdbc:oracle:thin:@ENDPOINT:1521:ORCL -n cdc -p tut0rial -f create_orcl_tables.sql
- Load data into the tables:
./sqlline -u jdbc:oracle:thin:@ENDPOINT:1521:ORCL -n cdc -p tut0rial -f load_orcl_tables.sql
Replace ENDPOINT with the Endpoint value you gathered above.
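For orientation, these scripts create and populate three tables: LOCATION, VACCINE, and VACCINATION_RATE. As an illustrative sketch only (the actual definitions live in the downloaded create_orcl_tables.sql), the LOCATION table looks roughly like:
-- Illustrative sketch; not the exact contents of create_orcl_tables.sql
CREATE TABLE LOCATION (
  location_id   NUMBER PRIMARY KEY,
  location_name VARCHAR2(100)
);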
Note: The JDBC database connection URL will differ if you are using an RDBMS other than Oracle.
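For reference, here are common JDBC connection URL formats (host, port, and database names are placeholders):
jdbc:oracle:thin:@HOST:1521:SID
jdbc:mysql://HOST:3306/DATABASE
jdbc:postgresql://HOST:5432/DATABASE
jdbc:sqlserver://HOST:1433;databaseName=DATABASE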

Great! We just created an Oracle relational database. We will use NiFi to migrate its data over to the CDP data warehouse.

Create Data Hub
In the Environments section, search for your environment and click on its name:

Select Data Hubs to see all the data hubs we have created in our environment.
Since we are creating a new data hub, click on Create Data Hub.

Let's complete the data hub provisioning form:
- Selected Environment: usermarketing
- Select Cluster Definition
- Cluster Definition: 7.1.0 - Flow Management Light Duty for AWS
- Cluster Name: um-nifi-tutorial
- Click on Provision Cluster
Note: Your CDP environment may be different and adjustments may be necessary.

After a few minutes, the newly created data hub will be Running and available for use. Click on the data hub name, um-nifi-tutorial.

We are now ready to proceed to the highlight of this tutorial - NiFi!
In the Services section, click on the NiFi icon to start building our data flow.

Build NiFi Data Flow
We are now ready to build our NiFi data flow to extract data from the Oracle database we just created and move the data into an S3 bucket.
This data flow requires six (6) NiFi processors: ListDatabaseTables, ReplaceText, ExecuteSQL, ConvertRecord, UpdateAttribute, and PutHDFS. We will create them one at a time. Let's get started.
Processor 1: List Database Tables
Click and drag the processor icon onto the canvas
- Filter on ListDatabaseTables
- Click on ADD

Right-click on the processor and select Configure

SETTINGS:
- Name: List CDC Tables

SCHEDULING:
- Execution: Primary node
Note: In a clustered environment, when pulling data from an external source, you should execute on the primary node only. Otherwise, multiple nodes may pull the same data, resulting in duplicates.

PROPERTIES:
- Database Connection Pooling Service: DBCPConnectionPool
The controller service does not exist yet, so create it using the defaults (no changes).

- Schema Pattern:
%CDC%
This is a SQL LIKE pattern; it limits the listing to schemas whose names contain CDC.
- Click on the arrow icon to configure the controller service
When prompted to save changes, click YES

Next to DBCPConnectionPool, click on the gear icon to configure its properties

PROPERTIES:
- Database Connection URL:
jdbc:oracle:thin:@ENDPOINT:1521/ORCL
Replace ENDPOINT with your database endpoint, as covered earlier.
- Database Driver Class Name:
oracle.jdbc.OracleDriver
- Database Driver Location(s):
/tmp/jdbc-drivers/ojdbc6.jar
- Kerberos Principal: CDP-USERID
- Kerberos Password: CDP-PASSWORD
Replace CDP-USERID and CDP-PASSWORD with your CDP workload user credentials.
- Database User:
cdc
- Password:
tut0rial
- Apply changes

Notice the Invalid State on the DBCPConnectionPool controller service. This is because the database JDBC driver does not yet exist on the NiFi nodes.

We've included a copy of the JDBC driver in the folder where you downloaded the assets. We will copy the JDBC driver onto every NiFi worker node. To do this, we need the public IP address of each NiFi worker node. Here's one way to get the public IP addresses:
From the Cloudera Data Platform (CDP) home page, select Data Hub Clusters

Search for our data hub, um-nifi-tutorial, and click on its name

Select the Hardware tab. Under the NiFi section, you will find the public IP addresses of all NiFi worker nodes. Make a note of these addresses.

Using the command line, move into the folder where you downloaded the assets. Issue the following commands for every NiFi worker node. This will copy the local JDBC driver and set permissions accordingly:
Note: You need to update <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of each NiFi worker node.
scp ojdbc6.jar userid@<PUBLIC_IP>:/tmp/ojdbc6.jar
ssh userid@<PUBLIC_IP>
mkdir -p /tmp/jdbc-drivers && chmod 777 /tmp/jdbc-drivers && mv /tmp/ojdbc6.jar /tmp/jdbc-drivers/ && chmod 777 /tmp/jdbc-drivers/ojdbc6.jar && exit
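If you have several worker nodes, a small shell loop saves repetition. This is a sketch; replace the placeholder IP addresses and the userid login with your own:
for ip in <PUBLIC_IP_1> <PUBLIC_IP_2>; do
  scp ojdbc6.jar userid@$ip:/tmp/ojdbc6.jar
  ssh userid@$ip 'mkdir -p /tmp/jdbc-drivers && chmod 777 /tmp/jdbc-drivers && mv /tmp/ojdbc6.jar /tmp/jdbc-drivers/ && chmod 777 /tmp/jdbc-drivers/ojdbc6.jar'
done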

Now that we have resolved the warning, click on the lightning bolt icon to enable this controller service.

Click on ENABLE, then CLOSE


Processor 2: Generate SQL Query
This processor (ReplaceText) rewrites each incoming flow file's content into a SQL query, using the db.table.fullname attribute set by the previous processor.
Click and drag the processor icon onto the canvas
- Filter on ReplaceText
- Click on ADD
Right-click on the processor and select Configure

PROPERTIES:
- Replacement Value:
SELECT * FROM ${db.table.fullname}
APPLY to save changes
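For example, a flow file listed for the LOCATION table in the CDC schema (where ListDatabaseTables sets db.table.fullname to CDC.LOCATION) would have its content replaced with:
SELECT * FROM CDC.LOCATION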

Link the processors together by clicking and dragging from Processor 1 to Processor 2

To evenly distribute the workload, choose Round Robin for the Load Balance Strategy in the connection configuration.


Processor 3: Execute SQL Query
This processor (ExecuteSQL) runs the SQL query carried in each flow file against the Oracle database and emits the results in Avro format.
Click and drag the processor icon onto the canvas
- Filter on ExecuteSQL
- Click on ADD
Right-click on the processor and select Configure

PROPERTIES:
- Database Connection Pooling Service: DBCPConnectionPool
APPLY to save changes

Link the processors together by clicking and dragging from Processor 2 to Processor 3:
- Under the DETAILS tab, set For Relationships to success



Processor 4: Convert Avro to CSV
This processor (ConvertRecord) converts the Avro records produced by ExecuteSQL into CSV.
Click and drag the processor icon onto the canvas
- Filter on ConvertRecord
- Click on ADD
Right-click on the processor and select Configure

PROPERTIES:
- Record Reader: AvroReader
The controller service does not exist yet; create it using the defaults (no changes).

- Record Writer: CSVRecordSetWriter
The controller service does not exist yet; create it by selecting the compatible controller service CSVRecordSetWriter.

- Click on the first arrow icon to configure the controller service
When prompted to save changes, click YES

Neither AvroReader nor CSVRecordSetWriter needs additional configuration. Click on the lightning bolt icon to enable each controller service.

Link the processors together by clicking and dragging from Processor 3 to Processor 4:
- Under the DETAILS tab, set For Relationships to success

Processor 4 complete... Next!
Processor 5: Update File Name
Click and drag the processor icon onto the canvas
- Filter on UpdateAttribute
- Click on ADD
Right-click on the processor and select Configure
SETTINGS
- Name:
Update File Name
PROPERTIES
Add a new property by clicking on the + icon
- Property:
filename
- Value:
${db.table.name}.csv
APPLY to save changes
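For example, a flow file extracted from the VACCINE table (db.table.name = VACCINE) is renamed to VACCINE.csv; this filename attribute determines the file name used when the data is written to storage in the next processor.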

Link the processors together by clicking and dragging from Processor 4 to Processor 5:
- Under the DETAILS tab, set For Relationships to success


Processor 6: Save Data to S3
This processor (PutHDFS) writes each CSV file to the environment's cloud storage.
Click and drag the processor icon onto the canvas
- Filter on PutHDFS
- Click on ADD
Right-click on the processor and select Configure

PROPERTIES:
- Hadoop Configuration Resources:
/etc/hadoop/conf.cloudera.core_settings/core-site.xml
- Kerberos Principal:
CDP-USERID
- Kerberos Password:
CDP-PASSWORD
- Directory:
/staging/${db.table.name}
- Conflict Resolution Strategy:
replace
- Apply changes
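With this configuration, each table's CSV file lands in its own staging directory under the environment's storage location. For example, assuming a storage location base of s3a://usermarketing-cdp-demo (the example value used later in this tutorial), the VACCINE table would be written to:
s3a://usermarketing-cdp-demo/staging/VACCINE/VACCINE.csv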

Link the processors together by clicking and dragging from Processor 5 to Processor 6:

Let's run the data flow you have just created. You can run the entire process group or individual processors one at a time. For general debugging and diagnostics, it is recommended to run one processor at a time.
Let's run all processors at once by clicking on the start icon in the Operate panel.
Likewise, to stop all processors at once, click on the stop icon in the Operate panel.


If you would like to reuse this data flow, click on the download icon to download the template file.

Create and Populate Hive Tables
Now that we successfully extracted data from an Oracle database and stored it in an S3 bucket, we will use Data Analytics Studio (DAS) to move the data into Hive.
Open DAS from your virtual warehouse.
Beginning from CDP home page > Data Warehouse:
- Click Overview
- Search for your Virtual Warehouse
- Click on the options menu of the Virtual Warehouse
- Open DAS

Now that we have DAS open, click on Compose, copy and paste the following DDL into the Worksheet, make one modification (described below), then execute it:
CREATE DATABASE cdc;

CREATE EXTERNAL TABLE IF NOT EXISTS cdc.LOCATION_EXT(
  location_id int,
  location_name string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- Modify with storage location base attribute: storage.location.base
LOCATION '<storage-location>/staging/LOCATION/'
tblproperties("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS cdc.LOCATION (
  location_id int,
  location_name string,
  PRIMARY KEY (location_id) DISABLE NOVALIDATE
) STORED AS ORC;

INSERT OVERWRITE TABLE cdc.LOCATION SELECT * FROM cdc.LOCATION_EXT;

CREATE EXTERNAL TABLE IF NOT EXISTS cdc.VACCINE_EXT(
  vaccine_id int,
  vaccine_name string,
  vaccine_dose int,
  vaccine_age string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- Modify with storage location base attribute: storage.location.base
LOCATION '<storage-location>/staging/VACCINE/'
tblproperties("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS cdc.VACCINE (
  vaccine_id int,
  vaccine_name string,
  vaccine_dose int,
  vaccine_age string,
  PRIMARY KEY (vaccine_id) DISABLE NOVALIDATE
) STORED AS ORC;

INSERT OVERWRITE TABLE cdc.VACCINE SELECT * FROM cdc.VACCINE_EXT;

CREATE EXTERNAL TABLE IF NOT EXISTS cdc.VACCINATION_RATE_EXT(
  location_id int,
  vaccine_id int,
  year int,
  rate double,
  lower_limit double,
  upper_limit double,
  confidence_interval double,
  sample_size double,
  target double
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- Modify with storage location base attribute: storage.location.base
LOCATION '<storage-location>/staging/VACCINATION_RATE/'
tblproperties("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS cdc.VACCINATION_RATE (
  location_id int,
  vaccine_id int,
  year int,
  rate double,
  lower_limit double,
  upper_limit double,
  confidence_interval double,
  sample_size double,
  target double,
  PRIMARY KEY (location_id, vaccine_id, year) DISABLE NOVALIDATE
) STORED AS ORC;

INSERT OVERWRITE TABLE cdc.VACCINATION_RATE SELECT * FROM cdc.VACCINATION_RATE_EXT;
NOTE: The modification needed above is to specify the storage location of the source data files, which is determined by the Storage Location Base set when the environment was provisioned.
For example:
- Property storage.location.base has value s3a://usermarketing-cdp-demo
- Replace <storage-location> with s3a://usermarketing-cdp-demo
Let's run a simple query to verify data was copied over to Hive successfully:
SELECT * FROM cdc.vaccine;
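As a further check, you can join the three tables. This illustrative query (assuming the schema created above) reports rates alongside the location and vaccine names:
SELECT l.location_name, v.vaccine_name, r.year, r.rate
FROM cdc.vaccination_rate r
JOIN cdc.location l ON r.location_id = l.location_id
JOIN cdc.vaccine v ON r.vaccine_id = v.vaccine_id
LIMIT 10;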

Summary
Congratulations on completing the tutorial.
As you've now experienced, it is simple to create a data flow with NiFi's flexible processors to extract, transform, and load data into Hive. Hopefully, this sparks your imagination and inspires other creative solutions using NiFi and Hive.
As your projects become larger and more complicated, consider using NiFi process groups, which let you group a set of NiFi processors into their own embedded canvas.