Example of request/response pattern with SWI and MQTT?

396 views
Skip to first unread message

Steve Prior

unread,
Feb 3, 2017, 9:14:13 PM2/3/17
to SWI-Prolog
I've got home automation code with a SWI-Prolog "brain" that I wrote around 15 years ago and it still runs my house.  The system currently uses C++ code to wrap around the Prolog so that the system takes in event notifications and makes action request calls via CORBA.  The outgoing CORBA calls that make a request that requires a response will block until that response is received.  I'm looking to port this system to use MQTT as the message transport mechanism, but in that type environment a caller makes a request on one topic/queue and then receives the response asynchronously on another queue.

Does anyone have any example SWI Prolog code that shows how to implement a request/response pattern to make a Prolog thread wait until it receives a response to its request?

Jan Wielemaker

unread,
Feb 4, 2017, 6:07:53 AM2/4/17
to Steve Prior, SWI-Prolog
Hi Steve,

On 02/04/2017 03:14 AM, Steve Prior wrote:
> I've got home automation code with a SWI-Prolog "brain" that I wrote
> around 15 years ago and it still runs my house. The system currently

:)

> uses C++ code to wrap around the Prolog so that the system takes in
> event notifications and makes action request calls via CORBA. The
> outgoing CORBA calls that make a request that requires a response will
> block until that response is received. I'm looking to port this system
> to use MQTT as the message transport mechanism, but in that type
> environment a caller makes a request on one topic/queue and then
> receives the response asynchronously on another queue.
>
> Does anyone have any example SWI Prolog code that shows how to implement
> a request/response pattern to make a Prolog thread wait until it
> receives a response to its request?

Basically make the thread wait on a message queue and have some central
thread gathering the messages from the network and sending them to the
appropriate message queue. This is similar as to what Pengines are
doing.

In recent versions `engines' may be a nice alternative. In this scenario
the engine represents a Prolog state that is detached from a thread while
waiting. A pool of threads deal with incoming messages, find the associated
engine and revive the appropriate engine until it is ready to wait for the
next message. Complexity-wise, this is very similar. The main difference
is that it uses less resources and can scale to much larger numbers of
states
(or smaller hardware).

Please contribute a pack if you make an infrastructure for MQTT.

Cheers --- Jan

Steve Prior

unread,
Feb 4, 2017, 2:07:54 PM2/4/17
to SWI-Prolog, spr...@geekster.com


Please contribute a pack if you make an infrastructure for MQTT.

 I installed the existing MQTT pack: http://www.swi-prolog.org/pack/list?p=mqtt
A half hour after I found that I was able to send MQTT messages from Prolog to the other devices on my network, haven't tried subscribing to topics within Prolog yet.

It's possible I haven't gone far enough in thinking about how my architecture should change when going from CORBA to MQTT messages, and should do more to eliminate the concept of blocking requests altogether.  Here's an example of how my system currently handles an event with the CORBA setup:

motion(garage):-
   logMachine(LogMachine),
   log_add(LogMachine, 'brain', 'X10react', 'saw garage motion', false),
   save_outdoor_picture('outside'),!.

save_outdoor_picture(Location):-
   video_location(Location,VideoMachine),
   max_snap_res(VideoMachine,Resolution),
   video_TakeSnapshotAtRes(VideoMachine,Resolution,Picture),
   filestore_storeFile('tux', Location,'image/jpeg',Picture).

The tricky part is "video_TakeSnapshotAtRes" - all the other lines can initiate an action and keep on going, but that line makes a request to a video server and waits for the Picture to come back and that's needed before continuing.  I probably need to think more about how EVERYTHING in the system is implemented as a notification and there are no blocking requests.  In this particular case a request would be sent to initiate the picture, and it would be an entirely new notification that would respond to the event that a new picture is available and perform the appropriate action with that picture.

BTW, yes Picture really is the array of bytes of the picture image, not a URL or anything - the picture is handled in memory and doesn't hit any filesystem until filestore stores it.

Another tricky case involves the web based chat interface.  In this case a Java based webapp takes a text field with user input and then makes a CORBA request which calls a Prolog predicate similar to command(Request, Response) and the webapp must wait for the Response and display it back to the user.  At that point the waiting is a Java, not Prolog problem, but it's still going to be tricky to deal with, unless I ditch the current interface and go with a WebSockets type system and just push notifications.  It would then be possible to display responses in a different order than the requests were made, but that's not necessarily a bad thing for a conversational interface.

Steve Prior

unread,
Jun 18, 2017, 2:10:31 PM6/18/17
to SWI-Prolog, spr...@geekster.com
I've been playing more with the MQTT library for SWI-Prolog at https://github.com/olsky/swi-mqtt-pack which does not appear
to be actively maintained and have been able to publish and subscribe to MQTT messages.  It looks like this repository bundles an old version of the mosquitto library which caused some problems with subscribe, but once I used an up to date library that problem went away - so far so good.

One requirement I've got for my use of Prolog in my Smarthome system is to be able to pass binary blobs (actually images) around and I found this MQTT library doesn't work for that.  Digging into the C code I've found that the author used PL_get_chars(topic, &mqtt_topic, CVT_WRITE | BUF_MALLOC) to copy from the term_t into the message buffer used for mosquitto - since PL_get_chars returns a null terminated string that's certainly not going to work for me.  Since MQTT messages are binary blobs there should be another way.

To support the work I'm about to do I've forked the SWI MQTT pack on Github, my fork is at  https://github.com/sprior/swi-mqtt-pack.  The code of interest is c/mqtt.c around lines 716-729.  I know that binary blobs aren't explicitly supported by SWI-Prolog, but 10 years ago when I did this for CORBA Jan mentioned that Strings are internally stored as an array of unsigned chars, so binary data should actually work there, and it has - that's been working for 10 years!  When I did it I wrote my code in C++, not C so it was a lot easier.  The one problem I had at the time was how to determine the length of the char buffer, so I ended up doing it ont the Prolog side with string_length/2:

filestore_storeFile(ServerMachine, Description, Mimetype, Contents):-
   string_length(Contents,Length),
   filestore_storeFile(ServerMachine, Description, Mimetype, Contents, Length).

filestore_storeFile was implemented in C++.

So now with the MQTT code I'm looking at the code:
where term_t payload;
char* mqtt_payload;
PL_get_chars(payload, &mqtt_payload, CVT_WRITE | BUF_MALLOC)
mosq_rc = mosquitto_publish(m->mosq, &mid, mqtt_topic, strlen(mqtt_payload), mqtt_payload, qos, retain);

Is it more correct to use
int PL_get_pointer(term_t +t, void **ptr)
instead of PL_get_chars
and is there any way to ask a term_t for its length to avoid the call to string_length in the Prolog wrapper?

Thanks in advance
Steve

Jan Wielemaker

unread,
Jun 19, 2017, 3:27:09 AM6/19/17
to Steve Prior, SWI-Prolog
On 06/18/2017 08:10 PM, Steve Prior wrote:
> I've been playing more with the MQTT library for SWI-Prolog at
> https://github.com/olsky/swi-mqtt-pack which does not appear
> to be actively maintained and have been able to publish and subscribe to
> MQTT messages. It looks like this repository bundles an old version of
> the mosquitto library which caused some problems with subscribe, but
> once I used an up to date library that problem went away - so far so good.
>
> One requirement I've got for my use of Prolog in my Smarthome system is
> to be able to pass binary blobs (actually images) around and I found
> this MQTT library doesn't work for that. Digging into the C code I've
> found that the author used PL_get_chars(topic, &mqtt_topic, CVT_WRITE |
> BUF_MALLOC) to copy from the term_t into the message buffer used for
> mosquitto - since PL_get_chars returns a null terminated string that's
> certainly not going to work for me. Since MQTT messages are binary
> blobs there should be another way.

Just replace that by PL_get_nchars() and you can represent blobs in
atoms, strings and lists of character codes. Just make sure all
`characters' are in the range 0..255. If one is higher it internally
switches to wide character arrays and it won't work any longer.

