Some questions about using fluent-plugin-kafka

elain ding

Sep 29, 2015, 4:46:19 AM
to Fluentd Google Group


After I started using fluent-plugin-kafka, I found that the time field from my log is missing. The output looks like this:
{"client_ip":"39.76.243.121","domain":"i.xxx.com","method":"GET","url":"/login/callback?pwd=0&d=emXYDnLHD6emC279&tsl=0&auth=jBDNN%2BpeV7JDqZ%2Fxmyu4RVGo1HuNiMjd8gj98oMbtWbgFyQ8ZkO3FVKOyo3LIi0jC3WV5nDH%2Bi%2BWZA2RmW%2F%2FpmtmzkaivDgErjuk1x%2F07XZqUa9uo5aYZkbTAA%2FqYXMQdi4HXf4HtErNWQZx7GfZcyoq5OtiAQ%2FTG8bz%2Bn6y%2F8Q%3D&m=1&pass_eas=6.0&pass_uas=8.0&pass_ss=6.0&nonce=smvZw3bR4S4Bbxre&_ssign=5AsKKQzLv2YYHrBwDt0j1OYaN3I%3D&clientSign=VyhhPhkYecG9vmaUSOxwrtCAe4c%3D&_userIdNeedEncrypt=true","http_ver":"HTTP/1.0","http_code":"200","http_length":"741","referer":"-","ua":"2014811/wt88047; MIUI/V6.7.1.0.KHJCNCH E/V6 B/S L/zh-CN LO/CN","proxy_ip":"10.108.70.12","upstream_addr":"127.0.0.1:9000","request_time":"0.001","response_time":"0.001","upstream_status":"200","custom_status":"-","userid":"","logid":"496401249220","time":1443515401,"tag":"ngx_log_xxx"}

My fluentd configuration is as follows:

########td-agent.conf###########
<source>
  type tail
  format /^(?<client_ip>[^ ]*) - (?<domain>[^ ]*) \[(?<log_time>[^\]]*)\] "(?<method>[^ ]*) (?<url>[^ ]*) (?<http_ver>[^ ]*)" (?<http_code>[^ ]*) (?<http_length>[^ ]*) "(?<referer>[^ ]*)" "(?<ua>[^\"]*)" "(?<proxy_ip>[^\/]*)/(?<upstream_addr>[^\"]*)" "(?<request_time>[^\/]*)/(?<response_time>[^\/]*)/(?<upstream_status>[^\/]*)/(?<custom_status>[^\/]*)/(?<userid>[^\"]*)"( "(?<logid>[^"]*)")?$/
  time_format %d/%b/%Y:%H:%M:%S %z
  pos_file /tmp/td-agent/nginx_log.pos
  #refresh_interval 60
  path /home/work/logs/nginx/*.log
  tag ngx_log_xxx
</source>

<match ngx_log_xxx>
  type                kafka
  brokers             kafka01:9092,kafka02:9092,kafka03:9092
  zookeeper           zk01:2181,zk02:2181,zk03:2181

  default_topic       kafka_test
  output_data_type json
  #output_include_tag true
  #output_include_time true

</match>
################################
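The `time_format %d/%b/%Y:%H:%M:%S %z` line tells in_tail how to read the captured timestamp string. As a rough illustration (Python's `strptime`, not Fluentd's own Ruby time parser), the same directives turn a timestamp such as `29/Sep/2015:16:31:43 +0800` into the integer Unix time that Fluentd v0.12 uses as the event time. It gives 1443515503, the same pair of values that appears in the desired output below. `to_epoch` is a hypothetical helper name.

```python
from datetime import datetime

# Same directives as the in_tail setting: time_format %d/%b/%Y:%H:%M:%S %z
NGINX_TIME_FORMAT = "%d/%b/%Y:%H:%M:%S %z"


def to_epoch(timestamp):
    """Convert an nginx-style timestamp to integer Unix time (seconds).

    %b expects English month abbreviations (default C locale); %z reads the
    '+0800' offset, so the result does not depend on the machine's time zone.
    """
    parsed = datetime.strptime(timestamp, NGINX_TIME_FORMAT)
    return int(parsed.timestamp())


print(to_epoch("29/Sep/2015:16:31:43 +0800"))  # 1443515503
```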


But I want the result to be like this:


{"client_ip":"222.84.167.100","domain":"xmevent.xxx.com","time":"29/Sep/2015:16:31:43 +0800","method":"GET","url":"/login/callback?pwd=0&d=eSjZpzJmym0c7dlS&tsl=0&auth=SB0AyMVyyVx0lyLE%2FubXxiCvcdqbYNLWuWmDDKW9PDBb2Xjmep%2FoEL5BUjWgmUc1gR2AXosiQzvRLQMDkPN7vHtJwNHSEWaJw%2FOpFIF8LWXaoXDU%2B5DJKi5fP24SWkb7dJREJA07b%2BFFf9lLme8ODUn2dDRuSErex2EJtGbYXQg%3D&m=1&pass_eas=9.0&pass_uas=4.0&pass_ss=7.0&nonce=1JQ30TqRERYBbxrf&_ssign=ENDBT9SB%2BwjrAyypY1PLOFSBByE%3D&_userIdNeedEncrypt=true&clientSign=vJxpZfnzcw6rCcc6ZrEGtWXRcgU%3D","http_ver":"HTTP/1.0","http_code":"200","http_length":"849","referer":"-","ua":"Redmi Note 2/Redmi Note 2; MIUI/V6.7.10.0.LHMCNCH E/V6 B/S L/zh-CN LO/CN","proxy_ip":"10.108.47.18","upstream_addr":"127.0.0.1:9000","request_time":"0.003","response_time":"0.003","upstream_status":"200","custom_status":"0","userid":"","logid":"603503536912","time":1443515503,"tag":"ngx_log_xxx"}


I tried renaming the time capture to log_time in the format so that it would be output, but I need a time field to create the index pattern, and I find that I cannot create the index pattern from the time field when it is collected this way. So my questions are:
1. Does the time field have a special meaning in fluent-plugin-kafka and td-agent? Is there any way to solve this problem?
2. Is it possible for fluent-plugin-kafka to create a tag like this: ngx_xxx_log-2015.09.29?

Mr. Fiber

Sep 30, 2015, 5:03:34 PM
to Fluentd Google Group
1. Does the time field have a special meaning in fluent-plugin-kafka and td-agent? Is there any way to solve this problem?

Setting 'keep_time_key true' in the input plugin (it is handled internally by the parser) seems to achieve what you want.
Could you add 'keep_time_key true' to your in_tail settings?
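Roughly, the parser treats the field named by `time_key` (default `time`) specially: it converts that field into the event time and, unless `keep_time_key` is true, removes it from the record. Below is a simplified Python model of that behaviour, not Fluentd's actual code; `parse_fields` is a hypothetical name, and the fallback to the current time when no time field is captured is an assumption.

```python
import time
from datetime import datetime


def parse_fields(fields, time_key="time",
                 time_format="%d/%b/%Y:%H:%M:%S %z", keep_time_key=False):
    """Simplified model of how a Fluentd v0.12-style parser handles time_key.

    Returns (event_time, record). Hypothetical helper, for illustration only.
    """
    record = dict(fields)
    raw = record.pop(time_key, None)  # the time field is taken out of the record
    if raw is None:
        # assumption: with no time field captured, the current time is used
        event_time = int(time.time())
    else:
        event_time = int(datetime.strptime(raw, time_format).timestamp())
        if keep_time_key:
            record[time_key] = raw  # keep_time_key puts the original string back
    return event_time, record


fields = {"client_ip": "106.118.148.60", "time": "08/Oct/2015:15:49:36 +0800"}
print(parse_fields(fields))
# (1444290576, {'client_ip': '106.118.148.60'})
print(parse_fields(fields, keep_time_key=True))
# (1444290576, {'client_ip': '106.118.148.60', 'time': '08/Oct/2015:15:49:36 +0800'})
```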

2. Is it possible for fluent-plugin-kafka to create a tag like this: ngx_xxx_log-2015.09.29?

Sorry, I'm not sure I understand what you mean.
What is the problem? Judging from the code, the kafka output doesn't generate a new tag.

So if you got an ngx_xxx_log-2015.09.29 tag, it doesn't seem to be a kafka plugin issue.
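Consistent with that answer, a date-suffixed name such as ngx_xxx_log-2015.09.29 would have to be derived from the event time somewhere other than the kafka output, for example by whatever consumer writes the data into Elasticsearch (fluent-plugin-elasticsearch's `logstash_format` option builds daily index names in this spirit). A minimal sketch of the computation, assuming integer event times and a UTC+8 day boundary to match the `+0800` nginx logs; `daily_name` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def daily_name(prefix, event_time, utc_offset_hours=8):
    """Build a '<prefix>-YYYY.MM.DD' name from an integer event time.

    Hypothetical helper. The date is taken in the given UTC offset, so an
    event just after local midnight lands in the new day's name.
    """
    tz = timezone(timedelta(hours=utc_offset_hours))
    day = datetime.fromtimestamp(event_time, tz)
    return f"{prefix}-{day:%Y.%m.%d}"


print(daily_name("ngx_xxx_log", 1443515401))  # ngx_xxx_log-2015.09.29
```

The offset matters near midnight: 1443546000 is 01:00 on Sep 30 in UTC+8 but still 17:00 on Sep 29 in UTC, so the two choices put that event in different daily names.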


Masahiro



elain ding

Oct 8, 2015, 3:56:40 AM
to Fluentd Google Group


Whether or not I add keep_time_key, as soon as output_include_time is set to true in the kafka output, the time field in the output becomes a timestamp. Below is my configuration; could you help me find what is not configured properly? Thank you very much.


nginx.log:
############################
106.118.148.60 - i.xxx.com [08/Oct/2015:15:49:36 +0800] "GET /login HTTP/1.0" 200 747 "-" "MI 3/MI 3; MIUI/V6.7.1.0.KXCCNCH E/V6 B/S L/zh-CN LO/CN" "10.108.47.18/127.0.0.1:9000" "0.001/0.001/200/-/" "847576670474"


fluentd output:
#############################
{"client_ip":"106.118.148.60","domain":"i.xxx.com","method":"GET","url":"/login","http_ver":"HTTP/1.0","http_code":"200","http_length":"747","referer":"-","ua":"MI 3/MI 3; MIUI/V6.7.1.0.KXCCNCH E/V6 B/S L/zh-CN LO/CN","proxy_ip":"10.108.47.18","upstream_addr":"127.0.0.1:9000","request_time":"0.001","response_time":"0.001","upstream_status":"200","custom_status":"-","userid":"","logid":"847576670474","time":1444290576,"tag":"ngx_log_huodog"}
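To check which fields the in_tail `format` regex from the config extracts from the nginx.log line, the same pattern can be tried in Python. The only intended change is the named-group syntax: Ruby's `(?<name>...)` becomes Python's `(?P<name>...)` (and `\/`, `\"` inside character classes are written as plain `/`, `"`). This is a checking aid under that assumption, not how td-agent runs the regex.

```python
import re

# The in_tail format regex, with Ruby's (?<name>...) rewritten as (?P<name>...).
NGINX_RE = re.compile(
    r'^(?P<client_ip>[^ ]*) - (?P<domain>[^ ]*) \[(?P<time>[^\]]*)\] '
    r'"(?P<method>[^ ]*) (?P<url>[^ ]*) (?P<http_ver>[^ ]*)" '
    r'(?P<http_code>[^ ]*) (?P<http_length>[^ ]*) "(?P<referer>[^ ]*)" '
    r'"(?P<ua>[^"]*)" "(?P<proxy_ip>[^/]*)/(?P<upstream_addr>[^"]*)" '
    r'"(?P<request_time>[^/]*)/(?P<response_time>[^/]*)/'
    r'(?P<upstream_status>[^/]*)/(?P<custom_status>[^/]*)/(?P<userid>[^"]*)"'
    r'( "(?P<logid>[^"]*)")?$'
)

# The sample line from nginx.log above.
LINE = ('106.118.148.60 - i.xxx.com [08/Oct/2015:15:49:36 +0800] '
        '"GET /login HTTP/1.0" 200 747 "-" '
        '"MI 3/MI 3; MIUI/V6.7.1.0.KXCCNCH E/V6 B/S L/zh-CN LO/CN" '
        '"10.108.47.18/127.0.0.1:9000" "0.001/0.001/200/-/" "847576670474"')

fields = NGINX_RE.match(LINE).groupdict()
print(fields["time"], fields["custom_status"], repr(fields["userid"]), fields["logid"])
# 08/Oct/2015:15:49:36 +0800 - '' 847576670474
```

The other captured values agree with the fluentd output above; its integer `time` of 1444290576 is the same instant as the captured string `08/Oct/2015:15:49:36 +0800`.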


my config:
#############################
<source>
  type tail
  format /^(?<client_ip>[^ ]*) - (?<domain>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>[^ ]*) (?<url>[^ ]*) (?<http_ver>[^ ]*)" (?<http_code>[^ ]*) (?<http_length>[^ ]*) "(?<referer>[^ ]*)" "(?<ua>[^\"]*)" "(?<proxy_ip>[^\/]*)/(?<upstream_addr>[^\"]*)" "(?<request_time>[^\/]*)/(?<response_time>[^\/]*)/(?<upstream_status>[^\/]*)/(?<custom_status>[^\/]*)/(?<userid>[^\"]*)"( "(?<logid>[^"]*)")?$/
  time_format %d/%b/%Y:%H:%M:%S %z
  keep_time_key true
  pos_file /tmp/td-agent/nginx_log.pos
  #refresh_interval 60
  path /home/work/logs/nginx/*.log
  tag ngx_log_huodog
</source>

<match ngx_log_huodong.**>
  type rewrite_tag_filter
  remove_tag_prefix ngx_log_huodong
  rewriterule1 url   \.(gif|jpe?g|png|ico|pdf|zip|js|css)$  clear
  rewriterule2 url .+       ngx_log_huodong_grep
</match>

<match ngx_log_huodong_grep>
  type grep
  input_key url
  exclude favicon.ico|nginx_status
  tag ngx_log_huodong_ua
</match>

<match ngx_log_huodong_ua.**>
  @include /etc/td-agent/templates/useragent.tpl
  tag ngx_log_huodong_replaced
</match>

<match ngx_log_huodong_replaced.**>
  type replace
  rules_yaml /etc/td-agent/rules.yml
  tag ngx_log_huodong_record
</match>

<match ngx_log_huodong_record.**>
  type record_modifier
  char_encoding utf-8
  remove_keys http_ver,upstream_addr,upstream_status
  hostname ${hostname}
  tag ngx_log_huodong_kafka
</match>

<match *.**>
  type                kafka
  brokers             log-es01:9092,log-es02:9092,log-es03:9092
  zookeeper           log-es01:2181,log-es02:2181,log-es03:2181

  default_topic       kafka_topic1
  output_data_type json
  default_partition_key nil
  output_include_tag true
  output_include_time true

</match>
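One detail worth double-checking in this config: the `<source>` emits events tagged `ngx_log_huodog`, while the rewrite_tag_filter `<match>` expects `ngx_log_huodong.**`. The Python sketch below is a simplified model of Fluentd's tag patterns (an assumption: only literal parts, `*` for exactly one tag part and `**` for zero or more parts; brace alternatives are left out). It suggests those events skip the whole rewrite/grep/replace/record_modifier chain and are caught directly by `<match *.**>`. That would be consistent with the posted output still carrying the `ngx_log_huodog` tag and the `http_ver`, `upstream_addr` and `upstream_status` keys that record_modifier was meant to remove.

```python
def tag_matches(pattern, tag):
    """Return True if `tag` matches a Fluentd-style <match> pattern.

    Simplified model (assumption): each dot-separated pattern part is either
    a literal, '*' (exactly one tag part) or '**' (zero or more tag parts).
    """
    return _match(pattern.split("."), tag.split("."))


def _match(pat, parts):
    if not pat:
        return not parts  # pattern used up: the tag must be used up too
    head, rest = pat[0], pat[1:]
    if head == "**":
        # let '**' absorb 0, 1, 2, ... leading tag parts
        return any(_match(rest, parts[i:]) for i in range(len(parts) + 1))
    if parts and (head == "*" or head == parts[0]):
        return _match(rest, parts[1:])
    return False


print(tag_matches("ngx_log_huodong.**", "ngx_log_huodog"))   # False
print(tag_matches("ngx_log_huodong.**", "ngx_log_huodong"))  # True
print(tag_matches("*.**", "ngx_log_huodog"))                 # True
```

By the same model, events rewritten to the `clear` tag would also match `*.**` and be sent to Kafka, since Fluentd routes each event to the first `<match>` that fits and no earlier section handles `clear`.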







On Thursday, October 1, 2015 at 5:03:34 AM UTC+8, repeatedly wrote:

Mr. Fiber

Oct 13, 2015, 11:15:52 AM
to Fluentd Google Group
If you set `output_include_time true`, it overwrites the existing time field.
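A minimal model of that behaviour, assuming the record already carries the original string kept by `keep_time_key true`: with `output_include_time true` the integer event time replaces the string, and with it turned off the string survives. `kafka_json` is a hypothetical helper that mirrors the effect described in the thread, not the plugin's code.

```python
import json


def kafka_json(record, tag, event_time, include_tag=False, include_time=False):
    """Simplified model of the JSON written by the kafka output.

    Hypothetical helper mirroring output_include_tag / output_include_time.
    """
    out = dict(record)
    if include_tag:
        out["tag"] = tag
    if include_time:
        out["time"] = event_time  # replaces any existing "time" value
    return json.dumps(out)


record = {"client_ip": "106.118.148.60", "time": "08/Oct/2015:15:49:36 +0800"}
print(kafka_json(record, "ngx_log_huodog", 1444290576, include_time=True))
# {"client_ip": "106.118.148.60", "time": 1444290576}
print(kafka_json(record, "ngx_log_huodog", 1444290576, include_time=False))
# {"client_ip": "106.118.148.60", "time": "08/Oct/2015:15:49:36 +0800"}
```

A Fluentd record is a Ruby Hash, which (like a Python dict) holds only one value per key, so an output with two `time` keys, as in the desired result earlier in the thread, cannot be produced this way; if both the string and the integer are needed, one of them has to live under a different key.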
