跳转至

Tornado 源码分析

本文所有的分析都基于 Tornado 4.5.2,对应官方文档为 Tornado Docs branch4.5,运行示例使用的 Python 版本为 2.7.18。

Tornado 可以大致分为四个主要组成部分:

  • Web 框架(包含 RequestHandler 以及一些支持类)。
  • HTTP 客户端以及服务端的实现(包括 HTTPServerAsyncHTTPClient)。
  • 异步网络库(包括 IOLoopIOStream ,可以作为 HTTP 组件的构建块,也可用于实现其他协议)。
  • 协程库 tornado.gen

非阻塞与异步

术语异步(asynchronous )和非阻塞(non-blocking)密切相关,通常可以互换使用,但它们并不完全相同。

阻塞

当函数在返回之前等待某事发生时阻塞。函数可能由于多种原因而阻塞: 网络 I/O、磁盘 I/O、互斥锁等等。

异步

异步函数在完成之前返回(普通的同步函数在返回之前执行所有要执行的操作)。其有多种设计风格:

  • 通过参数回调
  • 返回占位符(Future、Promise、Deferred等)
  • 使用队列
  • 注册回调(比如 POSIX 信号等)

比如同步代码风格如下:

from tornado.httpclient import HTTPClient

def synchronous_fetch(url):
    http_client = HTTPClient()
    response = http_client.fetch(url)
    return response.body

通过 Tornado 采用的协程库 tornado.genFuture 占位符设计,相应的异步代码如下:

from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    raise gen.Return(response.body)

可以发现两者风格是非常一致的,这更利于阅读和理解。由于 Python 3.3 之前不允许在生成器中使用 return,Tornado 使用触发异常的形式进行返回结果。

class Return(Exception):  # 并没有直接继承自 StopIteration
    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
        # Cython 通过 .args 元组识别 StopIteration 的子类
        self.args = (value,)

事件循环 IOLoop

Tornado 是以单进程单线程场景所设计的,因此 IOloop 是单例的(非典型场景下是可以多例的,比如多个线程中分别使用不同的 IOLoop),它是一个水平触发的 I/O 事件循环,在 Linux 下使用 epoll,BSD 下使用 kqueue,在两者不可用的情况下使用 select

水平触发(Level-Triggered):只要文件描述符关联的读内核缓冲区非空,有数据可以读取,就一直发出可读信号进行通知,当文件描述符关联的内核写缓冲区不满,有空间可以写入,就一直发出可写信号进行通知。

边缘触发(Edge-Triggered):当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知,当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知。

epoll 默认采用水平触发。

创建与启动

loop = IOLoop.current()
loop.start()

创建 IOLoop.current()

IOLoop.currentIOLoop.instance 都可以用来创建事件循环,其区别为:

class IOLoop(Configurable):
    _instance_lock = threading.Lock()
    _current = threading.local()

    def instance():
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                # 避免多线程并发调用,所以 instance() 始终返回单例
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    IOLoop._instance = IOLoop()  # 类变量为线程共享的
        return IOLoop._instance

    def current(instance=True):
        current = getattr(IOLoop._current, "instance", None)
        # 当前线程没有对应的 IOLoop 时,默认调用 instance 完成创建
        if current is None and instance:
            return IOLoop.instance()
        return current

在主线程中调用 instancecurrent 是等价的。其他线程可以通过直接实例化 loop = IOLoop() 创建自己的事件循环(实例化过程中默认会调用 IOLoop.make_current 方法绑定到当前线程),或者通过 IOLoop.instance() 获取主线程的事件循环。

print("In main thread ", id(IOLoop.instance()), id(IOLoop.current()))

t = Thread(
    target=lambda : IOLoop() and print("In other thread ", id(IOLoop.instance()), id(IOLoop.current()))
)
t.start(), t.join()

# In main thread  140101302999120 140101302999120
# In other thread  140101302999120 140101291243856

可以看到线程 t 通过 IOLoop.instance() 拿到了主线程的 IOLoop,由于其直接实例化了 IOLoop(),所以 IOLoop.current() 拿到了自己独有的 IOLoop。

启动事件循环 loop.start()

class PollIOLoop(IOLoop):
    def start(self):
        try:
            while True:
                pass
        finally:
            pass

创建过程,以 EPollIOLoop 为例

