Closing resources in the Akka Stream

Petr Janda

May 26, 2015, 10:42:54 AM
to akka...@googlegroups.com
Hi guys,

I was wondering what the best practice is in Akka Streams for cleaning up opened resources. My example use case is a stream that reads lines from a file and sends them to Apache Kafka (using the https://github.com/softwaremill/reactive-kafka subscriber). See the example code here:

val file = io.Source.fromFile(path)
val lines = file.getLines()
val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
val subscriber = kafka.publish("uppercaseStrings", "groupName", new StringEncoder())

Source(() => lines)
  .map(_.toUpperCase)
  .to(Sink(subscriber))
  .run()

As soon as the flow is done I would like to clean up and close the file input stream. One way I used to go about that is to use Sink.onComplete or Sink.fold, but that is not viable here because the sink is already taken by the Kafka subscriber. Ideally I would also like to close the file if any error occurs.

Could you advise on an idiomatic way to do this?

Thanks,
~Petr

 

Konrad Malawski

May 29, 2015, 12:27:48 PM
to Akka User List
Hi Petr,
Firstly - do not use the io.Source + getLines trick to get lines from a File, it's horribly slow :-)
Instead use the SynchronousFileSource* as shown in stream-io.html#Streaming_File_IO.
It's much faster and also takes care of closing the File properly on completion or failure.
For the time being you'll want to use the parseLines cookbook recipe for parsing lines:

We already have a built-in lines parser in the works and it will be provided in the next RC of Akka Streams:
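For readers on later releases, here is a minimal sketch of the same idea. It assumes Akka 2.6.x (not the RC discussed in this thread), where `FileIO.fromPath` is the file source and `Framing.delimiter` splits the bytes into lines. The object name `FileLines`, the method `upperCaseLines`, and the 8192-byte frame limit are made up for illustration.

```scala
import java.nio.file.Path

import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Sink}
import akka.util.ByteString

import scala.concurrent.Future

object FileLines {
  // Assumption: Akka 2.6.x, where an implicit ActorSystem is enough to run a stream.
  // FileIO opens the file when the stream is materialized and closes it
  // when the stream completes or fails, so no manual cleanup is needed.
  def upperCaseLines(path: Path)(implicit system: ActorSystem): Future[Seq[String]] =
    FileIO.fromPath(path)
      // Split the byte stream on newlines; allowTruncation lets a final
      // line without a trailing newline through.
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true))
      .map(_.utf8String.toUpperCase)
      // Sink.seq is only for demonstration; a real pipeline would stream onwards.
      .runWith(Sink.seq[String])
}
```

Collecting into `Sink.seq` keeps the sketch easy to check; in the Kafka use case the last step would be the subscriber sink instead.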

Secondly, more generally speaking in cases like these you can broadcast to 2 sinks, one of them being an onComplete sink and then you
can use this sink as the "on completion do this and that" signal.
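A rough sketch of the two-sink idea, assuming Akka 2.6.x rather than the RC used in this thread. `alsoTo` attaches the first sink through an internal broadcast, and `Sink.onComplete` is the second sink, whose callback receives a `Success` or a `Failure`. The names `CloseOnComplete`, `run`, and `mainSink` are hypothetical; `mainSink` stands in for the Kafka subscriber sink, and io.Source is kept only to mirror the original question.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Future, Promise}

object CloseOnComplete {
  // Assumption: Akka 2.6.x, where an implicit ActorSystem is enough to run a stream.
  // `mainSink` is a hypothetical parameter standing in for Sink(subscriber).
  def run(path: String, mainSink: Sink[String, _])(implicit system: ActorSystem): Future[Done] = {
    val file = scala.io.Source.fromFile(path)
    val closed = Promise[Done]()

    Source.fromIterator(() => file.getLines())
      .map(_.toUpperCase)
      .alsoTo(mainSink) // first sink: the real consumer
      .to(Sink.onComplete[String] { result =>
        // second sink: called once with Success(Done) or Failure(cause)
        file.close()
        closed.complete(result)
      })
      .run()

    // Completes (or fails) after the file has been closed.
    closed.future
  }
}
```

Note that the callback reports termination as seen by the onComplete branch; it does not by itself prove that the other sink has finished handling the last element.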


* Synchronous because it uses a blocking API: the non-blocking API is not available on Java 6, which Akka Streams currently has to support. We have a ticket for the async file source, https://github.com/akka/akka/issues/17269, and it will be provided later on (once Akka Streams joins Akka 2.4, which requires Java 8).

--
Cheers,
Konrad 'ktoso' Malawski

Petr Janda

May 30, 2015, 6:31:48 PM
to akka...@googlegroups.com
Hi Konrad,

thanks for the elaboration and the advice about io.Source. I'd heard that io.Source had some design flaws but wasn't sure what they were specifically. I will try your suggestion.

Regarding the Flow, what you suggest is very close to what I ended up doing: I made a Flow which effectively turns a Sink into a Flow, like this:

object SinkFlow {
  def apply[T](sink: Sink[T, _]): Flow[T, T, _] = Flow() { implicit b =>
    import akka.stream.scaladsl.FlowGraph.Implicits._

    val bcast = b.add(Broadcast[T](2))

    bcast.out(0) ~> b.add(sink)

    (bcast.in, bcast.out(1))
  }
}

which allowed me to connect multiple Sinks into the flow as:

source
  .via(...)
  .via(SinkFlow(sink))
  .to(Sink.onComplete { ... }).run()

I was just wondering if I could have overlooked something more "built in".

Thanks!
~Petr

Konrad Malawski

May 31, 2015, 6:06:45 PM
to Akka User List
Looks good Petr!
No, we do not have a built-in combinator like this. I can see it being useful, so perhaps it's worth raising an issue on akka/akka. If you have a moment to spare to explain the use case in a ticket, thanks!