Process monthly WARC files


Meghana Agrawal

Feb 15, 2022, 3:34:08 PM
to Common Crawl
Hi team, 
I want to process all WARC files on AWS EMR for a given monthly crawl, in this case CC-MAIN-2022-05. I am using 20 core-node instances of type r5.12xlarge, with 21 GB each of driver and executor memory.
Even after multiple attempts, the processing fails.
I need the WARC files specifically; the WAT and WET files won't work for my use case.
Can anyone suggest the optimal Spark configuration to process one monthly crawl in minimum time?

Sebastian Nagel

Feb 16, 2022, 3:25:45 AM
to common...@googlegroups.com
Hi,

we're seeing an exceptionally high rate of HTTP 503 responses when
requesting data from our bucket s3://commoncrawl/ and are working on a solution.

For now, we can only recommend waiting until the problem is fixed.
We'll post updates here in this discussion group.

Thanks for your patience!

Best,
Sebastian

Jay Patel

Feb 16, 2022, 5:17:53 AM
to common...@googlegroups.com
I think all the users of Common Crawl data have increased the number of retries in their code (to combat the 503 errors), and that feeds a vicious circle: we collectively exceed the rate limit of the Common Crawl S3 bucket and get even more 503 errors.

Having said that, don't you think 20 instances of r5.12xlarge is a bit of overkill?

Each has 48 vCPUs and 384 GiB of memory. With this much compute thrown at the Common Crawl S3 bucket, your cluster alone will push close to the maximum request rate of an S3 bucket (is it still 5,500 requests/second?).

If your use case can handle it, could you slow down to only a few dozen concurrent requests from your cluster and see if that mitigates the issue?
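One gentler client-side option than raising fixed retry counts is botocore's adaptive retry mode, which rate-limits requests on the client instead of just retrying harder. This is a hedged suggestion beyond what's discussed here, not something anyone in the thread is known to use:

```python
import boto3
from botocore.config import Config

# "adaptive" adds client-side rate limiting on top of retries, so the
# client backs off instead of amplifying the 503 storm.
s3 = boto3.client(
    "s3",
    config=Config(retries={"max_attempts": 5, "mode": "adaptive"}),
)
```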

--
You received this message because you are subscribed to the Google Groups "Common Crawl" group.
To unsubscribe from this group and stop receiving emails from it, send an email to common-crawl...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/common-crawl/2545cef3-74d1-9d83-3397-ef9a952cdf78%40commoncrawl.org.

Meghana Agrawal

Feb 16, 2022, 8:54:08 AM
to Common Crawl
Thanks for the reply. I was also getting the SlowDown error, so to combat that I downloaded the entire month's WARC content into my own bucket. Now I no longer get the SlowDown error, but the Spark context is shutting down.
I am using minPartitions = 1000 so that each task processes only 72 WARC files.
I suspect the Spark context is shutting down because of a memory error.
I was also getting the error with minPartitions = 400.
Is there a recommended number?
Thanks
Meghana

Sebastian Nagel

Feb 16, 2022, 11:17:50 AM
to common...@googlegroups.com
Hi Meghana, hi Jay,

> I am using minPartitions as 1000 to process only 72 warc files per
> task.
> I have a feeling that spark context is shutting down because of memory
> error.

The reason for the error should be in the (driver) logs; see [1,2].

> I was getting error while keeping minPartitions as 400 as well.
> Is there some recommended number?

To avoid compute resources idling without work, you would typically
choose minPartitions as a multiple of the maximum number of concurrent
tasks your cluster is able to run, which is basically
num_executors * cores_per_executor.
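A sketch of that rule of thumb (all numbers below are placeholders, not recommendations):

```python
def min_partitions(num_executors: int, cores_per_executor: int,
                   waves: int = 3) -> int:
    """Pick minPartitions as a whole multiple of the cluster's
    concurrent task slots, so no cores sit idle while the last
    wave of tasks drains."""
    return num_executors * cores_per_executor * waves

# e.g. 20 executors with 5 cores each, 3 task waves:
# min_partitions(20, 5) -> 300
```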

However, I'd recommend not trying to process the WARC files of an
entire monthly crawl in a single Spark job. Instead, split the data
into 10 parts, write each part's result as intermediate output to S3,
and later combine the parts into your final result:
1. Web data is unpredictable and may trigger rare exceptions.
My own experience: you think all exceptions are handled, but
then after a billion records you hit an unexpected one.
This way only the current part is lost, not the entire job.
2. It also allows you to use cheap EC2 spot instances.
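A sketch of the splitting step: the warc.paths.gz listing shipped with each crawl is real, but the helper and the part count are illustrative:

```python
import gzip

def split_into_parts(paths, num_parts=10):
    """Split the list of WARC paths into num_parts roughly equal
    chunks; each chunk becomes the input of its own Spark job."""
    size = -(-len(paths) // num_parts)  # ceiling division
    return [paths[i:i + size] for i in range(0, len(paths), size)]

# Each crawl publishes its WARC file listing as warc.paths.gz:
# with gzip.open("warc.paths.gz", "rt") as f:
#     parts = split_into_parts([line.strip() for line in f])
```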


> the maximum limit of a S3 bucket (is it still 5500
> requests/second?).

The limit applies per prefix within a bucket; see [3].


Best,
Sebastian

[1]
https://aws.amazon.com/premiumsupport/knowledge-center/spark-driver-logs-emr-cluster/
[2]
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-manage-view-web-log-files.html
[3]
https://aws.amazon.com/premiumsupport/knowledge-center/s3-503-within-request-rate-prefix/

Meghana Agrawal

Mar 9, 2022, 6:48:19 AM
to Common Crawl
Thanks for the suggestion, Sebastian.
I observed that the Spark context was shutting down when I used 5 cores per executor, but not when using 1 core per executor. What are the ideal executor memory and executor memory overhead for processing one compressed WARC file (1.1 GB)? Is 2 GB sufficient, or should it be more than 10 GB?
Meghana

Sebastian Nagel

Mar 9, 2022, 7:51:31 AM
to common...@googlegroups.com
Hi Meghana,

> what is the ideal executor memory overhead and executor memory to
> be given for processing 1 compressed warc file (1.1Gb). Is 2Gb
> sufficient or should it be > 10G.

2 GB can be sufficient. But the answer depends on

- the memory requirements of your processing routines

- whether the WARC file is kept completely in memory or backed by disk
  as a temporary file. In the latter case the 1 GB is memory-mapped,
  which does not count toward the executor memory. Mapped memory is
  freed by the operating system if the overall memory becomes scarce.

Usually only a single WARC record needs to be in memory, and Common
Crawl puts a limit on the payload size (max. 1 MiB). This keeps the
memory requirements very low.

One final note: if you're still using r5.* instances, they have 8 GB of
RAM per vCPU, so there is no need to limit the executor memory to 2 GB
per executor core.
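As a rough, illustrative sizing sketch (the overhead fraction mirrors spark.executor.memoryOverhead's 10% default; all numbers are assumptions, not tuned values):

```python
def executor_memory_gb(ram_per_vcpu_gb: float, cores_per_executor: int,
                       overhead_fraction: float = 0.10) -> float:
    """Rough executor memory sizing: give each executor its cores'
    share of the instance RAM, minus the Spark/YARN memory overhead
    (spark.executor.memoryOverhead defaults to 10% of executor
    memory, with a 384 MiB minimum)."""
    total = ram_per_vcpu_gb * cores_per_executor
    return round(total / (1 + overhead_fraction), 1)

# r5 instances have 8 GB of RAM per vCPU, so with 5 cores per
# executor there is far more than 2 GB available per core:
# executor_memory_gb(8, 5) -> 36.4
```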

Best,
Sebastian

Meghana Agrawal

Mar 9, 2022, 8:47:38 AM
to Common Crawl
Hi Sebastian, 
Oh, this is really helpful. Understood. Given that, yes, there is no need to use r5 instances. I will try m5 and c5 instances and check the optimal executor memory.
Meghana

Greg Womack

Mar 19, 2024, 4:06:26 PM
to Common Crawl
Hi Sebastian, 

Reviving this old thread because I'm having similar issues. I've been processing the WET files in Spark and am finding that workers crash on 32 GB, 8-core instances. The problem is that workers download 8 gzipped files at a time, which decompress to 1 GB+ each. Spark does the gzip decompression in memory, runs out of memory, and crashes. For the WET files I was able to fix this by upgrading to 64 GB machines.

The WARC files, however, are 800 MB+ gzipped, which is almost certainly going to cause OOM problems when Spark tries to decompress them in parallel.

Would it be possible to limit the output file size on future crawls, perhaps to 60 MB gzipped per file?

Thanks,

Greg

Henry S. Thompson

Mar 20, 2024, 6:42:14 AM
to common...@googlegroups.com
Greg Womack writes:

> The WARC files however are 800MB+ gzipped, which is almost certainly
> going to cause OOM problems when Spark tries to unzip them in
> parallel.
>
> Would it be possible to limit file output size to something smaller
> on future crawls, perhaps 60MB gzipped per file?

Um, that means increasing the _number_ of warc files per crawl to the
order of 10K, which translates into an order of magnitude slowdown in,
for example, simple command-line file-by-file script processing...
And yes, people can rewrite their scripts to mitigate the impact, but
so, presumably, can you rewrite your Spark scripts.

So, not a decision to be taken lightly.

ht
--
Henry S. Thompson, School of Informatics, University of Edinburgh
10 Crichton Street, Edinburgh EH8 9AB, SCOTLAND
e-mail: h...@inf.ed.ac.uk
URL: https://www.ltg.ed.ac.uk/~ht/
[mail from me _always_ has a .sig like this -- mail without it is forged spam]

Sebastian Nagel

Mar 20, 2024, 2:41:38 PM
to common...@googlegroups.com
Hi Greg,

I can only second Henry...

The 1 GiB WARC file size is a recommendation of the WARC standard [1].
The WET and WAT files are smaller because each one contains the
transformations of the records of a single WARC file.

It should be possible to let the Spark job read the WET files record by
record, decompressing only a single record at a time per executor.

Our example project cc-pyspark [2] downloads the WARC/WAT/WET files to
a temporary file and then uses a WARC parser (warcio or fastwarc) to
process them record by record (and also decompressing record by record).

Unfortunately, we do not (yet) provide a Java (or Scala) Spark example.
In a JVM language it should be possible (at least, much easier than in
Python) to implement a WARC input format that reads record by record.

There might be examples out on the internet. Does anyone have a good pointer?

@Greg: if you could share your code (or parts of it) we might be able
to help you to find a solution.

Best,
Sebastian

[1]
https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#annex-c-informative-warc-file-size-and-name-recommendations
[2] https://github.com/commoncrawl/cc-pyspark

Greg Womack

Mar 20, 2024, 4:23:22 PM
to common...@googlegroups.com
> Our example project cc-pyspark [2] downloads the WARC/WAT/WET files to
> a temporary file and then uses a WARC parser (warcio or fastwarc) to
> process them record by record (and also decompressing record by record).

Thanks for the help, I think I get it: have Spark read the gzipped files as binary without decompressing, then decompress and process them record by record using a WARC parser.

My current approach was to read the files as text files, and have Spark decompress them automatically:

common_crawl_segments = spark.sparkContext.wholeTextFiles(
    f"s3a://commoncrawl/crawl-data/{crawl}/segments/1695233505362.29/wet/CC-MAIN-20230921073711-20230921103711-*.warc.wet.gz",
    minPartitions=200,
)
pages_rdd = common_crawl_segments.flatMap(
    lambda file_content: split_segment_by_page(file_content[1])
)

This was blowing up because Spark was trying to decompress 8 large files at a time in memory. I'll try loading the files using sparkContext.binaryFiles and then processing them with a WARC parser.

Thanks,

Greg