PlaySpec - test on websocket did'nt work on localhost but works on heroku

64 views
Skip to first unread message

Kilic Ali-Firat

unread,
Aug 25, 2016, 1:14:58 PM8/25/16
to play-framework
Hi guys, 

I have an issue that I cannot fix since the beginning of this week. The context on the test is that I'm reading some data from Kafka and pushing it to a websocket. The test checks if the number of messages from websocket is the same than the number of messages read from kafka. In a first time, I wrote my test based on a app that we deployed on Heroku and the test didn't fail but now, I want to test it in local and I cannot know why it's doesn't work. So first, here my conf.routes file :
# Routes
# This file defines all application routes (Higher priority routes




GET  
/ping      @com.bioserenity.bps.controller.WebSocketController.ping


GET  
/socket/open/device @com.bioserenity.bps.controller.WebSocketController.openSocketOfDevice(deviceName : String ?= "", sessionId :String ?= "")

Here the core code from the test : 
import org.scalatest.BeforeAndAfter
import org.scalatest._
import org.scalatestplus.play._
import play.api.test._
import play.api.Play.current
import play.api.test.Helpers._
import play.api.test.TestServer
import play.api.Logger

class TEST_KAFKA_TO_WEBSOCKET  extends PlaySpec with BeforeAndAfterAll {


 
var i = 0
  val port
= 8080
  val test_server
= TestServer(port)
  val periodMs
= 5L
  val duringMs
= 500L
  val awaitWebSocketInit
= 1
  val awaitKafka
= 5


 
def getDevicename = {
    val config
= Play.application().configuration()
    config
.getString("test.deviceName")
 
}


 
def initKafkaOutput(deviceName : String, session : String) = {
    val config
= Play.application().configuration()
    val topic
= config.getString("kafka.topic")
    val partitionNumber
= config.getInt("kafka.partitionNumber")
    val saslPath
= getClass.getResource("/resources/sasl.conf").getPath
   
KafkaSettings.sasl(saslPath)
    val kafkaProducer
= KafkaSettings.producer()
    kafkaProducer
.put("ssl.truststore.location", getClass().getResource("/resources/trustore.jks").getPath)
    val partitionKey
= deviceName + "_" + session
   
Logger.info("Partition key -> " + partitionKey)
   
// custom class wrapping kafka producer  
   
new KafkaOutput(kafkaProducer, topic, partitionNumber, partitionKey)
 
}


 
def initSimulatorInput(x : Byte) = new ConstantInput(x)


 
def initWebSocket(deviceName : String, session : String) = {
    val uri
= "ws://localhost:"+port+"/socket/open/device?deviceName="+deviceName+"&sessionId="+session
    val socket
= new PSSocketHandlerKeepMessage()
    val wsio
= new WebSocketIO(uri, socket)
    wsio
.start()
    val sessionWebSocket
= wsio.connect()
    checkSessionStatus
(sessionWebSocket)
   
(wsio, socket)
 
}


 
def checkSessionStatus(session : Session) = {
   
Logger.info("session opened ? = " + session.isOpen())
    session
.isOpen mustBe true
 
}


 
def createSimulator(byteValue : Byte, deviceName : String, session : String) = {
    val simulatorInput
= initSimulatorInput(byteValue)
    val kafkaOutput
= initKafkaOutput(deviceName, session)
   
new DeviceSimulator(simulatorInput, kafkaOutput)
 
}


 
def checkNumberOfMessage
   
(
      partitionKey
: String,
      deviceOutput
: DeviceOutput,
      webSocketOuput
: List[Array[Byte]]
   
) = {
    val pw
= new java.io.PrintWriter("/tmp/foo.txt" +i)
    i
+= 1
    val deviceOutputCount
= deviceOutput.count
    val webSocketOutputCount
= webSocketOuput.size
    pw
.write("pkey = " + partitionKey + "device count = " + deviceOutputCount + " --- web socket output = " + webSocketOutputCount)
    pw
.close
   
Logger.info("*********************************")
   
Logger.info("device count = " + deviceOutputCount + " --- web socket output = " + webSocketOutputCount)
   
Logger.info("*********************************")
   
(deviceOutput!=null) mustBe true
   
(webSocketOuput!=null) mustBe true
   
(deviceOutputCount>0) mustBe true
   
(webSocketOutputCount>0) mustBe true
    deviceOutputCount mustBe webSocketOutputCount
 
}


 
def checkMessageContent
 
(
    deviceOutput
: DeviceOutput,
    webSocketOutput
: List[Array[Byte]],
    exceptedContent
: Byte
 
) = {
    webSocketOutput
foreach { message : Array[Byte]=>
     
// Logger.info("message size = " + message.size)
      message
foreach { messageValue : Byte =>
       
// Logger.info("messageValue = " + messageValue + " --- excepted value = " + exceptedContent)
        messageValue mustBe exceptedContent
     
}
   
}
 
}




 
def produceToKafkaAndConsumeFromWS(deviceName : String,  session : String, byteValue : Byte) = {
    val simulator
= createSimulator(byteValue, deviceName, session)
    val
(wsio,socket) = initWebSocket(deviceName, session)
    wsio
.getHandler.await(awaitWebSocketInit, TimeUnit.SECONDS);
    simulator
.simulate(periodMs, duringMs)
    wsio
.getHandler.await(awaitKafka, TimeUnit.SECONDS)
    wsio
.stop
   
(simulator.output(), socket.webSocketOutput.asScala.toList)
 
}




 
"Publish from kafka to the websocket" should {
   
"have the same number of packets for the sender (kafka) and the receiver (websocket)" in {
      running
(test_server) {
        val byteValue
: Byte = 0
        val deviceName
= getDevicename
        val session
= "X"
        val
(simulatorOutput, wsOutput) = produceToKafkaAndConsumeFromWS(deviceName, session, byteValue)
        val pkey
= deviceName + "_" + session
        checkNumberOfMessage
(pkey, simulatorOutput, wsOutput)
     
}
   
}
    val numberOfDevices
= 1
   
"have for " + numberOfDevices + " devices the same number of packets for the sender (kafka) and the receiver (websocket) for every device" in {
      running
(test_server) {
       
(0 until (numberOfDevices))
         
.map ( i => (i.toByte, getDevicename+i, "S"+i))
         
.foreach { case (byteValue, deviceName, session) =>
            val pkey
= deviceName + "_" + session
            val
(simulatorOutput, wsOutput) = produceToKafkaAndConsumeFromWS(deviceName, session, byteValue)
            checkNumberOfMessage
(pkey, simulatorOutput, wsOutput)
       
}
     
}
   
}
 
}
}

