Enron Email Analysis with Streaming and NLTK

I want to play with the Enron email data and some natural language processing libraries. Since I'm using Hadoop Streaming with Python, NLTK seems like a natural fit.

First, go grab the zip file listed under "Installation to Non-Standard Location" on the NLTK site. Once you've pulled it down to your local box and unzipped it, hop into the unzipped directory (the one containing the nltk and yaml folders) and run:

zip -r nltkandyaml.zip nltk yaml
mv nltkandyaml.zip /path/to/where/your/mapper/will/be/nltkandyaml.mod

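This bundles the two packages we need, nltk and yaml, into a single zip archive. Giving it a .mod extension instead of .zip keeps Hadoop from trying to decompress it, and Python's zipimport module can still read it because it recognizes a zip archive by its contents, not its file name.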
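If you'd rather script the packaging step, here is a minimal sketch in Python that does roughly what the two shell commands above do, then checks that the resulting .mod file is importable. The package name demopkg and the file names are hypothetical stand-ins so the demo runs without NLTK, and it imports through sys.path and importlib rather than zipimporter.load_module, which is deprecated in recent Python 3 releases.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def build_mod_archive(package_dirs, out_path):
    """Zip each package directory recursively (like `zip -r`) into out_path.

    Member names are kept relative to each package's parent directory,
    so e.g. yaml/__init__.py sits at the root of the archive.
    """
    with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for pkg_dir in package_dirs:
            parent = os.path.dirname(os.path.abspath(pkg_dir))
            for root, _dirs, files in os.walk(pkg_dir):
                for name in files:
                    full = os.path.join(root, name)
                    archive.write(full, os.path.relpath(full, parent))
    return out_path


if __name__ == "__main__":
    # Build a throwaway package (hypothetical "demopkg") so this runs without NLTK.
    work = tempfile.mkdtemp()
    pkg = os.path.join(work, "demopkg")
    os.makedirs(pkg)
    with open(os.path.join(pkg, "__init__.py"), "w") as fh:
        fh.write("VALUE = 42\n")

    mod_path = build_mod_archive([pkg], os.path.join(work, "bundle.mod"))

    # zipimport identifies archives by content, not extension, so a .mod
    # path on sys.path is searched exactly like a .zip would be.
    sys.path.insert(0, mod_path)
    demopkg = importlib.import_module("demopkg")
    print(demopkg.VALUE)  # 42
```

For the real bundle you would call build_mod_archive(["nltk", "yaml"], "nltkandyaml.mod") from the unzipped download directory.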

Now here's my Python code, in a file called hammer-hack.py:

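import re
import sys
import zipimport

# Load the yaml and nltk packages directly from the shipped archive.
importer = zipimport.zipimporter('nltkandyaml.mod')
yaml = importer.load_module('yaml')
nltk = importer.load_module('nltk')

# Matches any run of characters that are neither word characters nor whitespace.
punct = re.compile(r'[^\w\s]+')

def mapper(args):
  # Build the tokenizers once per task rather than once per email.
  sen_tokenizer = nltk.tokenize.punkt.PunktSentenceTokenizer()
  word_tokenizer = nltk.tokenize.punkt.PunktWordTokenizer()

  for line in sys.stdin:
    line = line.strip()
    try:
      # Each record is: sender <TAB> metadata <NUL><NUL> message text
      sender, msg = line.split("\t", 1)
      msg_metadata, msg_text = msg.split(chr(0) + chr(0), 1)
    except ValueError:
      # Skip malformed records instead of failing the whole task.
      continue

    # Count sentences with the Punkt sentence tokenizer.
    sen_count = str(len(sen_tokenizer.tokenize(msg_text)))

    # Count words, dropping any token that contains punctuation.
    tokens = [token for token in word_tokenizer.tokenize(msg_text)
              if not punct.search(token)]
    word_count = str(len(tokens))

    print("\t".join([sender, ','.join([sen_count, word_count])]))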

def reducer(args):
  email_counts = {}
  sen_counts = {}
  word_counts = {}

  for line in sys.stdin:
    line = line.strip()
    try:
      # Each mapper line is: sender <TAB> sentence_count,word_count
      sender, msg_features = line.split('\t', 1)
      sen_count, word_count = [int(f) for f in msg_features.split(',')]
    except ValueError:
      # Skip lines that don't parse.
      continue
    email_counts[sender] = email_counts.get(sender, 0) + 1
    sen_counts[sender] = sen_counts.get(sender, 0) + sen_count
    word_counts[sender] = word_counts.get(sender, 0) + word_count

  # Emit sender, email count, total sentences and total words.
  for sender, count in email_counts.items():
    print('%s\t%d\t%d\t%d' % (sender, count, sen_counts[sender], word_counts[sender]))

if __name__ == "__main__":
  if sys.argv[1] == "mapper":
    mapper(sys.argv[2:])
  elif sys.argv[1] == "reducer":
    reducer(sys.argv[2:])
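Because the mapper and reducer only read stdin and write stdout, the logic can be checked locally before touching the cluster. The sketch below mimics the map, sort and reduce phases in plain Python. It is a stand-in under loud assumptions: crude regexes replace the Punkt tokenizers, the sample records and addresses are made up, and the reducer uses itertools.groupby, relying on Hadoop delivering mapper output to each reducer sorted by key, instead of the dictionaries above.

```python
import re
from itertools import groupby

# Crude stand-ins for the Punkt tokenizers (an assumption, not NLTK's behaviour).
SENTENCE_END = re.compile(r"[.!?]+(?:\s+|$)")
WORD = re.compile(r"\w+")


def map_record(line):
    """Map one 'sender<TAB>metadata<NUL><NUL>body' record to 'sender<TAB>sentences,words'."""
    sender, msg = line.rstrip("\n").split("\t", 1)
    _metadata, body = msg.split("\0\0", 1)
    sentences = [s for s in SENTENCE_END.split(body) if s.strip()]
    words = WORD.findall(body)
    return "%s\t%d,%d" % (sender, len(sentences), len(words))


def reduce_sorted(lines):
    """Sum per-sender counts, assuming lines arrive grouped (sorted) by sender."""
    parsed = (line.rstrip("\n").split("\t", 1) for line in lines)
    for sender, group in groupby(parsed, key=lambda kv: kv[0]):
        emails = sentences = words = 0
        for _sender, features in group:
            s, w = features.split(",")
            emails += 1
            sentences += int(s)
            words += int(w)
        yield "%s\t%d\t%d\t%d" % (sender, emails, sentences, words)


# Made-up sample records; sorted() plays the role of Hadoop's shuffle/sort.
records = [
    "bob@enron.com\tSubject: hi\0\0Lunch today? I can do noon.",
    "alice@enron.com\tSubject: q3\0\0Numbers attached.",
    "bob@enron.com\tSubject: re\0\0Sure.",
]
for out in reduce_sorted(sorted(map_record(r) for r in records)):
    print(out)
# prints alice@enron.com\t1\t1\t2, then bob@enron.com\t2\t3\t7
```

Since the input arrives sorted, the groupby version holds only one sender's totals in memory at a time, whereas the dictionary version keeps every sender until the end of the input.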

And here's my driver, hammer-hack.sh:

#!/bin/bash
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
  -input "$1" \
  -output "$2" \
  -mapper "python hammer-hack.py mapper" \
  -reducer "python hammer-hack.py reducer" \
  -file hammer-hack.py \
  -file nltkandyaml.mod \
  -jobconf mapred.job.name=hammer-hack
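For comparison, here is a hedged sketch that assembles the same invocation from Python. The function name streaming_command and the jar-lookup logic are my own assumptions; the flags are exactly the ones in hammer-hack.sh. Passing arguments as a list means no local shell ever sees the input pattern, so a glob like a* reaches Hadoop unexpanded.

```python
import glob
import os


def streaming_command(hadoop_home, input_glob, output_dir, job_name="hammer-hack"):
    """Return hammer-hack.sh's hadoop-streaming invocation as an argv list."""
    pattern = os.path.join(hadoop_home, "contrib", "streaming", "hadoop-*-streaming.jar")
    jars = sorted(glob.glob(pattern))
    if not jars:
        raise FileNotFoundError("no streaming jar matches " + pattern)
    return [
        "hadoop", "jar", jars[-1],  # lexicographically last jar if several match
        "-input", input_glob,       # left unexpanded so Hadoop globs it itself
        "-output", output_dir,
        "-mapper", "python hammer-hack.py mapper",
        "-reducer", "python hammer-hack.py reducer",
        "-file", "hammer-hack.py",
        "-file", "nltkandyaml.mod",
        "-jobconf", "mapred.job.name=" + job_name,
    ]

# Example use (assumes HADOOP_HOME is set and `hadoop` is on PATH):
#   import subprocess
#   subprocess.check_call(streaming_command(os.environ["HADOOP_HOME"],
#                                           "/shared/enron-emails/small/a*",
#                                           "enron-emails-small-out"))
```

Picking a single jar is a deliberate choice: if the shell glob in the original script ever matched two jars, hadoop jar would receive the second one as an argument to the first.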

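Note that I'm using -file to ship my code and the library archive along with the job; each file lands in the task's working directory, which is why the script can open nltkandyaml.mod by a relative path. I'm also being polite and using -jobconf to name my job. I ran it on a portion of the small dataset (the files whose names start with "a") as follows: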

./hammer-hack.sh '/shared/enron-emails/small/a*' enron-emails-small-out

The job writes one part file per reducer into the output directory, so we pull them down and merge them into a single local file:

$HADOOP_HOME/bin/hadoop fs -getmerge enron-emails-small-out localout
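If you've instead copied the whole output directory down (for example with hadoop fs -get), the merge is just a concatenation of the part files. Below is a small sketch of that, plus a hypothetical top_senders helper that ranks senders by the second column of the reducer output, the per-sender email count. Both function names are my own, and stitching the parts together in file-name order is an assumption about the order you want.

```python
import glob
import os
import shutil


def merge_parts(output_dir, dest_path):
    """Concatenate output_dir/part-* into dest_path in name order; return the part count."""
    parts = sorted(glob.glob(os.path.join(output_dir, "part-*")))
    with open(dest_path, "wb") as dest:
        for part in parts:
            with open(part, "rb") as src:
                shutil.copyfileobj(src, dest)
    return len(parts)


def top_senders(merged_path, n=10):
    """Return the n (sender, email_count) pairs with the highest counts."""
    counts = []
    with open(merged_path) as fh:
        for line in fh:
            fields = line.rstrip("\n").split("\t")
            if len(fields) >= 2:
                counts.append((fields[0], int(fields[1])))
    counts.sort(key=lambda pair: pair[1], reverse=True)
    return counts[:n]

# Example use: merge_parts("enron-emails-small-out", "localout"); top_senders("localout", 5)
```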