Consuming Twitter’s Streaming API using Python and cURL

Twitter’s Streaming API provides developers access to a global stream of tweet data. By setting up a persistent HTTP connection to one of the streaming endpoints, you will be pushed tweets and other messages. There are some good posts and tutorials on the web about how to use the Streaming API, and there are also libraries that you can use. But if you want to roll your own code, there are several things you need to think about to get it all right.

In this tutorial we will create a complete example of how to consume the public stream and get all tweets that mention the products iphone, ipad or ipod. We will connect to the stream using OAuth and deal with common errors and warnings.

Setting up the connection

Let’s start with some code for setting up the connection:

import time
import pycurl
import urllib
import json
import oauth2 as oauth

API_ENDPOINT_URL = 'https://stream.twitter.com/1.1/statuses/filter.json'
USER_AGENT = 'TwitterStream 1.0' # This can be anything really

# You need to replace these placeholder strings with your own values (keep the quotes)
OAUTH_KEYS = {'consumer_key': '<Consumer key>',
              'consumer_secret': '<Consumer secret>',
              'access_token_key': '<Token key>',
              'access_token_secret': '<Token secret>'}

# These values are posted when setting up the connection
POST_PARAMS = {'include_entities': 0,
               'stall_warnings': 'true',
               'track': 'iphone,ipad,ipod'}

class TwitterStream:
    def __init__(self):
        self.oauth_token = oauth.Token(key=OAUTH_KEYS['access_token_key'], secret=OAUTH_KEYS['access_token_secret'])
        self.oauth_consumer = oauth.Consumer(key=OAUTH_KEYS['consumer_key'], secret=OAUTH_KEYS['consumer_secret'])
        self.conn = None
        self.buffer = ''
        self.setup_connection()

    def setup_connection(self):
        """ Create persistent HTTP connection to Streaming API endpoint using cURL.
        """
        if self.conn:
            self.conn.close()
            self.buffer = ''
        self.conn = pycurl.Curl()
        self.conn.setopt(pycurl.URL, API_ENDPOINT_URL)
        self.conn.setopt(pycurl.USERAGENT, USER_AGENT)
        # Using gzip is optional but saves us bandwidth.
        self.conn.setopt(pycurl.ENCODING, 'deflate, gzip')
        self.conn.setopt(pycurl.POST, 1)
        self.conn.setopt(pycurl.POSTFIELDS, urllib.urlencode(POST_PARAMS))
        self.conn.setopt(pycurl.HTTPHEADER, ['Host: stream.twitter.com',
                                             'Authorization: %s' % self.get_oauth_header()])
        # self.handle_tweet is the method that is called when new data arrives
        self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_tweet)

We start by defining some global parameters. Starting with version 1.1 of the Streaming API you need to authenticate the connection using OAuth; we use Python’s oauth2 library for this. You need to create your own OAuth parameters and put them in OAUTH_KEYS, see here for more info about how to create these.

In POST_PARAMS we put the parameters that are posted when setting up the connection. By setting stall_warnings to 'true' we will receive warnings if the connection is about to be disconnected due to the client falling behind. This can be a good idea, especially if we are following high-traffic keywords.
In the parameter track we put all the keywords we want to follow in a comma-separated list.
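
As a small illustration of what actually goes over the wire, the sketch below form-encodes the same three parameters. It is only an example: it uses a list of tuples instead of the POST_PARAMS dict so the order is predictable, and the import fallback is there because the article's code targets Python 2 while many readers will be on Python 3.

```python
try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2, as used in the article

# Same parameters as POST_PARAMS, as (name, value) pairs to fix the order.
params = [('include_entities', 0),
          ('stall_warnings', 'true'),
          ('track', 'iphone,ipad,ipod')]

# The commas in the keyword list are percent-encoded as %2C.
body = urlencode(params)
print(body)  # include_entities=0&stall_warnings=true&track=iphone%2Cipad%2Cipod
```

This encoded string is what ends up in POSTFIELDS, which is why the same parameters also have to be part of the OAuth signature.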

In the method setup_connection we connect to the Streaming endpoint using cURL. We start by making sure there is no open connection already, if so we close it and empty the buffer used for saving intermediate tweet data. Then we set the necessary parameters to cURL:

  • URL – URL to the Streaming API endpoint.
  • USERAGENT – An optional, but recommended, string for identifying your application.
  • ENCODING – By setting it to “deflate, gzip” the stream will be sent in gzipped format which saves a lot of bandwidth.
  • POST – We are going to do a POST so we set this to “1”.
  • POSTFIELDS – This is the data that we are going to post. At a minimum we need to send the keywords we want to track.
  • HTTPHEADER – Host needs to be set to ensure we get a gzipped stream and Authorization is the OAuth header.
  • WRITEFUNCTION – This method will be called with the data from stream.

Creating the OAuth header

The method get_oauth_header creates and returns the OAuth header needed for authenticating the connection.

def get_oauth_header(self):
    """ Create and return OAuth header.
    """
    params = {'oauth_version': '1.0',
              'oauth_nonce': oauth.generate_nonce(),
              'oauth_timestamp': int(time.time())}
    req = oauth.Request(method='POST', parameters=params, url='%s?%s' % (API_ENDPOINT_URL,
                                                                         urllib.urlencode(POST_PARAMS)))
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.oauth_consumer, self.oauth_token)
    return req.to_header()['Authorization'].encode('utf-8')

We create a request that we sign using our OAuth consumer key and token. Note that the url parameter needs to match exactly the URL and POSTFIELDS parameters that we use in setup_connection, otherwise the signature will not validate. Then we extract and return the resulting header.
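
To see why the URL and parameters must match, here is a rough sketch of how an OAuth 1.0 HMAC-SHA1 signature is computed, using only the standard library. It is not the oauth2 library's code; the function names (percent_encode, signature_base_string, sign) and the example values are made up for illustration. In a real request the oauth_* parameters (consumer key, token, nonce, timestamp and so on) are part of params as well.

```python
import base64
import hashlib
import hmac

try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2

def percent_encode(value):
    # OAuth encodes everything except letters, digits and "-._~".
    return quote(str(value), safe='~')

def signature_base_string(method, url, params):
    # Parameters are encoded, sorted, and joined before being signed.
    pairs = sorted((percent_encode(k), percent_encode(v))
                   for k, v in params.items())
    normalized = '&'.join('%s=%s' % pair for pair in pairs)
    return '&'.join([method.upper(),
                     percent_encode(url),
                     percent_encode(normalized)])

def sign(base_string, consumer_secret, token_secret):
    # The signing key is built from both secrets.
    key = '%s&%s' % (percent_encode(consumer_secret),
                     percent_encode(token_secret))
    digest = hmac.new(key.encode('ascii'),
                      base_string.encode('ascii'),
                      hashlib.sha1).digest()
    return base64.b64encode(digest).decode('ascii')

# Hypothetical values, only to show that the posted data is signed too.
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
base_a = signature_base_string('POST', url, {'track': 'iphone,ipad,ipod'})
base_b = signature_base_string('POST', url, {'track': 'iphone,ipad'})
print(sign(base_a, 'consumer-secret', 'token-secret') !=
      sign(base_b, 'consumer-secret', 'token-secret'))  # True
```

Because the method, URL and every parameter go into the signed string, posting different data than what was signed would be expected to fail authentication.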

Handling connection errors

By calling self.conn.perform() we enter a loop that starts receiving from the stream and sends the data to the method handle_tweet.
This loop will run until we kill the program or something happens with the connection. The connection can be dropped either by a network error or by an HTTP error. Depending on which type of error we receive, Twitter recommends different reconnection strategies: for a network error we should back off linearly, and for HTTP errors we should back off exponentially. The following code calls perform and handles connection errors according to Twitter’s recommendations:

def start(self):
    """ Start listening to Streaming endpoint.
    Handle exceptions according to Twitter's recommendations.
    """
    backoff_network_error = 0.25
    backoff_http_error = 5
    backoff_rate_limit = 60
    while True:
        self.setup_connection()
        try:
            self.conn.perform()
        except pycurl.error:
            # Network error, use linear back off up to 16 seconds
            print 'Network error: %s' % self.conn.errstr()
            print 'Waiting %s seconds before trying again' % backoff_network_error
            time.sleep(backoff_network_error)
            backoff_network_error = min(backoff_network_error + 1, 16)
            continue
        # HTTP Error
        sc = self.conn.getinfo(pycurl.HTTP_CODE)
        if sc == 420:
            # Rate limit, use exponential back off starting with 1 minute and double each attempt
            print 'Rate limit, waiting %s seconds' % backoff_rate_limit
            time.sleep(backoff_rate_limit)
            backoff_rate_limit *= 2
        else:
            # HTTP error, use exponential back off up to 320 seconds
            print 'HTTP error %s, %s' % (sc, self.conn.errstr())
            print 'Waiting %s seconds' % backoff_http_error
            time.sleep(backoff_http_error)
            backoff_http_error = min(backoff_http_error * 2, 320)

(Ideally we should reset the backoff values to their defaults after a successful reconnection; that is left as an exercise…)
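
One possible way to approach that exercise is to move the three delays into a small helper with a reset method. This is only a sketch: the Backoff class and its method names are invented here, and the start values, step and caps are simply copied from the start method above.

```python
class Backoff(object):
    """Keeps the three reconnect delays used in start()."""

    def __init__(self):
        self.reset()

    def reset(self):
        # Back to the defaults; call this once data is flowing again.
        self.network = 0.25
        self.http = 5
        self.rate_limit = 60

    def next_network_delay(self):
        # Linear back off, capped at 16 seconds.
        delay = self.network
        self.network = min(self.network + 1, 16)
        return delay

    def next_http_delay(self):
        # Exponential back off, capped at 320 seconds.
        delay = self.http
        self.http = min(self.http * 2, 320)
        return delay

    def next_rate_limit_delay(self):
        # Exponential back off starting at one minute, no cap.
        delay = self.rate_limit
        self.rate_limit *= 2
        return delay


backoff = Backoff()
print(backoff.next_http_delay())  # 5
print(backoff.next_http_delay())  # 10
backoff.reset()
print(backoff.next_http_delay())  # 5
```

A natural place to call reset would be inside handle_tweet, since receiving data there means the reconnection worked.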

Processing the tweets

The Streaming API sends data as a series of newline-delimited messages, where newline is considered to be “\r\n” and messages are JSON-encoded data.

Apart from normal tweets we might also receive various warnings and error messages in the stream, so we need to be prepared for that. You can find a complete list of these messages here. In the code below we will not handle all of these messages, just the most important ones.

def handle_tweet(self, data):
    """ This method is called when data is received through Streaming endpoint.
    """
    self.buffer += data
    if data.endswith('\r\n') and self.buffer.strip():
        # complete message received
        message = json.loads(self.buffer)
        self.buffer = ''
        msg = ''
        if message.get('limit'):
            print 'Rate limiting caused us to miss %s tweets' % (message['limit'].get('track'))
        elif message.get('disconnect'):
            raise Exception('Got disconnect: %s' % message['disconnect'].get('reason'))
        elif message.get('warning'):
            print 'Got warning: %s' % message['warning'].get('message')
        else:
            print 'Got tweet with text: %s' % message.get('text')
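
The method above assumes that each complete message ends exactly at the end of a received chunk. If a chunk ever contains more than one message, or ends in the middle of one, a slightly more defensive variant is to split the buffer on “\r\n” and keep the unfinished tail. The sketch below shows the idea on its own; the MessageBuffer class is made up for this example, and it assumes data arrives as text (under Python 3, pycurl delivers bytes that would need decoding first).

```python
import json

class MessageBuffer(object):
    """Collects chunks and yields complete JSON messages."""

    def __init__(self):
        self.buffer = ''

    def feed(self, data):
        """Add a chunk and return a list of decoded complete messages."""
        self.buffer += data
        # Everything before the last '\r\n' is complete; the rest is kept
        # until more data arrives.
        parts = self.buffer.split('\r\n')
        self.buffer = parts.pop()
        # Blank lines carry no message, so skip them.
        return [json.loads(part) for part in parts if part.strip()]


buf = MessageBuffer()
print(buf.feed('{"text": "a"}\r\n{"te'))    # one complete message, tail kept
print(buf.feed('xt": "b"}\r\n\r\n'))        # the second message is now complete
```

Each decoded message could then be passed through the same limit, disconnect and warning checks as in handle_tweet.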

Hopefully this tutorial has given you a basic understanding of how Twitter’s Streaming API works. The complete code for this example is available here.

  • http://www.jcopro.net Jeremy Cook

    Hey, thanks for the tutorial! I’ve been playing around with automating some real-world objects (servos, LEDs, etc) using Python. Can’t wait to hopefully try out some of this stuff with Twitter!

  • http://anly.tk toby

    or alternatively:
    import tweepy

    ;)

  • Philipp

    thanks for the tutorial! But am i right this code miss the part that collect tweets?

    how would i do that using scraperwiki for example?

    • arngarden

      No you don’t need to use any other lib for collecting tweets. All tweets that matches the keywords are sent to the handle_tweet-method.

      • Philipp

        And am i right writing this:

        POST_PARAMS = {'include_entities': 0,
                       'stall_warning': 'true',
                       'locations': '-122.75,36.8,-121.75,37.8'}

        for location filtering

        // I just started python and although there are no errors nothing happens – what problem can it be?

        • arngarden

          Looks right I believe, but I have never tried location-based filtering.. Perhaps you could try a larger area? Not sure how many users have geolocation of tweets enabled..

          • Philipp

            Sorry for so many questions, i have a bigger problem now
            - Network error: SSL certificate problem, verify that the CA cert is OK. Details:error:14090086:SSL routines:SSL3_GET_SERVER_CERTIFICATE:certificate verify failed
            Can you help me to handle it? Have no idea even how to start solving it

          • arngarden

            Don’t know if I can help you much, have never encountered that problem. Perhaps something is wrong with your OAUTH_KEYS? A good starting point might be here: https://dev.twitter.com/docs/auth/using-oauth

          • Philipp

            hmm looks like all correct. can this cause a problem:

            https://dev.twitter.com/blog/ssl-upgrade-for-twitterapi ??

          • arngarden

            Don’t know, I have never had to think about SSL certificates for my code..

  • Vishal

    where/how are you managing SSL certificates?

  • http://twitter.com/keyzer_s0ze keyzer

    Any idea why I would be getting an invalid syntax error at the end of line 15? The error is at the 8 at the end of the line, but there was no error on the other key lines….

    'access_token_key': 119834338718-1ZPtYnrIPDkVIL34536F42zL72fX9dIfP1zf3tz8,

    • arngarden

      Could it be that you forgot the quotes around the key? Should then be:

      'access_token_key': '119834338718-1ZPtYnrIPDkVIL34536F42zL72fX9dIfP1zf3tz8',

  • Jason Hill

    I’ve used some of this code, modified it, and expanded upon it. Would you consider placing a GPL header in your github code?

    • arngarden

      Good idea! I’ve updated the code with GPL header.

  • fernandopso

    How do I close the connection?

  • Patrick

    Hey, great piece of code. Is it possible to do this with a get request. When I curl /statuses/filter.json with locations, it returns the whole tweet, not just the text. Is that possible through pyCurl? Thanks

  • Patrick

    How would i then send this to mongodb?

  • jebesen

    I am becoming crazy. I’ve tried everything and I am still receiving: HTTP error 401.

    I have read that it is an authorization problem but I have checked it thousands of times and my consumer and access token codes are correct.

    Please help.

  • Matt Hart

    Nice article mate. Cheers.

  • Guest

    Thank you for the post, it’s been awhile since anyone has discussed on here so I hope that you’re still active. I have modified your code for a project I am working on. The script has been running fine until recently and I believe it is due to a change in Twitter’s new connection standards, see article here:
    https://dev.twitter.com/discussions/24239?page=2

    Would you happen to know how to implement SSL connections in place of any plain-text HTML connections for your code? Any feedback would be much appreciated.

  • Feadurn

    Thank you for your script. However I have a noob question. How can I do to return the tweet instead of printing them. I try to understand where to put a return but cannot find any working solution.

    Thanks again for sharing your code :)

    • arngarden

      The handle_tweet-method is a so called “callback-method” that is called for each new tweet. It doesn’t really make sense to add a return to that method, instead you should add all functionality for dealing with the tweet to that method. (Of course the handle_tweet-method can call other methods, for example to add tweet to a database)

      • Feadurn

        Thank you, it is what I have done at the end

  • Sourabh

    Thanks for the script

    I am trying to use it but its showing a traceback:

    Traceback (most recent call last):
      File "C:/Python27/nytimes/2062014/pucurl tweets .py", line 110, in <module>
        ts = TwitterStream()
      File "C:/Python27/nytimes/2062014/pucurl tweets .py", line 24, in __init__
        self.oauth_token = oauth.Token(key=OAUTH_KEYS['access_token_key'], secret=OAUTH_KEYS['access_token_secret'])
    AttributeError: 'module' object has no attribute 'Token'

    Regards
    Sourabh

    • arngarden

      I would guess something is wrong with your installation of the oauth2 library, have you tried reinstalling it? I see that you also run Windows, could be related to that, I have not tested the script on platforms other than Linux and Mac.