I am trying to fetch RabbitMQ messages using Spark Streaming and AMQPUtils. I wrote the code below, but it is not fetching anything and it does not show any error. Please help me with this.
// Wrap the session's SparkContext for the Java API, then build the streaming
// context on top of it using the configured batch interval (in seconds).
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
final long batchIntervalSeconds = Long.parseLong(sparkStreamingInterval);
JavaStreamingContext javaStreamingContext =
        new JavaStreamingContext(javaSparkContext, Durations.seconds(batchIntervalSeconds));

// Register the checkpoint directory on the streaming context.
javaStreamingContext.checkpoint(sparkCheckpointDir);
// Broker connection settings, keyed by option name.
// NOTE(review): neither `params` nor `distributedKeys` is passed to the
// AMQPUtils.createStream call visible in this snippet — confirm whether they
// are consumed elsewhere or are left over from a different connector.
java.util.Map<String, String> params = new HashMap<>();
params.put("hosts", "localhost");
params.put("vHost", "/");
params.put("userName", "guest");
params.put("password", "guest");

// Single distributed key built from "test", the "test1" exchange/routing
// descriptor and the connection settings above.
JavaRabbitMQDistributedKey distributedKey =
        new JavaRabbitMQDistributedKey("test", new ExchangeAndRouting("test1"), params);
List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<>();
distributedKeys.add(distributedKey);
Function<org.apache.qpid.proton.message.Message,scala.Option<java.lang.String>> messageHandler1 = new Function<org.apache.qpid.proton.message.Message, scala.Option<java.lang.String>>() {
public Option<String> call(Message message) {
return new Some(message.getBody());
};
// Create the receiver-based input stream for address "test" on this.host:this.port,
// converting every message with messageHandler1 and storing blocks with
// MEMORY_AND_DISK. The two Option.apply("guest") arguments are presumably the
// username and password — verify against the AMQPUtils.createStream signature.
// NOTE(review): if AMQPUtils is an AMQP 1.0 connector, the RabbitMQ broker
// needs AMQP 1.0 support enabled (it speaks AMQP 0-9-1 by default) — confirm.
JavaReceiverInputDStream<String> receiveStream =
        AMQPUtils.createStream(
                javaStreamingContext,
                this.host,
                this.port,
                Option.apply("guest"),
                Option.apply("guest"),
                "test",
                messageHandler1,
                StorageLevel.MEMORY_AND_DISK());

// Output action: print the number of messages in each batch.
receiveStream.foreachRDD(rdd -> {
    System.out.println("count" + rdd.count());
});

// Declaring the stream and its output action only builds the processing graph.
// Nothing is received or printed until the streaming context is started, so
// start it and keep the driver alive until the context is stopped.
javaStreamingContext.start();
try {
    javaStreamingContext.awaitTermination();
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers can observe the interruption.
    Thread.currentThread().interrupt();
}