Drools with Spark and Hadoop

Aniket Kulkarni

Aug 14, 2018, 7:33:26 PM
to Drools Usage
Hi all, 

I am trying to integrate Drools with Spark. I want to read the DRL file from HDFS and then apply the rules to certain events.

However, when I run the code I get a NullPointerException:

Exception in thread "main" java.lang.NullPointerException
	at org.kie.internal.io.ResourceFactory.newClassPathResource(ResourceFactory.java:95)
	at ............................$.GetKnowledgeSession(BatchDriverDrools.scala:272)
	at ..........................................$.main(BatchDriverDrools.scala:100)
	at ...............................................main(BatchDriverDrools.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The code is shown below:


def GetKnowledgeSession(): InternalKnowledgeBase = {
  val config: KnowledgeBuilderConfiguration = KnowledgeBuilderFactory.newKnowledgeBuilderConfiguration()
  config.setProperty("drools.dialect.mvel.strict", "false")
  val kbuilder: KnowledgeBuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(config)
  // Per the stack trace above, the NPE is thrown by this call
  kbuilder.add(ResourceFactory.newClassPathResource("hdfs://<cluster-name>/user/aniketkulkarni/rules/eventfilter.drl", BatchDriverDrools.getClass), ResourceType.DRL)
  println(kbuilder.getErrors().toString())
  val kbase: InternalKnowledgeBase = KnowledgeBaseFactory.newKnowledgeBase()
  kbase.addPackages(kbuilder.getKnowledgePackages())

  kbase
}

I am using the above method to instantiate the InternalKnowledgeBase and broadcast it to all executors:
val internalKnowledgeBase = GetKnowledgeSession()
val broadcastRules = sc.broadcast(internalKnowledgeBase)


....

def filterEvents(kbase: Broadcast[InternalKnowledgeBase], rdd: RDD[Event]): RDD[Event] = {
  val filteredEventsRDD: RDD[Event] = rdd.mapPartitions { partition =>
    // One KieSession per partition, created on the executor from the broadcast kbase
    val kieSession = kbase.value.newKieSession()
    partition.filter { event =>
      val map = event.getData
      kieSession.insert(map)
      kieSession.fireAllRules()
      //eventFilter.value.isCheckoutEvent(fptimap)
      //fptimap.containsKey("name")
      map.containsKey("name")
    }
  }
  filteredEventsRDD
}
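
For reference, the driver side then calls this with the broadcast kbase (eventsRDD below is a hypothetical RDD[Event] standing in for the real input, which is elided above):

// Hypothetical input RDD; the actual input is elided ("....") above
val filteredEvents: RDD[Event] = filterEvents(broadcastRules, eventsRDD)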


I am able to run this locally in my Eclipse setup when the DRL file is on the classpath under src/main/resources/rules/eventfilter.drl.
I have tried using a classpath resource and a file resource (with both HDFS and a local file on the cluster), and I get a NullPointerException in both cases.

Is there any setup required to make this run on a Hadoop cluster? Please let me know if I can clarify any part of this.

Thanks.

jagan reddy

Aug 15, 2018, 12:41:02 PM
to Drools Usage
Hi,

Remove the hdfs://<cluster-name>/user/aniketkulkarni/rules/eventfilter.drl path and add the eventfilter.drl file in the resources folder.
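
That is, something like this (a sketch; it assumes eventfilter.drl sits directly under src/main/resources so it lands at the root of the classpath):

// eventfilter.drl packaged at src/main/resources/eventfilter.drl
kbuilder.add(
  ResourceFactory.newClassPathResource("eventfilter.drl", BatchDriverDrools.getClass),
  ResourceType.DRL)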

Thanks
Jagan

Aniket Kulkarni

Aug 15, 2018, 2:18:02 PM
to Drools Usage
Hello Jagan,

Thanks for your reply. As mentioned above, I have tried adding the file to the classpath under src/main/resources/rules and it did not work.

Do you recommend specifically trying under src/main/resources without a subfolder?

Thanks.

jagan reddy

Aug 16, 2018, 1:01:56 AM
to Drools Usage
Hi,

Yes

Thanks,
Jagan

William Benton

Aug 23, 2018, 12:52:53 PM
to Drools Usage
Hi Aniket,

I don't have an HDFS installation to test this, but I know there are sometimes problems with loading classpath resources from HDFS in Spark. However, it isn't clear that your resource is actually on the classpath in the first place (it would be if you loaded a JAR from HDFS, but then you'd have a different path and probably a custom classloader).

Can you try this possible solution? Load the resource as an FSDataInputStream (obtained with FileSystem.open from HDFS) and pass that to ResourceFactory.newInputStreamResource.
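
In code, roughly this minimal sketch (untested; it reuses the hdfs://<cluster-name> URI and path from your first message, and assumes kbuilder is the KnowledgeBuilder from your GetKnowledgeSession method):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.kie.api.io.ResourceType
import org.kie.internal.io.ResourceFactory

// Open the DRL directly from HDFS and hand Drools the stream,
// instead of asking it to resolve a classpath entry
val fs = FileSystem.get(new URI("hdfs://<cluster-name>"), new Configuration())
val in = fs.open(new Path("/user/aniketkulkarni/rules/eventfilter.drl"))
try {
  kbuilder.add(ResourceFactory.newInputStreamResource(in), ResourceType.DRL)
  println(kbuilder.getErrors().toString())
} finally {
  in.close()
}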


best,
wb