Using Impala to Query Kudu Tables

You can use Impala to query tables stored by Apache Kudu. This capability allows convenient access to a storage system that is tuned for different kinds of workloads than the default with Impala.

By default, Impala tables are stored on HDFS using data files with various file formats. HDFS files are ideal for bulk loads (append operations) and queries using full-table scans, but do not support in-place updates or deletes. Kudu is an alternative storage engine used by Impala which can do both in-place updates (for mixed read/write workloads) and fast scans (for data-warehouse/analytic operations). Using Kudu tables with Impala can simplify the ETL pipeline by avoiding extra steps to segregate and reorganize newly arrived data.

Certain Impala SQL statements and clauses, such as DELETE, UPDATE, UPSERT, and PRIMARY KEY work only with Kudu tables. Other statements and clauses, such as LOAD DATA, TRUNCATE TABLE, and INSERT OVERWRITE, are not applicable to Kudu tables.

Benefits of Using Kudu Tables with Impala

The combination of Kudu and Impala works best for tables where scan performance is important, but data arrives continuously, in small batches, or needs to be updated without being completely replaced. HDFS-backed tables can require substantial overhead to replace or reorganize data files as new data arrives. Impala can perform efficient lookups and scans within Kudu tables, and Impala can also perform update or delete operations efficiently. You can also use the Kudu Java, C++, and Python APIs to do ingestion or transformation operations outside of Impala, and Impala can query the current data at any time.

Configuring Impala for Use with Kudu

The -kudu_master_hosts configuration property must be set correctly for the impalad daemon, for CREATE TABLE ... STORED AS KUDU statements to connect to the appropriate Kudu server. Typically, the required value for this setting is kudu_host:7051. In a high-availability Kudu deployment, specify the names of multiple Kudu hosts separated by commas. If you are using Cloudera Manager, no such configuration is required.

If the -kudu_master_hosts configuration property is not set, you can still associate the appropriate value for each table by specifying a TBLPROPERTIES('kudu.master_addresses') clause in the CREATE TABLE statement or changing the TBLPROPERTIES('kudu.master_addresses') value with an ALTER TABLE statement.

If you are using Cloudera Manager, for each Impala service, navigate to the Configuration tab and specify the Kudu service you want to use in the Kudu Service field.

Cluster Topology for Kudu Tables

With HDFS-backed tables, you are typically concerned with the number of DataNodes in the cluster, how many and how large HDFS data files are read during a query, and therefore the amount of work performed by each DataNode and the network communication to combine intermediate results and produce the final result set.

With Kudu tables, the topology considerations are different, because:

  • The underlying storage is managed and organized by Kudu, not represented as HDFS data files.

  • Kudu handles some of the underlying mechanics of partitioning the data. You can specify the partitioning scheme with combinations of hash and range partitioning, so that you can decide how much effort to expend to manage the partitions as new data arrives. For example, you can construct partitions that apply to date ranges rather than a separate partition for each day or each hour.

  • Data is physically divided based on units of storage called tablets. Tablets are stored by tablet servers. Each tablet server can store multiple tablets, and each tablet is replicated across multiple tablet servers, managed automatically by Kudu. Where practical, colocate the tablet servers on the same hosts as the DataNodes, although that is not required.

Kudu Replication Factor

By default, Kudu tables created through Impala use a tablet replication factor of 3. To change the replication factor for a Kudu table, specify the replication factor using the TBLPROPERTIES in the CREATE TABLE Statement statement as below where n is the replication factor you want to use:
TBLPROPERTIES ('kudu.num_tablet_replicas' = 'n')

The number of replicas for a Kudu table must be odd.

Altering the kudu.num_tablet_replicas property after table creation currently has no effect.

Impala DDL Enhancements for Kudu Tables (CREATE TABLE and ALTER TABLE)

You can use the Impala CREATE TABLE and ALTER TABLE statements to create and fine-tune the characteristics of Kudu tables. Because Kudu tables have features and properties that do not apply to other kinds of Impala tables, familiarize yourself with Kudu-related concepts and syntax first. For the general syntax of the CREATE TABLE statement for Kudu tables, see CREATE TABLE Statement.

Primary Key Columns for Kudu Tables