print(type(IOLoop()))
# <class 'tornado.platform.epoll.EPollIOLoop'>

上例可以看到,IOLoop() 最后生成了 EPollIOLoop 的实例,那么这个过程是怎样的呢?

class EPollIOLoop(PollIOLoop):
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)

class PollIOLoop(IOLoop):
    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)
        self._impl = impl

class IOLoop(Configurable):
    @classmethod
    def configurable_base(cls):
        return IOLoop

    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):
            # epoll 可用则返回 EPollIOLoop
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        # 其他情形下返回 KQueueIOLoop 或者 SelectIOLoop

    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                # 当前线程没有事件循环则将该事件循环设置为此线程的事件循环
                self.make_current()

class Configurable(object):
    __impl_kwargs = None

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()  # base 为 IOLoop
        init_kwargs = {}
        if cls is base:
            # 直接实例化 loop = IOLoop() 时
            # 根据当前系统环境进行选择,此时为 EPollIOLoop
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            # 显式实例化 loop = EpollIOloop() 时
            impl = cls
        init_kwargs.update(kwargs)
        # 实例化 EPollIOLoop
        instance = super(Configurable, cls).__new__(impl)
        # 调用 EPollIOLoop 中的 initialize 方法
        instance.initialize(*args, **init_kwargs)
        return instance
总结一下,当使用 IOloop() 或者 EpollIOloop() 创建事件循环对象时,根据当前环境,前者隐式创建了 EpollIOloop 的对象,和后者等价。 实例化 IOLoop() 的调用顺序为:

Configurable.__new__()

# 用于判断是通过 IOLoop() 隐式创建还是 EPollIOLoop() 等显式创建
IOLoop.configurable_base()

# 根据当前系统环境进行选择,此时为 EPollIOLoop
IOLoop.configured_class()

# 确定具体实现为 select.epoll()
EPollIOLoop().initialize()

# 初始化 IOLoop 的状态、callback 队列等
PollIOLoop().initialize()

# 调用 make_current
IOLoop().initialize()

# 若当前线程没有设置事件循环,则将当前实例化的事件循环设置为当前线程的事件循环
IOLoop().make_current()

IOLoop 的休眠与唤醒

IOLoop 本质上是一个无限循环,每次循环都会通过 event_pairs = self._impl.poll(poll_timeout) 检测 I/O 事件,如果 poll_timeout 时间内没有 I/O事件,循环就会在这段时间阻塞,线程进入休眠。此期间如果其他线程调用此 IOLoop 的 loop.stop()loop.add_callback()等操作,IOLoop 就无法及时响应,所以需要有一套机制能够在 poll_timeout 期间解除阻塞并唤醒线程。Waker 承担了这份工作。

def _set_nonblocking(fd):
    # 设置 fd 为非阻塞
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

def set_close_exec(fd):
    # 在 fork 子进程中执行 exec 的时候,子进程会自动关闭继承得到的 fd。
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

class Waker(interface.Waker):
    def __init__(self):
        r, w = os.pipe()
        _set_nonblocking(r)
        _set_nonblocking(w)
        set_close_exec(r)
        set_close_exec(w)
        self.reader = os.fdopen(r, "rb", 0)
        self.writer = os.fdopen(w, "wb", 0)

    def fileno(self):
        return self.reader.fileno()

    def wake(self):
        # 写入的数据并无实际意义
        self.writer.write(b"x")

    def consume(self):
        # 只是将管道内数据消费掉
        while True:
            result = self.reader.read()

