Exercise 3: Relationship Strength Analytics using Spark

You come up with a great idea: it would be interesting for the marketing team to know which products are most commonly purchased together. Perhaps there are optimizations to be made in marketing campaigns to position components together that will generate a strong lead pipeline? Perhaps product correlation data could help increase sales for the less commonly viewed products? Or recover revenue for a product that was in the top 10 most viewed, but not the top 10 sold, from the last exercise?

The tool in CDH best suited for quick analytics on object relationships is Apache Spark. You can compose a Spark job to do this work and gain insight into product relationships.

> spark-shell --master yarn-client

Note: If left alone for some time, the scala> prompt may become covered up by log messages from the cluster. Simply press Enter to refresh the prompt.

Once the scala> prompt has appeared, paste the following code:

// First we're going to import the classes we need
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.avro.generic.GenericRecord
import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport
import org.apache.spark.rdd.RDD
// Then we create RDDs for 2 of the files we imported from MySQL with Sqoop
// RDDs are Spark's data structures for working with distributed datasets
def rddFromParquetHdfsFile(path: String): RDD[GenericRecord] = {
    val job = new Job()
    FileInputFormat.setInputPaths(job, path)
    ParquetInputFormat.setReadSupportClass(job,
        classOf[AvroReadSupport[GenericRecord]])
    return sc.newAPIHadoopRDD(job.getConfiguration,
        classOf[ParquetInputFormat[GenericRecord]],
        classOf[Void],
        classOf[GenericRecord]).map(x => x._2) // keep only the Avro record; the Void key is dropped
}

val warehouse = "hdfs://{{cluster_data.manager_node_hostname}}/user/hive/warehouse/"
val order_items = rddFromParquetHdfsFile(warehouse + "order_items");
val products = rddFromParquetHdfsFile(warehouse + "products");
// Next, we extract the fields from order_items and products that we care about
// and get a list of every product, its name and quantity, grouped by order
val orders = order_items.map { x => (
    x.get("order_item_product_id"),
    (x.get("order_item_order_id"), x.get("order_item_quantity")))
}.join(
  products.map { x => (
    x.get("product_id"),
    (x.get("product_name")))
  }
).map(x => (
    scala.Int.unbox(x._2._1._1), // order_id
    (
        scala.Int.unbox(x._2._1._2), // quantity
        x._2._2.toString // product_name
    )
)).groupByKey()
// Finally, we score each pair of products purchased together in an order
// (weighted by the quantities ordered), then sort and take the 10 strongest pairs
val cooccurrences = orders.map(order =>
  (
    order._1,
    order._2.toList.combinations(2).map(order_pair =>
        (
            // order each pair consistently so (A, B) and (B, A) count as the same pair
            if (order_pair(0)._2 < order_pair(1)._2)
                (order_pair(0)._2, order_pair(1)._2)
            else
                (order_pair(1)._2, order_pair(0)._2),
            // weight the pair by the product of the two quantities ordered
            order_pair(0)._1 * order_pair(1)._1
        )
    )
  )
)
// Flatten to one record per product pair and sum the scores for each pair
val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)
// Swap to (score, pair), sort descending by score, and take the top 10
val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(false).take(10)
// We print our results, 1 per line
println(mostCommon.deep.mkString("\n"))

To better understand this script, read through the comments, which explain what each block does and the overall process we're going through.

When we do a 'map', we specify a function that takes each record and outputs a modified record. This is useful when we only need a couple of fields from each record, or when we need the record keyed by a different field: we simply invoke map with a function that takes in the entire record and returns a new record with the fields and the key we want.
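
For example, here is a minimal sketch of re-keying with map. It uses a small, hypothetical in-memory RDD (built with sc.parallelize) instead of the Parquet data above, but the pattern is the same:

// A hypothetical RDD of (order_id, product_id, quantity) tuples
val items = sc.parallelize(Seq((1, 101, 2), (1, 102, 1), (2, 101, 3)))
// Re-key each record by product_id, keeping only the fields we need as the value
val byProduct = items.map { case (orderId, productId, quantity) =>
    (productId, (orderId, quantity))
}
byProduct.collect().foreach(println) // e.g. (101,(1,2)), (102,(1,1)), (101,(2,3))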

The key-based operations, like 'join', 'groupByKey', and 'reduceByKey', organize records by their keys so we can bring related records together and then process them as a group. For instance, we group every purchased item by the specific order it was in, which lets us determine all the combinations of products that were part of the same order.
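
To make 'join' and 'groupByKey' concrete, here is a small, self-contained sketch using hypothetical in-memory data rather than the Sqoop-imported tables:

// Hypothetical (product_id, product_name) and (product_id, (order_id, quantity)) pairs
val names = sc.parallelize(Seq((101, "Widget"), (102, "Gadget")))
val purchases = sc.parallelize(Seq((101, (1, 2)), (102, (1, 1)), (101, (2, 3))))
// join matches records from both RDDs that share the same key (product_id)
val joined = purchases.join(names) // (product_id, ((order_id, quantity), product_name))
// Re-key by order_id and group, so each order lists everything purchased in it
val byOrder = joined.map { case (_, ((orderId, quantity), name)) =>
    (orderId, (quantity, name))
}.groupByKey()
byOrder.collect().foreach { case (orderId, items) =>
    println("order " + orderId + ": " + items.mkString(", "))
}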

You should see a result listing the 10 top-scoring product pairs, one per line, each preceded by its co-occurrence score.

Now that you have the results, you can close the Spark shell with the 'exit' command (note that you may see many log messages from other nodes in the cluster as the application shuts down).

scala> exit

Conclusion:

An experienced SQL user may recognize that this job could have been written in SQL as well. Like MapReduce, Spark can be used as an environment for executing SQL queries, but Spark's ability to interactively write custom code in a real programming language lets you go well beyond that, previewing the results at each step.
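
As a rough illustration only, and assuming the Sqoop-imported order_items and products tables are registered in the Hive metastore and that the shell provides a Hive-enabled sqlContext (as Spark 1.3 and later do), the same pairing idea might be sketched in SQL along these lines:

// Illustrative sketch: score product pairs by the summed product of quantities,
// roughly mirroring what the RDD job above computes
val topPairs = sqlContext.sql("""
    SELECT p1.product_name AS product_a,
           p2.product_name AS product_b,
           SUM(a.order_item_quantity * b.order_item_quantity) AS strength
    FROM order_items a
    JOIN order_items b ON a.order_item_order_id = b.order_item_order_id
    JOIN products p1 ON a.order_item_product_id = p1.product_id
    JOIN products p2 ON b.order_item_product_id = p2.product_id
    WHERE a.order_item_product_id < b.order_item_product_id
    GROUP BY p1.product_name, p2.product_name
    ORDER BY strength DESC
    LIMIT 10
""")
topPairs.collect().foreach(println)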

You can learn more about building advanced recommendation systems by consulting Cloudera's Spark Guide, or dig even deeper into Spark by enrolling in Cloudera's Data Science at Scale using Spark and Hadoop training. You can also take more in-depth training specifically for Spark with Cloudera's Developer Training for Spark and Hadoop.