How queue durability and message persistence combine in RabbitMQ
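Queue durability
When you declare a queue, its Durability property can be set to durable or transient.
- durable: the queue still exists after RabbitMQ restarts (the broker persists the queue declaration and re-declares the queue automatically on startup)
- transient: the queue no longer exists after RabbitMQ restarts (the broker does not persist the declaration, so the queue is not re-declared)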
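Message persistence
When you publish a message, it carries a similar property, Delivery mode, which can be Persistent or Non-Persistent.
- persistent: the message is written to a file on disk as soon as it reaches the queue
- non-persistent: the message is written to disk only when RabbitMQ is under memory pressure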
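For reference, in AMQP 0-9-1 the Delivery mode property travels as a small integer: 1 for non-persistent and 2 for persistent. The sketch below only illustrates that mapping. The enum `ToyDeliveryMode` and its methods are hypothetical names made up for this example, not part of any client library. In the test code further down, Spring AMQP's own `MessageDeliveryMode` plays this role.

```java
// Hypothetical illustration of the AMQP 0-9-1 delivery-mode values.
// Not a client-library type; the names are invented for this sketch.
enum ToyDeliveryMode {
    NON_PERSISTENT(1),
    PERSISTENT(2);

    private final int wireValue;

    ToyDeliveryMode(int wireValue) {
        this.wireValue = wireValue;
    }

    // The integer carried in the message's delivery-mode property.
    int wireValue() {
        return wireValue;
    }

    // Reverse lookup; rejects anything other than 1 or 2.
    static ToyDeliveryMode fromWireValue(int value) {
        for (ToyDeliveryMode mode : values()) {
            if (mode.wireValue == value) {
                return mode;
            }
        }
        throw new IllegalArgumentException("unknown delivery mode: " + value);
    }
}
```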
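The four combinations
durable queue, persistent message
No questions about this one, right? Haha. Both the queue and its messages survive a restart.
durable queue, non-persistent message
No questions here either, right? Haha. The queue survives a restart, but the messages in it do not.
transient queue, non-persistent message
Same again, no questions. Haha. Neither the queue nor its messages survive a restart.
transient queue, persistent message
This is the one that puzzled me: what happens when a persistent message is published to a non-durable queue?
I tested it, and in this case the message's persistent property has no effect. A persistent message published to a transient queue is not preserved, because the queue itself does not survive the restart.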
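The four cases can be summed up in one rule: a message outlives a restart only when it is persistent and sits in a durable queue. Below is a minimal in-memory sketch of that rule. `ToyBroker` and all of its methods are hypothetical names invented for this example. It does not talk to RabbitMQ and is not how the broker is implemented; it only mimics what is left after a simulated restart.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (assumption: illustrative only, not RabbitMQ code).
class ToyBroker {

    // A declared queue: its durability flag plus one "persistent?" flag per message.
    private static final class ToyQueue {
        final boolean durable;
        final List<Boolean> persistentFlags = new ArrayList<>();

        ToyQueue(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, ToyQueue> queues = new LinkedHashMap<>();

    void declareQueue(String name, boolean durable) {
        queues.putIfAbsent(name, new ToyQueue(durable));
    }

    void publish(String queueName, boolean persistent) {
        ToyQueue queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalArgumentException("unknown queue: " + queueName);
        }
        queue.persistentFlags.add(persistent);
    }

    // Simulated restart: transient queues vanish together with their contents,
    // durable queues keep only their persistent messages.
    void restart() {
        queues.values().removeIf(queue -> !queue.durable);
        for (ToyQueue queue : queues.values()) {
            queue.persistentFlags.removeIf(persistent -> !persistent);
        }
    }

    boolean hasQueue(String name) {
        return queues.containsKey(name);
    }

    int messageCount(String name) {
        ToyQueue queue = queues.get(name);
        return queue == null ? 0 : queue.persistentFlags.size();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declareQueue("hello.durable.queue", true);
        broker.declareQueue("hello.transient.queue", false);
        broker.publish("hello.durable.queue", true);
        broker.publish("hello.durable.queue", false);
        broker.publish("hello.transient.queue", true);
        broker.publish("hello.transient.queue", false);
        broker.restart();
        System.out.println("durable queue messages left: "
                + broker.messageCount("hello.durable.queue"));
        System.out.println("transient queue still exists: "
                + broker.hasQueue("hello.transient.queue"));
    }
}
```

In this model the persistent flag on a message in a transient queue never matters, which is the behaviour observed in the test above.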
Performance
The timings below cover 5 rounds of publishing 5000 messages each, for every one of the four combinations.
Test code
package com.example.demo.demorabbit;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

// Publishes 5 rounds of 5000 messages for each queue/message combination
// and prints the elapsed time of every round.
// BidLog is the payload class; its definition is not included in this post.
@Component
public class TestRabbit {

    private static final String DURABLE_QUEUE = "hello.durable.queue";
    private static final String TRANSIENT_QUEUE = "hello.transient.queue";

    // The second constructor argument is the durable flag.
    private static final Queue durableQueue = new Queue(DURABLE_QUEUE, true);
    private static final Queue transientQueue = new Queue(TRANSIENT_QUEUE, false);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    private ObjectMapper objectMapper = new ObjectMapper();

    @PostConstruct
    public void init() throws JsonProcessingException {
        setup();

        // Case 1: durable queue, persistent message
        rabbitAdmin.purgeQueue(durableQueue.getName(), false);
        for (int i = 0; i < 5; i++) {
            long s = System.currentTimeMillis();
            for (int j = 0; j < 5000; j++) {
                sendDurableQueueAndDurableMessage();
            }
            long end = System.currentTimeMillis();
            System.out.println("sendDurableQueueAndDurableMessage cost " + (end - s) + " ms");
        }

        // Case 2: durable queue, non-persistent message
        rabbitAdmin.purgeQueue(durableQueue.getName(), false);
        for (int i = 0; i < 5; i++) {
            long s = System.currentTimeMillis();
            for (int j = 0; j < 5000; j++) {
                sendDurableQueueAndNoneDurableMessage();
            }
            long end = System.currentTimeMillis();
            System.out.println("sendDurableQueueAndNoneDurableMessage cost " + (end - s) + " ms");
        }

        // Case 3: transient queue, persistent message
        rabbitAdmin.purgeQueue(durableQueue.getName(), false);
        rabbitAdmin.purgeQueue(transientQueue.getName(), false);
        for (int i = 0; i < 5; i++) {
            long s = System.currentTimeMillis();
            for (int j = 0; j < 5000; j++) {
                sendNoneDurableQueueAndDurableMessage();
            }
            long end = System.currentTimeMillis();
            System.out.println("sendNoneDurableQueueAndDurableMessage cost " + (end - s) + " ms");
        }

        // Case 4: transient queue, non-persistent message
        rabbitAdmin.purgeQueue(transientQueue.getName(), false);
        for (int i = 0; i < 5; i++) {
            long s = System.currentTimeMillis();
            for (int j = 0; j < 5000; j++) {
                sendNoneDurableQueueAndNoneDurableMessage();
            }
            long end = System.currentTimeMillis();
            System.out.println("sendNoneDurableQueueAndNoneDurableMessage cost " + (end - s) + " ms");
        }
        rabbitAdmin.purgeQueue(transientQueue.getName(), false);
    }

    // Marks every outgoing message as persistent.
    final MessagePostProcessor messageDurablePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(final Message message) throws AmqpException {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }
    };

    // Marks every outgoing message as non-persistent.
    final MessagePostProcessor messageNoneDurablePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(final Message message) throws AmqpException {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }
    };

    // The (Object) cast selects convertAndSend(routingKey, message, postProcessor)
    // rather than the (exchange, routingKey, message) overload.
    public void sendDurableQueueAndDurableMessage() throws JsonProcessingException {
        BidLog bidLog = new BidLog();
        rabbitTemplate.convertAndSend(durableQueue.getName(), (Object) objectMapper.writeValueAsString(bidLog), messageDurablePostProcessor);
    }

    public void sendDurableQueueAndNoneDurableMessage() throws JsonProcessingException {
        BidLog bidLog = new BidLog();
        rabbitTemplate.convertAndSend(durableQueue.getName(), (Object) objectMapper.writeValueAsString(bidLog), messageNoneDurablePostProcessor);
    }

    public void sendNoneDurableQueueAndDurableMessage() throws JsonProcessingException {
        BidLog bidLog = new BidLog();
        rabbitTemplate.convertAndSend(transientQueue.getName(), (Object) objectMapper.writeValueAsString(bidLog), messageDurablePostProcessor);
    }

    public void sendNoneDurableQueueAndNoneDurableMessage() throws JsonProcessingException {
        BidLog bidLog = new BidLog();
        rabbitTemplate.convertAndSend(transientQueue.getName(), (Object) objectMapper.writeValueAsString(bidLog), messageNoneDurablePostProcessor);
    }

    // Declares both queues before any message is published.
    public void setup() {
        System.out.println("init ....");
        rabbitAdmin.declareQueue(durableQueue);
        rabbitAdmin.declareQueue(transientQueue);
        System.out.println("queues declared");
    }
}
Output
sendDurableQueueAndDurableMessage cost 668 ms
sendDurableQueueAndDurableMessage cost 470 ms
sendDurableQueueAndDurableMessage cost 777 ms
sendDurableQueueAndDurableMessage cost 419 ms
sendDurableQueueAndDurableMessage cost 743 ms
sendDurableQueueAndNoneDurableMessage cost 389 ms
sendDurableQueueAndNoneDurableMessage cost 283 ms
sendDurableQueueAndNoneDurableMessage cost 294 ms
sendDurableQueueAndNoneDurableMessage cost 308 ms
sendDurableQueueAndNoneDurableMessage cost 332 ms
sendNoneDurableQueueAndDurableMessage cost 155 ms
sendNoneDurableQueueAndDurableMessage cost 242 ms
sendNoneDurableQueueAndDurableMessage cost 251 ms
sendNoneDurableQueueAndDurableMessage cost 397 ms
sendNoneDurableQueueAndDurableMessage cost 357 ms
sendNoneDurableQueueAndNoneDurableMessage cost 228 ms
sendNoneDurableQueueAndNoneDurableMessage cost 347 ms
sendNoneDurableQueueAndNoneDurableMessage cost 296 ms
sendNoneDurableQueueAndNoneDurableMessage cost 472 ms
sendNoneDurableQueueAndNoneDurableMessage cost 385 ms
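To compare the four cases more easily, the per-round timings can be averaged. The sketch below does only that arithmetic. The numbers are copied from the output above, while the class `TimingSummary` and its method names are made up for this example.

```java
import java.util.Arrays;

// Averages the round timings printed above (hypothetical helper, plain arithmetic only).
class TimingSummary {

    static final int MESSAGES_PER_ROUND = 5000;

    // Mean duration of one round, in milliseconds.
    static double meanMillis(int[] rounds) {
        return Arrays.stream(rounds).average().orElse(Double.NaN);
    }

    // Rough publish rate implied by the mean round duration.
    static double messagesPerSecond(int[] rounds) {
        return MESSAGES_PER_ROUND / (meanMillis(rounds) / 1000.0);
    }

    public static void main(String[] args) {
        String[] labels = {
            "durable queue, persistent message",
            "durable queue, non-persistent message",
            "transient queue, persistent message",
            "transient queue, non-persistent message"
        };
        int[][] timings = {
            {668, 470, 777, 419, 743},
            {389, 283, 294, 308, 332},
            {155, 242, 251, 397, 357},
            {228, 347, 296, 472, 385}
        };
        for (int i = 0; i < labels.length; i++) {
            System.out.printf("%-40s mean %.1f ms, about %.0f msg/s%n",
                    labels[i], meanMillis(timings[i]), messagesPerSecond(timings[i]));
        }
    }
}
```

Averaged over the five rounds, this gives 615.4 ms for durable queue with persistent message, 321.2 ms for durable queue with non-persistent message, 280.4 ms for transient queue with persistent message, and 345.6 ms for transient queue with non-persistent message. With only five rounds per case and large swings between rounds, these are rough indications. The one clear gap is that the durable queue with persistent messages takes about twice as long as the other three cases.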