Runtime Filtering for Impala Queries (CDH 5.7 or higher only)
Runtime filtering is a wide-ranging optimization feature available in CDH 5.7 / Impala 2.5 and higher. When a query needs only a fraction of the data in a partitioned table, or only a fraction of the rows to evaluate a join condition, Impala determines the appropriate conditions while the query is running and broadcasts that information to all the impalad nodes that are reading the table. Those nodes can then avoid unnecessary I/O by skipping partition data, and avoid unnecessary network transmission by sending only the subset of rows that match the join keys across the network.
This feature is primarily used to optimize queries against large partitioned tables (under the name dynamic partition pruning) and joins of large tables. The information in this section includes concepts, internals, and troubleshooting information for the entire runtime filtering feature. For specific tuning steps for partitioned tables, see Dynamic Partition Pruning.
- Background Information for Runtime Filtering
- Runtime Filtering Internals
- File Format Considerations for Runtime Filtering
- Wait Intervals for Runtime Filters
- Query Options for Runtime Filtering
- Runtime Filtering and Query Plans
- Examples of Queries that Benefit from Runtime Filtering
- Tuning and Troubleshooting Queries that Use Runtime Filtering
- Limitations and Restrictions for Runtime Filtering
Background Information for Runtime Filtering
To understand how runtime filtering works at a detailed level, you must be familiar with some terminology from the field of distributed database technology:
What a plan fragment is. Impala decomposes each query into smaller units of work that are distributed across the cluster. Wherever possible, a data block is read, filtered, and aggregated by plan fragments executing on the same host. For some operations, such as joins and combining intermediate results into a final result set, data is transmitted across the network from one DataNode to another.
What SCAN and HASH JOIN plan nodes are, and their role in computing query results:
In the Impala query plan, a scan node performs the I/O to read from the underlying data files. Although this is an expensive operation from the traditional database perspective, Hadoop clusters and Impala are optimized to do this kind of I/O in a highly parallel fashion. The major potential cost savings come from using the columnar Parquet format (where Impala can avoid reading data for unneeded columns) and partitioned tables (where Impala can avoid reading data for unneeded partitions).
Most Impala joins use the hash join mechanism. (It is only fairly recently that Impala started using the nested-loop join technique, for certain kinds of non-equijoin queries.) In a hash join, when evaluating join conditions from two tables, Impala constructs a hash table in memory with all the different column values from the table on one side of the join. Then, for each row from the table on the other side of the join, Impala tests whether the relevant column values are in this hash table or not.
A hash join node constructs such an in-memory hash table, then performs the comparisons to identify which rows match the relevant join conditions and should be included in the result set (or at least sent on to the subsequent intermediate stage of query processing). Because some of the input for a hash join might be transmitted across the network from another host, it is especially important from a performance perspective to prune out ahead of time any data that is known to be irrelevant.
The more distinct values there are in the columns used as join keys, the larger the in-memory hash table and thus the more memory required to process the query.
The difference between a broadcast join and a shuffle join. (The Hadoop notion of a shuffle join is sometimes referred to in Impala as a partitioned join.) In a broadcast join, the table from one side of the join (typically the smaller table) is sent in its entirety to all the hosts involved in the query. Then each host can compare its portion of the data from the other (larger) table against the full set of possible join keys. In a shuffle join, there is no obvious "smaller" table, and so the contents of both tables are divided up, and corresponding portions of the data are transmitted to each host involved in the query. See Query Hints in Impala SELECT Statements for information about how these different kinds of joins are processed.
The notion of the build phase and probe phase when Impala processes a join query. The build phase is where the rows containing the join key columns, typically for the smaller table, are transmitted across the network and built into an in-memory hash table data structure on one or more destination nodes. The probe phase is where data is read locally (typically from the larger table) and the join key columns are compared to the values in the in-memory hash table. The corresponding input sources (tables, subqueries, and so on) for these phases are referred to as the build side and the probe side.
How to set Impala query options: interactively within an impala-shell session through the SET command, for a JDBC or ODBC application through the SET statement, or globally for all impalad daemons through the default_query_options configuration setting.
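The build and probe phases described above can be illustrated with a minimal single-process sketch. This is not Impala's implementation; the function name `hash_join` and the sample rows are hypothetical, and the table names are borrowed from the examples later in this section only for readability.

```python
from collections import defaultdict


def hash_join(build_rows, probe_rows, build_key, probe_key):
    """Equi-join two lists of dicts using a build phase and a probe phase."""
    # Build phase: index the (typically smaller) input by its join key.
    hash_table = defaultdict(list)
    for row in build_rows:
        hash_table[row[build_key]].append(row)

    # Probe phase: stream the (typically larger) input and look up each key.
    joined = []
    for row in probe_rows:
        for match in hash_table.get(row[probe_key], []):
            joined.append({**row, **match})
    return joined


# Hypothetical sample data: a small build side and a larger probe side.
tiny_t2 = [{"id": 2, "label": "b"}, {"id": 4, "label": "d"}]
huge_t1 = [{"id": i, "c1": i * 10} for i in range(1, 6)]

result = hash_join(tiny_t2, huge_t1, "id", "id")
print(result)
```

In this sketch every probe row is examined even though only two of them can match. Runtime filtering targets exactly that wasted work by discarding non-matching probe rows earlier, at the scan.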
Runtime Filtering Internals
The filter that is transmitted between plan fragments is essentially a list of values for join key columns. When this list of values is transmitted in time to a scan node, Impala can filter out non-matching values immediately after reading them, rather than transmitting the raw data to another host to compare against the in-memory hash table on that host. This data structure is implemented as a Bloom filter, which uses a probability-based algorithm to determine all possible matching values. (The probability-based aspect means that the filter might include some non-matching values, but if so, that does not cause any inaccuracy in the final results, because the join itself still evaluates those rows.)
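As a rough illustration of why a Bloom filter can admit extra values but never drops a matching one, here is a minimal sketch. The class `BloomFilter`, its default sizes, and the SHA-256 based hashing are assumptions chosen for brevity and do not reflect Impala's actual filter layout or hash functions.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter: false positives are possible, false negatives are not."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, value):
        # Derive num_hashes bit positions from seeded SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value!r}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        # True if every bit for this value is set; may be True by coincidence.
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


join_keys = [2, 4]             # hypothetical values from the build side
scanned_ids = [1, 2, 3, 4, 5]  # hypothetical values read by a scan node

bloom = BloomFilter()
for key in join_keys:
    bloom.add(key)

# The scan keeps only values that might match; 2 and 4 always survive.
kept_ids = [i for i in scanned_ids if bloom.might_contain(i)]
print(kept_ids)
```

Any value that slips through as a false positive is still checked against the hash table during the join, which is why the approximation affects only efficiency and not correctness.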
There are different kinds of filters to match the different kinds of joins (partitioned and broadcast). A broadcast filter is a complete list of relevant values that can be immediately evaluated by a scan node. A partitioned filter is a partial list of relevant values (based on the data processed by one host in the cluster); all the partitioned filters must be combined into one (by the coordinator node) before the scan nodes can use the results to accurately filter the data as it is read from storage.
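The need to combine partitioned filters can be sketched as follows. Bloom filters built with the same size and hash functions can be merged with a bitwise OR; whether Impala's coordinator merges them in exactly this way is an assumption of the sketch, and the host names, `NUM_BITS`, and helper functions are hypothetical.

```python
import hashlib

NUM_BITS = 256  # hypothetical filter size for this sketch


def bit_positions(value, num_hashes=2):
    """Bit positions for a value, derived from seeded SHA-256 digests."""
    return [
        int.from_bytes(hashlib.sha256(f"{seed}:{value!r}".encode()).digest()[:8], "big")
        % NUM_BITS
        for seed in range(num_hashes)
    ]


def build_partial_filter(join_keys):
    """Filter built by one host from only the build-side rows it processed."""
    bits = 0
    for key in join_keys:
        for pos in bit_positions(key):
            bits |= 1 << pos
    return bits


def merge_filters(partial_filters):
    """Combine partial filters into one complete filter with bitwise OR."""
    merged = 0
    for bits in partial_filters:
        merged |= bits
    return merged


def might_contain(bits, value):
    return all((bits >> pos) & 1 for pos in bit_positions(value))


# Each host sees only a slice of the build-side join keys.
host_keys = {"host_a": [1, 2], "host_b": [3], "host_c": [4, 5]}
partials = {host: build_partial_filter(keys) for host, keys in host_keys.items()}
merged = merge_filters(partials.values())

print(all(might_contain(merged, key) for key in range(1, 6)))
```

A scan node that applied only one host's partial filter could discard rows whose keys were processed on a different host, which is why the partial filters have to be merged before they are used.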
Broadcast filters are also classified as local or global. With a local broadcast filter, the information in the filter is used by a subsequent query fragment that is running on the same host that produced the filter. A non-local broadcast filter must be transmitted across the network to a query fragment that is running on a different host. Impala designates 3 hosts to each produce non-local broadcast filters, to guard against the possibility of a single slow host taking too long. Depending on the setting of the RUNTIME_FILTER_MODE query option (LOCAL or GLOBAL), Impala either uses a conservative optimization strategy where filters are only consumed on the same host that produced them, or a more aggressive strategy where filters are eligible to be transmitted across the network.
File Format Considerations for Runtime Filtering
Parquet tables get the most benefit from the runtime filtering optimizations. Runtime filtering can speed up join queries against partitioned or unpartitioned Parquet tables, and single-table queries against partitioned Parquet tables. See Using the Parquet File Format with Impala Tables for information about using Parquet tables with Impala.
For other file formats (text, Avro, RCFile, and SequenceFile), runtime filtering speeds up queries against partitioned tables only. Because partitioned tables can use a mixture of formats, Impala produces the filters in all cases, even if they are not ultimately used to optimize the query.
Wait Intervals for Runtime Filters
Because it takes time to produce runtime filters, especially for partitioned filters that must be combined by the coordinator node, there is a time interval above which it is more efficient for the scan nodes to go ahead and construct their intermediate result sets, even if that intermediate data is larger than optimal. If it only takes a few seconds to produce the filters, it is worth the extra time if pruning the unnecessary data can save minutes in the overall query time. You can specify the maximum wait time in milliseconds using the RUNTIME_FILTER_WAIT_TIME_MS query option.
By default, each scan node waits for up to 1 second (1000 milliseconds) for filters to arrive. If all filters have not arrived within the specified interval, the scan node proceeds, using whatever filters did arrive to help avoid reading unnecessary data. If a filter arrives after the scan node begins reading data, the scan node applies that filter to the data that is read after the filter arrives, but not to the data that was already read.
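The waiting behavior can be modeled with a small simulation that uses a logical clock instead of real time. This is only a sketch of the rules stated above; the function `simulate_scan`, the fixed `ms_per_row` cost, and the sample values are hypothetical.

```python
def simulate_scan(rows, matching_keys, filter_arrival_ms, wait_time_ms=1000, ms_per_row=100):
    """Return (start_ms, emitted_rows) for one scan node under a logical clock."""
    # The scan starts when the filter arrives, or when the wait time expires.
    start_ms = min(filter_arrival_ms, wait_time_ms)
    emitted = []
    for i, key in enumerate(rows):
        now_ms = start_ms + i * ms_per_row
        filter_available = now_ms >= filter_arrival_ms
        if filter_available and key not in matching_keys:
            continue  # pruned at the scan, never sent on to the join
        emitted.append(key)  # read before the filter arrived, or a match
    return start_ms, emitted


rows = [1, 2, 3, 4, 5, 6]
matching = {2, 4}

# Filter arrives within the wait interval: every row is filtered.
print(simulate_scan(rows, matching, filter_arrival_ms=300))
# Filter arrives 200 ms after the scan gave up waiting: early rows slip through.
print(simulate_scan(rows, matching, filter_arrival_ms=1200))
# Filter never arrives in time to help: nothing is pruned at the scan.
print(simulate_scan(rows, matching, filter_arrival_ms=10**9))
```

Rows that slip through before a late filter arrives are still handled correctly by the join; they only cost extra work, which is the trade-off that RUNTIME_FILTER_WAIT_TIME_MS controls.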
If the cluster is relatively busy and your workload contains many resource-intensive or long-running queries, consider increasing the wait time so that complicated queries do not miss opportunities for optimization. If the cluster is lightly loaded and your workload contains many small queries taking only a few seconds, consider decreasing the wait time to avoid the 1 second delay for each query.
Query Options for Runtime Filtering
See the following sections for information about the query options that control runtime filtering:
The RUNTIME_FILTER_MODE query option adjusts the "sensitivity" of this feature. By default, it is set to the highest level (GLOBAL). (This default applies to CDH 5.8 / Impala 2.6 and higher. In previous releases, the default was LOCAL.)
The other query options are tuning knobs that you typically only adjust after doing performance testing, and that you might want to change only for the duration of a single expensive query:
RUNTIME_BLOOM_FILTER_SIZE Query Option (CDH 5.7 or higher only); in CDH 5.8 / Impala 2.6 and higher, this setting acts as a fallback when statistics are not available, rather than as a directive.
Runtime Filtering and Query Plans
In the same way the query plan displayed by the EXPLAIN statement includes information about predicates used by each plan fragment, it also includes annotations showing whether a plan fragment produces or consumes a runtime filter. A plan fragment that produces a filter includes an annotation such as runtime filters: filter_id <- table.column, while a plan fragment that consumes a filter includes an annotation such as runtime filters: filter_id -> table.column.
The following example shows a query that uses a single runtime filter (labelled RF000) to prune the partitions that are scanned in one stage of the query, based on evaluating the result set of a subquery:
create table yy (s string) partitioned by (year int) stored as parquet;
insert into yy partition (year) values ('1999', 1999), ('2000', 2000),
  ('2001', 2001), ('2010', 2010);
compute stats yy;

create table yy2 (s string) partitioned by (year int) stored as parquet;
insert into yy2 partition (year) values ('1999', 1999), ('2000', 2000),
  ('2001', 2001);
compute stats yy2;

-- The query reads an unknown number of partitions, whose key values are only
-- known at run time. The 'runtime filters' lines show how the information about
-- the partitions is calculated in query fragment 02, and then used in query
-- fragment 00 to decide which partitions to skip.
explain select s from yy2 where year in (select year from yy where year between 2000 and 2005);
+----------------------------------------------------------+
| Explain String                                           |
+----------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=16.00MB VCores=2 |
|                                                          |
| 04:EXCHANGE [UNPARTITIONED]                              |
| |                                                        |
| 02:HASH JOIN [LEFT SEMI JOIN, BROADCAST]                 |
| |  hash predicates: year = year                          |
| |  runtime filters: RF000 <- year                        |
| |                                                        |
| |--03:EXCHANGE [BROADCAST]                               |
| |  |                                                     |
| |  01:SCAN HDFS [dpp.yy]                                 |
| |     partitions=2/4 files=2 size=468B                   |
| |                                                        |
| 00:SCAN HDFS [dpp.yy2]                                   |
|    partitions=2/3 files=2 size=468B                      |
|    runtime filters: RF000 -> year                        |
+----------------------------------------------------------+
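When checking many plans, it can help to extract the producer and consumer annotations programmatically. The following sketch parses plan text shaped like the EXPLAIN output above. The function `runtime_filter_edges` and the regular expressions are hypothetical helpers written for this example, not part of Impala or impala-shell, and they assume the annotation format shown in this section.

```python
import re

# A plan node header such as "02:HASH JOIN [LEFT SEMI JOIN, BROADCAST]".
NODE_LINE = re.compile(r"(\d+):([A-Z][A-Z ]*?)\s*\[")
# One annotation such as "RF000 <- year" or "RF000 -> t1.year".
ANNOTATION = re.compile(r"(RF\d+)\s*(<-|->)\s*([\w.]+)")


def runtime_filter_edges(plan_text):
    """Map each filter ID to the plan nodes that produce and consume it."""
    producers, consumers = {}, {}
    current_node = None
    for line in plan_text.splitlines():
        node = NODE_LINE.search(line)
        if node:
            current_node = f"{node.group(1)}:{node.group(2)}"
        if "runtime filters:" in line:
            for filter_id, arrow, target in ANNOTATION.findall(line):
                side = producers if arrow == "<-" else consumers
                side.setdefault(filter_id, []).append((current_node, target))
    return producers, consumers


# Plan lines copied from the example above, without the table borders.
PLAN = """\
04:EXCHANGE [UNPARTITIONED]
|
02:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
|  hash predicates: year = year
|  runtime filters: RF000 <- year
|
|--03:EXCHANGE [BROADCAST]
|  |
|  01:SCAN HDFS [dpp.yy]
|     partitions=2/4 files=2 size=468B
|
00:SCAN HDFS [dpp.yy2]
   partitions=2/3 files=2 size=468B
   runtime filters: RF000 -> year
"""

producers, consumers = runtime_filter_edges(PLAN)
print(producers)
print(consumers)
```

A filter ID that appears among the producers but not among the consumers (or the reverse) would be a sign that the plan is not using the filter the way you expect.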
The query profile (displayed by the PROFILE command in impala-shell) contains both the EXPLAIN plan and more detailed information about the internal workings of the query. The profile output includes a section labelled the "filter routing table", with information about each filter based on its ID.
Examples of Queries that Benefit from Runtime Filtering
In this example, Impala would normally do extra work to interpret the columns C1, C2, C3, and ID for each row in HUGE_T1, before checking the ID value against the in-memory hash table constructed from all the TINY_T2.ID values. By producing a filter containing all the TINY_T2.ID values even before the query starts scanning the HUGE_T1 table, Impala can skip the unnecessary work to parse the column info as soon as it determines that an ID value does not match any of the values from the other table.
The example shows COMPUTE STATS statements for both the tables (even though that is a one-time operation after loading data into those tables) because Impala relies on up-to-date statistics to determine which one has more distinct ID values than the other. That information lets Impala make effective decisions about which table to use to construct the in-memory hash table, and which table to read from disk and compare against the entries in the hash table.
COMPUTE STATS huge_t1;
COMPUTE STATS tiny_t2;
SELECT c1, c2, c3 FROM huge_t1 JOIN tiny_t2 WHERE huge_t1.id = tiny_t2.id;
In this example, T1 is a table partitioned by year. The subquery on T2 produces multiple values, and transmits those values as a filter to the plan fragments that are reading from T1. Any non-matching partitions in T1 are skipped.
select c1 from t1 where year in (select distinct year from t2);
Now the WHERE clause contains an additional test that does not apply to the partition key column. A filter on a column that is not a partition key is called a per-row filter. Because per-row filters only apply for Parquet, T1 must be a Parquet table.
The subqueries result in two filters being transmitted to the scan nodes that read from T1. The filter on YEAR helps the query eliminate entire partitions based on non-matching years. The filter on C2 lets Impala discard rows with non-matching C2 values immediately after reading them. Without runtime filtering, Impala would have to keep the non-matching values in memory, assemble C1, C2, and C3 into rows in the intermediate result set, and transmit all the intermediate rows back to the coordinator node, where they would be eliminated only at the very end of the query.
select c1, c2, c3 from t1
  where year in (select distinct year from t2)
    and c2 in (select other_column from t3);
This example involves a broadcast join. The fact that the ON clause would return a small number of matching rows (because there are not very many rows in TINY_T2) means that the corresponding filter is very selective. Therefore, runtime filtering will probably be effective in optimizing this query.
select c1 from huge_t1 join [broadcast] tiny_t2
  on huge_t1.id = tiny_t2.id
  where huge_t1.year in (select distinct year from tiny_t2)
    and c2 in (select other_column from t3);
This example involves a shuffle or partitioned join. Assume that most rows in HUGE_T1 have a corresponding row in HUGE_T2. The fact that the ON clause could return a large number of matching rows means that the corresponding filter would not be very selective. Therefore, runtime filtering might be less effective in optimizing this query.
select c1 from huge_t1 join [shuffle] huge_t2
  on huge_t1.id = huge_t2.id
  where huge_t1.year in (select distinct year from huge_t2)
    and c2 in (select other_column from t3);
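The difference in selectivity between the last two examples can be made concrete with a small calculation. This sketch treats the filter as an exact set of build-side keys (ignoring Bloom filter false positives), and the ID ranges are invented purely for illustration.

```python
def filter_selectivity(probe_keys, build_keys):
    """Fraction of probe-side rows that a filter built from build_keys discards."""
    if not probe_keys:
        return 0.0
    build = set(build_keys)
    discarded = sum(1 for key in probe_keys if key not in build)
    return discarded / len(probe_keys)


# Hypothetical join keys: 1000 probe rows, a tiny build side, a huge build side.
huge_t1_ids = list(range(1000))
tiny_t2_ids = [3, 14, 15, 92]
huge_t2_ids = list(range(990))

tiny_case = filter_selectivity(huge_t1_ids, tiny_t2_ids)
huge_case = filter_selectivity(huge_t1_ids, huge_t2_ids)
print(tiny_case, huge_case)
```

With the tiny build side, the filter discards almost every probe row at the scan. With the huge build side it discards about one row in a hundred, so the cost of producing and transmitting the filter is harder to recover.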
Tuning and Troubleshooting Queries that Use Runtime Filtering
These tuning and troubleshooting procedures apply to queries that are resource-intensive enough, long-running enough, and frequent enough that you can devote special attention to optimizing them individually.
Use the EXPLAIN statement and examine the runtime filters: lines to determine whether runtime filters are being applied to the WHERE predicates and join clauses that you expect. For example, runtime filtering does not apply to queries that use the nested loop join mechanism due to non-equijoin operators.
Make sure statistics are up-to-date for all tables involved in the queries. Use the COMPUTE STATS statement after loading data into non-partitioned tables, and COMPUTE INCREMENTAL STATS after adding new partitions to partitioned tables.
If join queries involving large tables use unique columns as the join keys, for example joining a primary key column with a foreign key column, the overhead of producing and transmitting the filter might outweigh the performance benefit because not much data could be pruned during the early stages of the query. For such queries, consider setting the query option RUNTIME_FILTER_MODE=OFF.
Limitations and Restrictions for Runtime Filtering
The runtime filtering feature is most effective for the Parquet file format. For other file formats, filtering only applies for partitioned tables. See File Format Considerations for Runtime Filtering.
When the spill-to-disk mechanism is activated on a particular host during a query, that host does not produce any filters while processing that query. This limitation does not affect the correctness of results; it only reduces the amount of optimization that can be applied to the query.