class PollIOLoop(IOLoop):
    def initialize(self, impl, time_func=None, **kwargs):
        # 保存对 fd 的处理函数,
        self._handlers = {}
        self._waker = Waker()
        # 将 waker 的 reader 添加到 epoll 的监听中,当有可读事件的时候,阻塞就解除了
        self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ)

    def add_handler(self, fd, handler, events):
        # 这里取得fd.fileno() 和 fd 对象,做一个 fd 和 handler 的绑定
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        # 将 fd 注册到 epoll 监听列表中
        self._impl.register(fd, events | self.ERROR)

    def start(self):
        while True:
            self._run_callback(self._callbacks.popleft())
            # No timeouts and no callbacks, so use the default.
            poll_timeout = _POLL_TIMEOUT  # 3600.0
            # 如果此时 poll_timeout = 3600,且监听的 fd 都没有事件,线程会阻塞在此,进入休眠
            # 这时其他线程调用 add_callback 给当前 IOLoop 添加的 callback 就无法处理了
            # 所以在 add_callback 时调用 wake 方法,由于 epoll 监听了管道 reader 对应的 fd
            # 阻塞解除,进行下一轮循环
            event_pairs = self._impl.poll(poll_timeout)
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    # 获取到 fd 对应事件后调用相应的处理函数
                    fd_obj, handler_func = self._handlers[fd]
                    # self._waker.consume() 消费掉管道中的数据
                    handler_func(fd_obj, events)

    def add_callback(self, callback, *args, **kwargs):
        if thread.get_ident() != self._thread_ident:
            # 其他线程对此 IOLoop 调用 add_callback
            self._waker.wake()

    def stop(self):
        # 如果 IOLoop 阻塞,调用 stop 状态不能及时更新,所以也需要 wake
        self._running = False
        self._stopped = True
        self._waker.wake()

占位符 Future

Future 封装了异步操作的结果,并非线程安全,因此与单线程事件循环一起使用更快。Tornado 中常和 IOLoop.add_future 连用,或者由 gen.coroutine yield。其设计和结构都非常简单,需要注意的是它的回调处理,它通过回调连接了 IOLoop 和 Runner ,协助完成了生成器的调度执行。

class Future(object):
    def __init__(self):
        self._done = False  # 操作是否完成
        self._result = None
        self._exc_info = None

        # 保存操作完成后的回调函数
        self._callbacks = []

    def result(self, timeout=None):
        # 操作完成返回结果,操作失败抛出报错
        self._clear_tb_log()
        if self._result is not None:
            # 已有结果
            return self._result
        if self._exc_info is not None:
            # 已有报错
            try:
                raise_exc_info(self._exc_info)
            finally:
                self = None
        self._check_done()
        return self._result

    def set_result(self, result):
        # 设置结果并触发回调
        self._result = result
        self._set_done()

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            # 这里的 cb 就包括 IOLoop 中添加的 lambda future: self.add_callback(callback, future)
            # callback 为 Runner.run
            # IOLoop 下一次循环就会通过  Runner.run 继续执行原生成器了
            cb(self)
        self._callbacks = None

协程装饰器 @gen.coroutine

def coroutine(func, replace_callback=True):
    return _make_coroutine_wrapper(func, replace_callback=True)

# key 为 future 对象,value 为 Runner 对象
# 这个映射提供了对 Runner 对象的强引用,只要它们的结果 future 对象也有强引用(通常来自父协程的 Runner), Runner 就不会被 GC 回收
# 当 future 没有引用时,对应 Runner 也就可以被 GC 回收了
_futures_to_runners = weakref.WeakKeyDictionary()

def _make_coroutine_wrapper(func, replace_callback):
    wrapped = func

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()  # TracebackFuture = Future

        try:
            # 调用 func,如果是普通函数,则同步执行其中代码
            # 如果是生成器函数,直接返回生成器
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):
                try:
                    # result:被装饰的 func 调用后返回的生成器
                    # 这里第一次执行生成器
                    yielded = next(result)
                else:
                    # future:生成器调用完成最终结果保存在此 future
                    # yielded:生成器第一次返回的结果
                    _futures_to_runners[future] = Runner(result, future, yielded)

                return future

        # func 是同步函数,执行完的 result 保存在 future 中
        future.set_result(result)
        return future

    return wrapper

在使用 @gen.coroutine 装饰一个函数(代码块中没有 yield 关键字)时,代码同步执行,结果通过 future.set_result(result) 直接返回,future 为已完成状态。在装饰一个生成器时,代码异步执行,返回的 future 为未完成状态,后续的执行包装在 Runner 中,然后存放在 _futures_to_runners,等到第一次 yield 的 future (下例中为 gen.sleep(10))为已完成状态时,IOLoop 通过回调调用 Runner.run() 使生成器继续运行。

@gen.coroutine
def foo():
    return 1 + 2

future = foo()
print(future.done(), future.result(), gen._futures_to_runners.keys())
# (True, 3, [])

@gen.coroutine
def bar():
    yield gen.sleep(10)
    raise gen.Return(1 + 2)

future = bar()
print(future.done(), hex(id(future)), gen._futures_to_runners.keys())
# (False, '0x7f89f51a3710', [<tornado.concurrent.Future object at 0x7f89f51a3710>])