Kudu tables introduce the notion of primary keys to Impala for the first time. The primary key is made up of one or more columns, whose values are combined and used as a lookup key during queries. The tuple represented by these columns must be unique and cannot contain any NULL values, and can never be updated once inserted. For a Kudu table, all the partition key columns must come from the set of primary key columns.

The primary key has both physical and logical aspects:

  • On the physical side, it is used to map the data values to particular tablets for fast retrieval. Because the tuples formed by the primary key values are unique, the primary key columns are typically highly selective.

  • On the logical side, the uniqueness constraint allows you to avoid duplicate data in a table. For example, if an INSERT operation fails partway through, only some of the new rows might be present in the table. You can re-run the same INSERT, and only the missing rows will be added. Or if data in the table is stale, you can run an UPSERT statement that brings the data up to date, without the possibility of creating duplicate copies of existing rows.

Kudu-Specific Column Attributes for CREATE TABLE

For the general syntax of the CREATE TABLE statement for Kudu tables, see CREATE TABLE Statement. The following sections provide more detail for some of the Kudu-specific keywords you can use in column definitions.

The column list in a CREATE TABLE statement can include the following attributes, which only apply to Kudu tables:

  PRIMARY KEY
| [NOT] NULL
| ENCODING codec
| COMPRESSION algorithm
| DEFAULT constant_expression
| BLOCK_SIZE number

PRIMARY KEY Attribute

The primary key for a Kudu table is a column, or set of columns, that uniquely identifies every row. The primary key value also is used as the natural sort order for the values from the table. The primary key value for each row is based on the combination of values for the columns.

Because all of the primary key columns must have non-null values, specifying a column in the PRIMARY KEY clause implicitly adds the NOT NULL attribute to that column.

The primary key columns must be the first ones specified in the CREATE TABLE statement. For a single-column primary key, you can include a PRIMARY KEY attribute inline with the column definition. For a multi-column primary key, you include a PRIMARY KEY (c1, c2, ...) clause as a separate entry at the end of the column list.

You can specify the PRIMARY KEY attribute either inline in a single column definition, or as a separate clause at the end of the column list:

