NOTICE

 

As of January 31, 2021, this tutorial references legacy products that no longer represent Cloudera’s current product offerings.

Introduction

We now know the role that Storm plays in this Trucking IoT system. Let's dive into the code, dissect what it's doing, and learn how to build this topology.


Storm Components

Now that we have a general idea of the power of Storm, let's look at its different components, our building blocks when defining a Storm process, and what they're used for.

  • Tuple: A list of values. The main data structure in Storm.
  • Stream: An unbounded sequence of tuples.
  • Spout: A source of streams. Spouts will read tuples in from an external source and emit them into streams for the rest of the system to process.
  • Bolt: Processes the tuples from an input stream and produces an output stream of results. This process is also called stream transformation. Bolts can filter tuples, run custom functions, perform aggregations and joins, interact with databases, and much more.
  • Topology: A network of spouts and bolts that are connected together by streams. In other words, the overall process for Storm to perform.
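To make the first two building blocks concrete, here is a small sketch using Storm's tuple classes directly (the field names and values below are placeholders, not part of the Trucking IoT data):

import org.apache.storm.tuple.{Fields, Values}

// Fields gives names to the positions of a tuple's values
val schema = new Fields("dataType", "data")

// Values is the concrete list of values that gets emitted as a tuple
val tuple = new Values("EnrichedTruckData", "some,csv,payload")

// Downstream components can then look up a value's position by field name
val dataIndex = schema.fieldIndex("data") // returns 1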

Environment Setup

We will be working with the trucking-iot-demo-storm-on-scala project that you downloaded in previous sections. Feel free to download the project again to your local environment so you can open it with your favorite text editor or IDE.

git clone https://github.com/orendain/trucking-iot-demo-storm-on-scala

Alternatively, if you would prefer not to download the code and simply want to follow along, you can view the project directly on GitHub.

Topology Build and Submit Overview

Look inside the KafkaToKafka.scala file and you'll find a companion object with our standard entry point, main, and a KafkaToKafka class with a method named buildTopology, which handles building our Storm topology.

The primary purpose of our main method is to configure and build our topology and then submit it for deployment onto our cluster. Let's take a closer look at what's inside:

// Set up configuration for the Storm Topology
val stormConfig = new Config()
stormConfig.setDebug(config.getBoolean(Config.TOPOLOGY_DEBUG))
stormConfig.setMessageTimeoutSecs(config.getInt(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS))
stormConfig.setNumWorkers(config.getInt(Config.TOPOLOGY_WORKERS))

The org.apache.storm.Config class offers a convenient way to create a topology configuration, exposing setter methods for all of the configs that can be set. It also makes it easier to do things like add serializations.
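As a quick illustration of that API, here is a sketch separate from the tutorial's topology; the class registered for serialization is an arbitrary example:

import org.apache.storm.Config

// Illustrative sketch of a few Config setters, including registering a custom
// class for Kryo serialization (the class chosen here is arbitrary)
val exampleConfig = new Config()
exampleConfig.setDebug(true)
exampleConfig.setNumWorkers(2)
exampleConfig.setMessageTimeoutSecs(30)
exampleConfig.registerSerialization(classOf[java.time.Instant])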

Following the creation of a Config instance, our main method continues with:

// Build and submit the Storm config and topology
val topology = new KafkaToKafka(config).buildTopology()
StormSubmitter.submitTopologyWithProgressBar("KafkaToKafka", stormConfig, topology)

Here, we invoke the buildTopology method of our class, which is responsible for building a StormTopology. With the topology that is returned, we use the StormSubmitter class to submit it to run on the Storm cluster.
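As a side note, if you ever want to try a topology without a cluster, Storm also provides a LocalCluster that runs everything in-process. A minimal sketch, assuming the stormConfig and topology values built above:

import org.apache.storm.LocalCluster

// Sketch: run the topology in-process for local testing instead of submitting
// it to a cluster (assumes the stormConfig and topology values from above)
val localCluster = new LocalCluster()
localCluster.submitTopology("KafkaToKafka-local", stormConfig, topology)

// ... let the topology run for a while, then tear it down
localCluster.shutdown()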

Starting to Build a Storm Topology

Let's dive into the buildTopology method to see exactly how to build a topology from the ground up.

// Builder to perform the construction of the topology.
implicit val builder = new TopologyBuilder()

We start by creating an instance of TopologyBuilder, which exposes an easy-to-use Java API for putting together a topology. Next, we pull in some values from our configuration file (application.conf).
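For reference, here is a sketch of how that config value is typically loaded, assuming the project uses Typesafe Config (which matches the config.getString(...) calls seen throughout this class):

import com.typesafe.config.ConfigFactory

// Sketch (assuming Typesafe Config): load application.conf from the classpath
// and read one of the values used later in buildTopology
val config = ConfigFactory.load()
val bootstrapServers = config.getString("kafka.bootstrap-servers")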

Building a Kafka Spout

/* Construct a record translator that defines how to extract and turn a Kafka ConsumerRecord into a list of objects to be emitted
 */
lazy val truckRecordTranslator = new Func[ConsumerRecord[String, String], java.util.List[AnyRef]] {
  def apply(record: ConsumerRecord[String, String]) = new Values("EnrichedTruckData", record.value())
}

/* In order, the following snippet of code does the following:
 *
 * Configure a KafkaSpout to connect to a particular set of bootstrap servers and subscribe to
 * the Kafka topic "trucking_data_truck_enriched"
 *
 * Set the record translator to the one defined above.  The two values the record translator
 * outputs are named "dataType" and "data", respectively.
 *
 * Configure the spout to poll the Kafka topic starting from the earliest (i.e. oldest) data
 * available.  In other words, ingest the entire Kafka topic.
 *
 * Set the consumer group id to the string "g"
 *
 * Build the KafkaSpoutConfig object following these configurations.
 */
val truckSpoutConfig: KafkaSpoutConfig[String, String] = KafkaSpoutConfig.builder(config.getString("kafka.bootstrap-servers"), "trucking_data_truck_enriched")
  .setRecordTranslator(truckRecordTranslator, new Fields("dataType", "data"))
  .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
  .setGroupId("g")
  .build()

In order to build a KafkaSpout, we first need to decide what Kafka topic will be read from, where it exists, and exactly how we want to ingest that data in our topology. This is where the above KafkaSpoutConfig comes in handy.

Now that we have a KafkaSpoutConfig, we use it to build a KafkaSpout and place it in the topology.

// Create a spout with the specified configuration, with only 1 instance of this spout running in parallel, and place it in the topology blueprint
builder.setSpout("enrichedTruckData", new KafkaSpout(truckSpoutConfig), 1)

Remember that builder refers to the TopologyBuilder. We're creating a new KafkaSpout with a parallelism_hint of 1 (how many tasks, or instances, of the component to run on the cluster). We place the spout in the topology blueprint with the name "enrichedTruckData".
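For reference, here is a sketch of how one could raise that parallelism; the values and the explicit task count below are illustrative and not taken from the tutorial:

// Sketch only (not part of the tutorial's topology): the same spout with a
// parallelism hint of 2 and an explicit task count of 4
builder.setSpout("enrichedTruckDataParallel", new KafkaSpout(truckSpoutConfig), 2)
  .setNumTasks(4)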

Building a Custom Bolt

Great, we now have a way to ingest our CSV-delimited strings from Kafka topics and into our Storm topology. We now need a way to unpackage these strings into Java objects so we can more easily interact with them.

Let's go ahead and build a custom Storm Bolt for this purpose. We'll call it CSVStringToObjectBolt. But first, let's see how this new custom bolt will fit into our topology blueprint.

/* Build a bolt for creating JVM objects from the ingested strings
 *
 * Our custom bolt, CSVStringToObjectBolt, is given the bolt id of "unpackagedData".  Storm is told to assign only
 * a single task for this bolt (i.e. create only 1 instance of this bolt in the cluster).
 *
 * ShuffleGrouping shuffles data flowing in from the specified spouts evenly across all instances of the newly
 * created bolt (which is only 1 in this example)
 */
builder.setBolt("unpackagedData", new CSVStringToObjectBolt(), 1)
  .shuffleGrouping("enrichedTruckData")
  .shuffleGrouping("trafficData")

We create a new CSVStringToObjectBolt bolt, and tell Storm to assign only a single task for this bolt (i.e. create only 1 instance of this bolt in the cluster). We name it "unpackagedData".

ShuffleGrouping shuffles data flowing in from the specified spouts evenly across all instances of the newly created bolt.
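Shuffle grouping is only one of Storm's stream groupings. As a sketch of an alternative (not used in this tutorial), a fields grouping would instead route all tuples with the same value of a given field to the same bolt task:

// Sketch only (not used in this tutorial): a fieldsGrouping routes tuples with
// the same "dataType" value to the same task of the bolt
builder.setBolt("unpackagedDataByType", new CSVStringToObjectBolt(), 2)
  .fieldsGrouping("enrichedTruckData", new Fields("dataType"))
  .fieldsGrouping("trafficData", new Fields("dataType"))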

Let's dig in and see how we create this bolt from scratch: check out the CSVStringToObjectBolt.scala file.

class CSVStringToObjectBolt extends BaseRichBolt {

Rather than creating a Storm bolt entirely from scratch, we leverage one of Storm's base classes and simply extend BaseRichBolt. BaseRichBolt takes care of a lot of the lower-level implementation for us.

private var outputCollector: OutputCollector = _

override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
  outputCollector = collector
}

The prepare method provides the bolt with an OutputCollector, which is used for emitting tuples from this bolt. Tuples can be emitted at any time from the bolt -- in the prepare, execute, or cleanup methods, or even asynchronously in another thread. This prepare implementation simply saves the OutputCollector as an instance variable to be used later on in the execute method.

override def execute(tuple: Tuple): Unit = {
  // Convert each string into its proper case class instance (e.g. EnrichedTruckData or TrafficData)
  val (dataType, data) = tuple.getStringByField("dataType") match {
    case typ @ "EnrichedTruckData" => (typ, EnrichedTruckData.fromCSV(tuple.getStringByField("data")))
    case typ @ "TrafficData" => (typ, TrafficData.fromCSV(tuple.getStringByField("data")))
  }

  outputCollector.emit(new Values(dataType, data))
  outputCollector.ack(tuple)
}

The execute method receives a tuple from one of the bolt's inputs and is called once for every tuple this bolt processes.

We start by extracting the value of the tuple stored under the name "dataType", which we know is either "EnrichedTruckData" or "TrafficData". Depending on which it is, we call the fromCSV method of the appropriate object, which returns a JVM object based on this CSV string.
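If you're curious what that fromCSV pattern looks like, here is a hypothetical sketch; the real EnrichedTruckData and TrafficData case classes live elsewhere in the project, and their fields may differ from the made-up ones below:

// Hypothetical sketch of the fromCSV pattern; the class name, fields, and types
// are illustrative only and not taken from the real project
case class ExampleTrafficData(eventTime: Long, routeId: Int, congestionLevel: Int)

object ExampleTrafficData {
  def fromCSV(csv: String): ExampleTrafficData = {
    val Array(eventTime, routeId, congestionLevel) = csv.split(',')
    ExampleTrafficData(eventTime.toLong, routeId.toInt, congestionLevel.toInt)
  }
}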

Next, we use the outputCollector to emit a Tuple onto this bolt's outbound stream. Finally, we ack (acknowledge) that the bolt has processed this tuple. This is part of Storm's reliability API for guaranteeing no data loss.
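For completeness, here is a sketch of how the same block could use the other half of that reliability API, failing a tuple on error so Storm can replay it (the tutorial's bolt does not do this):

// Sketch only (not in the tutorial's bolt): fail a tuple on error so that
// Storm replays it from the spout
try {
  outputCollector.emit(new Values(dataType, data))
  outputCollector.ack(tuple)
} catch {
  case scala.util.control.NonFatal(_) => outputCollector.fail(tuple)
}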

The last method in this bolt is a short one:

override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = declarer.declare(new Fields("dataType", "data"))

The declareOutputFields method declares that this bolt emits 2-tuples with fields called "dataType" and "data".

That's it! We've just seen how to build a custom Storm bolt from scratch.

Building a Tumbling Windowed Bolt

Let's get back to our KafkaToKafka class and look at what other components we're adding downstream of the CSVStringToObjectBolt.

We now have KafkaSpouts ingesting CSV strings from Kafka topics and a bolt that creates Java objects from these CSV strings. The next step in our process is to join these two types of Java objects into one.

 /* Create a tumbling windowed bolt using our custom TruckAndTrafficJoinBolt, which houses the logic for how to
  * merge the different Tuples.
  *
  * A tumbling window with a duration means the stream of incoming Tuples are partitioned based on the time
  * they were processed (think of a traffic light, allowing all vehicles to pass but only the ones that get there
  * by the time the light turns red).  All tuples that made it within the window are then processed all at once
  * in the TruckAndTrafficJoinBolt.
  */
 val joinBolt = new TruckAndTrafficJoinBolt().withTumblingWindow(new BaseWindowedBolt.Duration(20, SECONDS))

 // GlobalGrouping sends all data from the "unpackagedData" component to a single one of this bolt's tasks
 builder.setBolt("joinedData", joinBolt, 1).globalGrouping("unpackagedData")

Here, we create a tumbling windowed bolt using our custom TruckAndTrafficJoinBolt, which houses the logic for how to merge the different Tuples. This bolt processes both EnrichedTruckData and TrafficData and joins them to emit instances of EnrichedTruckAndTrafficData.

A tumbling window with a duration means the stream of incoming Tuples is partitioned based on the time they were processed. Think of a traffic light, allowing all vehicles to pass but only the ones that get there by the time the light turns red. All tuples that made it within the window are then processed all at once in the TruckAndTrafficJoinBolt.

We'll take a look at how to build a custom windowed bolt in the next section.

Building a Sliding Windowed Bolt

Now that we have successfully joined data coming in from two streams, let's perform some windowed analytics on this data.

/*
 * Build a bolt to generate driver stats from the Tuples in the stream.
 */

/* Creates a sliding windowed bolt using our custom DataWindowingBolt, which is responsible for reducing a list
 * of recent Tuples(data) for a particular driver into a single datatype.  This data is used for machine learning.
 *
 * This sliding windowed bolt with a tuple count of 10 means we always process the last 10 tuples in the
 * specified bolt.  The window slides over by one, dropping the oldest, every time a new tuple is processed.
 */
val statsBolt = new DataWindowingBolt().withWindow(new BaseWindowedBolt.Count(10))

This creates a sliding windowed bolt using our custom DataWindowingBolt, which is responsible for reducing a list of recent Tuples (data) for a particular driver into a single datatype. This data is used for machine learning.

With a tuple count as the window length, this sliding windowed bolt always processes the last 10 tuples in the specified bolt. The window slides over by one, dropping the oldest tuple, each time a new tuple is processed.
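For comparison, BaseWindowedBolt also accepts an explicit sliding interval; the variant below is a sketch, not used in the tutorial, and would emit driver stats only every 2 tuples while still looking at the last 10:

// Sketch only (not used in the tutorial): a window of the last 10 tuples that
// slides forward every 2 tuples instead of every single tuple
val altStatsBolt = new DataWindowingBolt()
  .withWindow(new BaseWindowedBolt.Count(10), new BaseWindowedBolt.Count(2))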

The next step is to place this bolt into the topology blueprint, connected to the "joinedData" stream.

builder.setBolt("windowedDriverStats", statsBolt, 1).shuffleGrouping("joinedData")

Building Another Custom Bolt

Before we push our Storm-processed data back out to Kafka, we want to serialize the Java objects we've been working with into string form.

/* Build bolts to serialize data into a CSV string.
 *
 * The first bolt ingests tuples from the "joinedData" bolt, which streams instances of EnrichedTruckAndTrafficData.
 * The second bolt ingests tuples from the "windowedDriverStats" bolt, which streams instances of WindowedDriverStats.
 */
builder.setBolt("serializedJoinedData", new ObjectToCSVStringBolt()).shuffleGrouping("joinedData")
builder.setBolt("serializedDriverStats", new ObjectToCSVStringBolt()).shuffleGrouping("windowedDriverStats")

These bolts, both instances of ObjectToCSVStringBolt, are the inverse of our previous custom bolt, CSVStringToObjectBolt: they expect tuples carrying Java objects and emit a CSV string representation of them. Check out the source code if you're interested in their inner workings.

Now, we have two streams emitting string data: "serializedJoinedData" which is the result of the two joined streams, and "serializedDriverStats", which is the result of windowed analytics we performed.

Building a Kafka Bolt

We now build KafkaBolts to push data from these streams into Kafka topics. We start by defining some Kafka properties:

val kafkaBoltProps = new Properties()
kafkaBoltProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("kafka.bootstrap-servers"))
kafkaBoltProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getString("kafka.key-serializer"))
kafkaBoltProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getString("kafka.value-serializer"))

Next, we build a KafkaBolt:

val truckingKafkaBolt = new KafkaBolt()
  .withTopicSelector(new DefaultTopicSelector("trucking_data_joined"))
  .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "data"))
  .withProducerProperties(kafkaBoltProps)

withTopicSelector specifies the Kafka topic to drop entries into.

withTupleToKafkaMapper is passed an instance of FieldNameBasedTupleToKafkaMapper, which tells the bolt which fields of a tuple hold the Kafka message key and value (here, the fields named "key" and "data").

withProducerProperties takes in the producer properties the bolt should be configured with.

Finally, we drop this bolt into the rest of the topology.

builder.setBolt("joinedDataToKafka", truckingKafkaBolt, 1).shuffleGrouping("serializedJoinedData")
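The snippet above wires up only the joined-data stream; the "serializedDriverStats" stream gets pushed out to Kafka in the same way. A hedged sketch (the topic name below is a placeholder, not taken from the project):

// Sketch only: a second KafkaBolt for the windowed driver stats stream.
// The topic name "trucking_data_driverstats" is a placeholder assumption.
val driverStatsKafkaBolt = new KafkaBolt()
  .withTopicSelector(new DefaultTopicSelector("trucking_data_driverstats"))
  .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "data"))
  .withProducerProperties(kafkaBoltProps)

builder.setBolt("driverStatsToKafka", driverStatsKafkaBolt, 1).shuffleGrouping("serializedDriverStats")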

Creating the Topology

Now that we have specified the entire Storm topology by adding components into our TopologyBuilder, we create an actual topology using the builder's blueprint and return it.

// Now that the entire topology blueprint has been built, we create an actual topology from it
builder.createTopology()

Next: Deploying the Storm topology

Phew! We've now learned about how a Storm topology is developed. In the next section, we'll package this project up into a portable JAR file and run a quick command that will deploy this code onto a cluster.


