ElasticSearchSink/ElasticSearchLogStashEventSerializer inserting "Java memory address" instead of JSON object


praya...@gmail.com

Nov 8, 2016, 6:53:14 PM
to flume-user
I wrote a Flume interceptor that reads each log event and transforms it into the required form, modifying a few fields (e.g. converting XML to JSON). The log event body itself is in JSON format.
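For context, the interceptor's core transformation can be modeled as a pure function on the event body. This is a minimal plain-JDK sketch; the field names and the enrichment logic here are hypothetical stand-ins, not the actual LogEnrichBuilder implementation:

```java
// Minimal sketch, plain JDK only. The enrich() logic is hypothetical and only
// illustrates the kind of body rewrite an enriching interceptor performs
// (wrap/modify fields so the sink receives a JSON body).
public class EnrichSketch {

    // Toy transform: wrap a raw log line in a small JSON envelope.
    // The real interceptor also converts XML payloads to JSON.
    public static String enrich(String rawBody) {
        String escaped = rawBody.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"@message\":\"" + escaped + "\",\"@source\":\"supply_source\"}";
    }

    public static void main(String[] args) {
        // Prints: {"@message":"order shipped","@source":"supply_source"}
        System.out.println(enrich("order shipped"));
    }
}
```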

Then I'm using ElasticSearchSink. The flume.conf is:

##########################################################################
# source
# an exec source tailing supply_source.log, connected to channel
# logEventStream.
##########################################################################
shipping_agent.sources.avro-source1.channels = logEventStream
shipping_agent.sources.avro-source1.type = exec
shipping_agent.sources.avro-source1.command = tail -f /var/log/supply_source.log
shipping_agent.sources.avro-source1.interceptors = logEnricher
shipping_agent.sources.avro-source1.interceptors.logEnricher.type = com.pseudo.enricher.api.LogEnrichBuilder

####################################################
# a memory channel
####################################################                                                        
shipping_agent.channels.logEventStream.type = memory
shipping_agent.channels.logEventStream.capacity = 10000000
shipping_agent.channels.logEventStream.transactionCapacity = 1000

################################################
# Sink ElasticSearch
################################################          
shipping_agent.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
shipping_agent.sinks.sink1.hostNames = localhost:9300,localhost:9200
shipping_agent.sinks.sink1.indexName = applications
shipping_agent.sinks.sink1.indexType = SupplyChain
shipping_agent.sinks.sink1.clusterName = datalens
shipping_agent.sinks.sink1.batchSize = 1000
shipping_agent.sinks.sink1.ttl = 2
#this serializer is crucial in order to use kibana
shipping_agent.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer
#inserts the memory-address instead of json
#shipping_agent.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

# sink stream
shipping_agent.sinks.sink1.channel = logEventStream

# Finally, now that we've defined all of our components, tell
# shipping_agent which ones we want to activate.
shipping_agent.channels = logEventStream
shipping_agent.sources = avro-source1
shipping_agent.sinks = sink1


The resulting Elasticsearch documents are:

curl -XGET localhost:9200/applications-2016-11-08/_search?pretty=true
{
  "took" : 45,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 6,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "applications-2016-11-08",
      "_type" : "SupplyChain",
      "_id" : "AVhF6y6GYoyNj2tLYnh-",
      "_score" : 1.0,
      "_source":{"body":"org.elasticsearch.common.xcontent.XContentBuilder@548301f2"}
    }, {
      "_index" : "applications-2016-11-08",
      "_type" : "SupplyChain",
      "_id" : "AVhF6y6GYoyNj2tLYnh_",
      "_score" : 1.0,
      "_source":{"body":"org.elasticsearch.common.xcontent.XContentBuilder@1060b04b"}
    }, {
      "_index" : "applications-2016-11-08",
      "_type" : "SupplyChain",
      "_id" : "AVhGCcLRYoyNj2tLYniA",
      "_score" : 1.0,
      "_source":{"@message":"org.elasticsearch.common.xcontent.XContentBuilder@7591085a","@fields":{}}
    }, {
      "_index" : "applications-2016-11-08",
      "_type" : "SupplyChain",
      "_id" : "AVhGCuOFYoyNj2tLYniC",
      "_score" : 1.0,
      "_source":{"body":"org.elasticsearch.common.xcontent.XContentBuilder@381ef60c"}
    }, {
      "_index" : "applications-2016-11-08",
      "_type" : "SupplyChain",
      "_id" : "AVhGCuOFYoyNj2tLYniD",
      "_score" : 1.0,
      "_source":{"body":"org.elasticsearch.common.xcontent.XContentBuilder@c8c937b"}
    }, {
      "_index" : "applications-2016-11-08",
      "_type" : "SupplyChain",
      "_id" : "AVhGCcLRYoyNj2tLYniB",
      "_score" : 1.0,
      "_source":{"@message":"org.elasticsearch.common.xcontent.XContentBuilder@7b56e898","@fields":{}}
    } ]
  }
}


I read the ElasticSearchLogStashEventSerializer code, which checks whether the body parses as JSON and, depending on the result, appends it to the XContentBuilder as either a simple or a complex field: https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java#L84

  public static void appendField(XContentBuilder builder, String field,
      byte[] data) throws IOException {
    XContentType contentType = XContentFactory.xContentType(data);
    if (contentType == null) {
      addSimpleField(builder, field, data);
    } else {
      addComplexField(builder, field, contentType, data);
    }
  }
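The strings stored in `_source` look like the JVM's default `Object.toString()` output (class name, `@`, identity hash code in hex), which suggests the builder object itself was stringified rather than its JSON content. A plain-JDK sketch reproduces the pattern (FakeContentBuilder is a stand-in class, not the real XContentBuilder):

```java
// Plain-JDK sketch: FakeContentBuilder stands in for any class that does not
// override toString(). Calling toString() on it yields the default
// "ClassName@hexHash" form, matching the bodies stored in Elasticsearch above.
public class DefaultToStringDemo {

    static class FakeContentBuilder {}

    public static void main(String[] args) {
        FakeContentBuilder builder = new FakeContentBuilder();
        // Prints something like: DefaultToStringDemo$FakeContentBuilder@1b6d3586
        System.out.println(builder);
    }
}
```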

I don't know what's wrong with that. Any pointers would be appreciated.