User Streams just went into production, and I'm trying to use a
modified version of Phirehose to consume them. The following is based
closely on a proof-of-concept script that worked at the Chirp
conference back in April... but now I can't seem to get anything but
this:
Phirehose: Connecting to twitter stream:
https://userstream.twitter.com/2/user.json
with params: array ( 'delimited' => 'length', 'track' =>
'#newtwitter',)
Phirehose: Resolved host
userstream.twitter.com to 128.242.245.39
Phirehose: Connecting to 128.242.245.39
Phirehose: TCP failure 1 of 20 connecting to stream: Operation timed
out (60). Sleeping for 1 seconds.
... and on and on. What am I missing or doing wrong?
Jonathon Hill
<?php
class Intake2 extends Phirehose {
const URL_BASE = '
https://userstream.twitter.com/2/';
const METHOD_USER = 'user';
private static $instance;
public $log_to = 'screen';
public static function Initialize() {
if(! self::$instance instanceof self) {
self::$instance = new self(
Config::get('twitter_stream_api_username'),
Config::get('twitter_stream_api_password'),
Intake2::METHOD_USER
);
}
}
public static function start() {
self::Initialize();
self::$instance->log_to = PATH_APP.'logs/intake2.log';
self::$instance->checkFilterPredicates();
self::$instance->consume();
}
public function enqueueStatus($status, $log =
'intake2_memory_usage.csv') {
# stuff the tweet into a database table
$sql = sprintf("INSERT INTO stream_queue SET contents = %s",
database()->escape($status));
database()->execute($sql);
# @DEBUG
error_log(format_date_mysql().','.(memory_get_usage()/1024)."\r\n",
3, PATH_APP.'logs/'.$log);
}
protected function checkFilterPredicates() {
$this->setTrack(array('#newtwitter'));
}
/**
* Connects to the stream URL using the configured method.
*/
protected function connect() {
// Init state
$connectFailures = 0;
$tcpRetry = self::TCP_BACKOFF / 2;
$httpRetry = self::HTTP_BACKOFF / 2;
// Keep trying until connected (or max connect failures exceeded)
do {
// Check filter predicates for every connect (for filter
method)
if ($this->method == self::METHOD_FILTER) {
$this->checkFilterPredicates();
}
// Construct URL/HTTP bits
$url = self::URL_BASE . $this->method . '.' . $this->format;
$urlParts = parse_url($url);
$authCredentials = base64_encode($this->username . ':' . $this-
>password);
// Setup params appropriately
$requestParams = array('delimited' => 'length');
// Filter takes additional parameters
if ($this->method == self::METHOD_USER && count($this-
>trackWords) > 0) {
$requestParams['track'] = implode(',', $this->trackWords);
}
if ($this->method == self::METHOD_USER && count($this-
>followIds) > 0) {
$requestParams['follow'] = implode(',', $this->followIds);
}
// Debugging is useful
$this->log('Connecting to twitter stream: ' . $url . ' with
params: ' . str_replace("\n", '',
var_export($requestParams, TRUE)));
/**
* Open socket connection to make POST request. It'd be nice to
use stream_context_create with the native
* HTTP transport but it hides/abstracts too many required bits
(like HTTP error responses).
*/
$errNo = $errStr = NULL;
$scheme = ($urlParts['scheme'] == 'https') ? 'ssl://' :
'tcp://';
/**
* We must perform manual host resolution here as Twitter's IP
regularly rotates (ie: DNS TTL of 60 seconds) and
* PHP appears to cache it the result if in a long running
process (as per Phirehose).
*/
$streamIPs = gethostbynamel($urlParts['host']);
if (empty($streamIPs)) {
throw new PhirehoseNetworkException("Unable to resolve
hostname: '" . $urlParts['host'] . '"');
}
// Choose one randomly (if more than one)
$this->log('Resolved host ' . $urlParts['host'] . ' to ' .
implode(', ', $streamIPs));
$streamIP = $streamIPs[rand(0, (count($streamIPs) - 1))];
$this->log('Connecting to ' . $streamIP);
@$this->conn = fsockopen($scheme . $streamIP, 80, $errNo,
$errStr, $this->connectTimeout);
// No go - handle errors/backoff
if (!$this->conn || !is_resource($this->conn)) {
$this->lastErrorMsg = $errStr;
$this->lastErrorNo = $errNo;
$connectFailures ++;
if ($connectFailures > $this->connectFailuresMax) {
$msg = 'TCP failure limit exceeded with ' .
$connectFailures . ' failures. Last error: ' . $errStr;
$this->log($msg);
throw new PhirehoseConnectLimitExceeded($msg,
$errNo); // Throw an exception for other code to handle
}
// Increase retry/backoff up to max
$tcpRetry = ($tcpRetry < self::TCP_BACKOFF_MAX) ? $tcpRetry *
2 : self::TCP_BACKOFF_MAX;
$this->log('TCP failure ' . $connectFailures . ' of ' . $this-
>connectFailuresMax . ' connecting to stream: ' .
$errStr . ' (' . $errNo . '). Sleeping for ' . $tcpRetry .
' seconds.');
sleep($tcpRetry);
continue;
}
// TCP connect OK, clear last error (if present)
$this->log('Connection established to ' . $streamIP);
$this->lastErrorMsg = NULL;
$this->lastErrorNo = NULL;
// If we have a socket connection, we can attempt a HTTP
request - Ensure blocking read for the moment
stream_set_blocking($this->conn, 1);
// Encode request data
$postData = http_build_query($requestParams);
// Do it
fwrite($this->conn, "POST " . $urlParts['path'] . " HTTP/1.0\r
\n");
fwrite($this->conn, "Host: " . $urlParts['host'] . "\r\n");
fwrite($this->conn, "Content-type: application/x-www-form-
urlencoded\r\n");
fwrite($this->conn, "Content-length: " . strlen($postData) . "\r
\n");
fwrite($this->conn, "Accept: */*\r\n");
fwrite($this->conn, 'Authorization: Basic ' .
$authCredentials . "\r\n");
fwrite($this->conn, 'User-Agent: ' . self::USER_AGENT . "\r
\n");
fwrite($this->conn, "\r\n");
fwrite($this->conn, $postData . "\r\n");
fwrite($this->conn, "\r\n");
// First line is response
list($httpVer, $httpCode, $httpMessage) = preg_split('/\s+/',
trim(fgets($this->conn, 1024)), 3);
// Response buffers
$respHeaders = $respBody = '';
// Consume each header response line until we get to body
while ($hLine = trim(fgets($this->conn, 4096))) {
$respHeaders .= $hLine;
}
// If we got a non-200 response, we need to backoff and retry
if ($httpCode != 200) {
$connectFailures ++;
// Twitter will disconnect on error, but we want to consume
the rest of the response body (which is useful)
while ($bLine = trim(fgets($this->conn, 4096))) {
$respBody .= $bLine;
}
// Construct error
$errStr = 'HTTP ERROR ' . $httpCode . ': ' . $httpMessage .
' (' . $respBody . ')';
// Set last error state
$this->lastErrorMsg = $errStr;
$this->lastErrorNo = $httpCode;
// Have we exceeded maximum failures?
if ($connectFailures > $this->connectFailuresMax) {
$msg = 'Connection failure limit exceeded with ' .
$connectFailures . ' failures. Last error: ' . $errStr;
$this->log($msg);
throw new PhirehoseConnectLimitExceeded($msg,
$httpCode); // We eventually throw an exception for other code to
handle
}
// Increase retry/backoff up to max
$httpRetry = ($httpRetry < self::HTTP_BACKOFF_MAX) ?
$httpRetry * 2 : self::HTTP_BACKOFF_MAX;
$this->log('HTTP failure ' . $connectFailures . ' of ' .
$this->connectFailuresMax . ' connecting to stream: ' .
$errStr . '. Sleeping for ' . $httpRetry . ' seconds.');
sleep($httpRetry);
continue;
} // End if not http 200
// Loop until connected OK
} while (!is_resource($this->conn) || $httpCode != 200);
// Connected OK, reset connect failures
$connectFailures = 0;
$this->lastErrorMsg = NULL;
$this->lastErrorNo = NULL;
// Switch to non-blocking to consume the stream (important)
stream_set_blocking($this->conn, 0);
// Connect always causes the filterChanged status to be cleared
$this->filterChanged = FALSE;
// Flush stream buffer & (re)assign fdrPool (for reconnect)
$this->fdrPool = array($this->conn);
$this->buff = '';
}
}