Secondary Sorting in Spark using Clojure/flambo

Punit Naik

Jul 8, 2016, 12:01:27 PM
to Clojure


         

I have a Scala program in which I have implemented a secondary sort that works perfectly. This is how I wrote it:

import org.apache.spark.Partitioner

object rfmc {
  // Custom Key and partitioner

  case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)
  class RFMCPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = {
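      val k = key.asInstanceOf[RFMCKey]
      // hashCode can be negative, so shift the remainder into [0, numPartitions)
      val raw = k.cId.hashCode() % numPartitions
      if (raw < 0) raw + numPartitions else raw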
    }
  }
  object RFMCKey {
    implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
      Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
    }
  }
  // The body of the code
  //
  //
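  // assuming rdd: RDD[(String, Double, Double, Double, Double)] of (cust, r, f, m, c)
  val x = rdd.map { case (cust, r, f, m, c) => (RFMCKey(cust, r, f, m, c), r + "," + f + "," + m + "," + c) }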
  val y = x.repartitionAndSortWithinPartitions(new RFMCPartitioner(1))
}
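For reference, the ordering can be tried outside Spark. The sketch below is a standalone Scala script (for example, run with scala-cli) that reuses the same key class and ordering on an ordinary list; the sample keys are made up for illustration. It sorts by R ascending and then by F, M and C descending, because multiplying each of those components by -1 reverses its order.

```scala
// Standalone sketch (no Spark needed): same key and ordering as above,
// applied to a plain Scala collection so the resulting order is easy to inspect.
case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)

// R ascending; F, M and C descending (negation flips their order).
val rfmcOrdering: Ordering[RFMCKey] =
  Ordering.by((k: RFMCKey) => (k.R, k.F * -1, k.M * -1, k.C * -1))

// Hypothetical sample keys, for illustration only.
val keys = List(
  RFMCKey("c1", 2.0, 5.0, 10.0, 1.0),
  RFMCKey("c2", 1.0, 3.0, 20.0, 1.0),
  RFMCKey("c3", 1.0, 7.0, 5.0, 2.0),
  RFMCKey("c4", 1.0, 7.0, 9.0, 0.0)
)

val sortedIds = keys.sorted(rfmcOrdering).map(_.cId)
println(sortedIds) // List(c4, c3, c2, c1)
```

c4 comes before c3 because they tie on R and F, and the larger M wins under the descending rule.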

I wanted to implement the same thing using flambo, a Clojure DSL for Spark. Since I can't write the partitioner in Clojure, I reused the code defined above, compiled it, and added it as a dependency of my Clojure project.

I am importing the partitioner and the key in my Clojure code as follows:

(ns xyz
  (:import [package RFMCPartitioner]
           [package RFMCKey]))

But when I create the keys with (RFMCKey. cust_id r f m c) and run the sort, the job fails with the following error:

java.lang.ClassCastException: org.formcept.wisdom.RFMCKey cannot be cast to java.lang.Comparable
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
    at scala.math.LowPriorityOrderingImplicits$$anon$7.compare(Ordering.scala:153)
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:170)
    at org.apache.spark.util.collection.ExternalSorter$$anon$8.compare(ExternalSorter.scala:164)
    at org.apache.spark.util.collection.TimSort.countRunAndMakeAscending(TimSort.java:252)
    at org.apache.spark.util.collection.TimSort.sort(TimSort.java:110)
    at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
    at org.apache.spark.util.collection.SizeTrackingPairBuffer.destructiveSortedIterator(SizeTrackingPairBuffer.scala:83)
    at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:687)
    at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:705)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My guess is that it cannot find the ordering I defined in the RFMCKey companion object (right after the partitioner). But if it works in Scala, why doesn't it work in Clojure?

Ashish Negi

Jul 9, 2016, 1:31:42 AM
to Clojure
Shouldn't `package` in `:import` be the actual package name of `RFMCPartitioner`?

See the examples at https://clojuredocs.org/clojure.core/import, for instance:

(ns foo.bar
  (:import (java.util Date
                      Calendar)
           (java.util.logging Logger
                              Level)))


(ns xyz
  (:import [** RFMCPartitioner]
           [** RFMCKey]))

where ** is the fully qualified package name.

Punit Naik

Jul 9, 2016, 2:13:17 AM
to clo...@googlegroups.com

Hi Ashish

The "package" is indeed the full package name.


Blake Miller

Jul 11, 2016, 2:12:10 PM
to Clojure
Hi Punit

The behavior you are relying on (implicit resolution of your Ordering) is a feature of the Scala compiler, which is why it does not happen automatically when you call the same code from Clojure.

Please see the note here:

https://github.com/t6/from-scala/blob/4e1752aaa2ef835dd67a8404273bee067510a431/test/t6/from_scala/guide.clj#L161-L166

You may find that library a useful resource, either as a dependency or simply as reference material.

What you want to do is find the full method signature, including the implicits, and invoke _that_ from Clojure, passing values for all implicit parameters (in this case, your custom ordering).
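To illustrate this in plain Scala: an implicit parameter list compiles to ordinary extra parameters, so Scala callers get the Ordering filled in by the compiler, while callers from Java or Clojure have to pass it themselves. In the standalone script below, sortByKey is a hypothetical stand-in for a Spark method that takes an implicit Ordering (in Spark, repartitionAndSortWithinPartitions gets its Ordering implicitly through OrderedRDDFunctions); the keys are made up.

```scala
// Sketch: an implicit parameter is just an extra argument that scalac fills in.
case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)

object RFMCKey {
  // scalac finds this automatically because it lives in RFMCKey's companion object.
  implicit val rfmcOrdering: Ordering[RFMCKey] =
    Ordering.by((k: RFMCKey) => (k.R, k.F * -1, k.M * -1, k.C * -1))
}

// Hypothetical stand-in for a Spark method whose Ordering is an implicit parameter.
def sortByKey[K, V](pairs: Seq[(K, V)])(implicit ord: Ordering[K]): Seq[(K, V)] =
  pairs.sortBy(_._1)

// Hypothetical sample data.
val pairs = Seq(
  (RFMCKey("a", 2.0, 1.0, 1.0, 1.0), "a"),
  (RFMCKey("b", 1.0, 1.0, 1.0, 1.0), "b")
)

// From Scala, the compiler supplies RFMCKey.rfmcOrdering implicitly...
val viaImplicit = sortByKey(pairs)
// ...which is equivalent to passing it explicitly, as a non-Scala caller must.
val viaExplicit = sortByKey(pairs)(RFMCKey.rfmcOrdering)
println(viaImplicit.map(_._2)) // List(b, a)
```

The NaturalOrdering frame in the stack trace above suggests that, without such an Ordering, the Java-facing sort fell back to treating keys as java.lang.Comparable, which RFMCKey does not implement.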

HTH

Punit Naik

Jul 11, 2016, 2:46:42 PM
to Clojure
Hi Black

Thanks for the reply, but I figured it out on my own. I'll post the answer next.

Punit Naik

Jul 11, 2016, 2:48:01 PM
to Clojure

So I finally figured it out. I basically had to write the sort, together with my custom ordering, as a separate Scala project and then call it from Clojure.

My Scala file looks like this:


import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

case class RFMCKey(cId: String, R: Double, F: Long, M: Double, C: Double)

class RFMCPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
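    val k = key.asInstanceOf[RFMCKey]
    // hashCode can be negative, so shift the remainder into [0, numPartitions)
    val raw = k.cId.hashCode() % numPartitions
    if (raw < 0) raw + numPartitions else raw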
  }
}
object RFMCKey {
  implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = {
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
  }
}

class rfmcSort {
  def sortWithRFMC(a: RDD[(String, (((Double, Long), Double), Double))], parts: Int): RDD[(RFMCKey, String)] = {
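    // Key each record by RFMCKey; the value is the "r,f,m,c" string.
    // scalac resolves the implicit Ordering[RFMCKey] from the companion object here.
    a.map { case (custId, (((rVal, fVal), mVal), cVal)) =>
      (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal + "," + fVal + "," + mVal + "," + cVal)
    }.repartitionAndSortWithinPartitions(new RFMCPartitioner(parts))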
  }
}
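One caveat with a getPartition that returns cId.hashCode() % numPartitions directly: String.hashCode can be negative, and on the JVM `%` keeps the sign of the dividend, so with more than one partition some keys would get a negative partition index (with a single partition every remainder is 0, so the problem stays hidden). Spark's own HashPartitioner avoids this with a non-negative modulo. A standalone Scala sketch of the idea, with made-up sample ids:

```scala
// Sketch: shift a negative remainder into [0, mod), the same idea as the
// non-negative modulo that Spark's HashPartitioner applies to key.hashCode.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// A getPartition-style helper built on it (hypothetical name, for illustration).
def partitionFor(cId: String, numPartitions: Int): Int =
  nonNegativeMod(cId.hashCode, numPartitions)

println(-7 % 4)                // -3: not a valid partition index
println(nonNegativeMod(-7, 4)) // 1

// Hypothetical customer ids; every result lands in [0, 4).
val ids = Seq("alice", "bob", "polygenelubricants")
println(ids.map(id => partitionFor(id, 4)))
```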

I compiled it as a Scala project and used it in my Clojure code this way:


(:import [org.formcept.wisdom rfmcSort]
         [org.apache.spark.rdd RDD])

sorted-rfmc-records (.toJavaRDD (.sortWithRFMC (rfmcSort.) (.rdd rfmc-records) num_partitions))

Note how I call the sortWithRFMC method on an rfmcSort instance created with (rfmcSort.). Another very important point: when you pass your JavaPairRDD to the Scala function, you first have to convert it to a plain Spark RDD by calling .rdd on it. The function returns a Spark RDD, so you then have to convert it back (here with .toJavaRDD, which gives a JavaRDD of key/value tuples) before working with it in Clojure.


And sorry for getting your name wrong, *Blake :)
