/**
 * Registers a test for `reduceByKeyAndWindow` called with both an inverse
 * reduce function (`_ - _`) and a filter function.
 *
 * The windowed, reduced stream is passed through an identity `map`, then
 * persisted and checkpointed. The result is then checked against
 * `expectedOutput` via `testOperation`.
 *
 * @param name           suffix appended to the registered test name and the log message
 * @param input          per-batch input records
 * @param expectedOutput per-slide expected results (also determines how many batches run)
 * @param windowDuration window length passed to `reduceByKeyAndWindow`
 * @param slideDuration  slide interval passed to `reduceByKeyAndWindow`
 */
def testReduceByKeyAndWindowWithFilteredInverse(
    name: String,
    input: Seq[Seq[(String, Int)]],
    expectedOutput: Seq[Seq[(String, Int)]],
    windowDuration: Duration = Seconds(2),
    slideDuration: Duration = Seconds(1)
  ): Unit = {
  val title = "reduceByKeyAndWindow with inverse and filter functions - " + name
  test(title) {
    logInfo(title)

    // Total batches to run: expectedOutput.size slide intervals, each spanning
    // (slideDuration / batchDuration) batches.
    val batchesPerSlide = (slideDuration / batchDuration).toInt
    val totalBatches = expectedOutput.size * batchesPerSlide

    // Keeps only pairs whose value is non-zero. Presumably reduceByKeyAndWindow
    // uses this to drop keys whose counts were cancelled out by the inverse
    // function (verify).
    val keepNonZero = (pair: (String, Int)) => pair._2 != 0

    val windowedOp = { (stream: DStream[(String, Int)]) =>
      stream
        .reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = keepNonZero)
        // NEW LINE: identity map whose only effect is an extra dependency in the DStream graph.
        .map(pair => pair)
        // NOTE(review): because of the map above, persist()/checkpoint() are applied to the
        // mapped stream rather than directly to the reduced windowed stream.
        .persist()
        .checkpoint(Seconds(100)) // Large interval so RDD checkpointing does not affect the test
    }

    testOperation(input, windowedOp, expectedOutput, totalBatches, true)
  }
}
The new line (the identity `.map(x => x)`) just adds an extra dependency to the streaming graph.
Now, when you run `sbt/sbt 'test-only spark.streaming.WindowOperationsSuite'`, that last test fails.