CREATE TABLE pk_inline
(
  col1 BIGINT PRIMARY KEY,
  col2 STRING,
  col3 BOOLEAN
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;

CREATE TABLE pk_at_end
(
  col1 BIGINT,
  col2 STRING,
  col3 BOOLEAN,
  PRIMARY KEY (col1)
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;

When the primary key is a single column, these two forms are equivalent. If the primary key consists of more than one column, you must specify the primary key using a separate entry in the column list:

CREATE TABLE pk_multiple_columns
(
  col1 BIGINT,
  col2 STRING,
  col3 BOOLEAN,
  PRIMARY KEY (col1, col2)
) PARTITION BY HASH(col2) PARTITIONS 2 STORED AS KUDU;

The SHOW CREATE TABLE statement always represents the PRIMARY KEY specification as a separate item in the column list:

CREATE TABLE inline_pk_rewritten (id BIGINT PRIMARY KEY, s STRING)
  PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;

SHOW CREATE TABLE inline_pk_rewritten;
+------------------------------------------------------------------------------+
| result                                                                       |
+------------------------------------------------------------------------------+
| CREATE TABLE user.inline_pk_rewritten (                                      |
|   id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, |
|   s STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,      |
|   PRIMARY KEY (id)                                                           |
| )                                                                            |
| PARTITION BY HASH (id) PARTITIONS 2                                          |
| STORED AS KUDU                                                               |
| TBLPROPERTIES ('kudu.master_addresses'='host.example.com')                   |
+------------------------------------------------------------------------------+

The notion of primary key only applies to Kudu tables. Every Kudu table requires a primary key. The primary key consists of one or more columns. You must specify any primary key columns first in the column list.

The contents of the primary key columns cannot be changed by an UPDATE or UPSERT statement. Including too many columns in the primary key (more than 5 or 6) can also reduce the performance of write operations. Therefore, pick the most selective and most frequently tested non-null columns for the primary key specification. If a column must always have a value, but that value might change later, leave it out of the primary key and use a NOT NULL clause for that column instead. If an existing row has an incorrect or outdated key column value, delete the old row and insert an entirely new row with the correct primary key.

NULL | NOT NULL Attribute

For Kudu tables, you can specify which columns can contain nulls or not. This constraint offers an extra level of consistency enforcement for Kudu tables. If an application requires a field to always be specified, include a NOT NULL clause in the corresponding column definition, and Kudu prevents rows from being inserted with a NULL in that column.

For example, a table containing geographic information might require the latitude and longitude coordinates to always be specified. Other attributes might be allowed to be NULL. For example, a location might not have a designated place name, its altitude might be unimportant, and its population might be initially unknown, to be filled in later.

Because all of the primary key columns must have non-null values, specifying a column in the PRIMARY KEY clause implicitly adds the NOT NULL attribute to that column.

For non-Kudu tables, Impala allows any column to contain NULL values, because it is not practical to enforce a "not null" constraint on HDFS data files that could be prepared using external tools and ETL processes.

CREATE TABLE required_columns
(
  id BIGINT PRIMARY KEY,
  latitude DOUBLE NOT NULL,
  longitude DOUBLE NOT NULL,
  place_name STRING,
  altitude DOUBLE,
  population BIGINT
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;

During performance optimization, Kudu can use the knowledge that nulls are not allowed to skip certain checks on each input row, speeding up queries and join operations. Therefore, specify NOT NULL constraints when appropriate.

The NULL clause is the default condition for all columns that are not part of the primary key. You can omit it, or specify it to clarify that you have made a conscious design decision to allow nulls in a column.

Because primary key columns cannot contain any NULL values, the NOT NULL clause is not required for the primary key columns, but you might still specify it to make your code self-describing.

DEFAULT Attribute

You can specify a default value for columns in Kudu tables. The default value can be any constant expression, for example, a combination of literal values, arithmetic and string operations. It cannot contain references to columns or non-deterministic function calls.

The following example shows different kinds of expressions for the DEFAULT clause. The requirement to use a constant value means that you can fill in a placeholder value such as NULL, empty string, 0, -1, 'N/A' and so on, but you cannot reference functions or column names. Therefore, you cannot use DEFAULT to do things such as automatically making an uppercase copy of a string value, storing Boolean values based on tests of other columns, or add or subtract one from another column representing a sequence number.

CREATE TABLE default_vals
(
  id BIGINT PRIMARY KEY,
  name STRING NOT NULL DEFAULT 'unknown',
  address STRING DEFAULT upper('no fixed address'),
  age INT DEFAULT -1,
  earthling BOOLEAN DEFAULT TRUE,
  planet_of_origin STRING DEFAULT 'Earth',
  optional_col STRING DEFAULT NULL
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;

ENCODING Attribute

Each column in a Kudu table can optionally use an encoding, a low-overhead form of compression that reduces the size on disk, then requires additional CPU cycles to reconstruct the original values during queries. Typically, highly compressible data benefits from the reduced I/O to read the data back from disk. By default, each column uses the "plain" encoding where the data is stored unchanged.

The encoding keywords that Impala recognizes are:
  • AUTO_ENCODING: use the default encoding based on the column type; currently always the same as PLAIN_ENCODING, but subject to change in the future.

  • PLAIN_ENCODING: leave the value in its original binary format.

  • RLE: compress repeated values (when sorted in primary key order) by including a count.

  • DICT_ENCODING: when the number of different string values is low, replace the original string with a numeric ID.

  • BIT_SHUFFLE: rearrange the bits of the values to efficiently compress sequences of values that are identical or vary only slightly based on primary key order. The resulting encoded data is also compressed with LZ4.

  • PREFIX_ENCODING: compress common prefixes in string values; mainly for use internally within Kudu.

The following example shows the Impala keywords representing the encoding types. (The Impala keywords match the symbolic names used within Kudu.) For usage guidelines on the different kinds of encoding, see the Kudu documentation. The DESCRIBE output shows how the encoding is reported after the table is created, and that omitting the encoding (in this case, for the ID column) is the same as specifying DEFAULT_ENCODING.

CREATE TABLE various_encodings
(
  id BIGINT PRIMARY KEY,
  c1 BIGINT ENCODING PLAIN_ENCODING,
  c2 BIGINT ENCODING AUTO_ENCODING,
  c3 TINYINT ENCODING BIT_SHUFFLE,
  c4 DOUBLE ENCODING BIT_SHUFFLE,
  c5 BOOLEAN ENCODING RLE,
  c6 STRING ENCODING DICT_ENCODING,
  c7 STRING ENCODING PREFIX_ENCODING
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;

-- Some columns are omitted from the output for readability.
describe various_encodings;
+------+---------+-------------+----------+-----------------+
| name | type    | primary_key | nullable | encoding        |
+------+---------+-------------+----------+-----------------+
| id   | bigint  | true        | false    | AUTO_ENCODING   |
| c1   | bigint  | false       | true     | PLAIN_ENCODING  |
| c2   | bigint  | false       | true     | AUTO_ENCODING   |
| c3   | tinyint | false       | true     | BIT_SHUFFLE     |
| c4   | double  | false       | true     | BIT_SHUFFLE     |
| c5   | boolean | false       | true     | RLE             |
| c6   | string  | false       | true     | DICT_ENCODING   |
| c7   | string  | false       | true     | PREFIX_ENCODING |
+------+---------+-------------+----------+-----------------+

COMPRESSION Attribute

You can specify a compression algorithm to use for each column in a Kudu table. This attribute imposes more CPU overhead when retrieving the values than the ENCODING attribute does. Therefore, use it primarily for columns with long strings that do not benefit much from the less-expensive ENCODING attribute.

The choices for COMPRESSION are LZ4, SNAPPY, and ZLIB.

The following example shows design considerations for several STRING columns with different distribution characteristics, leading to choices for both the ENCODING and COMPRESSION attributes. The country values come from a specific set of strings, therefore this column is a good candidate for dictionary encoding. The post_id column contains an ascending sequence of integers, where several leading bits are likely to be all zeroes, therefore this column is a good candidate for bitshuffle encoding. The body column and the corresponding columns for translated versions tend to be long unique strings that are not practical to use with any of the encoding schemes, therefore they employ the COMPRESSION attribute instead. The ideal compression codec in each case would require some experimentation to determine how much space savings it provided and how much CPU overhead it added, based on real-world data.

CREATE TABLE blog_posts
(
  user_id STRING ENCODING DICT_ENCODING,
  post_id BIGINT ENCODING BIT_SHUFFLE,
  subject STRING ENCODING PLAIN_ENCODING,
  body STRING COMPRESSION LZ4,
  spanish_translation STRING COMPRESSION SNAPPY,
  esperanto_translation STRING COMPRESSION ZLIB,
  PRIMARY KEY (user_id, post_id)
) PARTITION BY HASH(user_id, post_id) PARTITIONS 2 STORED AS KUDU;

BLOCK_SIZE Attribute

Although Kudu does not use HDFS files internally, and thus is not affected by the HDFS block size, it does have an underlying unit of I/O called the block size. The BLOCK_SIZE attribute lets you set the block size for any column.

The block size attribute is a relatively advanced feature. Refer to the Kudu documentation for usage details.

Partitioning for Kudu Tables

Kudu tables use special mechanisms to distribute data among the underlying tablet servers. Although we refer to such tables as partitioned tables, they are distinguished from traditional Impala partitioned tables by use of different clauses on the CREATE TABLE statement. Kudu tables use PARTITION BY, HASH, RANGE, and range specification clauses rather than the PARTITIONED BY clause for HDFS-backed tables, which specifies only a column name and creates a new partition for each different value.

For background information and architectural details about the Kudu partitioning mechanism, see the Kudu white paper, section 3.2.

Hash Partitioning

Hash partitioning is the simplest type of partitioning for Kudu tables. For hash-partitioned Kudu tables, inserted rows are divided up between a fixed number of "buckets" by applying a hash function to the values of the columns specified in the HASH clause. Hashing ensures that rows with similar values are evenly distributed, instead of clumping together all in the same bucket. Spreading new rows across the buckets this way lets insertion operations work in parallel across multiple tablet servers. Separating the hashed values can impose additional overhead on queries, where queries with range-based predicates might have to read multiple tablets to retrieve all the relevant values.

-- 1M rows with 50 hash partitions = approximately 20,000 rows per partition.
-- The values in each partition are not sequential, but rather based on a hash function.
-- Rows 1, 99999, and 123456 might be in the same partition.
CREATE TABLE million_rows (id string primary key, s string)
  PARTITION BY HASH(id) PARTITIONS 50
  STORED AS KUDU;

-- Because the ID values are unique, we expect the rows to be roughly
-- evenly distributed between the buckets in the destination table.
INSERT INTO million_rows SELECT * FROM billion_rows ORDER BY id LIMIT 1e6;

Range Partitioning

Range partitioning lets you specify partitioning precisely, based on single values or ranges of values within one or more columns. You add one or more RANGE clauses to the CREATE TABLE statement, following the PARTITION BY clause.

Range-partitioned Kudu tables use one or more range clauses, which include a combination of constant expressions, VALUE or VALUES keywords, and comparison operators. (This syntax replaces the SPLIT ROWS clause used with early Kudu versions.) For the full syntax, see CREATE TABLE Statement.

-- 50 buckets, all for IDs beginning with a lowercase letter.
-- Having only a single range enforces the allowed range of values
-- but does not add any extra parallelism.
create table million_rows_one_range (id string primary key, s string)
  partition by hash(id) partitions 50,
  range (partition 'a' <= values < '{')
  stored as kudu;

-- 50 buckets for IDs beginning with a lowercase letter
-- plus 50 buckets for IDs beginning with an uppercase letter.
-- Total number of buckets = number in the PARTITIONS clause x number of ranges.
-- We are still enforcing constraints on the primary key values
-- allowed in the table, and the 2 ranges provide better parallelism
-- as rows are inserted or the table is scanned.
create table million_rows_two_ranges (id string primary key, s string)
  partition by hash(id) partitions 50,
  range (partition 'a' <= values < '{', partition 'A' <= values < '[')
  stored as kudu;

-- Same as previous table, with an extra range covering the single key value '00000'.
create table million_rows_three_ranges (id string primary key, s string)
  partition by hash(id) partitions 50,
  range (partition 'a' <= values < '{', partition 'A' <= values < '[', partition value = '00000')
  stored as kudu;

-- The range partitioning can be displayed with a SHOW command in impala-shell.
show range partitions million_rows_three_ranges;
+---------------------+
| RANGE (id)          |
+---------------------+
| VALUE = "00000"     |
| "A" <= VALUES < "[" |
| "a" <= VALUES < "{" |
+---------------------+

For range-partitioned Kudu tables, an appropriate range must exist before a data value can be created in the table. Any INSERT, UPDATE, or UPSERT statements fail if they try to create column values that fall outside the specified ranges. The error checking for ranges is performed on the Kudu side; Impala passes the specified range information to Kudu, and passes back any error or warning if the ranges are not valid. (A nonsensical range specification causes an error for a DDL statement, but only a warning for a DML statement.)

Ranges can be non-contiguous:

partition by range (year) (partition 1885 <= values <= 1889, partition 1893 <= values <= 1897)

partition by range (letter_grade) (partition value = 'A', partition value = 'B',
  partition value = 'C', partition value = 'D', partition value = 'F')

The ALTER TABLE statement with the ADD PARTITION or DROP PARTITION clauses can be used to add or remove ranges from an existing Kudu table.

ALTER TABLE foo ADD PARTITION 30 <= VALUES < 50;
ALTER TABLE foo DROP PARTITION 1 <= VALUES < 5;

When a range is added, the new range must not overlap with any of the previous ranges; that is, it can only fill in gaps within the previous ranges.

alter table test_scores add range partition value = 'E';

alter table year_ranges add range partition 1890 <= values < 1893;

When a range is removed, all the associated rows in the table are deleted. (This is true whether the table is internal or external.)

alter table test_scores drop range partition value = 'E';

alter table year_ranges drop range partition 1890 <= values < 1893;

Kudu tables can also use a combination of hash and range partitioning.

partition by hash (school) partitions 10,
  range (letter_grade) (partition value = 'A', partition value = 'B',
    partition value = 'C', partition value = 'D', partition value = 'F')

Working with Partitioning in Kudu Tables

To see the current partitioning scheme for a Kudu table, you can use the SHOW CREATE TABLE statement or the SHOW PARTITIONS statement. The CREATE TABLE syntax displayed by this statement includes all the hash, range, or both clauses that reflect the original table structure plus any subsequent ALTER TABLE statements that changed the table structure.

To see the underlying buckets and partitions for a Kudu table, use the SHOW TABLE STATS or SHOW PARTITIONS statement.

Handling Date, Time, or Timestamp Data with Kudu

In CDH 5.12 / Impala 2.9 and higher, you can include TIMESTAMP columns in Kudu tables, instead of representing the date and time as a BIGINT value. The behavior of TIMESTAMP for Kudu tables has some special considerations:
  • Any nanoseconds in the original 96-bit value produced by Impala are not stored, because Kudu represents date/time columns using 64-bit values. The nanosecond portion of the value is rounded, not truncated. Therefore, a TIMESTAMP value that you store in a Kudu table might not be bit-for-bit identical to the value returned by a query.

  • The conversion between the Impala 96-bit representation and the Kudu 64-bit representation introduces some performance overhead when reading or writing TIMESTAMP columns. You can minimize the overhead during writes by performing inserts through the Kudu API. Because the overhead during reads applies to each query, you might continue to use a BIGINT column to represent date/time values in performance-critical applications.

  • The Impala TIMESTAMP type has a narrower range for years than the underlying Kudu data type. Impala can represent years 1400-9999. If year values outside this range are written to a Kudu table by a non-Impala client, Impala returns NULL by default when reading those TIMESTAMP values during a query. Or, if the ABORT_ON_ERROR query option is enabled, the query fails when it encounters a value with an out-of-range year.

--- Make a table representing a date/time value as TIMESTAMP.
-- The strings representing the partition bounds are automatically
-- cast to TIMESTAMP values.
create table native_timestamp(id bigint, when_exactly timestamp, event string, primary key (id, when_exactly))
  partition by hash (id) partitions 20,
  range (when_exactly)
  (
    partition '2015-01-01' <= values < '2016-01-01',
    partition '2016-01-01' <= values < '2017-01-01',
    partition '2017-01-01' <= values < '2018-01-01'
  )
  stored as kudu;

insert into native_timestamp values (12345, now(), 'Working on doc examples');

select * from native_timestamp;
+-------+-------------------------------+-------------------------+
| id    | when_exactly                  | event                   |
+-------+-------------------------------+-------------------------+
| 12345 | 2017-05-31 16:27:42.667542000 | Working on doc examples |
+-------+-------------------------------+-------------------------+

Because Kudu tables have some performance overhead to convert TIMESTAMP columns to the Impala 96-bit internal representation, for performance-critical applications you might store date/time information as the number of seconds, milliseconds, or microseconds since the Unix epoch date of January 1, 1970. Specify the column as BIGINT in the Impala CREATE TABLE statement, corresponding to an 8-byte integer (an int64) in the underlying Kudu table). Then use Impala date/time conversion functions as necessary to produce a numeric, TIMESTAMP, or STRING value depending on the context.

For example, the unix_timestamp() function returns an integer result representing the number of seconds past the epoch. The now() function produces a TIMESTAMP representing the current date and time, which can be passed as an argument to unix_timestamp(). And string literals representing dates and date/times can be cast to TIMESTAMP, and from there converted to numeric values. The following examples show how you might store a date/time column as BIGINT in a Kudu table, but still use string literals and TIMESTAMP values for convenience.

-- now() returns a TIMESTAMP and shows the format for string literals you can cast to TIMESTAMP.
select now();
+-------------------------------+
| now()                         |
+-------------------------------+
| 2017-01-25 23:50:10.132385000 |
+-------------------------------+

-- unix_timestamp() accepts either a TIMESTAMP or an equivalent string literal.
select unix_timestamp(now());
+------------------+
| unix_timestamp() |
+------------------+
| 1485386670       |
+------------------+

select unix_timestamp('2017-01-01');
+------------------------------+
| unix_timestamp('2017-01-01') |
+------------------------------+
| 1483228800                   |
+------------------------------+

-- Make a table representing a date/time value as BIGINT.
-- Construct 1 range partition and 20 associated hash partitions for each year.
-- Use date/time conversion functions to express the ranges as human-readable dates.
create table time_series(id bigint, when_exactly bigint, event string, primary key (id, when_exactly))
  partition by hash (id) partitions 20,
  range (when_exactly)
  (
    partition unix_timestamp('2015-01-01') <= values < unix_timestamp('2016-01-01'),
    partition unix_timestamp('2016-01-01') <= values < unix_timestamp('2017-01-01'),
    partition unix_timestamp('2017-01-01') <= values < unix_timestamp('2018-01-01')
  )
  stored as kudu;

-- On insert, we can transform a human-readable date/time into a numeric value.
insert into time_series values (12345, unix_timestamp('2017-01-25 23:24:56'), 'Working on doc examples');

-- On retrieval, we can examine the numeric date/time value or turn it back into a string for readability.
select id, when_exactly, from_unixtime(when_exactly) as 'human-readable date/time', event
  from time_series order by when_exactly limit 100;
+-------+--------------+--------------------------+-------------------------+
| id    | when_exactly | human-readable date/time | event                   |
+-------+--------------+--------------------------+-------------------------+
| 12345 | 1485386696   | 2017-01-25 23:24:56      | Working on doc examples |
+-------+--------------+--------------------------+-------------------------+

-- 1 million and 1 microseconds = 1.000001 seconds.
select microseconds,
  cast (microseconds as decimal(20,7)) / 1e6 as fractional_seconds
  from table_with_microsecond_column;
+--------------+----------------------+
| microseconds | fractional_seconds   |
+--------------+----------------------+
| 1000001      | 1.000001000000000000 |
+--------------+----------------------+

How Impala Handles Kudu Metadata

Much of the metadata for Kudu tables is handled by the underlying storage layer. Kudu tables have less reliance on the metastore database, and require less metadata caching on the Impala side. For example, information about partitions in Kudu tables is managed by Kudu, and Impala does not cache any block locality metadata for Kudu tables.

The REFRESH and INVALIDATE METADATA statements are needed less frequently for Kudu tables than for HDFS-backed tables. Neither statement is needed when data is added to, removed, or updated in a Kudu table, even if the changes are made directly to Kudu through a client program using the Kudu API. Run REFRESH table_name or INVALIDATE METADATA table_name for a Kudu table only after making a change to the Kudu table schema, such as adding or dropping a column.

Because Kudu manages the metadata for its own tables separately from the metastore database, there is a table name stored in the metastore database for Impala to use, and a table name on the Kudu side, and these names can be modified independently through ALTER TABLE statements.

To avoid potential name conflicts, the prefix impala:: and the Impala database name are encoded into the underlying Kudu table name:

create database some_database;
use some_database;

create table table_name_demo (x int primary key, y int)
  partition by hash (x) partitions 2 stored as kudu;

describe formatted table_name_demo;
...
kudu.table_name  | impala::some_database.table_name_demo

See Kudu Tables for examples of how to change the name of the Impala table in the metastore database, the name of the underlying Kudu table, or both.

Loading Data into Kudu Tables

Kudu tables are well-suited to use cases where data arrives continuously, in small or moderate volumes. To bring data into Kudu tables, use the Impala INSERT and UPSERT statements. The LOAD DATA statement does not apply to Kudu tables.

Because Kudu manages its own storage layer that is optimized for smaller block sizes than HDFS, and performs its own housekeeping to keep data evenly distributed, it is not subject to the "many small files" issue and does not need explicit reorganization and compaction as the data grows over time. The partitions within a Kudu table can be specified to cover a variety of possible data distributions, instead of hardcoding a new partition for each new day, hour, and so on, which can lead to inefficient, hard-to-scale, and hard-to-manage partition schemes with HDFS tables.

Your strategy for performing ETL or bulk updates on Kudu tables should take into account the limitations on consistency for DML operations.

Make INSERT, UPDATE, and UPSERT operations idempotent: that is, able to be applied multiple times and still produce an identical result.

If a bulk operation is in danger of exceeding capacity limits due to timeouts or high memory usage, split it into a series of smaller operations.

Avoid running concurrent ETL operations where the end results depend on precise ordering. In particular, do not rely on an INSERT ... SELECT statement that selects from the same table into which it is inserting, unless you include extra conditions in the WHERE clause to avoid reading the newly inserted rows within the same statement.

Because relationships between tables cannot be enforced by Impala and Kudu, and cannot be committed or rolled back together, do not expect transactional semantics for multi-table operations.

Impala DML Support for Kudu Tables (INSERT, UPDATE, DELETE, UPSERT)

Impala supports certain DML statements for Kudu tables only. The UPDATE and DELETE statements let you modify data within Kudu tables without rewriting substantial amounts of table data. The UPSERT statement acts as a combination of INSERT and UPDATE, inserting rows where the primary key does not already exist, and updating the non-primary key columns where the primary key does already exist in the table.

The INSERT statement for Kudu tables honors the unique and NOT NULL requirements for the primary key columns.

Because Impala and Kudu do not support transactions, the effects of any INSERT, UPDATE, or DELETE statement are immediately visible. For example, you cannot do a sequence of UPDATE statements and only make the changes visible after all the statements are finished. Also, if a DML statement fails partway through, any rows that were already inserted, deleted, or changed remain in the table; there is no rollback mechanism to undo the changes.

In particular, an INSERT ... SELECT statement that refers to the table being inserted into might insert more rows than expected, because the SELECT part of the statement sees some of the new rows being inserted and processes them again.

Starting from CDH 5.12 / Impala 2.9, the INSERT or UPSERT operations into Kudu tables automatically add an exchange and a sort node to the plan that partitions and sorts the rows according to the partitioning/primary key scheme of the target table (unless the number of rows to be inserted is small enough to trigger single node execution). Since Kudu partitions and sorts rows on write, pre-partitioning and sorting takes some of the load off of Kudu and helps large INSERT operations to complete without timing out. However, this default behavior may slow down the end-to-end performance of the INSERT or UPSERT operations. Starting from , you can use the /* +NOCLUSTERED */ and /* +NOSHUFFLE */ hints together to disable partitioning and sorting before the rows are sent to Kudu. Additionally, since sorting may consume a large amount of memory, consider setting the MEM_LIMIT query option for those queries.

Consistency Considerations for Kudu Tables

Kudu tables have consistency characteristics such as uniqueness, controlled by the primary key columns, and non-nullable columns. The emphasis for consistency is on preventing duplicate or incomplete data from being stored in a table.

Currently, Kudu does not enforce strong consistency for order of operations, total success or total failure of a multi-row statement, or data that is read while a write operation is in progress. Changes are applied atomically to each row, but not applied as a single unit to all rows affected by a multi-row DML statement. That is, Kudu does not currently have atomic multi-row statements or isolation between statements.

If some rows are rejected during a DML operation because of a mismatch with duplicate primary key values, NOT NULL constraints, and so on, the statement succeeds with a warning. Impala still inserts, deletes, or updates the other rows that are not affected by the constraint violation.

Consequently, the number of rows affected by a DML operation on a Kudu table might be different than you expect.

Because there is no strong consistency guarantee for information being inserted into, deleted from, or updated across multiple tables simultaneously, consider denormalizing the data where practical. That is, if you run separate INSERT statements to insert related rows into two different tables, one INSERT might fail while the other succeeds, leaving the data in an inconsistent state. Even if both inserts succeed, a join query might happen during the interval between the completion of the first and second statements, and the query would encounter incomplete inconsistent data. Denormalizing the data into a single wide table can reduce the possibility of inconsistency due to multi-table operations.

Information about the number of rows affected by a DML operation is reported in impala-shell output, and in the PROFILE output, but is not currently reported to HiveServer2 clients such as JDBC or ODBC applications.

Security Considerations for Kudu Tables

Security for Kudu tables involves:

  • Sentry authorization.

    Access to Kudu tables must be granted to and revoked from roles as usual. Only users with ALL privileges on SERVER can create external Kudu tables. Currently, access to a Kudu table is "all or nothing": enforced at the table level rather than the column level, and applying to all SQL operations rather than individual statements such as INSERT. Because non-SQL APIs can access Kudu data without going through Sentry authorization, currently the Sentry support is considered preliminary and subject to change.

  • Kerberos authentication. See Kudu Security for details.

  • TLS encryption. See Kudu Security for details.

  • Lineage tracking.

  • Auditing.

  • Redaction of sensitive information from log files.

Impala Query Performance for Kudu Tables

For queries involving Kudu tables, Impala can delegate much of the work of filtering the result set to Kudu, avoiding some of the I/O involved in full table scans of tables containing HDFS data files. This type of optimization is especially effective for partitioned Kudu tables, where the Impala query WHERE clause refers to one or more primary key columns that are also used as partition key columns. For example, if a partitioned Kudu table uses a HASH clause for col1 and a RANGE clause for col2, a query using a clause such as WHERE col1 IN (1,2,3) AND col2 > 100 can determine exactly which tablet servers contain relevant data, and therefore parallelize the query very efficiently.

See EXPLAIN Statement for examples of evaluating the effectiveness of the predicate pushdown for a specific query against a Kudu table.

The TABLESAMPLE clause of the SELECT statement does not apply to a table reference derived from a view, a subquery, or anything other than a real base table. This clause only works for tables backed by HDFS or HDFS-like data files, therefore it does not apply to Kudu or HBase tables.