Task Execution and Environment

The MRAppMaster executes the Mapper and Reducer tasks as child processes in separate JVMs.

The child task inherits the environment of the parent MRAppMaster. You can specify additional options to the child JVM via the mapreduce.map.java.opts and mapreduce.reduce.java.opts configuration parameters in the Job. For example, you can set non-standard paths for the runtime linker to search for shared libraries via -Djava.library.path=<>. If the mapreduce.map.java.opts or mapreduce.reduce.java.opts property contains the symbol @taskid@, it is interpolated with the taskid value of the MapReduce task.

The following example property has multiple arguments and substitutions, shows JVM garbage collection logging, and starts a passwordless JVM JMX agent so that you can connect with jconsole to watch child memory and threads and get thread dumps. It also sets the maximum heap size of the map and reduce child JVMs to 512 MB and 1024 MB, respectively, and adds a path to the java.library.path of the child JVM.
<property>
  <name>mapreduce.map.java.opts</name>
  <value>
    -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
    -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>
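
The same options can also be set programmatically when the job is configured. The following is a minimal sketch using the Java MapReduce API with the same illustrative values as the XML above (the class and job names are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ChildJvmOpts {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same settings as the XML above; @taskid@ is interpolated per task.
    conf.set("mapreduce.map.java.opts",
        "-Xmx512M -Djava.library.path=/home/mycompany/lib"
            + " -verbose:gc -Xloggc:/tmp/@taskid@.gc");
    conf.set("mapreduce.reduce.java.opts",
        "-Xmx1024M -Djava.library.path=/home/mycompany/lib"
            + " -verbose:gc -Xloggc:/tmp/@taskid@.gc");
    Job job = Job.getInstance(conf, "child-jvm-opts-example");
    // ... set the mapper, reducer, and input/output paths as usual ...
  }
}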

Memory Management

You can specify the maximum virtual memory of the launched child task, and any subprocess it launches recursively, using mapreduce.map.ulimit and mapreduce.reduce.ulimit. The value set here is a per-process limit. The value should be specified in kilobytes (KB) and must be greater than or equal to the -Xmx value passed to the Java VM.
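
To illustrate the KB-versus-MB arithmetic, here is a hedged sketch using the ulimit properties named above (the 1 GB limit is an illustrative choice, not a recommendation):

import org.apache.hadoop.conf.Configuration;

public class TaskUlimit {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // -Xmx is given in MB, but the ulimit is per process and in KB:
    // a 512 MB heap needs at least 512 * 1024 KB, plus headroom for
    // stacks, metaspace, and native allocations.
    conf.set("mapreduce.map.java.opts", "-Xmx512M");
    conf.setInt("mapreduce.map.ulimit", 1024 * 1024); // 1 GB, expressed in KB
  }
}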

The memory available to some parts of the framework is also configurable. In map and reduce tasks, performance can be influenced by adjusting parameters that affect the concurrency of operations and the frequency with which data is written to disk. Monitoring the filesystem counters for a job, particularly relative to byte counts from the map and into the reduce, helps you tune these parameters.
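
For example, here is a sketch of inspecting a completed job's counters to see how often map output hit disk (the class and method names are ours; MAP_OUTPUT_RECORDS and SPILLED_RECORDS are standard TaskCounter values):

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class SpillInspection {
  // Prints counters that indicate how often map output hit disk.
  // Assumes the job has already completed.
  static void printSpillCounters(Job job) throws Exception {
    Counters counters = job.getCounters();
    long mapOutputRecords =
        counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    long spilledRecords =
        counters.findCounter(TaskCounter.SPILLED_RECORDS).getValue();
    // A spilled/output ratio well above 1.0 means records were spilled
    // and re-merged more than once, which suggests increasing the
    // map-side sort buffer (see Map Parameters below).
    System.out.printf("map output records: %d, spilled records: %d%n",
        mapOutputRecords, spilledRecords);
  }
}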

Map Parameters

A record emitted from a map is serialized into a buffer, and metadata is stored into accounting buffers. As described in the following options, when either the serialization buffer or the metadata exceeds a threshold, the contents of the buffers are sorted and written to disk in the background while the map continues to output records. If either buffer fills completely while the spill is in progress, the map thread blocks. When the map is finished, any remaining records are written to disk, and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper.

  • mapreduce.task.io.sort.mb (int): The cumulative size of the serialization and accounting buffers storing records emitted from the map, in megabytes.
  • mapreduce.map.sort.spill.percent (float): The soft limit in the serialization buffer. Once reached, a thread begins to spill the contents to disk in the background.
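
A sketch of setting these two parameters together (the values are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;

public class MapSortTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // 256 MB shared by the serialization and accounting buffers.
    conf.setInt("mapreduce.task.io.sort.mb", 256);
    // Start spilling in the background once the buffer is 80% full,
    // so collection can continue while the spill thread drains it.
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
    // With these values, a background spill starts at about
    // 256 MB * 0.80 = 205 MB of collected output.
  }
}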

Other Notes

  • If either spill threshold is exceeded while a spill is in progress, collection continues until the spill is finished. For example, if mapreduce.map.sort.spill.percent is set to 0.33, and the remainder of the buffer is filled while the spill runs, the next spill includes all the collected records, or 0.66 of the buffer, and does not generate additional spills. In other words, the thresholds define triggers; they do not block collection.
  • A record larger than the serialization buffer first triggers a spill, then is spilled to a separate file. It is undefined whether or not this record first passes through the combiner.

Shuffle/Reduce Parameters

As described previously, each reduce fetches the output assigned to it by the partitioner via HTTP into memory, and periodically merges these outputs to disk. If intermediate compression of map outputs is turned on, each output is decompressed into memory. The following options affect the frequency of these merges to disk prior to the reduce and the memory allocated to storing map output during the reduce.

  • mapreduce.task.io.sort.factor (int): Specifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during the merge. If the number of files exceeds this limit, the merge proceeds in several passes. Although this limit also applies to the map, most jobs should be configured so that reaching this limit is unlikely.
  • mapreduce.reduce.merge.inmem.thresholds (int): The number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds, this defines a trigger, not a unit of partition. In practice, this is usually set very high (1000) or disabled (0), because merging in-memory segments is often less expensive than merging from disk (see the notes following this table). This threshold influences only the frequency of in-memory merges during the shuffle.
  • mapreduce.reduce.shuffle.merge.percent (float): The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of the memory allocated to storing map outputs in memory. Because map outputs that cannot fit in memory can be stalled, setting this high can decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle.
  • mapreduce.reduce.shuffle.input.buffer.percent (float): The percentage of memory, relative to the maximum heap size as typically specified in mapreduce.reduce.java.opts, that can be allocated to storing map outputs during the shuffle. Although some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs.
  • mapreduce.reduce.input.buffer.percent (float): The percentage of memory, relative to the maximum heap size, in which map outputs may be retained during the reduce. When the reduce begins, map outputs are merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins, to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk.
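
A sketch of tuning several of these parameters at once (all values are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;

public class ShuffleTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Merge up to 100 on-disk segments in a single pass.
    conf.setInt("mapreduce.task.io.sort.factor", 100);
    // Use 70% of the reduce JVM heap to hold fetched map outputs.
    conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
    // Start an in-memory merge once that buffer is 66% full.
    conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
    // For a reduce whose input fits in memory: retain map outputs in
    // memory into the reduce itself rather than merging them all to
    // disk first.
    conf.setFloat("mapreduce.reduce.input.buffer.percent", 0.70f);
  }
}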

Other Notes

  • If a map output is larger than 25 percent of the memory allocated to copying map outputs, it is written directly to disk without first staging through memory.
  • When running with a combiner, the reasoning for high merge thresholds and large buffers might not apply. For merges started before all map outputs have been fetched, the combiner is run while spilling to disk. In some cases, you can get better reduce times by spending resources combining map outputs (making disk spills small and parallelizing spilling and fetching) instead of aggressively increasing buffer sizes.
  • When merging in-memory map outputs to disk to begin the reduce, if an intermediate merge is necessary because there are segments to spill and at least mapreduce.task.io.sort.factor segments already on disk, the in-memory map outputs are part of the intermediate merge.

Configured Parameters

The following properties are localized in the job configuration for each task's execution:

  • mapreduce.job.id (String): The job id.
  • mapreduce.job.jar (String): The job.jar location in the job directory.
  • mapreduce.job.local.dir (String): The job-specific shared scratch space.
  • mapreduce.task.id (String): The task id.
  • mapreduce.task.attempt.id (String): The task attempt id.
  • mapreduce.task.is.map (boolean): True, if this is a map task.
  • mapreduce.task.partition (int): The id of the task within the job.
  • mapreduce.map.input.file (String): The filename that the map is reading from.
  • mapreduce.map.input.start (long): The offset of the start of the map input split.
  • mapreduce.map.input.length (long): The number of bytes in the map input split.
  • mapreduce.task.output.dir (String): The task's temporary output directory.

During execution of a streaming job, the periods (.) in mapreduce parameters are changed to underscores (_). For example, mapreduce.job.id becomes mapreduce_job_id. To get the values in a streaming job's mapper or reducer, use the parameter names with underscores.
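
A minimal mapper sketch showing both access paths (the class name is a placeholder):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ParamAwareMapper
    extends Mapper<LongWritable, Text, Text, LongWritable> {
  @Override
  protected void setup(Context context) {
    // In a Java task, the localized values come from the job configuration.
    String taskAttemptId =
        context.getConfiguration().get("mapreduce.task.attempt.id");
    String inputFile =
        context.getConfiguration().get("mapreduce.map.input.file");
    // In a streaming task, the same values arrive as environment
    // variables with dots replaced by underscores, for example:
    //   String id = System.getenv("mapreduce_task_attempt_id");
    System.err.println("attempt " + taskAttemptId + " reading " + inputFile);
  }
}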

Task Logs

The standard output (stdout), standard error (stderr), and syslog streams of the task are read by the NodeManager and logged to ${HADOOP_LOG_DIR}/userlogs. If log aggregation is enabled, they are subsequently aggregated to HDFS.

Distributing Libraries

DistributedCache can be used to distribute both JARs and native libraries for use in the map and reduce tasks. The child JVM always has its current working directory added to the java.library.path and LD_LIBRARY_PATH. The cached libraries can therefore be loaded via System.loadLibrary or System.load. For more information on how to load shared libraries through the distributed cache, see Native Shared Libraries.
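
A sketch of shipping and loading a native library through the distributed cache, assuming a hypothetical library libmyop.so stored on HDFS:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class NativeLibJob {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "native-lib-example");
    // Ship the library with the job; the fragment after '#' is the
    // symlink name created in each task's working directory.
    job.addCacheFile(new URI("hdfs:///libraries/libmyop.so#libmyop.so"));
    // Inside the task, the working directory is already on
    // java.library.path and LD_LIBRARY_PATH, so the library can be
    // loaded with:
    //   System.loadLibrary("myop");   // resolves libmyop.so
    // or by absolute path via System.load(...).
  }
}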