Disruptor学习
Contents
简介-翻译
原文 https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
理解 Disruptor
是什么的最好方式是与相似的众所周知的东西进行比较. 对于 Disruptor
来说, 就是 Java 的 BlockingQueue
. 像队列一样, Disruptor
的目的是在同一进程内的线程相互之间移动数据(例如消息或事件). 但 Disruptor
提供的一些与其他队列不同的主要特性 :
- 向具有消费者依赖关系图的消费者发送多播事件
- 为事件预分配内存
- (可选) 无锁
核心概念
在理解 Disruptor
如何工作之前, 这有一些定义的术语, 它们将在整个文档和代码中使用. 对于那些倾向于 DDD (领域驱动设计) 的人, 可以将其视为 Disruptor
领域中无处不在的语言.
Ring Buffer
它通常被认为是 Disruptor
的主要方面, 但从 3.0
开始, 它仅负责存储和更新在 Disruptor
中移动的数据(事件). 对某些高级用例, 可完全由用户代替.
Sequence
Disruptor
使用它作为一种手段来识别特定组件在哪里. 每个消费者 (EventProcessor
) 和 Disruptor
本身一样都维护一个 Sequence
. 大多数并发代码依赖于这些 Sequence
值的移动, 因此 Sequence
支持 AtomicLong
的许多当前功能. 实际上, 和版本 2 之间的唯一真正区别是, Sequence
包含其他功能, 以防止 Sequence
与其他值之间的伪共享.
Sequencer
它是 Disruptor
的真正核心. 此接口的两个实(单生产者, 多生产者)实现了所有并发算法, 这些算法用于在生产者和消费者之间快速正确地传递数据
Sequence Barrier
它是由 Sequencer
产生, 包含主要发布的 Sequence
的引用以及任何从属消费者的 Sequence
. 它包含确定是否有任何事件可供消费者处理的逻辑.
Wait Strategy
它确定消费者如何等待生产者将事件放入 Disruptor
中. 更多详细, 在下面的 可选无锁 部分.
Event
从生产者到消费者的数据传递单元. 没有特定的代码来表示它, 完全是由用户定义的
EventProcessor
用来处理来自 Disruptor
的事件的主事件循环(event loop), 并拥有消费者的 Sequence
所有权. 有一个 BatchEventProcessor
, 它包含事件循环的高效实现并回调到 EventHandler
接口的提供的实现中.
EventHandler
一个用于被用户实现的接口, 对于 Disruptor
来说, 就是一个消费者
Producer
它是用户定义的代码, 调用 Disruptor
将事件进队. 这个概念并没有特定的代码表示.
特性
将上面的元素放入上下文. 下面是 LMAX 如何在其高性能核心服务中使用 Disruptor
的示例.
多播事件 Multicast Events
这是 Disruptor
与 队列最大的区别. 当你有多个消费者在同一个 Disruptor
上监听时, 所有事件都发布给所有消费者, 而队列中单个事件只会发送给一个消费者. Disruptor
的行为旨在用于需要对同一数据进行多个并行操作的情况. LMAX 的一个典型例子是我们执行三个操作, 即日志记录(将输入数据写入持久日志文件), 复制(将输入数据发送到另一台机器以确保数据的远程副本)和业务逻辑(真正的加工工作).
Executor-style
事件处理, 也可以使用 WorkerPool 通过同时并行处理不同事件来伸缩. 注意, 这是固定在现有 Disruptor
类之上的, 并且没有得到相同的一流支持, 因此, 这可能不是实现特定目标的最有效方法.
从上面的图可以看出, 有三个 EventHandler
在监听 Disruptor
(JournalConsumer, ReplicationConsumer, ApplicationConsumer) , 这些 EventHandler 中的每一个都将接收 Disruptor
中所有可用的消息(按相同的顺序). 这允许这些消费者中的每一个并行工作.
消费者依赖图 Consumer Dependency Graph
为了支持现实世界中并行处理行为的应用, 有必要支持消费者之间的协调. 再次参考上面的示例, 有必要防止业务逻辑消费者在日志记录和复制消费者完成其任务之前继续. 我们将此概念称为门控( gating
), 或更准确地说, 是此行为的超集功能称为门控. 门控发生在两个地方. 首先, 我们要确保生产者不会超过消费者. 这是通过调用 RingBuffer.addGatingConsumers()
将相关消费者添加到 Disruptor
中来处理的. 其次, 通过构造一个 SequenceBarrier
来实现前面提到的情况, 该 SequenceBarrier
包含必须首先完成其他处理的组件中的 Sequence
参考上图. 有一个消费者从 Ring Buffer
中监听事件. 此示例中有一个依赖图. ApplicationConsumer
取决于JournalConsumer
和 ReplicationConsumer
. 这意味着, JournalConsumer
和 ReplicationConsumer
可以相互并行自由运行. 从 ApplicationConsumer
的 SequenceBarrier
到 JournalConsumer
和 ReplicationConsumer
的 Sequence
之间的连接可以看到依赖关系. 还值得注意的是, Sequencer
与下游消费者之间的关系. 其作用之一是确保发布不会包装 Ring Buffer
. 为此, 所有下游消费者都不能拥有比 Ring Buffer
的 Sequence
低且小于 Ring Buffer
大小的 Sequence
. 但是, 消费者依赖关系图可以进行有趣的优化. 因为保证 ApplicationConsumer
sequnce 小于或等于 JournalConsumer
和 ReplicationConsumer
(这就是依赖关系所确保的), 所以 Sequencer
只需要查看 ApplicationConsumer
的 sequence
. 从更一般的意义上讲, Sequencer
仅需要了解作为依赖树中的叶子节点的消费者 Sequence
事件预分配 Event Preallocation
Disruptor
的目标之一是允许在低延迟环境中使用. 在低延迟系统中, 有必要减少或删除内存分配. 在基于 Java 的系统中, 目的是有关法律由于 GC 而导致的停顿数(在低延迟 C/C++ 系统中, 由于在内存分配器上竞争, 因此繁重的内存分配也存在问题)
为此, 用户可以在 Disruptor
中预先分配事件所需的存储空间. 在构建过程中, EventFactory
由用户提供, 将为 Disruptor
的 Ring Buffer
中的每个条目调用. 在将新数据发布到 Disruptor
时, API 将允许用户保留所构造的对象, 以便它们可以调用该存储对象上的方法或更新字段. 只要正确实施这些操作, Disruptor
就可以保证这些操作是并发安全的.
可选无锁 Optionally Lock-free
对低延迟的渴望推动的另一个关键实现细节是广泛使用无锁算法来实现 Disruptor
. 所有内存可见性和正确性保证都是使用内存屏障和/或 CAS 实现的. 只有一个用例需要实际锁定, 并且在 BlockingWaitStrategy
中. 这样做仅出于使用条件的目的, 以便在等待新事件到达时可以停放正在使用的线程. 许多低延迟系统将使用 busy-wait
来避免由于使用条件而引起的抖动, 但是, 许多系统的 busy-wait
操作会导致性能显著下降, 尤其是在 CPU 资源严重受限的情况下. 例如, 虚拟化环境中的 Web 服务器.
入门指南
基本的事件产生和消费
事件
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
}
事件工厂(用于在 Ring Buffer 中占位, 然后复用对象, 以减内存分配和 GC)
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
事件处理器
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println("Event: " + longEvent);
}
}
发布事件
使用 Translator
public class LongEventProducerWithTranslator {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
@Override
public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb) {
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
以前的方式
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb) {
long sequence = ringBuffer.next(); // Grab the next sequence
try {
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
} finally {
ringBuffer.publish(sequence);
}
}
}
调用
@Override
public void run(String... args) throws Exception {
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
producer.onData(bb);
Thread.sleep(1000);
}
}
Java 8 风格
private void java8() throws InterruptedException {
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
清除对象
- 如果只有一个 Handler , 可以直接在该 Handler 里清除即可. 调用
如果有多个 Handler 调用链, 则可在最后添加个专门负责清除的 Handler
class ObjectEvent<T> { T val; void clear() { val = null; } } public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>> { public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch) { // Failing to call clear here will result in the // object associated with the event to live until // it is overwritten once the ring buffer has wrapped // around to the beginning. event.clear(); } } public static void main(String[] args) { Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>( () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE); disruptor .handleEventsWith(new ProcessingEventHandler()) .then(new ClearingObjectHandler()); }
调优
使用 Single Producer
坚持使用 Single Writer Principle
. 注意, 默认情况下, 是 Multi Producer 的.
创建一个 Single Producer 的 Disruptor 代码如下
// Construct the Disruptor with a SingleProducerSequencer
Disruptor<LongEvent> disruptor = new Disruptor(
factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
可选的等待策略
默认的等待策略是 BlockingWaitStrategy
它使用锁和条件变量来处理线程唤醒, 这个策略是最慢的, 但在 CPU 使用率方面是最保守的.
SleepingWaitStrategy
: 通过busy-wait
循环来保守 CPU 使用率, 但在循环中对LockSupport.parkNanos(1)
的调用. 在典型的 Linux 系统上, 这会将线程暂停大约 60 µs . 这样子的好处是, 生产者线程不需要采取任何其他动作就可以增加适当的计数器, 也不需要花费信号通知条件变量. 但在生产者线程和消费者线程之间移动事件的平均等等时间会更长. 它在不需要低延迟但对生产线程的影响较小的情况下效果最好. 典型的使用情况是异步日志记录YieldingWaitStrategy
: 是可在低延迟系统中使用的 2 种等待策略之一, 在该系统中, 可以选择烧 CPU 周期以改善延迟.YieldingWaitStrategy
将忙于自旋以等等Sequence
增加到适当的值. 在循环体中, 将调用Thread.yield()
, 以允许其他排队的线程运行. 当需要非常高的性能并且事件处理程序线程数量少于逻辑内核总数时, 这是推荐的策略
, 例如, 你开启了超线程(hyper-threading)BusySpinWaitStrategy
: 这是最高性能的等待策略, 但对部署环境设置了最高的约束. 仅当事件处理程序线程的数量小于包装盒上物理内核的数量时, 才应使用该策略. 例如, 禁用了超线程.
选择 buffer size 的大小
官方建议, buffer size 的大小, 应该与 CPU 的 L3 cache 匹配.
要想获取 CPU 缓存相关的大小, 可以用
getconf -a | grep -i cache
# 直接获取 L3
getconf -a | grep -i "LEVEL3_CACHE_SIZE" | awk '{print $2}'
- ICACHE 表示指令缓存大小
- DCACHE 表示数据缓存大小
根据官方文档的建议, 假设查到的 L3 缓存(LEVEL3_CACHE_SIZE
) 为 34603008 , 每个disruptor 的对象消息大小为 256 byte , 则 buffer size 可设置为 65536 (2^16) . 可通过下面的 bash 脚本来计算 :(这里倾向使用偶数指数, 因为 buffer size 必须是 2 的指数的大小)
n=$( getconf -a | grep -i "LEVEL3_CACHE_SIZE" | awk '{print $2}' )
object_size=$1
max_size=$(( ${n} / ${object_size} ))
echo "max size ${max_size}"
v=$( echo "l(${max_size})/l(2)" | bc -l )
buffer_ex=$( printf "%.0f\n" ${v} )
echo "max ex ${buffer_ex}"
if [ $((buffer_ex%2)) -ne 0 ]; then
echo "buffer_ex is not event . so -1 "
buffer_ex=$(( ${buffer_ex} - 1 ))
fi
real_buffer=$( echo "2^${buffer_ex}" | bc )
echo "real buffer ${real_buffer} = 2^${buffer_ex}"
假设保存为文件名 a.sh
, 运行时调用 ./a.sh 256
参数 256 是元素对象的最大大小
与 Spring Boot 集成
定义事件对象及工厂
public class LogEvent {
private String queue;
private String msg;
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
LogEventFactory
public class LogEventFactory implements EventFactory<LogEvent> {
@Override
public LogEvent newInstance() {
return new LogEvent();
}
}
定义 Handler 及异常 Handler
import com.lmax.disruptor.EventHandler;
import com.company.service.rabbitmq.RabbitMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class LogEventHandler implements EventHandler<LogEvent> {
@Autowired
private RabbitMQService rabbitMQService;
@Override
public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
rabbitMQService.putBidMQMessage(event.getQueue(), event.getMsg());
}
}
异常 Handler
import com.lmax.disruptor.ExceptionHandler;
import com.company.util.Loggers;
public class DisruptorExceptionHandler implements ExceptionHandler<LogEvent> {
@Override
public void handleEventException(Throwable ex, long sequence, LogEvent event) {
Loggers.ERROR_LOG.error(ex.getMessage() + event + ", seq = " + sequence, ex);
}
@Override
public void handleOnStartException(Throwable ex) {
Loggers.ERROR_LOG.error(ex.getMessage(), ex);
}
@Override
public void handleOnShutdownException(Throwable ex) {
Loggers.ERROR_LOG.error(ex.getMessage(), ex);
}
}
初始化
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import com.company.util.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;
@Component
public class BidLogDisruptor {
private static final EventTranslatorTwoArg<LogEvent, String, String> TRANSLATOR = (event, sequence, ququeName, msg) -> {
event.setQueue(ququeName);
event.setMsg(msg);
};
@Autowired
private LogEventHandler logEventHandler;
private Disruptor<LogEvent> disruptor;
private RingBuffer<LogEvent> ringBuffer;
private static final LogEventFactory factory = new LogEventFactory();
@PreDestroy
public void shutdown() {
try {
disruptor.shutdown(1, TimeUnit.MINUTES);
} catch (TimeoutException e) {
Loggers.ERROR_LOG.error(e.getMessage(), e);
}
}
@PostConstruct
public void init() {
final int bufferSize = 128 * 1024;
disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
disruptor.handleEventsWith(logEventHandler);
disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
disruptor.start();
ringBuffer = disruptor.getRingBuffer();
Loggers.RUNNING_LOG.info("started disruptor ....");
}
public void onData(String queueName, String msg) {
ringBuffer.publishEvent(TRANSLATOR, queueName, msg);
}
}
使用
@Autowired
private BidLogDisruptor bidLogDisruptor;
bidLogDisruptor.onData(queueName, "msg...");
性能
new Thread(() -> {
Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < 10000; i++) {
bidLogDisruptor.onData("test", "你好.." + i);
}
Loggers.RUNNING_LOG.info("队列 cost {}", stopwatch);
Stopwatch stopwatch2 = Stopwatch.createStarted();
for (int i = 0; i < 10000; i++) {
rabbitMQService.putBidMQMessage("test", "你好.." + i);
}
Loggers.RUNNING_LOG.info("直接 cost {}", stopwatch2);
}).start();
队列 cost 6.350 ms
直接 cost 4.359 s
注意事项
- 构造
Disruptor
的参数bufferSize
: 这个大小要根据业务具体情况来调整. 不宜过高, 也不宜过低. 这是环的对象大小, 基本不会被 GC 回收的 BlockingWaitStrategy
: 等待策略, 用默认的这个就好. 除非你知道自己在干什么.YieldingWaitStrategy
这个策略的话, CPU 基本都会大于100%
注意环的对象占用的空间大小. 如果有必要, 可添加个清除对象的 Handler(放在最后):
disruptor.handleEventsWith(logEventHandler).then(logEventClearHandler);
清除的 handler 只是将持有的对象设置为null
即可.
清除对象
public class LogEvent {
private TaskBidLog taskBidLog;
public void clear() {
taskBidLog = null;
}
public TaskBidLog getTaskBidLog() {
return taskBidLog;
}
public void setTaskBidLog(final TaskBidLog taskBidLog) {
this.taskBidLog = taskBidLog;
}
}
public class LogEventClearHandler implements EventHandler<LogEvent> {
@Override
public void onEvent(final LogEvent event, final long sequence, final boolean endOfBatch) {
event.clear();
}
}