Could not deliver value due to lack of requests when .onBackpressureBuffer present

178 views
Skip to first unread message

Scorpio The Dark

unread,
Aug 31, 2021, 2:31:21 AM8/31/21
to RxJava
good day. i have folow like this 
this.disposableSubscriptionOnUpstream = Flowable.fromPublisher(upstream)
.onBackpressureBuffer(
1000
, () -> {
messagesDropped.incrementAndGet();
log.error("буфер для канала телеметрии {} переполнен"
, channelId);
}
, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(this.scheduler)
.subscribeOn(this.scheduler)
.subscribe(this::onNext,this::onError);

private void onNext(T element) {
writer.write(element).addCallback(this::onWriteSuccess, this::onWriteFailure);
}

this.scheduler = Schedulers.from(Executors.newFixedThreadPool(1,
new RxThreadFactory("ThreadForSinkFor" + channelId))
)

i have about 1K of upstreams and about ten or twenty of those sources are emiting values at rate of 3-6 elemets per seconds. in those flows i got error  catched in this::onError block
error is io.reactivex.exceptions.MissingBackpressureException: Could not deliver value due to lack of requests. upstream is BehaviorProcessor . can you advice why  lack of requests can happens when .onBackpressureBuffer is present in flow 

Dávid Karnok

unread,
Aug 31, 2021, 2:59:52 AM8/31/21
to Scorpio The Dark, RxJava
Hi.

As I mentioned in the Gitter.im thread, the exception should not happen unless something else is going on between the BehaviorProcessor and onBackpressureBuffer. Please provide a self-contained example that reproduces the issue.

--
You received this message because you are subscribed to the Google Groups "RxJava" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rxjava+un...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/rxjava/a61f5326-6563-4025-9180-7205dd70d714n%40googlegroups.com.


--
Best regards,
David Karnok
Reply all
Reply to author
Forward
0 new messages