> To support the work I'm about to do I've forked the SWI MQTT pack on
> Github, my fork is at https://github.com/sprior/swi-mqtt-pack. The
> code of interest is c/mqtt.c around lines 716-729. I know that binary
> blobs aren't explicitly supported by SWI-Prolog, but 10 years ago when I
> did this for CORBA Jan mentioned that Strings are internally stored as
> an array of unsigned chars, so binary data should actually work there,
> and it has - that's been working for 10 years! When I did it I wrote my
> code in C++, not C so it was a lot easier. The one problem I had at the
> time was how to determine the length of the char buffer, so I ended up
> doing it ont the Prolog side with string_length/2:
>
> filestore_storeFile(ServerMachine, Description, Mimetype, Contents):-
> string_length(Contents,Length),
> filestore_storeFile(ServerMachine, Description, Mimetype, Contents,
> Length).

So, see above. Might not have existed by then, but I doubt it. Just,
there is no C++ support for it. You can simply call the C API from
C++ though.
> filestore_storeFile was implemented in C++.
>
> So now with the MQTT code I'm looking at the code:
> where term_t payload;
> char* mqtt_payload;
> PL_get_chars(payload, &mqtt_payload, CVT_WRITE | BUF_MALLOC)
> mosq_rc = mosquitto_publish(m->mosq, &mid, mqtt_topic,
> strlen(mqtt_payload), mqtt_payload, qos, retain);
>
> Is it more correct to use
> int *PL_get_pointer*(term_t +t, void **ptr)

No no. PL_get_pointer() is only the inverse of PL_put_pointer(), which
allows storing a pointer as a Prolog integer. There is some logic in
there that changes the pointer representation such that it typically
becomes a small integer which is stored more efficiently in Prolog.
Its use is generally not encouraged these days.

> instead of PL_get_chars
> and is there any way to ask a term_t for its length to avoid the call to
> string_length in the Prolog wrapper?

See above.

Cheers --- Jan
> --
> You received this message because you are subscribed to the Google
> Groups "SWI-Prolog" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to swi-prolog+...@googlegroups.com
> <mailto:swi-prolog+...@googlegroups.com>.
> Visit this group at https://groups.google.com/group/swi-prolog.
> For more options, visit https://groups.google.com/d/optout.

Steve Prior

unread,
Jun 19, 2017, 9:05:26 AM6/19/17
to SWI-Prolog, spr...@geekster.com
Thanks!  So I create a size_t and pass it as the second arg to PL_get_nchars and it fills in the size of the blob returned?
I'm running SWI Prolog 6.2.2 at the moment, might have to upgrade first.

Steve

Steve Prior

unread,
Jun 19, 2017, 11:32:33 PM6/19/17
to SWI-Prolog, spr...@geekster.com

Thanks for your help, I got it working tonight!  As a test I was able to use Prolog to send a MQTT message to request an image from a Java program which was delivered back to Prolog via a MQTT topic it subscribed to.  The Prolog code then sent that image as a MQTT message to another Java program that stored it as a blob in a database table.  I was then able to view that stored image via a webapp.  I was also able to subscribe and receive regular text MQTT messages.  The GitHub repository I forked has been updated with the working code.  There are still some issues with the MQTT glue code - it'll coredump if you try to exit Prolog without unsubscribing to any topics you've subscribed to and disconnected from MQTT.  There are also some things I really don't like about the code I forked from (including lots of GOTOs), but it seems to work for now so I can start writing the new version of my smarthome system "brain".

Steve

Jan Burse

unread,
Jun 22, 2017, 11:18:15 AM6/22/17
to SWI-Prolog
Thats very interesting. On a broader scale MQTT looks like it poses
some subproblems of good ole product configuration (aka sometimes
viewed as Expert Systems). Here is a nice video:

MQTT Buddy [how-to]
https://www.youtube.com/watch?v=m6O835pMR_I

According to the MQTT wiki an essential element of MQTT is that
somehow slow and stupid sensor data is mirrored and stored
on a server. There is a lot of demand for flexible data I guess.

RDF/SPARQL could be also a solution, but possibly MQTT has its
own approaches, even an own set of verbs it seems. Anything you
done besides the library, also some examples somewhere?

Like on github or so?

Jan Burse

