IllegalArgumentException: Wrong FS when running fat jar on Amazon EMR

pishen tsai

Jul 30, 2014, 11:42:47 AM
to scoobi...@googlegroups.com
Hello,

I just ran into this Wrong FS exception when running a fat jar on Amazon EMR. The details are listed at
https://gist.github.com/pishen/d2e4c6b679f821ce0b47

It seems that it doesn't allow me to persist files on S3?
The weird part is that the folder s3://research-center/output/mining-basic-info/users still contains the output I want.

The EMR version is:
AMI version: 3.0.4
Hadoop distribution: Amazon 2.2.0 (I think this is hadoop 2.2.0 optimized by Amazon, not very sure.)

Also, are all the
"INFO org.apache.hadoop.conf.Configuration.deprecation" messages normal?
They started to appear after I moved from Hadoop 1.x to 2.x.

Thanks,
pishen

Eric Torreborre

Jul 30, 2014, 6:52:11 PM
to scoobi...@googlegroups.com
Hi,

You can't use a Text source to directly read files off S3. What we generally do is use distcp to copy the files to HDFS on the cluster, then run the Scoobi job(s), and finally copy the results back to S3.
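
As a rough sketch of that staged workflow (copy in, run, copy out), the three steps could be driven from Scala as shown below. The bucket names, HDFS paths, jar name and main class are hypothetical placeholders. The sketch also assumes that the `hadoop` CLI is on the PATH and that the job takes its input and output paths as arguments.

```scala
import scala.sys.process._

object StagedRun {
  // Builds the three commands: S3 -> HDFS, run the job, HDFS -> S3.
  // Every path, the jar and the main class are caller-supplied placeholders.
  def commands(s3In: String, hdfsIn: String, hdfsOut: String, s3Out: String,
               jar: String, mainClass: String): Seq[Seq[String]] = Seq(
    Seq("hadoop", "distcp", s3In, hdfsIn),
    Seq("hadoop", "jar", jar, mainClass, hdfsIn, hdfsOut),
    Seq("hadoop", "distcp", hdfsOut, s3Out)
  )

  // Runs each step in order and stops at the first non-zero exit code.
  def run(steps: Seq[Seq[String]]): Boolean =
    steps.forall { step =>
      val exitCode = Process(step).!
      exitCode == 0
    }
}
```

Calling `StagedRun.run(StagedRun.commands(...))` would only start the job once the input copy has succeeded, and would only copy results back once the job has exited cleanly.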

> Also, are all the "INFO org.apache.hadoop.conf.Configuration.deprecation" messages normal?
> They started to appear after I moved from Hadoop 1.x to 2.x.

If I remember correctly, some of those deprecations are triggered by Hadoop itself, which loads its own "old" properties and then warns about them...

Eric.

pishen tsai

Jul 30, 2014, 9:30:40 PM
to scoobi...@googlegroups.com
Hmm, interesting.
So even though I can still read the input directly from S3 and write the output to my destination on S3 in this case, is this still a dangerous thing to do (with unpredictable results) because of the exception?

pishen


Kevin C

Sep 3, 2014, 12:45:22 AM
to scoobi...@googlegroups.com
I have the exact same version and the same problems as Pishen. Pig works perfectly fine in EMR<->S3-only mode. For some people, the point of using only EMR is to not deal with Hadoop/HDFS management, upgrades, debugging, monitoring, and whatnot.

Scoobi seems to work with S3 (writing the results) up to the very last stage, and then it fails. Based on the exceptions, I suspect it has something to do with the checkPath call in org.apache.hadoop.fs.FileSystem.exists (which doesn't seem to allow the s3n:// prefix).

If I can't find a work-around, I will switch to Scalding which I've heard works well with only S3.
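
To illustrate the kind of check that produces a "Wrong FS" message, here is a deliberately simplified model in plain Scala. It is not Hadoop's actual implementation, and the object and method names are hypothetical. It only captures the idea that a file system instance accepts a path when the path has no scheme, or when its scheme and authority match the file system's own URI.

```scala
import java.net.URI

// Simplified model of the path check, NOT Hadoop's real code.
object WrongFsModel {
  def checkPath(fsUri: URI, path: URI): Unit = {
    // A path without a scheme is treated as relative to this file system.
    val sameScheme = path.getScheme == null ||
      path.getScheme.equalsIgnoreCase(fsUri.getScheme)
    // A path without an authority (host or bucket) is accepted as well.
    val sameAuthority = path.getAuthority == null ||
      path.getAuthority.equalsIgnoreCase(fsUri.getAuthority)
    if (!(sameScheme && sameAuthority))
      throw new IllegalArgumentException(s"Wrong FS: $path, expected: $fsUri")
  }
}
```

Under this model, checking `/user/hadoop/out` against an `hdfs://namenode:9000` file system passes, while checking an `s3n://some-bucket/out` path against the same file system throws. Both host and bucket names here are made up for the example.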

Kevin C

Sep 3, 2014, 12:49:22 AM
to scoobi...@googlegroups.com
BTW here's the exception: http://pastebin.com/ANhvGmh7

Eric Torreborre

Sep 3, 2014, 1:57:09 AM
to scoobi...@googlegroups.com
The workaround is to disable the check in the fromTextFiles method:

def fromTextFiles(paths: Seq[String], check: Source.InputCheck = Source.defaultInputCheck): DList[String] =

Like this:

fromTextFiles(Seq(mapping), Source.noInputCheck)

Eric.

Kevin C

Sep 4, 2014, 4:32:19 PM
to scoobi...@googlegroups.com
The error occurs when trying to write a file in setJobSuccess(). It fails when it tries to check whether the directory exists. Do you mean toTextFiles instead of fromTextFiles? At any rate, I tried recompiling the latest Scoobi with Scala 2.10.3, ran it with my example (also Scala 2.10.3), and got the following error:

[error] (run-main-0) java.lang.IncompatibleClassChangeError: Found interface scalaz.syntax.ApplyOps, but class was expected
java.lang.IncompatibleClassChangeError: Found interface scalaz.syntax.ApplyOps, but class was expected
at com.nicta.scoobi.application.Hadoop$class.classDirs(Hadoop.scala:50)
at myCompany.WordCount$.classDirs(WordCount.scala:11)

Any help would be greatly appreciated.

Eric Torreborre

Sep 5, 2014, 9:30:00 AM
to scoobi...@googlegroups.com
Try again with the latest-latest Scoobi. We've recently reverted the Scalaz 7.1.0 upgrade, which was causing the issue you are seeing.

Kevin C

Sep 5, 2014, 5:41:28 PM
to scoobi...@googlegroups.com
Thanks Eric. I reverted to 0.8.5, recompiled, and it worked. So here are my findings (at least with 0.8.5)

A) In ... io.impl.FileSystem.scala:
  def listDirectPaths(dest: Path)(implicit configuration: ScoobiConfiguration): Seq[Path] = {
    ...
    // it uses fileSystem (which simply uses FileSystem.get(configuration)) which
    // defaults to hdfs://, but if using S3 as dest:Path, it'll return with a "Wrong FS" exception.
    // Here, fileSystem should really be FileSystem.get(dest.toUri, configuration) to be the
    // actual file system of dest:Path. After this fix, it finally works!!! 
    // I don't know if you already fixed this in the latest version.
    ...
  }
     
B) In OutputChannel.scala:
  def moveFileFromTo(srcDir: Path, destDir: Path)(implicit sc: ScoobiConfiguration, fileSystems: FileSystems, outerLogger: Log): Path => Unit = { path =>
    ...
    // below line will move local directory successfully, but will not move correctly on S3.
    // into       s3n://kevin-logrepo/emr-output/ch13-11 (with the undesired tag and sink ids)
    // instead of s3n://kevin-logrepo/emr-output/
    val moved = moveTo(destDir).apply(path, newPath)
    ...
  }

C) In OutputChannel.scala:
  def collectSuccessFile(successFile: Option[Path])(implicit sc: ScoobiConfiguration, fileSystems: FileSystems) = {
    ...
        FileSystems.fileSystem(outDir).create(new Path(outDir, "_SUCCESS_JOB"))
        // above line does not actually create a _SUCCESS_JOB file on S3:// for unknown reason. You can
        // verify that it doesn't work on s3 by adding below:
        //val ls = FileSystems.fileSystem(outDir).listStatus(outDir).toSeq.map(_.getPath)
        //outer.logger.info(s"**** Oh darn it! I don't have _SUCCESS_JOB:$ls")
    ...
  }
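
Finding A can be illustrated with plain Hadoop API calls. This is a minimal sketch rather than Scoobi's code: the object and method names are hypothetical, and it assumes hadoop-common is on the classpath. The point is that `FileSystem.get(conf)` returns the cluster default (typically HDFS on EMR), whereas `FileSystem.get(path.toUri, conf)` picks the file system that matches the path's own scheme.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object PathFileSystem {
  // Resolves the file system from the path's own URI (scheme and authority)
  // instead of using the default returned by FileSystem.get(conf).
  def fileSystemFor(path: Path, conf: Configuration): FileSystem =
    FileSystem.get(path.toUri, conf)

  // Lists the direct children of `dest` on whichever file system owns it.
  def listChildren(dest: Path, conf: Configuration): Seq[Path] = {
    val fs = fileSystemFor(dest, conf)
    if (fs.exists(dest)) fs.listStatus(dest).toSeq.map(_.getPath)
    else Seq.empty
  }
}
```

`path.getFileSystem(conf)` is an equivalent shorthand for the lookup in `fileSystemFor`.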

I apologize for bombarding you with reports, but I'm trying to finalize the framework for my company's developers. I'd much prefer Scoobi over Scalding because of its cleaner syntax, and I'm hoping to get something done quickly before next week. Thanks,
 
-Kevin

Kevin C

Sep 5, 2014, 7:13:12 PM
to scoobi...@googlegroups.com
Hi Eric, I bit the bullet, spent some time looking at the S3-related bugs, and fixed them. So I'm happy to say that my company (~300 employees) will try out Scoobi! Comments below:

On Friday, September 5, 2014 2:41:28 PM UTC-7, Kevin C wrote:

B) In OutputChannel.scala:
  def moveFileFromTo(srcDir: Path, destDir: Path)(implicit sc: ScoobiConfiguration, fileSystems: FileSystems, outerLogger: Log): Path => Unit = { path =>
    ...
    // below line will move local directory successfully, but will not move correctly on S3.
    // into       s3n://kevin-logrepo/emr-output/ch13-11 (with the undesired tag and sink ids)
    // instead of s3n://kevin-logrepo/emr-output/
    val moved = moveTo(destDir).apply(path, newPath)
    ...
  }

I have a fix below, because S3 file commands work very differently from those of other file systems. I hope this fix (or another fix you incorporate) can make it into the next release:
  /** @return a function moving a Path to a given directory */
  def moveTo(dir: Path)(implicit configuration: Configuration): (Path, Path) => Boolean = (path: Path, newPath: Path) => {
    !pathExists(path) || {
      val from = fileSystem(path)
      val to   = fileSystem(dir)

      val destPath = to.makeQualified(new Path(dirPath(dir.toString) + newPath))
      if (!pathExists(destPath.getParent)) to.mkdirs(destPath.getParent)

      // s3 has some quirks (can't rename, can't copy/rename dir simultaneously, ...)
      val isS3 = List("s3n", "s3").contains(to.getScheme.toLowerCase)
      if (!isS3 && sameFileSystem(from, to))
        (path == destPath) || {
          logger.debug(s"renaming $path to $destPath")
          tryOk {
            Compatibility.rename(path, destPath)  // not s3 compatible
          }
        }
      else {
        val fStatus = from.getFileStatus(path)
        if (fStatus.isDirectory && isS3) {
          // copying from dir/ to s3 requires copying individual dir/* files
          val sourceFiles = FileSystem.get(path.toUri, configuration).listStatus(path)
            .toSeq.map(_.getPath).toList
          for (sourceFile <- sourceFiles) {
            logger.debug(s"individually copying $sourceFile to $destPath")
            FileUtil.copy(from, sourceFile, to, destPath,
              true /* deleteSource */, true /* overwrite */, configuration)
          }
          true
        } else {
          // copying from one dir to another
          logger.debug(s"copying $path to $destPath")
          FileUtil.copy(from, path, to, destPath,
            true /* deleteSource */, true /* overwrite */, configuration)
        }
      }
    }
  }
 

C) In OutputChannel.scala:
  def collectSuccessFile(successFile: Option[Path])(implicit sc: ScoobiConfiguration, fileSystems: FileSystems) = {
    ...
        FileSystems.fileSystem(outDir).create(new Path(outDir, "_SUCCESS_JOB"))
        // above line does not actually create a _SUCCESS_JOB file on S3:// for unknown reason. You can
        // verify that it doesn't work on s3 by adding below:
        //val ls = FileSystems.fileSystem(outDir).listStatus(outDir).toSeq.map(_.getPath)
        //outer.logger.info(s"**** Oh darn it! I don't have _SUCCESS_JOB:$ls")
    ...
  }

  def collectSuccessFile(successFile: Option[Path])(implicit sc: ScoobiConfiguration, fileSystems: FileSystems) = {
    import fileSystems._; implicit val configuration = sc.configuration

    // if the job is successful, create a success file to every output directory
    // however create it as a _SUCCESS_JOB file
    // this will be switched to a _SUCCESS job if all the jobs of the Scoobi job succeed
    successFile.foreach { successFile =>
      sinks.flatMap(_.outputPath).foreach { outDir =>
        //FileSystems.fileSystem(outDir).create(new Path(outDir, "_SUCCESS_JOB"))  // will not work on s3
        FileSystems.fileSystem(outDir).createNewFile(new Path(outDir, "_SUCCESS_JOB")) // required on s3
      }
    }
  }
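
One possible explanation for the difference, not confirmed in this thread: `create` returns an output stream, and as far as I know the S3 file systems of that Hadoop generation only upload the object when the stream is closed, while `createNewFile` closes its stream internally. Under that assumption, explicitly closing the stream should also work. The sketch below uses hypothetical names and assumes hadoop-common is on the classpath.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object MarkerFile {
  // Creates an empty marker file and closes the stream immediately, so that
  // file systems which only publish data on close() actually write it.
  def touch(dir: Path, name: String, conf: Configuration): Path = {
    val marker = new Path(dir, name)
    val out = marker.getFileSystem(conf).create(marker, true) // overwrite = true
    out.close()
    marker
  }
}
```

For instance, `MarkerFile.touch(outDir, "_SUCCESS_JOB", configuration)` would play the role of the `createNewFile` call above, except that it overwrites an existing marker instead of returning false.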

I'm sticking to the latest/stable release (0.8.5) for now. Thanks for your help Eric.

-Kevin

Eric Torreborre

Sep 7, 2014, 6:36:01 PM
to scoobi...@googlegroups.com
Could you please open a pull request on Github? I'll test it on my side and merge it. 

Thanks,

Eric.

Kevin C

Sep 8, 2014, 3:45:15 PM
to scoobi...@googlegroups.com

Cindy Lamm

Jun 15, 2015, 5:30:24 AM
to scoobi...@googlegroups.com
Hello,

I am evaluating Scoobi for use on AWS EMR (with AMI 3.7.0 and Hadoop version "Amazon 2.4.0"). The Scoobi version I use is 0.9.2, the latest as of this moment.

The job I want to run should read a text file from S3, do some aggregation (here it only counts the lines in the file), and write the aggregation result back into another text file on S3.
The Scoobi application I wrote does the job, meaning the output is written to S3; however, I get the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3://staging-for-aggregation/output-data/scoobi/ch46-23, expected: hdfs://172.31.21.136:9000
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:647)
        ....
        at com.nicta.scoobi.impl.Persister.persist(Persister.scala:52)
at com.nicta.scoobi.impl.ScoobiConfigurationImpl.persist(ScoobiConfigurationImpl.scala:392)
at com.nicta.scoobi.application.Persist$class.persist(Persist.scala:32)
at com.mycompany.aggregation.poc.TotalUserCount$.persist(TotalUserCount.scala:6)


My code:

object TotalUserCount extends ScoobiApp {
  def run(): Unit = {
    val lines: DList[String] = fromTextFile(args(0))
    val total: DObject[Long] = lines.size
    total.toTextFile(args(1)).persist
  }
}


I saw that Kevin's pull request was merged and published in Scoobi version 0.9.0, so I wonder why this issue appears again.

Regards,
Cindy


----------------------------
Full exception stack:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: s3://staging-for-aggregation/output-data/scoobi/ch46-23, expected: hdfs://172.31.21.136:9000
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:647)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:191)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:102)
at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:595)
at org.apache.hadoop.hdfs.DistributedFileSystem$11.doCall(DistributedFileSystem.java:591)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:591)
at com.nicta.scoobi.impl.exec.ExecutionMode$$anonfun$saveSinks$2$$anonfun$apply$5$$anonfun$apply$6.apply(ExecutionMode.scala:145)
at com.nicta.scoobi.impl.exec.ExecutionMode$$anonfun$saveSinks$2$$anonfun$apply$5$$anonfun$apply$6.apply(ExecutionMode.scala:141)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.nicta.scoobi.impl.exec.ExecutionMode$$anonfun$saveSinks$2$$anonfun$apply$5.apply(ExecutionMode.scala:141)
at com.nicta.scoobi.impl.exec.ExecutionMode$$anonfun$saveSinks$2$$anonfun$apply$5.apply(ExecutionMode.scala:140)
at scala.Option.foreach(Option.scala:245)
at com.nicta.scoobi.impl.exec.ExecutionMode$$anonfun$saveSinks$2.apply(ExecutionMode.scala:140)
at com.nicta.scoobi.impl.exec.ExecutionMode$$anonfun$saveSinks$2.apply(ExecutionMode.scala:90)
at scala.collection.immutable.List.foreach(List.scala:383)
at com.nicta.scoobi.impl.exec.ExecutionMode$class.saveSinks(ExecutionMode.scala:90)
at com.nicta.scoobi.impl.exec.HadoopMode.saveSinks(HadoopMode.scala:46)
at com.nicta.scoobi.impl.exec.HadoopMode$$anonfun$executeNode$1.apply(HadoopMode.scala:78)
at com.nicta.scoobi.impl.exec.HadoopMode$$anonfun$executeNode$1.apply(HadoopMode.scala:67)
at org.kiama.attribution.AttributionCore$CachedAttribute.apply(AttributionCore.scala:63)
at scalaz.syntax.IdOps.$bar$greater(IdOps.scala:28)
at com.nicta.scoobi.impl.exec.HadoopMode.execute(HadoopMode.scala:54)
at com.nicta.scoobi.impl.exec.HadoopMode.execute(HadoopMode.scala:52)
at com.nicta.scoobi.impl.Persister.persist(Persister.scala:52)
at com.nicta.scoobi.impl.ScoobiConfigurationImpl.persist(ScoobiConfigurationImpl.scala:392)
at com.nicta.scoobi.application.Persist$class.persist(Persist.scala:32)
at com.mycompany.aggregation.poc.TotalUserCount$.persist(TotalUserCount.scala:6)
at com.nicta.scoobi.application.Persist$PersistableObject.persist(Persist.scala:156)
at com.mycompany.aggregation.poc.TotalUserCount$.run(TotalUserCount.scala:13)
at com.nicta.scoobi.application.ScoobiApp$$anonfun$main$1.apply$mcV$sp(ScoobiApp.scala:81)
at com.nicta.scoobi.application.ScoobiApp$$anonfun$main$1.apply(ScoobiApp.scala:76)
at com.nicta.scoobi.application.ScoobiApp$$anonfun$main$1.apply(ScoobiApp.scala:76)
at com.nicta.scoobi.application.Hadoop$class.runOnCluster(Hadoop.scala:115)
at com.mycompany.aggregation.poc.TotalUserCount$.runOnCluster(TotalUserCount.scala:6)
at com.nicta.scoobi.application.Hadoop$class.executeOnCluster(Hadoop.scala:69)
at com.mycompany.aggregation.poc.TotalUserCount$.executeOnCluster(TotalUserCount.scala:6)
at com.nicta.scoobi.application.Hadoop$$anonfun$onCluster$1.apply(Hadoop.scala:55)
at com.nicta.scoobi.application.InMemoryHadoop$class.withTimer(InMemory.scala:71)
at com.mycompany.aggregation.poc.TotalUserCount$.withTimer(TotalUserCount.scala:6)
at com.nicta.scoobi.application.InMemoryHadoop$class.showTime(InMemory.scala:79)
at com.mycompany.aggregation.poc.TotalUserCount$.showTime(TotalUserCount.scala:6)
at com.nicta.scoobi.application.Hadoop$class.onCluster(Hadoop.scala:55)
at com.mycompany.aggregation.poc.TotalUserCount$.onCluster(TotalUserCount.scala:6)
at com.nicta.scoobi.application.Hadoop$class.onHadoop(Hadoop.scala:61)
at com.mycompany.aggregation.poc.TotalUserCount$.onHadoop(TotalUserCount.scala:6)
at com.nicta.scoobi.application.ScoobiApp$class.main(ScoobiApp.scala:76)
at com.mycompany.aggregation.poc.TotalUserCount$.main(TotalUserCount.scala:6)
at com.mycompany.aggregation.poc.TotalUserCount.main(TotalUserCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)