Tornado 源码分析¶
本文所有的分析都基于 Tornado 4.5.2,对应官方文档为 Tornado Docs branch4.5,运行示例使用的 Python 版本为 2.7.18。
- Web 框架(包含
RequestHandler
以及一些支持类)。 - HTTP 客户端以及服务端的实现(包括
HTTPServer
和AsyncHTTPClient
)。 - 异步网络库(包括
IOLoop
和IOStream
,可以作为 HTTP 组件的构建块,也可用于实现其他协议)。 - 协程库
tornado.gen
。
非阻塞与异步¶
术语异步(asynchronous )和非阻塞(non-blocking)密切相关,通常可以互换使用,但它们并不完全相同。
当函数在返回之前等待某事发生时阻塞。函数可能由于多种原因而阻塞: 网络 I/O、磁盘 I/O、互斥锁等等。
异步函数在完成之前返回(普通的同步函数在返回之前执行所有要执行的操作)。其有多种设计风格:
- 通过参数回调
- 返回占位符(Future、Promise、Deferred等)
- 使用队列
- 注册回调(比如 POSIX 信号等)
比如同步代码风格如下:
from tornado.httpclient import HTTPClient
def synchronous_fetch(url):
http_client = HTTPClient()
response = http_client.fetch(url)
return response.body
通过 Tornado 采用的协程库 tornado.gen
和 Future
占位符设计,相应的异步代码如下:
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
raise gen.Return(response.body)
可以发现两者风格是非常一致的,这更利于阅读和理解。由于 Python 3.3 之前不允许在生成器中使用 return
,Tornado 使用触发异常的形式进行返回结果。
class Return(Exception): # 并没有直接继承自 StopIteration
def __init__(self, value=None):
super(Return, self).__init__()
self.value = value
# Cython 通过 .args 元组识别 StopIteration 的子类
self.args = (value,)
事件循环 IOLoop
¶
Tornado 是以单进程单线程场景所设计的,因此 IOloop
是单例的(非典型场景下是可以多例的,比如多个线程中分别使用不同的 IOLoop),它是一个水平触发的 I/O 事件循环,在 Linux 下使用 epoll
,BSD 下使用 kqueue
,在两者不可用的情况下使用 select
。
水平触发(Level-Triggered):只要文件描述符关联的读内核缓冲区非空,有数据可以读取,就一直发出可读信号进行通知,当文件描述符关联的内核写缓冲区不满,有空间可以写入,就一直发出可写信号进行通知。
边缘触发(Edge-Triggered):当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知,当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知。
epoll
默认采用水平触发。
创建与启动¶
loop = IOLoop.current()
loop.start()
创建 IOLoop.current()
¶
IOLoop.current
和 IOLoop.instance
都可以用来创建事件循环,其区别为:
class IOLoop(Configurable):
_instance_lock = threading.Lock()
_current = threading.local()
def instance():
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
# 避免多线程并发调用,所以 instance() 始终返回单例
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop() # 类变量为线程共享的
return IOLoop._instance
def current(instance=True):
current = getattr(IOLoop._current, "instance", None)
# 当前线程没有对应的 IOLoop 时,默认调用 instance 完成创建
if current is None and instance:
return IOLoop.instance()
return current
在主线程中调用 instance
和 current
是等价的。其他线程可以通过直接实例化 loop = IOLoop()
创建自己的事件循环(实例化过程中默认会调用 IOLoop.make_current
方法绑定到当前线程),或者通过 IOLoop.instance()
获取主线程的事件循环。
print("In main thread ", id(IOLoop.instance()), id(IOLoop.current()))
t = Thread(
target=lambda : IOLoop() and print("In other thread ", id(IOLoop.instance()), id(IOLoop.current()))
)
t.start(), t.join()
# In main thread 140101302999120 140101302999120
# In other thread 140101302999120 140101291243856
可以看到线程 t
通过 IOLoop.instance()
拿到了主线程的 IOLoop,由于其直接实例化了 IOLoop()
,所以 IOLoop.current()
拿到了自己独有的 IOLoop。
启动事件循环 loop.start()
¶
class PollIOLoop(IOLoop):
def start(self):
try:
while True:
pass
finally:
pass
创建过程,以 EPollIOLoop
为例¶
print(type(IOLoop()))
# <class 'tornado.platform.epoll.EPollIOLoop'>
上例可以看到,IOLoop()
最后生成了 EPollIOLoop
的实例,那么这个过程是怎样的呢?
class EPollIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
class PollIOLoop(IOLoop):
def initialize(self, impl, time_func=None, **kwargs):
super(PollIOLoop, self).initialize(**kwargs)
self._impl = impl
class IOLoop(Configurable):
@classmethod
def configurable_base(cls):
return IOLoop
@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
# epoll 可用则返回 EPollIOLoop
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
# 其他情形下返回 KQueueIOLoop 或者 SelectIOLoop
def initialize(self, make_current=None):
if make_current is None:
if IOLoop.current(instance=False) is None:
# 当前线程没有事件循环则将该事件循环设置为此线程的事件循环
self.make_current()
class Configurable(object):
__impl_kwargs = None
def __new__(cls, *args, **kwargs):
base = cls.configurable_base() # base 为 IOLoop
init_kwargs = {}
if cls is base:
# 直接实例化 loop = IOLoop() 时
# 根据当前系统环境进行选择,此时为 EPollIOLoop
impl = cls.configured_class()
if base.__impl_kwargs:
init_kwargs.update(base.__impl_kwargs)
else:
# 显式实例化 loop = EpollIOloop() 时
impl = cls
init_kwargs.update(kwargs)
# 实例化 EPollIOLoop
instance = super(Configurable, cls).__new__(impl)
# 调用 EPollIOLoop 中的 initialize 方法
instance.initialize(*args, **init_kwargs)
return instance
IOloop()
或者 EpollIOloop()
创建事件循环对象时,根据当前环境,前者隐式创建了 EpollIOloop
的对象,和后者等价。 实例化 IOLoop()
的调用顺序为: Configurable.__new__()
# 用于判断是通过 IOLoop() 隐式创建还是 EPollIOLoop() 等显式创建
IOLoop.configurable_base()
# 根据当前系统环境进行选择,此时为 EPollIOLoop
IOLoop.configured_class()
# 确定具体实现为 select.epoll()
EPollIOLoop().initialize()
# 初始化 IOLoop 的状态、callback 队列等
PollIOLoop().initialize()
# 调用 make_current
IOLoop().initialize()
# 若当前线程没有设置事件循环,则将当前实例化的事件循环设置为当前线程的事件循环
IOLoop().make_current()
IOLoop
的休眠与唤醒¶
IOLoop 本质上是一个无限循环,每次循环都会通过 event_pairs = self._impl.poll(poll_timeout)
检测 I/O 事件,如果 poll_timeout
时间内没有 I/O事件,循环就会在这段时间阻塞,线程进入休眠。此期间如果其他线程调用此 IOLoop 的 loop.stop()
或 loop.add_callback()
等操作,IOLoop 就无法及时响应,所以需要有一套机制能够在 poll_timeout
期间解除阻塞并唤醒线程。Waker
承担了这份工作。
def _set_nonblocking(fd):
# 设置 fd 为非阻塞
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def set_close_exec(fd):
# 在 fork 子进程中执行 exec 的时候,子进程会自动关闭继承得到的 fd。
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
class Waker(interface.Waker):
def __init__(self):
r, w = os.pipe()
_set_nonblocking(r)
_set_nonblocking(w)
set_close_exec(r)
set_close_exec(w)
self.reader = os.fdopen(r, "rb", 0)
self.writer = os.fdopen(w, "wb", 0)
def fileno(self):
return self.reader.fileno()
def wake(self):
# 写入的数据并无实际意义
self.writer.write(b"x")
def consume(self):
# 只是将管道内数据消费掉
while True:
result = self.reader.read()
class PollIOLoop(IOLoop):
def initialize(self, impl, time_func=None, **kwargs):
# 保存对 fd 的处理函数,
self._handlers = {}
self._waker = Waker()
# 将 waker 的 reader 添加到 epoll 的监听中,当有可读事件的时候,阻塞就解除了
self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ)
def add_handler(self, fd, handler, events):
# 这里取得fd.fileno() 和 fd 对象,做一个 fd 和 handler 的绑定
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
# 将 fd 注册到 epoll 监听列表中
self._impl.register(fd, events | self.ERROR)
def start(self):
while True:
self._run_callback(self._callbacks.popleft())
# No timeouts and no callbacks, so use the default.
poll_timeout = _POLL_TIMEOUT # 3600.0
# 如果此时 poll_timeout = 3600,且监听的 fd 都没有事件,线程会阻塞在此,进入休眠
# 这时其他线程调用 add_callback 给当前 IOLoop 添加的 callback 就无法处理了
# 所以在 add_callback 时调用 wake 方法,由于 epoll 监听了管道 reader 对应的 fd
# 阻塞解除,进行下一轮循环
event_pairs = self._impl.poll(poll_timeout)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
# 获取到 fd 对应事件后调用相应的处理函数
fd_obj, handler_func = self._handlers[fd]
# self._waker.consume() 消费掉管道中的数据
handler_func(fd_obj, events)
def add_callback(self, callback, *args, **kwargs):
if thread.get_ident() != self._thread_ident:
# 其他线程对此 IOLoop 调用 add_callback
self._waker.wake()
def stop(self):
# 如果 IOLoop 阻塞,调用 stop 状态不能及时更新,所以也需要 wake
self._running = False
self._stopped = True
self._waker.wake()
占位符 Future
¶
Future
封装了异步操作的结果,并非线程安全,因此与单线程事件循环一起使用更快。Tornado 中常和 IOLoop.add_future
连用,或者由 gen.coroutine
yield。其设计和结构都非常简单,需要注意的是它的回调处理,它通过回调连接了 IOLoop 和 Runner ,协助完成了生成器的调度执行。
class Future(object):
def __init__(self):
self._done = False # 操作是否完成
self._result = None
self._exc_info = None
# 保存操作完成后的回调函数
self._callbacks = []
def result(self, timeout=None):
# 操作完成返回结果,操作失败抛出报错
self._clear_tb_log()
if self._result is not None:
# 已有结果
return self._result
if self._exc_info is not None:
# 已有报错
try:
raise_exc_info(self._exc_info)
finally:
self = None
self._check_done()
return self._result
def set_result(self, result):
# 设置结果并触发回调
self._result = result
self._set_done()
def _set_done(self):
self._done = True
for cb in self._callbacks:
# 这里的 cb 就包括 IOLoop 中添加的 lambda future: self.add_callback(callback, future)
# callback 为 Runner.run
# IOLoop 下一次循环就会通过 Runner.run 继续执行原生成器了
cb(self)
self._callbacks = None
协程装饰器 @gen.coroutine
¶
def coroutine(func, replace_callback=True):
return _make_coroutine_wrapper(func, replace_callback=True)
# key 为 future 对象,value 为 Runner 对象
# 这个映射提供了对 Runner 对象的强引用,只要它们的结果 future 对象也有强引用(通常来自父协程的 Runner), Runner 就不会被 GC 回收
# 当 future 没有引用时,对应 Runner 也就可以被 GC 回收了
_futures_to_runners = weakref.WeakKeyDictionary()
def _make_coroutine_wrapper(func, replace_callback):
wrapped = func
@functools.wraps(wrapped)
def wrapper(*args, **kwargs):
future = TracebackFuture() # TracebackFuture = Future
try:
# 调用 func,如果是普通函数,则同步执行其中代码
# 如果是生成器函数,直接返回生成器
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = _value_from_stopiteration(e)
except Exception:
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, GeneratorType):
try:
# result:被装饰的 func 调用后返回的生成器
# 这里第一次执行生成器
yielded = next(result)
else:
# future:生成器调用完成最终结果保存在此 future
# yielded:生成器第一次返回的结果
_futures_to_runners[future] = Runner(result, future, yielded)
return future
# func 是同步函数,执行完的 result 保存在 future 中
future.set_result(result)
return future
return wrapper
在使用 @gen.coroutine
装饰一个函数(代码块中没有 yield
关键字)时,代码同步执行,结果通过 future.set_result(result)
直接返回,future 为已完成状态。在装饰一个生成器时,代码异步执行,返回的 future 为未完成状态,后续的执行包装在 Runner 中,然后存放在 _futures_to_runners
,等到第一次 yield 的 future (下例中为 gen.sleep(10)
)为已完成状态时,IOLoop 通过回调调用 Runner.run()
使生成器继续运行。
@gen.coroutine
def foo():
return 1 + 2
future = foo()
print(future.done(), future.result(), gen._futures_to_runners.keys())
# (True, 3, [])
@gen.coroutine
def bar():
yield gen.sleep(10)
raise gen.Return(1 + 2)
future = bar()
print(future.done(), hex(id(future)), gen._futures_to_runners.keys())
# (False, '0x7f89f51a3710', [<tornado.concurrent.Future object at 0x7f89f51a3710>])
生成器运行工具 Runner
¶
在装饰器 @gen.coroutine
中,将被装饰的生成器、生成器的最终 future、第一次调用生成器 yield 的结果包装成了一个 Runner 对象(Runner(result, future, yielded)
),后续所有对生成器的执行,都由 Runner 进行:
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen # 待运行的生成器
self.result_future = result_future # 生成器的最终结果
self.future = _null_future
self.io_loop = IOLoop.current()
if self.handle_yield(first_yielded):
self.run()
def handle_yield(self, yielded):
# 处理每一次 yield 出来的结果,包装后存在 self.future 中
self.future = convert_yielded(yielded)
if not self.future.done() or self.future is moment:
def inner(f):
# Break a reference cycle to speed GC.
f = None # noqa
self.run()
# 未完成的话放到 IOLoop 中注册一下
# 这里的注册一下指的是 lambda future: self.add_callback(callback, future)
# 在 future 完成的时候会将 callback(run)添加到 IOLoop 的 callback 列表
self.io_loop.add_future(self.future, inner)
return False
return True
def run(self):
""" 运行或是暂停生成器,运行直到下一个没有完成的 yield point """
try:
while True:
# 首次运行时,self.future 包装了第一次调用 next(gen) 的结果
# 可以理解为到达了一个 yield point
future = self.future
if not future.done():
# yield point 未完成,暂停生成器的运行
return
# 已完成
self.future = None
value = future.result()
future = None
# 继续运行生成器
yielded = self.gen.send(value)
# yielded 就是下一个 yield point 了
# handle_yield 会把 yielded 设置为 self.future 的,保证每次循环都能继续
if not self.handle_yield(yielded):
return
# 未完成的 future 添加到 IOLoop,完成后调用 inner(就是 run)
# self.io_loop.add_future(self.future, inner)
class IOLoop(Configurable):
def add_future(self, future, callback):
# 这个 lambda 函数在 future 完成时执行,比如 future.set_result 时
# 此时实际执行的内容是 IOLoop(这里的self) 的 add_callback
# 将 inner (这里的 callback) 方法添加到 IOLoop 的 _callbacks 队列,inner 的参数为 future
# 总的来说,就是未完成的 future 添加到 IOLoop 中,完成时调用 Runner 的 run 方法,继续执行生成器
future.add_done_callback(lambda future: self.add_callback(callback, future))
class PollIOLoop(IOLoop):
def add_callback(self, callback, *args, **kwargs):
self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))
def start(self):
# inner 并不是在 future.set_result 时、立即执行,而是添加到 _callbacks 中
# 在 IOLoop 的循环中执行
self._run_callback(self._callbacks.popleft())
异步 sleep()
与异步 fetch()
¶
gen.sleep()
¶
def sleep(duration):
# 直接返回一个未完成 future,并在 IOLoop 中注册回调函数,
# 等 duration 后 set_result(None)
f = Future()
IOLoop.current().call_later(duration, lambda: f.set_result(None))
return f
class IOLoop(Configurable):
def call_later(self, delay, callback, *args, **kwargs):
# 在 self.time() + delay 时间后,调用 callback 也就是 f.set_result(None)
return self.call_at(self.time() + delay, callback, *args, **kwargs)
class PollIOLoop(IOLoop):
def initialize(self, impl, time_func=None, **kwargs):
# 最小堆实现,堆顶元素的 deadline 距离当前时间最近
self._timeouts = [] # type: List[_Timeout]
def call_at(self, deadline, callback, *args, **kwargs):
# _Timeout 就是对 deadline 以及需要执行的 callback 的一个简单封装
timeout = _Timeout(
deadline,
functools.partial(stack_context.wrap(callback), *args, **kwargs),
self
)
heapq.heappush(self._timeouts, timeout)
return timeout
def start(self):
# 待执行的 timeout 列表
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
# timeout 被取消执行
elif self._timeouts[0].deadline <= now:
# 堆顶元素到了执行时间,添加到待执行列表
due_timeouts.append(heapq.heappop(self._timeouts))
else:
# 堆顶元素都没到执行时间,剩下的不可能到执行时间
break
for timeout in due_timeouts:
if timeout.callback is not None:
# timeout.callback 即 lambda: f.set_result(None)
# set_result 会将 inner 函数(Runner().run)添加到 IOLoop 中
# 这样生成器就会接着运行了
self._run_callback(timeout.callback)
gen.sleep()
的实现过程非常简单,调用它直接返回一个未完成的 Future 对象,并在 IOLoop 的 _timeouts(使用最小堆实现)中添加一个 _Timeout 对象,在 IOLoop 的循环过程中检测 _timeouts,到达执行时间后便对相应的 Future 对象执行 f.set_result(None)
。如果有如下代码: @gen.coroutine
def foo():
yield gen.sleep(3)
print("end sleep")
由于 gen.sleep(3)
返回的 future 未完成,所以 Runner 暂停了生成器 foo
的执行,3 秒后,IOLoop 获取 due_timeouts 并执行 f.set_result(None)
,inner
作为 future 绑定的 callback 被调用,Runner.run
被执行,此时判断条件 if not future.done():
为 False
,生成器 foo
继续运行:yielded = self.gen.send(value)
。
AsyncHTTPClient().fetch()
¶
AsyncHTTPClient()
实际生成的对象是 SimpleAsyncHTTPClient
的实例。其隐式的创建过程和上面调用 IOLoop()
生成 EpollIOLoop
的实例一样。
class AsyncHTTPClient(Configurable):
def fetch(self, request, callback=None, raise_error=True, **kwargs):
if not isinstance(request, HTTPRequest):
request = HTTPRequest(url=request, **kwargs)
def handle_response(response):
future.set_result(response)
self.fetch_impl(request, handle_response)
return future
class SimpleAsyncHTTPClient(AsyncHTTPClient):
def initialize(...):
# 正在执行,key 为 object(),value 为 二元组,形式为(HTTPRequest(url), callback)
self.active = {}
# 等待执行,key 为 object(),value 为 三元组,形式为(HTTPRequest(url), callback, timeout_handle)
self.waiting = {}
# 默认 10
self.max_clients = max_clients
# 任务队列,元素为三元组,形式为 (object(), HTTPRequest(url), callback)
self.queue = collections.deque()
# 后续的所有 _HTTPConnection 的实例都使用同一个 tcp_client
self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)
def fetch_impl(self, request, callback):
key = object()
self.queue.append((key, request, callback))
if not len(self.active) < self.max_clients:
# 如果正在运行的任务 >= max_clients,则计算一个 timeout
# 添加 _on_timeout 方法作为 timeout_handle 到IOLoop
# 如果到达 timeout 还未执行,则 raise 一个超时报错(status_code=599)
# 如果执行了则取消 timeout_handle
else:
timeout_handle = None
self.waiting[key] = (request, callback, timeout_handle)
self._process_queue()
def _process_queue(self):
while self.queue and len(self.active) < self.max_clients:
key, request, callback = self.queue.popleft()
# 从 waiting 中移出,如果有 timeout_handle,则在 IOLoop 中取消
self._remove_timeout(key)
# 放入到 active 中,表示正在执行
self.active[key] = (request, callback)
release_callback = functools.partial(self._release_fetch, key)
self._handle_request(request, release_callback, callback)
def _release_fetch(self, key):
# 请求已完成,从 active 中移出,继续从 self.queue 中取任务执行
del self.active[key]
self._process_queue()
def _handle_request(self, request, release_callback, final_callback):
# self._connection_class() 返回 _HTTPConnection 这个类
# 多个请求使用同一个 SimpleAsyncHTTPClient 对象
# 每个请求使用不同的 _HTTPConnection 对象
self._connection_class()(
self.io_loop, self, request, release_callback,
final_callback, self.max_buffer_size, self.tcp_client,
self.max_header_size, self.max_body_size)
class _HTTPConnection(httputil.HTTPMessageDelegate):
def __init__():
self.io_loop = io_loop
# 这个 connection 对应的 SimpleAsyncHTTPClient
self.client = client
self.request = request
# 即 SimpleAsyncHTTPClient 的 _release_fetch 方法
self.release_callback = release_callback
# 即 SimpleAsyncHTTPClient 的 fetch 方法中的 handle_response 方法
self.final_callback = final_callback
# connect() 是一个被 @gen.coroutine 装饰的生成器
# 这里生成器就由 Runner 来运行了
self.tcp_client.connect(
# 这个 callback 是在 _make_coroutine_wrapper 中处理的
callback=self._on_connect
)
class TCPClient(object):
@gen.coroutine
def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
max_buffer_size=None, source_ip=None, source_port=None):
# DNS 解析
addrinfo = yield self.resolver.resolve(host, port, af)
connector = _Connector(
addrinfo,
self.io_loop,
functools.partial(
self._create_stream,
max_buffer_size,
source_ip=source_ip,
source_port=source_port
)
)
# connector.start() 调用 connector.try_connect() 接着调用 connector.connect()
# connect 就是实例化 _Connector 传入的 self._create_stream 的偏函数
af, addr, stream = yield connector.start()
raise gen.Return(stream)
def _create_stream(self, max_buffer_size, af, addr, source_ip=None, source_port=None):
# 创建了本次连接使用的 socket 对象
socket_obj = socket.socket(af)
try:
stream = IOStream(socket_obj, io_loop=self.io_loop, max_buffer_size=max_buffer_size)
except socket.error as e:
fu = Future()
fu.set_exception(e)
return fu
else:
return stream.connect(addr)
class IOStream(BaseIOStream):
def connect(self, address, callback=None, server_hostname=None):
# 监听 socket_obj 的写事件
self._add_io_state(self.io_loop.WRITE)
class BaseIOStream(object):
def _add_io_state(self, state):
if self._state is None:
# 将 socket 对应的 fd 添加到 IOLoop 的监听列表
self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
class PollIOLoop(IOLoop):
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
# 对应 fd 有事件后的处理函数
self._handlers[fd] = (obj, stack_context.wrap(handler))
# 在 epoll 中注册 fd
self._impl.register(fd, events | self.ERROR)
SimpleAsyncHTTPClient
保存了一个任务队列,内容为其所发出的请求以及读取响应后的回调,每一个请求都实例化一个 _HTTPConnection
对象进行封装,_HTTPConnection
封装了一些 HTTP 操作(比如计算 Content-Length 、选择 Content-Type 等),然后通过 TCPClient
创建 IOStream
将该请求使用的 fd 添加到 IOLoop 中。这样在 IOLoop 就能监听到相应的 IO 事件了。 上下文 stack_context
¶
在 IOLoop 的 add_callback
方法中,所有的回调方法被添加到 self._callbacks
之前,都使用 stack_context.wrap(callback)
进行包装,它的作用是什么?实现的逻辑又是怎样的呢?
class PollIOLoop(IOLoop):
def add_callback(self, callback, *args, **kwargs):
self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))
在 stack_context.py
的头部注释中,有一个示例:
@contextlib.contextmanager
def die_on_error():
try:
yield
except Exception:
logging.error("exception in asynchronous operation",exc_info=True)
sys.exit(1)
with StackContext(die_on_error):
# Any exception thrown here *or in callback and its descendants*
# will cause the process to exit instead of spinning endlessly
# in the ioloop.
http_client.fetch(url, callback)
ioloop.start()
其意思是说,在 StackContext(die_on_error)
上下文中的报错,会被 die_on_error
中的 try-except 捕获,然后执行 sys.exit(1)
结束程序。而且神奇的是 callback
中的报错也能被 die_on_error
捕获处理,我们知道,执行 http_client.fetch(url, callback)
的时候,是不会同步执行 callback
的,需要获取数据后才会回调执行,此时的报错能被 die_on_error
中的 try-except 捕获,说明 callback
必然绑定了 die_on_error
所提供的上下文环境,那么很显然 stack_context.wrap(callback)
就是为 callback
来绑定上下文的。
import sys
import contextlib
from tornado.stack_context import StackContext
from tornado.ioloop import IOLoop
@contextlib.contextmanager
def foo():
try:
yield
except Exception:
print("catch in foo")
sys.exit(1)
@contextlib.contextmanager
def bar():
try:
yield
except Exception:
print("catch in bar")
raise
with StackContext(foo):
with StackContext(bar):
def callback():
raise Exception("in callback")
IOLoop.current().add_callback(callback)
try:
IOLoop.current().start()
except KeyboardInterrupt:
sys.exit(0)
# catch in bar
# catch in foo
修改一下官方示例,运行代码可以发现 callback
中的异常被 bar
foo
捕获了。如果将 stack_context.wrap
中的内容移除,会怎样呢?将下面的代码加在上述示例的开头:
from tornado import stack_context
def _wrap(fn):
def inner(*args, **kwargs):
return fn(*args, **kwargs)
return inner
# 将 stack_context.wrap 替换为一个没有实际作用的装饰器
stack_context.wrap = _wrap
再次运行可以发现 callback
中的异常就没有被 foo
bar
捕获,所以 with StackContext(foo)
用于创建上下文,而 stack_context.wrap
用于将上下文绑定给 callback
。
class _State(threading.local): # 线程隔离
def __init__(self):
self.contexts = (tuple(), None) # tuple() 位置存放所有绑定的上下文,None 位置存放顶部上下文
_state = _State() # 单例
def wrap(fn):
# 为了在闭包中修改 cap_contexts 设置为 list
cap_contexts = [_state.contexts]
if not cap_contexts[0][0] and not cap_contexts[0][1]:
# 没有上下文,fn 执行时也不需要进入上下文了
def null_wrapper(*args, **kwargs):
try:
current_state = _state.contexts
_state.contexts = cap_contexts[0]
return fn(*args, **kwargs)
finally:
_state.contexts = current_state
# 设置标识避免重复装饰
null_wrapper._wrapped = True
return null_wrapper
def wrapped(*args, **kwargs):
ret = None
try:
# Capture old state
current_state = _state.contexts
# Remove deactivated items
# 移除被调用了 deactivate 的上下文
cap_contexts[0] = contexts = _remove_deactivated(cap_contexts[0])
# Force new state
_state.contexts = contexts
# Current exception
exc = (None, None, None)
top = None
# Apply stack contexts
# 当前需要进入的上下文
last_ctx = 0
stack = contexts[0]
# Apply state
for n in stack:
try:
n.enter()
last_ctx += 1
except:
# Exception happened. Record exception info and store top-most handler
# 进入上下文时报错,top 保存上一个上下文
exc = sys.exc_info()
top = n.old_contexts[1]
if top is None:
# top 为 None 进入上下文没有报错,直接执行 fn
try:
ret = fn(*args, **kwargs)
if top is not None:
# top 不为 None 进入上下文有报错,处理报错
else:
while last_ctx > 0:
last_ctx -= 1
c = stack[last_ctx]
try:
# 退出进入过的上下文
c.exit(*exc)
except:
finally:
_state.contexts = current_state
return ret
wrapped._wrapped = True
return wrapped
此模块在 Tornado 5.1 中被标记为弃用,在6.0 中被正式移除,原因是 async def
原生协程使 stack_context
无法提供原有功能。
一些问题¶
阻塞代码¶
在单线程场景下,如果生成器中有阻塞线程的代码,就会导致 IOLoop 被阻塞,这时生成器之间的执行顺序可能会影响对结果的预计。比如:
def log(str_):
print("{} {}".format(datetime.now(), str_))
@gen.coroutine
def f1():
log("f1 start")
yield time.sleep(3)
log("f1 end")
@gen.coroutine
def f2():
log("f2 start")
yield gen.sleep(3)
log("f2 end")
先运行 f1()
后运行 f2()
,可以看到整个过程花费了 6 秒。
loop = IOLoop.current()
f1(), f2()
loop.start()
# 2022-08-25 09:02:43.093956 f1 start
# 2022-08-25 09:02:46.097484 f2 start
# 2022-08-25 09:02:46.097825 f1 end
# 2022-08-25 09:02:49.100197 f2 end
先运行 f2()
后运行 f1()
,结果只花费了 3 秒。
loop = IOLoop.current()
f2(), f1()
loop.start()
# 2022-08-25 09:03:27.147476 f2 start
# 2022-08-25 09:03:27.147732 f1 start
# 2022-08-25 09:03:30.149497 f1 end
# 2022-08-25 09:03:30.149698 f2 end
结合上面对 Runner
的理解以及对 gen.sleep
的实现分析不难明白其原因。