RabbitMQ and Spark Streaming Error Connection Refused


Maka R

Jun 29, 2017, 8:12:50 AM
to rabbitmq-discuss
I am trying to connect to a remote RabbitMQ service from Spark Streaming (I have already verified connectivity to the service with a separate Python script). I'm using a custom receiver like this:

import com.rabbitmq.client._
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils
import org.apache.spark.streaming.receiver._

object CustomReceiverUT {

  def main(args: Array[String]) {

    // RabbitMQ connection settings
    val host = "1.2.3.4"
    val port = "5672"
    val queueName = "Test_Wave"
    val vHost = "/"
    val userName = "mhadmin"
    val password = "password"

    // Configure Spark properties
    val sparkConfig = new SparkConf()
      .setAppName("Spark-Stream-Test")
      .setIfMissing("spark.master", "local[*]")
    val ssc = new StreamingContext(sparkConfig, Seconds(5))

    val receiverStream = RabbitMQUtils.createStream(ssc, Map(
      "host" -> host,
      "port" -> port,
      "queueName" -> queueName,
      "vHost" -> vHost,
      "userName" -> userName,
      "password" -> password
    ))

    val totalEvents = ssc.sparkContext.longAccumulator("My Accumulator")

    receiverStream.start()
    println("started receiverStream")

    receiverStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val count = rdd.count()
        // Do something with this message
        println(s"EVENTS COUNT : \t $count")
        totalEvents.add(count)
        //rdd.collect().sortBy(event => event.toInt).foreach(event => print(s"$event, "))
      } else println("RDD is empty")
      println(s"TOTAL EVENTS : \t $totalEvents")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}


However, when I run this code, I keep getting a connection refused error:

17/06/27 16:04:47 INFO ReceiverSupervisorImpl: Starting receiver 0
17/06/27 16:04:47 ERROR RabbitMQReceiver: Could not connect
17/06/27 16:04:47 INFO ReceiverSupervisorImpl: Called receiver 0 onStart
17/06/27 16:04:47 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
17/06/27 16:04:47 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Could not connect
java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:60)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:911)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:870)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:692)
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.org$apache$spark$streaming$rabbitmq$consumer$Consumer$$addConnection(Consumer.scala:269)
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer$$anonfun$1.apply(Consumer.scala:251)
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer$$anonfun$1.apply(Consumer.scala:251)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.getChannel(Consumer.scala:251)
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:207)
        at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$3.apply(RabbitMQInputDStream.scala:60)
        at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$3.apply(RabbitMQInputDStream.scala:59)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver.onStart(RabbitMQInputDStream.scala:59)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1976)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1976)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
17/06/27 16:04:47 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Could not connect: java.net.ConnectException: Connection refused (Connection refused)
17/06/27 16:04:47 INFO RabbitMQReceiver: Closed all RabbitMQ connections
17/06/27 16:04:47 INFO ReceiverSupervisorImpl: Called receiver onStop
17/06/27 16:04:47 INFO ReceiverSupervisorImpl: Deregistering receiver 0
17/06/27 16:04:47 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Could not connect - java.net.ConnectException: Connection refused (Connection refused)
        [same stack trace as above]


I've troubleshot this as much as I can: I've confirmed there are no firewall or network connectivity issues between the two servers, and that the IP, port, queue name, username, and password are all correct.
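
For reference, this is roughly how a direct connection test with the plain RabbitMQ Java client would look, outside of Spark (just a sketch, not code from my project; the host, port, and credentials are the same placeholders as above). Running something like this from the machine that launches the Spark job should show whether the problem is the broker/network or the receiver configuration:

import com.rabbitmq.client.ConnectionFactory

object RabbitConnectivityCheck {
  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost("1.2.3.4")    // placeholder broker host
    factory.setPort(5672)         // note: an Int here, unlike the String in the Spark config map
    factory.setVirtualHost("/")
    factory.setUsername("mhadmin")
    factory.setPassword("password")

    // newConnection() throws java.net.ConnectException if nothing is accepting
    // TCP connections on that host/port, which is exactly the error in the log above
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    println(s"Connected, channel open: ${channel.isOpen}")
    channel.close()
    connection.close()
  }
}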

Any help would be greatly appreciated!

MG2729

Jul 30, 2017, 2:19:40 PM
to rabbitmq-discuss
I am facing the same issue. I am trying to connect to our RabbitMQ platform; it works fine when I connect to localhost but fails against the remote broker. Please let me know if you found a resolution for this.
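
In case it helps narrow things down: with a remote broker, the connection is opened from wherever the receiver task actually runs, not necessarily from the machine you submit the job on, and a broker that only listens on localhost will refuse everything else. Below is a rough reachability probe that runs on the executors (just a sketch; sc is assumed to be your SparkContext, and the host/port are placeholders):

import java.net.{InetAddress, InetSocketAddress, Socket}
import scala.util.Try

val brokerHost = "1.2.3.4"   // placeholder broker host
val brokerPort = 5672

// One task per default partition; each task reports which machine it ran on
// and whether it could open a TCP connection to the broker from there.
val results = sc.parallelize(1 to sc.defaultParallelism)
  .map { _ =>
    val runningOn = InetAddress.getLocalHost.getHostName
    val reachable = Try {
      val socket = new Socket()
      socket.connect(new InetSocketAddress(brokerHost, brokerPort), 5000)
      socket.close()
    }.isSuccess
    (runningOn, reachable)
  }
  .collect()

results.foreach { case (host, ok) =>
  println(s"$host -> ${if (ok) "port open" else "connection refused / timed out"}")
}

If the port is open from every executor but the receiver still fails, the problem is more likely in the connection parameters (virtual host, credentials, or how the connector interprets them) than in the network.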

VinaY MeSsi

Dec 1, 2021, 3:31:44 PM
to rabbitmq-discuss
Hello,
Could you please let me know if you were able to solve this issue?

Thanks.