生成器运行工具 Runner

在装饰器 @gen.coroutine 中,将被装饰的生成器、生成器的最终 future、第一次调用生成器 yield 的结果包装成了一个 Runner 对象(Runner(result, future, yielded)),后续所有对生成器的执行,都由 Runner 进行:

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen  # 待运行的生成器
        self.result_future = result_future  # 生成器的最终结果
        self.future = _null_future
        self.io_loop = IOLoop.current()
        if self.handle_yield(first_yielded):
            self.run()

    def handle_yield(self, yielded):
        # 处理每一次 yield 出来的结果,包装后存在 self.future 中
        self.future = convert_yielded(yielded)

        if not self.future.done() or self.future is moment:
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None # noqa
                self.run()
            # 未完成的话放到 IOLoop 中注册一下
            # 这里的注册一下指的是 lambda future: self.add_callback(callback, future)
            # 在 future 完成的时候会将 callback(run)添加到 IOLoop 的 callback 列表
            self.io_loop.add_future(self.future, inner)
            return False
        return True

    def run(self):
        """ 运行或是暂停生成器,运行直到下一个没有完成的 yield point """
        try:
            while True:
                # 首次运行时,self.future 包装了第一次调用 next(gen) 的结果
                # 可以理解为到达了一个 yield point
                future = self.future
                if not future.done():
                    # yield point 未完成,暂停生成器的运行
                    return
                # 已完成
                self.future = None

                value = future.result()
                future = None
                # 继续运行生成器
                yielded = self.gen.send(value)
                # yielded 就是下一个 yield point 了
                # handle_yield 会把 yielded 设置为 self.future 的,保证每次循环都能继续
                if not self.handle_yield(yielded):
                    return

# 未完成的 future 添加到 IOLoop,完成后调用 inner(就是 run)
# self.io_loop.add_future(self.future, inner)
class IOLoop(Configurable):
    def add_future(self, future, callback):
        # 这个 lambda 函数在 future 完成时执行,比如 future.set_result 时
        # 此时实际执行的内容是 IOLoop(这里的self) 的 add_callback
        # 将 inner (这里的 callback) 方法添加到 IOLoop 的 _callbacks 队列,inner 的参数为 future
        # 总的来说,就是未完成的 future 添加到 IOLoop 中,完成时调用 Runner 的 run 方法,继续执行生成器
        future.add_done_callback(lambda future: self.add_callback(callback, future))

class PollIOLoop(IOLoop):
    def add_callback(self, callback, *args, **kwargs):
        self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))

    def start(self):
        # inner 并不是在 future.set_result 时、立即执行,而是添加到 _callbacks 中
        # 在 IOLoop 的循环中执行
        self._run_callback(self._callbacks.popleft())

异步 sleep() 与异步 fetch()

gen.sleep()

def sleep(duration):
    # 直接返回一个未完成 future,并在 IOLoop 中注册回调函数,
    # 等 duration 后 set_result(None)
    f = Future()
    IOLoop.current().call_later(duration, lambda: f.set_result(None))
    return f

class IOLoop(Configurable):
    def call_later(self, delay, callback, *args, **kwargs):
        # 在 self.time() + delay 时间后,调用 callback 也就是 f.set_result(None)
        return self.call_at(self.time() + delay, callback, *args, **kwargs)

class PollIOLoop(IOLoop):
    def initialize(self, impl, time_func=None, **kwargs):
        # 最小堆实现,堆顶元素的 deadline 距离当前时间最近
        self._timeouts = []  # type: List[_Timeout]

    def call_at(self, deadline, callback, *args, **kwargs):
        # _Timeout 就是对 deadline 以及需要执行的 callback 的一个简单封装
        timeout = _Timeout(
            deadline,
            functools.partial(stack_context.wrap(callback), *args, **kwargs),
            self
        )
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def start(self):
        # 待执行的 timeout 列表
        due_timeouts = []
        if self._timeouts:
            now = self.time()
            while self._timeouts:
                if self._timeouts[0].callback is None:
                    # timeout 被取消执行
                elif self._timeouts[0].deadline <= now:
                    # 堆顶元素到了执行时间,添加到待执行列表
                    due_timeouts.append(heapq.heappop(self._timeouts))
                else:
                    # 堆顶元素都没到执行时间,剩下的不可能到执行时间
                    break

        for timeout in due_timeouts:
            if timeout.callback is not None:
                # timeout.callback 即 lambda: f.set_result(None)
                # set_result 会将 inner 函数(Runner().run)添加到 IOLoop 中
                # 这样生成器就会接着运行了
                self._run_callback(timeout.callback)
