Generic parser for arbitrary messages

Marc Schlegel

Oct 26, 2019, 1:35:06 PM
to ScalaPB
Hello everyone

I am implementing an Akka Streams Source and Sink to write and read many Protobuf messages to and from a single file. Writing was the easy part, using GeneratedMessage to convert the generic type to a byte[].

Unfortunately I am struggling to find a way to get a generic parser working where the type is only available at runtime. I was hoping to use something like GeneratedMessage.parseFrom(bytes, classOf[T]), but since I don't have an actual type I cannot get hold of any of the generated classes.
The only things I have are the generic type and a byte[].

package xxx

import java.io.{BufferedInputStream, DataInputStream}
import java.nio.file.{Files, Path, StandardOpenOption}

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

import scala.util.{Failure, Success, Try}

class ProtobufFileSource[T <: GeneratedMessage with scalapb.Message[T] : GeneratedMessageCompanion](destination: Path) extends GraphStage[SourceShape[T]] {

  private[this] val out = Outlet.create[T]("Protobuf.File.Source")
  private[this] val sourceShape = SourceShape.of(out)

  override def shape: SourceShape[T] = sourceShape

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    val inputStream = new DataInputStream(
      new BufferedInputStream(
        Files.newInputStream(destination, StandardOpenOption.READ)))

    val offset = 0

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        val bytesToRead: Int = Try(inputStream.readInt()) match {
          case Success(value) => value
          case Failure(exception) => -1
        }

        if (bytesToRead != -1) {
          val bytes = Array.ofDim[Byte](bytesToRead)
          inputStream.read(bytes, offset, bytesToRead)
          // FIXME parse bytes to given type T
          val message: GeneratedMessage = ???

          push(out, message)
        } else {
          onDownstreamFinish()
        }
      }
    })

    override def postStop(): Unit = {
      inputStream.close()
    }
  }
}

object ProtobufFileSource {
  def apply[T <: GeneratedMessage](destination: Path): ProtobufFileSource[T] = new ProtobufFileSource[T](destination)
}
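
A note on the framing the stage above relies on: the reader expects each record to be a 4-byte length (as read by readInt) followed by that many payload bytes. The following is a minimal sketch of that framing using only java.io. It assumes the Sink wrote the prefix with DataOutputStream.writeInt, which the thread does not show, and the names Framing, writeFrame and readFrame are made up for illustration. It uses readFully because InputStream.read(bytes, off, len) is allowed to return fewer bytes than requested.

```scala
import java.io.{DataInputStream, DataOutputStream, EOFException}

// Hypothetical helper, not part of ScalaPB or Akka.
object Framing {

  /** Writes one record: a 4-byte big-endian length, then the payload. */
  def writeFrame(out: DataOutputStream, payload: Array[Byte]): Unit = {
    out.writeInt(payload.length)
    out.write(payload)
  }

  /** Reads one record, or returns None when the stream ends
    * before a complete length prefix could be read.
    */
  def readFrame(in: DataInputStream): Option[Array[Byte]] = {
    val length =
      try in.readInt()
      catch { case _: EOFException => -1 }

    if (length < 0) None // end of stream (or a corrupt negative length)
    else {
      val bytes = new Array[Byte](length)
      // Blocks until all `length` bytes are read; throws EOFException
      // if the file is truncated in the middle of a payload.
      in.readFully(bytes)
      Some(bytes)
    }
  }
}
```

The payload returned by readFrame is what would then be handed to the parser for T.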


Is this somehow supported?

regards
Marc

Nadav Samet

Oct 26, 2019, 1:44:25 PM
to Marc Schlegel, ScalaPB
Hi Marc, there is a parseFrom(b: Array[Byte]): T defined on the GeneratedMessageCompanion[T] trait.

So you can take the companion as an implicit parameter:
class ProtobufFileSource[T <: GeneratedMessage with scalapb.Message[T]](destination: Path)(implicit cmp: GeneratedMessageCompanion[T]) extends GraphStage[SourceShape[T]] {
  ...
  cmp.parseFrom(...bytes)
}
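
To make the suggestion above concrete, here is a small sketch of a generic parse helper. The names GenericParsing and parseAs are hypothetical; only parseFrom on GeneratedMessageCompanion comes from the reply. The type bound mirrors the one used in this thread. On ScalaPB versions that no longer have scalapb.Message (I believe 0.10 and later), the bound would presumably shrink to T <: GeneratedMessage.

```scala
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}

// Hypothetical helper for illustration.
object GenericParsing {

  /** Parses `bytes` as a T. The context bound asks the compiler to supply
    * the GeneratedMessageCompanion[T] at the call site, where T is known.
    */
  def parseAs[T <: GeneratedMessage with Message[T]: GeneratedMessageCompanion](
      bytes: Array[Byte]): T =
    implicitly[GeneratedMessageCompanion[T]].parseFrom(bytes)
}
```

Since the class in the question already declares the context bound T : GeneratedMessageCompanion, the same implicitly[GeneratedMessageCompanion[T]].parseFrom(bytes) expression should be usable in place of the ??? inside onPull, with message typed as T rather than GeneratedMessage. The apply method in the companion object would need the same bound on T, otherwise no companion is in scope when it calls the constructor. Signalling the end of the file is probably better done with completeStage() than by calling onDownstreamFinish() directly.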

--
-Nadav

Marc Schlegel

Oct 26, 2019, 3:58:23 PM
to ScalaPB
Thanks for the weekend-reply :-)