Queue 队列的持久化

声明队列时, Durability 的属性值, 可以为 durable (持久的) 和 transient (短暂的).

  • durable : 表示 RabbitMQ 重启后, 依然会存在的队列(MQ 会自动重新声明这个队列, 即有持久化这个队列的声明)
  • transient : 表示 RabbitMQ 重启后, 不会存在的队列(MQ不会自动声明了, 即没有持久化这个队列的声明属性)

Message 的持久化属性

投递消息时, 也有个属性: Delivery mode , 可以为 PersistentNon-Persistent .

  • persistent : 只要消息一达到队列, 就会立即写到文件来持久化
  • non-persistent: 它只会在 rabbitmq 存在内存压力的时候, 才会持久化到磁盘.

persistent

四种组合的情况

durable queue , durable message

这个应该没有疑问吧? 哈哈.

durable queue, non-persistent message

这个也没有疑问吧? 哈哈

transient queue, non-persistent message

这个也没有疑问吧? 哈哈

transient queue, persistent message

这点我自己就比较困惑, 非持久化的队列, 投递一条持久化的消息会发生什么事?

经测试: 结果是, 这种情况下, 持久化消息的属性并没有用(即没生效). 非持久化的队列, 投递持久化的消息时, 消息持久化的属性并没有作用!

性能

以下是5轮, 每轮5000条插入消息的耗时统计

测试代码

package com.example.demo.demorabbit;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class TestRabbit {

    private static final String DURABLE_QUEUE = "hello.durable.queue";

    private static final String TRANSIENT_QUEUE = "hello.transient.queue";

    private static final Queue durableQueue = new Queue(DURABLE_QUEUE, true);
    private static final Queue transientQueue = new Queue(TRANSIENT_QUEUE, false);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    private ObjectMapper objectMapper = new ObjectMapper();

    @PostConstruct
    public void init() throws JsonProcessingException {
        setup();
        rabbitAdmin.purgeQueue(durableQueue.getName(), false);
        for (int i = 0; i < 5; i++) {
            long s = System.currentTimeMillis();
            for (int j = 0; j < 5000; j++) {
                sendDurableQueueAndDurableMessage();
            }
            long end = System.currentTimeMillis();
            System.out.println("sendDurableQueueAndDurableMessage cost " + (end - s) + " ms");
        }
        rabbitAdmin.purgeQueue(durableQueue.getName(), false);
        for (int i = 0; i < 5; i++) {
            long s = System.currentTimeMillis();
            for (int j = 0; j < 5000; j++) {
                sendDurableQueueAndNoneDurableMessage();
            }
            long end = System.currentTimeMillis();
            System.out.println("sendDurableQueueAndNoneDurableMessage cost " + (end - s) + " ms");
        }

        rabbitAdmin.purgeQueue(durableQueue.getName(), false);

        rabbitAdmin.purgeQueue(transientQueue.getName(), false);
        for (int i = 0; i < 5; i++) {
            long s = System.currentTimeMillis();
            for (int j = 0; j < 5000; j++) {
                sendNoneDurableQueueAndDurableMessage();
            }
            long end = System.currentTimeMillis();
            System.out.println("sendNoneDurableQueueAndDurableMessage cost " + (end - s) + " ms");
        }

        rabbitAdmin.purgeQueue(transientQueue.getName(), false);
        for (int i = 0; i < 5; i++) {
            long s = System.currentTimeMillis();
            for (int j = 0; j < 5000; j++) {
                sendNoneDurableQueueAndNoneDurableMessage();
            }
            long end = System.currentTimeMillis();
            System.out.println("sendNoneDurableQueueAndNoneDurableMessage cost " + (end - s) + " ms");
        }

        rabbitAdmin.purgeQueue(transientQueue.getName(), false);
    }

    final MessagePostProcessor messageDurablePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(final Message message) throws AmqpException {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            ;
            return message;
        }
    };

    final MessagePostProcessor messageNoneDurablePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(final Message message) throws AmqpException {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            ;
            return message;
        }
    };

    public void sendDurableQueueAndDurableMessage() throws JsonProcessingException {
        BidLog bidLog = new BidLog();
        rabbitTemplate.convertAndSend(durableQueue.getName(), (Object) objectMapper.writeValueAsString(bidLog), messageDurablePostProcessor);
    }

    public void sendDurableQueueAndNoneDurableMessage() throws JsonProcessingException {
        BidLog bidLog = new BidLog();
        rabbitTemplate.convertAndSend(durableQueue.getName(), (Object) objectMapper.writeValueAsString(bidLog), messageNoneDurablePostProcessor);
    }

    public void sendNoneDurableQueueAndDurableMessage() throws JsonProcessingException {
        BidLog bidLog = new BidLog();
        rabbitTemplate.convertAndSend(transientQueue.getName(), (Object) objectMapper.writeValueAsString(bidLog), messageDurablePostProcessor);
    }


    public void sendNoneDurableQueueAndNoneDurableMessage() throws JsonProcessingException {
        BidLog bidLog = new BidLog();
        rabbitTemplate.convertAndSend(transientQueue.getName(), (Object) objectMapper.writeValueAsString(bidLog), messageNoneDurablePostProcessor);
    }

    public void setup() {
        System.out.println("init ....");
        rabbitAdmin.declareQueue(durableQueue);
        rabbitAdmin.declareQueue(transientQueue);
        System.out.println("声明队列完毕");
    }
}

以下是输出结果

sendDurableQueueAndDurableMessage cost 668 ms
sendDurableQueueAndDurableMessage cost 470 ms
sendDurableQueueAndDurableMessage cost 777 ms
sendDurableQueueAndDurableMessage cost 419 ms
sendDurableQueueAndDurableMessage cost 743 ms

sendDurableQueueAndNoneDurableMessage cost 389 ms
sendDurableQueueAndNoneDurableMessage cost 283 ms
sendDurableQueueAndNoneDurableMessage cost 294 ms
sendDurableQueueAndNoneDurableMessage cost 308 ms
sendDurableQueueAndNoneDurableMessage cost 332 ms

sendNoneDurableQueueAndDurableMessage cost 155 ms
sendNoneDurableQueueAndDurableMessage cost 242 ms
sendNoneDurableQueueAndDurableMessage cost 251 ms
sendNoneDurableQueueAndDurableMessage cost 397 ms
sendNoneDurableQueueAndDurableMessage cost 357 ms

sendNoneDurableQueueAndNoneDurableMessage cost 228 ms
sendNoneDurableQueueAndNoneDurableMessage cost 347 ms
sendNoneDurableQueueAndNoneDurableMessage cost 296 ms
sendNoneDurableQueueAndNoneDurableMessage cost 472 ms
sendNoneDurableQueueAndNoneDurableMessage cost 385 ms