Flume inconsistencies

Flume inconsistencies Nicolas Maillard 6/13/13 4:42 AM
Hi everyone

I am currently using Flume to bring MySQL dumps from other machines to my HDFS cluster.
I have a spooling directory agent that monitors a specific directory and sends the dumps through an Avro sink to a second agent, which has an Avro source and writes to HDFS.
I am having a lot of stability issues.
If for some reason it loses its connection to the NameNode, it just keeps logging warnings and never reconnects. This can happen, for example, if the HDFS service is restarted.
A more serious concern is that some files seem to be sent and are deleted from the directory, but in HDFS I only have a small portion of the file, with no error messages or warnings.

I was wondering if Flume NG is currently considered stable enough for production use, and whether my issues are normal. I like the technology, but I am having a really hard time getting it to work.
It usually starts off well and everything looks fine, but after a while (an hour, a day) it just crashes or starts behaving strangely.

Thanks for your help and time.
Re: Flume inconsistencies Will Briggs 6/13/13 4:56 AM

I'm new to Flume and am having issues of my own, but I wonder if Sqoop might be a better fit for your use case of extracting relational data into HDFS.

Re: Flume inconsistencies Nicolas Maillard 6/13/13 5:14 AM
Hi William,

Thanks for your reply.
Sqoop might be a better fit for the MySQL use case, although I have many more use cases.
I am also not convinced by Sqoop currently; I have found it neither very reliable nor easy to use.
That being said, I use Flume to copy log files; this MySQL dump is just a special case, and I get the same issues with any other file.


Re: Flume inconsistencies Nicolas Maillard 6/13/13 6:52 AM
Hello again

I am still trying to figure out this problem.
The initial file I'd like to move is about 300 MB.
Flume starts pushing it out, but after about 70 MB it just stops and does nothing; the logs are silent on both agents.
I'm not sure what is going on; my HDFS files are incomplete.

Thanks for any help.

Here are my config files:
Agent 1, which reads and pushes:
a.sources = r1 
a.sinks = k1
a.channels = c1

# Describe/configure the source
a.sources.r1.type = spooldir
a.sources.r1.channels = c1
a.sources.r1.spoolDir = /flumedir/flume
a.sources.r1.fileHeader = true
a.sources.r1.fileHeaderKey = filename
a.sources.r1.deletePolicy = immediate

a.sinks.k1.type = avro
a.sinks.k1.channel = c1
a.sinks.k1.hostname = IPtoagent2
a.sinks.k1.port = 2112

# Use a durable file channel (events are persisted to disk)
a.channels.c1.type = file
a.channels.c1.checkpointDir = /home/runner/flumetmp/checkpoint
a.channels.c1.dataDirs = /home/runner/flumetmp/data
a.channels.c1.capacity = 100000000

a.sources.r1.interceptors = i1 i2 i3 i4 i5 i6
a.sources.r1.interceptors.i1.type = static
a.sources.r1.interceptors.i1.key = client
a.sources.r1.interceptors.i1.value = clientname
a.sources.r1.interceptors.i2.type = host
a.sources.r1.interceptors.i2.hostHeader = hostname
a.sources.r1.interceptors.i2.useIP = false
a.sources.r1.interceptors.i3.type = static
a.sources.r1.interceptors.i3.key = category
a.sources.r1.interceptors.i3.value = MTP
a.sources.r1.interceptors.i4.type = static
a.sources.r1.interceptors.i4.key = subcategory
a.sources.r1.interceptors.i4.value = HITS
a.sources.r1.interceptors.i5.type = regex_extractor
a.sources.r1.interceptors.i5.regex = (\\d{4})-(\\d{2})-(\\d{2}) \\d{2}:\\d{2}:\\d{2}
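a.sources.r1.interceptors.i5.serializers = s1 s2 s3
a.sources.r1.interceptors.i5.serializers.s1.name = year
a.sources.r1.interceptors.i5.serializers.s2.name = month
a.sources.r1.interceptors.i5.serializers.s3.name = day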
a.sources.r1.interceptors.i6.type = static
a.sources.r1.interceptors.i6.key = instance
a.sources.r1.interceptors.i6.value = instancename
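To sanity-check what the `i5` regex extractor pulls out of an event line, here is a minimal Python sketch. It assumes the three capture groups are meant to become the `year`, `month` and `day` headers used in agent 2's HDFS path, and that the interceptor searches for the pattern anywhere in the event body (which is how I understand Flume's regex extractor to work). The doubled backslashes in the properties file become single ones once the file is parsed; on ASCII input Python's `re` behaves essentially like Java's regex engine for this pattern. The sample line is invented.

```python
import re

# Pattern from interceptor i5, with the properties-file escaping (\\d) undone.
TIMESTAMP_RE = re.compile(r"(\d{4})-(\d{2})-(\d{2}) \d{2}:\d{2}:\d{2}")

# Assumed serializer names, matching %{year}/%{month}/%{day} in the HDFS path.
HEADER_NAMES = ("year", "month", "day")


def extract_date_headers(line: str) -> dict:
    """Return the headers a regex extractor would add, or {} if no match."""
    match = TIMESTAMP_RE.search(line)
    if match is None:
        return {}
    return dict(zip(HEADER_NAMES, match.groups()))


if __name__ == "__main__":
    sample = "INSERT INTO hits VALUES (42, '2012-05-17 08:30:00');"
    print(extract_date_headers(sample))
    print(extract_date_headers("no timestamp here"))
```

For the sample line this prints `{'year': '2012', 'month': '05', 'day': '17'}`, then `{}` for the line without a timestamp. A line without a match gets no date headers at all, so its HDFS path would be missing those components.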

Agent 2, which writes to HDFS (I'll leave out the static interceptors):

gen.sources = genTail
gen.channels = genChannel
gen.sinks = genHdfs

# Describe/configure the source
gen.sources.genTail.type = avro
gen.sources.genTail.bind =
gen.sources.genTail.port = 2112
gen.sources.genTail.channels = genChannel

gen.channels.genChannel.type = file
gen.channels.genChannel.checkpointDir = /home/runner/flumetmp/checkpoint
gen.channels.genChannel.dataDirs = /home/runner/flumetmp/data
gen.channels.genChannel.capacity = 10000000

#Describe/configure the sink
gen.sinks.genHdfs.type = hdfs
gen.sinks.genHdfs.hdfs.rollInterval = 0
gen.sinks.genHdfs.hdfs.rollCount = 0
gen.sinks.genHdfs.hdfs.rollSize = 0
gen.sinks.genHdfs.hdfs.writeFormat = Text
gen.sinks.genHdfs.hdfs.fileType = DataStream
gen.sinks.genHdfs.hdfs.path = /data/in/%{instance}/%{client}/%{category}/%{subcategory}/%{hostname}/%{year}/%{month}/%{day}/%{filename}
gen.sinks.genHdfs.hdfs.filePrefix = %{fileprefix}
gen.sinks.genHdfs.channel = genChannel
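A property key that misspells a component name (for example `r1t` where only `r1` is declared) is easy to overlook, and the setting simply does not reach the intended component. Below is a minimal Python sketch of a hypothetical checker (not part of Flume) that flags such keys. It assumes the simple `agent.kind.name.property = value` layout used in the two files above, and it ignores comments, blank lines and properties-file features such as `:` separators or line continuations.

```python
KINDS = ("sources", "sinks", "channels")


def parse_properties(text: str) -> list:
    """Return (key, value) pairs, skipping blank lines and # comments."""
    pairs = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        pairs.append((key.strip(), value.strip()))
    return pairs


def undeclared_component_keys(text: str) -> list:
    """List keys whose component name is not in the agent's declared list."""
    pairs = parse_properties(text)
    declared = {}
    # First pass: collect declarations such as "a.sources = r1".
    for key, value in pairs:
        parts = key.split(".")
        if len(parts) == 2 and parts[1] in KINDS:
            declared[(parts[0], parts[1])] = set(value.split())
    # Second pass: every "agent.kind.name..." key must use a declared name.
    suspicious = []
    for key, _ in pairs:
        parts = key.split(".")
        if len(parts) >= 3 and parts[1] in KINDS:
            if parts[2] not in declared.get((parts[0], parts[1]), set()):
                suspicious.append(key)
    return suspicious


if __name__ == "__main__":
    config = """
a.sources = r1
a.sources.r1.type = spooldir
a.sources.r1t.interceptors = i1
a.sources.1.interceptors.i4.key = subcategory
"""
    for key in undeclared_component_keys(config):
        print(key)
```

Running it on the snippet prints the two keys whose component names (`r1t` and `1`) were never declared.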
Re: Flume inconsistencies Nicolas Maillard 6/13/13 8:32 AM
Hi everyone

I am still working on this Flume issue.
As shown in the agent configuration earlier, I have tried to never roll the HDFS file and just keep writing to it.
I have set hdfs.batchSize = 1000. Does this mean that my file will accumulate 1000 events before being flushed to HDFS?
Once a batch is flushed to HDFS, and since there is no roll, what happens to the next batch? Does it get appended, or is the file locked?

I can't make sense of this weird freezing behaviour. All of a sudden Flume stops doing anything at all.
Re: Flume inconsistencies Mike Percy 6/13/13 1:45 PM
Hi Nicolas,
I'd like to help out, can you please provide the following?

1. Version of CDH & Flume you are using
2. Please post a jstack dump of the Flume process when it appears to be hung. You can do something like sudo -u flume jstack <pid>
3. Please post the contents of your flume.log file with log level set to DEBUG

Regarding your questions:

1. Yes, Flume NG is very stable overall; however, the spooling directory source is a relatively new component, so we should make sure you haven't found an undiscovered issue with it.
2. hdfs.batchSize = 1000 means the HDFS sink will try to write 1000 events at a time; however, if it empties the channel, it will write whatever it has gotten in that batch (as long as it was able to read at least 1 event from the channel for this batch).
3. Flume will continue appending batches to the same file over time until it hits one of the rolling criteria, such as rollInterval, idleTimeout, rollCount, etc.
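Points 2 and 3 can be illustrated with a toy Python model of the HDFS sink's batching: take up to `batch_size` events from the channel, stop early if the channel runs dry, skip the write when nothing was taken, and append every batch to the same open file. This is a simplified model written for illustration, not Flume's `HDFSEventSink` code; transactions, rolling and errors are left out, and the loop simply stops on an empty channel where a real sink would back off and poll again later.

```python
from collections import deque


def take_batch(channel: deque, batch_size: int) -> list:
    """Drain at most batch_size events from the front of the channel."""
    batch = []
    while len(batch) < batch_size and channel:
        batch.append(channel.popleft())
    return batch


def run_sink(channel: deque, batch_size: int, hdfs_file: list) -> int:
    """Append batches to one open file until the channel is empty.

    Returns the number of batches written. With no roll criteria, every
    batch is appended to the same file (point 3).
    """
    batches = 0
    while True:
        batch = take_batch(channel, batch_size)
        if not batch:  # nothing read: no write for this round
            break
        hdfs_file.extend(batch)  # a short final batch is still written (point 2)
        batches += 1
    return batches


if __name__ == "__main__":
    channel = deque(f"event-{i}" for i in range(2500))
    hdfs_file = []
    print(run_sink(channel, 1000, hdfs_file), len(hdfs_file))
```

With 2500 queued events and a batch size of 1000, this prints `3 2500`: two full batches and a final batch of 500, all appended to the same file.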



Re: Flume inconsistencies Helen Poon 6/13/13 2:09 PM
I am using Flume NG in production and have found it to be very stable.

The following tips will help with Flume NG stability:

- Use file channels. In an unstable environment, a file channel works better than a memory channel. When the sink can't consume the data because its destination is down or slow, the data is buffered in local files. The file channel capacity is tunable. In the Flume NG log, you can see the queue size, i.e. the number of events in the file channel waiting to be consumed by the sink.

- Use batching. For HDFS, for example, you can set the following:
fl_core.sinks.k1.hdfs.batchSize = 1000
fl_core.sinks.k1.hdfs.txnEventMax = 1000

- For the HDFS sink, check your rolling rule, and don't expect the file to be fully usable or to have its final size before it rolls. The file currently being written to is marked in its name (i.e. it ends with .tmp). For example, I have the following rolling rule, so I don't expect to see the data as soon as Flume NG picks it up.

# roll every 6 hours or when the file reaches 4 blocks (128 MB/block), whichever comes first
fl_core.sinks.k1.hdfs.rollInterval = 21600
fl_core.sinks.k1.hdfs.rollSize = 536870912
fl_core.sinks.k1.hdfs.rollCount = 0
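A quick Python check of what these roll settings amount to, assuming a 128 MB (134217728-byte) HDFS block size as in the comment:

```python
BLOCK_SIZE = 128 * 1024 * 1024   # 134217728 bytes, assumed HDFS block size
ROLL_SIZE = 536870912            # hdfs.rollSize from the config above
ROLL_INTERVAL = 21600            # hdfs.rollInterval, in seconds

blocks_per_file = ROLL_SIZE / BLOCK_SIZE
hours_per_file = ROLL_INTERVAL / 3600

print(f"rollSize = {ROLL_SIZE // (1024 * 1024)} MB = {blocks_per_file:g} blocks")
print(f"rollInterval = {hours_per_file:g} hours")
```

This prints `rollSize = 512 MB = 4 blocks` and `rollInterval = 6 hours`. If a 3-block limit was intended, rollSize would be 3 × 134217728 = 402653184.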

- About the size of your HDFS file: you are sinking to Avro and then to HDFS, so there is some combining as well as compression. The final file size will be different from, and smaller than, the original.

Hope this helps.
Re: Flume inconsistencies Helen Poon 6/13/13 2:14 PM
I'm not sure your HDFS file will ever roll, given this part of your config:

gen.sinks.genHdfs.hdfs.rollInterval = 0
gen.sinks.genHdfs.hdfs.rollCount = 0
gen.sinks.genHdfs.hdfs.rollSize = 0

You are saying never roll on time, event count, or size. I am not sure what the HDFS sink will do in this case. As a test, perhaps set rollCount to 10 and lower your batch size to 10, just to see whether you get the data you expect. But I recommend much bigger numbers for production use.
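As I read the HDFS sink documentation, a value of 0 disables the corresponding criterion (hdfs.rollInterval, hdfs.rollSize, hdfs.rollCount, and likewise hdfs.idleTimeout for closing idle files). With all three roll settings at 0, an open file is therefore only closed by other means, such as an idle timeout or the agent shutting down. The Python function below is a simplified, hypothetical model of that decision, not Flume's own code.

```python
from dataclasses import dataclass


@dataclass
class RollConfig:
    roll_interval_s: int  # hdfs.rollInterval; 0 disables
    roll_size_bytes: int  # hdfs.rollSize; 0 disables
    roll_count: int       # hdfs.rollCount; 0 disables
    idle_timeout_s: int   # hdfs.idleTimeout; 0 disables


def should_close(cfg: RollConfig, open_for_s: int, bytes_written: int,
                 events_written: int, idle_for_s: int):
    """Return the first criterion that would close the file, or None."""
    if cfg.roll_interval_s > 0 and open_for_s >= cfg.roll_interval_s:
        return "rollInterval"
    if cfg.roll_size_bytes > 0 and bytes_written >= cfg.roll_size_bytes:
        return "rollSize"
    if cfg.roll_count > 0 and events_written >= cfg.roll_count:
        return "rollCount"
    if cfg.idle_timeout_s > 0 and idle_for_s >= cfg.idle_timeout_s:
        return "idleTimeout"
    return None


if __name__ == "__main__":
    never = RollConfig(0, 0, 0, 0)
    # A day open, 300 MB and 500000 events written, idle for an hour: still open.
    print(should_close(never, 86400, 300 * 1024 * 1024, 500000, 3600))
```

In this model the all-zero configuration never closes the file, whereas `RollConfig(0, 0, 10, 0)` (the suggested test with rollCount = 10) closes it after 10 events.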

Re: Flume inconsistencies Nicolas Maillard 6/13/13 3:23 PM

Thank you for all the help.
My use case is getting complete files, which is why I try to never roll and instead let the HDFS path, with its %Y%m%d components, roll naturally.
When a day goes by, I start writing to a different directory based on the timestamp, and I thought this would roll my files.
Maybe I was wrong and it is a bad way to go about things.
My idea was to add the file name as a directory, so that I can compare the file pushed with the file received, and just roll based on day and file name.
I have also added rolling based on idle time, after 10 minutes of inactivity.

Is there a better way of going about this use case?


Re: Flume inconsistencies Mike Percy 6/13/13 4:18 PM
Your approach seems alright.

I can help more if you can give more info (see my questions in my previous post).

Note that you should look at the metrics using Cloudera Manager or JMX to find out how many events are in the channel at each hop.
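Besides Cloudera Manager and JMX, Flume 1.3 and later can, as far as I know, also publish the same counters as JSON over HTTP at `/metrics` when the agent is started with `-Dflume.monitoring.type=http -Dflume.monitoring.port=<port>`. The Python sketch below only parses such a response and reports `ChannelSize` per channel. The key layout (`CHANNEL.<name>` with string-valued counters) is an assumption based on that reporting format, and the sample payload is abbreviated and made up.

```python
import json


def channel_sizes(metrics_json: str) -> dict:
    """Map each channel name to its ChannelSize from a /metrics payload."""
    metrics = json.loads(metrics_json)
    sizes = {}
    for component, counters in metrics.items():
        if component.startswith("CHANNEL."):
            name = component[len("CHANNEL."):]
            sizes[name] = int(counters.get("ChannelSize", 0))
    return sizes


if __name__ == "__main__":
    # Abbreviated, made-up payload in the assumed /metrics layout.
    sample = (
        '{"CHANNEL.genChannel": {"Type": "CHANNEL", "ChannelSize": "1523",'
        ' "ChannelCapacity": "10000000"},'
        ' "SINK.genHdfs": {"Type": "SINK", "EventDrainSuccessCount": "70000"}}'
    )
    print(channel_sizes(sample))
```

Against a live agent, the payload could be fetched with `urllib.request.urlopen(f"http://{host}:{port}/metrics").read().decode("utf-8")`, where host and port are whatever the agent was started with. A channel size that stays constant while nothing reaches HDFS would point at the sink side of that hop.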

Hope that helps,


Re: Flume inconsistencies Nicolas Maillard 6/17/13 1:19 AM
Hi Mike and Helen

Thanks for your time and help.
I am currently running Cloudera Manager 4.6 and CDH 4.3.
I have attached the logs of agent1 and agent2, a jstack dump, and my properties files.

My topology is:
Agent 1, on one machine: a spooling directory source, a 10 GB file channel, and an Avro sink.
Agent 2, on a remote machine: an Avro source, a 10 GB file channel, and an HDFS sink.
Agent 1 picks up files of about 400 MB and pushes them to agent 2 to be stored in HDFS.

To store data by pattern, I look for dates in the events with a regex extractor interceptor and also push the filename along with the events.
My storage path is based on year, month, day, and filename. My files have dates from May 2012 to May 2013, a year's worth of data.
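To see how such a path spreads events across directories, here is a minimal Python sketch of `%{header}` substitution in agent 2's `hdfs.path`. It only handles the `%{name}` form (not the `%Y`-style timestamp escapes), and replacing a missing header with an empty string is an assumption of this sketch rather than documented Flume behaviour. The header values are made up.

```python
import re

HEADER_ESCAPE = re.compile(r"%\{([^}]+)\}")

PATH_TEMPLATE = ("/data/in/%{instance}/%{client}/%{category}/%{subcategory}/"
                 "%{hostname}/%{year}/%{month}/%{day}/%{filename}")


def resolve_path(template: str, headers: dict) -> str:
    """Replace every %{name} with headers[name] (empty if absent)."""
    return HEADER_ESCAPE.sub(lambda m: headers.get(m.group(1), ""), template)


if __name__ == "__main__":
    base = {"instance": "instancename", "client": "clientname",
            "category": "MTP", "subcategory": "HITS", "hostname": "db01",
            "filename": "dump.sql"}
    dates = [("2012", "05", "17"), ("2012", "06", "02"), ("2012", "05", "17")]
    paths = {resolve_path(PATH_TEMPLATE, {**base, "year": y, "month": m, "day": d})
             for y, m, d in dates}
    for path in sorted(paths):
        print(path)
```

Three events with two distinct dates resolve to two paths. As far as I understand, the HDFS sink keeps one writer per resolved path, so a dump spanning a year of dates writes into one directory (and one open file) per distinct day.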
As suggested, I have added
fl_core.sinks.k1.hdfs.rollInterval = 21600
fl_core.sinks.k1.hdfs.rollSize = 536870912
and an idle timeout of 10 minutes for rolling.

What I see is that agent1 starts pushing data, I get directories for May and June 2012, and then everything stops and stays quiet.
Agent2, which is in charge of writing, logs:
checking process:FileChannel genChannel { dataDirs: [/home/runner/flumetmp/data] } supervisoree:{ status:{ lastSeen:1371218255089 lastSeenState:START desiredState:START firstSeen:1371202195869 failures:0 discard:false error:false } policy:org.apache.flume.lifecycle.LifecycleSupervisor$SupervisorPolicy$AlwaysRestartPolicy@69b01afa }
14 Jun 2013 15:57:38,090 DEBUG [lifecycleSupervisor-1-0] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:307)  - Status check complete

And agent1 does not say anything. So I'm not really sure what is going on.
In another situation, agent1 logs:
java.lang.IllegalStateException: Serializer has been closed 
       at org.apache.flume.serialization.LineDeserializer.ensureOpen(LineDeserializer.java:124) 
       at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:88) 
       at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:221) 
       at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:154) 
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) 
       at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) 
       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) 
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) 
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) 
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) 
       at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
       at java.lang.Thread.run(Thread.java:662)
Thank you for your help and time.


Re: Flume inconsistencies Helen Poon 6/17/13 8:09 AM
Hi Nick,

I don't see where the regex extractor interceptor is being used. In your case, the regex interceptor should be on the Avro source of agent2. Here is a sample that extracts a timestamp from the event body with a regex into an event header named "timestamp". Later, in your sink, you can use all pieces of the timestamp in the HDFS path.
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = regex_extractor
agent.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
agent.sources.r1.interceptors.i1.serializers = s1
agent.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent.sources.r1.interceptors.i1.serializers.s1.name = timestamp
agent.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm
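The `RegexExtractorInterceptorMillisSerializer` in this sample turns the captured text into epoch milliseconds using the `pattern` property. Below is a minimal Python equivalent of that conversion, for checking what header value a given line would produce. It assumes the timestamp is UTC, whereas Flume would presumably interpret it in the agent JVM's time zone. The Java pattern `yyyy-MM-dd HH:mm` corresponds to Python's `%Y-%m-%d %H:%M`.

```python
import re
from datetime import datetime, timezone

# Regex from the sample above, with the properties-file escaping (\\) undone.
TIMESTAMP_RE = re.compile(r"^(?:\n)?(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d)")


def timestamp_header(body: str):
    """Return the 'timestamp' header as epoch millis (UTC assumed), or None."""
    match = TIMESTAMP_RE.search(body)
    if match is None:
        return None
    # \s can match any whitespace character; normalise it to one space.
    text = re.sub(r"\s", " ", match.group(1))
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M")
    seconds = int(parsed.replace(tzinfo=timezone.utc).timestamp())
    return str(seconds * 1000)


if __name__ == "__main__":
    print(timestamp_header("1970-01-01 00:01 GET /index.html"))
    print(timestamp_header("no timestamp"))
```

This prints `60000` for the first line and `None` for the second. Because of the `^` anchor, the timestamp must appear at the very start of the event body (optionally after one newline).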

I also see many static interceptors in the avro source. Is that what you want?

Here is the documentation on the static interceptor:

"Static interceptor allows user to append a static header with static value to all events.

The current implementation does not allow specifying multiple headers at one time. Instead user might chain multiple static interceptors each defining one static header. "

I see you also need the hostname, category, etc., so it might be wise to just write a custom interceptor that extracts the right pieces of information from your event and puts them in event headers. You have total control this way.

Re: Flume inconsistencies Nicolas Maillard 6/17/13 9:14 AM
Hello Helen,

Thanks for the advice. I was actually contemplating writing a couple of custom interceptors, but as this is a proof of concept I wanted to use the standard available parameters. So I have a lot of static interceptors to drive the HDFS path. My feeding agents (aka agent1) will give me values such as category, client, hostname, etc., which will make my HDFS path: /flume/client/category/hostname/yyyy/mm/dd
The hostname is sent from agent1 so that I know which agent and remote machine is responsible for which log, to be able to troubleshoot later on.
The regex extractor is on agent1: I extract year, month, and day from every event line, since my files span multiple days, in this case a whole year.

On the agent2 side, I recreate these interceptors so that any header that doesn't exist gets a default value. This should rarely happen, but this way I can also quickly find files or entries that are bad, since they will have default values.
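This fallback matches how, as far as I know, the static interceptor's `preserveExisting` setting (default `true`) behaves: a header already set upstream is kept, and the static value is used only when the header is missing. A minimal Python sketch of that merge follows; the default values are hypothetical, chosen to be easy to spot in HDFS paths.

```python
# Hypothetical defaults that agent2 would apply when agent1 sent nothing.
DEFAULT_HEADERS = {
    "client": "UNKNOWN_CLIENT",
    "category": "UNKNOWN_CATEGORY",
    "hostname": "UNKNOWN_HOST",
}


def apply_defaults(headers: dict, defaults: dict = DEFAULT_HEADERS) -> tuple:
    """Return (merged headers, sorted names of headers that were defaulted)."""
    merged = dict(headers)  # never mutate the caller's headers
    defaulted = []
    for key, value in defaults.items():
        if key not in merged:  # preserve values set upstream by agent1
            merged[key] = value
            defaulted.append(key)
    return merged, sorted(defaulted)


if __name__ == "__main__":
    merged, defaulted = apply_defaults({"client": "clientname", "hostname": "db01"})
    print(merged["category"], defaulted)
```

Here only `category` was missing, so it is the one header that receives a default (`UNKNOWN_CATEGORY`), while the upstream `client` and `hostname` values are kept.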

This all seems to work pretty well, although there might be a better approach. What is really concerning is that for larger files, like 400 MB ones in the spooling directory, Flume agent1 and agent2 start, then stay alive but stop sending anything: they kind of just freeze, neither dying nor doing anything. Sometimes agent1 also throws a lot of serializer errors and stops pushing anything.
Tailing logs, this setup has been working fine for 2 weeks, but with spooling of larger files I cannot get it to work.

Thank you for your help.

You received this message because you are subscribed to the Google Groups "CDH Users" group.

Re: Flume inconsistencies Mike Percy 6/17/13 12:03 PM
Hi Nicolas,
My best guess is that you have an invalid Unicode character in your file. If you want a better idea of where in the file the problem occurs, there should be a file called .flumespool-main.meta in your spooling directory, which you can read with avro-tools.jar. You can download Avro Tools from here: http://mirror.cc.columbia.edu/pub/software/apache/avro/avro-1.7.4/java/avro-tools-1.7.4.jar

Then you just run java -jar avro-tools-1.7.4.jar tojson .flumespool-main.meta

This should give you an idea of where in the file it's hitting a decoding issue.

This is likely a problem with "non pristine" data. The workaround is to ensure that the inputCharset is correctly set and also ensure that the data is valid for the specified charset.
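As a complement to reading the .flumespool-main.meta file, a small Python script can report where a file first stops being valid UTF-8 before it is dropped into the spooling directory. This sketch assumes the files are meant to be UTF-8 (which I understand to be the spooling directory source's default inputCharset). It reads the whole file into memory, which is workable for files of a few hundred MB but not ideal.

```python
import sys


def first_invalid_utf8(data: bytes):
    """Return (byte offset, line number) of the first invalid UTF-8 byte, or None."""
    try:
        data.decode("utf-8")
    except UnicodeDecodeError as err:
        # Count newlines before the bad byte to get a 1-based line number.
        line_number = data.count(b"\n", 0, err.start) + 1
        return err.start, line_number
    return None


def main(path: str) -> None:
    with open(path, "rb") as handle:
        data = handle.read()
    result = first_invalid_utf8(data)
    if result is None:
        print(f"{path}: valid UTF-8")
    else:
        offset, line_number = result
        print(f"{path}: invalid UTF-8 at byte {offset} (line {line_number})")


if __name__ == "__main__" and len(sys.argv) > 1:
    for file_path in sys.argv[1:]:
        main(file_path)
```

Run it as `python check_utf8.py /path/to/dump.sql` (the script name is arbitrary). A Latin-1 dump containing, say, an accented `é` is reported at the offset of that byte, which tells you whether to fix the data or set inputCharset to the file's real encoding.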