X

Cloudera Tutorials

Optimize your time with detailed tutorials that clearly explain the best way to deploy, use, and manage Cloudera products. Login or register below to access all Cloudera tutorials.

Cloudera named a leader in 2022 Gartner® Magic Quadrant™ for Cloud Database Management Systems Get the report

Introduction

This tutorial will show you how use Apache NiFi to pull data in the form of a CSV file placed on a cloud storage solution, format it in order to send it to a messaging queue (Kafka), consuming from that queue to ingest it into an operational database (HBase), and then retrieving the data via SQL syntax using Phoenix, all within Cloudera Data Platform - Public Cloud (CDP-PC).

Prerequisites

  • Have administrator access to already created environment on Cloudera Data Platform (CDP) Public Cloud.
  • Have created a CDP workload User
  • Basic AWS CLI skills

Outline

Watch Video

The video below provides a brief overview of what is covered in this tutorial:

 

Download Assets

 

You have two (2) options to get the assets needed for this tutorial:

  1. Download a ZIP file

It contains only necessary files used in this tutorial. Unzip tutorial-files.zip and remember its location.

  1. Clone our GitHub repository

It provides assets used in this and other tutorials; organized by tutorial title.

 

Using AWS CLI, copy file corvette-data-june2020.csv to S3 bucket, s3a://<storage.location>/tutorial-opdb/, where <storage.location> is your environment’s property value for storage.location.base.

Note: You may need to ask your environment's administrator to get property value for storage.location.base.

 

In this example, property storage.location.base has value s3a://usermarketing-cdp-demo; therefore copy the file using the command:

aws s3 cp corvette-data-june2020.csv s3://usermarketing-cdp-demo/tutorial-opdb/corvette-data-june2020.csv

Provision Data Hub Clusters

This tutorial requires that we provision three (3) data hub clusters named:

 

1. um-nifi-demo, using cluster definition 7.2.0 - Flow Management Light Duty for AWS

 

2. um-streaming-demo, using cluster definition 7.2.0 - Streams Messaging Light Duty for AWS

3. um-opdb-demo, using cluster definition 7.2.0 - Operational Database with SQL for AWS

Note: Your CDP environment may have different cluster definitions.

Refer to How to Create a Data Hub on Cloudera Data Platform for details on how to provision a Data Hub.

Data Hub Configuration

In this section, we’ll configure our Data Hub Clusters to communicate with one another.

Flow Management

1. Download HBase configuration file, hbase-clientconfig.zip. Unzip the file and remember its location.

 

Begin from Operational Database Data Hub:

um-opdb-demo > CM-UI > Clusters > HBase > Actions > Download Client Configuration

 

2. Copy hbase-site.xml file into every NiFi worker node.

Note: There are other files in the client configuration file downloaded - we only need to copy hbase-site.xml.

a. Determine NiFi worker node(s) public IP addresses

Begin from Flow Management Data Hub: um-nifi-demo > Hardware > NiFi > Public IP

 

 

b. On the command line, issue the following commands on every NiFi worker node. This will copy the HBase configuration file, hbase-site.xml and set permissions accordingly:

Note: You need to update <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of each NiFi worker node.

 

 

scp hbase-conf/hbase-site.xml <USERNAME>@<PUBLIC_IP>:/tmp/hbase-site.xml

 

ssh <USERNAME>@<PUBLIC_IP>

 

chmod 777 /tmp/hbase-site.xml && exit

 

output-cp-hbase-config.jpg

Operational Database

We need to create an HBase table and column family - this is where our data will finally be stored.

 

1. Determine HBase worker node(s) public IP addresses

Begin from Operational Database Data Hub:

um-opdb-demo > Hardware > Worker > Public IP

 

2. In the command prompt, SSH into any HBase worker node and issue the following commands to create table and column family:

Note: You need to update <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of any (one) HBase worker node.

 

ssh username@<PUBLIC_IP>

 

hbase shell

 

create 'corvette_demo','corvette_data'

 

exit

 

exit

 

Streams Messaging

1. Add new schema

Begin from Streams Messaging Data Hub:

um-streaming-demo > Schema Registry

 

  • Click on   to add new schema

  • Name:

    corvette_schema

  • Description:

    corvette sensor data

  • Type:

    Avro schema provider

  • Schema Group:

    Kafka

  • Compatibility:

    Backward

  • Select checkbox

    EVOLVE

  • Drag & Drop file corvette-schema.json into SCHEMA TEXT

  • Save

 

2. Add new TOPIC:

Begin from Streams Messaging Data Hub:

um-streaming-demo > Streams Messaging Manager

 

dh-streamingdemo-smm
 
 
  1. Select

    Topics

  2. Click on Add New
  3. TOPIC NAME:

    corvette_stream

  4. PARTITIONS:

    5

  5. Availability:

    Maximum

  6. Cleanup.Policy:

    delete

  7. Save

 

Build and Configure NiFi Data Flow

 

Instead of building the data flow from scratch, we will use the template provided in download assets. If you want to learn to build the data flow from scratch, take a look at Importing RDBMS Data into Hive.

 

Begin from Flow Management Data Hub:

um-nifi-demo > NiFi

 

1. From the files downloaded, let’s import data flow template, corvette-dataflow-template.xml, into NiFi:

 

 

  • Click on  to upload template
  • Click and drag  into the canvas

 

2. Create variables that the data flow requires:

