Hi all, I have a very small stream that takes data from a GraphStage and pushes it to a file:
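    val (killSwitch, flow) = dataSource
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(FileIO.toPath(Paths.get("/home/brian/test")))(Keep.both)
      .run()

The onPull() handler in the GraphStage looks like this:

    override def onPull(): Unit = {
      if (isAvailable(out)) {
        // nativePull() reads from the device; it can come back empty
        val pulled: ByteString = nativePull()
        if (pulled != null) {
          push(out, pulled)
        }
        // otherwise nothing is pushed for this pull
      }
    }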
In some cases the device has no data and nativePull() returns null. When that happens, the handler returns without calling push().
I've seen that the GraphStage gets only a single onPull() for each request from downstream -- if the handler does not push an element, onPull() is not called again and the stream stalls.
What is the correct way to restart the calls to onPull() once a call to nativePull() would succeed? Maybe it's as simple as making nativePull() block until data arrives? A Source with no data seems like a legitimate reason for the entire stream to wait rather than spin idly with nothing to work on...
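To make the question concrete, here is a minimal sketch of one non-blocking alternative: retrying on a timer instead of waiting for another onPull(). It is only an illustration under assumptions, not necessarily the recommended approach. The names PollingDeviceSource and retryDelay are hypothetical, the 10 ms default is arbitrary, and nativePull is passed in as a function that returns null when the device has no data, as in my code above.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic}
import akka.util.ByteString
import scala.concurrent.duration._

// Hypothetical wrapper around the device call. `nativePull` is assumed to
// return null when no data is available; `retryDelay` is an arbitrary choice.
final class PollingDeviceSource(nativePull: () => ByteString,
                                retryDelay: FiniteDuration = 10.millis)
    extends GraphStage[SourceShape[ByteString]] {

  val out: Outlet[ByteString] = Outlet("PollingDeviceSource.out")
  override val shape: SourceShape[ByteString] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with OutHandler {

      // Push if the device has data; otherwise schedule another attempt.
      private def tryPush(): Unit = {
        val pulled = nativePull()
        if (pulled != null) push(out, pulled)
        else scheduleOnce("retry", retryDelay)
      }

      // Called once when downstream signals demand.
      override def onPull(): Unit = tryPush()

      // Called when the retry timer fires; the earlier demand is still
      // outstanding, so a push is allowed as long as the port is available.
      override protected def onTimer(timerKey: Any): Unit =
        if (isAvailable(out)) tryPush()

      setHandler(out, this)
    }
}
```

With this, dataSource would be something like Source.fromGraph(new PollingDeviceSource(() => nativePull())). If the device can signal readiness itself, a callback obtained from getAsyncCallback might avoid the polling delay, but I have not worked out that variant.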
Thanks! Brian