Spark matrix multiplication

4,032 views
Skip to first unread message

Thomas Vacek

unread,
Apr 17, 2013, 11:03:10 AM4/17/13
to spark...@googlegroups.com
I wanted to see how Spark would handle a matrix multiply.  Basically, I'm representing each matrix as a sparse coordinate form in an RDD[( (Int,Int), Double)].  Here are two algorithms for matrix multiplication:

class Sparse(m: Int, n:Int, data:RDD[((Int,Int),Double)]) extends Serializable {
   ...
  def dot ...
  def schoolBookMultiply(B:Sparse): Sparse =
  {
    val aRows = data.map(tt => (tt._1._1, (tt._1._2, tt._2))).groupByKey()
    val bCols = B.data.map(tt => (tt._1._2, (tt._1._1, tt._2))).groupByKey()
    val C = aRows.cartesian(bCols).map(tt => ((tt._1._1, tt._2._1), dot(tt._1._2, tt._2._2)))
    new Sparse(m, B.n, C)
  }

  def outerMultiply(B:Sparse) : Sparse =
  {
    val aCols = data.map(tt => (tt._1._2, (tt._1._1, tt._2)))
    val bRows = B.data.map(tt => (tt._1._1, (tt._1._2, tt._2)))
    val cdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) ) => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
    new Sparse(m, B.n, cdata)
  }
}

While both methods scale with cubic complexity as expected, they are very, very slow in practice.  It takes 5 minutes (299.23 seconds) to multiply a 1000x1000 fully-filled matrix by schoolBookMultiply, and outerMultiply is even slower.  For comparison, the same operation takes 2.2 seconds for a Matlab sparse matrix.  Any ideas what is going on here?  Of course, a 1000x1000 matrix doesn't need to be parallelized, but the goal here is to operate on large distributed matrices.  Perhaps an external interface to MPI might be needed to optimize certain code segments.

Thanks,

Tom

Hai-Anh Trinh

unread,
Apr 17, 2013, 12:12:01 PM4/17/13
to spark...@googlegroups.com
Hi Tom,

Looks like Spark has to do a lot of work just to iterate over the data, and the network IO and synchronization cost dominated.

In the schoolbook algorithm I suppose groupByKey requires network shuffle, I'm guessing both cartesian & join are map-join here, and the outer multiply algo ends with a big shuffle to group of all [i, j] index pair for the final reduce.

Also, you said fully-filled matrix but this is supposed to be impl for sparse matrix?

Larger-than-memory matrix algorithm is uncharted territory though. Even Mahout does not implement such matrix: https://cwiki.apache.org/MAHOUT/matrix-and-vector-needs.html. I believe that the Netflix ratings matrix could be loaded into RAM of a single machine.

Are you trying to accomplish a particular task?


--
You received this message because you are subscribed to the Google Groups "Spark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-users...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Dmitriy Lyubimov

unread,
Apr 17, 2013, 12:40:24 PM4/17/13
to spark...@googlegroups.com
Tom, 

you think you formulated the problem the same way to compare solutions but you haven't really. 

Matlab problem has cheap random access to element. Your problem initially does not provide this. Consequently, you spend a lot of effort to just make data available to summing and multiplying, such as figuring rows and columns. 

In Mahout, the data is represented sparse or dense row vectors, and multiplication really solves A'B problem, not AB problem. (in order to multiply, one would have to pre-transpose A first which is another job). Among other things, it allows to reduce sort complexities in MR.

That, and heavy use of combiners to sum up outer products row vectors is the approach taken in Mahout. 

Even that has significant deficiencies. One fundamental problem with MR approaches to matrix problems is that MR strives to provide stronger programming model guarantees than most matrix multiplication operations need: 

(*) sorted order of keys in reducers -- matrix operations don't need that 
(*) simultaneous iterator access to all elements of a full group 

in order to satisfy these guarantees, MR goes into sorting things, which is a lot of overhead too, and multiplying keys to address interconnectedness of the problem. 

I am actually quite anxious to try Bagel programming model, something i wanted to do for a long time, to some of the Mahout solvers. This should be quite a win, including matrix multiplications, and would probably make it possible to provide both A'B and AB solvers just as easily.

-d





--

Mike Spreitzer

unread,
Apr 17, 2013, 1:46:39 PM4/17/13
to spark...@googlegroups.com
Tom, can you tell us whether your data is actually sparse or dense?  (This is a question of degree.)

The representation you cited is very fine grained, that will likely involve more overheads than necessary.  Have you considered a block-oriented representation?  Even if it is sparse, using a sparse block representation may be a big win.

Matrix multiplication in any framework that follows the bulk synchronous parallel paradigm is going to have some unnecessary costs.  Either some data piles up somewhere waiting, or some processors sit idle somewhere waiting, or both.  It is the synchronization that is the central problem here.  You may not be near to this fundamental issue, yet.  I have a suggestion for how to cope with it, see my paper in ICDCS this year.

Regards,
Mike

Thomas Vacek

unread,
Apr 17, 2013, 2:47:13 PM4/17/13
to spark...@googlegroups.com
Hi all,

Thanks for the responses.  I am an MPI programmer playing catch-up with MR.  This was intended just to help me get an idea of what I could expect from Spark/MR.  I realize I picked a hard problem.

It seems to me that my outerMultiply function should be similar in principle to Mahout's matrix multiply.  If I'm missing something, please let me know.

I did implement a third algorithm, where the RDD primitive is a row/col sparse vector rather than a coordinate entry, and the reducer adds sparse vectors.  This was the worst time-wise.  

Mike: I'll have a look at your paper.  Are there any other papers related to this?

Thomas Vacek

unread,
Apr 24, 2013, 9:51:09 AM4/24/13
to spark...@googlegroups.com
Follow-up philosophical question: MR developed the model of distinct map, key sort, and reduce phases because of the limitations of sequential read/write media.  Since the goal of Spark is make use of memory caching, doesn't the assumption of distinct phases need to be re-examined?

There is somebody looking at out-of-memory matrix computations on Hadoop: http://www.slideshare.net/dgleich/techcon-matrix/.  With the same experiment as before, this required 50 sec v. 300.  I need to do some more experimenting.

Christopher Nguyen

unread,
Apr 27, 2013, 9:34:20 PM4/27/13
to spark...@googlegroups.com
Yes, your question about reviewing the assumptions/requirements/designs is entirely valid, but the correct answer is "it depends (on your design goal)".

First, to be precise, Google's MR was implemented with reliability as the primary design goal. That disk storage slowed down the persistence was addressed with, e.g., sstables. Next was scalability; while "real-time" speed of results was never an important goal.

Once you start dividing a job up into small chunks (partitions), it becomes possible to have an execution graph where these chunks are executed asynchronously, e.g., if you have 1000 chunks to be processed by 10 executors (more "partitions" than available "mappers" or "reducers"). When this happens, the intermediate results from the first 10, 20, 30, etc. need to be saved somewhere so that they can be properly combined in the next phase. If any partition processing should fail, it is simply "replayed" at the partition level. So whether your persistence is disk-based or memory-based, these graphs are going to have synchronous blocks, or "distinct phases" as you point out. This is the classic bulk-synchronous parallel model.

If on the other hand you have an execution graph where you can fit the entire problem (data set) simultaneously within your computing resources, and speed is far more important than reliability (e.g., sub-second-latency SQL queries that would just be retried in its entirety if failed halfway), then it can make sense to avoid these synchronization barriers altogether and have the entire graph proceed asynchronously, as fast as possible without waiting. This is more like the MPI model, and you might have "mappers" push their results to "reducers" memory-to-memory without waiting for some common sync or spilling to any disk-based persistence. The disadvantage is you lose a lot of the reliability guarantees; in general it's harder to reason out the system state; for any given amount of resources you'd face more frequent back pressure on, e.g., memory, as the data pipes through the execution graph and parts are ready/not ready to release resources. But all this could be worth it when speed is the primary goal. Indeed this is the design choice of Cloudera's Impala.

To sum up, I for one am very interested in having such a mode in Spark, i.e., where I'm willing to give up some/all resiliency guarantees in exchange for speed.

Matei Zaharia

unread,
Apr 28, 2013, 7:05:10 PM4/28/13
to spark...@googlegroups.com
Interesting discussion! While allowing random access across the network or asynchronous updates might be possible in the future, I don't think that's actually the major problem here. The problem is that the current code acts on an element-by-element basis and does a lot of object creation, function calls, etc as a result. In contrast, Matlab probably uses a very fast (e.g. tuned SIMD) multiply implementation that works on blocks of floating point numbers.

To get better performance in Spark, I'd recommend representing your matrix as an RDD of blocks (say 128x128 double arrays) instead of (int, int, double) pairs. Then, when you multiply two blocks together, use an optimized implementation, such as netlib (https://github.com/fommil/netlib-java/), on each block. You may want to experiment with the pure-Java netlib versus the C and Fortran backends based on the block size (I think the Java one is faster for small blocks).

The other problem is that there is inherently communication across the network here, which won't be present in Matlab. Even in local mode, Spark uses TCP sockets for communication to go through the same paths as distributed mode (to avoid possible bugs). Eventually that might be a bottleneck, though eliminating the CPU overhead of dealing with many small key-value pair objects ought to help a lot.

Matei

Matei Zaharia

unread,
Apr 28, 2013, 7:11:34 PM4/28/13
to spark...@googlegroups.com
By the way, I should add that this is also the point Mike made -- I missed that.

Matei
Reply all
Reply to author
Forward
0 new messages