val s3ListBucket: Source[ByteString, NotUsed] =
s3Client.listBucket(bucket, Some(currentPrefix), maxKeys, nextMarker)
val bucketListingXml: Future[String] = s3ListBucket
.map(_.utf8String)
.runWith(Sink.seq)(materializer)
.map(_.mkString)(materializer.executionContext)
bucketListingXml.foreach {
xml =>println(s"This gets called. prefix $currentPrefix")pushCallback.invoke(xml)
}(materializer.executionContext)
And the callback
val pushCallback = getAsyncCallback[String] { xml =>
log.info(s"This is never called for last element in graph!")
push(out, xml)
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
My crystal ball tells me you may be observing timing artifacts which originate from the fact that you println one thing, but log the other.So there will be a timing difference when one or the other actually hits System.out.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/fBkWg4gSwEI/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
package hack.streams
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.stage._
import org.joda.time.{DateTime, Duration, Interval}
import scala.collection.immutable.Seq
import scala.concurrent.{Await, Future}
object AsyncIssue {
import StreamsMaterializer._
def minuteSource(interval: Interval) = new GraphStage[SourceShape[DateTime]]() {
val out = Outlet[DateTime]("keys")
val shape = SourceShape(out)
def zeroToMinute(date: DateTime) = date.withMillisOfSecond(0).withSecondOfMinute(0)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
var isDone: Boolean = false
var current: DateTime = new DateTime(0)
override def preStart() = {
current = zeroToMinute(interval.getStart)
}
setHandler(out,
new OutHandler {
override def onPull(): Unit = {
if (!isDone) {
push(out, current)
current = current.plusMinutes(1)
if (current.isEqual(zeroToMinute(interval.getEnd))) {
isDone = true
}
} else {
complete(out)
}
}
})
}
}
def futureCallbackFlow = new GraphStage[FlowShape[DateTime, String]]() {
val in = Inlet[DateTime]("minute")
val out = Outlet[String]("string")
val formatter = org.joda.time.format.DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with StageLogging {
val pushCallback = getAsyncCallback[String] { seq =>
push(out, seq)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val minute = grab(in)
val fMin: Future[DateTime] = Future {minute}
fMin.foreach { min =>
pushCallback.invoke(formatter.print(min))
}
}
})
setHandler(out,
new OutHandler {
override def onPull(): Unit = {
pull(in)
}
}
)
}
}
def parallelizeFlow[In, Out](parallelize: Int, flow: Flow[In,Out,NotUsed]): Flow[In, Out, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val dispatcher = builder.add(Balance[In](parallelize))
val merger = builder.add(Merge[Out](parallelize))
for (i <- 0 to parallelize - 1) {
dispatcher.out(i) ~> flow.async ~> merger.in(i)
}
FlowShape(dispatcher.in, merger.out)
})
def run() = {
import StreamsMaterializer._
val asyncFlow = Flow[DateTime].via(futureCallbackFlow)
val source: Source[DateTime, NotUsed] = Source(11 to 20).via(Flow[Int].map { num =>
val formatter = org.joda.time.format.DateTimeFormat.forPattern("yyyy/MM/dd/HH/mm")
formatter.parseDateTime(s"2016/12/14/14/$num")
})
val mat: Future[Seq[String]] = Source.fromGraph(minuteSource(new Interval(Duration.standardMinutes(10), new DateTime().dayOfMonth().roundFloorCopy().minusDays(1))))
.via(parallelizeFlow(parallelize = 1, asyncFlow))
// .via(asyncFlow)
.runWith(Sink.seq)
val seq: Seq[String] = Await.result(mat, scala.concurrent.duration.Duration.Inf)
println(s"seq is $seq")
}
}
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--Cheers,√
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
final def responseEntityAsString(entity: ResponseEntity)(implicit executionContext: ExecutionContext, materializer: Materializer): Future[String] =
entity.dataBytes.runWith(Sink.fold(ByteString.empty)(_ ++ _))
.map(_.utf8String)
Http().singleRequest(HttpRequest(uri = uri))
.flatMap { response =>
responseEntityAsString(response.entity)
.map { data =>
if (!(data.endsWith("}") || data.endsWith("]")))
println(s"Got data [${data.length}] contentLength:[${response.entity.contentLengthOption}] response: [${response.headers.mkString(",")}, $response]")
}
override def onUpstreamFinish(): Unit = {
if (inflight == 0) {
complete(out)
} else {
// there are still items in progress, wait and we'll complete the out port in future callback
}
}
if (inflight == 0) {
if (isClosed(in)) {
complete(out)
}
}