JSON to Avro, using subrecord is not working

Bas van de Lustgraaf

Mar 25, 2015, 7:11:02 PM
to cdk...@cloudera.org

I'm trying to convert JSON into Avro using the kite-sdk morphline module. After some experimentation I was able to convert the JSON into Avro using a simple schema (no complex data types and no nested structure).

Then I took it one step further and modified the Avro schema as shown below (subrec.avsc). As you can see, the schema contains a subrecord. As soon as I tried to convert the JSON to Avro using morphlines.conf and subrec.avsc, it failed.

The error from the trace is `Cannot convert item: [] to schema`.

Thanks

The morphlines.conf:

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Read the JSON blob
      { readJson: {} }

      { logError { format : "record: {}", args : ["@{}"] } }

      # Extract JSON
      { extractJsonPaths { flatten: false, paths: {
          "/record_type[]/alert/action" : /alert/action,
          "/record_type[]/alert/signature_id" : /alert/signature_id,
          "/record_type[]/alert/signature" : /alert/signature,
          "/record_type[]/alert/category" : /alert/category,
          "/record_type[]/alert/severity" : /alert/severity
      } } }

      { logError { format : "EXTRACTED THIS : {}", args : ["@{}"] } }

      { extractJsonPaths { flatten: false, paths: {
          timestamp : /timestamp,
          event_type : /event_type,
          source_ip : /src_ip,
          source_port : /src_port,
          destination_ip : /dest_ip,
          destination_port : /dest_port,
          protocol : /proto,
      } } }

      # Create Avro according to schema
      { logError { format : "WE GO TO AVRO"} }
      { toAvro { schemaFile : /etc/flume/conf/conf.empty/subrec.avsc } }

      # Create Avro container
      { logError { format : "WE GO TO BINARY"} }
      { writeAvroToByteArray { format: containerlessBinary } }

      { logError { format : "DONE!!!"} }
    ]
  }
]


And the subrec.avsc:

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name" : "record_type",
    "type" : ["null", {
      "name" : "alert",
      "type" : "record",
      "fields" : [ {
        "name" : "action",
        "type" : "string"
      }, {
        "name" : "signature_id",
        "type" : "int"
      }, {
        "name" : "signature",
        "type" : "string"
      }, {
        "name" : "category",
        "type" : "string"
      }, {
        "name" : "severity",
        "type" : "int"
      } ]
    } ]
  } ]
}
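For reference, this is the value shape subrec.avsc asks for: seven flat fields plus `record_type`, which is either null or an `alert` record. The Python sketch below builds that shape from one parsed input event. `to_event` and `TOP_LEVEL` are hypothetical names, and the result is a plain in-memory value, not Avro's JSON encoding (which would wrap the non-null union branch in an object keyed by the branch's type name).

```python
# Hypothetical helper (not part of Kite): builds the in-memory shape that
# subrec.avsc describes from one parsed input event.
TOP_LEVEL = {  # input JSON key -> Avro field name, as in morphlines.conf
    "timestamp": "timestamp",
    "event_type": "event_type",
    "src_ip": "source_ip",
    "src_port": "source_port",
    "dest_ip": "destination_ip",
    "dest_port": "destination_port",
    "proto": "protocol",
}


def to_event(doc):
    event = {avro_name: doc[key] for key, avro_name in TOP_LEVEL.items()}
    alert = doc.get("alert")
    if alert is None:
        event["record_type"] = None  # the "null" branch of the union
    else:
        event["record_type"] = {  # the "alert" record branch
            "action": alert["action"],
            # The input carries these two as JSON strings; the schema says int.
            "signature_id": int(alert["signature_id"]),
            "signature": alert["signature"],
            "category": alert["category"],
            "severity": int(alert["severity"]),
        }
    return event


doc = {"timestamp": "2015-03-23T07:42:01.303046", "event_type": "alert",
       "src_ip": "2.2.2.2", "src_port": 18192, "dest_ip": "46.231.41.166",
       "dest_port": 62004, "proto": "TCP",
       "alert": {"action": "allowed", "gid": "1", "signature_id": "88006",
                 "rev": "1", "signature": "GeoIP from NL, Netherlands ",
                 "category": "test2", "severity": "3"}}
event = to_event(doc)
print(event["record_type"]["signature_id"])  # 88006
```

Fields of the input that the schema does not declare (`gid`, `rev`) are simply dropped.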


Trace:

25 Mar 2015 20:52:28,328 TRACE [New I/O  worker #1] (org.kitesdk.morphline.base.AbstractCommand.beforeProcess:168)  - beforeProcess: {/record_type[]/alert/action=[allowed], /record_type[]/alert/category=[test2], /record_type[]/alert/severity=[3], /record_type[]/alert/signature=[GeoIP from NL, Netherlands ], /record_type[]/alert/signature_id=[88006], _attachment_body=[{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}], _attachment_mimetype=[json/java+memory], destination_ip=[46.231.41.166], destination_port=[62004], event_type=[alert], protocol=[TCP], source_ip=[2.2.2.2], source_port=[18192], timestamp=[2015-03-23T07:42:01.303046]}

25 Mar 2015 20:52:28,329 DEBUG [New I/O  worker #1] (org.kitesdk.morphline.avro.ToAvroBuilder$ToAvro.doProcess:156)  - Cannot convert item: [] to schema: {"type":"record","name":"Event","fields":[{"name":"timestamp","type":"string"},{"name":"event_type","type":"string"},{"name":"source_ip","type":"string"},{"name":"source_port","type":"int"},{"name":"destination_ip","type":"string"},{"name":"destination_port","type":"int"},{"name":"protocol","type":"string"},{"name":"record_type","type":[{"type":"record","name":"alert","fields":[{"name":"action","type":"string"},{"name":"signature_id","type":"string"},{"name":"signature","type":"string"},{"name":"category","type":"string"},{"name":"severity","type":"string"}]}]}]}
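One plausible reading of the `[]` in the error, sketched in Python as a model of the data rather than of Kite's implementation: the beforeProcess trace shows that the first extractJsonPaths command produced fields literally named `/record_type[]/alert/action`, `/record_type[]/alert/category` and so on, but no field named `record_type`. Assuming toAvro looks up each Avro field under the same name in the morphline record (an assumption, not confirmed in this thread), the lookup for `record_type` finds nothing, which would match the empty `[]` in the message.

```python
# Model only (not Kite's code): a morphline record maps each field name to a
# list of values. A few fields copied from the beforeProcess trace above.
record = {
    "/record_type[]/alert/action": ["allowed"],
    "/record_type[]/alert/category": ["test2"],
    "event_type": ["alert"],
    "protocol": ["TCP"],
}


def values_for(record, avro_field_name):
    """Assumed lookup rule: the Avro field name is used unchanged as the
    morphline field name; a missing field yields an empty list."""
    return record.get(avro_field_name, [])


print(values_for(record, "event_type"))   # ['alert']
print(values_for(record, "record_type"))  # []
```

If that reading holds, the slashes and `[]` in the output names are not interpreted as nesting; they are simply part of each field's name, as the trace prints them.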


Original input:

{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert","src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166","dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1","signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ","category":"test2","severity":"3"}}

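The input also differs from subrec.avsc in types: `signature_id` and `severity` arrive as JSON strings ("88006", "3"), while the schema declares them `int`. Whether toAvro coerces such strings is not settled in this thread, so the Python check below only flags the mismatch. The `declared` table is a hand-copied subset of subrec.avsc, not something read from the schema file.

```python
import json

line = ('{"timestamp":"2015-03-23T07:42:01.303046","event_type":"alert",'
        '"src_ip":"2.2.2.2","src_port":18192,"dest_ip":"46.231.41.166",'
        '"dest_port":62004,"proto":"TCP","alert":{"action":"allowed","gid":"1",'
        '"signature_id":"88006","rev":"1","signature":"GeoIP from NL, Netherlands ",'
        '"category":"test2","severity":"3"}}')
doc = json.loads(line)

# Avro types of the alert subrecord fields, copied by hand from subrec.avsc.
declared = {"action": "string", "signature_id": "int", "signature": "string",
            "category": "string", "severity": "int"}
python_type = {"string": str, "int": int}

# List the alert fields whose JSON value does not have the declared type.
mismatches = [name for name, avro_type in declared.items()
              if not isinstance(doc["alert"][name], python_type[avro_type])]
print(mismatches)  # ['signature_id', 'severity']
```

By contrast, the top-level `src_port` and `dest_port` values are already JSON numbers.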

Bas

Mar 25, 2015, 7:17:11 PM
to cdk...@cloudera.org
For testing, I simplified the extractJsonPaths command and the schema by removing the nesting. Now it works; see the scripts in this post.

But since this is not the desired schema: what should the extractJsonPaths command and the Avro schema look like to get this working with the nested-record schema from my initial post?

Thanks!


Simple version of the extractJsonPaths command:
# Extract JSON
{ extractJsonPaths { flatten: false, paths: {
    action : /alert/action,
    signature_id : /alert/signature_id,
    signature : /alert/signature,
    category : /alert/category,
    severity : /alert/severity
} } }
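In this version each entry in `paths` pairs an output field name (left) with a JSON path (right), so the command produces flat fields named `action`, `signature_id` and so on, one per field of the flat schema. Below is a simplified Python model of that behavior for plain `/a/b` paths; `resolve` is a hypothetical helper, and Kite's real path syntax (wildcards, `[]` array steps) is not modeled.

```python
import json


def resolve(doc, path):
    """Hypothetical helper: follow a plain /a/b path through parsed JSON.
    Returns a list with the value found, or an empty list if it is missing.
    Wildcards and [] array steps are deliberately not supported."""
    node = doc
    for step in path.strip("/").split("/"):
        if not isinstance(node, dict) or step not in node:
            return []
        node = node[step]
    return [node]


# Output field name -> JSON path, as in the simple extractJsonPaths above.
paths = {
    "action": "/alert/action",
    "signature_id": "/alert/signature_id",
    "signature": "/alert/signature",
    "category": "/alert/category",
    "severity": "/alert/severity",
}

doc = json.loads('{"alert": {"action": "allowed", "signature_id": "88006", '
                 '"signature": "GeoIP from NL, Netherlands ", '
                 '"category": "test2", "severity": "3"}}')
record = {name: resolve(doc, path) for name, path in paths.items()}
print(record["action"])  # ['allowed']
```

Because the output names match the flat schema's field names one-to-one, nothing has to be regrouped before toAvro in this variant, which may be why it works while the nested one does not.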

And the simple flat schema:

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "timestamp",
    "type" : "string"
  }, {
    "name" : "event_type",
    "type" : "string"
  }, {
    "name" : "source_ip",
    "type" : "string"
  }, {
    "name" : "source_port",
    "type" : "int"
  }, {
    "name" : "destination_ip",
    "type" : "string"
  }, {
    "name" : "destination_port",
    "type" : "int"
  }, {
    "name" : "protocol",
    "type" : "string"
  }, {
    "name" : "action",
    "type" : "string"
  }, {
    "name" : "signature_id",
    "type" : "int"
  }, {
    "name" : "signature",
    "type" : "string"
  }, {
    "name" : "category",
    "type" : "string"
  }, {
    "name" : "severity",
    "type" : "int"
  } ]
}
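To compare the flat and nested variants side by side, here is a small Python checker for the subset of Avro used in this thread (null, string, int, unions and records). `conforms` is a hypothetical helper, not part of Avro or Kite, and as a simplification it requires every declared field to be present. The nested schema is rebuilt from the flat field list so that it matches subrec.avsc.

```python
def conforms(datum, schema):
    """Check a Python value against a subset of Avro: null, string, int,
    unions (JSON arrays) and records. Hypothetical helper for illustration."""
    if isinstance(schema, list):  # union: the value must match one branch
        return any(conforms(datum, branch) for branch in schema)
    kind = schema if isinstance(schema, str) else schema["type"]
    if kind == "null":
        return datum is None
    if kind == "string":
        return isinstance(datum, str)
    if kind == "int":  # Avro int is a signed 32-bit integer
        return (isinstance(datum, int) and not isinstance(datum, bool)
                and -2**31 <= datum < 2**31)
    if kind == "record":
        # Simplification: every declared field must be present.
        return isinstance(datum, dict) and all(
            f["name"] in datum and conforms(datum[f["name"]], f["type"])
            for f in schema["fields"])
    raise ValueError("type not covered by this sketch: %r" % (kind,))


FLAT_FIELDS = [("timestamp", "string"), ("event_type", "string"),
               ("source_ip", "string"), ("source_port", "int"),
               ("destination_ip", "string"), ("destination_port", "int"),
               ("protocol", "string"), ("action", "string"),
               ("signature_id", "int"), ("signature", "string"),
               ("category", "string"), ("severity", "int")]
flat_schema = {"type": "record", "name": "Event",
               "fields": [{"name": n, "type": t} for n, t in FLAT_FIELDS]}
# Same fields, but the last five moved into a nullable "alert" subrecord,
# as in subrec.avsc.
nested_schema = {"type": "record", "name": "Event",
                 "fields": flat_schema["fields"][:7] + [
                     {"name": "record_type",
                      "type": ["null", {"type": "record", "name": "alert",
                                        "fields": flat_schema["fields"][7:]}]}]}

flat = {"timestamp": "2015-03-23T07:42:01.303046", "event_type": "alert",
        "source_ip": "2.2.2.2", "source_port": 18192,
        "destination_ip": "46.231.41.166", "destination_port": 62004,
        "protocol": "TCP", "action": "allowed", "signature_id": 88006,
        "signature": "GeoIP from NL, Netherlands ", "category": "test2",
        "severity": 3}
nested = {name: flat[name] for name, _ in FLAT_FIELDS[:7]}
nested["record_type"] = {name: flat[name] for name, _ in FLAT_FIELDS[7:]}

print(conforms(flat, flat_schema))                            # True
print(conforms(nested, nested_schema))                        # True
print(conforms(dict(nested, record_type=[]), nested_schema))  # False
```

The last line mirrors the failing case: an empty list where `record_type` expects null or an `alert` record matches neither branch of the union.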

Buntu Dev

Apr 20, 2015, 6:44:44 PM
to cdk...@cloudera.org
I have the same requirement: mapping nested JSON to Avro subrecords. How does one do this with a custom morphline command?
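The Java plumbing of a custom Kite command is outside this sketch. Whatever mechanism is used, the core step is regrouping the prefixed flat fields that the first extractJsonPaths command produced into one map stored under the Avro field name. Below is a Python model of just that step, assuming (hypothetically) that a morphline record is a map of field name to a list of values and that toAvro would accept a map for a record-typed field; `nest_prefixed` is an illustrative name, not a Kite API.

```python
def nest_prefixed(record, prefix, target):
    """Hypothetical transformation: move every field named prefix + suffix
    into one map stored under `target`, keeping the first value of each.
    `record` models a morphline record as {field name: [values]}."""
    nested = {}
    for name in [n for n in record if n.startswith(prefix)]:
        values = record.pop(name)
        if values:
            nested[name[len(prefix):]] = values[0]
    record[target] = [nested] if nested else []
    return record


record = {
    "event_type": ["alert"],
    "/record_type[]/alert/action": ["allowed"],
    "/record_type[]/alert/category": ["test2"],
}
nest_prefixed(record, "/record_type[]/alert/", "record_type")
print(record)
# {'event_type': ['alert'], 'record_type': [{'action': 'allowed', 'category': 'test2'}]}
```

Leaving `record_type` as an empty list when no prefixed fields exist keeps the "missing" case explicit; how toAvro treats that for a `["null", ...]` union would still need checking.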