from graphframes import GraphFrame
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

list_of_years = range(2000, 2025)
first_year = 2000

for metric_year in list_of_years:
    # Filter the graph to edges between 2000 (first_year) and the metric_year,
    # then run PageRank on that cumulative subgraph. `edges` is the full edge
    # DataFrame loaded earlier.
    loop_edges: DataFrame = edges.filter(F.col("year").between(first_year, metric_year))
    loop_nodes: DataFrame = (
        loop_edges.selectExpr("src as id")
        .union(loop_edges.selectExpr("dst as id"))
        .distinct()
    )
    loop_graph: GraphFrame = GraphFrame(loop_nodes, loop_edges)
    pagerank_graph = loop_graph.pageRank(resetProbability=0.15, maxIter=10)
    pagerank_df = (
        pagerank_graph.vertices.withColumn("metric_date", F.lit(metric_year))
        .select("id", "metric_date", "pagerank")
    )
    # Append this year's scores to a single Parquet dataset.
    pagerank_df.write.parquet("data/pagerank.parquet", mode="append")
I've tried a lot of variations and can't figure out what is going on...
Russell Jurney | rju...@graphlet.ai | graphlet.ai | Graphlet AI Blog | LinkedIn | BlueSky
The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.
at com.databricks.spark.chauffeur.Chauffeur.onDriverStateChange(Chauffeur.scala:1551)
at com.databricks.spark.chauffeur.Chauffeur.$anonfun$driverStateOpt$1(Chauffeur.scala:203)
at com.databricks.spark.chauffeur.Chauffeur.$anonfun$driverStateOpt$1$adapted(Chauffeur.scala:203)
at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$goToStopped$4(DriverDaemonMonitorImpl.scala:295)
at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$goToStopped$4$adapted(DriverDaemonMonitorImpl.scala:295)
at scala.collection.immutable.List.foreach(List.scala:431)
at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.goToStopped(DriverDaemonMonitorImpl.scala:295)
at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.monitorDriver(DriverDaemonMonitorImpl.scala:507)
at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$job$2(DriverDaemonMonitorImpl.scala:139)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.withAttributionContext(DriverDaemonMonitorImpl.scala:96)
at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.withAttributionTags(DriverDaemonMonitorImpl.scala:96)
at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$job$1(DriverDaemonMonitorImpl.scala:139)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:528)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:633)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:656)
at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
at com.databricks.threading.SingletonJob$SingletonJobImpl.withAttributionContext(SingletonJob.scala:408)
at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
at com.databricks.threading.SingletonJob$SingletonJobImpl.withAttributionTags(SingletonJob.scala:408)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:628)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:537)
at com.databricks.threading.SingletonJob$SingletonJobImpl.recordOperationWithResultTags(SingletonJob.scala:408)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:529)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:495)
at com.databricks.threading.SingletonJob$SingletonJobImpl.recordOperation(SingletonJob.scala:408)
at com.databricks.threading.SingletonJob$SingletonJobImpl$SingletonRun.$anonfun$run$4(SingletonJob.scala:467)
at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
at com.databricks.threading.SingletonJob$SingletonJobImpl.withAttributionContext(SingletonJob.scala:408)
at com.databricks.threading.SingletonJob$SingletonJobImpl$SingletonRun.$anonfun$run$3(SingletonJob.scala:467)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.threading.SingletonJob$SingletonJobImpl$SingletonRun.$anonfun$run$1(SingletonJob.scala:466)
at com.databricks.util.UntrustedUtils$.tryLog(UntrustedUtils.scala:109)
at com.databricks.threading.SingletonJob$SingletonJobImpl$SingletonRun.run(SingletonJob.scala:460)
at com.databricks.threading.ContextBoundRunnable.$anonfun$run$2(ContextBoundRunnable.scala:15)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
at com.databricks.threading.ContextBoundRunnable.withAttributionContext(ContextBoundRunnable.scala:6)
at com.databricks.threading.ContextBoundRunnable.$anonfun$run$1(ContextBoundRunnable.scala:15)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.context.integrity.IntegrityCheckContext$ThreadLocalStorage$.withValue(IntegrityCheckContext.scala:73)
at com.databricks.threading.ContextBoundRunnable.run(ContextBoundRunnable.scala:14)
at com.databricks.threading.InstrumentedExecutorService.$anonfun$makeContextAware$2(InstrumentedExecutorService.scala:141)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.instrumentation.QueuedThreadPoolInstrumenter.trackActiveThreads(QueuedThreadPoolInstrumenter.scala:110)
at com.databricks.instrumentation.QueuedThreadPoolInstrumenter.trackActiveThreads$(QueuedThreadPoolInstrumenter.scala:107)
at com.databricks.threading.InstrumentedExecutorService.trackActiveThreads(InstrumentedExecutorService.scala:40)
at com.databricks.threading.InstrumentedExecutorService.$anonfun$makeContextAware$1(InstrumentedExecutorService.scala:139)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
I've been running some tests with the code Russell provided but haven't been able to reproduce the issue. With 2,600 different nodes, the JVM heap didn't exceed 3GB, and the execution completed on time without any errors.
I have no idea how PageRank works (I'm not an expert in graph theory), but since the maximum number of iterations is set to 10, I assume the dataset Russell is running somehow requires many iterations in certain cases. As I reported in SPARK-50992, Spark constantly generates the string representation of every internally generated plan, and as the plan grows, more heap is consumed until the JVM throws an OOM.
@Russell Jurney, have you tried disabling AQE? For now, that seems to be the only solution we have for this issue.
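For reference, assuming a SparkSession named `spark` as in a Databricks notebook, disabling AQE for the session would look roughly like this (it can also be set in the cluster's Spark config):

    # Turn off Adaptive Query Execution for this session before running the loop.
    spark.conf.set("spark.sql.adaptive.enabled", "false")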
Databricks is most likely restarting the driver due to an OOM (Out of Memory) error. I'm afraid Databricks doesn't explicitly show the OOM error in the case of the driver (that happened to me last year). However, if you go to the metrics page and check the memory of the driver node, you'll likely see a huge peak in memory consumption followed by a sharp drop when the driver is restarted.
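If it helps to confirm this from inside the notebook, one rough way to sample driver-JVM heap usage between loop iterations is to go through the py4j gateway. This is an internal interface (`spark.sparkContext._jvm`), so treat it as a best-effort diagnostic sketch rather than a stable API:

    # Sample the driver JVM's current heap usage in MB via the py4j gateway (internal API).
    def driver_heap_used_mb(spark):
        runtime = spark.sparkContext._jvm.java.lang.Runtime.getRuntime()
        return (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)

    # e.g. inside the loop: print(metric_year, driver_heap_used_mb(spark), "MB used on the driver")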
Totals for _graph_ for 1980, looking back from 1980 to 1980, nodes: 27,397, edges: 57,531 and maximum degree: 43
Totals for _graph_ for 1981, looking back from 1981 to 1980, nodes: 49,324, edges: 120,516 and maximum degree: 70
Totals for _graph_ for 1982, looking back from 1982 to 1980, nodes: 69,012, edges: 188,464 and maximum degree: 113
Totals for _graph_ for 1983, looking back from 1983 to 1980, nodes: 88,122, edges: 265,337 and maximum degree: 197
Totals for _graph_ for 1984, looking back from 1984 to 1980, nodes: 106,992, edges: 351,457 and maximum degree: 330
Totals for _graph_ for 1985, looking back from 1985 to 1980, nodes: 126,447, edges: 452,146 and maximum degree: 481
Totals for _graph_ for 1986, looking back from 1986 to 1980, nodes: 145,379, edges: 555,355 and maximum degree: 592
Totals for _graph_ for 1987, looking back from 1987 to 1980, nodes: 165,519, edges: 668,708 and maximum degree: 654
Totals for _graph_ for 1988, looking back from 1988 to 1980, nodes: 186,240, edges: 803,516 and maximum degree: 741
PageRank algorithm implementation. There are two implementations of PageRank.

The first one uses the `org.apache.spark.graphx.graph` interface with `aggregateMessages` and runs PageRank for a fixed number of iterations. This can be executed by setting `maxIter`. The second implementation uses the `org.apache.spark.graphx.Pregel` interface and runs PageRank until convergence, and this can be run by setting `tol`.
val pagerankGraph = graph.pageRank.resetProbability(0.15).maxIter(10).run()
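For completeness, the convergence-based variant described above would look like this through the Python API, reusing the `loop_graph` from Russell's loop (the 0.01 tolerance is just an illustrative value):

    # Run PageRank until convergence instead of a fixed number of iterations.
    pagerank_graph = loop_graph.pageRank(resetProbability=0.15, tol=0.01)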