TD data using STOMP in Python fails after a few hours

75 views
Skip to first unread message

A gathering place for the Open Rail Data community

unread,
Sep 19, 2023, 7:42:32 AMSep 19
to A gathering place for the Open Rail Data community
Hi all, 

looking for a bit of support on this one, my stomp client is failing after a few hours of recieving data from the TD datafeed, reading other posts on here i have tried the heartbeat settings which i now have at 55000,55000 but no matter how high i put the hearbeat i just get higher and higher timeouts, started at 10, then 20, then 40,  50 ... and i just get increasingly higher heartbeat errors, most if have it run for is 7-8 hours, but usually its a few hours and it fails this is running as a service under systemd

someone mentioned about being behind a NAT being a potential issue, im on Virginmedia so natted ... i tried running the code on a machine that has a VPN out but same issues after a few hours ... any help here would be appreciated

Sep 19 11:03:51 SYSTEMNAME python3[3321]: heartbeat timeout: diff_receive=82.50032559712417, time=2072636.606227724, lastrec=2072554.105902127

i find it hard to believe that the system has gone 82 seconds without recieving anything ... 

my code is a bit messy, see below, its a specic use case, i just want to monitor a specifi TD function on YO TD address 94 ... but before i clean up the code i want to figure out why its failing after a few hours

#!/bin/python3

import logging
import datetime
import stomp
import json
import time

def getbit(x,n):
x=int(x,16)
return x & (1<< n) and 1 or 0

def tracklogs(logdata):
f=open("tracklog.txt","a")
f.write("%s\n" %logdata)
f.close()

NETWORK_RAIL_AUTH = ('myu...@email.com', 'mySecretPassword')

class MyListener(stomp.ConnectionListener):
def on_connected(self, headers):
tracklogs("connected")

def on_error(self, headers):
err=headers
f=open("tracklog.txt", "a")
f.write ('error %s\n' %headers)
f.close()

def on_disconnected(self):
tracklogs("DISconnected")

def on_heartbeat(self):
tracklogs("heartbeat")

def on_message(self, headers):
msg=json.loads(headers.body)
for entry in msg:
mykeys=[*entry]
myvals=[*entry.values()]
for item in myvals:
msgtype= (item["msg_type"])
rectime=int(item["time"])/1000.0
rectime2=datetime.datetime.fromtimestamp(rectime).strftime('%Y-%m-%d %H:%M:%S')
fdate=datetime.datetime.fromtimestamp(rectime).strftime('%Y-%m-%d')
area=(item["area_id"])
if (area=="YO" and msgtype=="SF"):
addressf=item["address"]
dataf= item["data"]

if (addressf=="94"):
filename="%s-trackdata.txt" %fdate
f = open(filename, "a")
if getbit(dataf,0)==0:
f.write ('%s - T235 is occupied\n' %rectime2)

if getbit(dataf,0)==1:
f.write ('%s - T235 is clear\n' %rectime2)

if getbit(dataf,1)==0:
f.write ('%s - T236 is occupied\n' %rectime2)
if getbit(dataf,1)==1:
f.write ('%s - T236 is clear\n' %rectime2)

if getbit(dataf,2)==0:
f.write ('%s - T238 is occupied\n' %rectime2)
if getbit(dataf,2)==1:
f.write ('%s - T238 is clear\n' %rectime2)

if getbit(dataf,3)==0:
f.write ('%s - T239 is occupied\n' %rectime2)
if getbit(dataf,3)==1:
f.write ('%s - T239 is clear\n' %rectime2)

if getbit(dataf,4)==0:
f.write ('%s - T241 is occupied\n' %rectime2)
if getbit(dataf,4)==1:
f.write ('%s - T241 is clear\n' %rectime2)

if getbit(dataf,5)==0:
f.write ('%s - T242 is occupied\n' %rectime2)
if getbit(dataf,5)==1:
f.write ('%s - T242 is clear\n' %rectime2)

if getbit(dataf,6)==0:
f.write ('%s - T245 is occupied\n' %rectime2)
if getbit(dataf,6)==1:
f.write ('%s - T245 is clear\n' %rectime2)

if getbit(dataf,7)==0:
f.write ('%s - T246 is occupied\n' %rectime2)
if getbit(dataf,7)==1:
f.write ('%s - T246 is clear\n' %rectime2)
f.close()


# print ("%s %s %s %s %s" %(msgtype, rectime2, area, addressf, dataf))
# print ("---------- ADDRESS 94 -----------")

mq=stomp.Connection12(host_and_ports=[('publicdatafeeds.networkrail.co.uk', 61618)], heartbeats=(55000,55000), keepalive=True)

mq.set_listener('', MyListener())

mq.connect(username=NETWORK_RAIL_AUTH[0], passcode=NETWORK_RAIL_AUTH[1], wait=True)

mq.subscribe('/topic/TD_ALL_SIG_AREA', 871756, ack='client-individual')

while mq.is_connected():
time.sleep(60)

Evelyn Snow

unread,
Sep 19, 2023, 9:13:22 AMSep 19
to openrail...@googlegroups.com
Hi,

Since you're using stomp.py, you might find this previous reply helpful, it
covers some configuration basics.

https://groups.google.com/g/openraildata-talk/c/HVRoC-xWIYg/m/i5-NpARaAwAJ

These are all sensible recommendations, in particular I'd highlight the
recommendation to set heart_beat_receive_scale=2.5.

I'd additionally recommend setting smaller heartbeats, 50000,50000 is too
much. Combined with the default timeout scale, this means your minimum
heartbeat timeout is effectively 75 seconds.

I'd tentatively recommend 10000,10000 with a receive scale of 2.5. Your
minimum heartbeat timeout then becomes 25s, which should be enough to
survive the worst temporary latency/disruption Virgin Media is likely to
throw at you, but not to waste time if the connection is permanently dead.

Then, finally, your script doesn't attempt to reconnect. stomp.py does include
some parameters which are aimed at helping automatic reconnection, but it
doesn't do so itself. The docs have more information on this, see here:

https://stomppy.readthedocs.io/en/latest/api.html#dealing-with-disconnects

Evelyn

A gathering place for the Open Rail Data community

unread,
Sep 19, 2023, 9:52:12 AMSep 19
to A gathering place for the Open Rail Data community
Evelyn, 

thanks for the info, i did see the earlier thread on a similar topic, i'll try the heart beat scale soltion later tonight when i get a chance, i did also run the 'dealing with disconects' alsmost exactly as it is posted, obviously with the correct server details, but it would not reconnect, i kept gettin some sort of error with the connect_and_subscribe function when called from within the on_disconnect handler (my python knowledge letting me down here) but i couldnt get that to work either ... i'll try again with that and post the error, maybe some helpful soul might know how to get round that one as well ...

Sandy

Malcolm Bovey

unread,
Sep 19, 2023, 7:21:20 PMSep 19
to A gathering place for the Open Rail Data community
Hi Sandy,

Looks like you are connecting with a v1.2 STOMP client.  At the time I wrote my client, NR Open Data was (at least officially) only supporting v1.1, though I can't find any reference to that limitation now.  I use stomp.py with StompConnection11 and haven't seen any problems.  Maybe try that?

It also looks like your client is doing some post-processing / filtering /writing out of the messages.  I'd recommend doing that in a downstream process rather than as part of your client - as the TD feed is very high volume it could cause problems if the client stuggles to keep up.  Maybe use a queue/threading?

M

Sandy Forrest

unread,
Sep 20, 2023, 2:53:41 AMSep 20
to A gathering place for the Open Rail Data community
Malcolm, 

thanks for the tip on v1.1 of STOMP, i've switched between v.10 and v1.2 never thought of trying the middle one ;)

your maybe onto something with the post-processing, i have a stripped down version of the client (using some of the earlier posted suggestions) and it now been running for almost 10hrs and hasn't stuttered yet, its now just throwing the address and data into a MYSQL database and i just process the info from there when needed, which is much cleaner (i think) although there is still some minor post processing to strip out the address and data bit from each chunk of the stream

my usage case is quite unique and limited to a single track section really but potentially could grow to be much more than this, i'll look at threading though, thats a good shout :)

Thanks
Sandy

Reply all
Reply to author
Forward
0 new messages