可以看到 gen.sleep() 的实现过程非常简单,调用它直接返回一个未完成的 Future 对象,并在 IOLoop 的 _timeouts(使用最小堆实现)中添加一个 _Timeout 对象,在 IOLoop 的循环过程中检测 _timeouts,到达执行时间后便对相应的 Future 对象执行 f.set_result(None)。如果有如下代码:

@gen.coroutine
def foo():
    yield gen.sleep(3)
    print("end sleep")

由于 gen.sleep(3) 返回的 future 未完成,所以 Runner 暂停了生成器 foo 的执行,3 秒后,IOLoop 获取 due_timeouts 并执行 f.set_result(None)inner 作为 future 绑定的 callback 被调用,Runner.run 被执行,此时判断条件 if not future.done():False,生成器 foo 继续运行:yielded = self.gen.send(value)

AsyncHTTPClient().fetch()

AsyncHTTPClient() 实际生成的对象是 SimpleAsyncHTTPClient 的实例。其隐式的创建过程和上面调用 IOLoop() 生成 EpollIOLoop 的实例一样。

class AsyncHTTPClient(Configurable):
    def fetch(self, request, callback=None, raise_error=True, **kwargs):
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)

        def handle_response(response):
            future.set_result(response)

        self.fetch_impl(request, handle_response)
        return future

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    def initialize(...):
        # 正在执行,key 为 object(),value 为 二元组,形式为(HTTPRequest(url), callback)
        self.active = {}
        # 等待执行,key 为 object(),value 为 三元组,形式为(HTTPRequest(url), callback, timeout_handle)
        self.waiting = {}
        # 默认 10
        self.max_clients = max_clients
        # 任务队列,元素为三元组,形式为 (object(), HTTPRequest(url), callback)
        self.queue = collections.deque()
        # 后续的所有 _HTTPConnection 的实例都使用同一个 tcp_client
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)

    def fetch_impl(self, request, callback):
        key = object()
        self.queue.append((key, request, callback))
        if not len(self.active) < self.max_clients:
            # 如果正在运行的任务 >= max_clients,则计算一个 timeout
            # 添加 _on_timeout 方法作为 timeout_handle 到IOLoop
            # 如果到达 timeout 还未执行,则 raise 一个超时报错(status_code=599)
            # 如果执行了则取消 timeout_handle
        else:
            timeout_handle = None
        self.waiting[key] = (request, callback, timeout_handle)
        self._process_queue()

    def _process_queue(self):
        while self.queue and len(self.active) < self.max_clients:
            key, request, callback = self.queue.popleft()
            # 从 waiting 中移出,如果有 timeout_handle,则在 IOLoop 中取消
            self._remove_timeout(key)
            # 放入到 active 中,表示正在执行
            self.active[key] = (request, callback)
            release_callback = functools.partial(self._release_fetch, key)
            self._handle_request(request, release_callback, callback)

    def _release_fetch(self, key):
        # 请求已完成,从 active 中移出,继续从 self.queue 中取任务执行
        del self.active[key]
        self._process_queue()

    def _handle_request(self, request, release_callback, final_callback):
        # self._connection_class() 返回 _HTTPConnection 这个类
        # 多个请求使用同一个 SimpleAsyncHTTPClient 对象
        # 每个请求使用不同的 _HTTPConnection 对象
        self._connection_class()(
            self.io_loop, self, request, release_callback,
            final_callback, self.max_buffer_size, self.tcp_client,
            self.max_header_size, self.max_body_size)

class _HTTPConnection(httputil.HTTPMessageDelegate):
    def __init__():
        self.io_loop = io_loop
        # 这个 connection 对应的 SimpleAsyncHTTPClient
        self.client = client
        self.request = request
        # 即 SimpleAsyncHTTPClient 的 _release_fetch 方法
        self.release_callback = release_callback
        # 即 SimpleAsyncHTTPClient 的 fetch 方法中的 handle_response 方法
        self.final_callback = final_callback
        # connect() 是一个被 @gen.coroutine 装饰的生成器
        # 这里生成器就由 Runner 来运行了
        self.tcp_client.connect(
            # 这个 callback 是在 _make_coroutine_wrapper 中处理的
            callback=self._on_connect
        )

