Your browser is out of date

Update your browser to view this website correctly. Update my browser now

×

Cloudera Tutorials

Optimize your time with detailed tutorials that clearly explain the best way to deploy, use, and manage Cloudera products. Login or register below to access all Cloudera tutorials.

Introduction

 

In this tutorial we will use Cloudera Data Platform Public Cloud (CDP-PC) to build a data flow using Apache NiFi to extract data from an external relational database into CDP data warehouse, Apache Hive.

 

Prerequisites

 

 

Watch Video

 

The video below provides a brief overview of what is covered in this tutorial:

 

 

Setup External Relational Database

 

Generally, we don't need to setup an external relational database because most likely, you already have one. In order to see this tutorial through, we will create an Oracle database on the cloud, using AWS. Let's get started!

  1. Download and unzip tutorial files
  2. Using command line, move into the folder where you unzipped the tutorial files
  3. Issue the AWS CLI command to create Oracle database instance:
  4. aws rds create-db-instance  \
            --db-name ORCL  \
            --db-instance-identifier sampledb-orcl  \
            --allocated-storage 20  \
            --db-instance-class db.t3.small  \
            --engine oracle-se1  \
            --master-username admin  \
            --master-user-password tut0rial  \
            --publicly-accessible  \
            --backup-retention-period 0  \
            --license-model license-included \
            --db-subnet-group-name <MODIFY_MY_DB_SUBNET_GROUP>
    

Note: You need to modify the command above using your DB subnet group. Refer to AWS documentation for more information.

 

create-orcl-database-instance

 

When the Oracle database, sampledb-orcl, is successfully created and available; you need two pieces of information - Endpoint & Port. You can get this information by executing the following AWS CLI commands. Refer to AWS documentation for more information.

aws rds describe-db-instances --db-instance-identifier sampledb-orcl | grep Address && aws rds describe-db-instances --db-instance-identifier sampledb-orcl | grep \"Port\"

 

create-orcl-database-endpoint-port

 

Using the Endpoint and Port information we just gathered, let's execute the following SQLLINE commands to:

  • Create user (cdc)
  • ./sqlline -u jdbc:oracle:thin:@ENDPOINT:1521:ORCL -n admin -p tut0rial -f create_orcl_user.sql
    
  • Create Oracle tables (LOCATION, VACCINE, VACCINATION_RATE)
  • ./sqlline -u jdbc:oracle:thin:@ENDPOINT:1521:ORCL -n cdc -p tut0rial -f create_orcl_tables.sql
    
  • Populate Oracle tables (will take some time)
  • ./sqlline -u jdbc:oracle:thin:@ENDPOINT:1521:ORCL -n cdc -p tut0rial -f load_orcl_tables.sql
    

Note: The JDBC database connection URL will differ if you are using an RDBMS other than Oracle.

 

create-orcl-database-load-data

 

Great! We just created an Oracle relational database. We will use it to migrate data over to CDP data warehouse using NiFi.

 

Create Data Hub

 

If you do not already have a Data Hub created for Flow Management, let's create one now. Otherwise, you can skip this step.

Select Management Console from Cloudera Data Platform (CDP) home page.

 

cdp-home-management-console

 

In the Environments section, search for your environment and click on its name:

 

environment-list

 

Select Data Hubs to see all the data hubs we have created in our environment.

Since we are creating a new data hub, click on Create Data Hub.

 

data-hub-list

 

Let's complete the data hub provisioning form:

  1. Selected Environment: usermarketing
  2. Select Cluster Definition
  3. Cluster Definition: 7.1.0 - Flow Management Light Duty for AWS
  4. Cluster Name: um-nifi-tutorial
  5. Click on Provision Cluster

Note: Your CDP environment may be different and adjustments may be necessary.

 

data-hub-provisioning-form

 

After a few minutes, the newly created data hub will be Running and available for use. Click on the data hub name, um-nifi-tutorial.

 

data-hub-created

 

We are now ready to proceed to the highlight of this tutorial - NiFi!

In the Services section, click on the NiFi iconto start building our data flow.

 

data-hub-start-nifi

 

Build NiFi Data Flow

 

We are now ready to build our NiFi data flow to extract data from the Oracle database we just created and move the data into an S3 bucket.

