[virgil] r160 committed - Connection Pooling for Cassandra.

39 views
Skip to first unread message

virgil.apach...@codespot.com

unread,
Jan 31, 2012, 9:25:09 AM1/31/12
to virgil...@gmail.com
Revision: 160
Author: bone...@gmail.com
Date: Tue Jan 31 06:23:41 2012
Log: Connection Pooling for Cassandra.
http://code.google.com/a/apache-extras.org/p/virgil/source/detail?r=160

Added:
/trunk/server/src/main/java/org/apache/virgil/pool
/trunk/server/src/main/java/org/apache/virgil/pool/ConnectionPool.java

/trunk/server/src/main/java/org/apache/virgil/pool/ConnectionPoolAspect.java

/trunk/server/src/main/java/org/apache/virgil/pool/ConnectionPoolClient.java

/trunk/server/src/main/java/org/apache/virgil/pool/EmptyConnectionPoolException.java
/trunk/server/src/main/java/org/apache/virgil/pool/PooledConnection.java

/trunk/server/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
/trunk/server/src/main/resources/META-INF
/trunk/server/src/test/resources/core-site.xml
/trunk/server/src/test/resources/hdfs-site.xml
/trunk/server/src/test/resources/mapred-site.xml
Deleted:
/trunk/server/src/main/java/org/apache/virgil/aop
/trunk/server/src/main/java/org/apache/virgil/pool/CassandraDaemonAspect.aj
/trunk/server/src/main/java/org/apache/virgil/pool/CassandraServerAspect.aj
/trunk/server/src/test/resources/META-INF
Modified:
/trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java
/trunk/server/src/main/java/org/apache/virgil/cli/VirgilCommand.java

/trunk/server/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
/trunk/server/src/main/java/org/apache/virgil/triggers/TestTrigger.java
/trunk/server/src/main/java/org/apache/virgil/triggers/TriggerStore.java
/trunk/server/src/main/java/org/apache/virgil/triggers/TriggerTask.java
/trunk/server/src/main/resources/META-INF/aop.xml

