Rewrite fields to replace characters


Robert Swain

Jan 12, 2016, 5:34:19 AM
to Fluentd Google Group
I am using the fluentd Docker log driver, fluentd, fluent-plugin-elasticsearch and Elasticsearch. I want to index all the reverse-domain-notation labels that Docker Swarm, Compose and others add to containers. However, Elasticsearch rejects field names that contain the "." character. I would like to add a filter in fluentd that rewrites the field names to be alphanumeric.
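To be clear about the rewrite I am after, this is the whole transformation expressed as plain Ruby rather than fluentd configuration (the record contents here are illustrative, not real log data):

```ruby
# Replace '.' with '_' in every field name so Elasticsearch accepts the record.
record = {
  "com.docker.swarm.id" => "db0a0774ccdc",     # illustrative value
  "container_name"      => "/elated_williams",
  "log"                 => "Some log text"
}

sanitized = record.map { |key, value| [key.tr(".", "_"), value] }.to_h
# sanitized now has the key "com_docker_swarm_id" instead of "com.docker.swarm.id"
```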


Here is my configuration, in case it's useful context.


For the fluentd docker log driver I have used the following options:

--log-driver fluentd --log-opt fluentd-address=localhost:24224 --log-opt tag=docker.{{.ImageName}}.{{.ImageID}}.{{.Name}}.{{.ID}} --log-opt labels=com.docker.compose.config-hash,com.docker.compose.container-number,com.docker.compose.oneoff,com.docker.compose.project,com.docker.compose.service,com.docker.compose.version,com.docker.swarm.affinities,com.docker.swarm.id


fluentd configuration:

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<match docker.**>
  @type copy
  <store>
    @type           elasticsearch
    @id             output_elasticsearch1
    hosts           http://100.100.100.100:9200/
    logstash_format true
    index_name      fluentd
    type_name       fluentd
  </store>
  <store>
    @type             file
    @id               output_docker1
    path              /fluentd/logs/docker.*.log
    symlink_path      /fluentd/logs/docker.log
    append            true
    time_slice_format %Y%m%d
    time_slice_wait   1m
    time_format       %Y%m%dT%H%M%S%z
  </store>
</match>
<match **>
  @type             file
  @id               output1
  path              /fluentd/logs/data.*.log
  symlink_path      /fluentd/logs/data.log
  append            true
  time_slice_format %Y%m%d
  time_slice_wait   10m
  time_format       %Y%m%dT%H%M%S%z
</match>


Error output from Elasticsearch:

[2016-01-12 08:12:05,215][DEBUG][action.bulk              ] [Punchout] [logstash-2016.01.12][3] failed to execute bulk item (index) index {[logstash-2016.01.12][fluentd][AVI05FLyAO15xpkHnohx], source[{"com.example.site.applicationid":"logs","com.example.site.componentid":"kibana","container_id":"6b0e7cb8db11024bf712f0c56e29774d49b67cfaa5ca823bbf03569312b30245","container_name":"/elated_williams","source":"stdout","log":"Some log text","com.docker.swarm.id":"db0a0774ccdc4c131d1f69848b6390a55691f42a397f6fff7deb97f8d0ba293b","@timestamp":"2016-01-12T08:11:26+00:00"}]}
MapperParsingException[Field name [com.docker.swarm.id] cannot contain '.']
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:278)
    at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseObjectOrDocumentTypeProperties(ObjectMapper.java:223)
    at org.elasticsearch.index.mapper.object.RootObjectMapper$TypeParser.parse(RootObjectMapper.java:139)
    at org.elasticsearch.index.mapper.DocumentMapperParser.parse(DocumentMapperParser.java:140)
    at org.elasticsearch.index.mapper.DocumentMapperParser.parseCompressed(DocumentMapperParser.java:121)
    at org.elasticsearch.index.mapper.MapperService.parse(MapperService.java:391)
    at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:386)
    at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:231)
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Note the label "com.docker.swarm.id":"db0a0774ccdc4c131d1f69848b6390a55691f42a397f6fff7deb97f8d0ba293b" in the record and the resulting MapperParsingException[Field name [com.docker.swarm.id] cannot contain '.'].

Robert Swain

Jan 12, 2016, 6:04:56 AM
to Fluentd Google Group
I guess I can use record_transformer, something like this:

<filter docker.**>
  @type record_transformer
  enable_ruby true
  renew_record true
  keep_keys container_id,container_name,source,log
  <record>
    image_name ${tag_parts[1]}
    image_id ${tag_parts[2]}
    com_docker_swarm_id ${record["com.docker.swarm.id"]}
  </record>
</filter>

But that isn't generic: I would have to add a rule explicitly every time a new label with disallowed characters appears. I need a generic solution.
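One generic possibility might be to lean on enable_ruby and rewrite the record in place (a sketch only: I have not verified that record_transformer permits mutating record from inside a placeholder, and the _sanitize dummy key is my own construct):

```
<filter docker.**>
  @type record_transformer
  enable_ruby true
  <record>
    # Rename every dotted key in place; the expression itself evaluates to nil
    _sanitize ${record.keys.each { |k| record[k.tr(".", "_")] = record.delete(k) if k.include?(".") }; nil}
  </record>
  remove_keys _sanitize
</filter>
```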

Mr. Fiber

Jan 12, 2016, 10:11:12 AM
to Fluentd Google Group
You can use ${...} in keys in combination with enable_ruby.
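For example, under the assumption that record_transformer expands placeholders in the key position when enable_ruby is on, a single dotted label could be renamed like this (a sketch, not verified):

```
<filter docker.**>
  @type record_transformer
  enable_ruby true
  <record>
    ${"com.docker.swarm.id".tr(".", "_")} ${record["com.docker.swarm.id"]}
  </record>
  remove_keys com.docker.swarm.id
</filter>
```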


Or, if you want to iterate over all fields, the following plugin may help.



Masahiro


Robert Swain

Jan 13, 2016, 11:04:37 AM
to Fluentd Google Group
Thanks, that worked! :)