
How a request is handled after it reaches Tornado

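Asynchronous, non-blocking Tornado lets a single server accept more connections and serve more users. For each individual user, though, a page that used to take 10 seconds to appear still takes at least 10 seconds, and possibly a little longer, because the same hardware is now admitting more requests.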

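I had long meant to read the Tornado source code. I had never read the source of a web framework, which is not a good position for a backend developer and does nothing for my own growth. Recently I ran into a Tornado-related problem and needed to read the source to confirm a guess: can Tornado detect that a connection has been dropped and stop processing the corresponding request?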

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

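The app listens on port 8888. In an ordinary socket program the next step would be a call to accept; on a blocking socket, accept blocks the caller and returns only when a connection arrives. Things are different here: there is no explicit accept call, and the program calls the ioloop's start instead.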

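Explaining the ioloop requires some groundwork. Tornado is designed as a single-process, single-threaded, asynchronous, non-blocking web framework. Its core is non-blocking IO: every IO operation is designed not to block, which lets Tornado handle considerably more IO-bound requests per unit of time than a framework that blocks on each one. Between accepting a request and sending its response there are many IO operations. If all of them are non-blocking, then while the first request is waiting on IO the process can work on the second request, so the CPU stays busy instead of idling until the IO returns.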
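
The interleaving described above can be illustrated with a small sketch. It uses the standard library's asyncio rather than Tornado's own ioloop, and `asyncio.sleep` stands in for a non-blocking IO wait; the `handle` function and the delays are made up for illustration.

```python
import asyncio

events = []  # order in which the two simulated requests start and finish

async def handle(name, io_delay):
    events.append("start " + name)
    # Stand-in for a non-blocking IO wait: control returns to the event loop.
    await asyncio.sleep(io_delay)
    events.append("finish " + name)

async def main():
    # Both "requests" are in flight at once on a single thread.
    await asyncio.gather(handle("first", 0.2), handle("second", 0.05))

asyncio.run(main())
print(events)
```

The second request starts while the first is still waiting and finishes before it. Neither request completes faster than its own IO wait, which matches the point made at the start of this post.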

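The ioloop follows the singleton pattern: IOLoop.instance() returns the same object everywhere in a process. (Strictly speaking, IOLoop.current() returns the loop that is current for the calling thread; in a single-threaded program like this one, that is the same object.)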

@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop

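The recommended way to obtain the ioloop is IOLoop.current(). The concrete implementation is PollIOLoop (recent Tornado versions on Python 3 use AsyncIOLoop instead; the rest of this post follows the Python 2 implementation). Which PollIOLoop subclass is used depends on the platform: EPollIOLoop, KQueueIOLoop, or SelectIOLoop. To see how the implementation is chosen per platform, read the Configurable interface.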
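
The same capability check can be tried outside Tornado. The sketch below mirrors the `hasattr` tests in `configurable_default` using only the standard library; `pick_poller_name` is a hypothetical helper, not part of Tornado.

```python
import select

def pick_poller_name():
    """Return the name of the readiness API this platform would select."""
    if hasattr(select, "epoll"):    # Linux
        return "epoll"
    if hasattr(select, "kqueue"):   # BSD and macOS
        return "kqueue"
    return "select"                 # portable fallback

print(pick_poller_name())
```

The order matters: the more scalable mechanisms are tried first, and `select` is only the fallback.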

def listen(self, port, address=""):
    """Starts accepting connections on the given port.

    This method may be called more than once to listen on multiple ports.
    `listen` takes effect immediately; it is not necessary to call
    `TCPServer.start` afterwards.  It is, however, necessary to start
    the `.IOLoop`.
    """
    sockets = bind_sockets(port, address=address)
    self.add_sockets(sockets)

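Before the event loop starts, hello world does two things: it creates the app and has the app listen on port 8888. Listening on port 8888 is done by HTTPServer, a non-blocking, single-threaded HTTP server. TCPServer is in its inheritance chain, and the call ends up in TCPServer's listen method.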

def add_sockets(self, sockets):
    """Makes this server start accepting connections on the given sockets.

    The ``sockets`` parameter is a list of socket objects such as
    those returned by `~tornado.netutil.bind_sockets`.
    `add_sockets` is typically used in combination with that
    method and `tornado.process.fork_processes` to provide greater
    control over the initialization of a multi-process server.
    """
    for sock in sockets:
        self._sockets[sock.fileno()] = sock
        self._handlers[sock.fileno()] = add_accept_handler(
            sock, self._handle_connection)

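Once the sockets are listening on the specified address and port, TCPServer registers an accept handler on the ioloop. This means: when these sockets become readable (READ), call the accept handler.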
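
To make "call the accept handler when the socket is readable" concrete, here is a minimal sketch built on the standard library's selectors module instead of Tornado's ioloop. `register_accept_handler` and `on_connection` are hypothetical names, simplified stand-ins for Tornado's add_accept_handler and TCPServer._handle_connection, and the sketch runs a single loop iteration rather than a real server.

```python
import selectors
import socket

selector = selectors.DefaultSelector()
accepted = []  # addresses of connections handed to the callback

def on_connection(connection, address):
    # Stand-in for TCPServer._handle_connection.
    accepted.append(address)
    connection.close()

def register_accept_handler(sock, callback):
    """Arrange for callback to run for each pending connection on sock."""
    def accept_handler():
        while True:
            try:
                connection, address = sock.accept()
            except BlockingIOError:
                return  # nothing left to accept right now
            callback(connection, address)
    selector.register(sock, selectors.EVENT_READ, accept_handler)

listener = socket.socket()
listener.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
listener.listen(128)
listener.setblocking(False)
register_accept_handler(listener, on_connection)

# A client connects; the listening socket then becomes readable.
client = socket.create_connection(listener.getsockname())

# One iteration of the event loop: wait for readiness, run the handlers.
for key, _ in selector.select(timeout=1.0):
    key.data()

client.close()
selector.unregister(listener)
listener.close()
print(len(accepted))
```

Nothing in this program blocks in accept: accept is only called after the selector reports the socket as readable, and it stops as soon as no connection is pending.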

def _handle_connection(self, connection, address):
    if self.ssl_options is not None:
        assert ssl, "Python 2.6+ and OpenSSL required for SSL"
        try:
            connection = ssl_wrap_socket(connection,
                                         self.ssl_options,
                                         server_side=True,
                                         do_handshake_on_connect=False)
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_EOF:
                return connection.close()
            else:
                raise
        except socket.error as err:
            # If the connection is closed immediately after it is created
            # (as in a port scan), we can get one of several errors.
            # wrap_socket makes an internal call to getpeername,
            # which may return either EINVAL (Mac OS X) or ENOTCONN
            # (Linux).  If it returns ENOTCONN, this error is
            # silently swallowed by the ssl module, so we need to
            # catch another error later on (AttributeError in
            # SSLIOStream._do_ssl_handshake).
            # To test this behavior, try nmap with the -sT flag.
            # https://github.com/tornadoweb/tornado/pull/750
            if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
                return connection.close()
            else:
                raise
    try:
        if self.ssl_options is not None:
            stream = SSLIOStream(connection,
                                 max_buffer_size=self.max_buffer_size,
                                 read_chunk_size=self.read_chunk_size)
        else:
            stream = IOStream(connection,
                              max_buffer_size=self.max_buffer_size,
                              read_chunk_size=self.read_chunk_size)

        future = self.handle_stream(stream, address)
        if future is not None:
            self.io_loop.add_future(gen.convert_yielded(future),
                                    lambda f: f.result())
    except Exception:
        app_log.error("Error in connection callback", exc_info=True)

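After the accept handler has established the socket connection, it calls TCPServer._handle_connection to handle it. Tornado does not operate on the underlying socket directly; the object that operates on the socket is IOStream.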

@gen.coroutine
def _server_request_loop(self, delegate):
    try:
        while True:
            conn = HTTP1Connection(self.stream, False,
                                   self.params, self.context)
            request_delegate = delegate.start_request(self, conn)
            try:
                ret = yield conn.read_response(request_delegate)
            except (iostream.StreamClosedError,
                    iostream.UnsatisfiableReadError):
                return
            except _QuietException:
                # This exception was already logged.
                conn.close()
                return
            except Exception:
                gen_log.error("Uncaught exception", exc_info=True)
                conn.close()
                return
            if not ret:
                return
            yield gen.moment
    finally:
        delegate.on_close(self)

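HTTPServer's handle_stream uses HTTP1ServerConnection to start processing the stream, and HTTP1ServerConnection runs _server_request_loop to process the stream in a loop. At this point the stream is used to build a request, so the familiar request finally appears. _server_request_loop is a coroutine: while it is waiting here, the ioloop is free to do other work, such as handling the next request.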

What we usually call a request is treated in Tornado as an HTTP1Connection. Handling a request starts with parsing the request message.

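An HTTP request consists mainly of a header and a body, and parsing the request message means extracting these parts. The header contains the start line plus what we usually call headers, such as Cookie and Authorization. The start line is a line like GET /index HTTP/1.1, from which the HTTP method, URI, and HTTP version can be read.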
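
As a rough illustration of that parsing step, the sketch below splits a raw request head into the start-line fields and a header dictionary. `parse_request_head` is a hypothetical helper, not Tornado's parser, and it ignores details a real parser must handle, such as repeated headers and malformed input.

```python
def parse_request_head(data):
    """Split a raw HTTP/1.x request head into start-line fields and headers."""
    head, _, _ = data.partition(b"\r\n\r\n")  # the body, if any, follows
    lines = head.decode("latin-1").split("\r\n")
    method, uri, version = lines[0].split(" ")
    headers = {}
    for line in lines[1:]:
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    return method, uri, version, headers

raw = (b"GET /index HTTP/1.1\r\n"
       b"Host: example.com\r\n"
       b"Cookie: sid=abc\r\n"
       b"\r\n")
method, uri, version, headers = parse_request_head(raw)
print(method, uri, version, headers["cookie"])
```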

Once the request's headers and body are ready, the request can be processed, which means calling the request handler.

def __call__(self, request):
    # Legacy HTTPServer interface
    dispatcher = self.find_handler(request)
    return dispatcher.execute()

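As mentioned earlier, listen creates an HTTPServer object. Its initializer requires a request_callback argument, and here the app object is what gets passed in. Calling the request handler therefore means calling the Application instance, and Application implements __call__.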

self.wildcard_router = _ApplicationRouter(self, handlers)
self.default_router = _ApplicationRouter(self, [
    Rule(AnyMatches(), self.wildcard_router)
])

def find_handler(self, request, **kwargs):
    for rule in self.rules:
        target_params = rule.matcher.match(request)
        if target_params is not None:
            if rule.target_kwargs:
                target_params['target_kwargs'] = rule.target_kwargs

            delegate = self.get_target_delegate(
                rule.target, request, **target_params)

            if delegate is not None:
                return delegate

    return None

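The other thing done before the ioloop starts is creating the app object. When Application is initialized, handlers is passed in as an argument and stored in wildcard_router. The default route, default_router, forwards every request to wildcard_router, where the request is matched against the rules one by one.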
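
The two-level lookup can be sketched in a few lines. The `Rule` and `Router` classes below are simplified stand-ins that match on the path only, not Tornado's actual classes, and the `/user/(\d+)` route is invented for the example.

```python
import re

class MainHandler:
    pass

class UserHandler:
    pass

class Rule:
    def __init__(self, pattern, target):
        # Anchor the end so that "/" does not also match "/user/42".
        self.regex = re.compile(pattern + "$")
        self.target = target

class Router:
    def __init__(self, rules):
        self.rules = [Rule(pattern, target) for pattern, target in rules]

    def find_handler(self, path):
        for rule in self.rules:
            match = rule.regex.match(path)
            if match is None:
                continue
            if isinstance(rule.target, Router):
                # Nested router: let it try its own rules.
                delegate = rule.target.find_handler(path)
            else:
                delegate = (rule.target, match.groups())
            if delegate is not None:
                return delegate
        return None

wildcard_router = Router([(r"/", MainHandler), (r"/user/(\d+)", UserHandler)])
# The default router forwards everything to the wildcard router.
default_router = Router([(r".*", wildcard_router)])

print(default_router.find_handler("/user/42"))
print(default_router.find_handler("/missing"))
```

A path that no rule matches yields None, which is where a real application would fall back to a 404 response.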

def get_target_delegate(self, target, request, **target_params):
    if isclass(target) and issubclass(target, RequestHandler):
        return self.application.get_handler_delegate(request, target, **target_params)

    return super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params)

def execute(self):
    # If template cache is disabled (usually in the debug mode),
    # re-compile templates and reload static files on every
    # request so you don't need to restart to see changes
    if not self.application.settings.get("compiled_template_cache", True):
        with RequestHandler._template_loader_lock:
            for loader in RequestHandler._template_loaders.values():
                loader.reset()
    if not self.application.settings.get('static_hash_cache', True):
        StaticFileHandler.reset()

    self.handler = self.handler_class(self.application, self.request,
                                      **self.handler_kwargs)
    transforms = [t(self.request) for t in self.application.transforms]

    if self.stream_request_body:
        self.handler._prepared_future = Future()
    # Note that if an exception escapes handler._execute it will be
    # trapped in the Future it returns (which we are ignoring here,
    # leaving it to be logged when the Future is GC'd).
    # However, that shouldn't happen because _execute has a blanket
    # except handler, and we cannot easily access the IOLoop here to
    # call add_future (because of the requirement to remain compatible
    # with WSGI)
    self.handler._execute(transforms, *self.path_args,
                          **self.path_kwargs)
    # If we are streaming the request body, then execute() is finished
    # when the handler has prepared to receive the body.  If not,
    # it doesn't matter when execute() finishes (so we return None)
    return self.handler._prepared_future

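Some initialization also happens when handlers are added to the router. The most important step in getting from a matched rule to running code is the call to Application.get_handler_delegate, made from get_target_delegate, which wraps the handler class. The wrapped handler gains some new methods, the most important being execute, which ends up calling the _execute method of the matched handler.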

@gen.coroutine
def _execute(self, transforms, *args, **kwargs):
    """Executes this request with the given output transforms."""
    self._transforms = transforms
    try:
        if self.request.method not in self.SUPPORTED_METHODS:
            raise HTTPError(405)
        self.path_args = [self.decode_argument(arg) for arg in args]
        self.path_kwargs = dict((k, self.decode_argument(v, name=k))
                                for (k, v) in kwargs.items())
        # If XSRF cookies are turned on, reject form submissions without
        # the proper cookie
        if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
                self.application.settings.get("xsrf_cookies"):
            self.check_xsrf_cookie()

        result = self.prepare()
        if result is not None:
            result = yield result
        if self._prepared_future is not None:
            # Tell the Application we've finished with prepare()
            # and are ready for the body to arrive.
            self._prepared_future.set_result(None)
        if self._finished:
            return

        if _has_stream_request_body(self.__class__):
            # In streaming mode request.body is a Future that signals
            # the body has been completely received.  The Future has no
            # result; the data has been passed to self.data_received
            # instead.
            try:
                yield self.request.body
            except iostream.StreamClosedError:
                return

        method = getattr(self, self.request.method.lower())
        result = method(*self.path_args, **self.path_kwargs)
        if result is not None:
            result = yield result
        if self._auto_finish and not self._finished:
            self.finish()
    except Exception as e:
        try:
            self._handle_request_exception(e)
        except Exception:
            app_log.error("Exception in exception handler", exc_info=True)
        if (self._prepared_future is not None and
                not self._prepared_future.done()):
            # In case we failed before setting _prepared_future, do it
            # now (to unblock the HTTP server).  Note that this is not
            # in a finally block to avoid GC issues prior to Python 3.4.
            self._prepared_future.set_result(None)

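After the handler has run the specific method, for example get or post, the handler's finish is called. At that point the response is assembled and sent back to the client, and the process is almost over.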
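
The core of `_execute`, looking up the method by the lowercased HTTP verb and finishing automatically, can be shown with a synchronous sketch. `BaseHandler` and its `execute` method are simplified stand-ins, not Tornado's RequestHandler: there are no coroutines, transforms, or XSRF checks here.

```python
class HTTPError(Exception):
    def __init__(self, status_code):
        super().__init__(status_code)
        self.status_code = status_code

class BaseHandler:
    SUPPORTED_METHODS = ("GET", "POST")

    def __init__(self):
        self.status = 200
        self.chunks = []
        self.finished = False

    def prepare(self):
        pass  # hook that runs before the HTTP-method function

    def get(self, *args):
        raise HTTPError(405)  # default until a subclass overrides it

    def post(self, *args):
        raise HTTPError(405)

    def write(self, chunk):
        self.chunks.append(chunk)

    def finish(self):
        self.finished = True  # a real server would flush the response here

    def execute(self, http_method, *path_args):
        try:
            if http_method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            self.prepare()
            # "GET" -> self.get, "POST" -> self.post
            method = getattr(self, http_method.lower())
            method(*path_args)
        except HTTPError as error:
            self.status = error.status_code
        if not self.finished:
            self.finish()  # auto-finish, as _execute does by default

class MainHandler(BaseHandler):
    def get(self):
        self.write("Hello, world")

ok = MainHandler()
ok.execute("GET")
rejected = MainHandler()
rejected.execute("DELETE")
print(ok.status, ok.chunks, rejected.status)
```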

The response travels in almost exactly the reverse direction: from the request to HTTP1Connection, then IOStream, then the socket, and finally to the peer.

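That completes the journey of a request from entering Tornado to the response being returned. Many of the IO methods along the way are non-blocking, so while one request is waiting on IO the ioloop can handle other requests and other work.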


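Back to the question raised at the beginning: can Tornado detect that a connection has been dropped and stop processing the corresponding request?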

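Suppose many requests are sent to Tornado at the same time. Before the handler for the first request runs, some of the other requests may already have been partly processed. If a handler calls time.sleep, the ioloop is blocked, and nothing else can be processed until that handler finishes. Meanwhile the socket of an earlier request may be closed by its peer: if the requester set a read timeout, the client drops the connection. The server has to wait for the handler to finish before it sends the response, so it only discovers that the connection is closed when it tries to send, and only then closes its own side.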
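
At the socket level, a peer's disconnect shows up as a readable socket whose read returns no bytes. The sketch below demonstrates that signal with a standard-library socket pair; `peer_has_closed` is a hypothetical helper, not Tornado code, and a loop that is blocked in time.sleep never gets the chance to run a check like this.

```python
import select
import socket

def peer_has_closed(sock, timeout=1.0):
    """Return True if the peer closed its end and no unread data remains."""
    readable, _, _ = select.select([sock], [], [], timeout)
    if not readable:
        return False  # nothing to read, so no close has been observed
    # MSG_PEEK looks at pending data without consuming it; b"" means EOF.
    return sock.recv(1, socket.MSG_PEEK) == b""

server_side, client_side = socket.socketpair()
before = peer_has_closed(server_side, timeout=0.05)
client_side.close()  # the client gives up, e.g. after its read timeout
after = peer_has_closed(server_side)
server_side.close()
print(before, after)
```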