コードを雑に読むアプローチでScrapyを入門する
2017/01/15
Scrapyはスクレイピング用フレームワークなので、登場人物多すぎてよく分からない。
彼らの関係性や役割を理解を深めるために『Data flow図』の順にそってコードを読んだ。
ちなみに、前提知識として下記をおさえておくとフレームワークへの理解が早まる。
- pythonのジェネレーター、
yield
文 twisted
のDeffered- javascriptで言うところ(と、括っていいのかは知らんが)のpromiseみたいなモノ
twisted
のinlineCallback- コード中のあちこちで登場するので理解しておくと混乱せずにコード読める
- http://skitazaki.appspot.com/translation/twisted-intro-ja/p17.html
Contents
- 1 The Engine gets the initial Requests to crawl from the Spider.
- 2 The Engine schedules the Requests in the Scheduler and asks for the next Requests to crawl.
- 3 The Engine sends the Requests to the Downloader, passing through the Downloader Middlewares (see process_request()).
- 4 Once the page finishes downloading the Downloader generates a Response (with that page) and sends it to the Engine, passing through the Downloader Middlewares (see process_response()).
- 5 The Engine receives the Response from the Downloader and sends it to the Spider for processing, passing through the Spider Middleware (see process_spider_input()).
- 6 The Spider processes the Response and returns scraped items and new Requests (to follow) to the Engine, passing through the Spider Middleware (see process_spider_output()).
- 7 The Engine sends processed items to Item Pipelines, then send processed Requests to the Scheduler and asks for possible next Requests to crawl.
- 8 The process repeats (from step 1) until there are no more requests from the Scheduler.
- 9 終わりに
The Engine gets the initial Requests to crawl from the Spider.
まず、EngineはSpiderから『どこからクロールを始めるか?』をという情報を得るらしい。
Crawlerというクラスを介して、Spiderが保持する初期リクエストのリストをEngineのopen_spiderメソッドの引数として渡している。
The Engine schedules the Requests in the Scheduler and asks for the next Requests to crawl.
次に、Engineは初期リクエストのリストをSchedulerに渡しつつ、Schedulerに『次にクローリングすべきリクエストはどれか?』とたずねる。
これは先に登場したopen_spider
メソッドの内部で行われている。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@defer.inlineCallbacks def open_spider(self, spider, start_requests=(), close_if_idle=True): assert self.has_capacity(), "No free spider slot when opening %r" % \ spider.name logger.info("Spider opened", extra={'spider': spider}) nextcall = CallLaterOnce(self._next_request, spider) scheduler = self.scheduler_cls.from_crawler(self.crawler) start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider) slot = Slot(start_requests, close_if_idle, nextcall, scheduler) self.slot = slot self.spider = spider yield scheduler.open(spider) yield self.scraper.open_spider(spider) self.crawler.stats.open_spider(spider) yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider) slot.nextcall.schedule() slot.heartbeat.start(5) |
ここから如何にしてSchedulerにリクエストを詰めているのか、パッと見では全く分からない。
リクエストがSchedulerに詰まれるまで複数のメソッドを辿ることになる。
突然だが、CallLaterOnceクラスを見る必要がある。
open_spiderメソッド内ではself._next_request
とspider
を渡してnextcallという名でインスタンスを生成している。
後にslot.nextcall.schedule()
という形でインスタンスのメソッドが呼び出される。
schedule()
は何をやっているのか?
CallLaterOnceクラスの定義を見てみよう。
CallLaterOnceとは?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
class CallLaterOnce(object): """Schedule a function to be called in the next reactor loop, but only if it hasn't been already scheduled since the last time it ran. """ def __init__(self, func, *a, **kw): self._func = func self._a = a self._kw = kw self._call = None def schedule(self, delay=0): if self._call is None: self._call = reactor.callLater(delay, self) def cancel(self): if self._call: self._call.cancel() def __call__(self): self._call = None return self._func(*self._a, **self._kw) |
よく分からないですがschedule
っていうメソッドを呼ぶと、reactor(twisted用語、イベントループみたいなもん)に対して『コールバック(selfなので自インスタンス)をdelay
秒後に実行してくれよ。』と依頼します。
このとき、_callにreactor.callLater
の返り値を格納しています。
連続で呼び出されたとしても_callが空じゃないかぎりはスケジューリングが実行されないわけですね。
ここがCallLaterOnceってことなのかな。知らんけど。
で、delay
秒後に__call__
が呼び出されるわけ。(pythonの文法で、自クラスのインスタンスを関数のように扱うと、定義されたcallが呼ばれるらしいよ)
インスタンス生成時に渡された関数func
を実行するわけですな。
nextcall = CallLaterOnce(self._next_request, spider)
なので
nextcall.schedule(delay=XXX)
と呼び出すと、XXX秒後にself._next_request(spider)
が実行されるわけです。
self._next_requestとは
engine.pyに戻る。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
def _next_request(self, spider): slot = self.slot if not slot: return if self.paused: return while not self._needs_backout(spider): if not self._next_request_from_scheduler(spider): break if slot.start_requests and not self._needs_backout(spider): try: request = next(slot.start_requests) except StopIteration: slot.start_requests = None except Exception: slot.start_requests = None logger.error('Error while obtaining start requests', exc_info=True, extra={'spider': spider}) else: self.crawl(request, spider) if self.spider_is_idle(spider) and slot.close_if_idle: self._spider_idle(spider) |
メソッドを見渡してみると、next(slot.start_requests)
で初期リクエスト群を取り出している箇所がありますね。
1 2 3 4 5 6 7 8 9 10 11 |
if slot.start_requests and not self._needs_backout(spider): try: request = next(slot.start_requests) except StopIteration: slot.start_requests = None except Exception: slot.start_requests = None logger.error('Error while obtaining start requests', exc_info=True, extra={'spider': spider}) else: self.crawl(request, spider) |
このリクエストをSchedulerに突っ込むのかな…と思ったら、self.craw(request, spider)
呼び出してます。
メソッドを追いかけます。
1 2 3 4 5 6 7 8 9 10 11 12 |
def crawl(self, request, spider): assert spider in self.open_spiders, \ "Spider %r not opened when crawling: %s" % (spider.name, request) self.schedule(request, spider) self.slot.nextcall.schedule() def schedule(self, request, spider): self.signals.send_catch_log(signal=signals.request_scheduled, request=request, spider=spider) if not self.slot.scheduler.enqueue_request(request): self.signals.send_catch_log(signal=signals.request_dropped, request=request, spider=spider) |
crawl
の中ではschedule
を呼び出して、再びself.slot.nextcall.schedule()
が呼び出されています。
少なくともstart_requests
の個数だけself._next_request
が呼び出されることがわかりました。
schedule
の中でSchedulerインスタンスのenqueue_request
を実行してrequestをキューに詰め込んでいます。
ここまでで
The Engine schedules the Requests in the Scheduler and asks for the next Requests to crawl.
のうち、
The Engine schedules the Requests in the Scheduler
が完了しました。
and asks for the next Requests to crawl.
は、どこでたずねているのでしょうか?
_next_request
の下記がそれに該当します。
1 2 3 |
while not self._needs_backout(spider): if not self._next_request_from_scheduler(spider): break |
_needs_backout(spider)
が偽である(=取り消す必要がない)限り、無限ループしています。
ループの中では_next_request_from_scheduler(spider)
を実行し続けています。
メソッドの名前からして、Schedulerが保持するリクエストキューから次に実行すべきリクエストを取り出しているように見えますね。
で、このメソッドの返り値がFalseだったら無限ループから脱出しています。
おそらく返り値にリクエストの存在の有無を返しているのでしょうか?
次のリクエストが存在しなければ無限ループに滞留する理由はないですね。
_needs_backout(spider)
の詳細には立ち入らず、_next_request_from_scheduler(spider)
を細かく見ます。
_next_request_from_schedulerとは?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
def _next_request_from_scheduler(self, spider): slot = self.slot request = slot.scheduler.next_request() if not request: return d = self._download(request, spider) d.addBoth(self._handle_downloader_output, request, spider) d.addErrback(lambda f: logger.info('Error while handling downloader output', exc_info=failure_to_exc_info(f), extra={'spider': spider})) d.addBoth(lambda _: slot.remove_request(request)) d.addErrback(lambda f: logger.info('Error while removing request from slot', exc_info=failure_to_exc_info(f), extra={'spider': spider})) d.addBoth(lambda _: slot.nextcall.schedule()) d.addErrback(lambda f: logger.info('Error while scheduling new request', exc_info=failure_to_exc_info(f), extra={'spider': spider})) return d |
メソッドを見れば一目瞭然ですが、下記の
and asks for the next Requests to crawl.
は
slot.scheduler.next_request()
で実現されています。
requestが存在しなければreturnしています。
これによって、呼び出し元のif文がFalseとなり無限ループからbreakします。
requestが存在すると、_downloadが実行されます。
これは次に紹介する『EngineがDownloaderにRequestを送りつける(Downloaderにコンテンツのダウンロードを依頼する)』部分の処理になります。
The Engine sends the Requests to the Downloader, passing through the Downloader Middlewares (see process_request()).
Data flow図の通り、RequestをDownloaderのミドルウェアで包んでからDownloaderに送りつけます。
前項の最後に軽く触れた、_downloadメソッドの中身を見ます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
def _download(self, request, spider): slot = self.slot slot.add_request(request) def _on_success(response): assert isinstance(response, (Response, Request)) if isinstance(response, Response): response.request = request # tie request to response received logkws = self.logformatter.crawled(request, response, spider) logger.log(*logformatter_adapter(logkws), extra={'spider': spider}) self.signals.send_catch_log(signal=signals.response_received, \ response=response, request=request, spider=spider) return response def _on_complete(_): slot.nextcall.schedule() return _ dwld = self.downloader.fetch(request, spider) dwld.addCallbacks(_on_success) dwld.addBoth(_on_complete) return dwld |
Downloaderクラスのfetchメソッドがそれっぽいですね。
fetchメソッドを見てみると…
1 2 3 4 5 6 7 8 |
def fetch(self, request, spider): def _deactivate(response): self.active.remove(request) return response self.active.add(request) dfd = self.middleware.download(self._enqueue_request, request, spider) return dfd.addBoth(_deactivate) |
ここでmiddlewareが出てきます。
middlewareの実体はDownloaderMiddlewareManagerクラスのインスタンスです。
downloadメソッドを見てみましょう。
ここでやってることは、Data flow図の通りですが下記の3点をdeferredにまとめています。
- Requestのダウンロード前に、与えられたミドルウェア群のprocess_requestを実行する
- Requestのダウンロード後に、与えられたミドルウェア群のprocess_responseを実行する
- Requestのダウンロード中に例外が発生したら、与えられたミドルウェア群のprocess_exceptionを実行する
ここで特に注目すべきはprocess_requestの中身です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def download(self, download_func, request, spider): @defer.inlineCallbacks def process_request(request): # settings.pyで設定されたミドルウェア群の`process_request`を逐次実行してゆく for method in self.methods['process_request']: # ミドルウェアの実行結果 response = yield method(request=request, spider=spider) # ミドルウェアの返り値としてNoneかResponseもしくはRequestオブジェクトを期待している assert response is None or isinstance(response, (Response, Request)), \ 'Middleware %s.process_request must return None, Response or Request, got %s' % \ (six.get_method_self(method).__class__.__name__, response.__class__.__name__) # レスポンスが得られたら、以降の処理は中断して値を含んだdeferを返す if response: defer.returnValue(response) # すべてのミドルウェアの返り値がNoneであれば、download_funcを実行した結果をdeferとして返す defer.returnValue((yield download_func(request=request,spider=spider))) |
The Engine sends the Requests to the Downloader, passing through the Downloader Middlewares (see process_request()).
は、このprocess_request
の中で行われていることが分かりました。
もう少しだけ深追いして、実際にRequestの内容がダウンロードされる様子を見てみます。
先のdonwload_func
がそれっぽいですね。
download_funcの実体はDownloaderクラスのdownloadメソッドの引数として与えられた…
1 |
dfd = self.middleware.download(self._enqueue_request, request, spider) |
_enqueue_request
メソッドです。
そこから、詳細には触れませんが_process_queue
…_download
と流れていきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
def _enqueue_request(self, request, spider): key, slot = self._get_slot(request, spider) request.meta['download_slot'] = key def _deactivate(response): slot.active.remove(request) return response slot.active.add(request) deferred = defer.Deferred().addBoth(_deactivate) slot.queue.append((request, deferred)) self._process_queue(spider, slot) return deferred def _process_queue(self, spider, slot): if slot.latercall and slot.latercall.active(): return # Delay queue processing if a download_delay is configured now = time() delay = slot.download_delay() if delay: penalty = delay - now + slot.lastseen if penalty > 0: slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot) return # Process enqueued requests if there are free slots to transfer for this slot while slot.queue and slot.free_transfer_slots() > 0: slot.lastseen = now request, deferred = slot.queue.popleft() dfd = self._download(slot, request, spider) dfd.chainDeferred(deferred) # prevent burst if inter-request delays were configured if delay: self._process_queue(spider, slot) break def _download(self, slot, request, spider): # The order is very important for the following deferreds. Do not change! # 1. Create the download deferred dfd = mustbe_deferred(self.handlers.download_request, request, spider) # 2. Notify response_downloaded listeners about the recent download # before querying queue for next request def _downloaded(response): self.signals.send_catch_log(signal=signals.response_downloaded, response=response, request=request, spider=spider) return response dfd.addCallback(_downloaded) # 3. After response arrives, remove the request from transferring # state to free up the transferring slot so it can be used by the # following requests (perhaps those which came from the downloader # middleware itself) slot.transferring.add(request) def finish_transferring(_): slot.transferring.remove(request) self._process_queue(spider, slot) return _ return dfd.addBoth(finish_transferring) |
_download内の初っ端の処理でダウンロードを行っています。
1 |
dfd = mustbe_deferred(self.handlers.download_request, request, spider) |
このself.handlers.download_request
ってヤツですね。
self.handlersを初期化している箇所を見てみるとDownloadHandlersというクラスが現れます。
このクラスのdownload_request
というメソッドを呼び出しているので定義元を見てみると…
1 2 3 4 5 6 7 |
def download_request(self, request, spider): scheme = urlparse_cached(request).scheme handler = self._get_handler(scheme) if not handler: raise NotSupported("Unsupported URL scheme '%s': %s" % (scheme, self._notconfigured[scheme])) return handler.download_request(request, spider) |
パッと見、リクエストのスキーマに対応するハンドラを取得、そのハンドラのdownload_requestを呼び出してコンテンツをGETする…って感じでしょうか。
scrapyでは標準でいくつかのハンドラを用意してくれています。
ポピュラーなhttp(1.1)ハンドラのdownload_requestメソッドを見てみます。
1 2 3 4 5 6 |
def download_request(self, request, spider): """Return a deferred for the HTTP download""" agent = ScrapyAgent(contextFactory=self._contextFactory, pool=self._pool, maxsize=getattr(spider, 'download_maxsize', self._default_maxsize), warnsize=getattr(spider, 'download_warnsize', self._default_warnsize)) return agent.download_request(request) |
それっぽいですね。
Once the page finishes downloading the Downloader generates a Response (with that page) and sends it to the Engine, passing through the Downloader Middlewares (see process_response()).
passing through the Downloader Middlewares (see process_response()).
と、書かれてるようにrequestを投げた時と同じく、responseを得るときにもミドルウェアを通します。
例によってDownloaderMiddlewareManagerを見てみると。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@defer.inlineCallbacks def process_response(response): assert response is not None, 'Received None in process_response' if isinstance(response, Request): defer.returnValue(response) for method in self.methods['process_response']: response = yield method(request=request, response=response, spider=spider) assert isinstance(response, (Response, Request)), \ 'Middleware %s.process_response must return Response or Request, got %s' % \ (six.get_method_self(method).__class__.__name__, type(response)) if isinstance(response, Request): defer.returnValue(response) defer.returnValue(response) |
request時と同じような構造ですね。
で、各メソッドのreturnを辿ってゆくと、engine.pyに帰ってきます。
具体的には_next_request_from_scheduler
です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
def _next_request_from_scheduler(self, spider): slot = self.slot request = slot.scheduler.next_request() if not request: return d = self._download(request, spider) d.addBoth(self._handle_downloader_output, request, spider) d.addErrback(lambda f: logger.info('Error while handling downloader output', exc_info=failure_to_exc_info(f), extra={'spider': spider})) d.addBoth(lambda _: slot.remove_request(request)) d.addErrback(lambda f: logger.info('Error while removing request from slot', exc_info=failure_to_exc_info(f), extra={'spider': spider})) d.addBoth(lambda _: slot.nextcall.schedule()) d.addErrback(lambda f: logger.info('Error while scheduling new request', exc_info=failure_to_exc_info(f), extra={'spider': spider})) return d |
The Engine receives the Response from the Downloader and sends it to the Spider for processing, passing through the Spider Middleware (see process_spider_input()).
DownloaderからResponseを受け取って、Spiderに横流ししてスクレイピングさせる。
Downloaderと同様に、ResponseはSpider Middlewareを通してからSpiderに渡される。
Engineは受け取ったResponseを_handle_downloader_output
に渡します。
1 2 3 4 5 6 7 8 9 10 11 12 |
def _handle_downloader_output(self, response, request, spider): assert isinstance(response, (Request, Response, Failure)), response # downloader middleware can return requests (for example, redirects) if isinstance(response, Request): self.crawl(response, spider) return # response is a Response or Failure d = self.scraper.enqueue_scrape(response, request, spider) d.addErrback(lambda f: logger.error('Error while enqueuing downloader output', exc_info=failure_to_exc_info(f), extra={'spider': spider})) return d |
Downloaderから帰ってきたResponseの型がRequestだったら再度crawlさせています。
コメントにも書いてあるように、Downloaderミドルウェアを通したら『リダイレクトすべき』と判断されてリダイレクト先のRequestがResponse(紛らわしい)として帰ってきた場合なんかが考えられます。
ここでは、responseはResponse型(紛らわしい)とします。
すると先の処理であるself.scraper.enqueue_scrape
に進むわけです。
Scraperクラスのenqueue_scrape
メソッドを見てみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def enqueue_scrape(self, response, request, spider): slot = self.slot dfd = slot.add_response_request(response, request) def finish_scraping(_): slot.finish_response(response, request) self._check_if_closing(spider, slot) self._scrape_next(spider, slot) return _ dfd.addBoth(finish_scraping) dfd.addErrback( lambda f: logger.error('Scraper bug processing %(request)s', {'request': request}, exc_info=failure_to_exc_info(f), extra={'spider': spider})) self._scrape_next(spider, slot) return dfd |
slotにscrape対象のresponseとrequestを詰めてます。
実際にスクレイピングしてる処理は_scrape_next
メソッドっぽいですね。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
def _scrape_next(self, spider, slot): while slot.queue: response, request, deferred = slot.next_response_request_deferred() self._scrape(response, request, spider).chainDeferred(deferred) def _scrape(self, response, request, spider): """Handle the downloaded response or failure through the spider callback/errback""" assert isinstance(response, (Response, Failure)) dfd = self._scrape2(response, request, spider) # returns spiders processed output dfd.addErrback(self.handle_spider_error, request, response, spider) dfd.addCallback(self.handle_spider_output, request, response, spider) return dfd def _scrape2(self, request_result, request, spider): """Handle the different cases of request's result been a Response or a Failure""" if not isinstance(request_result, Failure): return self.spidermw.scrape_response( self.call_spider, request_result, request, spider) else: # FIXME: don't ignore errors in spider middleware dfd = self.call_spider(request_result, request, spider) return dfd.addErrback( self._log_download_errors, request_result, request, spider) |
メソッドを辿っていくと_scrape2
という不思議な名前のメソッドの中で、spidermw(Spider Middleware)のscrape_responseを呼び出しています。
spidermwの定義元を見てみると、SpiderMiddlewareManagerというクラスのメソッドであることが分かります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
def scrape_response(self, scrape_func, response, request, spider): fname = lambda f:'%s.%s' % ( six.get_method_self(f).__class__.__name__, six.get_method_function(f).__name__) def process_spider_input(response): for method in self.methods['process_spider_input']: try: result = method(response=response, spider=spider) assert result is None, \ 'Middleware %s must returns None or ' \ 'raise an exception, got %s ' \ % (fname(method), type(result)) except: return scrape_func(Failure(), request, spider) return scrape_func(response, request, spider) def process_spider_exception(_failure): exception = _failure.value for method in self.methods['process_spider_exception']: result = method(response=response, exception=exception, spider=spider) assert result is None or _isiterable(result), \ 'Middleware %s must returns None, or an iterable object, got %s ' % \ (fname(method), type(result)) if result is not None: return result return _failure def process_spider_output(result): for method in self.methods['process_spider_output']: result = method(response=response, result=result, spider=spider) assert _isiterable(result), \ 'Middleware %s must returns an iterable object, got %s ' % \ (fname(method), type(result)) return result dfd = mustbe_deferred(process_spider_input, response) dfd.addErrback(process_spider_exception) dfd.addCallback(process_spider_output) return dfd |
DownloaderMiddlewareManagerと同じ構造ですね。
- Spiderのスクレイピング前に、与えられたミドルウェア群のprocess_spider_inputを実行する
- Spiderのスクレイピング後に、与えられたミドルウェア群のprocess_spider_outputを実行する
- Spiderのスクレイピング中に、与えられたミドルウェア群のprocess_spider_exceptionを実行する
The Spider processes the Response and returns scraped items and new Requests (to follow) to the Engine, passing through the Spider Middleware (see process_spider_output()).
スパイダーはレスポンスを処理して、スクレイピングしたアイテムと新しいRequestをSpider Middlewareを通じてEngineに返す。
process_spider_input
の終わりにscrape_funcを呼んでいます。
これはscrape_response
の引数として与えられたモノです。
呼び出し元を見てみるとcall_spider
というメソッドであることが分かります。
1 2 3 4 5 |
def call_spider(self, result, request, spider): result.request = request dfd = defer_result(result) dfd.addCallbacks(request.callback or spider.parse, request.errback) return dfd.addCallback(iterate_spider_output) |
addCallbacksでrequestのcallbackかspiderのparseメソッドのいずれか存在する方をdeferのコールバックとしています。
ここではspider.parseが呼ばれることにします。
parseメソッドの中身はユーザーが好き勝手に書くところです。
例えば、下記のような感じです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
class QuotesSpider(scrapy.Spider): name = "quotes" start_urls = [ 'http://quotes.toscrape.com/page/1/', ] def parse(self, response): for quote in response.css('div.quote'): yield { 'text': quote.css('span.text::text').extract_first(), 'author': quote.css('span small::text').extract_first(), 'tags': quote.css('div.tags a.tag::text').extract(), } next_page = response.css('li.next a::attr(href)').extract_first() if next_page is not None: next_page = response.urljoin(next_page) yield scrapy.Request(next_page, callback=self.parse) |
parseメソッドの返り値は上記のようなgeneratorオブジェクトとなります(ユーザーの作り次第で返り値変わるけど)。
最後のiterate_spider_output
はparseメソッドの返り値をすべからくiterableにするための処理です。
これは次に行う処理のための下準備となります。
メソッドを辿ってゆくとこんな感じ。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
return dfd.addCallback(iterate_spider_output) // https://github.com/scrapy/scrapy/blob/master/scrapy/utils/spider.py def iterate_spider_output(result): return arg_to_iter(result) // https://github.com/scrapy/scrapy/blob/master/scrapy/utils/misc.py _ITERABLE_SINGLE_VALUES = dict, BaseItem, six.text_type, bytes def arg_to_iter(arg): """Convert an argument to an iterable. The argument can be a None, single value, or an iterable. Exception: if arg is a dict, [arg] will be returned """ if arg is None: return [] elif not isinstance(arg, _ITERABLE_SINGLE_VALUES) and hasattr(arg, '__iter__'): return arg else: return [arg] |
これらの処理の結果をSpider Middlewareのprocess_spider_outputに渡します。
Downloaderの流れと同じなので詳細は省略します。
これでResponseをSpiderによってパースした結果を得ることができました。
The Engine sends processed items to Item Pipelines, then send processed Requests to the Scheduler and asks for possible next Requests to crawl.
call_spider
の呼び出し元は_scrape2
でさらにその呼び出し元は_scrape
でした。
先述の通り、この段階でSpider Middlewareを通したナニカを得ています。
このナニカにはBaseItemというクラスのオブジェクトだったりプレーンなdictだったり、はたまたRequestオブジェクトを含んだジェネレーターやリストだったりします。この辺の中身はSpiderのparseメソッドの中身や、Spider Middlewareに依存します。
例えば、先にあげたQuotesSpider
のparse
はdictやRequestを返すジェネレーターを返します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
class QuotesSpider(scrapy.Spider): name = "quotes" start_urls = [ 'http://quotes.toscrape.com/page/1/', ] def parse(self, response): for quote in response.css('div.quote'): yield { 'text': quote.css('span.text::text').extract_first(), 'author': quote.css('span small::text').extract_first(), 'tags': quote.css('div.tags a.tag::text').extract(), } next_page = response.css('li.next a::attr(href)').extract_first() if next_page is not None: next_page = response.urljoin(next_page) yield scrapy.Request(next_page, callback=self.parse) |
この多様なナニカを処理してゆくのが次のhandle_spider_output
メソッドです。
1 |
dfd.addCallback(self.handle_spider_output, request, response, spider) |
1 2 3 4 5 6 7 |
def handle_spider_output(self, result, request, response, spider): if not result: return defer_succeed(None) it = iter_errback(result, self.handle_spider_error, request, response, spider) dfd = parallel(it, self.concurrent_items, self._process_spidermw_output, request, response, spider) return dfd |
iter_errbackは与えられたナニカ(iterableなオブジェクト)とエラー処理を結びつけるための関数です。
先述のarg_to_iterはこの辺を円滑にするために必要だったわけですね。
1 2 3 4 5 6 7 8 9 10 11 12 |
def iter_errback(iterable, errback, *a, **kw): """Wraps an iterable calling an errback if an error is caught while iterating it. """ it = iter(iterable) while True: try: yield next(it) except StopIteration: break except: errback(failure.Failure(), *a, **kw) |
次はparallelです。
1 2 3 4 5 6 7 8 |
def parallel(iterable, count, callable, *args, **named): """Execute a callable over the objects in the given iterable, in parallel, using no more than ``count`` concurrent calls. Taken from: http://jcalderone.livejournal.com/24285.html """ coop = task.Cooperator() work = (callable(elem, *args, **named) for elem in iterable) return defer.DeferredList([coop.coiterate(work) for _ in range(count)]) |
Cooperatorの動作がイマイチ理解できてないのでよく分からないですが、引数として与えられたcallableをparallelに実行する(同時に実行する数を制御する、ことが目的?)ための関数のようです。
callableとして_process_spidermw_output
が与えられています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
def _process_spidermw_output(self, output, request, response, spider): """Process each Request/Item (given in the output parameter) returned from the given spider """ if isinstance(output, Request): self.crawler.engine.crawl(request=output, spider=spider) elif isinstance(output, (BaseItem, dict)): self.slot.itemproc_size += 1 dfd = self.itemproc.process_item(output, spider) dfd.addBoth(self._itemproc_finished, output, response, spider) return dfd elif output is None: pass else: typename = type(output).__name__ logger.error('Spider must return Request, BaseItem, dict or None, ' 'got %(typename)r in %(request)s', {'request': request, 'typename': typename}, extra={'spider': spider}) |
outputの型に応じて処理が分岐されています。
Request型だったら再びクローリングします。
BaseItemやdictだったらpipelineに渡して処理してもらいます。
Noneだったら何もしない。
それ以外だったらエラーを吐く。
先にあげたQuoteSpiderのparseメソッドを例にoutputがどんな感じになるか見てみます。
1 2 3 4 5 6 7 8 9 10 11 12 |
def parse(self, response): for quote in response.css('div.quote'): yield { 'text': quote.css('span.text::text').extract_first(), 'author': quote.css('span small::text').extract_first(), 'tags': quote.css('div.tags a.tag::text').extract(), } next_page = response.css('li.next a::attr(href)').extract_first() if next_page is not None: next_page = response.urljoin(next_page) yield scrapy.Request(next_page, callback=self.parse) |
ご覧の通り、parseメソッドはgeneratorを返しています。
で、このgeneratorをparallelを通じて逐次処理してゆくわけです。
最初のyieldではtextやauthorを含んだdictを返しています。
で、dictを取りきった後は『次のページのURL』を抱えたRequestオブジェクトを返していますね。
[dict, dict, dict, … , Request]
リストでイメージすると↑のような感じでしょうか(このイメージが適切じゃないかもしれないが)
で、_process_spidermw_output
を逐次やっていく。って感じです。
ここではoutputがBaseItemやdictであるとします。
BaseItemやdictの場合はpipelineを通して加工するなり…とにかく、何かしらします。
piplineの管理を行っているのはItemPipelineManagerクラスです。(デフォの設定を見ると分かります。)
このItemPipelineManagerのprocess_itemメソッドを呼んでいます。
クラスの定義を見てみます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class ItemPipelineManager(MiddlewareManager): component_name = 'item pipeline' @classmethod def _get_mwlist_from_settings(cls, settings): return build_component_list(settings.getwithbase('ITEM_PIPELINES')) def _add_middleware(self, pipe): super(ItemPipelineManager, self)._add_middleware(pipe) if hasattr(pipe, 'process_item'): self.methods['process_item'].append(pipe.process_item) def process_item(self, item, spider): return self._process_chain('process_item', item, spider) |
質素。
MiddlewareManager
の_process_chain
を見ると…
1 2 |
def _process_chain(self, methodname, obj, *args): return process_chain(self.methods[methodname], obj, *args) |
で、process_chain
を見ると…
1 2 3 4 5 6 7 |
def process_chain(callbacks, input, *a, **kw): """Return a Deferred built by chaining the given callbacks""" d = defer.Deferred() for x in callbacks: d.addCallback(x, *a, **kw) d.callback(input) return d |
callbacks(ItemPipelineたち)をdeferredに詰めて、itemを引数にcallbackを呼んでItemPipelineたちの処理を発火してやる。
dにはItemPipelineを経由したナニカが格納されてそうですね。
次の処理はScraperに書かれています。
1 |
dfd.addBoth(self._itemproc_finished, output, response, spider) |
_itemproc_finished
の中身はどうなってるでしょうか。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
def _itemproc_finished(self, output, item, response, spider): """ItemProcessor finished for the given ``item`` and returned ``output`` """ self.slot.itemproc_size -= 1 if isinstance(output, Failure): ex = output.value if isinstance(ex, DropItem): logkws = self.logformatter.dropped(item, ex, response, spider) logger.log(*logformatter_adapter(logkws), extra={'spider': spider}) return self.signals.send_catch_log_deferred( signal=signals.item_dropped, item=item, response=response, spider=spider, exception=output.value) else: logger.error('Error processing %(item)s', {'item': item}, exc_info=failure_to_exc_info(output), extra={'spider': spider}) else: logkws = self.logformatter.scraped(output, response, spider) logger.log(*logformatter_adapter(logkws), extra={'spider': spider}) return self.signals.send_catch_log_deferred( signal=signals.item_scraped, item=output, response=response, spider=spider) |
outputがFailure
(失敗)だったらエラーメッセージを表示します。
それ以外だったらoutputに関するメッセージを表示しています。
The process repeats (from step 1) until there are no more requests from the Scheduler.
Schedulerの中身が空になるまで一連の処理をやり続けます。
終わりに
大体の流れが分かった。
deferredだらけの世界なのでメソッドを潜っては浮上して、再び潜る…の繰り返しでコードを追うのが大変。
twistedにベッタリ依存した実装なので、twistedのボキャブラリーを知らないと意味不明になっちゃう。
336px
336px
関連記事
-
-
フロントエンド開発のメモ
最近 …
-
-
scrapy実行時のエラー対処
Ma …
-
-
RDSの特定のデータベースをダンプする
Pu …
-
-
DockerでNginxしたい
Co …
-
-
FlutterでListViewしたい
こん …
-
-
Bower再入門
Co …