I had a use case to read data (a few columns) from a parquet file stored in S3 and write it to a DynamoDB table every time a file was uploaded. Planning to use AWS Lambda, I was looking at options for reading parquet files within Lambda until I stumbled upon AWS Data Wrangler.
From the documentation –
What is AWS Data Wrangler?
An open-source Python package that extends the power of Pandas library to AWS connecting DataFrames and AWS data related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, etc).
Built on top of other open-source projects like Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2 and PyMySQL, it offers abstracted functions to execute usual ETL tasks like load/unload data from Data Lakes, Data Warehouses and Databases.
Quick demo of the use case
Source: Data stored in a Parquet file on S3
+------+----------+-------------------+-----+----------+-------------+
|change| comments| last_updated|price| sector|ticker_symbol|
+------+----------+-------------------+-----+----------+-------------+
| 1|VVS Ticker|2020-04-01 18:15:05| 14| ENERGY| VVS|
| 0|VVY Ticker|2020-04-02 18:15:05| 12|HEALTHCARE| VVY|
| 9|TGT Ticker|2020-04-03 18:15:05| 58| RETAIL| TGT|
| 11|VVY Ticker|2020-04-04 18:15:05| 48|HEALTHCARE| VVY|
| 3|WSB Ticker|2020-04-05 18:15:05| 147| FINANCIAL| WSB|
| 7|KFU Ticker|2020-04-06 18:15:05| 33| ENERGY| KFU|
| 6|VVS Ticker|2020-04-07 18:15:05| 93| ENERGY| VVS|
| 91|SLW Ticker|2020-04-08 18:15:05| 35| ENERGY| SLW|
| 78|ALY Ticker|2020-04-09 18:15:05| 7| ENERGY| ALY|
+------+----------+-------------------+-----+----------+-------------+
Destination: Records stored in a DynamoDB table. Each item consists of the “ticker_symbol”, “sector” and “last_updated” (converted to epoch time) values from the parquet file, plus an extra attribute “location” specifying the S3 file containing that particular record and a “source” attribute with a default value of “system”.
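As a concrete example, assuming the timestamps are UTC and the file above was uploaded to a hypothetical path s3://my-bucket/stock_data/part-0000.parquet, the first row would land in DynamoDB roughly as:

{
    "ticker_symbol": "VVS",
    "sector": "ENERGY",
    "last_updated": 1585764905,  # 2020-04-01 18:15:05 UTC as epoch seconds
    "location": "s3://my-bucket/stock_data/part-0000.parquet",  # hypothetical S3 path
    "source": "system"
}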
1. Created a Lambda function and attached an IAM role with the proper permissions: read from S3, put items into DynamoDB, plus the AWSLambdaBasicExecutionRole managed policy (a minimal policy sketch follows).
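What those custom permissions boil down to, sketched here as a policy document built in Python (the bucket name, table name, region and account ID are placeholders; scope them to your own resources):

import json

# Minimal sketch of the custom statements attached to the Lambda role,
# on top of the managed AWSLambdaBasicExecutionRole policy.
# "my-bucket" and "my-ticker-table" are hypothetical names.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {   # read the uploaded parquet files
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": ["arn:aws:s3:::my-bucket", "arn:aws:s3:::my-bucket/*"],
        },
        {   # write items to the destination table
            "Effect": "Allow",
            "Action": ["dynamodb:PutItem"],
            "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/my-ticker-table",
        },
    ],
}
print(json.dumps(policy, indent=2))  # paste into the IAM console or pass to iam.create_policy()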
2. Created Lambda layer for AWS Data Wrangler
- Go to GitHub’s release section and download the layer zip for the desired version. In my case, I downloaded awswrangler-layer-1.0.1-py3.8.zip.
- In the AWS Lambda console, open the Layers section (left side) and click Create layer.
- Set the name and Python version, upload the freshly downloaded zip file, and press Create to create the layer.
- Go to your Lambda and select your new layer!
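If you prefer to script the layer setup instead of clicking through the console, a rough boto3 equivalent looks like this (the layer and function names are placeholders):

import boto3

lam = boto3.client("lambda")

# Publish the downloaded zip as a new layer version.
with open("awswrangler-layer-1.0.1-py3.8.zip", "rb") as f:
    layer = lam.publish_layer_version(
        LayerName="aws-data-wrangler",            # hypothetical layer name
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.8"],
    )

# Attach the new layer version to the function.
lam.update_function_configuration(
    FunctionName="parquet-to-dynamodb",           # hypothetical function name
    Layers=[layer["LayerVersionArn"]],
)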
3. Created the function code, with a few highlights:
- Read the parquet file (specified columns) into a pandas DataFrame.
- Convert the DataFrame column with Timestamp datatype to epoch time (a number) so the record can be stored in DynamoDB.
- Convert the final pandas DataFrame into a list of dictionaries (one per record).
- Write the records to DynamoDB.
import os
import json
import boto3
import logging
import urllib.parse
import awswrangler as wr
import pandas as pd
from datetime import datetime
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


def s3_url(bucket, prefix, folder=False):
    """Produce an s3 url from 'bucket' and 'prefix'."""
    assert bucket and prefix
    return "s3://{bucket}/{prefix}{folder}".format(
        bucket=bucket, prefix=prefix, folder="/" if folder else "")


def read_parquet(s3_path, columns, source, dataset=True):
    """
    Read Apache Parquet file(s) from a received S3 prefix or list of S3 object paths.
    Convert the last_updated column from Timestamp to epoch time.
    Return the records as a list of dictionaries.
    """
    assert source, "source can't be None"
    df = wr.s3.read_parquet(path=s3_path, columns=columns, dataset=dataset)
    # Timestamp -> epoch seconds, since DynamoDB has no native timestamp type
    df[["last_updated"]] = (df[["last_updated"]] - pd.Timestamp("1970-01-01")) // pd.Timedelta("1s")
    df["source"] = source
    df["location"] = s3_path
    return df.to_dict(orient="records")


def write_to_ddb(client, table_name, records):
    """Write data to a DynamoDB table. Returns the inserted row count."""
    table = client.Table(table_name)
    row_count = 0
    for row in records:
        table.put_item(Item=row)
        row_count += 1
    return row_count


def lambda_handler(event, context):
    record = event["Records"][0]

    assert "eventSource" in record
    assert "eventName" in record
    assert record["eventSource"] == "aws:s3", "Unknown event source: %s." % record["eventSource"]
    assert record["eventName"].startswith(
        "ObjectCreated:"), "Unsupported event name: %s." % record["eventName"]
    assert "s3" in record
    assert "bucket" in record["s3"]
    assert "name" in record["s3"]["bucket"]
    assert "object" in record["s3"]
    assert "key" in record["s3"]["object"]

    bucket = record["s3"]["bucket"]["name"]
    # If the S3 key contains special characters such as '=', they arrive URL-quoted (e.g. %3D).
    # Unquote them back to the original characters, or the path lookup will fail.
    key = urllib.parse.unquote(record["s3"]["object"]["key"])

    # Create the s3 path
    s3_path = s3_url(bucket=bucket, prefix=key)

    # Retrieve the data directly from Amazon S3
    cols = ["ticker_symbol", "sector", "last_updated"]
    df = read_parquet(s3_path=s3_path, columns=cols, source="system", dataset=True)

    # Instantiate the DynamoDB connection
    ddb = boto3.resource("dynamodb")

    # Get the DynamoDB table name
    ddb_table = os.environ["DDB_TABLE"]

    # Write data to DynamoDB
    record_count = write_to_ddb(client=ddb, table_name=ddb_table, records=df)

    # Return total records inserted
    return ("Total records inserted:", record_count)
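To sanity-check the handler before wiring up the trigger, you can feed it a hand-built S3 ObjectCreated event. The bucket, key and table name below are hypothetical, DDB_TABLE must point at an existing table, and the handler is assumed to live in Lambda's default lambda_function.py module:

import os
from lambda_function import lambda_handler  # assumes the default module name

os.environ["DDB_TABLE"] = "my-ticker-table"  # hypothetical table name

test_event = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "my-bucket"},                    # hypothetical bucket
                "object": {"key": "stock_data/part-0000.parquet"},  # hypothetical key
            },
        }
    ]
}

print(lambda_handler(test_event, context=None))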
4. Added S3 event type ObjectCreated as a trigger, so every time a new file is uploaded to S3, the trigger fires and invokes the Lambda function to read the parquet file and write the data to the DynamoDB table (a boto3 sketch of this setup follows).
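Setting up the same notification outside the console might look roughly like this; the bucket name, function name and ARNs are placeholders, and S3 first needs permission to invoke the function:

import boto3

s3 = boto3.client("s3")
lam = boto3.client("lambda")

function_arn = "arn:aws:lambda:us-east-1:123456789012:function:parquet-to-dynamodb"  # hypothetical ARN

# Allow S3 to invoke the Lambda function.
lam.add_permission(
    FunctionName="parquet-to-dynamodb",           # hypothetical function name
    StatementId="s3-invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn="arn:aws:s3:::my-bucket",           # hypothetical bucket ARN
)

# Invoke the function for every new parquet object in the bucket.
s3.put_bucket_notification_configuration(
    Bucket="my-bucket",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": function_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".parquet"}]}},
            }
        ]
    },
)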
Hope this helps!