Amqp output problem


joaquin.sil...@gmail.com

May 5, 2016, 4:45:23 PM
to Fluentd Google Group
Hi, I'm trying to read data from a file with tail, select some of the log lines, and write them to an AMQP queue. This is my conf file:


<source>
 @type tail
 path /home/ubuntu/datos-prueba/SG04_Itau_LogsInformes_ssl__37_160413122345.log
 pos_file /var/log/td-agent/tmp/access.log.pos
 tag input
 format none
 read_from_head true
</source>
 
#assign sslyes tag if the log contains the string "ssl", otherwise assign sslno
<match input>
 @type rewrite_tag_filter
 capitalize_regex_backreference no
 rewriterule1 message ssl sslyes
 rewriterule2 message ^((?!ssl).)*$ sslno
</match>
<match sslyes sslno>
 @type amqp
 host xxxxxx
 port 5672
 user xxxxx
 password xxxxxx
 vhost /
 exchange xxxxxx
 exchange_type topic
 exchange_durable true
 payload_only true
 content_type application/octet-stream
</match>


But when I run this, none of the logs are queued. So I ran some tests to find the problem:
  1. Tested with stdout output only, matching one or both tags: it worked.
  2. Tested with stdout and amqp, matching both tags: it did not work.
  3. Tested with stdout and amqp, with amqp matching one tag: only stdout worked, and only for the tag that amqp did not match.
I don't understand this behaviour, please help me.
Regards,
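
For readers following along, the two rewrite rules in the conf above can be tried outside fluentd with a small Ruby sketch. It assumes the rules are checked in order and the first match decides the tag; `classify` and the sample lines are made up for illustration and are not part of rewrite_tag_filter.

```ruby
# Hypothetical helper mirroring rewriterule1 and rewriterule2 from the conf.
# Returns the tag a message would be rewritten to, or nil if no rule matches.
def classify(message)
  # rewriterule1: the message contains the string "ssl"
  return "sslyes" if message =~ /ssl/
  # rewriterule2: no position in the message starts the string "ssl"
  return "sslno" if message =~ /^((?!ssl).)*$/
  nil
end

# Made-up sample lines, one per tag.
["GET /informes over ssl", "GET /informes plain"].each do |line|
  puts "#{classify(line)}: #{line}"
end
```

Both patterns are case sensitive, so under these assumptions a line containing only "SSL" in upper case ends up tagged sslno.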

Mr. Fiber

May 5, 2016, 4:53:36 PM
to Fluentd Google Group
Could you paste the configurations for all 3 cases?


Masahiro



joaquin.sil...@gmail.com

May 5, 2016, 5:07:06 PM
to Fluentd Google Group
  1. <source>
      @type tail
      path /home/ubuntu/datos-prueba/SG04_Itau_LogsInformes_ssl__37_160413122345.log
      pos_file /var/log/td-agent/tmp/access.log.pos
      tag input
      format none
      read_from_head true
      #time_format %Y%m%d%H%M%S%6N
    </source>
    
    <match input>
      @type rewrite_tag_filter
      capitalize_regex_backreference no
      rewriterule1 message ssl sslyes
      rewriterule2 message ^((?!ssl).)*$ sslno
    </match>
    
    <match sslyes sslno>
      type stdout
    </match>
  2. <source>
      @type tail
      path /home/ubuntu/datos-prueba/SG04_Itau_LogsInformes_ssl__37_160413122345.log
      pos_file /var/log/td-agent/tmp/access.log.pos
      tag input
      format none
      read_from_head true
      #time_format %Y%m%d%H%M%S%6N
    </source>
    
    <match input>
      @type rewrite_tag_filter
      capitalize_regex_backreference no
      rewriterule1 message ssl sslyes
      rewriterule2 message ^((?!ssl).)*$ sslno
    </match>
    
    <match sslyes sslno>
      @type amqp
    
      # Set broker host and port
      host xxxxx
      port 5672
    
      # Set user and password for authentication
      user xxxxxx
      password xxxxx
    
      # Configure amqp entities vhost, exchange id and type
      vhost /
      exchange fd_exchange
      exchange_type topic
      exchange_durable true # optionally set exchange durability - default is true.
      payload_only true # optional - default is false. if true, only the payload will be sent. if false, data format is { "key" => tag, "timestamp" => time, "payload" => record }.
      content_type application/octet-stream # optional - default is application/octet-stream. some amqp consumers will expect application/json.
    </match>
  3. <source>
      @type tail
      path /home/ubuntu/datos-prueba/SG04_Itau_LogsInformes_ssl__37_160413122345.log
      pos_file /var/log/td-agent/tmp/access.log.pos
      tag input
      format none
      read_from_head true
      #time_format %Y%m%d%H%M%S%6N
    </source>
    
    <match input>
      @type rewrite_tag_filter
      capitalize_regex_backreference no
      rewriterule1 message ssl sslyes
      rewriterule2 message ^((?!ssl).)*$ sslno
    </match>
    <match sslyes>
      @type amqp
    
      # Set broker host and port
      host xxxxxx
      port xxxxxx
    
      # Set user and password for authentication
      user xxxxxx
      password xxxxxx
    
      # Configure amqp entities vhost, exchange id and type
      vhost /
      exchange fd_exchange
      exchange_type topic
      exchange_durable true # optionally set exchange durability - default is true.
      payload_only true # optional - default is false. if true, only the payload will be sent. if false, data format is { "key" => tag, "timestamp" => time, "payload" => record }.
      content_type application/octet-stream # optional - default is application/octet-stream. some amqp consumers will expect application/json.
    </match>
    <match sslyes sslno>
      type stdout
    </match>

Mr. Fiber

May 5, 2016, 8:49:59 PM
to Fluentd Google Group
I confirmed that the 1st case works, so the posted configuration should also work.
So I have some questions.

> none of the logs are queued.

- Are there no error logs?
- The amqp plugin is a buffered plugin, so you should check whether events are buffered.

If events are buffered but the output plugin doesn't send them to AMQP,
your configuration is wrong.


joaquin.sil...@gmail.com

May 6, 2016, 8:13:10 AM
to Fluentd Google Group
Yes, no error is shown.
I tested the amqp conf by connecting the source with amqp output directly and it worked fine.

Mr. Fiber

May 6, 2016, 8:22:56 AM
to Fluentd Google Group
What does the buffer status show?

Mr. Fiber

May 6, 2016, 8:25:21 AM
to Fluentd Google Group
> I tested the amqp conf by connecting the source with amqp output directly and it worked fine.

Does that mean you used a conf like the one below?

<source>
  @type tail
</source>

<match>
  @type amqp
</match>

joaquin.sil...@gmail.com

May 6, 2016, 8:29:16 AM
to Fluentd Google Group
yeah, this one exactly:

<source>
  @type tail
  path /home/ubuntu/datos-prueba/SG04_Itau_LogsInformes_ssl__37_160413122345.log
  pos_file /var/log/td-agent/tmp/access.log.pos
  tag input
  format none
  read_from_head true
  #time_format %Y%m%d%H%M%S%6N
</source>

<match input>
  @type amqp

  # Set broker host and port
  host xxxxxxx
  port 5672

  # Set user and password for authentication
  user xxxxx
  password xxxxxx

  # Configure amqp entities vhost, exchange id and type
  vhost /
  exchange xxxxxx
  exchange_type topic
  exchange_durable true # optionally set exchange durability - default is true.
  payload_only true # optional - default is false. if true, only the payload will be sent. if false, data format is { "key" => tag, "timestamp" => time, "payload" => record }.
  content_type application/octet-stream # optional - default is application/octet-stream. some amqp consumers will expect application/json.
</match>

Mr. Fiber

May 6, 2016, 8:30:58 AM
to Fluentd Google Group
I see. Please paste the buffer status of the amqp output.

joaquin.sil...@gmail.com

May 6, 2016, 8:44:41 AM
to Fluentd Google Group
This is the result:

{"plugins":[{"plugin_id":"object:3fdc1ef827a0","plugin_category":"input","type":"tail","config":{"@type":"tail","path":"/home/ubuntu/datos-prueba/SG04_Itau_LogsInformes_ssl__37_160413122345.log","pos_file":"/var/log/td-agent/tmp/access.log.pos","tag":"input","format":"none","read_from_head":"true"},"output_plugin":false,"retry_count":null},{"plugin_id":"object:3fdc1ef81350","plugin_category":"input","type":"monitor_agent","config":{"@type":"monitor_agent","bind":"0.0.0.0","port":"24220"},"output_plugin":false,"retry_count":null},{"plugin_id":"object:3fdc1ea3da24","plugin_category":"output","type":"rewrite_tag_filter","config":{"@type":"rewrite_tag_filter","capitalize_regex_backreference":"no","rewriterule1":"message ssl sslyes","rewriterule2":"message ^((?!ssl).)*$ sslno"},"output_plugin":true,"retry_count":null},{"plugin_id":"object:3fdc1ea128ec","plugin_category":"output","type":"amqp","config":{"@type":"amqp","host":"xxxxxx","port":"5672","xxxx":"xxxxx","password":"xxxxxx","vhost":"/","exchange":"fd_exchange","exchange_type":"topic","exchange_durable":"true","payload_only":"true","content_type":"application/octet-stream"},"output_plugin":true,"buffer_queue_length":0,"buffer_total_queued_size":0,"retry_count":0},{"plugin_id":"object:3fdc1efef698","plugin_category":"output","type":"stdout","config":{"type":"stdout"},"output_plugin":true,"retry_count":null}]}

Mr. Fiber

May 6, 2016, 9:35:01 AM
to Fluentd Google Group
buffer_queue_length is 0, so your amqp output isn't receiving the logs.
That's weird.
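
For reference, the buffer counters can be pulled out of the monitor_agent JSON with a few lines of Ruby. This is only a sketch: `buffer_stats` is a made-up helper name, and the sample is trimmed from the output pasted earlier in this thread. Keep in mind that a queue length of 0 can also show up after buffered chunks have already been flushed, so it may be worth polling while events are flowing.

```ruby
require "json"

# Hypothetical helper: keep only the plugins that report buffer metrics and
# return the counters that matter for this kind of check.
def buffer_stats(json_text)
  JSON.parse(json_text).fetch("plugins")
      .select { |plugin| plugin.key?("buffer_queue_length") }
      .map do |plugin|
        {
          type: plugin["type"],
          queue_length: plugin["buffer_queue_length"],
          queued_size: plugin["buffer_total_queued_size"],
          retry_count: plugin["retry_count"]
        }
      end
end

# Trimmed copy of the monitor_agent output from this thread.
sample = <<~JSON
  {"plugins":[
    {"plugin_category":"output","type":"rewrite_tag_filter","output_plugin":true,"retry_count":null},
    {"plugin_category":"output","type":"amqp","output_plugin":true,"buffer_queue_length":0,"buffer_total_queued_size":0,"retry_count":0},
    {"plugin_category":"output","type":"stdout","output_plugin":true,"retry_count":null}
  ]}
JSON

buffer_stats(sample).each do |stats|
  puts "#{stats[:type]}: queue_length=#{stats[:queue_length]} " \
       "queued_size=#{stats[:queued_size]} retry_count=#{stats[:retry_count]}"
end
```

Only the amqp output shows up here, because rewrite_tag_filter and stdout report no buffer fields in the pasted output.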

joaquin.sil...@gmail.com

May 6, 2016, 9:55:40 AM
to Fluentd Google Group
Is there a way you can test my configuration? I'm using RabbitMQ as the AMQP broker.

Mr. Fiber

May 6, 2016, 10:00:10 AM
to Fluentd Google Group
> Is there a way you can test my configuration?

The amqp plugin itself doesn't seem to be the problem, because buffering is handled by BufferedOutput, not by the amqp plugin.

Mr. Fiber

May 6, 2016, 10:03:56 AM
to Fluentd Google Group
Could you change this output config:

<match sslyes sslno>
 @type amqp
 host xxxxxx
 port 5672
 user xxxxx
 password xxxxxx
 vhost /
 exchange xxxxxx
 exchange_type topic
 exchange_durable true
 payload_only true
 content_type application/octet-stream
</match>

to:

<match sslyes sslno>
  @type copy
  <store>
    @type stdout
  </store>
  <store>

    @type amqp
    host xxxxxx
    port 5672
    user xxxxx
    password xxxxxx
    vhost /
    exchange xxxxxx
    exchange_type topic
    exchange_durable true
    payload_only true
    content_type application/octet-stream
  </store>
</match>



Mr. Fiber

May 6, 2016, 10:21:42 AM
to Fluentd Google Group
BTW, did you remove the pos_file before restarting fluentd?

joaquin.sil...@gmail.com

May 6, 2016, 10:33:50 AM
to Fluentd Google Group
The stdout part works, but nothing is being queued.

joaquin.sil...@gmail.com

May 6, 2016, 10:34:09 AM
to Fluentd Google Group
Yes

Mr. Fiber

May 6, 2016, 7:14:26 PM
to Fluentd Google Group
How about inserting debug code?

Insert `log.warn event.to_json` after this line to check whether events are being flushed:


joaquin.sil...@gmail.com

May 9, 2016, 8:35:59 AM
to Fluentd Google Group
This is the result:

"output_plugin":true,"buffer_queue_length":0,"buffer_total_queued_size":0,"retry_count":0

2016-05-09 09:33:13 -0300 [warn]: {"message":"2016-04-14 02:59:19...}
2016-05-09 09:33:13 -0300 [warn]: {"message":"2016-04-14 02:59:22...}
...

joaquin.sil...@gmail.com

May 11, 2016, 11:35:23 AM
to Fluentd Google Group
Does anyone have a clue?

Mr. Fiber

May 11, 2016, 11:36:53 AM
to Fluentd Google Group
How are you checking whether messages are queued?
According to your logs, fluentd succeeded in sending the events.

Mr. Fiber

May 11, 2016, 7:29:00 PM
to Fluentd Google Group
I checked on my Mac and it worked.

- configuration example 

<source>
 @type tail
 path /path/to/ssl_test.log

 tag input
 format none
 read_from_head true
</source>

<match input>
 @type rewrite_tag_filter
 capitalize_regex_backreference no
 rewriterule1 message ssl sslyes
 rewriterule2 message ^((?!ssl).)*$ sslno
</match>

<match sslyes sslno>
  @type amqp
  host localhost
  port 5672
  user guest
  password guest
  vhost /
  exchange amq.direct
  exchange_type direct

  exchange_durable true
  payload_only true
  content_type application/octet-stream
  flush_interval 5s
</match>

- ssl_test.log

this is ssl text
this is normal text

- rabbitmq status

% ./rabbitmqadmin list queues
+----------------+----------+
|      name      | messages |
+----------------+----------+
| fluentd.sslno  | 1        |
| fluentd.sslyes | 1        |
+----------------+----------+

- subscriber example

require "rubygems"
require "bunny"

STDOUT.sync = true

# Connect with Bunny's defaults (local broker, guest account).
conn = Bunny.new
conn.start
at_exit {
  conn.close
}

ch = conn.create_channel
# One queue per tag; each binding's routing key matches the fluentd tag.
ch.queue("fluentd.sslyes").bind("amq.direct", :routing_key => 'sslyes').subscribe do |delivery_info, metadata, payload|
  puts "Received #{payload} from sslyes"
end
ch.queue("fluentd.sslno").bind("amq.direct", :routing_key => 'sslno').subscribe do |delivery_info, metadata, payload|
  puts "Received #{payload} from sslno"
end

# subscribe does not block by default, so keep the main thread alive.
sleep

- subscriber result

Received {"message":"this is ssl text"} from sslyes
Received {"message":"this is normal text"} from sslno

- env

fluentd v0.12.22
fluent-plugin-amqp2 v0.1.0
rabbitmq-3.6.1

joaquin.sil...@gmail.com

May 13, 2016, 8:42:14 AM
to Fluentd Google Group
I found what the problem was. I was trying to create an exchange that receives both flows, sslyes and sslno, so I was creating bindings with routing keys like "sslyes sslno" or "ssl*", which does not work. Now I want to merge the two tags into one, so I can create a single exchange for both flows.
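
To see why those bindings never matched, here is a rough Ruby sketch of topic-style matching as RabbitMQ documents it: keys are split into words on ".", "*" stands for exactly one whole word and "#" for zero or more words. `topic_match?` and `match_words` are made-up names, the real matching happens inside the broker, and the sketch assumes the routing key is the fluentd tag, as in the working example earlier in the thread.

```ruby
# Hypothetical helper: true when routing_key matches the binding pattern.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when "#"
    # "#" may absorb zero or more words, so try every possible split.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when "*"
    # "*" consumes exactly one whole word.
    !words.empty? && match_words(rest, words.drop(1))
  else
    # Anything else, including "ssl*", is compared literally.
    !words.empty? && words.first == head && match_words(rest, words.drop(1))
  end
end

["sslyes sslno", "ssl*", "*", "#", "sslyes"].each do |binding_key|
  puts "#{binding_key.inspect} matches sslyes: #{topic_match?(binding_key, "sslyes")}"
end
```

Under these rules a binding key of "*" or "#" would catch both tags, while "ssl*" and "sslyes sslno" are treated as plain words that match neither.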

Thanks, :)

joaquin.sil...@gmail.com

May 13, 2016, 8:58:02 AM
to Fluentd Google Group
Solved with:
<match sslyes sslno>
  type retag
  tag output
</match>