简介-翻译

原文 https://github.com/LMAX-Exchange/disruptor/wiki/Introduction

理解 Disruptor 是什么的最好方式是与相似的众所周知的东西进行比较. 对于 Disruptor 来说, 就是 Java 的 BlockingQueue . 像队列一样, Disruptor 的目的是在同一进程内的线程相互之间移动数据(例如消息或事件). 但 Disruptor 提供的一些与其他队列不同的主要特性 :

  • 向具有消费者依赖关系图的消费者发送多播事件
  • 为事件预分配内存
  • (可选) 无锁

核心概念

在理解 Disruptor 如何工作之前, 这有一些定义的术语, 它们将在整个文档和代码中使用. 对于那些倾向于 DDD (领域驱动设计) 的人, 可以将其视为 Disruptor 领域中无处不在的语言.

Ring Buffer

它通常被认为是 Disruptor 的主要方面, 但从 3.0 开始, 它仅负责存储和更新在 Disruptor 中移动的数据(事件). 对某些高级用例, 可完全由用户代替.

Sequence

Disruptor 使用它作为一种手段来识别特定组件在哪里. 每个消费者 (EventProcessor) 和 Disruptor 本身一样都维护一个 Sequence . 大多数并发代码依赖于这些 Sequence 值的移动, 因此 Sequence 支持 AtomicLong 的许多当前功能. 实际上, 和版本 2 之间的唯一真正区别是, Sequence 包含其他功能, 以防止 Sequence 与其他值之间的伪共享.

Sequencer

它是 Disruptor 的真正核心. 此接口的两个实(单生产者, 多生产者)实现了所有并发算法, 这些算法用于在生产者和消费者之间快速正确地传递数据

Sequence Barrier

它是由 Sequencer 产生, 包含主要发布的 Sequence 的引用以及任何从属消费者的 Sequence . 它包含确定是否有任何事件可供消费者处理的逻辑.

Wait Strategy

它确定消费者如何等待生产者将事件放入 Disruptor 中. 更多详细, 在下面的 可选无锁 部分.

Event

从生产者到消费者的数据传递单元. 没有特定的代码来表示它, 完全是由用户定义的

EventProcessor

用来处理来自 Disruptor 的事件的主事件循环(event loop), 并拥有消费者的 Sequence 所有权. 有一个 BatchEventProcessor , 它包含事件循环的高效实现并回调到 EventHandler 接口的提供的实现中.

EventHandler

一个用于被用户实现的接口, 对于 Disruptor 来说, 就是一个消费者

Producer

它是用户定义的代码, 调用 Disruptor 将事件进队. 这个概念并没有特定的代码表示.

特性

将上面的元素放入上下文. 下面是 LMAX 如何在其高性能核心服务中使用 Disruptor 的示例.

Models

多播事件 Multicast Events

这是 Disruptor 与 队列最大的区别. 当你有多个消费者在同一个 Disruptor 上监听时, 所有事件都发布给所有消费者, 而队列中单个事件只会发送给一个消费者. Disruptor 的行为旨在用于需要对同一数据进行多个并行操作的情况. LMAX 的一个典型例子是我们执行三个操作, 即日志记录(将输入数据写入持久日志文件), 复制(将输入数据发送到另一台机器以确保数据的远程副本)和业务逻辑(真正的加工工作).

Executor-style 事件处理, 也可以使用 WorkerPool 通过同时并行处理不同事件来伸缩. 注意, 这是固定在现有 Disruptor 类之上的, 并且没有得到相同的一流支持, 因此, 这可能不是实现特定目标的最有效方法.

从上面的图可以看出, 有三个 EventHandler 在监听 Disruptor (JournalConsumer, ReplicationConsumer, ApplicationConsumer) , 这些 EventHandler 中的每一个都将接收 Disruptor 中所有可用的消息(按相同的顺序). 这允许这些消费者中的每一个并行工作.

消费者依赖图 Consumer Dependency Graph

