corrupted files in deep storage

74 views
Skip to first unread message

Tommaso Parisi

unread,
Sep 22, 2016, 11:19:45 AM9/22/16
to Druid User
Hello,
  I have a druid cluster with druid.storage.type=local  and storageDirectory pointing to a nfs path. I have two data servers with historical, middleManager and broker nodes and two master servers with coordinator and overlord nodes

Ingestion is done with a kafka supervisor with taskCount=2 and replicas=2

Every once in a while a receive an error from the historical nodes complaining that they can't load a segment because the zip file is corrupted (see log below).

I am wondering if it is a bug, or I am forced to use another kind of deep storage, because nfs does not guarantee that there are no concurrent writes on the same file from different replicas. Could it be that there is not a lock mechanism to avoid that two replicas write on the same file on nfs?

Here is the error that I find in the historical logs

2016-09-21T23:00:04,582 ERROR [ZkCoordinator-0] io.druid.server.coordination.ZkCoordinator - Failed to load segment for dataSource: {class=io.druid.server.coordination.ZkCoordinator, exceptionType=class io.druid.segment.loading.SegmentLoadingException, exceptionMessage=Exception loading segment[content20stats_2016-09-12T00:00:00.000Z_2016-09-13T00:00:00.000Z_2016-09-12T00:00:00.136Z_1], segment=DataSegment{size=377662, shardSpec=NumberedShardSpec{partitionNum=1, partitions=0}, metrics=[count, impressions, clicks, boximpressions, totstaytime, fblike, fbshare, fbcomment, twcount, searchres], dimensions=[a_attrs, a_boxes_ctr_id, a_boxes_id, n_boximpression, n_breakpoint, n_click, n_doc_type, n_fbcomment, n_fblike, n_fbshare, n_gplus, n_impression, n_info, n_mappa, n_searchno, n_staytime, n_twcount, s_area, s_box, s_cat1, s_cat2, s_cat3, s_dest_id, s_doc_id, s_domain, s_link_type, s_pag_id, s_page, s_ref_host, s_ref_path, s_search, s_ua], version='2016-09-12T00:00:00.136Z', loadSpec={type=local, path=/data/druid/content/deep-storage/content20stats/2016-09-12T00:00:00.000Z_2016-09-13T00:00:00.000Z/2016-09-12T00:00:00.136Z/1/index.zip}, interval=2016-09-12T00:00:00.000Z/2016-09-13T00:00:00.000Z, dataSource='content20stats', binaryVersion='9'}}
io.druid.segment.loading.SegmentLoadingException: Exception loading segment[content20stats_2016-09-12T00:00:00.000Z_2016-09-13T00:00:00.000Z_2016-09-12T00:00:00.136Z_1]
        at io.druid.server.coordination.ZkCoordinator.loadSegment(ZkCoordinator.java:309) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.server.coordination.ZkCoordinator.addSegment(ZkCoordinator.java:350) [druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.server.coordination.SegmentChangeRequestLoad.go(SegmentChangeRequestLoad.java:44) [druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.server.coordination.ZkCoordinator$1.childEvent(ZkCoordinator.java:152) [druid-server-0.9.1.1.jar:0.9.1.1]
        at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:522) [curator-recipes-2.10.0.jar:?]
        at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:516) [curator-recipes-2.10.0.jar:?]
        at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:93) [curator-framework-2.10.0.jar:?]
        at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) [guava-16.0.1.jar:?]
        at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:85) [curator-framework-2.10.0.jar:?]
        at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:514) [curator-recipes-2.10.0.jar:?]
        at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35) [curator-recipes-2.10.0.jar:?]
        at org.apache.curator.framework.recipes.cache.PathChildrenCache$9.run(PathChildrenCache.java:772) [curator-recipes-2.10.0.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_101]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_101]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_101]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_101]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_101]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.RuntimeException: java.util.zip.ZipException: invalid entry size (expected 376503 but got 376409 bytes)
        at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
        at com.metamx.common.CompressionUtils.unzip(CompressionUtils.java:146) ~[java-util-0.27.9.jar:?]
        at io.druid.segment.loading.LocalDataSegmentPuller.getSegmentFiles(LocalDataSegmentPuller.java:162) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.segment.loading.LocalLoadSpec.loadSegment(LocalLoadSpec.java:64) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.segment.loading.SegmentLoaderLocalCacheManager.getSegmentFiles(SegmentLoaderLocalCacheManager.java:143) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.segment.loading.SegmentLoaderLocalCacheManager.getSegment(SegmentLoaderLocalCacheManager.java:95) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.server.coordination.ServerManager.loadSegment(ServerManager.java:152) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.server.coordination.ZkCoordinator.loadSegment(ZkCoordinator.java:305) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        ... 18 more
