fluentd to Amazon Kinesis performance issues


Urban Škudnik

Aug 21, 2014, 6:52:54 AM
to flu...@googlegroups.com
Hey guys,

I've tried pushing data from fluentd to Kinesis with the plugin, and while getting data into Kinesis worked almost immediately, the throughput I'm seeing is very slow - on average roughly 5-10 put record requests per second. When I launch an ab -n 10000 test, the test itself finishes in about 10-15s, but fluentd then uploads data for the next half an hour or so.

I'm not a Ruby expert - far from it - but the way I read the code, the write function (https://github.com/awslabs/aws-fluent-plugin-kinesis/blob/master/lib/fluent/plugin/out_kinesis.rb#L77) that outputs the data puts records up sequentially and synchronously. Can somebody confirm this, please?
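
To illustrate what I mean, here is roughly the shape of it as I read it (a paraphrase from memory, not the plugin's exact code; the client call details are illustrative):

# Paraphrased sketch of a sequential write loop in a Fluentd
# BufferedOutput plugin; not the plugin's actual code.
def write(chunk)
  chunk.msgpack_each do |record|   # each entry is however #format packed it
    # One blocking HTTP round trip per record: with ~100-200ms latency
    # to Kinesis this tops out at roughly 5-10 puts per second.
    @client.put_record(
      stream_name: @stream_name,
      data: record.to_json,
      partition_key: record[@partition_key]
    )
  end
end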

Since the whole point of sending messages to Kinesis is to upload several thousand records a second, this behaviour - if I'm reading it right - is obviously inadequate. What would be the best way to resolve this? I don't mind helping with the plugin, though, as I said, I'm no Ruby expert. As I understand it, EventMachine for example wouldn't solve my problem, since it only accepts lots of connections and still handles output synchronously. Maybe something along the lines of Celluloid (http://celluloid.io), which is based on the actor model? Can something like that be implemented as a fluentd plugin? Or should we develop a separate service (we're thinking in Scala, possibly with the async Java client from Amazon) that takes data from fluentd as input and just uploads it (maybe via out_exec? I haven't gone through the entire list of plugins yet to see if there is something more appropriate, so if there is, I'm all ears :))?

Thanks for any help!

Cheers,
Urban 

Masahiro Nakagawa

Aug 21, 2014, 3:46:22 PM
to flu...@googlegroups.com
Hi Urban,

The underlying problem is that Kinesis doesn't support batch insert yet.
As a result, a producer has to call the `put record` API once per event.
If you have 10000 events, you need 10000 API calls. This is why fluent-plugin-kinesis is slow.
This is not only a Fluentd issue; one of my acquaintances stopped using Kinesis because of this limitation.

I'm not sure how best to work around it.
One idea is merging several events into one record to reduce API calls,
but I don't know whether the Ruby SDK supports such an approach.
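
For example, something like this (just a sketch; the consumer has to split the blob again, and the partition key then applies per blob, not per event):

# Sketch: pack many events into one Kinesis record as
# newline-delimited JSON, cutting API calls by the batch size.
def write(chunk)
  events = []
  chunk.msgpack_each { |record| events << record.to_json }
  events.each_slice(100) do |batch|     # 100 events -> 1 put_record call
    @client.put_record(
      stream_name: @stream_name,
      data: batch.join("\n"),           # consumer splits on "\n"
      partition_key: rand(256).to_s     # per-blob key, only for shard spreading
    )
  end
end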

Maybe your approach also relaxes this problem.
Kinesis currently favours a many-parallel-requests model, not a micro-batch model,
so launching some actors / threads to send events in parallel, or
writing a dedicated service, seems good for now.
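
With plain Ruby threads, the shape would be something like this (a sketch only; it assumes the v1 Ruby SDK's client interface, one client per thread, and placeholder stream/key names):

require 'thread'

queue   = Queue.new
workers = 8.times.map do
  Thread.new do
    client = AWS::Kinesis::Client.new            # v1 Ruby SDK client
    while (event = queue.pop)                    # nil sentinel ends the loop
      client.put_record(stream_name: 'YOUR_STREAM',
                        data: event.to_json,
                        partition_key: event['your_key'].to_s)
    end
  end
end

events.each { |e| queue << e }
8.times { queue << nil }                         # stop each worker
workers.each(&:join)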


Masahiro



Urban Skudnik

Aug 21, 2014, 5:03:57 PM
to flu...@googlegroups.com
Yeah, at first the lack of batch insert wasn't considered much of an issue on our end, but it's turning into a problem quite fast.

Merging events is not really an option for us, since we then can't split by the appropriate partition key.

I'll explore further; maybe something like parallel map (https://github.com/celluloid/celluloid/blob/master/examples/simple_pmap.rb) will help, but we'll see how fluentd behaves in such a scenario.
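
For reference, the pmap in that example is tiny - it just wraps each element in a Celluloid future and collects the values (paraphrased from the linked simple_pmap.rb):

require 'celluloid/autostart'

module Enumerable
  def pmap(&block)
    futures = map { |elem| Celluloid::Future.new(elem, &block) }
    futures.map(&:value)   # blocks until every future has resolved
  end
end

# which would let a chunk's records go up in parallel, e.g.
# records.pmap { |r| client.put_record(...) }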

Open for suggestions if anyone knows of a better approach :)

Masahiro Nakagawa

Aug 21, 2014, 6:34:18 PM
to flu...@googlegroups.com
> Merging events is not really an option for us, since we then can't split by the appropriate partition key.

Yeah. This is not a popular approach, but one company in Japan uses it.

> we'll see how fluentd behaves in such a scenario.

Fluentd doesn't touch a plugin's internals;
you can use any rubygems in your plugin.


Kazuki Ohta

Aug 25, 2014, 5:26:07 AM
to flu...@googlegroups.com
Maybe you can increase the 'num_threads' parameter to get more parallelism. However, batch insert is the ideal solution anyway. Thanks -K
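
Something like this in your kinesis match section (a sketch; the tag and stream name are placeholders, and the thread count needs tuning for your machine):

<match your.tag>
  type kinesis
  stream_name YOUR_STREAM
  num_threads 50    # parallel buffer flush threads
</match>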



==
Kazuki Ohta | Founder and CTO, Treasure Data

yuta imai

Aug 27, 2014, 9:06:21 PM
to flu...@googlegroups.com
Hi, sorry about my delayed involvement.
I'm one of the developers of the plugin.

Currently I'm working on making the plugin multi-threaded.
I have also considered merging records on the client side; however, as Urban mentioned, it may cause partition key issues. I think the plugin should emit each record to Kinesis individually, so as not to complicate consumers.

Regarding the AWS SDK for Ruby: it does not itself have any kind of multi-threaded or async put method. (It does have an interface for implementing one; something like this: https://github.com/JoshMcKin/em_aws )

Thanks
-Yuta


yuta imai
- 090-6927-9469
- http://worldspin.info

Urban Skudnik

Sep 1, 2014, 9:13:34 AM
to flu...@googlegroups.com
Yeah, joining on the producer side and then splitting the data on the consumer side makes all operations more complicated (and possibly with higher resource consumption than just using a couple more cores to upload the data; see below).

Anyway, a few updates from my end. After getting rather abysmal performance even with an actor implementation (about 100 puts/s, with extremely high CPU/RAM usage - questionable whether it could be used long-term, though I didn't do much optimising on that approach), I moved testing to Amazon and used a c1.xlarge EC2 instance (http://www.ec2instances.info/?filter=c1.xlarge), and the performance boost was huge. All of the numbers below are from the default plugin implementation - the one that gave me about 5 puts/s from my machine - not from my Celluloid actor experiment.

First I started experimenting with the num_threads parameter: http://imgur.com/JqsvmAH

From left to right: 1000, 1, 10, 100, 500, 2000

The jump of about 5k puts/s came after switching off dummer (https://github.com/sonots/dummer), which I was using to generate test load - at that point fluentd was no longer ingesting any data and was just draining its queue. In all runs I was pushing in 2k records a second.

While it's not completely obvious from this photo (if anyone is interested I can upload detailed pics), the upload speed was highest and most consistent with num_threads=100.

I didn't experiment with the exact value of num_threads any further, so there may still be some headroom.

So, num_threads=100 gave us about 450 puts/s, at which point we became CPU bound - the network was basically idle (500KB/s or something?) and RAM was under no serious stress either.

Next stop - the multiprocess plugin. I started with a quad-core config, figured the machine could take some more, and switched to an 8-core configuration; each core pushed a bit less than 450 puts/s (about 350?), so about 2700 puts/s in total.

So yeah, the current fluentd-to-Kinesis plugin can achieve quite good throughput (of course, if we/you can optimise further, so much the better - Imai, if you need some help, do tell), but for really fast uploads from a single instance, do make sure you are pushing data from EC2 (or be ready to have lots of servers pushing data).

Cheers,
Urban



yuta imai

Sep 2, 2014, 12:36:15 AM
to flu...@googlegroups.com
Hi Urban, thanks for your updates!

Yes - thanks to you and Kazuki, I have also achieved much higher TPS with num_threads.
I tested with a 1000 TPS log stream on an EC2 instance (c3.xlarge).

With num_threads=50, the number was about 600-700 TPS.
With num_threads=50 plus 2 processes via DetachMultiProcessMixin, the number was about 1000 TPS.

You can find the CloudWatch chart here.

I haven't tested higher TPS at this point in time, but currently I'm modifying the code as follows:
1. Add DetachMultiProcessMixin support (see the config sketch below)
2. Add a guide on running at high TPS to the README
3. Some other tiny fixes
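
Once those changes land, the user-facing config should look roughly like this (tentative - the stream/tag names are placeholders, and the parameter names may change until the README is updated):

<match your.tag>
  type kinesis
  stream_name YOUR_STREAM
  num_threads 50        # flush threads per process
  detach_process 2      # worker processes forked via DetachMultiProcessMixin
</match>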

If you find I'm missing something, please let me know.
Thanks!
-Yuta

Amit Pandey

Sep 2, 2014, 12:36:21 PM
to flu...@googlegroups.com
Hi Urban,

I am also stuck on fluentd-Kinesis performance. Could you share how you configured the fluentd-kinesis plugin for a quad-core or octa-core box?
Where did you get the multiprocess plugin you are talking about? Please let me know.

Urban Skudnik

Sep 2, 2014, 1:24:54 PM
to flu...@googlegroups.com
(Amit, check below for the configs, but do read the whole message, as it turns out this setup is wrong.)

Huh, I guess I celebrated a bit too fast! At first I was benchmarking with ab, just looking at peak load and how fast we can send data up (well, at least we know how fast we _can_ go). Today I tried benchmarking with a constant load (httperf --rate 50 --num-conns 30000 --server 127.0.1.13 --uri /json/base64-encoded-load) to see longer-running performance and how quickly the data gets sent up, and that's when I noticed that the numbers simply don't add up.

http://i.imgur.com/V1K5TZg.png

The center and right graph peaks show the same command as before (httperf), first with one core, then with two cores.

One core:
In [21]: dps = [2242, 6124, 11389, 11731, 11401, 6438, 4098, 4008, 2569]
In [22]: sum(dps)
Out[22]: 60000

Two cores:
In [27]: dps2 = [612, 8611, 18506, 22989, 23143, 20239, 9603, 7691, 7917, 689]
In [28]: sum(dps2)
Out[28]: 120000

(The base64-encoded payload that I push into our service contains two events, so two events are sent to Kinesis per request; that's why 60k is what we expect in the first case.)

This leads me to the conclusion that while in_multiprocess indeed spawns multiple processes, in_tail apparently doesn't share the position file between them, meaning the data is not uploaded in parallel but rather sent multiple times (each process sends it once). Is that expected behaviour, or did I mess something up in the configuration?

Cheers,
Urban

———————————

Config files:

Master:
<source>
  type multiprocess

  <process>
    cmdline -c /etc/td-agent/td-agent.conf
    sleep_before_start 1s
    sleep_before_shutdown 5s
  </process>
  <process>
    cmdline -c /etc/td-agent/td-agent.conf
    sleep_before_start 1s
    sleep_before_shutdown 5s
  </process>
</source>

------------------------------
Agent:

<source>
  type tail
  path /tracker-logs/tracker-events.events
  pos_file /tracker-logs/events.log.pos
  tag kinesis.eventtracker
  read_from_head true
  format json
  refresh_interval 5
</source>

<match kinesis.eventtracker>
  type copy
  <store>
    type flowcounter
    count_keys *
    unit second
    tag fluentd.traffic
  </store>

  <store>
    type kinesis

    stream_name eventStream

    aws_key_id key
    aws_sec_key secret

    region us-east-1
    partition_key sessionId
    num_threads 100
    flush_interval 60s
  </store>
</match>

<match fluentd.traffic>
  type stdout
  output_type json
</match>

Urban Skudnik

Sep 16, 2014, 12:51:44 PM
to flu...@googlegroups.com
A bit late, but maybe it helps someone in the future - an update on how I resolved the issue. If anyone knows of a better solution or sees drawbacks to this one, I'm all ears :)

We use 3 (or more) fluentd instances. Configs are at the end of the email. In this case all the instances are on the same machine; they could obviously be separated. I haven't benchmarked this beyond our estimated per-machine load (1k reqs/s/machine), but with c1.xlarge I'd presume somewhere between 2-4k reqs/s to Kinesis (from EC2 instances) shouldn't be impossible.

1. Forwarder - a single fluentd instance that serves as a load balancer. It listens for HTTP requests and distributes the load with the roundrobin plugin to multiple slave instances running on the same machine, each listening on a different port.

2. Master - exists just to start up the slave processes; this could also be done without it.

3. Slaves - listen for the forwarder's data (each on a separate port) and send it to Kinesis.

Nothing too fancy really :)

A question, if anyone knows: is it better to use the in_multiprocess plugin, or should I start the slave processes with one of the daemonizer tools? Any benefits/issues with either?

Cheers,
Urban

=======================================

Configs:

Forwarder:

<source>
  type http
  port 20000
  bind 0.0.0.0
  format json
</source>

<match kinesis.eventtracker>
  type roundrobin

  <store>
    type forward
    heartbeat_type tcp
    flush_interval 5
    <server>
      host 0.0.0.0
      port 20001
      weight 50
    </server>
  </store>
  <store>
    type forward
    heartbeat_type tcp
    flush_interval 5
    <server>
      host 0.0.0.0
      port 20002
      weight 50
    </server>
  </store>
  ...
</match>

———————————————————————

Master - serves only to start up multiple instances:

<source>
  type multiprocess

  <process>
    cmdline -c /root/fluentd-configs/slave-1.conf
    sleep_before_start 1s
    sleep_before_shutdown 5s
  </process>
  <process>
    cmdline -c /root/fluentd-configs/slave-2.conf
    sleep_before_start 1s
    sleep_before_shutdown 5s
  </process>
  ...
</source>

———————————————————————

Slave

slave-1.conf
<source>
  type forward
  port 20001  # (20002 in slave-2.conf, …)
  bind 0.0.0.0
  tag kinesis.eventtracker
</source>

<match kinesis.eventtracker>
  type kinesis

  stream_name eventStream

  aws_key_id key
  aws_sec_key id

  region us-east-1
  #debug true
  partition_key sessionId
  #partition_key timestamp
  num_threads 1000
  flush_interval 60s
</match>
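
For reference, this is roughly how the pieces get started (the forwarder config filename is just whatever you saved it as; -c points fluentd at a config file):

fluentd -c /root/fluentd-configs/forwarder.conf &
fluentd -c /root/fluentd-configs/master.conf &   # in_multiprocess then spawns slave-1, slave-2, ...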