Thanks in advance for any help! Jan
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
// GraphDSL wiring fragment; the shapes (source, merge, fetchUrl, broadcast,
// extractNextUrl, sink) are defined outside this snippet.
// One output of `broadcast` is connected to `sink`.
broadcast ~> sink
// Main path: source -> merge -> fetchUrl -> broadcast; a second output of
// `broadcast` is connected to `extractNextUrl`.
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrl
// Closes the cycle: the output of `extractNextUrl` is connected back into
// `merge.preferred` (presumably the preferred inlet of a MergePreferred — confirm).
merge.preferred <~ extractNextUrl
// GraphDSL wiring fragment; all shapes used here are defined outside this snippet.
// `source` goes through `sync` into `merge.preferred`.
source ~> sync ~> merge.preferred
// Everything leaving `merge` is fanned out by `broadcast` to the two filters below.
merge ~> broadcast
broadcast.out(0) ~> finishedFilter
broadcast.out(1) ~> unfinishedFilter
// Feedback edge: elements leaving `unfinishedFilter` are mapped to their `account`,
// passed through `Limiter.limit` (called with `Client.limiter` and `10 seconds`) and
// `syncStatusFlow`, and then connected back into `merge`.
merge <~ syncStatusFlow <~ Limiter.limit[Long](Client.limiter, 10 seconds) <~ Flow[SyncStatus].map(_.account) <~ unfinishedFilter
// NOTE(review): this looks like the tail of a truncated expression (e.g. a shape
// wrapping `finishedFilter.outlet`); the wrapper is not visible here — confirm.
(finishedFilter.outlet)
// GraphDSL wiring fragment; the shapes (source, merge, fetchUrl, broadcast,
// extractNextUrl, sink) are defined outside this snippet.
// One output of `broadcast` is connected to `sink`.
broadcast ~> sink
// Main path: source -> merge -> fetchUrl -> broadcast; a second output of
// `broadcast` is connected to `extractNextUrl`.
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrl
// Closes the cycle: the output of `extractNextUrl` is connected back into
// `merge.preferred` (presumably the preferred inlet of a MergePreferred — confirm).
merge.preferred <~ extractNextUrl
/**
 * Fan-in shape with two inlets and the single outlet inherited from `FanInShape`:
 *
 *  - `in`       carries plain elements of type `T`;
 *  - `feedback` carries `Option[T]` elements.
 *
 * @param _init initialisation descriptor handed to `FanInShape`
 * @tparam T element type of `in` and of the outlet
 */
class FeedbackShape[T](val _init: FanInShape.Init[T]) extends FanInShape[T](_init) {

  // Ports are registered in declaration order, so `in` must stay ahead of `feedback`.
  val in: Inlet[T] = newInlet[T]("in")
  val feedback: Inlet[Option[T]] = newInlet[Option[T]]("feedback")

  /** Creates a shape named "FeedbackShape". */
  def this() = this(FanInShape.Name[T]("FeedbackShape"))

  /** Builds a new instance of this shape from the given descriptor. */
  override protected def construct(init: FanInShape.Init[T]): FanInShape[T] =
    new FeedbackShape[T](init)

  /** Copies the shape, narrowing the result type to `FeedbackShape[T]`. */
  override def deepCopy(): FeedbackShape[T] =
    super.deepCopy().asInstanceOf[FeedbackShape[T]]
}
/**
 * Graph stage that merges a regular input (`in`) with a feedback input (`feedback`)
 * and keeps count of the elements it has emitted that have not yet been answered on
 * the feedback port.
 *
 * Every element emitted on `out` increments `flowingCount`; every element received on
 * `feedback` (either `Some` or `None`) decrements it. `Some(e)` is re-emitted on `out`,
 * `None` only triggers a completion check. The stage completes once `in` is closed and
 * `flowingCount` is zero.
 *
 * @tparam T element type of `in` and `out`; `feedback` carries `Option[T]`
 */
class Feedback[T] extends GraphStage[FeedbackShape[T]] {
  // The shape of this stage: two inlets (`in`, `feedback`) and one outlet.
  override val shape: FeedbackShape[T] = new FeedbackShape

  def in: Inlet[T] = shape.in
  def out: Outlet[T] = shape.out
  def feedback: Inlet[Option[T]] = shape.feedback

  /** Creates the stateful per-materialization logic of the stage. */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Number of elements emitted on `out` that have not been answered on `feedback` yet.
      var flowingCount = 0

      // Number of feedback elements handed to `emit` whose `emitted` callback has not run yet.
      var feedbackEmitting = 0

      /** Completes the stage when nothing is left in the loop and `in` is closed. */
      def checkComplete(): Unit = {
        if (flowingCount == 0 && isClosed(in)) {
          completeStage()
        }
      }

      def pullSecondary(): Unit = tryPull(in)

      // Output port handler: only the first pull requests elements from both inlets;
      // later pulls are ignored here, subsequent input pulls are issued by the input
      // handlers themselves.
      setHandler(out, new OutHandler {
        private var first = true

        override def onPull(): Unit = {
          if (first) {
            first = false
            tryPull(feedback)
            tryPull(in)
          }
        }
      })

      // Feedback (preferred) input port handler.
      setHandler(feedback, new InHandler {
        override def onUpstreamFinish(): Unit = checkComplete()

        override def onPush(): Unit = {
          // An element came back through the loop.
          flowingCount -= 1
          emitFeedback()
        }

        def emitFeedback(): Unit = {
          grab(feedback) match {
            case Some(e) =>
              // Count the emit only when one is actually scheduled. Incrementing
              // before the match left `feedbackEmitting` permanently above zero after
              // a `None`, which blocked every later element from `in`.
              feedbackEmitting += 1
              flowingCount += 1
              emit(out, e, emitted)
            case None =>
              checkComplete()
          }
          tryPull(feedback)
        }

        // Runs after a feedback element has been emitted: keep draining `feedback`
        // first, and only fall back to `in` when no feedback emit is pending.
        val emitted = () => {
          feedbackEmitting -= 1
          if (isAvailable(feedback)) emitFeedback()
          else if (feedbackEmitting == 0) emitSecondary()
        }

        def emitSecondary(): Unit = {
          if (isAvailable(in)) {
            flowingCount += 1
            emit(out, grab(in), pullSecondary)
          }
        }
      })

      // Secondary input port handler.
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          // While a feedback emit is pending the element stays on the port; it is
          // picked up later by `emitSecondary`.
          if (feedbackEmitting == 0) {
            flowingCount += 1
            emit(out, grab(in), pullSecondary)
          }
        }

        override def onUpstreamFinish(): Unit = checkComplete()
      })
    }
}
// GraphDSL wiring fragment around a stage exposing `in`, `out` and `feedback` ports;
// source, broadcast, loopOperation and the downstream are defined outside this snippet.
// New elements enter through the stage's `in` port.
source ~> feedback.in
// The stage's output is fanned out by `broadcast`.
// NOTE(review): `output/sink` reads like shorthand for "the output or sink" — confirm.
feedback.out ~> broadcast ~> output/sink
// Another output of `broadcast` goes through `loopOperation` back into the `feedback` port.
feedback.feedback <~ loopOperation <~ broadcast
/**
 * Handles the element currently available on `feedback`.
 *
 * A defined element is counted (both `feedbackEmitting` and `flowingCount`) and
 * emitted on `out` with `emitted` as the follow-up callback; an empty element only
 * runs `checkComplete()`. In both cases `feedback` is pulled again afterwards.
 */
def emitFeedback(): Unit = {
  grab(feedback).fold(checkComplete()) { element =>
    feedbackEmitting += 1
    flowingCount += 1
    emit(out, element, emitted)
  }
  tryPull(feedback)
}
// Passes elements through until the first empty Option arrives.
val takeWhile = Flow[Option[T]].takeWhile(_.isDefined)
// Unwraps each Option into zero or one downstream element.
val mapConcat = Flow[Option[T]].mapConcat(maybeElement => maybeElement.toList)
// GraphDSL wiring fragment; source, merge, fetchUrl, broadcast, extractNextUrlOpt and
// sink are defined outside this snippet.
// One output of `broadcast` is connected to `sink`.
broadcast ~> sink
// Main path: source -> merge -> fetchUrl -> broadcast -> extractNextUrlOpt.
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrlOpt
// Feedback edge: the output of `extractNextUrlOpt` passes through `takeWhile` and
// `mapConcat` before it is connected back into `merge`.
merge <~ mapConcat <~ takeWhile <~ extractNextUrlOpt
// Pairs every incoming list of new tasks with the number of tasks still outstanding
// after that list has been accounted for.
val counter = Flow[List[T]].statefulMapConcat { () =>
  // Starts at the number of tasks initially provided by the source. Typed as Long so
  // the emitted pairs are (Long, List[T]).
  var outstanding: Long = sourceSize
  newTasks => {
    outstanding -= 1             // each incoming list means exactly one earlier task is done
    outstanding += newTasks.size // and every task in the list has been added
    // statefulMapConcat expects a collection of output elements, so the pair is
    // wrapped in a single-element list instead of being returned bare.
    List((outstanding, newTasks))
  }
}
// Passes pairs through for as long as the counter component is non-zero.
val takeWhile = Flow[(Long, List[T])].takeWhile(pair => pair._1 != 0)
// Drops the counter and flattens each task list into individual elements.
val mapConcat = Flow[(Long, List[T])].mapConcat(pair => pair._2)
// GraphDSL wiring fragment; source, merge, fetchUrl, broadcast, extractNextUrlList and
// sink are defined outside this snippet.
// One output of `broadcast` is connected to `sink`.
broadcast ~> sink
// Main path: source -> merge -> fetchUrl -> broadcast -> extractNextUrlList.
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrlList
// Feedback edge: the output of `extractNextUrlList` passes through `counter`,
// `takeWhile` and `mapConcat` before it is connected back into `merge`.
merge <~ mapConcat <~ takeWhile <~ counter <~ extractNextUrlList