Ratchet with Sessions


jwhar...@gmail.com

Jul 28, 2013, 2:25:06 PM
to ratch...@googlegroups.com
Hi everyone,

I am playing around with a todo application I found online that uses Ratchet, and I am trying to learn how to set up Sessions. The goal: when one user connects to, say, demo8.todo.app/demo8/index.php and another connects to demo5.todo.app/demo5/index.php (both are me in different browsers, for my learning), they use the same websocket server on the backend but only see what someone on the same domain posts or edits.

Right now the demo is set up so that if I pass a variable to index.php, it pulls only that todo list and displays it to the visitor, but if someone on either site adds a new entry, both domains show it. I believe the SessionProvider will do what I am trying to do, but I am having a hard time wrapping my brain around how to set it up and make it work. I am not a programmer; I just want to learn how to do it and help a buddy set up a websocket system for his site if I can.
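
For illustration only, the per-domain isolation described above can be pictured as a naming problem: if every topic is prefixed with the host the visitor came from, subscribers on demo5 and demo8 never share a topic, even on one websocket server. The sketch below is plain PHP and does not touch Ratchet; `scopedTopic` is a hypothetical helper name, and the `host/type` format is an assumption, not something the demo application defines.

```php
<?php
// Hypothetical helper: build a topic name that is unique per host.
// Assumption: topics are plain strings such as "update" or "lock".
function scopedTopic(string $host, string $type): string
{
    // Normalise the host so "Demo8.todo.app" and "demo8.todo.app" match.
    $host = strtolower(trim($host));

    return $host . '/' . $type;
}

// Two sites publishing the same event type end up on different topics.
echo scopedTopic('demo8.todo.app', 'update'), PHP_EOL;
echo scopedTopic('demo5.todo.app', 'update'), PHP_EOL;
```

With a scheme like this, the browser would subscribe to the scoped name and the server would broadcast to that same name, so the host could come from the session or from the page itself.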

If someone has a working demo that uses the Ratchet/WampServer, ZMQ, plus SessionProvider they are willing to share, I would greatly appreciate it. 

Thanks in advance.

Joe

elkorchi anass

Jul 30, 2013, 7:16:43 PM
to ratch...@googlegroups.com
Here is server code that uses a session handler; you just need to set up your session handler:
$app = new AggregateApplication();
$loop = \React\EventLoop\Factory::create();
$context = new \React\ZMQ\Context($loop);
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($app, 'EditMessage'));
$webSock = new \React\Socket\Server($loop);
$webSock->listen(8080, '127.0.0.1');
$handler = $this->getContainer()->get('session.handler');
$server = new \Ratchet\Wamp\WampServer($app);
$server = new SessionProvider($server, $handler);
$webServer = new \Ratchet\Server\IoServer(new \Ratchet\WebSocket\WsServer($server), $webSock);
$loop->run();

Joseph Hardeman

Jul 30, 2013, 10:21:42 PM
to ratch...@googlegroups.com
Hey elkorchi

I modified the server.php to what you put below and I got the following error:

"PHP Fatal error:  Using $this when not in object context in "

Once I have a few more minutes, I will post the entire code I am playing with so that you guys can spot my errors, which are probably many, and let me know what I need to change. LOL

Joe

Joseph Hardeman

Jul 31, 2013, 11:34:50 AM
to ratch...@googlegroups.com
Hi everyone,

Below is the code that I am playing with. I can get the original to run without Sessions, but I am trying to understand how to get the server to start up and run with Sessions. I get different errors depending on what I change, so I am hoping someone can help me out.

Here is my todo-server.php file:

<?php

use Ratchet\Session\SessionProvider;
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use Ratchet\Wamp\WampServer;
use MyApp\Demo;
use Symfony\Component\HttpFoundation\Session\Session;
use Symfony\Component\HttpFoundation\Session\Storage\Handler;
use Symfony\Component\HttpFoundation\Session\Storage\NativeSessionStorage;
use Symfony\Component\HttpFoundation\Session\Storage\Handler\MemcacheSessionHandler;

//require __DIR__ . '/../vendor/autoload.php';
require dirname(__DIR__) . '/vendor/autoload.php';

//Session management

$todo = new MyApp\Demo();

$memcache = new Memcache;
$memcache->connect('localhost', 11211);

$session = new SessionProvider(
//        new MyApp\Demo()
    $todo,
    new Handler\MemcacheSessionHandler($memcache)
);

// Make sure to run as root
// $server = IoServer::factory(new WsServer($session));
// $server->run();

//$todo = new MyApp\Demo();
//$todo = new \App\Todo\Demo();

// bind a ZeroMQ PULL listener
//$app=new AggregateApplication();
// initiate the loop listener

$loop = \React\EventLoop\Factory::create();
$context = new \React\ZMQ\Context($loop);
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');

$pull->on('message', array($todo, 'onMessage'));
//$pull->on('message', array($session, 'onMessage'));

// Set up our WebSocket server for clients wanting real-time updates
$websock = new \React\Socket\Server($loop);
$websock->listen(8091, '0.0.0.0');

$handler = new $handler($this->getContainer()->get('session.handler'));
$server = new \Ratchet\Wamp\WampServer($todo);

$server = new SessionProvider($server, $handler);
$webServer = new \Ratchet\Server\IoServer(new \Ratchet\WebSocket\WsServer($server),$webSock);
$loop->run();
/**
$webserver = new \Ratchet\Server\IoServer(
    new \Ratchet\WebSocket\WsServer(
//      new \Symfony\Component\HttpFoundation\Session\Session(
      new \Ratchet\Session\SessionProvider(
        new \Ratchet\Wamp\WampServer(
            //$todo
                $session
        )
      )
    ),
    $websock
);

$loop->run();
**/



As you can see I have modified and remodified it, but it still will not start and run when I try to use the Session function.

Here is my Demo.php script:
<?php
namespace MyApp;

use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;
use Ratchet\MessageComponentInterface;
use Ratchet\Session\Serialize\HandlerInterface;

/**
 * This example application is based on the Ratchet Push tutorial found at:
 * http://socketo.me/docs/push
 */
class Demo implements WampServerInterface, MessageComponentInterface {

    /**
     * A lookup of all the topics clients have subscribed to. A topic is a common
     * word for PubSub namespaces.
     *
     * @var array
     */
    protected $_subscribedTopics = array();

    /**
     * We also keep track of the count of subscribed topics for purposes of
     * garbage collection.
     *
     * @var array
     */
    protected $_subscribedCount = array();

    /**
     * An array of locked todos due to editing.
     *
     * @var array
     */
    protected $_locked = array();

    /**
     * Whether one of the users is repositioning the sortable.
     *
     * @var array
     */
    protected $_repositioning = FALSE;

   
    /**
     * This method gets called when a client subscribes to a particular PubSub
     * category/topic.
     *
     * @access  public
     * @param   ConnectionInterface     $conn
     * @param   object                  $topic
     * @return  void
     */
    public function onSubscribe(ConnectionInterface $conn, $topic)
    {
        // ensure we add the topic to the list of subscribed topics
        if (!isset($this->_subscribedTopics[$topic->getId()])) {
            error_log('User ' . $conn->WAMP->sessionId . ' subscribed to topic ' . $topic->getId());
            $this->_subscribedTopics[$topic->getId()] = $topic;
            $this->_subscribedCount[$topic->getId()] = 0;
        }

        // increment subscribed count
        $this->_subscribedCount[$topic->getId()]++;
    }

   
    /**
     * This method gets called when a client unsubscribes from a particular PubSub
     * category/topic.
     *
     * @access  public
     * @param   ConnectionInterface     $conn
     * @param   object                  $topic
     * @return  void
     */
    public function onUnSubscribe(ConnectionInterface $conn, $topic)
    {
        // ensure we add the topic to the list of subscribed topics
        if (!isset($this->_subscribedTopics[$topic->getId()])) {
            return;
        }

        error_log('User ' . $conn->WAMP->sessionId . ' unsubscribed from topic ' . $topic->getId());

        // decrement the topic count
        $this->_subscribedCount[$topic->getId()]--;

        // if the count is 0, we can remove the topic (garbage collection)
        if ($this->_subscribedCount[$topic->getId()] <= 0) {
            unset(
                $this->_subscribedTopics[$topic->getId()],
                $this->_subscribedCount[$topic->getId()]
            );
        }
    }

   
    /**
     * Triggered on initial opening of connection with a given browser client
     * (end user).
     *
     * @access  public
     * @param   ConnectionInterface $clientConn
     * @return  mixed
     */
    public function onOpen(ConnectionInterface $conn)
    {
        error_log('onOpen event triggered for user ' . $conn->WAMP->sessionId);

        try {

/**            // add to list of connected users
            $this->_connections[$conn->WAMP->sessionId] = TRUE;

                //$conn->send('Hello ' . $conn->Session->get('name'));

            // broadcast connected message to others
            $this->broadcastMessage(
                'connected',
                json_encode(array('id' => $conn->WAMP->sessionId))
            );
**/

            $user = $conn->Session->get('name');
            $conn->send('Hello ' . $user['displayName']);
            print_r($conn->Session->all());

        } catch (Exception $e) {
            error_log($e->getMessage());
        }
    }



   
    /**
     * Triggered on closing of connection with a given browser client (end user).
     * Simply detaches (removes the client connection from the list of active
     * clients).
     *
     * @access  public
     * @param   ConnectionInterface $clientConn
     * @return  mixed
     */
    public function onClose(ConnectionInterface $conn)
    {
        error_log('onClose event triggered for user ' . $conn->WAMP->sessionId);

        try {

            $user_id = $conn->WAMP->sessionId;

            // check if the user held any locks
            foreach ($this->_locked as $id => $user_lock_id) {
                if ($user_id == $user_lock_id) {
                    // remove lock
                    unset($this->_locked[$id]);

                    // inform all users of unlock
                    $this->broadcastMessage(
                        'unlock',
                        json_encode(array('user_id' => $user_id))
                    );
                }
            }

            if ($this->_repositioning == $user_id) {
                $this->_repositioning = false;

                // inform all users of unlock
                $this->broadcastMessage(
                    'finish-reposition',
                    json_encode(array('user_id' => $user_id))
                );
            }

            $this->broadcastMessage(
                'disconnected',
                json_encode(array('id' => $conn->WAMP->sessionId))
            );

        } catch (Exception $e) {
            error_log($e->getMessage());
        }
    }

   
    /**
     *
     */
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params)
    {
        // In this application if clients send data it's because the user hacked around in console
        $conn->callError($id, $topic, 'You are not allowed to make calls')->close();
    }

   
    /**
     * Handler for events triggered by Autobahn's publish method.
     *
     * @access  public
     * @param   ConnectionInterface $conn
     * @param   string              $topic
     * @param   array               $event      The event data
     */
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible)
    {
        try {

            // get the publisher id
            $return = false;
            $user_id = $conn->WAMP->sessionId;
            $type = $topic->getId();

            error_log('onPublish event triggered: ' . $type);

            // determine how to handle publish event
            if ($type == 'lock') {
                // check if the field is locked
                if (!isset($this->_locked[$event['id']])) {
                    // set as locked and notify parties
                    $this->_locked[$event['id']] = $event['user_id'];
                    $return = true;
                }
            } else if ($type == 'unlock') {
                if (isset($this->_locked[$event['id']])) {
                    unset($this->_locked[$event['id']]);
                    $return = true;
                }
            } else if ($type == 'reposition') {
                if (!$this->_repositioning) {
                    $this->_repositioning = $event['user_id'];
                    $return = true;
                }
            } else if ($type == 'finish-reposition') {
                if ($this->_repositioning) {
                    $this->_repositioning = false;
                    $return = true;
                }
            } else if ($type == 'sort') {
                // send the new sort data to other clients
                $return = true;
            } else if ($type == 'update') {
                return $true;
            }

            // broadcast the data out to subscribers
            if ($return === true) {
                $this->broadcastMessage($type, json_encode($event));
            }

        } catch (Exception $e) {
            error_log($e->getMessage());
        }
    }

   
    /**
     * Triggered on receiving an error from a given browser client (end user).
     * Simply close the existing socket connection.
     *
     * @access  public
     * @param   ConnectionInterface $conn
     * @return  mixed
     */
    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        error_log('An error occurred, closing connection.');
        error_log($e->getMessage());

        $conn->close();
    }

   
    /**
     * Handles incoming "message" data from ZeroMQ. In this case, we're assuming
     * incoming data is JSON encoded. We take the incoming data, check it for
     * a particular message type, and broadcast the message out to all clients
     * subscribed to the particular pubsub category.
     *
     * @param   mixed   $entry
     * @return  void
     */
    public function onMessage(ConnectionInterface $from, $entry)
    {
        try {

            error_log('Message received.');

            // decode the message
            $message = json_decode($entry, true);

            // wrapper for broadcasting topic by type
            $this->broadcastMessage($message['type'], $entry);

        } catch (Exception $e) {
            error_log($e->getMessage());
        }
    }

   
    /**
     * Handles broadcasting the message out to all topic subscribers.
     *
     * @access  public
     * @param   string  $type
     * @param   mixed   $entry
     */
    public function broadcastMessage($type, $entry)
    {
        try {

            // If the lookup topic object isn't set there is no one to publish to
            if (!isset($this->_subscribedTopics[$type])) {
                return false;
            }

            // grab the topic for broadcast
            $topic = $this->_subscribedTopics[$type];

            // re-send the serialized JSON to all the clients subscribed to that category
            $topic->broadcast($entry);

            return true;

        } catch (Exception $e) {
            error_log($e->getMessage());
        }

        return false;
    }

}


And last here is the Session info I put into my index file, which does show my domain name.

// set up memcache session
use Ratchet\Session\SessionProvider;
use Symfony\Component\HttpFoundation\Session\Session;
use Symfony\Component\HttpFoundation\Session\Storage\Handler;
use Symfony\Component\HttpFoundation\Session\Storage\NativeSessionStorage;
use Symfony\Component\HttpFoundation\Session\Storage\Handler\MemcacheSessionHandler;

require '/opt/php-websockets-servers/servertest/vendor/autoload.php';

$memcache = new Memcache;
$memcache->connect('localhost', 11211);
$storage = new NativeSessionStorage(array(), new MemcacheSessionHandler($memcache));

$session = new Session();

$session->start();
$session->set('name', $_SERVER['SERVER_NAME']);
print_r($session->all());





I grabbed a demo online that I am using to play with. It works with a single domain and page, but I would like to figure out how to use it with multiple domain names. I have a buddy who has asked if I could help figure this out, and even though I am not a programmer, I am making a little headway; I am just stuck on this last part, at least for now. LOL

Thanks, everyone, any help would be much appreciated.

Joe

elkorchi anass

Jul 31, 2013, 7:28:25 PM
to ratch...@googlegroups.com
I see you are working with the Symfony framework, so here is what you should do. I'll give a small tutorial:

1- Put your server code in a ContainerAwareCommand; in other words, create your own console command to run the server script. Here is an example:

<?php
namespace App\YoureBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use \Ratchet\Session\SessionProvider;

class ListenCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this->setName('websocket:listen')
            ->setDescription('Listen for websocket requests (blocks indefinitely)')
            ->addOption('port', 'p', InputOption::VALUE_REQUIRED, 'The port to listen on', 8000)
            ->addOption('interface', 'i', InputOption::VALUE_REQUIRED, 'The interface to listen on', '0.0.0.0');
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // Application class, expected in the same namespace as this command
        $app = new AggregateApplication();
        $loop = \React\EventLoop\Factory::create();

        // ZeroMQ PULL socket for incoming messages
        $context = new \React\ZMQ\Context($loop);
        $pull = $context->getSocket(\ZMQ::SOCKET_PULL);
        $pull->bind('tcp://127.0.0.1:5555');
        $pull->on('message', array($app, 'EditMessage'));

        // WebSocket listener
        $webSock = new \React\Socket\Server($loop);
        $webSock->listen(8080, '127.0.0.1');

        // Session handler service taken from the Symfony container
        $handler = $this->getContainer()->get('session.handler');

        // Wrap the WAMP server in the SessionProvider
        $server = new \Ratchet\Wamp\WampServer($app);
        $server = new SessionProvider($server, $handler);
        $webServer = new \Ratchet\Server\IoServer(new \Ratchet\WebSocket\WsServer($server), $webSock);
        $loop->run();
    }
}
?>
2- You need to create an app class (AggregateApplication in the example above), and it should be in the same folder as your command.
3- You need to configure the session handler in /app/config/config.yml. I am using PdoSessionHandler; here is an example:
framework:
    #esi:             ~
    #translator:      { fallback: %locale% }
    secret:          %secret%
    router:
        resource: "%kernel.root_dir%/config/routing.yml"
        strict_requirements: %kernel.debug%
    form:            ~
    csrf_protection: ~
    validation:      { enable_annotations: true }
    templating:
        engines: ['twig']
        #assets_version: SomeVersionScheme
    default_locale:  "%locale%"
    trusted_proxies: ~
    session:      
            handler_id:     session.handler.pdo
    fragments:       ~
parameters:
    pdo.db_options:
        db_table:    session
        db_id_col:   session_id
        db_data_col: session_value
        db_time_col: session_time
services:
    pdo:
        class: PDO
        arguments:
            dsn:      "mysql:dbname=DBNAME"
            user:     root
            password: ~
        calls:
            - [setAttribute, [3, 2]] # \PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION

    session.handler.pdo:
        class:     Symfony\Component\HttpFoundation\Session\Storage\Handler\PdoSessionHandler
        arguments: ["@pdo", "%pdo.db_options%"]
4- You need to create a table called "session" in your database containing the columns session_id (varchar(255)), session_value (text), and session_time (int(11)).
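
As a rough sketch of step 4, the table could be created from PHP with a statement like the one below. The column names and types come from the step itself; the function name `sessionTableDdl`, the MySQL-style syntax, the NOT NULL constraints, and the primary key on session_id are assumptions added for illustration.

```php
<?php
// Hypothetical helper: returns MySQL-style DDL for the session table from step 4.
// Assumption: session_id is the primary key and no column may be NULL.
function sessionTableDdl(string $table = 'session'): string
{
    return "CREATE TABLE `$table` (\n"
         . "    session_id    VARCHAR(255) NOT NULL PRIMARY KEY,\n"
         . "    session_value TEXT NOT NULL,\n"
         . "    session_time  INT(11) NOT NULL\n"
         . ")";
}

// Print the statement so it can be reviewed or pasted into a MySQL client.
echo sessionTableDdl(), PHP_EOL;

// To run it directly (assumes the DSN and credentials from the "pdo" service):
// $pdo = new PDO('mysql:dbname=DBNAME', 'root', null);
// $pdo->exec(sessionTableDdl());
```

The table and column names have to match the pdo.db_options parameters in config.yml, since the handler is given those names.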

On Sunday, July 28, 2013 at 19:25:06 UTC+1, Joseph Hardeman wrote:

Joseph Hardeman

Jul 31, 2013, 9:11:16 PM
to ratch...@googlegroups.com
Hey elkorchi

I was just trying different things I saw online. I am not stuck on Symfony2 or anything; I was just trying to get the sessions to work. LOL

I guess if that works best I will use it. If I had a working demo to look at so I could follow the code, I could definitely figure it out.

I am trying to work through what you sent over and see if I can get it working.

thanks

Joe

Seshachalam Malisetti

Oct 15, 2013, 1:13:58 AM
to ratch...@googlegroups.com
Your session handler service name is session.handler.pdo, so why are you using session.handler in the ListenCommand class? Shouldn't both of these refer to the same service?