What is the correlation between ByteStrings in Akka Streams and TCP packet payloads?


Magnus Andersson

Apr 9, 2016, 5:00:31 PM
to Akka User List
Hi

I'm implementing a TCP protocol (Lumberjack2/Beats) in Akka Streams. It is a streaming application-level protocol where each message varies in length. In Akka Streams I get ByteStrings fed through the stream, but I can't find any information about how these correlate with the TCP packets sent over the wire. Without that knowledge I have to create a buffer for partial messages and handle all the theoretical cases that could occur when a ByteString does not correlate 1:1 with a TCP payload. The resulting implementation becomes quite hairy.

1. What are the guarantees around correlation between Akka ByteStrings and TCP packets? 
2. Are there examples of TCP protocols implemented with Akka Streams that use dynamic framing?

/Magnus

Endre Varga

Apr 10, 2016, 2:51:03 AM
to akka...@googlegroups.com
Hi Magnus,



On Sat, Apr 9, 2016 at 11:00 PM, Magnus Andersson <mag...@magnusart.com> wrote:
Hi

I'm implementing a TCP protocol (Lumberjack2/Beats) in Akka Streams. It is a streaming application-level protocol where each message varies in length. In Akka Streams I get ByteStrings fed through the stream, but I can't find any information about how these correlate with the TCP packets sent over the wire.

Because TCP is a stream protocol, the buffers you send do not correlate with TCP frames at all; TCP is free to rechunk them as it sees fit.
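
To illustrate the point, here is a minimal, dependency-free sketch. It uses plain `Vector[Byte]` instead of `ByteString`, and the `rechunk` helper and chunk sizes are made up for the demonstration. It only shows that the byte sequence survives while the chunk boundaries do not:

```scala
object RechunkDemo {
  // Two application messages, as the sender wrote them.
  val sent: Vector[Vector[Byte]] =
    Vector("hello", "world!").map(_.getBytes("UTF-8").toVector)

  // The single byte stream that TCP actually guarantees to deliver in order.
  val stream: Vector[Byte] = sent.flatten

  // Hypothetical helper: one of many ways the receiver might see the bytes chunked.
  def rechunk(bytes: Vector[Byte], size: Int): Vector[Vector[Byte]] =
    bytes.grouped(size).toVector

  val receivedSmall: Vector[Vector[Byte]] = rechunk(stream, 4)  // splits inside messages
  val receivedLarge: Vector[Vector[Byte]] = rechunk(stream, 64) // merges both messages

  // Only the concatenated bytes are the same in both cases.
  val sameBytes: Boolean =
    receivedSmall.flatten == stream && receivedLarge.flatten == stream
}
```

Neither chunking lines up with the two original writes, which is why the receiver has to find message boundaries itself.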
 
Without that knowledge I have to create a buffer for partial messages and handle all the theoretical cases that could occur when a ByteString does not correlate 1:1 with a TCP payload. The resulting implementation becomes quite hairy.

1. What are the guarantees around correlation between Akka ByteStrings and TCP packets? 

None. This is the nature of TCP and has nothing to do with Akka.
 
2. Are there examples of TCP protocols implemented with Akka Streams that use dynamic framing?

Check the akka.io Framing classes.
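
For a protocol with a plain length header, the built-in helper may already be enough. The following is a minimal sketch, assuming a hypothetical wire format (not Lumberjack/Beats) of a 4-byte big-endian payload length followed by the payload, and assuming the `Framing` object in `akka.stream.scaladsl`. The 64 KiB limit is an arbitrary choice:

```scala
import java.nio.ByteOrder

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing}
import akka.util.ByteString

object LengthFieldFraming {
  // Reassembles arbitrarily chunked ByteStrings into complete frames.
  // Assumption: the length field counts only the payload bytes after the header.
  val frames: Flow[ByteString, ByteString, NotUsed] =
    Framing.lengthField(
      fieldLength = 4,               // size of the length header in bytes
      fieldOffset = 0,               // header starts at the first byte of a frame
      maximumFrameLength = 64 * 1024, // larger frames fail the stream
      byteOrder = ByteOrder.BIG_ENDIAN
    )
}
```

Each emitted element should be one whole frame with the header still attached, so a following `.map(_.drop(4))` would strip it. Protocols with several frame types or no uniform length field need custom parsing instead.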

-Endre
 

/Magnus


Magnus Andersson

Apr 20, 2016, 9:41:15 AM
to akka...@googlegroups.com

Solved it by doing my own version of something similar to the framing classes.

Perhaps a custom stage is what I should use for problems like a protocol with variable length payloads, acking on application level, messages received in a specific order?

Doing it with flows or graphs seemed to create tons of boilerplate and was not really easy to follow along with. I'd really like to do some type of state machine for this, mixing Akka FSM and streams. Do you know of any idiomatic examples doing something like that?

/Magnus



Derek Williams

Apr 20, 2016, 10:42:29 AM
to akka...@googlegroups.com
If it helps, a simple custom framing stage could look like this:

Flow[ByteString].statefulMapConcat[MyThing] { () =>
  var buffer = ByteString.empty
  in => {
    val (things, remaining) = parseThings(buffer ++ in)
    buffer = remaining
    things
  }
}

def parseThings(bytes: ByteString): (Vector[MyThing], ByteString) = { ... }

where parseThings finds as many things as it can from the available bytes, returning the found things and the unused bytestring.
Other state can also be kept alongside the buffer if you need that as well.
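
As an illustration only, here is one way `parseThings` could be filled in. It assumes a hypothetical format (not Lumberjack/Beats) of a 4-byte big-endian payload length followed by UTF-8 text; the `MyThing` fields, `HeaderSize`, the `encode` helper, and the extra accumulator parameter are all made up for this sketch:

```scala
import java.nio.{ByteBuffer, ByteOrder}

import akka.util.ByteString

object LengthPrefixedParser {
  // Hypothetical message type: just the decoded payload text.
  final case class MyThing(text: String)

  private val HeaderSize = 4

  // Helper for trying the parser out: 4-byte big-endian length, then the payload.
  def encode(text: String): ByteString = {
    val payload = ByteString(text, "UTF-8")
    ByteString(ByteBuffer.allocate(HeaderSize).putInt(payload.length).array()) ++ payload
  }

  // Parses as many complete frames as are available and returns the leftover bytes.
  @annotation.tailrec
  def parseThings(bytes: ByteString,
                  acc: Vector[MyThing] = Vector.empty): (Vector[MyThing], ByteString) =
    if (bytes.length < HeaderSize) (acc, bytes) // header incomplete, wait for more
    else {
      val payloadLength = bytes.iterator.getInt(ByteOrder.BIG_ENDIAN)
      // A real implementation should also enforce an upper bound here.
      require(payloadLength >= 0, s"negative frame length: $payloadLength")
      val frameLength = HeaderSize + payloadLength
      if (bytes.length < frameLength) (acc, bytes) // payload incomplete, wait for more
      else {
        val payload = bytes.slice(HeaderSize, frameLength)
        parseThings(bytes.drop(frameLength), acc :+ MyThing(payload.utf8String))
      }
    }
}
```

Called as `parseThings(buffer ++ in)`, it returns whatever complete messages the buffer holds and hands back the incomplete tail, so it does not matter where the chunk boundaries fall.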

Endre Varga

Apr 20, 2016, 10:43:47 AM
to akka...@googlegroups.com
Yes, this is a simple approach and works without having to do a custom stage (exactly why statefulMapConcat was introduced).

-Endre

Magnus Andersson

Apr 20, 2016, 7:20:51 PM
to Akka User List
Hi

So statefulMapConcat was what I used at first, but the implementation became a mess. I'll try to give a richer example of what I am missing in streams in this particular case.

My original approach was a bit naive. I wished to model the protocol and its state transitions as a flow graph. I realized that sharing the connection/transaction-specific state across several flow stages and modeling the protocol state machine in a streaming fashion became a very complex affair: messages as nested Eithers, together with broadcasts, filtering and priority merges for semi-dynamic routing.

The challenge was that, without guarantees about how much data each ByteString contains (my original question), I had to keep pulling data into a buffer until there was enough to continue processing. I couldn't know how much of the buffer had been consumed until the downstream stages had finished, so I had to feed the buffer, along with the payload size still required, back into the first stage that pulled ByteStrings from Akka TCP IO. When I noticed that I was modeling if-else logic and pattern matching as a complex maze of built-in flow stages, I stopped and looked for a different approach.

If I were to do it again, I would have to cram everything into a single statefulMapConcat instead, much like what I ended up doing with the GraphStage. The result would be similar. But that is not using the stream DSL to model the protocol at all; it just puts a big blob of imperative code in one stage. This means that easy composition is lost. On the plus side, the reactive properties are still there.

Many protocols are designed in a way that implies keeping track of state for the duration of the connection or batch of messages. The same goes for file formats and other data structures. A result or validation is built up in successive steps based on previously received data. In these cases my conclusion is that the provided Streams DSL is not sufficient or suitable; I have to create custom stages or use actors, which give me more control.
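
To make the kind of per-connection state concrete, here is a rough, dependency-free sketch of a protocol state machine written as a pure step function. The frame types, states, and the "ack after the last frame of a window" rule are a simplified, hypothetical protocol chosen for illustration, not the actual Lumberjack/Beats specification:

```scala
object ProtocolStateMachine {
  // Hypothetical decoded frames arriving from the framing step.
  sealed trait Frame
  final case class WindowSize(count: Int) extends Frame
  final case class Data(seq: Int, payload: String) extends Frame

  // Hypothetical outputs: payloads for downstream and acks for the peer.
  sealed trait Output
  final case class Deliver(payload: String) extends Output
  final case class Ack(seq: Int) extends Output

  // Connection state that must survive between frames.
  sealed trait State
  case object AwaitingWindow extends State
  final case class InBatch(remaining: Int) extends State

  // One transition: current state plus one frame gives next state plus outputs.
  def step(state: State, frame: Frame): (State, Vector[Output]) =
    (state, frame) match {
      case (AwaitingWindow, WindowSize(n)) if n > 0 =>
        (InBatch(n), Vector.empty)
      case (InBatch(1), Data(seq, payload)) =>
        // Last frame of the window: deliver it and acknowledge the batch.
        (AwaitingWindow, Vector(Deliver(payload), Ack(seq)))
      case (InBatch(n), Data(_, payload)) =>
        (InBatch(n - 1), Vector(Deliver(payload)))
      case (s, f) =>
        throw new IllegalStateException(s"Unexpected $f in state $s")
    }

  // Drives the step function the same way a stateful stage would: one mutable
  // state variable, updated per element, with the outputs concatenated.
  def run(frames: Seq[Frame]): Vector[Output] = {
    var state: State = AwaitingWindow
    frames.toVector.flatMap { frame =>
      val (next, out) = step(state, frame)
      state = next
      out
    }
  }
}
```

The body of `run` is essentially what would sit inside a single statefulMapConcat or custom stage, so the transitions stay testable on their own even though the stream itself sees only one opaque stage.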

Your thoughts on this, as implementors/designers of the Streams DSL, would be interesting.

/Magnus