Rabbitmq + Spark Streaming using AMQP

114 views
Skip to first unread message

HanumanthaRao Karyampudi

unread,
May 23, 2018, 2:30:31 AM5/23/18
to rabbitmq-discuss
Hi,

I am trying to fetch RabbitMQ messages using spark streaming and AMQP utils. I wrote the below code but it not fetching any thing and not showing any error. please help me on that.

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(Long.parseLong(sparkStreamingInterval)));
        javaStreamingContext.checkpoint(sparkCheckpointDir);

        java.util.Map<String, String> params = new HashMap<String, String>();
        List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<JavaRabbitMQDistributedKey>();

        params.put("hosts", "localhost");
        params.put("vHost", "/");
        params.put("userName", "guest");
        params.put("password", "guest");


        distributedKeys.add(new JavaRabbitMQDistributedKey("test",
                new ExchangeAndRouting("test1"),
                params
        ));

        Function<org.apache.qpid.proton.message.Message,scala.Option<java.lang.String>> messageHandler1 = new Function<org.apache.qpid.proton.message.Message, scala.Option<java.lang.String>>() {

            public Option<String> call(Message message) {

                return new Some(message.getBody());
        };

      

        JavaReceiverInputDStream<String>  receiveStream =
                AMQPUtils.createStream(javaStreamingContext,this.host,
                        this.port,
                        Option.apply("guest"),
                        Option.apply("guest"),
                        "test", messageHandler1, StorageLevel.MEMORY_AND_DISK());


      
        receiveStream.foreachRDD(rdd->{
            System.out.println("count"+rdd.count());
        });


Regards
Hanuman

VinaY MeSsi

unread,
Dec 1, 2021, 3:30:36 PM12/1/21
to rabbitmq-discuss
Hello,
Are you able to figure this out?

Reply all
Reply to author
Forward
0 new messages