Using Quartz Scheduler for Gobblin in MR mode.


Prashant Bhardwaj

Jan 4, 2016, 7:08:06 AM
to gobblin-users
Hi

My use case is simple: I have written some Sources/Converters/Extractors. Currently, whenever I want to run a Gobblin job, I submit it through the Hadoop CLI. Now I want to schedule this job to run every 5 minutes, so I added these lines to my "gobblin-mapreduce.properties" file:

job.lock.enabled=True

job.schedule=0 0/5 * * * ?

job.runonce=False

launcher.type=MAPREDUCE
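For reference, the value of job.schedule is a Quartz cron expression with six fields: seconds, minutes, hours, day-of-month, month and day-of-week (Quartz also accepts an optional seventh year field). The Python sketch below is a hypothetical helper, not part of Gobblin or Quartz. It expands only the small subset of the syntax used here, to show that "0 0/5 * * * ?" fires at second 0 of every fifth minute.

```python
# Hypothetical helper: expand the fields of a 6-field Quartz cron expression
# such as "0 0/5 * * * ?". Only "*", "?", plain numbers, "a/b" increments and
# comma lists are handled; real Quartz syntax is much richer (ranges, L, W, #,
# named months/days, the optional year field, ...).
from __future__ import annotations

FIELDS = [
    ("second", 0, 59),
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day-of-month", 1, 31),
    ("month", 1, 12),
    ("day-of-week", 1, 7),  # Quartz numbers days 1 (SUN) to 7 (SAT)
]


def expand_field(expr: str, low: int, high: int) -> list[int] | None:
    """Return the allowed values for one field, or None for "?" (no specific value)."""
    if expr == "?":
        return None
    values: set[int] = set()
    for part in expr.split(","):
        if part == "*":
            values.update(range(low, high + 1))
        elif "/" in part:
            # "start/step": start at `start`, then every `step` units
            start, step = part.split("/")
            first = low if start == "*" else int(start)
            values.update(range(first, high + 1, int(step)))
        else:
            values.add(int(part))
    return sorted(values)


def describe(cron: str) -> dict[str, list[int] | None]:
    parts = cron.split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return {name: expand_field(p, lo, hi) for (name, lo, hi), p in zip(FIELDS, parts)}


schedule = describe("0 0/5 * * * ?")
print(schedule["second"])  # [0]
print(schedule["minute"])  # [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
```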


My questions are:
1. Is this sufficient to run the JobScheduler in MR mode? How do I monitor whether the job scheduler is running, and how do I stop it if I want to?

2. I also want to run multiple Gobblin MR jobs in parallel. These jobs have almost everything in common except the Converters/Extractors/WriterPartitioner. In standalone mode we can specify some common properties and point "jobconf.dir" at a directory containing the configuration for all the jobs. Can this be done in MR mode?
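The idea behind question 2 (one shared set of properties plus a small per-job override for each job) can be sketched as a simple dictionary merge. This is an illustration under the assumption that a per-job value takes precedence over a shared one; the job names and the com.example.* class names are hypothetical placeholders, and this is not how Gobblin itself loads jobconf.dir.

```python
# Hypothetical sketch: build the effective configuration for several jobs that
# share most settings. Assumes a per-job value overrides a shared one. Job names
# and com.example.* classes are placeholders, not a complete Gobblin job config.
from __future__ import annotations

COMMON = {
    "source.class": "com.example.MySource",
    "writer.staging.dir": "/gobblin/task-staging",
    "writer.output.dir": "/gobblin/task-output",
}

JOBS = {
    "job-a": {"converter.classes": "com.example.ConverterA"},
    "job-b": {
        "converter.classes": "com.example.ConverterB",
        "writer.partitioner.class": "com.example.PartitionerB",
    },
}


def effective_config(common: dict[str, str], overrides: dict[str, str]) -> dict[str, str]:
    merged = dict(common)      # copy so the shared settings are never mutated
    merged.update(overrides)   # per-job keys win over shared keys
    return merged


for name, overrides in JOBS.items():
    cfg = effective_config(COMMON, overrides)
    print(name, cfg["converter.classes"], cfg["writer.staging.dir"])
```

Keeping the shared settings in one place means a change such as a new staging directory only has to be made once for all jobs.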

--
Prashant

Sahil Takiar

Jan 5, 2016, 9:55:35 PM
to Prashant Bhardwaj, gobblin-users
Hey Prashant,

It's a little counter-intuitive, but I think if you keep your standalone mode config file and simply add "launcher.type=MAPREDUCE" to your config, everything should work as desired.

So the main change you need to make is to launch Gobblin using "bin/gobblin-standalone.sh" instead of "bin/gobblin-mapreduce.sh".

--Sahil


Prashant Bhardwaj

Jan 6, 2016, 2:17:44 AM
to gobblin-users, prashantb...@gmail.com
Hi Sahil

I did as you suggested, but my jobs are running in local mode on the cluster. MRJobLauncher submits the job to the cluster, but it does not run in distributed mode. With gobblin-mapreduce.sh it did run in distributed mode.

I'm also getting this error in nohup.out:
Exception in thread "Thread-980" java.lang.IllegalArgumentException: Missing required property writer.staging.dir
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
at gobblin.util.WriterUtils.getWriterStagingDir(WriterUtils.java:64)
at gobblin.util.JobLauncherUtils.cleanTaskStagingData(JobLauncherUtils.java:169)
at gobblin.runtime.mapreduce.GobblinOutputCommitter.abortJob(GobblinOutputCommitter.java:98)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:549)
Exception in thread "Thread-978" java.lang.IllegalArgumentException: Missing required property writer.staging.dir
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:93)
at gobblin.util.WriterUtils.getWriterStagingDir(WriterUtils.java:64)
at gobblin.util.JobLauncherUtils.cleanTaskStagingData(JobLauncherUtils.java:169)
at gobblin.runtime.mapreduce.GobblinOutputCommitter.abortJob(GobblinOutputCommitter.java:98)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:549)

This happens even though the property "writer.staging.dir" is set in gobblin-standalone.properties.

--
Prashant 

Sahil Takiar

Jan 6, 2016, 10:48:05 AM
to Prashant Bhardwaj, gobblin-users
Hey Prashant,

Unfortunately, there seems to be a bug where the "bin/gobblin-mapreduce.sh" script is hard-coded to use "conf/gobblin-mapreduce.properties". I'll open a ticket for this.

For now, you need to make sure "writer.staging.dir" is set in "conf/gobblin-mapreduce.properties".
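A quick pre-launch check that the properties file actually defines writer.staging.dir can catch this before Gobblin fails with "Missing required property". The sketch below is a hypothetical helper with a deliberately simplified .properties parser: it handles "key=value" and "key:value" lines and "#"/"!" comments, but not line continuations, escapes, or whitespace-only separators, so it is not a full implementation of java.util.Properties.load.

```python
# Hypothetical pre-launch check: report required keys that a Java-style
# .properties file does not define. Simplified parser: "key=value" and
# "key:value" lines, "#"/"!" comments; no continuations or escape sequences.
from __future__ import annotations


def parse_properties(text: str) -> dict[str, str]:
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # split on the first "=" or ":", whichever comes first
        seps = [i for i in (line.find("="), line.find(":")) if i != -1]
        if not seps:
            props[line] = ""
            continue
        i = min(seps)
        props[line[:i].strip()] = line[i + 1:].strip()
    return props


def missing_keys(props: dict[str, str], required: list[str]) -> list[str]:
    # Stricter than a plain presence test: an empty value is also reported.
    return [k for k in required if not props.get(k)]


conf = """
# conf/gobblin-mapreduce.properties (excerpt)
launcher.type=MAPREDUCE
writer.output.dir=/gobblin/task-output
"""
print(missing_keys(parse_properties(conf), ["writer.staging.dir"]))  # ['writer.staging.dir']
```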

Also, I still think you can get this to work by running "bin/gobblin-standalone.sh" instead, and I believe I know why it wasn't working for you. Gobblin standalone distinguishes between system properties and job-level properties. The properties "launcher.type" and "job.schedule" are job-level properties, so they should go in your ".job" or ".pull" file, not in your ".properties" file. I understand this is a little confusing; I'll think about how to make the documentation clearer on this point.
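The split described above can be illustrated with a small sketch that partitions one flat set of properties into a system-level ".properties" part and a job-level ".pull"/".job" part. JOB_LEVEL_KEYS is a hypothetical set that holds only the two keys named in this thread; a real setup would need the full list of job-level keys for the Gobblin version in use, and the jobconf.dir path is a placeholder.

```python
# Hypothetical sketch: partition flat properties into system-level and
# job-level groups. JOB_LEVEL_KEYS contains only the keys named in this
# thread; it is NOT a complete list of Gobblin job-level properties.
from __future__ import annotations

JOB_LEVEL_KEYS = {"launcher.type", "job.schedule"}


def split_config(flat: dict[str, str]) -> tuple[dict[str, str], dict[str, str]]:
    system: dict[str, str] = {}
    job: dict[str, str] = {}
    for key, value in flat.items():
        (job if key in JOB_LEVEL_KEYS else system)[key] = value
    return system, job


def render(props: dict[str, str]) -> str:
    # One "key=value" line per property, sorted for stable output
    return "\n".join(f"{k}={v}" for k, v in sorted(props.items()))


system, job = split_config({
    "launcher.type": "MAPREDUCE",
    "job.schedule": "0 0/5 * * * ?",
    "jobconf.dir": "/gobblin/job-conf",
})
print(render(system))  # goes in the .properties file
print(render(job))     # goes in the .pull / .job file
```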

--Sahil

Prashant Bhardwaj

Jan 6, 2016, 3:22:39 PM
to gobblin-users, prashantb...@gmail.com
Right now I'm using Jenkins to schedule the Gobblin job, and it works fine most of the time. However, some runs fail with the error below. I'm not sure what causes it, and the next run of the job completes fine.

WARNING: Error while attempting to shut down the service after failure.
java.io.IOException: java.io.EOFException
at gobblin.util.ParallelRunner.close(ParallelRunner.java:291)
at gobblin.runtime.TaskStateCollectorService.collectOutputTaskStates(TaskStateCollectorService.java:145)
at gobblin.runtime.TaskStateCollectorService.runOneIteration(TaskStateCollectorService.java:81)
at gobblin.runtime.TaskStateCollectorService.shutDown(TaskStateCollectorService.java:102)
at com.google.common.util.concurrent.AbstractScheduledService$1$1.run(AbstractScheduledService.java:175)
at com.google.common.util.concurrent.Callables$3.run(Callables.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1845)
at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
at gobblin.util.ParallelRunner$3.call(ParallelRunner.java:160)
at gobblin.util.ParallelRunner$3.call(ParallelRunner.java:154)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more

ERROR [AbstractJobLauncher] Failed to launch and run job job_ReqLogsIngestion_1452110402762: java.lang.IllegalStateException: Expected the service to be TERMINATED, but the service has FAILED
java.lang.IllegalStateException: Expected the service to be TERMINATED, but the service has FAILED
at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:334)
at com.google.common.util.concurrent.AbstractService.awaitTerminated(AbstractService.java:303)
at com.google.common.util.concurrent.AbstractScheduledService.awaitTerminated(AbstractScheduledService.java:402)
at gobblin.runtime.mapreduce.MRJobLauncher.runWorkUnits(MRJobLauncher.java:225)
at gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:245)
at gobblin.runtime.mapreduce.CliMRJobLauncher.run(CliMRJobLauncher.java:60)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at gobblin.runtime.mapreduce.CliMRJobLauncher.main(CliMRJobLauncher.java:133)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.io.IOException: java.io.EOFException
at gobblin.util.ParallelRunner.close(ParallelRunner.java:291)
at gobblin.runtime.TaskStateCollectorService.collectOutputTaskStates(TaskStateCollectorService.java:145)
at gobblin.runtime.TaskStateCollectorService.runOneIteration(TaskStateCollectorService.java:81)
at com.google.common.util.concurrent.AbstractScheduledService$1$1.run(AbstractScheduledService.java:172)
at com.google.common.util.concurrent.Callables$3.run(Callables.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1845)
at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1810)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1759)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
at gobblin.util.ParallelRunner$3.call(ParallelRunner.java:160)
at gobblin.util.ParallelRunner$3.call(ParallelRunner.java:154)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more
Failed to launch the job due to the following exception:
gobblin.runtime.JobException: Job job_ReqLogsIngestion_1452110402762 failed

Sahil Takiar

Jan 6, 2016, 3:39:33 PM
to Prashant Bhardwaj, gobblin-users
What version of Hadoop are you using?

Can you send over the full Gobblin logs, as well as the logs from the Map Tasks?

Based on the stack trace, the AbstractJobLauncher's TaskStateCollectorService is throwing an exception when trying to read SequenceFiles from HDFS. Each of these SequenceFiles represents a TaskState, and the file is produced at the end of a Gobblin Task (which runs in a Hadoop Map Task). I took a look at the source code of SequenceFile.Reader and it seems to be failing when reading some initial metadata stored in the SequenceFile (specifically the SequenceFile version being used). My guess is that for some reason Gobblin Tasks are producing empty SequenceFiles.
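One way to test the empty-SequenceFile theory is to copy the task-state files out of HDFS (for example with "hdfs dfs -get") and check their first bytes. A SequenceFile starts with the 3-byte magic "SEQ" followed by a 1-byte format version, and SequenceFile.Reader reads these 4 bytes first, so an empty or truncated file fails there with an EOFException, which is consistent with the readFully frames in the trace above. The sketch below is a hypothetical helper that only checks this 4-byte prefix, not the rest of the header; the file names in the demo are placeholders.

```python
# Hypothetical helper: flag local files too short (or wrongly prefixed) to be
# a Hadoop SequenceFile. Only the 4-byte prefix ("SEQ" + version byte) is
# checked; the remainder of the header is not validated.
from __future__ import annotations

import tempfile
from pathlib import Path

MAGIC = b"SEQ"
HEADER_LEN = 4  # 3 magic bytes + 1 version byte


def check_header(head: bytes) -> str | None:
    """Describe the problem with a file's first bytes, or return None if the prefix looks sane."""
    if len(head) < HEADER_LEN:
        return f"only {len(head)} byte(s); too short for the SEQ header"
    if head[:3] != MAGIC:
        return f"bad magic {head[:3]!r}"
    return None


def check_file(path: Path) -> str | None:
    with path.open("rb") as f:
        return check_header(f.read(HEADER_LEN))


with tempfile.TemporaryDirectory() as d:
    empty = Path(d, "empty-task-state")
    empty.write_bytes(b"")
    looks_ok = Path(d, "ok-task-state")
    looks_ok.write_bytes(b"SEQ\x06")  # sample version byte; rest of header omitted
    print(check_file(empty))     # only 0 byte(s); too short for the SEQ header
    print(check_file(looks_ok))  # None
```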

--Sahil 

Sahil Takiar

Feb 16, 2016, 5:22:08 PM
to Prashant Bhardwaj, gobblin-users
There was a bug in TaskStateCollectorService; it was fixed in https://github.com/linkedin/gobblin/pull/677

--Sahil