Today we are introducing a new series of blog posts that will take a look at recent enhancements to Apache Impala. Many of these are performance improvements, such as the feature described below which will give anywhere from a 2x to 7x performance improvement by taking better advantage of all the CPU cores. In addition, a lot of work has also been put into ensuring that Impala runs optimally in decoupled compute scenarios, where the data lives in object storage or remote HDFS. This is especially important now that more and more users are running containerized Impala clusters, such as what is offered in the Cloudera Data Warehouse (CDW) service.
Figure 1. Performance Improvements with Apache Impala’s New Multithreading Model (20 executors, mt_dop=12)
In this first post we will focus on work that was recently completed to expand the multithreading model used during query execution. But first, some context. Two of the key tenets of Impala's design philosophy are:

- Parallelism: make full use of the hardware available, both by scaling out across nodes and by scaling up within each node.
- Openness: query data in open, standard file formats wherever it lives, rather than tying performance to a proprietary storage format.
From day one Impala has been able to break a query up and run it across multiple nodes – a true Massively Parallel Processing (MPP) engine. This is scale out at its best. A few years ago we added multithreaded scans, whereby many threads on each node scan the required data simultaneously. This further increased parallelism by adding the ability to scale up within a node, albeit for a limited number of operations. That brings us to today, where we are now able to multithread both joins and aggregations, letting us parallelize each part of query execution – both by scaling out and by scaling up. With this new change, the key operations in a query can be scaled vertically within a node if the input data is large enough (i.e., enough rows or distinct values).
This is where the point about open file formats becomes relevant. No other cloud data warehouse engine currently offers native querying over open file formats stored in object stores with competitive performance on joins and aggregations. These are the common bottlenecks in analytic queries, and are notoriously difficult to optimize.
Most query engines achieve performance improvements at the join and aggregation level by taking advantage of tight coupling between the query layer and the storage layer. Their secret sauce usually results in proprietary formats and/or a requirement for local storage. So performance is improved at the expense of flexibility, interoperability, and cost. Impala is the first SQL engine that has effectively married this class of SQL optimizations with open file formats in the cloud storage context.
If you are a TL;DR person, here you go:

- Impala can now execute joins and aggregations with multiple threads per node, in addition to the multithreaded scans it has had for years.
- Intra-node parallelism is controlled by a single query option, mt_dop, which can be set at the server, resource pool, session, or query level.
- In our internal TPC-DS benchmarking (20 executors, mt_dop=12), this delivered roughly 2x to 7x runtime improvements on many queries.
Read on if you want more details…
Impala’s existing plan generation proceeds in two phases: first a single-node plan is generated from analysis output; then that is transformed into a distributed plan. With this new multithreading model a third phase is added. The distributed plan, which is a tree of plan fragments, is transformed into a parallel plan, which is a tree of distributed plans. The distributed plans are connected by join build operators that set up a join operator in the parent plan for streaming in-memory execution, e.g. building in-memory hash tables for a hash join.
The following images show an example using TPC-H query 11 (the full SQL is included at the end of this post). The first one is a distributed plan, where each box is a fragment.
The second is the corresponding parallel plan, in which the distributed plans are connected by the join build operators described above.
Impala’s scheduler takes a parallel plan, which has been divided up into fragments, and determines how many instances of those fragments to run and on which nodes. The process is summarized below.
From the example above, if the partsupp table is divided into 32 evenly-sized remote scan ranges and run on 4 nodes with mt_dop=4, each node might be allocated 8 scan ranges (depending on data locality), which could then be divided between up to 4 instances of F6 per node. Each of these F6 instances is then executed by a different thread, utilizing 4 CPUs on each node in parallel. In the diagram below each fragment instance is represented by a white square, which illustrates both the scale out and the scale up of this scan operation.
It was a conscious decision to keep the tuning options for the new multithreading model simple. This is why the single mt_dop option was chosen. It determines the maximum degree of parallelism for a query, and Impala will automatically reduce the level of parallelism if the query is smaller. This option can be set at the server, resource pool, session and query level. In addition to giving users a knob to tune the level of intra-node parallelism, this has an important side effect of making scheduling easier and query latencies more predictable.
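For example, a user can adjust the option from a SQL session before running a statement (a minimal sketch; the aggregation below is just an illustration against the TPC-H schema used elsewhere in this post):

-- Allow up to 4 fragment instances per node for subsequent queries in this session.
SET MT_DOP=4;

-- This grouping aggregation can now scale up within each node as well as across nodes.
SELECT ps_partkey, sum(ps_supplycost * ps_availqty) AS total_value
FROM tpch.partsupp
GROUP BY ps_partkey;

The same option can also be set by an administrator as a server-wide or per-resource-pool default.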
Introducing the mt_dop option means that Impala queries can have very different CPU requirements, so we have to factor this into admission control decisions to avoid both underutilization and oversaturation. For example, more low-dop queries than high-dop queries can run concurrently without oversubscribing the cores.
Impala has a concept of "admission control slots", the amount of parallelism that should be allowed on an Impala daemon. This defaults to the number of processors and can be overridden with the --admission_control_slots startup flag. The admission control slot model offers the best path forward for admission control and multithreading.
A query running on an executor consumes slots based on its effective dop, which is the maximum number of instances of any single fragment running on that executor. For example, if an executor runs 2 instances of F0 and 4 instances of F1, the effective dop is 4.
It is desirable in some cases to oversubscribe Impala to maximize CPU utilization, or, on the flip side, to achieve predictability by avoiding oversubscription. This trade-off can be managed by setting --admission_control_slots either greater than the number of cores on the system (allowing oversubscription) or equal to it (avoiding oversubscription).
We have sought to minimize or eliminate the additional CPU, memory, and network overhead introduced by this new multithreading model; some notable points are covered below.
In this section we look at a few examples of how the new multithreading model affects various steps of query execution. They give a sense of the implementation details, as well as the work done to minimize the CPU and memory overhead the model requires.
The Grouping Aggregation is parallelized by a) replicating the pre-aggregation in each fragment instance, and b) partitioning the input to the merge aggregation across all fragment instances.
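As an illustration (not taken from the original benchmark), consider a simple grouping aggregation over the TPC-H lineitem table; each fragment instance pre-aggregates the rows it scans, and the partially aggregated rows are then hash-partitioned on the grouping columns so that every instance performs its share of the merge aggregation:

-- Illustrative grouping aggregation. The pre-aggregation runs in every scanning fragment
-- instance; the merge aggregation is partitioned on (l_returnflag, l_linestatus).
SELECT l_returnflag,
       l_linestatus,
       count(*)             AS line_count,
       sum(l_extendedprice) AS gross_revenue
FROM tpch.lineitem
GROUP BY l_returnflag, l_linestatus;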
For the Broadcast Hash Join, the parallelization is based on a separation of the build and probe phases into distinct plans. One builds the data structures needed for the Hash Join and Nested Loop Join, and one probes them and produces result rows.
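As a sketch of the kind of query this applies to (illustrative only), a join against a small dimension table is typically planned as a broadcast hash join; the in-memory hash table is constructed by the join build plan, and the probing fragment instances stream their scanned rows through it:

-- Illustrative broadcast hash join: nation is small, so it is broadcast to every node.
-- A join build plan builds the hash table on n_nationkey; the probe side scans supplier
-- in parallel and looks each row up in that hash table.
SELECT n_name, count(*) AS supplier_count
FROM tpch.supplier
JOIN tpch.nation ON s_nationkey = n_nationkey
GROUP BY n_name;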
One consequence of this is a significant increase in the number of fragment instances participating in each exchange. For example, a hash exchange for a partitioned join previously received data streams from at most as many instances as there are nodes; now it receives streams from (number of nodes × dop) instances. To offset this, KRPC multiplexes all of these logical streams onto a single point-to-point connection between each pair of nodes, which avoids many potential scalability problems.
Analytic functions depend on the PARTITION BY clause for parallelism. Input rows are hash-partitioned across instances, then evaluation of each partition is done independently. Potential speedup from multithreading is only possible if there are more partitions than nodes. If that is the case, then linear speedup can be achieved since evaluation of each partition is independent.
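For example (illustrative query only), the analytic function below is partitioned on o_custkey: rows are hash-partitioned on that column across fragment instances, and each customer's partition is then ranked independently:

-- Parallelism for this analytic function comes from the PARTITION BY clause. Each
-- fragment instance receives a disjoint set of o_custkey values and evaluates rank()
-- over its partitions independently.
SELECT o_custkey,
       o_orderkey,
       rank() OVER (PARTITION BY o_custkey ORDER BY o_totalprice DESC) AS spend_rank
FROM tpch.orders;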
Runtime code generation in Impala was historically done for each fragment instance. This approach could still work with the new multithreading model, but it would be wasteful: it would redundantly generate the same code dop times per fragment on each node. Instead, we have refactored code generation so that it is done once for all instances of the same fragment on an Impala daemon. Other improvements that minimize the impact of runtime code generation include the recently implemented asynchronous codegen (IMPALA-5444), as well as the work on codegen caching that is currently in progress.
Runtime filters are currently generated by the relevant fragment instances and sent to the coordinator, which then forwards them to each fragment on all relevant nodes. With multithreading, the number of filters produced for partitioned joins increases by a factor of dop, potentially making the coordinator a bottleneck. To prevent this, we aggregate partitioned-join filters locally on each node before sending them to the coordinator, so that the total amount of work done on the coordinator does not depend on the degree of parallelism.
To drill down into how multithreading may impact your query performance, we took query 84 from the TPC-DS benchmark as an example. This query joins two large fact tables with four dimension tables. The query plan graph below indicates that the brightly colored operations account for the largest share of the query execution time. In this case the bulk of the time is spent performing join operations, which means the query is likely bottlenecked more by CPU than by I/O, making it a good candidate for multithreaded execution.
To see how well this query might scale up with more CPU cores, we ran it with a dop of 1 and then ramped up the configured parallelism. This test was done on the CDW service, using instances with 16 vCPUs (r5d.4xlarge) querying a 10TB TPC-DS dataset stored as Apache Parquet files in S3.
Degree of Parallelism | Runtime (sec)
--------------------- | -------------
mt_dop=1              | 151.85
mt_dop=2              | 73.48
mt_dop=4              | 38.03
mt_dop=8              | 22.88
mt_dop=12             | 20.07
Using the runtime at a dop of one as the baseline, we see a near-linear improvement in performance up to a dop of 4. However, the gains taper off slightly at 8, and at a parallelism of 12 we are not doing better than a 7.5x improvement. Several factors can contribute to the diminishing returns, including data skew, the instances having 16 virtual but only 8 physical CPUs, and other implementation inefficiencies discussed above. We believe this is a great start, and we see many opportunities to continue improving the parallel scalability of Impala. We know that picking the correct degree of parallelism can be a nuisance for the end user or Impala administrator, so in future releases we plan to work on automatically determining the best value during query execution.
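In the meantime, a sweep like the one in the table above can be run by hand from a SQL session, raising mt_dop between runs (a sketch only; the TPC-DS query text itself is omitted):

-- Baseline: a single fragment instance per node for joins and aggregations.
SET MT_DOP=1;
-- ... run TPC-DS query 84 and note the runtime ...

-- Maximum intra-node parallelism used in this test.
SET MT_DOP=12;
-- ... run query 84 again; in impala-shell, the PROFILE command shows where the time went ...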
In an internal TPC-DS benchmark analyzing the performance improvements from this new multithreading model (comparing mt_dop=1 vs mt_dop=12), we saw substantial speedups across much of the workload. To give a sense of where the biggest gains were, the chart below shows the 36 queries that improved by 2x or more.
Some important notes about this testing are worth mentioning:
This new multithreading model provides the biggest benefit for queries where joins and aggregations are the bottleneck, i.e. queries bound by CPU rather than I/O, and for clusters whose CPUs would otherwise sit underutilized.
On the flip side, one area where you can expect to see less improvement is scan-intensive queries, such as searching string columns with the LIKE operator or performing a regexp_extract. Because scans were already multithreaded, there are fewer additional CPU utilization gains to be had there.
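For instance (illustrative query only), most of the work in a query like the one below is in the scan itself, evaluating string predicates against every row, and that scan was already multithreaded before this change:

-- Scan-heavy query: string matching dominates the runtime, so the new model adds
-- comparatively little on top of the existing multithreaded scans.
SELECT count(*) AS matching_rows
FROM tpch.lineitem
WHERE l_comment LIKE '%express%'
   OR regexp_extract(l_comment, 'depend[a-z]*', 0) <> '';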
This optimization becomes even more important when running Impala in the cloud, where the compute cluster can automatically start and stop, or grow and shrink. When your workload runs faster, the cluster can suspend sooner, and you pay for less compute time.
Furthermore, if you can utilize more of the available cores for a given workload, you may be able to use fewer compute nodes, which reduces overall costs.
Finally, given the ease of provisioning Impala clusters in the cloud (which is important for workload isolation and self-service agility), it is common to see many clusters, each dedicated to a single workload, that are not fully utilized at the CPU level. This optimization ensures that even in this scenario a workload can achieve high levels of CPU utilization.
To recap… Impala now has the ability to multithread some of the most heavyweight operations in analytic query use cases – namely the joins and aggregations. And we have done this without sacrificing the ability to use open source file formats and without requiring colocated storage. This will result in much higher CPU utilization, faster query times, and lower cloud costs.
You can try this out yourself via Cloudera’s CDP trial experience.
Go to the Cloudera engineering blog to learn more about Impala’s performance and architecture.
Special thanks goes out to the many people who built this new multithreading model. Tim Armstrong, Joe McDonnell, Bikramjeet Vig, David Rorke, and Kurt Deschler worked on the most recent phases of the project. Other Impala committers and contributors who did foundational work for scalability and multithreading include Marcel Kornacker and Alex Behm.
-- Q11 - Important Stock Identification
select * from (
    select
        ps_partkey,
        sum(ps_supplycost * ps_availqty) as value
    from
        tpch.partsupp,
        tpch.supplier,
        tpch.nation
    where
        ps_suppkey = s_suppkey
        and s_nationkey = n_nationkey
        and n_name = 'GERMANY'
    group by ps_partkey
) as inner_query
where value > (
    select
        sum(ps_supplycost * ps_availqty) * 0.0001
    from
        tpch.partsupp,
        tpch.supplier,
        tpch.nation
    where
        ps_suppkey = s_suppkey
        and s_nationkey = n_nationkey
        and n_name = 'GERMANY'
)
order by value desc