4elements, web design and consultancy

  1. Building With the Twitter API: Using Real-Time Streams

    Final product image
    What You'll Be Creating

    While the Twitter REST API is suitable for many applications, if you want immediate updates and access to a broader array of notifications, the Twitter Streaming API is essential. For example, only the streaming API will tell you when another user favorites one of your tweets.

    Using the Streaming API requires a persistent, keep-alive connection between your web server and Twitter. This type of implementation may be unfamiliar to many PHP developers. As soon as tweets come in, Twitter notifies your server in real time, allowing you to store them into your database without the delay of polling the REST API. Use of the Streaming API is also not subject to Twitter's API rate limits.

    Here's a visualization of how it works:

    How the Twitter Streaming API works

    There are three variations of the Twitter Streaming API:

    1. The Public Stream. This allows your application to monitor public data on Twitter, such as public tweets, hashtag filters, et al. 
    2. The User Stream. This allows you to track a user's tweet stream in real time. Part three of this series will focus on the user stream. 
    3. Site Streams. Site streams allow your application to monitor real-time Twitter feeds for a large number of users. 

    The job of your streaming implementation is to log the incoming events as quickly as possible and process them in the background using the REST API as necessary to harvest deeper data. Site streams require prior approval from Twitter, which are likely reserved for larger companies and developers.

    Fortunately, there is a free, open-source library called Phirehose, which implements most of the streaming API requirements. This tutorial will describe how to integrate Phirehose into our open-source Birdcage application.

    The Phirehose Library

    Phirehose is a fantastic open-source PHP implementation of the Twitter Stream API requirements written by Fenn Bailey. As he describes it, Phirehose is meant to:

    • provide a simple interface to the Twitter Streaming API for PHP applications
    • comply with Streaming API recommendations for error handling, reconnection, etc.
    • encourage well-behaved streaming API clients
    • operate independently of PHP extensions (i.e. shared memory, PCNTL, etc.)

    I've found the library to operate quite flawlessly. There's more Phirehose documentation here.

    It's meant to maintain the connection with Twitter and respond to Twitter's data feed while operating indefinitely without interruption. It's not meant to perform detailed tweet processing and data hydrating, which we've described in part two of this series. This can be done separately.

    Running Phirehose Indefinitely

    Generally, you can't run a typical web-based cron task as an indefinite keep-alive operation. It's better to create a command line daemon.

    One of the powerful features Yii offers is the ability to run console-based applications from the command line. This will allow us to run a keep-alive command line application that utilizes the entire Birdcage PHP and MySQL framework that we've built.

    Building a Yii Console Command

    In the /app/ directory, outside the web accessible root, we'll add a stream.php file that runs our Phirehose streaming console command:

    <?php
    defined('YII_DEBUG') or define('YII_DEBUG',true);
    $yii=dirname(__FILE__).'/../framework/yii.php';
    $config=dirname(__FILE__).'/protected/config/main.php';
    require_once($yii);
    Yii::createConsoleApplication($config)->run();

    Next, we'll build the actual command file, StreamCommand.php, in the /app/protected/commands directory:

    <?php
    class StreamCommand extends CConsoleCommand
    {    
        // test with php ./app/stream.php Stream
        public function run($args)
        {
            // get Twitter user account keys
            $result = Account::model()->findByPk(1);
            $c = new Consumer($result['oauth_token'],$result['oauth_token_secret'],Phirehose::METHOD_USER);
            // load Twitter App keys
            $app = UserSetting::model()->loadPrimarySettings();
            $c->consumerKey = $app['twitter_key'];
            $c->consumerSecret = $app['twitter_secret'];
            $c->consume();
        }
    }

    It will launch the Phirehose process, Consumer, using our Twitter application and user keys.

    Note: For purposes of the Birdcage streaming example, we assume there is only one Twitter account registered and hardcode loading the credentials, e.g. account_id = 1.

    Integrating Phirehose

    To integrate Phirehose into Birdcage, I moved OAuthPhirehose.php and UserstreamPhirehose.php into the /app/protected/components directory. In my main.php configuration file, I added phirehose to the list of loaded components:

        'preload'=>array(
    	  'log',
    	  'bootstrap',
    	  'mailgun',
    	  'phirehose',
      	  'advanced'
    	  ),
    

    Then, I created a database migration to create a table to store raw data from the Twitter stream:

    class m140919_193106_create_stream_table extends CDbMigration
    {
         protected $MySqlOptions = 'ENGINE=InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci';
         public $tablePrefix;
         public $tableName;
    
         public function before() {
           $this->tablePrefix = Yii::app()->getDb()->tablePrefix;
           if ($this->tablePrefix <> '')
             $this->tableName = $this->tablePrefix.'stream';
         }
    
           public function safeUp()
       	{
       	  $this->before();
          $this->createTable($this->tableName, array(
                   'id' => 'pk',
                   'tweet_id' => 'bigint(20) unsigned NOT NULL',
                   'code' => 'text  NULL',
                   'is_processed' => 'tinyint default 0',
                   'created_at' => 'DATETIME NOT NULL DEFAULT 0',
                   'modified_at' => 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP',
                     ), $this->MySqlOptions);
                     $this->createIndex('tweet_id', $this->tableName , 'tweet_id', true);               
    
       	}

    I also created a new model called Consumer.php which extends OauthPhirehose with its required enqueueStatus method. 

    We want to minimize the amount of processing the real-time response needs to perform. Essentially, we just want to record the data received from Twitter to our database—and nothing else. We can do other processing in our own background tasks without slowing Phirehose's streaming connection. My function just takes tweet data from the incoming stream and stores it in the stream table:

    <?php
      class Consumer extends OauthPhirehose
      {
        // This function is called automatically by the Phirehose class
        // when a new tweet is received with the JSON data in $status
        public function enqueueStatus($status) {
          $stream_item = json_decode($status);
          if (!(isset($stream_item->id_str))) { return;}
          $s = new Stream;
          $s->tweet_id = $stream_item->id_str;
          $s->code = base64_encode(serialize($stream_item));
          $s->is_processed=0;
          $s->created_at = new CDbExpression('NOW()');          
          $s->modified_at =new CDbExpression('NOW()');          
          $s->save();
          var_dump($stream_item);
        }
      }
      ?>
    

    We'll rely on background tasks run by our DaemonController to process the data into the Bircdcage Tweet model. This is described further below.

    Activating Phirehose

    You can test Phirehose using the PHP console command:

    php ./app/stream.php Stream

    Twitter will send a stream of follower information for the user account, followed by real-time data as it arrives.

    To activate Phirehose as a keep-alive, always-on console command, we'll use the nohup command, e.g. no hangup, and redirect output to dev/null:

    nohup php ./app/stream.php Stream > /dev/null 2>&1&

    Ubuntu will respond with a job id of your process for future monitoring and termination:

    [1] 9768

    If you wish to check that the process is running, scan the task list for the job id:

    ps -e all | grep 9768

    You should see something like this:

    0  1000  9768  9743  20   0 273112 16916 poll_s S    pts/0      0:00 php ./app/stream.php Stream

    And you can terminate Phirehose by killing the job id:

    kill 9768

    In my experience, Phirehose has worked flawlessly with this technique operating without interruption for the past two weeks.

    Processing the Streaming Data

    We also need to create a background process in Birdcage which will process the streaming data into our Tweet, Mention, URL and Hashtag tables—as if it had come from the REST API.

    Change your twitter.ini file setting to use streams:

    twitter_stream = true

    And, we can use the same cron job from part two to run this operation:

    # To define the time you can provide concrete values for
    # minute (m), hour (h), day of month (dom), month (mon),
    # and day of week (dow) or use '*' in these fields (for 'any').# 
    # Notice that tasks will be started based on the cron's system
    # daemon's notion of time and timezones.
    # 
    # For example, you can run a backup of all your user accounts
    # at 5 a.m every week with:
    # 0 5 * * 1 tar -zcf /var/backups/home.tgz /home/
    # 
    # m h  dom mon dow   command
    */5 * * * * wget -O /dev/null http://birdcage.yourdomain.com/daemon/index

    Then, when DaemonController is called, it will activate the Stream model() process method:

        public function actionIndex() {
    	  // if not using twitter streams, we'll process tweets by REST API
    	  if (!Yii::app()->params['twitter_stream']) {
    	    Tweet::model()->getStreams();	    
    	  } else {
    	    Stream::model()->process();
    	  }
      }

    The process method unpacks the encoded stream data and parses each entry just as we did with content from the REST API:

        public function process() {
    	  // get unprocessed tweets from stream engine
    	  // to do 
    	  $account_id = 1;
    	  $items = Stream::model()->unprocessed()->findAll();
    	  foreach ($items as $i) {
    	    $tweet = unserialize(base64_decode($i['code']));
          Tweet::model()->parse($account_id,$tweet);
          $this->setStatus($i['id'],self::STREAM_PROCESSED);      
    	  }
    	}

    Birdcage currently ignores data from the stream that's not a tweet, e.g. notifications, direct messages, etc. I'll leave that to you to expand—or you can check out my expanded application, Birdhouse.

    In Closing

    I hope you've found this three-part Twitter API series informative and useful. By now, you've learned about OAuth, the REST API, the Streaming API, building a database for Twitter, processing the timeline with both types of APIs, properly counting characters in tweets and posting them, and more.

    Please post any comments, corrections, or additional ideas below. You can browse my other Tuts+ tutorials on my author page or follow me on Twitter @reifman.

     

    0 Comments

    Leave a comment › Posted in: Daily

0 Comments

Got anything to add?

(Basic HTML is fine)