/**
 * Beam factory producing a Tranquility beam for events modelled as `Map[String, String]`.
 *
 * The beam is built lazily on the first access to `makeBeam` and is then reused by
 * this factory instance.
 */
class EventRDDBeamFactory extends BeamFactory[Map[String, String]] {
  lazy val makeBeam: Beam[Map[String, String]] = {
    // NOTE(review): CuratorFrameworkFactory.newClient is normally given a ZooKeeper
    // connect string ahead of the retry policy; none is passed here — confirm.
    val curator = CuratorFrameworkFactory.newClient(
      new BoundedExponentialBackoffRetry(100, 3000, 5))
    curator.start()

    val indexService = "druid/overlord"
    val discoveryPath = "/discovery"
    val dataSource = "test"
    val dimensions = IndexedSeq("ip")
    val aggregators = Seq(new CountAggregatorFactory("website"))

    // Extracts the event time from the "time" field. A missing field still raises
    // NoSuchElementException (as the previous Option.get did), but now with a message
    // that names the missing field instead of the opaque "None.get".
    val timestampFn = (message: Map[String, String]) =>
      new DateTime(
        message.getOrElse(
          "time",
          throw new NoSuchElementException(
            s"Event is missing the 'time' field; available fields: ${message.keys.mkString(", ")}")))

    DruidBeams
      .builder(timestampFn)
      .curator(curator)
      .discoveryPath(discoveryPath)
      .location(DruidLocation.create(indexService, dataSource))
      .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.HOUR,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 1
        )
      )
      .buildBeam()
  }
}
# Pre-flight checks.
# NOTE(review): presumably these run before any service is started — confirm against the supervise script.
:verify bin/verify-java
:verify bin/verify-default-ports
# Services, one per line: a name followed by the command that launches it.
# NOTE(review): the "!pNN" prefixes look like per-service priorities — confirm their exact meaning.
!p10 zk bin/run-zk conf-quickstart
coordinator bin/run-druid coordinator conf-quickstart
broker bin/run-druid broker conf-quickstart
historical bin/run-druid historical conf-quickstart
!p80 overlord bin/run-druid overlord conf-quickstart
!p90 middleManager bin/run-druid middleManager conf-quickstart
# The tranquility-server entry below is commented out, so it is not part of the active service list.
#tranquility-server bin/tranquility server -configFile conf-quickstart/tranquility/server.json
# Uncomment to use Tranquility Kafka
#tranquility-kafka bin/tranquility kafka -configFile conf-quickstart/tranquility/kafka.json
Apart from this, I haven't made any changes to any other config files. I am able to get requests through to Druid via the Tranquility library, but I see the following errors in the ZooKeeper log:
2016-02-10 12:52:31,558 INFO [SyncThread:0] org.apache.zookeeper.server.ZooKeeperServer - Established session 0x152cb3c23a20005 with negotiated timeout 40000 for client /10.196.192.38:49836
2016-02-10 12:52:33,943 INFO [ProcessThread(sid:0 cport:-1):] org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x152cb3c23a20005 type:create cxid:0x2 zxid:0x44 txntype:-1 reqpath:n/a Error Path:/tranquility/beams/druid:overlord/druid_ingest Error:KeeperErrorCode = NoNode for /tranquility/beams/druid:overlord/druid_ingest
2016-02-10 12:52:33,963 INFO [ProcessThread(sid:0 cport:-1):] org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x152cb3c23a20005 type:create cxid:0xc zxid:0x4a txntype:-1 reqpath:n/a Error Path:/tranquility/beams/druid:overlord/druid_ingest/mutex/locks Error:KeeperErrorCode = NoNode for /tranquility/beams/druid:overlord/druid_ingest/mutex/locks
2016-02-10 12:52:33,980 INFO [ProcessThread(sid:0 cport:-1):] org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x152cb3c23a20005 type:create cxid:0x17 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/tranquility/beams/druid:overlord/druid_ingest/mutex/leases Error:KeeperErrorCode = NoNode for /tranquility/beams/druid:overlord/druid_ingest/mutex/leases
My datasource name is druid_ingest. I don't see any real-time indexing tasks on the coordinator either, and no datasources are returned from the broker API at <host>:8082/druid/v2/datasources.
I see this in the overlord logs, when the overlord starts up:
2016-02-10T13:05:49,020 WARN [main] com.metamx.common.RetryUtils - Failed on try 1, retrying in 2,054ms.
org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException: java.sql.SQLException: Cannot create PoolableConnectionFactory (java.net.ConnectException : Error connecting to server localhost on port 1,527 with message Connection refused.)
at org.skife.jdbi.v2.DBI.open(DBI.java:230) ~[jdbi-2.63.1.jar:2.63.1]
at org.skife.jdbi.v2.DBI.withHandle(DBI.java:279) ~[jdbi-2.63.1.jar:2.63.1]
at io.druid.metadata.SQLMetadataConnector$2.call(SQLMetadataConnector.java:108) ~[druid-server-0.8.3-iap1.jar:0.8.3-iap1]
at com.metamx.common.RetryUtils.retry(RetryUtils.java:38) [java-util-0.27.4.jar:?]
at io.druid.metadata.SQLMetadataConnector.retryWithHandle(SQLMetadataConnector.java:113) [druid-server-0.8.3-iap1.jar:0.8.3-iap1]
at io.druid.metadata.SQLMetadataConnector.createTable(SQLMetadataConnector.java:157) [druid-server-0.8.3-iap1.jar:0.8.3-iap1]
at io.druid.metadata.SQLMetadataConnector.createConfigTable(SQLMetadataConnector.java:231) [druid-server-0.8.3-iap1.jar:0.8.3-iap1]
at io.druid.metadata.SQLMetadataConnector.createConfigTable(SQLMetadataConnector.java:374) [druid-server-0.8.3-iap1.jar:0.8.3-iap1]
at io.druid.guice.JacksonConfigManagerModule$1.start(JacksonConfigManagerModule.java:56) [druid-common-0.8.3-iap1.jar:0.8.3-iap1]
at com.metamx.common.lifecycle.Lifecycle.start(Lifecycle.java:244) [java-util-0.27.4.jar:?]
at io.druid.guice.LifecycleModule$2.start(LifecycleModule.java:155) [druid-api-0.3.13.jar:0.8.3-iap1]
at io.druid.cli.GuiceRunnable.initLifecycle(GuiceRunnable.java:71) [druid-services-0.8.3-iap1.jar:0.8.3-iap1]
at io.druid.cli.ServerRunnable.run(ServerRunnable.java:38) [druid-services-0.8.3-iap1.jar:0.8.3-iap1]
at io.druid.cli.Main.main(Main.java:99) [druid-services-0.8.3-iap1.jar:0.8.3-iap1]
Caused by: java.sql.SQLException: Cannot create PoolableConnectionFactory (java.net.ConnectException : Error connecting to server localhost on port 1,527 with message Connection refused.)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2152) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:1903) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1413) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.skife.jdbi.v2.DataSourceConnectionFactory.openConnection(DataSourceConnectionFactory.java:36) ~[jdbi-2.63.1.jar:2.63.1]
at org.skife.jdbi.v2.DBI.open(DBI.java:212) ~[jdbi-2.63.1.jar:2.63.1]
... 13 more
Caused by: java.sql.SQLNonTransientConnectionException: java.net.ConnectException : Error connecting to server localhost on port 1,527 with message Connection refused.
at org.apache.derby.client.am.SQLExceptionFactory.getSQLException(Unknown Source) ~[derbyclient-10.11.1.1.jar:?]
at org.apache.derby.client.am.SqlException.getSQLException(Unknown Source) ~[derbyclient-10.11.1.1.jar:?]
at org.apache.derby.jdbc.ClientDriver.connect(Unknown Source) ~[derbyclient-10.11.1.1.jar:?]
at org.apache.commons.dbcp2.DriverConnectionFactory.createConnection(DriverConnectionFactory.java:39) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.apache.commons.dbcp2.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:205) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:2162) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2148) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:1903) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1413) ~[commons-dbcp2-2.0.1.jar:2.0.1]
at org.skife.jdbi.v2.DataSourceConnectionFactory.openConnection(DataSourceConnectionFactory.java:36) ~[jdbi-2.63.1.jar:2.63.1]
at org.skife.jdbi.v2.DBI.open(DBI.java:212) ~[jdbi-2.63.1.jar:2.63.1]
... 13 more
// Index service name passed to DruidLocation.create when the beam is built.
val DRUID_INDEX_SERVICE: String = "druid/overlord"

