Restarting a stream with a bursty Source

Brian Topping

Jun 12, 2016, 4:04:11 AM
to Akka User List
Hi all, I have a very small stream that takes data from a GraphStage and pushes it to a file: 

val (killSwitch, flow) = dataSource
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(FileIO.toPath(Paths.get("/home/brian/test")))(Keep.both)
  .run()

The dataSource is a GraphStage that takes data from a native method that talks to hardware. There's a fairly large amount of data that comes in (~1MB/s). The template for the GraphStage is https://github.com/akka/akka/blob/bd8fcc9d9aeafc2c49745e7519d13b3e0abc18e6/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala#L48-L68 with an onPull() that looks like this:

override def onPull(): Unit = {
  if (isAvailable(out)) {
    val pulled: ByteString = nativePull()
    if (pulled != null) {
      push(out, pulled)
    }
  }
}

In some cases, nativePull() does not have any data from the device. In that case, the code does not do a push(). 

I've seen that the GraphStage only gets a single onPull() -- if it does not return data, the stream stalls. 

What is the correct way to restart the calls to onPull() once it's determined that a call to nativePull() would succeed? Maybe it's as simple as making the call to nativePull() block? When a Source has no data, the entire stream certainly shouldn't spin idly with nothing to work on...

Thanks! Brian

Brian Topping

Jun 12, 2016, 7:44:22 PM
to akka...@googlegroups.com
Ah, I think I have the answer that I need: use getAsyncCallback, which lets code outside the stream call safely into the GraphStage. I didn’t see the connection when looking through the source, and for some reason the docs didn’t catch my eye to begin with.
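
For anyone who finds this thread later, here is a minimal sketch of the idea, not the exact code in use. It assumes the native layer can notify a listener from its own thread when data becomes available; the Device trait, its onDataReady method, and the DeviceSource name are hypothetical and exist only for illustration. If the hardware offers no such notification, some form of polling would be needed instead.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.util.ByteString

// Hypothetical wrapper around the native code (an assumption, not a real API).
// nativePull() returns null when the device has no data; onDataReady registers
// a listener that the driver calls from its own thread whenever data arrives.
trait Device {
  def nativePull(): ByteString
  def onDataReady(listener: () => Unit): Unit
}

final class DeviceSource(device: Device) extends GraphStage[SourceShape[ByteString]] {
  val out: Outlet[ByteString] = Outlet("DeviceSource.out")
  override val shape: SourceShape[ByteString] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // invoke() may be called from any thread; the handler itself runs
      // inside the stage, so calling push() from it is safe.
      private val dataReady = getAsyncCallback[Unit](_ => tryPush())

      override def preStart(): Unit =
        device.onDataReady(() => dataReady.invoke(()))

      // Push only if downstream has asked for an element and the device has one.
      private def tryPush(): Unit =
        if (isAvailable(out)) {
          val pulled = device.nativePull()
          if (pulled != null) push(out, pulled)
        }

      setHandler(out, new OutHandler {
        // If no data is ready, the demand stays pending until the
        // device notification triggers tryPush() again.
        override def onPull(): Unit = tryPush()
      })
    }
}
```

With this shape, an onPull() that finds no data simply leaves the port pulled, and the next notification from the device completes the push, so nothing blocks or spins in the meantime. It can be plugged into the original pipeline via Source.fromGraph(new DeviceSource(device)).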

Cheers, Brian
 