★0
start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
|
|generator +----------------------------------------------------------------------------+
| | |
| | def _runCallbacks(self): |
| | ... |
| | while current.callbacks: |
| | item = current.callbacks.pop(0) |
| | callback, args, kw = item[ |
| | isinstance(current.result, failure.Failure)] |
| | ... |
| current.result | try: |
+--------------------+-------------------------------- current.result = callback(current.result, *args, **kw) |
| | ... |
| | else: |
| | if isinstance(current.result, Deferred): |
| | ... |
| +----------------------------------------------------------------------------+
| ^
| |
| | d.callback(input)
| |
| generator |
v scrapy.utils.defer.process_chain() |
Crawl51JobSpiderMiddleware.process_start_requests() +------------------------------------------------+
+------------------------------------------------------------+ | ★4 |
| ★5 | | def process_chain(callbacks, input, *a, **kw):|
| def process_start_requests(self, start_requests, spider): | x | d = defer.Deferred() |
| for r in start_requests: | <------------- for x in callbacks: |
| yield r | | d.addCallback(x, *a, **kw) |
+------------------------------------------------------------+ | d.callback(input) |
| return d |
+------------------------------------------------+
|
| |
m ---> _process_chain() |
| \ |
| \ |
| +-----------------------------------------------------------------+
MiddlewareManager ---+ | def _process_chain(self, methodname, obj, *args): ★3 |
△ | | return process_chain(self.methods[methodname], obj, *args) |
| | +-----------------------------------+-----------------------------+
| | |
| | |
| | |
| | v
| Crawl51JobSpiderMiddleware.process_start_requests()
spidermw |
Scraper ◆---------> SpiderMiddlewareManager
|
| ★1
m---> process_start_requests(start_requests)
| /
| /
| +-------------------------------------------------------------------------------------+
| | def process_start_requests(self, start_requests, spider): ★2 |
| | return self._process_chain('process_start_requests', start_requests, spider) |
| +-------------------------------------------------------------------------------------+
通过命令行启动crawl: scrapy crawl --nolog s51job -o /tmp/file.csv
1 源码crawl()
crawler.Crawler.crawl():
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
self.crawling = True
try:
self.spider = self._create_spider(*args, **kwargs)
self.engine = self._create_engine()
start_requests = iter(self.spider.start_requests())
yield self.engine.open_spider(self.spider, start_requests)
yield defer.maybeDeferred(self.engine.start)
...
Scrapy中大量使用了Twisted的延迟(Deferred)机制, 参考前文.
self.spider: 命令行参数指定的spider, 如s51job对应的类S51jobSpider.
self.engine: 调度执行spider, 驱动spider前行.
self.spider.start_requests(): 调用基类的实现; 该方法中使用了yield, 是个生成器方法, 把start_urls中的每个url封装成Request对象.
+---------------------------------------------------+
S51jobSpider ------▷ CrawlSpider -----▷ Spider ---| def start_requests(self): |
| start_urls / | ... |
| / | for url in self.start_urls: |
m----> start_requests() | yield Request(url, dont_filter=True) |
| +---------------------------------------------------+
|
2 源码open_spider()
core.engine.ExecutionEngine.open_spider():
@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
...
nextcall = CallLaterOnce(self._next_request, spider)
scheduler = self.scheduler_cls.from_crawler(self.crawler)
start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
self.slot = slot
self.spider = spider
yield scheduler.open(spider)
yield self.scraper.open_spider(spider)
self.crawler.stats.open_spider(spider)
yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
slot.nextcall.schedule()
slot.heartbeat.start(5)
start_requests: 作为参数传入时是Spider.start_requests()返回的生成器; 经过yield之后, 变为中间件process_start_requests()返回的生成器.
self.crawler: 初始化时传过来的scrapy.crawler.Crawler, 即启动爬虫的对象.
scheduler: scrapy.core.scheduler.Scheduler, 对优先级队列, Memory队列, Disk队列进行push/pop调度管理.
self.scraper.spidermw: Scraper持有的SpiderMiddlewareManager实例.
slot.nextcall: 实现了__call__()方法的CallLaterOnce实例, 调用该实例时执行的是_next_request().
slot.heartbeat: twisted.internet.task.LoopingCall(nextcall.schedule).
3 源码CallLaterOnce()
class CallLaterOnce(object):
def __init__(self, func, *a, **kw):
self._func = func
self._a = a
self._kw = kw
self._call = None
def schedule(self, delay=0):
if self._call is None:
self._call = reactor.callLater(delay, self)
def cancel(self):
if self._call:
self._call.cancel()
def __call__(self):
self._call = None
return self._func(*self._a, **self._kw)
self._func(): 即_next_request().
callLater(): 将self(可调用对象)封装到DelayedCall()中, 并加入到以delay时间排序的队列中, 等待执行. 了解更多
调用链: nextcall.schedule() --> reactor.callLater() --> CallLaterOnce.__call__() --> _next_request()
4 源码LoopingCall()
class LoopingCall:
call = None
running = False
_deferred = None
interval = None
_runAtStart = False
starttime = None
def __init__(self, f, *a, **kw):
self.f = f
self.a = a
self.kw = kw
from twisted.internet import reactor
self.clock = reactor
def start(self, interval, now=True):
self.running = True
deferred = self._deferred = defer.Deferred()
self.starttime = self.clock.seconds()
self.interval = interval
self._runAtStart = now
if now:
self()
else:
self._scheduleFrom(self.starttime)
return deferred
def stop(self):
self.running = False
if self.call is not None:
self.call.cancel()
self.call = None
d, self._deferred = self._deferred, None
d.callback(self)
...
def __call__(self):
def cb(result):
if self.running:
self._scheduleFrom(self.clock.seconds())
else:
d, self._deferred = self._deferred, None
d.callback(self)
def eb(failure):
self.running = False
d, self._deferred = self._deferred, None
d.errback(failure)
self.call = None
d = defer.maybeDeferred(self.f, *self.a, **self.kw)
d.addCallback(cb)
d.addErrback(eb)
...
def _scheduleFrom(self, when):
def howLong():
if self.interval == 0:
return 0
runningFor = when - self.starttime
untilNextInterval = self.interval - (runningFor % self.interval)
if when == when + untilNextInterval:
return self.interval
return untilNextInterval
self.call = self.clock.callLater(howLong(), self)
self.f: 即nextcall.schedule.
seconds(): 返回当前系统时间的秒数(根据平台不同, 实现不同).
start(): 启动loop任务, 以interval为循环周期, 执行__call__().
__call__(): 调用maybeDeferred()执行self.f, 即nextcall.schedule(), 该函数的返回值为None.
5 源码maybeDeferred()
def succeed(result):
d = Deferred()
d.callback(result)
return d
def fail(result=None):
d = Deferred()
d.errback(result)
return d
def maybeDeferred(f, *args, **kw):
try:
result = f(*args, **kw)
except:
return fail(failure.Failure(captureVars=Deferred.debug))
if isinstance(result, Deferred):
return result
elif isinstance(result, failure.Failure):
return fail(result)
else:
return succeed(result)
f(): 即nextcall.schedule(), 返回值为None, 所以maybeDeferred()此处返回succeed(result), 其callback回调执行的是LoopingCall.__call__()中定义的cb().
URL调度:
|
m---> _scheduleFrom() <-----------------------------------------+
| | |
| | |
| o---> reactor.callLater(interval, self) | loop-2
| |
m---> start(interval) |
| | |
| | |
LoopingCall ------+ o---> self()--+ |
^ | | |
| | | +-----> self._scheduleFrom(interval)--+
| m---> __call__() <---+ |
| | | |
| | | +----> errback()
| | cb-------+ |
| | eb-------------------+
| o ---> defer.maybeDeferred(self.f)
heartbeat | | | is
◆ nextcall |
Slot ◆-----------> CallLaterOnce |
◆ | |
| | ★1 |
| m ---> schedule() <-------------+---------------+
| | | |
v | | |
start_requests | o ---> reactor.callLater(0, self) |
| | | |
| call | |
m ---> __call__() <-----------+ |
| | |
| | ★2 |
o ---> self._func() |
| | | |
| ★3 | |
m ---> _next_request() <--------------------+ is |
| | |
| call | |
ExecutionEngine --+ +---o |
| | | |
| | | ★4 |
| | o---> request = next(slot.start_requests) |
| | | |
| | | call |
download() <--- m | o-------------------------------+ |
| | | | |
| | | |
| | | |
| | | | loop-1
| v ★8 | |
m ---> _next_request_from_scheduler() | |
| | | |
| | ★9 | |
| o---> slot.scheduler.next_request() | |
| | | |
| | ★10 | |
| o---> self._download() | |
| | | |
| | |
m ---> crawl() <----------------------------+ |
| | |
| | ★5 |
| o---> self.schedule() |
| | | |
| | | ★6 |
| | o---> slot.scheduler.enqueue_request() |
| |
o---> self.slot.nextcall.schedule() -------------------------+
| ★7
|
6 疑问:
在open_spider函数的最后两行代码:
> slot.nextcall.schedule()
> slot.heartbeat.start(5)

分析源码之后发现这两行的动作有些重复: nextcall.schedule()自身也能实现循环, 如上图(loop-1), 那么heartbeat.start(5)(loop-2)是不是多余的? 这个疑问留到以后解决.