Groups keyboard shortcuts have been updated
Dismiss
See shortcuts

AWS EMR cluster cc-pyspark job is ridiculously slow

122 views
Skip to first unread message

Anton Landerer

unread,
Aug 23, 2024, 10:32:29 PM8/23/24
to Common Crawl
My goal is to search the entire Common Crawl dataset for `search_string` in the HTML of all response records. I created a cluster on AWS EMR with a spot instance fleet of 10-20 instances, each with 4 vCores. 

For my test run, I'm using an input of 100 WARC files from the latest crawl. It takes almost 2 hours for my cluster to process all these WARCs. My laptop can run about 1 WARC per minute (including downloading/streaming) with smart_open, so it can do the same job in 1.5 hours. 

If I scaled this linearly, scanning all 90k WARC files would cost upwards of $1000. Could I be making a major error?

Anton Landerer

unread,
Aug 23, 2024, 10:36:57 PM8/23/24
to common...@googlegroups.com
I was expecting each instance to process more like 90 WARCs per hour. And just to emphasize, the 1.5 hour estimate for the 100 WARCs on my laptop doesn’t even account for any parallelism I could apply.

--
You received this message because you are subscribed to the Google Groups "Common Crawl" group.
To unsubscribe from this group and stop receiving emails from it, send an email to common-crawl...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/common-crawl/c1aa2164-7b11-4acc-a7ce-0f84716dc81an%40googlegroups.com.

Anton Landerer

unread,
Aug 23, 2024, 11:44:41 PM8/23/24
to Common Crawl
Also, interestingly I got exactly 368 "normalized instance hours" for two separate test runs. One with general purposes instance types, the other with compute optimized types. I should also mention I modified the script to not store WARCs in temp storage but instead keep them in RAM. I didn't see the reason to put them on disk when each WARC is immediately processed after downloading and then tossed.

Henry S. Thompson

unread,
Aug 25, 2024, 10:38:33 AM8/25/24
to common...@googlegroups.com
Anton Landerer writes:

> For my test run, I'm using an input of 100 WARC files from the
> latest crawl. It takes almost 2 hours for my cluster to process all
> these WARCs. My laptop can run about 1 WARC per minute (including
> downloading/streaming) with smart_open, so it can do the same job in
> 1.5 hours.

Just to confirm that your laptop is doing pretty well, but you're
certainly losing in your cluster setup somehow.

For _local_ files, for example, here's my result for searching 100
WARC (local) files using 10 threads on a lightly loaded 2.10 GHz Intel
Xeon with 18 2cpu cores:

>: time ls *001??.warc.gz | parallel -j 10 "uz '{}' | egrep -iac '\bthe\b'"
325270
300294
313315
...
311426
316116
327770

real 8m11.985s
user 84m48.443s
sys 6m25.282s

That's 5088 seconds for 100 searches =~ 51 seconds each of compute
time.

So streaming, even assuming you're using a single download thread for
your laptop test, must be taking most of the time.

And indeed streaming files these days using AWS S3 is taking me at least an
hour per segment (for 2022-33, as it happens) using 10 threads == 600
minutes for 800 files == 45 seconds per file.

So adding the above two tasks on my setup we get 95 seconds =~ 1.5 minutes
per WARC file, same as your laptop.

Conclusion: Your cluster is not set up properly, as you're not
getting _any_ benefit, indeed it's costing you, to multiplex the job
over somewhere between 20 and 40 pairs of threads.

ht
--
Henry S. Thompson, School of Informatics, University of Edinburgh
10 Crichton Street, Edinburgh EH8 9AB, SCOTLAND
e-mail: h...@inf.ed.ac.uk
URL: https://www.ltg.ed.ac.uk/~ht/
[mail from me _always_ has a .sig like this -- mail without it is forged spam]

Tom Morris

unread,
Aug 25, 2024, 6:10:56 PM8/25/24
to common...@googlegroups.com
On Fri, Aug 23, 2024 at 10:32 PM Anton Landerer <alan...@ucsb.edu> wrote:
My goal is to search the entire Common Crawl dataset for `search_string` in the HTML of all response records. I created a cluster on AWS EMR with a spot instance fleet of 10-20 instances, each with 4 vCores. 

It would be helpful to know the actual instance type, but it's important to know that I/O bandwidth scales with the size of the instances and the smaller instance types can be seriously anemic in the I/O bandwidth department. The table in this README is old, but it gives you an idea of how dramatically I/O bandwidth varies (as well as a ballpark for your instance type).

Tom 

Anton Landerer

unread,
Aug 25, 2024, 8:17:59 PM8/25/24
to Common Crawl
I got exactly 368 "normalized instance hours" with 2 different task fleet types (general and compute specialized), and that made me think there's a utilization problem. I tried removing the spot task fleet entirely, only using an m5.2xlarge instance as my core node to do the processing. It finished in exactly the same amount of time (but of course with less instance hours). So my problem is just that my task fleet isn't used at all. I guess this makes my issue more of a spark config problem than a common crawl problem, but of course I'll still appreciate any advice people have. 

The below is the example I followed to configure everything:

Tom Morris

unread,
Aug 25, 2024, 9:21:44 PM8/25/24
to common...@googlegroups.com
On Sun, Aug 25, 2024 at 8:18 PM Anton Landerer <alan...@ucsb.edu> wrote:
 I guess this makes my issue more of a spark config problem than a common crawl problem, but of course I'll still appreciate any advice people have. 

Yes, it sounds like you have a bottleneck causing unexpected serialization of the processing.
Why not start from a CommonCrawl specific example, such as ones in https://github.com/commoncrawl/cc-pyspark, rather than a generic EMR example ?

Also, if your unspecified "search_string" is in the rendered text, not the HTML tags, you might want to consider using the WET files instead of the WARC files since they are much more compact (but only if you trust the default text extraction process).

Good luck!

Tom
 

Anton Landerer

unread,
Aug 25, 2024, 11:43:01 PM8/25/24
to Common Crawl
I used the https://github.com/commoncrawl/cc-pyspark repo and only modified the process_record() function. The other github I linked is for my EMR setup. Also my `search_string` is in the HTML source, not the rendered text.

Anton Landerer

unread,
Aug 26, 2024, 6:07:59 PM8/26/24
to Common Crawl
I figured out that Spark is putting everything into a single task. That task is running on a spot instance (so the problem isn't with spot configuration), and all the executors on the other spot instances get killed because they have no task. I've made sure my input RDD has 50 partitions before I call mapPartitions(process_warcs), but I can't get more than 1 task running. 

Anton Landerer

unread,
Aug 26, 2024, 7:27:33 PM8/26/24
to Common Crawl
Finally got it working! For anyone interested, here are the spark configs i used:
```
    "Configurations": [
        {
            "Classification": "spark",
            "Properties": {
                "maximizeResourceAllocation": "true"
            }
        },
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.dynamicAllocation.enabled": "true",
                "spark.default.parallelism": "100"
            }
        }
    ]
```

I made sure that my CORE node had the same vCores/memory as my spot TASK fleet types, because `maximizeResourceAllocation` calculates `spark-defaults` based on nodes available at the start of the job even though the TASK fleet provisions later. I don't think setting `spark.default.parallelism` is necessary with the minPartitions argument in the python script. The other key thing was getting spark to create more tasks. Here is my code:
```
        input_data = session.sparkContext.textFile(self.args.input, minPartitions=self.args.num_input_partitions)
        self.get_logger().info(f"Number of partitions: {input_data.getNumPartitions()}")

        output = input_data.mapPartitions(self.process_warcs)
        output.cache()
        output.count()

        session.createDataFrame(output, schema=self.output_schema) \
            .coalesce(self.args.num_output_partitions) \
            .write.mode('overwrite').csv(self.args.output)
```

For some reason Spark would only create 1 task if `output` was computed during the createDataFrame() step, but I got 1 task per partition by using `output.count()` to force computation earlier. 

Tom Morris

unread,
Aug 26, 2024, 10:39:07 PM8/26/24
to common...@googlegroups.com
Congratulations! And thanks for reporting back on the keys to success.
Reply all
Reply to author
Forward
0 new messages