Consuming User Streams with Phirehose


Jonathon Hill

Sep 30, 2010, 5:59:59 PM
to Phirehose Users
User Streams just went into production, and I'm trying to use a
modified version of Phirehose to consume them. The following is based
closely on a proof-of-concept script that worked at the Chirp
conference back in April... but now I can't seem to get anything but
this:

Phirehose: Connecting to twitter stream: https://userstream.twitter.com/2/user.json with params: array ( 'delimited' => 'length', 'track' => '#newtwitter',)
Phirehose: Resolved host userstream.twitter.com to 128.242.245.39
Phirehose: Connecting to 128.242.245.39
Phirehose: TCP failure 1 of 20 connecting to stream: Operation timed out (60). Sleeping for 1 seconds.

... and on and on. What am I missing or doing wrong?

Jonathon Hill


<?php

class Intake2 extends Phirehose {

  const URL_BASE = 'https://userstream.twitter.com/2/';
  const METHOD_USER = 'user';

  private static $instance;
  public $log_to = 'screen';


  public static function Initialize() {

    if(! self::$instance instanceof self) {
      self::$instance = new self(
        Config::get('twitter_stream_api_username'),
        Config::get('twitter_stream_api_password'),
        Intake2::METHOD_USER
      );
    }

  }


  public static function start() {
    self::Initialize();
    self::$instance->log_to = PATH_APP.'logs/intake2.log';
    self::$instance->checkFilterPredicates();
    self::$instance->consume();
  }


  public function enqueueStatus($status, $log = 'intake2_memory_usage.csv') {

    # stuff the tweet into a database table
    $sql = sprintf("INSERT INTO stream_queue SET contents = %s", database()->escape($status));
    database()->execute($sql);

    # @DEBUG
    error_log(format_date_mysql().','.(memory_get_usage()/1024)."\r\n", 3, PATH_APP.'logs/'.$log);
  }


  protected function checkFilterPredicates() {
    $this->setTrack(array('#newtwitter'));
  }


  /**
   * Connects to the stream URL using the configured method.
   */
  protected function connect() {

    // Init state
    $connectFailures = 0;
    $tcpRetry = self::TCP_BACKOFF / 2;
    $httpRetry = self::HTTP_BACKOFF / 2;

    // Keep trying until connected (or max connect failures exceeded)
    do {

      // Check filter predicates for every connect (for filter method)
      if ($this->method == self::METHOD_FILTER) {
        $this->checkFilterPredicates();
      }

      // Construct URL/HTTP bits
      $url = self::URL_BASE . $this->method . '.' . $this->format;
      $urlParts = parse_url($url);
      $authCredentials = base64_encode($this->username . ':' . $this->password);

      // Setup params appropriately
      $requestParams = array('delimited' => 'length');

      // Filter takes additional parameters
      if ($this->method == self::METHOD_USER && count($this->trackWords) > 0) {
        $requestParams['track'] = implode(',', $this->trackWords);
      }
      if ($this->method == self::METHOD_USER && count($this->followIds) > 0) {
        $requestParams['follow'] = implode(',', $this->followIds);
      }


      // Debugging is useful
      $this->log('Connecting to twitter stream: ' . $url . ' with params: ' . str_replace("\n", '',
        var_export($requestParams, TRUE)));

      /**
       * Open socket connection to make POST request. It'd be nice to use stream_context_create with the native
       * HTTP transport but it hides/abstracts too many required bits (like HTTP error responses).
       */
      $errNo = $errStr = NULL;
      $scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' : 'tcp://';

      /**
       * We must perform manual host resolution here as Twitter's IP regularly rotates (ie: DNS TTL of 60 seconds) and
       * PHP appears to cache the result if in a long running process (as per Phirehose).
       */
      $streamIPs = gethostbynamel($urlParts['host']);
      if (empty($streamIPs)) {
        throw new PhirehoseNetworkException("Unable to resolve hostname: '" . $urlParts['host'] . "'");
      }

      // Choose one randomly (if more than one)
      $this->log('Resolved host ' . $urlParts['host'] . ' to ' . implode(', ', $streamIPs));
      $streamIP = $streamIPs[rand(0, (count($streamIPs) - 1))];
      $this->log('Connecting to ' . $streamIP);

      @$this->conn = fsockopen($scheme . $streamIP, 80, $errNo, $errStr, $this->connectTimeout);

      // No go - handle errors/backoff
      if (!$this->conn || !is_resource($this->conn)) {
        $this->lastErrorMsg = $errStr;
        $this->lastErrorNo = $errNo;
        $connectFailures ++;
        if ($connectFailures > $this->connectFailuresMax) {
          $msg = 'TCP failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr;
          $this->log($msg);
          throw new PhirehoseConnectLimitExceeded($msg, $errNo); // Throw an exception for other code to handle
        }
        // Increase retry/backoff up to max
        $tcpRetry = ($tcpRetry < self::TCP_BACKOFF_MAX) ? $tcpRetry * 2 : self::TCP_BACKOFF_MAX;
        $this->log('TCP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' .
          $errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry . ' seconds.');
        sleep($tcpRetry);
        continue;
      }

      // TCP connect OK, clear last error (if present)
      $this->log('Connection established to ' . $streamIP);
      $this->lastErrorMsg = NULL;
      $this->lastErrorNo = NULL;

      // If we have a socket connection, we can attempt a HTTP request - Ensure blocking read for the moment
      stream_set_blocking($this->conn, 1);

      // Encode request data
      $postData = http_build_query($requestParams);

      // Do it
      fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.0\r\n");
      fwrite($this->conn, "Host: " . $urlParts['host'] . "\r\n");
      fwrite($this->conn, "Content-type: application/x-www-form-urlencoded\r\n");
      fwrite($this->conn, "Content-length: " . strlen($postData) . "\r\n");
      fwrite($this->conn, "Accept: */*\r\n");
      fwrite($this->conn, 'Authorization: Basic ' . $authCredentials . "\r\n");
      fwrite($this->conn, 'User-Agent: ' . self::USER_AGENT . "\r\n");
      fwrite($this->conn, "\r\n");
      fwrite($this->conn, $postData . "\r\n");
      fwrite($this->conn, "\r\n");

      // First line is response
      list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/', trim(fgets($this->conn, 1024)), 3);

      // Response buffers
      $respHeaders = $respBody = '';

      // Consume each header response line until we get to body
      while ($hLine = trim(fgets($this->conn, 4096))) {
        $respHeaders .= $hLine;
      }

      // If we got a non-200 response, we need to backoff and retry
      if ($httpCode != 200) {
        $connectFailures ++;

        // Twitter will disconnect on error, but we want to consume the rest of the response body (which is useful)
        while ($bLine = trim(fgets($this->conn, 4096))) {
          $respBody .= $bLine;
        }

        // Construct error
        $errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage . ' (' . $respBody . ')';

        // Set last error state
        $this->lastErrorMsg = $errStr;
        $this->lastErrorNo = $httpCode;

        // Have we exceeded maximum failures?
        if ($connectFailures > $this->connectFailuresMax) {
          $msg = 'Connection failure limit exceeded with ' . $connectFailures . ' failures. Last error: ' . $errStr;
          $this->log($msg);
          throw new PhirehoseConnectLimitExceeded($msg, $httpCode); // We eventually throw an exception for other code to handle
        }
        // Increase retry/backoff up to max
        $httpRetry = ($httpRetry < self::HTTP_BACKOFF_MAX) ? $httpRetry * 2 : self::HTTP_BACKOFF_MAX;
        $this->log('HTTP failure ' . $connectFailures . ' of ' . $this->connectFailuresMax . ' connecting to stream: ' .
          $errStr . '. Sleeping for ' . $httpRetry . ' seconds.');
        sleep($httpRetry);
        continue;

      } // End if not http 200

      // Loop until connected OK
    } while (!is_resource($this->conn) || $httpCode != 200);

    // Connected OK, reset connect failures
    $connectFailures = 0;
    $this->lastErrorMsg = NULL;
    $this->lastErrorNo = NULL;

    // Switch to non-blocking to consume the stream (important)
    stream_set_blocking($this->conn, 0);

    // Connect always causes the filterChanged status to be cleared
    $this->filterChanged = FALSE;

    // Flush stream buffer & (re)assign fdrPool (for reconnect)
    $this->fdrPool = array($this->conn);
    $this->buff = '';

  }

}
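
The retry loop in connect() doubles its sleep interval after every failure until it reaches a ceiling. Here is a minimal sketch of that schedule in isolation. The function name and the ceilings used in the usage note (16 and 240 seconds) are illustrative assumptions, not values taken from this thread; the starting values of 1 and 10 seconds match the "Sleeping for" figures in the logs quoted here. Unlike the loop above, this version clamps with min(), so a delay never exceeds the ceiling.

```php
<?php
// Hypothetical helper, not part of Phirehose: returns the sleep intervals
// that a doubling backoff would use for the first $attempts failures.
function backoffDelays(float $initial, float $max, int $attempts): array {
    $delays = array();
    // Start at half the initial value so the first doubling yields $initial,
    // mirroring "self::TCP_BACKOFF / 2" in connect().
    $retry = $initial / 2;
    for ($i = 0; $i < $attempts; $i++) {
        $retry = min($retry * 2, $max); // double, but never past the ceiling
        $delays[] = $retry;
    }
    return $delays;
}
```

For example, backoffDelays(1, 16, 6) gives 1, 2, 4, 8, 16, 16 seconds, and backoffDelays(10, 240, 6) gives 10, 20, 40, 80, 160, 240.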

Scott Wilcox

Sep 30, 2010, 6:16:20 PM
to phireho...@googlegroups.com
Phirehose uses basic auth, userstreams is OAuth only.


Scott Wilcox

t: +44 (0) 7538 842418
+1 (646) 257 0580
e: sc...@dor.ky
w: http://dor.ky

John Kalucki

Sep 30, 2010, 7:41:55 PM
to phireho...@googlegroups.com
There's that, and it's also possible that it tried to open an HTTP
connection, despite you asking for HTTPS. The connection timed out,
which means either that it went to the non-existent HTTP port, or that
your IP has been blacklisted.
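
One way to avoid a scheme/port mismatch of this kind is to derive both the socket transport and the port from the parsed URL instead of hard-coding either. A minimal sketch follows; the function name socketTarget is hypothetical and is not part of Phirehose.

```php
<?php
// Hypothetical helper: maps a stream URL to the transport prefix, host and
// port that fsockopen() would need.
function socketTarget(string $url): array {
    $parts = parse_url($url);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new InvalidArgumentException('Cannot parse stream URL: ' . $url);
    }
    $isHttps   = strtolower($parts['scheme']) === 'https';
    $transport = $isHttps ? 'ssl://' : 'tcp://';
    // An explicit port in the URL wins; otherwise use the scheme's default.
    $port = isset($parts['port']) ? (int) $parts['port'] : ($isHttps ? 443 : 80);
    return array($transport, $parts['host'], $port);
}
```

The result could then be passed along as fsockopen($transport . $ip, $port, ...), with $ip being the address resolved for the returned host.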

-John Kalucki
http://twitter.com/jkalucki
Twitter, Inc

Jonathon Hill

Sep 30, 2010, 8:18:22 PM
to phireho...@googlegroups.com
I do not have this problem with the normal Streaming API, only with User Streams, and I am not using OAuth.

On investigation, I found:

@$this->conn = fsockopen($scheme . $streamIP, 80, $errNo, $errStr, $this->connectTimeout);

So it was connecting to port 80 instead of 443. After changing that, I now get this error:

Connecting to twitter stream: https://userstream.twitter.com/2/user.json with params: array ( 'delimited' => 'length', 'track' => '#newtwitter',)

Resolved host userstream.twitter.com to 128.242.245.39

Connection established to 128.242.245.39
Phirehose: HTTP failure 1 of 20 connecting to stream: HTTP ERROR 401: Unauthorized (Basic auth not supported). Sleeping for 10 seconds.


Jonathon



John Kalucki

Oct 1, 2010, 12:17:01 AM
to phireho...@googlegroups.com
Basic auth is not supported on User Streams, only OAuth. This is
documented here: http://dev.twitter.com/pages/user_streams
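
For readers unfamiliar with what replacing Basic auth involves, here is a rough sketch of building an OAuth 1.0a Authorization header signed with HMAC-SHA1. It is an illustration under stated assumptions, not Phirehose's or Twitter's code: the function name, the $credentials array keys, and the optional $nonce/$timestamp arguments (there only to make the output reproducible) are all hypothetical.

```php
<?php
// Hypothetical helper: returns the value for an "Authorization:" header.
// $requestParams are the POST parameters that will be sent with the request.
function buildOAuthHeader(
    string $method,
    string $url,
    array $requestParams,
    array $credentials,
    ?string $nonce = null,
    ?int $timestamp = null
): string {
    $oauth = array(
        'oauth_consumer_key'     => $credentials['consumer_key'],
        'oauth_nonce'            => $nonce ?? bin2hex(random_bytes(16)),
        'oauth_signature_method' => 'HMAC-SHA1',
        'oauth_timestamp'        => (string) ($timestamp ?? time()),
        'oauth_token'            => $credentials['token'],
        'oauth_version'          => '1.0',
    );

    // Percent-encode every key and value, then sort by encoded key.
    $encoded = array();
    foreach (array_merge($requestParams, $oauth) as $key => $value) {
        $encoded[rawurlencode($key)] = rawurlencode($value);
    }
    ksort($encoded, SORT_STRING);

    $pairs = array();
    foreach ($encoded as $key => $value) {
        $pairs[] = $key . '=' . $value;
    }

    // Signature base string: METHOD & encoded URL & encoded parameter string.
    $baseString = strtoupper($method) . '&' . rawurlencode($url) . '&'
        . rawurlencode(implode('&', $pairs));
    $signingKey = rawurlencode($credentials['consumer_secret']) . '&'
        . rawurlencode($credentials['token_secret']);
    $oauth['oauth_signature'] = base64_encode(
        hash_hmac('sha1', $baseString, $signingKey, true)
    );

    $parts = array();
    foreach ($oauth as $key => $value) {
        $parts[] = rawurlencode($key) . '="' . rawurlencode($value) . '"';
    }
    return 'OAuth ' . implode(', ', $parts);
}
```

In a client like the one posted earlier in this thread, the returned string would take the place of the 'Basic ' . $authCredentials value on the Authorization line; the URL passed in must be the one actually requested, without a query string.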

Note that if you are using Phirehose to build a server-side
application that will open more than a very few User Streams
connections, you must be using Site Streams.

-John Kalucki
http://twitter.com/jkalucki
Twitter, Inc.

Fenn Bailey

Oct 1, 2010, 1:05:14 AM
to phireho...@googlegroups.com
Hey All,

Phirehose is definitely not (currently) intended to be used for User Streams. In fact, I believe the current version was released before User Streams beta was even announced :)

OAuth and User Streams are probably the two most requested features, and both are definitely something I'm planning on implementing. They do (as shown above) require some fairly significant changes to the client though.

Cheers,

  Fenn.

John Kalucki

Oct 1, 2010, 1:13:18 AM
to phireho...@googlegroups.com
I'm assuming the target audience of Phirehose is server applications.
I'd suggest going for Site Streams before User Streams if this is
generally true.

-John

Fenn Bailey

Oct 1, 2010, 1:17:50 AM
to phireho...@googlegroups.com
That makes more sense to me too, though quite a few people have already requested user streams.

Could anyone who is after this functionality give us some example use cases, so that John and I can review and evaluate them?

Thanks,

  Fenn.

Jonathon Hill

Oct 1, 2010, 1:59:28 AM
to phireho...@googlegroups.com
Absolutely. My app is a robot of sorts, that listens for commands in the
form of direct messages to the main app twitter account. Before user
streams, I had to continually poll for direct messages. Now, I can use a
single user streams connection to listen for those DMs.
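
A rough sketch of how such a listener might pick direct messages out of the stream, for example from inside enqueueStatus(). It assumes that a direct message arrives as a JSON object with a top-level "direct_message" key; that payload shape and the function name are assumptions and should be checked against the User Streams documentation.

```php
<?php
// Hypothetical helper: returns the decoded direct message, or null when the
// stream item is something else (a tweet, a friends list, a keep-alive, ...).
function extractDirectMessage(string $status): ?array {
    $event = json_decode($status, true);
    if (!is_array($event)
        || !isset($event['direct_message'])
        || !is_array($event['direct_message'])) {
        return null;
    }
    return $event['direct_message'];
}
```

Anything for which this returns null can simply be ignored by the robot.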

So obviously, that's only one connection and will not be a problem.

I have managed to get OAuth working with Phirehose and User Streams,
thanks to some borrowed code from Jaisen Mathai's twitter-async PHP
library (see the attached PHP file).

Fenn, the only problem left is that Phirehose isn't passing the entire
message to enqueueStatus(). Any ideas? I have not been able to detect any
pattern in where the message is cut off, except that longer tweets seem to
get cut off shorter.

Thanks,

Jonathon Hill



Intake2.php

Jonathon Hill

Oct 1, 2010, 5:21:12 PM
to phireho...@googlegroups.com
I found the problem, and it is due to an inconsistency between the Streaming API and the User Streams API.

Streaming API - each status is prefixed by a numeric message length, in decimal numbers.

User Streams API - each status is prefixed by a numeric message length, in HEXADECIMAL numbers.

I've attached a patch to Phirehose.php to allow fixing this by extension for those of us who are consuming a User Stream. Would love to see it included in your next release.


~ Jonathon

Phirehose.diff

John Kalucki

Oct 1, 2010, 5:46:26 PM
to phireho...@googlegroups.com
User Streams and Streaming use the same code.

Note that the stream uses HTTP chunked transfer encoding, so the
hexadecimal numbers you are seeing are chunk sizes. You can add the
optional delimited=length parameter to get each message's length in
decimal within the stream.
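
To illustrate the two layers described here, the sketch below first strips HTTP chunked framing (hexadecimal chunk sizes) and then splits the decoded bytes into messages using decimal length prefixes. Both function names are hypothetical. It works on a complete in-memory buffer for simplicity and assumes each delimited frame is a decimal length, a newline, then that many bytes; a real streaming client would have to do this incrementally.

```php
<?php
// Hypothetical helper: removes chunked transfer framing from a complete body.
function decodeChunked(string $raw): string {
    $body = '';
    $offset = 0;
    $total = strlen($raw);
    while ($offset < $total && ($eol = strpos($raw, "\r\n", $offset)) !== false) {
        // Chunk-size line: hex digits, optionally followed by ";extension".
        $sizeLine = substr($raw, $offset, $eol - $offset);
        $hex = trim(explode(';', $sizeLine, 2)[0]);
        if ($hex === '' || !ctype_xdigit($hex)) {
            throw new UnexpectedValueException('Bad chunk size: ' . $sizeLine);
        }
        $size = (int) hexdec($hex);
        if ($size === 0) {
            break; // a zero-length chunk ends the body
        }
        $body .= substr($raw, $eol + 2, $size);
        $offset = $eol + 2 + $size + 2; // skip chunk data and its trailing CRLF
    }
    return $body;
}

// Hypothetical helper: splits a decoded stream into length-delimited messages.
function splitDelimited(string $stream): array {
    $messages = array();
    $offset = 0;
    $total = strlen($stream);
    while ($offset < $total && ($eol = strpos($stream, "\n", $offset)) !== false) {
        $line = trim(substr($stream, $offset, $eol - $offset));
        $offset = $eol + 1;
        if ($line === '') {
            continue; // blank keep-alive line
        }
        if (!ctype_digit($line)) {
            throw new UnexpectedValueException('Bad length prefix: ' . $line);
        }
        $length = (int) $line;
        if ($offset + $length > $total) {
            break; // message not fully received yet
        }
        $messages[] = rtrim(substr($stream, $offset, $length));
        $offset += $length;
    }
    return $messages;
}
```

Because chunk boundaries need not line up with message boundaries, reading a chunk size as if it were a message length would cut messages off at seemingly arbitrary points.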

-John
