
Python and AWS Lambda – A match made in heaven

In recent months, I’ve begun moving some of my analytics functions to the cloud. Specifically, I’ve been moving many of my Python scripts and APIs to AWS’ Lambda platform using the Zappa framework. In this post, I’ll share some basic information about Python and AWS Lambda…hopefully it will get everyone out there thinking about new ways to use platforms like Lambda.

Before we dive into an example of what I’m moving to Lambda, let’s spend some time talking about Lambda itself. When I first heard about it, I was confused…but once I ‘got’ it, I saw the value. Here’s the description of Lambda from AWS’ website:

AWS Lambda lets you run code without provisioning or managing servers. You pay only for the compute time you consume – there is no charge when your code is not running. With Lambda, you can run code for virtually any type of application or backend service – all with zero administration. Just upload your code and Lambda takes care of everything required to run and scale your code with high availability. You can set up your code to automatically trigger from other AWS services or call it directly from any web or mobile app.

Once I realized how easy it is to move code to Lambda and use it whenever/wherever I needed it, I jumped at the opportunity. But…it took a while to get a good workflow in place to simplify deploying to Lambda. I stumbled across Zappa and couldn’t be happier…it makes deploying to Lambda simple (very simple).

OK.  So. Why would you want to move your code to Lambda?

Lots of reasons. Here’s a few:

  • Rather than hosting your own server to handle some API endpoints, move them to Lambda.
  • Rather than building out a complex development environment to support your complex system, move some of that complexity to Lambda and make a call to an API endpoint.
  • If you travel and want to downsize your travel laptop but still need access to your Python data analytics stack, move the stack to Lambda.
  • If you have a script that you run very irregularly and don’t want to pay $5 a month at Digital Ocean for a server, move it to Lambda.

There are many other more sophisticated reasons of course, but these’ll do for now.

Let’s get started looking at Python and AWS Lambda. You’ll need an AWS account for this.

First, I’m going to talk a bit about building an API endpoint using Flask. You don’t have to use Flask, but it’s an easy framework to use and you can quickly build an API endpoint with very little fuss. For this example, I’m going to use Lambda to host an API endpoint that uses the Newspaper library to scrape a website, pull down the text and return that text to my local script.

Writing your first Flask + Lambda API

To get started, install Flask, Flask-RESTful and Zappa. You’ll want to do this in a fresh environment using virtualenv (see my previous posts about virtualenv and vagrant) because we’ll be moving this up to Lambda using Zappa.
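
If you need a refresher, creating and activating a fresh environment might look something like this (the environment name is arbitrary):

virtualenv zappa-env
source zappa-env/bin/activate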

pip install flask flask_restful zappa

Our Flask-driven API is going to be extremely simple and exist in less than 20 lines of code:

from flask import Flask
from newspaper import Article  # used by the article-scraping endpoint discussed below
from flask_restful import Resource, Api

app = Flask(__name__)
api = Api(app)

class hello(Resource):
    def get(self):
        return "Hello World"

api.add_resource(hello, '/hello')

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5001)

Note: The host='0.0.0.0' and port=5001 arguments are extraneous for Lambda; they’re just how I use Flask with vagrant. If you keep them in and run the app locally, you’d need to visit http://0.0.0.0:5001 to view it.
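
The hello resource above is just a placeholder; the real endpoint for this example uses Newspaper. Here’s a rough sketch of what that resource might look like when added to the app above (the /article route and the url query parameter are my own illustrative choices, not necessarily what the final api.py uses):

from flask import request

class article(Resource):
    def get(self):
        url = request.args.get('url')  # illustrative query parameter
        if not url:
            return {'error': 'a url parameter is required'}, 400
        story = Article(url)  # Article comes from the newspaper import above
        story.download()      # fetch the page
        story.parse()         # extract the main article text
        return {'text': story.text}

api.add_resource(article, '/article')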

The last thing you need to do is build your requirements.txt file for Zappa to use when building your application files to send to Lambda. For a quick/dirty requirements file, I used the following:

zappa
newspaper
flask
flask_restful

Now…let’s get this up to Lambda. With Zappa, it’s as easy as a couple of command line instructions.

First, run the init command from the command line in your virtualenv:

zappa init

You should see something similar to this:

(screenshot of the zappa init prompts)

You’ll be asked a few questions; you can hit ‘enter’ to take the defaults or enter your own. For this example, I used ‘dev’ for the environment name (you can set up multiple environments for dev, staging, production, etc.) and made an S3 bucket for use with this application.

Zappa should realize you are working with a Flask app and automatically set things up for you. It will ask you for the name of your Flask app’s main function (in this case it is api.app, assuming the file is named api.py). Lastly, Zappa will ask if you want to deploy to all AWS regions…I chose not to for this example. Once complete, you’ll have a zappa_settings.json file in your directory that will look something like the following (I’ve replaced my S3 bucket name with a placeholder for security purposes):

{
    "dev": {
        "app_function": "api.app",
        "profile_name": "default",
        "s3_bucket": "DEV_BUCKET_NAME"
    }
}

I’ve found that I need to add more information to this JSON file before I can successfully deploy. For some reason, Zappa doesn’t add the “aws_region” to the settings file. I also like to add the “runtime” as well. Edit your JSON file to read (feel free to use whatever region you want):

{
    "dev": {
        "app_function": "api.app", 
        "profile_name": "default", 
        "s3_bucket": "DEV_BUCKET_NAME",
        "runtime": "python2.7",
        "aws_region": "us-east-1"
    }
}

Now…you are ready to deploy. You can do that with the following command:

zappa deploy dev

Zappa will set up all the necessary configurations and systems on AWS AND zip up your libraries and code and push it all to Lambda. I’ve not found another framework as easy to use as Zappa when it comes to deploying…if you know of one, feel free to leave a comment.

After a minute or two, you should see a “Deployment Complete: …” message that includes the endpoint for your new API. In this case, Zappa built the following endpoint for me:

https://4wq2muonbb.execute-api.us-east-1.amazonaws.com/dev

If you make some changes to your code and need to update Lambda, Zappa makes it easy to do that with the following command:

zappa update dev

Additionally, if you want to add a ‘production’ Lambda environment, all you need to do is add that new environment to your settings JSON file and deploy it. For this example, our settings file would change to:

{
    "dev": {
        "app_function": "api.app", 
        "profile_name": "default", 
        "s3_bucket": "DEV_BUCKET_NAME",
        "runtime": "python2.7",
        "aws_region": "us-east-1"
    },
    "prod": {
        "app_function": "api.app", 
        "profile_name": "default", 
        "s3_bucket": "PROD_BUCKET_NAME",
        "runtime": "python2.7",
        "aws_region": "us-east-1"
    }
}

Next, do a zappa deploy for prod and your production environment is ready to go at a new endpoint:

zappa deploy prod

Interfacing with the API

Our code is pushed to Lambda and ready to start accepting requests. In this example all we are doing is returning “Hello World”, but you can see the power in this for other functionality. To check out the results, just open a browser, enter your Zappa deployment URL and append /hello to the end of it, like this:

https://4wq2muonbb.execute-api.us-east-1.amazonaws.com/dev/hello

You should see the standard “Hello World” response in your browser window.
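
If you’d rather hit the endpoint from a script than a browser, here’s a quick sketch using the requests library (pip install requests first, and substitute the endpoint URL from your own deploy output):

import requests

# Substitute the URL that Zappa printed for your own deployment
ENDPOINT = "https://4wq2muonbb.execute-api.us-east-1.amazonaws.com/dev"

response = requests.get(ENDPOINT + "/hello")
print(response.status_code)  # should print 200
print(response.json())       # should print Hello World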

You can find the code for the Lambda api.py function here.

Note: At some point, I’ll pull this endpoint down…but will leave it up for a bit for users to play around with.
Collecting / Storing Tweets with Python and MongoDB

A good amount of the work that I do involves using social media content for analyzing networks, sentiment, influencers and other various types of analysis.

In order to do this type of analysis, you first need some data to analyze. You can scrape websites like Twitter or Facebook using simple web scrapers, but I’ve always found it easier to use the APIs that these companies / websites provide to pull down data.

The Twitter Streaming API is ideal for grabbing data in real-time and storing it for analysis. Twitter also has a Search API that lets you pull down a certain number of historical tweets (I think I read it was the last 1,000 tweets…but it’s been a while since I’ve looked at the Search API). I’m a fan of the Streaming API because it lets me grab a much larger set of data than the Search API, but it requires you to build a script that ‘listens’ to the API for your required keywords and then stores those tweets somewhere for later analysis.

There are tons of ways to connect up to the Streaming API, and quite a few Twitter API wrappers for Python (most of them work very well). I tend to use Tweepy more than others due to its ease of use and simple structure. Additionally, if I’m working on a small / short-term project, I tend to reach for MongoDB to store the tweets, using the PyMongo module. For larger / longer-term projects I usually connect the streaming script to MySQL instead of MongoDB, simply because MySQL fits into my ecosystem of backup scripts better than MongoDB does. MongoDB is perfectly well suited to larger projects…I just tend to swing toward MySQL for those.
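
For the MySQL route mentioned above, the storage step changes shape a bit: instead of inserting the raw JSON document, you pull out the fields you care about and insert a row. A rough sketch using the pymysql driver (the table name, columns and connection details are my own illustrative choices, not the setup I describe in this post):

import json
import pymysql

def store_tweet_mysql(data):
    # In a long-running script you'd create this connection once and reuse it
    conn = pymysql.connect(host='localhost', user='user', password='password',
                           db='twitterdb', charset='utf8mb4')
    try:
        datajson = json.loads(data)
        with conn.cursor() as cursor:
            cursor.execute("INSERT INTO tweets (created_at, tweet_text) VALUES (%s, %s)",
                           (datajson['created_at'], datajson['text']))
        conn.commit()
    finally:
        conn.close()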

For this post, I wanted to share my script for collecting Tweets from the Twitter API and storing them into MongoDB.

Note: This script is a mashup of many other scripts I’ve found on the web over the years. I don’t recall where I found the pieces/parts of this script, but I don’t want to discount the help I had from other people / sites in building it.

Collecting / Storing Tweets with Python and MongoDB

Let’s set up our imports:

from __future__ import print_function
import tweepy
import json
from pymongo import MongoClient

Next, set up your MongoDB path:

MONGO_HOST= 'mongodb://localhost/twitterdb'  # assuming you have mongoDB installed locally
                                             # and a database called 'twitterdb'

Next, set up the words that you want to ‘listen’ for on Twitter. You can use words or phrases separated by commas.

WORDS = ['#bigdata', '#AI', '#datascience', '#machinelearning', '#ml', '#iot']

Here, I’m listening for words related to machine learning, data science, etc.

Next, let’s set up our Twitter API access information. You can set these up here.

CONSUMER_KEY = "KEY"
CONSUMER_SECRET = "SECRET"
ACCESS_TOKEN = "TOKEN"
ACCESS_TOKEN_SECRET = "TOKEN_SECRET"

Time to build the listener class.

class StreamListener(tweepy.StreamListener):
    # This is a class provided by tweepy to access the Twitter Streaming API.

    def on_connect(self):
        # Called once the connection to the Streaming API is established
        print("You are now connected to the streaming API.")

    def on_error(self, status_code):
        # If an error occurs, display the status code and return False
        # to disconnect the stream
        print('An Error has occurred: ' + repr(status_code))
        return False

    def on_data(self, data):
        # This is the meat of the script...it connects to your MongoDB
        # and stores the tweet
        try:
            client = MongoClient(MONGO_HOST)

            # Use the twitterdb database. If it doesn't exist, it will be created.
            db = client.twitterdb

            # Decode the JSON from Twitter
            datajson = json.loads(data)

            # Grab the 'created_at' field from the tweet to use for display
            created_at = datajson['created_at']

            # Print a message to the screen that we have collected a tweet
            print("Tweet collected at " + str(created_at))

            # Insert the tweet into a collection called twitter_search.
            # If twitter_search doesn't exist, it will be created.
            # (insert_one replaces PyMongo's deprecated insert())
            db.twitter_search.insert_one(datajson)
        except Exception as e:
            print(e)
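
One thing worth noting: on_data opens a new MongoClient for every tweet, which keeps the example simple but is wasteful for a long-running listener. A variant (my own tweak, not part of the original script) would connect once in on_connect and reuse the connection:

class StreamListener(tweepy.StreamListener):
    def on_connect(self):
        # Connect to MongoDB once, when the stream connects
        self.client = MongoClient(MONGO_HOST)
        self.db = self.client.twitterdb
        print("You are now connected to the streaming API.")

    def on_data(self, data):
        try:
            datajson = json.loads(data)
            print("Tweet collected at " + str(datajson['created_at']))
            self.db.twitter_search.insert_one(datajson)
        except Exception as e:
            print(e)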

Now that we have the listener class, let’s set everything up to start listening.

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
#Set up the listener. The 'wait_on_rate_limit=True' is needed to help with Twitter API rate limiting.
listener = StreamListener(api=tweepy.API(wait_on_rate_limit=True)) 
streamer = tweepy.Stream(auth=auth, listener=listener)
print("Tracking: " + str(WORDS))
streamer.filter(track=WORDS)

Now you are ready to go. The full script is below. You can save it as “streaming_API.py” and run it as “python streaming_API.py” and, assuming you set up MongoDB and your Twitter API keys correctly, you should start collecting tweets.

The Full Script:

from __future__ import print_function
import tweepy
import json
from pymongo import MongoClient
MONGO_HOST= 'mongodb://localhost/twitterdb'  # assuming you have mongoDB installed locally
                                             # and a database called 'twitterdb'
WORDS = ['#bigdata', '#AI', '#datascience', '#machinelearning', '#ml', '#iot']
CONSUMER_KEY = "KEY"
CONSUMER_SECRET = "SECRET"
ACCESS_TOKEN = "TOKEN"
ACCESS_TOKEN_SECRET = "TOKEN_SECRET"

class StreamListener(tweepy.StreamListener):
    # This is a class provided by tweepy to access the Twitter Streaming API.

    def on_connect(self):
        # Called once the connection to the Streaming API is established
        print("You are now connected to the streaming API.")

    def on_error(self, status_code):
        # If an error occurs, display the status code and return False
        # to disconnect the stream
        print('An Error has occurred: ' + repr(status_code))
        return False

    def on_data(self, data):
        # This is the meat of the script...it connects to your MongoDB
        # and stores the tweet
        try:
            client = MongoClient(MONGO_HOST)

            # Use the twitterdb database. If it doesn't exist, it will be created.
            db = client.twitterdb

            # Decode the JSON from Twitter
            datajson = json.loads(data)

            # Grab the 'created_at' field from the tweet to use for display
            created_at = datajson['created_at']

            # Print a message to the screen that we have collected a tweet
            print("Tweet collected at " + str(created_at))

            # Insert the tweet into a collection called twitter_search.
            # If twitter_search doesn't exist, it will be created.
            # (insert_one replaces PyMongo's deprecated insert())
            db.twitter_search.insert_one(datajson)
        except Exception as e:
            print(e)
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
#Set up the listener. The 'wait_on_rate_limit=True' is needed to help with Twitter API rate limiting.
listener = StreamListener(api=tweepy.API(wait_on_rate_limit=True)) 
streamer = tweepy.Stream(auth=auth, listener=listener)
print("Tracking: " + str(WORDS))
streamer.filter(track=WORDS)
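
Once the script has been running for a while, you can sanity-check what has landed in MongoDB with a few lines of PyMongo (assuming the same local database and collection names used above):

from pymongo import MongoClient

client = MongoClient('mongodb://localhost/twitterdb')
db = client.twitterdb

# How many tweets have been collected so far?
# (older PyMongo; newer versions use count_documents({}) instead)
print(db.twitter_search.count())

# Show the text of the five most recently inserted tweets
for tweet in db.twitter_search.find().sort([('_id', -1)]).limit(5):
    print(tweet.get('text'))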