
Introduction

Data Lifecycle - collecting data. This tutorial shows you how to use Apache NiFi to pull data from a cloud storage solution, format it and publish it to a messaging queue (Kafka), and finally consume from that queue to ingest it into both an Apache Hive-ready format and an operational database (HBase).
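
At a high level, here is the path the data takes (the processor group, topic, and table names match what we build later in this tutorial):

parts_production_export.csv (S3)
  -> NiFi processor group "Push to Kafka"
  -> Kafka topics Factory-1 through Factory-5
  -> NiFi processor group "Kafka Ingest"
  -> Hive-ready output and HBase table 'inventory'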

Prerequisites

Outline

Watch Video

The video below provides a brief overview of what is covered in this tutorial:

 

Download Assets

 

You have two (2) options to get the assets needed for this tutorial:

  1. Download a ZIP file

It contains only the files needed for this tutorial. Unzip tutorial-files.zip and remember its location.

  2. Clone our GitHub repository

It provides assets used in this and other tutorials, organized by tutorial title.

 

Using the AWS CLI, copy the file parts_production_export.csv to the S3 bucket defined by your environment’s storage.location.base attribute.

Note: You may need to ask your environment's administrator for the value of the storage.location.base property.

 

For example, if the storage.location.base property has the value s3a://usermarketing-cdp-demo, copy the file using the command:

aws s3 cp parts_production_export.csv s3://usermarketing-cdp-demo/parts_production_export.csv
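
Optionally, verify the upload by listing the bucket contents; this reuses the example bucket above, so substitute your own storage location:

aws s3 ls s3://usermarketing-cdp-demo/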

 

[Image: aws-copy-files]

Provision Data Hub Clusters

This tutorial requires that we provision three (3) Data Hub clusters named:

  1. flow-management, using cluster definition 7.x - Flow Management Light Duty for AWS

     [Image: provision-flow-management]

  2. streams-messaging, using cluster definition 7.x - Streams Messaging Light Duty for AWS

     [Image: provision-streams-messaging]

  3. operational-database, using cluster definition 7.x - Operational Database with SQL for AWS

     [Image: provision-operational-database]

 

Note: Your CDP environment may have different cluster definitions.

Refer to How to Create a Data Hub on Cloudera Data Platform for details on how to provision a Data Hub.

 

Data Hub Configuration

In this section, we’ll configure our Data Hub Clusters to communicate with one another.

Flow Management

From the operational-database Data Hub, download the HBase client configuration file, hbase-clientconfig.zip. Unzip the file and remember its location.

 

Begin from Operational Database Data Hub:

operational-database > CM-UI > Clusters > HBase > Actions > Download Client Configuration

 

[Image: download-hbase-config]
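
For example, assuming you unzip into a local directory named hbase-conf (the scp command later in this section uses that path):

unzip hbase-clientconfig.zip -d hbase-conf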

 

Copy the hbase-site.xml file to every NiFi worker node.

Note: The downloaded client configuration archive contains other files as well - we only need to copy hbase-site.xml.

 

Here is how to find the public IP addresses of all NiFi worker nodes:

Begin from Flow Management Data Hub:

flow-management > Hardware > NiFi > Public IP

 

[Image: nifi-worker-nodes]
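
Alternatively, if you have the CDP CLI installed, something along these lines may list the same addresses. Treat it as a sketch: the instance group name (nifi) and the publicIp field name are assumptions about the describe-cluster output and may differ in your environment.

cdp datahub describe-cluster --cluster-name flow-management \
  | jq -r '.cluster.instanceGroups[] | select(.name == "nifi") | .instances[].publicIp'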

 

On the command line, issue the following commands for every NiFi worker node. This copies the HBase configuration file, hbase-site.xml, and sets its permissions accordingly:

Note: You need to update <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of each NiFi worker node.

 

scp hbase-conf/hbase-site.xml <USERNAME>@<PUBLIC_IP>:/tmp/hbase-site.xml

 

ssh <USERNAME>@<PUBLIC_IP>

 

chmod 644 /tmp/hbase-site.xml && exit

 

 

[Image: output-nifi-hbase-config]
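
If you have several NiFi worker nodes, a small shell loop avoids repeating the copy by hand. This is a minimal sketch; the IP addresses are hypothetical placeholders for your workers' public IPs:

# copy hbase-site.xml to each NiFi worker node and set its permissions
for ip in 1.2.3.4 5.6.7.8; do
  scp hbase-conf/hbase-site.xml <USERNAME>@$ip:/tmp/hbase-site.xml
  ssh <USERNAME>@$ip 'chmod 644 /tmp/hbase-site.xml'
done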

Operational Database

We need to create an HBase table and column family - this is where our data will finally be stored.

 

Determine the public IP addresses of the HBase worker nodes:

Begin from Operational Database Data Hub:

operational-database > Hardware > Worker > Public IP

 

[Image: opdb-worker-nodes]

 

On the command line, SSH into any (one) HBase worker node and issue the following commands to create a table and column family:

 

Note: You need to update <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of any (one) HBase worker node.

 

ssh <USERNAME>@<PUBLIC_IP>

 

hbase shell

 

create 'inventory','parts_data'

 

exit

 

exit

 

[Image: output-hbase-create-table]
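
As a side note, the same table could be created without an interactive session by piping the command into the HBase shell's non-interactive mode; a sketch:

echo "create 'inventory','parts_data'" | hbase shell -n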

Streams Messaging

Let's create five (5) new Topics:

Begin from Streams Messaging Data Hub:

streams-messaging > Streams Messaging Manager

 

[Image: open-streams-messaging]

 

  1. Select Topics
  2. Select Add New
  3. TOPIC NAME: Factory-1
  4. PARTITIONS: 5
  5. Availability: Maximum
  6. Cleanup Policy: delete
  7. Save

Repeat the same steps to create four (4) more topics named:

Factory-2

Factory-3

Factory-4

Factory-5

 

[Image: smm-add-topics]
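
If you prefer scripting over the SMM UI, the stock Kafka tooling can create the same topics. This is only a sketch, not the tutorial's prescribed path: client.properties is a hypothetical file holding your TLS/SASL client settings, port 9093 and the replication factor of 3 (standing in for the Maximum availability setting) are assumptions, and the wrapper may be named kafka-topics.sh depending on where you run it:

for t in Factory-1 Factory-2 Factory-3 Factory-4 Factory-5; do
  kafka-topics --create --topic $t --partitions 5 --replication-factor 3 \
    --bootstrap-server <BROKER_HOST>:9093 --command-config client.properties
done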

Build and Configure NiFi DataFlow

Instead of building the data flow from scratch, we will use the template provided in the download assets. If you would like to learn how to build a data flow from scratch, take a look at Importing RDBMS Data into Hive.

 

Begin from Flow Management Data Hub:

flow-management > NiFi

 

 

 

[Image: open-flow-management]

Upload NiFi Template

The NiFi data flow template, collect-dataflow-template.xml, is included in the download assets. Follow these steps to upload it:

 

  1. Click the Upload Template icon in the Operate palette and upload the collect-dataflow-template.xml template
  2. Drag the Template icon from the toolbar onto the canvas and select collect-dataflow-template

 

Create Variables used in DataFlow

Right-click anywhere on the canvas, outside the processor groups, and select Variables.

Click the + icon to create each of the following variables:

  • Name: username, Value: <use your CDP workload username>
  • Name: file_location, Value: <environment’s storage.location.base attribute>
  • Name: kafkabrokers, Value: <list of all Kafka broker addresses, separated by commas>

Kafka broker addresses are found in Streams Messaging Data Hub.

streams-messaging > Streams Messaging Manager

 

[Image: open-streams-messaging]

 

Select Brokers

Each broker's address is listed underneath its name.

 

[Image: broker-addresses]

 

The defined variables should look something like this:

 

[Image: nifi-variables]
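
For reference, with purely hypothetical values (your workload username and broker hostnames will differ, and port 9093 is an assumption for the secure Kafka listener):

username       jdoe
file_location  s3a://usermarketing-cdp-demo
kafkabrokers   broker1.example.site:9093,broker2.example.site:9093,broker3.example.site:9093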

Configure Controller Services

  1. Right-click on the Push to Kafka processor group
  2. Select Configure
  3. Enable CSVReader and CSVRecordSetWriter by clicking the lightning bolt (Enable) icon

Note 1: You should see two (2) services named Default NiFi SSL Context Service. Delete the duplicate - the one flagged as invalid.

  1. Right-click on the Kafka Ingest processor group
  2. Select Configure
  3. Enable the CSVReader and both CSVRecordSetWriter services by clicking the lightning bolt (Enable) icon

Note 2: Do not close the Kafka Ingest configuration window after enabling the service controllers. We have one (1) more controller to enable.

 

[Image: nifi-enable-service-controllers]

 

There are two (2) steps to enable the HBase_2_ClientService controller:

  1. Update the Kerberos Password property with your CDP workload password by clicking the pencil (Edit) icon
  2. Enable it by clicking the lightning bolt (Enable) icon

 

[Image: nifi-enable-hbase-controller]

Configure Processors

Several processors in each group require passwords. We will update them using your CDP workload password.

 

Push to Kafka processor group

Expand processor group and update properties for the following processors:

  • Pull-From-S3: update Kerberos Password
  • Factory 1 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Factory 2 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Factory 3 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Factory 4 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Factory 5 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service

 

Kafka Ingest processor group

Expand processor group and update properties for the following processors:

  • Factory 1 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Factory 2 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Factory 3 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Factory 4 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Factory 5 Kafka Stream: update Password and set SSL Context Service to Default NiFi SSL Context Service
  • Write to Hive: update Kerberos Password

 

[Image: nifi-processor-update]

Run DataFlow

Let's run the data flow you have just created. You have the option to run all processor groups at once, one processor group at a time, or a single processor at a time. For general debugging and diagnostics, it is recommended to run one processor at a time; this allows you to validate the data sitting in the connection queues (right-click a connection and select List queue).

 

We will run one processor group at a time.

 

Expand the Push to Kafka processor group and run all processors at once by clicking the Start button in the Operate palette.

After a few seconds, you will see the data flow through all the processors. Click the Stop button in the Operate palette to stop all processors at once.

 

[Image: nifi-run-flow-push]

 

Expand the Kafka Ingest processor group and run all processors at once by clicking the Start button in the Operate palette.

After a few seconds, you will see the data flow through all the processors. Click the Stop button in the Operate palette to stop all processors at once.

 

[Image: nifi-run-flow-ingest]

 

Let's look at the metrics for one of the Factory Kafka topics we created.

 

Begin from Streams Messaging Data Hub:

streams-messaging > Streams Messaging Manager

 

[Image: open-streams-messaging]

 

Let's take a look at the profile of one of the factory topics.

  1. Select Topics
  2. Search for factory
  3. Click the topic profile icon to view the profile for Factory-1.

 

[Image: kafka-profile-factory1]

 

Here's an example of metrics gathered for Factory-1:

 

[Image: kafka-metrics-factory1]

View HBase Data

Let's take a brief look at the data stored in HBase table inventory:

 

On the command line, SSH into any (one) HBase worker node and issue the following commands - just as you did before:

Note: You need to update <USERNAME> and <PUBLIC_IP>, where <PUBLIC_IP> is the public IP address of any (one) HBase worker node.

 

ssh <USERNAME>@<PUBLIC_IP>

 

hbase shell

 

scan 'inventory', {'LIMIT' => 3}

 

exit

 

exit

 

[Image: output-hbase-scan-table]
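
You can also run a quick sanity check without an interactive session, for example by counting the rows that were ingested (a sketch - the count depends on your data):

echo "count 'inventory'" | hbase shell -n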

Summary

Congratulations on completing the tutorial.

As you've now experienced, it takes a small amount of effort to configure multiple Data Hub clusters to communicate with one another using Cloudera Data Platform - Public Cloud (CDP-PC).

NiFi’s flexible processors make it simple to extract, transform and load data into HBase - hopefully this tutorial sparks your imagination and inspires other creative solutions.

 

Further Reading
