不好意思,这个问题是由于前面dpark在重构Bagel相关API的时候没有更新这个example.
下面是修复这个问题的补丁,请帮忙看看是否可以修正您的问题。github上的修复会在下一次dpark更新时修正:
diff --git a/examples/pagerank.py b/examples/pagerank.py
index 6af5ade..aab7d1a 100755
--- a/examples/pagerank.py
+++ b/examples/pagerank.py
@@ -3,7 +3,7 @@ import sys, os.path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from dpark import DparkContext
-from dpark.bagel import Vertex, Edge, Message, Bagel
+from dpark.bagel import Vertex, Edge, Bagel
def parse_vertex(line, numV):
fields = line.split(' ')
@@ -18,7 +18,7 @@ def gen_compute(num, epsilon):
else:
newValue = self.value
terminate = (superstep >= 10 and abs(newValue-self.value) < epsilon) or superstep > 30
- outbox = [Message(edge.target_id, newValue / len(self.outEdges))
+ outbox = [(edge.target_id, newValue / len(self.outEdges))
for edge in self.outEdges] if not terminate else []
return Vertex(
self.id, newValue, self.outEdges, not terminate), outbox
return compute
@@ -37,5 +37,5 @@ if __name__ == '__main__':
result = Bagel.run(dpark, vertices, messages,
gen_compute(numVertex, epsilon))
- for v in result.filter(lambda x:x.value > threshold).collect():
- print
v.id, v.value
+ for id, v in result.filter(lambda (id, v): v.value>threshold).collect():
+ print id, v