class TCPClient(object):
    @gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None, source_ip=None, source_port=None):
        # DNS 解析
        addrinfo = yield self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo,
            self.io_loop,
            functools.partial(
                self._create_stream,
                max_buffer_size,
                source_ip=source_ip,
                source_port=source_port
            )
        )
        # connector.start() 调用 connector.try_connect() 接着调用 connector.connect()
        # connect 就是实例化 _Connector 传入的 self._create_stream 的偏函数
        af, addr, stream = yield connector.start()

        raise gen.Return(stream)

    def _create_stream(self, max_buffer_size, af, addr, source_ip=None, source_port=None):
        # 创建了本次连接使用的 socket 对象
        socket_obj = socket.socket(af)
        try:
            stream = IOStream(socket_obj, io_loop=self.io_loop, max_buffer_size=max_buffer_size)
        except socket.error as e:
            fu = Future()
            fu.set_exception(e)
            return fu
        else:
            return stream.connect(addr)

class IOStream(BaseIOStream):
    def connect(self, address, callback=None, server_hostname=None):
        # 监听 socket_obj 的写事件
        self._add_io_state(self.io_loop.WRITE)

class BaseIOStream(object):
    def _add_io_state(self, state):
        if self._state is None:
            # 将 socket 对应的 fd 添加到 IOLoop 的监听列表
            self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)


class PollIOLoop(IOLoop):
    def add_handler(self, fd, handler, events):
        fd, obj = self.split_fd(fd)
        # 对应 fd 有事件后的处理函数
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        # 在 epoll 中注册 fd
        self._impl.register(fd, events | self.ERROR)
SimpleAsyncHTTPClient 保存了一个任务队列,内容为其所发出的请求以及读取响应后的回调,每一个请求都实例化一个 _HTTPConnection 对象进行封装,_HTTPConnection 封装了一些 HTTP 操作(比如计算 Content-Length 、选择 Content-Type 等),然后通过 TCPClient 创建 IOStream 将该请求使用的 fd 添加到 IOLoop 中。这样在 IOLoop 就能监听到相应的 IO 事件了。

上下文 stack_context

在 IOLoop 的 add_callback 方法中,所有的回调方法被添加到 self._callbacks 之前,都使用 stack_context.wrap(callback) 进行包装,它的作用是什么?实现的逻辑又是怎样的呢?

class PollIOLoop(IOLoop):
    def add_callback(self, callback, *args, **kwargs):
        self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))

stack_context.py 的头部注释中,有一个示例:

@contextlib.contextmanager
def die_on_error():
    try:
        yield
    except Exception:
        logging.error("exception in asynchronous operation",exc_info=True)
        sys.exit(1)

with StackContext(die_on_error):
    # Any exception thrown here *or in callback and its descendants*
    # will cause the process to exit instead of spinning endlessly
    # in the ioloop.
    http_client.fetch(url, callback)

ioloop.start()

其意思是说,在 StackContext(die_on_error) 上下文中的报错,会被 die_on_error 中的 try-except 捕获,然后执行 sys.exit(1) 结束程序。而且神奇的是 callback 中的报错也能被 die_on_error 捕获处理,我们知道,执行 http_client.fetch(url, callback) 的时候,是不会同步执行 callback 的,需要获取数据后才会回调执行,此时的报错能被 die_on_error 中的 try-except 捕获,说明 callback 必然绑定了 die_on_error 所提供的上下文环境,那么很显然 stack_context.wrap(callback) 就是为 callback 来绑定上下文的。

import sys
import contextlib
from tornado.stack_context import StackContext
from tornado.ioloop import IOLoop


@contextlib.contextmanager
def foo():
    try:
        yield
    except Exception:
        print("catch in foo")
        sys.exit(1)

@contextlib.contextmanager
def bar():
    try:
        yield
    except Exception:
        print("catch in bar")
        raise


with StackContext(foo):
    with StackContext(bar):
        def callback():
            raise Exception("in callback")

        IOLoop.current().add_callback(callback)

try:
    IOLoop.current().start()
except KeyboardInterrupt:
    sys.exit(0)