Right-click anywhere on the canvas and select Variables.

 

 
  • Name: cdpuser, Value: <use your CDP user id>
  • Name: kafkabrokers, Value: <list of Kafka broker addresses, separated by commas>

 

        The Kafka broker addresses are determined by:

            Begin from Streams Messaging Data Hub:

                um-streaming-demo > Streams Messaging Manager > Brokers

 

 

Based on the image above, the value would be:

um-streaming-demo-broker2.usermark.a465-9q4k.cloudera.site:9093,um-streaming-demo-broker1.usermark.a465-9q4k.cloudera.site:9093

 
 
  • Name: schemaurl, Value: Schema Registry URL using format https://<FQDN>:7790/api/v1 

    FQDN is determined by: 

Begin from Streams Messaging Data Hub:

um-streaming-demo > Hardware > Master > FQDN

 

 

Based on the image above, the value would be:

https://um-streaming-demo-master0.usermark.a465-9q4k.cloudera.site:7790/api/v1

 

3. Update Processors and Controller Services:

Processors:

  • Fetch_from_S3 > Properties
    • Kerberos Password: <your-cdp-password>
  • Publish_Kafka_Topic > Properties
    • SSL Context Service: select Default NiFi SSL Context Service
    • Password: <your-cdp-password>
  • ConsumeKafkaRecord_2_0 > Properties 
    • SSL Context Service: select Default NiFi SSL Context Service
    • Password: <your-cdp-password>

 

Controller Service:

  • HortonworksSchemaRegistry > Properties
    • Kerberos Password: <your-cdp-password>
    • SSL Context Service: select Default NiFi SSL Context Service
  • HBase_2_ClientService > Properties
    • Kerberos Password: <your-cdp-password>

 

4. Enable All Controller Services

  • JsonTreeReader
  • JsonRecordSetWriter
  • HortonworksSchemaRegistry
  • HBase_2_ClientService
  • CorvetteAvroReader
  • CSVReader
  • AvroRecordSetWriter

    Note: If you see two (2) services named Default NiFi SSL Context Service, delete the duplicate service marked Invalid.

 

Run Data Flow

Let's run the data flow you have just created. You can run the entire processor group or single processors at a time. For general debugging and diagnostics, it is recommended to run one processor at a time.

 

Let's run all processors at once by clicking  from the Operate menu.

After a few seconds, you will see the data flow through all the processors. Click on  from the Operate menu to stop all processors at once.

 

 

Let’s take a look at some data throughput metrics on the Kafka Topic, corvette_stream, we created:

Begin from Streams Messaging Data Hub:

1. um-streaming-demo > Streams Messaging Manager > Topics

2. Search for corvette_stream

3. Click on  to view the profile

 

 

4. Streamed data throughput metrics for our Kafka topic:

 

View HBase Data

We will use Apache Phoenix to create a view against our HBase data and use SQL-like statements to see our data.

In the command prompt, SSH into any (one) HBase worker node, just like you did in Operational Database Configuration:

 

ssh username@<PUBLIC_IP>

 

phoenix-sqlline

 

  • Create View
create view "corvette_demo" (
    scanID                                          VARCHAR PRIMARY KEY
  , "corvette_data"."Offset"                        VARCHAR
  ,"corvette_data"."IntakeManifoldAbsolutePressure" VARCHAR
  ,"corvette_data"."ShortTermFuelTrimBank2"         VARCHAR
  ,"corvette_data"."ShortTermFuelTrimBank1"         VARCHAR
  ,"corvette_data"."FuelSystemStatus"               VARCHAR
  ,"corvette_data"."MassAirflow"                    VARCHAR
  ,"corvette_data"."O2VoltageB1S1"                  VARCHAR
  ,"corvette_data"."O2VoltageB2S1"                  VARCHAR
  ,"corvette_data"."TimingAdvance"                  VARCHAR
  ,"corvette_data"."EquivalenceRatioCommanded"      VARCHAR
  ,"corvette_data"."EngineRPM"                      VARCHAR
  ,"corvette_data"."CommandedThrottleActuator"      VARCHAR
  ,"corvette_data"."ThrottlePosition"               VARCHAR
  ,"corvette_data"."AbsoluteLoad"                   VARCHAR
  ,"corvette_data"."MassAirflowSensor"              VARCHAR
  ,"corvette_data"."VehicleSpeed"                   VARCHAR
  ,"corvette_data"."IntakeAirTemp"                  VARCHAR
  ,"corvette_data"."EngineCoolantTemp"              VARCHAR
  ,"corvette_data"."Wideband02Sensor"               VARCHAR
);

 

  • Run query to find abnormal data, FuelSystemStatus not equal to ‘CL - Normal’

SELECT "IntakeManifoldAbsolutePressure"
     , "MassAirflow"
     , "EngineRPM"
     , "MassAirflowSensor"
     , "FuelSystemStatus" 
  FROM "corvette_demo" 
  WHERE "FuelSystemStatus" NOT IN ('CL - Normal')
;

 

  • Exit phoenix-sqlline:

    !quit

 

Summary

Congratulations on completing the tutorial.

As you've now experienced, it takes a small amount of effort to configure multiple Data Hub clusters to communicate with one another using Cloudera Data Platform - Public Cloud (CDP-PC).

NiFi’s flexible processors make it simple to extract, transform and load data into HBase - hopefully this tutorial sparks your imagination and inspires other creative solutions.

Further Reading

Your form submission has failed.

This may have been caused by one of the following:

  • Your request timed out
  • A plugin/browser extension blocked the submission. If you have an ad blocking plugin please disable it and close this message to reload the page.