I want to play with the Enron email data and some natural language processing libraries. Since I'm using streaming and Python, NLTK seems like a natural fit.
First, go grab the zip file under "Installation to Non-Standard Location" at the NLTK site. Once you've pulled it down to your local box and unzipped it, hop into the download directory and run:
zip -r nltkandyaml.zip nltk yaml
mv nltkandyaml.zip /path/to/where/your/mapper/will/be/nltkandyaml.mod
This will create a zip file of the two modules we'll need, with the extension .mod, which will prevent Hadoop from trying to decompress our zip file.
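To see why the renamed archive is still importable, here is a minimal sketch that doesn't involve NLTK at all. It builds a throwaway archive holding a made-up module called hello_mod, gives the archive a .mod extension, and imports from it. The module name, archive name, and greet function are all hypothetical, and the sketch puts the archive on sys.path rather than calling zipimporter directly, so treat it as an illustration of the idea rather than what the mapper does:

```python
import importlib
import os
import sys
import tempfile
import zipfile

# Build a throwaway archive holding one tiny module (hypothetical name).
workdir = tempfile.mkdtemp()
archive = os.path.join(workdir, "bundle.mod")  # deliberately not a .zip extension
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("hello_mod.py", "def greet(name):\n    return 'hello, ' + name\n")

# Put the archive on sys.path; the zip import hook looks at the file's
# contents, so the extension doesn't matter.
sys.path.insert(0, archive)
hello_mod = importlib.import_module("hello_mod")

greeting = hello_mod.greet("enron")
print(greeting)
```

The module's __file__ attribute will point inside bundle.mod, which is a quick way to confirm where an import really came from.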
Now here's my Python code, in a file called hammer-hack.py:
import re
import sys
import zipimport

# Load yaml and nltk from the bundled archive shipped alongside this script.
importer = zipimport.zipimporter('nltkandyaml.mod')
yaml = importer.load_module('yaml')
nltk = importer.load_module('nltk')

# Matches any run of characters that are neither word characters nor whitespace.
punct = re.compile(r'[^\w\s]+')

def mapper(args):
    sent_tokenizer = nltk.tokenize.punkt.PunktSentenceTokenizer()
    word_tokenizer = nltk.tokenize.punkt.PunktWordTokenizer()
    for line in sys.stdin:
        line = line.strip()
        # Split on the first tab only, in case the message itself contains tabs.
        sender, msg = line.split("\t", 1)
        # Metadata and body are separated by two NUL characters.
        msg_metadata, msg_text = msg.split(chr(0) + chr(0))
        # now do some magic with the message content and nltk
        sen_count = str(len(sent_tokenizer.tokenize(msg_text)))
        tokens = word_tokenizer.tokenize(msg_text)
        # Drop any token that contains punctuation.
        tokens = [token for token in tokens if not punct.search(token)]
        word_count = str(len(tokens))
        print "\t".join([sender, ','.join([sen_count, word_count])])

def reducer(args):
    email_counts = {}
    sen_counts = {}
    word_counts = {}
    for line in sys.stdin:
        line = line.strip()
        sender, msg_features = line.split('\t', 1)
        features = msg_features.split(',')
        try:
            features = map(int, features)
            email_counts[sender] = email_counts.get(sender, 0) + 1
            sen_counts[sender] = sen_counts.get(sender, 0) + features[0]
            word_counts[sender] = word_counts.get(sender, 0) + features[1]
        except ValueError:
            pass
    # Only the per-sender email count is emitted for now; the sentence and
    # word totals are accumulated but not printed.
    for sender, count in email_counts.iteritems():
        print '%s\t%s' % (sender, str(count))

if __name__ == "__main__":
    if sys.argv[1] == "mapper":
        mapper(sys.argv[2:])
    elif sys.argv[1] == "reducer":
        reducer(sys.argv[2:])
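If you want to check the reducer's bookkeeping without a cluster, the same accumulation can be written as a plain function over an iterable of lines. This is only a sketch, not the code that ran on the cluster: aggregate and the sample senders are made up, it returns all three totals per sender instead of printing just the email count, and it skips any malformed line rather than raising:

```python
def aggregate(lines):
    """Sum per-sender totals from mapper output lines.

    Each line is expected to look like 'sender<TAB>sentences,words'.
    Returns {sender: [emails, sentences, words]}.
    """
    totals = {}
    for line in lines:
        line = line.strip()
        try:
            sender, features = line.split("\t", 1)
            sentences, words = [int(f) for f in features.split(",")]
        except ValueError:
            # Skip malformed records rather than raising.
            continue
        entry = totals.setdefault(sender, [0, 0, 0])
        entry[0] += 1
        entry[1] += sentences
        entry[2] += words
    return totals


# Hypothetical mapper output, for illustration only.
sample = [
    "alice@example.com\t3,40",
    "bob@example.com\t1,12",
    "alice@example.com\t2,25",
    "garbage line without a tab",
]
totals = aggregate(sample)
for sender in sorted(totals):
    emails, sentences, words = totals[sender]
    print("%s\t%d\t%d\t%d" % (sender, emails, sentences, words))
```

Keeping the logic in a function that takes lines, rather than reading sys.stdin directly, makes it easy to feed it a handful of hand-written records before sending anything to Hadoop.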
And here's my driver, hammer-hack.sh:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
  -input $1 \
  -output $2 \
  -mapper "python hammer-hack.py mapper" \
  -reducer "python hammer-hack.py reducer" \
  -file hammer-hack.py \
  -file nltkandyaml.mod \
  -jobconf mapred.job.name=hammer-hack
Note that I'm using -file to send my code and libraries out to the remote cluster's Distributed Cache; I'm also being polite and using -jobconf to name my job. I ran it on a portion of the small dataset as follows:
./hammer-hack.sh /shared/enron-emails/small/a* enron-emails-small-out
This produces lots of output parts, so we pull them down and merge them into one file:
$HADOOP_HOME/bin/hadoop fs -getmerge enron-emails-small-out localout
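Once merged, localout holds one sender<TAB>count line per sender, not sorted by count. Here is a short sketch for ranking senders by how many emails they sent. top_senders is a hypothetical helper, and the demo writes a small made-up file instead of reading real job output:

```python
import os
import tempfile


def top_senders(path, n=10):
    """Return the n (sender, count) pairs with the highest counts."""
    rows = []
    with open(path) as handle:
        for line in handle:
            line = line.rstrip("\n")
            if not line:
                continue
            # The count is the last tab-separated field on the line.
            sender, count = line.rsplit("\t", 1)
            rows.append((sender, int(count)))
    # Sort by count descending, then by sender so ties come out in a fixed order.
    rows.sort(key=lambda row: (-row[1], row[0]))
    return rows[:n]


# Made-up stand-in for the merged job output.
demo_path = os.path.join(tempfile.mkdtemp(), "localout")
with open(demo_path, "w") as handle:
    handle.write("alice@example.com\t12\nbob@example.com\t40\ncarol@example.com\t7\n")

ranked = top_senders(demo_path, n=2)
for sender, count in ranked:
    print("%s\t%d" % (sender, count))
```

Pointing top_senders at the real localout file should work the same way, assuming every line follows the sender<TAB>count shape the reducer prints.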