Hi,
I ran into the following problem while ingesting data from Kafka into HDFS using gobblin 0.10.0
Exception in thread "main" com.typesafe.config.ConfigException$WrongType: hardcoded value: writer.fs.uri has type OBJECT rather than STRING
at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:184)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:246)
at gobblin.util.ConfigUtils.getString(ConfigUtils.java:307)
at net.meetrics.dada.ingestion.gobblin.runtime.CliMRJobLauncher.<init>(CliMRJobLauncher.java:51)
at net.meetrics.dada.ingestion.gobblin.runtime.CliMRJobLauncher.main(CliMRJobLauncher.java:79)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Build was aborted
As I use different branches/forks I have the following entries in my configuration:
fs.uri=hdfs://nameservice1
writer.fs.uri.0=${fs.uri}
writer.fs.uri.1=${fs.uri}
writer.fs.uri.2=${fs.uri}
To me, it looks like the problem is related to com.typesafe.config.ConfigFactory#parseMap(Map&lt;String, ? extends Object&gt;, String), which is invoked from within gobblin.util.ConfigUtils.
The javaDoc says:
An exception will be thrown (and it is a bug in the caller of the method) if a path is both an object and a value, for example if you had both "a=foo" and "a.b=bar",
then "a" is both the string "foo" and the parent object of "b". The caller of this method should ensure that doesn't happen.
--
You received this message because you are subscribed to the Google Groups "gobblin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gobblin-users+unsubscribe@googlegroups.com.
To post to this group, send email to gobbli...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gobblin-users/6a384298-3c4a-43cf-8dbb-f5324287ed28%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
import java.util.Properties;
import com.typesafe.config.Config;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ToolRunner;
import gobblin.configuration.ConfigurationKeys;
import gobblin.runtime.cli.CliOptions;
import gobblin.util.ConfigUtils;
import gobblin.util.JobConfigurationUtils;
private CliMRJobLauncher(final Configuration conf, final Properties jobProperties) throws Exception {
    super(conf, jobProperties);
    // Materialize the flat job Properties as a typesafe Config. This is the step
    // where keys such as "writer.fs.uri" and "writer.fs.uri.0" collide: the same
    // path ends up being both a value and a parent object in the parsed config.
    final Config jobConfig = ConfigUtils.propertiesToConfig(jobProperties);

    // Reading directly from the Properties works and prints the configured value.
    System.out.println(jobProperties.getProperty(ConfigurationKeys.WRITER_FILE_SYSTEM_URI));

    // Reading through the Config throws ConfigException$WrongType
    // ("... has type OBJECT rather than STRING") because of the path collision above.
    // NOTE(review): this looks up STATE_STORE_FS_URI_KEY even though the comment in the
    // thread talks about writer.fs.uri — presumably both key families exhibit the same
    // collision (see the .0/.1/.2 entries in the config); confirm which one actually throws.
    System.out.println(ConfigUtils.getString(jobConfig, ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI));
}
/**
 * Entry point. Strips Hadoop generic options, parses the Gobblin sys/job config
 * options plus the meetrics-specific config file, merges the resulting properties,
 * and hands everything to the launcher via {@link ToolRunner}.
 */
