GRAPHLAB_SUBNET_ID/GRAPHLAB_SUBNET_MASK environment variables not defined.
Using default values
Subnet ID: 0.0.0.0
Subnet Mask: 0.0.0.0
Will find first IPv4 non-loopback address matching the subnet
INFO: dc.cpp(init:573): Cluster of 1 instances created.
INFO: distributed_graph.hpp(set_ingress_method:3200): Automatically determine ingress method: grid
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_0
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_4
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_2
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_1
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_3
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_5
INFO: distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
INFO: distributed_ingress_base.hpp(exchange_global_info:532): Graph info:
nverts: 916428
nedges: 5105039
nreplicas: 916428
replication factor: 1
#vertices: 916428 #edges:5105039
Elapsed time in loading: 5.3647
INFO: distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
INFO: synchronous_engine.hpp(start:1299): Iteration counter will only output every 5 seconds.
INFO: synchronous_engine.hpp(start:1314): 0: Starting iteration: 0
INFO: synchronous_engine.hpp(start:1363): Active vertices: 916428
INFO: synchronous_engine.hpp(start:1412): Running Aggregators
INFO: synchronous_engine.hpp(start:1314): 0: Starting iteration: 9
INFO: synchronous_engine.hpp(start:1363): Active vertices: 365789
INFO: synchronous_engine.hpp(start:1412): Running Aggregators
INFO: synchronous_engine.hpp(start:1424): 64 iterations completed.
Updates: 7516532
Finished Running engine in 7.7 seconds.
Total rank: 622905
INFO: distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
Elapsed time in dumping: 1.10235
GRAPHLAB_SUBNET_ID/GRAPHLAB_SUBNET_MASK environment variables not defined.
Using default values
Subnet ID: 0.0.0.0
Subnet Mask: 0.0.0.0
Will find first IPv4 non-loopback address matching the subnet
INFO: dc.cpp(init:573): Cluster of 2 instances created.
INFO: distributed_graph.hpp(set_ingress_method:3200): Automatically determine ingress method: grid
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_0
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_4
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_2
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_3
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_5
INFO: distributed_graph.hpp(load_from_hdfs:2242): Loading graph from file: hdfs://master:9000/web-google/part_1
INFO: distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
INFO: distributed_ingress_base.hpp(exchange_global_info:532): Graph info:
nverts: 916428
nedges: 5105039
nreplicas: 1672370
replication factor: 1.82488
#vertices: 916428 #edges:5105039
Elapsed time in loading: 5.61887
INFO: distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
INFO: synchronous_engine.hpp(start:1299): Iteration counter will only output every 5 seconds.
INFO: synchronous_engine.hpp(start:1314): 0: Starting iteration: 0
INFO: synchronous_engine.hpp(start:1363): Active vertices: 916428
INFO: synchronous_engine.hpp(start:1412): Running Aggregators
INFO: synchronous_engine.hpp(start:1314): 0: Starting iteration: 21
INFO: synchronous_engine.hpp(start:1363): Active vertices: 437
INFO: synchronous_engine.hpp(start:1412): Running Aggregators
INFO: synchronous_engine.hpp(start:1424): 53 iterations completed.
Updates: 2173546
Finished Running engine in 9.6 seconds.
Total rank: 363980
Total rank: 363980
INFO: distributed_ingress_base.hpp(finalize:185): Finalizing Graph...
Elapsed time in dumping: 1.32918
My code is here.
#include <cmath>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <graphlab.hpp>
// #include <graphlab/macros_def.hpp>
// Global random reset probability: apply() computes the new rank as
// (1 - RESET_PROB) * gathered_total + RESET_PROB.
double RESET_PROB = 0.15;
// Convergence threshold: scatter_edges() only selects the out-edges of a
// vertex when the absolute change of its rank exceeds this value.
double TOLERANCE = 1.0E-2;
// The vertex data is just the pagerank value (a double)
typedef double vertex_data_type;
// There is no edge data in the pagerank application
typedef graphlab::empty edge_data_type;
// The graph type is determined by the vertex and edge data types
typedef graphlab::distributed_graph<vertex_data_type, edge_data_type> graph_type;
/*
* A simple function used by graph.transform_vertices(init_vertex);
* to initialize the vertex data.
*/
void init_vertex(graph_type::vertex_type& vertex) {
  // Every vertex starts out with a rank of one.
  vertex.data() = 1.0;
}
/**
 * PageRank vertex program.
 *
 * gather : sums rank / out-degree over the sources of all in-edges.
 * apply  : stores (1 - RESET_PROB) * total + RESET_PROB as the new rank and
 *          remembers how much the rank changed.
 * scatter: signals the targets of the out-edges, but only while the change
 *          is still larger than TOLERANCE.
 */
