Camus w/ YARN dumping to S3


Jonathan Herriott

Jul 23, 2014, 5:18:17 PM
to camu...@googlegroups.com
Hi,

I checked out Camus on the hadoop2 branch, changed the Hadoop libraries to 2.3.0 to match the CDH5 install on my VM, and added jar shading to simplify deployment.  I then tried to run it on a pseudo-distributed cluster on my local VM with S3 as the fs.defaultFS.  It creates a couple of folders in S3 just fine, but it fails in the MapReduce phase because the s3n scheme is not recognized.  My understanding, based on other threads here, is that it should be possible to dump to S3, but no one seems to have given the steps for getting this to work.  Below are my Hadoop settings and the failures I ran into.  Any help would be appreciated.

core-site.xml:
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>s3n://mybucket</value>
  </property>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>AWS-ID</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>SECRET-ID</value>
  </property>

  <!-- OOZIE proxy user setting -->
  <property>
    <name>hadoop.proxyuser.oozie.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.oozie.groups</name>
    <value>*</value>
  </property>

  <!-- HTTPFS proxy user setting -->
  <property>
    <name>hadoop.proxyuser.httpfs.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.httpfs.groups</name>
    <value>*</value>
  </property>

  <!-- Llama proxy user setting -->
  <property>
    <name>hadoop.proxyuser.llama.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.llama.groups</name>
    <value>*</value>
  </property>

  <!-- Hue proxy user setting -->
  <property>
    <name>hadoop.proxyuser.hue.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hue.groups</name>
    <value>*</value>
  </property>

  <!-- Mapred proxy user setting -->
  <property>
    <name>hadoop.proxyuser.mapred.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.mapred.groups</name>
    <value>*</value>
  </property>
</configuration>


yarn-site.xml:

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.dispatcher.exit-on-error</name>
    <value>true</value>
  </property>
  <property>
    <description>List of directories to store localized files in.</description>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/var/lib/hadoop-yarn/cache/${user.name}/nm-local-dir</value>
  </property>
  <property>
    <description>Where to store container logs.</description>
    <name>yarn.nodemanager.log-dirs</name>
    <value>/var/log/hadoop-yarn/containers</value>
  </property>
  <property>
    <description>Where to aggregate logs to.</description>
    <name>yarn.nodemanager.remote-app-log-dir</name>
    <value>/var/log/hadoop-yarn/apps</value>
  </property>
  <property>
    <description>Classpath for typical applications.</description>
    <name>yarn.application.classpath</name>
    <value>
        $HADOOP_CONF_DIR,
        $HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*,
        $HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*,
        $HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*,
        $HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/*
    </value>
  </property>
</configuration>



mapred-site.xml:

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.jobhistory.address</name>
    <value>localhost:10020</value>
  </property>
  <property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>localhost:19888</value>
  </property>
  <property>
    <description>To set the value of tmp directory for map and reduce tasks.</description>
    <name>mapreduce.task.tmp.dir</name>
    <value>/var/lib/hadoop-mapreduce/cache/${user.name}/tasks</value>
  </property>
</configuration>


camus.properties:

# Needed Camus properties, more cleanup to come
###
# Map Reduce configuration
# Set to local for debugging, set to classic for MRv1
#mapreduce.framework.name=classic
# max hadoop tasks to use, each task can pull multiple topic partitions
mapreduce.job.maps=30
#mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress=false
mapreduce.map.maxattempts=1

###
# ETL (Extract-Transform-Load) Configuration
#

# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
fs.defaultFS=s3n://AWS-ID:SECRET-ID@mybucket

etl.destination.path=/camus/output
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/camus
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/camus/history

# Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
#camus.message.encoder.class=com.linkedin.batch.etl.kafka.coders.KafkaAvroMessageEncoder

# Concrete implementation of the Decoder class to use
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder

# Used by avro-based Decoders to use as their Schema Registry
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=http://myvm:2876/schema-repo

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topics that do not have a partitioner specified
#etl.partitioner.class=com.linkedin.camus.etl.kafka.coders.DefaultPartitioner

# Partitioners can also be set on a per-topic basis
#etl.partitioner.class.<topic-name>=com.your.custom.CustomPartitioner

# all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
# hdfs.default.classpath.dir=

# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topic are pulled.  nothing on the blacklist is pulled
#kafka.blacklist.topics=
#kafka.whitelist.topics=camus\\..*

# Name of the client as seen by kafka
kafka.client.name=camus
# Fetch Request Parameters
#kafka.fetch.buffer.size=
#kafka.fetch.request.correlationid=
#kafka.fetch.request.max.wait=
#kafka.fetch.request.min.bytes=
# Connection parameters.
#kafka.brokers=precise64:9092
kafka.brokers=192.168.50.3:9092
kafka.timeout.value=6000

#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5

#Controls the submitting of counts to Kafka
#Default value set to true
#post.tracking.counts.to.kafka=true
post.tracking.counts.to.kafka=false

log4j.configuration=true

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
#kafka.monitor.tier=
#etl.counts.path=
kafka.monitor.time.granularity=10

etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000
#zookeeper.session.timeout=
#zookeeper.connection.timeout=


Starting the job:
[CamusJob] - Dir Destination set to: /camus/output
No previous execution, all topics pulled from earliest available offset
[CamusJob] - New execution temp location: /camus/2014-07-22-20-51-51
[RMProxy] - Connecting to ResourceManager at /0.0.0.0:8032
[UserGroupInformation] - PriviledgedActionException as:vagrant (auth:SIMPLE) cause:org.apache.hadoop.fs.UnsupportedFileSystemException: No AbstractFileSystem for scheme: s3n
[Cluster] - Failed to use org.apache.hadoop.mapred.YarnClientProtocolProvider due to error: Error in instantiating YarnClient
[UserGroupInformation] - PriviledgedActionException as:vagrant (auth:SIMPLE) cause:java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
       at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
       at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
       at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
       at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1265)
       at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1261)
       at java.security.AccessController.doPrivileged(Native Method)
       at javax.security.auth.Subject.doAs(Subject.java:416)
       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
       at org.apache.hadoop.mapreduce.Job.connect(Job.java:1260)
       at org.apache.hadoop.mapreduce.Job.submit(Job.java:1289)
       at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:318)
       at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:646)
       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
       at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:610)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:622)
       at org.apache.hadoop.util.RunJar.main(RunJar.java:212)


To fix this error, I added the following to mapred-site.xml:

<property>
  <name>fs.AbstractFileSystem.s3n.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  <description>The FileSystem for s3n: (Native S3) uris.</description>
</property>


Unfortunately, this produces a new error: NativeS3FileSystem doesn't have the constructor that is being looked up, <init>(java.net.URI, org.apache.hadoop.conf.Configuration).

[CamusJob] - Dir Destination set to: /camus/output

No previous execution, all topics pulled from earliest available offset
[CamusJob] - New execution temp location: /camus/2014-07-23-21-03-01
[RMProxy] - Connecting to ResourceManager at /0.0.0.0:8032
[Cluster] - Failed to use org.apache.hadoop.mapred.YarnClientProtocolProvider due to error: java.lang.NoSuchMethodException: org.apache.hadoop.fs.s3native.NativeS3FileSystem.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
[UserGroupInformation] - PriviledgedActionException as:vagrant (auth:SIMPLE) cause:java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
       at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
       at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
       at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
       at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1265)
       at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1261)
       at java.security.AccessController.doPrivileged(Native Method)
       at javax.security.auth.Subject.doAs(Subject.java:416)
       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1554)
       at org.apache.hadoop.mapreduce.Job.connect(Job.java:1260)
       at org.apache.hadoop.mapreduce.Job.submit(Job.java:1289)
       at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:318)
       at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:646)
       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
       at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:610)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:622)
       at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
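
The NoSuchMethodException in the log above is what a reflective constructor lookup throws when the configured class has no constructor with the requested parameter types. The sketch below reproduces only that mechanism with plain JDK classes; it is not Hadoop code. PlainFs and UriConfFs are hypothetical stand-ins, and a Map takes the place of Hadoop's Configuration. PlainFs mimics a class that is built with a no-arg constructor and set up afterwards through an initialize method, which appears to be how NativeS3FileSystem is meant to be used (it is normally registered under fs.s3n.impl rather than fs.AbstractFileSystem.s3n.impl).

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class ReflectiveLookupDemo {

    // Hypothetical stand-in: no-arg constructor, configured later via initialize().
    static class PlainFs {
        PlainFs() {}
        void initialize(URI uri, Map<String, String> conf) {}
    }

    // Hypothetical stand-in: offers the two-argument constructor directly.
    static class UriConfFs {
        UriConfFs(URI uri, Map<String, String> conf) {}
    }

    // Looks up a (URI, Map) constructor by reflection and reports what happened.
    static String tryCreate(Class<?> impl, URI uri, Map<String, String> conf) {
        try {
            impl.getDeclaredConstructor(URI.class, Map.class).newInstance(uri, conf);
            return "created " + impl.getSimpleName();
        } catch (NoSuchMethodException e) {
            // Same exception type as in the log: the constructor does not exist.
            return "NoSuchMethodException for " + impl.getSimpleName();
        } catch (ReflectiveOperationException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        URI uri = URI.create("s3n://mybucket");
        Map<String, String> conf = new HashMap<>();
        System.out.println(tryCreate(PlainFs.class, uri, conf));
        System.out.println(tryCreate(UriConfFs.class, uri, conf));
    }
}
```

If this reading is right, pointing fs.AbstractFileSystem.s3n.impl at a FileSystem-style class cannot work regardless of credentials or bucket settings, because the class shape is wrong for that lookup.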



Zhun Shen

May 8, 2015, 6:00:01 AM
to camu...@googlegroups.com
Hi Jonathan,

I ran into the same problem. Have you solved it yet?

Thanks,

Allen

haiyi...@gmail.com

May 9, 2015, 3:29:56 PM
to camu...@googlegroups.com
If you replace s3n:// with s3://, does that work?

Writing directly to s3n from Camus might not be a good idea, since s3n doesn't provide data locality (compared to a regular HDFS filesystem).  The S3 filesystem also has an eventual-consistency problem (a file might not be visible right after it is written), which could be an issue because Camus renames files partway through the job.  S3 is also remote to Camus, which adds latency and reliability problems.  It might be better to write to local HDFS and then distcp or move the files to S3.
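
To make the rename concern concrete, here is a minimal sketch of the write-then-rename commit pattern, using the local filesystem through java.nio as a stand-in. It is not Camus code: the class name, method name, directory names, and file name are all made up for illustration. On a local disk or HDFS the final move is a single metadata operation; on an object store such as S3 a rename is generally emulated by copying and deleting, which is where the consistency and latency worries come from.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class StageThenRename {

    // Writes data under tmpDir, then publishes it by moving it into finalDir.
    // Readers of finalDir see either no file or the complete file.
    static Path commit(Path tmpDir, Path finalDir, String name, String data) throws IOException {
        Files.createDirectories(tmpDir);
        Files.createDirectories(finalDir);
        Path staged = tmpDir.resolve(name);
        Files.write(staged, data.getBytes(StandardCharsets.UTF_8));
        // ATOMIC_MOVE asks for a single rename; it throws if the platform cannot do one.
        return Files.move(staged, finalDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("camus-demo");
        Path out = commit(base.resolve("tmp"), base.resolve("output"), "part-0.avro", "records");
        System.out.println("published: " + Files.exists(out));
        System.out.println("still staged: " + Files.exists(base.resolve("tmp").resolve("part-0.avro")));
    }
}
```

Staging on HDFS and copying the finished output to S3 afterwards keeps this rename step on a filesystem where it is cheap.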