/**
 * Actor that issues a streaming HTTPS `GET` against `https://$host/somepath` and forwards
 * every chunk of the response body to `processor`.
 *
 * Message protocol:
 *  - a 2xx [[HttpResponse]]: its body is streamed to `processor`, transparently gunzipped
 *    when the server answered with `Content-Encoding: gzip`;
 *  - any other [[HttpResponse]]: logged, and its entity is drained;
 *  - `akka.actor.Status.Failure` (what `pipeTo` delivers when the request future fails):
 *    logged; no automatic retry is attempted;
 *  - anything else: treated as a trigger to (re)issue the request.
 *
 * NOTE(review): `port` and `authorize` are not defined in this class; presumably they come
 * from the `Authorization` self-type or an enclosing scope — confirm.
 *
 * @param host      host name of the streaming endpoint
 * @param account   account identifier (not referenced inside this class body)
 * @param processor actor that receives each decoded body chunk
 */
class GnipStreamHttpClient(host: String, account: String, processor: ActorRef)
    extends Actor
    with ActorLogging {
  this: Authorization =>

  private val system = context.system
  private val endpoint = Uri(s"https://$host/somepath")
  private implicit val executionContext = system.dispatcher
  private implicit val flowMaterializer: Materializer =
    ActorMaterializer(ActorMaterializerSettings(system))

  val client =
    Http(system).outgoingConnectionTls(host, port, settings = ClientConnectionSettings(system))

  override def receive: Receive = {
    // Deliberately "2xx only": StatusCode.isSuccess is broader than the 2xx class.
    case response: HttpResponse if response.status.intValue / 100 == 2 =>
      val body = response.entity.dataBytes
      // The request advertises `Accept-Encoding: gzip`, so the payload may arrive compressed.
      // Decode it as one continuous gzip stream; individual chunks are not valid gzip on
      // their own, so per-chunk decompression cannot work.
      val decoded =
        if (response.encoding == akka.http.scaladsl.model.headers.HttpEncodings.gzip)
          body.via(akka.http.scaladsl.coding.Gzip.decoderFlow)
        else
          body
      decoded.map(chunk => processor ! chunk).runWith(Sink.ignore)

    case response: HttpResponse =>
      log.info(s"Got unsuccessful response $response")
      // The entity must always be consumed, otherwise back-pressure stalls the connection.
      response.entity.dataBytes.runWith(Sink.ignore)

    case akka.actor.Status.Failure(cause) =>
      // Without this case the failure would hit the catch-all below and re-issue the
      // request immediately, spinning in a tight loop while the endpoint is unreachable.
      log.error(cause, "Request to {} failed", endpoint)

    case _ =>
      val req = HttpRequest(GET, endpoint)
        .withHeaders(`Accept-Encoding`(gzip), Connection("Keep-Alive")) ~> authorize
      log.info(s"Making request: $req")
      Source.single(req).via(client).runWith(Sink.head).pipeTo(self)
  }
}
# Reference request with curl: --compressed asks for a compressed response and decompresses it, -v prints verbose output, -u supplies user:password credentials.
curl --compressed -v -uuser:pass https://my.streaming.api.com/somepath
// Builds a java.net.URL from the placeholder spec and asks it for a connection object.
val connection = {
  val target = new java.net.URL("...")
  target.openConnection()
}
// Plain-JDK variant of the request: open an HttpURLConnection to `endpoint`, copy the
// relevant header values over from `req`, then stream the gunzipped response body.
val url = new URL(endpoint.toString)
val connection = url.openConnection().asInstanceOf[HttpURLConnection]

// Same order as the request headers are needed: credentials first, then the encoding hint.
Seq("Authorization", "Accept-Encoding").foreach { headerName =>
  connection.setRequestProperty(headerName, req.getHeader(headerName).get.value)
}

// The input stream is only opened when the source is materialized; every element the
// source emits is sent to `processor`.
InputStreamSource(() => new GZIPInputStream(connection.getInputStream))
  .map(chunk => processor ! chunk)
  .runWith(Sink.ignore)
/**
 * Decompresses a gzip payload and returns its content as text.
 *
 * `bytes` must hold a complete gzip stream (header, data and trailer). A fragment cut out
 * of the middle of a longer stream is not valid input and makes `GZIPInputStream` throw.
 *
 * @param bytes the gzip-compressed data
 * @return the decompressed content, decoded as UTF-8
 * @throws java.io.IOException if `bytes` is empty, truncated or not in gzip format
 */
def gunzip(bytes: Array[Byte]): String = {
  val input = new GZIPInputStream(new ByteArrayInputStream(bytes))
  try {
    val output = new ByteArrayOutputStream()
    val buffer = new Array[Byte](8192)
    Iterator
      .continually(input.read(buffer))
      .takeWhile(_ != -1)
      .foreach(count => output.write(buffer, 0, count))
    // Decode explicitly as UTF-8 so the result does not depend on the platform charset.
    output.toString("UTF-8")
  } finally {
    // Releases the inflater held by the GZIPInputStream.
    input.close()
  }
}
... // further in the code as part of my Flow graph
.map(byteString => gunzip(byteString.toArray()))
.via(Gzip.decoderFlow)