class pagerank :
  public graphlab::ivertex_program<graph_type, double> {
  // Signed difference between the new and the previous rank, written by
  // apply() and read by scatter_edges(); carried along by save()/load().
  double last_change;
public:
  /**
   * Start with a well-defined change of zero so that save() and
   * scatter_edges() never read an indeterminate value, even if they are
   * reached before apply() has run on this program instance.
   */
  pagerank() : last_change(0.0) { }

  /**
   * Gather only in edges.
   */
  edge_dir_type gather_edges(icontext_type& context,
                             const vertex_type& vertex) const {
    return graphlab::IN_EDGES;
  } // end of Gather edges

  /* Gather the weighted rank of the adjacent page */
  double gather(icontext_type& context, const vertex_type& vertex,
                edge_type& edge) const {
    // The source has at least this edge as an out-edge, so the divisor is
    // never zero.
    return (edge.source().data() / edge.source().num_out_edges());
  }

  /* Use the total rank of adjacent pages to update this page */
  void apply(icontext_type& context, vertex_type& vertex,
             const gather_type& total) {
    const double newval = (1.0 - RESET_PROB) * total + RESET_PROB;
    last_change = (newval - vertex.data());
    vertex.data() = newval;
  }

  /* The scatter edges depend on whether the pagerank has converged */
  edge_dir_type scatter_edges(icontext_type& context,
                              const vertex_type& vertex) const {
    if (std::fabs(last_change) > TOLERANCE) {
      return graphlab::OUT_EDGES;
    } else {
      return graphlab::NO_EDGES;
    }
  }

  /* The scatter function just signals adjacent pages */
  void scatter(icontext_type& context, const vertex_type& vertex,
               edge_type& edge) const {
    // The convergence test is done once per vertex in scatter_edges(); the
    // previous if/else here signalled the target in both branches, so the
    // signal is simply unconditional.
    context.signal(edge.target());
  }

  void save(graphlab::oarchive& oarc) const {
    // last_change is written by apply() and read by scatter_edges(), so it
    // has to travel with the vertex program whenever it is serialized.
    oarc << last_change;
  }

  void load(graphlab::iarchive& iarc) {
    iarc >> last_change;
  }
}; // end of pagerank vertex program
/*
* We want to save the final graph so we define a writer which will be
* used in graph.save("path/prefix", pagerank_writer()) to save the graph.
*/
struct pagerank_writer {
  // One output line per vertex: "<vertex id>\t<rank>\n".
  std::string save_vertex(graph_type::vertex_type v) {
    std::stringstream line;
    line << v.id();
    line << "\t";
    line << v.data();
    line << "\n";
    return line.str();
  }

  // Edges carry no data, so nothing is written for them.
  std::string save_edge(graph_type::edge_type e) {
    return std::string();
  }
}; // end of pagerank writer
// Map function for graph.map_reduce_vertices: yields the rank stored in a vertex.
double map_rank(const graph_type::vertex_type& v) {
  const double rank = v.data();
  return rank;
}
// Returns the rank stored in the given vertex (the vertex is taken by value).
double pagerank_sum(graph_type::vertex_type v) {
  const double rank = v.data();
  return rank;
}
/**
 * Parse one line of an adjacency-list file and add its contents to the graph.
 *
 * Expected layout (whitespace separated):
 *     <vertex id> <number of out-neighbours> <neighbour id> ...
 *
 * \param graph    graph that receives the vertex and its out-edges
 * \param filename name of the file being read (unused)
 * \param textline the line to parse
 * \return true when the line was blank or parsed completely; false when the
 *         vertex id or neighbour count is not a number, the count is
 *         negative, or fewer neighbour ids than announced are present.
 */
