Subscription not set! health check errors

46 views
Skip to first unread message

Jerry Lawson

unread,
Sep 2, 2020, 1:22:37 AM9/2/20
to SmallRye
We've been using @Incoming Subscriber<Message<T>> method() signature since at least quarkus 1.5.0 (smallrye-reactive 2.0.2) but now with quarkus 1.7.0 (smallrye 2.2.0), this results in a /health liveness check with this type of response:

    "checks": [
        {
            "name": "SmallRye Reactive Messaging",
            "status": "DOWN",
            "data": {
                "[metrics]": "[KO] - Subscription not set!"
            }
        },

when it invokes SafeSubscriber.onNext() and that instance does not have an upstream.

This onError() was added sometime between 2.0.2 and 2.2.0:

Even though we can disable healthcheck on a per-channel via property "health-enabled", the HealthCenter class always reports the generated NPE generated from SubscriberWrapper/SafeSubscriber.

As I mentioned above, this used to behave differently. It should be noted that the subscription does allow data to be received at our Subscriber's onNext() as expected, but the healthchecks in kuberntes are now failing and kube restarts the pod.

Is there a better way of using the 

trisp...@googlemail.com

unread,
Sep 3, 2020, 5:33:41 PM9/3/20
to SmallRye
Hi,

I have also encountered this problem and am now confused as to how to use subscribers without causing a failed health check. Definitely interested in a solution to this.

Cheers,

Stuart

clement escoffier

unread,
Sep 4, 2020, 1:38:38 AM9/4/20
to SmallRye
Hello,

That's definitely a bug. It may actually be 2 bugs.
First, having a Subscriber<Message<T>> should not have an impact on the health check. Now I can see why it could and explain the issue you are seeing. 
While you are providing a subscriber (which BTW is a rather rare case, I would be interested in the use case as implementing a Subscriber is far from being easy), there is a downstream in reactive messaging. So we have something like this:

upstream -> your subscriber -> downstream

It looks like the downstream does not get a subscription but get another signal (item, failure, completion). This is a reactive streams protocol violation as before getting any signal you must have received a subscription. So that's bug #1. I've created (https://github.com/smallrye/smallrye-reactive-messaging/issues/719).
Now, the potential bug #2. Failures should be terminal, meaning that your application should not receive any more messages. That does not mean to be the case, so cancellation is missing somewhere. 

As a temporary workaround, you can replace the subscriber with:

CompletionStage<Void> consume(Message<I> message) {
     // process...
    return message.ack(); (or nack if something bad happened)
}

Clement

--
You received this message because you are subscribed to the Google Groups "SmallRye" group.
To unsubscribe from this group and stop receiving emails from it, send an email to smallrye+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/smallrye/3cf48d75-e839-4acc-9416-70e719c83b65n%40googlegroups.com.

clement escoffier

unread,
Sep 4, 2020, 1:40:29 AM9/4/20
to SmallRye
And ... the first bug is at line: https://github.com/smallrye/smallrye-reactive-messaging/blob/master/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/SubscriberWrapper.java#L44
(subscription not passed downstream)

The second bug is at the same line (subscription must be passed and cancellation propagated upstream).

Clement

Jerry Lawson

unread,
Sep 4, 2020, 2:05:19 AM9/4/20
to SmallRye

Thanks for looking into this. The use-case for using Subscriber is to be able to control the number of requests in the Subscription. We're also using MANUAL Acknowlegement Strategy, to be able to control when the incoming Messages get acked. We're consuming data from a kafka topic, but we want to be able to control how many incoming data we can handle, asynchronously. We set the initial subscription.request() to a reasonable number and then increment the request as more asynchronous handlers are added and the data gets processed. The asynchronous processing expects to receive a "batch" of data (multiple kafka Messages) before it does something with them. After it has completed its processing of those Messages, it will ack() and then call subscription.request(n) to keep the pipeline full of data. 

I have temporarily converted to using the @Incoming, as you suggested. But this does not give us the ability to control the number of outstanding requests at the Subscription, via subscription.request(n).

Previously with Subscriber<Message<T>>, in quarkus, the onSubscribe() gets automatically invoked and we start receiving Messages from the kafka topic. We didn't need to wire up an upstream to start receiving data. Is there an example of how we should be wiring things up to our Subscriber? It's not clear to me after reviewing the SubscriberMediator what I should be doing differently.

clement escoffier

unread,
Sep 4, 2020, 2:24:38 AM9/4/20
to SmallRye
Le ven. 4 sept. 2020 à 08:05, Jerry Lawson <jerry....@moogsoft.com> a écrit :

Thanks for looking into this. The use-case for using Subscriber is to be able to control the number of requests in the Subscription. We're also using MANUAL Acknowlegement Strategy, to be able to control when the incoming Messages get acked. We're consuming data from a kafka topic, but we want to be able to control how many incoming data we can handle, asynchronously. We set the initial subscription.request() to a reasonable number and then increment the request as more asynchronous handlers are added and the data gets processed. The asynchronous processing expects to receive a "batch" of data (multiple kafka Messages) before it does something with them. After it has completed its processing of those Messages, it will ack() and then call subscription.request(n) to keep the pipeline full of data. 

I have temporarily converted to using the @Incoming, as you suggested. But this does not give us the ability to control the number of outstanding requests at the Subscription, via subscription.request(n).

Previously with Subscriber<Message<T>>, in quarkus, the onSubscribe() gets automatically invoked and we start receiving Messages from the kafka topic. We didn't need to wire up an upstream to start receiving data. Is there an example of how we should be wiring things up to our Subscriber? It's not clear to me after reviewing the SubscriberMediator what I should be doing differently.

The problem is in SmallRye Reactive Messaging, not in your code. I will fix it. 

That's a nice use case you have. Yes, to manage the flow control yourself you need to use Subscriber. So, as a workaround, you may be able to do something like this:

@Incoming(...)
@Outgoing("dev/null")
Processor<Message<X>, Message<X>> processor() {
 // like your subscriber, but as a processor, so you can handle the requests
}

@Incoming("dev/null")
void blackhole(X x) {
   // do nothing, just here to ignore the messages
}

Clement
 

clement escoffier

unread,
Sep 4, 2020, 5:38:03 AM9/4/20
to SmallRye

Jerry Lawson

unread,
Sep 4, 2020, 3:41:10 PM9/4/20
to SmallRye
Is there an early distribution of the jars we can use, before it's officially published as 2.4.0 (or whatever the final version is) ?

clement escoffier

unread,
Sep 6, 2020, 3:00:31 AM9/6/20
to SmallRye
Reply all
Reply to author
Forward
0 new messages