This data flow requires six (6) NiFi processors. We will create them one at a time. Let's get started.

 

Processor 1: List Database Tables

 

Click and drag processor icon into canvas

  1. Filter on ListDatabaseTables
  2. Click on ADD

 

P1-add-processor

 

Right-click on processor and select Configure

 

P1-processor-configure

 

SETTINGS:

  1. Name: List CDC Tables

 

P1-settings

 

SCHEDULING:

  1. Execution: Primary node

Note: In clustered environment, when getting data from external source, you should execute on primary node only. Otherwise, you may have duplicate data in multiple nodes.

 

P1-scheduling

 

PROPERTIES:

  1. Database Connection Pooling Service: DBCPConnectionPool

    The service controller does not exist, therefore, create it using defaults (no changes).

 

P1-add-controller-service

 

  1. Schema Pattern:

    %CDC%

  2. Click on the  to configure the Controller Service

    When prompted to save changes - click YES

 

P1-properties

 

Next to DBCPConnectionPool, click on to configure properties

 

P1-configure-dbcpconnectionpool

 

PROPERTIES:

  1. Database Connection URL:

    jdbc:oracle:thin:@ENDPOINT:1521/ORCL

    Replace ENDPOINT with your database endpoint, as covered earlier.

  2. Database Driver Class Name:

    oracle.jdbc.OracleDriver

  3. Database Driver Location(s):

    /tmp/jdbc-drivers/ojdbc6.jar

  4. Kerberos Principal: CDP-USERID
  5. Kerberos Password: CDP-PASSWORD
  6. Database User:

    cdc

  7. Password:

    tut0rial

  8. Apply changes

 

P1-dbcpconnectionpool-properties

 

Notice the Invalid State on the DBCPConnectionPool controller service. This is because the database JDBC driver does not exist.

 

P1-invalid-jdbc-driver-dbcpconnectionpool

 

We've included a copy of the JDBC driver in the tutorial files you downloaded earlier. We will copy the JDBC driver into every NiFi worker node. To do this, we need the public IP address of each NiFi worker node. Here's one way to get the public IP addresses:

From the Cloudera Data Platform (CDP) home page, select Data Hub Clusters

 

cdp-home-data-hub-clusters

 

Search for our data hub, um-nifi-tutorial and click on the name

 

data-hub-search

 

Select the Hardware tab. Under the NiFi section, you will find all public IP addresses of all NiFi worker nodes. Make a note of these addresses.

 

data-hub-ip-address

 

From the command line, move to the folder where you unzipped the tutorial files. Issue the following commands on every NiFi worker node. This will copy the local JDBC driver and set permissions accordingly:

Note: You need to update <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of each NiFi worker node.

scp ojdbc6.jar userid@<PUBLIC_IP>:/tmp/ojdbc6.jar
ssh userid@<PUBLIC_IP>
mkdir -p /tmp/jdbc-drivers && chmod 777 /tmp/jdbc-drivers && mv /tmp/ojdbc6.jar /tmp/jdbc-drivers/ && chmod 777 /tmp/jdbc-drivers/ojdbc6.jar && exit

 

cmd-copy-jdbc-to-cdp

 

Now that we resolved the warning, click on to enable this service controller.

 

P1-dbcpconnectionpool-enable

 

Click on ENABLE, then CLOSE

 

P1-dbcpconnectionpool-enable-controller

 

Processor 1 complete... Next!

 

Processor 2: Generate Query

 

Click and drag processor icon into canvas

  • Filter on ReplaceText
  • Click on ADD

Right-click on processor and select Configure

SETTINGS:

  • Name:

    Generate Query

  • Automatically Terminate Relationships: Check Box failure

 

P2-settings

 

PROPERTIES:

  • Replacement Value:

    SELECT * FROM ${db.table.fullname}

APPLY to save changes

 

P2-properties

 

Link the processors together by click and drag from Processor1 to Processor2

 

link-P1-P2

 

To evenly distribute the workload, choose Round Robin for the Load Balance Strategy in the connection configuration.

 

link-P1-P2-settings

 

Processor 2 complete... Next!

 

Processor 3: Select CDC Data

 

