Added:
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/Emitter.java
Modified:
/trunk/mapreduce/pom.xml
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/JobSpawner.java
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/RubyInvoker.java
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/RubyMapReduce.java
/trunk/mapreduce/src/test/java/org/apache/virgil/mapreduce/CassandraMapReduceTest.java
/trunk/pom.xml
/trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar
/trunk/release/pom.xml
/trunk/server/pom.xml
/trunk/server/src/main/java/org/apache/virgil/resource/MapReduceResource.java
=======================================
--- /dev/null
+++ /trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/Emitter.java
Fri Jan 27 13:50:54 2012
@@ -0,0 +1,28 @@
+//
+// Copyright (c) 2012 Health Market Science, Inc.
+//
+package org.apache.virgil.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+public class Emitter {
+ Text key;
+ ObjectWritable value;
+ Context context;
+
+ public Emitter(Context context) {
+ this.context = context;
+ this.value = new ObjectWritable();
+ this.key = new Text();
+ }
+
+ public void emit(String key, Object value) throws IOException,
InterruptedException {
+ this.value.set(value);
+ this.key.set(key);
+ context.write(this.key, this.value);
+ }
+}
=======================================
--- /trunk/mapreduce/pom.xml Thu Jan 26 14:01:22 2012
+++ /trunk/mapreduce/pom.xml Fri Jan 27 13:50:54 2012
@@ -4,7 +4,7 @@
<parent>
<groupId>org.apache.virgil</groupId>
<artifactId>virgil-parent</artifactId>
- <version>0.11.0-SNAPSHOT</version>
+ <version>0.12.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
=======================================
---
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/JobSpawner.java
Fri Jan 27 10:56:08 2012
+++
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/JobSpawner.java
Fri Jan 27 13:50:54 2012
@@ -21,10 +21,19 @@
public static final int OUTPUT_COLUMN_FAMILY = 6;
public static final int SOURCE = 7;
public static final int PARAMS = 8;
-
- private static String[] getArgs(String jobName, String cassandraHost,
int cassandraPort, String inputKeyspace,
- String inputColumnFamily, String outputKeyspace, String
outputColumnFamily, String source, String params,
- boolean local) {
+ public static final int MAP_EMIT_FLAG = 9;
+ public static final int REDUCE_RAW_DATA_FLAG = 10;
+
+ public static final String MAP_EMIT_FLAG_STR = "mapEmitFlag";
+ public static final String REDUCE_RAW_DATA_FLAG_STR
= "reduceRawDataFlag";
+
+ private static String[] getArgs(String jobName, String cassandraHost,
int cassandraPort,
+ String inputKeyspace,
+ String inputColumnFamily, String
outputKeyspace,
+ String outputColumnFamily, String
source, String params,
+ String mapEmitFlag,
+ String reduceRawDataFlag,
+ boolean local) {
List<String> args = new ArrayList<String>();
if (!local) {
args.add("mapreduce/jars/virgil-mapreduce-hdeploy.jar");
@@ -39,6 +48,8 @@
args.add(outputColumnFamily);
args.add(source);
args.add(params);
+ args.add(mapEmitFlag);
+ args.add(reduceRawDataFlag);
LOG.info("Running job against [" + cassandraHost + ":" +
cassandraPort + "]");
return args.toArray(new String[0]);
}
@@ -62,23 +73,32 @@
if (args.length > JobSpawner.PARAMS &&
StringUtils.isNotBlank(args[JobSpawner.PARAMS])) {
conf.set("params", args[JobSpawner.PARAMS]);
}
+        if (args.length > MAP_EMIT_FLAG &&
StringUtils.isNotBlank(args[MAP_EMIT_FLAG])){
+            conf.set(MAP_EMIT_FLAG_STR, args[MAP_EMIT_FLAG]);
+        }
+        if (args.length > REDUCE_RAW_DATA_FLAG &&
StringUtils.isNotBlank(args[REDUCE_RAW_DATA_FLAG])){
+            conf.set(REDUCE_RAW_DATA_FLAG_STR, args[REDUCE_RAW_DATA_FLAG]);
+        }
return conf;
}
public static void spawnLocal(String jobName, String cassandraHost,
int cassandraPort, String inputKeyspace,
- String inputColumnFamily, String outputKeyspace, String
outputColumnFamily, String source, String params)
+ String inputColumnFamily, String outputKeyspace, String
outputColumnFamily, String source, String params, String mapEmitFlag,
String reduceRawDataFlag)
throws Exception {
String[] args = JobSpawner.getArgs(jobName, cassandraHost,
cassandraPort, inputKeyspace, inputColumnFamily,
- outputKeyspace, outputColumnFamily, source, params, true);
+ outputKeyspace, outputColumnFamily, source, params,
mapEmitFlag, reduceRawDataFlag, true);
Configuration conf = JobSpawner.getConfiguration(args);
ToolRunner.run(conf, new RubyMapReduce(), new String[0]);
}
public static void spawnRemote(String jobName, String cassandraHost,
int cassandraPort, String inputKeyspace,
- String inputColumnFamily, String outputKeyspace, String
outputColumnFamily, String source, String params)
+ String inputColumnFamily, String outputKeyspace, String
outputColumnFamily, String source, String params, String mapEmitFlag,
String reduceRawDataFlag)
throws Throwable {
String[] args = JobSpawner.getArgs(jobName, cassandraHost,
cassandraPort, inputKeyspace, inputColumnFamily,
- outputKeyspace, outputColumnFamily, source, params, false);
+ outputKeyspace, outputColumnFamily, source, params,
mapEmitFlag, reduceRawDataFlag, false);
RunJar.main(args);
}
}
=======================================
---
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/RubyInvoker.java
Tue Jan 17 12:30:34 2012
+++
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/RubyInvoker.java
Fri Jan 27 13:50:54 2012
@@ -1,25 +1,46 @@
package org.apache.virgil.mapreduce;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.script.ScriptException;
+import org.apache.hadoop.io.ObjectWritable;
import org.jruby.RubyArray;
import org.jruby.embed.ScriptingContainer;
public class RubyInvoker {
public static synchronized RubyArray invokeMap(ScriptingContainer
container, Object rubyReceiver, String rowKey,
- Map<String, String> columns, Map<String, Object> params)
throws ScriptException {
+ Map<String, String> columns, Emitter emitter, Map<String,
Object> params) throws ScriptException {
// TODO- see if we can possibly deprecate the one without params
- if (params == null) {
- return (RubyArray) container.callMethod(rubyReceiver, "map",
rowKey, columns);
+ if (params == null) {
+ return (RubyArray) container.callMethod(rubyReceiver, "map",
rowKey, columns, emitter);
} else {
- return (RubyArray) container.callMethod(rubyReceiver, "map",
rowKey, columns, params);
+ return (RubyArray) container.callMethod(rubyReceiver, "map",
rowKey, columns, emitter, params);
}
}
-
+
+ public static synchronized RubyArray invokeMap(ScriptingContainer
container, Object rubyReceiver, String rowKey,
+ Map<String, String>
columns, Map<String, Object> params) throws ScriptException {
+ // TODO- see if we can
possibly deprecate the one without params
+ if (params == null) {
+ return (RubyArray)
container.callMethod(rubyReceiver, "map", rowKey, columns);
+ } else {
+ return (RubyArray)
container.callMethod(rubyReceiver, "map", rowKey, columns, params);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static synchronized Map<String, Map<String, String>>
invokeReduce(ScriptingContainer container,
+ Object rubyReceiver, String key, Iterable<ObjectWritable>
values, Map<String, Object> params) throws ScriptException {
+ // TODO- see if we can possibly deprecate the one without params
+ if (params == null) {
+ return (Map<String, Map<String, String>>)
container.callMethod(rubyReceiver, "reduce", key, values);
+ } else {
+ return (Map<String, Map<String, String>>)
container.callMethod(rubyReceiver, "reduce", key, values, params);
+ }
+ }
+
@SuppressWarnings("unchecked")
public static synchronized Map<String, Map<String, String>>
invokeReduce(ScriptingContainer container,
Object rubyReceiver, String key, List<Object> values,
Map<String, Object> params) throws ScriptException {
=======================================
---
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/RubyMapReduce.java
Thu Jan 26 14:01:22 2012
+++
/trunk/mapreduce/src/main/java/org/apache/virgil/mapreduce/RubyMapReduce.java
Fri Jan 27 13:50:54 2012
@@ -19,6 +19,7 @@
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.ObjectWritable;
@@ -87,43 +88,77 @@
if (getConf().get("params") != null){
job.getConfiguration().set("params", getConf().get("params"));
}
+ if
(StringUtils.isNotBlank(getConf().get(JobSpawner.MAP_EMIT_FLAG_STR))){
+ job.getConfiguration().set(JobSpawner.MAP_EMIT_FLAG_STR,
getConf().get(JobSpawner.MAP_EMIT_FLAG_STR));
+ }
+ if
(StringUtils.isNotBlank(getConf().get(JobSpawner.REDUCE_RAW_DATA_FLAG_STR))){
+ job.getConfiguration().set(JobSpawner.REDUCE_RAW_DATA_FLAG_STR,
getConf().get(JobSpawner.REDUCE_RAW_DATA_FLAG_STR));
+ }
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
SlicePredicate sp = new SlicePredicate();
SliceRange sr = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false,
MAX_COLUMNS_PER_ROW);
sp.setSlice_range(sr);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), sp);
- // job.waitForCompletion(true);
+ //job.waitForCompletion(true);
job.submit();
return 0;
}
public static class CassandraMapper extends
- Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text,
ObjectWritable> {
- private ScriptingContainer rubyContainer = null;
- private Object rubyReceiver = null;
- private Map<String, Object> params;
- private static Logger logger =
LoggerFactory.getLogger(CassandraMapper.class);
-
- @Override
- protected void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn>
value, Context context) throws IOException,
- InterruptedException {
- Map<String, String> columns = new HashMap<String, String>();
- for (ByteBuffer b : value.keySet()) {
- columns.put(ByteBufferUtil.string(b),
ByteBufferUtil.string(value.get(b).value()));
- }
- String rowKey = ByteBufferUtil.string(key);
- try {
- RubyArray tuples = RubyInvoker.invokeMap(rubyContainer,
rubyReceiver, rowKey, columns, params);
- for (Object element : tuples) {
- RubyArray tuple = (RubyArray) element;
- context.write(new Text((String) tuple.get(0)), new
ObjectWritable(tuple.get(1)));
- }
- } catch (Exception e) {
- // TODO: Make this more severe.
- logger.warn("Exception running map on [" + rowKey + "]",
e);
- }
- }
+ Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text,
ObjectWritable> {
+ private ScriptingContainer rubyContainer = null;
+ private Object rubyReceiver = null;
+ private Map<String, Object> params;
+ private static Logger logger =
LoggerFactory.getLogger(CassandraMapper.class);
+ private Emitter emitter;
+
+ @Override
+ protected void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn>
value, Context context)
+ throws IOException,
+ InterruptedException {
+ if (emitter != null) {
+ mapRubyEmit(key, value, context);
+ } else {
+ mapJavaEmit(key, value, context);
+ }
+ }
+
+        protected void mapJavaEmit(ByteBuffer key, SortedMap<ByteBuffer,
IColumn> value, Context context) throws IOException,
+                InterruptedException {
+            Map<String, String> columns = new HashMap<String, String>();
+            for (ByteBuffer b : value.keySet()) {
+                columns.put(ByteBufferUtil.string(b),
ByteBufferUtil.string(value.get(b).value()));
+            }
+            String rowKey = ByteBufferUtil.string(key);
+            try {
+                RubyArray tuples = RubyInvoker.invokeMap(rubyContainer,
rubyReceiver, rowKey, columns, params);
+                for (Object element : tuples) {
+                    RubyArray tuple = (RubyArray) element;
+                    context.write(new Text((String) tuple.get(0)), new
ObjectWritable(tuple.get(1)));
+                }
+            } catch (Exception e) {
+                // TODO: Make this more severe.
+                logger.warn("Exception running map on [" + rowKey + "]", e);
+            }
+        }
+
+ protected void mapRubyEmit(ByteBuffer key, SortedMap<ByteBuffer,
IColumn> value, Context context)
+ throws IOException,
+ InterruptedException {
+ Map<String, String> columns = new HashMap<String, String>();
+ for (ByteBuffer b : value.keySet()) {
+ columns.put(ByteBufferUtil.string(b),
ByteBufferUtil.string(value.get(b).value()));
+ }
+ String rowKey = ByteBufferUtil.string(key);
+ try {
+ RubyInvoker.invokeMap(rubyContainer, rubyReceiver, rowKey,
columns, this.emitter, params);
+ }
+ catch (Exception e) {
+ // TODO: Make this more severe.
+ logger.warn("Exception running map on [" + rowKey + "]", e);
+ }
+ }
@SuppressWarnings("unchecked")
@Override
@@ -136,6 +171,11 @@
params = new HashMap<String, Object>();
params = (Map<String, Object>)
JSONValue.parse(context.getConfiguration().get("params"));
}
+
if(StringUtils.isNotBlank(context.getConfiguration().get(org.apache.virgil.mapreduce.JobSpawner.MAP_EMIT_FLAG_STR)))
{
+ emitter = new Emitter(context);
+ } else {
+ emitter = null;
+ }
}
}
@@ -144,28 +184,58 @@
private Object rubyReceiver = null;
private Map<String, Object> params;
+ private boolean reduceRawDataFlag;
+
@Override
protected void reduce(Text key, Iterable<ObjectWritable> vals,
Context context) throws IOException,
- InterruptedException {
- List<Object> values = new ArrayList<Object>();
- for (ObjectWritable value : vals) {
- values.add(value.get());
- }
- try {
- Map<String, Map<String, String>> results =
RubyInvoker.invokeReduce(rubyContainer, rubyReceiver,
- key.toString(), values, params);
- for (String rowKey : results.keySet()) {
- Map<String, String> columns = results.get(rowKey);
- for (String columnName : columns.keySet()) {
- String columnValue = columns.get(columnName);
- context.write(ByteBufferUtil.bytes(rowKey),
-
CassandraReducer.getMutationList(columnName, columnValue));
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+                InterruptedException {
+            if (reduceRawDataFlag) {
+                reduceRawData(key, vals, context);
+            } else {
+                reduceNonRawData(key, vals, context);
+            }
+        }
+
+ protected void reduceRawData(Text key, Iterable<ObjectWritable>
vals, Context context) throws IOException,
+ InterruptedException {
+ try {
+ Map<String, Map<String, String>> results =
RubyInvoker.invokeReduce(rubyContainer, rubyReceiver,
+ key.toString(), vals, params);
+ for (String rowKey : results.keySet()) {
+ Map<String, String> columns = results.get(rowKey);
+ for (String columnName : columns.keySet()) {
+ String columnValue = columns.get(columnName);
+ context.write(ByteBufferUtil.bytes(rowKey),
+ CassandraReducer.getMutationList(columnName,
columnValue));
}
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+}
+
+ protected void reduceNonRawData(Text key, Iterable<ObjectWritable>
vals, Context context) throws IOException,
+ InterruptedException {
+ List<Object> values = new ArrayList<Object>();
+ for (ObjectWritable value : vals) {
+ values.add(value.get());
+ }
+ try {
+ Map<String, Map<String, String>> results =
RubyInvoker.invokeReduce(rubyContainer, rubyReceiver,
+ key.toString(), values, params);
+ for (String rowKey : results.keySet()) {
+ Map<String, String> columns = results.get(rowKey);
+ for (String columnName : columns.keySet()) {
+ String columnValue = columns.get(columnName);
+ context.write(ByteBufferUtil.bytes(rowKey),
+ CassandraReducer.getMutationList(columnName,
columnValue));
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
@SuppressWarnings("unchecked")
@Override
@@ -178,6 +248,7 @@
params = new HashMap<String, Object>();
params = (Map<String, Object>)
JSONValue.parse(context.getConfiguration().get("params"));
}
+ reduceRawDataFlag =
StringUtils.isNotBlank(context.getConfiguration().get(org.apache.virgil.mapreduce.JobSpawner.REDUCE_RAW_DATA_FLAG_STR));
}
private static List<Mutation> getMutationList(String name, String
value) {
=======================================
---
/trunk/mapreduce/src/test/java/org/apache/virgil/mapreduce/CassandraMapReduceTest.java
Mon Jan 16 18:14:34 2012
+++
/trunk/mapreduce/src/test/java/org/apache/virgil/mapreduce/CassandraMapReduceTest.java
Fri Jan 27 13:50:54 2012
@@ -9,7 +9,7 @@
@Test
public void testMapReduce() throws Exception {
String source = RubyInvokerTest.getSource();
- JobSpawner.spawnLocal("test-reduce", "localhost",
9160, "playground", "toys", "datastore", "test", source, null);
+ JobSpawner.spawnLocal("test-reduce", "localhost",
9160, "playground", "toys", "datastore", "test", source, null, null, null);
}
}
=======================================
--- /trunk/pom.xml Thu Jan 26 14:01:22 2012
+++ /trunk/pom.xml Fri Jan 27 13:50:54 2012
@@ -4,7 +4,7 @@
<groupId>org.apache.virgil</groupId>
<artifactId>virgil-parent</artifactId>
- <version>0.11.0-SNAPSHOT</version>
+ <version>0.12.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name> Virgil : Parent</name>
@@ -14,6 +14,21 @@
<module>release</module>
</modules>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.virgil</groupId>
+ <artifactId>virgil-mapreduce</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.virgil</groupId>
+ <artifactId>virgil-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>log4j</groupId>
=======================================
--- /trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar Thu
Jan 26 14:01:22 2012
+++ /trunk/release/assembly/mapreduce/jars/virgil-mapreduce-hdeploy.jar Fri
Jan 27 13:50:54 2012
File is too large to display a diff.
=======================================
--- /trunk/release/pom.xml Thu Jan 26 14:01:22 2012
+++ /trunk/release/pom.xml Fri Jan 27 13:50:54 2012
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.virgil</groupId>
<artifactId>virgil-parent</artifactId>
- <version>0.11.0-SNAPSHOT</version>
+ <version>0.12.0-SNAPSHOT</version>
</parent>
<artifactId>virgil</artifactId>
@@ -16,12 +16,10 @@
<dependency>
<groupId>org.apache.virgil</groupId>
<artifactId>virgil-mapreduce</artifactId>
- <version>0.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.virgil</groupId>
<artifactId>virgil-server</artifactId>
- <version>0.10.0-SNAPSHOT</version>
</dependency>
</dependencies>
=======================================
--- /trunk/server/pom.xml Thu Jan 26 14:01:22 2012
+++ /trunk/server/pom.xml Fri Jan 27 13:50:54 2012
@@ -4,7 +4,7 @@
<parent>
<groupId>org.apache.virgil</groupId>
<artifactId>virgil-parent</artifactId>
- <version>0.11.0-SNAPSHOT</version>
+ <version>0.12.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -17,7 +17,6 @@
<dependency>
<groupId>org.apache.virgil</groupId>
<artifactId>virgil-mapreduce</artifactId>
- <version>0.10.0-SNAPSHOT</version>
</dependency>
<dependency>
=======================================
---
/trunk/server/src/main/java/org/apache/virgil/resource/MapReduceResource.java
Tue Jan 24 13:31:50 2012
+++
/trunk/server/src/main/java/org/apache/virgil/resource/MapReduceResource.java
Fri Jan 27 13:50:54 2012
@@ -5,6 +5,7 @@
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import org.apache.commons.lang.StringUtils;
import org.apache.virgil.CassandraStorage;
import org.apache.virgil.VirgilService;
import org.apache.virgil.config.VirgilConfiguration;
@@ -32,7 +33,10 @@
@QueryParam("inputKeyspace") String inputKeyspace,
@QueryParam("inputColumnFamily") String inputColumnFamily,
@QueryParam("outputKeyspace") String outputKeyspace,
- @QueryParam("outputColumnFamily") String outputColumnFamily,
String source) throws Throwable {
+ @QueryParam("outputColumnFamily") String outputColumnFamily,
+ @QueryParam("mapEmitFlag") String mapEmitFlag,
+ @QueryParam("reduceRawDataFlag") String reduceRawDataFlag,
+ String source) throws Throwable {
if (inputKeyspace == null)
throw new RuntimeException("Must supply inputKeyspace.");
if (inputColumnFamily == null)
@@ -42,6 +46,7 @@
if (outputColumnFamily == null)
throw new RuntimeException("Must supply outputColumnFamily.");
+
if (logger.isDebugEnabled()) {
logger.debug("Launching job [" + jobName + "]");
logger.debug(" --> Input : Keyspace [" + inputKeyspace + "],
ColumnFamily [" + inputColumnFamily + "]");
@@ -51,11 +56,11 @@
if (VirgilConfiguration.isEmbedded()) {
logger.debug("Running in embedded mode.");
JobSpawner.spawnLocal(jobName, VirgilConfiguration.getHost(),
VirgilConfiguration.getPort(), inputKeyspace,
- inputColumnFamily, outputKeyspace, outputColumnFamily,
source, params);
+ inputColumnFamily, outputKeyspace, outputColumnFamily,
source, params, mapEmitFlag, reduceRawDataFlag);
} else {
logger.debug("Spawning job remotely.");
JobSpawner.spawnRemote(jobName, VirgilConfiguration.getHost(),
VirgilConfiguration.getPort(),
- inputKeyspace, inputColumnFamily, outputKeyspace,
outputColumnFamily, source, params);
+ inputKeyspace, inputColumnFamily, outputKeyspace,
outputColumnFamily, source, params, mapEmitFlag, reduceRawDataFlag);
}
}