Essential Interfaces

Hadoop MapReduce is bundled with a library of useful mappers, reducers, partitioners, and counters. Applications typically implement the Mapper and Reducer interfaces.

Mapper

Mapper maps input key-value pairs to a set of intermediate key-value pairs. Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair can map to zero or many output pairs.

The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the InputFormat for the job.

The framework calls map(WritableComparable, Writable, Context) for each key-value pair in the InputSplit for that task. Applications can override the cleanup(Context) method to perform any required cleanup.

Output pairs are collected with calls to context.write(WritableComparable, Writable).
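The zero-or-many relationship between input pairs and output pairs can be modelled without Hadoop. The sketch below is plain Java, not the Mapper API: the class name TokenMapSketch and the BiConsumer standing in for context.write are assumptions made for illustration only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

final class TokenMapSketch {
    // Stand-in for a map method: one (offset, line) pair in, zero or more (word, 1) pairs out.
    // The output types (String, Integer) differ from the input types (long, String).
    static void map(long offset, String line, BiConsumer<String, Integer> write) {
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                write.accept(token, 1); // plays the role of context.write(key, value)
            }
        }
    }

    // Calls map once per input line and collects the emitted pairs in emission order.
    static List<Map.Entry<String, Integer>> run(List<String> lines) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        long offset = 0;
        for (String line : lines) {
            map(offset, line, (k, v) -> out.add(new SimpleEntry<>(k, v)));
            offset += line.length() + 1; // assumes a one-character line terminator
        }
        return out;
    }
}
```

In this model a blank line emits nothing and a line with three words emits three pairs, which is the behaviour the text describes for a single input pair.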

Applications can use Counter to report statistics.

All intermediate values associated with a given output key are grouped by the framework, and passed to Reducer(s) to determine the final output. You can control the grouping by specifying a Comparator with Job.setGroupingComparatorClass(Class).

Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. You can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

You can optionally specify a combiner, with Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps reduce the amount of data transferred from the Mapper to the Reducer.
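To illustrate why local aggregation cuts the data transferred, here is a minimal plain-Java model of a sum-style combiner applied to the output of one map task. It does not use the Hadoop API; CombinerSketch and its method are hypothetical names, and it assumes the aggregation is a sum, which is safe to apply locally because addition is associative and commutative.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CombinerSketch {
    // Collapses one map task's (key, value) pairs into a single summed pair per key.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, Integer> combined = new TreeMap<>(); // TreeMap keeps keys sorted
        for (Map.Entry<String, Integer> pair : mapOutput) {
            combined.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return combined;
    }
}
```

If a map task emits ("a", 1) three times and ("b", 1) once, this model forwards two records instead of four; the reduce side still computes the same totals.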

The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed and the CompressionCodec to be used with the Configuration.

How Many Maps?

The number of maps is usually driven by the total size of the inputs; that is, the total number of blocks of the input files.

The optimal level of parallelism for maps seems to be around 10-100 maps per host, although it has been set as high as 300 maps for very CPU-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.

Thus, if you expect 10 TB of input data and have a block size of 128 MB, you will end up with about 82,000 maps, unless Configuration.setInt(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
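The figure of roughly 82,000 maps follows from dividing the input size by the block size. The sketch below reproduces that arithmetic; it assumes binary units (1 TB = 1024^4 bytes, 1 MB = 1024^2 bytes) and treats the input as one contiguous file, so it is only an estimate. MapCountEstimate is a hypothetical helper, not part of Hadoop.

```java
final class MapCountEstimate {
    // Number of blocks needed to cover the input, rounding up for a partial last block.
    static long estimateMaps(long inputBytes, long blockBytes) {
        return (inputBytes + blockBytes - 1) / blockBytes;
    }

    public static void main(String[] args) {
        long tenTB = 10L * 1024 * 1024 * 1024 * 1024; // 10 TB in bytes (binary units)
        long block = 128L * 1024 * 1024;              // 128 MB in bytes
        System.out.println(estimateMaps(tenTB, block)); // prints 81920, i.e. about 82,000
    }
}
```

With many small files the real count is higher, because each file contributes at least one block.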

Reducer

Reducer reduces a set of intermediate values that share a key to a smaller set of values.

You set the number of reducers for the job using Job.setNumReduceTasks(int).

The framework calls the reduce(WritableComparable, Iterable<Writable>, Context) method for each <key, (list of values)> pair in the grouped inputs. Applications can then override the cleanup(Context) method to perform any required cleanup.

Reducer has three primary phases: shuffle, sort, and reduce.

Shuffle

Input to the Reducer is the sorted output of the mappers. In this phase, the framework fetches the relevant partition of the output of all the mappers, via HTTP.

Sort

The framework groups Reducer inputs by key (since different mappers might output the same key) in this stage.

The shuffle and sort phases occur simultaneously; map outputs are merged as they are fetched.
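The combined effect of shuffle and sort, as described above, is that a reducer ends up with one sorted sequence of keys, each with all of its values. The following plain-Java model shows that outcome for a single partition. It is a simplification under stated assumptions: the outputs are in-memory lists rather than files fetched over HTTP, and a TreeMap stands in for the framework's merge. ShuffleSortSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ShuffleSortSketch {
    // Merges the outputs "fetched" from several mappers and groups the values by key.
    static TreeMap<String, List<Integer>> mergeAndGroup(
            List<List<Map.Entry<String, Integer>>> fetched) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>(); // keys stay sorted
        for (List<Map.Entry<String, Integer>> oneMapOutput : fetched) {
            // Each map output is merged in as soon as it arrives.
            for (Map.Entry<String, Integer> pair : oneMapOutput) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        return grouped;
    }
}
```

Each entry of the returned map corresponds to one <key, (list of values)> pair handed to reduce.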

Secondary Sort

If the equivalence rules for grouping the intermediate keys need to differ from those for grouping keys before reduction, you can specify a Comparator with Job.setSortComparatorClass(Class). Because Job.setGroupingComparatorClass(Class) controls how intermediate keys are grouped, the two can be used in conjunction to simulate a secondary sort on values.
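The interplay of the two comparators is easier to see in a small model. In the plain-Java sketch below, a composite key carries both the natural key and the value to order by; one comparator plays the role of the sort comparator and another the role of the grouping comparator. The StationTemp type and all names are hypothetical, and no Hadoop classes are used.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SecondarySortSketch {
    // Composite key: the natural key (station) plus the field to sort values by (temp).
    static final class StationTemp {
        final String station;
        final int temp;
        StationTemp(String station, int temp) { this.station = station; this.temp = temp; }
    }

    // Role of the sort comparator: order by station, then by temperature.
    static final Comparator<StationTemp> SORT =
        Comparator.comparing((StationTemp k) -> k.station).thenComparingInt(k -> k.temp);

    // Role of the grouping comparator: keys with the same station form one group.
    static final Comparator<StationTemp> GROUP =
        Comparator.comparing((StationTemp k) -> k.station);

    // Sorts with SORT, then starts a new group whenever GROUP sees a different key.
    static Map<String, List<Integer>> groupSorted(List<StationTemp> keys) {
        List<StationTemp> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        StationTemp previous = null;
        List<Integer> current = null;
        for (StationTemp key : sorted) {
            if (previous == null || GROUP.compare(previous, key) != 0) {
                current = new ArrayList<>();
                groups.put(key.station, current);
            }
            current.add(key.temp);
            previous = key;
        }
        return groups;
    }
}
```

Because sorting uses the full composite key while grouping looks only at the station, each group receives its temperatures already in ascending order, which is the secondary sort on values.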

Reduce

In this phase, the reduce(WritableComparable, Iterable<Writable>, Context) method is called for each <key, (list of values)> pair in the grouped inputs.

The output of the reduce task is typically written to the FileSystem with Context.write(WritableComparable, Writable).

Applications can use the Counter to report statistics.

The output of the Reducer is not sorted.

How Many Reduces?

The right number of reduces is typically 0.95 or 1.75 multiplied by (number of hosts * maximum number of containers per host).

With 0.95, all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75, the faster hosts finish their first round of reduces and launch a second wave of reduces, doing a much better job of load balancing.

Increasing the number of reduces increases the framework overhead, but improves load balancing and lowers the cost of failures.

The scaling factors above are slightly less than whole numbers, to reserve a few reduce slots in the framework for speculative tasks and failed tasks.
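As a worked example of the guideline, the sketch below computes both suggested reduce counts for a cluster. ReduceCountEstimate is a hypothetical helper, and rounding down to a whole number is an assumption; the guideline itself does not say how to round.

```java
final class ReduceCountEstimate {
    // factor is 0.95 (one wave of reduces) or 1.75 (two waves), per the guideline.
    static int estimateReduces(double factor, int hosts, int maxContainersPerHost) {
        return (int) Math.floor(factor * hosts * maxContainersPerHost);
    }

    public static void main(String[] args) {
        // Example cluster (assumed values): 10 hosts, 8 containers each.
        System.out.println(estimateReduces(0.95, 10, 8)); // single wave
        System.out.println(estimateReduces(1.75, 10, 8)); // two waves
    }
}
```

For the assumed cluster of 10 hosts with 8 containers each, the two factors give 76 and 140 reduces respectively. The resulting value would be passed to Job.setNumReduceTasks(int).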

Reducer NONE

You can set the number of reduce tasks to zero if no reduction is desired.

In this case, the outputs of the map tasks go directly to the filesystem, into the output path set by FileOutputFormat.setOutputPath(Job, Path). The framework does not sort the map outputs before writing them to the filesystem.

Partitioner

Partitioner partitions the key space: it controls the partitioning of the keys of the intermediate map outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the reduce tasks the intermediate key (and hence the record) is sent to for reduction.

HashPartitioner is the default partitioner.
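The hash-based derivation of a partition can be sketched in a few lines. The method below applies the formula HashPartitioner uses (mask off the sign bit of the key's hash code, then take the remainder by the number of reduce tasks) to ordinary Java objects. HashPartitionSketch is a hypothetical class, and the partition numbers it yields for String keys will not necessarily match those Hadoop computes for Text keys, since the hash codes differ.

```java
final class HashPartitionSketch {
    // Clears the sign bit so the result is never negative, then maps into [0, numReduceTasks).
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

Equal keys always land in the same partition, which is what guarantees that all values for a key reach the same reduce task.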

Counter

Counter is a facility that Mapper and Reducer implementations in MapReduce applications can use to report statistics.