question : migration from zeromq

Viewed 124 times

Bino Oetomo

May 20, 2014, 05:35:46
To sna...@googlegroups.com
Dear All ..

I have just played a bit with ZeroMQ on a telemetry project.

Overall, I'm satisfied with it, except that it doesn't have message persistence, so I lose my messages whenever my Python script is stopped or killed.

Basically my existing script has two parts:

1. A 0MQ 'device' just like this one, except that the 'backend' part is not bound (backend.bind) but connects to a remote zmq host (backend.connect).
2. A UDP listener.

It works like this:
a. The UDP listener receives messages from other hosts and posts them to the 0MQ device.
b. The 0MQ device:
b.1. when there is a network connection, pushes the message to the remote zmq host via its 'backend' socket, or
b.2. when there is no network, just puts the message in its internal queue, which by design has no persistence.
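For reference, the device described above can be sketched with pyzmq. This is only my reading of the setup (the original link is not shown here), so the socket types and addresses are assumptions:

```python
def run_streamer(frontend_addr, backend_addr):
    """Sketch of the described 0MQ 'device', assuming pyzmq.

    The frontend binds locally (the UDP listener pushes here) while the
    backend connects out to the remote zmq host, as in the question.
    """
    import zmq  # imported here so the sketch loads without pyzmq installed

    ctx = zmq.Context()
    frontend = ctx.socket(zmq.PULL)
    frontend.bind(frontend_addr)
    backend = ctx.socket(zmq.PUSH)
    backend.connect(backend_addr)
    # The device queues pending messages in memory only, so anything not
    # yet pushed out is lost when the process dies -- the persistence
    # gap this thread is about.
    zmq.device(zmq.STREAMER, frontend, backend)

# usage (illustrative addresses):
# run_streamer("tcp://127.0.0.1:5555", "tcp://remote-host:5556")
```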

So, my question is: how can I do this with snakeMQ, using sqlite as its persistence storage?

Sincerely
-bino-


David Široký

May 20, 2014, 06:05:51
To sna...@googlegroups.com
Hi Bino!

So your model looks like:
                          /-0MQ-> consumer1
generator -UDP-> streamer --0MQ-> consumer2
                          \-0MQ-> consumer3

and you need persistent messages from streamer to consumers?

David

Bino Oetomo

May 21, 2014, 11:27:00
To sna...@googlegroups.com
Dear All,
C/Q David.

I really appreciate your response.

Actually, the 'streamer' itself is built using two 0MQ sockets.
Referring to the URL I provided, one is called 'frontend' and the other 'backend'.
The streamer receives messages via 'frontend' and pushes them out via 'backend'.

Between those two 0MQ sockets there is an in-memory cache, and that is what I want to be on-disk (persistent).

Sincerely
-bino-

Bino Oetomo

May 22, 2014, 04:46:01
To sna...@googlegroups.com


On Wednesday, May 21, 2014 10:27:00 PM UTC+7, Bino Oetomo wrote:

Actually, the 'streamer' itself is built using two 0MQ sockets.
Referring to the URL I provided, one is called 'frontend' and the other 'backend'.
The streamer receives messages via 'frontend' and pushes them out via 'backend'.

Between those two 0MQ sockets there is an in-memory cache, and that is what I want to be on-disk (persistent).



Ahh ... I got it.
I read https://github.com/dsiroky/snakemq/blob/master/examples/persistent_messaging_connector.py
And I think if I use this persistent connector, every message I send will be stored in its sqlite storage.

I tried that example, and also its listener counterpart; it works just like what I want.

My dumb brain still has a question:

The persistent connector example has just a one-shot send. How do I send data while the connector is in its loop()?
I.e., what if I want a script to keep sending datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
once per second?

Sincerely
-bino-

David Široký

May 22, 2014, 11:52:19
To sna...@googlegroups.com

Ahh ... I got it.
I read https://github.com/dsiroky/snakemq/blob/master/examples/persistent_messaging_connector.py
And I think if I use this persistent connector, every message I send will be stored in its sqlite storage.

Yes. But be aware that snakeMQ does not yet provide reliable multicast (something like a pub-sub pattern). Persistence is achieved only per individual peer identifier. In your case, if the recipient group is fixed and you know all their identifiers, then there is no problem.

Bonus tip/note: the persistence is not bound to the connector or listener, only to an identifier. You can create a snakeMQ stack without any listeners or connectors and send a message with a long TTL. It will be stored in the database. The next day you fix your program by adding e.g. a listener, and when the appropriate peer connects it will get the message from yesterday.
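A minimal sketch of that tip, modeled on the persistent_messaging_connector.py example. The identifiers, database file name, and TTL value here are illustrative assumptions:

```python
ONE_WEEK = 7 * 24 * 3600  # TTL in seconds, long enough to survive a next-day fix

def queue_offline_message(db_path, own_ident, peer_ident, data, ttl=ONE_WEEK):
    """Store a message for peer_ident with no connector or listener attached.

    Imports are inside the function so the sketch can be read without
    snakemq installed; calling it requires the snakemq package.
    """
    import snakemq.link
    import snakemq.packeter
    import snakemq.messaging
    import snakemq.message
    from snakemq.storage.sqlite import SqliteQueuesStorage

    link = snakemq.link.Link()  # note: no add_connector() / add_listener()
    packeter = snakemq.packeter.Packeter(link)
    storage = SqliteQueuesStorage(db_path)
    messaging = snakemq.messaging.Messaging(own_ident, "", packeter, storage)

    # FLAG_PERSISTENT plus a long TTL keeps the message in the sqlite
    # file across restarts until the peer connects or the TTL expires.
    msg = snakemq.message.Message(data, ttl=ttl,
                                  flags=snakemq.message.FLAG_PERSISTENT)
    messaging.send_message(peer_ident, msg)

# usage (illustrative names):
# queue_offline_message("queues.db", "my_ident", "consumer", b"delayed hello")
```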
 
My dumb brain still has a question:

The persistent connector example has just a one-shot send. How do I send data while the connector is in its loop()?
I.e., what if I want a script to keep sending datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
once per second?
 
The messaging.send_message() method is thread safe. You can have a dedicated thread that sends those timestamps.
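A sketch of that approach, assuming the `messaging` and `link` objects from the persistent connector example are already built; the peer identifier and TTL here are illustrative:

```python
import datetime
import threading
import time

def make_payload():
    # The timestamp format from the question, as bytes.
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S").encode()

def sender_loop(messaging, peer_ident, stop_event, ttl=3600):
    """Send a timestamp once per second from a dedicated thread.

    snakemq is imported here so the sketch loads without it installed.
    Since messaging.send_message() is thread safe, this loop can run
    alongside link.loop() in the main thread.
    """
    import snakemq.message
    while not stop_event.is_set():
        msg = snakemq.message.Message(make_payload(), ttl=ttl)
        messaging.send_message(peer_ident, msg)
        time.sleep(1)

# usage (sketch; `messaging` and `link` come from the example's setup):
# stop = threading.Event()
# threading.Thread(target=sender_loop,
#                  args=(messaging, "listener_ident", stop),
#                  daemon=True).start()
# link.loop()
```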

David

Bino Oetomo

May 22, 2014, 21:54:53
To sna...@googlegroups.com
Dear All,
C/Q David.

I really appreciate your response.


On Thursday, May 22, 2014 10:52:19 PM UTC+7, David Široký wrote:

Yes. But be aware that snakeMQ does not yet provide reliable multicast (something like a pub-sub pattern). Persistence is achieved only per individual peer identifier. In your case, if the recipient group is fixed and you know all their identifiers, then there is no problem.

Ooops ...
I think I need to test how fast the listener can handle multiple senders (i.e. up to 150 senders).

Bonus tip/note: the persistence is not bound to the connector or listener, only to an identifier. You can create a snakeMQ stack without any listeners or connectors and send a message with a long TTL. It will be stored in the database. The next day you fix your program by adding e.g. a listener, and when the appropriate peer connects it will get the message from yesterday.
 

Yup ... I tested it.
 
My dumb brain still has a question:

The persistent connector example has just a one-shot send. How do I send data while the connector is in its loop()?
I.e., what if I want a script to keep sending datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
once per second?
 
The messaging.send_message() method is thread safe. You can have a dedicated thread that sends those timestamps.


I'm not clear about that.
I used threading with the persistent sqlite storage, and sqlite didn't like it.

Hmmm ... my current approach is:
1. Write a non-persistent listener script that just prints out received messages.

2. Write a kind of broker: a snakeMQ-based script that has one listener and one connector and uses sqlite persistence storage.
Each message received by the listener is forwarded to #1, which means it is written to sqlite whenever #1 is not available.

3. Write a non-persistent 'sender' that just loops over a file and sends each line to #2 (the broker). This one will later be changed into a UDP-to-snakeMQ bridge that listens on a local UDP port and sends any received message to #2.

Note: #2 and #3 are on the same host, and #1 is on another host (VirtualBox).
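The broker (#2) might be wired roughly like this. This is a sketch under my own assumptions: the addresses, database file name, and the `endpoint` identifier are hypothetical names, not from the thread:

```python
def build_broker(listen_addr, remote_addr, db_path, own_ident, endpoint_ident):
    """Sketch of broker #2: one listener, one connector, sqlite persistence.

    Imports live inside the function so the sketch can be read without
    snakemq installed; calling it requires the snakemq package.
    """
    import snakemq.link
    import snakemq.packeter
    import snakemq.messaging
    import snakemq.message
    from snakemq.storage.sqlite import SqliteQueuesStorage

    link = snakemq.link.Link()
    link.add_listener(listen_addr)    # local senders (#3) connect here
    link.add_connector(remote_addr)   # outgoing link to end point #1

    packeter = snakemq.packeter.Packeter(link)
    storage = SqliteQueuesStorage(db_path)
    messaging = snakemq.messaging.Messaging(own_ident, "", packeter, storage)

    def forward(conn, ident, message):
        # Re-queue each received payload for the end point; while #1 is
        # offline it sits in the sqlite file instead of in memory.
        out = snakemq.message.Message(message.data, ttl=message.ttl,
                                      flags=snakemq.message.FLAG_PERSISTENT)
        messaging.send_message(endpoint_ident, out)

    messaging.on_message_recv.add(forward)
    return link, messaging

# usage (illustrative):
# link, messaging = build_broker(("0.0.0.0", 4000), ("endpoint-host", 4001),
#                                "broker.db", "broker", "endpoint")
# link.loop()
```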

Test method:
a. Start #2 in one terminal and then start #3 in another terminal. #1 is still deactivated.
b. When #3 finishes its job, meaning all the data is in #2's cache, start #1 in VirtualBox.

Current result: 1091 lines of data with 537 characters per line are transmitted from #2's cache to #1 in around 3 minutes; that's around 6 lines per second.

Yes, it looks slow, but I think it's caused by a disk read/write/delete bottleneck.

Today's plan:
1. Make an OpenWrt package of snakeMQ.
2. Run a test on a RouterBoard 450G with an SD card.

Background story:
I'm building a kind of telemetry 'bridge' for dump trucks. There is a possibility that a dump truck will be 'out of network range' for up to 2 weeks.
The data is sent over UDP by an ECU reader every second, and it's my responsibility to make sure the data reaches the end-point server as fast as possible.


Sincerely
-bino-

David Siroky

May 23, 2014, 04:29:01
To sna...@googlegroups.com
> Ooops ...
> I think I need to test how fast the listener can handle multiple senders
> (i.e. up to 150 senders)

The bottom socket layer uses polling and every identifier/queue/buffer lookup is
performed via maps/dictionaries. I believe it should be fast enough.

> messaging.send_message() method is thread safe. You can have a dedicated
> thread that will send those timestamps.
>
>
> I'm not clear about that.
> I used threading with the persistent sqlite storage, and sqlite didn't like it.

Don't worry. Every I/O operation is placed inside a locked critical section.

> Current result: 1091 lines of data with 537 characters per line are transmitted
> from #2's cache to #1 in around 3 minutes; that's around 6 lines per second.
>
> Yes, it looks slow, but I think it's caused by a disk read/write/delete bottleneck.

That is terribly slow. Can you send me these testing scripts?

David

Bino Oetomo

May 23, 2014, 05:01:49
To sna...@googlegroups.com

Dear All,
C/Q David

On Friday, May 23, 2014 3:29:01 PM UTC+7, David Široký wrote:

That is terribly slow. Can you send me these testing scripts?

David



Yes, sir.
I will try to send them to you via direct email.

Sincerely
-bino-

Bino Oetomo

May 23, 2014, 21:52:33
To sna...@googlegroups.com
Dear All

C/Q David


On Friday, May 23, 2014 3:29:01 PM UTC+7, David Široký wrote:


That is terribly slow. Can you send me these testing scripts?

David


After your fix to the SSL part, I made another test.

A. Scripts:
1. 'Broker' using sqlite, placed on a RouterBoard 450 with OpenWrt and an SD card.
2. End-point listener placed in VirtualBox with Debian.
3. Sender on the host PC with Ubuntu.

B. Scenario and results:
1. Script #1 and script #2 are not started.
2. Start script #3 and make sure all the data is loaded into its memory cache.
3. With script #3 still alive: start script #1 and make sure it receives messages from script #3 and just places them into its sqlite cache. It takes 1 minute from the start of script #1.
4. With script #1 still alive: start script #2 in VirtualBox. It takes 1 minute from the start of script #2 until the last data is printed out.

Note: it looks like the 'connection initialization' between each pair of scripts takes up to 30 seconds (I'm not so sure).

Thank you for your great work.

Sincerely
-bino-