bool line_parser(graph_type& graph, const std::string& filename,
                 const std::string& textline) {
  // Skip blank lines instead of adding a vertex with a bogus id.
  if (textline.find_first_not_of(" \t\r\n") == std::string::npos) {
    return true;
  }

  std::istringstream ssin(textline);

  graphlab::vertex_id_type vid;
  if (!(ssin >> vid)) {
    return false;
  }
  graph.add_vertex(vid);

  int out_nb = 0;
  if (!(ssin >> out_nb)) {
    // Running out of input right after the id means "no out-edges";
    // anything else is a malformed count.
    if (!ssin.eof()) {
      return false;
    }
    out_nb = 0;
  }
  if (out_nb < 0) {
    return false;
  }

  for (int i = 0; i < out_nb; ++i) {
    graphlab::vertex_id_type other_vid;
    // Stop at the first missing/invalid neighbour rather than inserting
    // edges to an unspecified vertex id.
    if (!(ssin >> other_vid)) {
      return false;
    }
    graph.add_edge(vid, other_vid);
  }
  return true;
}
/**
 * Program entry point: loads the web graph from HDFS, runs synchronous
 * PageRank on it and writes the resulting ranks back to HDFS.
 *
 * Usage: <program> <ncpus>
 *   ncpus - positive number of CPUs handed to the engine options.
 *
 * Returns EXIT_SUCCESS on completion and EXIT_FAILURE when <ncpus> is
 * missing or not a positive integer.
 */
int main(int argc, char** argv) {
  // Initialize control plane using mpi
  graphlab::mpi_tools::init(argc, argv);
  graphlab::distributed_control dc;
  //global_logger().set_log_level(LOG_INFO);

  // argv[1] is mandatory; reading it without this check would dereference a
  // null pointer when the program is started without arguments.
  const int ncpus = (argc > 1) ? atoi(argv[1]) : 0;
  if (ncpus <= 0) {
    dc.cout() << "Usage: " << (argc > 0 ? argv[0] : "pagerank")
              << " <ncpus>   (ncpus must be a positive integer)" << std::endl;
    graphlab::mpi_tools::finalize();
    return EXIT_FAILURE;
  }

  graphlab::timer timer;
  timer.start();
  graphlab::graphlab_options ops;
  ops.set_ncpus(ncpus);
  const std::string exec_type = "synchronous";

  graph_type graph(dc);
  //graph.load_format("hdfs://master:9000/web-google/", "adj");
  graph.load("hdfs://master:9000/web-google/", line_parser);
  // must call finalize before querying the graph
  graph.finalize();
  dc.cout() << "#vertices: " << graph.num_vertices()
            << " #edges:" << graph.num_edges() << std::endl;
  dc.cout() << "Elapsed time in loading: " << timer.current_time() << std::endl;

  // Initialize the vertex data
  graph.transform_vertices(init_vertex);

  // Running The Engine -------------------------------------------------------
  graphlab::omni_engine<pagerank> engine(dc, graph, exec_type, ops);
  engine.signal_all();
  engine.start();
  const double runtime = engine.elapsed_seconds();
  dc.cout() << "Finished Running engine in " << runtime
            << " seconds." << std::endl;

  const double total_rank = graph.map_reduce_vertices<double>(map_rank);
  // Use dc.cout() like every other report so the total is printed once
  // instead of once per process.
  dc.cout() << "Total rank: " << total_rank << std::endl;

  // Save the final graph -----------------------------------------------------
  timer.start();
  graph.save("hdfs://master:9000/tmp/pagerank/part", pagerank_writer(),
             false,  // set to true if each output file is to be gzipped
             true,   // whether vertices are saved
             false); // whether edges are saved
  dc.cout() << "Elapsed time in dumping: " << timer.current_time() << std::endl;

  // Tear-down communication layer and quit -----------------------------------
  graphlab::mpi_tools::finalize();
  return EXIT_SUCCESS;
} // End of main
// We render this entire program in the documentation