Data loss in tranquility + realtime indexing task


Flowyi

Dec 14, 2014, 11:22:21 PM
to druid-de...@googlegroups.com
Hi!

We read events from Kafka and use Tranquility to push them to the indexing service. It works well most of the time, but during one or two hours some events are lost (up to 50%).

Tranquility: v0.2.28
Druid: v0.6.160

Any help is appreciated.

I checked the Tranquility log. First, there is some ZooKeeper reconnect info:

2014-12-14 20:03:52,840 INFO  [main-SendThread(master.hadoop:2181)] org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 26666ms for sessionid 0x24a3e96c73f0051, closing socket connection and attempting reconnect
2014-12-14 20:03:52,941 INFO  [main-EventThread] org.I0Itec.zkclient.ZkClient - zookeeper state changed (Disconnected)
2014-12-14 20:03:53,041 INFO  [main-SendThread(master.hadoop:2181)] org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 26666ms for sessionid 0x14a3e96c6c1323b, closing socket connection and attempting reconnect
2014-12-14 20:03:53,065 INFO  [main-SendThread(192.168.3.18:2181)] org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.3.18/192.168.3.18:2181. Will not attempt to authenticate using SASL (unknown error)
2014-12-14 20:03:53,065 INFO  [main-SendThread(192.168.3.18:2181)] org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.3.18/192.168.3.18:2181, initiating session
2014-12-14 20:03:53,066 INFO  [main-SendThread(192.168.3.18:2181)] org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.3.18/192.168.3.18:2181, sessionid = 0x24a3e96c73f0051, negotiated timeout = 40000
2014-12-14 20:03:53,066 INFO  [main-EventThread] org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
2014-12-14 20:03:53,141 INFO  [main-EventThread] org.I0Itec.zkclient.ZkClient - zookeeper state changed (Disconnected)


Then, some instance updates:

2014-12-14 20:03:55,198 WARN  [Hashed wheel timer #1] com.metamx.tranquility.finagle.FutureRetry$ - Transient error, will try again in 455 ms
com.twitter.finagle.NoBrokersAvailableException: No hosts are available for client druid:prod:overlord
        at com.twitter.finagle.NoStacktrace(Unknown Source)
2014-12-14 20:03:55,218 WARN  [Hashed wheel timer #1] com.metamx.tranquility.finagle.FutureRetry$ - Transient error, will try again in 350 ms
com.twitter.finagle.NoBrokersAvailableException: No hosts are available for client druid:prod:overlord
        at com.twitter.finagle.NoStacktrace(Unknown Source)
2014-12-14 20:03:55,227 INFO  [ServiceCache-0] com.metamx.common.scala.net.finagle.DiscoResolver - Updating instances for service[druid:firehose:dsp_client-11-0001-0000] to Set()
2014-12-14 20:03:55,235 INFO  [ServiceCache-0] com.metamx.common.scala.net.finagle.DiscoResolver - Updating instances for service[druid:firehose:dsp_client-11-0001-0000] to Set(ServiceInstance{name='druid:firehose:dsp_client-11-0001-0000', id='21a284a5-be86-4fda-9630-c101d049350f', address='192.168.3.22', port=8087, sslPort=null, payload=null, registrationTimeUTC=1418554812678, serviceType=DYNAMIC, uriSpec=null})
2014-12-14 20:03:55,256 INFO  [ServiceCache-0] com.metamx.common.scala.net.finagle.DiscoResolver - Updating instances for service[druid:firehose:dsp_client-11-0000-0000] to Set()
2014-12-14 20:03:55,258 WARN  [Hashed wheel timer #1] com.metamx.tranquility.finagle.FutureRetry$ - Transient error, will try again in 551 ms



And finally, exceptions:


2014-12-14 20:03:56,164 WARN  [New I/O worker #28] com.metamx.tranquility.finagle.FutureRetry$ - Transient error, will try again in 134 ms
java.io.IOException: Unable to push events to task: index_realtime_dsp_client_2014-12-14T20:00:00.000+08:00_0_0_ngmmddka (status = TaskRunning)
        at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$2$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$7$$anonfun$apply$9.apply(DruidBeam.scala:157)
        at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$2$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$7$$anonfun$apply$9.apply(DruidBeam.scala:143)
        at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$4.apply(Future.scala:821)
        at com.twitter.util.Try$.apply(Try.scala:13)
        at com.twitter.util.Future$.apply(Future.scala:82)
        at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:821)
        at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:821)
        at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:784)
        at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:783)
        at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:93)
        at com.twitter.util.Promise$Transformer.k(Promise.scala:93)
        at com.twitter.util.Promise$Transformer.apply(Promise.scala:102)
        at com.twitter.util.Promise$Transformer.apply(Promise.scala:84)
        at com.twitter.util.Promise$$anon$2.run(Promise.scala:324)
        at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:184)
        at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:155)
        at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:210)
        at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:84)
        at com.twitter.util.Promise.runq(Promise.scala:310)
        at com.twitter.util.Promise.updateIfEmpty(Promise.scala:605)
        at com.twitter.util.Promise.update(Promise.scala:583)
        at com.twitter.util.Promise.setValue(Promise.scala:559)
        at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:76)
        at com.twitter.finagle.transport.ChannelTransport.handleUpstream(ChannelTransport.scala:45)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:194)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459)
        at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
        at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142)
        at com.twitter.finagle.channel.ChannelStatsHandler.messageReceived(ChannelStatsHandler.scala:81)
        at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142)
        at com.twitter.finagle.channel.ChannelRequestStatsHandler.messageReceived(ChannelRequestStatsHandler.scala:35)
        at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.twitter.finagle.NoBrokersAvailableException: No hosts are available for client druid:firehose:dsp_client-12-0000-0000
        at com.twitter.finagle.NoStacktrace(Unknown Source)




Flowyi

Dec 15, 2014, 3:28:22 AM
to druid-de...@googlegroups.com
After digging into the logs, I think I found the root cause:

First, at Tranquility's request, the overlord node assigns task A to worker A at the beginning of the hour. Everything then works fine, until the worker throws many exceptions like this:

2014-12-14 13:02:13,593 ERROR [CuratorFramework-0] org.apache.curator.ConnectionState - Connection timed out for connection string (192.168.3.16:2181,192.168.3.18:2181) and timeout (15000) / elapsed (16018)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:198)
at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:88)
at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:793)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:779)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$400(CuratorFrameworkImpl.java:58)
at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:265)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Worker A cannot update its status in ZooKeeper, so the overlord presumably considers it failed and reassigns the task to worker B. That is why the events already on worker A are lost. I found the task log on both worker A and worker B, and the overlord's log also confirms my guess.

Gian Merlino

Dec 15, 2014, 11:05:44 AM
to druid-de...@googlegroups.com
The indexing service is currently pretty sensitive to ZK disconnections. See https://github.com/metamx/druid/issues/709 for some more information; we plan to go into the indexing service in the future and address that and other issues. In the meantime you can try this strategy to keep things stable:

a) Use tranquility replication (.replicants(X) on your ClusteredBeamTuning) so you won't lose a segment worth of data if a single worker goes down. This is a reasonable thing to do even if you aren't having ZK disconnect issues, since a worker could always fail for some random reason.
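For (a), a tuning along these lines is what's meant. This is only a sketch: it assumes the builder-style ClusteredBeamTuning API, and the exact method names may differ in Tranquility v0.2.28, so check the ClusteredBeamTuning source for your version.

```scala
// Sketch only: builder-style API assumed; verify names against your
// Tranquility version's ClusteredBeamTuning.
import com.metamx.common.Granularity
import com.metamx.tranquility.beam.ClusteredBeamTuning
import org.joda.time.Period

val tuning = ClusteredBeamTuning
  .builder()
  .segmentGranularity(Granularity.HOUR)
  .windowPeriod(new Period("PT10M"))
  .partitions(1)
  .replicants(2) // two replica tasks per partition, so data survives one worker failing
  .build()
```

With replicants(2), Tranquility sends each event to two indexing tasks, so losing a single worker (or one task being reassigned) does not lose the segment's data.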

b) Make sure your ZK cluster is running smoothly. Memory tunings should be good enough to prevent large GC pauses. If the ZK cluster has high load, SSDs are good, although we don't currently run with SSDs in production because our ZK load is not incredibly high.

c) Make sure your middle managers and overlord are running smoothly in terms of memory tunings as well.

d) Consider setting a higher ZK session timeout, like 30 or 45 seconds.
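For (d), the relevant knob on the Druid side is the Curator session timeout. A properties sketch follows; the property name is taken from Druid's ZooKeeper configuration docs, and the hosts are this thread's example values, so adjust both for your cluster:

```properties
# runtime.properties on the overlord and middle managers (sketch)
druid.zk.service.host=192.168.3.16:2181,192.168.3.18:2181
# Raise the ZK session timeout (milliseconds) so brief GC pauses or
# network blips don't expire the session and trigger task reassignment.
druid.zk.service.sessionTimeoutMs=45000
```

Note that ZooKeeper caps the negotiated timeout at its maxSessionTimeout (20x tickTime by default, i.e. 40 s with a stock zoo.cfg), which matches the "negotiated timeout = 40000" seen in the log above; to actually get 45 s you would also need to raise maxSessionTimeout in zoo.cfg.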

Idr Wa

Nov 22, 2015, 8:46:58 PM
to Druid Development
It doesn't work when I use (a) and (b).

On Tuesday, December 16, 2014 at 12:05:44 AM UTC+8, Gian Merlino wrote: