Java sample for Darwin Push Port

693 views
Skip to first unread message

Joel Haasnoot

unread,
Apr 12, 2015, 6:50:29 AM4/12/15
to openrail...@googlegroups.com
Hi all,

I've been playing with the Darwin Push Port and got things working with Python, but wanted tot move over to Java (has several advantages). 

I've found two main options for Java clients - Gozirra and Stampy. I've tried both but have gotten stuck on both. Stampy does nothing - don't ever get notified, while with Gozirra all messages come in as errors due to an "invalid command". For those interested, my basic client code is here: https://gist.github.com/joelhaasnoot/1c2df99e09b2b5dd85ff

Does anyone either have pointers, a working sample (with Stomp, Onewire, whatever it may be) or a success story?

Thanks,


Joel Haasnoot

Peter Mount

unread,
Apr 12, 2015, 7:08:20 AM4/12/15
to Joel Haasnoot, openraildata-talk

I'm running with java 8 & open wire.

https://github.com/peter-mount/opendata

Look under applications for both nr & push port clients & brokers for the open wire & rabbit code.

Peter

--
You received this message because you are subscribed to the Google Groups "A gathering place for the Open Rail Data community" group.
To unsubscribe from this group and stop receiving emails from it, send an email to openraildata-t...@googlegroups.com.
To post to this group, send email to openrail...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Mike Flynn

unread,
Apr 12, 2015, 7:13:40 AM4/12/15
to openrail...@googlegroups.com
I attached my Gozzira code but please be aware, I tested this a year or two back and worked then but I amn't able to test it right now as I don't currently have Java installed on anything.  And that all takes a bit doing as I remember.  As I say though I remember this to work okay then and should do so now so long as you have the Gozzira implementation installed correctly and with right classpaths and setup, etc.
run1.java

Joel Haasnoot

unread,
Apr 12, 2015, 8:33:21 AM4/12/15
to Peter Mount, openraildata-talk
Hi Peter,
Thanks for that - was able to distill a working sample from that. For those interested in the (almost) bare minimum, but not quite production proof, see this gist: https://gist.github.com/joelhaasnoot/54e2a65528aa9940dae8 . This will print every 1000th message to the console.

Joel Haasnoot

----------------------------------------------------------
Joel Haasnoot

Mike Flynn

unread,
Jun 15, 2015, 5:41:40 AM6/15/15
to openrail...@googlegroups.com
Here is ActiveMQ java client example to connect to either NR or NRE;

- - - - - - 

package example;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;
import java.io.*;
import java.util.zip.GZIPInputStream;

class Listener {

    public static void main(String []args) throws JMSException {

        System.out.println("Running: " + args[0] + "\n");

        String user = "";
        String password = "";
        String host = "";
        int port = -1;
        String destination = "";
        String queue_name = "";

        if ( args[0].equals( "nr" ) ) {
            user = " MY USERNAME ";
            password = " MY PASSWORD ";
            host = "datafeeds.networkrail.co.uk";
            port = Integer.parseInt( "61619" );
            destination = "TRAIN_MVT_ALL_TOC";
        } else if ( args[0].equals( "nre" ) ) {
            user = "d3user";
            password = "d3password";
            host = "datafeeds.nationalrail.co.uk";
            port = Integer.parseInt( "61616" );
            queue_name = " MY TOKEN ";
        }

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
        Connection connection = factory.createConnection(user, password);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageConsumer consumer;
        if ( args[0].equals( "nr" ) ) {
            Destination dest = new ActiveMQTopic(destination);
            consumer = session.createConsumer(dest);
        } else {
            Queue queue = session.createQueue(queue_name);
            consumer = session.createConsumer(queue);
        }

        long start = System.currentTimeMillis();
        long count = 0;
        System.out.println("Waiting for messages...");

        while(true) {
            count++;

            Message msg = consumer.receive();

            if (args[0].equals("nr")) {

                if (msg instanceof TextMessage) {
                    String body = ((TextMessage) msg).getText();
                    if ("SHUTDOWN".equals(body)) {
                        long diff = System.currentTimeMillis() - start;
                        System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
                        break;
                    } else {
                       //if (count % 10 == 0) {
                            System.out.println("Message body: " + body);
                        //  System.out.println(String.format("Received %d messages.", count));
                        //}
                   }

                } else {
                    System.out.println("Unexpected message type: "+msg.getClass());
                }

            } else if (args[0].equals("nre")) {

                if (count % 100 == 0) {
                    System.out.println("msg: " + convertToXmlString((BytesMessage) msg));
                    System.out.println(String.format("Received %d messages.", count));
                }

            }

        }

        connection.close();

    }

Peter Hicks

unread,
Jun 15, 2015, 5:48:03 AM6/15/15
to Mike Flynn, openrail...@googlegroups.com

Hi Mike

Does this implement the best practice advice as shown on the wiki?  I can't tell if it plays nice on connection failures or if it can't subscribe to the topic.

Peter


Mike Flynn

unread,
Jun 15, 2015, 6:23:02 AM6/15/15
to openrail...@googlegroups.com
First 3 good practice bullet points covered but not yet

- exponential backoff 
- durable subscription
- implement stomp heartbeats

I'll want a durable subscription.  I see there's also details on wiki re durable subscription.  

I should be able to add an exponential backoff routing to my code if you think it's necessary.  Any coding examples available.

The reason I'm going with ActiveMQ was talk from the forum about connection issues and ability to deal with using stomp heartbeats.  So I intend to apply also.

Mike Flynn

unread,
Jun 15, 2015, 8:00:38 AM6/15/15
to openrail...@googlegroups.com
Here is code snippet for durable subscription to NR feed, but..  

. . . 
. . . 

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + host + ":" + port);
factory.setClientID( " MY USERNAME " );
Connection connection = factory.createConnection(user, password);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageConsumer consumer;
TopicSubscriber ts;

if ( args[0].equals( "nr" ) ) {

    //Destination dest = new ActiveMQTopic(destination);
    Topic dest = new ActiveMQTopic(destination);
    
    //consumer = session.createConsumer(dest);
    consumer = session.createDurableSubscriber(dest, "DEV:" + destination  );

} else {

    Queue queue = session.createQueue(queue_name);
    consumer = session.createConsumer(queue);

}

' ' '
' ' '


..I can't see how to create durable when for a Queue.  There's an ActiveMQConnection.createDurableConnectionConsumer method but this also takes a Topic as an argument.  Has anyone else a code example to create a durable subscription for the Darwin push port?

Peter Hicks

unread,
Jun 15, 2015, 8:05:07 AM6/15/15
to Mike Flynn, openrail...@googlegroups.com
Hi Mike

On Mon, 15 Jun 2015 at 13:00 Mike Flynn <mi...@a1publishing.com> wrote:

Here is code snippet for durable subscription to NR feed, but..  

..I can't see how to create durable when for a Queue.  There's an ActiveMQConnection.createDurableConnectionConsumer method but this also takes a Topic as an argument.  Has anyone else a code example to create a durable subscription for the Darwin push port?

Queues aren't topics, therefore don't need a durable subscription.

I'm finding it hard to follow your code, for example:

   port = Integer.parseInt( "61619" );

Is there a better way you could write that?  And are you able to split the code up so that it's easier to understand?


Peter

Mike Flynn

unread,
Jun 15, 2015, 9:58:40 AM6/15/15
to openrail...@googlegroups.com
>> Queues aren't topics, therefore don't need a durable subscription.

Oh, okay, now I know!  Cheers : ) But like it says in the wiki, what if the line goes down, etc.?  


>>  port = Integer.parseInt( "61619" );

The code's just a work in progress, Peter, I've lifted from Joel's code (above, BTW, thanks Joel :), and from the Listener.java prog from the ActiveMQ "examples\openwire\java\src\main\java\example" folder;  

..
int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));
'' 
''
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if( rc== null )
   return defaultValue;
return rc;
}
''
''

Since I'm not using environment variables for the testing here I stripped it down to avoid the compile warning.  But you're quite right, "port = 61619", is better LOL.

I posted it just as an example for anyone else pulling their hair out trying to work out if is topic or queue to go with and which port to use, etc.  As you know, there actually a bucketload of combinations depending on which language / methodology / feed you're using for access, maybe why it's not all on the wiki, and there was nothing explicit for NR and NRE using Java ActiveMQ, except on this thread.  But, yes, add a caveat to my example as per the parseint above : )

Mike Flynn

unread,
Jun 15, 2015, 10:38:30 AM6/15/15
to openrail...@googlegroups.com
>> >> Queues aren't topics, therefore don't need a durable subscription.

>> Oh, okay, now I know!  Cheers : ) But like it says in the wiki, what if the line goes down, etc.?

To answer my own question, but I am a learner here, am I right in thinking that if, say, my computer were to crash, or whatever, the queue would resume wherever it left off?  And if so what timescale before messages were lost.  I saw 5 minutes mentioned somewhere.  And, again, if so, is this configurable?


Peter Hicks

unread,
Jun 15, 2015, 10:49:56 AM6/15/15
to Mike Flynn, openrail...@googlegroups.com
Hi Mike

The TTL of messages in the Push Port queues is five minutes, and it's not configurable at the user end.

Incidentally, http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html explains the difference between queues and topics.

In the politest way possible, can I suggest that you build up some more experience with message queues before sending around sample code?  If you're not sure why something like port = Integer.parseInt("61619"); isn't good practice, you're creating more code that's difficult to understand, which makes it more difficult for others to work.

I am working on a nice, clean client based on Apache Camel which processes messages through a separate class and should be much easier than shoving a bunch of code inside a single class.  It'll be ready in a few days, I estimate - lots going on at this end.


Peter

Mike Flynn

unread,
Jun 15, 2015, 10:53:17 AM6/15/15
to openrail...@googlegroups.com
Hi Peter,

Just trying to be helpful ;->

Yes, I'm on the page Topics vs Queues.  My thinking is that a backup(s) would be needed for failover when dealing with Queues.  

I'll be watching out for your code.

Cheers
Mike

Qi Ou

unread,
Sep 16, 2015, 4:15:09 PM9/16/15
to A gathering place for the Open Rail Data community
Hi Mike,

I am quite new to activemq. I have managed to run the example you provided. It prints out below messages

msg: <?xml version="1.0" encoding="UTF-8"?><Pport xmlns="http://www.thalesgroup.com/rtti/PushPort/v12" ts="2015-09-15T17:02:00.7613345+01:00" version="12.0"><uR updateOrigin="Darwin"><deactivated rid="201509150795393"/></uR></Pport>
Received 300 messages.

But I struggled to get the actual data (schedule, time table etc) from Darwin push-port. Do you have some examples or can you point me in the right direction?

Thanks,
Qi

Peter Hicks

unread,
Sep 16, 2015, 4:28:17 PM9/16/15
to Qi Ou, A gathering place for the Open Rail Data community
Hi Qi

The timetable, i.e. the set of schedules which Darwin understands are valid on a particular day, is described at http://nrodwiki.rockshore.net/index.php/Darwin:Push_Port#Timetable - you need tp FTP in to datafeeds.networkrail.co.uk with the username and password displayed on the 'My Feeds' page.


Peter

--
You received this message because you are subscribed to the Google Groups "A gathering place for the Open Rail Data community" group.
To unsubscribe from this group and stop receiving emails from it, send an email to openraildata-t...@googlegroups.com.
To post to this group, send an email to openrail...@googlegroups.com.

George Roberts

unread,
Apr 8, 2016, 6:44:54 AM4/8/16
to A gathering place for the Open Rail Data community
Hi Mike, I know this was a while ago and maybe you can't remember but this code seems to work well apart from this line 

                System.out.println("msg: " + convertToXMLString((BytesMessage) msg));


Eclipse wants me to create a new method for convertToXMLString function which i'm assuming should actually be coming from one of the imported libraries?

Can you remember where this comes from?

Thansk,

George

Mike Flynn

unread,
Apr 11, 2016, 5:37:43 AM4/11/16
to A gathering place for the Open Rail Data community
 >> this code seems to work well apart from this line 

>>             System.out.println("msg: " + convertToXMLString((BytesMessage) msg));

>>Can you remember where this comes from?

Thansk,

George

- show quoted 

Mike Flynn

unread,
Apr 11, 2016, 5:41:03 AM4/11/16
to A gathering place for the Open Rail Data community

** oops! pls excuse premature post above **
 


Hi George,

>> Can you remember where this comes from?

Here is  convertToXmlString code;

    private static String convertToXmlString(BytesMessage t) {

        if (t != null) {
            try {
                long l = t.getBodyLength();
                byte b[] = new byte[(int) l];
                t.readBytes(b);
                Reader r = null;
                try {
                    r = new InputStreamReader(new GZIPInputStream(new ByteArrayInputStream(b)));
                    StringBuilder sb = new StringBuilder();
                    char cb[] = new char[1024];
                    int s = r.read(cb);
                    while (s > -1) {
                        sb.append(cb, 0, s);
                        s = r.read(cb);
                    }
                    return sb.toString();
                } finally {
                    if (r != null) {
                        r.close();
                    }
                }

            } catch (IOException ex) {
                System.out.println( "Failed to parse message: IOException" );
            } catch (JMSException ex) {
                System.out.println( "Failed to parse message: JMSException" );
            }
        }

        return null;
    }

I vaguely remember picking the code up from somewhere in this forum.   I remember the code does work though I'm currently unable to test it.  Hope this helps.

MIke

George Roberts

unread,
Apr 11, 2016, 1:38:40 PM4/11/16
to A gathering place for the Open Rail Data community
Awesome, that seems to work perfectly. Just to clarify, as far as I understand, I can distill train delay information from this queue. Is that right?
Thanks again,
George

Peter Mount

unread,
Apr 11, 2016, 1:49:23 PM4/11/16
to George Roberts, openraildata-talk

Yes,  I do just that.

There's a few fields involved but in simplistic terms:

Delay Arrival = (forecast eta | ata ) - schedule pta

Where eta or ata could be set.  Just be aware not everywhere reports an ata/atd & some well have one not the other - Otford coast bound is a good one,  you'll see an ata but no atd.

For example the train I'm currently on right now: http://uktra.in/rtt/train/201604110876696

--
You received this message because you are subscribed to the Google Groups "A gathering place for the Open Rail Data community" group.
To unsubscribe from this group and stop receiving emails from it, send an email to openraildata-t...@googlegroups.com.
To post to this group, send email to openrail...@googlegroups.com.

University

unread,
Apr 11, 2016, 1:52:12 PM4/11/16
to Peter Mount, openraildata-talk
OK that sounds good. I was interested in also being able to reference this table, and assumed that these codes, if necessary, would come through in this feed. http://wiki.openraildata.com/index.php/Darwin:Late_Running_reason_codes_and_text Is that correct?
Thanks

Peter Mount

unread,
Apr 11, 2016, 1:57:28 PM4/11/16
to George Roberts, openraildata-talk

Yes they do.

From memory (no room to get the laptop out to check) canceled reason is in the schedule whilst the delay code is in the forecast - I think.

Reply all
Reply to author
Forward
0 new messages