Problems with Typesafe Config and forks

Maurice Wolter

Jun 30, 2017, 10:44:21 AM
to gobblin-users
Hi,
I ran into the following problem while ingesting data from Kafka into HDFS using Gobblin 0.10.0:


Exception in thread "main" com.typesafe.config.ConfigException$WrongType: hardcoded value: writer.fs.uri has type OBJECT rather than STRING
        at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:159)
        at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
        at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
        at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
        at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
        at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
        at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:246)
        at gobblin.util.ConfigUtils.getString(ConfigUtils.java:307)
        at net.meetrics.dada.ingestion.gobblin.runtime.CliMRJobLauncher.<init>(CliMRJobLauncher.java:51)
        at net.meetrics.dada.ingestion.gobblin.runtime.CliMRJobLauncher.main(CliMRJobLauncher.java:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Build was aborted

As I use different branches/forks I have the following entries in my configuration:
fs.uri=hdfs://nameservice1
writer.fs.uri.0=${fs.uri}
writer.fs.uri.1=${fs.uri}
writer.fs.uri.2=${fs.uri}

To me, it looks like the problem is related to com.typesafe.config.ConfigFactory#parseMap(Map<String, ? extends Object>, String), which is invoked from within gobblin.util.ConfigUtils. The Javadoc says:
An exception will be thrown (and it is a bug in the caller of the method) if a path is both an object and a value, for example if you had both "a=foo" and "a.b=bar",
then "a" is both the string "foo" and the parent object of "b". The caller of this method should ensure that doesn't happen.

The exception does not occur when I remove the branch-specific configurations; however, the application then does not use the correct fs.uri.

I have been using Gobblin for a while already, so I run my ingestion process classically as a MapReduce job, using a slightly modified version of gobblin.runtime.mapreduce.CliMRJobLauncher together with plain property (.properties) files.

Do I need to change the way I run my jobs, or is this really a bug? Has anyone experienced similar problems?

Best,
Maurice

Issac Buenrostro

Jul 12, 2017, 7:41:47 PM
to Maurice Wolter, gobblin-users
Hi Maurice,

It looks like net.meetrics.dada.ingestion.gobblin.runtime.CliMRJobLauncher is trying to read the property writer.fs.uri at line 51, but that property does not exist. Can you take a look at your code?

If you need any support, an excerpt of your code might be helpful.

Best,
Issac


Maurice Wolter

Jul 13, 2017, 11:32:26 AM
to gobblin-users, maurice...@gmail.com
Hi,

The values are read from a property file which definitely contains the writer.fs.uri property. It also does not throw an exception when I remove the "indexed" properties like writer.fs.uri.0. Here is the relevant excerpt of my launcher:



import java.util.Properties;

import com.typesafe.config.Config;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;

import gobblin.configuration.ConfigurationKeys;
import gobblin.runtime.cli.CliOptions;
import gobblin.util.ConfigUtils;
import gobblin.util.JobConfigurationUtils;

private CliMRJobLauncher(final Configuration conf, final Properties jobProperties) throws Exception {
    super(conf, jobProperties);
    final Config config = ConfigUtils.propertiesToConfig(jobProperties);

    // next line will print the value of writer.fs.uri
    System.out.println(jobProperties.getProperty(ConfigurationKeys.WRITER_FILE_SYSTEM_URI));

    // next line will throw the exception
    System.out.println(ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI));
}

public static void main(final String[] args) throws Exception {
    final Configuration conf = new Configuration();

    final String[] genericCmdLineOpts = new GenericOptionsParser(conf, args).getCommandLine().getArgs();
    final Properties gobblinProperties = CliOptions.parseArgs(CliMRJobLauncher.class, genericCmdLineOpts);

    final Options options = new Options();
    options.addOption(MEETRICS_CONFIG_OPTION);
    options.addOption(CliOptions.SYS_CONFIG_OPTION);
    options.addOption(CliOptions.JOB_CONFIG_OPTION);
    final CommandLine cmd = new DefaultParser().parse(options, genericCmdLineOpts);

    final Properties meetricsProperties = JobConfigurationUtils.fileToProperties(cmd.getOptionValue(MEETRICS_CONFIG_OPTION.getLongOpt()));

    // Launch and run the job
    System.exit(ToolRunner.run(
        new CliMRJobLauncher(conf,
            JobConfigurationUtils.combineSysAndJobProperties(meetricsProperties, gobblinProperties)),
        args));
}
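
MEETRICS_CONFIG_OPTION is a plain org.apache.commons.cli.Option whose exact definition I have omitted; it is roughly the following (simplified; the long option name matches the -meetricsconfig flag below):

private static final Option MEETRICS_CONFIG_OPTION = Option.builder()
        .longOpt("meetricsconfig")
        .hasArg()
        .desc("Path to the meetrics-specific configuration file")
        .build();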

The job is started via:

hadoop jar $WORKSPACE/dada/dada-ingestion/target/dada-ingestion-dev.jar \
  -jobconfig ingestion.mxraw.dev.properties \
  -sysconfig ingestion.common.properties \
  -meetricsconfig meetrics-dev.properties

The meetricsconfig file contains just some configuration specific to our system.

This is ingestion.common.properties:
# suppress inspection "UnusedProperty" for whole file


# File system URIs
fs.uri=hdfs://nameservice1
writer.fs.uri=${fs.uri}
writer.fs.uri.0=${fs.uri}
writer.fs.uri.1=${fs.uri}
writer.fs.uri.2=${fs.uri}
state.store.fs.uri=${fs.uri}
state.store.fs.uri.0=${fs.uri}
state.store.fs.uri.1=${fs.uri}
state.store.fs.uri.2=${fs.uri}

# Writer related configuration properties
writer.destination.type=HDFS
writer.output.format=AVRO
task.data.root.dir=${env:GOBBLIN_WORK_DIR}

# Data publisher related configuration properties
data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.final.dir=${env:GOBBLIN_WORK_DIR}/job-output
data.publisher.replace.final.dir=false

# Directory where job/task state files are stored
state.store.dir=${env:GOBBLIN_WORK_DIR}/state-store

# Directory where error files from the quality checkers are stored
qualitychecker.row.err.file=${env:GOBBLIN_WORK_DIR}/err

# Directory where job locks are stored
job.lock.dir=${env:GOBBLIN_WORK_DIR}/locks

# Directory where metrics log files are stored
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics

# Interval of task state reporting in milliseconds
task.status.reportintervalinms=5000

# MapReduce properties
mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
mr.include.task.counters=false

job.lock.enabled=true

This is ingestion.mxraw.dev.properties:
# suppress inspection "UnusedProperty" for whole file

job.name=dev_mxraw
job.group=kafka

bootstrap.with.offset=latest

extract.namespace=gobblin.extract.kafka

root.dir.data=/data_dev/mxraw
root.dir.gobblin=/data_dev/.ingestion

state.store.dir=${root.dir.gobblin}/state-store
qualitychecker.row.err.file=${root.dir.gobblin}/err/${job.name}
job.lock.dir=${root.dir.gobblin}/locks
mr.job.root.dir=${root.dir.gobblin}/working
task.data.root.dir=${root.dir.gobblin}

default.writer.codec.type=deflate
default.writer.deflate.level=6
default.writer.destination.type=HDFS
default.writer.output.format=AVRO

writer.builder.class=net.meetrics.dada.ingestion.gobblin.writer.MxrawAvroDataWriterBuilder
writer.partitioner.class=net.meetrics.dada.ingestion.gobblin.writer.partitioner.ProjectIdWriterPartitioner
default.writer.partition.columns=it
default.writer.partition.prefix=hourly
default.writer.partition.timezone=UTC
default.writer.partition.pattern=yyyy/MM/dd/HH

data.publisher.type=net.meetrics.dada.ingestion.gobblin.publisher.KafkaTimePartitionedDataPublisher
data.publisher.replace.final.dir=false

source.class=net.meetrics.dada.ingestion.gobblin.extractor.KafkaMxRawSource
converter.classes=net.meetrics.dada.ingestion.gobblin.converter.JsonByteArrayToAvroConverter,gobblin.converter.avro.AvroToAvroCopyableConverter

fork.branches=3

#
# BASE
#
fork.branch.name.0=base
writer.staging.dir.0=${task.data.root.dir}/task-staging
writer.output.dir.0=${task.data.root.dir}/task-output
writer.file.path.0=${fork.branch.name.0}
writer.codec.type.0=${default.writer.codec.type}
writer.deflate.level.0=${default.writer.deflate.level}
writer.destination.type.0=${default.writer.destination.type}
writer.output.format.0=${default.writer.output.format}
writer.builder.class.0=${writer.builder.class}
writer.partitioner.class.0=${writer.partitioner.class}
writer.partition.columns.0=${default.writer.partition.columns}
writer.partition.prefix.0=${default.writer.partition.prefix}
writer.partition.pattern.0=${default.writer.partition.pattern}
writer.partition.timezone.0=${default.writer.partition.timezone}

data.publisher.final.dir.0=${root.dir.data}

#
# PANELIST
#
fork.branch.name.1=panelist
converter.classes.1=net.meetrics.dada.ingestion.gobblin.converter.filter.PanelistFilterConverter
writer.staging.dir.1=${task.data.root.dir}/task-staging
writer.output.dir.1=${task.data.root.dir}/task-output
writer.file.path.1=${fork.branch.name.1}
writer.codec.type.1=${default.writer.codec.type}
writer.deflate.level.1=${default.writer.deflate.level}
writer.destination.type.1=${default.writer.destination.type}
writer.output.format.1=${default.writer.output.format}
writer.builder.class.1=${writer.builder.class}
writer.partitioner.class.1=${writer.partitioner.class}
writer.partition.columns.1=${default.writer.partition.columns}
writer.partition.prefix.1=${default.writer.partition.prefix}
writer.partition.pattern.1=${default.writer.partition.pattern}
writer.partition.timezone.1=${default.writer.partition.timezone}

data.publisher.final.dir.1=${root.dir.data}

#
# BRANDSAFETY
#
fork.branch.name.2=brandsafety
converter.classes.2=net.meetrics.dada.ingestion.gobblin.converter.filter.BrandsafetyFilterConverter
writer.staging.dir.2=${task.data.root.dir}/task-staging
writer.output.dir.2=${task.data.root.dir}/task-output
writer.file.path.2=${fork.branch.name.2}
writer.codec.type.2=${default.writer.codec.type}
writer.deflate.level.2=${default.writer.deflate.level}
writer.destination.type.2=${default.writer.destination.type}
writer.output.format.2=${default.writer.output.format}
writer.builder.class.2=${writer.builder.class}
writer.partitioner.class.2=${writer.partitioner.class}
writer.partition.columns.2=${default.writer.partition.columns}
writer.partition.prefix.2=${default.writer.partition.prefix}
writer.partition.pattern.2=${default.writer.partition.pattern}
writer.partition.timezone.2=${default.writer.partition.timezone}

data.publisher.final.dir.2=${root.dir.data}





Maurice Wolter

Jul 13, 2017, 11:35:32 AM
to gobblin-users
I forgot to mention:

net.meetrics.dada.ingestion.gobblin.runtime.CliMRJobLauncher extends gobblin.runtime.mapreduce.CliMRJobLauncher

Issac Buenrostro

Jul 13, 2017, 11:38:37 AM
to Maurice Wolter, gobblin-users
Hi Maurice,

In Typesafe Config, it is not possible for writer.fs.uri and writer.fs.uri.0 to both have a string value. As such, the line that prints the value of ConfigurationKeys.WRITER_FILE_SYSTEM_URI will fail when you have forks.

In general, in Gobblin, it is incorrect to read the value of writer.fs.uri without using "ForkOperatorUtils.getPropertyNameForBranch". Is there any reason you need to read the writer file system in the job launcher?
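
For example (a sketch of the convention; the exact overload signature may differ slightly across Gobblin versions):

// With more than one branch, the util appends ".<branchIndex>" to the base key,
// so each fork reads its own writer.fs.uri.N instead of the bare writer.fs.uri.
String key = ForkOperatorUtils.getPropertyNameForBranch(
    ConfigurationKeys.WRITER_FILE_SYSTEM_URI, numBranches, branchIndex);
// e.g. key == "writer.fs.uri.0" when numBranches == 3 and branchIndex == 0
String writerFsUri = jobProperties.getProperty(key, ConfigurationKeys.LOCAL_FS_URI);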

Best,
Issac


Maurice Wolter

Jul 13, 2017, 11:45:51 AM
to gobblin-users, maurice...@gmail.com
Hi,

No, I do not need to read it there; I just put it in to point out the source of the problem. At some point ConfigUtils.propertiesToConfig(jobProperties) will be called, and then I get the error at another point; I'll have to check the code again to find the exact location. Still, to me it seems that it is not possible to put values like writer.fs.uri.0 into my config file, which I need, because ForkOperatorUtils.getPropertyNameForBranch just builds the property name out of the branch number and the basic property name.

Issac Buenrostro

Jul 13, 2017, 11:48:19 AM
to Maurice Wolter, gobblin-users
Make sure you do not have both "writer.fs.uri" and "writer.fs.uri.0" in your job configuration.


Maurice Wolter

Jul 14, 2017, 4:09:14 AM
to gobblin-users, maurice...@gmail.com
This is not working either; I still have three branches, so I need writer.fs.uri.0, writer.fs.uri.1 and writer.fs.uri.2, which leads to the same problem.

Maurice Wolter

Jul 14, 2017, 4:10:38 AM
to gobblin-users, maurice...@gmail.com
... but as it is the same URI for all writers, I could also use only one; but then the ForkOperatorUtils.getPropertyNameForBranch operation won't work.

Shirshanka Das

Jul 14, 2017, 10:14:20 AM
to Maurice Wolter, gobblin-users
Can you do the following and let us know what happens?

1. Remove writer.fs.uri from your config file (keep the fork-specific writer.fs.uri.N around; see the sketch below).

2. Remove the code (println etc) in your job launcher that accesses the writer.fs.uri property. 
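
For concreteness, here is roughly what the URI section of ingestion.common.properties would look like after step 1 (a sketch; note that state.store.fs.uri has the same bare-plus-indexed shape, so it may need the same treatment):

# Fork-specific URIs only: no bare writer.fs.uri, so no path is both
# a string value and the parent object of another value.
fs.uri=hdfs://nameservice1
writer.fs.uri.0=${fs.uri}
writer.fs.uri.1=${fs.uri}
writer.fs.uri.2=${fs.uri}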




Shirshanka



Maurice Wolter

Jul 19, 2017, 9:20:09 AM
to gobblin-users, maurice...@gmail.com
Hey,

so this definitely changes something ;) ... the job still fails, but I cannot say whether the error is still related to the configuration.
I'll post the stack trace here for convenience, but I need to dig into it to figure out whether the original problem was solved.

From my map tasks I get the following; also note that not all map tasks fail:
2017-07-18 15:04:48,211 ERROR [Task-committing-pool-1] gobblin.runtime.Task: Task task_dev_mxraw_1500382791215_384 failed
gobblin.runtime.ForkException: Fork branches [0, 1, 2] failed for task task_dev_mxraw_1500382791215_384
        at gobblin.runtime.Task.commit(Task.java:808)
        at gobblin.runtime.GobblinMultiTaskAttempt$1$1.call(GobblinMultiTaskAttempt.java:166)
        at gobblin.runtime.GobblinMultiTaskAttempt$1$1.call(GobblinMultiTaskAttempt.java:161)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:35)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-07-18 15:04:48,211 INFO [Task-committing-pool-1] gobblin.source.extractor.extract.kafka.KafkaExtractor: Actual high watermark for partition dc_4F963AF3-5BE2-4229-9FC8-CE4F55CE93BC:1=1268, expected=1621
2017-07-18 15:04:48,211 INFO [Task-committing-pool-1] gobblin.source.extractor.extract.kafka.KafkaExtractor: Avg time to pull a record for partition dc_4F963AF3-5BE2-4229-9FC8-CE4F55CE93BC:1 not recorded
2017-07-18 15:04:48,212 INFO [Task-committing-pool-1] gobblin.runtime.Task: publish.data.at.job.level is true. Will publish data at the job level.
2017-07-18 15:04:48,212 INFO [Task-committing-pool-1] gobblin.runtime.mapreduce.MRTaskStateTracker: Task task_dev_mxraw_1500382791215_384 completed running in 117014ms with state FAILED
2017-07-18 15:04:48,212 WARN [ForkExecutor-78] gobblin.runtime.fork.AsynchronousFork: Interrupted while trying to get a record off the queue
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
        at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
        at gobblin.runtime.fork.Fork.run(Fork.java:180)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-07-18 15:04:48,212 WARN [ForkExecutor-80] gobblin.runtime.fork.AsynchronousFork: Interrupted while trying to get a record off the queue
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
        at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
        at gobblin.runtime.fork.Fork.run(Fork.java:180)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-07-18 15:04:48,212 WARN [ForkExecutor-79] gobblin.runtime.fork.AsynchronousFork: Interrupted while trying to get a record off the queue
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
        at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
        at gobblin.runtime.fork.Fork.run(Fork.java:180)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-07-18 15:04:48,213 ERROR [ForkExecutor-80] gobblin.runtime.fork.Fork-2: Fork 2 of task task_dev_mxraw_1500382791215_384 failed to process data records
java.lang.RuntimeException: java.lang.InterruptedException
        at com.google.common.base.Throwables.propagate(Throwables.java:160)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:102)
        at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
        at gobblin.runtime.fork.Fork.run(Fork.java:180)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
        ... 7 more
2017-07-18 15:04:48,213 ERROR [ForkExecutor-78] gobblin.runtime.fork.Fork-0: Fork 0 of task task_dev_mxraw_1500382791215_384 failed to process data records
java.lang.RuntimeException: java.lang.InterruptedException
        at com.google.common.base.Throwables.propagate(Throwables.java:160)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:102)
        at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
        at gobblin.runtime.fork.Fork.run(Fork.java:180)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
        ... 7 more
2017-07-18 15:04:48,213 INFO [Task-committing-pool-1] gobblin.publisher.TaskPublisher: All components finished successfully, checking quality tests
2017-07-18 15:04:48,213 INFO [Task-committing-pool-1] gobblin.publisher.TaskPublisher: All required test passed for this task passed.
2017-07-18 15:04:48,213 INFO [Task-committing-pool-1] gobblin.publisher.TaskPublisher: Cleanup for task publisher executed successfully.
2017-07-18 15:04:48,213 INFO [Task-committing-pool-1] gobblin.runtime.fork.Fork-0: Committing data for fork 0 of task task_dev_mxraw_1500382791215_385
2017-07-18 15:04:48,213 ERROR [ForkExecutor-79] gobblin.runtime.fork.Fork-1: Fork 1 of task task_dev_mxraw_1500382791215_384 failed to process data records
java.lang.RuntimeException: java.lang.InterruptedException
        at com.google.common.base.Throwables.propagate(Throwables.java:160)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:102)
        at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
        at gobblin.runtime.fork.Fork.run(Fork.java:180)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
        at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
        at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
        ... 7 more

