Messages not received


mikeant...@gmail.com

Jul 4, 2014, 5:35:27 AM
to sna...@googlegroups.com
I'm trying to find a reliable way to know when the message queue is ready so that I can send messages after starting a thread ASAP.

I've not had much luck with either Link.on_ready_to_send or Link.on_connect, the latter of which I noticed was the method used in the performance examples.

The server thread is running long before the client message thread is spawned, and the two are able to send messages if a delay between the thread startup and message transmission is present.

How can I reliably time the first transmission as soon as SnakeMQ is ready?

Kind regards,
Mike

David Siroky

Jul 5, 2014, 4:57:39 AM
to sna...@googlegroups.com
Hi Mike,

the message queue is ready immediately when you create it. Start filling it, it
will be flushed when the connection is made.

Best regards
David

mikeant...@gmail.com

Jul 7, 2014, 6:53:04 AM
to sna...@googlegroups.com
The problem is more with my implementation. I'm starting a SnakeMQ thread, and then immediately trying to send a message (presumably before the thread has had time to start).

Currently I'm just sleeping for a short time before sending the first message. A better question:

Is there a variable I can poll to check when the message queue is ready? Link._do_loop looks like a good bet.

mikeant...@gmail.com

Jul 7, 2014, 7:03:02 AM
to sna...@googlegroups.com, mikeant...@gmail.com

Just a small update:

I logged the value of Link._do_loop on Thread.start(). However, sending a message after asserting that _do_loop is True still results in a lost message. So far the most reliable way to make sure messages get through is to sleep 200 ms before the initial message.

Any ideas?

David Siroky

Jul 7, 2014, 7:21:56 AM
to sna...@googlegroups.com
>
> The problem is more with my implementation. I'm starting a SnakeMQ thread, and then immediately trying to send a message (presumably before the thread has had time to start).
>
> Currently I'm just sleeping for a short time before sending the first message. A better question:
>
> Is there a variable I can poll to check when the message queue is ready? Link._do_loop looks like a good bet.
>

There is no such state when the queue is not ready. The queue is always ready.
You can call send_message() even if the loop is not running. Can you post some
minimal example reproducing your problem?

David
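
To illustrate the "always ready" behaviour described above, here is a minimal sketch using a stand-in class rather than snakemq itself. `ToyQueue` and its methods are hypothetical names invented for this example; the sketch only models the idea that messages are accepted at any time and flushed once a connection exists.

```python
from collections import deque


class ToyQueue:
    """Stand-in for a per-peer outgoing queue; NOT snakemq's real class."""

    def __init__(self):
        self.pending = deque()   # messages accepted but not yet flushed
        self.delivered = []      # messages handed to the (pretend) connection
        self.connected = False

    def send_message(self, data):
        # Always accepted, whether or not a connection exists yet.
        self.pending.append(data)
        if self.connected:
            self._flush()

    def on_connect(self):
        # Called when the connection comes up: flush everything queued so far.
        self.connected = True
        self._flush()

    def _flush(self):
        while self.pending:
            self.delivered.append(self.pending.popleft())


q = ToyQueue()
q.send_message(b"hello")      # no loop running, nothing connected
q.send_message(b"world")
before = list(q.delivered)    # nothing delivered yet
q.on_connect()                # the connection is made later
after = list(q.delivered)     # both messages flushed in order
```

In this model a message is only lost if the process exits before the flush happens, which matches the symptom reported in this thread when the sleep is removed.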

gooo...@gmail.com

Oct 16, 2014, 6:06:22 PM
to sna...@googlegroups.com
# initialize messaging
messctl = MasterMessenger("CLI", log)
messctl.add_connector(4000)
messctl.start()
if task == "messaging":
    recipient = arg[0]
    command = arg[1]
    messctl.send(recipient, command)

# need to wait a bit before closing socket
#time.sleep(1)
messctl.stop()

With the sleep it works fine, but with the sleep commented out the message is not sent. What is the proper way to tell when the snakemq instance has finished sending, so that it is safe to close the loop or send the next message?

David Siroky

Oct 17, 2014, 2:07:38 AM
to sna...@googlegroups.com
Hi!

You forgot to post your MasterMessenger class, so I have no idea
what you are doing in it. What exactly are you trying to do?
Open a connection, send a message, close the connection and start again?
This is not the way snakemq works. The loop should run "forever"
somewhere in the background. That is why you need the sleep in your code
to give the loop some time to process the message.

David

gooo...@gmail.com

Oct 17, 2014, 4:03:29 AM
to sna...@googlegroups.com
Hi David,

Your guess was almost correct. MasterMessenger is just a wrapper for snakemq. The instance runs in a separate thread.
I'm creating a CLI module for a server, so the behavior is as follows:
- start CLI
- open connection in background
- parse arguments
- send command
- close connection
- exit

Sleeping (with no guarantee of message delivery) does not seem like a very pythonic way to do this, compared with knowing exactly when the connection can be closed. E.g. when the server or network is really busy, I would have to wait not 1 second but probably 5 (or 10).

That's the reason I'm asking.
Could you please give me some example of code, which will wait for message delivery and then close connection?

David Siroky

Oct 17, 2014, 10:00:45 AM
to sna...@googlegroups.com
In this case snakemq is the wrong tool. Use plain old TCP. There is no
need for an extra thread.

David

Nikodim

Oct 17, 2014, 11:07:01 AM
to sna...@googlegroups.com
David,

Please tell me if I'm asking a question you don't have an answer to.
I'm using snakemq across different components of the solution (running in an endless loop, so that's fine), and I don't want to reinvent the wheel for this particular CLI case.

Official snakemq documentation says:
"Always wait for on_ready_to_send to have confirmation about successful send and information about amount of sent data."

How can I do this? Even for endlessly looping instances I would like to perform this check.

Thank you in advance.

David Siroky

Oct 18, 2014, 8:03:24 AM
to sna...@googlegroups.com
Now I understand why you want to use snakemq here. Try something
like this:

----------------------

import snakemq.link
import snakemq.packeter
import snakemq.messaging
import snakemq.message

def on_sent(conn, ident, message):
    global s
    s.stop()

s = snakemq.link.Link()
s.add_connector(("localhost", 4000))

pktr = snakemq.packeter.Packeter(s)

m = snakemq.messaging.Messaging("xconnector", "", pktr)
m.on_message_sent.add(on_sent)

msg = snakemq.message.Message(b"hello", ttl=60)
m.send_message("xlistener", msg)

s.loop()

----------------------

David
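
For the case where the loop keeps running in a background thread (as with the MasterMessenger wrapper earlier in this thread), the same "stop when sent" idea can be expressed by blocking on a threading.Event with a timeout instead of sleeping for a fixed time. The sketch below is only an illustration under assumptions: `send_and_wait` and `FakeLoop` are hypothetical names, and `FakeLoop` is a stand-in that pretends to deliver a message from another thread. It does not use snakemq.

```python
import threading


def send_and_wait(send, register_on_sent, timeout):
    """Queue one message and block until the 'sent' callback fires.

    send             -- zero-argument callable that queues the message
    register_on_sent -- callable that registers a callback (hypothetical hook,
                        standing in for something like on_message_sent.add)
    Returns True if the callback fired within `timeout` seconds, else False.
    """
    sent = threading.Event()
    # The callback runs in the loop thread; Event.set() is thread-safe.
    register_on_sent(lambda *args: sent.set())
    send()
    return sent.wait(timeout)


class FakeLoop:
    """Stand-in for a messaging loop running in another thread."""

    def __init__(self, delay):
        self.delay = delay
        self.callbacks = []

    def send(self):
        # Pretend delivery completes after `delay` seconds in another thread.
        timer = threading.Timer(self.delay, self._fire)
        timer.daemon = True
        timer.start()

    def _fire(self):
        for callback in self.callbacks:
            callback("conn-id", "xlistener", "message-id")


fast = FakeLoop(delay=0.05)
ok = send_and_wait(fast.send, fast.callbacks.append, timeout=2.0)

slow = FakeLoop(delay=5.0)
timed_out = not send_and_wait(slow.send, slow.callbacks.append, timeout=0.1)
```

The caller can then stop the loop as soon as the wait returns True, or report a failure when the timeout expires, rather than guessing how long to sleep.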

Nikodim

Oct 21, 2014, 1:29:23 PM
to sna...@googlegroups.com
David,

Thank you for the answer.

I still have one question. Is there a way to force on_drop when no connection has been established?
E.g. when no listener is running, the following code waits for a connection forever, and triggers on_drop only once a listener is added and the client has reconnected.
The behavior I would expect is for on_drop to be raised when the ttl expires, regardless of connection status.

----------------------
import snakemq.link
import snakemq.packeter
import snakemq.messaging
import snakemq.message
import logging

def on_sent(conn, ident, message):
    global s
    print "SUCCESS delivering message to %s" % ident
    s.stop()
    
def on_drop(ident, message):
    global s
    print "FAILED to deliver message to %s" % ident
    s.stop()
    
def on_error(ident):
    global s
    print "ERROR delivering message to %s" % ident
    s.stop()

print "Sending started."

snakemq.init_logging()
logger = logging.getLogger("snakemq")
logger.setLevel(logging.DEBUG)

s = snakemq.link.Link()
s.add_connector(("localhost", 4000))

pktr = snakemq.packeter.Packeter(s)

m = snakemq.messaging.Messaging("xconnector", "", pktr)
m.on_message_sent.add(on_sent)
m.on_message_drop.add(on_drop)
m.on_error.add(on_error)

msg = snakemq.message.Message(b"hello", ttl=10)
m.send_message("xlistener", msg)

s.loop()
----------------------

David Siroky

Oct 23, 2014, 4:00:19 AM
to sna...@googlegroups.com
This behavior is not implemented for performance reasons. I will
implement on demand garbage collection some day. For now try something
like this:

==========
...
m = Messaging(...)
...
# force garbage collection for peer with identifier "bob"
queue = m.queues_manager.get_queue("bob")
if not queue.connected:
    queue.connect()
    queue.disconnect()
==========

This is undocumented and I don't guarantee it will work in future
releases.
Be aware that this is not thread safe. Perform it e.g. in
Link.on_loop_pass callback.

David
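
A different way to get a "give up after ttl seconds" effect without touching undocumented internals is to keep a deadline of your own and check it on every loop pass. The following is a minimal sketch of such a check, not part of snakemq: `make_deadline_check` is a hypothetical helper, and the fake clock exists only so the example runs instantly.

```python
import time


def make_deadline_check(ttl, on_expire, clock=time.monotonic):
    """Return a zero-argument function that calls on_expire() once,
    the first time it is invoked after `ttl` seconds have elapsed."""
    deadline = clock() + ttl
    state = {"fired": False}

    def check():
        if not state["fired"] and clock() >= deadline:
            state["fired"] = True
            on_expire()

    return check


# Demonstration with a fake clock instead of real time.
now = [0.0]
expired = []
check = make_deadline_check(
    10, lambda: expired.append("dropped"), clock=lambda: now[0]
)

check()                 # t = 0 s: nothing happens
now[0] = 9.9
check()                 # t = 9.9 s: still within the ttl
early = list(expired)   # nothing has expired yet
now[0] = 10.0
check()                 # t = 10 s: on_expire fires
check()                 # later passes do not fire again
```

With snakemq, such a check could presumably be registered with `s.on_loop_pass.add(check)`, with `on_expire` reporting the failure and calling `s.stop()`. That wiring assumes on_loop_pass callbacks take no arguments and is not tested here.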