PyMC-Apache Spark Integration


mertte...@gmail.com

Jun 19, 2014, 9:13:06 PM
to py...@googlegroups.com
Hi,

As I had mentioned in one of the posts, I'm currently developing an interface for PyMC to use Apache Spark as a backend. I forked PyMC at PyMC-Spark Repository. Since PyMC v3 is currently under development, I've decided to go with v2.3. I'd like to share the current status with you. So far, I've completed the following items: 

- Implemented HDFS backend for PyMC v3 (hdfs.py).

- Implemented HDFS db backend for PyMC v2.3 (hdfs.py).

The above implementations use PyWebHdfs to communicate with HDFS, so sending data over the network will be somewhat slow.

- Added the MCMCSpark module, which lets the user access Spark RDDs (Spark wraps data in so-called RDDs) through the regular MCMC class methods (MCMCSpark.py). With this class, traces can easily be saved to and loaded from HDFS without requiring any network allocation. Calling the 'sample' method implemented in MCMCSpark runs MCMC's sample method in parallel on a Spark cluster. However, Spark jobs cannot return the MCMC instances they create, because PySpark can't serialize MCMC objects due to the Fortran code. Instead, each Spark job returns a tuple of an integer (the chain number) and a dictionary (the trace data).

- Implemented Spark db backend. This module is simply the database backend for Spark RDD. It contains a Trace and a Database class like the other database modules (spark.py). 

- I've also added some example code which shows how to use HDFS backend (disaster_model_hdfs.py) or MCMCSpark (disaster_model_spark.py).
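
To illustrate the return format described above, here is a minimal sketch in plain Python. It uses neither Spark nor PyMC: `run_chain` is a hypothetical stand-in for the function each Spark job would run, and the toy Metropolis sampler, the `mu` variable name, and the data values are assumptions made purely for illustration. The point is that each job hands back a `(chain number, trace dictionary)` tuple built only from plain Python objects, which can be serialized, rather than the sampler object itself.

```python
import math
import random


def log_posterior(mu, data):
    """Log density (up to a constant) of a Normal(mu, 1) likelihood with a flat prior."""
    return -0.5 * sum((x - mu) ** 2 for x in data)


def run_chain(chain_id, data, n_iter=500, step=0.5):
    """Hypothetical per-job function: run one Metropolis chain.

    Returns (chain_id, {"mu": [...]}) so that only plain Python objects
    are sent back to the driver, never the sampler object itself.
    """
    rng = random.Random(chain_id)  # one reproducible random stream per chain
    mu = 0.0
    current = log_posterior(mu, data)
    samples = []
    for _ in range(n_iter):
        proposal = mu + rng.gauss(0.0, step)  # symmetric random-walk proposal
        candidate = log_posterior(proposal, data)
        # Metropolis acceptance test on the log scale; 1 - random() lies in (0, 1]
        if math.log(1.0 - rng.random()) < candidate - current:
            mu, current = proposal, candidate
        samples.append(mu)
    return chain_id, {"mu": samples}


data = [4.8, 5.1, 5.3, 4.9, 5.0]
# On a cluster, this loop would be replaced by a Spark map over the chain numbers.
results = [run_chain(i, data) for i in range(4)]
traces = dict(results)  # chain number -> trace dictionary
```

Keying the collected results by chain number keeps the traces of the different chains separate once they are back on the driver.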

I'd like to hear your opinions and ideas, and I'd appreciate any feedback.

Thomas Wiecki

Jun 21, 2014, 5:36:45 AM
to py...@googlegroups.com
This looks really interesting.

On Fri, Jun 20, 2014 at 3:13 AM, <mertte...@gmail.com> wrote:
Hi,

As I had mentioned in one of the posts, I'm currently developing an interface for PyMC to use Apache Spark as a backend. I forked PyMC at PyMC-Spark Repository. Since PyMC v3 is currently under development, I've decided to go with v2.3.

While I can understand that decision, it is a little unfortunate to have such an exciting feature not work with the up-and-coming pymc. The code base is much lighter and certainly we'd be happy to help if questions arise.

I'd like to share the current status with you. So far, I've completed the following items: 

- Implemented HDFS backend for PyMC v3 (hdfs.py).
 
This seems to be the same link as below for v2.3?

- Implemented HDFS db backend for PyMC v2.3 (hdfs.py).

The above implementations use PyWebHdfs to communicate with HDFS, so sending data over the network will be somewhat slow.

- Added the MCMCSpark module, which lets the user access Spark RDDs (Spark wraps data in so-called RDDs) through the regular MCMC class methods (MCMCSpark.py). With this class, traces can easily be saved to and loaded from HDFS without requiring any network allocation. Calling the 'sample' method implemented in MCMCSpark runs MCMC's sample method in parallel on a Spark cluster. However, Spark jobs cannot return the MCMC instances they create, because PySpark can't serialize MCMC objects due to the Fortran code. Instead, each Spark job returns a tuple of an integer (the chain number) and a dictionary (the trace data).

Why not have every instance save its traces directly to HDFS, and then simply read them back on the master thread?

- Implemented Spark db backend. This module is simply the database backend for Spark RDD. It contains a Trace and a Database class like the other database modules (spark.py). 

- I've also added some example code which shows how to use HDFS backend (disaster_model_hdfs.py) or MCMCSpark (disaster_model_spark.py).

I'd like to hear your opinions and ideas, and I'd appreciate any feedback.

Once this works more robustly it would be great to get the map-reducable samplers. Do you plan on adding those? I think Kai laid some interesting ground work here: https://github.com/pymc-devs/pymc/pull/547 (pymc3 though).

Thomas

--
You received this message because you are subscribed to the Google Groups "PyMC" group.
To unsubscribe from this group and stop receiving emails from it, send an email to pymc+uns...@googlegroups.com.
To post to this group, send email to py...@googlegroups.com.
Visit this group at http://groups.google.com/group/pymc.
For more options, visit https://groups.google.com/d/optout.



--
Thomas Wiecki
PhD candidate, Brown University
Quantitative Researcher, Quantopian Inc, Boston

Mert Terzihan

Jul 10, 2014, 1:57:10 PM
to py...@googlegroups.com
First of all, thank you for the reply and the feedback. 


On Sat, Jun 21, 2014 at 2:36 AM, Thomas Wiecki <thomas...@gmail.com> wrote:
This looks really interesting.

While I can understand that decision, it is a little unfortunate to have such an exciting feature not work with the up-and-coming pymc. The code base is much lighter and certainly we'd be happy to help if questions arise.
 
One of the reasons for developing this project against PyMC v2.3 is that PyMC 3 is still in the alpha stage, whereas PyMC 2.3 is more mature. However, once PyMC 3 is mature enough, we can apply the same idea to PyMC 3 to obtain distributed processing for PyMC.

This seems to be the same link as below for v2.3? 
 
The link for v2.3 is under the branch pyspark, whereas the link for v3 is under master branch. Both of the modules may be similar but they are designed for different versions. 

Why not have every instance save its traces directly to HDFS, and then simply read them back on the master thread?

That might be a solution; however, we assume that the data is too big to fit into main memory, so it would not be possible to load all of it from HDFS into the driver program. Instead, each machine in the cluster stores the traces that it has produced, and the traces can be queried via Spark methods. I have rewritten some MCMC methods so that they work with this approach. For instance, to compute a statistic, every machine can calculate a local version of it from its local data, and the local results can then be combined in a reducer to produce the global statistic.
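
As a rough illustration of the local-then-reduce idea, the sketch below computes a global mean and variance from per-partition summaries in plain Python. The function names and the three example partitions are hypothetical and are not taken from the repository; on a cluster, the map and reduce calls would be Spark operations on the RDD holding the traces.

```python
from functools import reduce


def local_stats(samples):
    """Map step: summarize one partition as (count, sum, sum of squares)."""
    return (len(samples), sum(samples), sum(x * x for x in samples))


def combine(a, b):
    """Reduce step: merge two partial summaries element-wise."""
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])


def finalize(stats):
    """Turn the merged summary into the global mean and population variance."""
    n, total, total_sq = stats
    mean = total / n
    variance = total_sq / n - mean * mean
    return mean, variance


# Three hypothetical partitions of one trace, one per machine
partitions = [[1.0, 2.0], [3.0, 4.0, 5.0], [6.0]]
mean, variance = finalize(reduce(combine, map(local_stats, partitions)))
```

Only three numbers per partition travel to the reducer, regardless of how long the local trace is. The sum-of-squares formula is used here for brevity; it can lose precision when the mean is large relative to the spread, so a numerically safer merge may be preferable in practice.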
 
Once this works more robustly it would be great to get the map-reducable samplers. Do you plan on adding those? I think Kai laid some interesting ground work here: https://github.com/pymc-devs/pymc/pull/547 (pymc3 though).

Implementing one of the distributed samplers (i.e. Asymptotically Exact, Embarrassingly Parallel MCMC) would be a great future step. However, we are more interested in distributing the data (assuming that the observations are large), rather than parallelizing the iterations of a sampler as described in the paper. We can work on this once we are done with the features currently in focus. To provide a framework for distributing the data across the machines in the cluster, I've been implementing DistributedMCMC.py. It differs from MCMCSpark.py in that MCMCSpark runs the same model on the same data across the cluster, whereas DistributedMCMC distributes the data, runs a model that uses the local data, and synchronizes the samplers via a global update function. As an introductory example, I have implemented a simple LDA model, DistributedLDA.py.
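
One possible reading of "local data plus a global update function" is sketched below. It is not the DistributedMCMC code: the model (a two-component Gaussian mixture with unit variance and equal weights), the function names `local_step` and `global_update`, the prior, and the data are all assumptions chosen to keep the example small. Each partition samples assignments for its own points and reports only sufficient statistics; the global update merges them and resamples the shared parameters, which is loosely analogous to local topic assignments and global topic parameters in LDA.

```python
import math
import random


def local_step(points, means, rng):
    """Runs on one partition: sample a component for each local point and
    return per-component [count, sum] sufficient statistics."""
    stats = [[0, 0.0] for _ in means]
    for x in points:
        # Log-weights under unit-variance Gaussians, shifted for numerical stability
        log_w = [-0.5 * (x - m) ** 2 for m in means]
        top = max(log_w)
        weights = [math.exp(lw - top) for lw in log_w]
        k = rng.choices(range(len(means)), weights=weights)[0]
        stats[k][0] += 1
        stats[k][1] += x
    return stats


def global_update(all_stats, rng, prior_var=100.0):
    """Runs on the driver: merge the local statistics and resample the shared means."""
    new_means = []
    for k in range(len(all_stats[0])):
        count = sum(s[k][0] for s in all_stats)
        total = sum(s[k][1] for s in all_stats)
        # Conjugate update for a Normal(0, prior_var) prior and unit noise variance
        post_var = 1.0 / (count + 1.0 / prior_var)
        new_means.append(rng.gauss(post_var * total, math.sqrt(post_var)))
    return new_means


# A single shared generator keeps the sketch short; real workers would each need their own.
rng = random.Random(0)
# Hypothetical data, already split into two partitions
partitions = [[-3.1, -2.9, 3.0, 3.2], [-3.0, 2.8, 3.1, -2.7]]
means = [-1.0, 1.0]
for _ in range(200):
    all_stats = [local_step(part, means, rng) for part in partitions]
    means = global_update(all_stats, rng)
```

The raw observations never leave their partition; only the small per-component summaries are exchanged in each iteration.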

I would like to also contribute this ongoing work to the actual PyMC repository. Would it be possible to send a pull request? We can either integrate the work to the 2.3 branch or create another branch which contains the modules that provide distributed sampling. So far, both HDFS backends for v2.3 and v3 have passed all the unit tests and are ready to be used. I will test MCMCSpark and spark backend and they will be ready in a couple of days. There are some missing pieces in DistributedMCMC, however it will be a complete working module in a few days, too. 

Thank you.

Mert

--
Mert Terzihan
M.Sc. Student in Computer Science at Brown University
Brown University, Computer Science Dept.
115 Waterman Street, 4th Floor
Providence, RI 02912-1910

Thomas Wiecki

Jul 15, 2014, 3:01:33 AM
to py...@googlegroups.com
On Thu, Jul 10, 2014 at 7:57 PM, Mert Terzihan <mertte...@gmail.com> wrote:
 
One of the reasons for developing this project against PyMC v2.3 is that PyMC 3 is still in the alpha stage, whereas PyMC 2.3 is more mature. However, once PyMC 3 is mature enough, we can apply the same idea to PyMC 3 to obtain distributed processing for PyMC.

That certainly makes sense and it's true this could be implemented for pymc 3 much easier then. The main reason that we consider pymc3 to be alpha is the lack of documentation. But since the code is pretty readable I think for this purpose it would not cause you issues.

 
Once this works more robustly it would be great to get the map-reducable samplers. Do you plan on adding those? I think Kai laid some interesting ground work here: https://github.com/pymc-devs/pymc/pull/547 (pymc3 though).

Implementing one of the distributed samplers (i.e. Asymptotically Exact, Embarrassingly Parallel MCMC) would be a great future step. However, we are more interested in distributing the data (assuming that the observations are large), rather than parallelizing the iterations of a sampler as described in the paper. We can work on this once we are done with the features currently in focus. To provide a framework for distributing the data across the machines in the cluster, I've been implementing DistributedMCMC.py. It differs from MCMCSpark.py in that MCMCSpark runs the same model on the same data across the cluster, whereas DistributedMCMC distributes the data, runs a model that uses the local data, and synchronizes the samplers via a global update function. As an introductory example, I have implemented a simple LDA model, DistributedLDA.py.

Not sure I understand. My read of the paper you cite was that it allows you to do exactly that -- combine traces from different parts of the data into one posterior of all data. How do you combine the samples now?
 
I would like to also contribute this ongoing work to the actual PyMC repository. Would it be possible to send a pull request? We can either integrate the work to the 2.3 branch or create another branch which contains the modules that provide distributed sampling. So far, both HDFS backends for v2.3 and v3 have passed all the unit tests and are ready to be used. I will test MCMCSpark and spark backend and they will be ready in a couple of days. There are some missing pieces in DistributedMCMC, however it will be a complete working module in a few days, too. 

That would definitely be of interest. Main concerns would be added dependencies (could be optional), unit tests (sounds like you have those already) and documentation / tutorials.

In any case, very interesting and exciting work.

Thomas




Mert Terzihan

Jul 17, 2014, 5:06:33 PM
to py...@googlegroups.com
Thanks for your feedback, Thomas.

On Tue, Jul 15, 2014 at 12:00 AM, Thomas Wiecki <thomas...@gmail.com> wrote:




That certainly makes sense and it's true this could be implemented for pymc 3 much easier then. The main reason that we consider pymc3 to be alpha is the lack of documentation. But since the code is pretty readable I think for this purpose it would not cause you issues.

Yes, you are right. I've spent some time playing with PyMC 3 and it was very well implemented with high readability. Once I'm done with the  

 
Once this works more robustly it would be great to get the map-reducable samplers. Do you plan on adding those? I think Kai laid some interesting ground work here: https://github.com/pymc-devs/pymc/pull/547 (pymc3 though).

Implementing one of the distributed samplers (i.e. Asymptotically Exact, Embarrassingly Parallel MCMC) would be a great future step. However, we are more interested in distributing the data (assuming that the observations are large), rather than parallelizing the iterations of a sampler as described in the paper. We can work on this once we are done with the features currently in focus. To provide a framework for distributing the data across the machines in the cluster, I've been implementing DistributedMCMC.py. It differs from MCMCSpark.py in that MCMCSpark runs the same model on the same data across the cluster, whereas DistributedMCMC distributes the data, runs a model that uses the local data, and synchronizes the samplers via a global update function. As an introductory example, I have implemented a simple LDA model, DistributedLDA.py.

Not sure I understand. My read of the paper you cite was that it allows you to do exactly that -- combine traces from different parts of the data into one posterior of all data. How do you combine the samples now?

Oh, I'm sorry, I confused this paper with another one. It is based on the idea of partitioning the data across the cluster and running a separate MCMC on each partition. Since most of the step methods in PyMC 2.3 use a symmetric proposal, for which the Hastings factor is just 1, it would be much easier to implement that approach. I've started doing that and will push my code when I'm done.
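
For readers unfamiliar with that approach, the combination step can be sketched as follows. This shows only the simplest, parametric variant, in which each partition's samples are approximated by a Gaussian and the approximations are multiplied together. The function name `combine_gaussian` and the sample values are made up for illustration, the example handles a single scalar parameter, and none of it is taken from the PyMC-Spark code.

```python
from statistics import mean, variance


def combine_gaussian(subposterior_samples):
    """Multiply Gaussian approximations of the subposteriors (scalar parameter).

    The product of Gaussians has a precision equal to the sum of the
    individual precisions and a precision-weighted mean.
    """
    precisions = [1.0 / variance(s) for s in subposterior_samples]
    means = [mean(s) for s in subposterior_samples]
    total_precision = sum(precisions)
    combined_mean = sum(p * m for p, m in zip(precisions, means)) / total_precision
    return combined_mean, 1.0 / total_precision


# Hypothetical samples of one parameter from chains run on two data partitions
chain_a = [0.0, 2.0]  # mean 1, sample variance 2
chain_b = [3.0, 5.0]  # mean 4, sample variance 2
combined_mean, combined_var = combine_gaussian([chain_a, chain_b])
```

For the product to approximate the full-data posterior, each of the M sub-chains has to target its own subposterior, that is, the likelihood of its partition multiplied by the prior raised to the power 1/M.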

 
I would like to also contribute this ongoing work to the actual PyMC repository. Would it be possible to send a pull request? We can either integrate the work to the 2.3 branch or create another branch which contains the modules that provide distributed sampling. So far, both HDFS backends for v2.3 and v3 have passed all the unit tests and are ready to be used. I will test MCMCSpark and spark backend and they will be ready in a couple of days. There are some missing pieces in DistributedMCMC, however it will be a complete working module in a few days, too. 

That would definitely be of interest. Main concerns would be added dependencies (could be optional), unit tests (sounds like you have those already) and documentation / tutorials.

The only dependency that the distributed MCMC requires is Spark, installed on the machine or cluster where the code will run. In addition, the HDFS backend requires pywebhdfs to communicate with HDFS. Apart from these, I'm trying to use the same libraries that PyMC already uses. Regarding documentation and tutorials, I will be writing a blog post about the work I've done, going over some example code. I'd also be really happy to write additional documentation.

In any case, very interesting and exciting work.

Thank you for your feedback, it is vital for the process. 
 



Thomas Wiecki

Sep 1, 2014, 3:51:00 PM
to py...@googlegroups.com
Not sure why Mert didn't post this here but for anyone interested, here is a write-up of this project:

http://blog.cloudera.com/blog/2014/08/bayesian-machine-learning-on-apache-spark/

Nice work!





--
Thomas Wiecki, PhD

Mert Terzihan

Sep 8, 2014, 1:48:26 PM
to py...@googlegroups.com
Thanks, Thomas, for sharing my blog post about the project. I hope it will be useful for people who want to run MCMC methods in a distributed fashion.

I'm looking forward to integrating PyMC 3 with Spark. I'm sure that we can get even better results with PyMC 3.