// Path passed to the beam builder's discoveryPath(...).
val DRUID_DISCOVERY_PATH: String = "/discovery"

// Name of the datasource passed to DruidLocation.create.
val DATA_SOURCE: String = "druid_ingest"

// Retry policy handed to the Curator client.
val DRUID_TRANQUILITY_RETRY_POLICY: BoundedExponentialBackoffRetry =
  new BoundedExponentialBackoffRetry(100, 3000, 5)

// Beam tuning: hourly segments, a ten-minute window period, one partition, one replicant.
val DRUID_TRANQUILITY_TUNING: ClusteredBeamTuning =
  ClusteredBeamTuning(
    segmentGranularity = Granularity.HOUR,
    windowPeriod = new Period("PT10M"),
    partitions = 1,
    replicants = 1
  )
// Tranquility beam for Map[String, String] events. Declared as a lazy val, so the
// Curator client is created and started only on first access.
// NOTE(review): the closing brace of this block is not present in this excerpt.
lazy val BeamInstance: Beam[Map[String, String]] = {
// Curator client built from the ZooKeeper setting and retry policy held in ConfigConstants.
val curator = CuratorFrameworkFactory.newClient(
ConfigConstants.DRUID_ZOOKEEPER,
ConfigConstants.DRUID_TRANQUILITY_RETRY_POLICY)
curator.start()
// <keys> and <metrics> are placeholders left in the snippet; they are not valid Scala
// and stand in for the real dimension and aggregator definitions.
val dimensions = <keys>
val aggregators = <metrics>
// Reads the "timestamp" entry of each event and converts it with
// CommonUtil.convertMillisToJodaDate. Option.get throws NoSuchElementException when
// the entry is absent.
val timestampFn = (message: Map[String, String]) => CommonUtil.convertMillisToJodaDate(message.get("timestamp").get)
// Assemble the beam from the ConfigConstants settings, rolling up at minute granularity.
DruidBeams
.builder(timestampFn)
.curator(curator)
.discoveryPath(ConfigConstants.DRUID_DISCOVERY_PATH)
.location(DruidLocation.create(ConfigConstants.DRUID_INDEX_SERVICE, ConfigConstants.DATA_SOURCE))
.rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
.tuning(ConfigConstants.DRUID_TRANQUILITY_TUNING)
.buildBeam()
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/672e5549-7412-4553-8d4f-5a8843ce11d2%40googlegroups.com.
--
You received this message because you are subscribed to the Google Groups "Druid User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+...@googlegroups.com.
To post to this group, send email to druid...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/8c4e4426-ad8a-4966-a562-40e711c12fe8%40googlegroups.com.