Nested JSON using Morphlines

Buntu Dev

May 1, 2015, 1:20:49 AM
to cdk...@cloudera.org
I have a stream of JSON events that I would like to transform using Morphlines, and I want to know how to map the fields to match the Avro schema. I was able to generate the schema and import the data using the Kite CLI, but I need to transform the events and route them to the Kite Dataset sink.

Here's the Avro schema:

{
  "namespace" : "my.com.ns",
  "name": "myrecord",
  "type" :  "record",
  "fields" : [
     {"name": "uid", "type": "int"},
     {"name": "somefield", "type": "string"},
     {"name": "options", "type": {
        "type": "array",
        "items": {
            "type": "record",
            "name": "lvl2_record",
            "fields": [
                {"name": "item1_lvl2", "type": "string"},
                {"name": "item2_lvl2", "type": {
                    "type": "array",
                    "items": {
                        "type": "record",
                        "name": "lvl3_record",
                        "fields": [
                            {"name": "item1_lvl3", "type": "string"},
                            {"name": "item2_lvl3", "type": "string"}
                        ]
                    }
                }}
            ]
        }
     }}
  ]
}
Here is a sample JSON document:

{
 "uid": 29153333,
 "somefield": "somevalue",
 "options": [
   {
     "item1_lvl2": "a",
     "item2_lvl2": [
       {
         "item1_lvl3": "x1",
         "item2_lvl3": "y1"
       },
       {
         "item1_lvl3": "x2",
         "item2_lvl3": "y2"
       }
     ]
   }
 ]
}
Thanks!

Wolfgang Hoschek

May 1, 2015, 9:12:14 AM
to Buntu Dev, cdk...@cloudera.org
There's currently no morphline command available that would convert arbitrary JSON to arbitrary Avro. You could write and contribute a custom command that does something like that, though, for example along the following lines:

- Copy the kite-morphlines-avro module and call it kite-morphlines-json-avro (so that the existing Avro module does not gain a Jackson dependency).
- Copy the toAvro command and call it jsonToAvro (the code should be very similar).
- Have the jsonToAvro command take a Jackson 2 JsonNode object as input (e.g. as emitted by the readJson command) and convert it to Avro.
- In the new module, adjust AvroConversions.java (see https://github.com/kite-sdk/kite/blob/master/kite-morphlines/kite-morphlines-avro/src/main/java/org/kitesdk/morphline/avro/AvroConversions.java#L69 ) to handle the various Jackson 2 input datatypes: JsonNode, Object, array, string, numbers, etc.
- Add unit tests.
- Submit a pull request.

Below is a strawman to get started (incomplete and not tested at all):

  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.List;
  import java.util.ListIterator;
  import java.util.Map;

  import org.apache.avro.Schema;
  import org.apache.avro.Schema.Field;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.IndexedRecord;
  import org.kitesdk.morphline.api.MorphlineRuntimeException;

  import com.fasterxml.jackson.databind.JsonNode;

  // ERROR, toAvroUnion() and the toAvro(Object, Field) overload are assumed to be
  // carried over from the copied AvroConversions class.
  public static Object toAvro(Object item, Schema schema) {
    // RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT,
    // DOUBLE, BOOLEAN, NULL
    switch (schema.getType()) {
      case RECORD:
        if (item instanceof Map) {
          Map<String,Object> map = (Map) item;
          IndexedRecord record = new GenericData.Record(schema);
          for (Field field : schema.getFields()) {
            Object value = map.get(field.name());
            Object result = toAvro(value, field);
            if (result == ERROR) {
              return ERROR;
            }
            record.put(field.pos(), result);
          }
          return record;
        }
        if (item instanceof JsonNode) {
          JsonNode jsonNode = (JsonNode) item;
          if (jsonNode.isObject()) {
            IndexedRecord record = new GenericData.Record(schema);
            for (Field field : schema.getFields()) {
              Object value = jsonNode.get(field.name()); // null if the field is absent
              Object result = toAvro(value, field);
              if (result == ERROR) {
                return ERROR;
              }
              record.put(field.pos(), result);
            }
            return record;
          }
        }
        return ERROR;
      case ENUM:
        if (schema.hasEnumSymbol(item.toString())) {
          return item.toString();
        }
        return ERROR;
      case ARRAY:
        if (item instanceof List) {
          // Converts the elements of the input list in place
          ListIterator iter = ((List) item).listIterator();
          while (iter.hasNext()) {
            Object result = toAvro(iter.next(), schema.getElementType());
            if (result == ERROR) {
              return ERROR;
            }
            iter.set(result);
          }
          return item;
        }
        if (item instanceof JsonNode) {
          JsonNode jsonNode = (JsonNode) item;
          if (jsonNode.isArray()) {
            List results = new ArrayList(jsonNode.size());
            Iterator<JsonNode> iter = jsonNode.iterator();
            while (iter.hasNext()) {
              Object result = toAvro(iter.next(), schema.getElementType());
              if (result == ERROR) {
                return ERROR;
              }
              results.add(result);
            }
            return results;
          }
        }
        return ERROR;
      case MAP:
        if (item instanceof Map) {
          // Converts the values of the input map in place
          Map<String,Object> map = (Map) item;
          for (Map.Entry entry : map.entrySet()) {
            if (!(entry.getKey() instanceof CharSequence)) {
              return ERROR; // Avro requires that map keys are CharSequences
            }
            Object result = toAvro(entry.getValue(), schema.getValueType());
            if (result == ERROR) {
              return ERROR;
            }
            entry.setValue(result);
          }
          return item;
        }
        if (item instanceof JsonNode) {
          JsonNode jsonNode = (JsonNode) item;
          if (jsonNode.isObject()) {
            Map<String, Object> results = new HashMap<String, Object>(2 * jsonNode.size());
            // Jackson 2 names this method fields(); getFields() is the Jackson 1 name
            Iterator<Map.Entry<String, JsonNode>> iter = jsonNode.fields();
            while (iter.hasNext()) {
              Map.Entry<String, JsonNode> entry = iter.next();
              Object result = toAvro(entry.getValue(), schema.getValueType());
              if (result == ERROR) {
                return ERROR;
              }
              results.put(entry.getKey(), result);
            }
            return results;
          }
        }
        return ERROR;
      case UNION:
        return toAvroUnion(item, schema);
      case FIXED:
        if (item instanceof byte[]) {
          return new GenericData.Fixed(schema, (byte[]) item);
        }
        return ERROR;
      case STRING:
        assert item != null;
        // NOTE: for a Jackson text node, toString() yields the quoted JSON form;
        // asText() is probably what is wanted for JsonNode input.
        return item.toString();
      case BYTES:
        if (item instanceof ByteBuffer) {
          return item;
        }
        if (item instanceof byte[]) {
          return ByteBuffer.wrap((byte[]) item);
        }
        return ERROR;
      case INT:
        if (item instanceof Integer) {
          return item;
        }
        if (item instanceof Number) {
          return ((Number) item).intValue();
        }
        try {
          return Integer.valueOf(item.toString());
        } catch (NumberFormatException e) {
          return ERROR;
        }
      case LONG:
        if (item instanceof Long) {
          return item;
        }
        if (item instanceof Number) {
          return ((Number) item).longValue();
        }
        try {
          return Long.valueOf(item.toString());
        } catch (NumberFormatException e) {
          return ERROR;
        }
      case FLOAT:
        if (item instanceof Float) {
          return item;
        }
        if (item instanceof Number) {
          return ((Number) item).floatValue();
        }
        try {
          return Float.valueOf(item.toString());
        } catch (NumberFormatException e) {
          return ERROR;
        }
      case DOUBLE:
        if (item instanceof Double) {
          return item;
        }
        if (item instanceof Number) {
          return ((Number) item).doubleValue();
        }
        try {
          return Double.valueOf(item.toString());
        } catch (NumberFormatException e) {
          return ERROR;
        }
      case BOOLEAN:
        if (item instanceof Boolean) {
          return item;
        }
        assert item != null;
        String str = item.toString();
        if ("true".equals(str)) {
          return Boolean.TRUE;
        }
        if ("false".equals(str)) {
          return Boolean.FALSE;
        }
        return ERROR;
      case NULL:
        if (item == null) {
          return null;
        }
        return ERROR;
      default:
        throw new MorphlineRuntimeException("Unknown Avro schema type: " + schema.getType());
    }
  }
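
To try out the recursive, schema-driven shape of the strawman above without Avro or Jackson on the classpath, here is a minimal stand-alone sketch. MiniSchema, MiniConversions and MiniConversionsDemo are hypothetical names invented for illustration; they are not part of Kite, Avro, or Jackson. Only RECORD, ARRAY, STRING and INT are modelled, and the input is assumed to be already parsed into plain java.util Maps and Lists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an Avro Schema: just enough structure to drive the recursion. */
final class MiniSchema {
  enum Type { RECORD, ARRAY, STRING, INT }

  final Type type;
  final MiniSchema elementType;                                  // used by ARRAY only
  final Map<String, MiniSchema> fields = new LinkedHashMap<>();  // used by RECORD only

  private MiniSchema(Type type, MiniSchema elementType) {
    this.type = type;
    this.elementType = elementType;
  }

  static MiniSchema of(Type scalarType) { return new MiniSchema(scalarType, null); }
  static MiniSchema record() { return new MiniSchema(Type.RECORD, null); }
  static MiniSchema arrayOf(MiniSchema elementType) { return new MiniSchema(Type.ARRAY, elementType); }

  /** Adds a field to a RECORD schema and returns this schema for chaining. */
  MiniSchema field(String name, MiniSchema schema) {
    fields.put(name, schema);
    return this;
  }
}

final class MiniConversions {
  /** Sentinel returned when a value cannot be converted, mirroring the strawman. */
  static final Object ERROR = new Object();

  /** Walks the value and the schema together; any failure bubbles up as ERROR. */
  static Object convert(Object item, MiniSchema schema) {
    switch (schema.type) {
      case RECORD: {
        if (!(item instanceof Map)) return ERROR;
        Map<?, ?> in = (Map<?, ?>) item;
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, MiniSchema> f : schema.fields.entrySet()) {
          Object result = convert(in.get(f.getKey()), f.getValue());
          if (result == ERROR) return ERROR;
          out.put(f.getKey(), result);
        }
        return out;
      }
      case ARRAY: {
        if (!(item instanceof List)) return ERROR;
        List<Object> out = new ArrayList<>();
        for (Object element : (List<?>) item) {
          Object result = convert(element, schema.elementType);
          if (result == ERROR) return ERROR;
          out.add(result);
        }
        return out;
      }
      case STRING:
        return item == null ? ERROR : item.toString();
      case INT:
        if (item instanceof Number) return ((Number) item).intValue();
        if (item == null) return ERROR;
        try {
          return Integer.valueOf(item.toString());
        } catch (NumberFormatException e) {
          return ERROR;
        }
      default:
        return ERROR;
    }
  }
}

final class MiniConversionsDemo {
  public static void main(String[] args) {
    // Same shape as the schema and sample document from the original question.
    MiniSchema str = MiniSchema.of(MiniSchema.Type.STRING);
    MiniSchema lvl3 = MiniSchema.record().field("item1_lvl3", str).field("item2_lvl3", str);
    MiniSchema lvl2 = MiniSchema.record()
        .field("item1_lvl2", str)
        .field("item2_lvl2", MiniSchema.arrayOf(lvl3));
    MiniSchema top = MiniSchema.record()
        .field("uid", MiniSchema.of(MiniSchema.Type.INT))
        .field("somefield", str)
        .field("options", MiniSchema.arrayOf(lvl2));

    Map<String, Object> leaf = new LinkedHashMap<>();
    leaf.put("item1_lvl3", "x1");
    leaf.put("item2_lvl3", "y1");
    Map<String, Object> option = new LinkedHashMap<>();
    option.put("item1_lvl2", "a");
    option.put("item2_lvl2", Collections.singletonList(leaf));
    Map<String, Object> doc = new LinkedHashMap<>();
    doc.put("uid", 29153333);
    doc.put("somefield", "somevalue");
    doc.put("options", Collections.singletonList(option));

    System.out.println(MiniConversions.convert(doc, top));
  }
}
```

Unlike the strawman, this sketch always builds new containers instead of converting lists in place, and it treats a missing field as an error rather than consulting a field default.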

Wolfgang.

--
You received this message because you are subscribed to the Google Groups "CDK Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdk-dev+u...@cloudera.org.
For more options, visit https://groups.google.com/a/cloudera.org/d/optout.

Buntu Dev

May 1, 2015, 12:47:43 PM
to Wolfgang Hoschek, cdk...@cloudera.org
Thanks Wolfgang for the pointers.

Ryan Blue

May 3, 2015, 5:19:02 PM
to Buntu Dev, Wolfgang Hoschek, cdk...@cloudera.org
There are some utility methods in kite-data that you can use to do the
conversion. The main two can give you a schema for a JSON object and
convert a JSON object to an Avro generic object using a particular
Schema. There is also a utility for merging two Schemas, so you can get
the Schema for multiple JSON objects and then merge them to get an
overall schema that closely resembles the file. These are what we use in
the Kite CLI JSON support, which might be helpful as well.
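
The infer-then-merge idea described above can be illustrated with a small stand-alone sketch. It does not use the kite-data utilities, whose exact signatures are not shown in this thread; SchemaSketch and its type-description format are hypothetical, and the real utilities may treat missing fields and conflicting types differently. Input values are assumed to be already parsed into plain java.util Maps and Lists.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical type descriptions built from plain Java values:
 *   String              -> scalar type name ("int", "string", "null", ...)
 *   Map                 -> record (field name to field type)
 *   single-element List -> array (holds the element type)
 *   Set                 -> union of alternative types
 */
final class SchemaSketch {

  /** Infers a type description from one parsed JSON value. */
  static Object infer(Object value) {
    if (value == null) return "null";
    if (value instanceof Map) {
      Map<String, Object> fields = new LinkedHashMap<>();
      for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
        fields.put(String.valueOf(e.getKey()), infer(e.getValue()));
      }
      return fields;
    }
    if (value instanceof List) {
      Object element = null;
      for (Object item : (List<?>) value) {
        Object t = infer(item);
        element = (element == null) ? t : merge(element, t);
      }
      // An empty array gives no evidence about its element type.
      return Collections.singletonList(element == null ? "null" : element);
    }
    if (value instanceof Integer) return "int";
    if (value instanceof Long) return "long";
    if (value instanceof Number) return "double";
    if (value instanceof Boolean) return "boolean";
    return "string";
  }

  /** Merges two type descriptions into one that accepts values of either. */
  static Object merge(Object a, Object b) {
    if (a.equals(b)) return a;
    if (a instanceof Map && b instanceof Map) {
      Map<?, ?> left = (Map<?, ?>) a;
      Map<?, ?> right = (Map<?, ?>) b;
      Set<Object> names = new LinkedHashSet<>(left.keySet());
      names.addAll(right.keySet());
      Map<String, Object> merged = new LinkedHashMap<>();
      for (Object name : names) {
        // A field seen on only one side becomes a union with "null".
        Object l = left.containsKey(name) ? left.get(name) : "null";
        Object r = right.containsKey(name) ? right.get(name) : "null";
        merged.put((String) name, merge(l, r));
      }
      return merged;
    }
    if (a instanceof List && b instanceof List) {
      return Collections.singletonList(merge(((List<?>) a).get(0), ((List<?>) b).get(0)));
    }
    // Anything else is kept as a union of the alternatives seen so far.
    Set<Object> union = new LinkedHashSet<>();
    if (a instanceof Set) union.addAll((Set<?>) a); else union.add(a);
    if (b instanceof Set) union.addAll((Set<?>) b); else union.add(b);
    return union;
  }
}
```

Calling SchemaSketch.infer on each event and folding the results together with SchemaSketch.merge yields one description covering every event seen, which is the role the overall schema plays in the workflow described above.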

rb
--
Ryan Blue
Software Engineer
Cloudera, Inc.

Buntu Dev

May 4, 2015, 3:45:03 AM
to Ryan Blue, Wolfgang Hoschek, cdk...@cloudera.org
Thanks Ryan. Does this mean we could call JsonUtil.convertToAvro() on the JsonNode object and not have to invoke AvroConversions on each field of the schema?


Ryan Blue

May 4, 2015, 11:20:34 AM
to Buntu Dev, Wolfgang Hoschek, cdk...@cloudera.org
On 05/04/2015 08:45 AM, Buntu Dev wrote:
> Thanks Ryan. Does this mean we could use the JSONUtils.convertToAvro()
> on the JsonNode object and not have to invoke the AvroConversions on
> each field of the schema?

What AvroConversions are you referring to?

I think JsonUtil.convertToAvro should do what you want. It handles whole
records, as you can see in the JSONFileReader implementation [1].

rb


[1]:
https://github.com/kite-sdk/kite/blob/master/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/filesystem/JSONFileReader.java#L103

Buntu Dev

May 4, 2015, 1:49:10 PM
to Ryan Blue, Wolfgang Hoschek, cdk...@cloudera.org
I was looking at the AvroConversions pointed to by Wolfgang:

It's awesome if I can rely on JsonUtil.convertToAvro and apply it to the input record's _attachment_body; I will give that a try.

Thanks!
