/**
 * End-to-end spec: a [[DeviceSimulator]] publishes constant-valued packets through a
 * [[KafkaOutput]], while a web socket client connected to the test server records what
 * it receives. The tests then compare what was sent with what was received.
 *
 * Helpers that read the Play configuration (`getDevicename`, `initKafkaOutput`) call
 * `Play.application()`, so they are only invoked from inside `running(test_server)` here.
 */
class TEST_KAFKA_TO_WEBSOCKET extends PlaySpec with BeforeAndAfterAll {

  /** Suffix counter for the debug files written by `checkNumberOfMessage`. */
  var i = 0

  /** Port the test server listens on; also used to build the web socket URI. */
  val port = 8080
  val test_server = TestServer(port)

  // Passed as (period, duration) to DeviceSimulator.simulate; the names suggest
  // milliseconds -- NOTE(review): confirm against DeviceSimulator.
  val periodMs = 5L
  val duringMs = 500L

  // Waiting times, in seconds (used with TimeUnit.SECONDS below).
  val awaitWebSocketInit = 1
  val awaitKafka = 5

  /** Reads the base device name from the `test.deviceName` configuration key. */
  def getDevicename = {
    val config = Play.application().configuration()
    config.getString("test.deviceName")
  }

  /**
   * Resolves a classpath resource to its path.
   *
   * @throws IllegalStateException if the resource is absent, instead of the bare
   *                               NullPointerException `getResource(...).getPath` would raise
   */
  private def resourcePath(name: String): String =
    Option(getClass.getResource(name))
      .map(_.getPath)
      .getOrElse(throw new IllegalStateException(s"Test resource not found on classpath: $name"))

  /**
   * Builds the Kafka-backed output used by the simulator.
   *
   * @param deviceName device name, first half of the partition key
   * @param session    session id, second half of the partition key
   * @return a [[KafkaOutput]] keyed by `deviceName_session` on the configured topic
   */
  def initKafkaOutput(deviceName : String, session : String) = {
    val config = Play.application().configuration()
    val topic = config.getString("kafka.topic")
    val partitionNumber = config.getInt("kafka.partitionNumber")

    // SASL settings are applied before the producer settings are requested.
    KafkaSettings.sasl(resourcePath("/resources/sasl.conf"))
    val kafkaProducer = KafkaSettings.producer()
    kafkaProducer.put("ssl.truststore.location", resourcePath("/resources/trustore.jks"))

    val partitionKey = s"${deviceName}_$session"
    Logger.info(s"Partition key -> $partitionKey")
    // custom class wrapping kafka producer
    new KafkaOutput(kafkaProducer, topic, partitionNumber, partitionKey)
  }

  /** Simulator input built from the single byte value `x`. */
  def initSimulatorInput(x : Byte) = new ConstantInput(x)

  /**
   * Starts a web socket client against the local test server and connects it.
   *
   * @return the started client together with the handler that keeps received messages
   */
  def initWebSocket(deviceName : String, session : String) = {
    val uri = s"ws://localhost:$port/socket/open/device?deviceName=$deviceName&sessionId=$session"
    val socket = new PSSocketHandlerKeepMessage()
    val wsio = new WebSocketIO(uri, socket)
    wsio.start()
    try {
      checkSessionStatus(wsio.connect())
    } catch {
      // Do not leave a started client behind when connecting or the open check fails.
      case scala.util.control.NonFatal(e) =>
        wsio.stop
        throw e
    }
    (wsio, socket)
  }

  /** Asserts that the given web socket session is open. */
  def checkSessionStatus(session : Session) = {
    Logger.info(s"session opened ? = ${session.isOpen()}")
    session.isOpen mustBe true
  }

  /** Wires a constant input to a Kafka output inside a [[DeviceSimulator]]. */
  def createSimulator(byteValue : Byte, deviceName : String, session : String) = {
    val simulatorInput = initSimulatorInput(byteValue)
    val kafkaOutput = initKafkaOutput(deviceName, session)
    new DeviceSimulator(simulatorInput, kafkaOutput)
  }

