draining connections gracefully for nats streaming

703 views
Skip to first unread message

Dinçer Kavraal

unread,
Oct 30, 2019, 8:33:11 AM10/30/19
to nats
Hi,

I have read intro to draining for nats server at https://nats-io.github.io/docs/developer/receiving/drain.html . I tried to revise that for streaming (below).
However I encounter problems (at the bottom), such as the client still trying to ping the server (and fail the app) while I'm waiting for drainage to end.

I could not successfully and gracefully unsubscribe and close nats/nats-stream as a nats-streaming listener. Do you know how to make it right?

Here is my revised test code:

  1. const STAN = require('node-nats-streaming');  
  2.   
  3. let stan = STAN.connect('stan''test', { url: "nats://127.0.0.1:4222", name: 'test' });  
  4. const subject = "Test";  
  5.   
  6. stan.publish(subject, Buffer.from('test 1 msg''utf-8'), (err, guid) => { if(err) console.log(err); });  
  7. stan.publish(subject, Buffer.from('test 2 msg''utf-8'), (err, guid) => { if(err) console.log(err); });  
  8. stan.publish(subject, Buffer.from('test 3 msg''utf-8'), (err, guid) => { if(err) console.log(err); });  
  9. stan.publish(subject, Buffer.from('test 4 msg''utf-8'), (err, guid) => { if(err) console.log(err); });  
  10.   
  11.   
  12. var subs;  // not a good idea, just for test  
  13. stan.on('connect', () => {  
  14.     subs = stan.subscribe(subject, 'test-q', stan.subscriptionOptions());  
  15.     subs.on('message', (msg) => { });  
  16. });  
  17.   
  18. setTimeout(() => {  
  19.     testDrain();  
  20. }, 5000);  
  21.   
  22. testDrain = () => {  
  23.     subs.unsubscribe(); // is this the way it should be?  
  24.     stan.nc.drain((err)=> {  
  25.         if(err) {  
  26.             console.log(err);  
  27.         }  
  28.         console.log('connection is closed:', stan.nc.closed);  
  29.     });  
  30. };  

It yields:

connection is closed: true

events.js:173
      throw er; // Unhandled 'error' event
      ^
NatsError: Connection draining
    at Client.publish (.../node_modules/nats/lib/nats.js:1529:32)
    at Timeout.pingFun [as _onTimeout] (.../node_modules/node-nats-streaming/lib/stan.js:361:33)
    at listOnTimeout (internal/timers.js:531:17)
    at processTimers (internal/timers.js:475:7)
Emitted 'error' event at:
    at Client.publish (.../node_modules/nats/lib/nats.js:1529:18)
    at Timeout.pingFun [as _onTimeout] (.../node_modules/node-nats-streaming/lib/stan.js:361:33)
    at listOnTimeout (internal/timers.js:531:17)
    at processTimers (internal/timers.js:475:7)

and then immediately terminates.

Alberto Ricart

unread,
Oct 30, 2019, 9:49:55 AM10/30/19
to nats
The issue you have is that you are draining the underlying NATS connection while the STAN connection has no such notion.

If you wanted to gracefully exit, you should:

- Associate an 'unsubscribed' handler on the stan subscription
- call unsubscribe on the subscription
- wait for the handler to trigger
- call close on STAN.


The nats-streaming client interactions are request-reply, and because you are typically interacting with another process (client -> nats-server -> streaming-server -> nats-server -> client) you have to allow for the streaming server to return responses back to you before the connection is closed to ensure that all inflight requests correctly finished or failed.

As an aside, while sharing a connection or interacting with the NATS connection seems convenient, likely is bad practice to do that The STAN connection has encoding requirements, and lifecycle requirements that are managed by it's library. You should likely not interact with the NATS connection directly.

Dinçer Kavraal

unread,
Oct 30, 2019, 9:53:46 AM10/30/19
to nats
Hi Alberto,

How can I make sure client has returned back? Do you mean the stream's publish callback (without an error)?

Thanks for the advice

Alberto Ricart

unread,
Oct 30, 2019, 10:06:42 AM10/30/19
to nat...@googlegroups.com
You add listeners to the subscription object you received:

sub.on('error', (err) => { //do something});
sub.on('unsubscribe', () => { //unsubscribed handled });
sub.on('close', ()= {// this is called when you close close instead of unsubscribe});

Now one thing that you should keep in mind is that nats-streaming server keeps track of your client, so even if you close with messages in flight, when you reconnect those messages will be replayed to you. With that said, it is possible for you to see messages you have seen (if server never saw you 'acking' a message for example).



--
You received this message because you are subscribed to the Google Groups "nats" group.
To unsubscribe from this group and stop receiving emails from it, send an email to natsio+un...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/natsio/8b901a51-c87d-4bac-a811-c0406bc872bc%40googlegroups.com.

Dinçer Kavraal

unread,
Oct 30, 2019, 10:16:23 AM10/30/19
to nat...@googlegroups.com
Obviously. Thank you very much.


Reply all
Reply to author
Forward
0 new messages