Problem converting PostgreSQL JSON to Elasticsearch JSON with Fluentd


Thomas Haessle

Oct 26, 2016, 10:47:54 AM10/26/16
to Fluentd Google Group
Hi all,

I use Fluentd to replicate PostgreSQL data to an Elasticsearch index.
I use the 'fluent-plugin-postgres-replicator' plugin for input and the 'fluent-plugin-elasticsearch' plugin for output.

I have some problems converting a JSON input field to JSON output.

My SQL query returns 3 text columns and 1 JSON column.
If I feed ES with it, the JSON column is converted to TEXT.

If I use a FILTER of type parser, it does not seem to work (and I get no logs, even in debug); I get no records in ES.

If I use a FILTER of type record_transformer to parse the input JSON into a Ruby hash, it also stores TEXT in ES.

I'm lost and need some help :)

In my example I have an activity table (id, name) with many sessions (date, status).

CONF:

<source>
  type postgres_replicator
  host XXXX
  port XXXX
  username XXXX
  password XXXX
  database XXX
  sql SELECT "Activities"."id", "Activities"."name", json_object_agg("Sessions"."date", "Sessions"."status") as sessions FROM "Activities" INNER JOIN "Sessions" on "Activities"."id" = "Sessions"."activityId" GROUP BY "Activities"."id"
  primary_keys id
  interval 1m
  tag replicator.pipeline.Activities.${event}.${primary_keys}
</source>
<match replicator.pipeline.Activities.**>
  @type elasticsearch
  user XXXXX
  password XXXXX
  host XXXXX
  scheme https
  index_name MY_INDEX
  type_name MY_TYPE
  id_key id
  time_key updatedAt
  logstash_format false
  port 443
</match>


=> The JSON field looks like this in ES: {"id":"a47fa0b1-9b52-11e6-bcbd-19f2b5872ee3","name":"Gym","sessions":"{ \"2017-10-26T08:03:13.116+00:00\" : \"open\", \"2017-09-10T17:03:13.116+00:00\" : \"open\" }"}

If I add a filter:
<filter replicator.pipeline.Activities.**>
  @type record_transformer
  enable_ruby
  <record>
    sessions ${ require 'json' ; JSON.parse(sessions) }
  </record>
</filter>

=> The JSON field is still a string in ES, now in Ruby Hash#to_s notation: {"id":"a47fa0b1-9b52-11e6-bcbd-19f2b5872ee3","name":"Gym","sessions":"{\"2017-10-26T08:03:13.116+00:00\"=>\"open\", \"2017-09-10T17:03:13.116+00:00\"=>\"open\"}"}
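The `=>` notation is the clue: the parsed Hash is being stringified with Ruby's `Hash#to_s` instead of being re-serialized as JSON. A minimal plain-Ruby sketch (outside Fluentd, assuming the string shape shown above) of the difference:

```ruby
require 'json'

# The string value as it arrives from the postgres_replicator input (assumed shape).
raw = '{ "2017-10-26T08:03:13.116+00:00" : "open", "2017-09-10T17:03:13.116+00:00" : "open" }'

parsed = JSON.parse(raw)    # a real Ruby Hash
puts parsed.to_s            # Hash#to_s uses "key"=>"value" notation, not JSON
puts JSON.generate(parsed)  # proper JSON again, with ":" separators
```

So if the filter chain (or the output plugin's serialization) calls `to_s` on the value, ES receives exactly the `"k"=>"v"` string shown above.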

If I do a pivot with a Ruby hash:
<filter replicator.pipeline.Activities.**>
  @type record_transformer
  enable_ruby
  <record>
    sessions ${require 'json'; JSON.parse(sessions.gsub(/[\"]/,'').split(', ').map{|h| h1,h2 = h.split(' : '); {h1 => h2}}.reduce(:merge))}
  </record>
</filter>


This is also not what I expect: {"id":"a47fa0b1-9b52-11e6-bcbd-19f2b5872ee3","name":"Gym","sessions":"{\"2017-10-26T08:03:13.116+00:00\"=>\"open\", \"2017-09-10T17:03:13.116+00:00\"=>\"open\"}"}

Any ideas?

Mr. Fiber

Oct 26, 2016, 1:32:48 PM10/26/16
to Fluentd Google Group
How about setting "auto_typecast true"?
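For context, `auto_typecast` is a record_transformer option; a sketch of where it would go in the filter from the first message (reusing the field name from that config):

<filter replicator.pipeline.Activities.**>
  @type record_transformer
  enable_ruby
  auto_typecast true
  <record>
    sessions ${require 'json'; JSON.parse(sessions)}
  </record>
</filter>

With `auto_typecast true`, the filter is supposed to keep the Ruby type of a single-placeholder `${...}` result instead of stringifying it.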



Masahiro

--
You received this message because you are subscribed to the Google Groups "Fluentd Google Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email to fluentd+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Mr. Fiber

Oct 26, 2016, 3:38:49 PM10/26/16
to Fluentd Google Group
BTW, I tested the parser plugin and it worked with your log example.

2016-10-27 04:36:43 +0900 [info]: reading config file path="postgre_parser.conf"
2016-10-27 04:36:43 +0900 [info]: starting fluentd-0.12.29
2016-10-27 04:36:43 +0900 [info]: gem 'fluent-plugin-beats' version '0.1.1'
2016-10-27 04:36:43 +0900 [info]: gem 'fluent-plugin-elasticsearch' version '1.7.0'
2016-10-27 04:36:43 +0900 [info]: gem 'fluent-plugin-flowcounter-simple' version '0.0.4'
2016-10-27 04:36:43 +0900 [info]: gem 'fluent-plugin-parser' version '0.6.1'
2016-10-27 04:36:43 +0900 [info]: gem 'fluent-plugin-record-modifier' version '0.5.0'
2016-10-27 04:36:43 +0900 [info]: gem 'fluent-plugin-s3' version '0.7.1'
2016-10-27 04:36:43 +0900 [info]: gem 'fluent-plugin-secure-forward' version '0.4.2'
2016-10-27 04:36:43 +0900 [info]: gem 'fluent-plugin-td' version '0.10.29'
2016-10-27 04:36:43 +0900 [info]: gem 'fluentd' version '0.12.29'
2016-10-27 04:36:43 +0900 [info]: adding filter pattern="debug.*" type="parser"
2016-10-27 04:36:43 +0900 [info]: adding match pattern="debug.*" type="stdout"
2016-10-27 04:36:43 +0900 [info]: adding source type="forward"
2016-10-27 04:36:43 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type forward
  </source>
  <filter debug.*>
    @type parser
    format json
    key_name sessions
  </filter>
  <match debug.*>
    @type stdout
  </match>
</ROOT>
2016-10-27 04:36:43 +0900 [info]: listening fluent socket on 0.0.0.0:24224
2016-10-27 04:37:07 +0900 debug.test: {"2017-10-26T08:03:13.116+00:00":"open","2017-09-10T17:03:13.116+00:00":"open"}
^C2016-10-27 04:37:16 +0900 [info]: shutting down fluentd
2016-10-27 04:37:16 +0900 [info]: shutting down input type="forward" plugin_id="object:3ff8c6c8c504"
2016-10-27 04:37:16 +0900 [info]: shutting down filter type="parser" plugin_id="object:3ff8c600cf68"
2016-10-27 04:37:16 +0900 [info]: shutting down output type="stdout" plugin_id="object:3ff8c6c11048"
2016-10-27 04:37:16 +0900 [info]: process finished code=0



Thomas Haessle

Oct 27, 2016, 9:10:38 AM10/27/16
to Fluentd Google Group
Thanks for your answers.

auto_typecast true changes nothing.

I have a strange issue using "parser".
When I add
<filter replicator.pipeline.Activities.**>
  @type parser
  format json
  key_name sessions
</filter>

I get no match :S

Mr. Fiber

Oct 27, 2016, 7:19:57 PM10/27/16
to Fluentd Google Group
Is the filter placed after the match?
Could you paste entire configuration?


Thomas Haessle

Nov 7, 2016, 9:14:54 AM11/7/16
to Fluentd Google Group

It's before the match :)

I tried to alter the sessions field in place (which I didn't manage to do), but this way it worked:
<source>
  type postgres_replicator
  host "#{ENV['POSGRESQL_HOST']}"
  port "#{ENV['POSGRESQL_PORT']}"
  username "#{ENV['POSGRESQL_USER']}"
  password "#{ENV['POSGRESQL_PASSWORD']}"
  database "#{ENV['POSGRESQL_DB']}"
  sql SELECT "Activities"."id", "Activities"."name", json_object_agg("Sessions"."date", "Sessions"."status") as rawSessions FROM "Activities" INNER JOIN "Sessions" on "Activities"."id" = "Sessions"."activityId" GROUP BY "Activities"."id"
  primary_keys id
  interval 1m
  tag replicator.pipeline.Activities.${event}.${primary_keys}
</source>
<filter replicator.pipeline.Activities.**>
  @type parser
  format json
  key_name rawSessions
  hash_value_field sessions
  reserve_data true
</filter>
<match replicator.pipeline.Activities.**>
  @type stdout
</match>
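To illustrate, a plain-Ruby sketch (a hypothetical record, not Fluentd internals) of what the parser filter with `key_name rawSessions`, `hash_value_field sessions`, and `reserve_data true` does to one record:

```ruby
require 'json'

# Hypothetical input record, shaped like the replicator output above.
record = {
  "id"          => "a47fa0b1-9b52-11e6-bcbd-19f2b5872ee3",
  "name"        => "Gym",
  "rawSessions" => '{"2017-10-26T08:03:13.116+00:00":"open"}'
}

# key_name rawSessions:     parse that field's value as JSON;
# hash_value_field sessions: nest the parsed hash under "sessions";
# reserve_data true:         keep the original fields alongside it.
record["sessions"] = JSON.parse(record["rawSessions"])
```

Now `record["sessions"]` is a real Hash, so the elasticsearch output can index it as an object instead of a string.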

Thanks @repeatedly for your help.

Thomas Haessle

Nov 7, 2016, 11:54:34 AM11/7/16
to Fluentd Google Group
The problem I had seems to be related to 'fluent-plugin-elasticsearch'.
If I match to stdout, it's fine.

Then I replace stdout with elasticsearch (config below) and
Fluentd writes nothing to my index.
If I remove the first filter (type parser), my index is fed.
Without the filters my index is also fed (but the JSON is mapped as a string in ES).
No error, no useful log in DEBUG mode... Has anybody succeeded in using this plugin together with parser?

<source>
  type postgres_replicator
  host "#{ENV['POSGRESQL_HOST']}"
  port "#{ENV['POSGRESQL_PORT']}"
  username "#{ENV['POSGRESQL_USER']}"
  password "#{ENV['POSGRESQL_PASSWORD']}"
  database "#{ENV['POSGRESQL_DB']}"
  sql SELECT "Activities"."id", "Activities"."name", json_object_agg("Sessions"."date", "Sessions"."status") as rawSessions FROM "Activities" INNER JOIN "Sessions" on "Activities"."id" = "Sessions"."activityId" GROUP BY "Activities"."id"
  primary_keys id
  interval 1m
  tag replicator.pipeline.Activities.${event}.${primary_keys}
</source>
<filter replicator.pipeline.Activities.**>
  @type parser
  format json
  key_name rawSessions
  hash_value_field sessions
  reserve_data true
</filter>
<filter replicator.pipeline.Activities.**>
  @type record_transformer
  remove_keys rawSessions
</filter>
<match replicator.pipeline.Activities.**>
  @type elasticsearch
  user  "#{ENV['ES_USER']}"
  password "#{ENV['ES_PASSWORD']}"
  host "#{ENV['ES_HOST']}"
  scheme https
  index_name myindex
  type_name activities
  id_key id
  time_key updatedAt
  logstash_format false
  port "#{ENV['ES_PORT']}"
</match>

Thomas Haessle

Nov 8, 2016, 5:48:22 AM11/8/16
to Fluentd Google Group
TO CLOSE:
My problem concerns only bonsai.io (an ES 2.4 compatibility problem?).
I created an index on cloud.elastic.co and it works fine (ES 5.0).

Mr. Fiber

Nov 8, 2016, 5:49:47 PM11/8/16
to Fluentd Google Group
Interesting.

My problem concerns only bonsai.io (ES 2.4 compatibility problem?)

Does this problem happen with only bonsai.io hosted elasticsearch?