public static void main(final String[] args) throws Exception {
    // Let Hadoop consume its generic options (-D, -fs, ...) first; what remains
    // are this launcher's own command-line arguments.
    final Configuration conf = new Configuration();
    final String[] remainingArgs = new GenericOptionsParser(conf, args).getCommandLine().getArgs();

    // Standard Gobblin option handling (-sysconfig / -jobconfig).
    final Properties gobblinProperties = CliOptions.parseArgs(CliMRJobLauncher.class, remainingArgs);

    // Re-parse with an extended option set so the meetrics-specific config file
    // (-meetricsconfig) is also recognized.
    final Options cliOptions = new Options();
    cliOptions.addOption(MEETRICS_CONFIG_OPTION);
    cliOptions.addOption(CliOptions.SYS_CONFIG_OPTION);
    cliOptions.addOption(CliOptions.JOB_CONFIG_OPTION);
    final CommandLine commandLine = new DefaultParser().parse(cliOptions, remainingArgs);

    final String meetricsConfigPath = commandLine.getOptionValue(MEETRICS_CONFIG_OPTION.getLongOpt());
    final Properties meetricsProperties = JobConfigurationUtils.fileToProperties(meetricsConfigPath);
    final Properties combinedProperties =
        JobConfigurationUtils.combineSysAndJobProperties(meetricsProperties, gobblinProperties);

    // Launch and run the job; propagate the job's exit code to the JVM.
    System.exit(ToolRunner.run(new CliMRJobLauncher(conf, combinedProperties), args));
}
hadoop jar $WORKSPACE/dada/dada-ingestion/target/dada-ingestion-dev.jar\
-jobconfig ingestion.mxraw.dev.properties\
-sysconfig ingestion.common.properties\
-meetricsconfig meetrics-dev.properties
ingestion.common.properties
:
# suppress inspection "UnusedProperty" for whole file
# File system URIs
fs.uri=hdfs://nameservice1
writer.fs.uri=${fs.uri}
writer.fs.uri.0=${fs.uri}
writer.fs.uri.1=${fs.uri}
writer.fs.uri.2=${fs.uri}
state.store.fs.uri=${fs.uri}
state.store.fs.uri.0=${fs.uri}
state.store.fs.uri.1=${fs.uri}
state.store.fs.uri.2=${fs.uri}
# Writer related configuration properties
writer.destination.type=HDFS
writer.output.format=AVRO
task.data.root.dir=${env:GOBBLIN_WORK_DIR}
# Data publisher related configuration properties
data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.final.dir=${env:GOBBLIN_WORK_DIR}/job-output
data.publisher.replace.final.dir=false
# Directory where job/task state files are stored
state.store.dir=${env:GOBBLIN_WORK_DIR}/state-store
# Directory where error files from the quality checkers are stored
qualitychecker.row.err.file=${env:GOBBLIN_WORK_DIR}/err
# Directory where job locks are stored
job.lock.dir=${env:GOBBLIN_WORK_DIR}/locks
# Directory where metrics log files are stored
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
# Interval of task state reporting in milliseconds
task.status.reportintervalinms=5000
# MapReduce properties
mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
mr.include.task.counters=false
job.lock.enabled=true
ingestion.mxraw.dev.properties:
# suppress inspection "UnusedProperty" for whole file
job.name=dev_mxraw
job.group=kafka
bootstrap.with.offset=latest
extract.namespace=gobblin.extract.kafka
root.dir.data=/data_dev/mxraw
root.dir.gobblin=/data_dev/.ingestion
state.store.dir=${root.dir.gobblin}/state-store
qualitychecker.row.err.file=${root.dir.gobblin}/err/${job.name}
job.lock.dir=${root.dir.gobblin}/locks
mr.job.root.dir=${root.dir.gobblin}/working
task.data.root.dir=${root.dir.gobblin}
default.writer.codec.type=deflate
default.writer.deflate.level=6
default.writer.destination.type=HDFS
default.writer.output.format=AVRO
writer.builder.class=net.meetrics.dada.ingestion.gobblin.writer.MxrawAvroDataWriterBuilder
writer.partitioner.class=net.meetrics.dada.ingestion.gobblin.writer.partitioner.ProjectIdWriterPartitioner
default.writer.partition.columns=it
default.writer.partition.prefix=hourly
default.writer.partition.timezone=UTC
default.writer.partition.pattern=yyyy/MM/dd/HH
data.publisher.type=net.meetrics.dada.ingestion.gobblin.publisher.KafkaTimePartitionedDataPublisher
data.publisher.replace.final.dir=false
source.class=net.meetrics.dada.ingestion.gobblin.extractor.KafkaMxRawSource
converter.classes=net.meetrics.dada.ingestion.gobblin.converter.JsonByteArrayToAvroConverter,gobblin.converter.avro.AvroToAvroCopyableConverter
fork.branches=3
#
# BASE
#
fork.branch.name.0=base
writer.staging.dir.0=${task.data.root.dir}/task-staging
writer.output.dir.0=${task.data.root.dir}/task-output
writer.file.path.0=${fork.branch.name.0}
writer.codec.type.0=${default.writer.codec.type}
writer.deflate.level.0=${default.writer.deflate.level}
writer.destination.type.0=${default.writer.destination.type}
writer.output.format.0=${default.writer.output.format}
writer.builder.class.0=${writer.builder.class}
writer.partitioner.class.0=${writer.partitioner.class}
writer.partition.columns.0=${default.writer.partition.columns}
writer.partition.prefix.0=${default.writer.partition.prefix}
writer.partition.pattern.0=${default.writer.partition.pattern}
writer.partition.timezone.0=${default.writer.partition.timezone}
data.publisher.final.dir.0=${root.dir.data}
#
# PANELIST
#
fork.branch.name.1=panelist
converter.classes.1=net.meetrics.dada.ingestion.gobblin.converter.filter.PanelistFilterConverter
writer.staging.dir.1=${task.data.root.dir}/task-staging
writer.output.dir.1=${task.data.root.dir}/task-output
writer.file.path.1=${fork.branch.name.1}
writer.codec.type.1=${default.writer.codec.type}
writer.deflate.level.1=${default.writer.deflate.level}
writer.destination.type.1=${default.writer.destination.type}
writer.output.format.1=${default.writer.output.format}
writer.builder.class.1=${writer.builder.class}
writer.partitioner.class.1=${writer.partitioner.class}
writer.partition.columns.1=${default.writer.partition.columns}
writer.partition.prefix.1=${default.writer.partition.prefix}
writer.partition.pattern.1=${default.writer.partition.pattern}
writer.partition.timezone.1=${default.writer.partition.timezone}
data.publisher.final.dir.1=${root.dir.data}
#
# BRANDSAFETY
#
fork.branch.name.2=brandsafety
converter.classes.2=net.meetrics.dada.ingestion.gobblin.converter.filter.BrandsafetyFilterConverter
writer.staging.dir.2=${task.data.root.dir}/task-staging
writer.output.dir.2=${task.data.root.dir}/task-output
writer.file.path.2=${fork.branch.name.2}
writer.codec.type.2=${default.writer.codec.type}
writer.deflate.level.2=${default.writer.deflate.level}
writer.destination.type.2=${default.writer.destination.type}
writer.output.format.2=${default.writer.output.format}
writer.builder.class.2=${writer.builder.class}
writer.partitioner.class.2=${writer.partitioner.class}
writer.partition.columns.2=${default.writer.partition.columns}
writer.partition.prefix.2=${default.writer.partition.prefix}
writer.partition.pattern.2=${default.writer.partition.pattern}
writer.partition.timezone.2=${default.writer.partition.timezone}
data.publisher.final.dir.2=${root.dir.data}
net.meetrics.dada.ingestion.gobblin.runtime.CliMRJobLauncher extends gobblin.runtime.mapreduce.CliMRJobLauncher
--
You received this message because you are subscribed to the Google Groups "gobblin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gobblin-users+unsubscribe@googlegroups.com.
To post to this group, send email to gobbli...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gobblin-users/b41fabca-57b2-4a39-8492-f9b70bc616ff%40googlegroups.com.
Hi,
no, I do not need to read it there; I just put it in to point out the source of the problem. At some point ConfigUtils.propertiesToConfig(jobProperties) will be called, and then I get the error at another point — I'll have to check the code again to find the exact location. Still, to me it seems that it is not possible to put values like writer.fs.uri.0 into my config file, which I need, because ForkOperatorUtils.getPropertyNameForBranch just builds the property name out of the branch number and the basic property name.
--
You received this message because you are subscribed to the Google Groups "gobblin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gobblin-users+unsubscribe@googlegroups.com.
To post to this group, send email to gobbli...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gobblin-users/ae962558-11ea-4ffd-bda9-d5d24a431e04%40googlegroups.com.
... but as it is the same uri for all writers, I could also use only one, but then the ForkOperatorUtils.getPropertyNameForBranch operation won't work.
--
You received this message because you are subscribed to the Google Groups "gobblin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gobblin-user...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gobblin-users/4053e3ce-e903-4524-aed7-9b4860b32bb0%40googlegroups.com.
2017-07-18 15:04:48,211 ERROR [Task-committing-pool-1] gobblin.runtime.Task: Task task_dev_mxraw_1500382791215_384 failed
gobblin.runtime.ForkException: Fork branches [0, 1, 2] failed for task task_dev_mxraw_1500382791215_384
at gobblin.runtime.Task.commit(Task.java:808)
at gobblin.runtime.GobblinMultiTaskAttempt$1$1.call(GobblinMultiTaskAttempt.java:166)
at gobblin.runtime.GobblinMultiTaskAttempt$1$1.call(GobblinMultiTaskAttempt.java:161)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:35)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2017-07-18 15:04:48,211 INFO [Task-committing-pool-1] gobblin.source.extractor.extract.kafka.KafkaExtractor: Actual high watermark for partition dc_4F963AF3-5BE2-4229-9FC8-CE4F55CE93BC:1=1268, expected=1621
2017-07-18 15:04:48,211 INFO [Task-committing-pool-1] gobblin.source.extractor.extract.kafka.KafkaExtractor: Avg time to pull a record for partition dc_4F963AF3-5BE2-4229-9FC8-CE4F55CE93BC:1 not recorded
2017-07-18 15:04:48,212 INFO [Task-committing-pool-1] gobblin.runtime.Task: publish.data.at.job.level is true. Will publish data at the job level.
2017-07-18 15:04:48,212 INFO [Task-committing-pool-1] gobblin.runtime.mapreduce.MRTaskStateTracker: Task task_dev_mxraw_1500382791215_384 completed running in 117014ms with state FAILED
2017-07-18 15:04:48,212 WARN [ForkExecutor-78] gobblin.runtime.fork.AsynchronousFork: Interrupted while trying to get a record off the queue
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
at gobblin.runtime.fork.Fork.run(Fork.java:180)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2017-07-18 15:04:48,212 WARN [ForkExecutor-80] gobblin.runtime.fork.AsynchronousFork: Interrupted while trying to get a record off the queue
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
at gobblin.runtime.fork.Fork.run(Fork.java:180)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2017-07-18 15:04:48,212 WARN [ForkExecutor-79] gobblin.runtime.fork.AsynchronousFork: Interrupted while trying to get a record off the queue
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
at gobblin.runtime.fork.Fork.run(Fork.java:180)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2017-07-18 15:04:48,213 ERROR [ForkExecutor-80] gobblin.runtime.fork.Fork-2: Fork 2 of task task_dev_mxraw_1500382791215_384 failed to process data records
java.lang.RuntimeException: java.lang.InterruptedException
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:102)
at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
at gobblin.runtime.fork.Fork.run(Fork.java:180)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
... 7 more
2017-07-18 15:04:48,213 ERROR [ForkExecutor-78] gobblin.runtime.fork.Fork-0: Fork 0 of task task_dev_mxraw_1500382791215_384 failed to process data records
java.lang.RuntimeException: java.lang.InterruptedException
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:102)
at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
at gobblin.runtime.fork.Fork.run(Fork.java:180)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
... 7 more
2017-07-18 15:04:48,213 INFO [Task-committing-pool-1] gobblin.publisher.TaskPublisher: All components finished successfully, checking quality tests
2017-07-18 15:04:48,213 INFO [Task-committing-pool-1] gobblin.publisher.TaskPublisher: All required test passed for this task passed.
2017-07-18 15:04:48,213 INFO [Task-committing-pool-1] gobblin.publisher.TaskPublisher: Cleanup for task publisher executed successfully.
2017-07-18 15:04:48,213 INFO [Task-committing-pool-1] gobblin.runtime.fork.Fork-0: Committing data for fork 0 of task task_dev_mxraw_1500382791215_385
2017-07-18 15:04:48,213 ERROR [ForkExecutor-79] gobblin.runtime.fork.Fork-1: Fork 1 of task task_dev_mxraw_1500382791215_384 failed to process data records
java.lang.RuntimeException: java.lang.InterruptedException
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:102)
at gobblin.runtime.fork.AsynchronousFork.processRecords(AsynchronousFork.java:81)
at gobblin.runtime.fork.Fork.run(Fork.java:180)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
at gobblin.runtime.BoundedBlockingRecordQueue.get(BoundedBlockingRecordQueue.java:97)
at gobblin.runtime.fork.AsynchronousFork.processRecord(AsynchronousFork.java:91)
... 7 more