Data disappears after some time when using a shared NAS


Maurizio Gallelli

Mar 23, 2015, 11:59:46 AM
to druid-de...@googlegroups.com
Hi,
I'm trying to set up a Druid cluster. It runs on several virtual servers with a NAS as storage, and a realtime process ingests rows from Kafka 0.8.
On Friday I loaded some data and was able to query it. On Saturday I could query it again, and in the Druid console the segments were located on the Realtime node. Today, however, the same query returns an empty result and the console no longer shows these segments.

On the NAS I can find these folders, but they are empty inside:
2015-03-20T01:00:00.000Z_2015-03-20T02:00:00.000Z
2015-03-23T02:00:00.000Z_2015-03-23T03:00:00.000Z
2015-03-23T01:00:00.000Z_2015-03-23T02:00:00.000Z
2015-03-23T03:00:00.000Z_2015-03-23T04:00:00.000Z



Here are some of the configurations:
common.runtime.properties
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-eight","io.druid.extensions:mysql-metadata-storage"]

# Zookeeper
druid.zk.service.host=10.200.6.60:2181,10.200.6.70:2181
druid.zk.paths.base=/druid

# Metadata Storage (mysql)
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://10.200.6.5\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=druid

# Deep storage (local filesystem for examples - don't use this in production)
druid.storage.type=local
druid.storage.storage.storageDirectory=/nas/druid/dataStorage

# Query Cache (we use a simple 10mb heap-based local cache on the broker)
druid.cache.type=local
druid.cache.sizeInBytes=10000000

# Indexing service discovery
druid.selectors.indexing.serviceName=overlord

# Monitoring (disabled for examples)
# druid.monitoring.emissionPeriod = PT5m
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]

# Metrics logging (disabled for examples)
druid.emitter=logging
druid.emitter.logging.loggerClass=LoggingEmitter
druid.emitter.logging.logLevel=debug

realtime.spec file:
[
{
    "dataSchema" : {
      "dataSource" : "buck_bidding",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "insert_datetime",
            "format" : "yyyy-MM-dd HH:mm:ss"
          },
          "dimensionsSpec" : {
            "dimensions": ["bid","win","click","convert","exchange_name","creative_id","device_country","device_make","device_model","device_os","device_osv","device_type","carrierid","bid_price","win_price","company_price","advertiser_price","pub_id","pub_name","place_id","place_name","domain","categories","exchange_id","campaign_id","offer_id","advertiser_id","device_country_id","device_make_id","device_model_id","device_os_id","device_osv_id","device_type_id","carrierid_id","banner_width","banner_heigth","supply_type","session_bid_id"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
     "type" : "count",
     "name" : "count"
      },{
     "type" : "doubleSum",
     "name" : "bid_spent",
     "fieldName": "bid_price"
      },{
      "type": "longSum",
      "name" : "bid_response",
      "fieldName" : "bid"
      },{
      "type": "longSum",
      "name" : "win",
      "fieldName" : "win"
      }
     ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "hour",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "10.200.6.60:2181,10.200.6.70:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "true"
        },
        "feed": "buck_bidding"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT2h",
      "basePersistDirectory": "\/nas\/druid\/dataStorage",
      "rejectionPolicy": {
        "type": "messageTime"
      }
    }
  }
]

Realtime conf:
druid.host=10.200.6.64
druid.service=realtime
druid.port=8083

druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=3

druid.server.tier=tats_tier

druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]

druid.realtime.specFile=/usr/local/druid/config/realtime/bucksense_new_realtime.spec

Overlord conf:
druid.service=overlord
druid.host=10.200.6.60
druid.port=8087

# Task Log Module (Overlord and MiddleManager node)
druid.indexer.logs.type=file
druid.indexer.logs.directory=/usr/local/druid/logs/


druid.indexer.queue.startDelay=PT1M

druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata

Middle conf:
druid.service=middleManager
druid.host=10.200.6.62
druid.port=8091

# Task Log Module (Overlord and MiddleManager node)
druid.indexer.logs.type=file
druid.indexer.logs.directory=/usr/local/druid/logs/

# Middle Manager Configuration
druid.worker.ip=10.200.6.62
druid.worker.capacity=1

# Peon Configuration
druid.indexer.runner.javaOpts="-server -Xmx3g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
druid.indexer.task.chathandler.type=announce
druid.indexer.task.baseTaskDir=/nas/druid/task

druid.indexer.fork.property.druid.computation.buffer.size=536870912
druid.indexer.fork.property.druid.processing.numThreads=3
druid.indexer.fork.property.druid.request.logging.type=file
druid.indexer.fork.property.druid.request.logging.dir=/usr/local/druid/logs
druid.indexer.fork.property.druid.segmentCache.locations=[{"path": "/nas/druid/indexCache", "maxSize": 0}]
druid.indexer.fork.property.druid.server.http.numThreads=50

Broker conf:
druid.host=10.200.6.62
druid.service=broker
druid.port=8080


# Druid Processing Module (Historical, Realtime, and Broker nodes)
druid.processing.numThreads=7
druid.processing.buffer.sizeBytes=100000000

# Queryable Module (Historical, Realtime, and Broker nodes)
druid.request.logging.type=file
druid.request.logging.dir=/usr/local/druid/logs

Historical conf:
druid.host=10.200.6.70
druid.port=8081
druid.service=historical

# Druid Processing Module (Historical, Realtime, and Broker nodes)
druid.processing.numThreads=2
druid.processing.buffer.sizeBytes=536870912

druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true

druid.server.maxSize=10000000000
druid.server.tier=tats_tier
druid.server.priority=50
druid.segmentCache.locations=[{"path": "/nas/druid/indexCache", "maxSize": 10000000000}]


Can someone help me understand what's happening and what's wrong in my configuration?

Thanks
Maurizio

Fangjin Yang

Mar 23, 2015, 1:55:28 PM
to druid-de...@googlegroups.com
Hi Maurizio, a few questions. When you are querying for your segments, are you querying the realtime node directly or are you querying through the broker? Looking at the coordinator console, are the segments located anywhere else, or are they gone altogether from the console? Do you see any exceptions in the logs of your realtime nodes?

As an aside, I notice you are using the messageTime rejection policy. It should be noted that the realtime ingestion mechanism in its current state is not particularly great for ingesting historical (or batch) data and is much more suited to current-time data. We have some proposals out about reworking this: https://groups.google.com/forum/#!searchin/druid-development/proposal$20windowperiod/druid-development/kHgHTgqKFlQ/fXvtsNxWzlMJ.
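
If you are ingesting current-time data, the simplest alternative is the serverTime rejection policy. As a sketch, your tuningConfig would look like this with only the rejectionPolicy block changed (all other values as in your spec):

    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT2h",
      "basePersistDirectory": "/nas/druid/dataStorage",
      "rejectionPolicy": {
        "type": "serverTime"
      }
    }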

Maurizio Gallelli

Mar 23, 2015, 3:55:18 PM
to druid-de...@googlegroups.com

Hi Fangjin,
I'm querying through the Broker module.
In the Coordinator console I can see that all segments belong to the Realtime server, but the ones I submitted on Friday are no longer there.

I've also noticed that the segments are not stored in the MySQL druid_segments table; could that be an issue?

I didn't find any errors in the Realtime logs; do you have any specific suggestions for what I should search for?

I've set messageTime because I'm currently simulating my traffic, so the event timestamps don't match the server time.
Do you think that could cause issues with data movement from Realtime to Historical?

Just to be sure: is setting the storageDirectory to the same path for both Realtime and Historical the right thing to do?

Thanks
Maurizio


Fangjin Yang

Mar 23, 2015, 11:04:44 PM
to druid-de...@googlegroups.com
Hi Maurizio, please see inline.


On Monday, March 23, 2015 at 12:55:18 PM UTC-7, Maurizio Gallelli wrote:

Hi Fangjin,
I'm querying through the Broker module.
In the Coordinator console I can see that all segments belong to the Realtime server, but the ones I submitted on Friday are no longer there.

Are there any processes that might delete from your storage at all? In general, the realtime logic should try to recover any segments that have not been handed off, as long as the disk is not lost, and attempt to hand those segments off. The fact that you have directories which are empty is interesting to me, as nothing should be deleted by the realtime logic as long as handoff has not occurred. I wonder, are you always reingesting the same set of events from Kafka? Is there a continuous stream of messages?
 

I've also noticed that the segments are not stored in the MySQL druid_segments table; could that be an issue?

I don't think your handoff is working.
 

I didn't find any errors in the Realtime logs; do you have any specific suggestions for what I should search for?

Exceptions or warnings would help.
 

I've set messageTime because I'm currently simulating my traffic, so the event timestamps don't match the server time.
Do you think that could cause issues with data movement from Realtime to Historical?

Using the messageTime rejection policy, handoff will not occur unless you have a constant stream of events. For this rejection policy, handoff for a segment happens only after segmentGranularity + windowPeriod has passed in message time and additional events are seen. For example, with your hourly segmentGranularity and PT2h windowPeriod, the 01:00-02:00 segment would only hand off once events stamped later than 04:00 arrive. I don't know anyone using this rejection policy in production. You can read more about it here: http://druid.io/docs/latest/Realtime-ingestion.html
 

Just to be sure: is setting the storageDirectory to the same path for both Realtime and Historical the right thing to do?

No. The handoff logic is as follows:

1. Realtime uploads the finalized segment to deep storage.
2. Realtime writes an entry to the metadata store (MySQL) indicating there is a new segment and where the segment is located in deep storage.
3. The coordinator notices the new entry in the metadata store.
4. The coordinator creates a ZK entry indicating a historical should download the new segment.
5. A historical locally downloads the segment from deep storage and serves the segment.
6. The realtime notices a historical is now serving the segment and drops the segment.
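
To make this concrete, here is a sketch of how these steps map onto the properties you posted (property names as Druid expects them; values taken from your configs):

Step 1 uses the deep storage settings in common.runtime.properties:

    druid.storage.type=local
    druid.storage.storageDirectory=/nas/druid/dataStorage

Step 2 uses the metadata store settings:

    druid.metadata.storage.type=mysql
    druid.metadata.storage.connector.connectURI=jdbc:mysql://10.200.6.5:3306/druid

Step 5 uses the historical's segment cache settings:

    druid.segmentCache.locations=[{"path": "/nas/druid/indexCache", "maxSize": 10000000000}]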

 

Maurizio Gallelli

Mar 24, 2015, 9:40:02 AM
to druid-de...@googlegroups.com
Hi Fangjin,
thanks for your help.

On the NAS there is no process that deletes anything.
I reran a new test this morning, changing the rejection policy to serverTime as you suggested and updating my ingester accordingly.
Below is a listing of the NAS dataStorage directory:
[root@druid06 buck_bidding]# ls
2015-03-23T01:00:00.000Z_2015-03-23T02:00:00.000Z  2015-03-24T11:00:00.000Z_2015-03-24T12:00:00.000Z
[root@druid06 buck_bidding]# cd 2015-03-24T11\:00\:00.000Z_2015-03-24T12\:00\:00.000Z/
[root@druid06 2015-03-24T11:00:00.000Z_2015-03-24T12:00:00.000Z]# ls
0  1  2  3  4  5

All of the 0, 1, ..., 5 subfolders are empty.

In the logs I was not able to find anything, but it seems this error was thrown outside the logs:
Exception in thread "plumber_persist_4" com.metamx.common.ISE: Cannot merge columns of type[STRING] and [LONG]
        at io.druid.segment.column.ColumnCapabilitiesImpl.merge(ColumnCapabilitiesImpl.java:124)
        at io.druid.segment.IndexMerger.makeIndexFiles(IndexMerger.java:423)
        at io.druid.segment.IndexMerger.merge(IndexMerger.java:319)
        at io.druid.segment.IndexMerger.persist(IndexMerger.java:181)
        at io.druid.segment.IndexMerger.persist(IndexMerger.java:151)
        at io.druid.segment.IndexMerger.persist(IndexMerger.java:134)
        at io.druid.segment.realtime.plumber.RealtimePlumber.persistHydrant(RealtimePlumber.java:830)
        at io.druid.segment.realtime.plumber.RealtimePlumber$3.doRun(RealtimePlumber.java:354)
        at io.druid.common.guava.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:40)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Exception in thread "plumber_persist_5" com.metamx.common.ISE: Cannot merge columns of type[STRING] and [LONG]
        at io.druid.segment.column.ColumnCapabilitiesImpl.merge(ColumnCapabilitiesImpl.java:124)
        at io.druid.segment.IndexMerger.makeIndexFiles(IndexMerger.java:423)
        at io.druid.segment.IndexMerger.merge(IndexMerger.java:319)
        at io.druid.segment.IndexMerger.persist(IndexMerger.java:181)
        at io.druid.segment.IndexMerger.persist(IndexMerger.java:151)
        at io.druid.segment.IndexMerger.persist(IndexMerger.java:134)
        at io.druid.segment.realtime.plumber.RealtimePlumber.persistHydrant(RealtimePlumber.java:830)
        at io.druid.segment.realtime.plumber.RealtimePlumber$3.doRun(RealtimePlumber.java:354)
        at io.druid.common.guava.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:40)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)   

Here again, in the metadata store (MySQL) I didn't find the segment row info, even though the segment is shown in the Coordinator console.
I think I need to work on my incoming data and check what's wrong.

Can you please clarify when "basePersistDirectory" and "druid.storage.storageDirectory" are involved? It seems the Realtime node has only used basePersistDirectory.

In your previous message you mention "5. A historical locally downloads the segment from deep storage and serves the segment." Where will the Historical store it, and how can I set up this path?

I'll do some more tests with fresh data.

Thanks,
Maurizio

Fangjin Yang

Mar 24, 2015, 7:26:09 PM
to druid-de...@googlegroups.com
Hi Maurizio, please see inline.

Did you, by any chance, stop the realtime node, update your schema, and start the node again? This exception is causing your handoff to fail. If this is just a POC environment, I recommend wiping the directories you've created and starting again.
 
Here again, in the metadata store (MySQL) I didn't find the segment row info, even though the segment is shown in the Coordinator console.
I think I need to work on my incoming data and check what's wrong.

Can you please clarify when "basePersistDirectory" and "druid.storage.storageDirectory" are involved? It seems the Realtime node has only used basePersistDirectory.
 
basePersistDirectory is where realtime nodes locally store intermediate persisted chunks. storageDirectory refers to where the finalized, complete segment will be stored in deep storage.
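
As a sketch, the two would normally point at different places (the paths here are hypothetical):

In the realtime spec's tuningConfig, a local scratch directory for intermediate persists:

    "basePersistDirectory": "/local/druid/realtimePersist"

In common.runtime.properties, the deep storage location where finalized segments are pushed:

    druid.storage.type=local
    druid.storage.storageDirectory=/nas/druid/deepStorage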

In your previous message you mention "5. A historical locally downloads the segment from deep storage and serves the segment." Where will the Historical store it, and how can I set up this path?

Historical nodes download segments based on druid.segmentCache.locations. 
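
For example (a hypothetical local path; each historical maintains its own cache of downloaded segments):

    druid.segmentCache.locations=[{"path": "/local/druid/indexCache", "maxSize": 10000000000}]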

Maurizio Gallelli

Mar 28, 2015, 4:03:18 AM
to druid-de...@googlegroups.com
Hi Fangjin,
in the end I was able to get it working.
The issue seems to be related to the merging process: it fails when trying to merge string and long values on the same field.
After removing the section below from the spec file, everything now works fine.

{
  "type": "longSum",
  "name" : "bid_response",
  "fieldName" : "bid"
},{
  "type": "longSum",
  "name" : "win",
  "fieldName" : "win"
},{
  "type": "longSum",
  "name" : "clicks",
  "fieldName" : "click"
},{
  "type": "longSum",
  "name" : "conversion",
  "fieldName" : "convert"
}

These four fields are essentially bit flags (values are 0 or 1) that mark the kind of stored row (whether it belongs to a banner bid, a banner win, a user click, or a conversion); all four are always filled with 0 or 1.

Do you have any idea why this causes the issue?


Thanks
Maurizio



Fangjin Yang

Mar 29, 2015, 11:31:52 AM
to druid-de...@googlegroups.com
Hi Maurizio,

I suspect that at some point you restarted a realtime node and changed the schema, leading to problems merging. Schema changes with realtime nodes are not a simple thing, which is one of the reasons we created Tranquility (https://github.com/metamx/tranquility) and do all realtime ingestion via the indexing service.

I also notice you have metrics listed as dimensions in your schema, which should be okay, but it might be interesting to test if there is a bug there with overlap.

dimensions": ["bid","win","click","convert","exchange_name","creative_id","device_country","device_make","device_model","device_os","device_osv","device_type","carrierid","bid_price","win_price","company_price","advertiser_price","pub_id","pub_name","place_id","place_name","domain","categories","exchange_id","campaign_id","offer_id","advertiser_id","device_country_id","device_make_id","device_model_id","device_os_id","device_osv_id","device_type_id","carrierid_id","banner_width","banner_heigth","supply_type","session_bid_id"]

Some more info on dimensions and metrics can be found in the Druid ingestion documentation.