Using Impala at Scale at Allstate
- by Justin Kestelyn (@kestelyn)
- May 15, 2014
Our thanks to Don Drake (@dondrake), an independent technology consultant who is currently working as a Principal Big Data Consultant at Allstate Insurance, for the guest post below about his experiences with Impala.
It started with a simple request from one of the managers in my group at Allstate to put together a demo of Tableau connecting to Cloudera Impala. I had previously worked on Impala with a large dataset about a year ago while it was still in beta, and was curious to see how Impala had improved since then in features and stability.
In this post, I’ll share my experiences with Impala during the process of building my demo. As you’ll see below, I learned that unlike traditional computing environments, which can’t effectively process the full spectrum of your records and thereby limit the power of your data, Impala does not force you to narrow that spectrum!
Generating a Dataset
I immediately began to brainstorm a good dataset, thinking it should be based on internal data that is large enough to stress Impala and our CDH cluster.
I have been working on a project that calculates earned premiums across a large dataset as a Hadoop Streaming job. This job provides ad hoc reporting with hundreds of possible dimensions to choose from as well as a handful of facts. My thought was to take the output of this job using all dimensions and all facts, as that would yield a dataset that is very wide as well as rich in rows. I chose numerous years of input for my Streaming job and generated a dataset with more than 800 columns and approximately 1.1 billion rows stored in HDFS as an uncompressed CSV file. This file was 2.3TB in size.
This output represented a countrywide dataset spanning numerous years, an order of magnitude or two larger than what is normally processed by our user community. Once loaded into Impala, this would be a great dataset for showcasing query performance as well as Tableau reporting.
Loading the Dataset
After you have a dataset sitting in a directory in HDFS, the fastest way to start using it in Impala (or even Hive) is through external tables. Basically, you create a table with the EXTERNAL keyword, supply column names and data types as usual, and then reference the table location in HDFS. This is a schema-on-read scenario, so although the create table might succeed, you will still have to query and inspect the data before you realize you made a mistake creating the external table.
The following is a subset of my CREATE TABLE DDL, which I executed by pasting it into impala-shell:
DROP TABLE IF EXISTS default.dataset_2010_2013;

CREATE EXTERNAL TABLE IF NOT EXISTS default.dataset_2010_2013 (
  PROC_MONTH STRING,
  PROC_YEAR STRING,
  POLCT STRING,
  COMPNY STRING,
  GEOST STRING,
  STATST STRING,
  [800+ column definitions removed for brevity]
  EEXP REAL,
  EPREM REAL,
  EXP REAL,
  PREM REAL
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/drake/cdf_impala';
(Note: For aggregation/demo purposes, only the facts, not the dimensions, are defined as numeric columns in this schema.)
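One habit worth adopting here (my own suggestion, not a step from the original workflow): because this is schema-on-read, pull a handful of raw rows right after creating the table to confirm that the delimiter and column order line up with the DDL. The column names below come from the subset shown above.

-- Quick sanity check on the external table: if the delimiter or column order
-- is wrong, it usually shows up immediately as shifted or NULL values.
SELECT PROC_MONTH, PROC_YEAR, GEOST, PREM
FROM default.dataset_2010_2013
LIMIT 5;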
All queries were run on a 35-node Hadoop cluster running CDH 4.6 and Impala 1.2.4. The cluster was lightly loaded with other MapReduce jobs while I ran the following queries. (With each SQL statement, I will also show the EXPLAIN output, as this provides some insight into how Impala constructs an execution strategy for the query.) After I created the table referencing our data in the HDFS directory /user/drake/cdf_impala, I checked to see how many rows were available:
[datanode01:21000] > select count(*) from default.dataset_2010_2013;
Query: select count(*) from default.dataset_2010_2013
+------------+
| count(*)   |
+------------+
| 1168654867 |
+------------+
Returned 1 row(s) in 171.40s

[datanode01:21000] > explain select count(*) from default.dataset_2010_2013;
Query: explain select count(*) from default.dataset_2010_2013
+--------------------------------------------------------------+
| Explain String |
+--------------------------------------------------------------+
| PLAN FRAGMENT 0 |
|   PARTITION: UNPARTITIONED |
| |
|   3:AGGREGATE (merge finalize) |
|   |  output: SUM(COUNT(*)) |
|   | |
|   2:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
|   PARTITION: RANDOM |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 2 |
|     UNPARTITIONED |
| |
|   1:AGGREGATE |
|   |  output: COUNT(*) |
|   | |
|   0:SCAN HDFS |
|      table=default.dataset_2010_2013 #partitions=1/1 size=2.28TB |
+--------------------------------------------------------------+
Returned 20 row(s) in 0.16s
So, I had a 1.1 billion row dataset and it took just under three minutes for Impala to tell me that. The explain plan shows that it had to scan 2.28TB of data.
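As a rough sanity check on that number (my own arithmetic, using only the two figures above): scanning 2.28TB in about 171 seconds works out to roughly 13GB/s of aggregate scan throughput, or on the order of 400MB/s per node across the 35 nodes, assuming the work was spread evenly.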
I consider that response time to be quite fast, but “fast” is also a relative term. For comparison, the exact same Hive query took eight minutes to return a row count.
Next, I ran a more difficult query for Impala: a distinct count of geographic areas by year:
[datanode01:21000] > explain select count(distinct GEOST), ACTYR from default.dataset_2010_2013 GROUP BY ACTYR;
Query: explain select count(distinct GEOST), ACTYR from default.dataset_2010_2013 GROUP BY ACTYR
+--------------------------------------------------------------+
| Explain String |
+--------------------------------------------------------------+
| PLAN FRAGMENT 0 |
|   PARTITION: UNPARTITIONED |
| |
|   5:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
|   PARTITION: HASH_PARTITIONED: ACTYR |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 5 |
|     UNPARTITIONED |
| |
|   2:AGGREGATE (merge finalize) |
|   |  output: COUNT(GEOST) |
|   |  group by: ACTYR |
|   | |
|   4:AGGREGATE (merge) |
|   |  group by: ACTYR, GEOST |
|   | |
|   3:EXCHANGE |
| |
| PLAN FRAGMENT 2 |
|   PARTITION: RANDOM |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 3 |
|     HASH_PARTITIONED: ACTYR |
| |
|   1:AGGREGATE |
|   |  group by: ACTYR, GEOST |
|   | |
|   0:SCAN HDFS |
|      table=default.dataset_2010_2013 #partitions=1/1 size=2.28TB |
+--------------------------------------------------------------+
Returned 33 row(s) in 0.15s

[datanode01:21000] > select count(distinct GEOST), ACTYR from default.dataset_2010_2013 GROUP BY ACTYR;
Query: select count(distinct GEOST), ACTYR from default.dataset_2010_2013 GROUP BY ACTYR
+-----------------------+-------+
| count(distinct geost) | actyr |
+-----------------------+-------+
| 42                    | 112   |
| 42                    | 110   |
| 42                    | 113   |
| 42                    | 111   |
+-----------------------+-------+
Returned 4 row(s) in 142.63s
From the explain plan, you can see that Impala had to scan the entire dataset before it could aggregate. The query time came down a bit relative to the row-count query, probably because of disk caching on the data nodes.
Then, I tweaked the query to look at only one year’s results. As you can see below, even when you limit the query to a single year, the explain plan shows that it must still scan the entire dataset:
[datanode01:21000] > explain select count(distinct GEOST), ACTYR from default.dataset_2010_2013 WHERE ACTYR = '111' GROUP BY ACTYR;
Query: explain select count(distinct GEOST), ACTYR from default.dataset_2010_2013 WHERE ACTYR = '111' GROUP BY ACTYR
+--------------------------------------------------------------+
| Explain String |
+--------------------------------------------------------------+
| PLAN FRAGMENT 0 |
|   PARTITION: UNPARTITIONED |
| |
|   5:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
|   PARTITION: HASH_PARTITIONED: ACTYR |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 5 |
|     UNPARTITIONED |
| |
|   2:AGGREGATE (merge finalize) |
|   |  output: COUNT(GEOST) |
|   |  group by: ACTYR |
|   | |
|   4:AGGREGATE (merge) |
|   |  group by: ACTYR, GEOST |
|   | |
|   3:EXCHANGE |
| |
| PLAN FRAGMENT 2 |
|   PARTITION: RANDOM |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 3 |
|     HASH_PARTITIONED: ACTYR |
| |
|   1:AGGREGATE |
|   |  group by: ACTYR, GEOST |
|   | |
|   0:SCAN HDFS |
|      table=default.dataset_2010_2013 #partitions=1/1 size=2.28TB |
|      predicates: ACTYR = '111' |
+--------------------------------------------------------------+
Returned 34 row(s) in 0.21s

[datanode01:21000] > select count(distinct GEOST), ACTYR from default.dataset_2010_2013 WHERE ACTYR = '111' GROUP BY ACTYR;
Query: select count(distinct GEOST), ACTYR from default.dataset_2010_2013 WHERE ACTYR = '111' GROUP BY ACTYR
+-----------------------+-------+
| count(distinct geost) | actyr |
+-----------------------+-------+
| 42                    | 111   |
+-----------------------+-------+
Returned 1 row(s) in 122.36s
This looks pretty good. I got aggregates on the entire dataset as well as a subset of it in minutes!
But wait: one can improve these queries by taking advantage of two Impala features, namely partitioning and Parquet.
Optimization using Partitions and Parquet
Data partitioning is not unique to Impala; it is available in just about every RDBMS, and it improves performance by physically separating data based on one or more column values. For example, if normal query patterns reveal that most queries on a table look at only a single year (as in our previous example), then partitioning the table by year lets the query planner skip the data for all the other years. That happens to be the case for our dataset, and we will partition it on these known columns.
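To make that concrete, here is a minimal sketch with a hypothetical toy table (not part of the Allstate dataset): each year’s rows land in their own HDFS subdirectory, and a filter on the partition column lets Impala skip every other directory.

-- Hypothetical example: a small table partitioned by year.
CREATE TABLE sales_by_year (
  policy_id STRING,
  premium DOUBLE
)
PARTITIONED BY (yr STRING);

-- Rows are stored under .../sales_by_year/yr=2012/, .../yr=2013/, and so on.
-- A filter on the partition column only reads the matching directory:
SELECT SUM(premium)
FROM sales_by_year
WHERE yr = '2013';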
Parquet support is where Impala really shines. Parquet is an open source serialization format that stores data in a binary, column-oriented fashion. In a row-oriented format, every column of a row is stored together, followed by the next row (again with its columns stored next to each other). Parquet turns this on its head: it takes a group of records (a row group) and stores the values of the first column together for the entire row group, followed by the values of the second column, and so on. Parquet is optimized for scanning individual columns, so it does not have to read an entire row group if you are only interested in a subset of the columns.
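For instance, once our data is stored as Parquet (which we do below), a query that touches only two of the 800+ facts should only have to read those two columns’ chunks from each row group; the rest never come off disk. A small illustration, using column names from the DDL above:

-- Only the EPREM and EEXP column chunks are read from each Parquet row group;
-- the other 800 or so columns in the files are skipped entirely.
SELECT SUM(EPREM), SUM(EEXP)
FROM default.datasetp_2010_2013;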
Columnar storage also pays off when a column has low cardinality, as our ACTYR column does: millions of rows share the value '111'. Parquet can apply old-school compression strategies such as run-length encoding (RLE) and dictionary encoding, as well as modern compression codecs like Snappy. This allows the data to be compressed at a very high rate, which reduces I/O and further reduces query times.
Let’s put these two features to use. Luckily, it is really easy to add support for both by adding a few lines of DDL to our CREATE TABLE. Our strategy is to create a new table with partitions defined and Parquet enabled, and then load our existing data into this new table.
DROP TABLE IF EXISTS default.datasetp_2010_2013;

CREATE EXTERNAL TABLE IF NOT EXISTS default.datasetp_2010_2013 (
  PROC_MONTH STRING,
  PROC_YEAR STRING,
  POLCT STRING,
  STATST STRING,
  [800+ column definitions removed for brevity]
  EEXP REAL,
  EPREM REAL,
  EXP REAL,
  PREM REAL
)
PARTITIONED BY (ACTYR STRING, GEOST STRING, ALINE STRING, COMPNY STRING)
STORED AS PARQUET
LOCATION '/user/drake/cdf_impala_part';
Our DDL for this new table has two additional clauses. The first tells Impala to partition the data on four columns: the accounting year, geographic area, line of business, and company. Impala will make sure data lands in the correct partition and will use these partitions to prune data at query time.
The other clause says to store the data in the Parquet serialization format. By default, Impala compresses Parquet data files with Snappy.
With those few additional lines, our new table is partitioned, stored as Parquet, and compressed. As you can see, using Parquet with Impala is pretty easy.
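If you want a different codec, say GZIP for a better compression ratio at the cost of more CPU, that is controlled with a query option rather than DDL. If I have the option name right for this release, it is PARQUET_COMPRESSION_CODEC, set in impala-shell before the INSERT that writes the files:

-- Applies to subsequent INSERTs in this session; SNAPPY is the default,
-- and GZIP or NONE can be used instead.
SET PARQUET_COMPRESSION_CODEC=gzip;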
Reload
Now that our new table is ready to go, let’s populate it.
INSERT INTO default.datasetp_2010_2013 (
  PROC_MONTH,
  PROC_YEAR,
  POLCT,
  CLINE,
  STATST,
  ALLORO,
  ACTMO,
  ACTDA,
  [800+ column definitions removed for brevity]
)
PARTITION (ACTYR, GEOST, ALINE, COMPNY)
SELECT
  PROC_MONTH,
  PROC_YEAR,
  POLCT,
  CLINE,
  STATST,
  ALLORO,
  ACTMO,
  ACTDA,
  [800+ column definitions removed for brevity]
  ACTYR,
  GEOST,
  ALINE,
  COMPNY
FROM default.dataset_2010_2013;
The INSERT syntax names the partition columns in its PARTITION clause. We do not specify any partition values in the INSERT statement because we are using dynamic partitioning, which allows the data to be partitioned automatically based on the values coming from the SELECT statement. One trick to note is that the partition column names must appear at the end of the list of columns in the SELECT statement.
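To make that ordering trick concrete, here is a minimal sketch using the hypothetical sales_by_year table from earlier (staging_sales stands in for wherever the raw rows live): the partition column yr is named in the PARTITION clause, left out of the regular column list, and supplied as the last item of the SELECT.

-- Dynamic partitioning: yr is not given a literal value, so each row is routed
-- to a partition based on the yr value produced by the SELECT.
INSERT INTO sales_by_year (policy_id, premium) PARTITION (yr)
SELECT policy_id, premium, yr
FROM staging_sales;

-- The static alternative names the partition explicitly and drops yr from the SELECT:
-- INSERT INTO sales_by_year (policy_id, premium) PARTITION (yr='2013')
-- SELECT policy_id, premium FROM staging_sales WHERE yr = '2013';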
This simple INSERT statement is doing a lot under the covers: partitioning the data on disk and creating the Parquet-based, Snappy-compressed data files. It took six hours for this INSERT statement to complete.
In the next section, we will see if this extra processing is worth it.
Time to Query
I’ll re-run the previous queries against the newly partitioned table, again showing the EXPLAIN output before each query to see what the optimizer is doing.
[datanode01:21000] > explain select count(*) from default.datasetp_2010_2013;
Query: explain select count(*) from default.datasetp_2010_2013
+---------------------------------------------------------------------+
| Explain String |
+---------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
|   PARTITION: UNPARTITIONED |
| |
|   3:AGGREGATE (merge finalize) |
|   |  output: SUM(COUNT(*)) |
|   | |
|   2:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
|   PARTITION: RANDOM |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 2 |
|     UNPARTITIONED |
| |
|   1:AGGREGATE |
|   |  output: COUNT(*) |
|   | |
|   0:SCAN HDFS |
|      table=default.datasetp_2010_2013 #partitions=787/787 size=105.90GB |
+---------------------------------------------------------------------+
Returned 20 row(s) in 0.07s

[datanode01:21000] > select count(*) from default.datasetp_2010_2013;
Query: select count(*) from default.datasetp_2010_2013
+------------+
| count(*)   |
+------------+
| 1168654867 |
+------------+
Returned 1 row(s) in 1.94s
As you can see, we have a dramatic improvement over the original Impala table. The explain plan shows that there are now 787 partitions in this table and that the total size of the table is 106GB, down from 2.28TB.
The first time I saw these numbers, I thought for sure I had made a mistake creating the table: could it possibly have shrunk from 2.28TB to 106GB? To prove to myself that all 1.1 billion rows were still represented in this dataset, I ran counts on distinct policy numbers to verify it. Only then was I confident that it could be used for further queries.
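(For what it’s worth, that is roughly a 22x reduction, which is believable for wide, repetitive text data once Parquet’s encodings and Snappy compression get hold of it.) The verification can be as simple as a pair of distinct counts along these lines, with POLICY_NO standing in here for the real policy-number column, which is among the 800+ definitions omitted above:

-- Sanity check: the distinct policy counts should match across the two tables.
SELECT COUNT(DISTINCT POLICY_NO) FROM default.dataset_2010_2013;
SELECT COUNT(DISTINCT POLICY_NO) FROM default.datasetp_2010_2013;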
Let’s see how our query on geographic regions performs on the new table.
[datanode01:21000] > explain select count(distinct GEOST), ACTYR from default.datasetp_2010_2013 GROUP BY ACTYR;
Query: explain select count(distinct GEOST), ACTYR from default.datasetp_2010_2013 GROUP BY ACTYR
+---------------------------------------------------------------------+
| Explain String |
+---------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
|   PARTITION: UNPARTITIONED |
| |
|   5:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
|   PARTITION: HASH_PARTITIONED: ACTYR |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 5 |
|     UNPARTITIONED |
| |
|   2:AGGREGATE (merge finalize) |
|   |  output: COUNT(GEOST) |
|   |  group by: ACTYR |
|   | |
|   4:AGGREGATE (merge) |
|   |  group by: ACTYR, GEOST |
|   | |
|   3:EXCHANGE |
| |
| PLAN FRAGMENT 2 |
|   PARTITION: RANDOM |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 3 |
|     HASH_PARTITIONED: ACTYR |
| |
|   1:AGGREGATE |
|   |  group by: ACTYR, GEOST |
|   | |
|   0:SCAN HDFS |
|      table=default.datasetp_2010_2013 #partitions=787/787 size=105.90GB |
+---------------------------------------------------------------------+
Returned 33 row(s) in 0.06s

[datanode01:21000] > select count(distinct GEOST), ACTYR from default.datasetp_2010_2013 GROUP BY ACTYR;
Query: select count(distinct GEOST), ACTYR from default.datasetp_2010_2013 GROUP BY ACTYR
+-----------------------+-------+
| count(distinct geost) | actyr |
+-----------------------+-------+
| 42                    | 112   |
| 42                    | 110   |
| 42                    | 113   |
| 42                    | 111   |
+-----------------------+-------+
Returned 4 row(s) in 8.69s
The explain plan shows a full-table scan again (but this time on 106GB of data) and now the query returns in under 10 seconds. We went from calculating aggregates in minutes to seconds!
Now we run the GROUP BY query, again filtering on a single year:
[datanode01:21000] > explain select count(distinct GEOST), ACTYR from default.datasetp_2010_2013 WHERE ACTYR = '111' GROUP BY ACTYR;
Query: explain select count(distinct GEOST), ACTYR from default.datasetp_2010_2013 WHERE ACTYR = '111' GROUP BY ACTYR
+--------------------------------------------------------------------+
| Explain String |
+--------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
|   PARTITION: UNPARTITIONED |
| |
|   5:EXCHANGE |
| |
| PLAN FRAGMENT 1 |
|   PARTITION: HASH_PARTITIONED: ACTYR |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 5 |
|     UNPARTITIONED |
| |
|   2:AGGREGATE (merge finalize) |
|   |  output: COUNT(GEOST) |
|   |  group by: ACTYR |
|   | |
|   4:AGGREGATE (merge) |
|   |  group by: ACTYR, GEOST |
|   | |
|   3:EXCHANGE |
| |
| PLAN FRAGMENT 2 |
|   PARTITION: RANDOM |
| |
|   STREAM DATA SINK |
|     EXCHANGE ID: 3 |
|     HASH_PARTITIONED: ACTYR |
| |
|   1:AGGREGATE |
|   |  group by: ACTYR, GEOST |
|   | |
|   0:SCAN HDFS |
|      table=default.datasetp_2010_2013 #partitions=198/787 size=24.25GB |
+--------------------------------------------------------------------+
Returned 33 row(s) in 0.22s

[datanode01:21000] > select count(distinct GEOST), ACTYR from default.datasetp_2010_2013 WHERE ACTYR = '111' GROUP BY ACTYR;
Query: select count(distinct GEOST), ACTYR from default.datasetp_2010_2013 WHERE ACTYR = '111' GROUP BY ACTYR
+-----------------------+-------+
| count(distinct geost) | actyr |
+-----------------------+-------+
| 42                    | 111   |
+-----------------------+-------+
Returned 1 row(s) in 1.77s
Here you can see that the optimizer used 198 of the 787 partitions and only had to read 24GB of data. This query on the other table took just over two minutes to complete, compared to just under two seconds on our new table.
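Since the table is partitioned on four columns, the same pruning applies to the other partition keys as well. As a sketch (the company code below is a placeholder, not a real value from the dataset), adding a filter on COMPNY should drive the partition count in the explain plan, and the bytes read, down even further:

-- Filtering on two partition columns: the explain plan should report an even
-- smaller #partitions=N/787 figure and correspondingly less data scanned.
SELECT COUNT(DISTINCT GEOST), ACTYR
FROM default.datasetp_2010_2013
WHERE ACTYR = '111' AND COMPNY = 'C01'
GROUP BY ACTYR;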
Conclusion
With some assistance from a colleague I was able to do a live demo of Tableau against the partitioned table with reports presented in seconds. The demo was so fast, in fact, that I had to remind the group a few times that we were analyzing a billion-row dataset with a few clicks.
From the above, you should now realize that running queries against an Impala data store is fast and scales as your cluster scales. Furthermore, by including partitioning and Parquet, dramatic improvements in query time are possible with minimal effort.
Marrying our most powerful source of data with this technology can put critical information quickly in the hands of our decision makers. Our data source is very complex; it requires specialized skills to stage, merge, and summarize data into actionable information. However, with Impala, we can make objective, customer-focused decisions by providing trusted actionable information to decision makers at record speed, and also allow them to hand-select which information is important to them.
Now, I’m looking for a larger dataset!
Don Drake (@dondrake) is an independent technology consultant in the Chicago area. He can be reached at don@drakeconsulting.com.