Still fighting to put in production a server based on Spray + Akka on Tomcat 7. Machines are EC2 instances.
But it's not important for my current problem.
For the purpose of this Post I've written a complete sample that minic my real server expect that I make nothing here just the logic around spray and akka.
So I make test with JMeter I don't try to overkill the server just make a light test:
When I up the threads to only 5 concurrents I begin to receive a lot of error with response : The requested resource could not be found.
package com.inneractive.exchange.test
import akka.actor.{ActorRef, Props, Actor}
import spray.routing.{ExceptionHandler, MissingQueryParamRejection, RejectionHandler, HttpService}
import scala.concurrent.duration._
import akka.pattern.{ask, pipe}
import spray.util.LoggingContext
import spray.http._
import spray.http.HttpResponse
import spray.http.HttpEntity
import akka.util.Timeout
import spray.httpx.marshalling.Marshaller
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.control.NonFatal
import com.inneractive.services.events.exception.ClickHTTPOperationException
import scala.util.{Failure, Success}
/**
* User: richard
* Date: 11/6/13
* Time: 12:00 PM
*/
sealed trait Message
case class Message1(m: String, n: Int) extends Message
case class Message2(m: String, n: Int) extends Message
case class Message3(m: String, n: Int) extends Message
class Actor4 extends Actor {
implicit def executionContext = context.dispatcher
implicit val timeout = Timeout(2 seconds)
def receive: Actor4#Receive = {
case Message3(m, n) => {
context.system.scheduler.scheduleOnce(100 millis, self, Message3(m, n))
sender ! (m + " Count:" + n)
}
}
}
class Actor2 extends Actor {
def receive: Actor2#Receive = {
case Message2(m, n) => {
println("Get Message:" + m + " count:" + n)
sender ! Message3(m, n)
}
}
}
class Actor1 extends Actor {
implicit def executionContext = context.dispatcher
implicit val timeout = Timeout(2 seconds)
val actor2 = context.actorOf(Props[Actor2])
val actor4 = context.actorOf(Props[Actor4])
var controllerActor: ActorRef = _
var count = 0
def receive: Actor1#Receive = {
case Message1(m, n) => {
controllerActor = sender
count += 1
actor2 ! Message2(m, count)
}
case Message3(m, n) => {
val future = (actor4 ? Message3(m, n)).mapTo[String]
future pipeTo controllerActor
}
}
}
class TestController extends Actor with TestHttpService {
def actorRefFactory = context
def receive = runRoute(mappings)
}
trait TestHttpService extends HttpService {
implicit def executionContext = actorRefFactory.dispatcher
implicit val timeout = Timeout(2 seconds)
val actionActor = actorRefFactory.actorFor("/user/application/actor1")
implicit val rejectionHandler = RejectionHandler {
case MissingQueryParamRejection(paramName) :: _ => ctx => ctx.complete(StatusCodes.BadRequest)
}
/**
* Exception handler - when an exception is thrown, display fallback ad
*/
implicit def exceptionHandler(implicit log: LoggingContext) = ExceptionHandler {
case NonFatal(e: ClickHTTPOperationException) => ctx => {
ctx.complete("ERROR")
}
case NonFatal(e) => ctx => {
log.error(e, "Error during processing of request {}", ctx.request)
ctx.complete("ERROR")
}
}
def mappings(implicit log: LoggingContext) = {
get {
path("test") {
parameterMap {
requestParams => {
respondWithMediaType(MediaTypes.`text/html`) {
onComplete(actionActor ? Message1(requestParams.getOrElse("", ""), 0)) {
case Success(result : String) => complete(result)
case Failure(e) => reject
}
}
}
}
}
}
}
}