MapReduceIndexerTool

MapReduceIndexerTool is a MapReduce batch job driver that takes a morphline, creates a set of Solr index shards from a set of input files, and writes the indexes into HDFS in a flexible, scalable, and fault-tolerant manner.

For more information on Morphlines, see:

MapReduceIndexerTool also supports merging the output shards into a set of live customer-facing Solr servers, typically a SolrCloud.

The indexer creates an offline index on HDFS in the output directory specified by the --output-dir parameter. If the --go-live parameter is specified, Solr merges the resulting offline index into the live running service. Thus, the Solr service must have read access to the contents of the output directory to complete the go-live step. In an environment with restrictive permissions, such as one with an HDFS umask of 077, the Solr user may not be able to read the contents of the newly created directory. To address this issue, the indexer automatically applies the HDFS ACLs to enable Solr to read the output directory contents. These ACLs are only applied if HDFS ACLs are enabled on the HDFS NameNode. For more information, see HDFS Extended ACLs.

The indexer only makes ACL updates to the output directory and its contents. If the output directory's parent directories do not grant the execute permission, the Solr service cannot access the output directory. Solr must have execute permission on every parent directory of the output directory, granted through either standard permissions or ACLs.
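
One way to review the parent directories is to list each ancestor of the output directory and inspect its ACL. The following is a minimal sketch, not part of MapReduceIndexerTool: the parent_dirs helper and the /user/foo/outdir path are hypothetical, and the script only prints the hdfs dfs -getfacl commands rather than running them.

```shell
#!/bin/sh
# Print every ancestor of an absolute HDFS path, nearest first.
# parent_dirs is a hypothetical helper written for this sketch.
parent_dirs() {
  dir=$1
  while [ -n "$dir" ] && [ "$dir" != "/" ]; do
    dir=${dir%/*}                 # strip the last path component
    if [ -z "$dir" ]; then
      dir="/"                     # the parent of a top-level entry is the root
    fi
    printf '%s\n' "$dir"
  done
}

# Hypothetical output directory; substitute the path passed to --output-dir.
OUTPUT_DIR="/user/foo/outdir"

# Print (do not run) one ACL inspection command per ancestor directory.
for d in $(parent_dirs "$OUTPUT_DIR"); do
  echo "hdfs dfs -getfacl $d"
done
```

If an ancestor turns out to lack execute permission for the Solr user, an administrator could grant it with a command such as hdfs dfs -setfacl -m user:solr:--x followed by the directory, assuming the service runs as a user named solr and HDFS ACLs are enabled.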

MapReduceIndexerTool Input Splits

Unlike some other indexing tools, MapReduceIndexerTool does not use HDFS blocks as input splits; each input file is processed as a whole. This means that when indexing a small number of large files, fewer hosts may be involved. For example, indexing two files of 1 GB each results in two hosts acting as mappers. If these files were stored on a system with a 128 MB block size, a tool that splits on blocks might instead divide the work among 16 mappers, corresponding to the 16 HDFS blocks that store the two files.

This intentional design choice aligns with MapReduceIndexerTool supporting indexing non-splittable file formats such as JSON, XML, jpg, or log4j.

In theory, this could result in inefficient use of resources when a single host indexes a large file while many other hosts sit idle. In reality, this indexing strategy typically results in satisfactory performance in production environments because in most cases the number of files is large enough that work is spread throughout the cluster.
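
The arithmetic behind the earlier two-file example can be checked with a few lines of shell. This is only an illustration of the numbers quoted in the text; the variable names are made up for the sketch.

```shell
#!/bin/sh
# Sizes in MB, taken from the two-file example in the text.
FILE_COUNT=2
FILE_SIZE_MB=1024
BLOCK_SIZE_MB=128

# One mapper per file: MapReduceIndexerTool does not split files by block.
per_file_mappers=$FILE_COUNT

# One mapper per HDFS block: what a block-splitting tool could use.
# Round up so that a partial trailing block still counts as a block.
blocks_per_file=$(( (FILE_SIZE_MB + BLOCK_SIZE_MB - 1) / BLOCK_SIZE_MB ))
per_block_mappers=$(( FILE_COUNT * blocks_per_file ))

echo "per-file mappers:  $per_file_mappers"
echo "per-block mappers: $per_block_mappers"
```

With these inputs the script reports 2 mappers for per-file splitting and 16 for per-block splitting, matching the figures above.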

While dividing tasks by input file does not present problems in most cases, users may still want to divide indexing tasks along HDFS splits. In that case, use CrunchIndexerTool, which can work with Hadoop input splits through its --input-file-format option.

MapReduceIndexerTool Metadata

The MapReduceIndexerTool generates metadata fields for each input file when indexing. These fields can be used in morphline commands. These fields can also be stored in Solr, by adding definitions like the following to your Solr schema.xml file. After the MapReduce indexing process completes, the fields are searchable through Solr.
<!-- file metadata -->
<field name="file_download_url" type="string" indexed="false" stored="true" />
<field name="file_upload_url" type="string" indexed="false" stored="true" />
<field name="file_scheme" type="string" indexed="true" stored="true" />
<field name="file_host" type="string" indexed="true" stored="true" />
<field name="file_port" type="int" indexed="true" stored="true" />
<field name="file_path" type="string" indexed="true" stored="true" />
<field name="file_name" type="string" indexed="true" stored="true" />
<field name="file_length" type="tlong" indexed="true" stored="true" />
<field name="file_last_modified" type="tlong" indexed="true" stored="true" />
<field name="file_owner" type="string" indexed="true" stored="true" />
<field name="file_group" type="string" indexed="true" stored="true" />
<field name="file_permissions_user" type="string" indexed="true" stored="true" />
<field name="file_permissions_group" type="string" indexed="true" stored="true" />
<field name="file_permissions_other" type="string" indexed="true" stored="true" />
<field name="file_permissions_stickybit" type="boolean" indexed="true" stored="true" />

Example output:

"file_upload_url":"foo/test-documents/sample-statuses-20120906-141433.avro",
"file_download_url":"hdfs://host1.mycompany.com:8020/user/foo/test-documents/sample-statuses-20120906-141433.avro",
"file_scheme":"hdfs",
"file_host":"host1.mycompany.com",
"file_port":8020,
"file_name":"sample-statuses-20120906-141433.avro",
"file_path":"/user/foo/test-documents/sample-statuses-20120906-141433.avro",
"file_last_modified":1357193447106,
"file_length":1512,
"file_owner":"foo",
"file_group":"foo",
"file_permissions_user":"rw-",
"file_permissions_group":"r--",
"file_permissions_other":"r--",
"file_permissions_stickybit":false,

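The location-related fields in the example output are consistent with a plain decomposition of the file_download_url value. The sketch below reproduces that decomposition with POSIX parameter expansion; it illustrates how the fields relate to one another and is not a description of how the tool computes them internally.

```shell
#!/bin/sh
# file_download_url as in the example output (without embedded whitespace).
url="hdfs://host1.mycompany.com:8020/user/foo/test-documents/sample-statuses-20120906-141433.avro"

file_scheme=${url%%://*}      # text before "://"
rest=${url#*://}              # host:port/path
authority=${rest%%/*}         # host:port
file_host=${authority%%:*}    # text before the port separator
file_port=${authority##*:}    # text after the port separator
file_path=/${rest#*/}         # everything after the authority
file_name=${file_path##*/}    # last path component

printf '%s\n' \
  "file_scheme=$file_scheme" \
  "file_host=$file_host" \
  "file_port=$file_port" \
  "file_path=$file_path" \
  "file_name=$file_name"
```

The printed values correspond to the file_scheme, file_host, file_port, file_path, and file_name entries shown in the example output.
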
MapReduceIndexerTool Usage Syntax

  • To view the usage syntax in a default parcel-based deployment, run:
    hadoop jar /opt/cloudera/parcels/CDH/jars/search-mr-*-job.jar \
    org.apache.solr.hadoop.MapReduceIndexerTool --help
  • To view the usage syntax in a default package-based deployment, use:
    hadoop jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar \
    org.apache.solr.hadoop.MapReduceIndexerTool --help
usage: hadoop [GenericOptions]... jar search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool
       [--help] --output-dir HDFS_URI [--input-list URI]
       --morphline-file FILE [--morphline-id STRING] [--solr-home-dir DIR]
       [--update-conflict-resolver FQCN] [--mappers INTEGER]
       [--reducers INTEGER] [--max-segments INTEGER]
       [--fair-scheduler-pool STRING] [--dry-run] [--log4j FILE]
       [--verbose] [--show-non-solr-cloud] [--zk-host STRING] [--go-live]
       [--collection STRING] [--go-live-min-replication-factor INTEGER]
       [--go-live-threads INTEGER] [HDFS_URI [HDFS_URI ...]]

MapReduce batch job driver that  takes  a  morphline  and  creates a set of
Solr index shards from a set  of  input  files  and writes the indexes into
HDFS, in a flexible, scalable  and  fault-tolerant manner. It also supports
merging the output shards into a set  of live customer facing Solr servers,
typically  a  SolrCloud.  The  program   proceeds  in  several  consecutive
MapReduce based phases, as follows:

1) Randomization phase: This (parallel) phase  randomizes the list of input
files in order to spread  indexing  load  more  evenly among the mappers of
the subsequent phase.

2) Mapper phase: This (parallel) phase  takes the input files, extracts the
relevant content, transforms it and  hands  SolrInputDocuments  to a set of
reducers. The ETL functionality is  flexible  and customizable using chains
of arbitrary morphline commands that  pipe  records from one transformation
command to another. Commands to parse and  transform a set of standard data
formats such as Avro, CSV,  Text,  HTML,  XML,  PDF,  Word, Excel, etc. are
provided out of the box,  and  additional  custom  commands and parsers for
additional file or data formats can be  added as morphline plugins. This is
done by implementing a simple Java  interface  that consumes a record (e.g.
a file in the form  of  an  InputStream  plus  some headers plus contextual
metadata) and generates as output zero  or  more  records. Any kind of data
format can be indexed and any  Solr  documents  for any kind of Solr schema
can be generated, and any custom ETL logic can be registered and executed.
Record fields, including  MIME  types,  can  also  explicitly  be passed by
force  from  the  CLI  to  the   morphline,  for  example:  hadoop  ...  -D
morphlineField._attachment_mimetype=text/csv

3)   Reducer   phase:   This   (parallel)    phase   loads   the   mapper's
SolrInputDocuments into  one  EmbeddedSolrServer  per  reducer.  Each  such
reducer and Solr server can be  seen  as  a (micro) shard. The Solr servers
store their data in HDFS.

4) Mapper-only  merge  phase:  This  (parallel)  phase  merges  the  set of
reducer shards into the number of  solr  shards expected by the user, using
a mapper-only job.  This  phase  is  omitted  if  the  number  of shards is
already equal to the number of shards expected by the user.

5) Go-live phase: This optional  (parallel)  phase merges the output shards
of the previous phase into  a  set  of  live  customer facing Solr servers,
typically a SolrCloud. If this  phase  is  omitted you can explicitly point
each Solr server to one of the HDFS output shard directories.

Fault Tolerance: Mapper and reducer  task  attempts  are retried on failure
per the standard MapReduce semantics. On program startup all data in the --
output-dir is deleted  if  that  output  directory  already  exists. If the
whole job fails you can retry  simply  by rerunning the program again using
the same arguments.

positional arguments:
  HDFS_URI               HDFS URI  of  file  or  directory  tree  to index.
                         (default: [])

optional arguments:
  --help, -help, -h      Show this help message and exit
  --input-list URI       Local URI or  HDFS  URI  of  a  UTF-8 encoded file
                         containing a list of HDFS  URIs  to index, one URI
                         per line in the  file.  If  '-' is specified, URIs
                         are read  from  the  standard  input.  Multiple --
                         input-list arguments can be specified.
  --morphline-id STRING  The identifier of the morphline that shall be
                         executed within the morphline config file
                         specified by --morphline-file. If the
                         --morphline-id option is omitted, the first
                         (i.e. top-most) morphline within the config
                         file is used. Example: morphline1
  --solr-home-dir DIR    Optional relative or absolute path  to a local dir
                         containing  Solr  conf/  dir   and  in  particular
                         conf/solrconfig.xml and optionally  also lib/ dir.
                         This directory will be  uploaded  to each MR task.
                         Example: src/test/resources/solr/minimr
  --update-conflict-resolver FQCN
                         Fully qualified class name  of  a  Java class that
                         implements the  UpdateConflictResolver  interface.
                         This  enables  deduplication  and  ordering  of  a
                         series of document  updates  for  the  same unique
                         document key. For example,  a  MapReduce batch job
                         might index multiple files  in  the same job where
                         some of the files contain  old and new versions of
                         the very  same  document,  using  the  same unique
                         document key.
                         Typically,  implementations   of   this  interface
                         forbid collisions  by  throwing  an  exception, or
                         ignore all but the  most  recent document version,
                         or, in the general  case,  order colliding updates
                         ascending  from  least   recent   to  most  recent
                         (partial) update. The caller of this interface (i.
                         e.  the  Hadoop  Reducer)   will  then  apply  the
                         updates to  Solr  in  the  order  returned  by the
                         orderUpdates() method.
                         The                                        default
                         RetainMostRecentUpdateConflictResolver
                         implementation ignores  all  but  the  most recent
                         document version, based on  a configurable numeric
                         Solr    field,    which     defaults     to    the
                         file_last_modified timestamp (default: org.apache.
                         solr.hadoop.dedup.
                         RetainMostRecentUpdateConflictResolver)
  --mappers INTEGER      Tuning knob that indicates  the  maximum number of
                         MR mapper tasks to use.  -1  indicates use all map
                         slots available on the cluster. (default: -1)
  --reducers INTEGER     Tuning knob that indicates  the number of reducers
                         to index into.  0  is  reserved  for a mapper-only
                         feature that may  ship  in  a  future  release. -1
                         indicates use all  reduce  slots  available on the
                         cluster. -2 indicates use  one  reducer per output
                         shard,  which   disables   the   mtree   merge  MR
                         algorithm. The mtree  merge  MR algorithm improves
                         scalability by spreading  load  (in particular CPU
                         load) among a  number  of  parallel  reducers that
                         can be much larger than  the number of solr shards
                         expected by  the  user.  It  can  be  seen  as  an
                         extension of concurrent  lucene  merges and tiered
                         lucene  merges   to   the   clustered   case.  The
                         subsequent mapper-only phase merges  the output of
                         said large number  of  reducers  to  the number of
                         shards expected by  the  user,  again by utilizing
                         more  available   parallelism   on   the  cluster.
                         (default: -1)
  --max-segments INTEGER
                         Tuning knob that indicates  the  maximum number of
                         segments to be contained  on  output  in the index
                         of each reducer shard.  After  a reducer has built
                         its output index  it  applies  a  merge  policy to
                         merge segments  until  there  are  <=  maxSegments
                         lucene  segments  left  in   this  index.  Merging
                         segments involves reading  and  rewriting all data
                         in all these  segment  files, potentially multiple
                         times,  which  is  very  I/O  intensive  and  time
                         consuming. However, an  index  with fewer segments
                         can later be merged  faster,  and  it can later be
                         queried  faster  once  deployed  to  a  live  Solr
                         serving shard. Set  maxSegments  to  1 to optimize
                         the index for low query  latency. In a nutshell, a
                         small maxSegments  value  trades  indexing latency
                         for subsequently improved query  latency. This can
                         be  a  reasonable  trade-off  for  batch  indexing
                         systems. (default: 1)
  --dry-run              Run in local mode  and  print  documents to stdout
                         instead of loading them  into  Solr. This executes
                         the  morphline  in  the  client  process  (without
                         submitting a job  to  MR)  for  quicker turnaround
                         during early  trial  &  debug  sessions. (default:
                         false)
  --log4j FILE           Relative or absolute  path  to  a log4j.properties
                         config file on the  local  file  system. This file
                         will  be  uploaded  to   each  MR  task.  Example:
                         /path/to/log4j.properties
  --verbose, -v          Turn on verbose output. (default: false)
  --show-non-solr-cloud  Also show options for  Non-SolrCloud  mode as part
                         of --help. (default: false)

Required arguments:
  --output-dir HDFS_URI  HDFS directory to  write  Solr  indexes to. Inside
                         there one  output  directory  per  shard  will  be
                         generated.    Example:     hdfs://c2202.mycompany.
                         com/user/$USER/test
  --morphline-file FILE  Relative or absolute path  to  a local config file
                         that contains one  or  more  morphlines.  The file
                         must     be      UTF-8      encoded.      Example:
                         /path/to/morphline.conf

Cluster arguments:
  Arguments that provide information about your Solr cluster.

  --zk-host STRING       The address of a ZooKeeper  ensemble being used by
                         a SolrCloud cluster. This  ZooKeeper ensemble will
                         be examined  to  determine  the  number  of output
                         shards to create  as  well  as  the  Solr  URLs to
                         merge the output shards into  when using the --go-
                         live option. Requires that  you  also  pass the --
                         collection to merge the shards into.

                         The   --zk-host   option   implements   the   same
                         partitioning semantics as  the  standard SolrCloud
                         Near-Real-Time (NRT)  API.  This  enables  to  mix
                         batch  updates  from   MapReduce   ingestion  with
                         updates from standard  Solr  NRT  ingestion on the
                         same SolrCloud  cluster,  using  identical  unique
                         document keys.

                         Format is: a  list  of  comma  separated host:port
                         pairs,  each  corresponding   to   a   zk  server.
                         Example: '127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:
                         2183' If the optional  chroot  suffix  is used the
                         example  would  look  like:  '127.0.0.1:2181/solr,
                         127.0.0.1:2182/solr,127.0.0.1:2183/solr'     where
                         the client would  be  rooted  at  '/solr'  and all
                         paths would  be  relative  to  this  root  -  i.e.
                         getting/setting/etc... '/foo/bar' would  result in
                         operations being run on  '/solr/foo/bar' (from the
                         server perspective).

                         If --solr-home-dir  is  not  specified,  the  Solr
                         home  directory   for   the   collection   may  be
                         downloaded from this ZooKeeper ensemble.

Go live arguments:
  Arguments for  merging  the  shards  that  are  built  into  a  live Solr
  cluster. Also see the Cluster arguments.

  --go-live              Allows you to  optionally  merge  the  final index
                         shards into a  live  Solr  cluster  after they are
                         built. You can pass the  ZooKeeper address with --
                         zk-host and the relevant  cluster information will
                         be auto detected.  (default: false)
  --collection STRING    The SolrCloud  collection  to  merge  shards  into
                         when  using  --go-live   and  --zk-host.  Example:
                         collection1
  --go-live-min-replication-factor INTEGER
                         The  minimum  number  of   SolrCloud  replicas  to
                         successfully merge  any  final  index  shard into.
                         The go-live  job  phase  attempts  to  merge final
                         index shards into all  SolrCloud replicas. Some of
                         these merge operations  may  fail,  for example if
                         some  SolrCloud  servers  are  down.  This  option
                         enables indexing  jobs  to  succeed  even  if some
                         such   merge   operations    fail   on   SolrCloud
                         followers. Successful  merge  operations  into all
                         leaders  are  always  required  for  job  success,
                         regardless  of   the   value   of   --go-live-min-
                         replication-factor.    -1     indicates    require
                         successful merge operations  into  all replicas. 1
                         indicates  require  successful   merge  operations
                         only into leader replicas. (default: -1)
  --go-live-threads INTEGER
                         Tuning knob that indicates  the  maximum number of
                         live merges  to  run  in  parallel  at  one  time.
                         (default: 1000)

Generic options supported are:
  --conf <configuration file>
                         specify an application configuration file
  -D <property=value>    define a value for a given property
  -fs <file:///|hdfs://namenode:port>
                         specify default filesystem URL to use, overrides
                         'fs.defaultFS' property from configurations
  --jt <local|resourcemanager:port>
                         specify a ResourceManager
  --files <file1,...>    specify a  comma-separated  list  of  files  to be
                         copied to the map reduce cluster
  --libjars <jar1,...>   specify a comma-separated list of  jar files to be
                         included in the classpath
  --archives <archive1,...>
                         specify a comma-separated list  of  archives to be
                         unarchived on the compute machines

The general command line syntax is:
command [genericOptions] [commandOptions]

Examples:

# (Re)index an Avro based Twitter tweet file:
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 1 \
  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro

# (Re)index all files that match all of the following conditions:
# 1) File is contained in dir tree hdfs:///user/$USER/solrloadtest/twitter/tweets
# 2) file name matches the glob pattern 'sample-statuses*.gz'
# 3) file was last modified less than 1000000 minutes ago
# 4) file size is between 1 MB and 100 MB
# Also include extra library jar file containing JSON tweet Java parser:
hadoop fs \
  -find hdfs:///user/$USER/solrloadtest/twitter/tweets \
  -type f \
  -name 'sample-statuses*.gz' \
  -mmin -1000000 \
  -size -100000000c \
  -size +1000000c \
| sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  --libjars /path/to/kite-morphlines-twitter-0.10.0.jar \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadJsonTestTweets.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 100 \
  --input-list -

# Go live by merging resulting index shards into a live Solr cluster
# (explicitly specify Solr URLs - for a SolrCloud cluster see next example):
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shard-url http://solr001.mycompany.com:8983/solr/collection1 \
  --shard-url http://solr002.mycompany.com:8983/solr/collection1 \
  --go-live \
  hdfs:///user/foo/indir

# Go live by merging resulting index shards into a live SolrCloud cluster
# (discover shards and Solr URLs through ZooKeeper):
sudo -u hdfs hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapred.child.java.opts=-Xmx500m' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --zk-host zk01.mycompany.com:2181/solr \
  --collection collection1 \
  --go-live \
  hdfs:///user/foo/indir

# MapReduce on YARN - Pass custom JVM arguments (including a custom tmp directory)
sudo -u hdfs env \
  HADOOP_CLIENT_OPTS='-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/my/tmp/dir/' \
  hadoop \
  --config /etc/hadoop/conf.cloudera.mapreduce1 \
  jar target/search-mr-*-job.jar org.apache.solr.hadoop.MapReduceIndexerTool \
  -D 'mapreduce.map.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
  -D 'mapreduce.reduce.java.opts=-DmaxConnectionsPerHost=10000 -DmaxConnections=10000' \
  --log4j src/test/resources/log4j.properties \
  --morphline-file ../search-core/src/test/resources/test-morphlines/tutorialReadAvroContainer.conf \
  --solr-home-dir src/test/resources/solr/minimr \
  --output-dir hdfs://c2202.mycompany.com/user/$USER/test \
  --shards 1 \
  hdfs:///user/$USER/test-documents/sample-statuses-20120906-141433.avro
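
A common workflow is to validate a morphline with --dry-run before submitting the real job with --go-live. The following sketch assembles both command lines from the options documented above and prints them instead of executing them. The build_indexer_cmd helper and all paths, host names, and the collection name are hypothetical placeholders, and the jar location assumes the default parcel-based deployment.

```shell
#!/bin/sh
# Hypothetical values; replace with paths and hosts from your deployment.
JAR='/opt/cloudera/parcels/CDH/jars/search-mr-*-job.jar'
MORPHLINE_FILE="/path/to/morphline.conf"
OUTPUT_DIR="hdfs:///user/foo/outdir"
ZK_HOST="zk01.mycompany.com:2181/solr"
COLLECTION="collection1"
INPUT="hdfs:///user/foo/indir"

# Print the command line for a dry run ("dry") or a go-live run (any other value).
build_indexer_cmd() {
  mode=$1
  cmd="hadoop jar $JAR org.apache.solr.hadoop.MapReduceIndexerTool"
  cmd="$cmd --morphline-file $MORPHLINE_FILE"
  cmd="$cmd --output-dir $OUTPUT_DIR"
  cmd="$cmd --zk-host $ZK_HOST --collection $COLLECTION"
  if [ "$mode" = "dry" ]; then
    # Runs the morphline in the client process and prints documents to stdout.
    cmd="$cmd --dry-run --verbose"
  else
    # Merges the final index shards into the live SolrCloud collection.
    cmd="$cmd --go-live"
  fi
  printf '%s\n' "$cmd $INPUT"
}

build_indexer_cmd dry
build_indexer_cmd live
```

Printing the commands first makes it easy to review the arguments; once they look right, the printed line can be pasted into a shell on a gateway host.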