http://code.google.com/a/apache-extras.org/p/virgil/source/detail?r=164
Added:
/trunk/triggers
/trunk/triggers/pom.xml
/trunk/triggers/src
/trunk/triggers/src/main
/trunk/triggers/src/main/java
/trunk/triggers/src/main/java/org
/trunk/triggers/src/main/java/org/apache
/trunk/triggers/src/main/java/org/apache/virgil
/trunk/triggers/src/main/java/org/apache/virgil/triggers
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/InternalCassandraClient.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TestTrigger.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/Trigger.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerStore.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerTask.java
/trunk/triggers/src/main/resources
/trunk/triggers/src/test
/trunk/triggers/src/test/java
/trunk/triggers/src/test/java/org
/trunk/triggers/src/test/java/org/apache
/trunk/triggers/src/test/java/org/apache/virgil
/trunk/triggers/src/test/java/org/apache/virgil/triggers
/trunk/triggers/src/test/java/org/apache/virgil/triggers/TriggerTest.java
/trunk/triggers/src/test/resources
/trunk/triggers/src/test/resources/wordcount.js
/trunk/triggers/src/test/resources/wordcount.rb
Deleted:
/trunk/server/src/main/java/org/apache/virgil/triggers
/trunk/server/src/test/java/org/apache/virgil/triggers
Modified:
/trunk/pom.xml
/trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar
/trunk/server/pom.xml
/trunk/server/src/main/java/org/apache/virgil/config/VirgilConfiguration.java
=======================================
--- /dev/null
+++ /trunk/triggers/pom.xml Thu Feb 2 07:42:56 2012
@@ -0,0 +1,68 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <parent>
+ <groupId>org.apache.virgil</groupId>
+ <artifactId>virgil-parent</artifactId>
+ <version>0.12.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>virgil-triggers</artifactId>
+ <packaging>jar</packaging>
+ <name> Virgil : Triggers </name>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.aspectj</groupId>
+ <artifactId>aspectjrt</artifactId>
+ <version>1.6.11</version>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.jna</groupId>
+ <artifactId>jna</artifactId>
+ <version>3.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>11.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>1.9.3</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>aspectj-maven-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <complianceLevel>1.5</complianceLevel>
+ <weaveDependencies>
+ <weaveDependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra-thrift</artifactId>
+ </weaveDependency>
+ <weaveDependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra-all</artifactId>
+ </weaveDependency>
+ </weaveDependencies>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
Thu Feb 2 07:42:56 2012
@@ -0,0 +1,39 @@
+package org.apache.virgil.triggers;
+
+import java.util.List;
+
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.aspectj.lang.JoinPoint;
+import org.aspectj.lang.annotation.AfterReturning;
+import org.aspectj.lang.annotation.Aspect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class CassandraServerTriggerAspect {
+ private static Logger logger =
LoggerFactory.getLogger(CassandraServerTriggerAspect.class);
+
+ @AfterReturning("execution(*
org.apache.cassandra.thrift.CassandraServer.doInsert(..))")
+ public void writeToCommitLog(JoinPoint thisJoinPoint) {
+ try {
+ ConsistencyLevel consistencyLevel = (ConsistencyLevel)
thisJoinPoint.getArgs()[0];
+ @SuppressWarnings("unchecked")
+ List<IMutation> mutations = (List<IMutation>)
thisJoinPoint.getArgs()[1];
+ for (IMutation mutation : mutations) {
+ if (mutation instanceof RowMutation) {
+ RowMutation rowMutation = (RowMutation) mutation;
+ logger.debug("Mutation for [" + rowMutation.getTable()
+ "] with consistencyLevel ["
+ + consistencyLevel + "]");
+ if
(!rowMutation.getTable().equals(DistributedCommitLog.KEYSPACE)) {
+
DistributedCommitLog.getLog().writeMutation(consistencyLevel, rowMutation);
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+}
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Thu Feb 2 07:42:56 2012
@@ -0,0 +1,77 @@
+package org.apache.virgil.triggers;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedCommitLog extends InternalCassandraClient {
+ private static Logger logger =
LoggerFactory.getLogger(DistributedCommitLog.class);
+
+ public static final String KEYSPACE = "cirrus";
+ public static final String COLUMN_FAMILY = "CommitLog";
+ private static boolean initialized = false;
+ private static DistributedCommitLog instance = null;
+
+ public static synchronized DistributedCommitLog getLog() {
+ if (instance == null)
+ instance = new DistributedCommitLog();
+ return instance;
+ }
+
+ public synchronized void create() throws Exception {
+ if (!initialized) {
+ try {
+ List<CfDef> cfDefList = new ArrayList<CfDef>();
+ KsDef ksDef = new
KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
+ ksDef.putToStrategy_options("replication_factor", "1");
+ getConnection(null).system_add_keyspace(ksDef);
+ } catch (Exception e) {
+ logger.debug("Did not create System.CommitLog. (probably
already there)");
+ }
+ try {
+ CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
+ columnFamily.setKey_validation_class("TimeUUIDType");
+
getConnection(KEYSPACE).system_add_column_family(columnFamily);
+ initialized = true;
+ } catch (Exception e) {
+ logger.debug("Did not create System.CommitLog. (probably
already there)");
+ }
+ }
+ }
+
+ public void writeMutation(ConsistencyLevel consistencyLevel,
RowMutation rowMutation) throws Exception {
+ List<Mutation> slice = new ArrayList<Mutation>();
+ Column c = new Column();
+ c.setName(ByteBufferUtil.bytes("mutation"));
+
c.setValue(rowMutation.getSerializedBuffer(MessagingService.version_));
+ c.setTimestamp(System.currentTimeMillis() * 1000);
+
+ Mutation m = new Mutation();
+ ColumnOrSuperColumn cc = new ColumnOrSuperColumn();
+ cc.setColumn(c);
+ m.setColumn_or_supercolumn(cc);
+ slice.add(m);
+ Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new
HashMap<ByteBuffer, Map<String, List<Mutation>>>();
+ Map<String, List<Mutation>> cfMutations = new HashMap<String,
List<Mutation>>();
+ cfMutations.put(COLUMN_FAMILY, slice);
+ byte[] rowKey = UUIDGen.getTimeUUIDBytes();
+ mutationMap.put(ByteBuffer.wrap(rowKey), cfMutations);
+ // TODO: Add Exception Handling.
+ getConnection(KEYSPACE).batch_mutate(mutationMap,
consistencyLevel);
+ }
+}
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/InternalCassandraClient.java
Thu Feb 2 07:42:56 2012
@@ -0,0 +1,14 @@
+package org.apache.virgil.triggers;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraServer;
+
+public class InternalCassandraClient {
+ Cassandra.Iface getConnection(String keyspace) throws Exception{
+ CassandraServer server = new CassandraServer();
+ if (keyspace != null){
+ server.set_keyspace(keyspace);
+ }
+ return server;
+ }
+}
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TestTrigger.java
Thu Feb 2 07:42:56 2012
@@ -0,0 +1,11 @@
+package org.apache.virgil.triggers;
+
+import org.apache.cassandra.db.RowMutation;
+
+public class TestTrigger implements Trigger {
+
+ public void process(RowMutation rowMutation) {
+ // TODO Auto-generated method stub
+ }
+
+}
=======================================
--- /dev/null
+++ /trunk/triggers/src/main/java/org/apache/virgil/triggers/Trigger.java
Thu Feb 2 07:42:56 2012
@@ -0,0 +1,9 @@
+package org.apache.virgil.triggers;
+
+import org.apache.cassandra.db.RowMutation;
+
+public interface Trigger {
+
+ public void process(RowMutation rowMutation);
+
+}
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerStore.java
Thu Feb 2 07:42:56 2012
@@ -0,0 +1,91 @@
+package org.apache.virgil.triggers;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TriggerStore extends InternalCassandraClient {
+ public static final String KEYSPACE = "cirrus";
+ public static final String COLUMN_FAMILY = "Trigger";
+ public static final String ENABLED = "enabled";
+ private static boolean initialized = false;
+ private static Logger logger =
LoggerFactory.getLogger(TriggerStore.class);
+ private static TriggerStore instance = null;
+
+ public static synchronized TriggerStore getStore() {
+ if (instance == null)
+ instance = new TriggerStore();
+ return instance;
+ }
+
+ public void create() throws Exception {
+ if (!initialized) {
+ try {
+ List<CfDef> cfDefList = new ArrayList<CfDef>();
+ KsDef ksDef = new
KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
+ ksDef.putToStrategy_options("replication_factor", "1");
+ getConnection(null).system_add_keyspace(ksDef);
+ } catch (Exception e) {
+ logger.debug("Did not create System.CommitLog. (probably
already there)");
+ }
+ try {
+ CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
+ columnFamily.setKey_validation_class("UTF8Type");
+ columnFamily.setComparator_type("UTF8Type");
+ columnFamily.setDefault_validation_class("UTF8Type");
+
getConnection(KEYSPACE).system_add_column_family(columnFamily);
+ initialized = true;
+ } catch (Exception e) {
+ logger.debug("Did not create System.CommitLog. (probably
already there)");
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Trigger getTrigger(String triggerClass) throws Exception
{
+ Class<Trigger> clazz = (Class<Trigger>)
Class.forName(triggerClass);
+ return clazz.newInstance();
+ }
+
+ public Map<String, List<Trigger>> getTriggers() throws Exception {
+ // TODO: Cache this.
+ Map<String, List<Trigger>> triggerMap = new HashMap<String,
List<Trigger>>();
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange(ByteBufferUtil.bytes(""),
ByteBufferUtil.bytes(""), false, 10);
+ predicate.setSlice_range(range);
+
+ KeyRange keyRange = new KeyRange(1000);
+ keyRange.setStart_key(ByteBufferUtil.bytes(""));
+ keyRange.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ ColumnParent parent = new ColumnParent(COLUMN_FAMILY);
+ List<KeySlice> rows =
getConnection(KEYSPACE).get_range_slices(parent, predicate, keyRange,
+ ConsistencyLevel.ALL);
+ for (KeySlice slice : rows) {
+ String columnFamily = ByteBufferUtil.string(slice.key);
+ List<Trigger> triggers = new ArrayList<Trigger>();
+ for (ColumnOrSuperColumn column : slice.columns) {
+ String className =
ByteBufferUtil.string(column.column.name);
+ String enabled =
ByteBufferUtil.string(column.column.value);
+ if (enabled.equals(ENABLED)) {
+ triggers.add(getTrigger(className));
+ }
+ }
+ triggerMap.put(columnFamily, triggers);
+ }
+ return triggerMap;
+ }
+}
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Thu Feb 2 07:42:56 2012
@@ -0,0 +1,31 @@
+package org.apache.virgil.triggers;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TriggerTask extends TimerTask {
+ private static Logger logger =
LoggerFactory.getLogger(TriggerTask.class);
+
+ @Override
+ public void run() {
+ Map<String, List<Trigger>> triggerMap = null;
+ try {
+ logger.debug("Running triggers @ [" + new Date() + "]");
+ triggerMap = TriggerStore.getStore().getTriggers();
+ } catch (Exception e){
+ logger.error("Could not retrieve triggers.", e);
+ }
+
+ for (String path : triggerMap.keySet()){
+ String keyspace = path.substring(path.indexOf(':'));
+ String columnFamily = path.substring(path.indexOf(':'));
+ logger.debug("[" + keyspace + "]:[" + columnFamily + "]");
+ }
+ }
+
+}
=======================================
--- /dev/null
+++
/trunk/triggers/src/test/java/org/apache/virgil/triggers/TriggerTest.java
Thu Feb 2 07:42:56 2012
@@ -0,0 +1,5 @@
+package org.apache.virgil.triggers;
+
+public class TriggerTest {
+
+}
=======================================
--- /dev/null
+++ /trunk/triggers/src/test/resources/wordcount.js Thu Feb 2 07:42:56 2012
@@ -0,0 +1,9 @@
+function map(rowKey, columns) {
+ print('Got here.');
+ print(rowKey);
+ print(columns.size());
+ for (i in columns.entrySet()) {
+ print('key is: ' + i + ', value is: ' + columns[i]);
+ }
+
+}
=======================================
--- /dev/null
+++ /trunk/triggers/src/test/resources/wordcount.rb Thu Feb 2 07:42:56 2012
@@ -0,0 +1,25 @@
+require 'json'
+require 'rest-client'
+
+def map(rowKey, columns)
+ result = []
+ columns.each do |column_name, value|
+ words = value.split
+ words.each do |word|
+ result << [word, "1"]
+ end
+ end
+ return result
+end
+
+def reduce(key, values)
+ rows = {}
+ total = 0
+ columns = {}
+ values.each do |value|
+ total += value.to_i
+ end
+ columns["count"] = total.to_s
+ rows[key] = columns
+ return rows
+end
=======================================
--- /trunk/pom.xml Fri Jan 27 13:50:54 2012
+++ /trunk/pom.xml Thu Feb 2 07:42:56 2012
@@ -12,6 +12,7 @@
<module>server</module>
<module>mapreduce</module>
<module>release</module>
+ <module>triggers</module>
</modules>
<dependencyManagement>
@@ -26,6 +27,11 @@
<artifactId>virgil-server</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.virgil</groupId>
+ <artifactId>virgil-triggers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
=======================================
--- /trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar Fri
Jan 27 13:50:54 2012
+++ /trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar Thu
Feb 2 07:42:56 2012
File is too large to display a diff.
=======================================
--- /trunk/server/pom.xml Tue Jan 31 09:05:50 2012
+++ /trunk/server/pom.xml Thu Feb 2 07:42:56 2012
@@ -19,6 +19,11 @@
<artifactId>virgil-mapreduce</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.virgil</groupId>
+ <artifactId>virgil-triggers</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.yammer.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>
@@ -36,19 +41,6 @@
<artifactId>aspectjrt</artifactId>
<version>1.6.11</version>
</dependency>
-
- <dependency>
- <groupId>com.eaio.uuid</groupId>
- <artifactId>uuid</artifactId>
- <version>3.2</version>
- </dependency>
-<dependency>
- <groupId>net.java.dev.jna</groupId>
- <artifactId>jna</artifactId>
- <version>3.4.0</version>
-</dependency>
-
-
</dependencies>
<build>
@@ -58,6 +50,7 @@
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
+
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
=======================================
---
/trunk/server/src/main/java/org/apache/virgil/config/VirgilConfiguration.java
Tue Jan 24 13:31:50 2012
+++
/trunk/server/src/main/java/org/apache/virgil/config/VirgilConfiguration.java
Thu Feb 2 07:42:56 2012
@@ -8,49 +8,52 @@
import com.yammer.dropwizard.config.Configuration;
public class VirgilConfiguration extends Configuration {
- public final static String CASSANDRA_HOST_PROPERTY
= "virgil.cassandra_host";
- public final static String CASSANDRA_PORT_PROPERTY
= "virgil.cassandra_port";
+ public final static String CASSANDRA_HOST_PROPERTY
= "virgil.cassandra_host";
+ public final static String CASSANDRA_PORT_PROPERTY
= "virgil.cassandra_port";
public final static String CASSANDRA_EMBEDDED = "virgil.embedded";
- @NotEmpty
- @NotNull
- private String solrHost;
-
- @NotEmpty
- @NotNull
- private String cassandraYaml;
-
- private boolean enableIndexing;
-
- public String getSolrHost() {
- return solrHost;
- }
-
- public String getCassandraYaml() {
- return cassandraYaml;
- }
-
- public boolean isIndexingEnabled() {
- return enableIndexing;
- }
-
- public ConsistencyLevel getConsistencyLevel(String consistencyLevel) {
- // Defaulting consistency level to ALL
- if (consistencyLevel == null)
- return ConsistencyLevel.ALL;
- else
- return ConsistencyLevel.valueOf(consistencyLevel);
- }
-
- public static boolean isEmbedded(){
- return
System.getProperty(VirgilConfiguration.CASSANDRA_EMBEDDED).equals("1");
- }
-
- public static String getHost(){
+ @NotEmpty
+ @NotNull
+ private String solrHost;
+
+ @NotEmpty
+ @NotNull
+ private String cassandraYaml;
+
+ private boolean enableIndexing;
+
+ public String getSolrHost() {
+ return solrHost;
+ }
+
+ public String getCassandraYaml() {
+ return cassandraYaml;
+ }
+
+ public boolean isIndexingEnabled() {
+ return enableIndexing;
+ }
+
+ public ConsistencyLevel getConsistencyLevel(String consistencyLevel) {
+ // Defaulting consistency level to ALL
+ if (consistencyLevel == null)
+ return ConsistencyLevel.ALL;
+ else
+ return ConsistencyLevel.valueOf(consistencyLevel);
+ }
+
+ public static boolean isEmbedded() {
+ if (System.getProperty(VirgilConfiguration.CASSANDRA_EMBEDDED) ==
null)
+ return true;
+ else
+ return
(System.getProperty(VirgilConfiguration.CASSANDRA_EMBEDDED).equals("1"));
+ }
+
+ public static String getHost() {
return
System.getProperty(VirgilConfiguration.CASSANDRA_HOST_PROPERTY);
}
-
- public static int getPort(){
+
+ public static int getPort() {
return
Integer.parseInt(System.getProperty(VirgilConfiguration.CASSANDRA_PORT_PROPERTY));
}
}