
 

Introduction

 

This tutorial will show you how to use Apache NiFi to pull data in the form of a CSV file from cloud storage, format it and send it to a messaging queue (Kafka), consume from that queue to ingest the data into an operational database (HBase), and then retrieve the data using SQL syntax with Phoenix, all within Cloudera Data Platform - Public Cloud (CDP-PC).

 

Prerequisites

 

  • Have access to Cloudera Data Platform (CDP) Public Cloud with a Data Lake running.
  • If you're new to CDP or need a CDP environment, please visit Discover CDP
  • Have fundamental knowledge of the AWS CLI

 

Outline

 

 

Watch Video

 

The video below provides a brief overview of what is covered in this tutorial:

 

 

Download Assets

 

Download and unzip the tutorial files; remember the location where you extracted them.

 

Using the AWS CLI, copy the file corvette-data-june2020.csv to the S3 bucket s3a://<storage.location>/tutorial-opdb/, where <storage.location> is your environment's value for the storage.location.base property. In this example, storage.location.base has the value s3a://usermarketing-cdp-demo:

aws s3 cp corvette-data-june2020.csv s3://usermarketing-cdp-demo/tutorial-opdb/corvette-data-june2020.csv
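
To confirm the upload succeeded, you can list the destination prefix (same bucket and path as above); corvette-data-june2020.csv should appear in the output:

aws s3 ls s3://usermarketing-cdp-demo/tutorial-opdb/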

 

Provision Data Hub Clusters

 

This tutorial requires that we provision three (3) Data Hub clusters named:

 

1. um-nifi-demo, using cluster definition 7.2.0 - Flow Management Light Duty for AWS

 

 

2. um-streaming-demo, using cluster definition 7.2.0 - Streams Messaging Light Duty for AWS

 

3. um-opdb-demo, using cluster definition 7.2.0 - Operational Database with SQL for AWS

 

Note: Your CDP environment may have different cluster definitions.

Refer to How to Create a Data Hub on Cloudera Data Platform for details on how to provision a Data Hub.

 

Data Hub Configuration

 

In this section, we’ll configure our Data Hub Clusters to communicate with one another.

 

Flow Management

 

1. Download the HBase client configuration file, "hbase-clientconfig.zip", and unzip it into the same directory where you downloaded the tutorial files.

 

Begin from Operational Database Data Hub:

um-opdb-demo > CM-UI > Clusters > HBase > Actions > Download Client Configuration

 

2. Copy "hbase-site.xml" file into every NiFi worker node.

Note: The downloaded client configuration contains other files as well - we only need to copy hbase-site.xml.

a. Determine NiFi worker node(s) public IP addresses

Begin from Flow Management Data Hub: um-nifi-demo > Hardware > NiFi > Public IP

 

 

b. From the command line, issue the following commands for every NiFi worker node. They copy the HBase configuration file, hbase-site.xml, to the node and set its permissions accordingly:

Note: You need to update <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of each NiFi worker node.

 

 

scp hbase-conf/hbase-site.xml <USERNAME>@<PUBLIC_IP>:/tmp/hbase-site.xml

 

ssh -i ~/.ssh/cdp_key.pem cloudbreak@<PUBLIC_IP>

 

sudo chmod 777 /tmp/hbase-site.xml && sudo chown nifi:nifi /tmp/hbase-site.xml && exit
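
If you have more than one NiFi worker node, a small shell loop can repeat the copy for each of them. This is only a sketch: the IP list, user name, and key path are placeholders you must replace with your own values.

# Replace the placeholders with your NiFi worker node public IPs, user, and key
for ip in <PUBLIC_IP_1> <PUBLIC_IP_2>; do
  scp -i ~/.ssh/cdp_key.pem hbase-conf/hbase-site.xml <USERNAME>@$ip:/tmp/hbase-site.xml
  ssh -i ~/.ssh/cdp_key.pem <USERNAME>@$ip "sudo chmod 777 /tmp/hbase-site.xml && sudo chown nifi:nifi /tmp/hbase-site.xml"
done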

 

3. Restart NiFi

 

NiFi needs to be restarted to pick up the changes made to the worker nodes. First, stop all services dependent on NiFi, then restart NiFi:

 

Begin from Flow Management Data Hub:

um-nifi-demo > CM-UI > Clusters > NiFi Registry > Actions > Stop

 

 

um-nifi-demo > CM-UI > Clusters > NiFi > Actions > Restart

 

 

um-nifi-demo > CM-UI > Clusters > NiFi Registry > Actions > Start

 

 

Operational Database

 

We need to create an HBase table and column family - this is where our data will finally be stored.

 

1. Determine HBase worker node(s) public IP addresses

 

Begin from Operational Database Data Hub:

um-opdb-demo > Hardware > Worker > Public IP

 

2. In the command prompt, SSH into any HBase worker node and issue the following commands to create the table and column family:

 

Note: You need to update <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of any HBase worker node.

 

ssh <USERNAME>@<PUBLIC_IP>

 

hbase shell

 

create 'corvette_demo','corvette_data'

 

exit

 

exit
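
To verify the table was created as expected, you can open the HBase shell again on any worker node; 'corvette_demo' should appear in the table list with 'corvette_data' as its column family:

hbase shell
list                        # 'corvette_demo' should be listed
describe 'corvette_demo'    # shows the 'corvette_data' column family
exit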

 

 

Streams Messaging

 

1. Add new schema

 

Begin from Streams Messaging Data Hub:

um-streaming-demo > Schema Registry

 

  • Click on the + icon to add a new schema

  • Name:

    corvette_schema

  • Description:

    corvette sensor data

  • Type:

    Avro schema provider

  • Schema Group:

    Kafka

  • Compatibility:

    Backward

  • Select checkbox

    EVOLVE

  • Drag & Drop file corvette-schema.json into SCHEMA TEXT

  • Save
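
For context, the registered schema describes the CSV columns as Avro fields. The snippet below is only an illustrative sketch of what such a record schema looks like; it is not the actual contents of corvette-schema.json, which defines the full set of sensor fields used later in the tutorial:

{
  "type": "record",
  "name": "corvette_schema",
  "fields": [
    { "name": "scanID",           "type": "string" },
    { "name": "EngineRPM",        "type": "string" },
    { "name": "VehicleSpeed",     "type": "string" },
    { "name": "FuelSystemStatus", "type": "string" }
  ]
}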

 

2. Add new TOPIC:

 

Begin from Streams Messaging Data Hub:

um-streaming-demo > Streams Messaging Manager

 

 
 
  1. Select

    Topics

  2. Click on Add New
  3. TOPIC NAME:

    corvette_stream

  4. PARTITIONS:

    5

  5. Availability:

    Maximum

  6. Cleanup.Policy:

    delete

  7. Save
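
If you prefer the command line, an equivalent topic can typically be created with the Kafka CLI from a broker host. This is only a sketch: the bootstrap server is taken from this environment, while the replication factor and the client.properties security file are assumptions that depend on your cluster:

# client.properties must hold the TLS/SASL settings required by a secured CDP cluster
kafka-topics --create \
  --bootstrap-server um-streaming-demo-broker1.usermark.a465-9q4k.cloudera.site:9093 \
  --command-config client.properties \
  --topic corvette_stream \
  --partitions 5 \
  --replication-factor 3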

 

 

Build and Configure NiFi Data Flow

 

The data flow for this tutorial is too complex to build from scratch, so please use the provided template. If you’d like to build your own, you can refer to Importing RDBMS Data into Hive for instructions on how to build a data flow.

 

Begin from Flow Management Data Hub:

um-nifi-demo > NiFi

 

1. From the files downloaded, let's import the data flow template, corvette-dataflow-template.xml, into NiFi:

 

 

  • Click on Upload Template to upload the template
  • Click and drag the Template icon onto the canvas

 

2. Create variables that the data flow requires:

 

Right-click anywhere on the canvas and select Variables.

 

 
  • Name: username, Value: <use your CDP user id>
  • Name: kafkabrokers, Value: <list of Kafka broker addresses, separated by commas>

 

        The Kafka broker addresses are determined by:

            Begin from Streams Messaging Data Hub:

                um-streaming-demo > Streams Messaging Manager > Brokers

 

 

Based on the image above, the value would be:

um-streaming-demo-broker2.usermark.a465-9q4k.cloudera.site:9093,um-streaming-demo-broker1.usermark.a465-9q4k.cloudera.site:9093

 
 
  • Name: schemaurl, Value: Schema Registry URL using format https://<FQDN>:7790/api/v1 

    FQDN is determined by: 

Begin from Streams Messaging Data Hub:

um-streaming-demo > Hardware > Master > FQDN

 

 

Based on the image above, the value would be:

https://um-streaming-demo-master0.usermark.a465-9q4k.cloudera.site:7790/api/v1
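
To sanity-check the schemaurl value before wiring it into NiFi, you can query the Schema Registry REST API from a terminal. This is a rough check only; /schemaregistry/schemas is the usual listing endpoint, but authentication requirements depend on how your cluster is secured (a Kerberized registry may need kinit followed by curl --negotiate -u :):

curl -k "https://um-streaming-demo-master0.usermark.a465-9q4k.cloudera.site:7790/api/v1/schemaregistry/schemas"

The response should be a JSON listing that includes corvette_schema.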

 

3. Update Processors and Controller Services:

 

Processors:

  • Fetch_from_S3 > Properties
    • Kerberos Password: <your-cdp-password>
  • Publish_Kafka_Topic > Properties
    • SSL Context Service: select Default NiFi SSL Context Service
    • Password: <your-cdp-password>
  • ConsumeKafkaRecord_2_0 > Properties 
    • SSL Context Service: select Default NiFi SSL Context Service
    • Password: <your-cdp-password>

 

Controller Service:

  • HortonworksSchemaRegistry > Properties
    • Kerberos Password: <your-cdp-password>
    • SSL Context Service: select Default NiFi SSL Context Service
  • HBase_2_ClientService > Properties
    • Kerberos Password: <your-cdp-password>

 

4. Enable All Controller Services

 

  • JsonTreeReader
  • JsonRecordSetWriter
  • HortonworksSchemaRegistry
  • HBase_2_ClientService
  • CorvetteAvroReader
  • CSVReader
  • AvroRecordSetWriter

    Note: If you see two (2) services named Default NiFi SSL Context Service, delete the duplicate service marked Invalid.

 

 

Run Data Flow

 

Let's run the data flow you have just created. You can run the entire process group at once or run processors one at a time. For general debugging and diagnostics, it is recommended to run one processor at a time.

 

Let's run all processors at once by clicking the start button from the Operate menu.

After a few seconds, you will see the data flow through all the processors. Click the stop button from the Operate menu to stop all processors at once.

 

 

Let’s take a look at some data throughput metrics on the Kafka Topic, corvette_stream, we created:

Begin from Streams Messaging Data Hub:

1. um-streaming-demo > Streams Messaging Manager > Topics

2. Search for corvette_stream

3. Click on the topic's profile icon to view the profile

 

 

4. Review the streamed data throughput metrics for our Kafka topic
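
You can also peek at the messages themselves with the Kafka console consumer from a broker host. Again, this is a sketch only; the bootstrap server matches this environment, while client.properties is an assumed security configuration file for your cluster:

# prints the first few records published to corvette_stream
kafka-console-consumer \
  --bootstrap-server um-streaming-demo-broker1.usermark.a465-9q4k.cloudera.site:9093 \
  --consumer.config client.properties \
  --topic corvette_stream \
  --from-beginning \
  --max-messages 5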

 

 

View HBase Data

 

We will use Apache Phoenix to create a view against our HBase data and use SQL-like statements to see our data.

In the command prompt, SSH into any HBase worker node, just like you did in Operational Database Configuration:

 

ssh <USERNAME>@<PUBLIC_IP>

 

phoenix-sqlline

 

  • Create View
create view "corvette_demo" (
    scanID                                          VARCHAR PRIMARY KEY
  , "corvette_data"."Offset"                        VARCHAR
  ,"corvette_data"."IntakeManifoldAbsolutePressure" VARCHAR
  ,"corvette_data"."ShortTermFuelTrimBank2"         VARCHAR
  ,"corvette_data"."ShortTermFuelTrimBank1"         VARCHAR
  ,"corvette_data"."FuelSystemStatus"               VARCHAR
  ,"corvette_data"."MassAirflow"                    VARCHAR
  ,"corvette_data"."O2VoltageB1S1"                  VARCHAR
  ,"corvette_data"."O2VoltageB2S1"                  VARCHAR
  ,"corvette_data"."TimingAdvance"                  VARCHAR
  ,"corvette_data"."EquivalenceRatioCommanded"      VARCHAR
  ,"corvette_data"."EngineRPM"                      VARCHAR
  ,"corvette_data"."CommandedThrottleActuator"      VARCHAR
  ,"corvette_data"."ThrottlePosition"               VARCHAR
  ,"corvette_data"."AbsoluteLoad"                   VARCHAR
  ,"corvette_data"."MassAirflowSensor"              VARCHAR
  ,"corvette_data"."VehicleSpeed"                   VARCHAR
  ,"corvette_data"."IntakeAirTemp"                  VARCHAR
  ,"corvette_data"."EngineCoolantTemp"              VARCHAR
  ,"corvette_data"."Wideband02Sensor"               VARCHAR
);

 

  • Run a query to find abnormal data, where FuelSystemStatus is not equal to 'CL - Normal':

SELECT "IntakeManifoldAbsolutePressure"
     , "MassAirflow"
     , "EngineRPM"
     , "MassAirflowSensor"
     , "FuelSystemStatus" 
  FROM "corvette_demo" 
  WHERE "FuelSystemStatus" NOT IN ('CL - Normal')
;

 

  • Exit phoenix-sqlline:

    !quit
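
Other SQL statements work against the view in the same way. For example, the following query (run inside phoenix-sqlline, before !quit) counts readings per fuel system status; it only uses columns already defined in the view above:

SELECT "FuelSystemStatus", COUNT(*) AS readings
  FROM "corvette_demo"
 GROUP BY "FuelSystemStatus";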

 

 

Summary

 

Congratulations on completing the tutorial.

As you've now experienced, it takes a small amount of effort to configure multiple Data Hub clusters to communicate with one another using Cloudera Data Platform - Public Cloud (CDP-PC).

NiFi’s flexible processors make it simple to extract, transform, and load data into HBase. Hopefully this tutorial sparks your imagination and inspires other creative solutions.

 

Further Reading

 
