Buffer flush sends too much data at once - Can this be configured?

Damon Miller

Sep 10, 2015, 11:49:04 PM
to Fluentd Google Group
Hi all. I'm running Fluentd to deliver data to HTTP endpoints via an output plugin I wrote, and all is well except when the destination server can't accept POSTs and data is buffered to disk. The buffering itself works exactly as expected in that data is written to disk, but my problem is this: when Fluentd attempts to flush the queue, it tries to POST a very large number of messages at once. This exceeds the destination server's capacity, the POST is rejected, and the retry fails. As a result the queue can essentially never be flushed. The output plugin is very simple and I could iterate through messages rather than try to POST them all at once, but I don't know how to handle a "partial flush" properly. Here's an excerpt of the output plugin code:

--

    def write(chunk)
      messages = []

      chunk.msgpack_each do |tag, time, record|
        messages << record['message']
      end

      $log.info "Received " + messages.length.to_s + " Messages"

      post = Net::HTTP::Post.new @uri
      post.body = messages.join("\n")

      # retry up to :post_retry_max times
      1.upto(@post_retry_max) do |c|
        response = @http.request @uri, post

        if response.code == "200"
          # success
          break
        elsif response.code.match(/^40/)
          # client error
          break
        elsif c < @post_retry_max
          # retry
          sleep @post_retry_interval
          next
        else
          # other errors. fluentd will retry processing on exception
          raise "#{@uri}: #{response.message}"
        end
      end
    end

--

Instead of simply joining all messages to create the request's body, I could of course iterate through them in groups, e.g. 50 at a time instead of 232,784 (an example count from a few minutes ago); a rough sketch of what I mean is below. I'm just not sure what happens if I get halfway through and POSTs then start failing. Is there a way to only return the unflushed portion of messages to the queue?
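
Here's that sketch, for what it's worth (untested; @post_batch_size would be a new, hypothetical config parameter, and everything else reuses the same instance variables and request call as the excerpt above):

--

    def write(chunk)
      messages = []

      chunk.msgpack_each do |tag, time, record|
        messages << record['message']
      end

      # Hypothetical new setting: how many messages to send per POST.
      messages.each_slice(@post_batch_size) do |batch|
        post = Net::HTTP::Post.new @uri
        post.body = batch.join("\n")

        1.upto(@post_retry_max) do |c|
          response = @http.request @uri, post

          if response.code == "200"
            # success; move on to the next batch
            break
          elsif response.code.match(/^40/)
            # client error; give up on this batch
            break
          elsif c < @post_retry_max
            sleep @post_retry_interval
            next
          else
            # Raising here hands the WHOLE chunk back to Fluentd, so
            # batches that already succeeded would be POSTed again on
            # the next retry -- this is the part I don't know how to
            # handle cleanly.
            raise "#{@uri}: #{response.message}"
          end
        end
      end
    end

--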

Here's the source and output definition with only labels altered for privacy purposes:

--

<source>
  type tcp
  port 2346
  bind 127.0.0.1
  tag testsource1.tcp
  format none
  log_level info
</source>

<match testsource1.**>
  buffer_type file
  buffer_path /var/log/td-agent/buffer/testsource1/
  buffer_chunk_limit 256m
  buffer_queue_limit 128
  flush_at_shutdown true
  flush_interval 5s
  disable_retry_limit true
  retry_wait 1s
  max_retry_wait 1800s
  type http_buffered
  customer_id testsource1
  endpoint_url https://data.destination.com/receiver/v1/http/SukjdPZNhM7sNoF9p
  http_read_timeout 5
  http_open_timeout 5
</match>

--

So I've got a chunk size of 256 MB and a queue limit of 128 chunks (32 GB total). I'm not sure where the message counts are coming from, since they don't correspond to complete chunks: a full 256 MB chunk holding 232,784 messages (the count from the earlier example) would work out to roughly 1,153 bytes per message, and these messages are much smaller than that.

Is there a way to process just a portion of the messages being sent to my output plugin's write() method and return the remaining messages to the queue?  That would at least allow me to make some progress in flushing the queue.


Thanks,

Damon

Mr. Fiber

Sep 14, 2015, 7:21:42 AM
to Fluentd Google Group
Is there a way to process just a portion of the messages being sent to my output plugin's write() method and return the remaining messages to the queue?  That would at least allow me to make some progress in flushing the queue.

Why not set a smaller buffer_chunk_limit and a larger buffer_queue_limit?
Setting buffer_chunk_limit to a smaller size, e.g. 16m or so, seems like it should resolve this big-request problem (see the sketch below).
Does this approach cause any other problem for you?
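
Something along these lines, for example (the 16m and 2048 values are only illustrative; everything else stays as in your original <match> block):

--

<match testsource1.**>
  # Smaller chunks mean each flush POSTs a much smaller body.
  buffer_chunk_limit 16m
  # A larger queue keeps roughly the same total capacity (16m x 2048 = 32 GB).
  buffer_queue_limit 2048
  # ... remaining settings unchanged from the original <match> block ...
</match>

--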


Masahiro


Damon Miller

Sep 16, 2015, 1:30:15 PM
to Fluentd Google Group
That makes perfect sense. I originally used a large chunk size because I thought having a large number of smaller files would be less efficient. I also didn't realize chunks would be retried whole, rather than being broken into smaller pieces to smooth delivery. I changed the chunk size to be much smaller and increased the queue limit to be much larger, and it seems to be working well. Thank you again!

Damon