INodeSuspensionStrategy actually working?


tcn

May 23, 2011, 10:09:30 AM
to Scale 7 - Libraries and systems for scalable computing
Hi!

Is somebody successfully using an INodeSuspensionStrategy? I wrote
a very simple one that remembers the last count of corrupt connections
per node in a ConcurrentHashMap and returns true if the new count is
higher. That part actually works. But what appears not to (reliably)
work is the removal of the nodes from the pool.

Pelops 1.1, threaded of course.

thx
tcn

Dan Washusen

May 23, 2011, 6:53:01 PM
to sca...@googlegroups.com
The javadoc for INodeSuspensionStrategy could be a little clearer but it does give a pretty big hint.

Interface used to define how nodes should be suspended for behaving badly. For example, if a node is reporting lots of corrupt connections then maybe it should be avoided for a while.
Implementations should indicate if a node is suspended by ensuring that CommonsBackedPool.INodeSuspensionState.isSuspended() returns true until the node should no longer be considered suspended.
Any state required to determine if a node should be suspended should be stored in the node's PooledNode.getSuspensionState(). Note that the suspension state may be null if the node has not been evaluated before.
Also note that evaluate(CommonsBackedPool, PooledNode) will be called by the scheduled tasks thread even when the node is currently suspended.

Based on the code you sent through via private message, I'd say your issue is that you're never actually suspending the node.  It's not sufficient to just return false from the evaluate method; you need to populate the INodeSuspensionState and ensure that the PooledNode knows that it's suspended.
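
For example, here's a rough, untested sketch of the pattern the javadoc describes; nodeLooksUnhealthy is just a placeholder for whatever check you decide on:

public class TimeBoxedSuspensionStrategy implements CommonsBackedPool.INodeSuspensionStrategy
{
    private static final long SUSPENSION_MILLIS = 60 * 1000L;

    @Override
    public boolean evaluate(final CommonsBackedPool pool, final PooledNode node)
    {
        // evaluate() is called by the scheduled task even while a node is
        // suspended, so bail out early if the current suspension hasn't expired
        CommonsBackedPool.INodeSuspensionState state = node.getSuspensionState();
        if (state != null && state.isSuspended())
            return true;

        if (nodeLooksUnhealthy(node))
        {
            final long suspendedUntil = System.currentTimeMillis() + SUSPENSION_MILLIS;
            node.setSuspensionState(new CommonsBackedPool.INodeSuspensionState()
            {
                @Override
                public boolean isSuspended()
                {
                    // true until the window passes, after which the node
                    // is considered for selection again
                    return System.currentTimeMillis() < suspendedUntil;
                }
            });
            return true;
        }
        return false;
    }

    private boolean nodeLooksUnhealthy(final PooledNode node)
    {
        return false; // placeholder: plug in your own health check here
    }
}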

p.s. It's probably worthwhile including the INodeSuspensionStrategy impl that you're trying to use (so others have some context)...
p.p.s. Suspending nodes based solely on corrupted connections probably isn't going to have the desired effect.  Connections are marked as corrupted for a range of reasons, so it doesn't provide a big enough picture to do anything useful.
p.p.p.s Also note that in this case the node is only the coordinator for the request.  Once Cassandra receives the request it will potentially re-route the request to another node.

tcn

May 24, 2011, 3:11:32 AM
to Scale 7 - Libraries and systems for scalable computing
On May 24, 12:53 am, Dan Washusen <d...@reactive.org> wrote:
> that you're never actually suspending the node.  It's *not* sufficient to
> just return false from the evaluate method.  You need to populate
> the INodeSuspensionState and ensure that the PooledNode knows that it's
> suspended.

Oh, good to know, somehow I missed that... :)

> p.s. It's probably worthwhile including the INodeSuspensionStrategy impl
> that you're trying to use (so others have some context)...

private static final Map<String, Integer> CORRUPTIONS = Maps.newConcurrentMap();

new CommonsBackedPool.INodeSuspensionStrategy()
{
    @Override
    public boolean evaluate(final CommonsBackedPool pool, final PooledNode node)
    {
        final int had = Objects.firstNonNull(CORRUPTIONS.get(node.getAddress()), 0); // poor man's default value
        final int has = node.getConnectionsCorrupted();
        final boolean suspend = has > had; // maybe add some threshold (?)
        if (suspend)
            LOG.info("{} had {} corruptions, now {}. Suspending.",
                    new Object[] { node.getAddress(), had, has });
        CORRUPTIONS.put(node.getAddress(), has);
        node.setSuspensionState(new INodeSuspensionState()
        {
            @Override
            public boolean isSuspended()
            {
                return true;
            }
        });
        return suspend;
    }
}

> p.p.s. Suspending nodes based solely on corrupted connections probably isn't
> going to have the desired effect.  Connections are marked as corrupted for a
> range of reasons, so it doesn't provide a big enough picture to do anything
> useful.

True. What's your suggestion? :)

> p.p.p.s Also note that in this case the node is only the coordinator for the
> request.  Once Cassandra receives the request it will potentially re-route
> the request to another node.

Sure, but it won't route it to a dead node :)

Alex Araujo

May 24, 2011, 9:09:50 AM
to Scale 7 - Libraries and systems for scalable computing
On May 24, 2:11 am, tcn <timo.nent...@gmail.com> wrote:
> > p.p.p.s Also note that in this case the node is only the coordinator for the
> > request.  Once Cassandra receives the request it will potentially re-route
> > the request to another node.
>
> Sure, but it won't route it to a dead node :)

I believe what Dan means here is that you will potentially be removing
the node that was coordinating the underlying corrupt connection, not
the node with the corrupt connection itself.

Generally, it's best to let Cassandra make the 'node is dead'
determination and simply have the client avoid making requests to a
node that is returning timeouts or corrupt connections for some period
of time. Cassandra coordinator nodes will re-route requests to other
nodes in the replica set until the dead node is operational or
replaced. From my own experience, client libraries have what you need
99% of the time and do not typically require additional policies/
logic.

I would suggest jumping on the #cassandra channel on freenode IRC or
emailing the cassandra-user list for general strategy questions or
Cassandra failure detection/handling questions. Both are fairly
active.

tcn

May 24, 2011, 10:43:09 AM
to Scale 7 - Libraries and systems for scalable computing

On May 24, 3:09 pm, Alex Araujo <alexara...@gmail.com> wrote:
> Generally, it's best to let Cassandra make the 'node is dead'
> determination and simply have the client avoid making requests to a
> node that is returning timeouts or corrupt connections for some period
> of time.  Cassandra coordinator nodes will re-route requests to other

That's what I'm trying to achieve actually :)

> nodes in the replica set until the dead node is operational or
> replaced.  From my own experience, client libraries have what you need
> 99% of the time and do not typically require additional policies/
> logic.

That doesn't apply to Pelops. As soon as I shut down one node, it
throws exceptions forever.

> I would suggest jumping on the #cassandra channel on freenode IRC or
> emailing the cassandra-user list for general strategy questions or
> Cassandra failure detection/handling questions.  Both are fairly
> active.

If this were a Cassandra issue and not a Pelops issue, I'd do so...

Alex Araujo

May 24, 2011, 12:44:23 PM
to Scale 7 - Libraries and systems for scalable computing
I am seeing the following:

2011-05-24 11:34:42.928|DEBUG|pool.CommonsBackedPool.getConnectionExcept|ThreadName=main|The node selection strategy was unable to choose a node, sleeping before trying again...
2011-05-24 11:34:43.28|DEBUG|pool.CommonsBackedPool.getConnectionExcept|ThreadName=main|Max wait time for connection exceeded
2011-05-24 11:34:43.29|ERROR|pool.CommonsBackedPool.getConnectionExcept|ThreadName=main|Failed to get a connection within the configured wait time because there are no available nodes. This possibly indicates that either the suspension strategy is too aggressive or that your cluster is in a bad way.

With:

Cluster cluster = new Cluster(nodeStr, port);
CommonsBackedPool.Policy policy = new CommonsBackedPool.Policy();
policy.setMaxActivePerNode(maxActivePerNode);
policy.setMaxWaitForConnection(maxWaitForConnection);
policy.setRunMaintenanceTaskDuringInit(true);
policy.setTestConnectionsWhileIdle(true);
policy.setTimeBetweenScheduledMaintenanceTaskRunsMillis(10001);

CommonsBackedPool.INodeSuspensionStrategy suspension = new CommonsBackedPool.INodeSuspensionStrategy()
{
    @Override
    public boolean evaluate(final CommonsBackedPool pool, final PooledNode node)
    {
        CommonsBackedPool.PooledConnection c = null;
        try
        {
            c = pool.new PooledConnection(
                    new Cluster.Node(node.getAddress(), pool.getCluster().getConnectionConfig()),
                    pool.getKeyspace());
            if (!pool.getConnectionValidator().validate(c))
            {
                final long suspendedUntil = System.currentTimeMillis() + 60000L;
                node.setSuspensionState(new CommonsBackedPool.INodeSuspensionState()
                {
                    @Override
                    public boolean isSuspended()
                    {
                        return suspendedUntil >= System.currentTimeMillis();
                    }
                });
                return true;
            }
            return false;
        }
        catch (final Exception e)
        {
            return true;
        }
        finally
        {
            if (c != null) c.close();
        }
    }
};

CommonsBackedPool commonsBackedPool = new CommonsBackedPool(
        cluster, keyspace, policy, new OperandPolicy(), null, suspension, null);
Pelops.addPool(pool, commonsBackedPool);

Seems like Pelops is doing the right thing, but I'm still a bit
unclear as to what you are expecting.

Dan Washusen

May 24, 2011, 7:46:21 PM
to sca...@googlegroups.com
I'm also unclear as to what you're trying to achieve/what your issue is.  I can take down nodes, and as long as the cluster can maintain quorum (which my code requires for certain reads and writes) everything works as expected without any further modification to Pelops.  It does log warning messages every once in a while, but it doesn't throw exceptions.  I do this on a semi-regular basis in production when the Cassandra team releases new versions...

Pelops has inbuilt functionality to suspend downed nodes (nothing to do with INodeSuspensionStrategy).  By default it suspends downed nodes for 10 seconds (see org.scale7.cassandra.pelops.pool.CommonsBackedPool.Policy#setNodeDownSuspensionMillis).
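
So if the default 10 seconds doesn't suit you, just tweak the policy, e.g.:

CommonsBackedPool.Policy policy = new CommonsBackedPool.Policy();
// suspend downed nodes for 30 seconds instead of the default 10
policy.setNodeDownSuspensionMillis(30 * 1000);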

-- 
Dan Washusen

tcn

May 25, 2011, 5:52:31 AM
to Scale 7 - Libraries and systems for scalable computing
On May 25, 1:46 am, Dan Washusen <d...@reactive.org> wrote:
> I'm also unclear as to what you're trying to achieve/what your issue is. I can take down nodes, and as long as the cluster can maintain quorum (which my code requires for certain reads and writes) everything works as expected without any further modification to Pelops. It does log warning messages every once in a while, but it doesn't throw exceptions. I do this on a semi-regular basis in production when the Cassandra team releases new versions...

This is just what I want to achieve :)

I wrote a simple test and noticed that if I write a single column at
quorum with a downed node, it logs the 10s suspension but succeeds. As
soon as I write two columns in a batch it breaks:

2011-05-25 11:50:59,453 WARN [main] org.scale7.cassandra.pelops.Operand - Operation failed as result of network exception. Connection is being marked as corrupt (and will probably be destroyed). See cause for details...
org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:906)
    at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:890)
    at org.scale7.cassandra.pelops.Mutator$1.execute(Mutator.java:67)
    at org.scale7.cassandra.pelops.Mutator$1.execute(Mutator.java:63)
    at org.scale7.cassandra.pelops.Operand.tryOperation(Operand.java:82)
    at org.scale7.cassandra.pelops.Mutator.execute(Mutator.java:72)
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:129)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
    ... 14 more
2011-05-25 11:51:03,455 WARN [main] org.scale7.cassandra.pelops.Operand - Operation failed as result of network exception. Connection is being marked as corrupt (and will probably be destroyed). See cause for details...
org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
    (identical stack trace to the one above)

Same with Hector. Now I'm completely confused. What does it matter how
many columns I write? I'm not supposed to execute after each column,
am I?

tcn

May 25, 2011, 6:28:54 AM
to Scale 7 - Libraries and systems for scalable computing
On May 25, 11:52 am, tcn <timo.nent...@gmail.com> wrote:
> Same with Hector. Now I'm completely confused. What does it matter how
> many columns I write? I'm not supposed to execute after each column,
> am I?

Apparently I'm lacking some fundamental knowledge. If I decommission
the node, quorum operations work fine with the remaining 4 nodes. They
continue to work if I shut down another (the fourth) node. I guess it
has something to do with my having re-balanced the ring like this:
http://wiki.apache.org/cassandra/Operations#Load_balancing

Dan Washusen

May 25, 2011, 6:35:43 AM
to sca...@googlegroups.com
Nope, you build up your mutations on a Mutator instance and then call execute.  Once you've called execute, the mutator instance is no longer valid and shouldn't be reused.
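
For example (column family and key names made up, written from memory, so adjust to suit):

Mutator mutator = Pelops.createMutator("pool");
mutator.writeColumn("MyColumnFamily", "row-key",
        mutator.newColumn("col1", "value1"));
mutator.writeColumn("MyColumnFamily", "row-key",
        mutator.newColumn("col2", "value2"));
// both columns go to Cassandra in a single batch_mutate call
mutator.execute(ConsistencyLevel.QUORUM);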

Pelops will try each operation (mutation, select, etc.) three times before giving up.  It's 'normal' to see log messages indicating that an operation failed.  It's not normal to see operations fail and raise exceptions that you have to deal with.  If this does happen then there is something wrong with your cluster (or the conditions you are placing on the operation), and that isn't really to do with Pelops.  I'd suggest you raise it over on the Cassandra users mailing list...

I do agree that logging a huge stack trace every 10 seconds while a node is down is a bit excessive.  Maybe we should change it to an info level message or give it its own category that can be ignored.

As for your earlier question regarding a useful implementation of INodeSuspensionStrategy: I think you were on the right track; checking if a node is reachable would cover one of the more basic tests of health (although your implementation wasn't correct, see the impl by Alex Araujo from earlier for a working version).  A better approach might be to integrate some of the checks mentioned here: http://www.datastax.com/docs/0.8/operations/monitoring.
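
For example, something along these lines could ask a node over JMX whether it thinks it's in a NORMAL state (the MBean and attribute names are from memory, so double check them and the JMX port against your Cassandra version):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public static boolean nodeReportsNormal(String host, int jmxPort)
{
    JMXConnector connector = null;
    try
    {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + jmxPort + "/jmxrmi");
        connector = JMXConnectorFactory.connect(url);
        MBeanServerConnection mbeans = connector.getMBeanServerConnection();
        ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");
        // StorageService reports modes like NORMAL, JOINING, LEAVING, DECOMMISSIONED
        String mode = (String) mbeans.getAttribute(storageService, "OperationMode");
        return "NORMAL".equals(mode);
    }
    catch (Exception e)
    {
        return false; // unreachable or misbehaving, treat as unhealthy
    }
    finally
    {
        if (connector != null)
        {
            try { connector.close(); } catch (Exception ignore) {}
        }
    }
}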

Hope that helps...

-- 
Dan Washusen
Make big files fly

tcn

May 25, 2011, 8:45:38 AM
to Scale 7 - Libraries and systems for scalable computing
On May 25, 12:35 pm, Dan Washusen <d...@reactive.org> wrote:
> I'd suggest you raise it over on the Cassandra users mailing list...

Yes, did so, and in the meantime I've understood the issue.

When you're new to Pelops, these WARNs every 10s combined with the
NoOp suspension strategy are confusing anyway :)

Thanks for the support!