New to fluentd - need some assistance


Angelo

unread,
Dec 14, 2014, 12:30:12 PM12/14/14
to flu...@googlegroups.com
I just recently started using fluentd. I am currently comparing/evaluating this product in parallel with logstash.


I would like to know how to do the equivalent of logstash's mutate-replace. For instance, how would this be done:

     mutate {

            replace => { "newfield" => "%{previously_computedfield1} - %{previously_computedfield2}" }

     }

 


My problem is that I am accustomed to logstash, where it more resembles normal programming - 
for instance, you can use conditional statements (if) and treat fields as variables (as above with %xxx). 
I am not quite there yet with fluentd.


Any assistance is greatly appreciated.



Kiyoto Tamura

unread,
Dec 14, 2014, 12:57:39 PM12/14/14
to flu...@googlegroups.com
Hi Angelo-

More than happy to help.

To answer your question first, here is how you would do something similar. You are lucky: we just released v0.12, which makes it much more straightforward to do this.

<source>
  type syslog
  tag system
</source>

<filter system.**>
  type record_transformer
  <record>
    newfield ${previous_computedfield1}-${previous_computedfield2}
  </record>
</filter>

<match system.**>
  type stdoug
</match>

should do the job: what the a I am _just_ writing the docs for filters right now, but here is an example config you can look at in the meantime: https://github.com/fluent/fluentd/blob/master/example/v0_12_filter.conf

I will announce here once the v0.12 docs are up and running.

Kiyoto



Kiyoto Tamura

unread,
Dec 14, 2014, 1:02:54 PM12/14/14
to flu...@googlegroups.com
>should do the job: what the a

I misfired without completing the sentence. What I meant to say was

what the above config does is:

1) <source>...</source> listens for syslog messages, parses them (you can add your own parser too), and assigns the tag "system", which, in syslog's case, becomes a tag like "system.facility.priority"
2) <filter> matches the event and does what you want
3) <match system.**> matches the tag "system.facility.priority", e.g. "system.kern.warn", and prints it out to stdout (there was a typo: "stdoug" should be "stdout")

I hope this helps.

Kiyoto

Angelo

unread,
Dec 15, 2014, 2:07:29 PM12/15/14
to flu...@googlegroups.com
Thank-you for the reply. I will give this a try shortly.

Can you please tell me how to perform the equivalent of this logstash code fragment in fluentd:

if [previously_computedfield] =~ /some regex/ {
    # Case 1
} else {
    # Case 2
}

Kiyoto Tamura

unread,
Dec 16, 2014, 2:37:42 AM12/16/14
to flu...@googlegroups.com
Fluentd does not have the if...else construct. It is entirely declarative, based on tags (and labels in v0.12).

That said, you can use https://github.com/fluent/fluent-plugin-rewrite-tag-filter to cover the scenario that you described.
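
As a rough sketch only (the tags and the regexp below are placeholders, not taken from your config), rewrite_tag_filter re-emits each event under a new tag, and the downstream <match> sections then act as the two branches:

<match raw.**>
  type rewrite_tag_filter
  # rules are tried in order: rewriterule<N> <key> <regexp> <new tag>
  rewriterule1 previously_computedfield ^some_regex$ case1.${tag}
  rewriterule2 previously_computedfield .+ case2.${tag}
</match>

# "Case 1" handling
<match case1.**>
  type stdout
</match>

# "Case 2" handling
<match case2.**>
  type stdout
</match>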

Kiyoto




Angelo

unread,
Dec 16, 2014, 11:03:11 AM12/16/14
to flu...@googlegroups.com
Kiyoto,


With your help, I have been able to convert most of my logstash scripts to fluentd.
I have just a few other items I would like some assistance with:


1. Perform the equivalent of :
       mutate {
           convert => { "field1" => "integer" }
           convert => { "field2" => "integer" }
           etc ....
       }

      The output would be to elasticsearch, hence I am performing this conversion.


2.   mutate {
           remove_field => ["field1", "field2", ...]
     }


While I am waiting for a response, I will go through the list of filters and see if I can find something that will do the above.

 
Thank-you,

Angelo

Mr. Fiber

unread,
Dec 16, 2014, 12:12:32 PM12/16/14
to flu...@googlegroups.com
>Fluentd does not have the if...else construct. It is entirely declarative, based on tags (and labels in v0.12).

Yes. Fluentd uses a tag to identify the data source and separate event streams.
I don't know the Logstash conventions, but
record-content-based conditional branching is rare in Fluentd.

Of course, rewrite-tag-filter is useful for aggregation, reconstructing event streams, etc.


Masahiro

Kiyoto Tamura

unread,
Dec 16, 2014, 1:12:55 PM12/16/14
to flu...@googlegroups.com
Hi Angelo-

Using the record_transformer filter:

<filter foo.bar>
  type record_transformer
  enable_ruby
  remove_keys remove_field1,remove_field2 #remove fields.
  <record>
    field1 ${field1.to_i}  # converting to int
    field2 ${field2.to_s} # converting to str
  </record>
</filter>

Angelo

unread,
Dec 16, 2014, 2:15:32 PM12/16/14
to flu...@googlegroups.com
Kiyoto,

I will give this a try. Thank-you.



2 more items I need to resolve ...


#1
The kv filter allows tokenizing only keys we are interested in. For instance ...

kv {
  source => "my_source"
  field_split => "field_splitter"
  value_split => "value_splitter"
  include_keys => ["key1", "key2", etc ...]
}
 
Using "include_keys" is useful so as not to tokenize any unnecessary fields.


How can I reproduce this same behavior? 
(fields_parser works fine for tokenizing, but it does not have an inclusion list)
 

#2
Logs recorded in JSON format can be extracted using the json 'plugin' in logstash.
For instance, this will extract json logs and will retain only the fields we are interested in ...

if [type] == "JSON_TEST" {
    json {
        source => "message"
        target => "jsontemp"
    }
    # retain only the 3 fields of interest
    mutate {
        replace => { "firstname" => "%{jsontemp[firstName]}" }
        replace => { "phonenumbers" => "%{jsontemp[phoneNumbers]}" }
        replace => { "address" => "%{jsontemp[address]}" }
    }
    # delete/remove originally parsed json
    mutate {
        remove_field => [ "jsontemp" ]
    }
}

Looking at the list of plugins, these 2 plug-ins may be of use, although the documentation is lacking:

There is also the splunk_ex plugin, which can produce JSON output for use with a Splunk server.


Angelo

Kiyoto Tamura

unread,
Dec 16, 2014, 2:29:49 PM12/16/14
to flu...@googlegroups.com
Hi Angelo-

I need to do some research on 1. What is your data source? TCP? File?

For 2, you can use "format json" in your <source> directives, and it parses the incoming payload as JSON. See, for example, in_tail's documentation (https://docs.fluentd.org/articles/in_tail) for the list of parsers supported out of the box (search for "format json" on that page, etc.).
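
For illustration, a minimal sketch of an in_tail source that parses each line as JSON (the path, pos_file, and tag below are placeholders):

<source>
  type tail
  path /var/log/app/events.json
  pos_file /var/log/td-agent/events.json.pos
  # each line of the file is expected to be a JSON object
  format json
  tag app.json
</source>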

Kiyoto

Angelo

unread,
Dec 16, 2014, 3:47:40 PM12/16/14
to flu...@googlegroups.com
Data source is syslog format via udp on a given port.
Input is being provided via nxlog using the pm_transformer module to generate syslog_bsd output format.


Angelo

Angelo

unread,
Dec 17, 2014, 5:39:08 PM12/17/14
to flu...@googlegroups.com
Kiyoto,

remove_keys on record_transformer works. However, converting to an integer does not seem to work.


I am trying to convert a computed field from a previous step to an integer ...

<match parse98>
   type parser
   key_name myfield
   format /.<field1>(?<myfield>(?:[+-]?(?:[0-9]+)))</field1>.*/
   suppress_parse_error_log true
   reserve_data yes
   tag parse99
</match>



# convert the relevant fields to integer
<filter parse99>
   type record_transformer
   enable_ruby 
   <record>
      myfield ${myfield.to_i}
   </record>
</filter>


# output to elasticsearch
<match parse99.**>
    type elasticsearch

    etc ....


Angelo

Kiyoto Tamura

unread,
Dec 17, 2014, 6:13:39 PM12/17/14
to flu...@googlegroups.com
Hi Angelo-

I am about to release a plugin parser for kv pairs that should be able to...

1. parse the incoming text into JSON, e.g. "k1=v1 k2=v2" -> {"k1":"v1", "k2":"v2"}
2. do type conversions, including parsing the time field, e.g., "k1=v1 k2=200 time=2013-01-01T00:00:00" -> {"k1":"v1", "k2":200} with time = 1356998400

And you can use it with various input plugins (in_tail, in_tcp, in_udp, etc.) like this

<source>
  type tail
  path /path/to/kv-log
  format kv
 
...
</source>

I will follow up on this thread once it is released.

Kiyoto

Kiyoto Tamura

unread,
Dec 17, 2014, 6:40:14 PM12/17/14
to flu...@googlegroups.com
Hi Angelo,

I just released a parser plugin. You might be way too far in to find this useful, but this might simplify things for you: https://github.com/kiyoto/fluent-plugin-kv-parser

Also, can you share your current Logstash config? I can see if I can make one for Fluentd to do the same.

Kiyoto

Mr. Fiber

unread,
Dec 17, 2014, 11:38:17 PM12/17/14
to flu...@googlegroups.com
Fluentd's parser has a 'types' parameter to convert value types.
See the 'types' section in this document: http://docs.fluentd.org/articles/in_tail

The latest fluent-plugin-parser uses Fluentd's built-in parsers, so
you can also use 'types' like below.

    types myfield:integer
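
Applied to the earlier parse98 snippet, for example, that would look like this (a sketch; the format regexp is copied unchanged from the post above):

<match parse98>
   type parser
   key_name myfield
   format /.<field1>(?<myfield>(?:[+-]?(?:[0-9]+)))</field1>.*/
   # convert the captured field to an integer during parsing
   types myfield:integer
   suppress_parse_error_log true
   reserve_data yes
   tag parse99
</match>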


Masahiro

Angelo

unread,
Dec 18, 2014, 9:26:01 AM12/18/14
to flu...@googlegroups.com
Masahiro,

Thank-you. ' types myfield:integer' works fine.


Kiyoto,

>> Also, can you share your current Logstash config? I can see if I can make one for Fluentd to do the same.

So far, I have cloned the functionality of logstash except for the 1 item I mentioned previously - tokenizing only specific items of interest.
I can probably accomplish this with some creativity.


Angelo

Kiyoto Tamura

unread,
Dec 18, 2014, 11:51:59 AM12/18/14
to flu...@googlegroups.com
Angelo-

Oh, is this related to #1 that you mentioned earlier? If so, can you show me

1. what the input looks like
2. what the desired output looks like after going through the kv filter in logstash?

Kiyoto

Angelo

unread,
Dec 18, 2014, 1:37:21 PM12/18/14
to flu...@googlegroups.com
Kiyoto,


From my previous email ...

kv {
   source => "my_source"
   field_split => "field_splitter"
   value_split => "value_splitter"
   include_keys => ["key1", "key2", etc ...]
}


Let us assume that the field_split value is '|' and the value_split value is '='.
Let's say that my_source is the following:


city=los angeles|country=united states|zipcode=50210|firstname=bob|lastname=smith|streetname=main street

(Real input would be much bigger than this)



If include_keys was:  ["city", "firstname"] 

The output would only contain city and firstname with their values "los angeles" and "bob", respectively


include_keys is preventing me from logging undesired/useless information to elasticsearch.



Angelo

Kiyoto Tamura

unread,
Dec 18, 2014, 2:09:42 PM12/18/14
to flu...@googlegroups.com
Hi Angelo

1. Install fluent-plugin-kv-parser with "gem install fluent-plugin-kv-parser"
2. Here is the Fluentd config that takes the key-value pairs coming in over UDP on port 24225, parses them, and keeps only the fields city and firstname before outputting to stdout (substitute elasticsearch for your use case):

<source>
  type udp
  port 24225
  format kv
  kv_delimiter "|"
  tag my_tag
</source>

<filter my_tag>
  type record_transformer
  renew_record true
  keep_keys city,firstname
</filter>

<match my_tag>
  type stdout
</match>

3. If you send

echo "city=new york|firstname=kiyoto|other_field=2|another_field=3" | netcat -u localhost 24225

You get

2014-12-18 19:07:55 +0000 my_tag: {"city":"new york","firstname":"kiyoto"}

Note that the unneeded fields are now gone.

You can also use "types" with "format kv", so,


<source>
  type udp
  port 24225
  format kv
  kv_delimiter "|"
  tag my_tag
  types a_field:integer
</source>

would convert the value of "a_field" into int.

Let me know if you have more questions.

Kiyoto

Angelo

unread,
Dec 18, 2014, 2:21:00 PM12/18/14
to flu...@googlegroups.com
Thank-you for this,

Angelo

Angelo

unread,
Dec 18, 2014, 5:54:25 PM12/18/14
to flu...@googlegroups.com
Kiyoto,


I am now testing json parsing.

On the Windows side, I push JSON data no differently than any of my other log data.
I go through a syslog transformation.

I.e. ...

 
# json log processing
<Processor t5>
    Module pm_transformer
    OutputFormat syslog_bsd
    Exec $Message=(": "+$raw_event);
</Processor>

<Output out5>
    Module om_tcp
    Host somehost
    Port someport
</Output>

<Route r5>
    Path in5 => t5 => out5
</Route>


On the fluentd side, code is:

<source>
   type syslog
   port 5144
   protocol_type tcp
   tag nxlogjsontest
   format json
</source>
 

This causes a parsing failure on fluentd. I'm guessing this is not the proper way to do it.

With logstash, I do the following to extract the input to specific fields:

json {
    source => "message"
    target => "jsontemp"
}



Kiyoto Tamura

unread,
Dec 18, 2014, 7:28:43 PM12/18/14
to flu...@googlegroups.com
Angelo-

Can you provide both the entire nxlog conf and Logstash conf? That would be hugely helpful.

Kiyoto

Angelo

unread,
Dec 19, 2014, 9:24:17 AM12/19/14
to flu...@googlegroups.com
Kiyoto,

Due to company policy, I can't provide details. Here is enough information to explain the issue/situation:


nxlog config looks like this:


define ROOT C:\Program Files (x86)\nxlog

Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile c:\logs\nxlog\fluentd\nxlog2.log


<Extension syslog>
    Module xm_syslog
</Extension>

<Extension json>
    Module xm_json
</Extension>

<Extension exec>
    Module xm_exec
</Extension>


>>> many items here, omitted <<<


# json log processing
<Input in5>
    Module im_file
    File "c:\logs\JSON_Test.txt"
    SavePos TRUE
    InputType LineBased
</Input>

<Processor t5>
    Module pm_transformer
    OutputFormat syslog_bsd
    Exec $Message=(": "+$raw_event);
</Processor>

<Output out5>
    Module om_tcp
    Host 10.x.x.x
    Port 5144
</Output>

<Route r5>
    Path in5 => t5 => out5
</Route>


fluentd config (this is just a snippet of the entire config):

# JSON testing
<source>
   type syslog
   port 5144
   protocol_type tcp
   tag nxlogjsontest
   format json
</source>




<match nxlogjsontest.**>
   type copy

    # write output to file
    <store>
       type file
       path /home/angelo/nxinput_jsontest
    </store>

    # extract contents - I tried below and this failed too
#    <store>
#       type parser
#       key_name message
#       suppress_parse_error_log true
#       reserve_data yes
#       tag parse501
#    </store>

    <store>
        type elasticsearch
        hosts host1:9200,host2:9200,host3:9200,host4:9200
        index_name indexname
        type_name JSonTest
        buffer_type file
        buffer_path /home/angelo/buffer55/
        buffer_chunk_limit 256m
        retry_wait 15s
     </store>
</match>



One thing I want to mention: with logstash, there is very good control over how the JSON is extracted.
Consider this logstash example (I think I mentioned this before in a previous email):


if [type] == "JSON_TEST" {
    json {
        source => "message"
        target => "jsontemp"
    }
    mutate {
        replace => { "firstnamenew" => "%{jsontemp[firstName]}" }
        replace => { "phoneNumbersnew" => "%{jsontemp[phoneNumbers]}" }
        replace => { "addressnew" => "%{jsontemp[address]}" }
    }
    # delete/remove temp fields
    mutate {
        remove_field => [ "jsontemp" ]
    }
}

With this code, we can extract all json elements and then only keep what we are interested in.
Ideally, I need the equivalent in fluentd.
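
As a rough sketch of one way to approximate that in Fluentd (the tags here are placeholders; it assumes fluent-plugin-parser and the v0.12 record_transformer filter): first parse the "message" field as JSON, then keep only the fields of interest:

<match jsontest.raw>
   # expand the JSON held in the "message" field into record fields
   type parser
   key_name message
   format json
   reserve_data yes
   suppress_parse_error_log true
   tag jsontest.parsed
</match>

<filter jsontest.parsed>
   # drop everything except the three fields of interest
   type record_transformer
   renew_record true
   keep_keys firstName,phoneNumbers,address
</filter>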


Angelo

Kiyoto Tamura

unread,
Dec 19, 2014, 3:07:27 PM12/19/14
to flu...@googlegroups.com
Angelo-

It looks like the latest version of Nxlog broke compatibility with the currently documented integration with Fluentd (docs.fluentd.org/articles/windows).

Let me look into this more closely. Meanwhile, can you share the version of nxlog that you are running?

Kiyoto

Angelo

unread,
Dec 19, 2014, 4:47:26 PM12/19/14
to flu...@googlegroups.com
Kiyoto,

It's version nxlog-ce-2.8.1248

Kiyoto Tamura

unread,
Dec 19, 2014, 7:03:19 PM12/19/14
to flu...@googlegroups.com
Hi Angelo-

I tried with the same version of nxlog and followed docs.fluentd.org/articles/windows. It works as long as you send data over UDP. When TCP is used (like your config), nxlog starts garbling the payload with random bytes/etc. I am still trying to figure out why.

Can you switch to UDP as shown on docs.fluentd.org/articles/windows and see if you see data on Fluentd's side?
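
For reference, a UDP variant of the source could look like the sketch below (in_syslog defaults to udp, so the protocol_type line can also be dropped); on the nxlog side the output module would be om_udp instead of om_tcp:

<source>
   type syslog
   port 5144
   protocol_type udp
   tag nxlogjsontest
</source>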

Thanks,

Kiyoto

Kiyoto Tamura

unread,
Dec 19, 2014, 7:50:18 PM12/19/14
to flu...@googlegroups.com
Angelo-

Okay, I took another good look at this, and here is my conclusion

1. om_tcp can be supported with a different, simpler config
2. but not when syslog_bsd is used for reasons unknown to me

I will update the docs over the next few days to use 1) TCP and 2) much simpler configs. The new one won't try to shoehorn nxlog -> Fluentd into syslog and will instead work directly with a newline-delimited TCP payload.

Kiyoto

Kiyoto Tamura

unread,
Dec 19, 2014, 9:29:08 PM12/19/14
to flu...@googlegroups.com
Angelo-

I just updated the Windows article on docs.fluentd.org/articles/windows

I believe that the new version is much more streamlined and also meets your needs better. Please check it out.

If you have more questions, please feel free to follow up =)

Thank you for making Fluentd better!

Kiyoto

Angelo

unread,
Dec 22, 2014, 11:56:09 AM12/22/14
to flu...@googlegroups.com
Kiyoto, I should be thanking you for making a good product and working so hard to support it.


I want to mention the following:

Replacing the syslog source type with a tcp source and 'format none' works OK when the input is a single line.
I did not get as far as testing JSON-formatted input because I encountered an issue ...

Multi-line input (i.e. using xm_multiline with nxlog) breaks the processing.
It appears that the input is being received as multiple lines as opposed to one large line of input.
I will investigate this further. It is perhaps due to 'format none'.


Angelo

Kiyoto Tamura

unread,
Dec 22, 2014, 12:32:11 PM12/22/14
to flu...@googlegroups.com
Angelo-

>Multi-line input (i.e. using xm_multiline with nxlog) breaks the processing. It appears that the input is being received as multiple lines as opposed to one large line of input. I will investigate this further. It is perhaps due to 'format none'.

I will look at this myself too, but most likely it is NXLog's issue.

Kiyoto

Angelo

unread,
Dec 22, 2014, 3:39:25 PM12/22/14
to flu...@googlegroups.com
Kiyoto,

I am currently experimenting with syslog-ng as a replacement for nxlog on my Windows machines.
I like the fact that syslog-ng supports high availability by allowing the use of failover servers in the event communication to the primary server fails.

I am currently testing multi-line support. In syslog-ng, this is accomplished with multi-line-prefix(regex) added to the source.

In the fluentd config, I am echoing output to file:


    <store>
       type file
       path /home/angelo/nxinput
    </store>

   <store>
         >>> lots of stuff here and the output to elasticsearch eventually <<<
   </store>



nxinput shows that the input is not one continuous line but rather a series of inputs.
I am not certain that I am using syslog-ng correctly, but please let me know if you have found any issue with the tcp input module.


Angelo

Kiyoto Tamura

unread,
Dec 22, 2014, 4:09:05 PM12/22/14
to flu...@googlegroups.com
>nxinput shows that the input is not one continuous line but rather a series of inputs.

What is 'nxinput'?

Also, if you are using syslog on your windows machine, in_syslog might be a better choice. But I personally have little experience with syslog-ng itself, let alone running it on Windows with multi-line log messages.

Have you looked at using buffering in nxlog for reliability? > http://nxlog-ce.sourceforge.net/nxlog-docs/en/nxlog-reference-manual.html#pm_buffer

Angelo Carzis

unread,
Dec 22, 2014, 4:14:36 PM12/22/14
to flu...@googlegroups.com
nxinput is just the name of the log file I am creating. I gave it the name nxinput.


>> Have you looked at using buffering in nxlog for reliability? > http://nxlog-ce.sourceforge.net/nxlog-docs/en/nxlog-reference-manual.html#pm_buffer
Yes I did. Buffering does not accomplish automatic failover.

I too have little experience with syslog-ng. I just began using it a few days ago. I have not performed thorough testing, but my preliminary tests show
that failover is working.





  





Angelo Carzis

unread,
Dec 23, 2014, 11:01:48 AM12/23/14
to flu...@googlegroups.com
Masahiro,

Can you please tell me which other plugins, besides parser, 'types' works with?



I am still not able to convert a field to a date/time value.


This is my situation. I have something like this:

<filter parse0>
   type record_transformer
   <record>
       mydatetimeentry  ${the_year}/${the_month}/${the_day} ${the_hour}:${the_minute}:${the_second}.${the_msecond}
   </record>
</filter>


mydatetimeentry contains a 'string' holding a date/time value. I need to convert this to a (true) date/time field and store it in Elasticsearch.

I tried using an instance of the parser plugin...


<match parse0>
   type parser
   key_name message
   types logstampin:time                          <<-- there may be a mistake here. This may not even be valid as logstampin is not found within this element.
</match>


According to the docs:

For the time and array types, there is an optional third field after the type name. For the “time” type, you can specify a time format like you would in time_format.

So I tried the following:      types logstampin:time:%F %T.%L  ?
This did not work.


Any suggestions?



Angelo









Kiyoto Tamura

unread,
Dec 23, 2014, 11:25:59 AM12/23/14
to flu...@googlegroups.com
Angelo-



> Can you please tell me which other plugins, besides parser, 'types' works with?

It is not tied to a particular input or output plugin. It is enabled when you do "format /YOUR_REGEX/" or "format csv" or "format tsv" or "format ltsv". I will update the docs to make this point clear.

><match parse0>
>   type parser
>   key_name message
>   types logstampin:time                          <<-- there may be a mistake here. This may not even be valid as logstampin is not found within this element.
></match>

This would not have worked since you did not specify the "format" field. The "types" parameter is not tied to fluent-plugin-parser but to the underlying parser, which is specified by the "format" parameter. So, what "format" parameter did you use there?
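
To make the point above concrete with a sketch (the regexp is illustrative, not your actual format): the field being converted has to be one of the named captures in the same parser's "format":

<match parse0>
   type parser
   key_name message
   # capture the bracketed timestamp at the start of the message
   format /^\[(?<logstampin>[^\]]+)\] .*/
   # the time format after "time:" must match how the timestamp appears in the log
   types logstampin:time:%Y-%m-%d %H:%M:%S.%L
   reserve_data yes
   tag parse1
</match>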

><filter parse0>
>   type record_transformer
>   <record>
>       mydatetimeentry  ${the_year}/${the_month}/${the_day} ${the_hour}:${the_minute}:${the_second}.${the_msecond}
>   </record>
></filter>

What are you trying to do in that filter directive? Also, is "mydatetimeentry" related to the above?

I know I asked you this before and you said you could not share the entire config file, but it would make it much, much easier to provide support if you can share the config file + error logs in THEIR ENTIRETY. And if that's off-limits, at least the entire config file.

Kiyoto


Angelo

unread,
Dec 23, 2014, 11:53:38 AM12/23/14
to flu...@googlegroups.com
Kiyoto,

My config file resembles the following:

<filter parse0>
   type record_transformer
   <record>
      mydatetimeentry  ${the_year}/${the_month}/${the_day} ${the_hour}:${the_minute}:${the_second}.${the_msecond}
   </record>
</filter>

<match parse0>
   type parser
   key_name message
   types mydatetimeentry:time:%F %T.%L                       <<<-- I want mydatetimeentry converted to a date/time field 
   types myid:integer
   format /.* ID: (?<myid>(?:[+-]?(?:[0-9]+))) - .*/
   suppress_parse_error_log true
   reserve_data yes
   tag parse1
</match>

etc ...


The statement is invalid because it is expected that 'mydatetimeentry' should be a part of the 'format' statement, as you just mentioned.
How can I convert mydatetimeentry to a datetime value?

Kiyoto Tamura

unread,
Dec 23, 2014, 11:58:35 AM12/23/14
to flu...@googlegroups.com
Angelo-

Can you show me the ENTIRE config file as well as what the data looks like when it comes into Fluentd? I can keep pointing out the flaws (like how your "format" parameter has no field called "mydatetimeentry" but you somehow want to parse it), but it won't help me help you without understanding the whole dataflow.

Kiyoto

Angelo Carzis

unread,
Dec 23, 2014, 1:41:28 PM12/23/14
to flu...@googlegroups.com
Config file resembles this...

<match nxloginput.**>
    type copy

    <store>
      type stdout
    </store>

    <store>
       type file
       path /home/angelo/nxinput
    </store>

    <store>
       type parser
       key_name message
       format /\[(?<the_year>[0-9]{4,4})\-(?<the_month>[0-9]{2,2})\-(?<the_day>[0-9]{2,2}) (?<the_hour>[0-9]{2,2}):(?<the_minute>[0-9]{2,2}):(?<the_second>[0-9]{2,2})\.(?<the_msecond>[0-9]{1,3}).*/
       types the_year:integer,the_month:integer,the_day:integer,the_hour:integer,the_minute:integer,the_second:integer,the_msecond:integer
       suppress_parse_error_log true
       reserve_data yes
       tag parse0
    </store>
</match>


# create logstampin
<filter parse0>
   type record_transformer
   <record>
      logstampin ${the_year}/${the_month}/${the_day} ${the_hour}:${the_minute}:${the_second}.${the_msecond}
   </record>
</filter>


# extract theid
<match parse0>
   type parser
   key_name message
   types logstampin:time:%F %T.%L
   types theid:integer
   format /.* THEID: (?<theid>(?:[+-]?(?:[0-9]+))) - .*/
   suppress_parse_error_log true
   reserve_data yes
   tag parse1
</match>


# extract input request, if possible
<match parse1>
   type parser
   key_name message
   format /.* THEID: (?:[+-]?(?:[0-9]+)) - Received request string, (?<inputrequest>.*)/
   suppress_parse_error_log true
   reserve_data yes
   tag parse2
</match>


>> 10 more match parse statements here, similar to the above 'parse1' statement <<
        >> parse2 feeds parse3 which feeds parse4... eventually feeding parse99



# output to elasticsearch
<match parse99.**>
    type elasticsearch
    host myhost
    port 9200
    index_name myindex
    type_name mytype
  # I tried both of these:
    # utc_index false
  # logstash_format true
</match>


## input from nxlog 
<source>
   type tcp
   format none
   port 5140
   tag nxloginput
</source>






nxlog config:


define ROOT C:\Program Files (x86)\nxlog

Moduledir %ROOT%\modules
CacheDir %ROOT%\data
Pidfile %ROOT%\data\nxlog.pid
SpoolDir %ROOT%\data
LogFile c:\logs\nxlog\fluentd\nxlog3.log
LogLevel INFO
 

<Extension syslog>
    Module xm_syslog
</Extension>

<Extension exec>
    Module xm_exec
</Extension>

<Input in1>
    Module im_file
    File "c:\logs\TransactionLog2*.txt"
    SavePos TRUE
    InputType LineBased
</Input>

<Output out1>
    Module om_tcp
    Host 10.0.0.205
    Port 5140
</Output>

<Route r1>
    Path in1 => out1
</Route>

















Kiyoto Tamura

unread,
Dec 23, 2014, 1:52:19 PM12/23/14
to flu...@googlegroups.com
Angelo-

Thanks so much. This is really helpful.

Right off the bat, the better path forward is to get rid of parse0...parse99 and parse the incoming data with a single parser. It looks like some fields exist while others don't (and this is probably why you are using
   "suppress_parse_error_log true")

What does the incoming log data look like? Does it have any well-defined format?

Kiyoto

Angelo Carzis

unread,
Dec 23, 2014, 2:01:08 PM12/23/14
to flu...@googlegroups.com
The input looks as follows:

[2014-12-04 15:16:41.912] Thread: pool-5-thread-4 - THEID: 20192958 - >>> here there could be any type of text as explained below<<<


What I am trying to do is make sense of the incoming text. What I know is that there are 10 pre-defined possibilities for what text can appear on a given input.
Therefore, I am trying to extract any information that is relevant and try to associate it with its given ID (THEID) and timestamp.




Kiyoto Tamura

unread,
Dec 23, 2014, 2:14:01 PM12/23/14
to flu...@googlegroups.com
I see. How were you doing this with Logstash? I am asking because, if this was already done in Logstash, using Fluentd's grok parser may allow you to re-use some of your work: github.com/kiyoto/fluent-plugin-grok-parser
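
As a rough illustration only (assuming fluent-plugin-grok-parser is installed; the field names are guesses based on the sample line above), a grok-based source might look like:

<source>
   type tcp
   port 5140
   format grok
   # standard grok patterns: timestamp, thread name, numeric ID, then the free-form remainder
   grok_pattern \[%{TIMESTAMP_ISO8601:logstamp}\] Thread: %{DATA:thread} - THEID: %{NUMBER:theid} - %{GREEDYDATA:details}
   tag nxloginput
</source>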

Otherwise, maybe the best path forward is to create a filter version of the parser plugin and use it like this:

<filter nxinput.*>
  type parser
  format /first regexp/
  suppress_parse_error_log true
  reserve_data yes
  types ...
</filter>
<filter nxinput.*>
  type parser
  format /second regexp/
  types ...
  suppress_parse_error_log true
  reserve_data yes
</filter>
...

Then, at least you won't have to retag the events every time.

Kiyoto

Angelo Carzis

unread,
Dec 23, 2014, 2:40:02 PM12/23/14
to flu...@googlegroups.com
With logstash, I had a series of grok match statements. From my perspective, it does not appear much different from fluentd's parser statements.

I will consider your recommendations.


My script for fluentd is working fine with the exception of:

1. I can't convert one of my fields to an Elasticsearch date/time value (data type).
2. The @timestamp field is not being generated. I don't know why. There may be an issue with the elasticsearch plugin.



I have many different types of logs to process. Many of the logs (I would say almost all of them) require multi-line processing.
When processing these types of logs, I use this configuration:

nxlog config:


<Extension multiline_test>
    Module xm_multiline
    HeaderLine /\[[0-9]{1,2}\/[0-9]{1,2}\/[0-9]{1,4} [0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}\] .*/
</Extension>

<Input in2>
    Module im_file
    File "z:\logs\log201412*.log"
    SavePos TRUE
    InputType multiline_test
</Input>

<Processor t2>
    Module pm_transformer
    OutputFormat syslog_bsd
    Exec $Message=(": "+$raw_event);
</Processor>

<Output out2>
    Module om_tcp
    Host 10.0.0.205
    Port 5141
</Output>

<Route r2>
    Path in2 => t2 => out2
</Route>



fluentd config:

<source>
   type syslog
   port 5141
   protocol_type tcp
   tag nxloginput
</source>



The above works fine.


When I omit the pm_transformer module from nxlog (i.e. Path in2 => out2) and configure fluentd as follows:

<source>
   type tcp
   format none
   port 5141
   tag nxloginput
</source>

multiline processing is broken.

This is what I was trying to say in my previous post.









 

 

Kiyoto Tamura

unread,
Dec 23, 2014, 3:46:34 PM12/23/14
to flu...@googlegroups.com
Angelo-

I see. Ok, let's shelve the discussion about parser/filter and just focus on the elasticsearch/@timestamp part.

Can you replace


<match parse99.**>
    type elasticsearch
    host myhost
    port 9200
    index_name myindex
    type_name mytype
  # I tried both of these:
    # utc_index false
  # logstash_format true
</match>

with

<match parse99.**>
    type copy
    <store>
      type stdout
    </store>
    <store>
      type elasticsearch
      host myhost
      port 9200
      index_name myindex
      type_name mytype
    </store>
</match>

and share the logs, grepping for "parse99"? The goal is to inspect the events right before they were sent to Elasticsearch.

Thanks,

Kiyoto