There's currently no morphline command available that would convert arbitrary JSON to arbitrary Avro. You could write and contribute a custom command that does something like that, though, for example along the following lines:
- copy the kite-morphlines-avro module and call it kite-morphlines-json-avro (so that the existing avro module does not pick up a Jackson dependency)
- copy the toAvro command and call it jsonToAvro (code should be very similar)
- add a jsonToAvro command that takes a Jackson 2 JsonNode object as input (e.g. as emitted by the readJSON command) and converts it to Avro
- add unit tests
- submit a pull request
Below is a strawman to get started (incomplete and not tested at all):
/**
 * Converts {@code item} into a value matching the given Avro {@code schema}.
 *
 * <p>Accepted inputs are plain Java values (Map, List, Number, Boolean, byte[],
 * ByteBuffer, anything with a usable {@code toString()}) and Jackson 2
 * {@code JsonNode} trees. Scalar JSON nodes are unwrapped to their Java value
 * first, so that e.g. a text node yields its text rather than its quoted JSON
 * representation, and a JSON null is treated like a Java {@code null}.
 *
 * <p>Note: when {@code item} is a mutable {@code List} or {@code Map} it is
 * converted in place and returned; if a later element fails to convert, the
 * elements before it have already been replaced.
 *
 * @param item   the value to convert; may be {@code null}
 * @param schema the Avro schema the result must conform to
 * @return the converted value, or the {@code ERROR} sentinel (defined outside
 *         this snippet) if {@code item} cannot be represented in {@code schema}
 * @throws MorphlineRuntimeException if the schema type is not one of the known
 *         Avro types
 */
