使用 Servlet3.1 WriteListener进行异步响应输出以及踩过的坑

在标准的spring mvc项目中,当对象数据已经产生时,不管是输出json,还是view,最终均是将相应的数据直接写入response的outputStream中。这里的输出是同步式调用的,即意味着只有当整个数据完整地写到输出流中时,整个业务线程才会被认为执行结束。

这就意味着在传统的基于请求的慢速攻击之外,还有一种基于响应的慢速攻击,其思路即是客户端强制让整个接收变慢,而服务端因为缓存区不足以存储完整的响应数据,而被动地阻塞调用端,进而阻塞业务线程。

在Servlet 3.0版本中,提出了业务中的异步处理,即 通过 HttpServletRequest#startAsync 来开启异步处理。等业务处理完成之后,才通过 AsyncContext#complete 来结束处理。在中间,可以仍然通过 HttpServletResponse#getOutputStream 来输出数据. 这里的异步处理仅仅是将容器的处理线程解放出来,即当业务处理需要长时间或者需要故意挂起请求(如LongPull)时,才有相应的意义。在最终输出数据时,这里仍然是阻塞的,只不过是阻塞业务线程(而非容器线程).造成的后果即是缓慢地IO操作造成线程阻塞。

随着 NIO 的出现,所有Web容器均使用了Nio来处理请求,即当整个请求已经完整读取完毕之后,才转交由具体的业务线程来处理,即在这里的io读取均为非阻塞读取,由很少数的线程就可以处理上百(千)个请求的网络请求. 在 Servlet 3.1 版本中, 规范中给出了 异步响应的概念,即可以通过响应式来达到异步非阻塞输出数据的效果. 从调用端来看,调用write之后即刻返回,并不会阻塞业务线程。后续未刷新到缓冲区的数据将在后续通过标准Nio流程由web容器的IO线程负责处理.