  /**
   * Asserts that the sender and the receiver both saw a non-zero, identical number of messages.
   * Also dumps the two counters to `/tmp/foo.txt<n>` for debugging.
   *
   * @param partitionKey   key written to the debug file
   * @param deviceOutput   sender-side output, queried through `count`
   * @param webSocketOuput messages collected on the web socket side
   */
  def checkNumberOfMessage
  (
    partitionKey : String,
    deviceOutput : DeviceOutput,
    webSocketOuput : List[Array[Byte]]
  ) = {
    // Null checks come first: dereferencing before checking would turn a null
    // input into a NullPointerException rather than a test failure.
    (deviceOutput != null) mustBe true
    (webSocketOuput != null) mustBe true

    val deviceOutputCount = deviceOutput.count
    val webSocketOutputCount = webSocketOuput.size

    val pw = new java.io.PrintWriter("/tmp/foo.txt" + i)
    i += 1
    try {
      pw.write(s"pkey = ${partitionKey}device count = $deviceOutputCount --- web socket output = $webSocketOutputCount")
    } finally {
      pw.close()
    }

    Logger.info("*********************************")
    Logger.info(s"device count = $deviceOutputCount --- web socket output = $webSocketOutputCount")
    Logger.info("*********************************")

    (deviceOutputCount > 0) mustBe true
    (webSocketOutputCount > 0) mustBe true
    deviceOutputCount mustBe webSocketOutputCount
  }

  /**
   * Asserts that every byte of every received message equals `exceptedContent`.
   *
   * @param deviceOutput    not used by the check; kept for signature compatibility
   * @param webSocketOutput messages collected on the web socket side
   * @param exceptedContent byte value each payload byte must equal
   */
  def checkMessageContent
  (
    deviceOutput : DeviceOutput,
    webSocketOutput : List[Array[Byte]],
    exceptedContent : Byte
  ) = {
    for {
      message <- webSocketOutput
      messageValue <- message
    } messageValue mustBe exceptedContent
  }

  /**
   * Runs one simulation: opens the web socket, lets the simulator publish, waits, then
   * stops the client.
   *
   * @return the simulator output and the list of messages kept by the socket handler
   */
  def produceToKafkaAndConsumeFromWS(deviceName : String, session : String, byteValue : Byte) = {
    val simulator = createSimulator(byteValue, deviceName, session)
    val (wsio, socket) = initWebSocket(deviceName, session)
    try {
      wsio.getHandler.await(awaitWebSocketInit, TimeUnit.SECONDS)
      simulator.simulate(periodMs, duringMs)
      wsio.getHandler.await(awaitKafka, TimeUnit.SECONDS)
    } finally {
      // Stop the client even when the simulation or a wait throws.
      wsio.stop
    }
    (simulator.output(), socket.webSocketOutput.asScala.toList)
  }

  "Publish from kafka to the websocket" should {

    "have the same number of packets for the sender (kafka) and the receiver (websocket)" in {
      running(test_server) {
        val byteValue : Byte = 0
        val deviceName = getDevicename
        val session = "X"
        val (simulatorOutput, wsOutput) = produceToKafkaAndConsumeFromWS(deviceName, session, byteValue)
        val pkey = s"${deviceName}_$session"
        checkNumberOfMessage(pkey, simulatorOutput, wsOutput)
      }
    }

    val numberOfDevices = 1

    s"have for $numberOfDevices devices the same number of packets for the sender (kafka) and the receiver (websocket) for every device" in {
      running(test_server) {
        // `index` (rather than `i`) avoids shadowing the debug-file counter above.
        (0 until numberOfDevices)
          .map(index => (index.toByte, s"${getDevicename}$index", s"S$index"))
          .foreach { case (byteValue, deviceName, session) =>
            val pkey = s"${deviceName}_$session"
            val (simulatorOutput, wsOutput) = produceToKafkaAndConsumeFromWS(deviceName, session, byteValue)
            checkNumberOfMessage(pkey, simulatorOutput, wsOutput)
          }
      }
    }
  }
}