unread,
Jun 22, 2017, 11:19:27 AM6/22/17
to SWI-Prolog
Your example in the fork?

Steve Prior

unread,
Jun 22, 2017, 6:32:23 PM6/22/17
to SWI-Prolog
I haven't touched the examples in the fork yet.  Actually I've realized the forked code still seems to have a memory leak, received binary messages work fine, sending binary messages work, but then when you shut down Prolog it coredumps.  I'll be working on this.

I'll have to figure out what to do about examples.  I should at least clean up the examples the original author supplied - I found out the hard way that two of the connection examples are mutually exclusive, but not documented as such.  I'm willing to share more, I just haven't figured out what.  In the meantime I can answer questions.  Sending /receiving messages in MQTT is quite simple in a number of languages and if you want a drag and drop dashboard environment I can say that NODE-RED makes it REALLY easy.  As for sending MQTT messages via ESP8266 in the Arduino environment, there are lots of good examples of that around.  What I want to get to is an example that uses the connection manager library in combination with then sending/receiving messages via MQTT in such a way that you don't have to change the source code for the initial configuration.  I made progress on this, but want to do the Prolog side of things first.

The main problem with sharing my specific examples is that none of the pieces are really that complicated, but they are specific to my own environment (network and devices) so as such they wouldn't really be usable to anyone else.

The problem I haven't actually solved is the original request of this thread - so far I'm rewriting the code so that there are no situations where a request must wait for a response, this also means I don't have to worry about threading.  But there are cases where I can imagine it doesn't make sense to continue until a response is received and someday I'll have to deal with that.  But I can at least rewrite the behaviors I already had with the CORBA based transport now in MQTT.

Steve Prior

