Jetty中如何实现servlet的异步Request调用

2018/08/07 17:10:12 No Comments

在servlet3.1规范当中,已经支持在servlet中通过调用request.startAsync来开启1个异步调用,然后在相应的业务线程里面进行一些业务操作,再通过asyncContext.complete即完成业务的整个操作。一个参考的demo如下所示:

    val context = req.startAsync();
    //重新设置业务超时时间
    context.setTimeout(40_000);

    Runnable runnable = () -> {
        try{
            //执行你的业务操作

            //输出数据
            context.getResponse().getWriter().println(totalMoney);

            //完成业务
            context.complete();
        } catch(Exception e) {
            e.printStackTrace();
        }
    };
    
    new Thread(runnable).start();

在上面的参考中,原来的servlet在调用完 thread.start之后,相应的逻辑即完成。相应的容器线程则已经还给线程池,此线程即可以接收其它客户端的请求并处理了. 异步servlet的目的即在于将接收请求的io线程与实际的业务执行相分开,避免过慢的业务阻塞了整个容器,而不能再接收更多的请求了.甚至可以在后端的业务池中定义一个队列,将要进行执行的业务逻辑放入队列里面慢慢执行。

由于异步Servlet的目的在于将web容器的io线程与业务线程分离,那么关键的部分即在于当servlet方法执行完之后,当context.complete时,如何重新触发相应io流的操作。本文尝试以jetty为参考,从源码角度查看其实现的原理和机制。

本文参考的jetty版本为 9.4.11.v20180605

1. 封装相应的request和response

当业务在调用request.startAsync时,将会直接创建一个 AsyncContext 对象。按照标准API的定义,相应的context会将相应的request和response封闭在内部,以供业务端使用。在jetty中,整个模型结构参考如下所示。

  • HttpChannel 用于维护整个http请求/响应的通道流对象,其可以理解为一个封装了http协议的socket对象
  • HttpChannelState持有对整个io通道的相应信息以及状态对象,其内部保存了当前请求响应具体执行的步骤信息,以及当前在异步处理时当前正在处于哪一个步骤。可以通过此对象来追踪业务的具体执行生命周期情况
  • AsyncContextState即描述在异步请求过程中的状态信息,其内部连接httpChannel和state信息,即当开始异步访问时,相应的异步状态对象即会开启,其同时实现了相应的AsyncContext接口,用于与业务作api对接.
  • AsyncContextEvent用于描述当业务端发起startAsync时的一个事件信息,共内部连接 asyncState和 channelState对象,使用此对象可以通过内部封装的 asyncState 拿到其它的对象,如reqeust response等

在上面的对象当中,主要关注的对象即为HttpChannelState,其内部关注的状态对象为 State 和 Async,两者均是内部类,其相应的枚举对象参考如下:

    /**
     * The state of the HttpChannel,used to control the overall lifecycle.
     */
    public enum State
    {
        IDLE,             // Idle request
        DISPATCHED,       // Request dispatched to filter/servlet
        THROWN,           // Exception thrown while DISPATCHED
        ASYNC_WAIT,       // Suspended and waiting
        ASYNC_WOKEN,      // Dispatch to handle from ASYNC_WAIT
        ASYNC_IO,         // Dispatched for async IO
        ASYNC_ERROR,      // Async error from ASYNC_WAIT
        COMPLETING,       // Response is completable
        COMPLETED,        // Response is completed
        UPGRADED          // Request upgraded the connection
    }

    /**
     * The state of the servlet async API.
     */
    private enum Async
    {
        NOT_ASYNC,
        STARTED,          // AsyncContext.startAsync() has been called
        DISPATCH,         // AsyncContext.dispatch() has been called
        COMPLETE,         // AsyncContext.complete() has been called
        EXPIRING,         // AsyncContext timeout just happened
        EXPIRED,          // AsyncContext timeout has been processed
        ERRORING,         // An error just happened
        ERRORED           // The error has been processed
    }

