Unmarshalling failed for decode String from "text/html" HttpEntity From HttpResponse

89 views
Skip to first unread message

Sarah Yin

unread,
Aug 16, 2016, 2:07:56 AM8/16/16
to Akka User List
Hi Akka User List,


I'm trying to get the text response from httpResponse as a string. Here's my http call.
val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = uri))


However, somehow I have not had much luck getting the body. Here are the methods I've tried.

responseFuture onComplete {
case Success(response) =>
val timeout = 300.millis
response.status match {
case StatusCodes.OK =>
println("printing content")
println(response.entity.dataBytes.runForeach(println))
case _ =>
println("failed to get dynamic url")
}
case Failure(error) =>
println(error.toString)
}
However, this prints an empty response.
------------------------------------------------------------------------------------------------------------------

responseFuture onComplete {
case Success(response) =>
val timeout = 300.millis
response.status match {
case StatusCodes.OK =>
println("printing content")
println(response.entity.dataBytes.map(_.utf8String))
case _ =>
println("failed to get dynamic url")
}
case Failure(error) =>
println(error.toString)
}
However, this prints the string representation of the Source object instead: akka.stream.scaladsl.Source@2dae110b
------------------------------------------------------------------------------------------------------------------

responseFuture onComplete {
case Success(response) =>

val timeout = 300.millis
response.status
response.status match {

case StatusCodes.OK =>

println("printing content")
response.entity.dataBytes.map(line =>
response.entity.dataBytes.map(line => println(line))

case _ =>

println("failed to get dynamic url")
}

}
case Failure(error) =>

println(error.toString)
}
}
However, this prints an empty response.
------------------------------------------------------------------------------------------------------------------

responseFuture onComplete {
case Success(response) =>

val timeout = 300.millis
response.status
response.status match {

case StatusCodes.OK =>

println("printing content")

println(response.entity.toStrict(5 seconds).map(_.data.decodeString("UTF-8")))

case _ =>

println("failed to get dynamic url")
}

}
case Failure(error) =>

println(error.toString)
}
}
However this shows Error in stage [unknown-operation]: Promise already completed.

[default-akka.actor.default-dispatcher-47] [akka://default/user/StreamSupervisor-0/flow-40-0-unknown-operation] Error in stage [unknown-operation]: Promise already completed.

java.lang.IllegalStateException: Promise already completed.

java.lang.IllegalStateException: Promise already completed.

at scala.concurrent.Promise$class.complete(Promise.scala:55)

at scala.concurrent.Promise$class.complete(Promise.scala:55)

at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)

at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)

at scala.concurrent.Promise$class.failure(Promise.scala:104)

at scala.concurrent.Promise$class.failure(Promise.scala:104)

at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:153)

at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:153)

at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:76)

at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:76)

at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:69)

at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:69)

at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onUpstreamFailure(Stage.scala:62)

at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onUpstreamFailure(Stage.scala:62)

at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:621)

at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:621)

at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:535)

at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:535)

at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:443)

at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:443)

at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:364)

at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:364)

at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:502)

at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:502)

at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:539)

at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:539)

at akka.actor.Actor$class.aroundPreStart(Actor.scala:485)

at akka.actor.Actor$class.aroundPreStart(Actor.scala:485)

at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:493)

at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:493)

at akka.actor.ActorCell.create(ActorCell.scala:590)

at akka.actor.ActorCell.create(ActorCell.scala:590)

at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)

at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)

at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)

at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)

at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)

at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)

at akka.dispatch.Mailbox.run(Mailbox.scala:223)

at akka.dispatch.Mailbox.run(Mailbox.scala:223)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
------------------------------------------------------------------------------------------------------------------

responseFuture onComplete {
case Success(response) =>

val timeout = 300.millis
response.status
response.status match {

case StatusCodes.OK =>

println("printing content")
val entity = Unmarshal(response.entity).to[String]
entity onComplete { entity =>

entity onComplete { entity =>
println(entity)
}

}
case _ =>

println("failed to get dynamic url")
}

}
case Failure(error) =>

println(error.toString)
}
}
However this shows Error in stage [unknown-operation]: Promise already completed.
[default-akka.actor.default-dispatcher-58] [akka://default/user/StreamSupervisor-0/flow-45-0-unknown-operation] Error in stage [unknown-operation]: Promise already completed.

java.lang.IllegalStateException: Promise already completed.

java.lang.IllegalStateException: Promise already completed.

at scala.concurrent.Promise$class.complete(Promise.scala:55)

at scala.concurrent.Promise$class.complete(Promise.scala:55)

at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)

at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)

at scala.concurrent.Promise$class.failure(Promise.scala:104)

at scala.concurrent.Promise$class.failure(Promise.scala:104)

at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:153)

at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:153)

at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:76)

at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:76)

at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:69)

at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:69)

at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onUpstreamFailure(Stage.scala:62)

at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onUpstreamFailure(Stage.scala:62)

at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:621)

at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:621)

at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:535)

at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:535)

at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:443)

at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:443)

at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:364)

at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:364)

at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:502)

at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:502)

at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:539)

at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:539)

at akka.actor.Actor$class.aroundPreStart(Actor.scala:485)

at akka.actor.Actor$class.aroundPreStart(Actor.scala:485)

at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:493)

at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:493)

at akka.actor.ActorCell.create(ActorCell.scala:590)

at akka.actor.ActorCell.create(ActorCell.scala:590)

at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)

at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)

at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)

at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)

at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)

at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)

at akka.dispatch.Mailbox.run(Mailbox.scala:223)

at akka.dispatch.Mailbox.run(Mailbox.scala:223)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
------------------------------------------------------------------------------------------------------------------
responseFuture onComplete {
case Success(response) =>

val timeout = 300.millis
response.status
response.status match {

case StatusCodes.OK =>

println("printing content")

println(PredefinedFromEntityUnmarshallers.stringUnmarshaller(response.entity))

case _ =>

println("failed to get dynamic url")
}

}
case Failure(error) =>

println(error.toString)
}
}
However this also shows Error in stage [unknown-operation]: Promise already completed.
------------------------------------------------------------------------------------------------------------------

responseFuture onComplete {
case Success(response) =>

val timeout = 300.millis
response.status
response.status match {

case StatusCodes.OK =>

println("printing content")

println(response.entity.toStrict(5 seconds).map(_.data.decodeString("UTF-8")) )

case _ =>

println("failed to get dynamic url")
}

}
case Failure(error) =>

println(error.toString)
}
}
However this also shows Error in stage [unknown-operation]: Promise already completed.
------------------------------------------------------------------------------------------------------------------

responseFuture onComplete {
case Success(response) =>

val timeout = 300.millis
response.status
response.status match {

case StatusCodes.OK =>

println("printing content")


val bs: Future[ByteString] = response.entity.toStrict(timeout).map { _.data }

val s: Future[String] = bs.map(_.utf8String)

println(s)

case _ =>

println("failed to get dynamic url")
}

}
case Failure(error) =>

println(error.toString)
}
}
However this also shows Error in stage [unknown-operation]: Promise already completed.
------------------------------------------------------------------------------------------------------------------


Has anyone come to this problem before? Let me know if you've got any ideas. Thanks a lot!

Sarah Yin

unread,
Aug 17, 2016, 2:29:15 AM8/17/16
to Akka User List
Found the problem: it is because onComplete has a race condition with the stream, which ends up closing the connection before the stream is finished. Fixed by using flatMap instead.
Reply all
Reply to author
Forward
0 new messages