Hi all. I'm running Fluentd to deliver data to HTTP endpoints via an output plugin I wrote, and all is well except when the destination server can't accept POSTs and data is buffered to disk. Buffering itself works exactly as expected: data is written to disk. My problem is this: when Fluentd attempts to flush the queue, it tries to POST a very large number of messages at once. This exceeds the destination server's capacity, the POST is rejected, and the retry fails, so the queue can essentially never be flushed. The output plugin is very simple, and I could iterate through the messages rather than POST them all at once, but I don't know how to handle a "partial flush" properly. Here's an excerpt of the output plugin code:
--
def write(chunk)
  messages = []
  chunk.msgpack_each do |tag, time, record|
    messages << record['message']
  end

  $log.info "Received #{messages.length} Messages"

  post = Net::HTTP::Post.new @uri
  post.body = messages.join("\n")

  # retry up to :post_retry_max times
  1.upto(@post_retry_max) do |c|
    response = @http.request @uri, post
    if response.code == "200"
      # success
      break
    elsif response.code.match(/^40/)
      # client error
      break
    elsif c < @post_retry_max
      # retry
      sleep @post_retry_interval
      next
    else
      # other errors; fluentd will retry processing on exception
      raise "#{@uri}: #{response.message}"
    end
  end
end
--
Instead of simply joining all the messages to form the request's body, I could of course iterate through them in groups, say 50 at a time instead of all 232,784 (an actual count from a few minutes ago). I'm just not sure what happens if I get halfway through and then POSTs start failing. Is there a way to return only the unflushed portion of the messages to the queue?
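Something like this is what I have in mind: batch the messages with each_slice and keep the same retry policy per batch. This is an untested sketch; the batch size of 50 is arbitrary, and @uri, @http, @post_retry_max and @post_retry_interval are the plugin's existing instance variables. The catch is the last raise: as far as I can tell, raising partway through makes Fluentd retry the entire chunk, re-sending batches that already succeeded.

```ruby
require 'net/http'

# Arbitrary batch size; @uri, @http, @post_retry_max and
# @post_retry_interval are the same instance variables the
# current plugin already uses.
BATCH_SIZE = 50

def write(chunk)
  messages = []
  chunk.msgpack_each do |_tag, _time, record|
    messages << record['message']
  end

  messages.each_slice(BATCH_SIZE) do |batch|
    post = Net::HTTP::Post.new @uri
    post.body = batch.join("\n")

    # same retry policy as before, but applied per batch
    1.upto(@post_retry_max) do |c|
      response = @http.request @uri, post
      break if response.code == "200"     # batch accepted
      break if response.code.match(/^40/) # client error: drop this batch
      # NOTE: raising here makes Fluentd retry the WHOLE chunk,
      # including batches that were already POSTed successfully
      raise "#{@uri}: #{response.message}" if c == @post_retry_max
      sleep @post_retry_interval
    end
  end
end
```

The endpoint would have to tolerate duplicate messages for this to be safe, which is part of why I'm asking about returning only the unflushed remainder.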
Here's the source and output definition with only labels altered for privacy purposes:
--
<source>
type tcp
port 2346
bind 127.0.0.1
tag testsource1.tcp
format none
log_level info
</source>
<match testsource1.**>
buffer_type file
buffer_path /var/log/td-agent/buffer/testsource1/
buffer_chunk_limit 256m
buffer_queue_limit 128
flush_at_shutdown true
flush_interval 5s
disable_retry_limit true
retry_wait 1s
max_retry_wait 1800s
type http_buffered
customer_id testsource1
endpoint_url https://data.destination.com/receiver/v1/http/SukjdPZNhM7sNoF9p
http_read_timeout 5
http_open_timeout 5
</match>
--
So I've got a chunk size of 256 MB and a queue limit of 128 chunks (32 GB total). I'm not sure where the message counts are coming from, since they don't correspond to complete chunks: 256 MB / 232,784 (the count from the earlier example) works out to roughly 1,153 bytes per message for a full chunk, and these messages are much smaller than that.
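For what it's worth, the per-message arithmetic, using the 256 MB chunk limit and the 232,784-message count from earlier:

```ruby
chunk_bytes   = 256 * 1024 * 1024  # buffer_chunk_limit 256m
message_count = 232_784            # messages in one flush attempt
puts chunk_bytes / message_count   # => 1153 bytes per message for a full chunk
```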
Is there a way to process just a portion of the messages being sent to my output plugin's write() method and return the remaining messages to the queue? That would at least allow me to make some progress in flushing the queue.
Thanks,
Damon