
Creating a Kinesis stream, Firehose, and lambda event source from config file

Submitted by: @import:stackexchange-codereview

Problem

I'm in the process of writing a Python script to automate a data ingestion pipeline using Amazon Web Services' Kinesis Streams, Kinesis Firehose, and Lambda. This is my first stab at writing Python, but I do have some experience with JavaScript and Node.js.

Scripts are organized as follows:

  • build_pipeline.py - this is where all the logic lives
  • aws_pipeline.py - this is where the corresponding classes that are called from build_pipeline.py live
  • config.json - a JSON file with configuration values such as the stream name

Pipeline process:

  • Run the script like so: python build_pipeline.py config.json
  • The script checks that the config file exists, then reads it so the JSON properties can be accessed.
  • Set local variables based on the JSON properties.
  • Instantiate classes for S3 (for storing the config), the stream, and Firehose.
  • Upload the config file to the S3 folder.
  • Check whether the stream exists; if not, create it and add tags.
  • Check whether the Firehose delivery stream exists; if not, create it.
  • Get the stream ARN, pass it into the Lambda class, and initialize it.
  • Check whether the Lambda event source mapping exists; if not, create it.
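
The first three steps (check that the config file exists, read it, and pull values out of the JSON) might look roughly like this in Python 3. build_pipeline.py itself isn't shown, so `read_config` and the keys `stream_name` and `firehose_name` are hypothetical placeholders rather than the asker's actual code or config schema.

```python
import json
import os


def read_config(path):
    """Return the parsed contents of the JSON config file at `path`."""
    # Step 1: fail early, with a clear message, if the file is missing.
    if not os.path.isfile(path):
        raise FileNotFoundError('config file not found: {}'.format(path))
    # Step 2: parse the JSON so its properties can be read as a dict.
    with open(path) as f:
        return json.load(f)


# Step 3, with hypothetical key names (the real config.json isn't shown):
# config = read_config(sys.argv[1])
# stream_name = config['stream_name']
# firehose_name = config['firehose_name']
```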



aws_pipeline.py

```
import boto3
import os

class Stream(object):
    '''
    This a Class for Kinesis Stream
    '''

    CLIENT = boto3.client('kinesis')
    SHARD_COUNT = 2

    def __init__(self, stream_name):
        self.stream_name = stream_name

    def list(self):
        '''
        Gets stream list
        '''

        try:
            print 'GET: Kinesis Stream list'
            stream_list = self.CLIENT.list_streams()['StreamNames']

            print 'DONE: Kinesis Stream List returned'
            return stream_list

        except Exception as ex:
            print ex
            raise ex

    def create(self):
        '''
        Creates a stream
        '''
        try:
            print 'CREATE: Stream'
            self.CLIENT.create_stream(
                StreamName=self.stream_name,
                ShardCount=self.SHARD_COUNT
            )
            pr
```

Solution

Naming

I don't think your names are very descriptive. You call your class Stream, for example, and then have to explain in the docstring that it is a Kinesis Stream. Just call the class KinesisStream and remove the docstring (or, ideally, give it a better docstring). The same applies to all of those classes. Lambda is especially confusing, given that lambda functions already exist in Python. I don't know enough about the use case and the system to suggest a better name, but you should come up with one.

Exception handling and logging

In my mind there are two types of exception handling: application exceptions and function exceptions (as far as I know this is my own terminology; there might be an official name, but I don't know it). Function exceptions are the specific exceptions you plan for and can handle (or know to pass up appropriately) inside the function. They are targeted (meaning the try block encapsulates the fewest lines possible) and specific. They might look something like this:

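```
def do_something():
    pass  # placeholder for TypeError-specific handling

def do_something_else():
    pass  # placeholder for ValueError-specific handling

def to_int(thing):
    try:
        return int(thing)
    except TypeError:  # e.g. to_int(None)
        do_something()
        return -1
    except ValueError:  # e.g. to_int('abc')
        do_something_else()
        return -1
```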


Here you know what specific exception you're looking for, and you can handle everything inside of your function.

This also includes things like passing a (hopefully specific) exception up to something that is expected to handle it.
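
As a sketch of that second case (the names `ConfigError`, `shard_count_from`, and `shard_count_or_default`, and the `shard_count` config key, are all hypothetical): the helper raises a specific, documented exception, and only the caller, which knows a sensible fallback, handles it.

```python
class ConfigError(Exception):
    """Raised when the pipeline config is missing a required value."""


def shard_count_from(config):
    """Return the shard count from a config dict; raise ConfigError if absent."""
    try:
        return int(config['shard_count'])
    except KeyError:
        # Translate a generic KeyError into a specific, documented exception
        # and pass it up to a caller that is expected to handle it.
        raise ConfigError("config is missing 'shard_count'")


def shard_count_or_default(config, default=2):
    # The caller knows a reasonable fallback, so the handling lives here.
    try:
        return shard_count_from(config)
    except ConfigError:
        return default
```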

Application exceptions are the catch-all, log-'em-and-pray type, where you want to know that something bad happened but weren't able to handle it in the code. This is what you do, everywhere. There are a few problems here. First, you'll end up printing the same exception multiple times if any of your functions call one another. Second, you add unnecessary entries to your traceback. Third, it makes the code much harder to read: normally, seeing exception handling gives you an idea of what to expect from some code, but seeing except Exception as ex doesn't tell me anything except that something could go wrong. Fourth, your logging doesn't add much value, because you just print to stdout.
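
By contrast, a catch-all belongs in one place, at the very top. A minimal sketch, assuming a hypothetical `run_pipeline` function that stands in for the real work; `logger.exception` is the standard-library call that logs at ERROR level together with the active traceback.

```python
import logging

logger = logging.getLogger('build_pipeline')


def run_pipeline():
    """Hypothetical stand-in for the real steps (upload config, create stream, ...)."""
    raise RuntimeError('simulated failure while creating the stream')


def main(run=run_pipeline):
    """Run the pipeline and return a process exit status (0 means success)."""
    # The single application-level catch-all: log the full traceback once,
    # here, instead of printing and re-raising inside every method.
    try:
        run()
    except Exception:
        logger.exception('pipeline build failed')
        return 1
    return 0
```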

I'd just remove all of that error handling and move it up a level. If there are specific exceptions that you want specific logging or behavior for, handling those is great. Otherwise it's just noise.

With regard to logging: printing to stdout barely qualifies as logging. I'd recommend using the logging module, a third-party module, or rolling your own. If you so desire, you can configure your logger to print to stdout, but you shouldn't default to that. Additionally, the messages you're logging are pointless: they just say "I called this function" and "this function is done", which you already know from reading the code. Remove all of them (or, if you keep them, log them at the lowest priority level of your logger).
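
A minimal standard-library sketch of that split: library code only fetches a named logger and logs chatter at DEBUG, while the entry point decides once where output goes and at what level. The logger name `aws_pipeline` and the helpers `create_stream` and `configure_logging` are illustrative, not part of the reviewed code; `client` is assumed to be a boto3 Kinesis client.

```python
import logging
import sys

# Modules only ask for a named logger; they don't decide where output goes.
logger = logging.getLogger('aws_pipeline')


def create_stream(client, stream_name, shard_count=2):
    # "I called this function" messages, if kept at all, go at DEBUG.
    logger.debug('creating Kinesis stream %s', stream_name)
    client.create_stream(StreamName=stream_name, ShardCount=shard_count)


def configure_logging(verbose=False):
    """Configure log output once, in the entry point; return the level used."""
    level = logging.DEBUG if verbose else logging.INFO
    # stdout is a deliberate choice here; basicConfig(filename=...) would
    # send the same records to a file instead.
    logging.basicConfig(stream=sys.stdout, level=level,
                        format='%(asctime)s %(levelname)s %(name)s: %(message)s')
    return level
```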

Docstrings

All of your docstrings are pretty much worthless. Use an existing style guide (I like numpydoc, although Sphinx's RST format is probably slightly more common) to help guide what you should write, and then rewrite them to give useful information. If the name of the function gives you everything you need to know about it, which may sometimes be true, then pat yourself on the back - you've written a self-describing function. In these cases, however, you probably still want to have a class-level docstring explaining overall purpose and workflow with this class.
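
For illustration, here is what a numpydoc-style docstring could look like on a small, hypothetical helper `get_stream_arn`. It takes the boto3 Kinesis client as an argument, which is an assumption made here so the function can be exercised with a stand-in client.

```python
def get_stream_arn(client, stream_name):
    """Look up the ARN of an existing Kinesis stream.

    Parameters
    ----------
    client : botocore.client.BaseClient
        A Kinesis client, e.g. ``boto3.client('kinesis')``.
    stream_name : str
        Name of the stream to look up.

    Returns
    -------
    str
        The stream's ARN, taken from ``StreamDescription['StreamARN']``.

    Raises
    ------
    botocore.exceptions.ClientError
        If the stream cannot be described, for example because it does
        not exist.
    """
    description = client.describe_stream(StreamName=stream_name)
    return description['StreamDescription']['StreamARN']
```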

CLI and CLA

Instead of rolling your own sys.argv parser (which is admittedly easy in this case), you're better off using argparse or a third-party library. It will be easier to extend if you add more complexity in the future, and more immediately understandable to fellow Python developers. I haven't made those changes in my code below, but they should be straightforward.
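
A minimal argparse sketch for the existing `python build_pipeline.py config.json` usage; `parse_args` and the `config_file` argument name are hypothetical choices.

```python
import argparse


def parse_args(argv=None):
    """Parse the command line; argv defaults to sys.argv[1:] when None."""
    parser = argparse.ArgumentParser(
        description='Build a Kinesis stream -> Firehose -> Lambda pipeline.')
    # A single required positional argument, matching the current usage.
    parser.add_argument('config_file',
                        help='path to the JSON config file, e.g. config.json')
    return parser.parse_args(argv)
```

As a bonus, `--help` prints a usage message, and a missing argument produces a clear error and exit status 2 instead of an IndexError from indexing sys.argv.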

Configuration

Right now you're able to handle your config file with two simple functions and some local variables. That's fine, but if this expands in complexity you'll probably want a dedicated class to handle interfacing with the config file.
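
If the config does grow, one possible shape for such a class is sketched below. The required key names are assumptions that simply mirror the constructor arguments of the classes in this review; the real config.json layout isn't shown.

```python
import json


class PipelineConfig(object):
    """Validated, attribute-style access to the pipeline's JSON config.

    The key names are hypothetical; they mirror the constructor arguments
    in the reviewed code, not a known config.json layout.
    """

    REQUIRED_KEYS = ('stream_name', 'firehose_name', 'bucket_name',
                     'prefix_name', 'function_name')

    def __init__(self, data):
        # Report every missing key at once instead of failing on the first.
        missing = [key for key in self.REQUIRED_KEYS if key not in data]
        if missing:
            raise ValueError('config is missing: ' + ', '.join(missing))
        self.stream_name = data['stream_name']
        self.firehose_name = data['firehose_name']
        self.bucket_name = data['bucket_name']
        self.prefix_name = data['prefix_name']
        self.function_name = data['function_name']

    @classmethod
    def from_file(cls, path):
        """Build a PipelineConfig from a JSON file on disk."""
        with open(path) as f:
            return cls(json.load(f))
```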

Overall nitpickiness

You sometimes create local variables just to have them. This can help readability, but it is often unnecessary. You also have comments like "set various values" that clearly add no information; drop those as well.

In main you check whether the stream/Firehose exists and create it there if it doesn't. I'd rather see that logic in the constructor; your main should be as dumb as possible.
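
One way to move the check into the constructor, sketched under a few assumptions: the boto3 Kinesis client is passed in (rather than stored as a class attribute) so main stays trivial and the class is easy to test, one `list_streams` page is assumed to cover all streams, and boto3's `stream_exists` waiter is used before tagging because a newly created stream starts in the CREATING state.

```python
class KinesisStream(object):
    """Kinesis stream that creates (and tags) itself if it doesn't exist yet."""

    SHARD_COUNT = 2

    def __init__(self, stream_name, client, tags=None):
        self.stream_name = stream_name
        self.client = client  # e.g. boto3.client('kinesis')
        # Assumes one list_streams page is enough; accounts with many
        # streams would need to follow HasMoreStreams / pagination.
        if stream_name not in client.list_streams()['StreamNames']:
            client.create_stream(StreamName=stream_name,
                                 ShardCount=self.SHARD_COUNT)
            # Block until the new stream is ACTIVE before tagging it.
            client.get_waiter('stream_exists').wait(StreamName=stream_name)
            if tags:
                client.add_tags_to_stream(StreamName=stream_name, Tags=tags)
```

main then only needs something like `KinesisStream(stream_name, boto3.client('kinesis'), tags)`; the same pattern would apply to the Firehose and Lambda event-source classes.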

Lastly, you should always call your main function from inside an if __name__ == '__main__' block, so that importing the module doesn't run the pipeline.

Here is your rewritten code for aws_pipeline - I didn't make all of the changes mentioned above, but it should be a good start.

```
import boto3
import os

class KinesisStream(object):

    CLIENT = boto3.client('kinesis')
    SHARD_COUNT = 2

    def __init__(self, stream_name):
        self.stream_name = stream_name

    def list(self):
        return self.CLIENT.list_streams()['StreamNames']

    def create(self):
        self.CLIENT.create_stream(
            StreamName=self.stream_name,
            ShardCount=self.SHARD_COUNT
            )

    def add_tags(self):
        self.CLIENT.add_tags_to_stream(
            StreamName=self.stream_name,
            Tags=self.build_tags()
            )

    def build_tags(self):
        return {
            'BUSINESS_REGION': 'NORTHAMERICA',
            'BUSINESS_UNIT': 'DATASERVICES',
            'CLIENT': 'NONE',
            'ENVIRONMENT': 'POC',
            'NAME': self.stream_name,
            'PLATFORM': 'ATLAS'
        }

    def get_arn(self):
        return self.CLIENT.describe_stream(
            StreamName=self.stream_name
            )['StreamDescription']['StreamARN']

class KinesisFirehose(object):
    CLIENT = boto3.client('firehose')

    def __init__(self, firehose_name, bucket_name, prefix_name):
        self.firehose_name = firehose_name
        self.bucket_name = bucket_name
        self.prefix_name = prefix_name

    def list(self):
        return self.CLIENT.list_delivery_streams()['DeliveryStreamNames']

    def create(self):
        # The Firehose client method is create_delivery_stream, and config()
        # must be called so the dict, not the bound method, is passed.
        return self.CLIENT.create_delivery_stream(
            DeliveryStreamName=self.firehose_name,
            S3DestinationConfiguration=self.config()
            )

    def config(self):

        return {
            'RoleARN': 'arn:aws:iam::123456789:role/example_role',
            'BucketARN': 'arn:aws:s3:::' + self.bucket_name,
            'Prefix': self.prefix_name,
            'BufferingHints': {
                'SizeInMBs': 128,
                'IntervalInSeconds': 900
            },
            'CompressionFormat': 'Snappy',
            'EncryptionConfiguration': {
                'NoEncryptionConfig': 'NoEncryption'
            },
            'CloudWatchLoggingOptions': {
                'Enabled': True,
                'LogGroupName': '/aws/kinesisfirehose/' + self.firehose_name,
                'LogStreamName': 'S3Delivery'
            }
        }


class S3(object):
    RESOURCE = boto3.resource('s3')
    CONFIG_FILE_BUCKET = 'avrotest'
    CONFIG_FILE_PREFIX = 'lambda-configs/'

    def __init__(self, config_file):
        self.file = config_file

    def upload_file_to_config_folder(self):
        self.RESOURCE.meta.client.upload_file(
            os.path.realpath(self.file),
            self.CONFIG_FILE_BUCKET,
            self.CONFIG_FILE_PREFIX + self.file
            )

class Lambda(object):

    CLIENT = boto3.client('lambda')

    def __init__(self, stream_arn, function_name):
        self.stream_arn = stream_arn
        self.function_name = function_name

    def event_source_list(self):
        return self.CLIENT.list_event_source_mappings(
            EventSourceArn=self.
```

Context

StackExchange Code Review Q#133534, answer score: 6
