Tuning Hive on Spark
Minimum Required Role: Configurator (also provided by Cluster Administrator, Full Administrator)
Hive on Spark provides better performance than Hive on MapReduce while offering the same features. Running Hive on Spark requires no changes to user queries. Specifically, user-defined functions (UDFs) are fully supported, and most performance-related configurations work with the same semantics.
This topic describes how to configure and tune Hive on Spark for optimal performance. This topic assumes that your cluster is managed by Cloudera Manager and that you use YARN as the Spark cluster manager.
The example described in the following sections assumes a 40-host YARN cluster in which each host has 32 cores and 120 GB of memory.
The YARN properties yarn.nodemanager.resource.cpu-vcores and yarn.nodemanager.resource.memory-mb determine how cluster resources can be used by Hive on Spark (and other YARN applications). The values for the two properties are determined by the capacity of your host and the number of other non-YARN applications that coexist on the same host. Most commonly, only YARN NodeManager and HDFS DataNode services are running on worker hosts.
Allocate 1 core for each of the services and 2 additional cores for OS usage, leaving 28 cores available for YARN. To do so, set yarn.nodemanager.resource.cpu-vcores=28.
Reserve 20 GB of memory for these services and processes, leaving 100 GB available for YARN. To do so, set yarn.nodemanager.resource.memory-mb to 100 GB (102400 MB).
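The arithmetic behind these two settings can be sketched as follows. This is a minimal illustration that assumes the example host described above; the function name and its parameters are hypothetical and are not part of YARN or Cloudera Manager.

```python
# Host capacity from the example in this topic.
HOST_CORES = 32
HOST_MEMORY_GB = 120


def yarn_node_resources(host_cores, host_memory_gb,
                        service_count=2, cores_per_service=1,
                        os_cores=2, reserved_memory_gb=20):
    """Return (vcores, memory_mb) left for YARN after reserving
    capacity for co-located services and the operating system."""
    vcores = host_cores - service_count * cores_per_service - os_cores
    memory_mb = (host_memory_gb - reserved_memory_gb) * 1024
    return vcores, memory_mb


vcores, memory_mb = yarn_node_resources(HOST_CORES, HOST_MEMORY_GB)
print(f"yarn.nodemanager.resource.cpu-vcores={vcores}")
print(f"yarn.nodemanager.resource.memory-mb={memory_mb}")
```

The defaults model the two services (NodeManager and DataNode) with one core each; adjust them if other non-YARN applications share the host.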
For more information on tuning YARN, see Tuning YARN.
After allocating resources to YARN, you define how Spark uses the resources: executor and driver memory, executor allocation, and parallelism.
Configuring Executor Memory
Spark executor configurations are described in Configuring Spark on YARN Applications.
When setting executor memory size, consider the following factors:
- More executor memory enables map join optimization for more queries, but can result in increased overhead due to garbage collection.
- In some cases the HDFS client does not handle concurrent writers well, so a race condition can occur if an executor has too many cores.
To minimize the number of unused cores, Cloudera recommends setting spark.executor.cores to 4, 5, or 6, depending on the number of cores allocated for YARN. In this example, the number of available cores is 28.
Because 28 cores is divisible by 4, set spark.executor.cores to 4. Setting it to 6 would leave 4 cores unused; setting it to 5 leaves 3 cores unused. (If there were 24 cores, you could specify either 4 or 6, because 24 is divisible by either.) With spark.executor.cores set to 4, the maximum number of executors that can run concurrently on a host is seven (28 / 4). Divide the total memory among these executors, with each getting approximately 14 GB (100 / 7).
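The selection described above can be sketched as a small calculation. The helper names are hypothetical, and breaking ties in favor of the smaller core count is an assumption of this sketch rather than a documented rule.

```python
def choose_executor_cores(yarn_vcores, candidates=(4, 5, 6)):
    """Pick the candidate that leaves the fewest vcores unused.
    Ties go to the smaller value (an assumption of this sketch)."""
    return min(candidates, key=lambda c: (yarn_vcores % c, c))


def executors_per_host(yarn_vcores, executor_cores):
    """Maximum number of executors that fit on one host by core count."""
    return yarn_vcores // executor_cores


yarn_vcores = 28
yarn_memory_gb = 100

cores = choose_executor_cores(yarn_vcores)
per_host = executors_per_host(yarn_vcores, cores)
memory_per_executor_gb = yarn_memory_gb / per_host

print(f"spark.executor.cores={cores}")
print(f"executors per host: {per_host}")
print(f"memory per executor: about {memory_per_executor_gb:.1f} GB")
```

With 24 cores, both 4 and 6 leave no cores unused, so the sketch returns 4 only because of its tie-breaking rule.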
The total memory allocated to an executor includes spark.executor.memory and spark.yarn.executor.memoryOverhead. Cloudera recommends setting spark.yarn.executor.memoryOverhead to 15-20% of the total memory size; that is, set spark.yarn.executor.memoryOverhead=2 G and spark.executor.memory=12 G.
With these configurations, each host can run up to 7 executors at a time. Each executor can run up to 4 tasks (one per core). So, each task has on average 3.5 GB (14 / 4) memory. All tasks running in an executor share the same heap space.
Make sure the sum of spark.yarn.executor.memoryOverhead and spark.executor.memory is less than yarn.scheduler.maximum-allocation-mb.
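The following sketch works through the executor memory split and the container size check. It assumes the 14 GB per-executor budget from the example, rounds the overhead to a whole number of gigabytes, and uses a hypothetical value for yarn.scheduler.maximum-allocation-mb; the function names are illustrative only.

```python
def split_executor_memory(total_gb, overhead_fraction=0.15):
    """Split a per-executor budget into (heap_gb, overhead_gb).
    Overhead is rounded to whole gigabytes (an assumption of this sketch)."""
    overhead_gb = max(1, round(total_gb * overhead_fraction))
    return total_gb - overhead_gb, overhead_gb


def fits_in_yarn_container(heap_gb, overhead_gb, max_allocation_mb):
    """Strict comparison mirrors the guidance above: the sum must be
    less than yarn.scheduler.maximum-allocation-mb."""
    return (heap_gb + overhead_gb) * 1024 < max_allocation_mb


heap_gb, overhead_gb = split_executor_memory(14)
print(f"spark.executor.memory={heap_gb} G")
print(f"spark.yarn.executor.memoryOverhead={overhead_gb} G")

# Average memory per task with 4 cores per executor.
print(f"memory per task: {(heap_gb + overhead_gb) / 4} GB")

# Hypothetical scheduler limit; check the value configured on your cluster.
MAX_ALLOCATION_MB = 16384
print(fits_in_yarn_container(heap_gb, overhead_gb, MAX_ALLOCATION_MB))
```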
Configuring Driver Memory
- spark.driver.memory—Maximum size of each Spark driver's Java heap memory when Hive is running on Spark.
- spark.yarn.driver.memoryOverhead—Amount of extra off-heap memory that can be requested from YARN, per driver. This, together with spark.driver.memory, is the total memory that YARN can use to create a JVM for a driver process.
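Choose the total driver memory based on the value of yarn.nodemanager.resource.memory-mb, referred to here as X:
- 12 GB when X is greater than 50 GB
- 4 GB when X is between 12 GB and 50 GB
- 1 GB when X is between 1 GB and 12 GB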
- 256 MB when X is less than 1 GB
These numbers are for the sum of spark.driver.memory and spark.yarn.driver.memoryOverhead. Overhead should be 10-15% of the total. In this example, yarn.nodemanager.resource.memory-mb=100 GB, which is greater than 50 GB, so the total memory for the Spark driver can be set to 12 GB. As a result, the memory settings are spark.driver.memory=10.5 GB and spark.yarn.driver.memoryOverhead=1.5 GB.
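The driver sizing tiers and the overhead split can be expressed as a short calculation. This is a sketch under stated assumptions: the tiers above do not say which tier applies exactly at 12 GB or 1 GB, so the lower bound of each range is treated as inclusive here, and the 12.5% overhead fraction is simply the value that reproduces the example. The function names are hypothetical.

```python
def total_driver_memory_mb(yarn_memory_mb):
    """Map yarn.nodemanager.resource.memory-mb to total driver memory (MB).
    Lower bounds of each tier are treated as inclusive (an assumption)."""
    x_gb = yarn_memory_mb / 1024
    if x_gb > 50:
        return 12 * 1024
    if x_gb >= 12:
        return 4 * 1024
    if x_gb >= 1:
        return 1024
    return 256


def split_driver_memory(total_mb, overhead_fraction=0.125):
    """Return (heap_mb, overhead_mb); overhead should be 10-15% of the total."""
    overhead_mb = int(total_mb * overhead_fraction)
    return total_mb - overhead_mb, overhead_mb


total_mb = total_driver_memory_mb(100 * 1024)
heap_mb, overhead_mb = split_driver_memory(total_mb)
print(f"spark.driver.memory={heap_mb / 1024} GB")
print(f"spark.yarn.driver.memoryOverhead={overhead_mb / 1024} GB")
```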
Choosing the Number of Executors
The number of executors for a cluster is determined by the number of executors on each host and the number of worker hosts in the cluster. With 40 worker hosts and up to 7 executors per host, the maximum number of executors that Hive can use to run Hive on Spark jobs is 280 (40 x 7). The actual maximum is slightly smaller because the driver uses one core and 12 GB of total driver memory. This assumes that no other YARN applications are running.
Hive performance is directly related to the number of executors used to run a query, although the characteristics vary from query to query. In general, performance is proportional to the number of executors. For example, using four executors for a query takes approximately half of the time of using two executors. However, performance peaks at a certain number of executors, above which increasing the number does not improve performance and can have an adverse impact.
In most cases, using half of the cluster capacity (half the number of executors) provides good performance. To achieve maximum performance, it is usually a good idea to use all available executors. For example, set spark.executor.instances=280. This is strongly recommended for benchmarking and performance measurement.
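As a rough check on the cluster-wide executor count, the sketch below multiplies the per-host capacity by the number of worker hosts and then subtracts the capacity consumed by the driver. It uses the per-host figures derived in the executor memory section (28 vcores, 100 GB, 4 cores and 14 GB per executor). Treating the driver as occupying capacity on a single worker host is an assumption of this sketch, and the function names are hypothetical.

```python
def executors_on_host(vcores, memory_mb, executor_cores, executor_total_mb):
    """Executors that fit on one host, limited by both cores and memory."""
    return min(vcores // executor_cores, memory_mb // executor_total_mb)


def max_cluster_executors(hosts, vcores, memory_mb,
                          executor_cores, executor_total_mb,
                          driver_cores=0, driver_total_mb=0):
    """Upper bound on executors, assuming the driver runs on one worker host
    and no other YARN applications are running."""
    regular = executors_on_host(vcores, memory_mb,
                                executor_cores, executor_total_mb)
    driver_host = executors_on_host(vcores - driver_cores,
                                    memory_mb - driver_total_mb,
                                    executor_cores, executor_total_mb)
    return (hosts - 1) * regular + driver_host


EXECUTOR_TOTAL_MB = 14 * 1024  # 12 GB heap plus 2 GB overhead

print(max_cluster_executors(40, 28, 102400, 4, EXECUTOR_TOTAL_MB))
print(max_cluster_executors(40, 28, 102400, 4, EXECUTOR_TOTAL_MB,
                            driver_cores=1, driver_total_mb=12 * 1024))
```

The first call ignores the driver and gives the theoretical ceiling; the second reserves one core and 12 GB for the driver and yields a slightly smaller value.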
Dynamic Executor Allocation
Although setting spark.executor.instances to the maximum value usually maximizes performance, doing so is not recommended for a production environment in which multiple users are running Hive queries. Avoid allocating a fixed number of executors for a user session, because the executors cannot be used by other user queries if they are idle. In a production environment, plan for executor allocation that allows greater resource sharing.
Spark allows you to dynamically scale the set of cluster resources allocated to a Spark application based on the workload. To enable dynamic allocation, follow the procedure in Dynamic Allocation. Except in certain circumstances, Cloudera strongly recommends enabling dynamic allocation.
For available executors to be fully utilized, you must run enough tasks concurrently (in parallel). In most cases, Hive determines parallelism automatically for you, but you have some control in tuning concurrency. On the input side, the number of map tasks is equal to the number of splits generated by the input format. For Hive on Spark, the input format is CombineHiveInputFormat, which can group the splits generated by the underlying input formats as required.
You have more control over parallelism at the stage boundary. Adjust hive.exec.reducers.bytes.per.reducer to control how much data each reducer processes, and Hive determines an optimal number of partitions based on the available executors, executor memory settings, the value you set for the property, and other factors. Experiments show that Spark is less sensitive than MapReduce to the value you specify for hive.exec.reducers.bytes.per.reducer, as long as enough tasks are generated to keep all available executors busy. For optimal performance, pick a value for the property so that Hive generates enough tasks to fully use all available executors.
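To get a feel for how hive.exec.reducers.bytes.per.reducer relates to the number of tasks, the sketch below uses a simplified size-based estimate: input size divided by bytes per reducer, capped at a maximum. This is an approximation for illustration only; it is not the logic Hive on Spark actually uses, which also considers available executors and memory as described above. The function names, the cap value, and the example input size and executor count are all hypothetical.

```python
def estimate_reducers(input_bytes, bytes_per_reducer, max_reducers):
    """Simplified estimate: ceil(input / bytes_per_reducer), within [1, max]."""
    needed = -(-input_bytes // bytes_per_reducer)  # ceiling division
    return max(1, min(max_reducers, needed))


def bytes_per_reducer_to_fill(input_bytes, task_slots):
    """Largest bytes-per-reducer value that still yields at least
    one reduce task per available task slot."""
    return max(1, input_bytes // task_slots)


# Hypothetical workload: 1 GiB of shuffle input, 10 executors x 4 cores.
input_bytes = 2 ** 30
task_slots = 10 * 4
max_reducers = 1009  # illustrative cap

current = estimate_reducers(input_bytes, 67108864, max_reducers)
print(f"reducers at 64 MB per reducer: {current} for {task_slots} slots")

suggested = bytes_per_reducer_to_fill(input_bytes, task_slots)
print(f"hive.exec.reducers.bytes.per.reducer={suggested}")
print(estimate_reducers(input_bytes, suggested, max_reducers))
```

In this hypothetical case, 64 MB per reducer produces fewer tasks than there are task slots, so a smaller value keeps every executor core busy.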
Hive on Spark shares most if not all Hive performance-related configurations. You can tune those parameters much as you would for MapReduce. However, hive.auto.convert.join.noconditionaltask.size, which is the threshold for converting common join to map join based on statistics, can have a significant performance impact. Although this configuration is used for both Hive on MapReduce and Hive on Spark, it is interpreted differently by each.
The size of data is described by two statistics:
- totalSize—Approximate size of data on disk
- rawDataSize—Approximate size of data in memory
Hive on MapReduce uses totalSize. When both are available, Hive on Spark uses rawDataSize. Because of compression and serialization, a large difference between totalSize and rawDataSize can occur for the same dataset. For Hive on Spark, you might need to specify a larger value for hive.auto.convert.join.noconditionaltask.size to convert the same join to a map join. You can increase the value for this parameter to make map join conversion more aggressive. Converting common joins to map joins can improve performance. However, if this value is set too high, too much memory is used by data from small tables, and tasks may fail because they run out of memory. Adjust this value according to your cluster environment.
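The difference in interpretation can be illustrated with a small comparison. This sketch considers a single small table and compares one statistic against the threshold, which is a simplification of what Hive does. The table statistics are hypothetical values chosen to show a compressed table that expands tenfold in memory, and the function names are illustrative only.

```python
MB = 1024 * 1024


def size_used_for_map_join(stats, engine):
    """Return the statistic each engine compares against the threshold:
    totalSize for MapReduce, rawDataSize for Spark when both are available."""
    raw = stats.get("rawDataSize")
    total = stats.get("totalSize")
    if engine == "spark" and raw is not None and total is not None:
        return raw
    return total


def converts_to_map_join(stats, threshold_bytes, engine):
    """True if the small table is below the map join threshold."""
    return size_used_for_map_join(stats, engine) < threshold_bytes


# Hypothetical small table: 15 MB on disk, 150 MB in memory.
small_table = {"totalSize": 15 * MB, "rawDataSize": 150 * MB}

print(converts_to_map_join(small_table, 20 * MB, "mr"))      # True
print(converts_to_map_join(small_table, 20 * MB, "spark"))   # False
print(converts_to_map_join(small_table, 200 * MB, "spark"))  # True
```

The same threshold that converts the join on MapReduce does not convert it on Spark until the value is raised.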
You can control whether rawDataSize statistics should be collected, using the property hive.stats.collect.rawdatasize. Cloudera recommends setting this to true in Hive (the default).
Cloudera also recommends setting the following additional configuration properties, using a Cloudera Manager advanced configuration snippet for HiveServer2:
- hive.optimize.reducededuplication.min.reducer=4
- hive.optimize.reducededuplication=true
- hive.merge.mapfiles=true
- hive.merge.mapredfiles=false
- hive.merge.smallfiles.avgsize=16000000
- hive.merge.size.per.task=256000000
- hive.merge.sparkfiles=true
- hive.auto.convert.join=true
- hive.auto.convert.join.noconditionaltask=true
- hive.auto.convert.join.noconditionaltask.size=20M (might need to increase for Spark, 200M)
- hive.optimize.bucketmapjoin.sortedmerge=false
- hive.map.aggr.hash.percentmemory=0.5
- hive.map.aggr=true
- hive.optimize.sort.dynamic.partition=false
- hive.stats.autogather=true
- hive.stats.fetch.column.stats=true
- hive.compute.query.using.stats=true
- hive.limit.pushdown.memory.usage=0.4 (MR and Spark)
- hive.optimize.index.filter=true
- hive.exec.reducers.bytes.per.reducer=67108864
- hive.smbjoin.cache.rows=10000
- hive.fetch.task.conversion=more
- hive.fetch.task.conversion.threshold=1073741824
- hive.optimize.ppd=true
Pre-warming YARN Containers
When you submit your first query after starting a new session, you may experience a slightly longer delay before you see the query start. You may also notice that if you run the same query again, it finishes much faster than the first one.
Spark executors need extra time to start and initialize for the Spark on YARN cluster, which causes longer latency. In addition, Spark does not wait for all executors to be ready before starting the job, so some executors may still be starting up after the job is submitted to the cluster. However, for jobs running on Spark, the number of available executors at the time of job submission partly determines the number of reducers. When the number of ready executors has not reached the maximum, the job may not have maximal parallelism. This can further impact performance for the first job.
In long-lived user sessions, this extra time causes no problems because it only happens on the first query execution. However, short-lived sessions, such as Hive jobs launched by Oozie, may not achieve optimal performance.
To reduce startup time, you can enable container pre-warming before a job starts. The job starts running only when the requested executors are ready. This way, the parallelism of a short-lived session is not decreased on the reduce side.
To enable pre-warming, set hive.prewarm.enabled to true before the query is issued. You can also set the number of containers by setting hive.prewarm.numcontainers. The default is 10.
The actual number of executors to pre-warm is capped by the value of either spark.executor.instances (static allocation) or spark.dynamicAllocation.maxExecutors (dynamic allocation). The value for hive.prewarm.numcontainers should not exceed the number of executors allocated to a user session.
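The capping rule can be sketched as follows. The function name and its parameters are hypothetical, and the example values are illustrative rather than recommended settings.

```python
def effective_prewarm_containers(requested, dynamic_allocation,
                                 executor_instances=None,
                                 dynamic_max_executors=None):
    """Return the number of executors that are actually pre-warmed.

    requested              -> hive.prewarm.numcontainers
    executor_instances     -> spark.executor.instances (static allocation)
    dynamic_max_executors  -> spark.dynamicAllocation.maxExecutors
    """
    cap = dynamic_max_executors if dynamic_allocation else executor_instances
    if cap is None:
        raise ValueError("the cap for the selected allocation mode is not set")
    return min(requested, cap)


# Static allocation: the request fits under the cap.
print(effective_prewarm_containers(10, False, executor_instances=20))

# Dynamic allocation: the request exceeds the cap and is reduced.
print(effective_prewarm_containers(10, True, dynamic_max_executors=6))
```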