Tuning Impala for Performance

The following sections explain the factors affecting the performance of Impala features, and procedures for tuning, monitoring, and benchmarking Impala queries and other SQL operations.

This section also describes techniques for maximizing Impala scalability. Scalability is tied to performance: it means that performance remains high as the system workload increases. For example, reducing the disk I/O performed by a query can speed up an individual query, and at the same time improve scalability by making it practical to run more queries simultaneously. Sometimes, an optimization technique improves scalability more than performance. For example, reducing memory usage for a query might not change the query performance much, but might improve scalability by allowing more Impala queries or other kinds of jobs to run at the same time without running out of memory.

  Note:

Before starting any performance tuning or benchmarking, make sure your system is configured with all the recommended minimum hardware requirements from Hardware Requirements and software settings from Post-Installation Configuration for Impala.

  • Partitioning. This technique physically divides the data based on the different values in frequently queried columns, allowing queries to skip reading a large percentage of the data in a table.
  • Performance Considerations for Join Queries. Joins are the main class of queries that you can tune at the SQL level, as opposed to changing physical factors such as the file format or the hardware configuration. The related topics Column Statistics and Table Statistics are also important primarily for join performance.
  • Table Statistics and Column Statistics. Gathering table and column statistics, using the COMPUTE STATS statement, helps Impala automatically optimize the performance for join queries, without requiring changes to SQL query statements. (This process is greatly simplified in Impala 1.2.2 and higher, because the COMPUTE STATS statement gathers both kinds of statistics in one operation, and does not require any setup and configuration as was previously necessary for the ANALYZE TABLE statement in Hive.)
  • Testing Impala Performance. Do some post-setup testing to ensure Impala is using optimal settings for performance, before conducting any benchmark tests.
  • Benchmarking Impala Queries. The configuration and sample data that you use for initial experiments with Impala is often not appropriate for doing performance tests.
  • Controlling Resource Usage. The more memory Impala can utilize, the better query performance you can expect. In a cluster running other kinds of workloads as well, you must make tradeoffs to make sure all Hadoop components have enough memory to perform well, so you might cap the memory that Impala can use.

Impala Performance Guidelines and Best Practices

Here are performance guidelines and best practices that you can use during planning, experimentation, and performance tuning for an Impala-enabled CDH cluster. All of this information is also available in more detail elsewhere in the Impala documentation; it is gathered together here to serve as a cookbook and to emphasize which performance techniques typically provide the highest return on investment.

  1. Choose the appropriate file format for the data. Typically, for large volumes of data (multiple gigabytes per table or partition), the Parquet file format performs best because of its combination of columnar storage layout, large I/O request size, and compression and encoding. See How Impala Works with Hadoop File Formats for comparisons of all file formats supported by Impala, and Using the Parquet File Format with Impala Tables for details about the Parquet file format.
      Note: For smaller volumes of data, a few gigabytes or less for each table or partition, you might not see significant performance differences between file formats. At small data volumes, reduced I/O from an efficient compressed file format can be counterbalanced by reduced opportunity for parallel execution. When planning for a production deployment or conducting benchmarks, always use realistic data volumes to get a true picture of performance and scalability.
  2. Avoid data ingestion processes that produce many small files. Always use INSERT ... SELECT to copy significant volumes of data from table to table. Avoid INSERT ... VALUES for any substantial volume of data or performance-critical tables, because each such statement produces a separate tiny data file. See INSERT Statement for examples of the INSERT ... SELECT syntax.

    For example, if you have thousands of partitions in a Parquet table, each with less than 1 GB of data, consider partitioning in a less granular way, such as by year / month rather than year / month / day. If an inefficient data ingestion process produces thousands of data files in the same table or partition, consider compacting the data by performing an INSERT ... SELECT to copy all the data to a different table; this process reorganizes the data into a smaller number of larger files. (A sketch of this compaction technique appears after this list.)

  3. Use partitioning when appropriate. Choose the right level of granularity to avoid the "many small files" problem. Prefer to have several gigabytes of data or more in each partition, to take advantage of HDFS bulk I/O and Impala distributed queries. See Partitioning for details.
  4. Gather statistics for all tables used in performance-critical or high-volume join queries, using the COMPUTE STATS statement. See Performance Considerations for Join Queries for details.
  5. Minimize the overhead of transmitting results back to the client, by using techniques such as:
    • Aggregation. If you need to know how many rows match a condition, the total values of matching values from some column, the lowest or highest matching value, and so on, call aggregate functions such as COUNT(), SUM(), and MAX() in the query rather than sending the result set to an application and doing those computations there. Remember that the size of an unaggregated result set could be huge, requiring substantial time to transmit across the network.
    • Filtering. Use all applicable tests in the WHERE clause of a query to eliminate rows that are not relevant, rather than producing a big result set and filtering it using application logic.
    • LIMIT clause. If you only need to see a few sample values from a result set, or the top or bottom values from a query using ORDER BY, include the LIMIT clause to reduce the size of the result set rather than asking for the full result set and then throwing most of the rows away.
    • Avoid overhead from pretty-printing the result set and displaying it on the screen. When you retrieve the results through impala-shell, use impala-shell options such as -B and --output_delimiter to produce results without special formatting, and redirect output to a file rather than printing to the screen. Consider using INSERT ... SELECT to write the results directly to new files in HDFS. See impala-shell Command-Line Options for details about the impala-shell command-line options.
  6. Verify that your queries are planned in an efficient logical manner, by examining the EXPLAIN plan for a query before actually running it. See EXPLAIN Statement and Using the EXPLAIN Plan for Performance Tuning for details.
  7. Verify that the low-level aspects of I/O, memory usage, network bandwidth, CPU utilization, and so on are within expected ranges by examining the query profile for a query after running it. See Using the Query Profile for Performance Tuning for details.
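
For reference, here is a minimal sketch of the compaction technique from item 2. The table names fragmented_table and compacted_table are hypothetical:

-- Copy the data into a new Parquet table; the INSERT ... SELECT mechanism
-- writes a small number of large files instead of many tiny ones.
create table compacted_table stored as parquet as
  select * from fragmented_table;

-- Optionally, copy the compacted data back under the original table name,
-- replacing the old small files in one operation:
insert overwrite table fragmented_table select * from compacted_table;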

Performance Considerations for Join Queries

Queries involving join operations often require more tuning than queries that refer to only one table. The maximum size of the result set from a join query is the product of the number of rows in all the joined tables. When joining several tables with millions or billions of rows, any missed opportunity to filter the result set, or other inefficiency in the query, could lead to an operation that does not finish in a practical time and has to be cancelled.

The simplest technique for tuning an Impala join query is to collect statistics on each table involved in the join using the COMPUTE STATS statement, and then let Impala automatically optimize the query based on the size of each table, number of distinct values of each column, and so on. The COMPUTE STATS statement and the join optimization are new features introduced in Impala 1.2.2. For accurate statistics about each table, issue the COMPUTE STATS statement after loading the data into that table, and again if the amount of data changes substantially due to an INSERT, LOAD DATA, adding a partition, and so on.

If statistics are not available for all the tables in the join query, or if Impala chooses a join order that is not the most efficient, you can override the automatic join order optimization by specifying the STRAIGHT_JOIN keyword immediately after the SELECT keyword. In this case, Impala uses the order the tables appear in the query to guide how the joins are processed.

When you use the STRAIGHT_JOIN technique, you must order the tables in the join query manually instead of relying on the Impala optimizer. The optimizer uses sophisticated techniques to estimate the size of the result set at each stage of the join. For manual ordering, use this heuristic approach to start with, and then experiment to fine-tune the order:

  • Specify the largest table first. This table is read from disk by each Impala node and so its size is not significant in terms of memory usage during the query.
  • Next, specify the smallest table. The contents of the second, third, and so on tables are all transmitted across the network. You want to minimize the size of the result set from each subsequent stage of the join query. The most likely approach involves joining a small table first, so that the result set remains small even as subsequent larger tables are processed.
  • Join the next smallest table, then the next smallest, and so on.
  • For example, if you had tables BIG, MEDIUM, SMALL, and TINY, the logical join order to try would be BIG, TINY, SMALL, MEDIUM.

The terms "largest" and "smallest" refers to the size of the intermediate result set based on the number of rows and columns from each table that are part of the result set. For example, if you join one table sales with another table customers, a query might find results from 100 different customers who made a total of 5000 purchases. In that case, you would specify SELECT ... FROM sales JOIN customers ..., putting customers on the right side because it is smaller in the context of this query.

The Impala query planner chooses between different techniques for performing join queries, depending on the absolute and relative sizes of the tables. Broadcast joins are the default, where the right-hand table is considered to be smaller than the left-hand table, and its contents are sent to all the other nodes involved in the query. The alternative technique is known as a partitioned join (not related to a partitioned table), which is more suitable for large tables of roughly equal size. With this technique, portions of each table are sent to appropriate other nodes where those subsets of rows can be processed in parallel. The choice of broadcast or partitioned join also depends on statistics being available for all tables in the join, gathered by the COMPUTE STATS statement.

To see which join strategy is used for a particular query, issue an EXPLAIN statement for the query. If you find that a query uses a broadcast join when you know through benchmarking that a partitioned join would be more efficient, or vice versa, add a hint to the query to specify the precise join mechanism to use. See Hints for details.
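
For example, here is a sketch of the hint syntax (the table names are hypothetical; the square brackets are part of the hint notation, not optional syntax). The STRAIGHT_JOIN keyword is included so that the planner does not reorder the hinted tables:

select straight_join count(*)
  from big_t join [shuffle] other_big_t    -- force a partitioned join
  where big_t.id = other_big_t.id;

select straight_join count(*)
  from big_t join [broadcast] lookup_t     -- force a broadcast join
  where big_t.id = lookup_t.id;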

How Joins Are Processed when Statistics Are Unavailable

If table or column statistics are not available for some tables in a join, Impala still reorders the tables using the information that is available. Tables with statistics are placed on the left side of the join order, in descending order of cost based on overall size and cardinality. Tables without statistics are treated as zero-size, that is, they are always placed on the right side of the join order.

Overriding Join Reordering with STRAIGHT_JOIN

If an Impala join query is inefficient because of outdated statistics or unexpected data distribution, you can keep Impala from reordering the joined tables by using the STRAIGHT_JOIN keyword immediately after the SELECT keyword. The STRAIGHT_JOIN keyword turns off the reordering of join clauses that Impala does internally, and produces a plan that relies on the join clauses being ordered optimally in the query text. In this case, rewrite the query so that the largest table is on the left, followed by the next largest, and so on until the smallest table is on the right.

In this example, the subselect from the BIG table produces a very small result set, but the table might still be treated as if it were the biggest and placed first in the join order. Using STRAIGHT_JOIN for the last join clause prevents the final table from being reordered, keeping it as the rightmost table in the join order.

select straight_join x from medium join small join (select * from big where c1 < 10) as big
  where medium.id = small.id and small.id = big.id;

Examples of Join Order Optimization

Here are examples showing joins between tables with 1 billion, 200 million, and 1 million rows. (In this case, the tables are unpartitioned and using Parquet format.) The smaller tables contain subsets of data from the largest one, for convenience of joining on the unique ID column. The smallest table only contains a subset of columns from the others.

[localhost:21000] > create table big stored as parquet as select * from raw_data;
+----------------------------+
| summary                    |
+----------------------------+
| Inserted 1000000000 row(s) |
+----------------------------+
Returned 1 row(s) in 671.56s
[localhost:21000] > desc big;
+-----------+---------+---------+
| name      | type    | comment |
+-----------+---------+---------+
| id        | int     |         |
| val       | int     |         |
| zfill     | string  |         |
| name      | string  |         |
| assertion | boolean |         |
+-----------+---------+---------+
Returned 5 row(s) in 0.01s
[localhost:21000] > create table medium stored as parquet as select * from big limit 200 * floor(1e6);
+---------------------------+
| summary                   |
+---------------------------+
| Inserted 200000000 row(s) |
+---------------------------+
Returned 1 row(s) in 138.31s
[localhost:21000] > create table small stored as parquet as select id,val,name from big where assertion = true limit 1 * floor(1e6);
+-------------------------+
| summary                 |
+-------------------------+
| Inserted 1000000 row(s) |
+-------------------------+
Returned 1 row(s) in 6.32s

For any kind of performance experimentation, use the EXPLAIN statement to see how any expensive query will be performed without actually running it, and enable verbose EXPLAIN plans containing more performance-oriented detail. The most interesting lines in the following plan show that without statistics for the joined tables, Impala cannot make a good estimate of the number of rows involved at each stage of processing, and is likely to stick with the BROADCAST join mechanism that sends a complete copy of one of the tables to each node.

[localhost:21000] > set explain_level=verbose;
EXPLAIN_LEVEL set to verbose
[localhost:21000] > explain select count(*) from big join medium where big.id = medium.id;
+----------------------------------------------------------+
| Explain String                                           |
+----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=2.10GB VCores=2  |
|                                                          |
| PLAN FRAGMENT 0                                          |
|   PARTITION: UNPARTITIONED                               |
|                                                          |
|   6:AGGREGATE (merge finalize)                           |
|   |  output: SUM(COUNT(*))                               |
|   |  cardinality: 1                                      |
|   |  per-host memory: unavailable                        |
|   |  tuple ids: 2                                        |
|   |                                                      |
|   5:EXCHANGE                                             |
|      cardinality: 1                                      |
|      per-host memory: unavailable                        |
|      tuple ids: 2                                        |
|                                                          |
| PLAN FRAGMENT 1                                          |
|   PARTITION: RANDOM                                      |
|                                                          |
|   STREAM DATA SINK                                       |
|     EXCHANGE ID: 5                                       |
|     UNPARTITIONED                                        |
|                                                          |
|   3:AGGREGATE                                            |
|   |  output: COUNT(*)                                    |
|   |  cardinality: 1                                      |
|   |  per-host memory: 10.00MB                            |
|   |  tuple ids: 2                                        |
|   |                                                      |
|   2:HASH JOIN                                            |
|   |  join op: INNER JOIN (BROADCAST)                     |
|   |  hash predicates:                                    |
|   |    big.id = medium.id                                |
|   |  cardinality: unavailable                            |
|   |  per-host memory: 2.00GB                             |
|   |  tuple ids: 0 1                                      |
|   |                                                      |
|   |----4:EXCHANGE                                        |
|   |       cardinality: unavailable                       |
|   |       per-host memory: 0B                            |
|   |       tuple ids: 1                                   |
|   |                                                      |
|   0:SCAN HDFS                                            |
|      table=join_order.big #partitions=1/1 size=23.12GB   |
|      table stats: unavailable                            |
|      column stats: unavailable                           |
|      cardinality: unavailable                            |
|      per-host memory: 88.00MB                            |
|      tuple ids: 0                                        |
|                                                          |
| PLAN FRAGMENT 2                                          |
|   PARTITION: RANDOM                                      |
|                                                          |
|   STREAM DATA SINK                                       |
|     EXCHANGE ID: 4                                       |
|     UNPARTITIONED                                        |
|                                                          |
|   1:SCAN HDFS                                            |
|      table=join_order.medium #partitions=1/1 size=4.62GB |
|      table stats: unavailable                            |
|      column stats: unavailable                           |
|      cardinality: unavailable                            |
|      per-host memory: 88.00MB                            |
|      tuple ids: 1                                        |
+----------------------------------------------------------+
Returned 64 row(s) in 0.04s

Gathering statistics for all the tables is straightforward, one COMPUTE STATS statement per table:

[localhost:21000] > compute stats small;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 3 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 4.26s
[localhost:21000] > compute stats medium;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 5 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 42.11s
[localhost:21000] > compute stats big;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 1 partition(s) and 5 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 165.44s

With statistics in place, Impala can choose a more effective join order rather than following the left-to-right sequence of tables in the query, and can choose BROADCAST or PARTITIONED join strategies based on the overall sizes and number of rows in each table:

[localhost:21000] > explain select count(*) from medium join big where big.id = medium.id;
Query: explain select count(*) from medium join big where big.id = medium.id
+-----------------------------------------------------------+
| Explain String                                            |
+-----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=937.23MB VCores=2 |
|                                                           |
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   6:AGGREGATE (merge finalize)                            |
|   |  output: SUM(COUNT(*))                                |
|   |  cardinality: 1                                       |
|   |  per-host memory: unavailable                         |
|   |  tuple ids: 2                                         |
|   |                                                       |
|   5:EXCHANGE                                              |
|      cardinality: 1                                       |
|      per-host memory: unavailable                         |
|      tuple ids: 2                                         |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 5                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE                                             |
|   |  output: COUNT(*)                                     |
|   |  cardinality: 1                                       |
|   |  per-host memory: 10.00MB                             |
|   |  tuple ids: 2                                         |
|   |                                                       |
|   2:HASH JOIN                                             |
|   |  join op: INNER JOIN (BROADCAST)                      |
|   |  hash predicates:                                     |
|   |    big.id = medium.id                                 |
|   |  cardinality: 1443004441                              |
|   |  per-host memory: 839.23MB                            |
|   |  tuple ids: 1 0                                       |
|   |                                                       |
|   |----4:EXCHANGE                                         |
|   |       cardinality: 200000000                          |
|   |       per-host memory: 0B                             |
|   |       tuple ids: 0                                    |
|   |                                                       |
|   1:SCAN HDFS                                             |
|      table=join_order.big #partitions=1/1 size=23.12GB    |
|      table stats: 1000000000 rows total                   |
|      column stats: all                                    |
|      cardinality: 1000000000                              |
|      per-host memory: 88.00MB                             |
|      tuple ids: 1                                         |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   0:SCAN HDFS                                             |
|      table=join_order.medium #partitions=1/1 size=4.62GB  |
|      table stats: 200000000 rows total                    |
|      column stats: all                                    |
|      cardinality: 200000000                               |
|      per-host memory: 88.00MB                             |
|      tuple ids: 0                                         |
+-----------------------------------------------------------+
Returned 64 row(s) in 0.04s

[localhost:21000] > explain select count(*) from small join big where big.id = small.id;
Query: explain select count(*) from small join big where big.id = small.id
+-----------------------------------------------------------+
| Explain String                                            |
+-----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=101.15MB VCores=2 |
|                                                           |
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   6:AGGREGATE (merge finalize)                            |
|   |  output: SUM(COUNT(*))                                |
|   |  cardinality: 1                                       |
|   |  per-host memory: unavailable                         |
|   |  tuple ids: 2                                         |
|   |                                                       |
|   5:EXCHANGE                                              |
|      cardinality: 1                                       |
|      per-host memory: unavailable                         |
|      tuple ids: 2                                         |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 5                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE                                             |
|   |  output: COUNT(*)                                     |
|   |  cardinality: 1                                       |
|   |  per-host memory: 10.00MB                             |
|   |  tuple ids: 2                                         |
|   |                                                       |
|   2:HASH JOIN                                             |
|   |  join op: INNER JOIN (BROADCAST)                      |
|   |  hash predicates:                                     |
|   |    big.id = small.id                                  |
|   |  cardinality: 1000000000                              |
|   |  per-host memory: 3.15MB                              |
|   |  tuple ids: 1 0                                       |
|   |                                                       |
|   |----4:EXCHANGE                                         |
|   |       cardinality: 1000000                            |
|   |       per-host memory: 0B                             |
|   |       tuple ids: 0                                    |
|   |                                                       |
|   1:SCAN HDFS                                             |
|      table=join_order.big #partitions=1/1 size=23.12GB    |
|      table stats: 1000000000 rows total                   |
|      column stats: all                                    |
|      cardinality: 1000000000                              |
|      per-host memory: 88.00MB                             |
|      tuple ids: 1                                         |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   0:SCAN HDFS                                             |
|      table=join_order.small #partitions=1/1 size=17.93MB  |
|      table stats: 1000000 rows total                      |
|      column stats: all                                    |
|      cardinality: 1000000                                 |
|      per-host memory: 32.00MB                             |
|      tuple ids: 0                                         |
+-----------------------------------------------------------+
Returned 64 row(s) in 0.03s

When queries like these are actually run, the execution times are relatively consistent regardless of the table order in the query text. Here are examples using both the unique ID column and the VAL column containing duplicate values:

[localhost:21000] > select count(*) from big join small on (big.id = small.id);
Query: select count(*) from big join small on (big.id = small.id)
+----------+
| count(*) |
+----------+
| 1000000  |
+----------+
Returned 1 row(s) in 21.68s
[localhost:21000] > select count(*) from small join big on (big.id = small.id);
Query: select count(*) from small join big on (big.id = small.id)
+----------+
| count(*) |
+----------+
| 1000000  |
+----------+
Returned 1 row(s) in 20.45s

[localhost:21000] > select count(*) from big join small on (big.val = small.val);
+------------+
| count(*)   |
+------------+
| 2000948962 |
+------------+
Returned 1 row(s) in 108.85s
[localhost:21000] > select count(*) from small join big on (big.val = small.val);
+------------+
| count(*)   |
+------------+
| 2000948962 |
+------------+
Returned 1 row(s) in 100.76s
  Note: When examining the performance of join queries and the effectiveness of the join order optimization, make sure the query involves enough data and cluster resources to see a difference depending on the query plan. For example, a single data file of just a few megabytes will reside in a single HDFS block and be processed on a single node. Likewise, if you use a single-node or two-node cluster, there might not be much difference in efficiency for the broadcast or partitioned join strategies.

How Impala Uses Statistics for Query Optimization

Impala can do better optimization for complex or multi-table queries when statistics are available, to better understand the volume of data and how the values are distributed, and use this information to help parallelize and distribute the work for a query. The following sections describe the categories of statistics Impala can work with, and how to produce them and keep them up to date.

Originally, Impala relied on the Hive mechanism for collecting statistics, through the Hive ANALYZE TABLE statement which initiates a MapReduce job. For better user-friendliness and reliability, Impala implements its own COMPUTE STATS statement in Impala 1.2.2 and higher, along with the SHOW TABLE STATS and SHOW COLUMN STATS statements.

Table Statistics

The Impala query planner can make use of statistics about entire tables and partitions when that metadata is available in the metastore database. This metadata is used on its own for certain optimizations, and used in combination with column statistics for other optimizations.

To gather table statistics after loading data into a table or partition, use one of the following techniques:

  • Issue the statement COMPUTE STATS in Impala. This statement, new in Impala 1.2.2, is the preferred method because:
    • It gathers table statistics and statistics for all partitions and columns in a single operation.
    • It does not rely on any special Hive settings, metastore configuration, or separate database to hold the statistics.
    • If you need to adjust statistics incrementally for an existing table, such as after adding a partition or inserting new data, you can use an ALTER TABLE statement such as:
      alter table analysis_data set tblproperties('numRows'='new_value');
      to update that one property rather than re-processing the whole table.
  • Load the data through the INSERT OVERWRITE statement in Hive, while the Hive setting hive.stats.autogather is enabled. (A sketch of this technique appears after this list.)
  • Issue an ANALYZE TABLE statement in Hive, for the entire table or a specific partition.
    ANALYZE TABLE tablename [PARTITION(partcol1[=val1], partcol2[=val2], ...)] COMPUTE STATISTICS [NOSCAN];
    For example, to gather statistics for a non-partitioned table:
    ANALYZE TABLE customer COMPUTE STATISTICS;
    To gather statistics for the store table, which is partitioned by state and county, across all of its partitions:
    ANALYZE TABLE store PARTITION(s_state, s_county) COMPUTE STATISTICS;
    To gather statistics for the store table and only the partitions for California:
    ANALYZE TABLE store PARTITION(s_state='CA', s_county) COMPUTE STATISTICS;
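
As a sketch of the Hive-based INSERT OVERWRITE technique from the list above (run in the Hive shell, not impala-shell; the table names are hypothetical, and hive.stats.autogather is normally enabled by default in Hive):

SET hive.stats.autogather=true;
INSERT OVERWRITE TABLE analysis_data SELECT * FROM raw_data;
-- Hive records table statistics for analysis_data as a side effect
-- of the INSERT OVERWRITE operation.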

To check that table statistics are available for a table, and see the details of those statistics, use the statement SHOW TABLE STATS table_name. See SHOW Statement for details.

If you use the Hive-based methods of gathering statistics, see the Hive wiki for information about the required configuration on the Hive side. Cloudera recommends using the Impala COMPUTE STATS statement to avoid potential configuration and scalability issues with the statistics-gathering process.

Column Statistics

The Impala query planner can make use of statistics about individual columns when that metadata is available in the metastore database. This technique is most valuable for columns compared across tables in join queries, to help estimate how many rows the query will retrieve from each table. In Impala 1.2.2 and higher, the COMPUTE STATS statement gathers these statistics; alternatively, use the ANALYZE TABLE statement in the Hive shell. (The Hive statement works whether you create the table in Impala or in Hive.)
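
For reference, here is a sketch of the Hive column-statistics syntax (run in the Hive shell; the table and column names are hypothetical, and you must list each column explicitly):

ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_custkey, c_name;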

  Note: For column statistics to be effective in Impala, you also need to have table statistics for the applicable tables, as described in Table Statistics. If you use the Impala COMPUTE STATS statement, both table and column statistics are automatically gathered at the same time, for all columns in the table.

To check whether column statistics are available for a particular set of columns, use the SHOW COLUMN STATS table_name statement, or check the extended EXPLAIN output for a query against that table that refers to those columns. See SHOW Statement and EXPLAIN Statement for details.

Setting Statistics Manually through ALTER TABLE

The most crucial piece of data in all the statistics is the number of rows in the table (for an unpartitioned table) or for each partition (for a partitioned table). The COMPUTE STATS statement always gathers statistics about all columns, as well as overall table statistics. If it is not practical to do an entire COMPUTE STATS operation after adding a partition or inserting data, or if you can see that Impala would produce a more efficient plan if the number of rows was different, you can manually set the number of rows through an ALTER TABLE statement:

create table analysis_data stored as parquet as select * from raw_data;
Inserted 1000000000 rows in 181.98s
compute stats analysis_data;
insert into analysis_data select * from smaller_table_we_forgot_before;
Inserted 1000000 rows in 15.32s
-- Now there are 1001000000 rows. We can update this single data point in the stats.
alter table analysis_data set tblproperties('numRows'='1001000000');

For a partitioned table, update both the per-partition number of rows and the number of rows for the whole table:

-- If the table originally contained 1000000 rows, and we add another partition,
-- change the numRows property for the partition and the overall table.
alter table partitioned_data partition(year=2009, month=4) set tblproperties ('numRows'='30000');
alter table partitioned_data set tblproperties ('numRows'='1030000');

In practice, the COMPUTE STATS statement should be fast enough that this technique is not needed. It is most useful as a workaround for performance issues, where you might adjust the numRows value higher or lower to produce the ideal join order.

Examples of Using Table and Column Statistics with Impala

The following examples walk through a sequence of SHOW TABLE STATS, SHOW COLUMN STATS, ALTER TABLE, and SELECT and INSERT statements to illustrate various aspects of how Impala uses statistics to help optimize queries.

This example shows table and column statistics for the STORE table used in the TPC-DS benchmarks for decision support systems. It is a tiny table holding data for 12 stores. Initially, before any statistics are gathered by a COMPUTE STATS statement, most of the numeric fields show placeholder values of -1, indicating that the figures are unknown. The figures that are filled in are values that are easily countable or deducible at the physical level, such as the number of files, total data size of the files, and the maximum and average sizes for data types that have a constant size such as INT, FLOAT, and TIMESTAMP.

[localhost:21000] > show table stats store;
+-------+--------+--------+--------+
| #Rows | #Files | Size   | Format |
+-------+--------+--------+--------+
| -1    | 1      | 3.08KB | TEXT   |
+-------+--------+--------+--------+
Returned 1 row(s) in 0.03s
[localhost:21000] > show column stats store;
+--------------------+-----------+------------------+--------+----------+----------+
| Column             | Type      | #Distinct Values | #Nulls | Max Size | Avg Size |
+--------------------+-----------+------------------+--------+----------+----------+
| s_store_sk         | INT       | -1               | -1     | 4        | 4        |
| s_store_id         | STRING    | -1               | -1     | -1       | -1       |
| s_rec_start_date   | TIMESTAMP | -1               | -1     | 16       | 16       |
| s_rec_end_date     | TIMESTAMP | -1               | -1     | 16       | 16       |
| s_closed_date_sk   | INT       | -1               | -1     | 4        | 4        |
| s_store_name       | STRING    | -1               | -1     | -1       | -1       |
| s_number_employees | INT       | -1               | -1     | 4        | 4        |
| s_floor_space      | INT       | -1               | -1     | 4        | 4        |
| s_hours            | STRING    | -1               | -1     | -1       | -1       |
| s_manager          | STRING    | -1               | -1     | -1       | -1       |
| s_market_id        | INT       | -1               | -1     | 4        | 4        |
| s_geography_class  | STRING    | -1               | -1     | -1       | -1       |
| s_market_desc      | STRING    | -1               | -1     | -1       | -1       |
| s_market_manager   | STRING    | -1               | -1     | -1       | -1       |
| s_division_id      | INT       | -1               | -1     | 4        | 4        |
| s_division_name    | STRING    | -1               | -1     | -1       | -1       |
| s_company_id       | INT       | -1               | -1     | 4        | 4        |
| s_company_name     | STRING    | -1               | -1     | -1       | -1       |
| s_street_number    | STRING    | -1               | -1     | -1       | -1       |
| s_street_name      | STRING    | -1               | -1     | -1       | -1       |
| s_street_type      | STRING    | -1               | -1     | -1       | -1       |
| s_suite_number     | STRING    | -1               | -1     | -1       | -1       |
| s_city             | STRING    | -1               | -1     | -1       | -1       |
| s_county           | STRING    | -1               | -1     | -1       | -1       |
| s_state            | STRING    | -1               | -1     | -1       | -1       |
| s_zip              | STRING    | -1               | -1     | -1       | -1       |
| s_country          | STRING    | -1               | -1     | -1       | -1       |
| s_gmt_offset       | FLOAT     | -1               | -1     | 4        | 4        |
| s_tax_precentage   | FLOAT     | -1               | -1     | 4        | 4        |
+--------------------+-----------+------------------+--------+----------+----------+
Returned 29 row(s) in 0.04s

With the Hive ANALYZE TABLE statement for column statistics, you had to specify each column for which to gather statistics. The Impala COMPUTE STATS statement automatically gathers statistics for all columns, because it reads through the entire table relatively quickly and can efficiently compute the values for all the columns. This example shows how after running the COMPUTE STATS statement, statistics are filled in for both the table and all its columns:

[localhost:21000] > compute stats store;
+------------------------------------------+
| summary                                  |
+------------------------------------------+
| Updated 1 partition(s) and 29 column(s). |
+------------------------------------------+
Returned 1 row(s) in 1.88s
[localhost:21000] > show table stats store;
+-------+--------+--------+--------+
| #Rows | #Files | Size   | Format |
+-------+--------+--------+--------+
| 12    | 1      | 3.08KB | TEXT   |
+-------+--------+--------+--------+
Returned 1 row(s) in 0.02s
[localhost:21000] > show column stats store;
+--------------------+-----------+------------------+--------+----------+-------------------+
| Column             | Type      | #Distinct Values | #Nulls | Max Size | Avg Size          |
+--------------------+-----------+------------------+--------+----------+-------------------+
| s_store_sk         | INT       | 12               | 0      | 4        | 4                 |
| s_store_id         | STRING    | 6                | 0      | 16       | 16                |
| s_rec_start_date   | TIMESTAMP | 4                | 0      | 16       | 16                |
| s_rec_end_date     | TIMESTAMP | 3                | 6      | 16       | 16                |
| s_closed_date_sk   | INT       | 3                | 9      | 4        | 4                 |
| s_store_name       | STRING    | 8                | 0      | 5        | 4.25              |
| s_number_employees | INT       | 9                | 0      | 4        | 4                 |
| s_floor_space      | INT       | 10               | 0      | 4        | 4                 |
| s_hours            | STRING    | 2                | 0      | 8        | 7.083300113677979 |
| s_manager          | STRING    | 7                | 0      | 15       | 12                |
| s_market_id        | INT       | 7                | 0      | 4        | 4                 |
| s_geography_class  | STRING    | 1                | 0      | 7        | 7                 |
| s_market_desc      | STRING    | 10               | 0      | 94       | 55.5              |
| s_market_manager   | STRING    | 7                | 0      | 16       | 14                |
| s_division_id      | INT       | 1                | 0      | 4        | 4                 |
| s_division_name    | STRING    | 1                | 0      | 7        | 7                 |
| s_company_id       | INT       | 1                | 0      | 4        | 4                 |
| s_company_name     | STRING    | 1                | 0      | 7        | 7                 |
| s_street_number    | STRING    | 9                | 0      | 3        | 2.833300113677979 |
| s_street_name      | STRING    | 12               | 0      | 11       | 6.583300113677979 |
| s_street_type      | STRING    | 8                | 0      | 9        | 4.833300113677979 |
| s_suite_number     | STRING    | 11               | 0      | 9        | 8.25              |
| s_city             | STRING    | 2                | 0      | 8        | 6.5               |
| s_county           | STRING    | 1                | 0      | 17       | 17                |
| s_state            | STRING    | 1                | 0      | 2        | 2                 |
| s_zip              | STRING    | 2                | 0      | 5        | 5                 |
| s_country          | STRING    | 1                | 0      | 13       | 13                |
| s_gmt_offset       | FLOAT     | 1                | 0      | 4        | 4                 |
| s_tax_precentage   | FLOAT     | 5                | 0      | 4        | 4                 |
+--------------------+-----------+------------------+--------+----------+-------------------+
Returned 29 row(s) in 0.04s

The following example shows how statistics are represented for a partitioned table. In this case, we have set up a table to hold the world's most trivial census data, a single STRING field, partitioned by a YEAR column. The table statistics include a separate entry for each partition, plus final totals for the numeric fields. The column statistics include some easily deducible facts for the partitioning column, such as the number of distinct values (the number of partition subdirectories) and the number of NULL values (none in this case).

[localhost:21000] > describe census;
+------+----------+---------+
| name | type     | comment |
+------+----------+---------+
| name | string   |         |
| year | smallint |         |
+------+----------+---------+
Returned 2 row(s) in 0.02s
[localhost:21000] > show table stats census;
+-------+-------+--------+------+---------+
| year  | #Rows | #Files | Size | Format  |
+-------+-------+--------+------+---------+
| 2000  | -1    | 0      | 0B   | TEXT    |
| 2004  | -1    | 0      | 0B   | TEXT    |
| 2008  | -1    | 0      | 0B   | TEXT    |
| 2010  | -1    | 0      | 0B   | TEXT    |
| 2011  | 0     | 1      | 22B  | TEXT    |
| 2012  | -1    | 1      | 22B  | TEXT    |
| 2013  | -1    | 1      | 231B | PARQUET |
| Total | 0     | 3      | 275B |         |
+-------+-------+--------+------+---------+
Returned 8 row(s) in 0.02s
[localhost:21000] > show column stats census;
+--------+----------+------------------+--------+----------+----------+
| Column | Type     | #Distinct Values | #Nulls | Max Size | Avg Size |
+--------+----------+------------------+--------+----------+----------+
| name   | STRING   | -1               | -1     | -1       | -1       |
| year   | SMALLINT | 7                | 0      | 2        | 2        |
+--------+----------+------------------+--------+----------+----------+
Returned 2 row(s) in 0.02s

The following example shows how the statistics are filled in by a COMPUTE STATS statement in Impala.

[localhost:21000] > compute stats census;
+-----------------------------------------+
| summary                                 |
+-----------------------------------------+
| Updated 3 partition(s) and 1 column(s). |
+-----------------------------------------+
Returned 1 row(s) in 2.16s
[localhost:21000] > show table stats census;
+-------+-------+--------+------+---------+
| year  | #Rows | #Files | Size | Format  |
+-------+-------+--------+------+---------+
| 2000  | -1    | 0      | 0B   | TEXT    |
| 2004  | -1    | 0      | 0B   | TEXT    |
| 2008  | -1    | 0      | 0B   | TEXT    |
| 2010  | -1    | 0      | 0B   | TEXT    |
| 2011  | 4     | 1      | 22B  | TEXT    |
| 2012  | 4     | 1      | 22B  | TEXT    |
| 2013  | 1     | 1      | 231B | PARQUET |
| Total | 9     | 3      | 275B |         |
+-------+-------+--------+------+---------+
Returned 8 row(s) in 0.02s
[localhost:21000] > show column stats census;
+--------+----------+------------------+--------+----------+----------+
| Column | Type     | #Distinct Values | #Nulls | Max Size | Avg Size |
+--------+----------+------------------+--------+----------+----------+
| name   | STRING   | 4                | 1      | 5        | 4.5      |
| year   | SMALLINT | 7                | 0      | 2        | 2        |
+--------+----------+------------------+--------+----------+----------+
Returned 2 row(s) in 0.02s

For examples showing how some queries work differently when statistics are available, see Examples of Join Order Optimization. You can see how Impala executes a query differently in each case by observing the EXPLAIN output before and after collecting statistics. Measure the before and after query times, and examine the throughput numbers in before and after PROFILE output, to verify how much the improved plan speeds up performance.

Benchmarking Impala Queries

Because Impala, like other Hadoop components, is designed to handle large data volumes in a distributed environment, conduct any performance tests using realistic data and cluster configurations. Use a multi-node cluster rather than a single node; run queries against tables containing terabytes of data rather than tens of gigabytes. The parallel processing techniques used by Impala are most appropriate for workloads that are beyond the capacity of a single server.

When you run queries returning large numbers of rows, the CPU time to pretty-print the output can be substantial, giving an inaccurate measurement of the actual query time. Consider using the -B option on the impala-shell command to turn off the pretty-printing, and optionally the -o option to store query results in a file rather than printing to the screen. See impala-shell Command-Line Options for details.
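
For example, a benchmarking run from the operating system shell might look like the following (the query and output file are hypothetical; -q passes a query string, -B disables pretty-printing, and -o redirects the results to a file):

impala-shell -B --output_delimiter=',' -o /tmp/benchmark_results.csv \
  -q 'select c1, c2, c3 from benchmark_table where c1 > 100'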

Controlling Resource Usage

Sometimes, balancing raw query performance against scalability requires limiting the amount of resources, such as memory or CPU, used by a single query or group of queries. Impala can use several mechanisms that help to smooth out the load during heavy concurrent usage, resulting in faster overall query times and sharing of resources across Impala queries, MapReduce jobs, and other kinds of workloads across a CDH cluster:

  • The Impala admission control feature uses a fast, distributed mechanism to hold back queries that exceed limits on the number of concurrent queries or the amount of memory used. The queries are queued, and executed as other queries finish and resources become available. You can control the concurrency limits, and specify different limits for different groups of users to divide cluster resources according to the priorities of different classes of users. This feature is new in Impala 1.3, and works with both CDH 4 and CDH 5. See Admission Control and Query Queuing for details.
  • You can restrict the amount of memory Impala reserves during query execution by specifying the -mem_limit option for the impalad daemon. See Modifying Impala Startup Options for details. This limit applies only to the memory that is directly consumed by queries; Impala reserves additional memory at startup, for example to hold cached metadata. (A sketch of setting this option appears after this list.)

  • For production deployment, Cloudera recommends that you implement resource isolation using mechanisms such as cgroups, which you can configure using Cloudera Manager. For details, see Managing Clusters with Cloudera Manager.

  • When you use Impala in combination with CDH 5, you can use the YARN resource management framework in combination with the Llama service, as explained in Using YARN Resource Management with Impala (CDH 5 Only).
      Warning: In CDH 5.0.0, the Llama component is in beta. It is intended for evaluation of resource management in test environments, in combination with Impala and YARN. It is currently not recommended for production deployment.
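
For example, on systems where the Impala daemons are started through the /etc/default/impala script, adding the -mem_limit option might look like the following sketch (the existing arguments and the 70% figure are assumptions; adjust them for your cluster, and restart the impalad daemons afterward):

IMPALA_SERVER_ARGS=" \
    -log_dir=${IMPALA_LOG_DIR} \
    -state_store_port=${IMPALA_STATE_STORE_PORT} \
    -state_store_host=${IMPALA_STATE_STORE_HOST} \
    -be_port=${IMPALA_BACKEND_PORT} \
    -mem_limit=70%"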

Testing Impala Performance

Test to ensure that Impala is configured for optimal performance. If you have installed Impala without Cloudera Manager, complete the processes described in this topic to help ensure a proper configuration. Even if you installed Impala with Cloudera Manager, which automatically applies appropriate configurations, these procedures can be used to verify that Impala is set up correctly.

Checking Impala Configuration Values

You can inspect Impala configuration values by connecting to your Impala server using a browser.

To check Impala configuration values:

  1. Use a browser to connect to one of the hosts running impalad in your environment. Connect using an address of the form http://hostname:port/varz.
      Note: In the preceding example, replace hostname and port with the name and port of your Impala server. The default port is 25000.
  2. Review the configured values.

    For example, to check that your system is configured to use block locality tracking information, you would check that the value for dfs.datanode.hdfs-blocks-metadata.enabled is true.
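
For example, from an operating system shell you could check that one setting directly (replace hostname with an actual impalad host; 25000 is the default web server port):

curl -s http://hostname:25000/varz | grep dfs.datanode.hdfs-blocks-metadata.enabled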

To check data locality:

  1. Execute a query on a dataset that is available across multiple nodes. For example, for a table named MyTable that has a reasonable chance of being spread across multiple DataNodes:
    [impalad-host:21000] > SELECT COUNT(*) FROM MyTable;
  2. After the query completes, review the contents of the Impala logs. You should find a recent message similar to the following:
    Total remote scan volume = 0

The presence of remote scans may indicate that impalad is not running on the correct nodes. This can happen because some DataNodes do not have impalad running, or because the impalad instance initiating the query is unable to contact one or more of the other impalad instances.

To understand the causes of this issue:

  1. Connect to the debugging web server. By default, this server runs on port 25000. This page lists all impalad instances running in your cluster. If there are fewer instances than you expect, this often indicates some DataNodes are not running impalad. Ensure impalad is started on all DataNodes.
  2. If you are using multi-homed hosts, ensure that the Impala daemon's hostname resolves to the interface on which impalad is running. The hostname Impala is using is displayed when impalad starts. If you need to explicitly set the hostname, use the --hostname flag.
  3. Check that statestored is running as expected. Review the contents of the state store log to ensure all instances of impalad are listed as having connected to the state store.

Reviewing Impala Logs

You can review the contents of the Impala logs for signs that short-circuit reads or block location tracking are not functioning. Before checking logs, execute a simple query against a small HDFS dataset. Completing a query task generates log messages using current settings. Information on starting Impala and executing queries can be found in Starting Impala and Using the Impala Shell (impala-shell Command). Information on logging can be found in Using Impala Logging. Log messages and their interpretations are as follows:

  • Log message: "Unknown disk id. This will negatively affect performance. Check your hdfs settings to enable block location metadata"
    Interpretation: Tracking block locality is not enabled.
  • Log message: "Unable to load native-hadoop library for your platform... using builtin-java classes where applicable"
    Interpretation: Native checksumming is not enabled.
