package xxx
import java.io.{BufferedInputStream, DataInputStream}
import java.nio.file.{Files, Path, StandardOpenOption}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}
import scala.util.{Failure, Success, Try}
class ProtobufFileSource[T <: GeneratedMessage with scalapb.Message[T] : GeneratedMessageCompanion](destination: Path)  extends GraphStage[SourceShape[T]] {
 private[this] val out = Outlet.create[T]("Protobuf.File.Source")
 private[this] val sourceShape = SourceShape.of(out)
 override def shape: SourceShape[T] = sourceShape
 override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape){
  val inputStream = new DataInputStream(
   new BufferedInputStream(
    Files.newInputStream(destination, StandardOpenOption.READ)))
  val offset = 0
  setHandler(out, new OutHandler {
   override def onPull(): Unit = {
    val bytesToRead: Int = Try(inputStream.readInt()) match {
     case Success(value) => value
     case Failure(exception) =>  -1
    }
    
    if(bytesToRead != -1){
     val bytes = Array.ofDim[Byte](bytesToRead)
     inputStream.read(bytes, offset, bytesToRead)
     // FIXME parse bytes to given type T
     val message: GeneratedMessage = ???
     push(out, message) 
    }else{
     onDownstreamFinish()
    }
   }
  })
  override def postStop(): Unit = {
   inputStream.close()
  }
 }
}
object ProtobufFileSource {
 def apply[T <: GeneratedMessage](destination: Path): ProtobufFileSource[T] = new ProtobufFileSource[T](destination)
}