Question about Kafka data sources


kall...@gmail.com

Jun 19, 2019, 8:19:46 PM
to Metatron Discovery User Group
I'd like to ask about the millisecond timestamp format for using Kafka topics as data sources.

According to the blog, the second-precision format comes in the two variants below. Are both of them valid?
"yyyy-MM-ddTHH:mm:ssZ" / "yyyy-MM-dd'T'HH:mm:ssZ"

I'd also like to know which of the following is the correct millisecond format:
"yyyy-MM-ddTHH:mm:ss.SSSZ" / "yyyy-MM-dd'T'HH:mm:ss.SSSZ"


The JSON we POST to the REST API looks like the example below; please let us know if there are any problems with it.

{
  "name": "A0000",
  "description": "",
  "dsType": "MASTER",
  "connType": "ENGINE",
  "srcType": "REALTIME",
  "granularity": "SECOND",
  "segGranularity": "DAY",
  "ingestion": {
    "type": "realtime",
    "topic": "A0000",
    "consumerType": "KAFKA",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "format": {
      "type": "json"
    },
    "rollup": false
  },
  "fields": [
    {
      "name": "recv_time",
      "alias": "recv_time",
      "type": "TIMESTAMP",
      "logicalType": "TIMESTAMP",
      "role": "TIMESTAMP",
      "format": {
        "format": "yyyy-MM-ddTHH:mm:ss.SSSZ",
        "locale": "en",
        "unit": "MILLISECOND",
        "type": "time_format",
        "timeZone": "Asia/Seoul"
      },
      "seq": 0
    },
    {
      "name": "A",
      "alias": "A",
      "type": "STRING",
      "logicalType": "STRING",
      "role": "MEASURE",
      "seq": 1
    },
    {
      "name": "B",
      "alias": "B",
      "type": "STRING",
      "logicalType": "STRING",
      "role": "MEASURE",
      "seq": 2
    }
  ]
}

Thank you.

kyungtaak Noh

Jun 19, 2019, 9:37:59 PM
to Metatron Discovery User Group
The screen capture was wrong; I will fix that part.
When specifying a time format, any characters other than the format letters must be wrapped in single quotes, e.g. yyyy-MM-dd'T'HH:mm:ss.SSSZ.
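
To make the quoting rule concrete, here is a minimal Joda-Time sketch (the patterns here follow Joda-Time, which is linked later in this thread; the sample timestamp is invented for illustration):

import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class TimeFormatCheck {
    public static void main(String[] args) {
        // Quoted 'T' is treated as a literal, so the pattern is valid and parses as expected.
        DateTimeFormatter quoted = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
        DateTime dt = quoted.withOffsetParsed().parseDateTime("2019-06-20T09:00:00.123+0900");
        System.out.println(dt); // prints 2019-06-20T09:00:00.123+09:00

        // Without the quotes, T is not a valid pattern letter,
        // so Joda-Time rejects the pattern itself.
        try {
            DateTimeFormat.forPattern("yyyy-MM-ddTHH:mm:ss.SSSZ");
        } catch (IllegalArgumentException e) {
            System.out.println("Bad pattern: " + e.getMessage());
        }
    }
}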

kall...@gmail.com

Jun 19, 2019, 9:46:38 PM
to Metatron Discovery User Group
Why is the trailing Z left unquoted, rather than 'Z'?
Also, is it possible to delete a datasource via REST? It would be convenient to have.

kyungtaak Noh

Jun 19, 2019, 9:55:05 PM
to Metatron Discovery User Group
Here are the answers.

- Z means the timezone offset (GMT+0). For a description of the time format, please refer to https://www.joda.org/joda-time/key_format.html.
- An API Guide exists: https://discovery.metatron.app/docs/api-guide.html#resources-datasource-methods-delete. If you find anything missing, please report it.
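
As a rough sketch of what the call might look like (the /api/datasources/{id} path is my reading of the guide's anchor, and the host, datasource id, and token are placeholders, so please confirm the exact path and auth scheme against the guide):

curl -X DELETE \
  http://${discovery_host}/api/datasources/${datasource_id} \
  -H 'Authorization: Bearer ${access_token}'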
  

kall...@gmail.com

Jun 20, 2019, 12:47:09 AM
to Metatron Discovery User Group
Thank you for the answer.

We are trying to register about 500 Kafka data sources, but some of them switch to disabled. The related log from Druid is shown below.

2019-06-20T04:31:15,230 INFO [KafkaIndexTaskClient-i7012-0] io.druid.indexing.kafka.KafkaIndexTaskClient - No TaskLocation available for task [index_kafka_i7012_688feb2abea189a_celklhmc], this task may not have been assigned to a worker yet or may have already completed

If there are any additional settings or hardware requirements for registering around 500 data sources, we would appreciate your guidance.

kyungtaak Noh

Jun 20, 2019, 3:36:45 AM
to Metatron Discovery User Group

First of all, some data sources switching to disabled means that either the registered data is not arriving through Kafka, or the Druid ingestion task is not running normally, so not a single record has been loaded into the corresponding datasource in the engine.

Looking at the log above, a worker was not allocated properly.
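
If you want to check whether tasks are stuck waiting for a worker slot, you can query the overlord's indexing-service APIs (a sketch; ${overlord_host} is a placeholder and 8090 is the default overlord port):

# Tasks waiting to be assigned to a worker
curl http://${overlord_host}:8090/druid/indexer/v1/pendingTasks

# Registered middleManager workers and their capacity usage
curl http://${overlord_host}:8090/druid/indexer/v1/workers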

If you look at ${DRUID_HOME}/conf/druid/middleManager/runtime.properties, the settings will look something like this:
----------------------------------------------------------------------------------------------------------
# Number of tasks per middleManager
druid.worker.capacity=10

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
----------------------------------------------------------------------------------------------------------

Also, when registering a datasource, you can specify options for the task in the "taskOptions" field, as shown below:
{
    ...   
    "ingestion": {
        "type": "realtime",
        "topic": "realtime_sample",
        "consumerType": "KAFKA",
        "consumerProperties": {
            "bootstrap.servers": "localhost:9092"
        },
        "format": {
            "type": "json"
        },
        "rollup": false,
        "taskOptions": {
          "taskCount": 2,
          "replicas": 3   
          ...
        }
    }
    ...
}

So, if the server where the middleManager node is installed has 20 GB of available memory
and each task takes 2 GB, then druid.worker.capacity should be set to 10 or less. For reference, one worker (= task) runs as an independent JVM process.

In other words, a rough rule of thumb is: druid.worker.capacity * (the -Xmx value in druid.indexer.runner.javaOpts) <= free memory on the middleManager node.

Assuming 500 data sources, then, 500 tasks have to run.
By that formula, if capacity is set to 10 on middleManager nodes whose servers have 20 GB of available memory as above,
you would need 50 middleManager nodes.

And there is an additional factor that raises the task count: production Kafka topics will usually have replicas configured, in which case the 500 tasks get multiplied by the replica count, so the number grows even further.
※ For reference, please see the "Capacity Planning" section in the document at the link below. The KafkaSupervisorIOConfig described there plays the same role as the taskOptions property in the spec above. (https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html)
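
Putting this together as one rough sizing formula (taskCount and replicas are the taskOptions fields shown above; this is only a ballpark estimate):

required middleManager nodes ~= ceil(datasources * taskCount * replicas / druid.worker.capacity)
e.g. ceil(500 * 1 * 2 / 10) = 100 nodes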

Of course, this is just simple arithmetic, and in practice it is hard to predict what other variables will come up.
Maintaining 500 data sources all at once from the start will be difficult, so I recommend keeping a few data sources running first and then gradually increasing the count.

kall...@gmail.com

Jun 25, 2019, 7:28:01 PM
to Metatron Discovery User Group
Thank you. We have confirmed that things work well with 3 data sources for now.

kall...@gmail.com

Jun 25, 2019, 8:26:43 PM
to Metatron Discovery User Group
While continuing our tests we ran into an additional problem, so I'd like to ask whether you can help.

We are testing with three topics (A3015 / G7015 / B6015).
After streaming about 100,000 records, waiting 3 to 5 minutes, and then streaming more data, the G7015 / B6015 data sources switch to disabled.

---------------
Hardware
RAM: 56 GB
Cores: 8 (16 threads)

---------------
Services (all running on a single server)

kafka: local only / replica=1
metatron-discovery: official docker / -m 40G

---------------
Druid settings

[druid/middleManager/runtime.properties]
druid.worker.capacity=15
druid.processing.numThreads=4

---------------
Metatron datasource settings

taskOptions: {
"taskCount": 2,
"replicas": 1
}
---------------


Below are the related logs that look suspicious.

druid-overlord-stdout.log

2019-06-25T23:09:20,756 INFO [qtp1014135205-123] io.druid.indexing.overlord.TaskLockbox - Try lock for task index_kafka_b6015_84437a2eaef5ba8_jgiocpdo on [b6015] for interval 2018-02-08T00:00:00.000Z/2018-02-09T00:00:00.000Z
2019-06-25T23:09:20,757 WARN [qtp1014135205-123] io.druid.indexing.overlord.http.OverlordResource - Failed to perform task action (not acceptable)
com.metamx.common.ISE: Unable to grant lock to inactive Task [index_kafka_b6015_84437a2eaef5ba8_jgiocpdo]
at io.druid.indexing.overlord.TaskLockbox.tryLock(TaskLockbox.java:241) ~[druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.overlord.TaskLockbox.tryLock(TaskLockbox.java:207) ~[druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.common.actions.SegmentAllocateAction.perform(SegmentAllocateAction.java:179) ~[druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.common.actions.SegmentAllocateAction.perform(SegmentAllocateAction.java:53) ~[druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.common.actions.LocalTaskActionClient.submit(LocalTaskActionClient.java:68) ~[druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.overlord.http.OverlordResource$5.apply(OverlordResource.java:465) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.overlord.http.OverlordResource$5.apply(OverlordResource.java:454) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.overlord.http.OverlordResource.asLeaderWith(OverlordResource.java:852) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.overlord.http.OverlordResource.doAction(OverlordResource.java:451) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409) [jersey-server-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733) [jersey-servlet-1.19.jar:1.19]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) [javax.servlet-api-3.1.0.jar:3.1.0]
at com.google.inject.servlet.ServletDefinition.doServiceImpl(ServletDefinition.java:286) [guice-servlet-4.1.0.jar:?]
at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:276) [guice-servlet-4.1.0.jar:?]
at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:181) [guice-servlet-4.1.0.jar:?]
at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:91) [guice-servlet-4.1.0.jar:?]
at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:85) [guice-servlet-4.1.0.jar:?]
at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:120) [guice-servlet-4.1.0.jar:?]
at com.google.inject.servlet.DelegatedGuiceFilter.doFilter(DelegatedGuiceFilter.java:86) [druid-services-0.9.1-SNAPSHOT.jar:?]
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759) [jetty-servlet-9.3.24.v20180605.jar:9.3.24.v20180605]
at io.druid.server.http.RedirectFilter.doFilter(RedirectFilter.java:71) [druid-server-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759) [jetty-servlet-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582) [jetty-servlet-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:224) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512) [jetty-servlet-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.Server.handle(Server.java:539) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251) [jetty-server-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283) [jetty-io-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:108) [jetty-io-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) [jetty-io-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) [jetty-util-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) [jetty-util-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) [jetty-util-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671) [jetty-util-9.3.24.v20180605.jar:9.3.24.v20180605]
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589) [jetty-util-9.3.24.v20180605.jar:9.3.24.v20180605]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]



druid-middlemanager-stdout.log

2019-06-25T23:10:48,007 INFO [forking-task-runner-11] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_b6015_84437a2eaef5ba8_lbheoall] location changed to [TaskLocation{host='57db4521d0cf', port=8102}].
2019-06-25T23:10:48,007 INFO [forking-task-runner-11] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_b6015_84437a2eaef5ba8_lbheoall] status changed to [RUNNING].
2019-06-25T23:10:48,007 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Updating task [index_kafka_b6015_84437a2eaef5ba8_lbheoall] announcement with location [TaskLocation{host='57db4521d0cf', port=8102}]
2019-06-25T23:10:48,007 INFO [forking-task-runner-11] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_kafka_b6015_84437a2eaef5ba8_lbheoall output to: var/druid/task/index_kafka_b6015_84437a2eaef5ba8_lbheoall/log
2019-06-25T23:27:46,880 INFO [qtp1783978315-44] io.druid.indexing.overlord.ForkingTaskRunner - Killing process for task: index_kafka_a3015_57092b9518ac571_pnaehoec
2019-06-25T23:27:46,887 INFO [qtp1783978315-39] io.druid.indexing.overlord.ForkingTaskRunner - Killing process for task: index_kafka_b6015_84437a2eaef5ba8_lbheoall
2019-06-25T23:27:46,892 INFO [forking-task-runner-10] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: var/druid/indexing-logs/index_kafka_a3015_57092b9518ac571_pnaehoec.log
2019-06-25T23:27:46,892 INFO [forking-task-runner-10] io.druid.indexing.overlord.ForkingTaskRunner - Exception caught during execution
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) ~[?:1.8.0_212]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:291) ~[?:1.8.0_212]
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_212]
at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.8.0_212]
at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:439) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:220) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
2019-06-25T23:27:46,905 INFO [qtp1783978315-43] io.druid.indexing.overlord.ForkingTaskRunner - Killing process for task: index_kafka_g7015_bf68122028c531c_fplhdhli
2019-06-25T23:27:46,907 INFO [forking-task-runner-10] io.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: var/druid/task/index_kafka_a3015_57092b9518ac571_pnaehoec
2019-06-25T23:27:46,909 INFO [forking-task-runner-9] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: var/druid/indexing-logs/index_kafka_g7015_bf68122028c531c_fplhdhli.log
2019-06-25T23:27:46,910 INFO [forking-task-runner-9] io.druid.indexing.overlord.ForkingTaskRunner - Exception caught during execution
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) ~[?:1.8.0_212]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:291) ~[?:1.8.0_212]
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_212]
at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.8.0_212]
at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:439) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:220) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
2019-06-25T23:27:46,911 INFO [forking-task-runner-9] io.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: var/druid/task/index_kafka_g7015_bf68122028c531c_fplhdhli
2019-06-25T23:27:46,914 INFO [forking-task-runner-11] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: var/druid/indexing-logs/index_kafka_b6015_84437a2eaef5ba8_lbheoall.log
2019-06-25T23:27:46,914 INFO [forking-task-runner-11] io.druid.indexing.overlord.ForkingTaskRunner - Exception caught during execution
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) ~[?:1.8.0_212]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:291) ~[?:1.8.0_212]
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_212]
at java.io.FilterInputStream.read(FilterInputStream.java:107) ~[?:1.8.0_212]
at com.google.common.io.ByteStreams.copy(ByteStreams.java:175) ~[guava-16.0.1.jar:?]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:439) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at io.druid.indexing.overlord.ForkingTaskRunner$1.call(ForkingTaskRunner.java:220) [druid-indexing-service-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
2019-06-25T23:27:46,916 INFO [forking-task-runner-11] io.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: var/druid/task/index_kafka_b6015_84437a2eaef5ba8_lbheoall
2019-06-25T23:27:46,955 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Job's finished. Completed [index_kafka_a3015_57092b9518ac571_pnaehoec] with status [FAILED]
2019-06-25T23:27:47,043 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Job's finished. Completed [index_kafka_g7015_bf68122028c531c_fplhdhli] with status [FAILED]
2019-06-25T23:27:47,120 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Job's finished. Completed [index_kafka_b6015_84437a2eaef5ba8_lbheoall] with status [FAILED]
2019-06-25T23:27:47,157 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Submitting runnable for task[index_kafka_a3015_57092b9518ac571_nfoofeph]
2019-06-25T23:27:47,196 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Affirmative. Running task [index_kafka_a3015_57092b9518ac571_nfoofeph]
2019-06-25T23:27:47,230 INFO [forking-task-runner-12] io.druid.indexing.overlord.ForkingTaskRunner - Forking java processor with: -server -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Ddruid.indexer.task.baseTaskDir=var/druid/task -Ddruid.processing.buffer.sizeBytes=536870912 -Ddruid.metadata.storage.connector.host=localhost -Ddruid.computation.buffer.size=67108864 -Duser.timezone=UTC -Dfile.encoding.pkg=sun.io -Ddruid.storage.storageDirectory=var/druid/segments -Ddruid.selectors.coordinator.serviceName=druid/coordinator -Ddruid.extensions.directory=/usr/local/druid/extensions -Ddruid.selectors.indexing.serviceName=druid/overlord -Ddruid.port=8091 -Ddruid.server.http.numThreads=25 -Ddruid.worker.capacity=15 -Ddruid.metadata.storage.connector.port=1527 -Ddruid.service=druid/middleManager -Ddruid.metadata.storage.type=derby -Ddruid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true -Djava.io.tmpdir=/usr/local/druid/tmp -Ddruid.extensions.loadList=["druid-jdbc-firehose", "druid-histogram", "druid-datasketches", "druid-hive-extensions", "mysql-metadata-storage", "druid-hdfs-storage", "druid-range", "druid-area", "druid-stats", "druid-jdbc-firehose", "druid-orc-extensions", "druid-kafka-indexing-service", "druid-lucene-extensions", "druid-geotools-extensions", "druid-hive-udf-extensions"] -Ddruid.startup.logging.logProperties=true -Ddruid.processing.numThreads=4 -Ddruid.zk.service.host=localhost -Ddruid.indexer.logs.directory=var/druid/indexing-logs -Ddruid.zk.paths.base=/druid -Dfile.encoding=UTF-8 -Ddruid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"] -Ddruid.storage.type=local -Ddruid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp -Ddruid.indexer.logs.type=file -Ddruid.metrics.emitter.dimension.dataSource=a3015 -Ddruid.metrics.emitter.dimension.taskId=index_kafka_a3015_57092b9518ac571_nfoofeph -Ddruid.host=57db4521d0cf -Ddruid.port=8100 io.druid.cli.Main internal peon var/druid/task/index_kafka_a3015_57092b9518ac571_nfoofeph/task.json var/druid/task/index_kafka_a3015_57092b9518ac571_nfoofeph/d34643d8-9db3-4026-bf59-044791e533d5/status.json --queryable
2019-06-25T23:27:47,233 INFO [forking-task-runner-12] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_a3015_57092b9518ac571_nfoofeph] location changed to [TaskLocation{host='57db4521d0cf', port=8100}].
2019-06-25T23:27:47,234 INFO [forking-task-runner-12] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_a3015_57092b9518ac571_nfoofeph] status changed to [RUNNING].
2019-06-25T23:27:47,234 INFO [WorkerTaskMonitor] io.druid.indexing.worker.WorkerTaskMonitor - Updating task [index_kafka_a3015_57092b9518ac571_nfoofeph] announcement with location [TaskLocation{host='57db4521d0cf', port=8100}]
2019-06-25T23:27:47,234 INFO [forking-task-runner-12] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_kafka_a3015_57092b9518ac571_nfoofeph output to: var/druid/task/index_kafka_a3015_57092b9518ac571_nfoofeph/log

Jae Young Lee

Jul 9, 2019, 3:36:12 AM
to Metatron Discovery User Group
Sorry for the late reply.
It is hard to diagnose the problem from the information above alone; could you post the Query (ingestion spec) you used to load the three data sources?

kall...@gmail.com

Jul 9, 2019, 7:37:06 PM
to Metatron Discovery User Group
The sample JSON I use in unit tests when registering a data source is shown below. In the actual POST, every field is a STRING as well. Thank you.

{
  "name": "fakeTrCode",
  "description": "",
  "dsType": "MASTER",
  "connType": "ENGINE",
  "srcType": "REALTIME",
  "granularity": "SECOND",
  "segGranularity": "DAY",
  "ingestion": {
    "type": "realtime",
    "topic": "fakeTrCode",
    "consumerType": "KAFKA",
    "consumerProperties": {
      "bootstrap.servers": "192.168.36.1:9092"
    },
    "format": {
      "type": "json"
    },
    "rollup": false,
    "taskOptions": {
      "taskCount": 1,
      "replicas": 1
    }
  },
  "fields": [
    {
      "name": "recv_time",
      "alias": "recv_time",
      "type": "TIMESTAMP",
      "logicalType": "TIMESTAMP",
      "role": "TIMESTAMP",
      "format": {
        "format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
        "locale": "en",
        "unit": "MILLISECOND",
        "type": "time_format",
        "timeZone": "Asia/Seoul"
      },
      "seq": 0
    },
    {
      "name": "A",
      "alias": "A",
      "type": "STRING",
      "logicalType": "STRING",
      "role": "DIMENSION",
      "seq": 1
    },
    {
      "name": "B",
      "alias": "B",
      "type": "STRING",
      "logicalType": "STRING",
      "role": "DIMENSION",
      "seq": 2
    }
  ]
}

Jae Young Lee

Jul 12, 2019, 3:05:23 AM
to Metatron Discovery User Group

Thank you. I will look into it.

kyungtaak Noh

Aug 21, 2019, 8:03:01 PM
to Metatron Discovery User Group
Could you put the name (engine_name) of a datasource that shows as disabled into the script below and share the result of the call?

curl -X POST \
  http://${brokernode url}/druid/v2/ \
  -H 'Content-Type: application/json' \
  -d '{
  "queryType":"segmentMetadata",
  "dataSource":"${datasource name}",
  "merge": true,
  "intervals":["2013-01-01/2020-01-01"]
}'
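
For reference, if the result comes back empty, it most likely means that no segments were ever published for that datasource, which would match the disabled state described earlier in this thread.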