import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source, SourceQueueWithComplete}
import scala.concurrent.Await
import scala.util.Try
object QueueGraph extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val sourceQueue: Source[String, SourceQueueWithComplete[String]] =
Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val decider: Supervision.Decider = {
case e: Exception =>
println(s"Graph failed ${e}.. resuming")
Supervision.Resume
}
val (queue: SourceQueueWithComplete[String], source: Source[String, NotUsed]) =
sourceQueue
.via(Flow[String].map { num =>
println(s"flow handling ${num}")
if (num == "1") {
throw new Exception()
} else {
num
}
})
.recover{case e: Exception => "error"}
// If supervisor is removed, the recover catches the exception and returns "error" but the graph fails and no longer processes anything. With supervisor "error" is never emitted
// .withAttributes(ActorAttributes.supervisionStrategy(decider))
.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both).run()
import scala.concurrent.duration._
// find error
val f = source
.filter(_ == "error")
.idleTimeout(1.second)
.toMat(Sink.headOption)(Keep.right).run()
queue.offer("1")
println(s"Result 1 ${Try(Await.result(f, Duration.Inf))}")
queue.offer("2")
val f2 = source
.filter(_ == "2")
.idleTimeout(1.second)
.toMat(Sink.headOption)(Keep.right).run()
println(s"Result 2 ${Try(Await.result(f2, Duration.Inf))}")
}
val (queue: SourceQueueWithComplete[String], source: Source[String, NotUsed]) =
sourceQueue
.via(Flow[String].map { num =>
println(s"flow handling ${num}")
if (num == "1") {
Failure(new Exception())
} else {
Success(num)
}
})
.map {
case Success(v) => v
case Failure(ex) => "error"
}
.toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both).run()
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Recover allows to send last element on failure and gracefully complete the stream Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.
Throwing an exception insiderecover_will_ be logged on ERROR level automatically.
Emits when element is available from the upstream or upstream is failed and pf returns an element
Backpressures when downstream backpressures
Completes when upstream completes or upstream failed with exception pf can handle
Cancels when downstream cancels
When a stream stage ("component") fails, it sends the effective "HUP" signal upstream, and the exception downstream. Perhaps this helps explain why the behavior of recover is as it is?
If you abandon using streams, are you missing an opportunity to improve the overall design of your code? Is the pain because "streams are awkward", or, because the existing design is awkward?
To unsubscribe from this group and all its topics, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+unsubscribe@googlegroups.com.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.