Testing Async Pipelines / Middlewares


Dimitris Kouzis - Loukas

May 28, 2014, 1:44:27 AM5/28/14
to scrapy...@googlegroups.com
Hello,

Let's assume I have a middleware, e.g. a pipeline, and it is async (uses Deferreds), and I would like to write some unit tests for it. What would you suggest as a good way to organise the test code and reuse as much of the Scrapy infrastructure as possible? Scrapy uses trial, and I guess it's a good idea to inherit from SiteTest, e.g. as in scrapy/tests/test_command_fetch.py. Is this right?
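For concreteness, here is roughly the kind of test I have in mind - a minimal sketch, where MyAsyncPipeline is a made-up stand-in for the pipeline under test (trial waits on a Deferred returned from a test method):

from twisted.internet import reactor, task
from twisted.trial import unittest

from scrapy.item import Item, Field
from scrapy.spider import Spider


class MyAsyncPipeline(object):
    # stand-in for the real pipeline: pretends to do an async lookup
    # and fires its Deferred with the item a moment later
    def process_item(self, item, spider):
        return task.deferLater(reactor, 0.01, lambda: item)


class DummyItem(Item):
    name = Field()


class MyAsyncPipelineTest(unittest.TestCase):

    def test_process_item(self):
        pipeline = MyAsyncPipeline()
        item = DummyItem(name='foo')
        d = pipeline.process_item(item, Spider(name='test'))

        def check(result):
            self.assertEqual(result, item)

        d.addCallback(check)
        # returning the Deferred makes trial wait until it fires
        return d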

Thanks

Nikolaos-Digenis Karagiannis

May 28, 2014, 4:25:50 AM5/28/14
to scrapy...@googlegroups.com
Why a Deferred? Do you want to overcome the http://doc.scrapy.org/en/latest/topics/settings.html#concurrent-items restriction in a specific pipeline, or while processing a specific item?
I am asking because I inherited such a pipeline and I am still searching for a justification for deferring the item processing a second time.

Dimitris Kouzis - Loukas

May 28, 2014, 6:34:16 AM5/28/14
to scrapy...@googlegroups.com
Basically I assume that since the whole architecture is async, you wouldn't want to block, e.g. to access a file or do any socket operation. So if someone wants to do e.g. an API lookup, I guess it's better to do it asynchronously. For example, this is the typical async MySQL example... http://snipplr.com/view/66989/async-twisted-db-pipeline/ ... it doesn't construct a Deferred by hand, but it is async. An example of a pipeline that is itself async is the ImagesPipeline, via MediaPipeline: https://github.com/scrapy/scrapy/blob/master/scrapy/contrib/pipeline/media.py#L38
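For reference, the pattern from that snippet condensed into a sketch (the table, column and connection details here are made up):

from twisted.enterprise import adbapi


class MySQLStorePipeline(object):

    def __init__(self):
        # adbapi runs each interaction on a thread from its own pool
        # and hands the result back to the reactor as a Deferred
        self.dbpool = adbapi.ConnectionPool('MySQLdb', db='scraping',
                                            user='root', passwd='')

    def process_item(self, item, spider):
        # runInteraction() returns a Deferred immediately; by returning
        # it we tell Scrapy to wait (without blocking) for the insert
        d = self.dbpool.runInteraction(self._insert, item)
        d.addCallback(lambda _: item)
        return d

    def _insert(self, tx, item):
        # runs on a pool thread; tx behaves like a DB-API cursor
        tx.execute("INSERT INTO items (name) VALUES (%s)",
                   (item['name'],))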

Nikolaos-Digenis Karagiannis

May 28, 2014, 7:51:23 AM5/28/14
to scrapy...@googlegroups.com
I think the code from snippets.scrapy.org causes log messages from the deferred to appear after the spider closes and the stats are printed. I can't devote time to producing steps to reproduce this right now, but a delay in its deferred that makes it finish after the engine stops should do it.
Anyway, back to the topic: since all the middlewares (pipelines among them) run asynchronously, I thought process_item() does not need to create a deferred, because it is already called asynchronously.
However, I tried time.sleep() in process_item() and it seems further yields from spider.parse() did block. Maybe it's a bug, or maybe CONCURRENT_ITEMS refers to something else.
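For the record, a sketch of the two variants I am comparing (class names made up):

import time

from twisted.internet import reactor, task


class BlockingPipeline(object):
    def process_item(self, item, spider):
        time.sleep(5)  # holds the thread that called it; nothing else runs
        return item


class NonBlockingPipeline(object):
    def process_item(self, item, spider):
        # returns immediately; the engine keeps going, and the Deferred
        # fires with the item 5 seconds later
        return task.deferLater(reactor, 5, lambda: item)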

Dimitris Kouzis - Loukas

May 29, 2014, 8:49:35 AM5/29/14
to scrapy...@googlegroups.com
It's kind of unlikely that the log messages would systematically be deferred until after the spider closes - maybe under certain conditions.

Yes... that's the whole trick about async - it's difficult to grasp the full extent of it.

I will try to do my best to give an overview:

1. Scrapy uses Twisted. In Twisted, "the default reactor uses a minimum size of 5 and a maximum size of 10" for its thread pool (http://twistedmatrix.com/documents/13.0.0/core/howto/threading.html).

In other words, if you manage to get those (max) 10 threads blocked, Scrapy freezes until one of them frees up.

2. What do those 10 threads spend their time on? Whatever you assign to them... one thing that runs is the reactor loop. It checks whether any tasks have all their dependencies satisfied and makes sure they get executed on an available thread.

3. What are those tasks? Tasks are things to do, meant to be short - a few ms at most. They are meant to be non-blocking, i.e. if you want to read a file or a web page, that is not a task. A task is to start the read, or to (quickly) process the data read. Tasks are meant to be chained to make something useful, e.g. [start the read of a page] -> [process the data and start a write to the database] -> [log that the write was done]. This is what we achieve with Deferreds: we chain (via callbacks) simple tasks. Each task runs on a thread once its conditions are met - in the above case, the condition for the second is that the page load has completed, and for the third that the database write has completed.
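As a sketch of that exact chain - store_in_db here is a made-up stand-in for any call that returns a Deferred (e.g. adbapi's runInteraction()):

from twisted.internet import defer, reactor
from twisted.web.client import getPage  # returns a Deferred with the body


def store_in_db(body):
    # stand-in: imagine this starts a db write and returns its Deferred
    return defer.succeed(len(body))


def log_done(result):
    print('the write was done: %r' % (result,))


d = getPage('http://example.com/')  # [start the read of a page]
d.addCallback(store_in_db)          # [process data, start the db write]
d.addCallback(log_done)             # [log that the write was done]
d.addErrback(lambda failure: failure.printTraceback())
d.addBoth(lambda _: reactor.stop())
reactor.run()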

4. The standard sequence of tasks is presented in the Scrapy architecture overview: http://doc.scrapy.org/en/latest/topics/architecture.html

So I will simplify a bit. Let's say you just start a spider and the initial URL is http://google.com. What Scrapy will do is more or less this:

It creates a Request("http://google.com"), passes it through the Downloader and the downloader middlewares, adding a few tasks as callbacks to the Request... then gets the response back to your spider (inside a callback) and calls the parse() method, where you break your response into Items. Then it groups those items (if there are more than one... very often it will be just one per request/page) into groups of CONCURRENT_ITEMS using Cooperator.coiterate (http://twistedmatrix.com/documents/current/api/twisted.internet.task.Cooperator.html), so that those tasks can potentially run in parallel on many threads (out of the original pool of 5-10). Inside one of those tasks, your pipeline's process_item() is chained, etc. etc. ... and at the very end a log message is printed saying that we successfully processed this item (essentially this is the end: https://github.com/scrapy/scrapy/blob/c0a72e9c24121b2b387cafb3c82d403631c2f07d/scrapy/core/scraper.py#L198).
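For reference, the helper Scrapy uses for that grouping lives in scrapy/utils/defer.py and looks roughly like this (CONCURRENT_ITEMS is passed as count):

from twisted.internet import defer, task


def parallel(iterable, count, callable, *args, **named):
    # execute the callable over the iterable's items, with at most
    # `count` of them in flight at any one time
    coop = task.Cooperator()
    work = (callable(elem, *args, **named) for elem in iterable)
    return defer.DeferredList([coop.coiterate(work) for _ in range(count)])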

All of the above happens in a few microseconds... because at the end of that process YOU HAVEN'T RUN any of those tasks. Scrapy has just gone through the middlewares, downloader and item pipelines and created something like a movie "film" that describes what has to be done. Then it hands this film to the movie player (a.k.a. the Reactor).

Now the reactor starts by loading the page... Internally this is done by opening a socket, sending an HTTP request header and then (on unix) calling the "magic" "select" (or equivalent) system call (http://linux.die.net/man/2/select), which allows the Reactor to monitor hundreds of open sockets or regular files without spending any processing power. Essentially it's done at operating-system level: the application sleeps, waiting for new data to become available, or a failure, or whatever. This is great! To achieve the same in a traditional application, we would have to either do some ugly polling (processing-intensive) and burn CPU time, or run hundreds of threads, where each thread doing something meaningful needs a considerable amount of memory. Here we wait for something to happen without consuming any of the above.
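A toy stand-alone illustration of that "magic" (plain sockets plus the stdlib select module - nothing Scrapy-specific):

import select
import socket

# open a few connections and fire off the requests
socks = [socket.create_connection(('example.com', 80)) for _ in range(3)]
for s in socks:
    s.sendall(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')

while socks:
    # sleeps inside the kernel until some socket has data: one thread,
    # ~0% CPU, regardless of how many sockets are being watched
    readable, _, _ = select.select(socks, [], [])
    for s in readable:
        data = s.recv(4096)
        if not data:  # server closed the connection: response fully read
            s.close()
            socks.remove(s)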

When the read of your page completes and the data is in memory (let's say 10 seconds later - it's a slow server), "select" unblocks (well... more or less) and the next task is called... likely inside the downloader, then some downloader middleware... they do some assignments... increase some counters... then the next downloader middleware... then after a few of them it reaches your spider's parse() method... you do some XPath'ing and other cool stuff... and return/yield some items (hopefully nothing blocking here - no API calls, no database lookups, no file lookups). Then those items... let's say you spawn 500 items out of that page... are grouped in batches of 100 (from your CONCURRENT_ITEMS setting). Now... those 100 items are a set, and let's say Cooperator.coiterate decides to use 3 threads to process them (I'm not 100% sure about the mechanism here, but it's something along these lines). Those three threads are eventually going to run your pipeline's process_item() method for each of those 100 items of this batch (there are 5 batches). Now let's assume that 99 of them (which, luckily, are the first ones processed) are somewhat bad, e.g. duplicates - your process_item() drops them immediately by raising a DropItem exception. This means the batch is almost finished and 2 of the 3 threads are freed to do other things. For the last of your items, though, we have to do a super-expensive operation that takes 20 seconds, blocking the completion of the whole batch of 100 items and preventing the next batch from starting.

What is important... is that from the moment the data of your page arrived until just before this super-expensive operation, probably less than 1-2 ms have passed. What has been done is some assignments, some counter increments (stats), some XPath'ing (hopefully fast) and some exception handling. This is great - you've essentially spent almost zero of your server's processing time on this page!

And now comes your expensive pipeline's process_item(), about to ruin everything. You want to avoid this. What can you do? Well - it depends on what this expensive processing is...

* If it's some I/O operation, e.g. a database read, a file read, a remote API call or anything in this realm - that's communication cost you will be paying. You would be making a blocking call, tying up this thread (one of your 5 to 10 precious threads) until the 20-second database lookup completes. And - I mean, it's almost "unethical", because it's likely that the time isn't even spent on the database processing itself. It is likely spent on your request hopping from router to router, queueing in the database's input queue, waiting for a disk somewhere to seek to the proper sector, and then the whole network trip back. A real waste of your time. Essentially, your application's performance depends on someone else's load on their server. BAD. You double your RAM - nothing changes. You double your cores - nothing changes. It's completely out of your control.

But you have alternatives. If it's a database request, you can use the async database API (from twisted.enterprise import adbapi, then dbpool.runInteraction()): the request is sent to your remote db, and your next task goes back to the "magic" "select", waiting for the db response to arrive without spending any of your resources. If it's a file operation, you can use threads.deferToThread(), or Scrapy's equivalent @inthread decorator for your function, or fdesc (http://twistedmatrix.com/documents/current/api/twisted.internet.fdesc.html) - see the deferToThread sketch after these bullets. For more or less any other operation... search for "<operation> async twisted" and you will find a way. I mean, in reality everything in the world is asynchronous and we "force" it to look synchronous for slightly easier programming - probably our kids won't have this problem; they will think asynchronously from day 0. So for everything that is an I/O operation, there will be a very efficient async implementation somewhere.

Ah - by the way - if your I/O operation is an http/s request, you can use Scrapy's own Request() API and return a few Requests from your pipeline. The (somewhat complicated) example is the media pipeline of Scrapy itself (https://github.com/scrapy/scrapy/blob/247b2ad59690a1e1b2e89fb0473ec25944392fb2/scrapy/contrib/pipeline/media.py#L38). The big plus of using Requests is that you reuse all the Scrapy functionality and settings, e.g. caches etc. Another way is to use crawler.engine.download(request, spider) and write slightly more Twisted-oriented Deferred code. In this case @inlineCallbacks is your friend and may save you some method-definition noise.

* If your 20 seconds is actual processing, then... there is not much you can do, really. If you are doing Monte Carlo in your process_item()... well, this thread deserves to be blocked. If you do it consistently, maybe you'd prefer a dedicated server for that (which turns this case back into the I/O problem). I have a strong feeling you won't be doing heavy processing here, though. If you do - consider storing your items raw... and then throwing tons of them at a Hadoop cluster or something. I don't think you should be doing lots of heavy processing in process_item().
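A sketch of the threads.deferToThread() option from the I/O bullet above - slow_lookup stands in for a blocking call you can't rewrite (e.g. a legacy client library):

import time

from twisted.internet import threads


def slow_lookup(key):
    # pretend this is a blocking library call we can't make async
    time.sleep(20)
    return {'looked_up': key}


class LookupPipeline(object):

    def process_item(self, item, spider):
        # the blocking call runs on a thread-pool thread; the reactor
        # keeps running and the Deferred fires with slow_lookup's result
        d = threads.deferToThread(slow_lookup, item['name'])
        d.addCallback(self._enrich, item)
        return d

    def _enrich(self, result, item):
        item['extra'] = result
        return item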

And that's more or less the end of the story. If you do it "wrong", you spend 20 * 5 = 100 seconds on this page (since, I guess, each of the 5 batches of 100 items has one item that requires 20 seconds of process_item()), plus a few milliseconds here and there of one thread's time. Your "wall" time will be 100 seconds + 10 seconds of loading = 110 seconds for a single page.

If you do it "right", you spend a few milliseconds here and there of thread time for each page, and that's it. All the waiting is done for free by the operating system's "select". Your total wall time will be a bit more than 20 seconds for your expensive process_item() calls, assuming they scale in parallel (all 5 expensive requests will be in flight at once, since none of them blocks any of your threads), + 10 seconds for the original load = 30 seconds.

But you don't really care much about wall time, since the whole point is that you run hundreds of requests in parallel (actually 16 is your default limit, subject to your CONCURRENT_REQUESTS, CONCURRENT_REQUESTS_PER_DOMAIN and CONCURRENT_REQUESTS_PER_IP settings). So, assuming 16 requests in flight, throughput-wise that's 30 / 16, i.e. about 2 seconds per request (despite the fact that just loading the page takes 10 seconds). If you start locking threads with synchronous expensive operations in item pipelines or spiders... you won't be able to parallelize, and your time per request will skyrocket.

Well, this should be at least a blog post... Disclaimer: I might reuse parts of it in some book or something :)

Keep in mind that the subject is complicated and there are lots of details. Despite that, the above description shouldn't be severely inaccurate.

For somewhat better explanations of async and Twisted, read here:

http://twistedmatrix.com/documents/10.2.0/core/howto/deferredindepth.html
http://twistedmatrix.com/documents/current/core/howto/defer.html
http://technicae.cogitat.io/2008/06/async-batching-with-twisted-walkthrough.html