kafka.schema.registry.url


Zhenhua Cao

May 14, 2016, 10:30:29 AM
to gobblin-users
I am ingesting data from Kafka to HDFS, and the job failed at the step that gets the source Avro schema:
kafka.schema.registry.url=#schema registry URI

I added two classes for the Kafka Avro source, as below:

public class KafkaAvroSource extends KafkaSource<Schema, GenericRecord> {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaAvroSource.class);

  private Optional<KafkaAvroSchemaRegistry> schemaRegistry = Optional.absent();

  @Override
  public Extractor<Schema, GenericRecord> getExtractor(WorkUnitState state) throws IOException {
    return new CustomKafkaAvroExtractor(state);
  }

  @Override
  public List<WorkUnit> getWorkunits(SourceState state) {
    if (!this.schemaRegistry.isPresent()) {
      this.schemaRegistry = Optional.of(new KafkaAvroSchemaRegistry(state.getProperties()));
    }
    return super.getWorkunits(state);
  }

  /**
   * A {@link KafkaTopic} is qualified if its schema exists in the schema registry.
   */
  @Override
  protected boolean isTopicQualified(KafkaTopic topic) {
    Preconditions.checkState(this.schemaRegistry.isPresent(),
        "Schema registry not found. Unable to verify topic schema");

    try {
      this.schemaRegistry.get().getLatestSchemaByTopic(topic.getName());
      return true;
    } catch (SchemaRegistryException e) {
      LOG.error(String.format("Cannot find latest schema for topic %s. This topic will be skipped", topic.getName()));
      return false;
    }
  }
}

public class CustomKafkaAvroExtractor extends KafkaAvroExtractor {

  private static final Logger LOG = LoggerFactory.getLogger(CustomKafkaAvroExtractor.class);

  public CustomKafkaAvroExtractor(WorkUnitState state) {
    super(state);
  }

  @Override
  protected Schema getRecordSchema(byte[] payload) {
    // Parses a full Avro schema from the beginning of each message payload.
    Closer closer = Closer.create();
    Schema schema = null;
    InputStream inputStream = null;
    try {
      inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(payload)));
      schema = new Schema.Parser().parse(inputStream);
    } catch (Exception ex) {
      LOG.error(String.format("Failed to get record schema at CustomKafkaAvroExtractor %s", inputStream));
    }
    return schema;
  }

  @Override
  protected Decoder getDecoder(byte[] payload) {
    // Wraps the raw payload bytes in an Avro binary decoder.
    Closer closer = Closer.create();
    InputStream inputStream = null;
    Decoder decoder = null;
    try {
      inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(payload)));
      LOG.info(String.format("CustomKafkaAvroExtractor getDecoder inputStream: %s", inputStream));
      decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    } catch (Exception ex) {
      LOG.error(String.format("Failed to get decoder at CustomKafkaAvroExtractor %s", inputStream));
    }
    return decoder;
  }
}
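
For reference, many Kafka-plus-Avro setups do not embed the full schema text in every message; instead each payload begins with a small schema-ID header that the extractor resolves against the registry. A minimal sketch of that convention, assuming a magic byte followed by a fixed-length 16-byte schema ID (readSchemaFromHeader and lookupSchemaById are hypothetical helpers, not Gobblin APIs):

// Hypothetical helper illustrating a schema-ID-header payload layout.
private Schema readSchemaFromHeader(byte[] payload) throws IOException {
  DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
  byte magic = in.readByte();        // assumed marker byte for "a schema ID follows"
  if (magic != 0x0) {
    throw new IOException("Unexpected magic byte: " + magic);
  }
  byte[] schemaId = new byte[16];    // assumed fixed-length (e.g. MD5) schema ID
  in.readFully(schemaId);
  return lookupSchemaById(schemaId); // hypothetical lookup against the registry
}

If the producer really writes the full schema text into each payload, the parse-from-payload version above works; the producer and the extractor just have to agree on the payload layout.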


The job config file is as follows:

job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false


kafka.brokers=localhost:9092
topic.whitelist=test
bootstrap.with.offset=earliest


source.class=gobblin.source.extractor.extract.kafka.KafkaAvroSource
source.schema={"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number",  "type":"int"}, {"name":"favorite_color", "type":"string"}]}
extract.namespace=gobblin.extract.kafka


writer.builder.class=gobblin.writer.SimpleDataWriterBuilder

data.publisher.final.dir=/gobblintest/job-output

mr.job.max.mappers=1
mr.job.root.dir=/gobblin-kafka/working


metrics.reporting.file.enabled=true
metrics.reporting.file.suffix=txt


task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data


Any advice would be greatly appreciated, thanks!

Zhenhua Cao

May 14, 2016, 10:33:28 AM
to gobblin-users
I updated the job config as below; this is a test config.

job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false

kafka.brokers=localhost:9092
topic.whitelist=test
source.class=gobblin.source.extractor.extract.kafka.KafkaAvroSource
extract.namespace=gobblin.extract.kafka
source.schema={"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number",  "type":"int"}, {"name":"favorite_color", "type":"string"}]}


writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=AVRO

data.publisher.type=gobblin.publisher.BaseDataPublisher

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest

fs.uri=hdfs://localhost:9090
writer.fs.uri=hdfs://localhost:9090
state.store.fs.uri=hdfs://localhost:9090

mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output

On Saturday, May 14, 2016 at 10:30:29 PM UTC+8, Zhenhua Cao wrote:

Chavdar Botev

May 14, 2016, 3:25:37 PM
to Zhenhua Cao, gobblin-users
What is the exception that you get?

Zhenhua Cao

May 14, 2016, 8:12:34 PM
to gobblin-users
2016-05-14 06:47:12 PDT INFO  [main] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@4e34c76f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
2016-05-14 06:47:12 PDT ERROR [main] gobblin.runtime.SourceDecorator  59 - Failed to get work units for job job_GobblinKafkaQuickStart_1463233631178
java.lang.IllegalArgumentException: Property kafka.schema.registry.url not provided.
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
        at gobblin.metrics.kafka.KafkaAvroSchemaRegistry.<init>(KafkaAvroSchemaRegistry.java:65)
        at gobblin.source.extractor.extract.kafka.KafkaAvroSource.getWorkunits(KafkaAvroSource.java:54)
        at gobblin.runtime.SourceDecorator.getWorkunits(SourceDecorator.java:52)
        at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:241)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.launchJob(CliMRJobLauncher.java:87)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:64)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:110)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
2016-05-14 06:47:12 PDT ERROR [main] gobblin.runtime.AbstractJobLauncher  321 - Failed to launch and run job job_GobblinKafkaQuickStart_1463233631178: gobblin.runtime.JobException: Failed to get work units for job job_GobblinKafkaQuickStart_1463233631178
gobblin.runtime.JobException: Failed to get work units for job job_GobblinKafkaQuickStart_1463233631178
        at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:249)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.launchJob(CliMRJobLauncher.java:87)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:64)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:110)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
2016-05-14 06:47:13 PDT INFO  [main] gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@38528f18[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
2016-05-14 06:47:13 PDT INFO  [main] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@38528f18[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
2016-05-14 06:47:13 PDT INFO  [main] gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@6fa4c25c[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
2016-05-14 06:47:13 PDT INFO  [main] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@6fa4c25c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
2016-05-14 06:47:13 PDT INFO  [main] gobblin.runtime.app.ServiceBasedAppLauncher  162 - Shutting down the application
2016-05-14 06:47:13 PDT INFO  [MetricsReportingService STOPPING] gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService: java.util.concurrent.Executors$DelegatedScheduledExecutorService@334385bc
2016-05-14 06:47:13 PDT INFO  [MetricsReportingService STOPPING] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.Executors$DelegatedScheduledExecutorService@334385bc
2016-05-14 06:47:13 PDT DEBUG [MetricsReportingService STOPPING] org.apache.hadoop.hdfs.DFSOutputStream  1813 - DFSClient writeChunk allocating new packet seqno=0, src=/gobblin-kafka/metrics/app_CliMRJob-3ada5c29-9607-44f9-b5ea-91c5c105b8ce_1463233630846/app_CliMRJob-3ada5c29-9607-44f9-b5ea-91c5c105b8ce_1463233630846.txt.metrics.log, packetSize=65016, chunksPerPacket=126, bytesCurBlock=0
2016-05-14 06:47:13 PDT DEBUG [IPC Parameter Sending Thread #0] org.apache.hadoop.ipc.Client$Connection$3  1032 - IPC Client (216406687) connection to master/172.16.68.129:9000 from richard sending #12
2016-05-14 06:47:13 PDT DEBUG [IPC Client (216406687) connection to master/172.16.68.129:9000 from richard] org.apache.hadoop.ipc.Client$Connection  1089 - IPC Client (216406687) connection to master/172.16.68.129:9000 from richard got value #12
2016-05-14 06:47:13 PDT DEBUG [main] org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker  250 - Call: getFileInfo took 2ms
2016-05-14 06:47:13 PDT DEBUG [IPC Parameter Sending Thread #0] org.apache.hadoop.ipc.Client$Connection$3  1032 - IPC Client (216406687) connection to master/172.16.68.129:9000 from richard sending #13
2016-05-14 06:47:13 PDT DEBUG [IPC Client (216406687) connection to master/172.16.68.129:9000 from richard] org.apache.hadoop.ipc.Client$Connection  1089 - IPC Client (216406687) connection to master/172.16.68.129:9000 from richard got value #13
2016-05-14 06:47:13 PDT DEBUG [main] org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker  250 - Call: delete took 2ms
2016-05-14 06:47:13 PDT INFO  [main] gobblin.runtime.mapreduce.MRJobLauncher  464 - Deleted working directory /gobblin-kafka/working/GobblinKafkaQuickStart
2016-05-14 06:47:13 PDT WARN  [Thread-7] gobblin.metrics.reporter.ContextAwareReporter  116 - Reporter OutputStreamReporter has already been stopped.
2016-05-14 06:47:13 PDT DEBUG [Thread-7] org.apache.hadoop.hdfs.DFSOutputStream  1732 - Queued packet 0
2016-05-14 06:47:13 PDT DEBUG [Thread-7] org.apache.hadoop.hdfs.DFSOutputStream  1732 - Queued packet 1


On Saturday, May 14, 2016 at 10:30:29 PM UTC+8, Zhenhua Cao wrote:
> I am ingesting data from Kafka to HDFS ...
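
The stack trace points at the cause: KafkaAvroSchemaRegistry's constructor requires kafka.schema.registry.url, and the test config above does not set it. A minimal sketch of the missing property (the URL is a placeholder standing in for a real schema-registry endpoint):

kafka.schema.registry.url=http://<registry-host>:<registry-port>/schema-repo/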

Zhenhua Cao

May 14, 2016, 10:49:53 PM
to gobblin-users
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] gobblin.metrics.kafka.KafkaAvroSchemaRegistry  108 - Fetching from URL : http://master:9000/schema-repo//latest_with_type=test
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.params.DefaultHttpParams  151 - Set parameter http.useragent = Jakarta Commons-HttpClient/3.1
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.params.DefaultHttpParams  151 - Set parameter http.protocol.version = HTTP/1.1
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.params.DefaultHttpParams  151 - Set parameter http.connection-manager.class = class org.apache.commons.httpclient.SimpleHttpConnectionManager
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.params.DefaultHttpParams  151 - Set parameter http.protocol.cookie-policy = default
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.params.DefaultHttpParams  151 - Set parameter http.protocol.element-charset = US-ASCII
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.params.DefaultHttpParams  151 - Set parameter http.protocol.content-charset = ISO-8859-1
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.params.DefaultHttpParams  151 - Set parameter http.method.retry-handler = org.apache.commons.httpclient.DefaultHttpMethodRetryHandler@6e58d51c
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.params.DefaultHttpParams  151 - Set parameter http.dateparser.patterns = [EEE, dd MMM yyyy HH:mm:ss zzz, EEEE, dd-MMM-yy HH:mm:ss zzz, EEE MMM d HH:mm:ss yyyy, EEE, dd-MMM-yyyy HH:mm:ss z, EEE, dd-MMM-yyyy HH-mm-ss z, EEE, dd MMM yy HH:mm:ss z, EEE dd-MMM-yyyy HH:mm:ss z, EEE dd MMM yyyy HH:mm:ss z, EEE dd-MMM-yyyy HH-mm-ss z, EEE dd-MMM-yy HH:mm:ss z, EEE dd MMM yy HH:mm:ss z, EEE,dd-MMM-yy HH:mm:ss z, EEE,dd-MMM-yyyy HH:mm:ss z, EEE, dd-MM-yyyy HH:mm:ss z]
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  72 - Java version: 1.7.0_45
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  73 - Java vendor: Oracle Corporation
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  75 - Operating system name: Linux
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  76 - Operating system architecture: amd64
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  77 - Operating system version: 4.2.0-16-generic
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - SUN 1.7: SUN (DSA key/parameter generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; JKS keystore; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, Collection CertStores, JavaPolicy Policy; JavaLoginConfig Configuration)
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - SunRsaSign 1.7: Sun RSA signature provider
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - SunEC 1.7: Sun Elliptic Curve provider (EC, ECDSA, ECDH)
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - SunJSSE 1.7: Sun JSSE provider(PKCS12, SunX509 key/trust factories, SSLv3, TLSv1)
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - SunJCE 1.7: SunJCE Provider (implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, Diffie-Hellman, HMAC)
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - SunJGSS 1.7: Sun (Kerberos v5, SPNEGO)
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - SunSASL 1.7: Sun SASL provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM)
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - XMLDSig 1.0: XMLDSig (DOM XMLSignatureFactory; DOM KeyInfoFactory)
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpClient  82 - SunPCSC 1.7: Sun PC/SC provider
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpConnection  692 - Open connection to master:9000
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - >> "GET /schema-repo//latest_with_type=test HTTP/1.1[\r][\n]"
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpMethodBase  1352 - Adding Host request header
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - >> "User-Agent: Jakarta Commons-HttpClient/3.1[\r][\n]"
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - >> "Host: master:9000[\r][\n]"
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - >> "[\r][\n]"
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - << "HTTP/1.1 404 Not Found[\r][\n]"
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - << "HTTP/1.1 404 Not Found[\r][\n]"
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - << "Content-type: text/plain[\r][\n]"
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - << "[\r][\n]"
2016-05-14 19:37:15 PDT INFO  [pool-14-thread-1] org.apache.commons.httpclient.HttpMethodBase  1869 - Response content length is not known
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpMethodBase  967 - Force-close connection: true
2016-05-14 19:37:15 PDT WARN  [pool-14-thread-1] org.apache.commons.httpclient.HttpMethodBase  682 - Going to buffer response body of large or unknown size. Using getResponseBodyAsStream instead is recommended.
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpMethodBase  685 - Buffering response body
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.Wire  70 - << "It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.[\r][\n]"
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpMethodBase  984 - Should force-close connection.
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpConnection  1178 - Releasing connection back to connection manager.
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] org.apache.commons.httpclient.HttpMethodBase  2342 - Default charset used: ISO-8859-1
2016-05-14 19:37:15 PDT ERROR [pool-14-thread-1] gobblin.source.extractor.extract.kafka.KafkaAvroSource  71 - Cannot find latest schema for topic test. This topic will be skipped
2016-05-14 19:37:15 PDT INFO  [pool-14-thread-1] kafka.utils.Logging$class  68 - Reconnect due to socket error: java.nio.channels.ClosedChannelException
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] kafka.utils.Logging$class  52 - Disconnecting from master:9092
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] kafka.utils.Logging$class  52 - Disconnecting from master:9092
2016-05-14 19:37:15 PDT DEBUG [pool-14-thread-1] kafka.utils.Logging$class  52 - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 212992 (requested 1048576), SO_SNDBUF = 1313280 (requested -1), connectTimeoutMs = 30000.
2016-05-14 19:37:15 PDT INFO  [pool-14-thread-1] gobblin.source.extractor.extract.kafka.KafkaSource  443 - Created workunit for partition test:0: lowWatermark=25, highWatermark=51, range=26
2016-05-14 19:37:15 PDT INFO  [main] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@678e882f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
2016-05-14 19:37:15 PDT INFO  [main] gobblin.source.extractor.extract.kafka.KafkaSource  149 - Created workunits for 1 topics in 0 seconds
2016-05-14 19:37:15 PDT INFO  [main] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator  135 - Estimated avg time to pull a record for topic test is 3791.500000 milliseconds
2016-05-14 19:37:15 PDT INFO  [main] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator  145 - For all topics not pulled in the previous run, estimated avg time to pull a record is 3791.4999999999986 milliseconds
2016-05-14 19:37:15 PDT INFO  [main] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  219 - Created MultiWorkUnit for partitions [test:0]
2016-05-14 19:37:15 PDT INFO  [main] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  293 - MultiWorkUnit 0: estimated load=0.003010, partitions=[[test:0]]
2016-05-14 19:37:15 PDT INFO  [main] gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker  279 - Min load of multiWorkUnit = 0.003010; Max load of multiWorkUnit = 0.003010; Diff = 0.000000%
2016-05-14 19:37:15 PDT DEBUG [IPC Parameter Sending Thread #0] org.apache.hadoop.ipc.Client$Connection$3  1032 - IPC Client (474146442) connection to master/172.16.68.129:9000 from richard sending #12
2016-05-14 19:37:15 PDT DEBUG [IPC Client (474146442) connection to master/172.16.68.129:9000 from richard] org.apache.hadoop.ipc.Client$Connection  1089 - IPC Client (474146442) connection to master/172.16.68.129:9000 from richard got value #12
2016-05-14 19:37:15 PDT DEBUG [main] org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker  250 - Call: getFileInfo took 2ms
2016-05-14 19:37:15 PDT DEBUG [IPC Parameter Sending Thread #0] org.apache.hadoop.ipc.Client$Connection$3  1032 - IPC Client (474146442) connection to master/172.16.68.129:9000 from richard sending #13
2016-05-14 19:37:15 PDT DEBUG [IPC Client (474146442) connection to master/172.16.68.129:9000 from richard] org.apache.hadoop.ipc.Client$Connection  1089 - IPC Client (474146442) connection to master/172.16.68.129:9000 from richard got value #13

My custom classes are as posted in my earlier message, and my job config is as follows:

job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false

kafka.schema.registry.url=http://master:9000/schema-repo/
kafka.brokers=master:9092
topic.whitelist=test
source.class=gobblin.source.extractor.extract.kafka.KafkaAvroSource
extract.namespace=gobblin.extract.kafka
source.schema={"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number",  "type":"int"}, {"name":"favorite_color", "type":"string"}]}


writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=AVRO

data.publisher.type=gobblin.publisher.BaseDataPublisher

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest

fs.uri=hdfs://master:9000
writer.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}

mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output

I think the error is because my KafkaAvroSource is not right. Can you show me details on how to use KafkaAvroSource and how to modify my custom KafkaAvroSource class?

Zhenhua Cao

May 15, 2016, 4:03:43 AM
to gobblin-users


On Sunday, May 15, 2016 at 10:49:53 AM UTC+8, Zhenhua Cao wrote:
        I want to ETL Kafka data into Hadoop in the data format below:

      Obj avro.schema {"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"},{"name":"favorite_color","type":"string"}]}
      [binary Avro record data: "Alyssa yellow", "Ben red", "Charlie blue" repeated]

Sahil Takiar

May 17, 2016, 12:41:17 PM
to Zhenhua Cao, gobblin-users
If you just want to copy your data byte-for-byte, you can use KafkaSimpleSource instead; KafkaAvroSource is mainly useful for Avro data that needs to be partitioned by some column. For testing purposes, KafkaSimpleSource is probably your best bet.

KafkaSimpleSource also doesn't require a schema registry, so you don't need to worry about that either.
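
A minimal sketch of a KafkaSimpleSource job config along those lines (broker and topic are placeholders; defaults cover the rest):

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
kafka.brokers=<broker-host>:9092
topic.whitelist=<topic>
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
data.publisher.type=gobblin.publisher.BaseDataPublisher

Note there is no kafka.schema.registry.url and no source.schema.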


Zhenhua Cao

May 18, 2016, 9:42:19 AM
to gobblin-users, caozhen...@gmail.com
I want to get the file format below; how do I configure the job properties file?

Obj avro.schema {"type":"record","name":"TUPLE_0","fields":[{"name":"column1","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column2","type":["null","string"],"doc":"autogenerated from Pig Field Schema"},{"name":"column3","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column4","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column5","type":["null","string"],"doc":"autogenerated from Pig Field Schema"},{"name":"column6","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column7","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column8","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column9","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column10","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column11","type":["null","string"],"doc":"autogenerated from Pig Field Schema"},{"name":"column12","type":["null","string"],"doc":"autogenerated from Pig Field Schema"},{"name":"column13","type":["null","string"],"doc":"autogenerated from Pig Field Schema"},{"name":"count","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"weeksSinceEpochSunday","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"daysSinceEpoch","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column17","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},{"name":"column18","type":["null","int"],"doc":"autogenerated from Pig Field Schema"}]}
[binary Avro record data follows the header]
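
The "Obj" marker and the avro.schema entry at the top of that dump are the header of an Avro Object Container File, which the standard Avro DataFileWriter produces. A minimal sketch of writing such a file (the schema JSON, record, and output path are placeholders):

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;

Schema schema = new Schema.Parser().parse(schemaJson);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(schema, new File("part.avro")); // writes the "Obj" magic and avro.schema metadata
fileWriter.append(record);                        // binary-encoded records follow the header
fileWriter.close();

Getting this layout out of a Gobblin job is therefore a writer-side concern: the records have to pass through an Avro container writer rather than being copied byte-for-byte.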

My current job.pull is as below.
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false

kafka.brokers=localhost:9092
topic.whitelist=eventOLAP
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka

writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=avro

data.publisher.type=gobblin.publisher.BaseDataPublisher

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest

fs.uri=hdfs://localhost:9090
writer.fs.uri=hdfs://localhost:9090
state.store.fs.uri=hdfs://localhost:9090

mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output/eventolap


On Wednesday, May 18, 2016 at 12:41:17 AM UTC+8, Sahil Takiar wrote:

Sahil Takiar

May 18, 2016, 1:47:41 PM
to Zhenhua Cao, gobblin-users
What's wrong with your current configuration file? It should work. Are you hitting an exception? If you are, can you send the stack trace?

Zhenhua Cao

May 18, 2016, 10:55:42 PM
to gobblin-users, caozhen...@gmail.com

I want to write the bytes of the Avro objects to the Avro files, just as below:


Obj avro.schema {same TUPLE_0 schema as in my previous message}
[binary Avro record data]

But the Avro file content I actually get is as below:

{"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}[2016-05-18 18:54:22,394] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor){"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}{"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}{"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}[2016-05-18 18:54:22,394] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor){"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}{"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}{"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}[2016-05-18 18:54:22,394] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor){"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}{"city":"yiggVwsCvy", "country":"BLVlDVqCFk", "create_time":1463544194, "event_count":379, "event_key":"MdtvIBrvNp", "event_name":"jsubFvQPzK","group_id":290,"open_id":1, "plat_id":"eUTRlQcXqH", "province":"GJUMcENXmA", "sex":42}

The config file content is as below:
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false

kafka.brokers=localhost:9092
topic.whitelist=eventOLAP

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka

#writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
#writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=avro

data.publisher.type=gobblin.publisher.BaseDataPublisher

mr.job.max.mappers=4

metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest

fs.uri=hdfs://master:9000
writer.fs.uri=hdfs://master:9000
state.store.fs.uri=hdfs://master:9000

mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output/eventolap


The log content is as below:

2016-05-17 21:09:38 PDT INFO  [main] gobblin.runtime.mapreduce.MRJobLauncher  197 - Launching Hadoop MR job Gobblin-GobblinKafkaQuickStart
2016-05-17 21:09:38 PDT INFO  [main] org.apache.hadoop.yarn.client.RMProxy  98 - Connecting to ResourceManager at master/172.16.68.129:8032
2016-05-17 21:09:41 PDT INFO  [main] org.apache.hadoop.mapreduce.lib.input.FileInputFormat  283 - Total input paths to process : 1
2016-05-17 21:09:41 PDT INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  198 - number of splits:1
2016-05-17 21:09:41 PDT INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  287 - Submitting tokens for job: job_1463540068725_0003
2016-05-17 21:09:42 PDT INFO  [main] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl  273 - Submitted application application_1463540068725_0003
2016-05-17 21:09:42 PDT INFO  [main] org.apache.hadoop.mapreduce.Job  1294 - The url to track the job: http://master:8088/proxy/application_1463540068725_0003/
2016-05-17 21:09:42 PDT INFO  [main] gobblin.runtime.mapreduce.MRJobLauncher  207 - Waiting for Hadoop MR job job_1463540068725_0003 to complete
2016-05-17 21:09:42 PDT INFO  [main] org.apache.hadoop.mapreduce.Job  1339 - Running job: job_1463540068725_0003
2016-05-17 21:10:06 PDT INFO  [main] org.apache.hadoop.mapreduce.Job  1360 - Job job_1463540068725_0003 running in uber mode : false
2016-05-17 21:10:06 PDT INFO  [main] org.apache.hadoop.mapreduce.Job  1367 -  map 0% reduce 0%
2016-05-17 21:10:35 PDT INFO  [main] org.apache.hadoop.mapreduce.Job  1367 -  map 100% reduce 0%
2016-05-17 21:10:38 PDT WARN  [TaskStateCollectorService RUNNING] gobblin.runtime.TaskStateCollectorService  119 - Output task state path /gobblin-kafka/working/GobblinKafkaQuickStart/output/job_GobblinKafkaQuickStart_1463544573362 does not exist
2016-05-17 21:11:33 PDT INFO  [main] org.apache.hadoop.mapreduce.Job  1378 - Job job_1463540068725_0003 completed successfully
2016-05-17 21:11:33 PDT INFO  [main] org.apache.hadoop.mapreduce.Job  1385 - Counters: 30
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=131176
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1948
                HDFS: Number of bytes written=108346
                HDFS: Number of read operations=44
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=21
        Job Counters
                Launched map tasks=1
                Other local map tasks=1
                Total time spent by all maps in occupied slots (ms)=83068
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=83068
                Total vcore-seconds taken by all map tasks=83068
                Total megabyte-seconds taken by all map tasks=85061632
        Map-Reduce Framework
                Map input records=1
                Map output records=0
                Input split bytes=182
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=256
                CPU time spent (ms)=3330
                Physical memory (bytes) snapshot=210235392
                Virtual memory (bytes) snapshot=719265792
                Total committed heap usage (bytes)=88080384
        File Input Format Counters
                Bytes Read=105
        File Output Format Counters
                Bytes Written=0
2016-05-17 21:11:33 PDT INFO  [TaskStateCollectorService STOPPING] gobblin.runtime.TaskStateCollectorService  98 - Stopping the TaskStateCollectorService
2016-05-17 21:11:33 PDT INFO  [ParallelRunner] org.apache.hadoop.io.compress.CodecPool  181 - Got brand-new decompressor [.deflate]
2016-05-17 21:11:33 PDT INFO  [ParallelRunner] org.apache.hadoop.io.compress.CodecPool  181 - Got brand-new decompressor [.deflate]
2016-05-17 21:11:33 PDT INFO  [ParallelRunner] org.apache.hadoop.io.compress.CodecPool  181 - Got brand-new decompressor [.deflate]
2016-05-17 21:11:33 PDT INFO  [ParallelRunner] org.apache.hadoop.io.compress.CodecPool  181 - Got brand-new decompressor [.deflate]
2016-05-17 21:11:33 PDT INFO  [ParallelRunner] org.apache.hadoop.io.compress.CodecPool  181 - Got brand-new decompressor [.deflate]
2016-05-17 21:11:33 PDT INFO  [TaskStateCollectorService STOPPING] gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@4ed39f12[Shutting down, pool size = 3, active threads = 1, queued tasks = 0, completed tasks = 2]
2016-05-17 21:11:33 PDT INFO  [TaskStateCollectorService STOPPING] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@4ed39f12[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 3]
2016-05-17 21:11:33 PDT INFO  [TaskStateCollectorService STOPPING] gobblin.runtime.TaskStateCollectorService  147 - Collected task state of 3 completed tasks
2016-05-17 21:11:33 PDT INFO  [TaskStateCollectorService STOPPING] gobblin.runtime.JobContext  324 - 3 more tasks of job job_GobblinKafkaQuickStart_1463544573362 have completed
2016-05-17 21:11:33 PDT INFO  [main] gobblin.runtime.mapreduce.MRJobLauncher  464 - Deleted working directory /gobblin-kafka/working/GobblinKafkaQuickStart
2016-05-17 21:11:33 PDT INFO  [main] gobblin.runtime.JobContext  397 - Committing dataset  of job job_GobblinKafkaQuickStart_1463544573362 with commit policy COMMIT_ON_FULL_SUCCESS and state SUCCESSFUL
2016-05-17 21:11:33 PDT INFO  [main] gobblin.publisher.BaseDataPublisher  310 - Moving hdfs://master:9000/jobs/kafkaetl/gobblin/gobblin-kafka/task-data/job_GobblinKafkaQuickStart_1463544573362/task-output/eventOLAP/part.task_GobblinKafkaQuickStart_1463544573362_0.avro to /gobblintest/job-output/eventolap/eventOLAP/part.task_GobblinKafkaQuickStart_1463544573362_0.avro
2016-05-17 21:11:33 PDT WARN  [main] gobblin.publisher.BaseDataPublisher  199 - Branch 0 of WorkUnit task_GobblinKafkaQuickStart_1463544573362_1 produced no data
2016-05-17 21:11:33 PDT WARN  [main] gobblin.publisher.BaseDataPublisher  199 - Branch 0 of WorkUnit task_GobblinKafkaQuickStart_1463544573362_2 produced no data
2016-05-17 21:11:33 PDT INFO  [main] gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@5b324d9a[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
2016-05-17 21:11:33 PDT INFO  [main] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@5b324d9a[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
2016-05-17 21:11:33 PDT INFO  [main] gobblin.runtime.JobContext  567 - Persisting dataset state for dataset
2016-05-17 21:11:33 PDT INFO  [main] gobblin.runtime.FsDatasetStateStore  219 - Persisting job_GobblinKafkaQuickStart_1463544573362.jst to the job state store
2016-05-17 21:11:33 PDT INFO  [main] org.apache.hadoop.io.compress.CodecPool  153 - Got brand-new compressor [.deflate]
2016-05-17 21:11:34 PDT INFO  [main] gobblin.util.JobLauncherUtils  213 - Cleaning up staging directory /jobs/kafkaetl/gobblin/gobblin-kafka/task-data/job_GobblinKafkaQuickStart_1463544573362/task-staging/eventOLAP
2016-05-17 21:11:34 PDT INFO  [main] gobblin.util.JobLauncherUtils  219 - Cleaning up output directory /jobs/kafkaetl/gobblin/gobblin-kafka/task-data/job_GobblinKafkaQuickStart_1463544573362/task-output/eventOLAP
2016-05-17 21:11:34 PDT INFO  [main] gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@393d706b[Shutting down, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
2016-05-17 21:11:34 PDT INFO  [main] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@393d706b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
2016-05-17 21:11:34 PDT INFO  [main] gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@2d17ad86[Shutting down, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
2016-05-17 21:11:34 PDT INFO  [main] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.ThreadPoolExecutor@2d17ad86[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
2016-05-17 21:11:34 PDT INFO  [main] gobblin.runtime.app.ServiceBasedAppLauncher  165 - Shutting down the application
2016-05-17 21:11:34 PDT INFO  [MetricsReportingService STOPPING] gobblin.util.ExecutorsUtils  125 - Attempting to shutdown ExecutorService: java.util.concurrent.Executors$DelegatedScheduledExecutorService@3eb5e480
2016-05-17 21:11:34 PDT INFO  [MetricsReportingService STOPPING] gobblin.util.ExecutorsUtils  144 - Successfully shutdown ExecutorService: java.util.concurrent.Executors$DelegatedScheduledExecutorService@3eb5e480
2016-05-17 21:11:34 PDT WARN  [Thread-7] gobblin.metrics.reporter.ContextAwareReporter  116 - Reporter OutputStreamReporter has already been stopped.


On Thursday, May 19, 2016 at 1:47:41 AM UTC+8, Sahil Takiar wrote:

Sahil Takiar

May 24, 2016, 5:31:49 PM
to Zhenhua Cao, gobblin-users
How are you writing data to your Kafka cluster? Are you writing it in Avro format?
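
For context, "writing in Avro format" on the producer side usually means binary-encoding each record before handing the bytes to the Kafka producer; if the producer sends JSON strings instead, a byte-for-byte copy lands JSON text in the output file, as above. A minimal encoding sketch (schema and record are placeholders; the actual send is elided because it depends on the producer client in use):

import java.io.ByteArrayOutputStream;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(record, encoder); // Avro binary encoding, the format DecoderFactory.binaryDecoder reads back
encoder.flush();
byte[] payload = out.toByteArray(); // hand this to the Kafka producer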

...

[Message clipped]  
