[virgil] r166 committed - Moved trigger code into module.

7 views
Skip to first unread message

virgil.apach...@codespot.com

unread,
Feb 6, 2012, 1:23:21 PM2/6/12
to virgil...@gmail.com
Revision: 166
Author: boneill42
Date: Mon Feb 6 10:22:32 2012
Log: Moved trigger code into module.


http://code.google.com/a/apache-extras.org/p/virgil/source/detail?r=166

Added:

/trunk/triggers/src/main/java/org/apache/virgil/triggers/ColumnOperation.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/LogEntry.java

/trunk/triggers/src/main/java/org/apache/virgil/triggers/LogEntryStatus.java
Deleted:
/trunk/server/src/test/resources/core-site.xml
/trunk/server/src/test/resources/hdfs-site.xml
/trunk/server/src/test/resources/mapred-site.xml
Modified:
/trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar

/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java

/trunk/triggers/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Replaced:
/trunk/server/src/test/resources/cassandra_test.yaml
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TestTrigger.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/Trigger.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerStore.java
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerTask.java

=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/ColumnOperation.java
Mon Feb 6 10:22:32 2012
@@ -0,0 +1,22 @@
+package org.apache.virgil.triggers;
+
+import java.nio.ByteBuffer;
+
/**
 * A single column-level operation captured from a row mutation: the raw
 * column name plus a flag saying whether the operation deletes the column
 * or writes it.
 */
public class ColumnOperation {

    /** Raw column name exactly as carried in the mutation. */
    private ByteBuffer name;

    /** True when the operation removes the column instead of updating it. */
    private boolean delete;

    /** @return the raw column name (may be null until {@link #setName} is called) */
    public ByteBuffer getName() {
        return name;
    }

    /** @param name the raw column name from the mutation */
    public void setName(ByteBuffer name) {
        this.name = name;
    }

    /** @return true if this operation is a delete, false for an update */
    public boolean isDelete() {
        return delete;
    }

    /** @param isDelete whether this operation deletes the column */
    public void setDelete(boolean isDelete) {
        this.delete = isDelete;
    }
}
=======================================
--- /dev/null
+++ /trunk/triggers/src/main/java/org/apache/virgil/triggers/LogEntry.java
Mon Feb 6 10:22:32 2012
@@ -0,0 +1,96 @@
+package org.apache.virgil.triggers;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class LogEntry {
+ private String keyspace = null;
+ private String columnFamily = null;
+ private ConsistencyLevel consistencyLevel = null;
+ private List<ColumnOperation> operations = new
ArrayList<ColumnOperation>();
+ private LogEntryStatus status = null;
+ private ByteBuffer rowKey = null;
+ private String uuid = null;
+
+ public LogEntry() {
+ }
+
+ public LogEntry(String keyspace, ColumnFamily columnFamily, ByteBuffer
rowKey, ConsistencyLevel consistencyLevel)
+ throws Throwable {
+ this.columnFamily = columnFamily.metadata().cfName;
+ this.keyspace = keyspace;
+ this.rowKey = rowKey;
+ for (IColumn column : columnFamily.getSortedColumns()) {
+ ColumnOperation operation = new ColumnOperation();
+ operation.setName(column.name());
+ operation.setDelete(columnFamily.isMarkedForDelete());
+ operations.add(operation);
+ }
+ this.uuid =
UUIDGen.getUUID(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())).toString();
+ this.status = LogEntryStatus.PREPARING;
+ this.consistencyLevel = consistencyLevel;
+ }
+
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ public void setKeyspace(String keyspace) {
+ this.keyspace = keyspace;
+ }
+
+ public String getColumnFamily() {
+ return columnFamily;
+ }
+
+ public void setColumnFamily(String columnFamily) {
+ this.columnFamily = columnFamily;
+ }
+
+ public List<ColumnOperation> getOperations() {
+ return operations;
+ }
+
+ public void setOperations(List<ColumnOperation> operations) {
+ this.operations = operations;
+ }
+
+ public LogEntryStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(LogEntryStatus status) {
+ this.status = status;
+ }
+
+ public ByteBuffer getRowKey() {
+ return rowKey;
+ }
+
+ public void setRowKey(ByteBuffer rowKey) {
+ this.rowKey = rowKey;
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ public ConsistencyLevel getConsistencyLevel() {
+ return consistencyLevel;
+ }
+
+ public void setConsistencyLevel(ConsistencyLevel consistencyLevel) {
+ this.consistencyLevel = consistencyLevel;
+ }
+
+}
=======================================
--- /dev/null
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/LogEntryStatus.java
Mon Feb 6 10:22:32 2012
@@ -0,0 +1,10 @@
+package org.apache.virgil.triggers;
+
/**
 * Life-cycle status of a {@link LogEntry} row in the distributed commit log.
 * Entries are created as {@code PREPARING} and promoted to {@code COMMITTED}
 * once the wrapped mutation has been applied; {@code COMPLETE} is declared
 * but not assigned anywhere in the code visible here.
 *
 * The original redundant {@code toString()} override (which only delegated to
 * {@code super.toString()}) has been removed: {@code Enum.toString()} already
 * returns the constant's name.
 */
public enum LogEntryStatus {
    PREPARING,
    COMMITTED,
    COMPLETE
}
=======================================
--- /trunk/server/src/test/resources/core-site.xml Tue Jan 31 06:29:08 2012
+++ /dev/null
@@ -1,12 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
-<property>
- <name>fs.default.name</name>
- <value>hdfs://NAME_NODE:9000</value>
- <description>The name of the default filesystem</description>
-</property>
-</configuration>
=======================================
--- /trunk/server/src/test/resources/hdfs-site.xml Tue Jan 31 06:29:08 2012
+++ /dev/null
@@ -1,24 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
- <property>
- <name>dfs.replication</name>
- <value>3</value>
- </property>
- <property>
- <name>dfs.name.dir</name>
- <value>/opt/data/hadoop/dfs/name</value>
- </property>
- <property>
- <name>dfs.data.dir</name>
- <value>/opt/data/hadoop/dfs/data</value>
- </property>
- <property>
- <name>dfs.checkpoint.dir</name>
- <value>/opt/data/hadoop/dfs/namesecondary</value>
- </property>
-
-</configuration>
=======================================
--- /trunk/server/src/test/resources/mapred-site.xml Tue Jan 31 06:29:08
2012
+++ /dev/null
@@ -1,18 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
- <property>
- <name>mapred.job.tracker</name>
- <value>JOB_TRACKER_NODE:9001</value>
- <description>Where the job tracker is</description>
- </property>
- <property>
- <name>mapred.job.reuse.jvm.num.tasks</name>
- <value>-1</value>
- <description>The number of JVMs to reuse. -1 reuses
all.</description>
- </property>
-
-</configuration>
=======================================
--- /trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar Thu
Feb 2 07:42:56 2012
+++ /trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar Mon
Feb 6 10:22:32 2012
File is too large to display a diff.
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
Thu Feb 2 07:42:56 2012
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/CassandraServerTriggerAspect.java
Mon Feb 6 10:22:32 2012
@@ -1,12 +1,13 @@
package org.apache.virgil.triggers;

+import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.aspectj.lang.JoinPoint;
-import org.aspectj.lang.annotation.AfterReturning;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,25 +16,44 @@
public class CassandraServerTriggerAspect {
private static Logger logger =
LoggerFactory.getLogger(CassandraServerTriggerAspect.class);

- @AfterReturning("execution(*
org.apache.cassandra.thrift.CassandraServer.doInsert(..))")
- public void writeToCommitLog(JoinPoint thisJoinPoint) {
+ @Around("execution(*
org.apache.cassandra.thrift.CassandraServer.doInsert(..))")
+ public void writeToCommitLog(ProceedingJoinPoint thisJoinPoint) throws
Throwable {
try {
ConsistencyLevel consistencyLevel = (ConsistencyLevel)
thisJoinPoint.getArgs()[0];
@SuppressWarnings("unchecked")
List<IMutation> mutations = (List<IMutation>)
thisJoinPoint.getArgs()[1];
- for (IMutation mutation : mutations) {
- if (mutation instanceof RowMutation) {
- RowMutation rowMutation = (RowMutation) mutation;
- logger.debug("Mutation for [" + rowMutation.getTable()
+ "] with consistencyLevel ["
- + consistencyLevel + "]");
- if
(!rowMutation.getTable().equals(DistributedCommitLog.KEYSPACE)) {
-
DistributedCommitLog.getLog().writeMutation(consistencyLevel, rowMutation);
- }
+ List<LogEntry> logEntries = writePending(consistencyLevel,
mutations);
+ thisJoinPoint.proceed(thisJoinPoint.getArgs());
+ writeCommitted(logEntries);
+ // TODO: Catch Invalid Request separately, and remove the
pending.
+ } catch (Throwable t) {
+ logger.error("Could not write to cassandra!", t);
+ throw t;
+ }
+ }
+
+ private List<LogEntry> writePending(ConsistencyLevel consistencyLevel,
List<IMutation> mutations)
+ throws Throwable {
+ List<LogEntry> logEntries = new ArrayList<LogEntry>();
+ for (IMutation mutation : mutations) {
+ if (mutation instanceof RowMutation) {
+ RowMutation rowMutation = (RowMutation) mutation;
+ logger.debug("Mutation for [" + rowMutation.getTable()
+ "] with consistencyLevel [" + consistencyLevel
+ + "]");
+ if
(!rowMutation.getTable().equals(DistributedCommitLog.KEYSPACE)) {
+
logEntries.addAll(DistributedCommitLog.getLog().writePending(consistencyLevel,
rowMutation));
}
}
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-}
+ }
+ return logEntries;
+ }
+
+ private void writeCommitted(List<LogEntry> logEntries)
+ throws Throwable {
+ for (LogEntry logEntry : logEntries) {
+ logEntry.setStatus(LogEntryStatus.COMMITTED);
+ DistributedCommitLog.getLog().writeLogEntry(logEntry);
+ }
+ }
+
+}
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Thu Feb 2 07:42:56 2012
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/DistributedCommitLog.java
Mon Feb 6 10:22:32 2012
@@ -6,24 +6,36 @@
import java.util.List;
import java.util.Map;

+import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class DistributedCommitLog extends InternalCassandraClient {
+public class DistributedCommitLog extends InternalCassandraClient {
private static Logger logger =
LoggerFactory.getLogger(DistributedCommitLog.class);

public static final String KEYSPACE = "cirrus";
public static final String COLUMN_FAMILY = "CommitLog";
+ public static final int MAX_ROW_SIZE = 10;
+ public static final int BATCH_SIZE = 50;
+ public static final int IN_FUTURE = 1000 * 60;
private static boolean initialized = false;
private static DistributedCommitLog instance = null;

@@ -42,10 +54,13 @@
getConnection(null).system_add_keyspace(ksDef);
} catch (Exception e) {
logger.debug("Did not create System.CommitLog. (probably
already there)");
- }
+ }
try {
CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
- columnFamily.setKey_validation_class("TimeUUIDType");
+ columnFamily.setKey_validation_class("UTF8Type");
+ columnFamily.setDefault_validation_class("UTF8Type");
+ columnFamily.setComparator_type("UTF8Type");
+

getConnection(KEYSPACE).system_add_column_family(columnFamily);
initialized = true;
} catch (Exception e) {
@@ -54,24 +69,104 @@
}
}

- public void writeMutation(ConsistencyLevel consistencyLevel,
RowMutation rowMutation) throws Exception {
+ public List<LogEntry> writePending(ConsistencyLevel consistencyLevel,
RowMutation rowMutation) throws Throwable {
+ String keyspace = rowMutation.getTable();
+ ByteBuffer rowKey = rowMutation.key();
+ List<LogEntry> entries = new ArrayList<LogEntry>();
+ for (Integer cfId : rowMutation.getColumnFamilyIds()) {
+ ColumnFamily columnFamily = rowMutation.getColumnFamily(cfId);
+ LogEntry entry = new LogEntry(keyspace, columnFamily, rowKey,
consistencyLevel);
+ entries.add(entry);
+ writeLogEntry(entry);
+ }
+ return entries;
+ }
+
+ public List<LogEntry> getPending() throws Throwable {
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange(ByteBufferUtil.bytes(""),
ByteBufferUtil.bytes(""), false, MAX_ROW_SIZE);
+ predicate.setSlice_range(range);
+
+ KeyRange keyRange = new KeyRange(BATCH_SIZE);
+ keyRange.setStart_key(ByteBufferUtil.bytes(""));
+ keyRange.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ ColumnParent parent = new ColumnParent(COLUMN_FAMILY);
+ List<KeySlice> rows =
getConnection(KEYSPACE).get_range_slices(parent, predicate, keyRange,
+ ConsistencyLevel.ALL);
+ List<LogEntry> logEntries = new ArrayList<LogEntry>();
+ for (KeySlice keySlice : rows) {
+ if (keySlice.columns.size() > 0) {
+ LogEntry logEntry = new LogEntry();
+ logEntry.setUuid(ByteBufferUtil.string(keySlice.key));
+ for (ColumnOrSuperColumn cc : keySlice.columns) {
+ if
(ByteBufferUtil.string(cc.column.name).equals("ks")) {
+
logEntry.setKeyspace(ByteBufferUtil.string(cc.column.value));
+ } else if
(ByteBufferUtil.string(cc.column.name).equals("cf")) {
+
logEntry.setColumnFamily(ByteBufferUtil.string(cc.column.value));
+ } else if
(ByteBufferUtil.string(cc.column.name).equals("row")) {
+ logEntry.setRowKey(cc.column.value);
+ } else if
(ByteBufferUtil.string(cc.column.name).equals("status")) {
+
logEntry.setStatus(LogEntryStatus.valueOf(ByteBufferUtil.string(cc.column.value)));
+ }
+ }
+ logEntries.add(logEntry);
+ }
+ }
+ return logEntries;
+ }
+
+ public void writeLogEntry(LogEntry logEntry) throws Throwable {
List<Mutation> slice = new ArrayList<Mutation>();
+ slice.add(getMutation("ks", logEntry.getKeyspace()));
+ slice.add(getMutation("cf", logEntry.getColumnFamily()));
+ slice.add(getMutation("row", logEntry.getRowKey()));
+ slice.add(getMutation("status", logEntry.getStatus().toString()));
+ for (ColumnOperation operation : logEntry.getOperations()) {
+ if (operation.isDelete()) {
+ slice.add(getMutation(operation.getName(), "DELETE"));
+ } else {
+ slice.add(getMutation(operation.getName(), "UPDATE"));
+ }
+ }
+ Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new
HashMap<ByteBuffer, Map<String, List<Mutation>>>();
+ Map<String, List<Mutation>> cfMutations = new HashMap<String,
List<Mutation>>();
+ cfMutations.put(COLUMN_FAMILY, slice);
+
+ ByteBuffer rowKey = ByteBufferUtil.bytes(logEntry.getUuid());
+ mutationMap.put(rowKey, cfMutations);
+ getConnection(KEYSPACE).batch_mutate(mutationMap,
logEntry.getConsistencyLevel());
+ }
+
+ public void removeLogEntry(LogEntry logEntry) throws Throwable {
+ long deleteTime = System.currentTimeMillis() * 1000;
+ ColumnPath path = new ColumnPath(COLUMN_FAMILY);
+ getConnection(KEYSPACE)
+ .remove(ByteBufferUtil.bytes(logEntry.getUuid()), path,
deleteTime, ConsistencyLevel.ALL);
+ }
+
+ // Utility Methods
+ private Mutation getMutation(String name, String value) {
+ return getMutation(name, ByteBufferUtil.bytes(value));
+ }
+
+ private Mutation getMutation(String name, ByteBuffer value) {
+ return getMutation(ByteBufferUtil.bytes(name), value);
+ }
+
+ private Mutation getMutation(ByteBuffer name, String value) {
+ return getMutation(name, ByteBufferUtil.bytes(value));
+ }
+
+ private Mutation getMutation(ByteBuffer name, ByteBuffer value) {
Column c = new Column();
- c.setName(ByteBufferUtil.bytes("mutation"));
-
c.setValue(rowMutation.getSerializedBuffer(MessagingService.version_));
+ c.setName(name);
+ c.setValue(value);
c.setTimestamp(System.currentTimeMillis() * 1000);

Mutation m = new Mutation();
ColumnOrSuperColumn cc = new ColumnOrSuperColumn();
cc.setColumn(c);
m.setColumn_or_supercolumn(cc);
- slice.add(m);
- Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new
HashMap<ByteBuffer, Map<String, List<Mutation>>>();
- Map<String, List<Mutation>> cfMutations = new HashMap<String,
List<Mutation>>();
- cfMutations.put(COLUMN_FAMILY, slice);
- byte[] rowKey = UUIDGen.getTimeUUIDBytes();
- mutationMap.put(ByteBuffer.wrap(rowKey), cfMutations);
- // TODO: Add Exception Handling.
- getConnection(KEYSPACE).batch_mutate(mutationMap,
consistencyLevel);
+ return m;
}
}
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TestTrigger.java
Thu Feb 2 07:42:56 2012
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TestTrigger.java
Mon Feb 6 10:22:32 2012
@@ -1,11 +1,12 @@
package org.apache.virgil.triggers;

-import org.apache.cassandra.db.RowMutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

public class TestTrigger implements Trigger {
-
- public void process(RowMutation rowMutation) {
- // TODO Auto-generated method stub
- }
-
-}
+ private static Logger logger =
LoggerFactory.getLogger(TestTrigger.class);
+
+ public void process(LogEntry logEntry) {
+ logger.debug("Trigger processing : [" + logEntry.getUuid() + "]");
+ }
+}
=======================================
--- /trunk/triggers/src/main/java/org/apache/virgil/triggers/Trigger.java
Thu Feb 2 07:42:56 2012
+++ /trunk/triggers/src/main/java/org/apache/virgil/triggers/Trigger.java
Mon Feb 6 10:22:32 2012
@@ -1,9 +1,8 @@
package org.apache.virgil.triggers;

-import org.apache.cassandra.db.RowMutation;

public interface Trigger {

- public void process(RowMutation rowMutation);
+ public void process(LogEntry loEntry);

}
=======================================
---
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Thu Feb 2 07:42:56 2012
+++
/trunk/triggers/src/main/java/org/apache/virgil/triggers/TriggerTask.java
Mon Feb 6 10:22:32 2012
@@ -17,15 +17,24 @@
try {
logger.debug("Running triggers @ [" + new Date() + "]");
triggerMap = TriggerStore.getStore().getTriggers();
- } catch (Exception e){
- logger.error("Could not retrieve triggers.", e);
- }
-
- for (String path : triggerMap.keySet()){
- String keyspace = path.substring(path.indexOf(':'));
- String columnFamily = path.substring(path.indexOf(':'));
- logger.debug("[" + keyspace + "]:[" + columnFamily + "]");
- }
- }
-
-}
+ List<LogEntry> logEntries =
DistributedCommitLog.getLog().getPending();
+ for (LogEntry logEntry : logEntries) {
+ logger.debug("Processing Entry [" + logEntry.getUuid()
+ "]:[" + logEntry.getKeyspace() + "]:["
+ + logEntry.getColumnFamily() + "]");
+ String path = logEntry.getKeyspace() + ":" +
logEntry.getColumnFamily();
+ List<Trigger> triggers = triggerMap.get(path);
+ if (triggers != null) {
+ for (Trigger trigger : triggers) {
+ trigger.process(logEntry);
+ }
+ }
+
+ // Provided all processed properly, remove the logEntry
+ DistributedCommitLog.getLog().removeLogEntry(logEntry);
+ }
+ } catch (Throwable t) {
+ logger.error("Could not execute triggers.", t);
+ }
+
+ }
+}

Reply all
Reply to author
Forward
0 new messages