
reactor --> Linux
              |
              |
              v
class EPollReactor(posixbase.PosixReactorBase, posixbase._PollLikeMixin):
                                    △
                                    |
           +------------------------+                                                          _newTimedCalls
           |                                                                                       /  ^
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase):             /   |
                                ^                                           △                    /    |
                                 |                                           |                   /     | stores
              +-----------------+                         +-----------------+                  /      |
              |                                           |                    /---------------       |
class _SignalReactorMixin(object):              class ReactorBase(object):    /                       |
           |                                            |                pack a callable obj          |
           |                                            |--> callLater()  -----------------> class DelayedCall:
           |                                            |                                       /     |
         1 +--> run()                                   |                                      /      |
           |                                            |--> fireSystemEvent("startup")       /       |
           |                                            |       start threadpool             /        |
         2 +--> startRunning()  --->  startRunning() <--+                                   /         +--> getTime()
           |                                            |                          --------/          |
           |                                            |                         /                   +--> active*()
         3 +--> mainLoop()                              |                        /                    |
            |       |          +--- runUntilCurrent() <--+  run timed calls -----/                     +--> delay()
           |       |          |                         |                                             |
           |       | mix call |                         |                                             +--> __le__(), __lt__()
           |       +--------->|----------- timeout() <--+  determine the sleep time                   |  (when push order by time)
                              |                         |                                             |
                              |                         |
                              +---     doIteration() <--+
                                            |           |
                                            | impl
                                            v
                                   EPollReactor.doPoll()
                                    poll(timeout, fds)

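mainLoop() is built from nested while loops and uses the poll mechanism to drive the whole program: each pass runs the timed calls that are due, works out how long it may sleep, and then polls for at most that long.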
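To make the nested-loop structure concrete, here is a minimal sketch of a toy reactor. It assumes no file descriptors are watched, so doIteration simply sleeps for the computed timeout where EPollReactor would call epoll. ToyReactor and its methods are hypothetical stand-ins that mirror the names in the diagram; this is not Twisted's code.

```python
import heapq
import itertools
import time


class ToyReactor:
    """Hypothetical toy reactor mirroring the run()/mainLoop() structure above."""

    def __init__(self):
        self._pending = []              # heap of (deadline, seq, func, args)
        self._seq = itertools.count()   # tie-breaker: equal deadlines keep FIFO order
        self._started = False

    def callLater(self, delay, func, *args):
        heapq.heappush(self._pending,
                       (time.monotonic() + delay, next(self._seq), func, args))

    def stop(self):
        self._started = False

    def runUntilCurrent(self):
        # Run every timed call whose deadline has passed, earliest first.
        now = time.monotonic()
        while self._pending and self._pending[0][0] <= now:
            _, _, func, args = heapq.heappop(self._pending)
            func(*args)

    def timeout(self):
        # How long doIteration may block: until the next deadline, or None.
        if not self._pending:
            return None
        return max(0.0, self._pending[0][0] - time.monotonic())

    def doIteration(self, t):
        # A real reactor polls its file descriptors with this timeout;
        # the toy just sleeps (briefly, instead of forever, when t is None).
        time.sleep(0.01 if t is None else t)

    def run(self):
        self._started = True
        while self._started:             # outer loop: survive errors
            try:
                while self._started:     # inner loop: one iteration per pass
                    self.runUntilCurrent()
                    self.doIteration(self.timeout())
            except Exception as exc:
                print("Unexpected error in main loop:", exc)


events = []
reactor = ToyReactor()
reactor.callLater(0.02, events.append, "second")
reactor.callLater(0.01, events.append, "first")
reactor.callLater(0.03, reactor.stop)
reactor.run()
print(events)  # ['first', 'second']
```

The outer `while` exists only so that an exception escaping one iteration is logged and the loop keeps going, which is the same split the real mainLoop() makes.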


1 Introduction to Twisted

1.1 Deferreds

Twisted uses the Deferred object to manage the callback sequence. The client application attaches a series of functions to the deferred to be called in order when the results of the asynchronous request are available (this series of functions is known as a series of callbacks, or a callback chain), together with a series of functions to be called if there is an error in the asynchronous request (known as a series of errbacks or an errback chain). The asynchronous library code calls the first callback when the result is available, or the first errback when an error occurs, and the Deferred object then hands the results of each callback or errback function to the next function in the chain.

(Quoted from the Twisted documentation.)
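The callback/errback chain described above can be sketched with a tiny synchronous stand-in. MiniDeferred is a hypothetical class, far simpler than Twisted's Deferred: there is no pausing, no chaining of Deferreds, and errors are plain exception objects rather than Failure instances. It does show the two rules from the quote: each result is handed to the next function, and an exception moves processing over to the errback side.

```python
class MiniDeferred:
    """Hypothetical, simplified stand-in for twisted.internet.defer.Deferred."""

    def __init__(self):
        self.callbacks = []   # list of (callback, errback) pairs
        self.called = False
        self.result = None

    def addCallbacks(self, callback, errback):
        self.callbacks.append((callback, errback))
        if self.called:
            self._runCallbacks()
        return self

    def addCallback(self, callback):
        return self.addCallbacks(callback, lambda err: err)   # errors pass through

    def addErrback(self, errback):
        return self.addCallbacks(lambda res: res, errback)    # results pass through

    def callback(self, result):
        self._start(result)

    def errback(self, exc):
        self._start(exc)

    def _start(self, result):
        if self.called:
            raise RuntimeError("already called")
        self.called = True
        self.result = result
        self._runCallbacks()

    def _runCallbacks(self):
        while self.callbacks:
            callback, errback = self.callbacks.pop(0)
            # Simplification: an exception object as the current result selects the errback.
            fn = errback if isinstance(self.result, Exception) else callback
            try:
                self.result = fn(self.result)
            except Exception as exc:
                self.result = exc   # a raising callback turns the chain into an error


log = []
d = MiniDeferred()
d.addCallback(lambda x: x * 3)
d.addCallback(lambda x: 1 / (x - 9))          # raises ZeroDivisionError when x == 9
d.addCallback(lambda x: log.append(("never", x)))
d.addErrback(lambda err: log.append(("errback", type(err).__name__)) or "recovered")
d.addCallback(lambda x: log.append(("callback", x)))
d.callback(3)
print(log)  # [('errback', 'ZeroDivisionError'), ('callback', 'recovered')]
```

The errback that returns a normal value ("recovered") switches the chain back to the callback side, which is how an error is handled and processing resumes.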

1.2 Callback

A twisted.internet.defer.Deferred is a promise that a function will at some point have a result. We can attach callback functions to a Deferred, and once it gets a result these callbacks will be called. In addition Deferreds allow the developer to register a callback for an error, with the default behavior of logging the error. The deferred mechanism standardizes the application programmer’s interface with all sorts of blocking or delayed operations.

1.3 Timeouts

Timeouts are a special case of Cancellation. Let’s say we have a Deferred representing a task that may take a long time. We want to put an upper bound on that task, so we want the Deferred to time out X seconds in the future. A convenient API to do so is Deferred.addTimeout. By default, it will fail with a TimeoutError if the Deferred hasn’t fired (with either an errback or a callback) within timeout seconds.
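A rough sketch of the timeout idea, assuming a manually advanced clock (similar in spirit to twisted.internet.task.Clock) so that it runs without a reactor. FakeClock, SketchDeferred and DeferredTimeoutError are hypothetical names. Twisted's real addTimeout works through cancellation, as the text says, while this sketch simply errbacks when the timer fires first and cancels the timer when the result arrives first.

```python
class DeferredTimeoutError(Exception):
    """Hypothetical error used by this sketch when the deadline passes."""


class TimedCall:
    def __init__(self, when, func):
        self.when, self.func, self.cancelled = when, func, False

    def cancel(self):
        self.cancelled = True


class FakeClock:
    """Hypothetical manually advanced clock (no real waiting)."""

    def __init__(self):
        self.now = 0.0
        self.calls = []

    def callLater(self, delay, func):
        call = TimedCall(self.now + delay, func)
        self.calls.append(call)
        return call

    def advance(self, seconds):
        self.now += seconds
        due = sorted((c for c in self.calls if c.when <= self.now), key=lambda c: c.when)
        self.calls = [c for c in self.calls if c.when > self.now]
        for call in due:
            if not call.cancelled:
                call.func()


class SketchDeferred:
    """Hypothetical minimal Deferred that fires once and notifies observers."""

    def __init__(self):
        self.called = False
        self.result = None
        self._observers = []

    def addBoth(self, fn):
        if self.called:
            fn(self.result)
        else:
            self._observers.append(fn)
        return self

    def _fire(self, result):
        # Simplification: late results are ignored; Twisted's real rules
        # (AlreadyCalledError, cancellation) are more involved.
        if self.called:
            return
        self.called, self.result = True, result
        for fn in self._observers:
            fn(result)

    def callback(self, value):
        self._fire(value)

    def errback(self, exc):
        self._fire(exc)

    def addTimeout(self, seconds, clock):
        # Schedule the failure; if a result arrives first, cancel the timer.
        timer = clock.callLater(
            seconds, lambda: self.errback(DeferredTimeoutError(seconds)))
        self.addBoth(lambda _: timer.cancel())
        return self


clock = FakeClock()

slow = SketchDeferred().addTimeout(5, clock)
slow_results = []
slow.addBoth(slow_results.append)
clock.advance(5)                 # deadline reached with no result yet
slow.callback("too late")        # ignored: already failed

fast = SketchDeferred().addTimeout(5, clock)
fast_results = []
fast.addBoth(fast_results.append)
fast.callback("in time")         # fires first, cancelling its timer
clock.advance(10)

print(type(slow_results[0]).__name__, fast_results)  # DeferredTimeoutError ['in time']
```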

2 Source Code Examples

2.1 Code 1 (single callback)


#!/usr/bin/python3
# -*- coding: utf-8 -*-

from twisted.internet import reactor, defer

def getDummyData(inputData):
    print('getDummyData called')
    deferred = defer.Deferred()
    reactor.callLater(2, deferred.callback, inputData * 3)
    return deferred

def cbPrintData(result):
    print('Result received: {}'.format(result))

deferred = getDummyData(3)
deferred.addCallback(cbPrintData)

# manually set up the end of the process by asking the reactor to
# stop itself in 4 seconds time
reactor.callLater(4, reactor.stop)
# start up the Twisted reactor (event loop handler) manually
print('Starting the reactor')
reactor.run()

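If deferred.callback in reactor.callLater(2, deferred.callback, inputData * 3) is replaced with an ordinary function, the callbacks added with deferred.addCallback will never run. The key is the link between the DelayedCall object and the callbacks held by the Deferred: the DelayedCall only calls the function it was given, and the Deferred's callback chain starts only when deferred.callback() is invoked. When a Deferred has several callbacks added with addCallback, the return value of each callback is passed as the argument to the next one.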

2.2 Code 2 (multiple callbacks)


#!/usr/bin/python3
# -*- coding: utf-8 -*-

from twisted.internet import reactor, defer

class Getter:
    def gotResults(self, x):
        if self.d is None:
            print("Nowhere to put results")
            return

        d = self.d
        self.d = None
        if x % 2 == 0:
            d.callback(x*3)
        else:
            d.errback(ValueError("You used an odd number!"))

    def _toHTML(self, r):
        return "Result: %s" % r

    def getDummyData(self, x):
        self.d = defer.Deferred()
        reactor.callLater(2, self.gotResults, x)
        self.d.addCallback(self._toHTML)
        return self.d

def cbPrintData(result):
    print(result)

def ebPrintError(failure):
    import sys
    sys.stderr.write(str(failure))

# this series of callbacks and errbacks will print an error message
g = Getter()
d = g.getDummyData(3)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)

# this series of callbacks and errbacks will print "Result: 12"
g = Getter()
d = g.getDummyData(4)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)

reactor.callLater(4, reactor.stop)
reactor.run()

2.3 Code 3 (threads)


#!/usr/bin/python3
# -*- coding: utf-8 -*-

from twisted.internet.defer import Deferred
from twisted.internet import reactor

import threading

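def loadRemoteData(callback, errback, url):
    print("thread[%s] url = %s" % (threading.current_thread().name, url))
    # Call only one of callback and errback; calling both raises AlreadyCalledError.
    # Note: Deferred is not thread-safe. This demo fires it directly from the pool
    # thread to show which thread runs the callbacks; production code would hand
    # the result back with reactor.callFromThread(callback, ...) instead.
    callback('callback data')
    # errback(ValueError("errback data"))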

def getResult(v):
    print("thread[%s] result = %s" % (threading.current_thread().name, v))

def getError(e):
    print("thread[%s] error = %s" % (threading.current_thread().name, str(e)))

if __name__ == '__main__':
    d = Deferred()
    d.addCallback(getResult)
    d.addErrback(getError)

    print("main():thread[%s]" % threading.current_thread().name)
    reactor.callInThread(loadRemoteData, d.callback, d.errback, "http://www.baidu.com")
    reactor.callLater(4, reactor.stop)
    reactor.run()
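Because Deferred is not thread-safe, a result computed in a pool thread is normally handed back to the reactor thread: Twisted's reactor.callFromThread queues the call, and runUntilCurrent later drains threadCallQueue in the reactor thread. The sketch below mimics that hand-off with the standard library only; thread_call_queue and call_from_thread are hypothetical names, and the "download" is faked.

```python
import queue
import threading

thread_call_queue = queue.Queue()   # hypothetical stand-in for reactor.threadCallQueue


def call_from_thread(f, *args):
    # Safe from any thread: just enqueue; the reactor thread runs f later.
    thread_call_queue.put((f, args))


results = []


def on_result(value):
    results.append((threading.current_thread().name, value))


def load_remote_data(url):
    # Runs in a worker thread; the "download" is faked.
    data = "data for %s" % url
    call_from_thread(on_result, data)


worker = threading.Thread(target=load_remote_data,
                          args=("http://www.baidu.com",), name="worker-1")
worker.start()
worker.join()

# Reactor-thread side: drain pending calls, as runUntilCurrent does.
while not thread_call_queue.empty():
    f, args = thread_call_queue.get()
    f(*args)

print(results)  # [('MainThread', 'data for http://www.baidu.com')] when run as a script
```

Unlike Code 3, on_result here runs in the draining thread rather than in worker-1.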

3 Source Code Analysis

Main files: 1. defer.py 2. base.py

3.1 The Deferred class


class Deferred
        |          callbacks: holds callbacks; _chainedTo: chains Deferreds                    reactor.callLater
        |                                                                                              |
        +---> addCallbacks() <---+----+----+                                                           |
        |                        |    |    |                                                           |
        +---> addCallback() -----+    |    | call                                                      v
        |                             |    |                                                       DelayedCall
        +---> addErrback() -----------+    |                                                           |
        |                                  |                                                           |
        +---> addBoth() -----------+-------+                                                           |
        |                          |                                                                   v
        +---> addTimeout() --------+                                                              reactor.run
        |                                                                                              |
        +---> pause()/cancel()                                                                         |
        |                                                                                              |
        |                             Understand the relation between callback and DelayedCall        v
        +---> callback() ---------    <----------------------------------------------------+        timeout
        |                call   /                                                          |           |
        +---> errback()-----   /                                                           |           |
        |                 /   /                                                            |           |
        |                /   /                                                             |           v
        |               v   v                                                DelayedCall.func()-->Deferred.callback()
        +--->  _runCallbacks                                                                           |
        |              |                                                                               |
        |              o----> item = callbacks.pop(0)                                                  |
        |              |                                                                               |
        |              | Normal:                                                                       |
        |              o----> item.callback()   <------------------------------------------------------+
        |              |
        |              | Exception:
        |              o----> failure.Failure()
        |              |

3.2 reactor methods

3.2.1 reactor.callLater()


# reactor.callLater(2, deferred.callback, inputData * 3)

@implementer(IReactorCore, IReactorTime, IReactorPluggableResolver,
             IReactorPluggableNameResolver)
class ReactorBase(object):
    ...
    def callLater(self, _seconds, _f, *args, **kw):
        # Wrap the absolute deadline, the function and its arguments in a DelayedCall
        tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
                           self._cancelCallLater,
                           self._moveCallLaterSooner,
                           seconds=self.seconds)
        # Queue it; runUntilCurrent() later moves it into the _pendingTimedCalls heap
        self._newTimedCalls.append(tple)
        return tple

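callLater wraps its arguments in a DelayedCall object and stores it in _newTimedCalls. Note that callLater itself and the function it schedules both run in the main (reactor) thread.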
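The bookkeeping around callLater can be sketched as follows, assuming a simplified DelayedCall that only stores its deadline and supports cancel(). New calls land in a plain list (_newTimedCalls) and are moved into the heap (_pendingTimedCalls) at the start of runUntilCurrent, ordered by __lt__ on the deadline. Class and attribute names echo Twisted's, but the code is a hypothetical simplification with a fake clock.

```python
import heapq


class SketchDelayedCall:
    def __init__(self, time, func, args):
        self.time, self.func, self.args = time, func, args
        self.cancelled = False
        self.called = False

    def cancel(self):
        self.cancelled = True

    def __lt__(self, other):
        # heapq only needs "<"; earlier deadlines come out first.
        return self.time < other.time


class SketchReactor:
    def __init__(self):
        self.now = 0.0                 # fake clock instead of the real time
        self._newTimedCalls = []
        self._pendingTimedCalls = []

    def seconds(self):
        return self.now

    def callLater(self, delay, func, *args):
        call = SketchDelayedCall(self.seconds() + delay, func, args)
        self._newTimedCalls.append(call)   # cheap append; heap insertion happens later
        return call

    def _insertNewDelayedCalls(self):
        for call in self._newTimedCalls:
            if not call.cancelled:
                heapq.heappush(self._pendingTimedCalls, call)
        self._newTimedCalls = []

    def runUntilCurrent(self):
        self._insertNewDelayedCalls()
        while self._pendingTimedCalls and self._pendingTimedCalls[0].time <= self.now:
            call = heapq.heappop(self._pendingTimedCalls)
            if call.cancelled:
                continue
            call.called = True
            call.func(*call.args)


fired = []
r = SketchReactor()
r.callLater(3, fired.append, "c")
r.callLater(1, fired.append, "a")
skipped = r.callLater(2, fired.append, "b")
skipped.cancel()
r.now = 5
r.runUntilCurrent()
print(fired)  # ['a', 'c']
```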

3.2.2 reactor.run()


# reactor.run()
class _SignalReactorMixin(object):
    ...
    def run(self, installSignalHandlers=True):
        self.startRunning(installSignalHandlers=installSignalHandlers)
        self.mainLoop()


    def startRunning(self, installSignalHandlers=True):
        self._installSignalHandlers = installSignalHandlers
        ReactorBase.startRunning(self)


    def mainLoop(self):
        while self._started:
            try:
                while self._started:
                    self.runUntilCurrent()
                    t2 = self.timeout()
                    t = self.running and t2
                    self.doIteration(t)
            except:
                log.msg("Unexpected error in main loop.")
                log.err()
            else:
                log.msg('Main loop terminated.')

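_SignalReactorMixin is a mixin class: it calls runUntilCurrent, timeout, doIteration and startRunning, methods of ReactorBase, a class it has no inheritance relationship with. These calls resolve because a concrete reactor such as EPollReactor inherits from both (through PosixReactorBase), so Python's method resolution order finds them on the same object.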
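A minimal sketch of that mixin pattern: the mixin calls methods it never defines, and they are found through the MRO of the concrete class. SignalMixin, BaseReactor and ConcreteReactor are hypothetical names standing in for _SignalReactorMixin, ReactorBase and PosixReactorBase/EPollReactor.

```python
class SignalMixin:
    """Hypothetical mixin: calls methods it does not define itself."""

    def run(self):
        self.startRunning()
        return self.mainLoop()

    def mainLoop(self):
        # runUntilCurrent() is not defined here; it is found via the MRO.
        return self.runUntilCurrent()


class BaseReactor:
    def startRunning(self):
        self.running = True

    def runUntilCurrent(self):
        return "BaseReactor.runUntilCurrent"


class ConcreteReactor(SignalMixin, BaseReactor):
    pass


r = ConcreteReactor()
print(r.run())  # BaseReactor.runUntilCurrent
print([c.__name__ for c in ConcreteReactor.__mro__])
# ['ConcreteReactor', 'SignalMixin', 'BaseReactor', 'object']
```

Calling SignalMixin().run() on its own would raise AttributeError; the mixin only works once combined with a class that supplies those methods.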


class ReactorBase(object):
           |
           |
           +---> startRunning()
           |      |
           |      |
           |      |
           +------o---> fireSystemEvent()
           |                 |              "startup"
           |                 |
           |                 o---> _eventTriggers.get()
           |                 |
           |                 |
           |                 o---> event.fireEvent()
           |                 |            | running = True
           |                              |
           |                              o---> DeferredList.addCallback()
           +---> mainLoop()               |                      |
           |        |                                            |
           |        |                                            o---> Deferred._runCallbacks()
           |        o---> runUntilCurrent()                      |
           |        |
           |                 _pendingTimedCalls
           |                 (heap ordered by time)
           |

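When the reactor is installed and initialized, it performs two important steps: 1. self._initThreads() sets up thread support (the thread pool); 2. self.installWaker() installs the waker, which other threads use to wake the reactor thread out of its poll (implementation: PosixReactorBase.installWaker).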
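The waker idea is the classic self-pipe trick: the reactor also polls the read end of a pipe or socket pair, so another thread can interrupt a long poll by writing a byte. The sketch below shows only that idea with socket.socketpair and selectors; it assumes nothing about the structure of Twisted's actual waker classes, and wake_up is a hypothetical name.

```python
import selectors
import socket
import threading
import time

sel = selectors.DefaultSelector()
wake_r, wake_w = socket.socketpair()     # the read end is polled by the loop
wake_r.setblocking(False)
sel.register(wake_r, selectors.EVENT_READ)


def wake_up():
    # Called from any thread: one byte is enough to make select() return.
    wake_w.send(b"x")


threading.Timer(0.1, wake_up).start()    # another thread wakes the loop early

start = time.monotonic()
events = sel.select(timeout=5.0)        # would block up to 5 s without the waker
elapsed = time.monotonic() - start
for key, _ in events:
    key.fileobj.recv(1024)               # drain so the next poll blocks again

print(len(events), elapsed < 1.0)        # 1 True
sel.close()
wake_r.close()
wake_w.close()
```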


class ReactorBase(object):
    ...
    def runUntilCurrent(self):
        """
        Run all pending timed calls.
        """
        if self.threadCallQueue:
            count = 0
            total = len(self.threadCallQueue)
            for (f, a, kw) in self.threadCallQueue:
                try:
                    f(*a, **kw)
                except:
                    log.err()
                count += 1
                if count == total:
                    break
            del self.threadCallQueue[:count]
            if self.threadCallQueue:
                self.wakeUp()

        # insert new delayed calls now
        self._insertNewDelayedCalls()

        now = self.seconds()
        while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
            call = heappop(self._pendingTimedCalls)
            if call.cancelled:
                self._cancellations -= 1
                continue

            if call.delayed_time > 0:
                call.activate_delay()
                heappush(self._pendingTimedCalls, call)
                continue

            try:
                call.called = 1
                call.func(*call.args, **call.kw)
            except:
                ...


        if (self._cancellations > 50 and
             self._cancellations > len(self._pendingTimedCalls) >> 1):
            self._cancellations = 0
            self._pendingTimedCalls = [x for x in self._pendingTimedCalls
                                       if not x.cancelled]
            heapify(self._pendingTimedCalls)

        if self._justStopped:
            self._justStopped = False
            self.fireSystemEvent("shutdown")
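The block after the main loop in runUntilCurrent is a lazy-deletion heuristic: cancelled calls stay in the heap (cancelling only flags them and increments _cancellations) and are skipped when popped; once there are more than 50 of them and they exceed half of the heap, the heap is rebuilt without them. A stand-alone sketch of just that rule, with a hypothetical TimedCall class and maybe_compact helper:

```python
import heapq


class TimedCall:
    def __init__(self, time):
        self.time = time
        self.cancelled = False

    def __lt__(self, other):
        return self.time < other.time


def maybe_compact(pending, cancellations):
    """Return (heap, cancellations) after applying runUntilCurrent's compaction rule."""
    if cancellations > 50 and cancellations > len(pending) >> 1:
        pending = [c for c in pending if not c.cancelled]
        heapq.heapify(pending)
        cancellations = 0
    return pending, cancellations


pending = [TimedCall(t) for t in range(100)]
heapq.heapify(pending)
cancellations = 0
for call in pending[:60]:          # cancel 60 of 100 calls: this only flags them
    call.cancelled = True
    cancellations += 1

pending, cancellations = maybe_compact(pending, cancellations)
print(len(pending), cancellations)  # 40 0

few = [TimedCall(t) for t in range(10)]
few, n = maybe_compact(few, 5)      # 5 cancellations: below the threshold, untouched
print(len(few), n)                  # 10 5
```

The threshold keeps the O(n) rebuild rare, so cancelling a call stays O(1) in the common case.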

3.2.3 reactor.callInThread()

Files involved:

File               Path
base.py            /usr/local/lib/python3.4/dist-packages/twisted/internet
defer.py           /usr/local/lib/python3.4/dist-packages/twisted/internet
threadpool.py      /usr/local/lib/python3.4/dist-packages/twisted/python
context.py         /usr/local/lib/python3.4/dist-packages/twisted/python
_pool.py           /usr/local/lib/python3.4/dist-packages/twisted/_threads
_team.py           /usr/local/lib/python3.4/dist-packages/twisted/_threads
_threadworker.py   /usr/local/lib/python3.4/dist-packages/twisted/_threads

In context.py, the module-level call/get functions are proxies for the installed context tracker:


def installContextTracker(ctr):
    global theContextTracker
    global call
    global get

    theContextTracker = ctr
    call = theContextTracker.callWithContext
    get = theContextTracker.getContext

installContextTracker(ThreadedContextTracker())
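installContextTracker simply rebinds the module-level names call and get to methods of the tracker object, so every caller of context.call automatically uses whichever tracker is installed. The sketch below imitates that pattern with a hypothetical thread-local tracker (one stack of dicts per thread); it is modeled on, not copied from, twisted.python.context.ThreadedContextTracker.

```python
import threading


class SketchContextTracker:
    """Hypothetical tracker: one stack of context dicts per thread."""

    def __init__(self):
        self._local = threading.local()

    def _stack(self):
        if not hasattr(self._local, "stack"):
            self._local.stack = [{}]
        return self._local.stack

    def callWithContext(self, ctx, func, *args, **kw):
        stack = self._stack()
        new = dict(stack[-1])      # inherit the outer context...
        new.update(ctx)            # ...and overlay the new keys
        stack.append(new)
        try:
            return func(*args, **kw)
        finally:
            stack.pop()

    def getContext(self, key, default=None):
        return self._stack()[-1].get(key, default)


call = get = None   # module-level proxies, rebound below


def installContextTracker(ctr):
    global call, get
    call = ctr.callWithContext
    get = ctr.getContext


installContextTracker(SketchContextTracker())

inside = call({"user": "alice"}, lambda: get("user"))
outside = get("user", "nobody")
print(inside, outside)  # alice nobody
```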

Call graph for running func in a thread:


ReactorBase    Deferred      ThreadPool    ContextTracker      Team        LockWorker      ThreadWorker      LocalStorage
  |                             |                 |              |             |               |                   |
  | callInThread                |                 |              |             |               |                   |
  +---------------------------->|callInThread     |              |             |               |                   |
  |                      func   |    |            |              |             |               |                   |
  |                                  |            |     inContext|             |               |                   |
  |                      callInThreadWithCallback--------------->|do           |               |                   |
  |                                  |            |              |             |               |                   |
  |                                  |            |              |             |               |                   |
  |                                  |            |              +------------>|do             |                   |
                                     |            |              |        work |               |                   |
                                     |            |              |             |               |            append |
                                     |            |              |             |---------------------------------->|working
                                     |            |              |             |               |                   |
                                     |            |              |                             |             pop   |
                                     |            |              |<------------------------------------------------|
    +----------------------+         |            |      _coordinateThisTask                   |
    |  _coordinateThisTask |         |            |                 |        _pool.py          |
    |                      |         |            |                 |            |             |
    |   +--------------+   |         |            |          worker |------------------------->| __init__
    |   |    doWork    |   |         |            |                 |    limitedWorkerCreator  |     |
    |   | +----------+ |   |         |            |                 |            |             |     |
    |   | | inContext| |   |         |            |                 |            |             |     |
    |   | |  +----+  | |   |         |            |                 |       startThread <------------|
    |   | |  |func|  | |   |         |            |                 |            |             |     |
    |   | |  +----+  | |   |         |            |                 |            |             |     |
    |   | +----------+ |   |         |            |                 |     Thread.start()------------>| work()
    |   +--------------+   |         |            |                 |                          |          |
    +----------------------+         |            |                 |                          |    while | get task
                                     |            |       @worker.do|                          |          |<------<
                                     |            |                 |------------------------->|do        |       |
                                     |            |                 |                          |          |       |
                                     |            |                 |                          | put task |       |
                                     |            |                 |                          |--------> |       |
                                     |            |                 |                          |          |       |
                                     |            |                 |                                     |       |
                                     |            |          doWork |<------------------------------ task()-------^
                                     |            |            |    |                                     |
                                     |            |            |    |                                     |
                                  inContext<--------task() <---+    |                                   <===> thread end!
                                      |           |                 |
                                      | theWork() |                 | _recycleWorker
                                      |---------->| context.call    |
                                                         |
                                                         |
                                                         |
                                                   callWithContext
                                                         |
                                                         |
              func() <-----------------------------------+

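Process: initialize the thread pool (ThreadPool) --> coordinate threads (LockWorker) --> create a worker thread (ThreadWorker) --> pass the task on (team.do) --> run the task in the thread --> recycle the thread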

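How it works: ThreadWorker maintains a Queue and puts every task it receives into it. During initialization, ThreadWorker calls startThread(), which calls Thread.start(); at that point a new thread has been created. The new thread executes ThreadWorker's work method, which takes tasks from the Queue one at a time and runs them.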
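The ThreadWorker mechanism boils down to one thread looping over a queue until a stop sentinel arrives. A minimal stand-alone sketch, assuming a hypothetical SketchThreadWorker whose constructor takes a startThread function and a queue, in the spirit of twisted/_threads/_threadworker.py:

```python
import queue
import threading

_stop = object()   # sentinel that ends the work loop


class SketchThreadWorker:
    """Hypothetical worker: owns one thread that runs tasks from a queue."""

    def __init__(self, startThread, q):
        self._q = q

        def work():
            # Runs in the new thread: block on the queue, run each task in order.
            for task in iter(q.get, _stop):
                task()

        startThread(work)          # the new thread is created here

    def do(self, task):
        self._q.put(task)          # hand a task to the worker thread

    def quit(self):
        self._q.put(_stop)


threads = []


def start_thread(target):
    t = threading.Thread(target=target, name="pool-worker")
    t.start()
    threads.append(t)


seen = []
worker = SketchThreadWorker(start_thread, queue.Queue())
worker.do(lambda: seen.append(threading.current_thread().name))
worker.do(lambda: seen.append("second task"))
worker.quit()
threads[0].join()
print(seen)  # ['pool-worker', 'second task']
```

Passing startThread in as a function, rather than creating the thread inside the class, makes the worker easy to test with a fake that runs work() synchronously.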