unread,
Jun 22, 2017, 6:53:49 PM6/22/17
to SWI-Prolog
It depends on what you consider a problem.  MQTT is a simpler version of MQ, MQTT is more targeted at being
simple enough to implement on small/dumb devices.  Yes it does require you to have a central message broker, but that's
easy to set up (the one I've got installed is called Mosquitto on Linux) with the warning that I've only got it set up to allow
connections with within my home network - I do not have it exposed to the Internet.  Mosquitto supports encryption for clients
robust enough to support it which you'd want to do if you were going to allow an incoming connection from your smartphone (MQTT buddy or otherwise).
There are public MQTT servers out there, but I'm a own your own infrastructure type guy.

But that broker has some real advantages for the system.  For one all the pieces only need to be configured to connect to that broker, they don't have
to be aware of or have connectivity to each other.  So if you were going to use a public MQTT broker you could do so with only outbound connections
from your LAN, you would not have to open any incoming connections.

You only have to come up with whatever message "topics" you are going to use for your system and what message bodies you expect on those topics.
The sender of a message does not know or care how many clients are subscribed to a message it sends (if any).  Obviously no subscribers would be pretty useless, but it wouldn't break anything and could be temporary.  The message broker handles all those details.  It also handles "quality of service" which you referred to which means whether a missed message is stored by the broker until it can be delivered, whether the message has to be acknowledged as received, or whether it's just sent on a best effort basis and dropped if not delivered.  A dump device can send the message to the broker with one QOS and the clients can decide for themselves what QOS they require.

I will mention that in my personal environment I've got lots of facilities available to play with - I've got my home linux servers running as virtual machines including DHCP, DNS (internal to my network and external to the Internet), email, web server, database...  For example I'm working on the new Prolog "brain" code on a brand new virtual machine with SWI, compilers, and Mosquitto libraries installed.  I'm a geek.

Steve

Steve Prior

unread,
Jun 23, 2017, 12:28:24 AM6/23/17
to SWI-Prolog, spr...@geekster.com


Just replace that by PL_get_nchars() and you can represent blobs in
atoms, strings and lists of character codes.  Just make sure all
`characters' are in the range 0..255.  If one is higher it internally
switches to wide character arrays and it won't work any longer.

I'm somewhat stuck again.  I did replace the call with:
PL_get_nchars(payload, &payload_length, &mqtt_payload, CVT_WRITE | BUF_MALLOC)
and it works, but then when I eventually shutdown the program it coredumps (and yes I do call
PL_free(mqtt_payload); after this call I added debug messages:

  _LOG("--- payload length %lu \n", payload_length);
  _LOG("--- size_t length %lu \n", sizeof (size_t));

And when run I get:
 --- payload length 185216
--- size_t length 8
The payload length was correct.
payload is declared as a term_t being passed into the function.

I'm now running on SWI Prolog ver 7.2.3

So it really appears to me that the PL_get_nchars is overrunning a buffer somewhere.  I tried reading the SWI source code and didn't make it much past the source for PL_get_nchars itself.

But I'm also wondering if I really need to be calling PL_get_nchars at all.  Very shortly after this in the code I'm actually making the call to publish the message via MQTT and that call reads and then puts the message on the wire, but doesn't not modify the buffer passed to it.  It sounds like PL_get_nchars is making a new copy of the term as an unsigned char * - it seems that if the incoming term_t already has a buffer of those values that I could just pass that instead (but how do I get a pointer to that?) - the only other thing I'd need is the number of bytes of the length of the value.

Any hints?

Jan Wielemaker

unread,
Jun 23, 2017, 2:49:25 AM6/23/17
to Steve Prior, SWI-Prolog
On 06/23/2017 06:28 AM, Steve Prior wrote:
>
>
> Just replace that by PL_get_nchars() and you can represent blobs in
> atoms, strings and lists of character codes. Just make sure all
> `characters' are in the range 0..255. If one is higher it internally
> switches to wide character arrays and it won't work any longer.
>
>
> I'm somewhat stuck again. I did replace the call with:
> PL_get_nchars(payload, &payload_length, &mqtt_payload, CVT_WRITE |
> BUF_MALLOC)

It is kind of unlikely you only want CVT_WRITE. Most likely you want
CVT_ATOM|CVT_STRING|CVT_LIST to handle atoms, strings and code/char
lists. Typically you add CVT_EXCEPTION so that if the conversion
fails it leaves the appropriate exception in the environment.

> and it works, but then when I eventually shutdown the program it
> coredumps (and yes I do call
> PL_free(mqtt_payload); after this call I added debug messages:
>
> _LOG("--- payload length %lu \n", payload_length);
> _LOG("--- size_t length %lu \n", sizeof (size_t));
>
> And when run I get:
> --- payload length 185216
> --- size_t length 8
> The payload length was correct.
> payload is declared as a term_t being passed into the function.

!? a term_t is the size of a pointer (8 byte). term_t is a difficult
type where you have to be very careful about the scoping. Returning
control back to Prolog invalidates all the term handles that you have,
i.e. you can only use them in the context of a foreign predicate or if
you use foreign frames in their context. Typically, do not put them
in permanent structures unless you really know what you are doing.

> I'm now running on SWI Prolog ver 7.2.3

I'd still upgrade. Why run such an old version?

> So it really appears to me that the PL_get_nchars is overrunning a
> buffer somewhere. I tried reading the SWI source code and didn't make
> it much past the source for PL_get_nchars itself.

I think that is highly unlikely ...

> But I'm also wondering if I really need to be calling PL_get_nchars at
> all. Very shortly after this in the code I'm actually making the call
> to publish the message via MQTT and that call reads and then puts the
> message on the wire, but doesn't not modify the buffer passed to it. It
> sounds like PL_get_nchars is making a new copy of the term as an
> unsigned char * - it seems that if the incoming term_t already has a
> buffer of those values that I could just pass that instead (but how do I
> get a pointer to that?) - the only other thing I'd need is the number of
> bytes of the length of the value.

Without insight in the overall architecture that is hard to comment on.

To resolve memory issues I'd advice running under valgrind. What can
also help is to recompile Prolog using -DO_DEBUG and run it using
swipl -d chk_secure <args>. That adds a lot of additional consistency
checks to the code.

Cheers --- Jan

P.s. For the overall control you may want to have a look at Jeff's
TIPC library. That uses the Prolog broadcast library for
handling TIPC messages inside Prolog. TIPC is not MQTT, but
both are, as I understand it, message based protocols.





>
> Any hints?
Reply all
Reply to author
Forward
0 new messages