=======================================
--- /dev/null
+++ /trunk/server/src/main/java/org/apache/virgil/pool/ConnectionPool.java
Tue Jan 31 06:23:41 2012
@@ -0,0 +1,95 @@
+package org.apache.virgil.pool;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraServer;
+import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.virgil.config.VirgilConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionPool {
+ private static Logger logger =
LoggerFactory.getLogger(ConnectionPool.class);
+ // TODO: May want this to match the HTTP thread configuration.
+ private static final int MAX_POOL_SIZE = 100;
+ private static final int MAX_TRIES_FOR_CONNECTION = 3;
+ private static final int CONNECTION_WAIT_TIME = 500;
+ private static Object LOCK = new Object();
+
+ private static LinkedList<Cassandra.Iface> free = new
LinkedList<Cassandra.Iface>();
+ private static Map<Cassandra.Iface, TTransport> socketMap = new
HashMap<Cassandra.Iface, TTransport>();
+ private static Cassandra.Iface embeddedServer = null;
+
+ public static void initializePool() throws Exception {
+ logger.debug("Creating connection pool, initializing [" +
MAX_POOL_SIZE + "] connections.");
+ // Don't need pooling if we are embedded
+ if (VirgilConfiguration.isEmbedded()) {
+ embeddedServer = new CassandraServer();
+ } else {
+ for (int i = 0; i < MAX_POOL_SIZE; i++) {
+ free.add(createConnection());
+ }
+ }
+ }
+
+ public static Cassandra.Iface createConnection() throws Exception {
+ if (VirgilConfiguration.isEmbedded()) {
+ return new CassandraServer();
+ } else {
+ TTransport transport = new TFramedTransport(new
TSocket(VirgilConfiguration.getHost(),
+ VirgilConfiguration.getPort()));
+ TProtocol proto = new TBinaryProtocol(transport);
+ transport.open();
+ Cassandra.Iface connection = new Cassandra.Client(proto);
+ socketMap.put(connection, transport);
+ return connection;
+ }
+ }
+
+ public static Cassandra.Iface getConnection(Object requestor) throws
EmptyConnectionPoolException {
+ // Short circuit if embedded.
+ if (VirgilConfiguration.isEmbedded()) {
+ return embeddedServer;
+ }
+
+ for (int x = 0; x < MAX_TRIES_FOR_CONNECTION; x++) {
+ try {
+ Cassandra.Iface connection = free.pop();
+ logger.debug("Releasing connection to [" +
requestor.getClass() + "] [" + free.size() + "] remain.");
+ return connection;
+ } catch (NoSuchElementException nsee) {
+ logger.warn("Waiting " + CONNECTION_WAIT_TIME + "ms for
cassandra connection, attempt [" + x + "]");
+ }
+ try {
+ synchronized (LOCK) {
+ logger.warn("LOCKING for cassandra connection.");
+ LOCK.wait(CONNECTION_WAIT_TIME);
+ }
+ } catch (InterruptedException ie) {
+ throw new EmptyConnectionPoolException("No cassandra
connection, and interupted while waiting.", ie);
+ }
+ }
+ throw new EmptyConnectionPoolException("No cassandra connections
left in pool.");
+ }
+
+ public static void release(Object requestor, Cassandra.Iface
connection) throws Exception {
+ // (TODO: Could make to ConnectionPool implementations based on
embedded or not.)
+ if (VirgilConfiguration.isEmbedded()) {
+ return;
+ }
+
+ free.add(connection);
+ logger.debug("Returning connection from [" + requestor.getClass()
+ "] [" + free.size() + "] remain.");
+ synchronized (LOCK) {
+ LOCK.notify();
+ }
+ }
+}
=======================================
--- /dev/null
+++
/trunk/server/src/main/java/org/apache/virgil/pool/ConnectionPoolAspect.java
Tue Jan 31 06:23:41 2012
@@ -0,0 +1,34 @@
+package org.apache.virgil.pool;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class ConnectionPoolAspect {
+ private static Logger logger =
LoggerFactory.getLogger(ConnectionPoolAspect.class);
+
+ public Object delegate(ProceedingJoinPoint thisJoinPoint) throws
Throwable {
+ ConnectionPoolClient client = (ConnectionPoolClient)
thisJoinPoint.getTarget();
+ Cassandra.Iface connection = ConnectionPool.getConnection(client);
+ client.setConnection(connection);
+ try {
+ return thisJoinPoint.proceed(thisJoinPoint.getArgs());
+ } finally {
+ ConnectionPool.release(client, connection);
+ }
+ }
+
+ @Pointcut("execution(@PooledConnection * *.*(..))")
+ public void methodAnnotatedWithPooledConnection() {}
+
+ @Around("methodAnnotatedWithPooledConnection()")
+ public Object handleStorage(ProceedingJoinPoint thisJoinPoint) throws
Throwable {
+ logger.debug("AOP:STORAGE connection for [" +
thisJoinPoint.getSignature() + "]");
+ return delegate(thisJoinPoint);
+ }
+}
=======================================
--- /dev/null
+++
/trunk/server/src/main/java/org/apache/virgil/pool/ConnectionPoolClient.java
Tue Jan 31 06:23:41 2012
@@ -0,0 +1,23 @@
+package org.apache.virgil.pool;
+
+import org.apache.cassandra.thrift.Cassandra;
+
+public class ConnectionPoolClient {
+ ThreadLocal<Cassandra.Iface> connection = new
ThreadLocal<Cassandra.Iface>() {
+ @Override
+ protected Cassandra.Iface initialValue() {
+ throw new RuntimeException("Using connection w/o attaining from
the pool.");
+ }
+ };
+
+ public void setConnection(Cassandra.Iface connection) {
+ this.connection.set(connection);
+ }
+
+ public Cassandra.Iface getConnection(String keyspace) throws Exception
{
+ Cassandra.Iface connection = this.connection.get();
+ if (keyspace != null)
+ connection.set_keyspace(keyspace);
+ return connection;
+ }
+}
=======================================
--- /dev/null
+++
/trunk/server/src/main/java/org/apache/virgil/pool/EmptyConnectionPoolException.java
Tue Jan 31 06:23:41 2012
@@ -0,0 +1,13 @@
+package org.apache.virgil.pool;
+
+public class EmptyConnectionPoolException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public EmptyConnectionPoolException(String message){
+ super(message);
+ }
+
+ public EmptyConnectionPoolException(String message, Exception e){
+ super(message, e);
+ }
+}
=======================================
--- /dev/null
+++
/trunk/server/src/main/java/org/apache/virgil/pool/PooledConnection.java
Tue Jan 31 06:23:41 2012
@@ -0,0 +1,5 @@
+package org.apache.virgil.pool;
+
+public @interface PooledConnection {
+
+}
=======================================
--- /dev/null
+++
/trunk/server/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
Tue Jan 31 06:23:41 2012
@@ -0,0 +1,39 @@
+package org.apache.virgil.triggers;
+
+import java.util.List;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.aspectj.lang.JoinPoint;
+import org.aspectj.lang.annotation.AfterReturning;
+import org.aspectj.lang.annotation.Aspect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class CassandraServerTriggerAspect {
+ private static Logger logger =
LoggerFactory.getLogger(CassandraServerTriggerAspect.class);
+
+ @AfterReturning("call(*
org.apache.cassandra.thrift.CassandraServer.doInsert(..))")
+ public void writeToCommitLog(JoinPoint thisJoinPoint) {
+ try {
+ ConsistencyLevel consistencyLevel = (ConsistencyLevel)
thisJoinPoint.getArgs()[0];
+ @SuppressWarnings("unchecked")
+ List<IMutation> mutations = (List<IMutation>)
thisJoinPoint.getArgs()[1];
+ for (IMutation mutation : mutations) {
+ if (mutation instanceof RowMutation) {
+ RowMutation rowMutation = (RowMutation) mutation;
+ logger.debug("Mutation for [" + rowMutation.getTable()
+ "] with consistencyLevel ["
+ + consistencyLevel + "]");
+ if
(!rowMutation.getTable().equals(DistributedCommitLog.KEYSPACE)) {
+
DistributedCommitLog.getLog().writeMutation(consistencyLevel, rowMutation);
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+}
=======================================
--- /dev/null
+++ /trunk/server/src/test/resources/core-site.xml Tue Jan 31 06:23:41 2012
@@ -0,0 +1,12 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+<property>
+ <name>fs.default.name</name>
+ <value>hdfs://dlcirrus01.hmsonline.com:9000</value>
+ <description>The name of the default filesystem</description>
+</property>
+</configuration>
=======================================
--- /dev/null
+++ /trunk/server/src/test/resources/hdfs-site.xml Tue Jan 31 06:23:41 2012
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+ <property>
+ <name>dfs.replication</name>
+ <value>3</value>
+ </property>
+ <property>
+ <name>dfs.name.dir</name>
+ <value>/opt/cirrus/data/hadoop/dfs/name</value>
+ </property>
+ <property>
+ <name>dfs.data.dir</name>
+ <value>/opt/cirrus/data/hadoop/dfs/data</value>
+ </property>
+ <property>
+ <name>dfs.checkpoint.dir</name>
+ <value>/opt/cirrus/data/hadoop/dfs/namesecondary</value>
+ </property>
+
+</configuration>
=======================================
--- /dev/null
+++ /trunk/server/src/test/resources/mapred-site.xml Tue Jan 31 06:23:41
2012
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>dlcirrus01.hmsonline.com:9001</value>
+ <description>Where the job tracker is</description>
+ </property>
+ <property>
+ <name>mapred.job.reuse.jvm.num.tasks</name>
+ <value>-1</value>
+ <description>The number of JVMs to reuse. -1 reuses
all.</description>
+ </property>
+
+</configuration>
=======================================
---
/trunk/server/src/main/java/org/apache/virgil/aop/CassandraDaemonAspect.aj
Tue Jan 24 13:31:50 2012
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.virgil.aop;
-
-public aspect CassandraDaemonAspect {
-
- private pointcut mainMethod() :
- execution(public static void main(String[]));
-
- before() : mainMethod() {
- //throw new RuntimeException("CRASH IT!");
- System.out.println("> " + thisJoinPoint);
- }
-
- after() : mainMethod() {
- System.out.println("< " + thisJoinPoint);
- }
-}
=======================================
---
/trunk/server/src/main/java/org/apache/virgil/aop/CassandraServerAspect.aj
Thu Jan 26 09:00:31 2012
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.virgil.aop;
-
-import java.util.List;
-
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.virgil.triggers.DistributedCommitLog;
-
-public aspect CassandraServerAspect {
-
- private pointcut insertMethod() :
- execution(private void doInsert(ConsistencyLevel, List<?
extends IMutation>));
-
- after() : insertMethod() {
- try {
- ConsistencyLevel consistencyLevel = (ConsistencyLevel)
thisJoinPoint.getArgs()[0];
- @SuppressWarnings("unchecked")
- List<IMutation> mutations = (List<IMutation>)
thisJoinPoint.getArgs()[1];
- for (IMutation mutation : mutations) {
- if (mutation instanceof RowMutation) {
- RowMutation rowMutation = (RowMutation) mutation;
- // String key =
ByteBufferUtil.string(rowMutation.key());
- // System.out.println("Mutation for [" + key + "] @ ["
+
- // rowMutation.getTable()
- // + "] with consistencyLevel [" + consistencyLevel
+ "]");
- System.out.println("Mutation for [" +
rowMutation.getTable() + "] with consistencyLevel ["
- + consistencyLevel + "]");
- if
(!rowMutation.getTable().equals(DistributedCommitLog.KEYSPACE)) {
-
DistributedCommitLog.writeMutation(consistencyLevel, rowMutation);
- }
- // for (Integer cfId :
rowMutation.getColumnFamilyIds()) {
- // ColumnFamily columnFamily =
- // rowMutation.getColumnFamily(cfId);
- // for (IColumn column :
columnFamily.getSortedColumns()) {
- // if (co)
- // String name = ByteBufferUtil.string(column.name());
- // String value =
ByteBufferUtil.string(column.value());
- // boolean delete = columnFamily.isMarkedForDelete();
- // if (delete) {
- // System.out.println(" -- DELETE -- [" + name + "] =>
[" +
- // value + "]");
- // } else {
- // System.out.println(" -- SET -- [" + name + "] => ["
+
- // value + "]");
- // }
- // }
- // }
-
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-}
=======================================
--- /trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java Thu
Jan 26 09:00:31 2012
+++ /trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java Tue
Jan 31 06:23:41 2012
@@ -25,8 +25,6 @@
import java.util.Map;
import java.util.Timer;

-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.CassandraServer;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
@@ -39,21 +37,19 @@
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
import org.apache.virgil.config.VirgilConfiguration;
import org.apache.virgil.index.Indexer;
+import org.apache.virgil.pool.ConnectionPool;
+import org.apache.virgil.pool.ConnectionPoolClient;
+import org.apache.virgil.pool.PooledConnection;
import org.apache.virgil.triggers.DistributedCommitLog;
import org.apache.virgil.triggers.TriggerStore;
import org.apache.virgil.triggers.TriggerTask;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

-public class CassandraStorage {
+public class CassandraStorage extends ConnectionPoolClient {
private static final int MAX_COLUMNS = 1000;
private static final int MAX_ROWS = 20;
private Indexer indexer = null;
@@ -62,57 +58,43 @@
private static final long TRIGGER_FREQUENCY = 5000; // every X
milliseconds

// TODO: Come back and make indexing AOP
- public CassandraStorage(VirgilConfiguration config, Indexer indexer) {
+ public CassandraStorage(VirgilConfiguration config, Indexer indexer)
throws Exception {
this.indexer = indexer;
// CassandraStorage.server = server;
this.config = config;
- DistributedCommitLog.create();
- TriggerStore.create();
+ ConnectionPool.initializePool();
+ DistributedCommitLog.getLog().create();
+ TriggerStore.getStore().create();
triggerTimer = new Timer(true);
triggerTimer.schedule(new TriggerTask(), 0, TRIGGER_FREQUENCY);
}

- // For now, get a new connection every time.
- // TODO: Use connection pooling
- public static Cassandra.Iface getCassandra(String keyspace)
- throws Exception {
- if (VirgilConfiguration.isEmbedded()) {
- Cassandra.Iface server = new CassandraServer();
- if (keyspace != null)
- server.set_keyspace(keyspace);
- return server;
- } else {
- TTransport tr = new TFramedTransport(new
TSocket(VirgilConfiguration.getHost(), VirgilConfiguration.getPort()));
- TProtocol proto = new TBinaryProtocol(tr);
- tr.open();
- Cassandra.Iface server = new Cassandra.Client(proto);
- if (keyspace != null)
- server.set_keyspace(keyspace);
- return server;
- }
- }
-
+ @PooledConnection
public JSONArray getKeyspaces() throws Exception {
- List<KsDef> keyspaces = getCassandra(null).describe_keyspaces();
+ List<KsDef> keyspaces = getConnection(null).describe_keyspaces();
return JsonMarshaller.marshallKeyspaces(keyspaces, true);
}

+ @PooledConnection
public void addKeyspace(String keyspace) throws Exception {
// TODO: Take key space in via JSON/XML. (Replace hard-coded
values)
List<CfDef> cfDefList = new ArrayList<CfDef>();
KsDef ksDef = new
KsDef(keyspace, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
ksDef.putToStrategy_options("replication_factor", "1");
- getCassandra(null).system_add_keyspace(ksDef);
+ getConnection(null).system_add_keyspace(ksDef);
}

+ @PooledConnection
public void dropColumnFamily(String keyspace, String columnFamily)
throws Exception {
- getCassandra(keyspace).system_drop_column_family(columnFamily);
+ getConnection(keyspace).system_drop_column_family(columnFamily);
}

+ @PooledConnection
public void dropKeyspace(String keyspace) throws Exception {
- getCassandra(keyspace).system_drop_keyspace(keyspace);
+ getConnection(keyspace).system_drop_keyspace(keyspace);
}

+ @PooledConnection
public void createColumnFamily(String keyspace, String
columnFamilyName) throws Exception {
// TODO: Take column family definition in via JSON/XML. (Replace
// hard-coded values)
@@ -120,10 +102,11 @@
columnFamily.setKey_validation_class("UTF8Type");
columnFamily.setComparator_type("UTF8Type");
columnFamily.setDefault_validation_class("UTF8Type");
- getCassandra(keyspace).system_add_column_family(columnFamily);
+ getConnection(keyspace).system_add_column_family(columnFamily);
}

@SuppressWarnings("unchecked")
+ @PooledConnection
public void addColumn(String keyspace, String column_family, String
rowkey, String column_name, String value,
ConsistencyLevel consistency_level, boolean index) throws
Exception {
JSONObject json = new JSONObject();
@@ -139,11 +122,13 @@
}
}

+ @PooledConnection
public void setColumn(String keyspace, String column_family, String
key, JSONObject json,
ConsistencyLevel consistency_level, boolean index) throws
Exception {
this.setColumn(keyspace, column_family, key, json,
consistency_level, index, System.currentTimeMillis() * 1000);
}

+ @PooledConnection
public void setColumn(String keyspace, String column_family, String
key, JSONObject json,
ConsistencyLevel consistency_level, boolean index, long
timestamp) throws Exception {
List<Mutation> slice = new ArrayList<Mutation>();
@@ -165,17 +150,18 @@
Map<String, List<Mutation>> cfMutations = new HashMap<String,
List<Mutation>>();
cfMutations.put(column_family, slice);
mutationMap.put(ByteBufferUtil.bytes(key), cfMutations);
- getCassandra(keyspace).batch_mutate(mutationMap,
consistency_level);
+ getConnection(keyspace).batch_mutate(mutationMap,
consistency_level);

if (config.isIndexingEnabled() && index)
indexer.index(column_family, key, json);
}

+ @PooledConnection
public void deleteColumn(String keyspace, String column_family, String
key, String column,
ConsistencyLevel consistency_level, boolean purgeIndex) throws
Exception {
ColumnPath path = new ColumnPath(column_family);
path.setColumn(ByteBufferUtil.bytes(column));
- getCassandra(keyspace).remove(ByteBufferUtil.bytes(key), path,
System.currentTimeMillis() * 1000,
+ getConnection(keyspace).remove(ByteBufferUtil.bytes(key), path,
System.currentTimeMillis() * 1000,
consistency_level);

// TODO: Revisit deleting a single field because it requires a
fetch
@@ -191,11 +177,12 @@
}
}

+ @PooledConnection
public long deleteRow(String keyspace, String column_family, String
key, ConsistencyLevel consistency_level,
boolean purgeIndex) throws Exception {
long deleteTime = System.currentTimeMillis() * 1000;
ColumnPath path = new ColumnPath(column_family);
- getCassandra(keyspace).remove(ByteBufferUtil.bytes(key), path,
deleteTime, consistency_level);
+ getConnection(keyspace).remove(ByteBufferUtil.bytes(key), path,
deleteTime, consistency_level);

// Update Index
if (config.isIndexingEnabled() && purgeIndex) {
@@ -204,15 +191,17 @@
return deleteTime;
}

+ @PooledConnection
public String getColumn(String keyspace, String columnFamily, String
key, String column,
ConsistencyLevel consistencyLevel) throws Exception {
ColumnPath path = new ColumnPath(columnFamily);
path.setColumn(ByteBufferUtil.bytes(column));
- ColumnOrSuperColumn column_result =
getCassandra(keyspace).get(ByteBufferUtil.bytes(key), path,
+ ColumnOrSuperColumn column_result =
getConnection(keyspace).get(ByteBufferUtil.bytes(key), path,
consistencyLevel);
return new String(column_result.getColumn().getValue(), "UTF8");
}

+ @PooledConnection
public JSONArray getRows(String keyspace, String columnFamily,
ConsistencyLevel consistencyLevel) throws Exception {
SlicePredicate predicate = new SlicePredicate();
SliceRange range = new SliceRange(ByteBufferUtil.bytes(""),
ByteBufferUtil.bytes(""), false, MAX_COLUMNS);
@@ -222,17 +211,18 @@
keyRange.setStart_key(ByteBufferUtil.bytes(""));
keyRange.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
ColumnParent parent = new ColumnParent(columnFamily);
- List<KeySlice> rows =
getCassandra(keyspace).get_range_slices(parent, predicate, keyRange,
consistencyLevel);
+ List<KeySlice> rows =
getConnection(keyspace).get_range_slices(parent, predicate, keyRange,
consistencyLevel);
return JsonMarshaller.marshallRows(rows, true);
}

+ @PooledConnection
public JSONObject getSlice(String keyspace, String columnFamily,
String key, ConsistencyLevel consistencyLevel)
throws Exception {
SlicePredicate predicate = new SlicePredicate();
SliceRange range = new SliceRange(ByteBufferUtil.bytes(""),
ByteBufferUtil.bytes(""), false, MAX_COLUMNS);
predicate.setSlice_range(range);
ColumnParent parent = new ColumnParent(columnFamily);
- List<ColumnOrSuperColumn> slice =
getCassandra(keyspace).get_slice(ByteBufferUtil.bytes(key), parent,
+ List<ColumnOrSuperColumn> slice =
getConnection(keyspace).get_slice(ByteBufferUtil.bytes(key), parent,
predicate, consistencyLevel);
if (slice.size() > 0)
return JsonMarshaller.marshallSlice(slice);
=======================================
--- /trunk/server/src/main/java/org/apache/virgil/cli/VirgilCommand.java
Tue Jan 24 13:31:50 2012
+++ /trunk/server/src/main/java/org/apache/virgil/cli/VirgilCommand.java
Tue Jan 31 06:23:41 2012
@@ -6,7 +6,6 @@
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
-import org.apache.thrift.transport.TTransportException;
import org.apache.virgil.CassandraStorage;
import org.apache.virgil.VirgilService;
import org.apache.virgil.config.VirgilConfiguration;
@@ -35,7 +34,7 @@
}

private CassandraStorage createCassandraStorage(CommandLine params,
VirgilConfiguration config)
- throws TTransportException {
+ throws Exception {
SolrIndexer indexer = new SolrIndexer(config);

if (params.hasOption("embedded")) {
=======================================
---
/trunk/server/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Thu Jan 26 09:00:31 2012
+++
/trunk/server/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Tue Jan 31 06:23:41 2012
@@ -16,30 +16,40 @@
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
-import org.apache.virgil.CassandraStorage;
+import org.apache.virgil.pool.ConnectionPoolClient;
+import org.apache.virgil.pool.PooledConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class DistributedCommitLog {
+public class DistributedCommitLog extends ConnectionPoolClient {
+ private static Logger logger =
LoggerFactory.getLogger(DistributedCommitLog.class);
+
public static final String KEYSPACE = "cirrus";
public static final String COLUMN_FAMILY = "CommitLog";
private static boolean initialized = false;
- private static Logger logger =
LoggerFactory.getLogger(DistributedCommitLog.class);
-
- public static void create() {
+ private static DistributedCommitLog instance = null;
+
+ public static synchronized DistributedCommitLog getLog() {
+ if (instance == null)
+ instance = new DistributedCommitLog();
+ return instance;
+ }
+
+ @PooledConnection
+ public synchronized void create() throws Exception {
if (!initialized) {
try {
List<CfDef> cfDefList = new ArrayList<CfDef>();
KsDef ksDef = new
KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
ksDef.putToStrategy_options("replication_factor", "1");
-
CassandraStorage.getCassandra(null).system_add_keyspace(ksDef);
+ getConnection(null).system_add_keyspace(ksDef);
} catch (Exception e) {
logger.debug("Did not create System.CommitLog. (probably
already there)");
- }
+ }
try {
CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
columnFamily.setKey_validation_class("TimeUUIDType");
-
CassandraStorage.getCassandra(KEYSPACE).system_add_column_family(columnFamily);
+
getConnection(KEYSPACE).system_add_column_family(columnFamily);
initialized = true;
} catch (Exception e) {
logger.debug("Did not create System.CommitLog. (probably
already there)");
@@ -47,8 +57,8 @@
}
}

- public static void writeMutation(ConsistencyLevel consistencyLevel,
RowMutation rowMutation)
- throws Exception {
+ @PooledConnection
+ public void writeMutation(ConsistencyLevel consistencyLevel,
RowMutation rowMutation) throws Exception {
List<Mutation> slice = new ArrayList<Mutation>();
Column c = new Column();
c.setName(ByteBufferUtil.bytes("mutation"));
@@ -66,6 +76,6 @@
byte[] rowKey = UUIDGen.getTimeUUIDBytes();
mutationMap.put(ByteBuffer.wrap(rowKey), cfMutations);
// TODO: Add Exception Handling.
- CassandraStorage.getCassandra(KEYSPACE).batch_mutate(mutationMap,
consistencyLevel);
+ getConnection(KEYSPACE).batch_mutate(mutationMap,
consistencyLevel);
}
}
=======================================
--- /trunk/server/src/main/java/org/apache/virgil/triggers/TestTrigger.java
Thu Jan 26 09:00:31 2012
+++ /trunk/server/src/main/java/org/apache/virgil/triggers/TestTrigger.java
Tue Jan 31 06:23:41 2012
@@ -1,5 +1,13 @@
package org.apache.virgil.triggers;

-public class TestTrigger {
+import org.apache.cassandra.db.RowMutation;
+
+public class TestTrigger implements Trigger {
+
+ @Override
+ public void process(RowMutation rowMutation) {
+ // TODO Auto-generated method stub
+
+ }

}
=======================================
---
/trunk/server/src/main/java/org/apache/virgil/triggers/TriggerStore.java
Thu Jan 26 14:01:22 2012
+++
/trunk/server/src/main/java/org/apache/virgil/triggers/TriggerStore.java
Tue Jan 31 06:23:41 2012
@@ -15,24 +15,33 @@
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.virgil.CassandraStorage;
+import org.apache.virgil.pool.ConnectionPoolClient;
+import org.apache.virgil.pool.PooledConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class TriggerStore {
+public class TriggerStore extends ConnectionPoolClient {
public static final String KEYSPACE = "cirrus";
public static final String COLUMN_FAMILY = "Trigger";
public static final String ENABLED = "enabled";
private static boolean initialized = false;
private static Logger logger =
LoggerFactory.getLogger(TriggerStore.class);
-
- public static void create() {
+ private static TriggerStore instance = null;
+
+ public static synchronized TriggerStore getStore() {
+ if (instance == null)
+ instance = new TriggerStore();
+ return instance;
+ }
+
+ @PooledConnection
+ public void create() throws Exception {
if (!initialized) {
try {
List<CfDef> cfDefList = new ArrayList<CfDef>();
KsDef ksDef = new
KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
ksDef.putToStrategy_options("replication_factor", "1");
-
CassandraStorage.getCassandra(null).system_add_keyspace(ksDef);
+ getConnection(null).system_add_keyspace(ksDef);
} catch (Exception e) {
logger.debug("Did not create System.CommitLog. (probably
already there)");
}
@@ -41,7 +50,7 @@
columnFamily.setKey_validation_class("UTF8Type");
columnFamily.setComparator_type("UTF8Type");
columnFamily.setDefault_validation_class("UTF8Type");
-
CassandraStorage.getCassandra(KEYSPACE).system_add_column_family(columnFamily);
+
getConnection(KEYSPACE).system_add_column_family(columnFamily);
initialized = true;
} catch (Exception e) {
logger.debug("Did not create System.CommitLog. (probably
already there)");
@@ -50,14 +59,15 @@
}

@SuppressWarnings("unchecked")
- public static Trigger getTrigger(String triggerClass) throws Exception{
+ public static Trigger getTrigger(String triggerClass) throws Exception
{
Class<Trigger> clazz = (Class<Trigger>)
Class.forName(triggerClass);
- return clazz.newInstance();
- }
-
- public static Map<String, List<Trigger>> getTriggers() throws
Exception{
- // TODO: Cache this.
- Map<String,List<Trigger>> triggerMap = new
HashMap<String,List<Trigger>>();
+ return clazz.newInstance();
+ }
+
+ @PooledConnection
+ public Map<String, List<Trigger>> getTriggers() throws Exception {
+ // TODO: Cache this.
+ Map<String, List<Trigger>> triggerMap = new HashMap<String,
List<Trigger>>();
SlicePredicate predicate = new SlicePredicate();
SliceRange range = new SliceRange(ByteBufferUtil.bytes(""),
ByteBufferUtil.bytes(""), false, 10);
predicate.setSlice_range(range);
@@ -66,19 +76,20 @@
keyRange.setStart_key(ByteBufferUtil.bytes(""));
keyRange.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
ColumnParent parent = new ColumnParent(COLUMN_FAMILY);
- List<KeySlice> rows =
CassandraStorage.getCassandra(KEYSPACE).get_range_slices(parent, predicate,
keyRange, ConsistencyLevel.ALL);
- for (KeySlice slice : rows){
+ List<KeySlice> rows =
getConnection(KEYSPACE).get_range_slices(parent, predicate, keyRange,
+ ConsistencyLevel.ALL);
+ for (KeySlice slice : rows) {
String columnFamily = ByteBufferUtil.string(slice.key);
List<Trigger> triggers = new ArrayList<Trigger>();
- for (ColumnOrSuperColumn column : slice.columns){
+ for (ColumnOrSuperColumn column : slice.columns) {
String className =
ByteBufferUtil.string(column.column.name);
- String enabled =
ByteBufferUtil.string(column.column.value);
- if (enabled.equals(ENABLED)){
+ String enabled =
ByteBufferUtil.string(column.column.value);
+ if (enabled.equals(ENABLED)) {
triggers.add(getTrigger(className));
}
}
triggerMap.put(columnFamily, triggers);
- }
+ }
return triggerMap;
}
}
=======================================
--- /trunk/server/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Fri Jan 27 10:56:08 2012
+++ /trunk/server/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Tue Jan 31 06:23:41 2012
@@ -16,7 +16,7 @@
Map<String, List<Trigger>> triggerMap = null;
try {
logger.debug("Running triggers @ [" + new Date() + "]");
- triggerMap = TriggerStore.getTriggers();
+ triggerMap = TriggerStore.getStore().getTriggers();
} catch (Exception e){
logger.error("Could not retrieve triggers.", e);
}
=======================================
--- /trunk/server/src/test/resources/META-INF/aop.xml Thu Jan 26 14:04:12
2012
+++ /trunk/server/src/main/resources/META-INF/aop.xml Tue Jan 31 06:23:41
2012
@@ -2,6 +2,7 @@
<aspectj>
<aspects>
<aspect name="org.apache.virgil.aop.CassandraServerAspect" />
+ <aspect name="org.apache.virgil.aop.ConnectionPoolAspect" />
</aspects>
<weaver options="-showWeaveInfo">
<include within="org.apache.cassandra.thrift.*" />

Reply all
Reply to author
Forward
0 new messages