Using Apache Impala (incubating) with Kudu
Apache Kudu has tight integration with Apache Impala (incubating), allowing you to use Impala to insert, query, update, and delete data from Kudu tablets using Impala's SQL syntax, as an alternative to using the Kudu APIs to build a custom Kudu application. In addition, you can use JDBC or ODBC to connect existing or new applications written in any language, framework, or business intelligence tool to your Kudu data, using Impala as the broker.
To use Impala to query Kudu data as described in this topic, you will require Cloudera Manager 5.10.x and CDH 5.10.x or later.
The syntax described in this topic is specific to Impala 2.8 (ships with CDH 5.10) and above, and will not work on previous versions. If you are using an earlier version of Impala (including the IMPALA_KUDU releases previously available), upgrade to Impala 2.8.
Note that this topic does not describe Impala installation or upgrade procedures. Refer to the Impala documentation to make sure you are able to run queries against Impala tables on HDFS before proceeding.
Earlier versions of CDH and Cloudera Manager used an experimental fork of Impala referred to as IMPALA_KUDU. If you previously installed the IMPALA_KUDU service, remove it from your cluster before you proceed. Install Kudu 1.2.x (or later) using either Cloudera Manager or the command line.
Impala Database Containment Model
CREATE DATABASE impala_kudu;
USE impala_kudu;
CREATE TABLE my_first_table ( ...
The my_first_table table is created within the impala_kudu database. To refer to this table in the future without first issuing a USE statement, use <database>.<table> syntax. For example, to specify the my_first_table table in database impala_kudu, as opposed to any other table with the same name in another database, refer to the table as impala_kudu.my_first_table. This also applies to INSERT, UPDATE, DELETE, and DROP statements.
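As a brief illustration of qualified names, the statements below address the table from any current database. This is a sketch that assumes my_first_table has the id and name columns used in examples elsewhere in this topic; Impala separates the database and table names with a period.

```sql
-- No USE statement is needed when the database is part of the name.
SELECT id, name FROM impala_kudu.my_first_table LIMIT 10;

-- The same qualification works for DML statements.
INSERT INTO impala_kudu.my_first_table VALUES (99, "sarah");
```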
Internal and External Impala Tables
- An internal table (created by CREATE TABLE) is managed by Impala, and can be dropped by Impala. When you create a new table using Impala, it is generally an internal table. When such a table is created in Impala, the corresponding Kudu table will be named impala::my_database.table_name.
- An external table (created by CREATE EXTERNAL TABLE) is not managed by Impala, and dropping such a table does not drop the table from its source location (here, Kudu). Instead, it only removes the mapping between Impala and Kudu. This is the mode used in the syntax provided by Kudu for mapping an existing table to Impala.
See the Impala documentation for more information about internal and external tables.
Using Impala To Query Kudu Tables
Neither Kudu nor Impala needs special configuration in order for you to use the Impala Shell or the Impala API to insert, update, delete, or query Kudu data using Impala. However, you do need to create a mapping between the Impala and Kudu tables. The Kudu web UI provides the Impala query needed to map an existing Kudu table.
- Make sure you are using the impala-shell binary provided by the default CDH Impala installation. The following example shows how you can verify this using the alternatives command on a RHEL 6 host. Do not copy and paste the alternatives --set command directly, because the file names are likely to differ.
$ sudo alternatives --display impala-shell
impala-shell - status is auto.
link currently points to /opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.25/bin/impala-shell
/opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.25/bin/impala-shell - priority 10
Current `best' version is /opt/cloudera/parcels/CDH-5.10.0-1.cdh5.10.0.p0.25/bin/impala-shell.
- Although not necessary, it is recommended that you configure Impala with the locations of the Kudu Masters using the --kudu_master_hosts=<master1>[:port] flag. If this flag is not set, you will need to manually provide this configuration each time you create a table by specifying the kudu.master_addresses property inside a TBLPROPERTIES clause. If you are using Cloudera Manager, no such configuration is needed. The Impala service will automatically recognize the Kudu Master hosts.
The rest of this guide assumes that this configuration has been set.
- Start Impala Shell using the impala-shell command. By default, impala-shell attempts to connect to the Impala daemon on localhost on port 21000. To connect to a different host, use the -i <host:port> option.
To automatically connect to a specific Impala database, use the -d <database> option. For instance, if all your Kudu tables are in Impala in the database impala_kudu, use -d impala_kudu to use this database.
- To quit the Impala Shell, use the following command: quit;
Querying an Existing Kudu Table from Impala
CREATE EXTERNAL TABLE my_mapping_table STORED AS KUDU TBLPROPERTIES ( 'kudu.table_name' = 'my_kudu_table' );
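If Impala has not been configured with the Kudu Master locations, the mapping statement can carry them itself. The following is a minimal sketch: the host name is a placeholder, and port 7051 simply mirrors the port used in other examples in this topic.

```sql
-- Map an existing Kudu table and name the Kudu Masters explicitly.
-- 'kudu-master.example.com' is a hypothetical host.
CREATE EXTERNAL TABLE my_mapping_table
STORED AS KUDU
TBLPROPERTIES (
  'kudu.table_name' = 'my_kudu_table',
  'kudu.master_addresses' = 'kudu-master.example.com:7051'
);

-- Inspect the columns Impala picked up from the Kudu schema.
DESCRIBE my_mapping_table;
```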
Creating a New Kudu Table From Impala
Creating a new table in Kudu from Impala is similar to mapping an existing Kudu table to an Impala table, except that you need to specify the schema and partitioning information yourself. Use the following example as a guideline. Impala first creates the table, then creates the mapping.
In the CREATE TABLE statement, the columns that comprise the primary key must be listed first. Additionally, primary key columns are implicitly considered NOT NULL.
When creating a new table in Kudu, you must define a partition schema to pre-split your table. The best partition schema to use depends upon the structure of your data and your data access patterns. The goal is to maximize parallelism and use all your tablet servers evenly. For more information on partition schemas, see Partitioning Tables.
The following CREATE TABLE example distributes the table into 16 partitions by hashing the id column, for simplicity.
CREATE TABLE my_first_table ( id BIGINT, name STRING, PRIMARY KEY(id) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU;
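Because the primary key here is the single column id, the same layout can also be written with the hash column named explicitly. The sketch below uses a hypothetical table name, my_first_table_explicit, so that it does not collide with the table above.

```sql
-- Equivalent partitioning with the hashed column spelled out.
CREATE TABLE my_first_table_explicit
(
  id BIGINT,          -- primary key column, listed first and implicitly NOT NULL
  name STRING,
  PRIMARY KEY (id)
)
PARTITION BY HASH (id) PARTITIONS 16
STORED AS KUDU;
```

Naming the column makes the intent visible to readers and keeps the statement correct if more key columns are added later.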
CREATE TABLE AS SELECT
You can create a table by querying any other table or tables in Impala, using a CREATE TABLE ... AS SELECT statement. The following example imports all rows from an existing table, old_table, into a new Kudu table, new_table. The columns in new_table will have the same names and types as the columns in old_table, but you will need to additionally specify the primary key and partitioning schema.
CREATE TABLE new_table PRIMARY KEY (ts, name) PARTITION BY HASH(name) PARTITIONS 8 STORED AS KUDU AS SELECT ts, name, value FROM old_table;
You can refine the SELECT statement to only match the rows and columns you want to be inserted into the new table. You can also rename the columns by using syntax like SELECT name as new_col_name.
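A refined statement might look like the following sketch. The target name new_table_filtered and the alias reading are hypothetical; the filter on ts and name is included because primary key columns cannot hold null values.

```sql
-- Copy only rows with usable keys, and rename value to reading on the way in.
CREATE TABLE new_table_filtered
PRIMARY KEY (ts, name)
PARTITION BY HASH (name) PARTITIONS 8
STORED AS KUDU
AS SELECT ts, name, value AS reading
FROM old_table
WHERE ts IS NOT NULL AND name IS NOT NULL;
```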
Tables are partitioned into tablets according to a partition schema on the primary key columns. Each tablet is served by at least one tablet server. Ideally, a table should be split into tablets that are distributed across a number of tablet servers to maximize parallel operations. The details of the partitioning schema you use will depend entirely on the type of data you store and how you access it.
Kudu currently has no mechanism for splitting or merging tablets after the table has been created. Until this feature has been implemented, you must provide a partition schema for your table when you create it. When designing your tables, consider using primary keys that will allow you to partition your table into tablets which grow at similar rates.
You can partition your table using Impala's PARTITION BY clause, which supports distribution by RANGE or HASH. The partition scheme can contain zero or more HASH definitions, followed by an optional RANGE definition. The RANGE definition can refer to one or more primary key columns. Examples of basic and advanced partitioning are shown below.
Monotonically Increasing Values - If you partition by range on a column whose values are monotonically increasing, the last tablet will grow much larger than the others. Additionally, all data being inserted will be written to a single tablet at a time, limiting the scalability of data ingest. In that case, consider distributing by HASH instead of, or in addition to, RANGE.
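One way to apply this advice to time-series data is to hash on a column that does not increase over time and range-partition on the timestamp. The table below is a hypothetical sketch: the name metrics, its columns, and the choice of 4 hash partitions are illustrative, and the range bounds are Unix timestamps for 1 January 2017 and 1 January 2018 (UTC).

```sql
CREATE TABLE metrics
(
  host STRING,
  ts BIGINT,          -- seconds since the Unix epoch, always increasing
  value DOUBLE,
  PRIMARY KEY (host, ts)
)
-- Hashing on host spreads concurrent writes for the newest time range
-- across 4 tablets instead of directing them all to one.
PARTITION BY HASH (host) PARTITIONS 4,
RANGE (ts)
(
  PARTITION VALUES < 1483228800,
  PARTITION 1483228800 <= VALUES < 1514764800,
  PARTITION 1514764800 <= VALUES
)
STORED AS KUDU;
```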
PARTITION BY RANGE
You can specify range partitions for one or more primary key columns. Range partitioning in Kudu allows splitting a table based on specific values or ranges of values of the chosen partition keys. This allows you to balance parallelism in writes with scan efficiency.
For instance, if you have a table with the columns state, name, and purchase_count, and you range-partition it by state with one partition per value, the result is 50 tablets, one for each US state.
CREATE TABLE customers ( state STRING, name STRING, purchase_count int, PRIMARY KEY (state, name) ) PARTITION BY RANGE (state) ( PARTITION VALUE = 'al', PARTITION VALUE = 'ak', PARTITION VALUE = 'ar', ... ... PARTITION VALUE = 'wv', PARTITION VALUE = 'wy' ) STORED AS KUDU;
PARTITION BY HASH
Instead of distributing by an explicit range, or in combination with range distribution, you can distribute into a specific number of partitions by hash. You specify the primary key columns you want to partition by, and the number of partitions you want to use. Rows are distributed by hashing the specified key columns. Assuming that the values being hashed do not themselves exhibit significant skew, this will serve to distribute the data evenly across all partitions.
- HASH(a), HASH(b) -- will succeed
- HASH(a,b) -- will succeed
- HASH(a), HASH(a,b) -- will fail
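The three cases can be written out as complete statements. This is a sketch with hypothetical table names and partition counts; the point it illustrates is that column a appears in two hash definitions in the failing case.

```sql
-- HASH(a), HASH(b): each key column appears in exactly one definition.
CREATE TABLE hash_separate
(
  a INT,
  b INT,
  c STRING,
  PRIMARY KEY (a, b)
)
PARTITION BY HASH (a) PARTITIONS 2, HASH (b) PARTITIONS 2
STORED AS KUDU;

-- HASH(a,b): one definition over the compound key.
CREATE TABLE hash_compound
(
  a INT,
  b INT,
  c STRING,
  PRIMARY KEY (a, b)
)
PARTITION BY HASH (a, b) PARTITIONS 4
STORED AS KUDU;

-- HASH(a), HASH(a,b): left commented out because it is the failing case.
-- PARTITION BY HASH (a) PARTITIONS 2, HASH (a, b) PARTITIONS 2
```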
Hash partitioning is a reasonable approach when primary key values, such as timestamps or serial IDs, are evenly distributed in their domain and show no apparent skew.
CREATE TABLE cust_behavior ( id BIGINT, sku STRING, salary STRING, edu_level INT, usergender STRING, `group` STRING, city STRING, postcode STRING, last_purchase_price FLOAT, last_purchase_date BIGINT, category STRING, rating INT, fulfilled_date BIGINT, PRIMARY KEY (id, sku) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU;
You can combine HASH and RANGE partitioning to create more complex partition schemas. You can also specify zero or more HASH definitions, followed by zero or one RANGE definitions. Each schema definition can encompass one or more columns. While enumerating every possible distribution schema is out of the scope of this topic, the following examples illustrate some of the possibilities.
PARTITION BY HASH and RANGE
Consider the basic PARTITION BY HASH example above. If you often query for a range of sku values, you can optimize the example by combining hash partitioning with range partitioning.
The following example still creates 16 tablets, by first hashing the id column into 4 partitions, and then applying range partitioning to split each partition into four tablets, based upon the value of the sku string. At least four tablets (and possibly up to 16) can be written to in parallel, and when you query for a contiguous range of sku values, there's a good chance you only need to read a quarter of the tablets to fulfill the query.
By default, the entire primary key (id, sku) will be hashed when you use PARTITION BY HASH. To hash on only part of the primary key, and use a range partition on the rest, use the syntax demonstrated below.
CREATE TABLE cust_behavior ( id BIGINT, sku STRING, salary STRING, edu_level INT, usergender STRING, `group` STRING, city STRING, postcode STRING, last_purchase_price FLOAT, last_purchase_date BIGINT, category STRING, rating INT, fulfilled_date BIGINT, PRIMARY KEY (id, sku) ) PARTITION BY HASH (id) PARTITIONS 4, RANGE (sku) ( PARTITION VALUES < 'g', PARTITION 'g' <= VALUES < 'o', PARTITION 'o' <= VALUES < 'u', PARTITION 'u' <= VALUES ) STORED AS KUDU;
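A query of the kind this layout is meant to help might look like the following sketch. The bounds 'h' and 'k' are arbitrary values chosen to fall inside a single range partition ('g' <= VALUES < 'o').

```sql
-- Both bounds lie within one sku range, so only the tablets for that
-- range (one per hash bucket of id) should need to be read.
SELECT id, sku, rating
FROM cust_behavior
WHERE sku >= 'h' AND sku < 'k';
```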
Multiple PARTITION BY HASH Definitions
CREATE TABLE cust_behavior ( id BIGINT, sku STRING, salary STRING, edu_level INT, usergender STRING, `group` STRING, city STRING, postcode STRING, last_purchase_price FLOAT, last_purchase_date BIGINT, category STRING, rating INT, fulfilled_date BIGINT, PRIMARY KEY (id, sku) ) PARTITION BY HASH (id) PARTITIONS 4, HASH (sku) PARTITIONS 4 STORED AS KUDU;
The example creates 16 partitions. You could also use HASH (id, sku) PARTITIONS 16. However, a scan for sku values would almost always impact all 16 partitions, rather than possibly being limited to 4.
Non-Covering Range Partitions
- In the case of time-series data or other schemas which need to account for constantly-increasing primary keys, tablets serving old data will be relatively fixed in size, while tablets receiving new data will grow without bounds.
- In cases where you want to partition data based on its category, such as sales region or product type, without non-covering range partitions you must know all of the partitions ahead of time or manually recreate your table if partitions need to be added or removed, such as the introduction or elimination of a product type.
CREATE TABLE sales_by_year ( sale_id INT, year INT, amount INT, PRIMARY KEY (sale_id, year) ) PARTITION BY RANGE (year) ( PARTITION VALUE = 2012, PARTITION VALUE = 2013, PARTITION VALUE = 2014, PARTITION VALUE = 2015, PARTITION VALUE = 2016 ) STORED AS KUDU;
ALTER TABLE sales_by_year ADD RANGE PARTITION VALUE = 2017;
In use cases where a rolling window of data retention is required, range partitions may also be dropped. For example, if data from 2012 should no longer be retained, it may be deleted in bulk:
ALTER TABLE sales_by_year DROP RANGE PARTITION VALUE = 2012;
Note that just like dropping a table, this irrecoverably deletes all data stored in the dropped partition.
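A rolling window pairs the two statements at each rollover. The sketch below shows the next year's step, assuming the 2017 partition has already been added and the 2012 partition dropped; the specific years are illustrative.

```sql
-- Open a partition for the incoming year before its rows arrive.
ALTER TABLE sales_by_year ADD RANGE PARTITION VALUE = 2018;

-- Retire the oldest retained year. This permanently deletes its rows.
ALTER TABLE sales_by_year DROP RANGE PARTITION VALUE = 2013;
```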
- For large tables, such as fact tables, aim for as many tablets as you have cores in the cluster.
- For small tables, such as dimension tables, aim for a large enough number of tablets that each tablet is at least 1 GB in size.
Optimizing Performance for Evaluating SQL Predicates
If the WHERE clause of your query includes comparisons with the operators =, <=, <, >, >=, BETWEEN, or IN, Kudu evaluates the condition directly and returns only the matching rows to Impala, which gives the best performance.
For predicates such as !=, LIKE, or any other predicate type supported by Impala, Kudu does not evaluate the predicates directly. Instead, it returns all results to Impala and relies on Impala to evaluate the remaining predicates and filter the results accordingly. This may cause differences in performance, depending on the delta of the result set before and after evaluating the WHERE clause. In some cases, creating and periodically updating materialized views may be the right solution to work around these inefficiencies.
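The difference can be seen by comparing queries against the my_first_table example table. This is a sketch; the literal values are arbitrary, and the exact plan text that EXPLAIN prints depends on the Impala version.

```sql
-- BETWEEN on id is one of the predicates Kudu evaluates directly.
SELECT id, name FROM my_first_table WHERE id BETWEEN 1 AND 100;

-- LIKE is not evaluated by Kudu: rows are returned and Impala filters them.
SELECT id, name FROM my_first_table WHERE name LIKE 'j%';

-- Combining both lets Kudu narrow the scan before Impala applies LIKE.
SELECT id, name FROM my_first_table
WHERE id BETWEEN 1 AND 100 AND name LIKE 'j%';

-- EXPLAIN shows the plan, including how the Kudu scan is set up.
EXPLAIN SELECT id, name FROM my_first_table
WHERE id BETWEEN 1 AND 100 AND name LIKE 'j%';
```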
Inserting a Row
INSERT INTO my_first_table VALUES (99, "sarah");
INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim");
The primary key must not be null.
Inserting In Bulk
- Multiple single INSERT statements
- This approach has the advantage of being easy to understand and implement, but it is likely to be inefficient because Impala has a high query start-up cost compared to Kudu's insertion performance. This will lead to relatively high latency and poor throughput.
- Single INSERT statement with multiple VALUES subclauses
- If you include more than 1024 VALUES subclauses, Impala batches them into groups of 1024 (or the value of batch_size) before sending the requests to Kudu. This approach may perform slightly better than multiple sequential INSERT statements by amortizing the query start-up penalties on the Impala side. To set the batch size for the current Impala Shell session, use the SET statement with the batch_size query option.
- Batch Insert
- The approach that usually performs best, from the standpoint of both Impala and Kudu, is to import the data using a SELECT FROM subclause in Impala.
- If your data is not already in Impala, one strategy is to import it from a text file, such as a TSV or CSV file.
- Create the Kudu table, being mindful that the columns designated as primary keys cannot have null values.
- Insert values into the Kudu table by querying the table containing the original data, as in the following example:
INSERT INTO my_kudu_table SELECT * FROM legacy_data_import_table;
- Ingest using the C++ or Java API
- In many cases, the appropriate ingest path is to use the C++ or Java API to insert directly into Kudu tables. Unlike other Impala tables, data inserted into Kudu tables using the API becomes available for query in Impala without the need for any INVALIDATE METADATA statements or other statements needed for other Impala storage types.
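The batch-size setting and the text-file import path described above can be sketched end to end as follows. Everything specific here is an assumption for illustration: the batch size of 10000, the two-column schema, the HDFS path, and the 8 hash partitions.

```sql
-- Session-level batch size; 10000 is an arbitrary illustrative value.
SET batch_size=10000;

-- Staging table over comma-delimited text.
CREATE TABLE legacy_data_import_table
(
  id BIGINT,
  name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- Move a CSV file already in HDFS into the staging table (placeholder path).
LOAD DATA INPATH '/tmp/legacy_data.csv' INTO TABLE legacy_data_import_table;

-- Kudu target table; the primary key column cannot hold nulls.
CREATE TABLE my_kudu_table
(
  id BIGINT,
  name STRING,
  PRIMARY KEY (id)
)
PARTITION BY HASH (id) PARTITIONS 8
STORED AS KUDU;

-- Bulk insert, skipping rows that lack a key.
INSERT INTO my_kudu_table
SELECT id, name FROM legacy_data_import_table WHERE id IS NOT NULL;
```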
INSERT and Primary Key Uniqueness Violations
In many relational databases, if you try to insert a row that has already been inserted, the insertion will fail because the primary key will be duplicated (see Failures During INSERT, UPDATE, UPSERT, and DELETE Operations). Impala, however, does not fail the query. Instead, it will generate a warning and continue to execute the remainder of the insert statement.
If the inserted rows are meant to replace existing rows, use UPSERT instead of INSERT.
INSERT INTO my_first_table VALUES (99, "sarah");
UPSERT INTO my_first_table VALUES (99, "zoe");
The current value of the row is now zoe.
Updating a Row
UPDATE my_first_table SET name="bob" WHERE id = 3;
Updating In Bulk
You can update in bulk using the same approaches outlined in Inserting In Bulk.
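For example, a single UPDATE can change many rows at once, either by key range or by joining against another table. This is a sketch: name_corrections is a hypothetical table with id and name columns, and the id range is arbitrary.

```sql
-- Update every row in a key range with one statement.
UPDATE my_first_table SET name = "unknown" WHERE id BETWEEN 100 AND 200;

-- Update rows using values from a second (hypothetical) table.
UPDATE t SET name = c.name
FROM my_first_table t JOIN name_corrections c ON t.id = c.id;
```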
Upserting a Row
- If another row already exists with the same set of primary key values, the other columns are updated to match the values from the row being "UPSERTed".
- If there is no row with the same set of primary key values, the row is created, the same as if the INSERT statement was used.
UPSERT INTO my_first_table VALUES (1, "jonathan"), (4, "james");
Deleting a Row
DELETE FROM my_first_table WHERE id < 3;
You can even use more complex joins when deleting rows. For example, Impala uses a comma in the FROM sub-clause to specify a join query.
DELETE c FROM my_second_table c, stock_symbols s WHERE c.name = s.symbol;
Deleting In Bulk
You can delete in bulk using the same approaches outlined in Inserting In Bulk.
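As with inserts, one statement that covers many rows avoids paying the query start-up cost per row. The sketch below assumes a hypothetical table ids_to_purge with a single id column listing the keys to remove.

```sql
-- Delete every row whose key appears in the (hypothetical) purge list.
DELETE t FROM my_first_table t JOIN ids_to_purge p ON t.id = p.id;
```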
Failures During INSERT, UPDATE, UPSERT, and DELETE Operations
INSERT, UPDATE, and DELETE statements cannot be considered transactional as a whole. If one of these operations fails part of the way through, the keys may have already been created (in the case of INSERT) or the records may have already been modified or removed by another process (in the case of UPDATE or DELETE). You should design your application with this in mind.
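One way to design for partial failure is to make the load idempotent, so that the whole statement can simply be run again. The following is a sketch of that idea, not a prescribed method; staged_names is a hypothetical staging table with id and name columns.

```sql
-- Safe to re-run after a partial failure: rows already written are
-- overwritten with the same values, and missing rows are inserted.
UPSERT INTO my_first_table
SELECT id, name FROM staged_names WHERE id IS NOT NULL;
```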
Altering Table Properties
Rename an Impala Mapping Table
ALTER TABLE my_table RENAME TO my_new_table;
Renaming a table using the ALTER TABLE ... RENAME statement only renames the Impala mapping table, regardless of whether the table is an internal or external table. This avoids disruption to other applications that may be accessing the underlying Kudu table.
Rename the Underlying Kudu Table for an Internal Table
If a table is an internal table, the underlying Kudu table may be renamed by changing the kudu.table_name property:
ALTER TABLE my_internal_table SET TBLPROPERTIES('kudu.table_name' = 'new_name');
Remapping an External Table to a Different Kudu Table
If another application has renamed a Kudu table under Impala, it is possible to re-map an external table to point to a different Kudu table name.
ALTER TABLE my_external_table SET TBLPROPERTIES('kudu.table_name' = 'some_other_kudu_table');
Change the Kudu Master Addresses
ALTER TABLE my_table SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-original-master.example.com:7051,kudu-new-master.example.com:7051');
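After changing table properties, it can be useful to confirm what Impala now has recorded. Both statements below are standard Impala statements; the exact layout of their output varies by version.

```sql
-- Shows the CREATE statement Impala would use, including TBLPROPERTIES.
SHOW CREATE TABLE my_table;

-- Lists table parameters such as kudu.table_name and kudu.master_addresses.
DESCRIBE FORMATTED my_table;
```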
Dropping a Kudu Table using Impala
DROP TABLE my_first_table;
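The effect of DROP TABLE depends on whether the table is internal or external, as described earlier in this topic. The sketch below reuses the table names from the earlier examples.

```sql
-- Internal table: Impala manages it, so the underlying Kudu table
-- and its data are removed as well.
DROP TABLE IF EXISTS my_first_table;

-- External table: only the Impala mapping is removed. The Kudu table
-- it pointed to (my_kudu_table) is left in place.
DROP TABLE IF EXISTS my_mapping_table;
```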
Known Issues and Limitations
When creating a Kudu table, the CREATE TABLE statement must include the primary key columns before other columns, in primary key order.
Impala cannot update values in primary key columns.
Impala cannot create Kudu tables with TIMESTAMP, DECIMAL, VARCHAR, or nested-typed columns.
Kudu tables with a name containing upper case or non-ASCII characters must be assigned an alternate name when used as an external table in Impala.
Kudu tables with a column name containing upper case or non-ASCII characters may not be used as an external table in Impala. Non-primary key columns may be renamed in Kudu to work around this issue.
Kudu tables containing UNIXTIME_MICROS-typed columns may not be used as an external table in Impala.
IS NULL, IS NOT NULL, !=, and LIKE predicates are not pushed to Kudu, and instead will be evaluated by the Impala scan node. This may decrease performance relative to other types of predicates.
Updates, inserts, and deletes using Impala are non-transactional. If a query fails part of the way through, its partial effects will not be rolled back.
The maximum parallelism of a single query is limited to the number of tablets in a table. For good analytic performance, aim for 10 or more tablets per host for large tables.