@SuppressWarnings("unchecked")
public static Object toAvro(Object item, Schema schema) {
  // Unwrap scalar JSON nodes up front. Container nodes (object/array) are left
  // alone and handled by the RECORD, ARRAY and MAP cases below.
  if (item instanceof JsonNode) {
    JsonNode scalar = (JsonNode) item;
    if (scalar.isNull() || scalar.isMissingNode()) {
      item = null;
    } else if (scalar.isTextual()) {
      item = scalar.textValue();
    } else if (scalar.isNumber()) {
      item = scalar.numberValue();
    } else if (scalar.isBoolean()) {
      item = Boolean.valueOf(scalar.booleanValue());
    }
  }

  // RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT,
  // DOUBLE, BOOLEAN, NULL
  switch (schema.getType()) {
    case RECORD: {
      if (item instanceof Map) {
        Map<Object, Object> map = (Map<Object, Object>) item;
        IndexedRecord record = new GenericData.Record(schema);
        for (Field field : schema.getFields()) {
          Object value = map.get(field.name());
          // Relies on a toAvro(Object, Field) overload that is not part of this
          // snippet (presumably it applies field-level handling) - TODO confirm.
          Object result = toAvro(value, field);
          if (result == ERROR) {
            return ERROR;
          }
          record.put(field.pos(), result);
        }
        return record;
      }
      if (item instanceof JsonNode) {
        JsonNode jsonNode = (JsonNode) item;
        if (jsonNode.isObject()) {
          IndexedRecord record = new GenericData.Record(schema);
          for (Field field : schema.getFields()) {
            // get() yields null when the JSON object has no such property.
            Object value = jsonNode.get(field.name());
            Object result = toAvro(value, field);
            if (result == ERROR) {
              return ERROR;
            }
            record.put(field.pos(), result);
          }
          return record;
        }
      }
      return ERROR;
    }
    case ENUM: {
      if (item == null) {
        return ERROR;
      }
      String symbol = item.toString();
      return schema.hasEnumSymbol(symbol) ? symbol : ERROR;
    }
    case ARRAY: {
      if (item instanceof List) {
        // Converts the caller's list in place.
        ListIterator<Object> iter = ((List<Object>) item).listIterator();
        while (iter.hasNext()) {
          Object result = toAvro(iter.next(), schema.getElementType());
          if (result == ERROR) {
            return ERROR;
          }
          iter.set(result);
        }
        return item;
      }
      if (item instanceof JsonNode) {
        JsonNode jsonNode = (JsonNode) item;
        if (jsonNode.isArray()) {
          List<Object> results = new ArrayList<Object>(jsonNode.size());
          Iterator<JsonNode> iter = jsonNode.iterator();
          while (iter.hasNext()) {
            Object result = toAvro(iter.next(), schema.getElementType());
            if (result == ERROR) {
              return ERROR;
            }
            results.add(result);
          }
          return results;
        }
      }
      return ERROR;
    }
    case MAP: {
      if (item instanceof Map) {
        // Converts the caller's map in place.
        Map<Object, Object> map = (Map<Object, Object>) item;
        for (Map.Entry<Object, Object> entry : map.entrySet()) {
          if (!(entry.getKey() instanceof CharSequence)) {
            return ERROR; // Avro requires that map keys are CharSequences
          }
          Object result = toAvro(entry.getValue(), schema.getValueType());
          if (result == ERROR) {
            return ERROR;
          }
          entry.setValue(result);
        }
        return item;
      }
      if (item instanceof JsonNode) {
        JsonNode jsonNode = (JsonNode) item;
        if (jsonNode.isObject()) {
          Map<String, Object> results = new HashMap<String, Object>(2 * jsonNode.size());
          // Jackson 2 names this accessor fields(); getFields() is the Jackson 1.x name.
          Iterator<Map.Entry<String, JsonNode>> iter = jsonNode.fields();
          while (iter.hasNext()) {
            Map.Entry<String, JsonNode> entry = iter.next();
            Object result = toAvro(entry.getValue(), schema.getValueType());
            if (result == ERROR) {
              return ERROR;
            }
            results.put(entry.getKey(), result);
          }
          return results;
        }
      }
      return ERROR;
    }
    case UNION:
      return toAvroUnion(item, schema);
    case FIXED:
      if (item instanceof byte[]) {
        return new GenericData.Fixed(schema, (byte[]) item);
      }
      return ERROR;
    case STRING:
      // A null cannot be rendered as a string; report it instead of throwing.
      if (item == null) {
        return ERROR;
      }
      return item.toString();
    case BYTES:
      if (item instanceof ByteBuffer) {
        return item;
      }
      if (item instanceof byte[]) {
        return ByteBuffer.wrap((byte[]) item);
      }
      return ERROR;
    case INT:
      if (item == null) {
        return ERROR;
      }
      if (item instanceof Integer) {
        return item;
      }
      if (item instanceof Number) {
        // Narrowing conversion: out-of-range or fractional values are truncated.
        return ((Number) item).intValue();
      }
      try {
        return Integer.valueOf(item.toString());
      } catch (NumberFormatException e) {
        return ERROR;
      }
    case LONG:
      if (item == null) {
        return ERROR;
      }
      if (item instanceof Long) {
        return item;
      }
      if (item instanceof Number) {
        return ((Number) item).longValue();
      }
      try {
        return Long.valueOf(item.toString());
      } catch (NumberFormatException e) {
        return ERROR;
      }
    case FLOAT:
      if (item == null) {
        return ERROR;
      }
      if (item instanceof Float) {
        return item;
      }
      if (item instanceof Number) {
        return ((Number) item).floatValue();
      }
      try {
        return Float.valueOf(item.toString());
      } catch (NumberFormatException e) {
        return ERROR;
      }
    case DOUBLE:
      if (item == null) {
        return ERROR;
      }
      if (item instanceof Double) {
        return item;
      }
      if (item instanceof Number) {
        return ((Number) item).doubleValue();
      }
      try {
        return Double.valueOf(item.toString());
      } catch (NumberFormatException e) {
        return ERROR;
      }
    case BOOLEAN: {
      if (item == null) {
        return ERROR;
      }
      if (item instanceof Boolean) {
        return item;
      }
      String str = item.toString();
      if ("true".equals(str)) {
        return Boolean.TRUE;
      }
      if ("false".equals(str)) {
        return Boolean.FALSE;
      }
      return ERROR;
    }
    case NULL:
      if (item == null) {
        return null;
      }
      return ERROR;
    default:
      throw new MorphlineRuntimeException("Unknown Avro schema type: " + schema.getType());
  }
}
Wolfgang.