Cassie Connection Management


Daniel Natic

Nov 15, 2012, 5:33:34 PM
to twitter...@googlegroups.com
Good afternoon all,

I am currently testing Cassie and its performance regarding connections and connection management.  After turning on the OstrichStatsReceiver, I noticed that there are various things you have to do to get the values of "connects", "connections", and "load balancer" counts to change.  With our initial implementation, we were only getting one "connection" for the entire application and one "connect" for each akka actor that contained our Cassie cluster and keyspace definitions.

My question is:  What is the standardized way to access Cassie to get optimal connection performance?  I can think of three ways to set up Cassie given our paradigm.  I have pasted all three definitions at the bottom.  After writing out these tests, it seems like the final implementation is the correct one.

Thanks!

Dan Natic

import com.twitter.cassie.{CounterColumn, Column, Cluster}
import com.twitter.finagle.stats.OstrichStatsReceiver
import com.twitter.finagle.tracing.NullTracer
import com.twitter.cassie.codecs.{ByteArrayCodec, Utf8Codec, LongCodec, IntCodec}

abstract class CassieStuff extends akka.actor.Actor {
  protected val cluster = new Cluster(Set("localhost"), 9160, new OstrichStatsReceiver(), NullTracer.factory)
  protected val keyspaceName = "distserv"

  protected def insert()

  def receive = {
    case _ => insert()
  }
}

class CassieTest1 extends CassieStuff {
  // Connection immediately established
  private val keyspace = cluster.keyspace(keyspaceName)
    .minConnectionsPerHost(5)
    .maxConnectionsPerHost(10)
    .connect()

  // Column Family definitions use keyspace instance
  private val batchInfo = keyspace.columnFamily("BatchInfo", Utf8Codec, Utf8Codec, ByteArrayCodec)
  private val batchCount = keyspace.counterColumnFamily("BatchCount",  Utf8Codec, Utf8Codec)

  protected def insert() {
    val batchID = "123"
    batchInfo.batch()
      .insert(batchID, Column("batchCount", IntCodec.encode(5)))
      .insert(batchID, Column("startTime", LongCodec.encode(10)))
      .execute()

    batchCount.add(batchID, CounterColumn("batchCount", 5))
  }
}

class CassieTest2 extends CassieStuff {
  private val cluster = new Cluster(Set("localhost"), 9160, new OstrichStatsReceiver(), NullTracer.factory)

  // Connection is not immediately established on the keyspace
  private val keyspace = cluster.keyspace(keyspaceName)
    .minConnectionsPerHost(5)
    .maxConnectionsPerHost(10)

  // Keyspace builder creates connection per keyspace definition
  private val batchInfo = keyspace.connect().columnFamily("BatchInfo", Utf8Codec, Utf8Codec, ByteArrayCodec)
  private val batchCount = keyspace.connect().counterColumnFamily("BatchCount",  Utf8Codec, Utf8Codec)

  protected def insert() {
    val batchID = "123"
    batchInfo.batch()
      .insert(batchID, Column("batchCount", IntCodec.encode(5)))
      .insert(batchID, Column("startTime", LongCodec.encode(10)))
      .execute()

    batchCount.add(batchID, CounterColumn("batchCount", 5))
  }
}

class CassieTest3 extends CassieStuff {
  // Connection is not immediately established on the keyspace
  private val keyspace = cluster.keyspace(keyspaceName)
    .minConnectionsPerHost(5)
    .maxConnectionsPerHost(10)

  // Column family definitions are changed from properties to methods to yield a new connection per reference
  private def batchInfo = keyspace.connect().columnFamily("BatchInfo", Utf8Codec, Utf8Codec, ByteArrayCodec)
  private def batchCount = keyspace.connect().counterColumnFamily("BatchCount",  Utf8Codec, Utf8Codec)

  protected def insert() {
    val batchID = "123"
    batchInfo.batch()
      .insert(batchID, Column("batchCount", IntCodec.encode(5)))
      .insert(batchID, Column("startTime", LongCodec.encode(10)))
      .execute()

    batchCount.add(batchID, CounterColumn("batchCount", 5))
  }
}

Ryan King

Nov 15, 2012, 6:44:13 PM
to twitter...@googlegroups.com
On Thu, Nov 15, 2012 at 2:33 PM, Daniel Natic <dnat...@gmail.com> wrote:
> Good afternoon all,
>
> I am currently testing Cassie and its performance regarding connections and
> connection management. After turning on the OstrichStatsReceiver, I noticed
> that there are various things you have to do to get the values of
> "connects", "connections", and "load balancer" counts to change. With our
> initial implementation, we were only getting one "connection" for the entire
> application and one "connect" for each akka actor that contained our Cassie
> cluster and keyspace definitions.
>
> My question is: What is the standardized way to access Cassie to get
> optimal connection performance? I can think of three ways to set up Cassie
> given our paradigm. I have pasted all three definitions at the bottom.
> After writing out these tests, it seems like the final implementation is the
> correct one.

It's not clear to me what you're trying to fix here. If you want
Finagle (which is what handles the connections) to open more
connections, you need to issue more concurrent requests.

My guess is that you'd be better served by optimizing for latency or
throughput and only worrying about connections if one of those metrics
is sub-optimal.

-ryan
--
@rk / theryanking.com

Daniel Natic

Nov 16, 2012, 9:56:41 AM
to twitter...@googlegroups.com
Ryan,

Thanks for your fast response.  We have tuned our application to the point where Cassie/Cassandra seems to be the bottleneck.  We have a simple socket server listening for messages.  As soon as that socket server receives a message, it immediately sends the message to an Akka router which, in turn, sends the message to our Cassie/Cassandra actor to write to Cassandra.  The test case we use most frequently is a simple test client that sends 10,000 - 100,000 messages from a parallel looping socket client.  Using Ostrich, we can clearly see that the socket server/application processes and routes all messages through Akka with very high performance, but Cassie is always lagging behind.

Given this issue, I started looking in depth at our implementation.  After I noticed that Cassie was only yielding one connection (as Ostrich reported), I was thinking that Cassie is just simply bottlenecking on processing all of the messages over a single connection.  If I could yield exactly one connection for each Cassie/Cassandra actor, I believe I will get better performance with my Cassandra writes and the bottleneck will be removed.  As it stands, we are currently using an implementation that models the CassieTest1 class I provided in my initial post.

To rephrase my original question, I just do not know how to implement Cassie correctly to yield exactly one connection per actor.  CassieTest1 yields one connection for the entire application.  CassieTest2 yields one connection per column family definition for the entire application.  CassieTest3 yields one connection for every transaction made regardless of actor count.  I just need help so that I yield one connection per actor, regardless of what keyspaces and column families I have pre-defined within my actor(s).  Does this make sense?

Thanks again for your help,

Dan Natic

Ryan King

Nov 16, 2012, 2:54:11 PM
to twitter...@googlegroups.com
On Fri, Nov 16, 2012 at 6:56 AM, Daniel Natic <dnat...@gmail.com> wrote:
> Ryan,
>
> Thanks for your fast response. We have tuned our application to the point
> where Cassie/Cassandra seems to be the bottleneck. We have a simple socket
> server listening for messages. As soon as that socket server receives a
> message, it immediately sends the message to an Akka router which, in turn,
> sends the message to our Cassie/Cassandra actor to write to Cassandra. Our
> test case that we use most frequently is a simple test client that sends
> 10,000 - 100,000 messages from a parallel looping socket client. Using
> Ostrich, we can clearly see that the socket server/application will process
> and route all messages through Akka with very high performance, but Cassie
> is always lagging behind.

You should be able to achieve this with Cassie without much problem.

> Given this issue, I started looking in depth at our implementation. After I
> noticed that Cassie was only yielding one connection (as Ostrich reported),
> I was thinking that Cassie is just simply bottlenecking on processing all of
> the messages over a single connection. If I could yield exactly one
> connection for each Cassie/Cassandra actor, I believe I will get better
> performance with my Cassandra writes and the bottleneck will be removed. As
> it stands, we are currently using an implementation that models the
> CassieTest1 class I provided in my initial post.

I've never used Akka so I don't know anything about its concurrency model.

Are its actors single threaded? If so you're not going to have
concurrent requests to cassie, and therefore will not have more than
one connection open (or used).

> To rephrase my original question, I just do not know how to implement Cassie
> correctly to yield exactly one connection per actor. CassieTest1 yields one
> connection for the entire application. CassieTest2 yields one connection
> per column family definition for the entire application. CassieTest3 yields
> one connection for every transaction made regardless of actor count. I just
> need help so that I yield one connection per actor, regardless of what
> keyspaces and column families I have pre-defined within my actor(s). Does
> this make sense?

Ignoring the specifics of actors, generally the way you want to use
cassie, or any finagle client, is to set it up once at startup time,
then let it handle the connection management. If you're resorting to
creating new instances all the time then there's probably something
else wrong. My guess in this case is that you aren't actually making
concurrent requests.
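
A minimal sketch of that "set it up once, then issue concurrent requests" pattern. It uses only scala.concurrent so it runs without Cassie or Finagle on the classpath; `FakeKeyspace` and `writeAll` are hypothetical names standing in for a connected Cassie keyspace and the application's write path, not part of the Cassie API.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SharedClientDemo {
  // Hypothetical client; constructing it stands in for building a keyspace.
  final class FakeKeyspace {
    FakeKeyspace.created.incrementAndGet()
    // Returns immediately with a Future, like an asynchronous client call.
    def write(id: Int): Future[Int] = Future { id }
  }
  object FakeKeyspace { val created = new AtomicInteger(0) }

  // Built once and reused by every caller.
  lazy val keyspace = new FakeKeyspace

  def writeAll(n: Int): Seq[Int] = {
    // All requests are issued before any of them is awaited.
    val inFlight = (1 to n).map(keyspace.write)
    Await.result(Future.sequence(inFlight), 5.seconds)
  }
}
```

Calling `SharedClientDemo.writeAll(100)` issues 100 writes through the one shared instance; `FakeKeyspace.created` stays at 1 no matter how many writes are made.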

-ryan
--
@rk / theryanking.com

Stu Hood

Nov 16, 2012, 3:28:06 PM
to twitter...@googlegroups.com
You mentioned looking at the "connects", "connections", and "load balancer" metrics that Finagle is reporting via Ostrich... would you mind including (sanitized, if necessary) those stats, so that we can confirm?

Seeing only 1 connection, despite the fact that you are clearly setting:
    .minConnectionsPerHost(5)
    .maxConnectionsPerHost(10)
would indicate to me that something is preventing concurrency above cassie/finagle.

Just like with threads, if you have exactly 1 actor touching cassie, then you would never be able to utilize more than 1 connection at a time. You would need a pool of actors of size N to utilize N connections.
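
To illustrate the 1-actor versus N-actor point, here is a rough sketch that counts how many requests are ever in flight at once. `FakeClient` and `peakConnections` are hypothetical; a fixed thread pool stands in for a pool of actors that each block on their write, and the 20 ms sleep is an arbitrary stand-in for a Cassandra round trip.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PoolDemo {
  // Hypothetical stand-in for a pooled client: it only tracks how many
  // requests are in flight at once, i.e. how many connections would be busy.
  final class FakeClient {
    private val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)

    def blockingWrite(): Unit = {
      val now = inFlight.incrementAndGet()
      // Record the highest number of simultaneous requests seen so far.
      var seen = peak.get()
      while (now > seen && !peak.compareAndSet(seen, now)) seen = peak.get()
      try Thread.sleep(20) // pretend the write takes 20 ms
      finally inFlight.decrementAndGet()
    }
  }

  // Run `requests` blocking writes from `callers` threads; return the peak.
  def peakConnections(callers: Int, requests: Int): Int = {
    val client = new FakeClient
    val pool = Executors.newFixedThreadPool(callers)
    val done = new CountDownLatch(requests)
    (1 to requests).foreach { _ =>
      pool.execute(new Runnable {
        def run(): Unit = try client.blockingWrite() finally done.countDown()
      })
    }
    done.await(30, TimeUnit.SECONDS)
    pool.shutdown()
    client.peak.get()
  }
}
```

With a single caller the peak can never exceed 1, however many requests are queued; with 4 callers it can reach at most 4. That is the same ceiling a connection pool would see.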

----

Sidenote: your code examples are not waiting for the results of their calls:
    batchInfo.batch()
      .insert(batchID, Column("batchCount", IntCodec.encode(5)))
      .insert(batchID, Column("startTime", LongCodec.encode(10)))
      .execute()

    batchCount.add(batchID, CounterColumn("batchCount", 5))
^ both of these calls are "fire-and-forget", so you will never know whether they actually succeeded.

If you'd like to block the actor in order to determine whether the call succeeded, you'd want to add `.apply()`, `.get()`, or `()` to block for the Future returned by each call. If you are ok with the calls happening in parallel, you could also do:

   
    val batchFuture =
      batchInfo.batch()
        .insert(batchID, Column("batchCount", IntCodec.encode(5)))
        .insert(batchID, Column("startTime", LongCodec.encode(10)))
        .execute()
    val countFuture =
      batchCount.add(batchID, CounterColumn("batchCount", 5))
    Future.join(batchFuture, countFuture).get
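
The same start-both-then-wait shape, sketched with the standard library's scala.concurrent.Future so it runs on its own. This is an analogy, not the Twitter util API: `zip` plus `Await.result` play the role of `Future.join(...).get`, and `batchWrite` and `counterWrite` are hypothetical stand-ins for the two Cassie calls (the second is made to fail on purpose).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object JoinDemo {
  // Hypothetical writes: the first succeeds, the second fails.
  def batchWrite(): Future[Unit] = Future { () }
  def counterWrite(): Future[Unit] =
    Future { throw new RuntimeException("write timed out") }

  def writeAndWait(): Try[Unit] = {
    // Both futures are started before either is awaited, so they can
    // run in parallel.
    val batchFuture = batchWrite()
    val countFuture = counterWrite()
    // The zipped future succeeds only if both writes succeed; awaiting it
    // surfaces a failure that fire-and-forget would silently drop.
    Try(Await.result(batchFuture.zip(countFuture), 5.seconds)).map(_ => ())
  }
}
```

Here `JoinDemo.writeAndWait()` returns a `Failure` carrying the counter write's exception, which the caller would never see if the futures were simply discarded.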

 


Stu Hood

Nov 16, 2012, 3:35:15 PM
to twitter...@googlegroups.com
Hm... and actually, I just realized that my 'sidenote' was relevant to the original point. If you're not actually blocking for your requests, you should be able to trivially fire many requests in parallel. So I'll reiterate the request for stats.