State用于描述在标准的request/response响应过程中,当前的执行的步骤值和到了哪个步骤。比如已经分发到业务流程中,开启异步等,即此对象更关注于request对象类的步骤.
Async类则用于描述在异步处理过程中,特定的异步执行点。比如启动异步处理,完成异步等,此对象关注于asyncContext对象的处理过程.

2. 异步处理时,容器的一个处理流程

整个处理流程由HttpChannel来完成,整个类的处理逻辑参考方法 handle(),其会根据State以及Async的不同状态执行不同的操作.

2.1 处理servlet service方法

当处理业务时,Sate的初始状态为Idle,因为其所对应的下一步操作Action即为 DISPATCH,此Action的对应操作即为执行整个业务处理逻辑,代码参考如下:

        case DISPATCH:
        {
            if (!_request.hasMetaData())
                throw new IllegalStateException("state=" + _state);
            _request.setHandled(false);
            _response.getHttpOutput().reopen();

            try
            {
                _request.setDispatcherType(DispatcherType.REQUEST);

                if (!_request.isHandled())
                    getServer().handle(this);
            }

此段逻辑中最后一段 getServer().handle(this) 即为执行整个service方法,其会按照标准的 Filter->Servlet的执行流程处理业务逻辑,即其会最终处理到serlvet的doGet/Post方法.

2.2 业务下一步判定和处理

在业务处理完成之后,此循环会通过判断 channelState对象 的状态决定如何处理下一步操作,相应的方法为 HttpChannelState#unhandle.

这里重新回到request.startAsync的处理代码,在startAsync中,会创建一个事件对象 AsyncContextEvent,并通过 HttpChannelState来启动异步处理。在此逻辑中,相应的异步状态值Async会修改为 STARTED,相应代码如下参考所示:

    public void startAsync(AsyncContextEvent event)
    {

        try(Locker.Lock lock= _locker.lock())
        {
            _async=Async.STARTED;
            _event=event;
            lastAsyncListeners=_asyncListeners;
            _asyncListeners=null;            
        }

此逻辑,即在启用异步时即会触发,那么当 Servlet#service方法执行完之后,执行 HttpChannelState#unhandle 方法时,相应的处理处理逻辑即会判断async的值,并进入不同的Action逻辑,如下参考所示:

    protected Action unhandle()
    {
        boolean read_interested = false;

        try(Locker.Lock lock= _locker.lock())
        {
......
            _initial=false;
            switch(_async)
            {
...
                case STARTED:

                    if (_asyncWritePossible)
                    {
                        _state=State.ASYNC_IO;
                        _asyncWritePossible=false;
                        return Action.WRITE_CALLBACK;
                    }
                    else
                    {
                        _state=State.ASYNC_WAIT;
                        
                        Scheduler scheduler=_channel.getScheduler();
                        if (scheduler!=null && _timeoutMs>0 && !_event.hasTimeoutTask())
                            _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));

                        return Action.WAIT; 
                    }

如上所示,当判定 async为STARTED之后,并且当前并没有回写响应数据,那么相应的state即转换为 ASYNC_WAIT,并且下一步的Action为 WAIT

2.3 HttpChannel针对 WAIT Action的处理

其针对WAIT的处理很简单,就是直接退出循环,即表示此针对此请求的整个业务处理已经结束,在容器处理的层面已经完结。可以理解为该request/response的整个生命周期已经结束。

不过由于异步的业务处理,相应的io信息仍然是存在了,并且在后续的业务处理中仍然是可用的,相应的channel,socket信息并没有被关闭.

3. complete时的响应流回调

在上一步提到在servlet层面,相应的处理已经结束。当调用 AsyncContext#completed 时,其实际上会重新启动相应的channel通道处理,以完结相应的业务处理,即可以理解为其会重新启动相应的HttpChannel#handle 来执行相应的处理,只不过因为状态不一样,而是执行其它的Action.

源码层面,相应的AsyncContext#completed 会委托给 HttpChannelState#complete 来处理,在处理逻辑中,相应的 async会更换为 COMPLETE, 同时,再次执行 HttpChannel#handle方法,如下参考所示:

    public void complete()
    {
        boolean handle=false;
        AsyncContextEvent event;
        try(Locker.Lock lock= _locker.lock())
        {
            _async=Async.COMPLETE;
            _state=State.ASYNC_WOKEN;
        }
            runInContext(event,_channel);
    }

HttpChannel实现了runnable,因此重新执行handle方法,在此方法中将重新根据状态值判定Action, 由 HttpChannelState#handling 的判定推断出相应的 Action为Completed. 参考如下所示:

    protected Action handling()
    {
        try(Locker.Lock lock= _locker.lock())
        {
            
            switch(_state)
            {
                case ASYNC_WOKEN:

                    switch(_async)
                    {
                        case COMPLETE:
                            _state=State.COMPLETING;
                            return Action.COMPLETE;
....

Action.COMPLETE 即可以理解为整个servlet处理周期的扫尾阶段,这里在设置相应的响应头,关闭输出,设置标记位等,即完成最终的请求处理.

4. 业务流程处理超时时的处理

在调用完Request#startAsync 之后,其实容器是并不能完全保证业务代码一定会调用 AsyncContext#completed 这个方法,为保证channel的socket能够被正常处理,避免无限度持有连接信息,因此应该有一个保底的处理。即当业务线程并没有在指定的时间处理完毕时,容器应该主动发起连接的中断处理,以及时断开连接,保证容器的稳定性。

在API定义上,AsyncContext可以通过调用 setTimeout(long) 来设置业务流程的超时时间,其默认值为30_000。即意味着,容器会在完成servlet#service方法之后的 timeout时间后触发相应的超时处理。

在源码上,此逻辑为当 HttpChannel#handle 执行完servlet#service方法之后的判定逻辑中处理,即 参考 2#1 部分所示,当 HttpChannelState判定当前 async为 ASYNC_WAIT,下一步操作为 Action#WAIT时,即会开启一个异步任务,执行到期超时处理。如下参考所示:

    _state=State.ASYNC_WAIT;
    
    Scheduler scheduler=_channel.getScheduler();
    if (scheduler!=null && _timeoutMs>0 && !_event.hasTimeoutTask())
        _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));

    return Action.WAIT; 

这里的timeout即为 AsyncContextState#setTimeout 设置的值,其最终委托给 HttpChannelState来完成。如果此值为业务代码设置,即最终以业务设置为准。此任务对象即 AsyncContextEvent 对象,其最终的超时处理会委托给 HttpChannelState#onTimeout 来处理,其最终逻辑参考如下:

    protected void onTimeout()
    {
        final List<AsyncListener> listeners;
        AsyncContextEvent event;
        try(Locker.Lock lock= _locker.lock())
        {
            if (_async!=Async.STARTED)
                return;
            _async=Async.EXPIRING;
            event=_event;
            listeners=_asyncListeners;
        }

        Throwable th=error.get();

        if (th!=null)
        {
            if (LOG.isDebugEnabled())
                LOG.debug("Error after async timeout {}",this,th);
            onError(th);
        }        
    }

而最终的逻辑即往客户端输出相应的内部服务错误信息.

当然,如果业务正常的完成时,此timeoutTask也会正常的取消,其相应的逻辑散落在相应的 HttpChannelState 各个处理方法中,即会通过调用 cancelTimeout() 来取消相应的任务.

总结

在jetty的整个处理逻辑中,即是围绕着 channel通道对象, 状态对象,以及相应的上下文对象,来完成相应的状态和逻辑的转换操作,最终完成业务的处理。只不过在中间处理过程中,由于异步的概念会影响到流程的处理,需要将流程挂起;在完成之后再重新启用。整个逻辑还是很清晰的,对于了解之前不处理异步servlet体系的容器到支持async概念的容器的实现和设计思路,还是很有参考价值的。同时,在考虑一些极端问题时,也可以做到面面俱到,避免其它问题的产生.

转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/code/201808070001.html

相关文章:

留下足迹