为了支持现实世界中并行处理行为的应用, 有必要支持消费者之间的协调. 再次参考上面的示例, 有必要防止业务逻辑消费者在日志记录和复制消费者完成其任务之前继续. 我们将此概念称为门控( gating ), 或更准确地说, 是此行为的超集功能称为门控. 门控发生在两个地方. 首先, 我们要确保生产者不会超过消费者. 这是通过调用 RingBuffer.addGatingConsumers() 将相关消费者添加到 Disruptor 中来处理的. 其次, 通过构造一个 SequenceBarrier 来实现前面提到的情况, 该 SequenceBarrier 包含必须首先完成其他处理的组件中的 Sequence

参考上图. 有一个消费者从 Ring Buffer 中监听事件. 此示例中有一个依赖图. ApplicationConsumer 取决于JournalConsumerReplicationConsumer . 这意味着, JournalConsumerReplicationConsumer 可以相互并行自由运行. 从 ApplicationConsumerSequenceBarrierJournalConsumerReplicationConsumerSequence 之间的连接可以看到依赖关系. 还值得注意的是, Sequencer 与下游消费者之间的关系. 其作用之一是确保发布不会包装 Ring Buffer . 为此, 所有下游消费者都不能拥有比 Ring BufferSequence 低且小于 Ring Buffer 大小的 Sequence. 但是, 消费者依赖关系图可以进行有趣的优化. 因为保证 ApplicationConsumer sequnce 小于或等于 JournalConsumerReplicationConsumer (这就是依赖关系所确保的), 所以 Sequencer 只需要查看 ApplicationConsumersequence . 从更一般的意义上讲, Sequencer 仅需要了解作为依赖树中的叶子节点的消费者 Sequence

事件预分配 Event Preallocation

Disruptor 的目标之一是允许在低延迟环境中使用. 在低延迟系统中, 有必要减少或删除内存分配. 在基于 Java 的系统中, 目的是有关法律由于 GC 而导致的停顿数(在低延迟 C/C++ 系统中, 由于在内存分配器上竞争, 因此繁重的内存分配也存在问题)

为此, 用户可以在 Disruptor 中预先分配事件所需的存储空间. 在构建过程中, EventFactory 由用户提供, 将为 DisruptorRing Buffer 中的每个条目调用. 在将新数据发布到 Disruptor 时, API 将允许用户保留所构造的对象, 以便它们可以调用该存储对象上的方法或更新字段. 只要正确实施这些操作, Disruptor 就可以保证这些操作是并发安全的.

可选无锁 Optionally Lock-free

对低延迟的渴望推动的另一个关键实现细节是广泛使用无锁算法来实现 Disruptor . 所有内存可见性和正确性保证都是使用内存屏障和/或 CAS 实现的. 只有一个用例需要实际锁定, 并且在 BlockingWaitStrategy 中. 这样做仅出于使用条件的目的, 以便在等待新事件到达时可以停放正在使用的线程. 许多低延迟系统将使用 busy-wait 来避免由于使用条件而引起的抖动, 但是, 许多系统的 busy-wait 操作会导致性能显著下降, 尤其是在 CPU 资源严重受限的情况下. 例如, 虚拟化环境中的 Web 服务器.

入门指南

基本的事件产生和消费

事件

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }
}

事件工厂(用于在 Ring Buffer 中占位, 然后复用对象, 以减内存分配和 GC)

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

事件处理器

public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println("Event: " + longEvent);
    }
}

发布事件

使用 Translator

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                @Override
                public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                    event.set(bb.getLong(0));
                }
            };

    public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

以前的方式

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
            // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

调用

    @Override
    public void run(String... args) throws Exception {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }

Java 8 风格

    private void java8() throws InterruptedException {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }

清除对象

  • 如果只有一个 Handler , 可以直接在该 Handler 里清除即可. 调用
  • 如果有多个 Handler 调用链, 则可在最后添加个专门负责清除的 Handler

    class ObjectEvent<T>
    {
    T val;
    
    void clear()
    {
        val = null;
    }
    }
    
    public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
    {
    public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
    {
        // Failing to call clear here will result in the 
        // object associated with the event to live until
        // it is overwritten once the ring buffer has wrapped
        // around to the beginning.
        event.clear(); 
    }
    }
    
    public static void main(String[] args)
    {
    Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
        () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);
    
    disruptor
        .handleEventsWith(new ProcessingEventHandler())
        .then(new ClearingObjectHandler());
    }
    

