Consuming Twitter’s Streaming API using Python and cURL

Twitter’s Streaming API gives developers access to a global stream of tweet data. By setting up a persistent HTTP connection to one of the streaming endpoints, you will be pushed tweets and other messages. There are some good posts and tutorials on the web about how to use the Streaming API, and there are also libraries that you can use. But if you want to roll your own code, there are several things you need to think about to get it all right.

In this tutorial we will create a complete example of how to consume the public stream and get all tweets that mention the products iphone, ipad or ipod. We will connect to the stream using OAuth and deal with common errors and warnings.

Setting up the connection

Let’s start with some code for setting up the connection:

import time
import pycurl
import urllib
import json
import oauth2 as oauth

API_ENDPOINT_URL = 'https://stream.twitter.com/1.1/statuses/filter.json'
USER_AGENT = 'TwitterStream 1.0' # This can be anything really

# You need to replace these placeholder strings with your own values
OAUTH_KEYS = {'consumer_key': '<Consumer key>',
              'consumer_secret': '<Consumer secret>',
              'access_token_key': '<Token key>',
              'access_token_secret': '<Token secret>'}

# These values are posted when setting up the connection
POST_PARAMS = {'include_entities': 0,
               'stall_warnings': 'true',
               'track': 'iphone,ipad,ipod'}

class TwitterStream:
    def __init__(self):
        self.oauth_token = oauth.Token(key=OAUTH_KEYS['access_token_key'], secret=OAUTH_KEYS['access_token_secret'])
        self.oauth_consumer = oauth.Consumer(key=OAUTH_KEYS['consumer_key'], secret=OAUTH_KEYS['consumer_secret'])
        self.conn = None
        self.buffer = ''
        self.setup_connection()

    def setup_connection(self):
        """ Create persistent HTTP connection to Streaming API endpoint using cURL.
        """
        if self.conn:
            self.conn.close()
            self.buffer = ''
        self.conn = pycurl.Curl()
        self.conn.setopt(pycurl.URL, API_ENDPOINT_URL)
        self.conn.setopt(pycurl.USERAGENT, USER_AGENT)
        # Using gzip is optional but saves us bandwidth.
        self.conn.setopt(pycurl.ENCODING, 'deflate, gzip')
        self.conn.setopt(pycurl.POST, 1)
        self.conn.setopt(pycurl.POSTFIELDS, urllib.urlencode(POST_PARAMS))
        self.conn.setopt(pycurl.HTTPHEADER, ['Host: stream.twitter.com',
                                             'Authorization: %s' % self.get_oauth_header()])
        # self.handle_tweet is the method that is called when new data arrives
        self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_tweet)

We start by defining some global parameters. Starting with version 1.1 of the Streaming API you need to authenticate the connection using OAuth; we use Python’s oauth2 library for this. You need to create your own OAuth parameters and put them in OAUTH_KEYS, see here for more info about how to create these.

In POST_PARAMS we put the parameters that are posted when setting up the connection. By setting stall_warnings to true we will receive warnings if the connection is about to be disconnected due to the client falling behind. This can be a good idea, especially if we are following high-traffic keywords.
In the parameter track we put all the keywords we want to follow in a comma-separated list.
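
To make the shape of the POST body concrete, here is a small sketch of how a dictionary like POST_PARAMS is turned into form-encoded data. It is written for Python 3, where urlencode lives in urllib.parse (the tutorial code uses the Python 2 name urllib.urlencode). The params dictionary simply mirrors the values discussed above, and the sorting is only there to make the output order predictable.

```python
from urllib.parse import urlencode

# Same parameters as discussed above (illustrative copy of POST_PARAMS).
params = {'include_entities': 0,
          'stall_warnings': 'true',
          'track': 'iphone,ipad,ipod'}

# Sort the items so the resulting string does not depend on dict ordering.
body = urlencode(sorted(params.items()))
print(body)
```

Note that the commas in the track list are percent-encoded as %2C in the resulting body.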

In the method setup_connection we connect to the Streaming endpoint using cURL. We start by making sure there is no open connection already; if there is, we close it and empty the buffer used for saving intermediate tweet data. Then we set the necessary cURL options:

  • URL – URL to the Streaming API endpoint.
  • USERAGENT – An optional, but recommended, string for identifying your application.
  • ENCODING – By setting it to “deflate, gzip” the stream will be sent in compressed format, which saves a lot of bandwidth.
  • POST – We are going to do a POST, so we set this to 1.
  • POSTFIELDS – This is the data that we are going to post; at a minimum we need to send the keywords we want to track.
  • HTTPHEADER – Host needs to be set to ensure we get a gzipped stream, and Authorization is the OAuth header.
  • WRITEFUNCTION – This method will be called with the data from the stream.

Creating the OAuth header

The method get_oauth_header creates and returns the OAuth header needed for authenticating the connection.

def get_oauth_header(self):
    """ Create and return OAuth header.
    """
    params = {'oauth_version': '1.0',
              'oauth_nonce': oauth.generate_nonce(),
              'oauth_timestamp': int(time.time())}
    req = oauth.Request(method='POST', parameters=params, url='%s?%s' % (API_ENDPOINT_URL,
                                                                         urllib.urlencode(POST_PARAMS)))
    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.oauth_consumer, self.oauth_token)
    return req.to_header()['Authorization'].encode('utf-8')

We create a request that we sign using our OAuth consumer key and token. Note that the url parameter needs to match exactly the URL and POSTFIELDS parameters that we use in setup_connection. Then we extract and return the resulting header.
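
The oauth2 library hides the signing step. As a rough illustration of what an OAuth 1.0 HMAC-SHA1 signature involves, the sketch below builds the signature base string and signs it using only the standard library. It is written for Python 3. The helper names (percent_encode, signature_base_string, sign) and all keys and nonce values are made up for this example, and it is not the oauth2 library’s own code.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def percent_encode(value):
    # OAuth 1.0 uses RFC 3986 encoding: only unreserved characters stay as-is.
    return quote(str(value), safe='-._~')


def signature_base_string(method, url, params):
    # Encode every name and value, sort the pairs, and join them as name=value.
    pairs = sorted((percent_encode(k), percent_encode(v)) for k, v in params.items())
    param_string = '&'.join('%s=%s' % pair for pair in pairs)
    return '&'.join([method.upper(), percent_encode(url), percent_encode(param_string)])


def sign(base_string, consumer_secret, token_secret):
    # The signing key is the two encoded secrets joined by '&'.
    key = '%s&%s' % (percent_encode(consumer_secret), percent_encode(token_secret))
    digest = hmac.new(key.encode('ascii'), base_string.encode('ascii'), hashlib.sha1).digest()
    return base64.b64encode(digest).decode('ascii')


# Dummy values for illustration only.
URL = 'https://stream.twitter.com/1.1/statuses/filter.json'
params = {'track': 'iphone,ipad,ipod',
          'oauth_consumer_key': 'dummy-consumer-key',
          'oauth_token': 'dummy-token',
          'oauth_nonce': 'dummy-nonce',
          'oauth_timestamp': '1',
          'oauth_signature_method': 'HMAC-SHA1',
          'oauth_version': '1.0'}

base_string = signature_base_string('POST', URL, params)
signature = sign(base_string, 'dummy-consumer-secret', 'dummy-token-secret')
print(base_string)
print(signature)
```

Because the POST parameters are part of the signed base string, a request whose posted parameters differ from the signed ones will typically be rejected with an authorization error.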

Handling connection errors

By calling self.conn.perform() we enter a loop that receives data from the stream and passes it to the method handle_tweet.
This loop runs until we kill the program or something happens to the connection. The connection can be dropped either by a network error or by an HTTP error. Depending on which type of error we receive, Twitter recommends different reconnection strategies: for a network error we should back off linearly, and for HTTP errors we should back off exponentially. The following code calls perform and handles connection errors according to Twitter’s recommendations:

def start(self):
    """ Start listening to Streaming endpoint.
    Handle exceptions according to Twitter's recommendations.
    """
    backoff_network_error = 0.25
    backoff_http_error = 5
    backoff_rate_limit = 60
    while True:
        self.setup_connection()
        try:
            self.conn.perform()
        except pycurl.error:
            # Network error (or an exception raised inside handle_tweet),
            # use linear back off up to 16 seconds
            print 'Network error: %s' % self.conn.errstr()
            print 'Waiting %s seconds before trying again' % backoff_network_error
            time.sleep(backoff_network_error)
            backoff_network_error = min(backoff_network_error + 1, 16)
            continue
        # HTTP Error
        sc = self.conn.getinfo(pycurl.HTTP_CODE)
        if sc == 420:
            # Rate limit, use exponential back off starting with 1 minute and double each attempt
            print 'Rate limit, waiting %s seconds' % backoff_rate_limit
            time.sleep(backoff_rate_limit)
            backoff_rate_limit *= 2
        else:
            # HTTP error, use exponential back off up to 320 seconds
            print 'HTTP error %s, %s' % (sc, self.conn.errstr())
            print 'Waiting %s seconds' % backoff_http_error
            time.sleep(backoff_http_error)
            backoff_http_error = min(backoff_http_error * 2, 320)

(Ideally we should reset the backoff values to their defaults after a successful reconnection; that is left as an exercise…)
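
One possible way to approach that exercise is to wrap each delay in a small helper that knows how to grow and how to reset itself. The Backoff class below is a hypothetical sketch, not part of the original code; the initial values, step and caps are taken from the start method above.

```python
class Backoff(object):
    """Reconnect delay that grows on every failure until reset() is called."""

    def __init__(self, initial, maximum=None, step=None):
        self.initial = initial
        self.maximum = maximum
        self.step = step  # None means exponential (doubling) growth
        self.current = initial

    def next_delay(self):
        """Return the delay to sleep now and grow the stored value."""
        delay = self.current
        if self.step is None:
            grown = self.current * 2
        else:
            grown = self.current + self.step
        if self.maximum is not None:
            grown = min(grown, self.maximum)
        self.current = grown
        return delay

    def reset(self):
        """Go back to the initial delay, e.g. after a successful reconnect."""
        self.current = self.initial


network = Backoff(0.25, maximum=16, step=1)  # linear, as in start()
http = Backoff(5, maximum=320)               # exponential, capped
rate_limit = Backoff(60)                     # exponential, no cap

print([network.next_delay() for _ in range(3)])
network.reset()
print(network.next_delay())
```

A natural place to call reset() would be in handle_tweet once the first message has arrived, since that suggests the reconnection succeeded.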

Processing the tweets

The Streaming API sends data as a series of newline-delimited messages, where newline is “\r\n” and each message is JSON-encoded data.

Apart from normal tweets we might also receive various warnings and error messages in the stream, so we need to be prepared for that. You can find a complete list of these messages here. In the code below we will not handle all of these messages, just the most important ones.

def handle_tweet(self, data):
    """ This method is called when data is received through Streaming endpoint.
    """
    self.buffer += data
    if data.endswith('\r\n') and self.buffer.strip():
        # complete message received
        message = json.loads(self.buffer)
        self.buffer = ''
        if message.get('limit'):
            print 'Rate limiting caused us to miss %s tweets' % (message['limit'].get('track'))
        elif message.get('disconnect'):
            raise Exception('Got disconnect: %s' % message['disconnect'].get('reason'))
        elif message.get('warning'):
            print 'Got warning: %s' % message['warning'].get('message')
        else:
            print 'Got tweet with text: %s' % message.get('text')
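
The method above only parses the buffer when a chunk happens to end with “\r\n”, so if a single chunk carries two complete messages, json.loads will fail on the extra data. A slightly more defensive sketch splits the buffer on the delimiter instead. The function name split_messages is an assumption for this example, and it works on text strings; under Python 3 pycurl hands the callback bytes, which would need decoding first.

```python
import json


def split_messages(buffer, chunk):
    """Append chunk to buffer and return (complete_messages, remaining_buffer)."""
    buffer += chunk
    messages = []
    while '\r\n' in buffer:
        line, buffer = buffer.split('\r\n', 1)
        if line.strip():  # skip blank keep-alive lines
            messages.append(json.loads(line))
    return messages, buffer


# One complete message followed by the start of a second one.
messages, rest = split_messages('', '{"text": "a"}\r\n{"te')
print(messages, repr(rest))

# The remainder arrives, followed by a blank keep-alive line.
messages, rest = split_messages(rest, 'xt": "b"}\r\n\r\n')
print(messages, repr(rest))
```

Each returned message could then be passed through the same limit, disconnect and warning checks as in handle_tweet.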

Hopefully this tutorial has given you a basic understanding of how Twitter’s Streaming API works. The complete code for this example is available here.

  • http://www.jcopro.net Jeremy Cook

    Hey, thanks for the tutorial! I’ve been playing around with automating some real-world objects (servos, LEDs, etc) using Python. Can’t wait to hopefully try out some of this stuff with Twitter!

  • http://anly.tk toby

    or alternatively:
    import tweepy

    😉

  • Philipp

    thanks for the tutorial! But am i right this code miss the part that collect tweets?

    how would i do that using scraperwiki for example?

    • arngarden

      No, you don’t need any other library for collecting tweets. All tweets that match the keywords are sent to the handle_tweet method.

      • Philipp

        And am i right wrighting this:

        POST_PARAMS = {‘include_entities’: 0,
        ‘stall_warning’: ‘true’,
        ‘locations’: ‘-122.75,36.8,-121.75,37.8’}

        for location filtering

        // I just start python and althrouth there is no errors nothing happens – what problem can it be?

        • arngarden

          Looks right I believe, but I have never tried location-based filtering.. Perhaps you could try a larger area? Not sure how many users have geolocation of tweets enabled..

          • Philipp

            Sory for so may questions, i have a bigger problem now
            – Network error: SSL certificate problem, verify that the CA cert is OK. Details:error:14090086:SSL routines:SSL3_GET_SERVER_CERTIFICATE:certificate verify failed
            Can you help me to handle it? Have no idea even how to start solving it

          • admin

            Don’t know if I can help you much, have never encountered that problem. Perhaps something is wrong with your OAUTH_KEYS? A good starting point might be here: https://dev.twitter.com/docs/auth/using-oauth

          • Philipp

            hmm looks like all correct. can this cause a problem:

            https://dev.twitter.com/blog/ssl-upgrade-for-twitterapi ??

          • arngarden

            Don’t know, I have never had to think about SSL certificates for my code..

  • Vishal

    where/how are you managing SSL certificates?

  • http://twitter.com/keyzer_s0ze keyzer

    Any idea why I would be getting an invalid syntax error at the end of line 15? The error is at the 8 at the end of the line, but there was no error on the other key lines….

    ‘access_token_key’: 119834338718-1ZPtYnrIPDkVIL34536F42zL72fX9dIfP1zf3tz8,

    • arngarden

      Could it be that you forgot the quotes around the key? Should then be:

      ‘access_token_key’: ‘119834338718-1ZPtYnrIPDkVIL34536F42zL72fX9dIfP1zf3tz8’,

  • Jason Hill

    I’ve used some of this code, modified it, and expended upon it. Would you consider placing a GPL header in your github code?

    • arngarden

      Good idea! I’ve updated the code with GPL header.

  • fernandopso

    How do I close the connection?

  • Patrick

    Hey, great piece of code. Is it possible to do this with a get request. When I curl /statuses/filter.json with locations, it returns the whole tweet, not just the text. Is that possible through pyCurl? Thanks

  • Patrick

    How would i then send this to mongodb?

    • Feadurn

      instead of printing the tweet, you can do a db.insert(tweet) and it will insert your tweet into the database

  • jebesen

    I am becoming crazy. I’ve tried everything and I am still receiving: HTTP error 401.

    I have read that it is an authorizing problem but I have checked it thousands of times and my consumer and acces token codes are correct.

    Please help.

  • Matt Hart

    Nice article mate. Cheers.

  • Guest

    Thank you for the post, it’s been awhile since anyone has discussed on here so I hope that you’re still active. I have modified your code for a project I am working on. The script has been running fine until recently and I believe it is due to a change in Twitter’s new connection standards, see article here:
    https://dev.twitter.com/discussions/24239?page=2

    Would you happen to know how to implement SSL connections in place of any plain-text HTML connections for your code? Any feedback would be much appreciated.

  • Feadurn

    Thank you for your script. However I have a noob question. How can I do to return the tweet instead of printing them. I try to understand where to put a return but cannot find any working solution.

    Thanks again for sharing your code :)

    • arngarden

      The handle_tweet-method is a so called “callback-method” that is called for each new tweet. It doesn’t really make sense to add a return to that method, instead you should add all functionality for dealing with the tweet to that method. (Of course the handle_tweet-method can call other methods, for example to add tweet to a database)

      • Feadurn

        Thank you, it is what I have done at the end

  • Sourabh

    Thanks for the script

    I am trying to use it but its showing a traceback:

    Traceback (most recent call last):

    File “C:/Python27/nytimes/2062014/pucurl tweets .py”, line 110, in

    ts = TwitterStream()

    File “C:/Python27/nytimes/2062014/pucurl tweets .py”, line 24, in __init__

    self.oauth_token = oauth.Token(key=OAUTH_KEYS[‘access_token_key’], secret=OAUTH_KEYS[‘access_token_secret’])

    AttributeError: ‘module’ object has no attribute ‘Token’

    Regards
    Sourabh

    • arngarden

      I would guess something is wrong with your installation of the oauth2 library; have you tried reinstalling it? I see that you also run Windows, which could be related. I have not tested the script on platforms other than Linux and Mac.

  • Feadurn

    I have another question that I tried to solve lately but couldn’t find the way to do it. If I want to use your API but on some point terminate properly the connection (to update the keyword from another script’s result, how can I do that as it is the self.conn.perform()is a loop. I was thinking about using a check in the handle_tweet(), but if no message is receive, it will never check it.

    Thank you very mych for any help

    • arngarden

      One option could be to catch the SIGINT signal (sent by Ctrl-C) and have the handler set a “stop” flag. If you also set a timeout, you do not have to wait for a new message to be received.

      • Feadurn

        Thank you. I think I will go with the timeout then, it is seems the best option (a check when it get message and another check after a timeout)

  • Feadurn

    Thank you very much. I am using it for my research project but modify it a little bit (and solved all problems I asked in the previous comment). I don’t plan on the next future to put it public but I help people around me with it by giving them my modified version. I am not a developer and not really aware of how I can state in the file I did modifications. I obviously left the header with your copyright and the GNU licence but I was wondering how can I add my own ownership on the modifications without interfering with your ownership of it.

    Again, thank you very much,

    PS: I initially posted this comment on a wrong post, sorry.

    • arngarden

      Thank you for your kind words!
      I’m not an expert myself on licensing issues, but you of course own the rights of the modifications you have made. So I think the standard way is to just add your name next to mine in the license header in the files you create.

  • pasumarthi

    API_ENDPOINT_URL = ‘https://stream.twitter.com/1.1/statuses/filter.json’
    when i am trying to open the above link it is asking me to give user name and pwd.
    i don’t know what are those
    tried with twitter credentials(not worked)
    tried with consumer key&secret(not worked)
    please help me on this…..
    Thanks in advance

    • Rajasekar G

      I too have the same problem.. ask username and password credentials did you get any solutions. please let me know.. Thanks in advance. please any one help me.