Caused by: java.util.zip.ZipException: invalid entry size (expected 376503 but got 376409 bytes)
        at java.util.zip.ZipInputStream.readEnd(ZipInputStream.java:384) ~[?:1.8.0_101]
        at java.util.zip.ZipInputStream.read(ZipInputStream.java:196) ~[?:1.8.0_101]
        at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.8.0_101]
        at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
        at com.google.common.io.ByteSink.writeFrom(ByteSink.java:139) ~[guava-16.0.1.jar:?]
        at com.metamx.common.CompressionUtils.unzip(CompressionUtils.java:248) ~[java-util-0.27.9.jar:?]
        at com.metamx.common.CompressionUtils$1.call(CompressionUtils.java:138) ~[java-util-0.27.9.jar:?]
        at com.metamx.common.CompressionUtils$1.call(CompressionUtils.java:134) ~[java-util-0.27.9.jar:?]
        at com.metamx.common.RetryUtils.retry(RetryUtils.java:60) ~[java-util-0.27.9.jar:?]
        at com.metamx.common.RetryUtils.retry(RetryUtils.java:78) ~[java-util-0.27.9.jar:?]
        at com.metamx.common.CompressionUtils.unzip(CompressionUtils.java:132) ~[java-util-0.27.9.jar:?]
        at io.druid.segment.loading.LocalDataSegmentPuller.getSegmentFiles(LocalDataSegmentPuller.java:162) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.segment.loading.LocalLoadSpec.loadSegment(LocalLoadSpec.java:64) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.segment.loading.SegmentLoaderLocalCacheManager.getSegmentFiles(SegmentLoaderLocalCacheManager.java:143) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.segment.loading.SegmentLoaderLocalCacheManager.getSegment(SegmentLoaderLocalCacheManager.java:95) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.server.coordination.ServerManager.loadSegment(ServerManager.java:152) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        at io.druid.server.coordination.ZkCoordinator.loadSegment(ZkCoordinator.java:305) ~[druid-server-0.9.1.1.jar:0.9.1.1]
        ... 18 more



Regards,
Tommaso

Gian Merlino

unread,
Sep 22, 2016, 12:46:50 PM9/22/16
to druid...@googlegroups.com
Hey Tommaso,

Your guess sounds likely. For a workaround you could try using another deep storage or setting task.replicas = 1 to prevent concurrent writes.

In the long run I think this would fix the problem: https://github.com/druid-io/druid/issues/3493

Gian

--
You received this message because you are subscribed to the Google Groups "Druid User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+unsubscribe@googlegroups.com.
To post to this group, send email to druid...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/a72a39c4-a071-4634-a404-fb7efde66552%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Tommaso Parisi

unread,
Sep 23, 2016, 4:47:46 AM9/23/16
to Druid User
Thanks Gian. I have set task.replicas = 1 and everything seems to be working fine.

Now that I think about it, maybe I didn't need more than one replica. I have two data servers. If one fails, the overlord after a timeout should become aware of it and spawn a new task for the given partition on the other server.
I will probably loose only the data that the first peon (the one that was running on the crashed server) has already fetched from kafka but has not yet pushed to deep-storage. If I have taskDuration=PT1H I will loose at most an hour of data.

Is this assumption correct?

Regards,
Tommaso

Gian

To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+...@googlegroups.com.

Gian Merlino

unread,
Sep 23, 2016, 1:05:27 PM9/23/16
to druid...@googlegroups.com
With the Kafka indexing service, you actually won't lose any data permanently. If a server fails, Druid will relaunch its tasks on another server and that will re-read the same data from Kafka. With replicas = 1 your risk is just that the data will be unavailable for querying for some time, until it can be re-read from Kafka.

Gian

To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+unsubscribe@googlegroups.com.

To post to this group, send email to druid...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages