Live Audio Streaming Server using Spray


Pat Jayet

Dec 31, 2015, 1:30:15 AM
to spray.io User List
Hi all,

I am exposing a command that generates live audio on stdout as a REST service with Spray. I have a working version, but it still has a couple of caveats.

What is working:
- I am using a streaming actor to send the response back in chunks so as not to saturate the network (see [1]).

- This is how I start the command and hook into its stdout:

  val process = dummyCommand run new ProcessIO(
    { in: OutputStream => () },
    { out => sendFirstResponse(out) },
    { err => stderrPrinter(err) }
  )


What is not working:
- Since the service is streaming AAC audio files, I am getting more than one request for each stream (that is normal behavior for streaming). However, my implementation deals poorly with this: for each incoming request a new command is started, which is quite time-consuming and inefficient.

- In the current version, I am using the withRangeSupport directive. However, it is not working properly (in fact, I am not even sure this directive is compatible with the streaming actor).

  val myRoute =
    path("stream") {
      get {
        withRangeSupport() {
          respondWithMediaType(audioAac) {
            sendStreamingResponse()
          }
        }
      }
    }
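
For comparison, the only way I have seen withRangeSupport used is over a response whose entity is fully known up front, e.g. when serving a file from disk. Below is a minimal sketch of that case ("file" and the getFromFile call are purely illustrative); whether the directive can also slice a chunked response started via ctx.responder is exactly what I am unsure about:

  // Sketch: range support over a complete file entity served from disk.
  val fileRoute =
    path("file") {
      get {
        withRangeSupport() {
          getFromFile("tmp/nature.aac")
        }
      }
    }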


- So far I have not been able to implement caching. That would solve the problem of several requests for the same stream, where only the first one would execute the command.


- I don't want to wait for the command to finish execution, since it can take up to 30 seconds; I want to stream the data while the command is still running.


Any idea how I can fix the range support and implement caching? Is caching (using the various caching directives or another mechanism) possible when implementing a low-level streaming actor like I do? Any other suggestions?
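
For completeness, my understanding of how the standard caching directives are normally wired up is sketched below (assuming spray 1.3.x, an implicit ExecutionContext in scope, and placeholder names such as streamCache and someFullyBufferedAudio). As far as I can tell, routeCache only stores responses produced by completing the route normally, which is why I suspect it cannot capture the chunks I send directly via ctx.responder:

  import spray.routing.directives.CachingDirectives._

  // Placeholder cache; the capacity is an arbitrary example value.
  lazy val streamCache = routeCache(maxCapacity = 16)

  // Hypothetical fully buffered command output, just to make the sketch compile.
  val someFullyBufferedAudio = Array.empty[Byte]

  val cachedRoute =
    path("stream") {
      get {
        cache(streamCache) {
          respondWithMediaType(audioAac) {
            // Only a response created by completing the route ends up in the cache.
            complete(HttpResponse(entity = HttpEntity(audioAac, someFullyBufferedAudio)))
          }
        }
      }
    }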


Thanks in advance.

Cheers,


Patrick


P.S. A working example project with the implementation can be downloaded here: http://extrabright.com/dl/spray-streaming-server.zip




[1] Streaming actor that sends the output of a time-consuming command back to the client as an AAC stream (the time-consuming command is simulated here by 'cat tmp/nature.aac'):


  case object Ack

  // Requires: java.io.{BufferedInputStream, InputStream, OutputStream},
  // scala.io.Source, scala.sys.process._, akka.actor.{Actor, ActorLogging, Props},
  // spray.http._, spray.can.Http, spray.routing.RequestContext.
  // audioAac is defined elsewhere in the example project.

  def sendStreamingResponse()(ctx: RequestContext): Unit =
    actorRefFactory.actorOf {
      Props {
        new Actor with ActorLogging {

          val length = 8 * 1024
          val buffer = new Array[Byte](length)
          var count = 0

          var bufferedStream: BufferedInputStream = null
          var inputStream: InputStream = null

          // Stand-in for the real, time-consuming command
          val dummyCommand = Seq(
            "cat",
            "tmp/nature.aac"
          )

          def stderrPrinter(err: InputStream) = {
            Source.fromInputStream(err).getLines().foreach(
              line => log.info("[err] " + line)
            )
          }

          // Start the command and hook into its stdout and stderr
          val process = dummyCommand run new ProcessIO(
            { in: OutputStream => () },
            { out => sendFirstResponse(out) },
            { err => stderrPrinter(err) }
          )

          // Send the first chunk as a ChunkedResponseStart and ask for an Ack
          def sendFirstResponse(input: InputStream) {
            inputStream = input
            bufferedStream = new BufferedInputStream(input)

            log.info("Input stream is {}, buffered stream is {}, buffer is {}", input, bufferedStream, buffer)
            val bytesRead = bufferedStream.read(buffer, 0, length)
            count += bytesRead

            val newBuffer = if (bytesRead == length) buffer else buffer.slice(0, bytesRead)
            val firstResponse: HttpResponse = HttpResponse(entity = HttpEntity(audioAac, newBuffer))
            ctx.responder ! ChunkedResponseStart(firstResponse).withAck(Ack)
            log.info(">>> sent {} bytes (first response)", bytesRead)
          }

          // Send one more chunk per Ack until stdout is exhausted
          def sendSubsequentResponse() {
            val bytesRead = bufferedStream.read(buffer, 0, length)

            if (bytesRead != -1) {
              count += bytesRead
              val newBuffer = if (bytesRead == length) buffer else buffer.slice(0, bytesRead)
              ctx.responder ! MessageChunk(newBuffer).withAck(Ack)
            }
            else /* end of stdout */ {
              log.info("Stdout end, read {} bytes", count)
              ctx.responder ! ChunkedMessageEnd
            }
          }

          def receive = {
            case Ack => sendSubsequentResponse()

            case ev: Http.ConnectionClosed =>
              log.warning("Stopping response streaming due to {}", ev)

            case any =>
              log.warning("Received another msg {}", any)
          }
        }
      }
    }

Pat Jayet

Mar 14, 2016, 1:54:37 PM
to spray.io User List
Hi all,

For the record: I now have a working solution for the streamer with spray. What I needed to add was a limit on the streaming rate. My previous implementation, which just waited for the Ack from the network layer before sending the next chunk, was not enough: the client audio player was overrun with data, kept resetting its connection and therefore regularly opened a new data connection to the spray server.

The solution that worked was to use the Akka scheduler to send a regular tick to the streaming actor and to emit one chunked message to the client per tick.
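
Roughly, the change to the streaming actor from [1] looks like the sketch below (the 250 ms interval and the Tick name are just illustrative, and the snippet reuses the actor's buffer, length, bufferedStream and ctx; it is not the exact code I run). The scheduler tick, rather than the network-level Ack, now triggers the next chunk:

  import scala.concurrent.duration._

  case object Tick

  // Inside the streaming actor from [1]:
  import context.dispatcher // execution context for the scheduler

  // Send a Tick to ourselves at a fixed rate; the interval is an example value.
  val tick = context.system.scheduler.schedule(0.millis, 250.millis, self, Tick)

  def receive = {
    case Tick =>
      val bytesRead = bufferedStream.read(buffer, 0, length)
      if (bytesRead != -1) {
        // No .withAck(Ack) any more: the scheduler paces the chunks instead.
        ctx.responder ! MessageChunk(buffer.slice(0, bytesRead))
      }
      else /* end of stdout */ {
        ctx.responder ! ChunkedMessageEnd
        tick.cancel()
        context.stop(self)
      }

    case ev: Http.ConnectionClosed =>
      log.warning("Stopping response streaming due to {}", ev)
      tick.cancel()
      context.stop(self)
  }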

Pat Jayet

Mar 14, 2016, 1:55:15 PM
to spray.io User List
Cheers,
Patrick

Alex

Mar 16, 2016, 7:14:36 PM
to spray.io User List
cool, is this open-source?

On Monday, March 14, 2016 at 1:55:15 PM UTC-4, Pat Jayet wrote:
Cheers,
Patrick

Pat Jayet

Mar 17, 2016, 5:58:18 PM
to spray.io User List
Hi Alex,

No, it is not. That said, the current solution I use is quite similar to the one sketched in my first post above, with the addition of the regular tick I mentioned.

If people are interested, I could upload a version to GitHub. Ping me in that case.

Cheers,
Pat