package xxx
import java.io.{BufferedInputStream, DataInputStream}
import java.nio.file.{Files, Path, StandardOpenOption}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}
import scala.util.{Failure, Success, Try}
class ProtobufFileSource[T <: GeneratedMessage with scalapb.Message[T] : GeneratedMessageCompanion](destination: Path) extends GraphStage[SourceShape[T]] {
private[this] val out = Outlet.create[T]("Protobuf.File.Source")
private[this] val sourceShape = SourceShape.of(out)
override def shape: SourceShape[T] = sourceShape
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape){
val inputStream = new DataInputStream(
new BufferedInputStream(
Files.newInputStream(destination, StandardOpenOption.READ)))
val offset = 0
setHandler(out, new OutHandler {
override def onPull(): Unit = {
val bytesToRead: Int = Try(inputStream.readInt()) match {
case Success(value) => value
case Failure(exception) => -1
}
if(bytesToRead != -1){
val bytes = Array.ofDim[Byte](bytesToRead)
inputStream.read(bytes, offset, bytesToRead)
// FIXME parse bytes to given type T
val message: GeneratedMessage = ???
push(out, message)
}else{
onDownstreamFinish()
}
}
})
override def postStop(): Unit = {
inputStream.close()
}
}
}
object ProtobufFileSource {
def apply[T <: GeneratedMessage](destination: Path): ProtobufFileSource[T] = new ProtobufFileSource[T](destination)
}