Building S3 Data Pipelines with Python and Boto3

In my last post I outlined a number of architectural options for solutions that could be implemented in light of Microsoft retiring SQL Server 2019 Big Data Clusters, one of which was data pipelines that leverage Python and Boto 3. Before diving into these in greater detail, let’s recap what S3 is.

S3 101

S3, or Simple Storage Service to give it its full name, was one of AWS’s earliest storage services. S3 is an object storage platform on which files and objects are stored in ‘buckets’, which are in effect storage containers. A bucket is accessed via an access key id / secret access key pair, and buckets can in turn contain folder-like prefixes. When using S3 on AWS, data is automatically replicated across multiple availability zones within a region, making it highly available and durable. When S3 was originally developed it was largely aimed at static content, and as such it became popular as the storage technology of choice for serving static web content. Over time S3 has also become a popular choice for data lakes and analytics platforms. S3 can now be found in a variety of places other than AWS, including on-premises storage devices, GCP and various software-defined storage platforms. S3 is accessed via a REST API, with objects written and retrieved using PUT and GET semantics.
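
To make the PUT and GET semantics concrete, here is a minimal sketch using the Boto 3 client covered later in this post; the bucket and key names are placeholders rather than anything referenced elsewhere.

import boto3

# Placeholder names purely for illustration
s3 = boto3.client('s3')  # credentials resolved as described later in this post

# PUT: create (or overwrite) an object under a key in a bucket
s3.put_object(Bucket='my-example-bucket', Key='hello.txt', Body=b'hello s3')

# GET: retrieve the same object
obj = s3.get_object(Bucket='my-example-bucket', Key='hello.txt')
print(obj['Body'].read())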

AWS provides the “gold standard” implementation of S3. Readers of this blog post should be aware that other parties offering S3 compatibility may or may not support:

  • The full S3 API
  • The full S3 security model – which is ACL based
  • Full AWS S3 feature parity

Tools for Working with S3

Three of my favourite tools for working with S3 are:

  • s5cmd
    This is a command line tool written in Go. I like it because it’s free, easy to use and fast, as per this blog post from which I have borrowed the chart below. To use s5cmd, install the tool per the instructions in this GitHub repo, then create a file called credentials under the .aws directory in your home directory, containing your access key id and secret access key:
[default]
aws_access_key_id = YOURACCESSKEYIDGOESHERE
aws_secret_access_key = YOURSECRETACCESSKEYGOESHERE
  • Cyberduck – an S3 object browser in GUI form
  • Boto 3 – the AWS SDK in Python package form, which is covered below; simply install it using pip as you would any other Python package

Boto3

Boto 3 is the Python SDK for AWS; the part we are specifically interested in is its S3 support. AWS provides some simple examples of how to use Boto 3 with S3 here, and there are also plenty of good coding examples over on Stack Overflow. There are two ways of leveraging the S3 API via Boto 3, the client and the resource, plus a third construct, the session, which underpins both:

1. A client
As illustrated in the code excerpt below, a client provides access to the full S3 API surface area.

import logging
import boto3
from botocore.exceptions import ClientError

def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """

    # Create bucket
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError as e:
        logging.error(e)
        return False
    return True
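
To show a little more of that surface area, here is a hedged usage sketch that calls the create_bucket function above and then uploads and lists objects via the same client; the bucket name, region and file name are placeholders.

# Placeholder names purely for illustration
if create_bucket('pipeline-demo-bucket', region='us-west-2'):
    s3_client = boto3.client('s3', region_name='us-west-2')

    # Upload a local file to the bucket
    s3_client.upload_file('tweets.csv', 'pipeline-demo-bucket', 'tweets.csv')

    # List the keys currently stored in the bucket
    response = s3_client.list_objects_v2(Bucket='pipeline-demo-bucket')
    for item in response.get('Contents', []):
        print(item['Key'], item['Size'])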

2. Resource
This is a higher-level, object-oriented API that does not provide access to the full S3 API surface area. The code excerpt below illustrates establishing a connection to an S3 endpoint via a resource.

def dump_to_s3(csv_buf, aws_endpoint, bucket, aws_key, aws_secret):
    # Connect to the S3 endpoint via the resource API
    s3 = boto3.resource(service_name          = 's3',
                        use_ssl               = False,
                        aws_access_key_id     = aws_key,
                        aws_secret_access_key = aws_secret,
                        endpoint_url          = aws_endpoint)

    # Write the CSV buffer to the bucket under a unique, UUID-based object name
    csv_name = str(uuid.uuid4())
    s3.Object(bucket, csv_name).put(Body=csv_buf)
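
For completeness, here is a hedged read-side sketch using the resource API; the bucket and key names are placeholders.

import boto3

# Placeholder names purely for illustration
s3 = boto3.resource('s3')

# Iterate over the objects in a bucket
bucket = s3.Bucket('my-bucket')
for obj_summary in bucket.objects.all():
    print(obj_summary.key, obj_summary.size)

# Fetch the body of a single object back into memory
body = s3.Object('my-bucket', 'some-object-key').get()['Body'].read()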

3. Session
Doing anything with Boto 3 ultimately requires a session, and the client and resource APIs will create one for you under the covers. However, if there is a requirement to use different credentials within the same piece of code, the session API is the way to go.
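
As a hedged sketch of that scenario, the code below creates two sessions with different (entirely hypothetical) credentials and derives a client from one and a resource from the other.

import boto3

# Hypothetical credential pairs purely for illustration
reader_session = boto3.session.Session(
    aws_access_key_id='READER_KEY_ID',
    aws_secret_access_key='READER_SECRET_KEY')

writer_session = boto3.session.Session(
    aws_access_key_id='WRITER_KEY_ID',
    aws_secret_access_key='WRITER_SECRET_KEY')

# Clients and resources created from a session inherit its credentials
s3_reader = reader_session.client('s3')
s3_writer = writer_session.resource('s3')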

Supplying Credentials To Boto 3

Credentials can be supplied to Boto 3 in a variety of ways:

  1. Passing credentials as parameters in the boto3.client() method
  2. Passing credentials as parameters when creating a Session object
  3. Environment variables
  4. Shared credential file (~/.aws/credentials)
  5. AWS config file (~/.aws/config)
  6. Assume Role provider
  7. Boto2 config file (/etc/boto.cfg and ~/.boto)
  8. Instance metadata service on an Amazon EC2 instance that has an IAM role configured.

Note that anything which relates to IAM (AWS Identity and Access Management) is only relevant to S3 on AWS. For the purposes of this post, credentials will be specified via environment variables.
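
To illustrate the environment variable approach, here is a minimal sketch; Boto 3 reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY automatically, while the pipeline code later in this post passes them explicitly alongside the endpoint URL.

import os
import boto3

# Option 1: let Boto 3 resolve AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# from the environment automatically
s3_client = boto3.client('s3')

# Option 2: pass the same environment variables explicitly, the pattern
# used by the pipeline code below (handy with a non-AWS endpoint)
s3_resource = boto3.resource(
    's3',
    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
    endpoint_url=os.environ['ENDPOINT_URL'])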

A Simple Data Pipeline Example

When I set out creating this content, I wanted to achieve two goals:

  1. Come up with data pipelines that could illustrate the use of SQL Server 2022 object virtualisation, i.e. place some data inside a bucket in an ‘interesting’ way and then leverage SQL Server 2022’s enhanced PolyBase capabilities on it.
  2. Build some pipelines to illustrate the usage of Portworx Data Services, a database-as-a-service platform built on top of Portworx and Kubernetes.

With this in mind, I came up with the following code to harvest tweets from Twitter’s API via the tweepy package, sentiment score each tweet using the nltk package and finally store the tweets in .csv file form in an S3 bucket.

#!/usr/bin/env python

import tweepy
import pandas as pd
import numpy  as np
import boto3
import nltk
import uuid
import os

from nltk.sentiment.vader import SentimentIntensityAnalyzer

def get_tweets(bearer_token, query, max_tweets=1000):
    # Search recent tweets matching the query, paginating through the results
    # 100 tweets at a time up to a total of max_tweets
    client = tweepy.Client(bearer_token=bearer_token)
    tweets = tweepy.Paginator(client.search_recent_tweets,
                              query,
                              max_results=100).flatten(limit=max_tweets)
    return tweets

def build_tweet_dataframe(tweets):
    tweet_list = []
    # Download the VADER lexicon used for sentiment scoring (no-op if already cached)
    nltk.download('vader_lexicon', quiet=True)
    sia = SentimentIntensityAnalyzer()
    for tweet in tweets:
        tweet_id = tweet.id
        text = tweet.text
        score = sia.polarity_scores(text)
        tweet_list.append({'tweet_id'  : tweet_id, 
                           'text'      : text, 
                           's_negative': score['neg'],
                           's_neutral' : score['neu'], 
                           's_positive': score['pos'], 
                           's_compound': score['compound']})
    # create dataframe   
    df = pd.DataFrame(tweet_list, columns=['tweet_id',
                                           'text',
                                           's_negative',
                                           's_neutral',
                                           's_positive',
                                           's_compound'])
    return df

def dump_to_s3(csv_buf, aws_endpoint, bucket, aws_key, aws_secret):
    # Connect to the S3 endpoint via the resource API
    s3 = boto3.resource(service_name          = 's3',
                        use_ssl               = False,
                        aws_access_key_id     = aws_key,
                        aws_secret_access_key = aws_secret,
                        endpoint_url          = aws_endpoint)

    # Write the CSV buffer to the bucket under a unique, UUID-based object name
    csv_name = str(uuid.uuid4())
    s3.Object(bucket, csv_name).put(Body=csv_buf)

def main():
    tweets = get_tweets(os.environ['BEARER_TOKEN'], os.environ['TWITTER_QUERY'])
    df = build_tweet_dataframe(tweets)
    # Split the dataframe into chunks of at most MAX_TABLE_SIZE rows;
    # np.array_split expects an integer number of sections, so round up
    chunk_size = int(os.environ['MAX_TABLE_SIZE'])
    chunks = max(1, (len(df) + chunk_size - 1) // chunk_size)
    df_split = np.array_split(df, chunks)
    for d in df_split:
        to_csv = d.to_csv(index=False, header=False)
        dump_to_s3(to_csv, os.environ['ENDPOINT_URL'], os.environ['BUCKET'],
                   os.environ['AWS_ACCESS_KEY_ID'], os.environ['AWS_SECRET_ACCESS_KEY'])

if __name__ == "__main__":
    main()

In order to use the code you will require the items below (a quick environment variable sanity check is sketched after the list):

  • Python (obviously)
  • The Python package manager – pip
  • The following Python packages, which need to be installed using pip (uuid and os are part of the Python standard library):
    boto3 pandas nltk tweepy numpy
  • A bearer token for accessing the Twitter API via tweepy; this requires signing up for a Twitter developer account. The code picks this up from the BEARER_TOKEN environment variable.
  • An S3 bucket, the name of which is specified in the BUCKET environment variable
  • An S3 endpoint, specified in the ENDPOINT_URL environment variable. At the time of writing I have tested this against an on-premises storage device (a Pure Storage FlashBlade); the code should work with any S3-compatible storage device or software-defined storage platform, and I will in due course get around to modifying it so that it also works with AWS S3
  • The access key id and secret access key for accessing the S3 bucket, specified in the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables respectively
  • A Twitter query, specified in the TWITTER_QUERY environment variable; refer to Twitter’s developer guide for instructions on how to construct queries
  • The maximum number of tweets per file, specified in the MAX_TABLE_SIZE environment variable
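
As mentioned above, here is a small hedged sketch that checks all of the required environment variables are set before the pipeline runs; the variable names are exactly those listed above.

import os
import sys

REQUIRED_ENV_VARS = ['BEARER_TOKEN', 'TWITTER_QUERY', 'BUCKET', 'ENDPOINT_URL',
                     'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'MAX_TABLE_SIZE']

# Report any variables that are missing or empty before the pipeline runs
missing = [name for name in REQUIRED_ENV_VARS if not os.environ.get(name)]
if missing:
    sys.exit('Missing environment variables: ' + ', '.join(missing))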

Next Time

The next post in this series will focus on containerising the code and executing the resulting container inside an Argo workflow.
