Lazy evaluation in a GraphFrame?

Russell Jurney

Feb 11, 2025, 9:48:03 AM
to GraphFrames
I have a Spark / GraphFrames question... if I loop, and in each loop I define a new GraphFrame and run PageRank... will it lazily evaluate and run multiple PageRanks at the same time? I'm having a lot of trouble with this script... it takes the entire cluster down after a few iterations. Not at all sure what I'm doing wrong, but I've been debugging it for a week, so here I am :) Thought I'd try this list before user@spark...

The code is approximately:

from graphframes import GraphFrame
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

# `edges` is an existing DataFrame with src, dst and year columns
list_of_years = range(2000, 2025)
first_year = 2000

for metric_year in list_of_years:
    # Filter the graph to edges between 2000 and the metric_year, then run PageRank
    loop_edges: DataFrame = edges.filter(F.col("year").between(first_year, metric_year))

    # Derive the node list from both endpoints of the remaining edges
    loop_nodes: DataFrame = (
        loop_edges.selectExpr("src as id")
        .union(loop_edges.selectExpr("dst as id"))
        .distinct()
    )

    loop_graph: GraphFrame = GraphFrame(loop_nodes, loop_edges)
    pagerank_graph = loop_graph.pageRank(resetProbability=0.15, maxIter=10)

    # Tag each PageRank score with the year it was computed for
    pagerank_df = (
        pagerank_graph.vertices.withColumn("metric_date", F.lit(metric_year))
        .select("id", "metric_date", "pagerank")
    )

    # Append this year's scores to the output
    pagerank_df.write.parquet("data/pagerank.parquet", mode="append")

I've tried a lot of variations and can't figure out what is going on...


Thanks,

Erik Eklund

Feb 11, 2025, 9:58:17 AM
to GraphFrames
Since you have an action in each loop, it will not proceed with the next year until it has finished the current year.
You could append the pagerank_df into a union which you then save once. Then all years would run in parallel assuming the pagerank implementation does not have any actions in it.

How is edges created? Maybe you can cache/persist it since it will be used many times.

What kind of error do you get?
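
For reference, the "union, then save once" idea above might look roughly like the sketch below. `union_all` is a hypothetical helper, not part of Spark or GraphFrames; it only assumes that each element has a `unionByName` method, as PySpark DataFrames do.

```python
from functools import reduce


def union_all(frames):
    """Fold a non-empty sequence of DataFrames into one via unionByName.

    Hypothetical helper. unionByName is a lazy transformation, so calling
    this does not run any Spark job by itself; work only starts when an
    action (for example a write) is called on the result.
    """
    frames = list(frames)
    if not frames:
        raise ValueError("union_all needs at least one DataFrame")
    return reduce(lambda left, right: left.unionByName(right), frames)
```

In the loop from the first message, each `pagerank_df` would be appended to a list instead of being written, and after the loop a single `union_all(results).write.parquet("data/pagerank.parquet")` would save everything. Whether the years then actually run in parallel depends, as Erik says, on whether the PageRank implementation triggers actions of its own.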

Russell Jurney

Feb 11, 2025, 10:06:42 AM
to Erik Eklund, GraphFrames
Thanks, Erik. I do cache the edges DataFrame, and I don't actually want all the jobs to run at once - at least I thought that would overload things. In the beginning I did union everything into a single DataFrame, but I switched to making it work bit by bit because it keeps crashing. What I haven't tried is storing and loading it each loop... that is a great idea :)

I suspect, given that I've run this over and over in different ways, that pageRank is leaking something that is breaking things. I suspect, but I don't know.

Russell

--
You received this message because you are subscribed to the Google Groups "GraphFrames" group.
To unsubscribe from this group and stop receiving emails from it, send an email to graphframes...@googlegroups.com.
To view this discussion visit https://groups.google.com/d/msgid/graphframes/8848d854-dee6-4ba4-98a5-105c0c7ef4f1n%40googlegroups.com.
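
The "store and load it each loop" idea from Russell's message could be sketched as below. `materialize` is a hypothetical helper name; it assumes `spark` is an active SparkSession, `df` is a DataFrame, and `path` is a scratch location that is safe to overwrite.

```python
def materialize(spark, df, path):
    """Write df to Parquet at path, then read it back.

    Hypothetical helper. The DataFrame returned by the read has a plan
    that starts at the files on disk, rather than at every transformation
    that produced df.
    """
    # The write is an action, so df is fully computed at this point.
    df.write.mode("overwrite").parquet(path)
    # Reading back gives a fresh DataFrame backed by the stored files.
    return spark.read.parquet(path)
```

Inside the loop this might be used as `loop_edges = materialize(spark, loop_edges, some_scratch_path)` before building the GraphFrame, at the cost of an extra write and read per year. Whether this helps with the crash discussed here is not established in the thread.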

Erik Eklund

Feb 11, 2025, 10:14:54 AM
to GraphFrames
Hard to know without your actual error. Do you know at what year gap it fails? Since the graph grows and grows up to a 25-year gap, it might just be too big?

Russell Jurney

Feb 11, 2025, 10:19:28 AM
to Erik Eklund, GraphFrames
It does definitely grow, but what does "too big" mean here? It should be able to handle that, right?

Russell

Russell Jurney

Feb 11, 2025, 10:23:41 AM
to Erik Eklund, GraphFrames
This is the exception on Databricks... will get the one from my machine too:

The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.

at com.databricks.spark.chauffeur.Chauffeur.onDriverStateChange(Chauffeur.scala:1551)

at com.databricks.spark.chauffeur.Chauffeur.$anonfun$driverStateOpt$1(Chauffeur.scala:203)

at com.databricks.spark.chauffeur.Chauffeur.$anonfun$driverStateOpt$1$adapted(Chauffeur.scala:203)

at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$goToStopped$4(DriverDaemonMonitorImpl.scala:295)

at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$goToStopped$4$adapted(DriverDaemonMonitorImpl.scala:295)

at scala.collection.immutable.List.foreach(List.scala:431)

at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.goToStopped(DriverDaemonMonitorImpl.scala:295)

at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.monitorDriver(DriverDaemonMonitorImpl.scala:507)

at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$job$2(DriverDaemonMonitorImpl.scala:139)

at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)

at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)

at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)

at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)

at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.withAttributionContext(DriverDaemonMonitorImpl.scala:96)

at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)

at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)

at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.withAttributionTags(DriverDaemonMonitorImpl.scala:96)

at com.databricks.spark.chauffeur.DriverDaemonMonitorImpl.$anonfun$job$1(DriverDaemonMonitorImpl.scala:139)

at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:528)

at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:633)

at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:656)

at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)

at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)

at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)

at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)

at com.databricks.threading.SingletonJob$SingletonJobImpl.withAttributionContext(SingletonJob.scala:408)

at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)

at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)

at com.databricks.threading.SingletonJob$SingletonJobImpl.withAttributionTags(SingletonJob.scala:408)

at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:628)

at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:537)

at com.databricks.threading.SingletonJob$SingletonJobImpl.recordOperationWithResultTags(SingletonJob.scala:408)

at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:529)

at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:495)

at com.databricks.threading.SingletonJob$SingletonJobImpl.recordOperation(SingletonJob.scala:408)

at com.databricks.threading.SingletonJob$SingletonJobImpl$SingletonRun.$anonfun$run$4(SingletonJob.scala:467)

at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)

at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)

at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)

at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)

at com.databricks.threading.SingletonJob$SingletonJobImpl.withAttributionContext(SingletonJob.scala:408)

at com.databricks.threading.SingletonJob$SingletonJobImpl$SingletonRun.$anonfun$run$3(SingletonJob.scala:467)

at scala.util.Try$.apply(Try.scala:213)

at com.databricks.threading.SingletonJob$SingletonJobImpl$SingletonRun.$anonfun$run$1(SingletonJob.scala:466)

at com.databricks.util.UntrustedUtils$.tryLog(UntrustedUtils.scala:109)

at com.databricks.threading.SingletonJob$SingletonJobImpl$SingletonRun.run(SingletonJob.scala:460)

at com.databricks.threading.ContextBoundRunnable.$anonfun$run$2(ContextBoundRunnable.scala:15)

at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)

at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)

at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)

at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)

at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)

at com.databricks.threading.ContextBoundRunnable.withAttributionContext(ContextBoundRunnable.scala:6)

at com.databricks.threading.ContextBoundRunnable.$anonfun$run$1(ContextBoundRunnable.scala:15)

at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at com.databricks.context.integrity.IntegrityCheckContext$ThreadLocalStorage$.withValue(IntegrityCheckContext.scala:73)

at com.databricks.threading.ContextBoundRunnable.run(ContextBoundRunnable.scala:14)

at com.databricks.threading.InstrumentedExecutorService.$anonfun$makeContextAware$2(InstrumentedExecutorService.scala:141)

at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)

at com.databricks.instrumentation.QueuedThreadPoolInstrumenter.trackActiveThreads(QueuedThreadPoolInstrumenter.scala:110)

at com.databricks.instrumentation.QueuedThreadPoolInstrumenter.trackActiveThreads$(QueuedThreadPoolInstrumenter.scala:107)

at com.databricks.threading.InstrumentedExecutorService.trackActiveThreads(InstrumentedExecutorService.scala:40)

at com.databricks.threading.InstrumentedExecutorService.$anonfun$makeContextAware$1(InstrumentedExecutorService.scala:139)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Erik Eklund

Feb 11, 2025, 10:33:22 AM
to GraphFrames
I don't know Databricks, but unfortunately that error does not tell me anything useful :/

Ángel

Feb 11, 2025, 11:49:42 AM
to Erik Eklund, GraphFrames
Grrreat! A new "mystery" to investigate. Thanks, Russell.



Sem

Feb 11, 2025, 12:01:04 PM
to graph...@googlegroups.com
Based on the error, it looks like the problem is on the driver, which may
indicate you are attempting to run a very big plan. What is the `edges`
DataFrame? How was it created?

Best regards,

Sem

Ángel

Feb 12, 2025, 12:46:05 AM
to Sem, graph...@googlegroups.com

I've been running some tests with the code Russell provided but haven't been able to reproduce the issue. With 2,600 different nodes, the JVM heap didn't exceed 3GB, and the execution completed on time without any errors.

I have no idea how PageRank works—I'm not an expert in graph theory—but since the maximum number of iterations is set to 10, I assume that the dataset Russell is trying to execute somehow requires multiple iterations in certain cases. As I reported in SPARK-50992, Spark is constantly generating the string representation of every internally generated plan, and as the plan grows, more heap is consumed until the JVM throws an OOM.

@Russell Jurney, have you tried disabling AQE? For now, that seems to be the only solution we have for this issue.

Databricks is most likely restarting the driver due to an OOM (Out of Memory) error. I'm afraid Databricks doesn't explicitly show the OOM error in the case of the driver (that happened to me last year). However, if you go to the metrics page and check the memory of the driver node, you'll likely see a huge peak in memory consumption followed by a sharp drop when the driver is restarted.
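
A minimal sketch of turning AQE off for just one part of a job, as suggested above. `aqe_disabled` is a hypothetical helper; it assumes `spark` is an active SparkSession and uses the standard `spark.sql.adaptive.enabled` setting through `spark.conf.get` and `spark.conf.set`.

```python
from contextlib import contextmanager

AQE_KEY = "spark.sql.adaptive.enabled"


@contextmanager
def aqe_disabled(spark):
    """Disable Adaptive Query Execution inside a with-block.

    Hypothetical helper: remembers the current setting, switches AQE off,
    and restores the previous value on exit, even if the block raises.
    """
    previous = spark.conf.get(AQE_KEY)
    spark.conf.set(AQE_KEY, "false")
    try:
        yield spark
    finally:
        spark.conf.set(AQE_KEY, previous)
```

The PageRank loop would then sit inside `with aqe_disabled(spark): ...`. Setting the value once for the whole session with `spark.conf.set("spark.sql.adaptive.enabled", "false")` is the simpler alternative.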


Russell Jurney

Feb 13, 2025, 11:52:23 PM
to Ángel, Sem, graph...@googlegroups.com
I have determined that yes, turning off either speculative execution or AQE will make the script run indefinitely.

Here's the runlog:

Totals for _graph_ for 1980, looking back from 1980 to 1980, nodes: 27,397, edges: 57,531 and maximum degree: 43

Totals for _graph_ for 1981, looking back from 1981 to 1980, nodes: 49,324, edges: 120,516 and maximum degree: 70

Totals for _graph_ for 1982, looking back from 1982 to 1980, nodes: 69,012, edges: 188,464 and maximum degree: 113

Totals for _graph_ for 1983, looking back from 1983 to 1980, nodes: 88,122, edges: 265,337 and maximum degree: 197

Totals for _graph_ for 1984, looking back from 1984 to 1980, nodes: 106,992, edges: 351,457 and maximum degree: 330

Totals for _graph_ for 1985, looking back from 1985 to 1980, nodes: 126,447, edges: 452,146 and maximum degree: 481

Totals for _graph_ for 1986, looking back from 1986 to 1980, nodes: 145,379, edges: 555,355 and maximum degree: 592

Totals for _graph_ for 1987, looking back from 1987 to 1980, nodes: 165,519, edges: 668,708 and maximum degree: 654

Totals for _graph_ for 1988, looking back from 1988 to 1980, nodes: 186,240, edges: 803,516 and maximum degree: 741


Here are the nodes and edge files for stats.meta.stackexchange.com that may be big enough to reproduce the same behavior:

https://rjurneyopen.s3.amazonaws.com/stats.meta/Nodes.parquet

They may be useful for testing.

Russell


Russell Jurney

Feb 14, 2025, 12:19:08 AM
to Ángel, Sem, graph...@googlegroups.com
Sorry, I'll make a unit including these datasets.

The weird thing is how incredibly slow the computation is on this network... it grinds to a crawl without AQE, which I disabled with spark.sql.adaptive.enabled false. These networks do grow in size but I would not expect the size of the work to grow this quickly unless the degree exploded.

Russell

Erik Eklund

Feb 14, 2025, 3:58:53 AM
to GraphFrames
I had a brief look at the PageRank implementation in GraphX, and it consists of many shuffle operations to get the data where it needs to be for the calculations. So my best guess is that your cluster doesn't have enough resources. How many executors do you have, and how big are they? Does the Spark UI tell you anything?

Ángel

Feb 14, 2025, 11:12:52 AM
to Russell Jurney, Sem, graph...@googlegroups.com
I'm getting the following error when trying to load the Nodes.parquet files you attached:

DataSource: [COLUMN_ALREADY_EXISTS] The column `id` already exists. Consider to choose another name or rename the existing column.

When I get the nodes from the edges DataFrame instead (the way you did in the code you sent some days ago), PageRank finishes some minutes later, still without any errors.

 

Ángel

Feb 14, 2025, 11:19:16 AM
to Erik Eklund, GraphFrames
While it's true that there's a lot of shuffling, there are also lots of transformations being performed. I'm not sure that's the reason for the driver restarting. To me, that sounds more like an OOM on the driver side.

image.png

On the other hand, I've also had a brief look at the PageRank implementation in GraphX and ... now I'm not so sure why disabling AQE works, because I had forgotten that GraphX uses the RDD API, not the DataFrame API, so ... AQE shouldn't play any role here, should it?
This is getting even more interesting ... any ideas?

Russell Jurney

Feb 14, 2025, 10:45:54 PM
to Ángel, Erik Eklund, GraphFrames
Well there are two GraphFrames pageRank implementations. One is GraphX and runs until convergence, the other is GraphFrames’ aggregateMessages directly.

Sorry for the nodes error, I’ll fix it. There are both an id and an Id field, and you have to change a setting.

Ángel

Feb 15, 2025, 1:07:03 AM
to Russell Jurney, Erik Eklund, GraphFrames
I'm calling PageRank in the same way as the code you sent, and according to the comments in the class:
PageRank algorithm implementation. There are two implementations of PageRank.
*
* The first one uses the `org.apache.spark.graphx.graph` interface with `aggregateMessages` and runs
* PageRank for a fixed number of iterations. This can be executed by setting `maxIter`.
* The second implementation uses the `org.apache.spark.graphx.Pregel` interface and runs PageRank until
* convergence and this can be run by setting `tol`.
I'm using maxIter, so my test is using the first one.
val pagerankGraph = graph.pageRank.resetProbability(0.15).maxIter(10).run()
Anyway, the GraphFrames PageRank class is quite simple, and both implementations end up using GraphX.

Don't worry about the nodes file, I'm getting the nodes from the edges dataset, so no need for that file.
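
On the Python side, the choice between the two implementations described in the class comment above comes down to which keyword is passed to `GraphFrame.pageRank`. The sketch below wraps that choice; `run_pagerank` and its parameter names are hypothetical, and `graph` is assumed to be a GraphFrame.

```python
def run_pagerank(graph, max_iter=None, tol=None, reset_probability=0.15):
    """Call graph.pageRank with exactly one of maxIter or tol.

    Hypothetical wrapper. Passing max_iter selects the fixed-iteration
    run; passing tol selects the run-until-convergence variant.
    """
    if (max_iter is None) == (tol is None):
        raise ValueError("set exactly one of max_iter or tol")
    if max_iter is not None:
        # Fixed number of iterations, as in the script from this thread
        return graph.pageRank(resetProbability=reset_probability, maxIter=max_iter)
    # Iterate until the ranks change by less than tol
    return graph.pageRank(resetProbability=reset_probability, tol=tol)
```

`run_pagerank(loop_graph, max_iter=10)` corresponds to the call in the original script, while `run_pagerank(loop_graph, tol=0.01)` would select the convergence-based variant; the tolerance value here is only an illustration.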


Russell Jurney

Feb 15, 2025, 8:01:00 PM
to Ángel, Erik Eklund, GraphFrames
Thing is, I filter the nodes differently on each iteration based on the date, over an increasingly large amount of data, and I think that makes the plan more complex.

Thanks, I could maybe try taking my own advice on which thing to run 😛

I appreciate you looking into this!