Export table from Aurora PostgreSQL to Amazon S3

In this blog post I discuss how to export a 100 GB non-partitioned table from Aurora PostgreSQL to Amazon S3. I will walk you through two approaches: first using aws_s3, a PostgreSQL extension that Aurora PostgreSQL provides, and then using the AWS Glue service. The post also covers the performance and scaling challenges you may hit when exporting the table with AWS Glue.

Basic Information

  • Database Details – I am running PostgreSQL 12.4 on Aurora PostgreSQL 4.0.2. The writer instance is a db.r5.2xlarge with 8 vCPUs (2 threads per core) and 64 GB of memory.
admin@fleetdb # \i basic.sql
+-----------------------------------+-------------------------------------------------------------------------------------------------+
|              metric               |                                              value                                              |
+-----------------------------------+-------------------------------------------------------------------------------------------------+
| Postgres Version                  | PostgreSQL 12.4 on x86_64-pc-linux-gnu, compiled by x86_64-pc-linux-gnu-gcc (GCC) 7.4.0, 64-bit |
| Started At                        | 2021-05-28 06:45:47+00                                                                          |
| Uptime                            | 10 days 10:55:35                                                                                |
| Database Name                     | fleetdb                                                                                         |
| Database Size                     | 130 GB                                                                                          |
| Installed Extensions              | apg_plan_mgmt (2.0), aws_commons (1.1), aws_s3 (1.1), plpgsql (1.0)                             |
| Cache Effectiveness               | 57.88%                                                                                          |
| Successful Commits                | 99.98%                                                                                          |
| Conflicts                         | 0                                                                                               |
| Temp Files: total size            | 14 GB                                                                                           |
| Temp Files: total number of files | 15                                                                                              |
| Deadlocks                         | 0                                                                                               |
| Stats Since                       | 2021-05-28 06:45:49+00                                                                          |
| Stats Age                         | 10 days 10:55:33                                                                                |
+-----------------------------------+-------------------------------------------------------------------------------------------------+
(14 rows)
  • Table Details – We will export the data for the table “article”. The table is roughly 100 GB in size and is non-partitioned.
CREATE TABLE article (
  id SERIAL UNIQUE NOT NULL,
  code VARCHAR(10) NOT NULL,
  article TEXT,
  name TEXT NOT NULL,
  department VARCHAR(4)
);


insert into article (
    code, article, name, department
)
select
    left(md5(i::text), 10),
    md5(random()::text),
    md5(random()::text),
    left(md5(random()::text), 4)
from generate_series(1, 1000000) s(i);
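-- Note: one run of the insert above loads 1,000,000 rows; the ~1 billion rows (~100 GB)
-- reported below were presumably reached by repeating or scaling up this load.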

admin@fleetdb # \d article
                                       Table "admin.article"
+------------+-----------------------+-----------+----------+-------------------------------------+
|   Column   |         Type          | Collation | Nullable |               Default               |
+------------+-----------------------+-----------+----------+-------------------------------------+
| id         | integer               |           | not null | nextval('article_id_seq'::regclass) |
| code       | character varying(10) |           | not null |                                     |
| article    | text                  |           |          |                                     |
| name       | text                  |           | not null |                                     |
| department | character varying(4)  |           |          |                                     |
+------------+-----------------------+-----------+----------+-------------------------------------+
Indexes:
    "article_id_key" UNIQUE CONSTRAINT, btree (id)


admin@fleetdb # \i table_size.sql 
Enter the relation name:article
+-------+--------------+------------+----------------+--------------+-------------+-------------+--------------+--------+-------+------------+--------+
|  oid  | table_schema | table_name |  row_estimate  | total_bytes  | index_bytes | toast_bytes | table_bytes  | total  | index |   toast    | table  |
+-------+--------------+------------+----------------+--------------+-------------+-------------+--------------+--------+-------+------------+--------+
| 74288 | admin        | article    | 1.00000006e+09 | 139518713856 | 22461349888 |        8192 | 117057355776 | 130 GB | 21 GB | 8192 bytes | 109 GB |
+-------+--------------+------------+----------------+--------------+-------------+-------------+--------------+--------+-------+------------+--------+
(1 row)


admin@fleetdb # select * from article limit 5;
+---------+------------+----------------------------------+----------------------------------+------------+
|   id    |    code    |             article              |               name               | department |
+---------+------------+----------------------------------+----------------------------------+------------+
| 1078401 | ad5cd87412 | c213ddd48db9e5c0d2386bc0fedd59cf | 9b331bc79d967cc4403c56d7e51f275a | 7c5d       |
| 1078402 | 5adfc37e8b | 0ffb8cdb521fafe16a4caed63b13f2d3 | 8cf51fc7ce7d316dce6da10947c4a538 | 7517       |
| 1078403 | 8e1302db3b | bbf7c48351fb478559082c087119d318 | 33c525bca96e89d11bb041f43f5af1e4 | b66e       |
| 1078404 | 2588231ba5 | 1592cd3e92b2cf54b159b03f35e35303 | 7bb4db38f74a78ef0ac196fa088fdaf8 | 7657       |
| 1078405 | 47fa1ae0d5 | e2a0e8c29749765f6e01593f7b87735c | 1d58ee700b8bef4b681cf3aa8980445e | f435       |
+---------+------------+----------------------------------+----------------------------------+------------+
(5 rows)

  • IAM Role – We will need an IAM role both for aws_s3 and for the AWS Glue job to export and write the data to S3. The role should have permission to read and write the S3 bucket (added as an inline policy) and should have the AWSGlueServiceRole AWS managed policy attached. A scripted alternative to these console steps is sketched after the trust policy below.

AWS Console > IAM > Create Role

  • Click ‘Next: Permissions’, then search for and add the “AWSGlueServiceRole” policy to the role. Click Next and add tags if needed before moving to the Review page. On the Review page enter the role name, confirm the trusted entities and click Create role.

  • Once the role has been created, attach an inline policy by clicking “Attach policies” to add permissions for the S3 bucket –
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:Get*",
                "s3:List*",
                "s3:Put*",
                "s3:DeleteObject",
                "s3:AbortMultipartUpload"
            ],
            "Resource": [
                "arn:aws:s3:::fleetdb-s3",
                "arn:aws:s3:::fleetdb-s3/*"
            ]
        }
    ]
}
  • Add the “glue.amazonaws.com” and “rds.amazonaws.com” services to the role’s trust relationship (rds.amazonaws.com is needed so the Aurora cluster can assume the role for aws_s3 exports).
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "glue.amazonaws.com",
          "rds.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
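
If you prefer to script the role setup instead of clicking through the console, the following is a minimal boto3 sketch. The role name FleetdbGlueS3Role and the inline policy name are assumptions; the trust and S3 policy documents are the ones shown above.

import json
import boto3

iam = boto3.client("iam")

# Trust policy: allow Glue and RDS (for aws_s3) to assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": ["glue.amazonaws.com", "rds.amazonaws.com"]},
        "Action": "sts:AssumeRole",
    }],
}

# Inline S3 policy for the fleetdb-s3 bucket (same permissions as above)
s3_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:Get*", "s3:List*", "s3:Put*", "s3:DeleteObject", "s3:AbortMultipartUpload"],
        "Resource": ["arn:aws:s3:::fleetdb-s3", "arn:aws:s3:::fleetdb-s3/*"],
    }],
}

role = iam.create_role(
    RoleName="FleetdbGlueS3Role",                      # hypothetical role name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the AWS managed Glue service policy
iam.attach_role_policy(
    RoleName="FleetdbGlueS3Role",
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
)

# Add the S3 permissions as an inline policy
iam.put_role_policy(
    RoleName="FleetdbGlueS3Role",
    PolicyName="fleetdb-s3-access",                    # hypothetical policy name
    PolicyDocument=json.dumps(s3_policy),
)

print(role["Role"]["Arn"])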

aws_s3

aws_s3 is a PostgreSQL extension that Aurora PostgreSQL provides to export data into files in an Amazon S3 bucket. The files written to S3 are encrypted using server-side encryption (SSE-KMS) by default. You can write the data to S3 in text, CSV or binary format.

  • Connect to the PostgreSQL database and install the aws_s3 extension. The extension provides the aws_s3.query_export_to_s3 function that we will use to export the data to Amazon S3.
admin@fleetdb # CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;
NOTICE:  extension "aws_s3" already exists, skipping
CREATE EXTENSION
Time: 13.903 ms
admin@fleetdb # 
  • Attach the IAM role to grant the Aurora PostgreSQL DB cluster permission to access the Amazon S3 bucket. You can do this from the console as shown below, or with boto3 as sketched after the console path.

AWS Console > RDS > Databases > Select your cluster from DB Identifier > Connectivity & security > Manage IAM roles
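
A minimal boto3 sketch for the same association, assuming a cluster identifier of cluster-fleetdb and the role ARN created earlier (both placeholders):

import boto3

rds = boto3.client("rds", region_name="us-east-1")

# Associate the IAM role with the Aurora cluster for the S3 export feature
rds.add_role_to_db_cluster(
    DBClusterIdentifier="cluster-fleetdb",                       # placeholder cluster identifier
    RoleArn="arn:aws:iam::123456789012:role/FleetdbGlueS3Role",  # placeholder role ARN
    FeatureName="s3Export",                                      # feature name used by aws_s3 exports
)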

  • Once the role status changes to “Active”, log in to the PostgreSQL database.
  • Create an s3_uri that holds the configuration – the S3 bucket, file name prefix and region – to be used during the export.
admin@fleetdb # SELECT aws_commons.create_s3_uri(
   'fleetdb-s3',
   'fleetdb/admin/article/article_unload',
   'us-east-1'
) AS s3_uri_1 \gset

admin@fleetdb # \echo :s3_uri_1
(fleetdb-s3,fleetdb/admin/article/article_unload,us-east-1)
admin@fleetdb # 
  • Call aws_s3.query_export_to_s3 to unload the data to Amazon S3.
admin@fleetdb # SELECT * FROM aws_s3.query_export_to_s3('SELECT * FROM article', :'s3_uri_1', options :='format csv, delimiter $$:$$, HEADER true');

  • A few things to note in the above SQL:
    • The files are exported in CSV format.
    • The delimiter used is “:”.
    • The CSV files will have headers.
  • It took 4106077.448 ms (01:08:26.077) to export the full table. The export created 15 CSV files in S3 of ~6 GB each. Although I only specified the file name ‘article_unload’, the additional files have the suffix _partXX appended, where XX is a sequence number.
+---------------+----------------+----------------+
| rows_uploaded | files_uploaded | bytes_uploaded |
+---------------+----------------+----------------+
|    1000000000 |             15 |    91893000035 |
+---------------+----------------+----------------+
(1 row)

Time: 4106077.448 ms (01:08:26.077)
  • During the export, CPU utilization increased by 30-35% and read IOPS by about 4,000. A quick way to verify the exported files in S3 is sketched below.
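
A minimal boto3 sketch to list the exported part files and total their size, using the bucket and prefix from the s3_uri above:

import boto3

s3 = boto3.client("s3")

# List the exported files under the prefix used in the s3_uri
paginator = s3.get_paginator("list_objects_v2")
total_bytes = 0
for page in paginator.paginate(Bucket="fleetdb-s3", Prefix="fleetdb/admin/article/article_unload"):
    for obj in page.get("Contents", []):
        print(f"{obj['Key']}  {obj['Size']:,} bytes")
        total_bytes += obj["Size"]

print(f"Total exported: {total_bytes:,} bytes")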

AWS Glue ETL

AWS Glue is a fully managed ETL service that runs Apache Spark jobs to load and process large datasets from a variety of sources. For this scenario we will use AWS Glue to read the data from the Aurora PostgreSQL database and write it to Amazon S3.

  • Create an AWS Glue connection to store the connection information for Aurora PostgreSQL. In this case the database is running in a private subnet. You can follow the steps below to create an AWS Glue connection for any database that has a JDBC URL and runs in a private subnet (a boto3 sketch follows the notes below).

AWS Glue > Data catalog > connections > Add connection

  • Review all the details and click Finish. Then test the connection and make sure it is successful.
  • A few things to note while creating the AWS Glue connection –
    • As the database is running in a private subnet, you need to choose a private subnet within your VPC when creating the Glue connection.
    • The private subnet that you choose should be associated with a route table that routes through a NAT gateway.
    • The NAT gateway should reside in a public subnet.
    • The difference between a public and a private subnet is that instances in the public subnet can send outbound traffic directly to the Internet, whereas instances in a private subnet reach the Internet through a network address translation (NAT) gateway that resides in the public subnet.
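
For reference, the same connection can also be created with boto3; this is a minimal sketch with placeholder subnet, security group and availability zone values, using the JDBC URL from this post.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# JDBC connection pointing at the Aurora PostgreSQL cluster endpoint
glue.create_connection(
    ConnectionInput={
        "Name": "fleetdb-jdbc",                              # placeholder connection name
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:postgresql://cluster-fleetdb.cluster-xxxxxxxxx.us-east-1.rds.amazonaws.com:8192/fleetdb",
            "USERNAME": "admin",
            "PASSWORD": "xxxxxxxxxxxxx",
        },
        # Private subnet (with a NAT route) and a security group that can reach the database
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0123456789abcdef0",          # placeholder private subnet
            "SecurityGroupIdList": ["sg-0123456789abcdef0"], # placeholder security group
            "AvailabilityZone": "us-east-1a",                # placeholder AZ
        },
    }
)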

  • Add a new AWS Glue job. Enter a name for the Glue job, choose “A new script to be authored by you”, and at the end of the page under Catalog options (optional) select “Use Glue data catalog as the Hive metastore” before clicking Next. On the next page choose the connection you created earlier, as it is required by this job. Once done, click ‘Save job and edit script’.

AWS Console > AWS Glue > ETL > Jobs > Add job

  • With the default settings the AWS Glue job uses the G.1X worker type (recommended for memory-intensive jobs) with 10 workers. With G.1X each worker maps to 1 DPU (4 vCPUs, 16 GB of memory, 64 GB of disk) and provides 1 executor per worker.
  • Below is the AWS Glue PySpark script to read the data from Aurora PostgreSQL and write it to Amazon S3 –
    • Import the required libraries.
    • Read the data from the admin.article table into a Glue DynamicFrame using create_dynamic_frame_from_options.
    • Write the data to S3 in glueparquet format.
    • enableUpdateCatalog set to True indicates that the Data Catalog is to be updated during the job run as new partitions are created.
    • updateBehavior set to UPDATE_IN_DATABASE indicates that the schema should be overwritten and new partitions added in the Data Catalog during the job run.
    • To create a new table in the Glue catalog, specify the database and new table name using setCatalogInfo along with the enableUpdateCatalog and updateBehavior parameters.
import sys
import os
import logging

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark import SparkConf

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"),
                    format='[%(filename)s:%(lineno)s - %(funcName)10s() ] %(asctime)-15s %(message)s',
                    datefmt='%Y-%m-%d:%H:%M:%S')

logger = logging.getLogger(__name__)

logger.info('Create dynamicframe by reading data from admin.article table in fleetdb')

db_username = "admin"
db_password = "xxxxxxxxxxxxx"
table_name = "article"

#define connection options
connection_options = {  
    "url": "jdbc:postgresql://cluster-fleetdb.cluster-xxxxxxxxx.us-east-1.rds.amazonaws.com:8192/fleetdb",
    "dbtable": table_name,
    "user": db_username,
    "password": db_password,
}


#glue dynamicframe
datasource0 = glueContext.create_dynamic_frame_from_options(
    connection_type="postgresql", 
    connection_options=connection_options,
    transformation_ctx = "datasource0"
    )

logger.info('Writing data to S3')
dataSink0 = glueContext.getSink(
    path = "s3://fleetdb-s3/glue/admin/articles/", 
    connection_type = "s3", 
    updateBehavior = "UPDATE_IN_DATABASE",  
    partitionKeys = [], 
    compression = "snappy", 
    enableUpdateCatalog = True, ##Data Catalog is to be updated during the job run
    transformation_ctx = "DataSink0"
)

dataSink0.setCatalogInfo(catalogDatabase = "fleetdb",catalogTableName = "articles") ##create a new table in Glue catalog
dataSink0.setFormat("glueparquet")
dataSink0.writeFrame(datasource0)

logger.info('Completed writing data to S3')

job.commit()
  • In my case, with all the default settings (10 workers of the G.1X worker type), the Glue job failed after 2 hrs with ‘No space left on device’.
An error occurred while calling o101.pyWriteDynamicFrame. error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7330172e : No space left on device

CloudWatch metrics

  • Observations
    • The Glue job failed with ‘No space left on device’, which indicates that the workers’ local disks are filling up. In Spark, a worker’s local disk is used to store intermediate shuffle files and persisted RDDs.
    • If you watch the CloudWatch metrics, only 1 worker is active at any given point in time.
    • The Aurora PostgreSQL database had only 1 session connected using the JDBC driver.
+-------+---------+------------------------+---------------------+------------------+---------+------------------------+
|  pid  | datname |    application_name    |        state        |       age        | usename |       substring        |
+-------+---------+------------------------+---------------------+------------------+---------+------------------------+
| 29189 | fleetdb | PostgreSQL JDBC Driver | idle in transaction | -00:00:00.002794 | admin   | SELECT * FROM article  |
+-------+---------+------------------------+---------------------+------------------+---------+------------------------+
  • How can we unload a 100 GB table into Amazon S3 using AWS Glue ETL?
    • What if we try G.1X with 30 DPUs?
      • Increasing the DPUs means more memory, more threads and more disk space, but there is still only 1 thread querying/reading the data from the database, and the job failed again with the same ‘No space left on device’ error.
    • What if we try G.2X, where each worker maps to 2 DPUs (8 vCPUs, 32 GB of memory, 128 GB disk) and provides 1 executor per worker? That gives G.2X double the disk space.
      • Even with G.2X the job run failed with ‘An error occurred while calling o88.pyWriteDynamicFrame. No space left on device.’, although this time the job ran for ~4 hrs before failing.

So what can be done to unload a 100 GB table to Amazon S3 using AWS Glue?

  • With Glue 2.0 you can decouple the shuffle storage from the workers by using Amazon S3 to store the shuffle data. To do so you need to set 2 parameters on the AWS Glue job (one way to pass them is shown in the boto3 sketch after the script below) –
    • --write-shuffle-files-to-s3 : true – This flag tells Spark to use the Amazon S3 bucket for writing/reading shuffle data.
    • --conf : spark.shuffle.glue.s3ShuffleBucket=s3://bucket-name/prefix – The Amazon S3 bucket where the shuffle files are written. The bucket needs to be in the same region where the job is executed.
  • Read the table in parallel – To do so you can use either the hashfield or hashexpression parameter together with hashpartitions in the JDBC connection options.
    • From the Glue Document –
      • hashfield – Set hashfield to the name of a column in the JDBC table to be used to divide the data into partitions. For best results, this column should have an even distribution of values to spread the data between partitions. This column can be of any data type. AWS Glue generates non-overlapping queries that run in parallel to read the data partitioned by this column. 
      • hashexpression – Set hashexpression to an SQL expression (conforming to the JDBC database engine grammar) that returns a whole number. A simple expression is the name of any numeric column in the table. AWS Glue generates SQL queries to read the JDBC data in parallel using the hashexpression in the WHERE clause to partition data.
      • hashpartitions – Set hashpartitions to the number of parallel reads of the JDBC table. If this property is not set, the default value is 7.
  • Optimize the memory for the driver and executor processes. By default the driver and executor processes use 1 GB of memory each. Given that we are using the G.1X worker type, each worker has 16 GB of memory available, so we will set the memory configurations spark.driver.memory and spark.executor.memory to 12 GB.
import sys
import os
import logging

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark import SparkConf

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
conf = SparkConf()
conf.set("spark.driver.memory", "12g")      ## Amount of memory to use for the driver process
conf.set("spark.executor.memory", "12g")    ## Amount of memory to use per executor process

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#print(sc.getConf().getAll())

logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"),
                    format='[%(filename)s:%(lineno)s - %(funcName)10s() ] %(asctime)-15s %(message)s',
                    datefmt='%Y-%m-%d:%H:%M:%S')

logger = logging.getLogger(__name__)

logger.info('Create dynamicframe by reading data from admin.article table in fleetdb')

db_username = "admin"
db_password = "xxxxxxxxx"
table_name = "article"

#define connection options
connection_options = {  
    "url": "jdbc:postgresql://cluster-fleetdb.cluster-xxxxxxxxxxx.us-east-1.rds.amazonaws.com:8192/fleetdb",
    "dbtable": table_name,
    "user": db_username,
    "password": db_password,
    "hashexpression": "id", ##use the numeric column `id` to read data partitioned.
    "hashpartitions": 7 ##number of parallel reads of the JDBC table
}


#glue dynamicframe
datasource0 = glueContext.create_dynamic_frame_from_options(
    connection_type="postgresql", 
    connection_options=connection_options,
    transformation_ctx = "datasource0"
    )

logger.info('Writing data to S3')
dataSink0 = glueContext.getSink(
    path = "s3://fleetdb-s3/glue/admin/articles/", 
    connection_type = "s3", 
    updateBehavior = "UPDATE_IN_DATABASE", 
    partitionKeys = [], 
    compression = "snappy", 
    enableUpdateCatalog = True, ##Data Catalog is to be updated during the job run
    transformation_ctx = "DataSink0"
)

dataSink0.setCatalogInfo(catalogDatabase = "fleetdb",catalogTableName = "articles") ##create a new table in Glue catalog
dataSink0.setFormat("glueparquet")
dataSink0.writeFrame(datasource0)

logger.info('Completed writing data to S3')

job.commit()
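
The --write-shuffle-files-to-s3 and --conf settings discussed earlier are job parameters rather than something the script configures. One way to supply them, together with the worker type and count, is when creating the job with boto3; the sketch below is illustrative, with a placeholder job name, script location and shuffle prefix.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Create the Glue 2.0 job with the shuffle-to-S3 parameters as default arguments
glue.create_job(
    Name="fleetdb-article-export",                         # placeholder job name
    Role="FleetdbGlueS3Role",                              # IAM role created earlier
    GlueVersion="2.0",
    WorkerType="G.1X",
    NumberOfWorkers=10,
    Connections={"Connections": ["fleetdb-jdbc"]},         # the Glue connection created earlier
    Command={
        "Name": "glueetl",
        "PythonVersion": "3",
        "ScriptLocation": "s3://fleetdb-s3/scripts/export_article.py",  # placeholder script path
    },
    DefaultArguments={
        "--write-shuffle-files-to-s3": "true",
        "--conf": "spark.shuffle.glue.s3ShuffleBucket=s3://fleetdb-s3/shuffle/",  # placeholder shuffle prefix
    },
)

# Start a job run
run = glue.start_job_run(JobName="fleetdb-article-export")
print(run["JobRunId"])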
  • As we had set hashexpression=id and hashpartitions=7, the PostgreSQL database had 7 sessions running SELECT queries to fetch the data.
+-------+---------+------------------------+---------------------+------------------+---------+--------------------------------------------------------------------+
|  pid  | datname |    application_name    |        state        |       age        | usename |                             substring                              |
+-------+---------+------------------------+---------------------+------------------+---------+--------------------------------------------------------------------+
| 14482 | fleetdb | PostgreSQL JDBC Driver | idle in transaction | -00:00:00.003484 | admin   | SELECT * FROM (select * from article WHERE id % 7 = 5) as article  |
| 25066 | fleetdb | PostgreSQL JDBC Driver | idle in transaction | -00:00:00.003587 | admin   | SELECT * FROM (select * from article WHERE id % 7 = 6) as article  |
| 14485 | fleetdb | PostgreSQL JDBC Driver | active              | -00:00:00.006084 | admin   | SELECT * FROM (select * from article WHERE id % 7 = 3) as article  |
| 14486 | fleetdb | PostgreSQL JDBC Driver | active              | -00:00:00.007741 | admin   | SELECT * FROM (select * from article WHERE id % 7 = 4) as article  |
| 31745 | fleetdb | PostgreSQL JDBC Driver | active              | -00:00:00.009476 | admin   | SELECT * FROM (select * from article WHERE id % 7 = 2) as article  |
| 31732 | fleetdb | PostgreSQL JDBC Driver | active              | -00:00:00.010721 | admin   | SELECT * FROM (select * from article WHERE id % 7 = 1) as article  |
| 31747 | fleetdb | PostgreSQL JDBC Driver | idle in transaction | -00:00:24.181482 | admin   | SELECT * FROM (select * from article WHERE id % 7 = 0) as article  |
+-------+---------+------------------------+---------------------+------------------+---------+--------------------------------------------------------------------+
(7 rows)

Time: 2.810 ms
  • The AWS Glue job completed in 1 hr 4 mins. The memory and CPU profiles of the driver and executor processes show all threads being used, and you can also notice the data being shuffled across executors.

  • As we had set --write-shuffle-files-to-s3 : true, Spark used the Amazon S3 bucket for writing the shuffle data. Each of the 7 threads [0-6] wrote a *.data file of ~12 GB to Amazon S3.
  • The Glue job wrote the files in glueparquet format to Amazon S3.
  • The Glue job also created the “articles” table under the “fleetdb” database in the Glue catalog, which can be queried from Amazon Athena (a minimal boto3 sketch follows).
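
A quick way to validate the new catalog table is to run a count through the Athena API; this is a minimal sketch, assuming a query results location under the same bucket.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Run a simple validation query against the table created in the Glue catalog
response = athena.start_query_execution(
    QueryString="SELECT count(*) FROM articles",
    QueryExecutionContext={"Database": "fleetdb"},
    ResultConfiguration={"OutputLocation": "s3://fleetdb-s3/athena-results/"},  # placeholder results prefix
)
print(response["QueryExecutionId"])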

Conclusion

In this post, I highlighted how you can use the aws_s3 extension and the AWS Glue service to export data from a large non-partitioned table in Aurora PostgreSQL to Amazon S3. We troubleshot the AWS Glue job failures caused by the ‘No space left on device’ error, explored how to decouple the shuffle storage from the workers by using Amazon S3 to store the shuffle data, and looked at how to read the Aurora PostgreSQL table in parallel over the JDBC connection in AWS Glue. We also updated the default memory configuration for the Spark driver and executor for better performance, and used the data sink parameters that tell AWS Glue to update the Data Catalog during the job run. Using all these mechanisms we were able to optimize the AWS Glue job and run it successfully to export the data to Amazon S3.

References –

https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html

https://docs.aws.amazon.com/glue/latest/dg/add-job.html

https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html