调优

使用 Single Producer

坚持使用 Single Writer Principle . 注意, 默认情况下, 是 Multi Producer 的.

创建一个 Single Producer 的 Disruptor 代码如下

// Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor(
            factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

可选的等待策略

默认的等待策略是 BlockingWaitStrategy 它使用锁和条件变量来处理线程唤醒, 这个策略是最慢的, 但在 CPU 使用率方面是最保守的.

  • SleepingWaitStrategy : 通过 busy-wait 循环来保守 CPU 使用率, 但在循环中对 LockSupport.parkNanos(1) 的调用. 在典型的 Linux 系统上, 这会将线程暂停大约 60 µs . 这样子的好处是, 生产者线程不需要采取任何其他动作就可以增加适当的计数器, 也不需要花费信号通知条件变量. 但在生产者线程和消费者线程之间移动事件的平均等等时间会更长. 它在不需要低延迟但对生产线程的影响较小的情况下效果最好. 典型的使用情况是异步日志记录
  • YieldingWaitStrategy : 是可在低延迟系统中使用的 2 种等待策略之一, 在该系统中, 可以选择烧 CPU 周期以改善延迟. YieldingWaitStrategy 将忙于自旋以等等 Sequence 增加到适当的值. 在循环体中, 将调用 Thread.yield() , 以允许其他排队的线程运行. 当需要非常高的性能并且事件处理程序线程数量少于逻辑内核总数时, 这是推荐的策略, 例如, 你开启了超线程(hyper-threading)
  • BusySpinWaitStrategy : 这是最高性能的等待策略, 但对部署环境设置了最高的约束. 仅当事件处理程序线程的数量小于包装盒上物理内核的数量时, 才应使用该策略. 例如, 禁用了超线程.

选择 buffer size 的大小

官方建议, buffer size 的大小, 应该与 CPU 的 L3 cache 匹配.

要想获取 CPU 缓存相关的大小, 可以用

getconf -a | grep -i cache
# 直接获取 L3
getconf -a | grep -i "LEVEL3_CACHE_SIZE" | awk '{print $2}'
  • ICACHE 表示指令缓存大小
  • DCACHE 表示数据缓存大小

根据官方文档的建议, 假设查到的 L3 缓存(LEVEL3_CACHE_SIZE) 为 34603008 , 每个disruptor 的对象消息大小为 256 byte , 则 buffer size 可设置为 65536 (2^16) . 可通过下面的 bash 脚本来计算 :(这里倾向使用偶数指数, 因为 buffer size 必须是 2 的指数的大小)

n=$( getconf -a | grep -i "LEVEL3_CACHE_SIZE" | awk '{print $2}' )
object_size=$1

max_size=$(( ${n} / ${object_size} ))
echo "max size ${max_size}"

v=$( echo "l(${max_size})/l(2)" | bc -l )
buffer_ex=$( printf "%.0f\n" ${v} )
echo "max ex ${buffer_ex}"

if [ $((buffer_ex%2)) -ne 0 ]; then
    echo "buffer_ex is not event . so -1 "
    buffer_ex=$(( ${buffer_ex} - 1 ))
    
fi

real_buffer=$( echo "2^${buffer_ex}" | bc )

echo "real buffer ${real_buffer} = 2^${buffer_ex}"

假设保存为文件名 a.sh , 运行时调用 ./a.sh 256 参数 256 是元素对象的最大大小

计算 对象大小可参考 https://stackoverflow.com/questions/52353/in-java-what-is-the-best-way-to-determine-the-size-of-an-object#52682

与 Spring Boot 集成

定义事件对象及工厂

public class LogEvent {
    private String queue;

    private String msg;

    public String getQueue() {
        return queue;
    }

