HBase back-pressure


Nick Telford

Jun 11, 2012, 1:31:34 PM
to async...@googlegroups.com
Hi,

Benoit, I saw a post from you saying that you were considering adding back-pressure into an upcoming release, but cannot find any more details.

While we love the high-throughput and non-blocking nature of asynchbase, we're concerned about the scenario where clients attempt to write with a higher throughput than the cluster can handle, usually transiently, e.g. during major compactions, heavy GC-load, moving region servers etc.

What's the current situation, does the client buffer writes until they complete? Will it continue buffering until OOM if it can't complete them fast enough?

Regards,

Nick Telford

tsuna

Jun 11, 2012, 2:06:00 PM
to Nick Telford, async...@googlegroups.com
On Mon, Jun 11, 2012 at 10:31 AM, Nick Telford <nick.t...@gmail.com> wrote:
> While we love the high-throughput and non-blocking nature of asynchbase,
> we're concerned about the scenario where clients attempt to write with a
> higher throughput than the cluster can handle, usually transiently, e.g.
> during major compactions, heavy GC-load, moving region servers etc.
>
> What's the current situation, does the client buffer writes until they
> complete? Will it continue buffering until OOM if it can't complete them
> fast enough?

The only really problematic case is when a region is moving (due to
balancing or RS failure) or is being split. There is another
interesting problematic case that can happen if you try to suddenly
send a very large number of RPCs, each to a different key of a given
table, on a freshly created HBaseClient.

Let me first explain briefly the life of an RPC:
When you give an RPC to HBaseClient, it first looks up in its META
cache the region where it thinks this RPC ought to go. In a long
running application, chances are that the client already knows all the
regions in its working set, and already has an open connection to the
right RegionServer. In that case the RPC goes straight to the right
RegionClient, which serializes it and sends it out to the network. At
this point, only 2 things are left in memory: the actual object of the
RPC being sent, along with an entry in some map that keeps track of
RPC IDs associated with outstanding ("in-flight") RPCs.

There is currently no limit on the number of RPCs that can be
outstanding at the same time. If you look at RegionClient.java,
you'll find this:
// TODO(tsuna): Check the size() of rpcs_inflight. We don't want to have
// more than M RPCs in flight at the same time, and we may be overwhelming
// the server if we do.

What this means is that yes it is currently possible that your
application would run itself out of memory if it keeps producing and
sending new RPCs faster than the server can handle. As the TODO shows
above, there is currently no mechanism in asynchbase to prevent this
from happening. It's not hard to put a limit on the number of RPCs in
flight; what's hard, and the reason why I haven't done this yet, is:
what do you do once you hit that limit? The premise of asynchbase is
to never block, so how do we relay some back pressure to the caller to
tell them to slow down?

In the event that the META cache didn't know where the key was, a META
lookup needs to occur. This can also be problematic. To see how,
create a new HBaseClient, and have a tight loop create 1000000 RPCs,
each for a different key, and send them all to the HBaseClient. The
client will think it needs to do one META lookup per RPC, which would
be really bad and cause what I call a "META storm", as all of a sudden
a bazillion lookups hit the META region. In order to try to mitigate
this, and because META lookups are generally fast, there is a
rate-limit mechanism which is explained in the javadoc of the
acquireMetaLookupPermit method in RegionClient.java. The idea is that
there is a semaphore that controls the number of META lookups that can
happen concurrently, and when the semaphore runs out of permits,
subsequent META lookups get a 5ms penalty. In this case I "cheated"
because I'm actually blocking, albeit only for 5ms. I found this
strategy to be rather effective as it significantly reduces the
magnitude of "META storms" when they happen.
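
A minimal sketch of the rate-limit idea described above, assuming a plain
java.util.concurrent.Semaphore and a timed tryAcquire. This is not the code
from RegionClient.java; the class name LookupThrottle and its methods are
hypothetical, and the sketch assumes that a caller who fails to get a permit
still proceeds with its lookup after having paid the 5ms delay.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a lookup throttle; not asynchbase's actual code. */
final class LookupThrottle {
  private final Semaphore permits;

  LookupThrottle(int maxConcurrentLookups) {
    this.permits = new Semaphore(maxConcurrentLookups);
  }

  /**
   * Tries to obtain a lookup permit, waiting at most 5ms.
   * Returns true if a permit was obtained (the caller must call release()).
   * Returns false if no permit became available in time; the caller is
   * assumed to go ahead anyway, so the wait acts as a small penalty.
   */
  boolean acquire() {
    try {
      return permits.tryAcquire(5, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve the interrupt status
      return false;
    }
  }

  /** Returns a permit obtained through a successful acquire(). */
  void release() {
    permits.release();
  }
}
```

The blocking is bounded by the timeout, so a burst of lookups is slowed down
rather than stalled indefinitely.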

In my experience however, the concerns above, although they're real,
don't tend to cause big practical production issues. The problems
tend to come from region unavailability due to splits / balancing /
failures.

What happens then is that the RegionServer will return a
NotServingRegionException (during balancing / splits), or the
connection to the RegionServer will fail if the RegionServer is dead.
The best case is when the JVM of the RS is dead, so the OS will just
reset all the connections; the worst case is when the JVM isn't dead
or the machine is dead, in which case you have to wait until the TCP
connection keep-alive times out. Once a region is identified as
being unavailable, it is marked as such in HBaseClient's META cache,
so that all subsequent RPCs to this region don't even go to the
network. Instead, they get diverted into a queue where they wait
until the region comes back online. This queue has a hard-coded limit
of 10k RPCs per region. There is actually a low watermark (hardcoded
at 1k RPCs) and a high watermark (hardcoded at 10k RPCs). When the
low watermark is hit, the 1000th RPC to try to enter the queue will
immediately fail with a PleaseThrottleException, which you can handle
by adding an errback to the Deferred of your RPCs. Then subsequent
RPCs are also queued until the high watermark is hit, after which all
subsequent RPCs will fail-fast with a PleaseThrottleException.
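
The two-watermark behavior described above can be sketched roughly as
follows. This is an illustration only, not the code asynchbase uses: the
class WatermarkQueue and its methods are hypothetical, a boolean return
value stands in for failing the RPC with a PleaseThrottleException, and the
exact off-by-one behavior at the low watermark is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch of a two-watermark holding queue; not asynchbase code. */
final class WatermarkQueue<T> {
  private final int low;
  private final int high;
  private final Queue<T> pending = new ArrayDeque<>();
  // Counts queued RPCs plus the single RPC rejected at the low watermark.
  private int slots;

  WatermarkQueue(int low, int high) {
    if (low < 0 || low >= high) {
      throw new IllegalArgumentException("need 0 <= low < high");
    }
    this.low = low;
    this.high = high;
  }

  /** Returns true if queued, false if the caller should be told to throttle. */
  synchronized boolean offer(T rpc) {
    if (slots >= high) {
      return false;  // at or past the high watermark: fail fast
    }
    if (slots == low) {
      slots++;       // one-time early warning; later RPCs are queued again
      return false;
    }
    pending.add(rpc);
    slots++;
    return true;
  }

  /** Removes and returns everything queued, e.g. once the region is back. */
  synchronized List<T> drain() {
    List<T> out = new ArrayList<>(pending);
    pending.clear();
    slots = 0;
    return out;
  }

  synchronized int size() {
    return pending.size();
  }
}
```

With low = 3 and high = 6, the fourth offer is rejected as a warning, the
fifth and sixth are queued, and every offer after that is rejected until
drain() is called.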

The idea behind PleaseThrottleException is that this is how asynchbase
is trying to apply non-blocking back pressure on the caller when
things are backing up too much. When you get it, you can handle it
however you want. Typically what I would recommend is that you
flip a boolean to mark the server as unhealthy, so that it stops
receiving traffic.

I hope this helped shed some light on the current state of things.
There is obviously a lot of room for improvement, and I'm all ears if
you have cool ideas on how to properly apply back pressure in a
non-blocking fashion in various scenarios above, while mitigating the
risk of running ourselves out of memory.

--
Benoit "tsuna" Sigoure
Software Engineer @ www.StumbleUpon.com

Nick Telford

Jun 11, 2012, 2:32:20 PM
to async...@googlegroups.com, Nick Telford
Thanks for your quick response!

This issue is quite a lot more complex than I'd anticipated, but it's good that there are already (partial) mechanisms in place to aid back-pressure.

Generally I think it would be nice if, along with configuring the threshold, it's possible to configure a "policy" to control the action to take when the threshold is hit. I think there are a few options, each that would have valid use-cases:
  • Discard the RPC, good for applications that care more about throughput than durability
  • Block, with an optional timeout after which either a) the message is discarded as above, or b) an Exception is thrown as below
  • Throw a PleaseThrottleException

This is somewhat similar to the policies available to Java's ThreadPoolExecutor.
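
As a rough illustration of the three options listed above, in the spirit of
ThreadPoolExecutor's RejectedExecutionHandler: the OverflowPolicy interface
and the OverflowPolicies factory below are hypothetical and exist in no
released API. The sketch assumes in-flight RPCs are counted with a
Semaphore, and IllegalStateException stands in for PleaseThrottleException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical policy hook, consulted when no in-flight permit is free. */
interface OverflowPolicy<T> {
  /** Returns true if the RPC may be sent now, false if it should be dropped. */
  boolean onLimitReached(T rpc, Semaphore inFlight);
}

final class OverflowPolicies {
  private OverflowPolicies() {}

  /** Drop the RPC: favors throughput over durability. */
  static <T> OverflowPolicy<T> discard() {
    return (rpc, inFlight) -> false;
  }

  /** Wait up to the timeout for a permit; drop the RPC if none frees up. */
  static <T> OverflowPolicy<T> block(long timeoutMillis) {
    return (rpc, inFlight) -> {
      try {
        return inFlight.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt status
        return false;
      }
    };
  }

  /** Fail loudly; the exception type is a stand-in for a throttle exception. */
  static <T> OverflowPolicy<T> fail() {
    return (rpc, inFlight) -> {
      throw new IllegalStateException("too many RPCs in flight");
    };
  }
}
```

When block() returns true it has taken a permit on the caller's behalf, so
the caller is expected to release it once the RPC completes.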

In my particular use-case, we'd love to block once we hit the high-water mark for in-flight RPCs. This may sound contradictory, but our application spools data from disk as fast as it can, and periodically blocking the thread is the best way we can throttle our ingest.


This is interesting but I'm not sure it's the best solution for all use-cases. In our scenario (see above), if we were to hit one of these high-water marks, I'm not sure how we could handle the PleaseThrottleException. Anything we do (e.g. sleep/block for a period of time) would be a shot in the dark as we'd have no way to know if/when the client is able to receive writes again. I imagine even asynchronous scenarios would suffer from a similar problem.
 
> I hope this helped shed some light on the current state of things.
> There is obviously a lot of room for improvement, and I'm all ears if
> you have cool ideas on how to properly apply back pressure in a
> non-blocking fashion in various scenarios above, while mitigating the
> risk of running ourselves out of memory.

Thanks for taking the time to explain; this seems to be a tricky problem and I doubt finding the "right" solution will be easy.

For now, I'm probably going to build a simple monitor in to our application that throws each Deferred in to a BlockingQueue with a fixed capacity and have each Deferred remove itself once completed. It's not pretty, but it should handle things for my use-case for now.

I look forward to seeing how this discussion unfolds!

Regards,

Nick Telford

tsuna

Jun 12, 2012, 2:51:51 AM
to Nick Telford, async...@googlegroups.com
On Mon, Jun 11, 2012 at 11:32 AM, Nick Telford <nick.t...@gmail.com> wrote:
> This issue is quite a lot more complex than I'd anticipated, but it's good
> that there are already (partial) mechanisms in place to aid back-pressure.

Yup, the devil is always in the details :(

> Generally I think it would be nice if, along with configuring the threshold,
> it's possible to configure a "policy" to control the action to take when the
> threshold is hit. I think there are a few options, each that would have
> valid use-cases:
>
> Discard the RPC, good for applications that care more about throughput than
> durability
> Block, with an optional timeout after which either a) the message is
> discarded as above, or b) an Exception is thrown as below
> Throw a PleaseThrottleException

The idea behind PleaseThrottleException was that it would allow you to
handle the situation however you want. I never felt really good about
this PleaseThrottleException business to be honest, but I haven't
found anything better. Once you get the exception in your callback,
then it's trivial for you to decide what to do: discard it, block,
sacrifice a goat, whatever else.

>  In my particular use-case, we'd love to block once we hit the high-water
> mark for in-flight RPCs. This may sound contradictory, but our application
> spools data from disk as fast as it can, and periodically blocking the
> thread is the best way we can throttle our ingest.

The way I was hoping to deal with this in OpenTSDB was as follows.
OpenTSDB has similar issues when doing batch imports from a file,
where you can read a ton of data really quickly from a file. The
PleaseThrottleException exception you get when you hit the low
watermark was supposed to atomically set a boolean to false, so that
the thread reading from the input would realize it has to stop. Once
the current backlog of RPCs was flushed, the boolean would be set to
true and the import thread would get signaled so it can continue. I
found that the signaling part wasn't as easy to get right as I
thought.

What I tried at first was to have the import thread block on an RPC as
soon as the flag is set:

// In the import loop:
Deferred<..> d = client.put(some_write);
if (need_to_throttle) {
  d.join();  // block until that write goes through
  need_to_throttle = false;
}

Where need_to_throttle is like a volatile boolean or something to that effect.
Of course, there is a callback that handles PleaseThrottleException
and sets need_to_throttle, e.g.:

d.addErrback(new Callback<Exception, Exception>() {
  public Exception call(Exception e) {
    if (e instanceof PleaseThrottleException) {
      PleaseThrottleException pte = (PleaseThrottleException) e;
      need_to_throttle = true;
      // Retry the RPC to avoid data loss (the failed RPC is the put above)
      client.put((PutRequest) pte.getFailedRpc());
    }
    return e;  // keep propagating the error down the callback chain
  }
});

This admittedly naive approach isn't effective because the .join()
might return very quickly if that write happened to go to a region
that wasn't having an issue. The other problem is that in a
high-throughput import, after the low watermark is reached, by the
time the callback executes and starts handling the
PleaseThrottleException, the high watermark might already be reached
too, and so we're in an unpleasant situation because we have this
hot-potato RPC in our hands that no one wants but that we don't wanna
discard either.

I've had more success by wrapping the entire import loop with a
semaphore that controls the amount of concurrency of the import. Grab
a permit when you send out an RPC, release it when the RPC completes
(successfully or not). Give the semaphore enough permits to achieve
high throughput and pound hard on the cluster, but not too many so
that when things back up, you run out of permits and your import
naturally stops.
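
A minimal sketch of the semaphore approach described above. To stay
self-contained it uses CompletableFuture as a stand-in for asynchbase's
Deferred, and the send function is supplied by the caller; the class
BoundedSender and its method names are hypothetical. PleaseThrottleException
gets no special treatment here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical sketch: caps the number of in-flight RPCs with a semaphore. */
final class BoundedSender<T> {
  private final Semaphore inFlight;
  private final Function<T, CompletableFuture<Void>> send;

  BoundedSender(int maxInFlight, Function<T, CompletableFuture<Void>> send) {
    this.inFlight = new Semaphore(maxInFlight);
    this.send = send;
  }

  /** Blocks the producer thread while maxInFlight RPCs are outstanding. */
  CompletableFuture<Void> submit(T item) {
    inFlight.acquireUninterruptibly();  // grab a permit before sending
    CompletableFuture<Void> result;
    try {
      result = send.apply(item);
    } catch (RuntimeException e) {
      inFlight.release();               // nothing was sent, give the permit back
      throw e;
    }
    // Release the permit when the RPC completes, successfully or not.
    return result.whenComplete((ok, error) -> inFlight.release());
  }

  int availablePermits() {
    return inFlight.availablePermits();
  }
}
```

The import loop then calls submit() for every write. When the cluster backs
up, completions slow down, permits run out, and the loop blocks inside
submit() instead of piling up RPCs in memory.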

> This is interesting but I'm not sure it's the best solution for all
> use-cases. In our scenario (see above), if we were to hit one of these
> high-water marks, I'm not sure how we could handle the
> PleaseThrottleException.

You have to set a flag to tell the thread doing the import to stop
producing new RPCs.

> Anything we do (e.g. sleep/block for a period of
> time) would be a shot in the dark as we'd have no way to know if/when the
> client is able to receive writes again. I imagine even asynchronous
> scenarios would suffer from a similar problem.

Yes, although the problem is more insidious in batch imports where a
small number of threads can generate RPCs at a pace that significantly
exceeds the pace at which they can be consumed.

> For now, I'm probably going to build a simple monitor in to our application
> that throws each Deferred in to a BlockingQueue with a fixed capacity and
> have each Deferred remove itself once completed. It's not pretty, but it
> should handle things for my use-case for now.

Or you could just use a Semaphore as suggested above. It accomplishes
the same thing, but it's simpler and a lot more lightweight.

tsuna

Jun 12, 2012, 3:17:08 AM
to Nick Telford, async...@googlegroups.com
On Mon, Jun 11, 2012 at 11:51 PM, tsuna <tsun...@gmail.com> wrote:
> Yup, the devil is always in the details :(

So much that I feel like I should clarify a couple points of my own answer.

> I've had more success by wrapping the entire import loop with a
> semaphore that controls the amount of concurrency of the import.  Grab
> a permit when you send out an RPC, release it when the RPC completes
> (successfully or not).

Obviously, PTE (PleaseThrottleException) still needs to be handled
specially, because releasing a permit on a PleaseThrottleException
would not help. In my approach with a semaphore, I was handling PTE
by grabbing all outstanding permits from the semaphore and releasing
them all only after the RPC that caused the PTE had gone through
(meaning it succeeded or failed with an error other than a PTE).
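
The PTE handling described above might look roughly like this. It is a
sketch under assumptions, not the code from the HBASE-5539 patch:
CompletableFuture stands in for Deferred, the class ThrottleGate is
hypothetical, and the future passed to onThrottle is assumed to complete
only once the retried RPC has succeeded or failed with a non-PTE error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch: pause producers by draining the shared semaphore. */
final class ThrottleGate {
  private final Semaphore permits;

  ThrottleGate(Semaphore permits) {
    this.permits = permits;
  }

  /**
   * Call when an RPC fails with a throttle signal. Takes every permit that
   * is currently free, so producers block on their next acquire, and gives
   * the same number back once the retried RPC has gone through.
   */
  void onThrottle(CompletableFuture<?> retried) {
    final int drained = permits.drainPermits();
    retried.whenComplete((result, error) -> permits.release(drained));
  }
}
```

Permits held by RPCs that are already in flight are not touched, so those
RPCs still release them normally as they complete.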

> Give the semaphore enough permits to achieve
> high throughput and pound hard on the cluster, but not too many so
> that when things back up, you run out of permits and your import
> naturally stops.

Of course it's really hard to size the semaphore appropriately, and
under-provisioning permits will artificially lower throughput, so
instead I do as described above and grab all remaining permits at once
upon a PTE. As part of
https://issues.apache.org/jira/browse/HBASE-5539 which includes
asynchbase in HBase's own PerformanceEvaluation benchmark, I actually
released code that implements the ideas above. If you look at the
patch attached to the issue, you'll see how I used the semaphore (in
that code I actually used one semaphore per producer thread).

I'm also not suggesting that this is "the right way" to solve this
problem. It clearly is not. I'm still looking for what the right way
should be. Having to do all this Semaphore business manually outside
of asynchbase sucks. Maybe you're right that the solution relies on
providing some kind of a policy class that people can override to
write their own custom policies, along with basic policies like
"block", "discard", and of course the mandatory "sacrifice a goat"
policy. I just haven't found the right API yet to make things as
painless as possible.

Finally, I should point out that the RPC buffer into which RPCs get
diverted when a region is known to be unavailable is a per-region
buffer, not global or per-connection. This is generally fine as you
tend to have no more than a few regions unavailable at a time during
normal operations, but during rack failures or cases where the cluster
goes tits up, you might quickly run out of memory if a lot of regions
drop off the grid at the same time.