PUB/SUB analysis (round 2)

85 views
Skip to first unread message

Martin Sustrik

unread,
Aug 25, 2011, 2:37:39 AM8/25/11
to sp-discu...@googlegroups.com
Hi all,

This email tries to analyse publish/subscribe pattern, identify the
problems and the design options, and start the discussion about the
issues. It tries to elaborate on the points raised in the previous
pub/sub analysis thread.

1. The nature of pub/sub

The idea of pub/sub is to build an overlay multicast network on top of
the underlying transport, such as TCP. The problems it tries to solve
are twofold:

a. IP multicast doesn't work well in large-scale and loosely managed
environments.
b. IP multicast system for message filtering (joining the multicst
groups) is too inefficient for real-world usage, where filtering on
business criteria as well as propagating the filters out of the LAN
scope is crucial.

2. Late joining

The publisher cannot know it advance who's going to subscribe for the
data and so it cannot guarantee that subscriber will get the messages
that were sent *before* it was subscribed -- unless all the historical
messages are stored in the publisher forever, which doesn't make much sense.

Publishers can have a limited "recent messages" buffer so that late
joiners can get couple of historical messages (e.g. RFC3208, section
9.4), however, that doesn't really affect the following statement:

The pub/sub should be viewed as a radio broadcast: If you turn on your
receiver too late and miss the show, that's it, you've missed it. (You
can of course resort to alternative mechanisms such as searching the
archive, but that's out of scope of basic pub/sub pattern).

3. Separating message distribution from message aggregation

Most existing pub/sub systems (including IP mutlicast, AMQP, MQTT, even
0MQ) mix the message aggregation (N to 1) functionality with message
distribution functionality (1 to N). It is commonly implemented by
having a broker/switch in the middle of the topology. All publishers
send messages to the broker (aggregation) and the broker distributes the
messages to all consumers (distibution).

However, experience shows that the two patterns, while similar in some
ways, actually have quite different semnatics. For example, distribution
pattern can't support pushback (see below) while the pushback is
necessary for aggregation (not to lose messages). Further differences
can be seen when considering how should the system behave in a
disconnected state etc.

By enforcing separation between aggregation and distribution, the
topology, which is a generic graph in the mixed case, falls apart into
set of trees. The idea is that you can re-create the graph by connecting
several topologies on the application layer, however, on the messaging
level you have to deal only with trees.

Using a tree for message distribution furthermore satisfies uniformity
principle (the view of the world is consitent whatever part of the
topology you are connected to) which is not true for the case of mixed
aggregation/distribution. Consider following topology:

+-------+ +-------+
| PUB A | | PUB B |
+-------+ +-------+
| \ |
| +-------+
| | C |
| +-------+
| |
+-------+ +-------+
| SUB D | | SUB E |
+-------+ +-------+

Note that subscriber D gets different set of messages than subscriber E.

In the case of pub/sub the tree topology is enforced by requiring each
intermediate node to have at most one publisher connected to it.

4. Reliability and pushback

Publish/subscribe pattern should be defined as inherently unreliable.
The rationale goes like this:

If the pattern was reliable and there's a single slow/dead/malevolent
consumer that doesn't consume messages or does so too slowly, the
publisher would have to buffer the messages intended for that consumer
to ensure reliability. Once the resources of the publisher (memory/disk)
are depleted it would have to stop accepting messages from the upstream
(it has nowhere to store them) thus stopping the message delivery even
to the siblings of the slow/dead consumer. Once it stops accepting the
messages from the upstream, it becomes slow/dead consumer itself and the
same thing repeats on the higher level of the distribution tree.
Finally, the process reaches the root of the distibution tree at which
point the whole system is stuck and nobody receives anything.

The above scenario implies, that with reliable pub/sub every node in the
topology becomes a single point of failure, which makes the whole system
extremely fragile, even in tightly controlled enterprise environments,
not even speaking of use on the Internet. Even worse the tendency to
collapse gets worse with the growing size of the topology. It should be
expected that at some size, any topology would be stuck almost all the time.

It should be also noted that some applications (mostly real-time ones)
specifically require pub/sub not to be reliable at all, ie. not to even
try to deliver messages that cannot be delivered immediately. My feeling
is that the disctinction between completely unreliable and semi-reliable
topologies is blurry and should be addressed by setting rx/tx buffers to
the appropriate size.

The other way to think about the problem is in the terms of bufferbloat
(as reported currently with the Internet fabric itself, eg.
http://mirrors.bufferbloat.net/Talks/BellLabs01192011/). In short: You
improve reliability by growing buffers. However, growing buffers causes
latency to increase out of proportion in congestion situations.

5. Dropping messages

Given that the pub/sub pattern cannot be reliable, it has to drop
messages sometimes. The question is how the messages are selected for
dropping.

The most common requirement is to drop the oldest messages (assuming
that the new messages have fresher data that would replace the data in
the old messages anyway).

The problem with that is that oldest messages are already on the way to
the consumer, possibly residing on some intermediate node. To drop them,
congested node would have to send an out-of-band prioritised message to
the downstream nodes asking them to drop messages. The downstream nodes
would have to propagate the request further downstram to reach the
oldest messages that may be residing there. What makes things even worse
is that if the problem is caused by network congestion we have no
guarantee that these emergency "drop" messages sent downstream would
ever reach their destination.

In multicast protocols like PGM (RFC3208) the problem is solved by
setting a rate limit for the publisher. This measure actually solves two
problems:

a. The oldest messages are either sent downstream or -- if not
possibe -- dropped at each publishing node. This guarantees that even in
the case of congestion or a slow/dead client, the buffer space is going
to become gradually available.
b. By restricting the rate at the root of the distribution tree,
every downstream node/user/enterprise can rely on the limit and do the
capacity planning accordingly.

One question to answer is whether there are two distinct use cases here:
First, the "as reliable as possible", ie. drop the message only if the
buffer is full; second, "as real-time as possible", ie. drop message
when it's due to be sent even if the buffer is not full. The problem
seems similar to the problem solved by ToS bits at IP layer.

Another question is whether to drop only outbound messages or whether to
drop inbound messages as well. While the latter seems like an obvious
best choice, the former may have following advantage: Even misbehaved
publishers (publishing at a rate larger than it's assumed) would be
treated decently. Instead of getting a buffer full of old messages
immediately and dropping them gradually on the output, the messages
would be dropped on input, thus selecting messages from the feed more
evenly instead of following a batch-pause-batch-pause pattern.

6. The nature of subscriptions

Subscriptions are a mechanism for a consumer to specify what kind of
messages it wants to receive.

The important thing about subscriptions is that the criteria specified
by the subscription deal with application data, not the SP internal
data. In other words, the subscription deals with message content, not
the labels.

Another important point is that subscription is a filter, not a link. In
practical terms it means that even if a message matches two distinct
subscriptions, it's still delivered once only. The reason is that that
way if you want it to behave as a link, you can create two links and
pass each subscription on a separate link. However, if subscription
behaved like a link, the only way to make it behave like a filter would
be to implement de-duplication on the application layer. Furthermore,
some 95% of use cases tend to ask for "filter" behaviour and most
messaging implementations in the wild treat the subscriptions this way.

7. Default subscription

When there are no subscription, no messages should be passed.

It is often assumed that if user doesn't subscribe at all what he wants
is simple broadcast and thus the behaviour in such case should be to
pass *all* the messages, ie. effectively turn the filtering off. The
problem with such approach is that if user wants to subscribe, he has to
do so after establishing a link first. Before the first subscription
reaches the publsiher, there are no subscriptions and thus all the
messages are sent to the user. Therefore a batch of messages arrives
that user is not interested in, but still has to parse and drop.

The consequences can be especially grave with high-volume feeds, where
temporary bursts of "pass all" traffic can lead to severe congestion.

7. Unsubscriptions

There should be a way to cancel subscriptions ("unsubscriptions"). There
are two groups of problems with unsubscriptions:

First, how the subscription should be identified? The options are:

a. By subscription string (ie. two subscriptions with the same
textual representation are considered equal)
b. By subscription semantics (ie. two subscrptions filtering exactly
the same messages are considered equal)
c. By subscription ID, ie. an unique ID generated at the point when
subscritpion happens that can be later used to unsubscribe.

My preference at the moment is the first option. Second option requires
converting subscriptions to a normal form, which doesn't play nice with
filtering algorithm agnosticism (see below) and the third option
introduces new entity (subscription ID) to be passed on the wire which
doesn't play nice with Occam's razor.

Second, how should the duplicate subscriptions be treated?

a. Ignore duplicate subscription. Ignore unsubscription if there's no
corresponding subscription.
b. Keep reference count associated with the subscriptions.

AMQP is an example of the former approach. Subscription is uniquely
identified by the pair composed of the subscription and a connection it
was received on. That way, unsubscribing from a specific topic via
connection A doesn't mess with the same subscription placed via
connection B.

However, if we move beyond the start topology, the approach isn't
sufficient. Consider following topology:

+---+
| A |
+---+
|
+---+
| B |
+---+
/ \
+---+ +---+
| C | | D |
+---+ +---+

If C and D subscribe to the same topic, the subscriptions will be
ultimately forwarded (see discussion of subscription forwarding below)
to node A which has no way to distinguish them as both have arrived via
a single connection (A-B). So, if there's no reference count, when
unsubscription posted by C reaches A, it would stop the flow of the
messages not only to C but also to D.

That's why the option b. should be pursued.

8. Subscription forwarding

It is often required that the filtering of messages is not done on the
consumer but rather delegated to an upstream node, such as broker. Doing
so has significant impact on the bandwidth usage.

The detailed discussion of the problem and an example of subscription
forwarding system, as implemented in 0MQ, can be found here this:
http://www.250bpm.com/pubsub

Comment: With subscription forwarding mechanism in place it's important
to distinguish "subscriptions" per se, ie. the filtering rules stored in
SP nodes and "subscription commands", ie. protocol commands passed
upstream to ask for (un)subscriptions to happen.


9. Multiple matching algorithms

The subscriptions are used to check whether message application-level
content matches a particular pattern and filter messages on base of
that. Thus, the matching algortihms differ depending on both the
application logic and the content format (binary, XML, JSON etc.) They
range from a single prefix match (0MQ) through exact match (AMQP direct
exchange) and regular pattern match (MQTT) to sophisticated
multi-property matches (AMQP headers exchange) and SQL- or x-path- like
statements (JMS).

Given the broad range of possible algorithms we want the set to be
extensible, i.e. we want to add a new matching algorithm without
breaking the existing deployments. Given the matching is done on
intermediary nodes (brokers), introducing a new matching algorithm in a
distributed multi-enterprise environment would mean upgrading the
infrastructure in all the participating enterprises in a single go,
which is close to impossible and would probably lead to problems similar
to those encoutered with IPv6 deployment.

Thus, my proposal is to treat subscriptions as end-to-end functionality
(filtering hapens on the terminal consumer) and passing subscriptions
upstream as an optional optimisation of the algorithm. The consequences are:

1. If a node is not aware of a particular subscription type it can still
be functional by not doing filtering at all and blindly forwarding the
subscriptions upstream.
2. Even dumb intermediaries with no filtering functionality (such as L3
switches) can be integrated into pub/sub topologies.
3. Hardware-based filtering is often optimised to be very efficient, yet
imperfect (check how multicast packets are filtered in NICs). With
filtering viewed as an optional feature rather than hard requirement,
this kind of devices can be added to the mix.
4. Even with uni-directional transports (such as IP multicast) where
it's not possible to pass subscriptions upstream, the system should
still work as intended.
5. Mandatory filtering at the edges of the topology solves
synchronisation problems, such as receiving a message after
unsubscription from associated topic was already asked for, but not yet
processed by the upstream node.
6. Finally, it fits well with the traditional model used in most
broker-based messaging systems, namely forwarding the subscription only
half way upstream (to the broker, but not to the publisher). By treating
the filtering as an optimisation, there's no hard requirement for
publishers to do filtering.


Final comment: The need for messaging patterns (such as pub/sub) is
often disputed. The alternative proposed in most cases is to provide a
set of primitives to construct a messaging patterns on the application
level (e.g. AMQP's exchange/queue/binding/cosumer model). I believe that
brief discussion of pub/sub pattern above is a telling example of how
complex can designing a messaging pattern can be. Leaving the design to
the user rather than provinding a pre-packaged implementation in form of
a pattern would mean solving the easy problem (framing etc.) and
silently passing the responsibility for the hard problem to the user.
That in turn would make the messaging systems significantly less useful
and less used.

Martin

Paul Colomiets

unread,
Aug 26, 2011, 5:21:29 PM8/26/11
to sp-discu...@googlegroups.com
Hi Martin,

On Thu, Aug 25, 2011 at 9:37 AM, Martin Sustrik <sus...@250bpm.com> wrote:
> Most existing pub/sub systems (including IP mutlicast, AMQP, MQTT, even 0MQ)
> mix the message aggregation (N to 1) functionality with message distribution
> functionality (1 to N). It is commonly implemented by having a broker/switch
> in the middle of the topology. All publishers send messages to the broker
> (aggregation) and the broker distributes the messages to all consumers
> (distibution).

> [.. snip ..]

We used to have several broker-like instances which have different distribution
tree, but have same nodes at the aggregation side. In other words we push
to several brokers at each node for performance and reliability reasons.
Does it plays well with your pattern distinction?

> Final comment: The need for messaging patterns (such as pub/sub) is often
> disputed. The alternative proposed in most cases is to provide a set of
> primitives to construct a messaging patterns on the application level (e.g.
> AMQP's exchange/queue/binding/cosumer model). I believe that brief
> discussion of pub/sub pattern above is a telling example of how complex can
> designing a messaging pattern can be. Leaving the design to the user rather
> than provinding a pre-packaged implementation in form of a pattern would
> mean solving the easy problem (framing etc.) and silently passing the
> responsibility for the hard problem to the user. That in turn would make the
> messaging systems significantly less useful and less used.
>

This is especially true.

--
Paul

Martin Sustrik

unread,
Aug 27, 2011, 4:07:10 AM8/27/11
to sp-discu...@googlegroups.com, Paul Colomiets
On 08/26/2011 11:21 PM, Paul Colomiets wrote:

> We used to have several broker-like instances which have different distribution
> tree, but have same nodes at the aggregation side. In other words we push
> to several brokers at each node for performance and reliability reasons.
> Does it plays well with your pattern distinction?

It's hot-hot failover, right? All the "brokers" doing the same thing
with clients de-duplicating the results.

It can be easily decomposed into multiple distribution and aggregation
topologies.

In the future, it should be definitelly possible to define such complex
patterns (eg. ask DNS about the service to connect to, if several
records are returned, assume hot-hot failover etc.) so that they don't
have to be dealt with on the application level, however, at the moment
it's too far ahead.

Martin

Reply all
Reply to author
Forward
0 new messages