一个参考使用例子如下所示(参考于 https://www.oracle.com/webfolder/technetwork/tutorials/obe/java/HTML5andServlet31/HTML5andServlet%203.1.html):

//开启异步
val context = request.startAsync();
val response = context.getResponse();
val servletOut = response.getOutputStream();

//定义响应监听器,以可写时输出相应的数据
val writeListener = new WriteListenerImpl(context, servletOut, actionQueue);
servletOut.setWriteListener(writeListener);

//一个参考的监听器实现
public class WriteListenerImpl implements WriteListener {
    private ServletOutputStream output = null;
    private Queue queue = null;
    private AsyncContext context = null;

    private boolean flag = false;

    @Override
    public void onWritePossible() throws IOException {
        if (flag) {
            return;
        }

        //这里仅当output.isReady() 时才会调用write方法,否则会报 IllegalStateException
        while (queue.peek() != null && output.isReady()) {
            val action = queue.poll();
            doWrite(action);
        }
    }

    private void doWrite(Action action) throws IOException {
        //已完成标记位
        if (action == Action.COMPLETE) {
            flag = true;
        }

        action.run(context, servletOut);
    }
}
继续阅读“使用 Servlet3.1 WriteListener进行异步响应输出以及踩过的坑”

Jboss EnhancedQueueExecutor源码解读

在JDK线程池中自带的Executor遵循一种典型的生产者,消费者队列模型,即一个统一的阻塞队列,然后一个线程数组不停地消费其中的数据。其本身的处理逻辑为 coreSize->queueSize->maxSize 的增长方式,即先尝试增加 coreSize, 然后再不断地将任务放进队列中,如果队列满了,则再尝试增加 maxSize, 直至拒绝任务。

通过一些手法可以调整策略为 coreSize->maxSize->queueSize。

本文则描述一个由 jboss-threads 中提到的 EnhancedQueueExecutor,中文为增加型队列执行器。其除支持典型的executor模型外,也同样保留如 coreSize,maxSize, queueSize 这些模型。与jdk中实现相区别的是,其本身采用单个链表来完成任务的提交和线程的执行,同时采用额外的数据来存储计数类数据. 更重要的是,其默认线程策略即 coreSize->maxSize->queueSize, 同时可以根据参数调整此策略.

创建对象与ThreadPoolExecutor类似,指定相应的参数即可,如下所示:

EnhancedQueueExecutor executor = new EnhancedQueueExecutor.Builder()
        .setCorePoolSize(corePoolSize)
        .setMaximumPoolSize(maxPoolSize)
        .setKeepAliveTime(Duration.ofMinutes(5))
        .setMaximumQueueSize(1024)
        .setThreadFactory(threadFactory)
        .setExceptionHandler(uncaughtExceptionHandler)
        .setRegisterMBean(false)
        .setGrowthResistance(growthResistance) //增长因子,控制新线程创建逻辑(if >= coreSize时)
        .build();
继续阅读“Jboss EnhancedQueueExecutor源码解读”

使用自定义FutureTask实现大小不固定的定时线程池

在使用定时任务执行线程池ScheduledThreadPoolExecutor时,在相应的定义中,只需要通过coreSize来定义1个大小固定的线程池,并且在其具体的定义中,因为队列长度是无限的,因此maxSize实际上也没有任何作用。其问题在于,如果coreSize定义过大,则会造成线程池中大量的空闲线程,实际上没有任务可作.

如定义1个coreSize为10的定时线程池,即使只周期性的执行1个任务,在一段时间之后,其池中的执行线程会逐渐增多,直到达到coreSize上限. 并且由于定时调度的原因,不能设置 allowCoreThreadTimeOut, 此设置会导致定时的任务因为无线程可用而不会触发,原因在于任务的定时是通过执行线程的takeTask操作被动触发的。

本文通过一个单线程定时线程池和一个额外的普通的ExecutorService进行协作,定时线程池只负责调度,而具体的执行则交给执行线程池来处理。而执行线程池的线程本身是可以设置或处理达到线程数可调节。通过2个线程池之间进行协作,完成调度完成。

同时,本实现也将专门处理 scheduleWithFixedDelay 或 scheduleWithFixedDelay 中的延时处理,保证任务必须在前1个任务执行完成之后才处理下一个任务,而避免简单调度中可能产生同1个任务由于执行超时出现并行执行的问题.

本文将定义定时线程池称之为 defineScheduledExecutor,类型为ScheduledThreadPoolExecutor; 实际执行线程池称之为 realExecutor,类型为 ExecutorService

继续阅读“使用自定义FutureTask实现大小不固定的定时线程池”

实现优先使用运行线程及调整线程数大小的线程池

当前在JDK中默认使用的线程池 ThreadPoolExecutor,在具体使用场景中,有以下几个缺点

  1. core线程一般不会timeOut
  2. 新任务提交时,如果工作线程数小于 coreSize,会自动先创建线程,即使当前工作线程已经空闲,这样会造成空闲线程浪费
  3. 设置的maxSize参数只有在队列满之后,才会生效,而默认情况下容器队列会很大(比如1000)

如一个coreSize为10,maxSize为100,队列长度为1000的线程池,在运行一段时间之后的效果会是以下2个效果:

  1. 系统空闲时,线程池中始终保持10个线程不变,有一部分线程在执行任务,另一部分线程一直wait中(即使设置allowCoreThreadTimeOut)
  2. 系统繁忙时,线程池中线程仍然为10个,但队列中有还没有执行的任务(不超过1000),存在任务堆积现象

本文将描述一下简单版本的线程池,参考于 Tomcat ThreadPoolExecutor, 实现以下3个目标

  1. 新任务提交时,如果有空闲线程,直接让空闲线程执行任务,而非创建新线程
  2. 如果coreSize满了,并且线程数没有超过maxSize,则优先创建线程,而不是放入队列
  3. 其它规则与ThreadPoolExecutor一致,如 timeOut机制
继续阅读“实现优先使用运行线程及调整线程数大小的线程池”

一种在线程池中透传或继承ThreadLocal信息的方法

在实际的业务代码中,经常会使用到 ThreadLocal 用于跨业务代码 来获取在上游设置的值。比如,在spring mvc中 spring web mvc 中通过 RequestContextHolder 设置 HttpServletRequest,业务代码则可以在 controller 或者是 service中 通过 RequestContextHolder#getRequestAttributes 获取相应的对象. 但这种方法有一个限制,即 setValue 和 getValue 的代码必须在同一个线程内. 当然,这也是属于通过 ThreadLocal 来避免竞争的一种手法.

针对之前已经可以工作的代码,如果将相应的业务代码 迁移至一个新的线程池中运行,即封装为 1个 runnable 对象,那么相应的代码即不能正确地工作了。

如下参考所示

ThreadLocal<String> local = new ThreadLocal<>();
local.set("value1");

//打印 获取->value1
System.out.println("获取->" + local.get());

Runnable runnable = () -> {
    System.out.println("获取->" + local.get());
};
//打印 获取->null
new Thread(runnable).start();

上面的 sout 可以是任意1个业务上的 method 调用。仅仅是将 method调用 转由新线程来运行,相应的业务逻辑即不能正常工作。 至于这里将调用转由新线程来运行,可以有很多的场景。如 支持 timedout 调用(利用future.get(timed))。

本文通过反射调用读取当前Thread的信息,将值注入到新线程中的threadLocal中,以达到透传threadLocal的目的。不需要修改任何业务代码,也不需要使用InheritableThreadLocal(此类也并不用于当前场景)

主要的实现基于以下步骤

  • 提前提取当前线程ThreadLocal变量值
  • 执行时复制至新线程中
  • 执行结束之后删除未变化值

本文中的代码均基于反射调用,需要打开相应的 Accessible 属性.

继续阅读“一种在线程池中透传或继承ThreadLocal信息的方法”

使用FeignClient进行服务调用时的各类错误列表及快速定位

基于spring cloud 体系的微服务,服务间调用默认使用 openFeign 进行调用,但是由于源码本身的问题。针对于在调用时出现的各类信息,并没有分门别类的错误描述。当业务需要针对不同的错误进行处理时,就不能简单的通过FeignException来进行区分。笔者通过整理服务间调用时的各类异常堆栈及列表。分别列出相应的错误列表及触发时堆栈,以方便后续分别进行业务判断和处理

本应用基于ribbon进行服务查找,使用spring-cloud open-feign, 没有使用retry机制, 底层使用feign-okhttp, 因此,如果有异常与下列堆栈不一致时,请检查是否一致. 

版本: openFeign: 9.7.0, netflix ribbon: 2.2.5,  okhttp3: 3.8.1

总共的异常列表如下所示

  • 无服务时异常
  • 有服务但连接失败
  • 调用中网络错误读取数据超时
  • 响应4xx错误码
  • 响应5xx错误码
  • 解析结果错误(json反序列化失败)
  • 解析结果错误(无converter)

所有的异常触发类均由类 SynchronousMethodHandler 触发,但作用行不一样.
以下异常栈从原始cause,依次往下。如 cause1为 root cause, 最下层为业务层接收到的异常

继续阅读“使用FeignClient进行服务调用时的各类错误列表及快速定位”