欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识

int flag) { long bufferAddress = (ind 次  来源:宝鼎软件 时间:2018-04-29

原文出处: 笨狐狸

disruptor颠末几年的成长,好像已经成为机能优化的大杀器,险些每个想优化机能的项目宣称本身用上了disruptor,机能城市泛起质的跃进。究竟,最好的例子就是LMAX本身的架构设计,支撑了600w/s的吞吐。

本文试图从代码层面将要害问题做些解答。

根基观念

Disruptor: 实际上就是整个基于ringBuffer实现的出产者消费者模式的容器。

RingBuffer: 著名的环形行列,可以类比为BlockingQueue之类的行列,ringBuffer的利用,使得内存被轮回利用,淘汰了某些场景的内存分派接纳扩容等耗时操纵。

 int flag) { long bufferAddress = (ind  <a href=苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171219141927339" />

 

EventProcessor: 事件处理惩罚器,实际上可以领略为消费者模子的框架,实现了线程Runnable的run要领,将轮回判定等操纵封在了内里。

 int flag) { long bufferAddress = (ind  <a href=苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171219175751608" />

EventHandler: 事件处理器,与前面处理惩罚器的差异是,事件处理器不认真框架内的行为,仅仅是EventProcessor作为消费者框架对外预留的扩展点而已。

Sequencer: 作为RingBuffer出产者的父接口,其直接实现有SingleProducerSequencer和MultiProducerSequencer。

 int flag) { long bufferAddress = (ind  <a href=苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171219141940748" />

EventTranslator: 事件转换器。实际上就是新事件向往事件包围的接口界说。

SequenceBarrier: 消费者路障。划定了消费者如何向下走。都说disruptor无锁,事实上,该路障算是变向的锁。

WaitStrategy: 当出产者出产得太快而消费者消费得太慢时的期待计策。

 int flag) { long bufferAddress = (ind  <a href=苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171219142244617" />

把上面几个要害观念画个图,昆山软件公司,或许长这样:

 int flag) { long bufferAddress = (ind  <a href=苏州软件定制开拓 ex * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer" src="http://www.importnew.com/https:/img-blog.csdn.net/20171221134247821" width="635" height="494" />

所以接下来主要也就从出产者,消费者以及ringBuffer3个维度去看disruptor是如何玩的。

出产者

出产者宣布动静的进程从disruptor的publish要领为进口,实际挪用了ringBuffer的publish要领。publish要领主要做了几件事,一是先确保能拿到后头的n个sequence;二是利用translator来填充新数据到相应的位置;三是真正的声明这些位置已经宣布完成。

    public void publishEvent(EventTranslator<E> translator)  
   {  
     final long sequence = sequencer.next();  
     translateAndPublish(translator, sequence);  
   }  
    public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)  
    {  
        checkBounds(translators, batchStartsAt, batchSize);  
        final long finalSequence = sequencer.next(batchSize);  
        translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);  
    }

获取出产者下一个sequence的要领,细节已经注释,实际上最终目标就是确保出产者和消费者相互不越界。

public long next(int n)  
{  
    if (n < 1)  
    {  
        throw new IllegalArgumentException("n must be > 0");  
    }  
    //该出产者宣布的最大序列号  
    long nextValue = this.nextValue;  
    //该出产者欲宣布的序列号  
    long nextSequence = nextValue + n;  
    //包围点,即该出产者假如宣布了这次的序列号,那它最终会落在哪个位置,实际上是nextSequence做了算术处理惩罚今后的值,最终目标是统一计较,不然就要去判绝对值以及取模等贫苦操纵  
    long wrapPoint = nextSequence - bufferSize;  
    //所有消费者中消费得最慢谁人的前一个序列号  
    long cachedGatingSequence = this.cachedValue;  
  
    //这里两个判定条件:一是看出产者出产是不是高出了消费者,所以判定的是包围点是否高出了最慢消费者;二是看消费者是否高出了当前出产者的最大序号,判定的是消费者是不是比出产者还快这种异常环境  
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)  
    {  
        cursor.setVolatile(nextValue);  // StoreLoad fence  
  
        long minSequence;  
        //包围点是不是已经高出了最慢消费者和当前出产者序列号的最小者(这两个有点难领略,实际上就是包围点不能高出最慢谁人出产者,也不能高出当前自身,好比一次宣布高出bufferSize),gatingSequences的处理惩罚也是雷同算术处理惩罚,也可以当作是相对付原点是正照旧负  
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))  
        {  
            //叫醒阻塞的消费者  
            waitStrategy.signalAllWhenBlocking();  
            //等上1纳秒  
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?  
        }  
        //把这个最慢消费者缓存下来,以便下一次利用  
        this.cachedValue = minSequence;  
    }  
    //把当前序列号更新为欲宣布序列号  
    this.nextValue = nextSequence;  
  
    return nextSequence;  
}