起因

线上DSP系统, 使用 RabbitMQ 作为竞价日志的中转站, 然后出队插入到MySQL. 于是就需要进行批量出队插入DB, 这样子性能和效率更高点.

但 RabbitMQ 自身, 并不支持一次性批量获取消息的. 只能通过以下设置来间接实现

  • 设置 prefetch
  • 然后不断获取消息
  • 然后在确认的时候, basic.ack(消息ID, true) (这表示确认所有 <= 消息ID 的消息)

代码

版本一

一开始时, 有个同事是使用比较原始的 rabbitmq 的官方库, 手动来管理 RabbitMQ 的 connectionchannel . 代码如下

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.company.util.Loggers;

@Component
public class RabbitmqClient {

    @Resource (name = "rabbitConnectionFactory")
    private ConnectionFactory rabbitConnectionFactory;

    private volatile Connection rabbitConnection;

    @PostConstruct
    private void init() {
        try {
            rabbitConnection = rabbitConnectionFactory.newConnection();
            rabbitConnection.addShutdownListener(new ShutdownListener() {
                @Override
                public void shutdownCompleted(ShutdownSignalException cause) {
                    Loggers.ERROR_LOG.error("rabbitmq connection shutdown. reason: {}", cause);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    private void destroy() {
        if (rabbitConnection != null && rabbitConnection.isOpen()) {
            try {
                rabbitConnection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public Channel getRabbitChannel() {
        try {
        	if (!rabbitConnection.isOpen()) {
                synchronized (rabbitConnection) {
                    if (!rabbitConnection.isOpen()) {
                        try {
                            Loggers.RUNNING_LOG.error("rabbitmq connection try new connection. ");
                            rabbitConnection = rabbitConnectionFactory.newConnection();
                            rabbitConnection.addShutdownListener(new ShutdownListener() {
                                @Override
                                public void shutdownCompleted(ShutdownSignalException cause) {
                                    Loggers.ERROR_LOG.error("rabbitmq connection shutdown. reason: {}", cause);
                                }
                            });
                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (TimeoutException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }

            return rabbitConnection.createChannel();
        } catch (AlreadyClosedException ace) {
            Loggers.ERROR_LOG.error("rabbitmq connection already close. error: {}", ace);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

然后在其他地方出队时, 手动 close channel.

出队的处理

package com.company.listener.bid;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.company.listener.RabbitmqClient;
import com.company.pojo.WinPriceLog;
import com.company.service.TaskBidLogService;
import com.company.service.rabbitmq.RabbitMQService;
import com.company.util.JsonUtil;
import com.company.util.Loggers;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

@Component
public class WinPriceLogListener {

    @Value("${log.dequeue.prefetch}")
    private String prefetchCount;

    @Value("${queue.win.price.log}")
    private String winPriceLogQueue;
    @Value("${queue.win.price.log.process}")
    private String winPriceLogQueProcess;

    @Autowired
    private RabbitMQService queueService;

    @Autowired
    private TaskBidLogService taskBidLogService;
    @Autowired
    private RabbitmqClient rabbitmqClient;


    public void winPriceLogOutQueue(byte[] msg) {
        winPriceLogOutQueue(new String(msg, StandardCharsets.UTF_8));
    }

    private void winPriceLogOutQueue(String _msg) {
        long s = System.currentTimeMillis();

        if (StringUtils.isBlank(_msg)) {
            return;
        }

        long enQueueTime = Long.parseLong(_msg);
        Channel channel = rabbitmqClient.getRabbitChannel();
        if (channel == null) {
            return;
        }
        try {
            AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(winPriceLogQueue, true, false, false, null);
            int messageCount = declareOk.getMessageCount();
            if (messageCount == 0) { //over if there's no message in queue
                return;
            }
            channel.basicQos(Integer.parseInt(prefetchCount));
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(winPriceLogQueue, false, consumer);

            List<WinPriceLog> list = new ArrayList<>();
            List<QueueingConsumer.Delivery> deliveryList = new ArrayList<>();

            long _s = System.currentTimeMillis();
            while (messageCount > 0) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery(500);
                if (delivery == null) {
                    break;
                }
                String msg = new String(delivery.getBody());
                if (StringUtils.isBlank(msg)) {
                    break;
                }
                WinPriceLog wpl = JsonUtil.jsonStr2Obj(msg, WinPriceLog.class);
                list.add(wpl);
                deliveryList.add(delivery);

                messageCount--;

                if (deliveryList.size() % 2500 == 0) {
                    int affectedRow = taskBidLogService.insertWinPriceBatchJDBC(list);
                    channel.basicAck(deliveryList.get(deliveryList.size() - 1).getEnvelope().getDeliveryTag(), true);

                    Loggers.AUDIT_LOG.info("{} winPriceLog out queue. consume using {}ms, insert:{} ", new Object[]{deliveryList.size(), (System.currentTimeMillis()
                            - _s), affectedRow});
                    list.clear();
                    deliveryList.clear();
                }
            }
            long consumeCost = System.currentTimeMillis() - _s;

            int effectRow = taskBidLogService.insertWinPriceBatchJDBC(list);
            //ack
            if (!CollectionUtils.isEmpty(deliveryList)) {
                channel.basicAck(deliveryList.get(deliveryList.size() - 1).getEnvelope().getDeliveryTag(), true);
            }
            Loggers.AUDIT_LOG.info("{} winPriceLogs out queue.  totally using {}ms, consume using {}ms, effectRow: {}",
                    new Object[]{deliveryList.size(), (System.currentTimeMillis() - s), consumeCost, effectRow});

            if (declareOk.getMessageCount() > Integer.parseInt(prefetchCount)) {//message count of queue greater than [PREFETCH_COUNT]
                queueService.putMessage(winPriceLogQueProcess, _msg);
            }

        } catch (Exception e) {
            Loggers.ERROR_LOG.error("win price log dequeue Exception. ", e);
        } finally {
            try {
                if (channel != null && channel.isOpen()) {
                    channel.close();
                }
            } catch (IOException e) {
                Loggers.ERROR_LOG.error("rabbitmq channel close IOException. {}", e);
            } catch (TimeoutException e) {
                Loggers.ERROR_LOG.error("rabbitmq channel close TimeoutException. {}", e);
            }
        }
    }
}

问题

  • 可以看到 getRabbitChannel() 方法, 虽然使用了 double check 来判断 connection 的状态, 然后重连. 但是, 它没有限制资源以及连接的频率等, 导致线上 RabbitMQ 中间件出现阻塞等问题时, 这个方法会在短时间内, 连接大量的 Connection , 最后导致触发了 Linux 的 OOM 机制.

  • 不知道是不是因为手动管理不当的原因, 当在高并发, 高出价率时, 经常会报 com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number X 等异常

几乎每次大量投放业务时, 这种现象经常会出现.

虽然正常情况下, 并没有数据丢失这种问题.

版本2

既然项目中使用了 Spring 的 amqp 项目, 那为什么不用Spring RabbitTemplate 这些模板类来处理呢? Spring 有比较多的模板模式(比如 RedisTemplate, RabbitTemplte, RestTemplate等) 都是非常好用的. 它会自动管理资源和缓存等问题.

所以, 以下是使用 RabbitTemplate 后的代码版本

   private void winPriceLogOutQueue(final String msg) {
        if (StringUtils.isBlank(msg)) {
            return;
        }
        final long s = System.currentTimeMillis();

        rabbitTemplate.execute(new ChannelCallback<Object>() {
            @Override
            public Object doInRabbit(final Channel channel) throws Exception {
                DeclareOk declareOk = channel.queueDeclare(winPriceLogQueue, true, false, false, null);
                int messageCount = declareOk.getMessageCount();
                if (messageCount == 0) { //over if there's no message in queue
                    return null;
                }
                channel.basicQos(reloadableProperties.getPrefetchCount());
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(winPriceLogQueue, false, "win price log out queue consumer" + DateUtil.formartFullDate(new Date()), consumer);

                List<WinPriceLog> list = new ArrayList<>();
                List<Delivery> deliveryList = new ArrayList<>();
                long messageId = -1;
                long startTime = System.currentTimeMillis();
                Loggers.WIN_LOG.info("win price log dequeue size {}", messageCount);
                int totalEffectedRows = 0;
                while (messageCount-- > 0) {
                    Delivery delivery = consumer.nextDelivery(500);
                    if (delivery == null) {
                        break;
                    }
                    messageId = delivery.getEnvelope().getDeliveryTag();
                    String body = new String(delivery.getBody());
                    Loggers.WIN_LOG.info("win price log dequeue {}", body);
                    if (StringUtils.isBlank(body)) {
                             continue;
                    }
                    WinPriceLog wpl = JsonUtil.jsonStr2Obj(body, WinPriceLog.class);
                    list.add(wpl);
                    deliveryList.add(delivery);

                    if (deliveryList.size() % 2500 == 0) {
                        int affectedRow = taskBidLogService.insertWinPriceBatchJDBC(list);
                        totalEffectedRows += affectedRow;
                        channel.basicAck(messageId, true);

                        Loggers.AUDIT_LOG.info("{} winPriceLog out queue. consume using {}ms, insert:{} ", deliveryList.size(), System.currentTimeMillis()
                                - startTime, affectedRow);
                        list.clear();
                        deliveryList.clear();
                        messageId = -1;
                    }
                }
                long consumeCost = System.currentTimeMillis() - startTime;

                int effectRow = taskBidLogService.insertWinPriceBatchJDBC(list);
                totalEffectedRows += effectRow;
                Loggers.WIN_LOG.info("win price log dequeue insert db size {}", totalEffectedRows);
                //ack
                if (messageId > 0) {
                    channel.basicAck(messageId, true);
                }
                Loggers.AUDIT_LOG.info("{} winPriceLogs out queue.  totally using {}ms, consume using {}ms, effectRow: {}",
                        deliveryList.size(), (System.currentTimeMillis() - s), consumeCost, effectRow);

                return null;
            }
        });
    }

好处

  • MQ 相关的资源 Connection , Channel 等, 全交给 Spring RabbitTemplate 来管理了. 避免手动管理不当而导致 OOM .

代码问题

但是, 经过测试, 发现它会有 数据丢失 的问题!. RabbitMQ 一向是以稳定可靠, 以及保证数据不会丢失而著名的. 而我们在测试过程中, 竟然发现每次都会有数据丢失!

经过Debug, 发现导致的原因如下

  • 设置了 prefetch (相当于在 consumer 里设置了一个消息缓存池, RabbitMQ Server 会不断一条一条地发送消息过来, 直到达到 prefetch 大小为止, 不断这样子循环)
  • 批量处理完后(参见以上代码) 虽然代码是处理了当时快照时的队列大小的消息: DeclareOk declareOk = channel.queueDeclare(winPriceLogQueue, true, false, false, null); int messageCount = declareOk.getMessageCount(); 但是, 生产者还是会不断发送消息过来, 然后 MQ 还是会不断发送消息到 consumer
  • 虽然我们 ack 了已经正确插入DB 的数据. 但是该 Channel 中, 它的 messageID ( messageID 在每个 channel 中是独立的, 即每一个 channel 中, 都有一个 messageID 计数器, 并且是从1开始不断递增的) 还是在不断地递增.
  • 假设, 第一轮时, 我们处理了 100 消息( messageId 1~100), 但该 channel 中的 messageID 已经累计到 250 了(因为它还会不断接收 MQ Server 发送过来的消息, 只是我们还没有确认)
  • 现在开始第二轮(由于 Spring amqp 默认情况下, 会缓存 channel 的), 它可能复用了第一轮的 Channel , 这时出队时的 messageID 是 251 了! 最后, 我们处理该轮消息时, 假设又处理了 100条, 即 (messageID为 351). 这时, 批量确认所有 messageID <= 351 的消息, 就会导致中间的 (101~250) 这些消息, 没有被处理!但却被我们确认了!

重现代码:

写一个不断发送消息的代码

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 500000; i++) {
                    rabbitTemplate.convertAndSend(queueName, "hello" + i);
                }

                System.out.println("done =======> 1000");
            }
        }).start();

还有一个批量出队的代码(模拟每次处理 100 条消息)

    @Scheduled(cron = "*/5 * *  * * *")
    public void consumer() {
        rabbitTemplate.execute(channel -> {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                final AMQP.Queue.DeclareOk ok = channel.queueDeclare(queueName, true, false, false, null);
                int messageCount = ok.getMessageCount();
                log.info("run consumer {}, msg count {}", sdf.format(new Date()), messageCount);
                if (messageCount == 0) {
                    return null;
                }

                channel.basicQos(100);
                QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

                log.info("channel id {}", Integer.toHexString(System.identityHashCode(channel)));
                final String inConsumerTag = "test consumer" + sdf.format(new Date());
                channel.basicConsume(queueName, false, inConsumerTag, queueingConsumer);

                long messageId = -1;
                int dealedCount = 0;
                int i = 100;
                while (i-- > 0) {
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(100);
                    if (delivery == null) {
                        break;
                    }
                    String msg = new String(delivery.getBody());

                    messageId = delivery.getEnvelope().getDeliveryTag();
                    log.info("get message {} delivery id {}", msg, messageId);
                    dealedCount++;
                    if (dealedCount % 5 == 0) {
                        channel.basicAck(messageId, true);
                        log.info("batch ack message id =>{}", messageId);
                        messageId = -1;
                    }
                }
                if (messageId > 0) {
                    channel.basicAck(messageId, true);
                    log.info("last to ack message id =>{}", messageId);
                }
            } finally {
                log.info("consumer done {}", sdf.format(new Date()));
            }
            return null;
        });
    }

这时就可以重现”数据丢失”的问题了!

img img

  • 可以看到, 它复用了 channel (它们的内存地址同样为 6d7d893)
  • 第二轮时, 它的 delivery id 是从 201 开始的! (实际上, 我们期待它应该是从 1 开始的或 消息内容从 hello101 开始! 因为我们只 ack 了前100条, 那后续的处理应该接着上一轮继续处理的)

  • 那为什么 template 处理完后 执行了 channel.close() 并没有真实关闭呢(因为缓存 channel, close 的时候, 只是放回缓存池)

解决

强制刷新 channel (即让 mq 每次都创建一个新的 channel, 虽然缓存模式为 channel, 但这种情况下, 看来只能强制关闭 channel, 让还没有真正处理的消息返回队列, 然后重新出队 ack了. 不知道还有没有更好的方式, 有的话, 请 email 我哈)

即在代码最后 return null; 之前, 添加以下方法

channel.abort()

这个方法, 会强制关闭 channel , 由于 consumer 设置了手动 ack, 所以那些未被插入DB的内存消息会重新出队.

这时, 可以看到

img img

  • channel 每次都是新的了
  • 消息是接着上一轮开始的, 并且没有 “丢失”