# catch in bar
# catch in foo

修改一下官方示例,运行代码可以发现 callback 中的异常被 bar foo 捕获了。如果将 stack_context.wrap 中的内容移除,会怎样呢?将下面的代码加在上述示例的开头:

from tornado import stack_context
def _wrap(fn):
    def inner(*args, **kwargs):
        return fn(*args, **kwargs)
    return inner
# 将 stack_context.wrap 替换为一个没有实际作用的装饰器
stack_context.wrap = _wrap

再次运行可以发现 callback 中的异常就没有被 foo bar 捕获,所以 with StackContext(foo) 用于创建上下文,而 stack_context.wrap 用于将上下文绑定给 callback

class _State(threading.local):  # 线程隔离
    def __init__(self):
        self.contexts = (tuple(), None)  # tuple() 位置存放所有绑定的上下文,None 位置存放顶部上下文

_state = _State()  # 单例

def wrap(fn):
    # 为了在闭包中修改 cap_contexts 设置为 list
    cap_contexts = [_state.contexts]

    if not cap_contexts[0][0] and not cap_contexts[0][1]:
        # 没有上下文,fn 执行时也不需要进入上下文了
        def null_wrapper(*args, **kwargs):
            try:
                current_state = _state.contexts
                _state.contexts = cap_contexts[0]
                return fn(*args, **kwargs)
            finally:
                _state.contexts = current_state
        # 设置标识避免重复装饰
        null_wrapper._wrapped = True
        return null_wrapper

    def wrapped(*args, **kwargs):
        ret = None
        try:
            # Capture old state
            current_state = _state.contexts

            # Remove deactivated items
            # 移除被调用了 deactivate 的上下文
            cap_contexts[0] = contexts = _remove_deactivated(cap_contexts[0])

            # Force new state
            _state.contexts = contexts

            # Current exception
            exc = (None, None, None)
            top = None

            # Apply stack contexts
            # 当前需要进入的上下文
            last_ctx = 0
            stack = contexts[0]

            # Apply state
            for n in stack:
                try:
                    n.enter()
                    last_ctx += 1
                except:
                    # Exception happened. Record exception info and store top-most handler
                    # 进入上下文时报错,top 保存上一个上下文
                    exc = sys.exc_info()
                    top = n.old_contexts[1]

            if top is None:
                # top 为 None 进入上下文没有报错,直接执行 fn
                try:
                    ret = fn(*args, **kwargs)

            if top is not None:
                # top 不为 None 进入上下文有报错,处理报错
            else:
                while last_ctx > 0:
                    last_ctx -= 1
                    c = stack[last_ctx]

                    try:
                        # 退出进入过的上下文
                        c.exit(*exc)
                    except:
        finally:
            _state.contexts = current_state
        return ret

    wrapped._wrapped = True
    return wrapped

此模块在 Tornado 5.1 中被标记为弃用,在6.0 中被正式移除,原因是 async def 原生协程使 stack_context 无法提供原有功能

一些问题

阻塞代码

在单线程场景下,如果生成器中有阻塞线程的代码,就会导致 IOLoop 被阻塞,这时生成器之间的执行顺序可能会影响对结果的预计。比如:

def log(str_):
    print("{} {}".format(datetime.now(), str_))

@gen.coroutine
def f1():
    log("f1 start")
    yield time.sleep(3)
    log("f1 end")

@gen.coroutine
def f2():
    log("f2 start")
    yield gen.sleep(3)
    log("f2 end")

先运行 f1() 后运行 f2(),可以看到整个过程花费了 6 秒。

loop = IOLoop.current()
f1(), f2()
loop.start()
# 2022-08-25 09:02:43.093956 f1 start
# 2022-08-25 09:02:46.097484 f2 start
# 2022-08-25 09:02:46.097825 f1 end
# 2022-08-25 09:02:49.100197 f2 end

先运行 f2() 后运行 f1(),结果只花费了 3 秒。

loop = IOLoop.current()
f2(), f1()
loop.start()
# 2022-08-25 09:03:27.147476 f2 start
# 2022-08-25 09:03:27.147732 f1 start
# 2022-08-25 09:03:30.149497 f1 end
# 2022-08-25 09:03:30.149698 f2 end

结合上面对 Runner 的理解以及对 gen.sleep 的实现分析不难明白其原因。