I'm using Play 2.4.0 and the only difference between the two tests is the websocket URI (device name and session changed). I cannot fix my issue and I'm searching some advices to fix that. 

Thanking you in advance,
Ali-Firat Kilic.

Kilic Ali-Firat

unread,
Aug 26, 2016, 6:34:53 AM8/26/16
to play-framework
Hi guys, 

I found something to fix : after the first test, the akka system is shutdown and this is why the second test cannot connect to the websocket. How can I avoid a such behavior ?

Kilic Ali-Firat

unread,
Aug 26, 2016, 8:07:03 AM8/26/16
to play-framework
In the source code of play.api.test.Helpers, I found what I'm using in the test :

/**
* Executes a block of code in a running server.
*/
def running[T](testServer: TestServer)(block: => T): T = {
PlayRunners.mutex.synchronized {
try {
testServer.start()
block
} finally {
testServer.stop()
}
}
} In the code that I wrote, I called running twice so the ActorSystem must start at the second call but it's not the case. To pass the step, I have to put a `Play.start(FakeApplication())` in order to have a ActorSystem available. Is it a normal behavior ?

Greg Methvin

unread,
Aug 26, 2016, 8:12:04 AM8/26/16
to play-framework
Instead of reusing the same application (or server, which contains an application), create a new one for each test. Generally you can do this by changing your val to a def, so the server will be recreated each time the method is called.

--
You received this message because you are subscribed to the Google Groups "play-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to play-framework+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/play-framework/b7603315-167a-4759-acd7-cf1b24a94360%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Greg Methvin
Senior Software Engineer

Kilic Ali-Firat

unread,
Aug 26, 2016, 9:38:59 AM8/26/16
to play-framework


Le vendredi 26 août 2016 14:12:04 UTC+2, Greg Methvin a écrit :
Instead of reusing the same application (or server, which contains an application), create a new one for each test. Generally you can do this by changing your val to a def, so the server will be recreated each time the method is called.
Oh thank you for the tip, it works !  

On Fri, Aug 26, 2016 at 5:07 AM, Kilic Ali-Firat <kilic.a...@gmail.com> wrote:
In the source code of play.api.test.Helpers, I found what I'm using in the test :

/**
* Executes a block of code in a running server.
*/
def running[T](testServer: TestServer)(block: => T): T = {
PlayRunners.mutex.synchronized {
try {
testServer.start()
block
} finally {
testServer.stop()
}
}
} In the code that I wrote, I called running twice so the ActorSystem must start at the second call but it's not the case. To pass the step, I have to put a `Play.start(FakeApplication())` in order to have a ActorSystem available. Is it a normal behavior ?

--
You received this message because you are subscribed to the Google Groups "play-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to play-framewor...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages