parse syslog message and forward to different kafka topics on the basis of msg id in syslog.

1,170 views
Skip to first unread message

Godfather858

unread,
Feb 26, 2019, 1:07:51 PM2/26/19
to Fluentd Google Group
My requirement is to send the syslog message to different Kafka topics depending on the msg id in the syslog.

I have tried using grep but it filters the syslog in descending order and once filtered I am unable to check for further msg ids in the syslog.

Godfather858

unread,
Feb 26, 2019, 1:16:15 PM2/26/19
to Fluentd Google Group


On Tuesday, February 26, 2019 at 11:37:51 PM UTC+5:30, Godfather858 wrote:
My requirement is to send the syslog message to different Kafka topics depending on the msg id in the syslog.

I have tried using grep but it filters the syslog in descending order and once filtered I am unable to check for further msg ids in the syslog.
For example , syslogs having msgid1 goes to kafka topic "msgid1" or syslogs having msgid3 goes to kafka topic "msgid3"

Mr. Fiber

unread,
Feb 28, 2019, 2:29:01 AM2/28/19
to Fluentd Google Group
Use msgid in topic_key and buffer agument. Check README:


--
You received this message because you are subscribed to the Google Groups "Fluentd Google Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email to fluentd+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Godfather858

unread,
Mar 3, 2019, 12:34:53 AM3/3/19
to Fluentd Google Group
Thanks, 

But is there any way I can group multiple MsgId into single topics? 
For example, Msgid1 and MsgId2 go to TopicA whereas MsgId5 and Msgid6 go to TopicB.


On Tuesday, February 26, 2019 at 11:37:51 PM UTC+5:30, Godfather858 wrote:

Mr. Fiber

unread,
Mar 4, 2019, 9:18:20 AM3/4/19
to Fluentd Google Group
> But is there any way I can group multiple MsgId into single topics? 

You can use record_transformer or similar filter to set new key for topic.


--

Godfather858

