// Launches the command and wires its three standard streams:
// stdin is left untouched, stdout is handed to sendFirstResponse,
// and stderr is handed to stderrPrinter.
val process = dummyCommand.run(
  new ProcessIO(
    (_: OutputStream) => (),
    stdout => sendFirstResponse(stdout),
    stderr => stderrPrinter(stderr)
  )
)
// Route for GET requests on the "stream" path: the inner route is wrapped in
// withRangeSupport() and respondWithMediaType(audioAac) before the request is
// handed to sendStreamingResponse().
val myRoute =
  (path("stream") & get) {
    withRangeSupport() {
      respondWithMediaType(audioAac) {
        sendStreamingResponse()
      }
    }
  }
- So far I have not been able to implement caching. Caching would solve the problem of several requests arriving for the same stream: only the first request would execute the command.
- I don't want to wait for the command to finish executing, because it can take up to 30 seconds; instead, I want to stream the data while the command is still running.
Any idea how I can solve the range support and implement caching? Is caching (using the various caching directives or another mechanism) possible when implementing a low level streaming actor like I do? Any other suggestion?
Thanks in advance.
Cheers,
Patrick
P.S. Working example project with the implementation can be downloaded here: http://extrabright.com/dl/spray-streaming-server.zip
[1] Streaming actor which sends back the result of a time consuming command as an AAC stream to the client (time consuming command here simulated by 'cat tmp/nature.aac')
/** Acknowledgement token attached (via `withAck`) to each response part sent to the responder;
  * receiving it back triggers the streaming actor to send the next chunk.
  */
case object Ack
/**
 * Streams the standard output of an external command to the client as a
 * chunked `audioAac` HTTP response, while the command is still running.
 *
 * A dedicated actor is created per request. It starts the command, sends the
 * first block of output as `ChunkedResponseStart`, and then sends one
 * `MessageChunk` per received [[Ack]] until stdout is exhausted, at which
 * point it sends `ChunkedMessageEnd`. The actor stops itself when the stream
 * ends or when the connection is closed; stopping destroys the command and
 * closes its stdout so nothing is leaked per request.
 *
 * @param ctx the request context whose `responder` receives the response parts
 */
def sendStreamingResponse()(ctx: RequestContext): Unit =
  actorRefFactory.actorOf {
    Props {
      new Actor with ActorLogging {
        // Maximum number of bytes read from the command (and sent) per chunk.
        val length = 8 * 1024
        // Scratch buffer reused for every read; chunks handed to the responder are copies.
        val buffer = new Array[Byte](length)
        // Total number of payload bytes read from the command's stdout so far.
        var count = 0L
        // Set by sendFirstResponse, which is invoked from the ProcessIO stdout handler
        // (presumably a thread spawned by scala.sys.process, not the actor's thread),
        // hence @volatile so postStop reliably sees it.
        @volatile var bufferedStream: Option[BufferedInputStream] = None

        val dummyCommand = Seq(
          "cat",
          "tmp/nature.aac"
        )

        /** Forwards every line the command writes to stderr to the actor's log. */
        def stderrPrinter(err: InputStream): Unit =
          Source.fromInputStream(err).getLines().foreach(
            line => log.info("[err] {}", line)
          )

        // Keep this below the vals above: the command is started as soon as this val is
        // initialized, and its stdout handler uses `buffer` and `length`.
        val process = dummyCommand run new ProcessIO(
          {in: OutputStream => ()},
          {out => sendFirstResponse(out)},
          {err => stderrPrinter(err) }
        )

        /**
         * Reads the next block (at most `length` bytes) from the command's stdout.
         *
         * @return a fresh array holding exactly the bytes read, or `None` at end of stream.
         *         `count` is only advanced for real data, never by the -1 end-of-stream marker.
         */
        def readChunk(in: BufferedInputStream): Option[Array[Byte]] = {
          val bytesRead = in.read(buffer, 0, length)
          if (bytesRead < 0) None
          else {
            count += bytesRead
            // Always copy: the scratch buffer is overwritten by the next read and must
            // not be aliased by a message that may still be in flight.
            Some(buffer.slice(0, bytesRead))
          }
        }

        /**
         * Starts the chunked response with the first block of command output.
         * If the command produced no output at all, the response starts with an empty entity.
         */
        def sendFirstResponse(input: InputStream): Unit = {
          val in = new BufferedInputStream(input)
          bufferedStream = Some(in)
          log.info("Input stream is {}, buffered stream is {}, buffer is {}", input, in, buffer)
          val firstChunk = readChunk(in).getOrElse(Array.empty[Byte])
          val firstResponse: HttpResponse = HttpResponse(entity = HttpEntity(audioAac, firstChunk))
          ctx.responder ! ChunkedResponseStart(firstResponse).withAck(Ack)
          log.info(">>> sent {} bytes (first response)", firstChunk.length)
        }

        /** Sends the next chunk, or finishes the response and stops the actor at end of stdout. */
        def sendSubsequentResponse(): Unit =
          bufferedStream.flatMap(readChunk) match {
            case Some(chunk) =>
              ctx.responder ! MessageChunk(chunk).withAck(Ack)
            case None /* end of stdout */ =>
              log.info("Std out end, read {} bytes", count)
              ctx.responder ! ChunkedMessageEnd
              // Nothing left to stream; stopping triggers the cleanup in postStop.
              context.stop(self)
          }

        def receive = {
          case Ack => sendSubsequentResponse()
          case ev: Http.ConnectionClosed =>
            log.warning("Stopping response streaming due to {}", ev)
            // The client is gone: stop streaming; postStop releases the command.
            context.stop(self)
          case any =>
            log.warning("Received another msg {}", any)
        }

        /** Releases the external command and its stdout whenever the actor stops. */
        override def postStop(): Unit = {
          process.destroy()
          bufferedStream.foreach { in =>
            try in.close()
            catch {
              case e: java.io.IOException =>
                log.debug("Ignoring error while closing command stdout: {}", e)
            }
          }
        }
      }
    }
  }
Cheers, Patrick