Flume failing to pick up XML records processed by morphline interceptor

trinath

Oct 29, 2015, 5:29:08 PM
to CDK Development
MORPHLINE CONF :
SOLR_LOCATOR : {
    collection : xmlindexer
    zkHost : "127.0.0.1:2181/solr"
}

morphlines : [
  {
    id : xml_shredder
    importCommands : ["com.cloudera.**"]

    commands : [
      # Reading the line from Flume event:
      {
        readLine {
          charset : UTF-8
        }
      }
      {
        xquery {
          fragments : [
            {
              fragmentPath : "/" # record boundary
              queryString : """
                for $entry in /h:Message
                <ATTACHMENT_BODY>
                <_attachment_body>
                concat({$entry/hdr:Header/hdr:ApplicationID},",",{$entry/hdr:Header/hdr:ComponentName},",",{$entry/hdr:Header/hdr:Hostname})
                </_attachment_body>
                </ATTACHMENT_BODY>
              """
            }
          ]
        }
      }

}
      {
        java {
          code : """
            record.replaceValues("_attachment_body", record.getFirstValue("_attachment_body").toString().getBytes(Charsets.UTF_8));
            return child.process(record)""";
        }
      }
      # {
      #   toByteArray {
      #     field : _attachment_body
      #   }
      # }

      { logDebug { format : "output record: {}", args : ["@{}"] } }

    ]
  }
]


Flume Configuration:

# Define a memory channel on agent called memory-channel.
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000
agent.channels.memory-channel.transactionCapacity = 10000
agent.channels.memory-channel.byteCapacityBufferPercentage = 20
agent.channels.memory-channel.byteCapacity = 800000

#interceptor to shred xml
agent.sources.tail-source.interceptors = morphlineinterceptor
agent.sources.tail-source.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.tail-source.interceptors.morphlineinterceptor.morphlineFile = /home/cloudera/morphline_xml_parser.conf
agent.sources.tail-source.interceptors.morphlineinterceptor.morphlineId = xml_shredder


# Define a source on agent and connect to channel memory-channel.
agent.sources.tail-source.type = exec
agent.sources.tail-source.command = tail -F /home/cloudera/flumeSpark/flume.log
agent.sources.tail-source.channels = memory-channel


# Define a sink that outputs to hdfs.
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.channel = memory-channel
agent.sinks.hdfs-sink.hdfs.path = /user/cloudera/tail_log/ds=%Y%m%d/hr=%H
agent.sinks.hdfs-sink.hdfs.fileType = CompressedStream 
agent.sinks.hdfs-sink.hdfs.codeC = gzip
agent.sinks.hdfs-sink.hdfs.inUsePrefix = _
agent.sinks.hdfs-sink.hdfs.fileSuffix = .txt.gz
agent.sinks.hdfs-sink.hdfs.idleTimeout = 0
agent.sinks.hdfs-sink.hdfs.batchSize = 1000
agent.sinks.hdfs-sink.hdfs.rollInterval = 120
agent.sinks.hdfs-sink.hdfs.rollSize = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 0
agent.sinks.hdfs-sink.hdfs.maxOpenFiles = 1
agent.sinks.hdfs-sink.hdfs.round = true
agent.sinks.hdfs-sink.hdfs.roundValue = 1
agent.sinks.hdfs-sink.hdfs.roundUnit = hour
agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfs-sink.hdfs.callTimeout = 50000

agent.channels = memory-channel
agent.sources = tail-source
agent.sinks = hdfs-sink 
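
A misspelled component name in a property key means that setting is not applied to the intended source, channel, or sink. As a rough aid, the sketch below loads Flume-style properties with java.util.Properties and lists every key whose component name is not declared in agent.sources, agent.channels, or agent.sinks. It assumes the <agent>.<sources|channels|sinks>.<name>.<property> key layout used in the configuration above; the class and method names are hypothetical, this is not a Flume tool, and the misspelled memory-chanel key in main is deliberate sample input.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper, not part of Flume: flags property keys whose component
// name is not listed in the agent's sources/channels/sinks declarations.
public class FlumeKeyCheck {

    static List<String> undeclaredComponentKeys(Properties props, String agent) {
        List<String> problems = new ArrayList<>();
        for (String type : new String[] {"sources", "channels", "sinks"}) {
            // e.g. "agent.channels = memory-channel" declares the channel names
            String declared = props.getProperty(agent + "." + type, "").trim();
            Set<String> names = new HashSet<>(Arrays.asList(declared.split("\\s+")));
            String prefix = agent + "." + type + ".";
            for (String key : props.stringPropertyNames()) {
                if (!key.startsWith(prefix)) {
                    continue;
                }
                // The component name is the segment right after the prefix.
                String rest = key.substring(prefix.length());
                int dot = rest.indexOf('.');
                String name = dot < 0 ? rest : rest.substring(0, dot);
                if (!names.contains(name)) {
                    problems.add(key);
                }
            }
        }
        Collections.sort(problems);
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample input with one deliberately misspelled channel key.
        String conf = String.join("\n",
            "agent.channels = memory-channel",
            "agent.sources = tail-source",
            "agent.sinks = hdfs-sink",
            "agent.channels.memory-channel.type = memory",
            "agent.channels.memory-chanel.byteCapacity = 800000",
            "agent.sources.tail-source.type = exec",
            "agent.sinks.hdfs-sink.type = hdfs");
        Properties props = new Properties();
        props.load(new StringReader(conf));
        // Prints [agent.channels.memory-chanel.byteCapacity]
        System.out.println(undeclaredComponentKeys(props, "agent"));
    }
}
```

Pointed at the full configuration file instead of the inline sample, the same check would flag any key whose component segment does not match a declaration, which is easy to miss by eye in a long config.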

LOG:
2015-10-29 14:25:34,906 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:34,906 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:34,906 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:34,948 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
2015-10-29 14:25:34,949 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2015-10-29 14:25:34,963 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel memory-channel type memory
2015-10-29 14:25:34,974 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel memory-channel
2015-10-29 14:25:34,975 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source tail-source, type exec
2015-10-29 14:25:35,559 INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands
2015-10-29 14:25:14,881 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,881 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,881 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,881 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,882 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,882 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,882 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: hdfs-sink Agent: agent
2015-10-29 14:25:14,882 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,882 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,882 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:25:14,883 INFO org.apache.flume.conf.FlumeConfi
2015-10-29 14:26:16,632 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2015-10-29 14:26:16,704 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/var/run/cloudera-scm-agent/process/56-flume-AGENT/flume.conf
2015-10-29 14:26:16,723 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,723 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,724 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,724 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,724 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,725 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,725 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,725 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,726 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,726 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,726 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: hdfs-sink Agent: agent
2015-10-29 14:26:16,727 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,727 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,727 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,728 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,728 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,728 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,728 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,729 INFO org.apache.flume.conf.FlumeConfiguration: Processing:hdfs-sink
2015-10-29 14:26:16,787 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
2015-10-29 14:26:16,787 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2015-10-29 14:26:16,801 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel memory-channel type memory
2015-10-29 14:26:16,809 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel memory-channel
2015-10-29 14:26:16,811 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source tail-source, type exec
2015-10-29 14:26:17,483 INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands

Wolfgang Hoschek

Oct 29, 2015, 5:35:22 PM
to trinath, CDK Development
Try removing the readLine and java commands. xquery expects a byte array or a ByteArrayInputStream in the _attachment_body input field.

--
You received this message because you are subscribed to the Google Groups "CDK Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdk-dev+u...@cloudera.org.
For more options, visit https://groups.google.com/a/cloudera.org/d/optout.
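
To illustrate Wolfgang's point with the JDK alone: an XML-consuming command like xquery needs the complete document as raw bytes, whereas readLine emits one record per line. The sketch below uses DocumentBuilder as a stand-in for the Saxon-based parser behind Kite's xquery command, wraps the body bytes in a ByteArrayInputStream, and shows that a single line of a multi-line document does not parse on its own. The class name and the sample document are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.xml.sax.helpers.DefaultHandler;

// JDK-only stand-in for an XML-consuming morphline command: the input is the
// raw attachment bytes, wrapped in a ByteArrayInputStream.
public class AttachmentBodySketch {

    // Returns the root element's local name, or null if the bytes are not a
    // well-formed XML document.
    static String rootElementName(byte[] attachmentBody) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            // DefaultHandler rethrows fatal errors without printing to stderr.
            builder.setErrorHandler(new DefaultHandler());
            Document doc = builder.parse(new ByteArrayInputStream(attachmentBody));
            return doc.getDocumentElement().getLocalName();
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample document spanning several lines.
        String xml = "<Message>\n  <Header><Hostname>host1</Hostname></Header>\n</Message>";
        byte[] body = xml.getBytes(StandardCharsets.UTF_8);

        // The complete body parses as one document: prints Message
        System.out.println(rootElementName(body));

        // A single line of it, as a per-line split would produce, does not: prints null
        String firstLine = xml.split("\n")[0];
        System.out.println(rootElementName(firstLine.getBytes(StandardCharsets.UTF_8)));
    }
}
```

The same consideration applies upstream: Flume's exec source emits one event per line of command output, so with tail -F an XML message that spans several lines of flume.log would arrive as several partial events. Whether the messages in this log are single-line is not stated in the thread.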

trinath

Oct 29, 2015, 6:00:14 PM
to CDK Development
Updated conf. It still doesn't write anything to HDFS, and the log shows no errors.
morphlines : [
  {
    id : xml_shredder
    importCommands : ["com.cloudera.**"]

    commands : [
      {
        xquery {
          fragments : [
            {
              fragmentPath : "/" # record boundary
              queryString : """
                for $entry in /h:Message
                <ATTACHMENT_BODY>
                <_attachment_body>
                concat({$entry/hdr:Header/hdr:ApplicationID},",",{$entry/hdr:Header/hdr:ComponentName},",",{$entry/hdr:Header/hdr:Hostname})
                </_attachment_body>
                </ATTACHMENT_BODY>
              """
            }
          ]
        }
      }

}


      { logDebug { format : "output record: {}", args : ["@{}"] } }

    ]
  }
]

Wolfgang Hoschek

Oct 29, 2015, 6:29:09 PM
to trinath, CDK Development

trinath

Oct 29, 2015, 7:49:45 PM
to CDK Development, tsiri...@gmail.com

Please suggest what I am missing here. I don't see any data in HDFS. I have added log4j.logger.org.kitesdk.morphline=TRACE to Flume's log4j properties.

I removed the commands you suggested. Please help me understand what I am missing.

Latest morphline conf:
morphlines : [
  {
    id : xml_shredder
    importCommands : ["com.cloudera.**"]

    commands : [
      {
        xquery {
          fragments : [
            {
              fragmentPath : "/" # record boundary
              queryString : """
                for $entry in /h:Message
                <ATTACHMENT_BODY>
                <_attachment_body>
                concat({$entry/hdr:Header/hdr:ApplicationID},",",{$entry/hdr:Header/hdr:ComponentName},",",{$entry/hdr:Header/hdr:Hostname})
                </_attachment_body>
                </ATTACHMENT_BODY>
              """
            }
          ]
        }
      }

}


      { logDebug { format : "output record: {}", args : ["@{}"] } }

    ]
  }
]

Wolfgang Hoschek

Oct 29, 2015, 8:18:43 PM
to trinath, CDK Development
Try writing a unit test as shown in the link below to see whether your XQuery works on your XML input data. Also keep in mind that a Flume interceptor can only accept a single record from each morphline invocation, not multiple output records per XML input document. That's by Flume's design.
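
Besides a proper morphline unit test, a quick JDK-only check can show whether the header paths in the query match real input at all. The sketch below evaluates the same paths with javax.xml.xpath, which is XPath 1.0 and not the Saxon XQuery engine Kite uses, so treat it only as a rough stand-in. The namespace URIs and the sample message are hypothetical, because the thread never shows what the h: and hdr: prefixes are bound to. With an absolute /h:Message path it yields at most one result per event, in line with the one-record-per-invocation constraint above, and it returns null when the root is not an h:Message in the expected namespace, which is one plausible reason for a morphline producing no output.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.helpers.DefaultHandler;

public class HeaderExtractionCheck {

    // Hypothetical namespace URIs: the real bindings of h: and hdr: are not in the thread.
    static final String H_NS = "urn:example:message";
    static final String HDR_NS = "urn:example:header";

    // Binds the h: and hdr: prefixes used in the XPath expressions below.
    static final NamespaceContext PREFIXES = new NamespaceContext() {
        @Override
        public String getNamespaceURI(String prefix) {
            if ("h".equals(prefix)) return H_NS;
            if ("hdr".equals(prefix)) return HDR_NS;
            return XMLConstants.NULL_NS_URI;
        }
        @Override
        public String getPrefix(String namespaceURI) {
            return null;
        }
        @Override
        public Iterator<String> getPrefixes(String namespaceURI) {
            return Collections.emptyIterator();
        }
    };

    // Joins ApplicationID, ComponentName and Hostname with commas, as the posted
    // query intends. Returns null if the root is not an h:Message, in which case
    // a corresponding query would likely produce no record at all.
    static String extractHeaderCsv(byte[] body) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            builder.setErrorHandler(new DefaultHandler());
            Document doc = builder.parse(new ByteArrayInputStream(body));

            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(PREFIXES);

            Node message = (Node) xpath.evaluate("/h:Message", doc, XPathConstants.NODE);
            if (message == null) {
                return null;
            }
            return xpath.evaluate(
                "concat(hdr:Header/hdr:ApplicationID, ',', hdr:Header/hdr:ComponentName, ',', hdr:Header/hdr:Hostname)",
                message);
        } catch (Exception e) {
            throw new IllegalArgumentException("event body could not be evaluated as XML", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample event body.
        String xml = "<h:Message xmlns:h=\"" + H_NS + "\" xmlns:hdr=\"" + HDR_NS + "\">"
            + "<hdr:Header>"
            + "<hdr:ApplicationID>app1</hdr:ApplicationID>"
            + "<hdr:ComponentName>comp1</hdr:ComponentName>"
            + "<hdr:Hostname>host1</hdr:Hostname>"
            + "</hdr:Header>"
            + "</h:Message>";
        // Prints app1,comp1,host1
        System.out.println(extractHeaderCsv(xml.getBytes(StandardCharsets.UTF_8)));

        // Same element names without the namespaces: nothing matches, prints null.
        String plain = "<Message><Header><Hostname>host1</Hostname></Header></Message>";
        System.out.println(extractHeaderCsv(plain.getBytes(StandardCharsets.UTF_8)));
    }
}
```

Feeding a few real lines from flume.log into extractHeaderCsv, with H_NS and HDR_NS replaced by the actual namespace URIs, would show quickly whether the paths and namespaces line up before debugging the Flume side further.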