本文描述了在一个disruptor中生产者和消费者之间是如何通过sequence进行协作的,以及如何通过sequence构建一个类似数据追赶模型的.
本文采用基本的一个生产者和一个消费者的处理模型进行描述.
所谓的追赶模型,即理解为2个数字A和B,当A要进行+1操作时,需要判断A<B,如果满足此条件,则可以进行操作,否则就等待,直到B进行操作从而满足条件A<B时才继续进行自身的操作.
从生产者/消费者模型来看,就相当于,生产者需要生产时,不再需要判断是否存在可存放的位置,而是判断要存放东西的位置是否还没有被消费,即生产位置 数<消费位置数;对于消费者,不再需要判断是否有没有被消费的物品,而是判断当前消费的数目是否小于生产的数目,即消费数<生产数.如果满 足,即获取指定位置的数据本身进行处理,然后自身位置数/数目+1操作,然后进行同样的处理.
这里面有2个区别的信息,位置数即在一个ringBuffer环中的下标值(最大值ringBuffer的长度),而数目则指不断的生产/消费的累积数(最大值Long.MaxValue).
以下分2个场景分别描述生产者通过持有消费者sequence用于进行生产判断,消费者通过持有生产者sequence用于进行消费判断.
消费者引用生产者
我们看消费者是如何被创建的,以及在创建过程中,生产者的sequence是如何进入到消费者视野中的.首先是如何创建一个消费者,通过handleEventsWith方法可以添加一个消费者,我们来看相应的createEventProcessors实现:
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<T>[] eventHandlers) { ...... final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); ...... final EventHandler<T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
可以看出,首先构建出一个sequenceBarrier对象,这个对象就可以认为是我们文中提到的追赶器.由ringBuffer产生,即间接引用了ringBuffer中的sequence信息.
准确地说,ringBuffer中没有sequence信息,它的sequence信息由内部的sequencer(累加器)持有,以完成在单线程或多线程中确定下标和或者数据的功能.而sequencer,就可以理解为是一个生产者.我们针对ringBuffer的主要生产操作next/get/publish,都是通过这个sequencer来实现的.
然后,这个sequenceBarrier对象被放入BatchEventProcessor对象,即事件处理器当中.而这个事件处理器即封装了我们的消费者操作.这样就完成了消费者对生产者信息的隐式依赖.
生产者引用消费者
生产者要获取下一个可生产对象的位置时,需要知道消费者当前已经消费了第N个对象,那么就需要知道消费者的sequence信息.这个信息是通过调用disruptor的start方法来建立起关系了.如代码所示:
public RingBuffer<T> start() { Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true); ringBuffer.addGatingSequences(gatingSequences); for (ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); } return ringBuffer; }
首先,通过获取消费者操作的所有sequence,添加到ringBuffer中,即可以理解为添加到生产者sequencer中.而这个消费者仓库,就可以理解为当注册eventHandler时用于记录这些信息的容器.这里只不过简单理解为获取每个eventHandler的sequence,组合成一个数组来实现gatingSequence的功能.
因为,本文只考虑一个消费者简单的场景,因此不考虑多个消费者消费速度不一致的问题.简单理解为生产者添加了一个gatingSequence对象,而这个gatingSequence,就是消费者的sequence.
生产者可生产判断(next)
我们调用生产者生产事件的标准方法就是调用ringBuffer的next方法,实际上就是调用sequencer的next(1)方法.代码如下所示:
long nextValue = pad.nextValue; long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = pad.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } pad.cachedValue = minSequence; }
这里面的nextValue就是下一个应该放的位置.这里有2种情况,第一种情况就是当值还没有超过bufferSize时,第二种情况就是当值超地bufferSize时.在上面代码中cachedGatingSequence表示上一次判断时最小的的那个处理数.
第一种情况,如果没有超过bufferSize,表示第一轮的buffer都没填充完毕,那么可以放心地使用value值.它也不会进入if判断(wrapPoint是一个负值,cacheGatingSequence也一定比nextValue小)
第二种情况,当wrapPoint值比cached值更大时,就表示离上次判断已经超出一个bufferSize值了.这时就要获取所有消费者当前消费的最小值,看这个wrapPoint值是否比最小值还大,即表示要获取的这个位置消费者正在处理,就需要等到消费者处理完毕之后,更新gatingSequence的值再作判断了.
消费者工作机制
在当前的disruptor的start方法中,我们看到以下的代码:
for (ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); }
这最终会让executor执行所有的消费者处理器,就是在独立的线程中进行运行.那么像我们的eventHandler所对应的eventProcessor大概的处理逻辑如下所示:
long nextSequence = sequence.get() + 1L; ...... final long availableSequence = sequenceBarrier.waitFor(nextSequence); while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } sequence.set(availableSequence);
这里首先获取一个当前准备要消费的数字,即上次消费完的下一个数字.然后将这个数字交由barrier计算出可以一直无间断消费的下一个有效数字,即生产者在上次消费完之后又已经生产好的信息数字.这个barrier又转向使用waitStrategy来进行计算.而在这个waitStrategy中,又同时使用了生产者sequence和消费依赖sequence共同判断,获取可以消费的上限数字.我们使用YieldingWaitStrategy参考,如下所示:
long availableSequence; int counter = SPIN_TRIES; while ((availableSequence = dependentSequence.get()) < sequence) ..... return availableSequence;
这个代码可以简化理解为就是获取dependentSequence的值,而在单个消费者场景或者说是第一个eventHandler场景中,depentSeq就是ringBuffer sequencer中的seq.实际上就是获取了生产者已经生产好的那个数字信息.
如果在这段期间生产者一直没有生产,那么返回的值就是之前的数字,如果之前消费者已经消费完了,那就等于availableSequence-1,即当前消费者之前记录的消费数字.那这个循环暂时就无事可做.
如果生产者生产了多个event,那么这时候消费者会一直循环消费,一直到消费到刚获取到的消费数字上限.这也正是这个消费者处理器为什么叫batch的原因.即它不是并行消费的意思,而是说一次性循环性地消费多个event.
总结
从整个模型中看,我们可以将生产者和消费者理解为两个不同的线程.每个线程中有一个处理数字,在互相进行着追赶判断操作.当生产者与消费者的差距在1个ringBuffer之内时,生产者就可以不停地生产;而消费者只要在生产者和依赖者的后面时,就可以不停地消费,直至追赶上生产者.
在消费者模型中,由于是专门的处理消费,因此可以对这个处理流程进行优化.比如使其一直进行CPU密集性运算,让整个线程一直在进行运算,因为disruptor中cpu缓存模型的有效处理,可以高效地使用数据,而不担心cpu缓存失效问题,也不用担心线程等待挂起问题.当然,如果消费任务有其它IO任务,对不起,还是让多开线程,待某个线程挂起时进行切换运行吧.
转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/code/201403310001.html