How-to: Use Impala and Kudu Together for Analytic Workloads

Using Apache Impala (incubating) on top of Apache Kudu (incubating) has significant performance benefits

Apache Kudu (incubating) is the newest addition to the set of storage engines that integrate with the Apache Hadoop ecosystem. The promise of Kudu is to deliver high-scan performance, targeting analytical workloads, while allowing users to concurrently insert, update, and delete records. With these properties, Kudu becomes a viable alternative to existing combinations of HDFS and/or Apache HBase to achieve similar results with less complicated ETL pipelines, comparable performance, and a lighter maintenance burden.

In this blog post, we want to describe how Apache Impala (incubating) integrates with Kudu for analytic SQL queries on Hadoop and how it takes full advantage of the distinct properties of Kudu.

Scanning Data from Kudu

When scanning data from Kudu, Impala behaves no differently than when the data is stored in HDFS. Instead of scanning data from HDFS to feed into the different operators, such as joins or aggregations, Impala scans the data from Kudu. When Kudu is deployed to a cluster, Impala assumes the same locality principles as for HDFS, meaning that it typically expects a Kudu tablet server to be co-located with an Impala daemon to reduce network transfer.

Before a SQL query is executed, the query is analyzed and planned in Impala’s query front end. During query planning, Impala will fetch the table metadata from Kudu, which contains information about the physical location and the key ranges that are associated with each tablet. A scan range defines the unit of work for Impala’s scan nodes and, currently, each tablet will be associated with exactly one scan range.

In the query plan, a new type of scan node (the Kudu scan node) is now available, which is analogous to the HDFS and HBase scan nodes. Inside the Kudu scan node, Impala uses the public client API to communicate with Kudu. It is noteworthy that Impala does not consume the raw table format of Kudu; instead, it instantiates scans from the client that are then executed by Kudu daemons. Impala will start one thread to execute each of the assigned scan ranges, which currently have one-to-one mapping with Kudu tablets. If there are multiple tablets to scan local to a single node, Impala will start multiple threads and scan them in parallel.

Like Apache Parquet, Kudu stores the data on disk column-wise and optionally encoded and compressed, leading to very efficient scan projections that are subsets of the total number of columns in a table. However, simply scanning and returning all the rows to perform filtering in Impala would not yield the desired performance. Rather, Impala exploits Kudu’s ability to associate predicates with the scan that is executed by Kudu. If the predicate concerns a table’s primary key, then whole scan ranges might be skipped completely; if the predicate concerns a non-key column, then it is evaluated on the Kudu side and only the rows that satisfy the predicate are returned. (Currently, for predicates that concern a table’s primary key that would cause whole scan ranges [Kudu tablets] to be skipped, Impala still initiates the scans; however, they are transformed into no-ops on the Kudu side and the scans return immediately. In the future, for this case, the scans will be not be created on the Impala side at all.)

As Kudu does not yet support all column types or predicate expression, during query planning an extra step will extract all those predicates that can be executed by Kudu. Kudu currently supports the evaluation of predicates of types <=, =, and => on Integer, String, Float, and Double columns. Moreover, in the case of integer typed columns, Impala will perform a constant transformation that will allow pushing down < and > typed predicates.

Pushing down as many predicates as possible significantly improves scan performance. If there are range predicates that concern key columns, Kudu will skip all data that is guaranteed not to satisfy the predicate, which it can do efficiently thanks to Kudu’s primary-key indexes. After finding the data that satisfies the primary-key predicates, Kudu will materialize non-key columns that were also assigned predicates and select the rows that satisfy all predicates, and only then will Kudu read the remaining columns. In contrast, with Impala and Parquet storage, all columns are materialized for predicates that apply to non-key columns.

Inserts/Updates/Deletes

The main focus of Impala is to batch-load data and enable fast analytics on top of that data. Even though single inserts have always been possible with Impala, they have been highly discouraged because each single insert will end up being stored in a single file. Tables backed with Kudu don’t have this limitation, and thus Impala with Kudu tables behaves more like a traditional database system than an optimized analytical data warehouse. For Impala to make the maximum use of these additional capabilities, however, it was necessary to extend the SQL language support in Impala for UPDATE and single-row DELETE operations.

Inserts into a Kudu table are typically fed either by a scan from another table or via single inserts so they do not have a precondition on the source table. For UPDATE and DELETE operations, the situation is slightly different. UPDATEs and DELETEs to Kudu have an API that is key-centric–meaning that to modify a record, at least the full primary key needs to be specified in addition to the columns that are modified. The primary key itself cannot be updated in a single operation, only as a combination of a DELETE and an UPDATE. However, because both operations are executed independently, there are no consistency guarantees that span both operations. In Impala, DELETE and UPDATE operations are implemented in a very similar manner by first issuing a scan to Kudu to retrieve the matching rows and writing back the modifications. In case of a DELETE, only the primary keys are sent back to Kudu to identify the records that will be deleted.

Even though Kudu provides transactional properties for single-record operations, executing complex DELETE and UPDATE queries that yield the update rows evaluating complex non-primary key predicates and potentially joins with other tables will not be executed in a transactional context. To avoid potentially concurrent UPDATEs or DELETEs, we added an additional IGNORE keyword: INSERT IGNORE, UPDATE IGNORE, and DELETE IGNORE are specifically designed to ignore problems when a record with the same primary key already exists (INSERT IGNORE) or the record no longer exists (UPDATE and DELETE IGNORE).

Performance Considerations

When the analytical pipeline is set and performance needs to be optimized, keep your eye on a few things. First of all, although Kudu provides efficient single-row inserts and updates, Impala is not optimized for these kinds of queries. If you expect your workload to be heavily driven by such very small operations, it might be advisable to use the Kudu API directly instead of going through Impala. In addition, you should always try to batch as many inserts as possible to achieve the best performance. In the case of inserts, this batching can be achieved by inserting multiple rows at once using the values clause or the INSERT INTO table SELECT notation.

When inserting data into Kudu using the INSERT INTO table SELECT notation, it is possible to increase the throughput by increasing the batch size in Impala to a higher number. Be advised that a higher batch size has direct impact on the memory consumption of Impala. In addition, there is a limit on the amount of data that Kudu can send in a single RPC of approximately 8MB. If the batch size becomes too large, write operations will fail because of this property.

Impala uses scan ranges to perform scheduling of reads on a particular host, and currently a single tablet is the smallest unit of work for now, so having too few tablets will result in suboptimal performance. On the other end of the spectrum, having too many tablets with too few rows on average per tablet will incur a higher coordination and network overhead and thus may slow down queries. When using Impala with Kudu, we found that a general rule of thumb is to have as many tablets per host as you want to scan in parallel. For example, assuming that you have 10 servers with 16 cores each, account for at least 16 tablets per server and 160 tablets in total. (Please be advised that this simple rule only works if there is enough data per tablet.)

Future Work

With the Impala-Kudu integration recently merged into the Impala main line, the team’s immediate focus is on stability, scalability, and performance. In particular, we’ve been working on queries with very large datasets and on some leftover items from the big merge.

The Kudu team has been hard at work on partition pruning improvements, which we will pull into the Impala integration as soon as they are ready. These improvements should help when running queries that affect a small number of ranges at the same time, by reducing considerably the number of partitions to scan (and thus scan nodes to start). Further down the line, we intend to implement the Apache Arrow in-memory data format and to share memory between Kudu and Impala, which we expect will help with performance and resource usage.

Both projects welcome your contributions in these (and all other) areas; so please reach out!

David Alves is a Software Engineer at Cloudera, and a Kudu committer.