I tried with your logfile but couldn't reproduce the problem.
- fluentd log
2015-02-12 10:18:44 +0900 [info]: reading config file path="fluent.conf"
2015-02-12 10:18:44 +0900 [info]: starting fluentd-0.10.59
...
2015-02-12 10:18:44 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '0.7.0'
...
2015-02-12 10:18:44 +0900 [info]: using configuration file: <ROOT>
<source>
type tail
path /Users/repeatedly/tmp/fluentd/es/log/*.log
pos_file /Users/repeatedly/tmp/fluentd/es/pos_file
format json
read_from_head true
refresh_interval 10s
tag es.test
</source>
<match es.**>
type copy
<store>
type flowcounter_simple # check in_tail emitted document number
unit second
</store>
<store>
type elasticsearch
logstash_format true
flush_at_shutdown true
</store>
</match>
</ROOT>
2015-02-12 10:18:44 +0900 [info]: adding source type="tail"
2015-02-12 10:18:44 +0900 [info]: adding match pattern="es.**" type="copy"
2015-02-12 10:18:54 +0900 [info]: following tail of /Users/repeatedly/tmp/fluentd/es/log/klein10.log
2015-02-12 10:18:55 +0900 [info]: plugin:out_flowcounter_simple count:250 indicator:num unit:second
^C2015-02-12 10:19:09 +0900 [info]: shutting down fluentd
2015-02-12 10:19:10 +0900 [info]: Connection opened to Elasticsearch cluster => {:host=>"localhost", :port=>9200, :scheme=>"http"}
2015-02-12 10:19:11 +0900 [info]: process finished code=0
- put klein10.log
/Users/repeatedly/tmp/fluentd/es% ls
fluent.conf klein10.log log/
/Users/repeatedly/tmp/fluentd/es% cp klein10.log log/
/Users/repeatedly/tmp/fluentd/es% cat pos_file
/Users/repeatedly/tmp/fluentd/es/log/klein10.log 000000000000a4e8 000000000224c4f7
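For reference, each pos_file entry is the watched path followed by the read offset and the inode, both in hex (field meaning assumed from in_tail's pos_file format). A minimal Ruby sketch to decode the entry above:

```ruby
# Decode an in_tail pos_file entry: path, read offset (hex), inode (hex)
line = '/Users/repeatedly/tmp/fluentd/es/log/klein10.log 000000000000a4e8 000000000224c4f7'
path, offset_hex, inode_hex = line.split
offset = offset_hex.to_i(16) # bytes already consumed from the file
inode  = inode_hex.to_i(16)  # inode of the tracked file
puts "#{path}: offset=#{offset} inode=#{inode}"
```

So at shutdown in_tail had consumed 42216 (0xa4e8) bytes of klein10.log.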
- es log
[2015-02-12 10:19:10,271][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.03] creating index, cause [auto(bulk api)], shards [5]/[1], mappings []
[2015-02-12 10:19:10,449][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.07] creating index, cause [auto(bulk api)], shards [5]/[1], mappings []
[2015-02-12 10:19:10,469][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.06] creating index, cause [auto(bulk api)], shards [5]/[1], mappings []
[2015-02-12 10:19:10,489][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.02] creating index, cause [auto(bulk api)], shards [5]/[1], mappings []
[2015-02-12 10:19:10,509][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.01] creating index, cause [auto(bulk api)], shards [5]/[1], mappings []
[2015-02-12 10:19:10,528][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.05] creating index, cause [auto(bulk api)], shards [5]/[1], mappings []
[2015-02-12 10:19:10,548][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.04] creating index, cause [auto(bulk api)], shards [5]/[1], mappings []
[2015-02-12 10:19:11,143][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.03] update_mapping [fluentd] (dynamic)
[2015-02-12 10:19:11,152][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.06] update_mapping [fluentd] (dynamic)
[2015-02-12 10:19:11,162][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.05] update_mapping [fluentd] (dynamic)
[2015-02-12 10:19:11,164][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.01] update_mapping [fluentd] (dynamic)
[2015-02-12 10:19:11,169][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.02] update_mapping [fluentd] (dynamic)
[2015-02-12 10:19:11,186][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.07] update_mapping [fluentd] (dynamic)
[2015-02-12 10:19:11,190][INFO ][cluster.metadata ] [Scimitar] [logstash-2015.02.04] update_mapping [fluentd] (dynamic)
- check script
require 'open-uri'
require 'json'

# Fetch index stats and sum document counts across all indices
result = open('http://localhost:9200/_stats') { |f| JSON.parse(f.read) }
total = 0
result['indices'].each_pair { |i, status|
  num = status['total']['docs']['count']
  puts "#{i} : #{num}"
  total += num
}
puts "the number of docs: #{total}"
- check script result
logstash-2015.02.03 : 38
logstash-2015.02.07 : 38
logstash-2015.02.02 : 46
logstash-2015.02.06 : 29
logstash-2015.02.01 : 36
logstash-2015.02.05 : 34
logstash-2015.02.04 : 29
the number of docs: 250
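As a quick sanity check, the per-index counts above sum to the 250 events flowcounter_simple reported, so no records were lost between in_tail and Elasticsearch:

```ruby
# Per-index doc counts from the _stats output above
counts = [38, 38, 46, 29, 36, 34, 29]
puts counts.inject(:+) # => 250, same as flowcounter_simple's count
```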
Hmm...