Fwd: Can not get correct pagerank value when using more than one machine


Yi Lu

Nov 21, 2013, 1:04:13 PM
to graph...@googlegroups.com
Hi Everybody,

I copied the PageRank code from the toolkit source and made a few small modifications (I want to use my own line parser and adjust the number of threads). I seem to get the correct answer when I use one machine. The log follows (run with ./GasPageRank 1):


GRAPHLAB_SUBNET_ID/GRAPHLAB_SUBNET_MASK environment variables not defined.
Using default values
Subnet ID: 0.0.0.0
Subnet Mask: 0.0.0.0
Will find first IPv4 non-loopback address matching the subnet
INFO:     dc.cpp(init:573): Cluster of 1 instances created.
INFO:     distributed_graph.hpp(set_ingress_method:3200): Automatically determine ingress method: grid
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_0
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_4
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_2
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_1
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_3
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_5
INFO:     distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
INFO:     distributed_ingress_base.hpp(exchange_global_info:532): Graph info:
nverts: 916428
nedges: 5105039
nreplicas: 916428
replication factor: 1
#vertices: 916428 #edges:5105039
Elapsed time in loading: 5.3647
INFO:     distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
INFO:     synchronous_engine.hpp(start:1299): Iteration counter will only output every 5 seconds.
INFO:     synchronous_engine.hpp(start:1314): 0: Starting iteration: 0
INFO:     synchronous_engine.hpp(start:1363): Active vertices: 916428
INFO:     synchronous_engine.hpp(start:1412):  Running Aggregators
INFO:     synchronous_engine.hpp(start:1314): 0: Starting iteration: 9
INFO:     synchronous_engine.hpp(start:1363): Active vertices: 365789
INFO:     synchronous_engine.hpp(start:1412):  Running Aggregators
INFO:     synchronous_engine.hpp(start:1424): 64 iterations completed.
Updates: 7516532
Finished Running engine in 7.7 seconds.
Total rank: 622905
INFO:     distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
Elapsed time in dumping: 1.10235




However, when I use multiple machines, I cannot get the correct answer (run with mpiexec -n 2 -f machines ./GasPageRank 1). As the log below shows, the final total PageRank is different. If I run the sample PageRank code instead, there is no such problem, and I made only small modifications. Could anybody tell me which part is wrong? Thanks a lot.

GRAPHLAB_SUBNET_ID/GRAPHLAB_SUBNET_MASK environment variables not defined.
Using default values
Subnet ID: 0.0.0.0
Subnet Mask: 0.0.0.0
Will find first IPv4 non-loopback address matching the subnet
INFO:     dc.cpp(init:573): Cluster of 2 instances created.
INFO:     distributed_graph.hpp(set_ingress_method:3200): Automatically determine ingress method: grid
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_0
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_4
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_2
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_3
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_5
INFO:     distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_1
INFO:     distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
INFO:     distributed_ingress_base.hpp(exchange_global_info:532): Graph info:
nverts: 916428
nedges: 5105039
nreplicas: 1672370
replication factor: 1.82488
#vertices: 916428 #edges:5105039
Elapsed time in loading: 5.61887
INFO:     distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
INFO:     synchronous_engine.hpp(start:1299): Iteration counter will only output every 5 seconds.
INFO:     synchronous_engine.hpp(start:1314): 0: Starting iteration: 0
INFO:     synchronous_engine.hpp(start:1363): Active vertices: 916428
INFO:     synchronous_engine.hpp(start:1412):  Running Aggregators
INFO:     synchronous_engine.hpp(start:1314): 0: Starting iteration: 21
INFO:     synchronous_engine.hpp(start:1363): Active vertices: 437
INFO:     synchronous_engine.hpp(start:1412):  Running Aggregators
INFO:     synchronous_engine.hpp(start:1424): 53 iterations completed.
Updates: 2173546
Finished Running engine in 9.6 seconds.
Total rank: 363980
Total rank: 363980
INFO:     distributed_ingress_base.hpp(finalize:185): Finalizing Graph...

Elapsed time in dumping: 1.32918
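
For reference when comparing the two totals: the apply step of the program in this post computes newval = (1 - RESET_PROB) * total + RESET_PROB, starting from a rank of 1 at every vertex. The sketch below is a plain single-process C++ version of that rule, not GraphLab code. The names reference_pagerank and total_rank are hypothetical, and it runs a fixed number of sweeps over all vertices rather than the tolerance-driven signalling the engine uses, so it should only agree with an engine run approximately. On a small test graph it gives a number that either run could be checked against.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not part of GraphLab): runs `iterations` synchronous
// sweeps of the update rule used in apply():
//     newval = (1 - reset_prob) * total + reset_prob
// where `total` is the sum of rank / out_degree over a vertex's in-neighbors.
std::vector<double> reference_pagerank(
    std::size_t num_vertices,
    const std::vector<std::pair<std::size_t, std::size_t> >& edges,
    int iterations, double reset_prob = 0.15) {
  // Count out-edges per vertex; the gather step divides by this value.
  std::vector<std::size_t> out_degree(num_vertices, 0);
  for (std::size_t i = 0; i < edges.size(); ++i) {
    assert(edges[i].first < num_vertices && edges[i].second < num_vertices);
    ++out_degree[edges[i].first];
  }

  std::vector<double> rank(num_vertices, 1.0);  // same start value as init_vertex
  for (int it = 0; it < iterations; ++it) {
    // Gather: accumulate weighted rank along every edge (source -> target).
    std::vector<double> total(num_vertices, 0.0);
    for (std::size_t i = 0; i < edges.size(); ++i) {
      total[edges[i].second] += rank[edges[i].first] / out_degree[edges[i].first];
    }
    // Apply: every vertex is updated from the previous sweep's ranks.
    for (std::size_t v = 0; v < num_vertices; ++v) {
      rank[v] = (1.0 - reset_prob) * total[v] + reset_prob;
    }
  }
  return rank;
}

// Sum of all ranks, the quantity printed as "Total rank" in the logs.
double total_rank(const std::vector<double>& rank) {
  double sum = 0.0;
  for (std::size_t v = 0; v < rank.size(); ++v) sum += rank[v];
  return sum;
}
```

On a three-vertex cycle every rank stays at 1, so total_rank returns 3. A vertex with no in-edges settles at RESET_PROB, which is why the total can fall below the vertex count on graphs that are not strongly connected.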


Here is my code:



#include <cmath>     // std::fabs
#include <cstdlib>   // atoi, EXIT_SUCCESS, EXIT_FAILURE
#include <fstream>
#include <iostream>
#include <sstream>   // std::istringstream, std::stringstream
#include <string>
#include <vector>

#include <graphlab.hpp>
// #include <graphlab/macros_def.hpp>

// Global random reset probability
double RESET_PROB = 0.15;

// Convergence threshold: a vertex signals its out-neighbors only while its
// rank changes by more than this amount
double TOLERANCE = 1.0E-2;

// The vertex data is just the pagerank value (a double)
typedef double vertex_data_type;

// There is no edge data in the pagerank application
typedef graphlab::empty edge_data_type;

// The graph type is determined by the vertex and edge data types
typedef graphlab::distributed_graph<vertex_data_type, edge_data_type> graph_type;

/*
 * A simple function used by graph.transform_vertices(init_vertex);
 * to initialize the vertex data.
 */
void init_vertex(graph_type::vertex_type& vertex) { vertex.data() = 1; }

class pagerank :
    public graphlab::ivertex_program<graph_type, double> {

    // Change in rank computed by the most recent apply()
    double last_change;

public:

    /**
     * Gather only in edges.
     */
    edge_dir_type gather_edges(icontext_type& context,
            const vertex_type& vertex) const {
        return graphlab::IN_EDGES;
    } // end of gather_edges

    /* Gather the weighted rank of the adjacent page */
    double gather(icontext_type& context, const vertex_type& vertex,
            edge_type& edge) const {
        return (edge.source().data() / edge.source().num_out_edges());
    }

    /* Use the total rank of adjacent pages to update this page */
    void apply(icontext_type& context, vertex_type& vertex,
            const gather_type& total) {
        const double newval = (1.0 - RESET_PROB) * total + RESET_PROB;
        last_change = (newval - vertex.data());
        vertex.data() = newval;
    }

    /* The scatter edges depend on whether the pagerank has converged */
    edge_dir_type scatter_edges(icontext_type& context,
            const vertex_type& vertex) const {
        if (std::fabs(last_change) > TOLERANCE) {
            return graphlab::OUT_EDGES;
        } else {
            return graphlab::NO_EDGES;
        }
    }

    /* The scatter function just signals adjacent pages */
    void scatter(icontext_type& context, const vertex_type& vertex,
            edge_type& edge) const {
        // Both branches of the original tolerance check made this same call,
        // so the check is dropped; scatter_edges() already skips converged
        // vertices.
        context.signal(edge.target());
    }

    // last_change is set in apply() and read in scatter_edges(), so it is
    // serialized with the vertex program.
    void save(graphlab::oarchive& oarc) const {
        oarc << last_change;
    }

    void load(graphlab::iarchive& iarc) {
        iarc >> last_change;
    }

}; // end of pagerank vertex program

/*
 * We want to save the final graph so we define a writer which will be
 * used in graph.save("path/prefix", pagerank_writer()) to save the graph.
 */
struct pagerank_writer {
    std::string save_vertex(graph_type::vertex_type v) {
        std::stringstream strm;
        strm << v.id() << "\t" << v.data() << "\n";
        return strm.str();
    }
    std::string save_edge(graph_type::edge_type e) { return ""; }
}; // end of pagerank writer

// Map function for graph.map_reduce_vertices: returns the rank of one vertex
double map_rank(const graph_type::vertex_type& v) { return v.data(); }

double pagerank_sum(graph_type::vertex_type v) {
    return v.data();
}

// Parses one line of the form "<vid> <out_nb> <target_1> ... <target_out_nb>"
bool line_parser(graph_type& graph, const std::string& filename,
        const std::string& textline) {
    std::istringstream ssin(textline);
    graphlab::vertex_id_type vid;
    int out_nb = 0;
    // Fail on a malformed line instead of using values that were never read
    if (!(ssin >> vid >> out_nb)) return false;
    graph.add_vertex(vid);
    for (int i = 0; i < out_nb; ++i) {
        graphlab::vertex_id_type other_vid;
        if (!(ssin >> other_vid)) return false;
        graph.add_edge(vid, other_vid);
    }
    return true;
}

int main(int argc, char** argv) {
    // The first argument is the number of threads per machine
    if (argc < 2) {
        std::cerr << "Usage: " << argv[0] << " <ncpus>" << std::endl;
        return EXIT_FAILURE;
    }

    // Initialize control plane using mpi
    graphlab::mpi_tools::init(argc, argv);
    graphlab::distributed_control dc;
    //global_logger().set_log_level(LOG_INFO);

    graphlab::timer timer;
    timer.start();

    graphlab::graphlab_options ops;
    ops.set_ncpus(atoi(argv[1]));

    std::string exec_type = "synchronous";
    graph_type graph(dc);
    //graph.load_format("hdfs://master:9000/web-google/", "adj");
    graph.load("hdfs://master:9000/web-google/", line_parser);

    // must call finalize before querying the graph
    graph.finalize();
    dc.cout() << "#vertices: " << graph.num_vertices()
        << " #edges:" << graph.num_edges() << std::endl;
    dc.cout() << "Elapsed time in loading: " << timer.current_time() << std::endl;

    // Initialize the vertex data
    graph.transform_vertices(init_vertex);

    // Running The Engine -------------------------------------------------------
    graphlab::omni_engine<pagerank> engine(dc, graph, exec_type, ops);
    engine.signal_all();
    engine.start();
    const double runtime = engine.elapsed_seconds();
    dc.cout() << "Finished Running engine in " << runtime
        << " seconds." << std::endl;

    const double total_rank = graph.map_reduce_vertices<double>(map_rank);
    // dc.cout() prints once; the std::cout used originally printed the total
    // once per process, which is why it appears twice in the two-machine log.
    dc.cout() << "Total rank: " << total_rank << std::endl;

    timer.start();
    graph.save("hdfs://master:9000/tmp/pagerank/part", pagerank_writer(),
            false,  // set to true if each output file is to be gzipped
            true,   // whether vertices are saved
            false); // whether edges are saved
    dc.cout() << "Elapsed time in dumping: " << timer.current_time() << std::endl;

    // Tear-down communication layer and quit -----------------------------------
    graphlab::mpi_tools::finalize();
    return EXIT_SUCCESS;
} // End of main



// We render this entire program in the documentation
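
Because the custom line parser is one of the two changes made to the sample, it may help to test its parsing logic on its own, without GraphLab or HDFS. The sketch below is a standalone version under stated assumptions: parse_adjacency_line and the vertex_id typedef are hypothetical stand-ins (the typedef replaces graphlab::vertex_id_type), and the input format is taken to be "<vid> <out_nb> <target_1> ... <target_out_nb>", as read by line_parser in the program above. Unlike that parser it collects the targets into a vector instead of calling graph.add_edge, and it reports truncated or empty lines as failures.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Stand-in for graphlab::vertex_id_type (an assumption for this sketch).
typedef unsigned long vertex_id;

// Hypothetical helper: parses "<vid> <out_nb> <target_1> ... <target_out_nb>".
// On success, stores the source id in `vid` and the out-neighbors in
// `targets` and returns true. Returns false if the line is empty, the
// header cannot be read, or fewer than out_nb targets are present.
bool parse_adjacency_line(const std::string& textline, vertex_id& vid,
                          std::vector<vertex_id>& targets) {
  targets.clear();
  std::istringstream ssin(textline);
  std::size_t out_nb = 0;
  if (!(ssin >> vid >> out_nb)) return false;  // missing or non-numeric header
  for (std::size_t i = 0; i < out_nb; ++i) {
    vertex_id other = 0;
    if (!(ssin >> other)) return false;  // line ended before out_nb targets
    targets.push_back(other);
  }
  assert(targets.size() == out_nb);  // every declared target was read
  return true;
}
```

For example, the line "5 2 7 9" yields vid 5 with targets 7 and 9, while "5 3 7 9" is rejected because only two of the three declared targets are present. Whether a rejected line should abort loading or be skipped is a choice for the caller.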






