http://code.google.com/a/apache-extras.org/p/virgil/source/detail?r=155
Added:
/trunk/server/src/main/java/org/apache/virgil/triggers/TestTrigger.java
/trunk/server/src/main/java/org/apache/virgil/triggers/Trigger.java
/trunk/server/src/main/java/org/apache/virgil/triggers/TriggerStore.java
/trunk/server/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Modified:
/trunk/server/pom.xml
/trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java
/trunk/server/src/main/java/org/apache/virgil/JsonMarshaller.java
/trunk/server/src/main/java/org/apache/virgil/aop/CassandraServerAspect.aj
/trunk/server/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
/trunk/server/src/test/resources/virgil.yaml
=======================================
--- /dev/null
+++ /trunk/server/src/main/java/org/apache/virgil/triggers/TestTrigger.java
Thu Jan 26 09:00:31 2012
@@ -0,0 +1,5 @@
+package org.apache.virgil.triggers;
+
+public class TestTrigger {
+
+}
=======================================
--- /dev/null
+++ /trunk/server/src/main/java/org/apache/virgil/triggers/Trigger.java Thu
Jan 26 09:00:31 2012
@@ -0,0 +1,9 @@
+package org.apache.virgil.triggers;
+
+import org.apache.cassandra.db.RowMutation;
+
+public interface Trigger {
+
+ public void process(RowMutation rowMutation);
+
+}
=======================================
--- /dev/null
+++
/trunk/server/src/main/java/org/apache/virgil/triggers/TriggerStore.java
Thu Jan 26 09:00:31 2012
@@ -0,0 +1,86 @@
+package org.apache.virgil.triggers;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.virgil.CassandraStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TriggerStore {
+ public static final String KEYSPACE = "cirrus";
+ public static final String COLUMN_FAMILY = "Trigger";
+ public static final String ENABLED = "enabled";
+ private static boolean initialized = false;
+ private static Logger logger =
LoggerFactory.getLogger(TriggerStore.class);
+
+ public static void create() {
+ if (!initialized) {
+ try {
+ List<CfDef> cfDefList = new ArrayList<CfDef>();
+ KsDef ksDef = new
KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
+ ksDef.putToStrategy_options("replication_factor", "1");
+
CassandraStorage.getCassandra(null).system_add_keyspace(ksDef);
+ } catch (Exception e) {
+ logger.debug("Did not create System.CommitLog. (probably
already there)");
+ }
+ try {
+ CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
+ columnFamily.setKey_validation_class("UTF8Type");
+ columnFamily.setComparator_type("UTF8Type");
+ columnFamily.setDefault_validation_class("UTF8Type");
+
CassandraStorage.getCassandra(KEYSPACE).system_add_column_family(columnFamily);
+ initialized = true;
+ } catch (Exception e) {
+ logger.debug("Did not create System.CommitLog. (probably
already there)");
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Trigger getTrigger(String triggerClass) throws Exception{
+ Class<Trigger> clazz = (Class<Trigger>)
Class.forName(triggerClass);
+ return clazz.newInstance();
+ }
+
+
+ public static Map<String, List<Trigger>> getTriggers() throws
Exception{
+ // TODO: Cache this.
+ Map<String,List<Trigger>> triggerMap = new
HashMap<String,List<Trigger>>();
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange(ByteBufferUtil.bytes(""),
ByteBufferUtil.bytes(""), false, 10);
+ predicate.setSlice_range(range);
+
+ KeyRange keyRange = new KeyRange(1000);
+ keyRange.setStart_key(ByteBufferUtil.bytes(""));
+ keyRange.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ ColumnParent parent = new ColumnParent(COLUMN_FAMILY);
+ List<KeySlice> rows =
CassandraStorage.getCassandra(KEYSPACE).get_range_slices(parent, predicate,
keyRange, ConsistencyLevel.ALL);
+ for (KeySlice slice : rows){
+ String columnFamily = ByteBufferUtil.string(slice.key);
+ List<Trigger> triggers = new ArrayList<Trigger>();
+ for (ColumnOrSuperColumn column : slice.columns){
+ String className =
ByteBufferUtil.string(column.column.name);
+ String enabled =
ByteBufferUtil.string(column.column.value);
+ if (enabled.equals(ENABLED)){
+ triggers.add(getTrigger(className));
+ }
+ }
+ triggerMap.put(columnFamily, triggers);
+ }
+ return triggerMap;
+ }
+}
=======================================
--- /dev/null
+++ /trunk/server/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Thu Jan 26 09:00:31 2012
@@ -0,0 +1,31 @@
+package org.apache.virgil.triggers;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TriggerTask extends TimerTask {
+ private static Logger logger =
LoggerFactory.getLogger(TriggerTask.class);
+
+ @Override
+ public void run() {
+ Map<String, List<Trigger>> triggerMap = null;
+ try {
+ logger.debug("Running triggers @ [" + new Date() + "]");
+ triggerMap = TriggerStore.getTriggers();
+ } catch (Exception e){
+ logger.error("Could not retrieve triggers.", e);
+ }
+
+ for (String path : triggerMap.keySet()){
+ String keyspace = path.substring(path.indexOf(':'));
+ String columnFamily = path.substring(path.indexOf(':'));
+ System.out.println("[" + keyspace + "]:[" + columnFamily
+ "]");
+ }
+ }
+
+}
=======================================
--- /trunk/server/pom.xml Tue Jan 24 13:31:50 2012
+++ /trunk/server/pom.xml Thu Jan 26 09:00:31 2012
@@ -37,6 +37,13 @@
<artifactId>aspectjrt</artifactId>
<version>1.6.11</version>
</dependency>
+
+ <dependency>
+ <groupId>com.eaio.uuid</groupId>
+ <artifactId>uuid</artifactId>
+ <version>3.2</version>
+ </dependency>
+
</dependencies>
<build>
@@ -52,7 +59,7 @@
<artifactId>aspectj-maven-plugin</artifactId>
<version>1.4</version>
<configuration>
-
<complianceLevel>1.5</complianceLevel>
+ <complianceLevel>1.5</complianceLevel>
<weaveDependencies>
<weaveDependency>
<groupId>org.apache.cassandra</groupId>
=======================================
--- /trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java Tue
Jan 24 13:31:50 2012
+++ /trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java Thu
Jan 26 09:00:31 2012
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CassandraServer;
@@ -46,6 +47,9 @@
import org.apache.thrift.transport.TTransport;
import org.apache.virgil.config.VirgilConfiguration;
import org.apache.virgil.index.Indexer;
+import org.apache.virgil.triggers.DistributedCommitLog;
+import org.apache.virgil.triggers.TriggerStore;
+import org.apache.virgil.triggers.TriggerTask;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@@ -54,12 +58,18 @@
private static final int MAX_ROWS = 20;
private Indexer indexer = null;
private VirgilConfiguration config = null;
+ private Timer triggerTimer = null;
+ private static final long TRIGGER_FREQUENCY = 5000; // every X
milliseconds
// TODO: Come back and make indexing AOP
public CassandraStorage(VirgilConfiguration config, Indexer indexer) {
this.indexer = indexer;
// CassandraStorage.server = server;
this.config = config;
+ DistributedCommitLog.create();
+ TriggerStore.create();
+ triggerTimer = new Timer(true);
+ triggerTimer.schedule(new TriggerTask(), 0, TRIGGER_FREQUENCY);
}
// For now, get a new connection every time.
=======================================
--- /trunk/server/src/main/java/org/apache/virgil/JsonMarshaller.java Mon
Jan 16 18:14:34 2012
+++ /trunk/server/src/main/java/org/apache/virgil/JsonMarshaller.java Thu
Jan 26 09:00:31 2012
@@ -10,35 +10,43 @@
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
public class JsonMarshaller {
- @SuppressWarnings("unchecked")
- public static String marshallColumn(ColumnOrSuperColumn column) throws
UnsupportedEncodingException {
- JSONObject json = new JSONObject();
- Column c = column.getColumn();
- json.put(string(c.getName()), string(c.getValue()));
- return json.toString();
- }
-
- @SuppressWarnings("unchecked")
- public static JSONObject marshallSlice(List<ColumnOrSuperColumn> slice)
throws UnsupportedEncodingException {
- JSONObject json = new JSONObject();
- for (ColumnOrSuperColumn column : slice) {
- Column c = column.getColumn();
- json.put(string(c.getName()), string(c.getValue()));
- }
- return json;
- }
-
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked")
+ public static String marshallColumn(ColumnOrSuperColumn column) throws
UnsupportedEncodingException {
+ JSONObject json = new JSONObject();
+ Column c = column.getColumn();
+ json.put(string(c.getName()), string(c.getValue()));
+ return json.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static JSONObject marshallSlice(List<ColumnOrSuperColumn>
slice) throws UnsupportedEncodingException {
+ JSONObject json = new JSONObject();
+ for (ColumnOrSuperColumn column : slice) {
+ Column c = column.getColumn();
+ json.put(string(c.getName()), string(c.getValue()));
+ }
+ return json;
+ }
+
+ @SuppressWarnings("unchecked")
public static JSONArray marshallRows(List<KeySlice> rows, boolean
flatten) throws Exception {
if (flatten){
JSONArray cfJson = new JSONArray();
for (KeySlice row : rows){
- String rowKey = ByteBufferUtil.string(row.key);
+
+ String rowKey = null;
+ try {
+ rowKey = ByteBufferUtil.string(row.key);
+ } catch (Exception e){
+ // TODO: Be smarter, use the ValidationClass to marshal to a string
+ rowKey = UUIDGen.getUUID(row.key).toString();
+ }
for (ColumnOrSuperColumn column : row.columns){
JSONObject rowJson = new JSONObject();
rowJson.put("row", rowKey);
@@ -60,42 +68,43 @@
throw new RuntimeException("Virgil does not support hiearchical fetch
of column family yet.");
}
}
-
- @SuppressWarnings("unchecked")
- public static JSONArray marshallKeyspaces(List<KsDef> keyspaces, boolean
flatten) throws UnsupportedEncodingException {
- JSONArray keyspaceJson = new JSONArray();
- if (flatten) {
- for (KsDef keyspace : keyspaces) {
- List<CfDef> columnFamilies = keyspace.getCf_defs();
- for (CfDef columnFamily : columnFamilies) {
- JSONObject json = new JSONObject();
- json.put("keyspace", keyspace.getName());
- json.put("columnFamily", columnFamily.getName());
- keyspaceJson.add(json);
- }
- }
- } else {
- for (KsDef keyspace : keyspaces) {
- JSONObject json = new JSONObject();
- json.put("keyspace", keyspace.getName());
- json.put("strategy", keyspace.getStrategy_class());
- List<CfDef> columnFamilies = keyspace.getCf_defs();
- JSONArray cfJsonArray = new JSONArray();
- for (CfDef columnFamily : columnFamilies) {
- JSONObject cfJson = new JSONObject();
- cfJson.put("name", columnFamily.getName());
- cfJsonArray.add(cfJson);
- }
- json.put("columnFamilies", cfJsonArray);
- keyspaceJson.add(json);
- }
- }
- return keyspaceJson;
- }
-
- private static String string(byte[] bytes) throws
UnsupportedEncodingException {
- return new String(bytes, "UTF8");
- }
+
+ @SuppressWarnings("unchecked")
+ public static JSONArray marshallKeyspaces(List<KsDef> keyspaces,
boolean flatten)
+ throws UnsupportedEncodingException {
+ JSONArray keyspaceJson = new JSONArray();
+ if (flatten) {
+ for (KsDef keyspace : keyspaces) {
+ List<CfDef> columnFamilies = keyspace.getCf_defs();
+ for (CfDef columnFamily : columnFamilies) {
+ JSONObject json = new JSONObject();
+ json.put("keyspace", keyspace.getName());
+ json.put("columnFamily", columnFamily.getName());
+ keyspaceJson.add(json);
+ }
+ }
+ } else {
+ for (KsDef keyspace : keyspaces) {
+ JSONObject json = new JSONObject();
+ json.put("keyspace", keyspace.getName());
+ json.put("strategy", keyspace.getStrategy_class());
+ List<CfDef> columnFamilies = keyspace.getCf_defs();
+ JSONArray cfJsonArray = new JSONArray();
+ for (CfDef columnFamily : columnFamilies) {
+ JSONObject cfJson = new JSONObject();
+ cfJson.put("name", columnFamily.getName());
+ cfJsonArray.add(cfJson);
+ }
+ json.put("columnFamilies", cfJsonArray);
+ keyspaceJson.add(json);
+ }
+ }
+ return keyspaceJson;
+ }
+
+ private static String string(byte[] bytes) throws
UnsupportedEncodingException {
+ return new String(bytes, "UTF8");
+ }
}
=======================================
---
/trunk/server/src/main/java/org/apache/virgil/aop/CassandraServerAspect.aj
Tue Jan 24 13:31:50 2012
+++
/trunk/server/src/main/java/org/apache/virgil/aop/CassandraServerAspect.aj
Thu Jan 26 09:00:31 2012
@@ -2,12 +2,9 @@
import java.util.List;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.virgil.triggers.DistributedCommitLog;
public aspect CassandraServerAspect {
@@ -17,30 +14,40 @@
after() : insertMethod() {
try {
- Object consistencyLevel = thisJoinPoint.getArgs()[0];
+ ConsistencyLevel consistencyLevel = (ConsistencyLevel)
thisJoinPoint.getArgs()[0];
@SuppressWarnings("unchecked")
List<IMutation> mutations = (List<IMutation>)
thisJoinPoint.getArgs()[1];
for (IMutation mutation : mutations) {
if (mutation instanceof RowMutation) {
RowMutation rowMutation = (RowMutation) mutation;
- String key = ByteBufferUtil.string(rowMutation.key());
- System.out.println("Mutation for [" + key + "] @ [" +
rowMutation.getTable()
- + "] with consistencyLevel [" +
consistencyLevel + "]");
- for (Integer cfId : rowMutation.getColumnFamilyIds()) {
- ColumnFamily columnFamily =
rowMutation.getColumnFamily(cfId);
- for (IColumn column :
columnFamily.getSortedColumns()) {
- String name =
ByteBufferUtil.string(column.name());
- String value =
ByteBufferUtil.string(column.value());
- boolean delete =
columnFamily.isMarkedForDelete();
- if (delete) {
- System.out.println(" -- DELETE -- [" +
name + "] => [" + value + "]");
- } else {
- System.out.println(" -- SET -- [" + name
+ "] => [" + value + "]");
- }
- }
- }
- }
- DistributedCommitLog.writeMutation(mutation);
+ // String key =
ByteBufferUtil.string(rowMutation.key());
+ // System.out.println("Mutation for [" + key + "] @ ["
+
+ // rowMutation.getTable()
+ // + "] with consistencyLevel [" + consistencyLevel
+ "]");
+ System.out.println("Mutation for [" +
rowMutation.getTable() + "] with consistencyLevel ["
+ + consistencyLevel + "]");
+ if
(!rowMutation.getTable().equals(DistributedCommitLog.KEYSPACE)) {
+
DistributedCommitLog.writeMutation(consistencyLevel, rowMutation);
+ }
+ // for (Integer cfId :
rowMutation.getColumnFamilyIds()) {
+ // ColumnFamily columnFamily =
+ // rowMutation.getColumnFamily(cfId);
+ // for (IColumn column :
columnFamily.getSortedColumns()) {
+ // if (co)
+ // String name = ByteBufferUtil.string(column.name());
+ // String value =
ByteBufferUtil.string(column.value());
+ // boolean delete = columnFamily.isMarkedForDelete();
+ // if (delete) {
+ // System.out.println(" -- DELETE -- [" + name + "] =>
[" +
+ // value + "]");
+ // } else {
+ // System.out.println(" -- SET -- [" + name + "] => ["
+
+ // value + "]");
+ // }
+ // }
+ // }
+
+ }
}
} catch (Exception e) {
e.printStackTrace();
=======================================
---
/trunk/server/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Tue Jan 24 13:31:50 2012
+++
/trunk/server/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Thu Jan 26 09:00:31 2012
@@ -1,11 +1,21 @@
package org.apache.virgil.triggers;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-
-import org.apache.cassandra.db.IMutation;
+import java.util.Map;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.virgil.CassandraStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,25 +34,38 @@
ksDef.putToStrategy_options("replication_factor", "1");
CassandraStorage.getCassandra(null).system_add_keyspace(ksDef);
} catch (Exception e) {
- logger.warn("Did not create System.CommitLog. (probably
already there)");
- }
-
+ logger.debug("Did not create System.CommitLog. (probably
already there)");
+ }
try {
CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
- columnFamily.setKey_validation_class("UTF8Type");
- columnFamily.setComparator_type("UTF8Type");
- columnFamily.setDefault_validation_class("UTF8Type");
+ columnFamily.setKey_validation_class("TimeUUIDType");
CassandraStorage.getCassandra(KEYSPACE).system_add_column_family(columnFamily);
initialized = true;
} catch (Exception e) {
- e.printStackTrace();
- logger.warn("Did not create System.CommitLog. (probably
already there)");
- }
-
+ logger.debug("Did not create System.CommitLog. (probably
already there)");
+ }
}
}
- public static void writeMutation(IMutation mutation) {
- DistributedCommitLog.create();
+ public static void writeMutation(ConsistencyLevel consistencyLevel,
RowMutation rowMutation)
+ throws Exception {
+ List<Mutation> slice = new ArrayList<Mutation>();
+ Column c = new Column();
+ c.setName(ByteBufferUtil.bytes("mutation"));
+
c.setValue(rowMutation.getSerializedBuffer(MessagingService.version_));
+ c.setTimestamp(System.currentTimeMillis() * 1000);
+
+ Mutation m = new Mutation();
+ ColumnOrSuperColumn cc = new ColumnOrSuperColumn();
+ cc.setColumn(c);
+ m.setColumn_or_supercolumn(cc);
+ slice.add(m);
+ Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new
HashMap<ByteBuffer, Map<String, List<Mutation>>>();
+ Map<String, List<Mutation>> cfMutations = new HashMap<String,
List<Mutation>>();
+ cfMutations.put(COLUMN_FAMILY, slice);
+ byte[] rowKey = UUIDGen.getTimeUUIDBytes();
+ mutationMap.put(ByteBuffer.wrap(rowKey), cfMutations);
+ // TODO: Add Exception Handling.
+ CassandraStorage.getCassandra(KEYSPACE).batch_mutate(mutationMap,
consistencyLevel);
}
}
=======================================
--- /trunk/server/src/test/resources/virgil.yaml Tue Jan 24 13:31:50 2012
+++ /trunk/server/src/test/resources/virgil.yaml Thu Jan 26 09:00:31 2012
@@ -121,7 +121,7 @@
loggers:
# Sets the level for 'com.example.app' to DEBUG.
- org.apache.virgil: WARN
+ org.apache.virgil: DEBUG
org.apache: WARN
org.eclipse: WARN
httpclient.wire: WARN