Amazon Athena – CTAS, UNLOAD, Parameterized Prepared statements, Partition Projection

Amazon Athena is a query service that lets you analyze data in Amazon S3 using SQL. It is based on Presto, supports ANSI SQL, and works with multiple data formats such as CSV, JSON, Parquet, Avro and ORC. In this blog post I will go through the following Athena features –

  • CREATE TABLE AS SELECT (CTAS)
  • UNLOAD
  • Parameterized Prepared Statement
  • Partition Projection

I have the Citi Bike dataset in an S3 bucket, and I have created an external table over it from the Athena console. The table has 21568 records and is partitioned in S3 by YYYY-MM.

CREATE EXTERNAL TABLE `source_citibike`(
`starttime` string,
`stoptime` string,
`start_station_id` bigint,
`start_station_name` string,
`start_station_latitude` double,
`start_station_longitude` double,
`end_station_id` bigint,
`end_station_name` string,
`end_station_latitude` double,
`end_station_longitude` double,
`bikeid` bigint,
`usertype` string,
`birth_year` bigint,
`gender` bigint)
PARTITIONED BY (
`partition_0` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://learn-share-repeat-us-west-2/source/citibike/'
TBLPROPERTIES (
'CrawlerSchemaDeserializerVersion'='1.0',
'CrawlerSchemaSerializerVersion'='1.0',
'areColumnsQuoted'='false',
'averageRecordSize'='175',
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'objectCount'='2',
'recordCount'='19656',
'sizeKey'='3440024',
'skip.header.line.count'='1',
'typeOfData'='file')

Sample Records –

starttime,stoptime,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bikeid,usertype,birth_year,gender,partition_0
2021-02-01 00:08:57.2940,2021-02-01 00:20:21.2680,3483,Montgomery St,40.71942,-74.05099,3203,Hamilton Park,40.72759597,-74.04424731,40517,Subscriber,1958,1,2021-02
2021-02-01 00:21:12.4910,2021-02-01 01:08:42.9560,3203,Hamilton Park,40.72759597,-74.04424731,3275,Columbus Drive,40.7183552,-74.03891444,40517,Subscriber,1958,1,2021-02
2021-02-01 00:53:08.7500,2021-02-01 01:02:14.3890,3220,5 Corners Library,40.73496102,-74.05950308,3212,Christ Hospital,40.73478582,-74.05044364,44907,Customer,1996,1,2021-02
2021-02-01 00:59:10.1110,2021-02-01 01:07:03.8480,3220,5 Corners Library,40.73496102,-74.05950308,3210,Pershing Field,40.74267714,-74.05178863,29727,Subscriber,1987,2,2021-02
2021-02-01 01:01:29.4990,2021-02-01 01:13:57.8940,3210,Pershing Field,40.74267714,-74.05178863,3269,Brunswick & 6th,40.72601173,-74.05038893,45550,Subscriber,1962,1,2021-02
2021-02-01 01:01:43.9180,2021-02-01 01:07:57.5030,3792,Columbus Dr at Exchange Pl,40.71687,-74.03281,3267,Morris Canal,40.71241882,-74.03852552,48753,Subscriber,1977,1,2021-02
2021-02-01 01:03:42.4030,2021-02-01 01:12:12.6890,3276,Marin Light Rail,40.71458404,-74.04281706,3279,Dixon Mills,40.72163014,-74.04996783,41992,Subscriber,1988,1,2021-02
2021-02-01 01:13:31.8350,2021-02-01 01:19:05.5830,3202,Newport PATH,40.7272235,-74.0337589,3639,Harborside,40.7192517,-74.034234,43766,Subscriber,1976,2,2021-02
2021-02-01 01:16:05.4700,2021-02-01 01:57:22.0110,3187,Warren St,40.7211236,-74.03805095,3483,Montgomery St,40.71942,-74.05099,44852,Subscriber,1958,1,2021-02
2021-02-01 02:05:05.2090,2021-02-01 02:07:32.8660,3186,Grove St PATH,40.71958612,-74.04311746,3187,Warren St,40.7211236,-74.03805095,46538,Subscriber,1972,1,2021-02

CREATE TABLE AS SELECT – CTAS

Using CREATE TABLE AS SELECT you can create a new table in Athena from the results of a SELECT statement. The result of a CTAS query can be stored in PARQUET, ORC, AVRO, JSON, and TEXTFILE formats. A single CTAS query can write to at most 100 unique partition and bucket combinations. You can create both partitioned and non-partitioned tables using CTAS. To create a partitioned table, use the partitioned_by property in the command and provide an array of the columns by which the CTAS table will be partitioned. The partition columns must be listed last in the column list of the SELECT statement, in the same order as in partitioned_by.

partitioned_by = ARRAY[ col_name[,…] ]

Example of CTAS SQL –

Below is the CTAS SQL for creating a table in Athena that is partitioned by usertype.

CREATE TABLE citibike.source_citibike_parquet
    WITH (
        format = 'PARQUET', 
        parquet_compression = 'SNAPPY', 
        external_location = 's3://learn-share-repeat-us-west-2/source/citibike_parquet/',
        partitioned_by = ARRAY['usertype'])
AS SELECT starttime, 
					stoptime, 
					start_station_id, 
					start_station_name, 
					start_station_latitude, 
					start_station_longitude, 
					end_station_id, 
					end_station_name, 
					end_station_latitude, 
					end_station_longitude, 
					bikeid, 
					birth_year, 
					gender, 
					partition_0 AS year_month, 
					usertype 
    FROM source_citibike ORDER BY start_station_id;

Because of the ORDER BY clause in the SQL, the data stored within each Parquet file in S3 is ordered by start_station_id.

Check the partitions using SQL –

SELECT * FROM citibike."source_citibike_parquet$partitions"

Output –

usertype
Customer
Subscriber
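
Since the table is partitioned by usertype, a query that filters on that column should only need to read the matching partition. A minimal sketch, assuming the CTAS table above exists as citibike.source_citibike_parquet; the aggregate and the trips alias are only illustrative:

```sql
-- The filter is on the partition column, so only the usertype=Customer
-- partition should need to be scanned.
SELECT start_station_name,
       count(*) AS trips
FROM citibike.source_citibike_parquet
WHERE usertype = 'Customer'
GROUP BY start_station_name
ORDER BY trips DESC
LIMIT 10;
```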

To create an empty copy of an existing table, you can use the SQL below –

CREATE TABLE new_table AS SELECT * FROM old_table WITH NO DATA;

UNLOAD

You can use the UNLOAD statement to write the results of a SELECT query to S3 in a format of your choice when you do not need an associated table in Athena for the selected data set. UNLOAD supports Apache Parquet, ORC, Apache Avro, JSON, and delimited text (TEXTFILE) formats. UNLOAD requires Athena engine version 2 and can be used with prepared statements.

Example of UNLOAD SQL –

In the UNLOAD SQL below, I am using a CASE expression to return Female when gender is 1 and Male otherwise, while unloading the data to S3 in Parquet format.

UNLOAD (SELECT  starttime, 
				stoptime, 
				start_station_id, 
				start_station_name, 
				start_station_latitude, 
				start_station_longitude, 
				end_station_id, 
				end_station_name, 
				end_station_latitude, 
				end_station_longitude, 
				bikeid, 
				birth_year, 
				CASE gender WHEN 1 THEN 'Female' ELSE 'Male' END AS gender, 
				partition_0 AS year_month, 
				usertype
        FROM source_citibike ORDER BY start_station_id) 
TO 's3://learn-share-repeat-us-west-2/source/citibank_unload/' 
WITH (format = 'PARQUET', partitioned_by = ARRAY['usertype'])

The Parquet files written to S3 have Female and Male stored in the gender column.

2021-03-27 17:50:44.7490,2021-03-27 18:03:52.7040,3277,Communipaw & Berry Lane,40.71435837,-74.06661093,3276,Marin Light Rail,40.71458404,-74.04281706,46265,1968,Female,2021-03

2021-03-29 21:57:30.5960,2021-03-29 22:19:16.0210,3278,Monmouth and 6th,40.72568548,-74.04879034,3270,Jersey & 6th St,40.72528911,-74.04557168,33652,1977,Male,2021-03

One thing to note: if the partition key is not the last column in the SELECT list, the SQL will fail with the error below –

HIVE_COLUMN_ORDER_MISMATCH: Partition keys must be the last columns in the table and in the same order as the table properties: [usertype]

Parameterized Prepared Statements

Prepared statements prepare a SQL statement for repeated execution. When the PREPARE statement is executed, the specified statement is parsed, analyzed, and rewritten. This avoids repeating the parse and analysis work, while still allowing the execution plan to depend on the specific parameter values supplied. Prepared statements are workgroup-specific, and their names must be unique within the workgroup.

Example of Parameterized Prepared Statement –

In the example below, I have created a prepared statement that takes one value for usertype and two values for start_station_id as input.

PREPARE citibank_prepare_unload FROM 
UNLOAD (SELECT  starttime, 
				stoptime, 
				start_station_id, 
				start_station_name, 
				start_station_latitude, 
				start_station_longitude, 
				end_station_id, 
				end_station_name, 
				end_station_latitude, 
				end_station_longitude, 
				bikeid, 
				birth_year,
				CASE gender WHEN 1 THEN 'Female' ELSE 'Male' END AS gender, 
				partition_0 AS year_month,
                usertype
        FROM source_citibike WHERE usertype=? and start_station_id in (?, ?) ORDER BY start_station_id) 
TO 's3://learn-share-repeat-us-west-2/source/citibank_prepare_unload/' 
WITH (format = 'PARQUET')

Execute the prepared statement –

EXECUTE citibank_prepare_unload USING 'Customer', 3276, 3185
Time in queue:0.103 sec.    Run time:1 min 44.093 sec.    Data scanned:47.38 GB
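
The same PREPARE / EXECUTE pattern works for a plain SELECT, which is easier to re-run with different values than an UNLOAD, because UNLOAD expects an empty target location. The sketch below is only an illustration: the statement name citibike_trips_by_station and the second set of parameter values are my own assumptions, and each statement is meant to be run on its own.

```sql
-- Hypothetical statement name; the query reuses columns from source_citibike.
PREPARE citibike_trips_by_station FROM
SELECT start_station_name, count(*) AS trips
FROM source_citibike
WHERE usertype = ? AND start_station_id IN (?, ?)
GROUP BY start_station_name;

-- Parameters are bound positionally, in the order of the question marks.
EXECUTE citibike_trips_by_station USING 'Customer', 3276, 3185;
EXECUTE citibike_trips_by_station USING 'Subscriber', 3186, 3187;

-- Remove the statement from the workgroup when it is no longer needed.
DEALLOCATE PREPARE citibike_trips_by_station;
```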

Partition Projection

When you execute a SQL query on a partitioned table in Athena, it makes a GetPartitions call to the AWS Glue Data Catalog before performing partition pruning. If the table has a large number of partitions, this call can hurt performance. To avoid it, you can use Partition Projection, which can reduce the runtime of queries that are constrained by partition metadata retrieval.

I have the Amazon Customer Reviews Dataset stored in Amazon S3, partitioned by year. The table has data partitioned for 20 years – 1973, 1995 – 2013 and has a total of 160796570 records.

SELECT count(*) from amazon_reviews_year -- 160796570

To enable partition projection for a table, you can follow the steps below –

  • Navigate to AWS Glue Service
  • Choose the table
    • AWS Glue > Data Catalog >Tables
  • Choose – Edit table
  • In the Table properties section, for each partitioned column, add the following key-value pair:
    • For Key, add projection.columnName.type.
    • For Value, add one of the supported types: enum, integer, date, or injected.
  • Set any additional properties. In this case I have set projection.columnName.range to restrict the values that can be returned to the range 2010 through 2013.
  • Also set the key projection.enabled to the value true to enable partition projection.
  • Choose Apply.
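
The same table properties can also be set from the Athena query editor instead of the Glue console. A minimal sketch, assuming the partition column is named year and is projected with the integer type (the type I picked here is an assumption; use whichever supported type matches your column):

```sql
-- Assumes the partition column is named year and is projected as an integer range.
ALTER TABLE amazon_reviews_year SET TBLPROPERTIES (
  'projection.enabled' = 'true',
  'projection.year.type' = 'integer',
  'projection.year.range' = '2010,2013'
);
```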

Once completed, go to the Athena Query Editor and run the SELECT queries. As you can see, the query below now returns only 2010 – 2013, based on the value set in projection.year.range

SELECT DISTINCT year from amazon_reviews_year ORDER BY year

year
2010
2011
2012
2013

Total count of records from table –

SELECT count(*) from amazon_reviews_year -- 53087567

To conclude, you can use these different features available in Athena to solve some of your engineering problems and also optimize query processing of highly partitioned tables.

Hope you find it helpful!

2 thoughts on “Amazon Athena – CTAS, UNLOAD, Parameterized Prepared statements, Partition Projection”

  1. Hi Anand. Thanks for your helpful writeup. I just wanted to clarify that UNLOAD does in fact support output in delimited text formats such as CSV. -Cheers.

    1. Thank you Luis for the feedback. I have tested it and will update the blog. Thank you again.

      UNLOAD (SELECT starttime,
      stoptime,
      start_station_id
      FROM source_citibike ORDER BY start_station_id)
      TO 's3://learn-share-repeat-us-west-2/source/citibank_unload/CSV/'
      WITH (format = 'TEXTFILE' , field_delimiter = ',')
