Exception in thread "main" java.lang.NullPointerException
at org.kie.internal.io.ResourceFactory.newClassPathResource(ResourceFactory.java:95)
at ............................$.GetKnowledgeSession(BatchDriverDrools.scala:272)
at ..........................................$.main(BatchDriverDrools.scala:100)
at ...............................................main(BatchDriverDrools.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The code is shown below:
/**
 * Builds the Drools knowledge base that holds the event-filter rules.
 *
 * The DRL file is loaded from the application classpath. A classpath resource
 * path must be a location inside the packaged jar (for example a file kept
 * under `src/main/resources`), not a filesystem or `hdfs://` URL.
 *
 * NOTE(review): if a NullPointerException is still raised from inside
 * `ResourceFactory` on the cluster, check that the Drools service descriptors
 * (e.g. `META-INF/kie.conf`) are merged rather than overwritten when the
 * assembly jar is built -- TODO confirm against the build configuration.
 *
 * @return a knowledge base containing every package compiled from the DRL file
 * @throws IllegalStateException if the rule file fails to compile
 */
def GetKnowledgeSession(): InternalKnowledgeBase = {
  val config: KnowledgeBuilderConfiguration =
    KnowledgeBuilderFactory.newKnowledgeBuilderConfiguration()
  config.setProperty("drools.dialect.mvel.strict", "false")

  val kbuilder: KnowledgeBuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(config)
  kbuilder.add(
    ResourceFactory.newClassPathResource("rules/eventfilter.drl", BatchDriverDrools.getClass),
    ResourceType.DRL)

  // Fail fast: continuing after a compile error would silently broadcast a
  // knowledge base with no rules in it.
  if (kbuilder.hasErrors()) {
    throw new IllegalStateException(
      s"Failed to compile rules/eventfilter.drl: ${kbuilder.getErrors()}")
  }

  val kbase: InternalKnowledgeBase = KnowledgeBaseFactory.newKnowledgeBase()
  kbase.addPackages(kbuilder.getKnowledgePackages())
  kbase
}
I am using the above method to instantiate the InternalKnowledgeBase and broadcast it to all executors:
val internalKnowledgeBase = GetKnowledgeSession()
val broadcastRules = sc.broadcast(internalKnowledgeBase)
....
/**
 * Runs the broadcast rules against every event and keeps only the events
 * whose data map contains the key "name" after the rules have fired.
 *
 * One Drools session is created per partition and shared by all events in
 * that partition; each event's data map is inserted into it before the rules
 * fire. The session is disposed once the partition has been fully consumed.
 *
 * NOTE(review): inserted facts stay in the session for the rest of the
 * partition, so rules can see data from earlier events -- confirm this is
 * intended.
 *
 * @param kbase broadcast knowledge base holding the compiled rules
 * @param rdd   events to filter
 * @return the events whose data map contains the key "name"
 */
def filterEvents(kbase: Broadcast[InternalKnowledgeBase], rdd: RDD[Event]): RDD[Event] = {
  rdd.mapPartitions { partition =>
    val kieSession = kbase.value.newKieSession()
    var sessionOpen = true

    // `filter` on an iterator is lazy: the rules only run as the result is consumed.
    val kept = partition.filter { event =>
      val map = event.getData
      kieSession.insert(map)
      kieSession.fireAllRules()
      map.containsKey("name")
    }

    // Wrap the lazy iterator so the session is released exactly once, after the
    // last event has been evaluated. Disposing eagerly here would close the
    // session before any rule had a chance to fire.
    new Iterator[Event] {
      override def hasNext: Boolean = {
        val more = kept.hasNext
        if (!more && sessionOpen) {
          kieSession.dispose()
          sessionOpen = false
        }
        more
      }

      override def next(): Event = kept.next()
    }
  }
}
I am able to run this locally on my eclipse setup when the drl file is in the classpath under /src/main/resources/rules/eventfilter.drl. I have tried using classpathresource and fileresource (with hdfs and local file on the cluster) and I am getting a NullPointerException in both cases. Is there any setup required to make this run on a hadoop cluster? Please let me know if I can clarify any part of this. Thanks.