Java mongodb-driver-reactivestreams hangs on calls to subscribe()

volkan...@gmail.com

Mar 26, 2018, 10:00:07 PM
to mongodb-user
Hello,

I am trying to connect to MongoDB (3.6.3) using mongodb-driver-reactivestreams (v1.7.1), but subscribing to the publishers just hangs. Strangely, Wireshark captures traffic between MongoDB and my application, so something is definitely happening on the wire. Any ideas on what I might be missing?

import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.Collections;
import java.util.concurrent.CountDownLatch;

public enum MongoDbTest {;

    public static void main(String[] args) throws InterruptedException {
        MongoCredential credential = MongoCredential.createMongoCRCredential(
                "vyazici", "optimus_data", "secret".toCharArray());
        ClusterSettings clusterSettings = ClusterSettings
                .builder()
                .hosts(Collections.singletonList(new ServerAddress("foo.bar.com")))
                .build();
        MongoClientSettings clientSettings = MongoClientSettings
                .builder()
                .clusterSettings(clusterSettings)
                .credential(credential)
                .build();
        try (MongoClient client = MongoClients.create(clientSettings)) {
            MongoDatabase database = client.getDatabase("optimus_data");
            CountDownLatch latch = new CountDownLatch(1);
            database.listCollections().subscribe(new Subscriber<Document>() {

                @Override
                public void onSubscribe(Subscription s) {
                    System.out.println("subscription");
                }

                @Override
                public void onNext(Document document) {
                    System.out.println("next: " + document);
                }

                @Override
                public void onError(Throwable error) {
                    System.out.println("error");
                    error.printStackTrace();
                    latch.countDown();
                }

                @Override
                public void onComplete() {
                    System.out.println("complete");
                    latch.countDown();
                }

            });
            System.out.println("await");
            latch.await();
        }
    }

}
/*
$ mongo -p -u vyazici foo.bar.com/optimus_data
MongoDB shell version v3.6.3
Enter password:
connecting to: mongodb://foo.bar.com:27017/optimus_data
MongoDB server version: 3.6.3
rs_optimus:PRIMARY> show collections
eenESjson
my_product_export_txt
productwithoffer
*/

Ross Lawley

Mar 27, 2018, 5:33:41 AM
to mongodb-user
Hi,

That all looks good, but you are stuck on latch.await() because you have not signaled any demand through the Subscription. Reactive Streams is designed so that asynchronous processes can signal demand and handle backpressure: a publisher delivers nothing to onNext() until the subscriber asks for it with Subscription.request(n). See http://www.reactive-streams.org/ for more information. What you are seeing in Wireshark are the server discovery and monitoring processes for the connection pool.
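
To make the demand rule concrete without needing a MongoDB server, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones. The class name DemandDemo and the method collect() are made up for illustration, and SubmissionPublisher stands in for the driver's publisher, so treat this as an analogy rather than what the driver does internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the MongoDB driver.
class DemandDemo {

    /**
     * Subscribes to a three-item publisher, requesting {@code demand} items
     * up front, and returns whatever onNext() received.
     */
    static List<String> collect(long demand) throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    // Without this call the subscriber has signaled no demand.
                    if (demand > 0) {
                        s.request(demand);
                    }
                }

                @Override
                public void onNext(String item) {
                    received.add(item);
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });

            publisher.submit("a");
            publisher.submit("b");
            publisher.submit("c");
        } // close() marks the stream as finished

        // Wait at most one second so the zero-demand case returns instead of hanging.
        done.await(1, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("no demand:  " + collect(0));
        System.out.println("max demand: " + collect(Long.MAX_VALUE));
    }
}
```

With a demand of zero the returned list is empty, which is the same situation as the original subscriber: the subscription exists, but nothing was ever requested.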

All you need to do in your example is signal how much demand you can handle. If you want all the results, you can signal the maximum demand like so:

            database.listCollections().subscribe(new Subscriber<Document>() {

                @Override
                public void onSubscribe(Subscription s) {
                    System.out.println("subscription");
                    s.request(Long.MAX_VALUE); // Request all the collections.
                }

                @Override
                public void onNext(Document document) {
                    System.out.println("next: " + document);
                }

                @Override
                public void onError(Throwable error) {
                    System.out.println("error");
                    error.printStackTrace();
                    latch.countDown();
                }

                @Override
                public void onComplete() {
                    System.out.println("complete");
                    latch.countDown();
                }

            });
            System.out.println("await");
            latch.await();
        }
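
If you would rather not request everything at once, the other option mentioned above is to signal only as much demand as you can handle. A rough sketch of that pattern, again using the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams (the class OneAtATimeDemo and the method drain() are hypothetical names), requests one item in onSubscribe() and one more after each item has been processed:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the MongoDB driver.
class OneAtATimeDemo {

    /** Publishes 1..count and consumes them with a demand of one item at a time. */
    static List<Integer> drain(int count) throws InterruptedException {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // ask for the first item only
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);      // "process" the item
                    subscription.request(1); // then ask for the next one
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });

            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // close() marks the stream as finished

        // Bounded wait so a bug in the demo cannot hang the caller.
        done.await(5, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drain(5));
    }
}
```

The trade-off is throughput against memory: Long.MAX_VALUE effectively switches backpressure off, while request(1) keeps at most one unprocessed item in flight to the subscriber.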

I hope that helps,

Ross