Consuming Twitter’s Streaming API using Python and cURL

Twitter’s Streaming API provides developers access to a global stream of tweet data. By setting up a persistent HTTP connection to one of the streaming endpoints, you will be pushed tweets and other messages. There are some good posts and tutorials on the web about how to use the Streaming API, and there are also libraries that you can use. But if you want to roll your own code, there are several things you need to think about to get it all right.

In this tutorial we will create a complete example of how to consume the public stream and get all tweets that mention the products iphone, ipad or ipod. We will connect to the stream using OAuth and deal with common errors and warnings.

Setting up the connection

Let's start with some code for setting up the connection:

import time
import pycurl
import urllib
import json
import oauth2 as oauth

API_ENDPOINT_URL = 'https://stream.twitter.com/1.1/statuses/filter.json'
USER_AGENT = 'TwitterStream 1.0' # This can be anything really

# You need to replace these with your own values
OAUTH_KEYS = {'consumer_key': '<Consumer key>',
              'consumer_secret': '<Consumer secret>',
              'access_token_key': '<Token key>',
              'access_token_secret': '<Token secret>'}

# These values are posted when setting up the connection
POST_PARAMS = {'include_entities': 0,
               'stall_warnings': 'true',
               'track': 'iphone,ipad,ipod'}

class TwitterStream:
    def __init__(self):
        self.oauth_token = oauth.Token(key=OAUTH_KEYS['access_token_key'], secret=OAUTH_KEYS['access_token_secret'])
        self.oauth_consumer = oauth.Consumer(key=OAUTH_KEYS['consumer_key'], secret=OAUTH_KEYS['consumer_secret'])
        self.conn = None
        self.buffer = ''
        self.setup_connection()

    def setup_connection(self):
        """ Create persistent HTTP connection to Streaming API endpoint using cURL.
        """
        if self.conn:
            self.conn.close()
            self.buffer = ''
        self.conn = pycurl.Curl()
        self.conn.setopt(pycurl.URL, API_ENDPOINT_URL)
        self.conn.setopt(pycurl.USERAGENT, USER_AGENT)
        # Using gzip is optional but saves us bandwidth.
        self.conn.setopt(pycurl.ENCODING, 'deflate, gzip')
        self.conn.setopt(pycurl.POST, 1)
        self.conn.setopt(pycurl.POSTFIELDS, urllib.urlencode(POST_PARAMS))
        self.conn.setopt(pycurl.HTTPHEADER, ['Host: stream.twitter.com',
                                             'Authorization: %s' % self.get_oauth_header()])
        # self.handle_tweet is the method that is called when new tweets arrive
        self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_tweet)

We start by defining some global parameters. Starting with version 1.1 of the Streaming API you need to authenticate the connection using OAuth; we use Python’s oauth2 library for this. You need to create your own OAuth parameters and put them in OAUTH_KEYS; see here for more info about how to create these.

In POST_PARAMS we put the parameters that are posted when setting up the connection. By setting stall_warnings to true we will receive warnings if the connection is about to be disconnected due to the client falling behind. This is a good idea, especially if we are following high-traffic keywords.
In the parameter track we put all the keywords we want to follow as a comma-separated list.
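
To see what actually goes over the wire, here is a small sketch of how these parameters end up in the POST body. It is written for Python 3, where the function lives in urllib.parse (the listing above uses the Python 2 equivalent, urllib.urlencode). The name params is only used for this illustration, and a list of pairs is used instead of a dict so that the output order is fixed.

```python
from urllib.parse import urlencode

# Same values as POST_PARAMS, as a list of pairs to keep the order fixed.
params = [('include_entities', 0),
          ('stall_warnings', 'true'),
          ('track', 'iphone,ipad,ipod')]

# urlencode percent-encodes each value and joins the pairs with '&'.
body = urlencode(params)
print(body)
# include_entities=0&stall_warnings=true&track=iphone%2Cipad%2Cipod
```

Note that the commas in the track list are percent-encoded as %2C in the body.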

In the method setup_connection we connect to the Streaming endpoint using cURL. We start by checking whether there is already an open connection; if so, we close it and empty the buffer used for saving intermediate tweet data. Then we set the necessary cURL options:

  • URL – URL to the Streaming API endpoint.
  • USERAGENT – An optional, but recommended, string for identifying your application.
  • ENCODING – By setting it to “deflate, gzip” the stream will be sent in compressed format, which saves a lot of bandwidth.
  • POST – We are going to do a POST so we set this to “1”.
  • POSTFIELDS – This is the data that we are going to post; at a minimum we need to send the keywords we want to track.
  • HTTPHEADER – Host needs to be set to ensure we get a gzipped stream and Authorization is the OAuth header.
  • WRITEFUNCTION – This method will be called with the data from the stream.
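
To get a rough feel for what the ENCODING option buys, the sketch below gzips a batch of made-up messages with Python 3's standard gzip module. The fake_tweet dict is invented for this illustration and is far more repetitive than real tweets, so the ratio it prints most likely overstates what you would see on the live stream.

```python
import gzip
import json

# A made-up, simplified message; real tweets carry many more fields.
fake_tweet = {'id': 0,
              'text': 'Just got a new iphone',
              'user': {'screen_name': 'example'}}

# Simulate a stretch of the stream: 500 JSON messages separated by \r\n.
raw = b''.join(json.dumps(dict(fake_tweet, id=i)).encode('utf-8') + b'\r\n'
               for i in range(500))
compressed = gzip.compress(raw)

print('raw: %d bytes, gzipped: %d bytes' % (len(raw), len(compressed)))
```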

Creating the OAuth header

The method get_oauth_header creates and returns the OAuth header needed for authenticating the connection.

def get_oauth_header(self):
    """ Create and return OAuth header.
    """
    params = {'oauth_version': '1.0',
              'oauth_nonce': oauth.generate_nonce(),
              'oauth_timestamp': int(time.time())}
    req = oauth.Request(method='POST', parameters=params, url='%s?%s' % (API_ENDPOINT_URL,
                                                                         urllib.urlencode(POST_PARAMS)))
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.oauth_consumer, self.oauth_token)
    return req.to_header()['Authorization'].encode('utf-8')

We create a request and sign it using our OAuth consumer key and token. Note that the url parameter needs to match exactly the URL and POSTFIELDS parameters that we use in setup_connection. Then we extract and return the resulting header.
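
If you are curious why the signed URL and the posted parameters have to match, it helps to see roughly what goes into an OAuth 1.0 HMAC-SHA1 signature. The sketch below is an illustration using only the Python 3 standard library, not the oauth2 library's actual code. The function names percent_encode and sign are made up for this example, and all keys, secrets, the nonce and the timestamp are fake placeholder values.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def percent_encode(value):
    """Percent-encode a value; only letters, digits and - . _ ~ stay as-is."""
    return quote(str(value), safe='~')


def sign(method, url, params, consumer_secret, token_secret):
    """Return the base64 HMAC-SHA1 signature for a request.

    params must hold both the oauth_* values and the POST parameters.
    """
    # 1. Encode every key and value, sort the pairs, join them as k=v&k=v.
    pairs = sorted((percent_encode(k), percent_encode(v))
                   for k, v in params.items())
    param_string = '&'.join('%s=%s' % pair for pair in pairs)
    # 2. Base string: METHOD & encoded URL & encoded parameter string.
    base_string = '&'.join([method.upper(),
                            percent_encode(url),
                            percent_encode(param_string)])
    # 3. Signing key: both secrets, encoded and joined by '&'.
    key = '%s&%s' % (percent_encode(consumer_secret),
                     percent_encode(token_secret))
    digest = hmac.new(key.encode('utf-8'), base_string.encode('utf-8'),
                      hashlib.sha1).digest()
    return base64.b64encode(digest).decode('ascii')


# Fake credentials and a fixed nonce/timestamp, for illustration only.
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
params = {'oauth_consumer_key': 'example-consumer-key',
          'oauth_token': 'example-token',
          'oauth_signature_method': 'HMAC-SHA1',
          'oauth_version': '1.0',
          'oauth_nonce': 'abc123',
          'oauth_timestamp': 1350000000,
          'include_entities': 0,
          'stall_warnings': 'true',
          'track': 'iphone,ipad,ipod'}
signature = sign('POST', url, params,
                 'example-consumer-secret', 'example-token-secret')
print(signature)
```

Because the POST parameters are part of the signed string, changing even one tracked keyword without re-signing yields a signature that no longer matches the request.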

Handling connection errors

By calling self.conn.perform() we enter a loop that receives data from the stream and passes it to the method handle_tweet.
This loop will run until we kill the program or something happens to the connection. The connection can be dropped either by a network error or by an HTTP error. Depending on which type of error we receive, Twitter recommends different reconnection strategies: for a network error we should back off linearly, and for HTTP errors we should back off exponentially. The following code calls perform and handles connection errors according to Twitter's recommendations:

def start(self):
    """ Start listening to Streaming endpoint.
    Handle exceptions according to Twitter's recommendations.
    """
    backoff_network_error = 0.25
    backoff_http_error = 5
    backoff_rate_limit = 60
    while True:
        self.setup_connection()
        try:
            self.conn.perform()
        except pycurl.error:
            # Network error, use linear back off up to 16 seconds
            print 'Network error: %s' % self.conn.errstr()
            print 'Waiting %s seconds before trying again' % backoff_network_error
            time.sleep(backoff_network_error)
            backoff_network_error = min(backoff_network_error + 1, 16)
            continue
        # HTTP Error
        sc = self.conn.getinfo(pycurl.HTTP_CODE)
        if sc == 420:
            # Rate limit, use exponential back off starting with 1 minute and double each attempt
            print 'Rate limit, waiting %s seconds' % backoff_rate_limit
            time.sleep(backoff_rate_limit)
            backoff_rate_limit *= 2
        else:
            # HTTP error, use exponential back off up to 320 seconds
            print 'HTTP error %s, %s' % (sc, self.conn.errstr())
            print 'Waiting %s seconds' % backoff_http_error
            time.sleep(backoff_http_error)
            backoff_http_error = min(backoff_http_error * 2, 320)

(Ideally we should reset the backoff values to their defaults after a successful reconnection; that is left as an exercise…)
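
One possible way to approach that exercise is to keep each delay in a small object that knows how to grow and how to reset. The sketch below is only an illustration: the Backoff class and its method names are made up for this example, and the starting values, step and caps are copied from the start method above.

```python
class Backoff(object):
    """Track one reconnection delay that grows linearly or exponentially."""

    def __init__(self, initial, cap=None, linear_step=None):
        self.initial = initial
        self.cap = cap                  # None means no upper limit
        self.linear_step = linear_step  # None means double each time
        self.current = initial

    def next_delay(self):
        """Return the delay to sleep now and grow the one for next time."""
        delay = self.current
        if self.linear_step is not None:
            grown = self.current + self.linear_step
        else:
            grown = self.current * 2
        self.current = grown if self.cap is None else min(grown, self.cap)
        return delay

    def reset(self):
        """Go back to the starting delay, e.g. once data is flowing again."""
        self.current = self.initial


# The three delays used in start(), with the same values.
network_backoff = Backoff(initial=0.25, cap=16, linear_step=1)
http_backoff = Backoff(initial=5, cap=320)
rate_limit_backoff = Backoff(initial=60)

delays = [http_backoff.next_delay() for _ in range(8)]
print(delays)  # [5, 10, 20, 40, 80, 160, 320, 320]

http_backoff.reset()
print(http_backoff.next_delay())  # 5
```

In start, each branch would sleep for next_delay() of the matching object, and reset() could be called on all three once data starts arriving again, for example from handle_tweet.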

Processing the tweets

The Streaming API sends data as a series of newline-delimited messages, where newline is considered to be “\r\n” and messages are JSON-encoded data.

Apart from normal tweets we might also receive various warnings and error messages in the stream, so we need to be prepared for that. You can find a complete list of these messages here. In the code below we will not handle all of these messages, just the most important ones.

def handle_tweet(self, data):
    """ This method is called when data is received through Streaming endpoint.
    """
    self.buffer += data
    if data.endswith('\r\n') and self.buffer.strip():
        # complete message received
        message = json.loads(self.buffer)
        self.buffer = ''
        if message.get('limit'):
            print 'Rate limiting caused us to miss %s tweets' % (message['limit'].get('track'))
        elif message.get('disconnect'):
            raise Exception('Got disconnect: %s' % message['disconnect'].get('reason'))
        elif message.get('warning'):
            print 'Got warning: %s' % message['warning'].get('message')
        else:
            print 'Got tweet with text: %s' % message.get('text')
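
The method above parses the buffer only when a chunk happens to end with \r\n, so a chunk that carries two messages at once would make json.loads fail. A slightly more defensive variant is to split the buffer on \r\n and keep whatever follows the last delimiter for the next call. The sketch below shows that idea on its own; split_messages is a made-up helper name, and it assumes the chunks are text strings, as in the Python 2 listing above.

```python
import json


def split_messages(buffer, chunk):
    """Append chunk to buffer; return (complete_messages, remaining_buffer)."""
    buffer += chunk
    parts = buffer.split('\r\n')
    # Whatever follows the last \r\n is an incomplete message (or empty).
    remainder = parts.pop()
    # Blank lines carry no message, so skip them.
    messages = [json.loads(part) for part in parts if part.strip()]
    return messages, remainder


# Three chunks: one message split across two chunks, plus a blank line.
buffer = ''
chunks = ['{"text": "first"}\r\n{"te',
          'xt": "second"}\r\n\r\n',
          '{"limit": {"track": 5}}\r\n']
received = []
for chunk in chunks:
    messages, buffer = split_messages(buffer, chunk)
    received.extend(messages)

print(received)
# [{'text': 'first'}, {'text': 'second'}, {'limit': {'track': 5}}]
```

Each parsed message could then go through the same limit, disconnect and warning checks as in handle_tweet.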

Hopefully this tutorial has given you a basic understanding of how Twitter’s Streaming API works. The complete code for this example is available here.