I have a Scala program in which I have implemented a secondary sort, and it works perfectly. The way I have written that program is:
I wanted to implement the same thing using flambo, Clojure's DSL for Spark. Since I can't write the partitioner in Clojure, I re-used the Scala code defined above, compiled it, and used it as a dependency in my Clojure code. Now I am importing the partitioner and the key in my Clojure code in the following way:
But when I try to create
My guess is that it's not able to find the ordering that I have defined after the partitioner. But if it works in Scala, why doesn't it work in Clojure?
What should the "package" part be for `RFMCPartitioner`?
See examples at https://clojuredocs.org/clojure.core/import, e.g.:
(ns foo.bar (:import (java.util Date Calendar) (java.util.logging Logger Level)))
(ns xyz
  (:import
    [** RFMCPartitioner]
    [** RFMCKey]))
Hi Ashish
The "package" is indeed the full package name.
So I finally figured it out on my own. I basically had to write my custom ordering function as a separate Scala project and then call that from Clojure.
I had my Scala file written in this manner:
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Composite key: customer id plus the R, F, M, C values used for ordering
case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)

// Partition records by customer id only, so all rows for a customer land in the same partition
class RFMCPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RFMCKey]
    // hashCode can be negative, so normalise it into [0, numPartitions)
    ((k.cId.hashCode() % numPartitions) + numPartitions) % numPartitions
  }
}

// Companion object supplies the implicit Ordering that
// repartitionAndSortWithinPartitions picks up: ascending on R, descending on F, M and C
object RFMCKey {
  implicit def orderingBycId[A <: RFMCKey]: Ordering[A] = {
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
  }
}

class rfmcSort {
  // Wrap each record in an RFMCKey and sort within partitions using the implicit ordering above
  def sortWithRFMC(a: RDD[(String, (((Double, Long), Double), Double))], parts: Int): RDD[(RFMCKey, String)] = {
    val x = a.map(v => v match {
      case (custId, (((rVal, fVal), mVal), cVal)) =>
        (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal + "," + fVal + "," + mVal + "," + cVal)
    }).repartitionAndSortWithinPartitions(new RFMCPartitioner(parts))
    x
  }
}
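To make these classes visible on the Clojure side, the compiled jar just needs to end up on the project's classpath; a minimal Leiningen sketch, with purely illustrative coordinates for the Scala artifact, might be:

(defproject wisdom-consumer "0.1.0-SNAPSHOT"
  ;; hypothetical coordinates; the Scala project above is compiled, installed
  ;; into the local repository, and pulled in as a plain jar dependency
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [org.formcept/wisdom "0.1.0"]])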
I compiled it as a Scala project and used it in my Clojure code this way:
(:import [org.formcept.wisdom rfmcSort]
         [org.apache.spark.rdd RDD])

sorted-rfmc-records (.toJavaRDD (.sortWithRFMC (rfmcSort.) (.rdd rfmc-records) num_partitions))
Please notice the way I am calling the sortWithRFMC function on the rfmcSort object that I created. Also, one very important thing to note here is that when you pass your JavaPairRDD to your Scala function, you have to convert it into a normal Spark RDD first by calling the .rdd method on it, and then you have to convert the resulting Spark RDD back into a Java-side RDD (here via .toJavaRDD) to work with it in Clojure.
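Putting it together, the calling code might look roughly like the sketch below; the namespace and the binding names are illustrative, and only rfmcSort, sortWithRFMC, .rdd and .toJavaRDD come from the snippets above:

(ns wisdom.sort-example
  (:import [org.formcept.wisdom rfmcSort]))

(defn sort-rfmc
  "Secondary-sorts a JavaPairRDD of RFMC records via the Scala helper."
  [rfmc-records num-partitions]
  (let [sorter (rfmcSort.)
        ;; JavaPairRDD -> plain Spark RDD before handing it to Scala
        scala-rdd (.rdd rfmc-records)
        ;; run the Scala secondary sort
        sorted (.sortWithRFMC sorter scala-rdd num-partitions)]
    ;; ... and back to a Java-side RDD so it can be used from Clojure/flambo
    (.toJavaRDD sorted)))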
And sorry that I got your name wrong *Blake :)