Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

38 views
Skip to first unread message

Alexey Shuksto

unread,
Oct 19, 2016, 7:24:48 AM10/19/16
to Akka User List
Hello hAkkers,

Simple example:
val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
 
var counter = 0
 
  val outbound
= b.add(Flow[String].map { str =>
    counter
+= 1
    str
-> counter
 
})
  val inbound
= b.add(Flow[(String, Int)].map { pair =>
    counter
-= 1
    pair
._1
 
})
 
 
BidiShape.fromFlows(outbound, inbound)
})

Can I presume that contents of 'build block' is thread-safe or I need to guard `counter` somehow (use `AtomicInt` and such)?

Also, do BidiFlow support 'duplex' mode or they process incoming/outgoing messages one at time?

Konrad Malawski

unread,
Oct 19, 2016, 7:27:45 AM10/19/16
to akka...@googlegroups.com, Alexey Shuksto
This is not safe, outbound and inbound flows could be executing on different threads.
It's not a question about the the DSL being safe - that's fine as it's only constructing stuff,
but the graph you constructed is accessing shared state from (potentially) different threads - thus it is not safe.

-- 
Konrad `ktoso` Malawski
Akka @ Lightbend
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Viktor Klang

unread,
Oct 19, 2016, 7:29:14 AM10/19/16
to Akka User List
Hi Alexey,

Not only is it not thread-safe, but it also actively prevents multiple materializations.

Perhaps if you state your use-case we can suggest an alternative?

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,

Alexey Shuksto

unread,
Oct 19, 2016, 8:18:50 AM10/19/16
to Akka User List
2 Konrad: Yep, in original question I meant not 'DSL construction time' but 'execution time' thread-safety. Thanks for clarification.

2 Victor: Use case is simple: outgoing flow need to store `Promise` of future remote response in some shared state which then would be completed by incoming flow. There could be as many promises as there were outgoing messages, but the order of responses are not guaranteed and there could be additional messages in incoming flow.

What do you meant by 'actively prevents multiple materializations'?

среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,

Viktor Klang

unread,
Oct 19, 2016, 8:23:04 AM10/19/16
to Akka User List
On Wed, Oct 19, 2016 at 2:18 PM, Alexey Shuksto <sei...@gmail.com> wrote:
2 Konrad: Yep, in original question I meant not 'DSL construction time' but 'execution time' thread-safety. Thanks for clarification.

2 Victor: Use case is simple: outgoing flow need to store `Promise` of future remote response in some shared state which then would be completed by incoming flow.

So it's a bidirectional buffer of Promises and Futures?


 
There could be as many promises as there were outgoing messages, but the order of responses are not guaranteed and there could be additional messages in incoming flow.

What do you meant by 'actively prevents multiple materializations'?

What happens when you materialize that bidiflow N times?
 
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,

Alexey Shuksto

unread,
Oct 19, 2016, 8:43:07 AM10/19/16
to Akka User List
1. Flow itself is a bidi-codec from ByteString to our own Request/Response entities. Each Request has Promise[Response] attribute. Shared state is more like Map[Request.Id, Promise[Response]] -- because order of Responses are not guarantied.

2. It should have state shared only "inside" this one materialized flow.

среда, 19 октября 2016 г., 15:23:04 UTC+3 пользователь √ написал:

Viktor Klang

unread,
Oct 19, 2016, 8:47:31 AM10/19/16
to Akka User List
On Wed, Oct 19, 2016 at 2:43 PM, Alexey Shuksto <sei...@gmail.com> wrote:
1. Flow itself is a bidi-codec from ByteString to our own Request/Response entities. Each Request has Promise[Response] attribute. Shared state is more like Map[Request.Id, Promise[Response]] -- because order of Responses are not guarantied.

Ok, so a sort of "correlator"-stage?
 

2. It should have state shared only "inside" this one materialized flow.

Yes, so the BidiFlow you create has shared state tied to the intance, not the mateiralization.
I think you'll need to create a custom GraphStage with a BidiShape.
 
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,

Alexey Shuksto

unread,
Oct 19, 2016, 12:26:11 PM10/19/16
to Akka User List
среда, 19 октября 2016 г., 15:47:31 UTC+3 пользователь √ написал:
Ok, so a sort of "correlator"-stage?

Exactly.
 
Yes, so the BidiFlow you create has shared state tied to the intance, not the mateiralization.
I think you'll need to create a custom GraphStage with a BidiShape.

Yep, thanks for your help -- I've implement such correlator as GraphStage, looks better now.

One minor question (docs aren't clear enough there): is it safe to drop `pull(in)` in `in` `onPush()` handler if such `pull(in)` happen in `out` `onPull()` handler?

In documentation for custom stages `in` pulled twice (both in `onPush()` and `onPull()`) and in code it is only pulled once.
Reply all
Reply to author
Forward
0 new messages