I'm playing with Akka Stream and trying to define a Source that listen to file system events with the java nio WatchService.
import java.nio.file.StandardWatchEventKinds._
import java.nio.file._
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl._
import scala.util.{Failure, Success}
object Main extends App {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = FlowMaterializer()
val dir = Paths.get("/tmp")
val watcher = FileSystems.getDefault().newWatchService()
dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
val source = Source(() => Some(watcher.poll())).filter(key => key != null)
val sink = ForeachSink[WatchKey](key => {
println(key)
Thread.sleep(1000)
})
val materialized = FlowGraph { implicit builder =>
import akka.stream.scaladsl.FlowGraphImplicits._
source ~> sink
}.run()
materialized.get(sink).onComplete {
case Success(_) =>
system.shutdown()
case Failure(e) =>
println(e)
system.shutdown()
}
}
When I try to filter the source to exclude null watch keys, I get the following exception: