<configuration>
<property>
<name>fs.defaultFS</name>
<value>s3n://mybucket</value>
</property>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>AWS-ID</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>SECRET-ID</value>
</property>
<!-- OOZIE proxy user setting -->
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<!-- HTTPFS proxy user setting -->
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<!-- Llama proxy user setting -->
<property>
<name>hadoop.proxyuser.llama.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.llama.groups</name>
<value>*</value>
</property>
<!-- Hue proxy user setting -->
<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>
<!-- Mapred proxy user setting -->
<property>
<name>hadoop.proxyuser.mapred.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.mapred.groups</name>
<value>*</value>
</property>
</configuration>
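With fs.defaultFS pointing at the bucket, a quick sanity check is to resolve the default filesystem from the client side and list the bucket root. A minimal sketch, assuming the Hadoop client jars (including the s3native classes) are on the classpath and HADOOP_CONF_DIR points at the core-site.xml above; "mybucket" is the placeholder from the config:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sanity check: resolve fs.defaultFS (s3n://mybucket above) and list the bucket root.
public class S3nSanityCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();         // loads core-site.xml from HADOOP_CONF_DIR
        FileSystem fs = FileSystem.get(conf);             // resolves the s3n scheme
        System.out.println("Default FS: " + fs.getUri()); // expect s3n://mybucket
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
    }
}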
yarn-site.xml:
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.dispatcher.exit-on-error</name>
<value>true</value>
</property>
<property>
<description>List of directories to store localized files in.</description>
<name>yarn.nodemanager.local-dirs</name>
<value>/var/lib/hadoop-yarn/cache/${user.name}/nm-local-dir</value>
</property>
<property>
<description>Where to store container logs.</description>
<name>yarn.nodemanager.log-dirs</name>
<value>/var/log/hadoop-yarn/containers</value>
</property>
<property>
<description>Where to aggregate logs to.</description>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/var/log/hadoop-yarn/apps</value>
</property>
<property>
<description>Classpath for typical applications.</description>
<name>yarn.application.classpath</name>
<value>
$HADOOP_CONF_DIR,
$HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*,
$HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*,
$HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*,
$HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/*
</value>
</property>
</configuration>
mapred-site.xml:
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:8021</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>localhost:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>localhost:19888</value>
</property>
<property>
<description>Temporary directory for map and reduce tasks.</description>
<name>mapreduce.task.tmp.dir</name>
<value>/var/lib/hadoop-mapreduce/cache/${user.name}/tasks</value>
</property>
</configuration>
camus.properties:
# Needed Camus properties, more cleanup to come
###
# Map Reduce configuration
# Set to local for debugging, set to classic for MRv1
#mapreduce.framework.name=classic
# max hadoop tasks to use, each task can pull multiple topic partitions
mapreduce.job.maps=30
#mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress=false
mapreduce.map.maxattempts=1
###
# ETL (Extract-Transform-Load) Configuration
#
# default filesystem, overridden here with the S3 credentials embedded in the URI
fs.defaultFS=s3n://AWS-ID:SECRET-ID@mybucket
# final top-level data output directory; a sub-directory will be created dynamically for each topic pulled
etl.destination.path=/camus/output
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/camus
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/camus/history
# Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
#camus.message.encoder.class=com.linkedin.batch.etl.kafka.coders.KafkaAvroMessageEncoder
# Concrete implementation of the Decoder class to use
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder
# Schema Registry implementation used by avro-based Decoders
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=http://myvm:2876/schema-repo
# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topics that do not have a partitioner specified
#etl.partitioner.class=com.linkedin.camus.etl.kafka.coders.DefaultPartitioner
# Partitioners can also be set on a per-topic basis
#etl.partitioner.class.<topic-name>=com.your.custom.CustomPartitioner
# all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
# hdfs.default.classpath.dir=
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1
# if the whitelist has values, only whitelisted topics are pulled; nothing on the blacklist is pulled
#kafka.blacklist.topics=
#kafka.whitelist.topics=camus\\..*
# Name of the client as seen by kafka
kafka.client.name=camus
# Fetch Request Parameters
#kafka.fetch.buffer.size=
#kafka.fetch.request.correlationid=
#kafka.fetch.request.max.wait=
#kafka.fetch.request.min.bytes=
# Connection parameters.
#kafka.brokers=precise64:9092
kafka.brokers=192.168.50.3:9092
kafka.timeout.value=6000
#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5
#Controls the submitting of counts to Kafka
#Default value set to true
#post.tracking.counts.to.kafka=true
post.tracking.counts.to.kafka=false
log4j.configuration=true
# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
#kafka.monitor.tier=
#etl.counts.path=
kafka.monitor.time.granularity=10
etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false
# configure output compression for deflate or snappy. Defaults to deflate
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy
etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8
kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000
#zookeeper.session.timeout=
#zookeeper.connection.timeout=
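With the properties in place, I ran the job (launched with something like hadoop jar camus-example-<version>-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties; the exact jar name is an assumption, but -P is how CamusJob picks up an external properties file) and got the following output: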
[CamusJob] - Dir Destination set to: /camus/output
No previous execution, all topics pulled from earliest available offset
[CamusJob] - New execution temp location: /camus/2014-07-22-20-51-51
[RMProxy] - Connecting to ResourceManager at /0.0.0.0:8032
[UserGroupInformation] - PriviledgedActionException as:vagrant (auth:SIMPLE) cause:org.apache.hadoop.fs.UnsupportedFileSystemException: No AbstractFileSystem for scheme: s3n
[Cluster] - Failed to use org.apache.hadoop.mapred.YarnClientProtocolProvider due to error: Error in instantiating YarnClient
[UserGroupInformation] - PriviledgedActionException as:vagrant (auth:SIMPLE) cause:java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1265)
at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1261)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapreduce.Job.connect(Job.java:1260)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1289)
at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:318)
at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:646)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:610)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
To fix this error, I added the following property to mapred-site.xml:
<property>
<name>fs.AbstractFileSystem.s3n.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
<description>The FileSystem for s3n: (Native S3) uris.</description>
</property>
Unfortunately, this produces a new error: fs.AbstractFileSystem.s3n.impl must name an AbstractFileSystem subclass, and NativeS3FileSystem implements the older FileSystem API, so it lacks the (URI, Configuration) constructor that Hadoop's reflective instantiation expects.
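A minimal sketch reproducing the failed constructor lookup (class names taken from the stack trace below; illustrative only, not a fix):

import java.lang.reflect.Constructor;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;

// Reproduces the reflective constructor lookup that AbstractFileSystem performs
// for the class configured under fs.AbstractFileSystem.s3n.impl.
public class ConstructorLookup {
    public static void main(String[] args) throws Exception {
        // NativeS3FileSystem has only a no-arg constructor plus
        // initialize(URI, Configuration), so this call throws
        // java.lang.NoSuchMethodException, matching the log output below.
        Constructor<NativeS3FileSystem> ctor = NativeS3FileSystem.class
                .getDeclaredConstructor(URI.class, Configuration.class);
        System.out.println(ctor); // never reached
    }
}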
[CamusJob] - Dir Destination set to: /camus/output
No previous execution, all topics pulled from earliest available offset
[CamusJob] - New execution temp location: /camus/2014-07-23-21-03-01
[RMProxy] - Connecting to ResourceManager at /0.0.0.0:8032
[Cluster] - Failed to use org.apache.hadoop.mapred.YarnClientProtocolProvider due to error: java.lang.NoSuchMethodException: org.apache.hadoop.fs.s3native.NativeS3FileSystem.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
[UserGroupInformation] - PriviledgedActionException as:vagrant (auth:SIMPLE) cause:java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1265)
at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1261)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
at org.apache.hadoop.mapreduce.Job.connect(Job.java:1260)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1289)
at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:318)
at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:646)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:610)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
...