Debugging MapReduce Programs With MRUnit

July 3rd, 2009 by Aaron Kimball

The distributed nature of MapReduce programs makes debugging a challenge. Attaching a debugger to a remote process is cumbersome, and the lack of a single console makes it difficult to inspect what is occurring when several distributed copies of a mapper or reducer are running concurrently. Furthermore, operations that work on small amounts of input (e.g., saving the inputs to a reducer in an array) fail when running at scale, causing out-of-memory exceptions or other unintended effects.

A full discussion of how to debug MapReduce programs is beyond the scope of a single blog post, but I’d like to introduce you to a tool we designed at Cloudera to assist you with MapReduce debugging: MRUnit.

MRUnit helps bridge the gap between MapReduce programs and JUnit by providing a set of interfaces and test harnesses, which allow MapReduce programs to be more easily tested using standard tools and practices.

While this doesn’t solve the problem of distributed debugging, many common bugs in MapReduce programs can be caught and debugged locally. For this purpose, developers often try to use JUnit to test their MapReduce programs. The current state of the art often involves writing a set of tests that each create a JobConf object, which is configured to use a mapper and reducer, and then set to use the LocalJobRunner (via JobConf.set("mapred.job.tracker", "local")). A MapReduce job will then run in a single thread, reading its input from test files stored on the local filesystem and writing its output to another local directory.

This process provides a solid mechanism for end-to-end testing, but has several drawbacks. Developing new tests requires adding test inputs to files that are stored alongside one’s program. Validating correct output also requires filesystem access and parsing of the emitted data files. This involves writing a great deal of test harness code, which itself may contain subtle bugs. Finally, this process is slow. Each test requires several seconds to run. Users often find themselves aggregating several unrelated inputs into a single test (violating a unit testing principle of isolating unrelated tests) or performing less exhaustive testing due to the high barriers to test authorship.

The easiest way to test MapReduce programs is to include as little Hadoop-specific code as possible in one’s application. Parsers can operate on instances of String instead of Text, and mappers should create instances of MySpecificParser to tokenize input data rather than embed parsing code in the body of MyMapper.map(). Your MySpecificParser implementation can then be tested with ordinary JUnit tests. Another class or method could then be used to perform processing on parsed lines.
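As a rough sketch of that separation, here is what a Hadoop-free parser might look like. Only the class name MySpecificParser comes from the text above; the whitespace-tokenizing rule and the parse() method are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser: only the class name comes from the post; the
// whitespace-splitting rule is an assumption made for illustration.
class MySpecificParser {

  /** Returns the non-empty, whitespace-separated tokens of a line. */
  List<String> parse(String line) {
    List<String> tokens = new ArrayList<>();
    if (line == null) {
      return tokens;
    }
    for (String token : line.trim().split("\\s+")) {
      if (!token.isEmpty()) { // splitting an empty string yields one empty token
        tokens.add(token);
      }
    }
    return tokens;
  }

  public static void main(String[] args) {
    System.out.println(new MySpecificParser().parse("foo  bar\tbaz"));
  }
}
```

Because the class works on String and knows nothing about Text or OutputCollector, a plain JUnit test can exercise it directly, and MyMapper.map() only needs to call parse(value.toString()) and emit the results.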

But even with those components separately tested, your map() and reduce() calls should still be tested individually, as the composition of separate classes may cause unintended bugs to surface. MRUnit provides test drivers that accept programmatically specified inputs and outputs, which validate the correct behavior of mappers and reducers in isolation, as well as when composed in a MapReduce job. For instance, the following code checks whether the IdentityMapper emits the same (key, value) pair as output that it receives as input:

import junit.framework.TestCase;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mrunit.MapDriver;
import org.junit.Before;
import org.junit.Test;

public class TestExample extends TestCase {

  // Mapper under test and the MRUnit driver that feeds it records.
  private Mapper<Text, Text, Text, Text> mapper;
  private MapDriver<Text, Text, Text, Text> driver;

  @Before
  public void setUp() {
    mapper = new IdentityMapper<Text, Text>();
    driver = new MapDriver<Text, Text, Text, Text>(mapper);
  }

  @Test
  public void testIdentityMapper() {
    // One input record in, the identical record expected out.
    driver.withInput(new Text("foo"), new Text("bar"))
            .withOutput(new Text("foo"), new Text("bar"))
            .runTest();
  }
}

The MapDriver orchestrates the test process, feeding the input record (“foo”, “bar”) to the IdentityMapper when its runTest() method is called. It also passes a mock OutputCollector implementation to the mapper. The driver then validates the output received by the OutputCollector against the expected output record (“foo”, “bar”). If the actual and expected outputs do not match, a JUnit assertion failure is raised, informing the developer of the error. More test drivers exist for testing individual reducers, as well as mapper/reducer compositions.
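To make the driver's role concrete, here is a minimal sketch of the same idea without any Hadoop or MRUnit classes. The Collector and SimpleMapper interfaces and the DriverSketch class are hypothetical stand-ins invented for this example; MRUnit's real implementation will differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Illustrative stand-ins only: these are NOT the Hadoop or MRUnit types.
class DriverSketch {

  interface Collector {
    void collect(String key, String value);
  }

  interface SimpleMapper {
    void map(String key, String value, Collector out);
  }

  /**
   * Feeds one record to the mapper, captures what it emits in a list (the
   * "mock collector"), and reports whether the emitted pairs match the
   * expected pairs in order.
   */
  static boolean runTest(SimpleMapper mapper, String key, String value,
                         List<String[]> expected) {
    final List<String[]> actual = new ArrayList<>();
    mapper.map(key, value, (k, v) -> actual.add(new String[] {k, v}));
    if (actual.size() != expected.size()) {
      return false;
    }
    for (int i = 0; i < actual.size(); i++) {
      if (!Objects.equals(actual.get(i)[0], expected.get(i)[0])
          || !Objects.equals(actual.get(i)[1], expected.get(i)[1])) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    SimpleMapper identity = (k, v, out) -> out.collect(k, v);
    List<String[]> expected = new ArrayList<>();
    expected.add(new String[] {"foo", "bar"});
    System.out.println(runTest(identity, "foo", "bar", expected));
  }
}
```

A real driver raises a JUnit assertion failure instead of returning a boolean, but the flow is the same: input record in, collector captures output, captured output is compared with the expectation.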

End-to-end tests involving JobConf configuration code, InputFormat and OutputFormat implementations, filesystem access, and larger scale testing are still necessary. But many errors can be quickly identified with small tests involving a single, well-chosen input record, and a suite of regression tests allows correct behavior to be assured in the face of ongoing changes to your data processing pipeline. We hope MRUnit helps your organization test code, find bugs, and improve its use of Hadoop and big data by facilitating faster and more thorough test cycles.

MRUnit is open source and is included in Cloudera’s Distribution for Hadoop. For more information about MRUnit, including where to get it and how to use its API, see the MRUnit documentation page.


Rackspace Upgrades to Cloudera’s Distribution for Hadoop

June 30th, 2009 by Christophe Bisciglia
Hadoop moves fast. Users often find that they need to upgrade after just a few months. Upgrading can be a daunting task, especially if you are several versions behind. We’ve been working with Rackspace for a while now, and they recently embarked on an upgrade from Hadoop 0.15.3 to Cloudera’s Distribution for Hadoop based on 0.18.3. Stu Hood, Search Team Technical Lead at Rackspace, was kind enough to document their experience, and we’re happy to share it with you here. -Christophe

Upgrading to the Cloudera Distribution

Hadoop plays an integral part in the email analytics performed at Rackspace Email and Apps, and our installation of Apache Hadoop 0.15.3 ran smoothly for 18 months after we deployed it in January 2008. By the time we decided to upgrade to Cloudera’s Distribution for Hadoop in June 2009, our production cluster had performed almost 600,000 MapReduce jobs.

In the past, we have deployed Hadoop along with our primary MapReduce application by checking the entire Hadoop distribution and our configuration into version control. Deploying a new slave for the cluster involved running custom scripts to create users and directories and to install dependencies.

There were a few important reasons to upgrade a cluster as trusty as ours to Cloudera’s Distribution for Hadoop (version 0.18.3):

  • Hadoop improves rapidly (since version 0.15.3 was released, over 1500 JIRA issues were resolved).
  • The Cloudera Distribution contains backported patches that are considered stable, but have not been applied to previous versions by the Apache project, such as the FairScheduler. Some of these patches fix critical bugs, add new features, or improve performance.
  • Cloudera’s configuration RPMs maintain the optimal settings for the installed version of Hadoop. Tweaking these settings manually would involve far more research than we can afford.
  • Standardizing on a Red Hat deployment infrastructure like RPM and YUM makes it much easier to track the latest stable version of Hadoop.

Steps

Configure Hadoop

In order to take advantage of Cloudera’s recommended configuration values, we decided to use Cloudera’s Configurator for Hadoop to generate the configuration that we would be using on the upgraded cluster.

We started by following the steps at https://my.cloudera.com/, using parameters that matched our current configuration. Since we were upgrading an existing cluster, it was important that the data directories matched up in our new configuration. The following table describes the mapping between the entries made in the GUI and the properties in the generated configuration files:

Configurator step   GUI entry                               Configuration property
Step 2              NameNode Metadata Path(s)               dfs.name.dir
Step 3              Secondary NameNode Metadata Path(s)     fs.checkpoint.dir
Step 5              TaskTracker Intermediate Data Path(s)   mapred.local.dir
Step 5              HDFS Data Path(s)                       dfs.data.dir

Note that the configurator does not support the type of variable expansion that Hadoop’s configuration files sometimes do. One such example is ${hadoop.tmp.dir} expanding to the Hadoop temporary directory.

If one of your previous configuration values used variable expansion for ${username}, you would need to replace ${username} with the name of the user that you had previously used to run the Hadoop daemons. In our case, we needed to replace instances of ${username} in the dfs.data.dir and dfs.name.dir values with user “hadoopuser.”
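For illustration, the substitution amounts to a literal string replacement on each old value before typing it into the configurator. The sketch below assumes a made-up dfs.data.dir value; the paths and the UsernameExpansion class are hypothetical.

```java
class UsernameExpansion {

  /** Replaces every literal ${username} in a configuration value. */
  static String expand(String value, String user) {
    // String.replace treats its first argument literally, not as a regex.
    return value.replace("${username}", user);
  }

  public static void main(String[] args) {
    // Hypothetical old value for dfs.data.dir; the paths are made up.
    String oldValue = "/disk1/${username}/dfs/data,/disk2/${username}/dfs/data";
    System.out.println(expand(oldValue, "hadoopuser"));
  }
}
```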

When we reached the end of the configurator, we downloaded the generated hadoop-site.xml* files and the Cloudera Repository RPM, and then recorded our repository ID. To double check that our data directories were configured properly, we compared the values (from the table above) in the new hadoop-site.xml* files against our previous configuration. If you see any mismatches at this step, you will probably want to restart the configurator until the resulting files are consistent.

Upgrade

At this point, it was time to jump into the upgrade. We installed the Configurator RPM, which we had downloaded earlier, on all machines in our cluster by walking through the steps in the config guide. After listing out the available configuration packages with yum search hadoop-conf, we installed the matching packages for each class of machine in the cluster using yum install $packagename. The new version of Hadoop was now installed, but not running.

In order to swap out the running version of Hadoop, and create a backup of the current filesystem, we needed to follow the steps leading up to the “Install New Version” step from the Hadoop Wiki upgrade page. After walking through those preparation instructions and successfully shutting down the cluster, it was time to make the switch.

Cloudera’s Distribution for Hadoop creates a “hadoop” user, and this user runs all of the necessary services/daemons for the cluster. If your cluster had previously been running with a different username (ours was running as “hadoopuser”), you will need to give the new user ownership of several directories. We ran…

# chown -R hadoop $directory

…for each of the following configured directories:

  • dfs.data.dir
  • dfs.name.dir
  • fs.checkpoint.dir
  • mapred.local.dir
  • hadoop.tmp.dir
  • /var/log/hadoop (the log directory, which appears to be hardcoded)

Once the “hadoop” user had access to the necessary directories, we were ready to upgrade the Namenode. We ran the following command from our Namenode machine, so that the process would start in the background and begin upgrading its checkpoint:

$ sudo -u hadoop /usr/lib/hadoop/bin/hadoop-daemon.sh --config "/etc/hadoop/conf" start namenode -upgrade

We watched the “Upgrades” section of the DFS status page at http://$namenode:50070/ while waiting for the Namenode upgrade to complete, and then we started up the remaining Hadoop services on their respective machines using the instructions from the “Managing Hadoop Services” section of the config guide.

Finalize

Code Changes

Once our cluster was upgraded, we needed to port our Hadoop jobs to the Hadoop 0.18.3 API. There were actually only minor changes in the MapReduce and FileSystem APIs between 0.15.3 and 0.18.3:

  • Our OutputFormats needed to extend FileOutputFormat, rather than OutputFormatBase.
  • FileSystem.listPaths() was removed, in favor of .globPaths().

Finalizing the Upgrade

After verifying that our newly updated jobs were running correctly against the cluster, we were ready to make the changes permanent. The dfsadmin -finalizeUpgrade command runs in the background and cleans up the outdated copies of blocks left behind by the upgrade, freeing disk space.

$ sudo -u hadoop hadoop dfsadmin -finalizeUpgrade

Future

Now that we’ve upgraded to the Cloudera distribution using the configurator, it will be much easier to stay at the bleeding edge of Hadoop development (or the cutting edge, if we choose stability over features). We can also add the Cloudera repository RPM to our base server image and add a single command to pull down the entire distribution from Yum. Finally, we can conveniently install the packages for Pig and Hive to give our developers more options for their processing jobs.


Parallel LZO: Splittable Compression for Hadoop

June 24th, 2009 by Christophe Bisciglia


Yesterday, Chris Goffinet from Digg made a great blog post about LZO and Hadoop. Many users have been frustrated because LZO has been removed from Hadoop’s core, and Chris highlights a great way to mitigate this while the project identifies an alternative with a compatible license. We liked the post so much, we asked Chris to share it with our audience. Thanks Chris! -Christophe

At Digg, we have been working on our own Hadoop cluster using Cloudera’s distribution. One of the things we have been working through is how to split our large compressed data files and process them in parallel on Hadoop. One of the biggest drawbacks of compression formats like Gzip is that a compressed file can’t be split across multiple mappers. This is where LZO comes in.

Lempel-Ziv-Oberhumer (LZO) is a lossless data compression algorithm that is focused on decompression speed.

The LZO library implements a number of algorithms with the following features:

  • Compression is comparable in speed to deflate compression.
  • On modern architectures, decompression is very fast; in non-trivial cases able to exceed the speed of a straight memory-to-memory copy due to the reduced memory-reads.
  • Requires an additional buffer during compression (of size 8 kB or 64 kB, depending on compression level).
  • Requires no additional memory for decompression other than the source and destination buffers.
  • Allows the user to adjust the balance between compression quality and compression speed, without affecting the speed of decompression.

This is great until you start trying to actually get LZO working on Hadoop. First off, it gets really confusing because LZO has been removed from Hadoop 0.20+ due to GPL restrictions.

I first came across a blog post by Johan Oskarsson that discussed this. Unfortunately, when you dive into HADOOP-4640, you find out that the patch is against 0.20, while Cloudera’s distribution uses a modified version of 0.18.3. The patch from HADOOP-4640 applies pretty cleanly apart from a few things. On top of this, you need HADOOP-2664, which enables the LZOP codec. You actually need this because the compressor on most Linux systems is `lzop`, and its output differs from that of the traditional LzoCodec bundled in 0.18.

So how do we get all of this working? First off, grab both modified patches from my GitHub account.

Once you have those, apply the patches to your Cloudera distribution. Then be sure to rebuild. After that’s done and you have redeployed to your clients and production cluster, you need to modify your hadoop-site.xml on the client side.

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.LzopCodec</value>
  <description>A list of the compression codec classes that can be used for compression/decompression.</description>
</property>

Once that is completed, go ahead and upload your large LZO file to your Hadoop cluster.

So let’s say you uploaded the file:

$ hadoop fs -put large_file.lzo /tmp/large_file.lzo

Next, you need to index your LZO file so that Hadoop knows how to split it across multiple mappers.
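To get a feel for why an index makes the file splittable, here is a simplified sketch. It assumes the index is just a sorted list of byte offsets at which compressed blocks begin, and it ignores the file header; the class, the method names, and the numbers are all hypothetical, and the real input format may choose its boundaries differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified illustration, not the actual indexer or input format code.
class LzoSplitSketch {

  /** Returns the first block offset at or after pos, or fileLength if none. */
  static long alignToBlock(long[] blockOffsets, long pos, long fileLength) {
    int i = Arrays.binarySearch(blockOffsets, pos);
    if (i < 0) {
      i = -i - 1; // insertion point: index of the first offset greater than pos
    }
    return i < blockOffsets.length ? blockOffsets[i] : fileLength;
  }

  /** Cuts the file into {start, end} ranges that begin and end on block boundaries. */
  static List<long[]> splits(long[] blockOffsets, long fileLength, long splitSize) {
    if (splitSize <= 0) {
      throw new IllegalArgumentException("splitSize must be positive");
    }
    List<long[]> result = new ArrayList<>();
    for (long start = 0; start < fileLength; start += splitSize) {
      long s = alignToBlock(blockOffsets, start, fileLength);
      long e = alignToBlock(blockOffsets, Math.min(start + splitSize, fileLength), fileLength);
      if (s < e) { // skip ranges that contain no block start
        result.add(new long[] {s, e});
      }
    }
    return result;
  }

  public static void main(String[] args) {
    long[] blocks = {0L, 40L, 90L, 130L}; // made-up block offsets
    for (long[] split : splits(blocks, 170L, 50L)) {
      System.out.println(split[0] + " to " + split[1]);
    }
  }
}
```

Without the index, a mapper handed an arbitrary byte range has no way to find where the next decodable block starts, which is why an unindexed file ends up in a single mapper.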

The Indexer.jar in my GitHub account will be used for this process. Now you need to run the Indexer.jar and tell it which file to generate an index for.

$ hadoop jar Indexer.jar /tmp/large_file.lzo

After that’s completed, you’re almost there! The index file will be created in /tmp. Now all you need to do is run a map/reduce job and you’re set! Don’t forget to set the -inputformat parameter. Here is a code snippet using the wordcount example:

#!/bin/sh
HADOOP_HOME=/usr/lib/hadoop
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.18.3-7-streaming.jar \
-input /tmp/large_file.lzo \
-output wc_test \
-inputformat org.apache.hadoop.mapred.LzoTextInputFormat \
-mapper 'cat' \
-reducer 'wc -l'

A Great Week for Hadoop: Summit Roundup

June 22nd, 2009 by Christophe Bisciglia

On June 10th, more than 750 people from around the world descended on the Santa Clara Marriott to share their love for a little stuffed elephant named Hadoop. It was a good week to be part of this exploding community, and I want to extend Cloudera’s heartfelt thanks to everyone who made it possible, especially our friends at Yahoo! who organized this Summit. Most importantly, I want to thank all of you who were able to participate. I know many of you couldn’t make it to California this time, so I hope to see you at the Hadoop Summit East in October.

For those of you who couldn’t join us, I thought I would post my notes on a few of the highlights.

Hadoop Goes Mainstream:
About 300 developers attended last year’s summit, primarily from web companies and research labs. They were joined by a few forward-thinking venture capitalists. This year’s audience was both larger and different. In addition to the vibrant developer community, there was a flood of users of Hadoop. Though the audience was still dominated by web companies, attendees included traditional enterprise users with applications ranging from finance to biotech. There were technology previews from IBM and Sun. Major companies like Amazon joined our commercial efforts around Hadoop. VCs had also stepped up to sponsor status. Take-away? You ain’t seen nothing yet.

Hadoop In Print:
Yahoo! Developer Network gave away 500 copies of Tom White’s book, “Hadoop: The Definitive Guide,” published by O’Reilly. If you missed your copy, I’ve heard that when they aren’t busy developing AWS, Amazon has been known to sell a few books here and there.

Cloudera Presentation Slides:
Several Cloudera employees spoke at the Summit, and we have posted slides from those talks on the Hadoop Wiki. If you spoke, please put your slides up as well.

Cloudera Announces New Distribution Features:
We see an increasing number of users moving data between Hadoop and more traditional database products, and more and more usage moving to the cloud - especially Amazon. To that end, we’ve released two new features, and a collection of new packages, that make Hadoop easier to use.

  • Sqoop: Database Import for Hadoop. Brainchild of Aaron Kimball, Sqoop is an extensible command-line tool that copies data from a relational database into Hadoop. Sqoop uses JDBC to inspect the database schema, and automatically generates all of the code necessary to move the data. It can import data from any database over JDBC, and includes an extension to allow better performance in MySQL by using the mysqldump command.
  • EBS Integration for Hadoop on AWS: Tom White had a busy month. Besides finishing his book, he spent some time thinking about how Hadoop runs on Amazon Web Services, and came up with new code to make that better. Hadoop clusters on EC2 have always needed to copy data from S3 when they started up, and write results back to S3 before they powered down. While Amazon’s Elastic MapReduce makes this round-trip much easier operationally, EMR doesn’t support tools like Pig and Hive. Using Tom’s work, Cloudera is able to store data blocks on EBS volumes, and to connect them to EC2 nodes running Hadoop as needed. This delivers better throughput and more disks per node at lower cost, since EBS is cheaper than S3. Since no copies are required at startup and shutdown, your EC2 instances run for less time, saving CPU costs. Best of all, these changes to Hadoop work with Hive, Pig, Sqoop, and the rest of the Hadoop family. You can now load data, run jobs in your favorite language, turn your cluster off, and pick up exactly where you left off later. All your data survives.
  • Preview Release of 0.20 Packages: Matt Massie and Todd Lipcon doubled down to get our testing release packaged so that those of you who crave the bleeding edge can start experimenting with version 0.20 of Hadoop today. Over the next few weeks, we’ll be bringing in changes from other leading Hadoop developers, upgrading our customers, and releasing stable packages to the community.

Hadoop Developer Offsite:
With so many Hadoop developers in the Bay Area, we decided to invite the Hadoop committers and some active developers to Cloudera’s offices. We wanted to collaborate without the assistance of email lists, JIRA, Hudson, or any other technology designed to make our lives easier. We used sticky notes to identify issues in parallel, identified consensus with clusters, and broke off into smaller teams to explore solutions. Out of this, we identified five things we love and hate about Hadoop, the biggest upcoming challenges for the project, and a wish list for the future. We broke into sub-projects to make concrete plans to address these issues, and we posted the meeting notes online. We’ll continue to host such meetings, and to work with other leaders in the development community. Bottom line: as Hadoop grows up, we need to grow with it, and meetings like this are a great way to coordinate development efforts with the needs of the community.

Yahoo! Distribution of Hadoop:
Long known for their leadership in the Hadoop development community, Yahoo! stepped it up again by releasing the source code that they run on their alpha clusters to the community at large. There are some things you can only learn about Hadoop from running at Y!’s scale, and while this is not a stable production distribution, their source-only (available via github) release provides 17 patches slated for inclusion in later versions of Hadoop. Cloudera is working closely with the team at Yahoo! to fold these patches into the next release of our distribution, along with dozens of patches we have developed to support customer workloads, and a half dozen or so from our friends at Amazon to improve performance on AWS. As big players like Yahoo! and Amazon continue to open their development processes, Cloudera can deliver more stable, better tested, and ultimately, more trusted code to our enterprise customers and the community at large in the packages you know and love (RPMs, Debian Packages, AMIs, etc). It’s not always easy for big companies to be open, so we’d like to thank and congratulate everyone involved.

HBase, Wow:
HBase has endured its share of criticism over the last year, but based on last week’s presentation, many of those problems have been addressed. HBase has made incredible strides in terms of reliability, availability and performance. Version 0.20 is the first-ever “performance” release, and is focused on improving random access, scan and insert times. Check out these slides for details. We’re looking at an order of magnitude performance improvement, with random reads on par with traditional RDBMS. The other major improvement involves ZooKeeper integration, and eliminates the single point of failure in the master node. This strengthens the case for including HBase with the Cloudera Distribution for Hadoop. Please let us know if you want HBase support.

In Summary:
We had a great time at the summit – we learned a lot and got to talk to a lot of smart people. We’re looking forward to October’s Hadoop Summit East in New York City!


Analyzing Apache logs with Pig

June 17th, 2009 by Dmitriy Ryaboy

A number of organizations donate server space and bandwidth to the Apache Foundation; when you download Hadoop, Tomcat, Maven, CouchDB, or any of the other great Apache projects, the bits are sent to you from a large list of mirrors. One of the ways in which Cloudera supports the open source community is to host such a mirror.

In this blog post, we will use Pig to examine the download logs recorded on our server, demonstrating several features that are often glossed over in introductory Pig tutorials—parameter substitution in PigLatin scripts, Pig Streaming, and the use of custom loaders and user-defined functions (UDFs). It’s worth mentioning here that, as of last week, the Cloudera Distribution for Hadoop includes a package for Pig version 0.2 for both Red Hat and Ubuntu, as promised in an earlier post. It’s as simple as apt-get install pig or yum install hadoop-pig.

There are many software packages that can do this kind of analysis automatically for you on average-sized log files, of course. However, many organizations log so much data and require such custom analytics that these ordinary approaches cease to work. Hadoop provides a reliable method for scaling storage and computation; PigLatin provides an expressive and flexible language for data analysis.

Our log files are in Apache’s standard CombinedLogFormat. It’s a tad more complicated to parse than tab- or comma-delimited files, so we can’t just use the built-in PigStorage(). Luckily, there is already a custom loader in the PiggyBank built specifically for parsing these kinds of logs.

First, we need to get the PiggyBank from Apache. The PiggyBank is a collection of useful add-ons (UDFs) for Pig, contributed by the Pig user community. There are instructions on the Pig website for downloading and compiling the PiggyBank. Note that you will need to make sure to add pig.jar to your CLASSPATH environment variable before running ant.

Now, we can start our PigLatin script by registering the piggybank jarfile and defining references to methods we will be using.

register /home/dvryaboy/src/pig/trunk/piggybank.jar;
DEFINE LogLoader
	org.apache.pig.piggybank.storage.apachelog.CombinedLogLoader();
DEFINE DayExtractor
	org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('yyyy-MM-dd');

By the way — the PiggyBank contains another useful loader, called MyRegExLoader, which can be instantiated with any regular expression when you declare it with a DEFINE statement. Useful in a pinch.

While we are working on our script, it may be useful to run in local mode, only reading a small sample data set (a few hundred lines). In production we will want to run on a different file. Moreover, if we like the reports enough to automate them, we may wish to run the report every day, as new logs come in. This means we need to parameterize the source data location. We will also be using a database that maps IPs to geographic locations, and we probably want to parameterize that as well.

%default LOGS 'access_log.small'
%default GEO 'GeoLiteCity.dat'

To specify a different value for a parameter, we can use the -param flag when launching the pig script:

# pig -x mapreduce -f scripts/blogparse.pig -param LOGS='/mirror.cloudera.com/logs/access_log.*'

For mapping IPs to geographic locations, we use a third-party database from MaxMind.  This database maps IP ranges to countries, regions, and cities.  Since the data from MaxMind lists IP ranges, and our logs list specific IPs, a regular join won’t work for our purposes. Instead, we will write a simple script that takes a parsed log as input, looks up the geo information using MaxMind’s Perl module, and outputs the log with geo data prepended.

The script itself is simple. It reads in a tuple representing a parsed log record, checks the first field (the IP) against the database, and prints the data back to STDOUT:

#!/usr/bin/env perl
use warnings;
use strict;
use Geo::IP::PurePerl;

my ($path)=shift;
my $gi = Geo::IP::PurePerl->new($path);

while (<>) {
	chomp;
	if (/([^\t]*)\t(.*)/) {
		my ($ip, $rest) = ($1, $2);
		my ($country_code, undef, $country_name, $region, $city)
			= $gi->get_city_record($ip);
		print join("\t", $country_code||'', $country_name||'',
			$region||'', $city||'', $ip, $rest), "\n";
	}
}
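As an aside on why a regular join does not fit here: an equi-join needs equal keys on both sides, whereas an IP has to be matched against the range that contains it. The sketch below shows one way such a range lookup can work, using a sorted map keyed by the first address of each range. It is an illustration only, not how MaxMind's module is implemented; the class, the country labels, and the documentation-only addresses are made up.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustration only: this is not how MaxMind's library is implemented.
class IpRangeLookup {

  private static final class Range {
    final long last;
    final String country;

    Range(long last, String country) {
      this.last = last;
      this.country = country;
    }
  }

  // Ranges keyed by their first address; assumes ranges do not overlap.
  private final TreeMap<Long, Range> byFirst = new TreeMap<>();

  /** Converts a dotted-quad IPv4 address to its numeric value. */
  static long toLong(String ip) {
    String[] parts = ip.split("\\.");
    if (parts.length != 4) {
      throw new IllegalArgumentException("not IPv4: " + ip);
    }
    long value = 0;
    for (String part : parts) {
      int octet = Integer.parseInt(part);
      if (octet < 0 || octet > 255) {
        throw new IllegalArgumentException("not IPv4: " + ip);
      }
      value = (value << 8) | octet;
    }
    return value;
  }

  void add(String first, String last, String country) {
    byFirst.put(toLong(first), new Range(toLong(last), country));
  }

  /** Returns the country of the range containing ip, or null if none does. */
  String lookup(String ip) {
    long value = toLong(ip);
    // Closest range that starts at or before the address, if any.
    Map.Entry<Long, Range> e = byFirst.floorEntry(value);
    return (e != null && value <= e.getValue().last) ? e.getValue().country : null;
  }
}
```

Streaming each record through a script that performs this kind of lookup sidesteps the join entirely, which is what the Perl script above does with the real database.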

Getting this script into Pig is a bit more interesting. The Pig Streaming interface provides us with a simple way to ship scripts that will process data, and cache any necessary objects (such as the GeoLiteCity.dat file we downloaded from MaxMind).  However, when the scripts are shipped, they are simply dropped into the current working directory. It is our responsibility to ensure that all dependencies—such as the Geo::IP::PurePerl module—are satisfied. We could install the module on all the nodes of our cluster; however, this may not be an attractive option. We can ship the module with our script—but in Perl, packages are represented by directories, so just dropping the .pm file into cwd will not be sufficient, and Pig doesn’t let us ship directory hierarchies.  We solve this problem by packing the directory into a tarball, and writing a small Bash script called “ipwrapper.sh” that will set up our Perl environment when invoked:

#!/usr/bin/env bash
tar -xzf geo-pack.tgz
PERL5LIB=$PERL5LIB:$(pwd) ./geostream.pl $1

The geo-pack.tgz tarball simply contains geostream.pl and Geo/IP/PurePerl.pm.

We also want to make the GeoLiteCity.dat file available to all of our nodes. It would be inefficient to simply drop the file in HDFS and reference it directly from every mapper, as this would cause unnecessary network traffic.  Instead, we can instruct Pig to cache a file from HDFS locally, and use the local copy.

We can relate all of the above to Pig in a single instruction:

DEFINE iplookup `ipwrapper.sh $GEO`
	ship ('ipwrapper.sh')
	cache('/home/dvryaboy/tmp/$GEO#$GEO');

We can now write our main Pig script. The objective here is to load the logs, filter out obviously non-human traffic, and using the rest, calculate the distribution of downloads by country and by Apache project.

Load the logs:

logs = LOAD '$LOGS' USING LogLoader as
	(remoteAddr, remoteLogname, user, time, method,
	 uri, proto, status, bytes, referer, userAgent);

Filter out records that represent non-humans (Googlebot and such), aren’t Apache-related, or just check the headers and do not download contents.

logs = FILTER logs BY bytes != '-' AND uri matches '/apache.*';

-- project just the columns we will need
logs = FOREACH logs GENERATE
	remoteAddr,
	DayExtractor(time) as day, uri, bytes, userAgent;

-- The filtering function is not actually in the PiggyBank.
-- We plan on contributing it soon.
notbots = FILTER logs BY (NOT
	org.apache.pig.piggybank.filtering.IsBotUA(userAgent));

Get country information, group by country code, aggregate.

with_country = STREAM notbots THROUGH iplookup
		AS (country_code, country, state, city, ip, time, uri, bytes, userAgent);

geo_uri_groups = GROUP with_country BY country_code;

geo_uri_group_counts = FOREACH geo_uri_groups GENERATE
		group,
		COUNT(with_country) AS cnt,
		SUM(with_country.bytes) AS total_bytes;

geo_uri_group_counts = ORDER geo_uri_group_counts BY cnt DESC;

STORE geo_uri_group_counts INTO 'by_country.tsv';

The first few rows look like:

Country     Hits    Bytes
USA         8906    2.0458781232E10
India       3930    1.5742887409E10
China       3628    1.6991798253E10
Mexico       595    1.220121453E9
Colombia     259    5.36596853E8

At this point, the data is small enough to plug into your favorite visualization tools. We wrote a quick-and-dirty Python script to take logarithms and use the Google Chart API to draw this map:

[Figure: Bytes by Country]

This is pretty interesting. Let’s do a breakdown by US states.

Note that with the upcoming Pig 0.3 release, you will be able to have multiple stores in the same script, allowing you to re-use the loading and filtering results from earlier steps. With Pig 0.2, this needs to go in a separate script, with all the required DEFINEs, LOADs, etc.

us_only = FILTER with_country BY country_code == 'US';

by_state = GROUP us_only BY state;

by_state_cnt = FOREACH by_state GENERATE
	     group,
	     COUNT(us_only.state) AS cnt,
	     SUM(us_only.bytes) AS total_bytes;

by_state_cnt = ORDER by_state_cnt BY cnt DESC;

STORE by_state_cnt INTO 'by_state.tsv';

Theoretically, Apache selects an appropriate server based on the visitor’s location, so our logs should show a heavy skew towards California. Indeed, they do (recall that the intensity of the blue color is based on a log-scale).

[Figure: Bytes by US State]
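For readers who want to reproduce the log-scale coloring, here is a small sketch of the arithmetic. The 0 to 100 output range and the LogScale class are assumptions made for this example; the original charting script is not shown in this post and may have scaled its values differently.

```java
class LogScale {

  /**
   * Maps a value in [min, max] to an intensity between 0 and 100 on a
   * logarithmic scale. Requires 0 < min < max.
   */
  static long intensity(double value, double min, double max) {
    double t = (Math.log(value) - Math.log(min)) / (Math.log(max) - Math.log(min));
    return Math.round(100.0 * t);
  }

  public static void main(String[] args) {
    // Byte totals for the largest and smallest rows of the country table.
    double max = 2.0458781232E10;
    double min = 5.36596853E8;
    System.out.println(intensity(1.5742887409E10, min, max));
  }
}
```

On a linear scale, most regions would be nearly indistinguishable from zero next to the largest one; taking logarithms first spreads them across the color range.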

Now, let’s get a breakdown by project. To get a rough mapping of URI to Project, we simply get the directory name after /apache in the URI. This is somewhat inaccurate, but good for quick prototyping. This time around, we won’t even bother writing a separate script — this is a simple awk job, after all! Using streaming, we can process data the same way we would with basic Unix utilities connected by pipes.

uris = FOREACH notbots GENERATE uri;

-- note that we have to escape the dollar sign for $3,
-- otherwise Pig will attempt to interpret this as a Pig variable.
project_map = STREAM uris
			THROUGH `awk -F '/' '{print \$3;}'` AS (project);

project_groups = GROUP project_map BY project;

project_count = FOREACH project_groups GENERATE
			group,
			COUNT(project_map.project) AS cnt;

project_count = ORDER project_count BY cnt DESC;

STORE project_count INTO 'by_project.tsv';
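To see what the awk one-liner extracts, here is the same field selection written out as a sketch. Because the URI starts with a slash, the first field is empty, the second is "apache", and the third is the project directory. The sample URI and the UriProject class are made up for illustration.

```java
class UriProject {

  /** Mimics awk -F '/' '{print $3}': the third slash-separated field, or "". */
  static String project(String uri) {
    // A limit of -1 keeps trailing empty fields, as awk does.
    String[] fields = uri.split("/", -1);
    return fields.length >= 3 ? fields[2] : "";
  }

  public static void main(String[] args) {
    // Hypothetical download path.
    System.out.println(project("/apache/hadoop/core/hadoop-0.18.3.tar.gz"));
  }
}
```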

We can now take the by_project.tsv file and plot the results (in this case, we plotted the top 18 projects, by number of downloads).

[Figure: Downloads by Project]

We can see that Tomcat and Httpd dwarf the rest of the projects in terms of file downloads, and the distribution appears to follow a power law.

We’d love to hear how folks are using Pig to analyze their data. Drop us a line, or comment below!