使用自定义FutureTask实现大小不固定的定时线程池

在使用定时任务执行线程池ScheduledThreadPoolExecutor时,在相应的定义中,只需要通过coreSize来定义1个大小固定的线程池,并且在其具体的定义中,因为队列长度是无限的,因此maxSize实际上也没有任何作用。其问题在于,如果coreSize定义过大,则会造成线程池中大量的空闲线程,实际上没有任务可作.

如定义1个coreSize为10的定时线程池,即使只周期性的执行1个任务,在一段时间之后,其池中的执行线程会逐渐增多,直到达到coreSize上限. 并且由于定时调度的原因,不能设置 allowCoreThreadTimeOut, 此设置会导致定时的任务因为无线程可用而不会触发,原因在于任务的定时是通过执行线程的takeTask操作被动触发的。

本文通过一个单线程定时线程池和一个额外的普通的ExecutorService进行协作,定时线程池只负责调度,而具体的执行则交给执行线程池来处理。而执行线程池的线程本身是可以设置或处理达到线程数可调节。通过2个线程池之间进行协作,完成调度完成。

同时,本实现也将专门处理 scheduleWithFixedDelay 或 scheduleWithFixedDelay 中的延时处理,保证任务必须在前1个任务执行完成之后才处理下一个任务,而避免简单调度中可能产生同1个任务由于执行超时出现并行执行的问题.

本文将定义定时线程池称之为 defineScheduledExecutor,类型为ScheduledThreadPoolExecutor; 实际执行线程池称之为 realExecutor,类型为 ExecutorService

实现分离的思路有两种,一种是调整参数中的Runnable的实现,将其具体的执行逻辑交由realExecutor来处理。代码如下参考所示:

    private static Runnable wrapRunnable(Runnable run) {
        return () -> realExecutor.submit(run);
    }

以上的逻辑称之为包装原始逻辑. 但此方法有1个缺点,就是不能达到延时的目的。以上的调用将马上返回,并且触发调度的下一次运行,但实际上任务本身可能执行还需要一些时间。比如,一些long pull任务(执行耗时60s,调度周期为1s)

另一种思路即调整在定时池中的 RunnableScheduledFuture 对象,重新调整其执行逻辑。在定时调度中,所有的运行逻辑都封装在对象 ScheduledThreadPoolExecutor$ScheduledFutureTask中,其在内部的run逻辑封装了具体的定时调度逻辑。在原始的代码中,其调度逻辑如下代码所示

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //非周期性任务,运行并且更新状态为 NORMAL
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //周期性任务,运行但其状态运行完之后,仍为 NEW
    else if (ScheduledFutureTask.super.runAndReset()) {
        //这里设置其下一次执行时间
        setNextRunTime();
        //这里会重新将任务加入到队列中,以处理定时的语义
        reExecutePeriodic(outerTask);
    }
}

上述的逻辑,不仅处理了定时的语义,而且在FutureTask的状态设置上也作了调整。因为在FutureTask的运行逻辑中,同样有对state作判断(如果状态与预期不符,则将直接结束.如对一个已经完成的FutureTask,再次调用run方法,则其会直接完成,并不会执行实际业务逻辑)

因为其逻辑执行的类 ScheduledFutureTask 定义在 ScheduledThreadPoolExecutor 内部,并且其是private类,不能被继承。最终的方案即是通过Copy&Paste的方式,实现一个相同逻辑的类,通过 decorateTask 方法调整其task实现,达到替换原逻辑的目的。

decorateTask 是 标准方法中用于扩展包装执行运行的类,其原始参数类为 ScheduledFutureTask, 默认实现为原样返回,这里可以返回替换之后的类。参考代码如下所示:

private <V> RunnableScheduledFuture<V> doDecorateTask(RunnableScheduledFuture<V> task) {
    OuterScheduledFutureTask outerScheduledFutureTask = OuterScheduledFutureTask.buildFromScheduledFutureTask(task);
    outerScheduledFutureTask.setDefineScheduledThreadPoolExecutor(this);
    outerScheduledFutureTask.setRealExecutor(realExecutor);

    return outerScheduledFutureTask;
}

定义新的执行任务类,并引用相应的 defineScheduledExecutor 和 realExecutor。 在使用者层面,一切没有任何变化,相应的变化调整在新的 OuterScheduledFutureTask 中.

OuterScheduledFutureTask中的逻辑直接原样从 ScheduledFutureTask 复制而来,包括属性及字段, 数据通过反射或其它手段(如MethodHandle)获取, 主要的逻辑仍然在 run 方法中,调整后的执行方法参考如下

public void run() {
    boolean periodic = isPeriodic();
    boolean canRunInCurrentRunState;
    //这里通过methodHandle调用原 ScheduledThreadPoolExecutor#canRunInCurrentRunState 方法
    canRunInCurrentRunState = (boolean) methodCanRunInCurrentRunState.invokeExact(defineScheduledExecutor, periodic);
    
    if(!canRunInCurrentRunState) {
        cancel(false);
    } else if(!periodic) {
        //单次任务,直接由执行池执行,也不存在任务延时的问题
        realExecutor.submit(OuterScheduledFutureTask.super::run);
    } else {
        //周期性任务,切换执行池
        realExecutor.submit(() -> {
            //延时的逻辑放在执行池中处理,这里执行耗时发生在执行池中,但后面相应的计算时间同样在任务执行后才处理,即整个延时操作均在独立线程中进行
            if(OuterScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                //这里完成下次任务处理,重新将任务添加回定义线程池,由定义线程池触发调度(再次回到run方法中/切换线程池/添加调度,循环...)
                methodReExecutePeriodic.invokeExact(defineScheduledExecutor, outerTask);                
            }
        });
    }
}

总结

整个思路的调整,即定义1个新的调度线程池,通过override decorateTask 以替换执行任务;定义新的执行任务FutureTask,参考原逻辑实现调度语义,但具体的执行逻辑切换至新的执行线程池。realExecutor执行线程池,并不需要调度的语义,其本身只是一个标准的executorService.

在此方案中,定义调度线程池的coreSize设置为1, 以保证其可以成功地进行任务调度。因为其不承担具体执行逻辑,所以不会有任何任务阻塞的问题。执行线程池可以根据实际情况进行线程大小的的处理,甚至可以使用任何实现了executorService语义的类来完成,这样可以替换线程池实现,定制更多线程控制逻辑.

转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/code/202009220001.html

相关文章:

作者: flym

I am flym,the master of the site:)

发表评论

电子邮件地址不会被公开。 必填项已用*标注