Using AWS Data Wrangler with AWS Glue Job 2.0 and Amazon Redshift connection

I will admit, AWS Data Wrangler has become my go-to package for developing extract, transform, and load (ETL) data pipelines and other day-to-day scripts. Its integration with multiple AWS big data services such as S3, the Glue Catalog, Athena, databases, EMR, and others makes life simple for engineers. It also lets you bring in packages like Pandas and PyArrow to help write transformations.

In this blog post I will walk you through a hypothetical use case: read data from a Glue catalog table to obtain a filter value, then use that value to retrieve data from Redshift. I will create a Glue connection to Redshift, use AWS Data Wrangler with AWS Glue 2.0 to read data from the Glue catalog table, retrieve the filtered data from the Redshift database, and write the result set to S3. Along the way I will also cover troubleshooting Glue network connection issues.

AWS Glue is a fully managed extract, transform, and load (ETL) service for processing large datasets from various sources for analytics and data processing.

AWS Glue Connection

You will need a Glue connection to connect to the Redshift database from a Glue job.

AWS Glue > Data catalog > connections > Add connection
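If you prefer scripting the setup over the console, the same connection can be created with boto3. Below is a minimal sketch; the subnet ID, security group, and Availability Zone are placeholders, and the URL and credentials follow the values used later in this post.

import boto3

glue = boto3.client("glue")

## create a JDBC connection pointing at the Redshift cluster
glue.create_connection(
    ConnectionInput={
        "Name": "MyRedshift",
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:redshift://dwtest.paidfoobarrr.us-east-1.redshift.amazonaws.com:5439/dwtest",
            "USERNAME": "admin",
            "PASSWORD": "xxxxxxxxx",
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-xxxxxxxxxxxxxxxxx",          ## placeholder
            "SecurityGroupIdList": ["sg-xxxxxxxxxxxxxxxxx"], ## placeholder
            "AvailabilityZone": "us-east-1a",                ## placeholder
        },
    }
)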

Once you have created the connection, test it. During the test, I ran into the error below –

Reason: Could not find S3 endpoint or NAT gateway for subnetId: subnet-630140c22a02f8cc2 in Vpc vpc-xxxxxxxxxxx.

The reason for the connection failure is straightforward: the subnet used for the connection does not have an S3 endpoint or NAT gateway. To dig into the details, navigate to the VPC page in the AWS console and select the subnet.

VPC > Subnets > Filter > VPC: vpc-xxxxxxxxxxx > subnet

Review the route table the subnet is associated with. In this case, the subnet was associated with a route table that has a route to an internet gateway, making it a public subnet. To resolve this issue, either associate the subnet with a route table that has a route to a NAT gateway, or edit the Glue connection to use a private subnet (one that routes through a NAT gateway).

The difference between a public and a private subnet is that instances in the public subnet can send outbound traffic directly to the internet, whereas instances in the private subnet reach the internet through a network address translation (NAT) gateway that resides in the public subnet.
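To check which kind of subnet you are dealing with, look at the routes on the route table associated with it. A small boto3 sketch (the subnet ID is a placeholder, and it assumes the subnet has an explicit route table association):

import boto3

ec2 = boto3.client("ec2")

## find the route table explicitly associated with the connection's subnet
resp = ec2.describe_route_tables(
    Filters=[{"Name": "association.subnet-id", "Values": ["subnet-xxxxxxxxxxxxxxxxx"]}]
)

for rt in resp["RouteTables"]:
    for route in rt["Routes"]:
        ## an igw-* target on 0.0.0.0/0 means a public subnet,
        ## a nat-* target means a private subnet behind a NAT gateway
        target = route.get("GatewayId") or route.get("NatGatewayId")
        print(route.get("DestinationCidrBlock"), "->", target)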

After making the changes, test the connection again. This time the test takes a little longer but succeeds.

Add AWS Glue Job

Let's move ahead and create a new Glue job. For this blog I chose “A new script to be authored by you”, which gives you a Connections option.

AWS Console > AWS Glue > ETL > Jobs > Add job

You can choose to enable “Continuous logging” in the Monitoring options section. Further down on the same page there is an option to add job parameters. The AWS Data Wrangler development team has made the package integration simple: when adding a new job with Glue version 2.0, all you need to do is specify “--additional-python-modules” as the key in Job parameters and “pyarrow==2,awswrangler” as the value to use Data Wrangler (Update 04-27-21: the screenshot was taken with earlier versions and the exact value might not work now).
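Under the hood this is just a job default argument, so if you create the job programmatically the same parameter can be passed through boto3. A minimal sketch; the job name, role, and script location are placeholders:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="myredshift-wrangler-job",   ## placeholder
    Role="GlueJobRole",               ## placeholder IAM role
    GlueVersion="2.0",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-bucket/scripts/job.py",  ## placeholder
        "PythonVersion": "3",
    },
    Connections={"Connections": ["MyRedshift"]},
    DefaultArguments={
        ## installs awswrangler (and pins pyarrow) when the job starts
        "--additional-python-modules": "pyarrow==2,awswrangler",
        "--TempDir": "s3://aws-glue-temporary-xxxxxxxxxxxx-us-east-1/admin/",
    },
)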

AWS Console > AWS Glue > ETL > Jobs > Add job > Security configuration, script libraries, and job parameters (optional)

On the next page, choose the connection to be used by the job, which in my case is “MyRedshift”.

Add the authored script –

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import awswrangler as wr
import pandas as pd

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

db_username = "admin"
db_password = "xxxxxxxxx"

def GetTableData():
    ## get total row count using aws data wrangler
    getTotalRowCount = "SELECT count(*) as totalRowCount from orders"
    df_count = wr.athena.read_sql_query(
                 sql=getTotalRowCount, 
                 database="myredshift")
    totalRowCount = df_count.iloc[0,0]
    print("Total row count from Glue Table: ", totalRowCount)
    ## get max Order Date using aws data wrangler
    maxOrderDateSql = "SELECT max(o_orderdate) as maxOrderDate FROM orders"
    df = wr.athena.read_sql_query(sql=maxOrderDateSql, database="myredshift")
    maxOrderDate = df.iloc[0,0]
    print("MaxOrderdate from Glue Table: ", maxOrderDate)
    return maxOrderDate
 
print("Get the max order date from glue table myredshift.orders to create redsfhit query")
maxOrderDate = GetTableData()
query = "SELECT * from admin.orders where o_orderdate > '{}'".format(maxOrderDate)
print("Query to be executed in redshift: ", query)

## define the redshift connection options
connection_options = {  
    "url": "jdbc:redshift://dwtest.paidfoobarrr.us-east-1.redshift.amazonaws.com:5439/dwtest",
    "query": query,
    "user": db_username,
    "password": db_password,
    "redshiftTmpDir": args["TempDir"],
    "aws_iam_role": "arn:aws:iam::xxxxxxxxxxxx:role/dwtest-s3-access"
 }

## create glue dynamicframe
df=glueContext.create_dynamic_frame_from_options(
               connection_type="redshift", 
               connection_options=connection_options)
#get row count
print("Record count from redshift: ", df.toDF().count())

## write the data to S3 location using glue catalog
sink = glueContext.write_dynamic_frame_from_catalog(
                 frame=df,
                 database="myredshift",
                 table_name="orders")
print("Completed writing data to Glue table myredshift.orders")
print("Get the total row count and current max order from Glue table")
GetTableData()

job.commit()

Log –

Get the max order date from glue table myredshift.orders to create redshift query  
Total row count from Glue Table:  69240242  
MaxOrderdate from Glue Table:  1997-12-31  
Query to be executed in redshift:  SELECT * from admin.orders where o_orderdate > '1997-12-31'  
Record count from redshift:  6759758  
Completed writing data to Glue table myredshift.orders  
Get the total row count and current max order from Glue table  
Total row count from Glue Table:  76000000  
MaxOrderdate from Glue Table:  1998-08-02 

Walkthrough of the authored script –

  • import the awsglue libraries
  • import awswrangler and pandas
  • create the Glue context and Spark session
  • get the max(o_orderdate) value from the Glue catalog table using the wr.athena.read_sql_query function
  • use that max order date to query the Redshift database for all records after it using create_dynamic_frame_from_options
  • write the data to S3 using write_dynamic_frame_from_catalog

In the background, Glue executes an UNLOAD command to retrieve the data from Redshift.

UNLOAD (
   'SELECT "o_orderkey", "o_custkey", "o_orderstatus", "o_totalprice", "o_orderdate", "o_orderpriority", "o_clerk", "o_shippriority", "o_comment" FROM (SELECT * from admin.orders where o_orderdate > \'1997-12-31\') '
 ) TO 's3://aws-glue-temporary-xxxxxxxxxxxx-us-east-1/admin/2c726fe4-6d85-4b81-92ee-69d9543640ae/' WITH CREDENTIALS '' ESCAPE MANIFEST
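As a side note, awswrangler 2.x can also talk to Redshift directly through the Glue connection, which avoids hand-building the JDBC connection_options shown above. A minimal sketch of that alternative (not what the job in this post does):

import awswrangler as wr

## open a connection using the Glue catalog connection; credentials come from the connection itself
con = wr.redshift.connect(connection="MyRedshift")

## small result sets can be read straight into a Pandas DataFrame
df = wr.redshift.read_sql_query(
    sql="SELECT * FROM admin.orders WHERE o_orderdate > '1997-12-31'",
    con=con,
)

## for large result sets, wr.redshift.unload() uses UNLOAD to S3, similar to what Glue does here
con.close()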

Even though the Glue connection test succeeded, you might still encounter job failures (it happened to me, at least) –

  1. Glue job run fails with “Command failed with exit code 1”
  2. ModuleNotFoundError: No module named ‘awswrangler’

There could be a few possible reasons for the job failure, but in my case it boiled down to the subnet used by the connection. Make sure you are using a private subnet whose route table has a route to a NAT gateway, and confirm that the NAT gateway itself resides in a public subnet.
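One way to fail fast on the second error is to verify the import at the top of the script before the job does any real work; a small sketch:

try:
    import awswrangler as wr
    print("awswrangler version:", wr.__version__)
except ModuleNotFoundError:
    ## the pip install triggered by --additional-python-modules needs outbound internet,
    ## so a missing NAT route often surfaces as a missing module
    print("awswrangler is not installed - check --additional-python-modules and the connection subnet/NAT setup")
    raise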

To conclude, in this blog post we learned

  • how to set up a Glue connection to a Redshift database
  • the use of subnets and why they matter
  • the difference between public and private subnets
  • creating a Glue job with the AWS Data Wrangler package
  • using AWS Data Wrangler to query a Glue catalog table
  • using the result of that query as a filter when querying the Redshift database
  • unloading the Redshift data to S3 using a Glue DynamicFrame
  • troubleshooting Glue connections and the module-not-found error

2 thoughts on “Using AWS Data Wrangler with AWS Glue Job 2.0 and Amazon Redshift connection”

  1. Hi, thanks for a useful article!
    May I just ask a question about the script part?
    I am not sure, why it raises an error in my script – is there anything I have to change in this part, or may I just copy-paste it?

    @params: [TempDir, JOB_NAME]
    args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    line 14 @params: [TempDir, JOB_NAME] ^ SyntaxError: invalid syntax

    1. Hi Jirka,

      Thank you for visiting the blog and reporting the code issue. I have updated the script. Kindly try it now and let me know in case you still face any issue.

      Regards,
      Anand
