Tests


Michael Schiff

Aug 5, 2013, 6:05:50 PM
to spark-de...@googlegroups.com
I noticed that all of the tests in RDDSuite use "local" as the master.  I have been running into this issue with CoGroupedRDD:

https://spark-project.atlassian.net/browse/SPARK-703?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20priority%20%3D%20Major%20ORDER%20BY%20key%20DESC

Things have been working fine when running with a local master, but if I run with master and slave on separate machines, I see the reported error.  Because of this, I wanted to check unit test behavior in a distributed setting.

I made a couple of modifications to RDDSuite.scala in core/src/test...

SparkContexts are now initialized with a reference to a distributed Spark master, SPARK_HOME, and the path to the jar containing the test code (necessary for running in non-local mode).
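
A minimal sketch of that kind of setup, assuming the pre-0.8 `spark` package and the four-argument SparkContext constructor (master, job name, Spark home, jars). The master URL and the jar path are hypothetical placeholders, and the snippet needs a running standalone cluster to execute:

```scala
import spark.SparkContext

// Hypothetical values: substitute your own master URL and test jar path.
val master = "spark://master-host:7077"
val sparkHome = System.getenv("SPARK_HOME")
val testJar = "/path/to/test-classes.jar"

// The jars listed here are shipped to the workers so that closures defined
// in the test code can be deserialized on remote executors.
val sc = new SparkContext(master, "RDDSuite", sparkHome, Seq(testJar))
try {
  // A trivial job that forces work to run on the executors.
  val doubled = sc.parallelize(1 to 4, 2).map(_ * 2).collect().toList
  assert(doubled == List(2, 4, 6, 8))
} finally {
  sc.stop() // always release the context, even if the assertion fails
}
```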

Now unit tests begin to fail.  I believe it is important to have test coverage for cases with a non-local master.  Should I make my modifications available as a pull request?

Reynold Xin

Aug 5, 2013, 6:23:05 PM
to spark-de...@googlegroups.com
Hi Michael,

Thanks for the email. What exactly are the problems that the unit tests are not catching? It might be better to have a test that captures those errors rather than changing RDDSuite.

The tests in RDDSuite are just testing the logic of the operators. There are DistributedSuite and ShuffleSuite to actually test shuffle operations in a semi-distributed setting (using the local-cluster master).

Unit tests are supposed to be short and easy to run. Requiring a connection to an external standalone cluster would make these tests very hard to run, not to mention in CI.



Michael Schiff

Aug 5, 2013, 6:33:04 PM
to spark-de...@googlegroups.com
I agree that this change definitely makes things a little harder to run, and as such would not be a good idea to roll into the main branch.  I was unaware of DistributedSuite and ShuffleSuite; I will take a look at those.

Mainly I am trying to diagnose and hopefully fix the issue I linked to in the original post.  It seems to occur only when running in a distributed setting.  I have tried the suggestions in the comments on that issue, but none of them work.  It seems to have something to do with CoGroupedRDD, which is used to implement many of the DStreams that handle operations by key (reduceByKeyAndWindow -> ReducedWindowedDStream, updateStateByKey -> StateDStream, etc.)

It seems that the issue can be triggered with the Kafka example, but I would like to get some unit test coverage that can invoke the error.  That way it will be easier for more people to try to fix it.

Michael Schiff

Aug 5, 2013, 10:23:09 PM
to spark-de...@googlegroups.com
So I looked at both the DistributedSuite and ShuffleSuite.

ShuffleSuite appears to use "local" as the master as well.  DistributedSuite uses local-cluster mode, so I will try to add a unit test there that invokes the error.

In the meantime, the error can be seen by modifying TestSuiteBase.scala to use standalone mode (change the streaming context to use a spark://url:7077 master, and provide a reference to SPARK_HOME and the test jar).

Then modify WindowOperationsSuite.scala:

def testReduceByKeyAndWindowWithFilteredInverse(
      name: String,
      input: Seq[Seq[(String, Int)]],
      expectedOutput: Seq[Seq[(String, Int)]],
      windowDuration: Duration = Seconds(2),
      slideDuration: Duration = Seconds(1)
    ) {
    test("reduceByKeyAndWindow with inverse and filter functions - " + name) {
      logInfo("reduceByKeyAndWindow with inverse and filter functions - " + name)
      val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
      val filterFunc = (p: (String, Int)) => p._2 != 0
      val operation = (s: DStream[(String, Int)]) => {
        s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = filterFunc)
          .map(x => x) // THIS IS THE NEW LINE
          .persist()
          .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
      }
      testOperation(input, operation, expectedOutput, numBatches, true)
    }
  }

The new line just adds a dependency in the streaming graph.

Now, when you run sbt/sbt 'test-only spark.streaming.WindowOperationsSuite', that last test fails.

Michael Schiff

Aug 5, 2013, 10:43:22 PM
to spark-de...@googlegroups.com
So the bug can be reproduced in local-cluster mode.

If you change the cluster URL to "local-cluster[2,1,512]" in TestSuiteBase and add the identity mapper from my last post, you will see the error I am talking about.
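
For anyone unfamiliar with that master string: the three numbers are the number of workers, the cores per worker, and the memory per worker in MB. A small standalone sketch of how such a string breaks down (the `LocalClusterSpec` object is a made-up illustration, not Spark's own parsing code):

```scala
// Hypothetical helper for illustration only; it does not use or mirror Spark internals.
object LocalClusterSpec {
  // Matches "local-cluster[workers, coresPerWorker, memoryPerWorkerMB]".
  private val Pattern =
    """local-cluster\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]""".r

  // Returns (workers, coresPerWorker, memoryPerWorkerMB), or None if the
  // string is not a local-cluster master (e.g. "local" or "spark://...").
  def parse(master: String): Option[(Int, Int, Int)] = master match {
    case Pattern(workers, cores, memory) =>
      Some((workers.toInt, cores.toInt, memory.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parse("local-cluster[2,1,512]")) // Some((2,1,512))
    println(parse("local"))                  // None
  }
}
```

So "local-cluster[2,1,512]" asks for two workers with one core and 512 MB each. Unlike plain "local", this runs the workers as separate processes on one machine, which is presumably why the serialization path gets exercised.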

Matei Zaharia

Aug 6, 2013, 4:04:33 PM
to spark-de...@googlegroups.com
Thanks for pointing this out, Michael. My suggestion is also to add more tests to DistributedSuite rather than making all our tests be on a cluster. Having some tests that can run fast really makes a difference.

Matei