    public void setQueue(String queue) {
        this.queue = queue;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

LogEventFactory

public class LogEventFactory implements EventFactory<LogEvent> {

    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}

定义 Handler 及异常 Handler

import com.lmax.disruptor.EventHandler;
import com.company.service.rabbitmq.RabbitMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class LogEventHandler implements EventHandler<LogEvent> {
    @Autowired
    private RabbitMQService rabbitMQService;

    @Override
    public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
        rabbitMQService.putBidMQMessage(event.getQueue(), event.getMsg());
    }
}

异常 Handler

import com.lmax.disruptor.ExceptionHandler;
import com.company.util.Loggers;

public class DisruptorExceptionHandler implements ExceptionHandler<LogEvent> {
    @Override
    public void handleEventException(Throwable ex, long sequence, LogEvent event) {
        Loggers.ERROR_LOG.error(ex.getMessage() + event + ", seq = " + sequence, ex);
    }

    @Override
    public void handleOnStartException(Throwable ex) {
        Loggers.ERROR_LOG.error(ex.getMessage(), ex);
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {
        Loggers.ERROR_LOG.error(ex.getMessage(), ex);
    }
}

初始化

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import com.company.util.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;

@Component
public class BidLogDisruptor {

    private static final EventTranslatorTwoArg<LogEvent, String, String> TRANSLATOR = (event, sequence, ququeName, msg) -> {
        event.setQueue(ququeName);
        event.setMsg(msg);
    };

    @Autowired
    private LogEventHandler logEventHandler;

    private Disruptor<LogEvent> disruptor;
    private RingBuffer<LogEvent> ringBuffer;
    private static final LogEventFactory factory = new LogEventFactory();

    @PreDestroy
    public void shutdown() {
        try {
            disruptor.shutdown(1, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            Loggers.ERROR_LOG.error(e.getMessage(), e);
        }
    }

    @PostConstruct
    public void init() {
        final int bufferSize = 128 * 1024;
        disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
        disruptor.handleEventsWith(logEventHandler);
        disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
        disruptor.start();
        ringBuffer = disruptor.getRingBuffer();
        Loggers.RUNNING_LOG.info("started disruptor ....");
    }

    public void onData(String queueName, String msg) {
        ringBuffer.publishEvent(TRANSLATOR, queueName, msg);
    }
}

使用

@Autowired
private BidLogDisruptor bidLogDisruptor;


bidLogDisruptor.onData(queueName, "msg...");
  

性能

new Thread(() -> {
    Stopwatch stopwatch = Stopwatch.createStarted();
    for (int i = 0; i < 10000; i++) {
        bidLogDisruptor.onData("test", "你好.." + i);
    }
    Loggers.RUNNING_LOG.info("队列 cost {}", stopwatch);

    Stopwatch stopwatch2 = Stopwatch.createStarted();
    for (int i = 0; i < 10000; i++) {
        rabbitMQService.putBidMQMessage("test", "你好.." + i);
    }
    Loggers.RUNNING_LOG.info("直接 cost {}", stopwatch2);
}).start();



队列 cost 6.350 ms
直接 cost 4.359 s

注意事项

  • 构造 Disruptor 的参数 bufferSize : 这个大小要根据业务具体情况来调整. 不宜过高, 也不宜过低. 这是环的对象大小, 基本不会被 GC 回收的
  • BlockingWaitStrategy : 等待策略, 用默认的这个就好. 除非你知道自己在干什么. YieldingWaitStrategy 这个策略的话, CPU 基本都会大于 100%

  • 注意环的对象占用的空间大小. 如果有必要, 可添加个清除对象的 Handler(放在最后): disruptor.handleEventsWith(logEventHandler).then(logEventClearHandler); 清除的 handler 只是将持有的对象设置为 null 即可.

清除对象

public class LogEvent {
  	private TaskBidLog taskBidLog;
    public void clear() {
        taskBidLog = null;
    } 
    public TaskBidLog getTaskBidLog() {
        return taskBidLog;
    }

    public void setTaskBidLog(final TaskBidLog taskBidLog) {
        this.taskBidLog = taskBidLog;
    }
}
public class LogEventClearHandler implements EventHandler<LogEvent> {

    @Override
    public void onEvent(final LogEvent event, final long sequence, final boolean endOfBatch) {
        event.clear();
    }
}

其他资料