Click and drag processor icon into canvas

  1. Filter on ExecuteSQL
  2. Click on ADD

Right-click on processor and select Configure

SETTINGS:

  • Name:

    Select CDC Data

  • Automatically Terminate Relationships: Check Box failure

 

P3-settings

 

PROPERTIES:

  • Database Connection Pooling Service: DBCPConnectionPool

APPLY to save changes

 

P3-properties

 

Link the processors together by click and drag from Processor2 to Processor3:

  • Under the DETAILS tab, set For Relationships to success

 

link-details-P2-P3
link-P2-P3

Processor 3 complete... Next!

 

Processor 4: Convert Record

 

Click and drag processor icon into canvas

  1. Filter on ConvertRecord
  2. Click on ADD

 

Right-click on processor and select Configure

SETTINGS:

  • Name:

    Convert Record

  • Automatically Terminate Relationships: Check Box failure

 

P4-settings

 

PROPERTIES:

  1. Record Reader: AvroReader

    The service controller does not exist, therefore, create it using defaults (no changes).

 

P4-add-avro-service-controller

 

  1. Record Writer: CSVRecordSetWriter

    The service controller does not exist, therefore, create it by selecting Compatible Controller Service CSVRecordSetWriter.

 

P4-add-csv-service-controller

 

  1. Click on the first  to configure the Service Controller 

    When prompted to save changes - click YES

 

P4-properties

 

Neither AvroReader nor CSVRecordSetWriter need to be configured. Click on to enable the service controllers.

 

P4-configure-avro

 

Link the processors together by click and drag from Processor3 to Processor4:

  • Under the DETAILS tab, set For Relationships to success

 

link-P3-P4

 

Processor 4 complete... Next!

 

Processor 5: Update File Name

 

Click and drag processor icon into canvas

  1. Filter on UpdateAttribute
  2. Click on ADD

 

Right-click on processor and select Configure

SETTINGS

  • Name:

    Update File Name

 

PROPERTIES

Add a new property by clicking 

  • Property:

    filename

  • Value:

    ${db.table.name}.csv

 

APPLY to save changes

 

P5-properties

 

Link the processors together by click and drag from Processor4 to Processor5:

  • Under the DETAILS tab, set For Relationships to success

 

link-P4-P5

 

Processor 5 complete... Next!

 

Processor 6: Write to S3

 

Click and drag processor icon into canvas

  1. Filter on PutHDFS
  2. Click on ADD

 

Right-click on processor and select Configure

SETTINGS:

  • Name:

    Write to S3

  • Automatically Terminate Relationships: Check Box failure and success

 

P6-settings

 

PROPERTIES

  1. Hadoop Configuration Resources:

    /etc/hadoop/conf.cloudera.core_settings/core-site.xml

  2. Kerberos Principal:

    CDP-USERID

  3. Kerberos Password:

    CDP-PASSWORD

  4. Directory:

    /staging/${db.table.name}

  5. Conflict Resolution Strategy:

    replace

  6. Apply changes

 

P6-properties

 

Link the processors together by click and drag from Processor5 to Processor6:

 

link-P5-P6

 

Let's run the data flow you have just created. You can run the entire processor group or single processors at a time. For generally debugging and diagnostics, it is recommended to run one processor at a time.

Let's run all processors at once by clicking  from the Operate menu.
Likewise, to stop all processors at once, click on  from the Operate menu.

 

run-data-flow

 

Create Data Flow Template

 

Let's save this data flow for later or just for sharing.

  1. Click on from the Operate menu to create template
  2. Template Name:

    Move RDBMS data to S3

  3. CREATE
  4. Click on  for more options
  5. Select Templates

 

nifi-template-save

 

Click on  to download the template file.

 

nifi-template-download

 

Create and Populate Hive Tables

 

Now that we successfully extracted data from an Oracle database and stored it in an S3 bucket, we will use Data Analytics Studio (DAS) to move the data into Hive.

Open DAS from your virtual warehouse.

Beginning from CDP home page > Data Warehouse:

  1. Click Overview
  2. Search for your Virtual Warehouse
  3. Click on 
  4. Open DAS

 

virtual-warehouse-options-das

 

