本文之前,今天发现使用spring-batch也能够达到相同的效果,不过相比spring-batch,本文使用的数据表更少,且相应的逻辑更简洁,特转如下。
本文转自:http://blog.csdn.net/sfdev/article/details/4056114 原文作者:sfdev
背景
随着应用系统功能的不断新增,而某些功能的实现对实时性要求并不是那么高,但是逻辑却很复杂、执行比较耗时,比如涉及外部系统调用、多数据源等等;此时,我们就希望可以让这些复杂的业务逻辑放在后台执行,而前台与用户的交互可以不用等待,从而提高用户体验;
另外,从系统架构这个层面来说,我们也希望按照不同功能来拆分,以保持各个系统之间的低耦合,当一个系统出现问题时不会影响到其他系统,并且对于独立的各个系统,我们可以专门进行性能优化、监控等;所以我们需要通用、高效的异步任务处理系统;
设计目标
打造轻量级、简单、高效、通用、高扩展性、高可靠性的异步任务处理系统!
系统设计
要实现类似的异步处理系统,相信大家首先想到的就是JMS,Alibaba里面也有基于JMS的异步处理系统,而且该系统在网店系统中应用非常广泛;但由于目前我们阿里软件采用了不同的技术框架,所以不能直接拿来使用;况且,该系统为了实现异步任务系统的并发,采取了JMS与MDB结合的策略,所以系统就依赖于EJB了,这样系统就变得笨重了,由此系统部署的应用服务器必须要支持EJB,一些轻量级的不支持EJB规范的应用服务器就没法部署了;
考虑到如上的系统设计目标,我们的设计思路为:任务DB持久化 + Spring封装Job调度、线程池;
- 任务DB持久化:是说我们需要将待处理的任务信息保存在我们可信任的DB中,若任务未到达千万级可以和业务DB放在一起,确保当我们的任务处理服务器down了之后这些未执行成功、或未开始执行的任务不会被丢失;
- Spring封装Job调度:当任务信息都持久化在DB中之后,我们需要将这些信息读取出来执行具体的业务逻辑操作,这里我们通过ScheduledExecutorFactoryBean来实现对任务的循环调度,比如说可采取每隔5min扫描一次待处理任务列表,若有记录则提取出来执行;当然,若要实现更加强大的任务调度功能,可以采用Spring内部集成的Quartz这个开源调度框架;
- Spring封装线程池:为了提高任务执行效率,我们必须考虑让任务的具体操作能够被并发执行;为了让系统更加轻量级,这里我们直接采用Spring中基于JDK线程池的默认封装实现,通过配置调整参数;
系统的部署图可参考下图:
下面我们来看以下具体的系统设计:
首先,需要新建两张表,用来持久化我们的任务相关信息,以下表结构及其SQL都基于Oracle;表名可自取,比如Tasks/Tasks_Fail_History,两者的字段完全一样,字段建议包括:
字段 | 类型 | 描述 | 可空 | 默认值 |
TASK_ID | VARCHAR2(36) | PK,唯一标识即可,默认是UUID | NOT | |
GMT_CREATE | DATE | 创建日期 | NOT | |
GMT_HANDLE | DATE | 任务待执行日期 | NOT | |
TASK_HANDLER | VARCHAR2(32) | 待执行任务类型 | NOT | |
LOAD_BALANCE_NUM | NUMBER |
待执行任务获取的负载均衡值 当有多台服务器时用于平衡各服务器压力 |
NOT | 0 |
TASK_PARAMS | VARCHAR2(4000) | 待执行任务需要的参数 | NULL | |
RETRY_COUNT | NUMBER | 重试次数,每次加1 | NOT | 0 |
RETRY_REASON | VARCHAR2(512) | 重试原因,即上次失败原因,便于排错 | NULL |
主要用来保存所有待执行的任务,每条任务信息属于一种任务类型,由TASK_HANDLER字段标识,因为本系统核心基于Spring,所以任务类型的值建议为:该类型任务的具体实现类在Spring容器中的bean id;
执行该任务需要的所有参数都由TASK_PARAMS字段提供,该字段内的字符串可以由应用自行组装,只要具体任务实现类能够解析即可;
对于字段LOAD_BALANCE_NUM,主要是用来满足未来任务很多时,需要多台服务器来平衡压力时使用,相当于对每条任务分配了一个负载均衡值,不同服务器能够处理具有不同负载均衡值的任务信息;该字段值要求在全表内尽量平均分布,比如说全表内共500条记录,其中1、2、……、10每个值的任务总条数都在50条左右;
每条任务被执行之后根据执行情况进行删除或者更新操作;
表Tasks_Fail_History主要用来保存执行失败、需要人工干预的任务记录;记录来源于Tasks表,当任务执行重试超过一定次数时任务记录就会保存到失败历史表中;
其次,我们要明确任务生产者、消费者各自关注的一些信息:
对于任务的生产者,他需要提供的必备信息包括:任务待执行日期、任务类型、任务执行所需参数;
另外一个可选字段:LOAD_BALANCE_NUM;当任务的消费者有多台服务器时,可以利用该字段来进行分布式任务处理,此时可以根据一定规则对该字段设值,比如说产生一个1-10之间的随机数;或者根据其他自行设计的规则生成一个值,只要保持该字段值是在全表内平均分布的即可;
对于任务的消费者,大致的消费过程如下:
下面对上图中的各个过程中具体逻辑进行一些详细描述:
-
当消费者服务启动之后,会根据配置好的调度策略(通过Spring内置的ScheduledExecutorFactoryBean实现,可以选择两种调度策略:其一:FixedRate,即每隔几分钟调度一次,而不管上次调度是否已经执行完毕;其二:FixedDelay,即在每次调度完成后都delay相同时间;)扫描Tasks表,从中取出xx条数据,比如1000,可配置;
基本SQL语句为:SELECT * FROM tasks WHERE gmt_handle <= SYSDATE;
当然根据扩展策略不同,每次扫描Tasks表的查询条件也不同,比如:-
当待执行任务类型较少,任务数量也不是很多的情况下,单台服务器已经可以搞定,所以查询SQL为:
SELECT * FROM tasks WHERE gmt_handle <= SYSDATE AND ROWNUM <= ?; -
当任务类型、任务数量越来越多时,单台服务器已经不能搞定了,此时我们需要考虑对消费者服务器进行线性扩展,此时有不同的扩展策略可供选择:
-
若按功能水平扩展的策略,即将不同的任务类型让不同的消费者服务器执行;则查询SQL条件为:
WHERE gmt_handle <= SYSDATE AND task_handler IN (?) AND ROWNUM <= ?; -
若按压力水平扩展的策略,即尽量保持各台消费者服务器的压力很平均,避免出现某些服务器很繁忙,而有些服务器却很空闲的情况;前面的按功能水平扩展的策略就会出现服务器繁忙程度不一样的问题;若采取这种策略,每台消费者服务器可能会处理多种类型的任务,此时SQL查询条件为:
WHERE gmt_handle <= SYSDATE AND load_balance_num IN (?) AND ROWNUM <= ?; -
除了根据上面两个独立维度进行扩展的策略之后,还可以将两者进行结合起来使用;可适用于我们想按照功能进行水平扩展,但是某些任务类型单台服务器又搞不定,此时就需要对这些特殊任务类型再按照压力进行水平扩展,此时SQL查询条件为:
WHERE gmt_handle <= SYSDATE AND task_handler IN (?) AND load_balance_num IN (?) AND ROWNUM <= ?; - 对于以上任务的查询SQL中有用IN这个关键词,有人可能会担心查询性能,其实不必担心,因为我们处理的任务类型、任务服务器数量都不会太多,几百个任务类型估计最多了,而且IN语句的查询也是会用到index的,再以ROWNUM的辅助限制条件,所以SQL的执行效率不用担心;另外,若任务类型较少,则SQL中的IN可用=替换;
-
若按功能水平扩展的策略,即将不同的任务类型让不同的消费者服务器执行;则查询SQL条件为:
-
当待执行任务类型较少,任务数量也不是很多的情况下,单台服务器已经可以搞定,所以查询SQL为:
- 对从DB中查询出来的每条记录,将该条记录的ID放进本地cache(static变量即可搞定,但要处理并发)中,根据记录中TASK_HANDLER字段的值在Spring容器中找到对应的处理类bean实例,并扔到Spring异步线程池中执行;
- 具体处理类对该任务处理完成之后返回结果,然后任务系统根据返回结果对该条记录对应的Tasks表中的记录进行更新(增加重试次数,并根据重试策略设置下次执行时间)或者删除(执行成功);同时将cache中的记录ID清除、避免cache无限膨胀;
- 根据调度规则,当到了下次执行时间时,再次利用步骤1中的规则扫描Tasks表,循环上面的处理逻辑,差别在于,在将任务让具体TASK_Handler处理之前会先到本地cache中查询是否该条记录正在被处理,若cache中已经存在该条记录就无需处理了;这主要是为了避免一些比较耗时的任务被重复并发执行;
- 对于失败后的重试,设置重试策略,每次可delay不同的时间,可配置;比如第一次失败后1分钟后重试,第二次失败后5分钟后重试,第三次失败后20分钟后重试。。。失败超过x次后将记录移至history表中,并email报警;
详细设计
针对以上的系统设计,我们可以规划出大致的类图,可以参考如下实现:
其中类图中涉及到的几个核心class的用途说明可以参考如下的Spring配置信息:
<!-- 任务从此处开始加载 --> <bean id="notifySpringScheduledExecutorFactoryBean" class="org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean"> <property name="scheduledExecutorTasks"> <list> <ref bean="notifySpringScheduledExecutorTask" /> </list> </property> </bean> <!-- 待加入Spring Schedual进行调度的task列表 --> <bean id="notifySpringScheduledExecutorTask" class="org.springframework.scheduling.concurrent.ScheduledExecutorTask"> <property name="runnable" ref="notifyScheduledMainExecutor" /> <!-- 初次执行任务delay时间,单位为ms,默认值为0,代表首次加载任务时立即执行;比如1min --> <property name="delay" value="60000" /> <!-- 间隔时间,单位为ms,默认值为0,代表任务只执行一次;比如2min --> <property name="period" value="120000" /> <!-- 是否采用fixedRate方式进行任务调度,默认为false,即采用fixedDelay方式 --> <!-- fixedRate:定时间隔执行,不管上次任务是否已执行完毕;fixedDelay:每次任务执行完毕之后delay固定的时间 --> <property name="fixedRate" value="true" /> </bean> <!-- 任务调度主线程 --> <bean id="notifyScheduledMainExecutor" class="com.alisoft.aep.notify.schedual.NotifyScheduledMainExecutor"> <!-- 针对Notify服务端的Service,用于更新Notify重试信息等 --> <property name="notifyServerService" ref="notifyServerService" /> <!-- notify.notifyId缓存策略实现类,可自行扩展 --> <property name="notifyIdCacheStrategy" ref="defaultNotifyIdCacheStrategy" /> <!-- notify.load_balance_num字段值生成、以及调度时where条件中取值的策略实现类,可自行扩展 --> <!-- 当有多台notify服务器时才有用,用于平衡各台server间的压力;一般不用配置 --> <!-- <property name="loadBalanceNumStrategy" ref="alternateLoadBalanceNumStrategy" /> --> <!-- notify.handler字段值在调度时where条件中取值的策略实现类,可自行扩展 --> <!-- 当有多台notify服务器时才有用,用于表明某台server可执行哪些handler;一般不用配置 --> <!-- <property name="notifyHandlerStrategy" ref="defaultNotifyHandlerStrategy" /> --> <!-- 当有多台notify服务器时才有用,用于设置某台server调度时每次读取的Notify最大数,用于覆盖maxNum;一般不用配置 --> <!-- <property name="notifyMaxNumPerJobStrategy" ref="defaultNotifyMaxNumPerJobStrategy" /> --> <!-- 用于并发的线程池 --> <property name="notifyTaskExecutor" ref="notifyTaskExecutor" /> <!-- 每次调度读取的Notify最大记录数,默认为1000 --> <property name="maxNum" value="1000" /> <property name="notifyDao" ref="notifyDao" /> </bean> <!-- 异步线程池 --> <bean id="notifyTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心线程数,默认为1 --> <property name="corePoolSize" value="10" /> <!-- 最大线程数,默认为Integer.MAX_VALUE --> <property name="maxPoolSize" value="50" /> <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --> <property name="queueCapacity" value="1000" /> <!-- 线程池维护线程所允许的空闲时间,默认为60s --> <property name="keepAliveSeconds" value="300" /> <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 --> <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 --> <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 --> <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean> <bean id="notifyServerService" class="com.alisoft.aep.notify.service.impl.NotifyServerServiceImpl"> <!-- 针对任务执行失败后Notify如何重试的策略实现类,可自行扩展 --> <property name="notifyRetryStrategy" ref="defaultNotifyRetryStrategy" /> <!-- 针对任务执行失败后异常处理策略实现类,可自行扩展 --> <!-- 默认不对异常进行补救,具体handler实现类中若返回NULL或抛出异常,则均按异常处理,直接将Notify记录迁移到历史表中,不进行重试; --> <!-- <property name="notifyHandlerExceptionStrategy" ref="defaultNotifyHandlerExceptionStrategy" /> --> <!-- 描述见notifyScheduledMainExecutor --> <property name="notifyIdCacheStrategy" ref="defaultNotifyIdCacheStrategy" /> <!-- 事务模板,需保证能够找到对应的bean --> <property name="transactionTemplate" ref="transactionTemplate" /> <property name="notifyDao" ref="notifyDao" /> </bean>
是否达成设计目标?
- 轻量:核心实现完全基于Spring、Dao层完全可以自行决定采取何种框架;可以部署于任何Web容器中;这也是相对于JMS系统最大的改进;
- 简单:对于任务的生产者,只需要向Tasks表中insert记录即可,无需引入任何其他通讯协议;
- 对于任务的消费者而言,因为系统只依赖于Spring,所以要想将该系统与目前已有系统进行集成将会非常简单:引入jar包,将Ibatis、Spring配置文件加入到自己系统的加载列表中即可;
- 另外,任务的调度策略设置基于Spring Schedual,配置文件相对于Quartz来说更少;
- 高效:若采取FixedRate调度方式,系统的处理能力可以被准确计算;比如每1min提取1000条数据,那么1天单台服务器的处理能力为144w;当然需要考虑每个任务的具体耗时,因为1min内系统不一定能将1000条数据处理完毕;
- 若采取FixedDelay调度方式,系统的处理能力就完全基于任务的具体执行耗时了,因为当该种调度方式设置每次调度完成之后delay 1s,其实就相当于系统一直在处理任务,这样就可以最大化的保持系统的利用率;
- 可能有人会怀疑多台消费者服务器都对TASKS表进行查询会不会有性能问题?其实经过我们的系统运行经验,该问题是不存在的,因为该表的记录当执行成功之后就会被删除的,所以该表的数据量不会太大,除非消费者服务大面积down掉,但这是极少数情况,当出现这种情况时,当消费者服务再次启动时系统会有一定压力,但也不会太大,因为每次查询待执行任务时是取前XX条的,况且可以建立index来进行辅助;
- 通用:该系统只实现最核心的异步处理功能,而与具体业务逻辑没有任何关系,系统根据TASK_HANDLER去加载具体的业务逻辑实现;具体的Handler实现只需实现对应接口,并在Spring中添加bean配置即可;
- 扩展:根据TASKS表中的TASK_HANDLER/LOAD_BALANCE_NUM中任意一个字段、或者两者组合的方式可以实现分布式线性扩展,他们分别对应于两种不同的分布式线性扩展策略;而这对于客户端而言是完全透明的,任务生产者插入时只需配置不同策略而已;而且可以通过合理使用这两种策略达到新增任务类型时已经在运行的消费者服务无需重新发布;
- 可靠:由于待执行任务信息是在我们自行维护的可靠DB中保存,所以当我们的消费者服务down了也不会让未处理的任务信息丢失,相比于基于JMS Server的一些内存数据库定时持久化方案,与业务DB的稳定性相比,在可靠性方面不是一个级别的;
后记:整个设计非常好,并且结合多线程,也可以达到并发处理,同时,包括错误重试,数据库连接等,都能够达到生产的要求。当然,原文没源代码,不过其中的思想已经能够解决实际的问题了。
相比spring-batch,这个设计小巧,并且按需设计,能解决问题即可。
转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/code/201305070001.html
请问一下:就是当多个消费者去读取task表的前xx条时,他们之间的读取到的数据应该是有重复的把,这样会不会有任务重复执行了呢?