http://code.google.com/a/apache-extras.org/p/virgil/source/detail?r=168
Added:
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraStore.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/ConfigurationStore.java
Deleted:
/trunk/triggers/src/main/java/org/apache/virgil/triggers/InternalCassandraClient.java
Modified:
/trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerStore.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerTask.java
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraStore.java
Mon Feb 6 13:38:52 2012
@@ -0,0 +1,75 @@
+package org.apache.virgil.triggers;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraServer;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.KsDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for stores that keep their data in one Cassandra column family.
+ * <p>
+ * Constructing a store calls {@link #create()}, which tries to add the
+ * keyspace and then the column family through the connection returned by
+ * {@link #getConnection(String)}.
+ */
+public class CassandraStore {
+    private static Logger logger = LoggerFactory.getLogger(CassandraStore.class);
+
+    // Deliberately per instance, NOT static: ConfigurationStore,
+    // DistributedCommitLog and TriggerStore all extend this class. A flag
+    // shared through a static field would make every store constructed after
+    // the first successful one skip create(), so its own column family would
+    // never be added.
+    private boolean initialized = false;
+    private String keyspace = null;
+    private String columnFamily = null;
+
+    /**
+     * Records the target keyspace and column family, then calls
+     * {@link #create()}.
+     * <p>
+     * NOTE(review): create() is overridable and is invoked from this
+     * constructor; an override would run before the subclass constructor body.
+     *
+     * @param keyspace     name of the keyspace holding the column family
+     * @param columnFamily name of the column family backing this store
+     * @throws Exception declared by {@link #create()}
+     */
+    protected CassandraStore(String keyspace, String columnFamily) throws Exception {
+        this.keyspace = keyspace;
+        this.columnFamily = columnFamily;
+        this.create();
+    }
+
+    /**
+     * Builds a new {@link CassandraServer} and, when a keyspace is given,
+     * selects it with {@code set_keyspace}.
+     *
+     * @param keyspace keyspace to select, or {@code null} to select none
+     * @return the newly created server, typed as {@link Cassandra.Iface}
+     * @throws Exception if selecting the keyspace fails
+     */
+    Cassandra.Iface getConnection(String keyspace) throws Exception {
+        CassandraServer server = new CassandraServer();
+        if (keyspace != null) {
+            server.set_keyspace(keyspace);
+        }
+        return server;
+    }
+
+    /**
+     * Attempts to add this store's keyspace and column family.
+     * <p>
+     * Both steps are best effort: a failure is logged at debug level and
+     * otherwise ignored. The store is only marked initialized when the
+     * column family was added without an exception, so a later call tries
+     * again after a failure.
+     *
+     * @throws Exception kept in the signature for callers; failures of the
+     *                   two schema calls are caught and logged here
+     */
+    public synchronized void create() throws Exception {
+        if (initialized) {
+            return;
+        }
+        try {
+            List<CfDef> cfDefList = new ArrayList<CfDef>();
+            KsDef ksDef = new KsDef(this.getKeyspace(), "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
+            ksDef.putToStrategy_options("replication_factor", "1");
+            // No keyspace is selected here: the keyspace may not exist yet.
+            getConnection(null).system_add_keyspace(ksDef);
+        } catch (Exception e) {
+            // Pass the exception along so a real failure is not mistaken
+            // for "already exists" when reading the debug log.
+            logger.debug("Did not create [" + this.getKeyspace() + ":" + this.getColumnFamily()
+                    + "] (probably already there)", e);
+        }
+        try {
+            CfDef columnFamily = new CfDef(this.getKeyspace(), this.getColumnFamily());
+            columnFamily.setKey_validation_class("UTF8Type");
+            columnFamily.setDefault_validation_class("UTF8Type");
+            columnFamily.setComparator_type("UTF8Type");
+            getConnection(this.getKeyspace()).system_add_column_family(columnFamily);
+            initialized = true;
+        } catch (Exception e) {
+            logger.debug("Did not create [" + this.getKeyspace() + ":" + this.getColumnFamily()
+                    + "] (probably already there)", e);
+        }
+    }
+
+    /** @return the keyspace name this store was given */
+    public String getKeyspace() {
+        return keyspace;
+    }
+
+    /** @param keyspace new keyspace name; does not trigger {@link #create()} */
+    public void setKeyspace(String keyspace) {
+        this.keyspace = keyspace;
+    }
+
+    /** @return the column family name this store was given */
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    /** @param columnFamily new column family name; does not trigger {@link #create()} */
+    public void setColumnFamily(String columnFamily) {
+        this.columnFamily = columnFamily;
+    }
+
+}
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/ConfigurationStore.java
Mon Feb 6 13:38:52 2012
@@ -0,0 +1,79 @@
+package org.apache.virgil.triggers;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store for Virgil run-time configuration, read from a Cassandra column
+ * family and cached in memory.
+ * <p>
+ * Each row key is treated as a component name and the row's columns as that
+ * component's properties. The cached copy is re-read once it is older than
+ * {@link #REFRESH_INTERVAL} milliseconds.
+ */
+public class ConfigurationStore extends CassandraStore {
+    public static final String KEYSPACE = "virgil";
+    public static final String COLUMN_FAMILY = "Configuration";
+    private static Logger logger = LoggerFactory.getLogger(ConfigurationStore.class);
+    private static ConfigurationStore instance = null;
+    private static final int REFRESH_INTERVAL = 1000 * 30; // 30 seconds
+
+    // Both fields are only touched inside the synchronized getConfiguration().
+    private long lastFetchTime = -1;
+    private Map<String, Map<String, String>> cache = null;
+
+    /**
+     * @param keyspace     keyspace holding the configuration column family
+     * @param columnFamily column family holding the configuration rows
+     * @throws Exception declared by the {@link CassandraStore} constructor
+     */
+    public ConfigurationStore(String keyspace, String columnFamily) throws Exception {
+        super(keyspace, columnFamily);
+        logger.debug("Instantiated configuration store.");
+    }
+
+    /**
+     * Returns the shared store for {@link #KEYSPACE} / {@link #COLUMN_FAMILY},
+     * creating it on first use.
+     *
+     * @return the shared instance
+     * @throws Exception if the instance cannot be constructed
+     */
+    public static synchronized ConfigurationStore getStore() throws Exception {
+        if (instance == null) {
+            instance = new ConfigurationStore(KEYSPACE, COLUMN_FAMILY);
+        }
+        return instance;
+    }
+
+    /**
+     * Reports whether the "CommitLog" component has its "enabled" property
+     * set to exactly "true".
+     *
+     * @return {@code true} only for the value "true"; {@code false} when the
+     *         component or the property is missing
+     * @throws Throwable if the configuration cannot be read
+     */
+    public boolean isCommitLogEnabled() throws Throwable {
+        Map<String, String> commitLogProperties = this.getConfiguration().get("CommitLog");
+        if (commitLogProperties == null) {
+            return false;
+        }
+        // Constant first, so a missing property compares as false.
+        return "true".equals(commitLogProperties.get("enabled"));
+    }
+
+    /**
+     * Returns the configuration, re-reading it from Cassandra when there is
+     * no cached copy or the copy is older than {@link #REFRESH_INTERVAL}.
+     * <p>
+     * Synchronized because it is called from both the write path and the
+     * trigger timer. Without the lock a caller could observe the updated
+     * fetch time before the cache was assigned and receive {@code null}.
+     *
+     * @return component name mapped to that component's properties
+     * @throws Throwable if the read from Cassandra fails; the previous cache
+     *                   and fetch time are left untouched in that case
+     */
+    public synchronized Map<String, Map<String, String>> getConfiguration() throws Throwable {
+        long currentTime = System.currentTimeMillis();
+        long timeSinceRefresh = currentTime - this.lastFetchTime;
+        if (cache == null || timeSinceRefresh > REFRESH_INTERVAL) {
+            logger.debug("Refreshing virgil run-time configuration.");
+            cache = fetchConfiguration();
+            this.lastFetchTime = currentTime;
+        }
+        return cache;
+    }
+
+    /** Reads up to 1000 rows, 10 columns each, into a fresh map. */
+    private Map<String, Map<String, String>> fetchConfiguration() throws Throwable {
+        SlicePredicate predicate = new SlicePredicate();
+        SliceRange range = new SliceRange(ByteBufferUtil.bytes(""), ByteBufferUtil.bytes(""), false, 10);
+        predicate.setSlice_range(range);
+
+        KeyRange keyRange = new KeyRange(1000);
+        keyRange.setStart_key(ByteBufferUtil.bytes(""));
+        keyRange.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+
+        // Read from the names this instance was constructed with; those are
+        // the names the base class used when creating the column family.
+        ColumnParent parent = new ColumnParent(this.getColumnFamily());
+        List<KeySlice> rows = getConnection(this.getKeyspace()).get_range_slices(parent, predicate, keyRange,
+                ConsistencyLevel.ALL);
+
+        Map<String, Map<String, String>> configuration = new HashMap<String, Map<String, String>>();
+        for (KeySlice slice : rows) {
+            String component = ByteBufferUtil.string(slice.key);
+            Map<String, String> properties = new HashMap<String, String>();
+            for (ColumnOrSuperColumn column : slice.columns) {
+                String key = ByteBufferUtil.string(column.column.name);
+                String value = ByteBufferUtil.string(column.column.value);
+                properties.put(key, value);
+            }
+            configuration.put(component, properties);
+        }
+        return configuration;
+    }
+}
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/InternalCassandraClient.java
Thu Feb 2 07:42:56 2012
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.virgil.triggers;
-
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.CassandraServer;
-
-public class InternalCassandraClient {
- Cassandra.Iface getConnection(String keyspace) throws Exception{
- CassandraServer server = new CassandraServer();
- if (keyspace != null){
- server.set_keyspace(keyspace);
- }
- return server;
- }
-}
=======================================
--- /trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java Tue
Jan 31 06:23:41 2012
+++ /trunk/server/src/main/java/org/apache/virgil/CassandraStorage.java Mon
Feb 6 13:38:52 2012
@@ -63,8 +63,6 @@
// CassandraStorage.server = server;
this.config = config;
ConnectionPool.initializePool();
- DistributedCommitLog.getLog().create();
- TriggerStore.getStore().create();
triggerTimer = new Timer(true);
triggerTimer.schedule(new TriggerTask(), 0, TRIGGER_FREQUENCY);
}
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
Mon Feb 6 10:22:32 2012
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
Mon Feb 6 13:38:52 2012
@@ -18,22 +18,27 @@
@Around("execution(*
org.apache.cassandra.thrift.CassandraServer.doInsert(..))")
public void writeToCommitLog(ProceedingJoinPoint thisJoinPoint) throws
Throwable {
- try {
- ConsistencyLevel consistencyLevel = (ConsistencyLevel)
thisJoinPoint.getArgs()[0];
- @SuppressWarnings("unchecked")
- List<IMutation> mutations = (List<IMutation>)
thisJoinPoint.getArgs()[1];
- List<LogEntry> logEntries = writePending(consistencyLevel,
mutations);
+ if (ConfigurationStore.getStore().isCommitLogEnabled()) {
+ try {
+ ConsistencyLevel consistencyLevel = (ConsistencyLevel)
thisJoinPoint.getArgs()[0];
+ @SuppressWarnings("unchecked")
+ List<IMutation> mutations = (List<IMutation>)
thisJoinPoint.getArgs()[1];
+ List<LogEntry> logEntries = writePending(consistencyLevel,
mutations);
+ thisJoinPoint.proceed(thisJoinPoint.getArgs());
+ writeCommitted(logEntries);
+ // TODO: Catch Invalid Request separately, and remove the
+ // pending.
+ } catch (Throwable t) {
+ logger.error("Could not write to cassandra!", t);
+ t.printStackTrace();
+ throw t;
+ }
+ } else {
thisJoinPoint.proceed(thisJoinPoint.getArgs());
- writeCommitted(logEntries);
- // TODO: Catch Invalid Request separately, and remove the
pending.
- } catch (Throwable t) {
- logger.error("Could not write to cassandra!", t);
- throw t;
}
}
- private List<LogEntry> writePending(ConsistencyLevel consistencyLevel,
List<IMutation> mutations)
- throws Throwable {
+ private List<LogEntry> writePending(ConsistencyLevel consistencyLevel,
List<IMutation> mutations) throws Throwable {
List<LogEntry> logEntries = new ArrayList<LogEntry>();
for (IMutation mutation : mutations) {
if (mutation instanceof RowMutation) {
@@ -48,8 +53,7 @@
return logEntries;
}
- private void writeCommitted(List<LogEntry> logEntries)
- throws Throwable {
+ private void writeCommitted(List<LogEntry> logEntries) throws
Throwable {
for (LogEntry logEntry : logEntries) {
logEntry.setStatus(LogEntryStatus.COMMITTED);
DistributedCommitLog.getLog().writeLogEntry(logEntry);
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Mon Feb 6 10:50:01 2012
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Mon Feb 6 13:38:52 2012
@@ -8,7 +8,6 @@
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
@@ -16,7 +15,6 @@
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
-import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
@@ -24,7 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DistributedCommitLog extends InternalCassandraClient {
+public class DistributedCommitLog extends CassandraStore {
private static Logger logger =
LoggerFactory.getLogger(DistributedCommitLog.class);
public static final String KEYSPACE = "virgil";
@@ -32,38 +30,18 @@
public static final int MAX_ROW_SIZE = 10;
public static final int BATCH_SIZE = 50;
public static final int IN_FUTURE = 1000 * 60;
- private static boolean initialized = false;
private static DistributedCommitLog instance = null;
- public static synchronized DistributedCommitLog getLog() {
+ public DistributedCommitLog(String keyspace, String columnFamily)
throws Exception{
+ super(keyspace, columnFamily);
+ logger.debug("Instantiated distributed commit log.");
+ }
+
+ public static synchronized DistributedCommitLog getLog() throws
Exception {
if (instance == null)
- instance = new DistributedCommitLog();
+ instance = new DistributedCommitLog(KEYSPACE, COLUMN_FAMILY);
return instance;
}
-
- public synchronized void create() throws Exception {
- if (!initialized) {
- try {
- List<CfDef> cfDefList = new ArrayList<CfDef>();
- KsDef ksDef = new
KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
- ksDef.putToStrategy_options("replication_factor", "1");
- getConnection(null).system_add_keyspace(ksDef);
- } catch (Exception e) {
- logger.debug("Did not create System.CommitLog. (probably
already there)");
- }
- try {
- CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
- columnFamily.setKey_validation_class("UTF8Type");
- columnFamily.setDefault_validation_class("UTF8Type");
- columnFamily.setComparator_type("UTF8Type");
-
-
getConnection(KEYSPACE).system_add_column_family(columnFamily);
- initialized = true;
- } catch (Exception e) {
- logger.debug("Did not create System.CommitLog. (probably
already there)");
- }
- }
- }
public List<LogEntry> writePending(ConsistencyLevel consistencyLevel,
RowMutation rowMutation) throws Throwable {
String keyspace = rowMutation.getTable();
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerStore.java
Mon Feb 6 10:50:01 2012
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerStore.java
Mon Feb 6 13:38:52 2012
@@ -5,55 +5,35 @@
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
-import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TriggerStore extends InternalCassandraClient {
+public class TriggerStore extends CassandraStore {
+ private static Logger logger =
LoggerFactory.getLogger(TriggerStore.class);
+
public static final String KEYSPACE = "virgil";
public static final String COLUMN_FAMILY = "Triggers";
public static final String ENABLED = "enabled";
- private static boolean initialized = false;
- private static Logger logger =
LoggerFactory.getLogger(TriggerStore.class);
private static TriggerStore instance = null;
- public static synchronized TriggerStore getStore() {
+ public TriggerStore(String keyspace, String columnFamily) throws
Exception{
+ super(keyspace, columnFamily);
+ logger.debug("Instantiated trigger store.");
+ }
+
+ public static synchronized TriggerStore getStore() throws Exception {
if (instance == null)
- instance = new TriggerStore();
+ instance = new TriggerStore(KEYSPACE, COLUMN_FAMILY);
return instance;
}
-
- public void create() throws Exception {
- if (!initialized) {
- try {
- List<CfDef> cfDefList = new ArrayList<CfDef>();
- KsDef ksDef = new
KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
- ksDef.putToStrategy_options("replication_factor", "1");
- getConnection(null).system_add_keyspace(ksDef);
- } catch (Exception e) {
- logger.debug("Did not create System.CommitLog. (probably
already there)");
- }
- try {
- CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
- columnFamily.setKey_validation_class("UTF8Type");
- columnFamily.setComparator_type("UTF8Type");
- columnFamily.setDefault_validation_class("UTF8Type");
-
getConnection(KEYSPACE).system_add_column_family(columnFamily);
- initialized = true;
- } catch (Exception e) {
- logger.debug("Did not create System.CommitLog. (probably
already there)");
- }
- }
- }
@SuppressWarnings("unchecked")
public static Trigger getTrigger(String triggerClass) throws Exception
{
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Mon Feb 6 10:22:32 2012
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Mon Feb 6 13:38:52 2012
@@ -1,6 +1,5 @@
package org.apache.virgil.triggers;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
@@ -13,28 +12,31 @@
@Override
public void run() {
- Map<String, List<Trigger>> triggerMap = null;
try {
- logger.debug("Running triggers @ [" + new Date() + "]");
- triggerMap = TriggerStore.getStore().getTriggers();
- List<LogEntry> logEntries =
DistributedCommitLog.getLog().getPending();
- for (LogEntry logEntry : logEntries) {
- logger.debug("Processing Entry [" + logEntry.getUuid()
+ "]:[" + logEntry.getKeyspace() + "]:["
- + logEntry.getColumnFamily() + "]");
- String path = logEntry.getKeyspace() + ":" +
logEntry.getColumnFamily();
- List<Trigger> triggers = triggerMap.get(path);
- if (triggers != null) {
- for (Trigger trigger : triggers) {
- trigger.process(logEntry);
- }
- }
-
- // Provided all processed properly, remove the logEntry
- DistributedCommitLog.getLog().removeLogEntry(logEntry);
+ if (ConfigurationStore.getStore().isCommitLogEnabled()) {
+ Map<String, List<Trigger>> triggerMap = null;
+ logger.debug("Running triggers.");
+ triggerMap = TriggerStore.getStore().getTriggers();
+ List<LogEntry> logEntries =
DistributedCommitLog.getLog().getPending();
+ for (LogEntry logEntry : logEntries) {
+ logger.debug("Processing Entry [" + logEntry.getUuid()
+ "]:[" + logEntry.getKeyspace() + "]:["
+ + logEntry.getColumnFamily() + "]");
+ String path = logEntry.getKeyspace() + ":" +
logEntry.getColumnFamily();
+ List<Trigger> triggers = triggerMap.get(path);
+ if (triggers != null) {
+ for (Trigger trigger : triggers) {
+ trigger.process(logEntry);
+ }
+ }
+
+ // Provided all processed properly, remove the logEntry
+ DistributedCommitLog.getLog().removeLogEntry(logEntry);
+ }
+ } else {
+ logger.debug("Skipping trigger execution because commit
log is disabled.");
}
} catch (Throwable t) {
logger.error("Could not execute triggers.", t);
}
-
}
}