unread,
Mar 22, 2019, 5:42:41 AM3/22/19
to Fluentd Google Group
I could use the record transformer to put "parition_key" in message. (If key name partition_keyexists in a message, this plugin set its value of partition_key as key.).
But How do I dynamically assign topic name ? and is there a way to do string comparison in record_transformer?  (I couldn't find any examples for that.)

Godfather858

unread,
Mar 24, 2019, 3:28:52 AM3/24/19
to Fluentd Google Group
In the plugin code, I found that using the "topic" field in message record we can dynamically assign topics.

Mr. Fiber

unread,
Mar 25, 2019, 6:13:39 PM3/25/19
to Fluentd Google Group
With out_kafka2, you need to set its field in buffer argument.

cincu....@gmail.com

unread,
May 4, 2019, 2:52:34 PM5/4/19
to Fluentd Google Group
Hi, can you give me an example please how to send specific log to specific topic ?

For example:

{"message":"<191>1061: May  4 21:48:55: %SYS-7-USERLOG_DEBUG: Message from tty0(user id: ): asd","source":"192.168.0.17"}   ===> ciscotopic1
{"message":"<189>1065: May  4 21:49:04: %LINEPROTO-5-UPDOWN: Line protocol on Interface Loopback14, changed state to down","source":"192.168.0.17"} ====> ciscotopic2

I have the following config

<source>
  @type udp
  port 5140
  bind 0.0.0.0
  tag fluentd.syslog.cisco.udp
  # Set 'source' as the field name for the message source
  source_hostname_key source
  # Using built-in 'none' parser which shoves the whole log into the message field
  # and sets the timestamp to the time the message was received, see:
  # https://docs.fluentd.org/v1.0/articles/parser_none
  format none
</source>

<match fluentd.syslog.cisco.**>
  @type kafka2
  topic_key ciscotopic1
  default_message_key '%LINK-5-CHANGED'
  message_key_key '%LINK-5-CHANGED'
  brokers kafka01:9092
  default_topic ciscotopic
    <format>
      @type json
    </format>
  <buffer topic>
    flush_interval 10s
  </buffer>
</match>

To unsubscribe from this group and stop receiving emails from it, send an email to flu...@googlegroups.com.

Godfather858

unread,
May 6, 2019, 4:39:35 AM5/6/19
to Fluentd Google Group
you can add the topic field in the output message record which is to be forwarded to kafka. That way it will take the topic dynamically. If topic field is not present in the record, it will take up the default_topic. 

use @record_transformer plugin with conditionals statements (as done in ruby) to add the topic field in the output message.

Adrian

unread,
May 6, 2019, 4:43:39 AM5/6/19
to flu...@googlegroups.com
Can you give me a configuration example?

To unsubscribe from this group and stop receiving emails from it, send an email to fluentd+u...@googlegroups.com.

Godfather858

unread,
May 6, 2019, 4:53:29 AM5/6/19
to Fluentd Google Group
filter syslog.**>
  @type record_transformer
  enable_ruby true
  <record>
   partition_key ${record["host"]}
   topic ${if (record["msgid"] == "abc" or record["msgid"] == "xyz") then "topic_name1" elsif (record["msgid"] == "pqr"  or record["msgid"] == "rtd") then "topic_name2" end}
  </record>
</filter>

<match syslog.**>
  @type kafka_buffered
  # list of seed brokers

  # buffer settings
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  flush_interval 3s
  num_threads 10

  # topic settings
  default_topic "default_topic_name"

  # data type settings
  output_data_type json
#  compression_codec gzip

  # producer settings
  max_send_retries 1
  required_acks -1
</match>

Adrian

unread,
May 6, 2019, 4:54:50 AM5/6/19
to flu...@googlegroups.com
Thank you very much ! 

To unsubscribe from this group and stop receiving emails from it, send an email to fluentd+u...@googlegroups.com.

Adrian

unread,
May 6, 2019, 10:11:08 AM5/6/19
to flu...@googlegroups.com
The syslog messages that i received from cisco switch based on IOS-XE doesn't have msgid
Any ideea on how i can filter based on anything else? Maybe based on string ? It's possible ?

On Mon, May 6, 2019 at 11:53 AM Godfather858 <parthsar...@gmail.com> wrote:
To unsubscribe from this group and stop receiving emails from it, send an email to fluentd+u...@googlegroups.com.

Godfather858

unread,
May 7, 2019, 2:37:20 AM5/7/19
to Fluentd Google Group
Yes it should be possible.
Try exploring more on the record_transformer plugin. You may find your solution there.  

Adrian

unread,
May 13, 2019, 1:45:16 PM5/13/19
to flu...@googlegroups.com
Hi,

I managed to get msg-id from cisco syslog, it's xml format. But i cannot route messages based on msg-id value

Here is cisco log:
{"message":"<187>1467: <ios-log-msg><facility>LINK</facility><severity>3</severity><msg-id>UPDOWN</msg-id><time>May  9 22:55:09</time><args><arg id=\"0\">Loopback13</arg><arg id=\"1\">up</arg></args></ios-log-msg>","source":"192.168.0.17","topic":null}

Here is fluentd config:
<source>
  @type udp
  port 5140
  bind 0.0.0.0
  tag syslog

  # Set 'source' as the field name for the message source
  source_hostname_key source
  # Using built-in 'none' parser which shoves the whole log into the message field
  # and sets the timestamp to the time the message was received, see:
  # https://docs.fluentd.org/v1.0/articles/parser_none
  format none
</source>

<filter syslog.**>
@type record_transformer
  enable_ruby true
  #partition_key ${record["msg-id"]}
  <record>
   topic ${if (record["<msg-id>"] == "UPDOWN" or record["<msg-id>"] == "LINK") then "ciscotopic2"

  </record>
</filter>


<match syslog.**>
  @type kafka_buffered
  brokers kafka01:9092

  # buffer settings
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  flush_interval 3s
  num_threads 10

  # topic settings
  default_topic "ciscotopic"
  max_send_retries 1
  required_acks -1
  get_kafka_client_log true



On Mon, May 6, 2019 at 11:53 AM Godfather858 <parthsar...@gmail.com> wrote:
To unsubscribe from this group and stop receiving emails from it, send an email to fluentd+u...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages