Add new partitions in AWS Glue Data Catalog from AWS Glue Job

Given that you have a partitioned table in the AWS Glue Data Catalog, there are a few ways in which you can update the Data Catalog with newly created partitions.

  1. Run MSCK REPAIR TABLE <database>.<table_name> in the Amazon Athena service.
  2. Rerun the AWS Glue crawler.
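
Both of these can also be triggered programmatically. A minimal boto3 sketch, assuming default credentials; the Athena query result location and the crawler name are placeholders, not something defined in this post:

import boto3

# Option 1: run MSCK REPAIR TABLE through the Athena API.
athena = boto3.client("athena")
athena.start_query_execution(
    QueryString="MSCK REPAIR TABLE orders_history",
    QueryExecutionContext={"Database": "default"},
    # placeholder result location for this sketch
    ResultConfiguration={"OutputLocation": "s3://my-query-results-bucket/athena/"},
)

# Option 2: rerun an existing AWS Glue crawler (hypothetical crawler name).
glue = boto3.client("glue")
glue.start_crawler(Name="orders_history_crawler")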

Recently, the AWS Glue team added a new feature (a Glue job parameter, really) with which you can immediately view the newly created partitions in the Glue Data Catalog.

To demo this, I will pre-create an empty partitioned table using Amazon Athena, with its target location in S3. I have another S3 location that acts as the data source for creating the AWS Glue DynamicFrame. I will enable the AWS Glue job bookmark feature so that the job reads and processes only the newly added objects from the source S3 location. The AWS Glue ETL job will process the source data and write it to the target S3 location, updating the Glue Data Catalog with the newly created partitions as it goes.
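
For orientation, this is roughly the S3 layout the demo ends up with (the paths are the ones used later in this post; the year/month values are only illustrative). When partitionKeys are used, Glue writes Hive-style partition prefixes to the target:

# source (Parquet objects read by the Glue job)
s3://anand-playground/dwtest/orders/

# target (JSON written by the Glue job, partitioned by year and month)
s3://anand-playground/athena/default/orders_history/year=1995/month=1/
s3://anand-playground/athena/default/orders_history/year=1995/month=2/
...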

  1. As the first step, I create the table orders_history, partitioned by year and month. The LOCATION clause specifies the target S3 location for the table’s data. Also note that I am using the JSON format.
CREATE EXTERNAL TABLE `orders_history`(
  `o_orderkey` bigint COMMENT 'from deserializer', 
  `o_custkey` bigint COMMENT 'from deserializer', 
  `o_orderstatus` string COMMENT 'from deserializer', 
  `o_totalprice` decimal(38,18) COMMENT 'from deserializer', 
  `o_orderdate` date COMMENT 'from deserializer', 
  `o_orderpriority` string COMMENT 'from deserializer', 
  `o_clerk` string COMMENT 'from deserializer', 
  `o_shippriority` int COMMENT 'from deserializer', 
  `o_comment` string COMMENT 'from deserializer')
PARTITIONED BY ( 
  `year` int, 
  `month` int)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://anand-playground/athena/default/orders_history/'
TBLPROPERTIES (
  'classification'='json')

2. Add a new AWS Glue job. In the ‘This job runs’ section, choose “A new script to be authored by you” and enable the job bookmark.

AWS Glue > Jobs > Add job
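
If you prefer to create the job outside the console, the same settings can be expressed with boto3. This is only a sketch: the job name, IAM role, and script location are placeholders, and the key piece is the --job-bookmark-option default argument, which corresponds to enabling the job bookmark in the console.

import boto3

glue = boto3.client("glue")

# Placeholder name, role and script location; the bookmark argument is what matters.
glue.create_job(
    Name="orders-history-etl",
    Role="arn:aws:iam::123456789012:role/MyGlueServiceRole",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://anand-playground/scripts/orders_history_etl.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--job-bookmark-option": "job-bookmark-enable",
        "--TempDir": "s3://anand-playground/glue-temp/",
    },
    GlueVersion="2.0",
)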

3. Click Next and add the AWS Glue job script. Notice the argument “enableUpdateCatalog” in the script. This parameter enables the AWS Glue job to update the Glue Data Catalog during the job run as new partitions are created. With it enabled, there is no need to execute the MSCK REPAIR TABLE command or rerun the crawler every time new partitions are added to the table.

import sys

from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame

from pyspark.context import SparkContext
import pyspark.sql.functions as functions

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
temp_dir = args["TempDir"]

# source dataset location
S3_location = "s3://anand-playground/dwtest/orders/"

# create aws glue dynamicframe using create_dynamic_frame_from_options by
# reading the source s3 location. With AWS Glue job bookmark feature enabled 
# the job will process incremental data since the last job run avoiding 
# duplicate processing.
datasource0 = glueContext.create_dynamic_frame_from_options(
            connection_type="s3",
            connection_options = {
                "paths": [S3_location]
            },
            format="parquet",
            additional_options={"useS3ListImplementation": True},
            transformation_ctx="default_orders")

# convert aws glue dynamicframe to spark dataframe 
df1 = datasource0.toDF()

# add 2 new columns year and month which will be used for partitioning the data        
partitiondf = (df1
                .withColumn('year', functions.year(functions.col('o_orderdate')))
                .withColumn('month', functions.month(functions.col('o_orderdate')))           
            )

# convert spark dataframe to aws glue dynamicframe
gluedf = DynamicFrame.fromDF(
                partitiondf, glueContext, "gluedf")

# parameter "enableUpdateCatalog" tells the aws glue job to update the
# glue data catalog  as the new partitions are created
additionalOptions = {"enableUpdateCatalog": True}

# define the partition keys 
additionalOptions["partitionKeys"] = ["year", "month"]

# write the data using write_dynamic_frame_from_catalog. In this case the target is
# s3 location.
sink = glueContext.write_dynamic_frame_from_catalog(
    frame=gluedf, 
    database="default",
    table_name="orders_history",
    additional_options=additionalOptions,
    transformation_ctx="default_orders_history"
    )

# commit the glue job
job.commit()
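
Once the script is saved, the job can be started from the console or kicked off programmatically. A minimal boto3 sketch, reusing the hypothetical job name from the earlier sketch and a placeholder temporary directory:

import boto3

glue = boto3.client("glue")

# Job bookmarks and any default arguments configured on the job are picked up
# automatically; only overrides need to be passed here.
response = glue.start_job_run(
    JobName="orders-history-etl",
    Arguments={"--TempDir": "s3://anand-playground/glue-temp/"},
)
print(response["JobRunId"])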

4. Before the first execution of the AWS Glue job, the table has no partitions in the Glue Data Catalog.

5. After the execution of the AWS Glue job, the newly created partitions show up in the Glue Data Catalog.
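
If you prefer not to rely on the console view, the partition count can also be checked programmatically before and after the run. A minimal boto3 sketch (paginated, since get_partitions returns results in pages):

import boto3

glue = boto3.client("glue")

# Count the partitions currently registered for the table in the Data Catalog.
paginator = glue.get_paginator("get_partitions")
count = 0
for page in paginator.paginate(DatabaseName="default", TableName="orders_history"):
    count += len(page["Partitions"])

print(f"orders_history currently has {count} partitions")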

6. You can also verify the partitions from the Amazon Athena console using SQL.

  • If you are using Athena Engine 1
-- Athena engine version 1.
SELECT *
FROM   information_schema.__internal_partitions__
WHERE  table_schema = 'default'
       AND table_name = 'orders_history'
ORDER  BY partition_number
  • If using Athena Engine 2
-- Athena engine version 2.
SELECT * FROM default."orders_history$partitions" ORDER BY 1,2

7. Some time later, after more data had been added to the source S3 location, I reran the AWS Glue job. Since the Glue job bookmark feature was enabled, the job processed only the incremental dataset. After the job completed, the new partitions had been added to the Glue Data Catalog and the total partition count had increased to 20.

Currently there are a few restrictions with this feature:

  • As of now, only S3 is supported as the target.
  • json, avro, csv, and glueparquet are the only formats currently supported.
  • If you use parquet as the format, the AWS Glue job will complete successfully (no errors reported) but the Data Catalog is not updated.
  • The order of the field/column names in the partitionKeys parameter must match the order of the partition keys in the Glue Data Catalog (see the sketch after this list).
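
To avoid getting that ordering wrong, one option is to read the partition key order from the Data Catalog itself and build the partitionKeys list from it. This is only a sketch, not something the script above does:

import boto3

glue = boto3.client("glue")

# Fetch the table definition and keep the partition keys in catalog order.
table = glue.get_table(DatabaseName="default", Name="orders_history")
partition_keys = [key["Name"] for key in table["Table"]["PartitionKeys"]]
print(partition_keys)  # expected: ['year', 'month']

# Use this list for additionalOptions["partitionKeys"] in the Glue job script.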

To conclude, enableUpdateCatalog is a nice feature addition to AWS Glue jobs. These days many data pipelines run incremental ETLs using AWS Glue jobs, and once a job completes you would otherwise have to either trigger a Glue crawler or run the MSCK REPAIR TABLE command to update the Glue Data Catalog with the newly created partitions. Over time, as the number of partitions grows, the MSCK REPAIR TABLE command or the Glue crawler run takes longer and longer (one thing to note is that AWS has also recently released incremental crawl functionality in AWS Glue). With the enableUpdateCatalog argument you no longer depend on the MSCK REPAIR TABLE command or a Glue crawler to update the Data Catalog, and the new partitions are immediately available to view.