Now that we have DAS opened, click on Compose, copy-paste the following DDL into the Worksheet, make one modification, then execute:

 

CREATE DATABASE cdc;

CREATE EXTERNAL TABLE IF NOT EXISTS cdc.LOCATION_EXT(
                                        location_id int,
                                        location_name string
                                    ) 
                                    ROW FORMAT DELIMITED
                                    FIELDS TERMINATED BY ','
                                    STORED AS TEXTFILE

               -- Modify with storage location base attribute:
               --                      storage.location.base
                                   LOCATION '<storage-location>/staging/LOCATION/'
                                    tblproperties("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS cdc.LOCATION (
                                        location_id int,
                                        location_name string,
                                        PRIMARY KEY (location_id) DISABLE NOVALIDATE
                                    )
                                    STORED AS ORC;

INSERT OVERWRITE TABLE cdc.LOCATION SELECT * FROM cdc.LOCATION_EXT;


CREATE EXTERNAL TABLE IF NOT EXISTS cdc.VACCINE_EXT(
                                        vaccine_id int,
                                        vaccine_name string,
                                        vaccine_dose int,
                                        vaccine_age string
                                    ) 
                                    ROW FORMAT DELIMITED
                                    FIELDS TERMINATED BY ','
                                    STORED AS TEXTFILE
               -- Modify with storage location base attribute:
               --                      storage.location.base
                                    LOCATION '<storage-location>/staging/VACCINE/'
                                    tblproperties("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS cdc.VACCINE (
                                        vaccine_id int,
                                        vaccine_name string,
                                        vaccine_dose int,
                                        vaccine_age string,
                                        PRIMARY KEY (vaccine_id) DISABLE NOVALIDATE
                                    )
                                    STORED AS ORC;

INSERT OVERWRITE TABLE cdc.VACCINE SELECT * FROM cdc.VACCINE_EXT;



CREATE EXTERNAL TABLE IF NOT EXISTS cdc.VACCINATION_RATE_EXT(
                                        location_id int,
                                        vaccine_id int,
                                        year int,
                                        rate double,
                                        lower_limit double,
                                        upper_limit double,
                                        confidence_interval double,
                                        sample_size double,
                                        target double
                                    ) 
                                    ROW FORMAT DELIMITED
                                    FIELDS TERMINATED BY ','
                                    STORED AS TEXTFILE
               -- Modify with storage location base attribute:
               --                      storage.location.base
                                    LOCATION '<storage-location>/staging/VACCINATION_RATE/'
                                    tblproperties("skip.header.line.count"="1");

CREATE TABLE IF NOT EXISTS cdc.VACCINATION_RATE (
                                        location_id int,
                                        vaccine_id int,
                                        year int,
                                        rate double,
                                        lower_limit double,
                                        upper_limit double,
                                        confidence_interval double,
                                        sample_size double,
                                        target double,
                                        PRIMARY KEY (location_id, vaccine_id, year) DISABLE NOVALIDATE
                                    )
                                    STORED AS ORC;

INSERT OVERWRITE TABLE cdc.VACCINATION_RATE SELECT * FROM cdc.VACCINATION_RATE_EXT;

NOTE: The modification needed above is to specify the storage location of the source data file - determined by Storage Location Base, when environment was provisioned.

For example:

  • Property storage.location.base has value s3a://usermarketing-cdp-demo
  • modify <storage-location> with s3a://usermarketing-cdp-demo

 

 

Let's run a simple query to verify data was copied over to Hive successfully:

SELECT * FROM cdc.vaccine;

 

hive-output-select-all

 

Summary

 

Congratulations on completing the tutorial.

As you've now experienced, it is simple to create a data flow using NiFi's flexible processors to extract, transform, and load data into a Hive storage solution. Hopefully this will spark your imagination and begin to inspire other creative solutions using NiFi and Hive.

As your project(s) become larger and complicated, you should consider using NiFi Process Groups, which allow you to group a set of NiFi processors into their own embedded canvas.

 

 

Further Reading

 

Videos

Other

 

Your form submission has failed.

This may have been caused by one of the following:

  • Your request timed out
  • A plugin/browser extension blocked the submission. If you have an ad blocking plugin please disable it and close this message to reload the page.