How to load a table with the JSONB data type into Aurora PostgreSQL using AWS Glue

In this blog post I will cover what the JSON data type is, what options PostgreSQL offers for storing JSON data, how you can create an AWS Glue connection to an Aurora PostgreSQL database running in a private subnet, and how you can then use AWS Glue to write data into a table with a JSONB column in an Aurora/RDS PostgreSQL database.

JSON (JavaScript Object Notation) is a format for storing data as key/value (or name/value) pairs. The object below has a single key, “data”, with the rest of the document as its value –

{
    "data": {
        "id": "1234", 
        "version": "1", 
        "receiver": "100", 
        "billing_date": "20210305", 
        "account": "11111111"
    }
}

In PostgreSQL you can store JSON data in a text column, but to enforce that each stored value is valid according to the JSON rules it is better to use the json or jsonb data type. So what is the difference between json and jsonb? The official PostgreSQL documentation states it well –

The json and jsonb data types accept almost identical sets of values as input. The major practical difference is one of efficiency. The json data type stores an exact copy of the input text, which processing functions must reparse on each execution; while jsonb data is stored in a decomposed binary format that makes it slightly slower to input due to added conversion overhead, but significantly faster to process, since no reparsing is needed. jsonb also supports indexing, which can be a significant advantage.
Because the json type stores an exact copy of the input text, it will preserve semantically-insignificant white space between tokens, as well as the order of keys within JSON objects. Also, if a JSON object within the value contains the same key more than once, all the key/value pairs are kept. (The processing functions consider the last value as the operative one.) By contrast, jsonb does not preserve white space, does not preserve the order of object keys, and does not keep duplicate object keys. If duplicate keys are specified in the input, only the last value is kept.
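
To see this in practice, you can cast the same literal to both types. Below is an illustration from a psql session (your output formatting may differ): json preserves the input exactly, while jsonb reorders the keys and keeps only the last duplicate value –

admin@fleetdb # SELECT '{"b": 1, "a": 2, "a": 3}'::json AS j, '{"b": 1, "a": 2, "a": 3}'::jsonb AS jb;
+--------------------------+------------------+
|            j             |        jb        |
+--------------------------+------------------+
| {"b": 1, "a": 2, "a": 3} | {"a": 3, "b": 1} |
+--------------------------+------------------+
(1 row)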

To replicate a real scenario, I will create an AWS Glue connection, create source and target tables in the same Aurora PostgreSQL database, and have an AWS Glue job read the data from the source table and write it to a target table containing a jsonb column.

1. Create an AWS Glue connection to store the connection information for Aurora PostgreSQL. In this case the database is running in a private subnet. You can follow the screenshots below to create an AWS Glue connection for any database that has a JDBC URL and runs in a private subnet.

AWS Glue > Data catalog > Connections > Add connection

Review all the details and click Finish. Then test the connection and make sure it is successful.

A few things to note while creating the AWS Glue connection –

  • As the database is running in a private subnet, you will need to choose a private subnet within your VPC when creating the Glue connection.
  • The private subnet that you choose should be associated with a route table that has a route through a NAT Gateway.
  • The NAT Gateway itself should reside in a public subnet.
  • The difference between a public and a private subnet is that instances in the public subnet can send outbound traffic directly to the Internet, whereas instances in the private subnet reach the Internet through a network address translation (NAT) gateway that resides in the public subnet.
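
If you prefer to script this step instead of clicking through the console, the same connection can be created with boto3. The snippet below is a minimal sketch; the subnet, security group, availability zone, endpoint, and credentials are placeholders for your own environment –

import boto3

glue = boto3.client("glue")

# Sketch: create a JDBC connection for the Aurora PostgreSQL cluster.
# All values below are placeholders; substitute your own VPC resources.
glue.create_connection(
    ConnectionInput={
        "Name": "glue-fleetdb",
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:postgresql://<cluster-endpoint>:5432/fleetdb",
            "USERNAME": "admin",
            "PASSWORD": "<password>",
        },
        "PhysicalConnectionRequirements": {
            # Private subnet whose route table points to a NAT Gateway
            "SubnetId": "subnet-xxxxxxxx",
            "SecurityGroupIdList": ["sg-xxxxxxxx"],
            "AvailabilityZone": "us-east-1a",
        },
    }
)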

2. Create source and target tables in the same admin schema. In both the source and the target table the “data” column is of jsonb data type. The class column is of text data type in the source but of integer data type in the target.

admin@fleetdb # CREATE TABLE pg_glue_src (id serial not null, data jsonb, name text, class text);
CREATE TABLE
Time: 9.611 ms
admin@fleetdb # CREATE TABLE pg_glue_trgt (id serial not null, data jsonb, name text, class int);
CREATE TABLE
Time: 13.034 ms
admin@fleetdb # \d pg_glue_src
                              Table "admin.pg_glue_src"
+--------+---------+-----------+----------+-----------------------------------------+
| Column |  Type   | Collation | Nullable |                 Default                 |
+--------+---------+-----------+----------+-----------------------------------------+
| id     | integer |           | not null | nextval('pg_glue_src_id_seq'::regclass) |
| data   | jsonb   |           |          |                                         |
| name   | text    |           |          |                                         |
| class  | text    |           |          |                                         |
+--------+---------+-----------+----------+-----------------------------------------+

admin@fleetdb # \d pg_glue_trgt
                              Table "admin.pg_glue_trgt"
+--------+---------+-----------+----------+------------------------------------------+
| Column |  Type   | Collation | Nullable |                 Default                  |
+--------+---------+-----------+----------+------------------------------------------+
| id     | integer |           | not null | nextval('pg_glue_trgt_id_seq'::regclass) |
| data   | jsonb   |           |          |                                          |
| name   | text    |           |          |                                          |
| class  | integer |           |          |                                          |
+--------+---------+-----------+----------+------------------------------------------+

3. Insert some dummy data into the source table pg_glue_src and select a single record –

admin@fleetdb # insert into pg_glue_src values (1, '{"id": "1234", "version": "1", "receiver": "100", "billing_date": "20210605", "account": "11111111"}', 'joe', null);
INSERT 0 1
Time: 2.263 ms
admin@fleetdb # insert into pg_glue_src values (2, '{"id": "2345", "version": "1", "receiver": "110", "billing_date": "20210604", "account": "11112222"}', 'john', '1');
INSERT 0 1
Time: 2.026 ms
admin@fleetdb # insert into pg_glue_src values (3, '{"id": "3456", "version": "2", "receiver": "120", "billing_date": "20210604", "account": "33331111"}', 'mark', '2');
INSERT 0 1
Time: 1.973 ms
admin@fleetdb # insert into pg_glue_src values (4, '{"id": "4567", "version": "3", "receiver": "130", "billing_date": "20210606", "account": "11444411"}', 'thomas', '3');
INSERT 0 1

admin@fleetdb # select * from pg_glue_src;
+----+------------------------------------------------------------------------------------------------------+--------+-------+
| id |                                                 data                                                 |  name  | class |
+----+------------------------------------------------------------------------------------------------------+--------+-------+
|  1 | {"id": "1234", "account": "11111111", "version": "1", "receiver": "100", "billing_date": "20210605"} | joe    | NULL  |
|  2 | {"id": "2345", "account": "11112222", "version": "1", "receiver": "110", "billing_date": "20210604"} | john   | 1     |
|  3 | {"id": "3456", "account": "33331111", "version": "2", "receiver": "120", "billing_date": "20210604"} | mark   | 2     |
|  4 | {"id": "4567", "account": "11444411", "version": "3", "receiver": "130", "billing_date": "20210606"} | thomas | 3     |
+----+------------------------------------------------------------------------------------------------------+--------+-------+
(4 rows)

admin@fleetdb # SELECT name, class FROM pg_glue_src WHERE data->>'account'='11444411';
+--------+-------+
|  name  | class |
+--------+-------+
| thomas | 3     |
+--------+-------+
(1 row)

Time: 2.923 ms
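
The documentation quote earlier mentioned indexing as a jsonb advantage. As a quick aside (not required for the rest of this walkthrough; the index name is arbitrary), you can put a GIN index on the data column and query it with the containment operator @>. On a 4-row table the planner will likely still choose a sequential scan, but on larger tables the index pays off –

admin@fleetdb # CREATE INDEX idx_pg_glue_src_data ON pg_glue_src USING GIN (data);
CREATE INDEX
admin@fleetdb # SELECT name, class FROM pg_glue_src WHERE data @> '{"account": "11444411"}';
+--------+-------+
|  name  | class |
+--------+-------+
| thomas | 3     |
+--------+-------+
(1 row)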

4. Create a new AWS Glue job to extract the data from pg_glue_src and write it to pg_glue_trgt. When adding the Glue job, do not forget to attach the connection; in my case I selected “glue-fleetdb” before saving the job. Below is the PySpark code that reads the data from the source table into a Glue DynamicFrame, applies ApplyMapping to cast the class column from string to integer, and writes the DynamicFrame to the target table.

import sys
import os
import logging

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark import SparkConf

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"),
                    format='[%(filename)s:%(lineno)s - %(funcName)10s() ] %(asctime)-15s %(message)s',
                    datefmt='%Y-%m-%d:%H:%M:%S')

logger = logging.getLogger(__name__)


db_username = "admin"
db_password = "xxxxxxxxxxx"
db_url = "jdbc:postgresql://cluster-fleetdb.cluster-xxxxxxxxxxx.xx-xxxx-x.rds.amazonaws.com:8192/fleetdb"
src_table_name = "pg_glue_src"


logger.info("Source connection options")
connection_options = {  
    "url": db_url,
    "user": db_username,
    "password": db_password,
    "dbtable": src_table_name,
}


logger.info("Create dynamicframe reading data from source table pg_glue_src having jsonb datatype")
datasource0 = glueContext.create_dynamic_frame_from_options(
    connection_type="postgresql", 
    connection_options=connection_options,
    transformation_ctx = "datasource0"
    )

datasource0.printSchema()

logger.info("Cast class column from string to int datatype")
applymapping0 = ApplyMapping.apply(frame=datasource0,
        mappings = [ 
            ("id", "int", "id", "int"),
            ("data", "string", "data", "string"),
            ("name", "string", "name", "string"),
            ("class", "string", "class", "int"), #casting string to integer
        ],
        transformation_ctx = "applymapping0"
        )

logger.info("Target connection options")
connection_aurora_options = {
    "url": db_url,
    "user": db_username,
    "password": db_password,
    "database": "fleetdb", #required
    "dbtable": "admin.pg_glue_trgt", #target table
    }

logger.info("Write data to target table pg_glue_trgt")
datasink0 = glueContext.write_dynamic_frame_from_jdbc_conf(
    frame=applymapping0,
    catalog_connection="glue-fleetdb",
    connection_options=connection_aurora_options,
    transformation_ctx="datasink0",
)

# Standard closing call for a Glue job initialized with job.init()
job.commit()

This Glue job execution failed with the following error – column “data” is of type jsonb but expression is of type character varying.

py4j.protocol.Py4JJavaError: An error occurred while calling o83.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 19, 10.0.50.63, executor 1): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "admin".pg_glue_trgt ("id","data","name","class") VALUES (4,'
{
    "id": "4567",
    "account": "11444411",
    "version": "3",
    "receiver": "130",
    "billing_date": "20210606"
}
','thomas',3) was aborted: ERROR: column "data" is of type jsonb but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 74  Call getNextException to see other errors in the batch.

To resolve this issue, set "stringtype":"unspecified" in the connection options of the target for the write_dynamic_frame_from_jdbc_conf method. This parameter makes the PostgreSQL JDBC driver send string parameters with an unspecified type, so the server infers the column type and implicitly casts the text value to jsonb.

logger.info("Target connection options")
connection_aurora_options = {
    "url": db_url,
    "user": db_username,
    "password": db_password,
    "database": "fleetdb",
    "dbtable": "admin.pg_glue_trgt",
    "stringtype":"unspecified",  ##required to resolve error
    }

logger.info("Write data to target table pg_glue_trgt")
datasink0 = glueContext.write_dynamic_frame_from_jdbc_conf(
    frame = applymapping0, 
    catalog_connection = "glue-fleetdb",
    connection_options=connection_aurora_options,
    transformation_ctx = "datasink0"
)

With the above parameter set, the Glue job completed successfully and the target table has all the records.

admin@fleetdb # select * from pg_glue_trgt;
+----+------------------------------------------------------------------------------------------------------+--------+-------+
| id |                                                 data                                                 |  name  | class |
+----+------------------------------------------------------------------------------------------------------+--------+-------+
|  1 | {"id": "1234", "account": "11111111", "version": "1", "receiver": "100", "billing_date": "20210605"} | joe    |  NULL |
|  3 | {"id": "3456", "account": "33331111", "version": "2", "receiver": "120", "billing_date": "20210604"} | mark   |     2 |
|  2 | {"id": "2345", "account": "11112222", "version": "1", "receiver": "110", "billing_date": "20210604"} | john   |     1 |
|  4 | {"id": "4567", "account": "11444411", "version": "3", "receiver": "130", "billing_date": "20210606"} | thomas |     3 |
+----+------------------------------------------------------------------------------------------------------+--------+-------+
(4 rows)

One caveat with this solution: if the source table has any row with a null value in the jsonb column, the Glue job will insert all the other rows that do have a value in the jsonb column and then fail at the end with the error below –

py4j.protocol.Py4JJavaError: An error occurred while calling o83.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 22, 10.0.62.171, executor 1): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "admin".pg_glue_trgt ("id","data","name","class") VALUES (5,NULL,'don',2) was aborted: ERROR: column "data" is of type jsonb but expression is of type character
  Hint: You will need to rewrite or cast the expression.
  Position: 74  Call getNextException to see other errors in the batch.

In the failed job run above, the source table had a total of 10 rows, of which 2 had a null value in the data column; the AWS Glue job inserted 8 records into the target table and then failed with the error.
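
One way to work around this, sketched below (my own suggestion, not part of the original job), is to drop the rows with a null data column using Glue's Filter transform before the write and handle them separately. Filter comes from awsglue.transforms, which the job already imports –

# Sketch: keep only rows whose "data" field is present, so the JDBC batch
# never binds a NULL for the jsonb column. This assumes the null arrives
# as None under the "data" key after ApplyMapping.
non_null_data0 = Filter.apply(
    frame=applymapping0,
    f=lambda row: row["data"] is not None,
    transformation_ctx="non_null_data0",
)

datasink0 = glueContext.write_dynamic_frame_from_jdbc_conf(
    frame=non_null_data0,
    catalog_connection="glue-fleetdb",
    connection_options=connection_aurora_options,
    transformation_ctx="datasink0",
)

The filtered-out rows could then be loaded with a plain SQL INSERT that omits the data column, or the null handling could be pushed into the source query.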

So, to conclude: when you have a table with a jsonb column, set the "stringtype":"unspecified" parameter in the connection options for the target to load the data.

Hope this helps.

References –

https://www.postgresql.org/docs/current/datatype-json.html
