Hi,
I am able to move Avro data from Kafka to S3 using the map-reduce launch script, but I want to run the job every 5 minutes. As far as I can tell, the built-in Quartz scheduler cannot be used via the map-reduce launch script, though the standalone script can launch a map-reduce job on a schedule.
I modified my standalone properties and job files accordingly, but I am getting:
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
Also, metrics are not being reported via the standalone script even though I have enabled them.
Can the s3a file system not be used via the standalone script? How do I fix this? And what do I have to do differently to report metrics via the standalone script?
Error logs:
2016-04-22 17:24:34 UTC INFO [JobScheduler STARTING] gobblin.scheduler.JobScheduler 146 - Starting the job scheduler
2016-04-22 17:24:34 UTC INFO [JobScheduler STARTING] org.quartz.core.QuartzScheduler 575 - Scheduler LocalJobScheduler_$_NON_CLUSTERED started.
2016-04-22 17:24:34 UTC INFO [JobScheduler STARTING] gobblin.scheduler.JobScheduler 365 - Scheduling locally configured jobs
2016-04-22 17:24:34 UTC INFO [JobScheduler STARTING] gobblin.scheduler.JobScheduler 378 - Loaded 1 job configuration
2016-04-22 17:24:34 UTC INFO [MetricsReportingService STARTING] gobblin.metrics.GobblinMetrics 481 - Not reporting metrics to JMX
2016-04-22 17:24:34 UTC INFO [MetricsReportingService STARTING] gobblin.metrics.GobblinMetrics 430 - Not reporting metrics to log files
2016-04-22 17:24:34 UTC INFO [MetricsReportingService STARTING] gobblin.metrics.GobblinMetrics 492 - Not reporting metrics to Kafka
2016-04-22 17:25:00 UTC WARN [LocalJobScheduler_Worker-1] org.apache.hadoop.util.NativeCodeLoader 62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-04-22 17:25:00 UTC INFO [LocalJobScheduler_Worker-1] org.quartz.core.JobRunShell 207 - Job JwGobblinKafkaImport5.JwGobblinKafkaImport5 threw a JobExecutionException:
org.quartz.JobExecutionException: gobblin.runtime.JobException: Failed to run job JwGobblinKafkaImport5 [See nested exception: gobblin.runtime.JobException: Failed to run job JwGobblinKafkaImport5]
at gobblin.scheduler.JobScheduler$GobblinJob.execute(JobScheduler.java:514)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: gobblin.runtime.JobException: Failed to run job JwGobblinKafkaImport5
at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:294)
at gobblin.scheduler.JobScheduler$GobblinJob.execute(JobScheduler.java:512)
... 2 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at gobblin.runtime.JobContext.<init>(JobContext.java:125)
at gobblin.runtime.AbstractJobLauncher.<init>(AbstractJobLauncher.java:126)
at gobblin.runtime.local.LocalJobLauncher.<init>(LocalJobLauncher.java:67)
at gobblin.runtime.JobLauncherFactory.newJobLauncher(JobLauncherFactory.java:63)
at gobblin.scheduler.JobScheduler.runJob(JobScheduler.java:292)
... 3 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
... 14 more
Below are my configuration files:
1. The .pull job file:
job.group=GobblinKafkaImport5
job.description=Gobblin quick start job for Kafka
job.lock.enabled=true
mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
task.data.root.dir=${env:GOBBLIN_WORK_DIR}/working/task-data
job.schedule=0 0/5 * * * ?
job.runonce=False
launcher.type=MAPREDUCE
source.class=com.extractor.KafkaSource2
extract.namespace=gobblin.extract.kafka
topic.whitelist=ping-avro
writer.builder.class=gobblin.writer.AvroDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=AVRO
bootstrap.with.offset=latest
metrics.reporting.file.enabled=true
metircs.enabled=true
metrics.reporting.file.suffix=txt
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
writer.partition.granularity=day
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
writer.partition.timezone=UTC
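If I read the Quartz cron expression `0 0/5 * * * ?` correctly, the job should fire every five minutes on the minute (which matches the log above: the scheduler started at 17:24:34 and the job first ran at 17:25:00). A minimal Python sketch of my understanding of the fire times (not Quartz itself, just a sanity check of the schedule):

```python
from datetime import datetime, timedelta

def next_five_minute_fire(now):
    """Next fire time for the Quartz cron '0 0/5 * * * ?':
    second 0, every 5th minute, any hour/day."""
    base = now.replace(second=0, microsecond=0)
    # Round up to the next multiple of 5 minutes strictly after `now`.
    minutes_over = base.minute % 5
    candidate = base + timedelta(minutes=(5 - minutes_over) % 5)
    if candidate <= now:
        candidate += timedelta(minutes=5)
    return candidate

# Scheduler start time from the log above:
print(next_five_minute_fire(datetime(2016, 4, 22, 17, 24, 34)))
# → 2016-04-22 17:25:00, which is when the job actually fired
```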
2. gobblin-standalone.properties:
# Thread pool settings for the task executor
taskexecutor.threadpool.size=2
taskretry.threadpool.coresize=1
taskretry.threadpool.maxsize=2
# File system URIs
fs.uri=hdfs://{host}:8020
writer.fs.uri=${fs.uri}
# Writer related configuration properties
writer.destination.type=HDFS
writer.output.format=AVRO
writer.staging.dir=${env:GOBBLIN_WORK_DIR}/task-staging
writer.output.dir=${env:GOBBLIN_WORK_DIR}/task-output
# Data publisher related configuration properties
data.publisher.type=gobblin.publisher.BaseDataPublisher
data.publisher.replace.final.dir=false
# Directory where job configuration files are stored
jobconf.dir=${env:GOBBLIN_JOB_CONFIG_DIR}
# Directory where commit sequences are stored
gobblin.runtime.commit.sequence.store.dir=${env:GOBBLIN_WORK_DIR}/commit-sequence-store
# Directory where error files from the quality checkers are stored
qualitychecker.row.err.file=${env:GOBBLIN_WORK_DIR}/err
# Directory where job locks are stored
job.lock.dir=${env:GOBBLIN_WORK_DIR}/locks
# Directory where metrics log files are stored
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
# Interval of task state reporting in milliseconds
task.status.reportintervalinms=5000
# MapReduce properties
mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
task.data.root.dir=${env:GOBBLIN_WORK_DIR}/working/task-data
# s3 bucket configuration
fs.s3a.access.key={key}
fs.s3a.secret.key={key}
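To narrow down the ClassNotFoundException, I wanted to check whether the jar that provides org.apache.hadoop.fs.s3a.S3AFileSystem (hadoop-aws, as far as I know) is actually in the standalone script's lib directory. A small Python helper I used for that scan (the `gobblin-dist/lib` path is an assumption about my install layout):

```python
import glob
import zipfile

def jars_containing(class_entry, lib_glob):
    """Return the jars matched by lib_glob whose entries include class_entry.

    class_entry is the class file path inside the jar, e.g.
    'org/apache/hadoop/fs/s3a/S3AFileSystem.class'.
    """
    hits = []
    for jar in glob.glob(lib_glob):
        try:
            with zipfile.ZipFile(jar) as zf:
                if class_entry in zf.namelist():
                    hits.append(jar)
        except zipfile.BadZipFile:
            pass  # skip files that are not valid jars
    return hits

# Usage (path is my guess at the dist layout):
# jars_containing('org/apache/hadoop/fs/s3a/S3AFileSystem.class',
#                 'gobblin-dist/lib/*.jar')
```

In my case this returns an empty list, which would explain the error, but I am not sure what the supported way is to get hadoop-aws onto the standalone classpath.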