RabbitMQ 批量处理时神奇的数据丢失及处理办法
Contents
起因
线上DSP系统, 使用 RabbitMQ 作为竞价日志的中转站, 然后出队插入到MySQL. 于是就需要进行批量出队插入DB, 这样子性能和效率更高点.
但 RabbitMQ 自身, 并不支持一次性批量获取消息的. 只能通过以下设置来间接实现
- 设置 prefetch
- 然后不断获取消息
- 然后在确认的时候,
basic.ack(消息ID, true)
(这表示确认所有<=
消息ID 的消息)
代码
版本一
一开始时, 有个同事是使用比较原始的 rabbitmq 的官方库, 手动来管理 RabbitMQ 的 connection
和 channel
. 代码如下
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.company.util.Loggers;
@Component
public class RabbitmqClient {
@Resource (name = "rabbitConnectionFactory")
private ConnectionFactory rabbitConnectionFactory;
private volatile Connection rabbitConnection;
@PostConstruct
private void init() {
try {
rabbitConnection = rabbitConnectionFactory.newConnection();
rabbitConnection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
Loggers.ERROR_LOG.error("rabbitmq connection shutdown. reason: {}", cause);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
@PreDestroy
private void destroy() {
if (rabbitConnection != null && rabbitConnection.isOpen()) {
try {
rabbitConnection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public Channel getRabbitChannel() {
try {
if (!rabbitConnection.isOpen()) {
synchronized (rabbitConnection) {
if (!rabbitConnection.isOpen()) {
try {
Loggers.RUNNING_LOG.error("rabbitmq connection try new connection. ");
rabbitConnection = rabbitConnectionFactory.newConnection();
rabbitConnection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
Loggers.ERROR_LOG.error("rabbitmq connection shutdown. reason: {}", cause);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
return rabbitConnection.createChannel();
} catch (AlreadyClosedException ace) {
Loggers.ERROR_LOG.error("rabbitmq connection already close. error: {}", ace);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
然后在其他地方出队时, 手动 close channel.
出队的处理
package com.company.listener.bid;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.company.listener.RabbitmqClient;
import com.company.pojo.WinPriceLog;
import com.company.service.TaskBidLogService;
import com.company.service.rabbitmq.RabbitMQService;
import com.company.util.JsonUtil;
import com.company.util.Loggers;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
@Component
public class WinPriceLogListener {
@Value("${log.dequeue.prefetch}")
private String prefetchCount;
@Value("${queue.win.price.log}")
private String winPriceLogQueue;
@Value("${queue.win.price.log.process}")
private String winPriceLogQueProcess;
@Autowired
private RabbitMQService queueService;
@Autowired
private TaskBidLogService taskBidLogService;
@Autowired
private RabbitmqClient rabbitmqClient;
public void winPriceLogOutQueue(byte[] msg) {
winPriceLogOutQueue(new String(msg, StandardCharsets.UTF_8));
}
private void winPriceLogOutQueue(String _msg) {
long s = System.currentTimeMillis();
if (StringUtils.isBlank(_msg)) {
return;
}
long enQueueTime = Long.parseLong(_msg);
Channel channel = rabbitmqClient.getRabbitChannel();
if (channel == null) {
return;
}
try {
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(winPriceLogQueue, true, false, false, null);
int messageCount = declareOk.getMessageCount();
if (messageCount == 0) { //over if there's no message in queue
return;
}
channel.basicQos(Integer.parseInt(prefetchCount));
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(winPriceLogQueue, false, consumer);
List<WinPriceLog> list = new ArrayList<>();
List<QueueingConsumer.Delivery> deliveryList = new ArrayList<>();
long _s = System.currentTimeMillis();
while (messageCount > 0) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(500);
if (delivery == null) {
break;
}
String msg = new String(delivery.getBody());
if (StringUtils.isBlank(msg)) {
break;
}
WinPriceLog wpl = JsonUtil.jsonStr2Obj(msg, WinPriceLog.class);
list.add(wpl);
deliveryList.add(delivery);
messageCount--;
if (deliveryList.size() % 2500 == 0) {
int affectedRow = taskBidLogService.insertWinPriceBatchJDBC(list);
channel.basicAck(deliveryList.get(deliveryList.size() - 1).getEnvelope().getDeliveryTag(), true);
Loggers.AUDIT_LOG.info("{} winPriceLog out queue. consume using {}ms, insert:{} ", new Object[]{deliveryList.size(), (System.currentTimeMillis()
- _s), affectedRow});
list.clear();
deliveryList.clear();
}
}
long consumeCost = System.currentTimeMillis() - _s;
int effectRow = taskBidLogService.insertWinPriceBatchJDBC(list);
//ack
if (!CollectionUtils.isEmpty(deliveryList)) {
channel.basicAck(deliveryList.get(deliveryList.size() - 1).getEnvelope().getDeliveryTag(), true);
}
Loggers.AUDIT_LOG.info("{} winPriceLogs out queue. totally using {}ms, consume using {}ms, effectRow: {}",
new Object[]{deliveryList.size(), (System.currentTimeMillis() - s), consumeCost, effectRow});
if (declareOk.getMessageCount() > Integer.parseInt(prefetchCount)) {//message count of queue greater than [PREFETCH_COUNT]
queueService.putMessage(winPriceLogQueProcess, _msg);
}
} catch (Exception e) {
Loggers.ERROR_LOG.error("win price log dequeue Exception. ", e);
} finally {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
} catch (IOException e) {
Loggers.ERROR_LOG.error("rabbitmq channel close IOException. {}", e);
} catch (TimeoutException e) {
Loggers.ERROR_LOG.error("rabbitmq channel close TimeoutException. {}", e);
}
}
}
}
问题
可以看到
getRabbitChannel()
方法, 虽然使用了 double check 来判断 connection 的状态, 然后重连. 但是, 它没有限制资源以及连接的频率等, 导致线上 RabbitMQ 中间件出现阻塞等问题时, 这个方法会在短时间内, 连接大量的 Connection , 最后导致触发了 Linux 的 OOM 机制.不知道是不是因为手动管理不当的原因, 当在高并发, 高出价率时, 经常会报
com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number X
等异常
几乎每次大量投放业务时, 这种现象经常会出现.
虽然正常情况下, 并没有数据丢失这种问题.
版本2
既然项目中使用了 Spring 的 amqp 项目, 那为什么不用Spring RabbitTemplate 这些模板类来处理呢? Spring 有比较多的模板模式(比如 RedisTemplate, RabbitTemplte, RestTemplate等) 都是非常好用的. 它会自动管理资源和缓存等问题.
所以, 以下是使用 RabbitTemplate 后的代码版本
private void winPriceLogOutQueue(final String msg) {
if (StringUtils.isBlank(msg)) {
return;
}
final long s = System.currentTimeMillis();
rabbitTemplate.execute(new ChannelCallback<Object>() {
@Override
public Object doInRabbit(final Channel channel) throws Exception {
DeclareOk declareOk = channel.queueDeclare(winPriceLogQueue, true, false, false, null);
int messageCount = declareOk.getMessageCount();
if (messageCount == 0) { //over if there's no message in queue
return null;
}
channel.basicQos(reloadableProperties.getPrefetchCount());
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(winPriceLogQueue, false, "win price log out queue consumer" + DateUtil.formartFullDate(new Date()), consumer);
List<WinPriceLog> list = new ArrayList<>();
List<Delivery> deliveryList = new ArrayList<>();
long messageId = -1;
long startTime = System.currentTimeMillis();
Loggers.WIN_LOG.info("win price log dequeue size {}", messageCount);
int totalEffectedRows = 0;
while (messageCount-- > 0) {
Delivery delivery = consumer.nextDelivery(500);
if (delivery == null) {
break;
}
messageId = delivery.getEnvelope().getDeliveryTag();
String body = new String(delivery.getBody());
Loggers.WIN_LOG.info("win price log dequeue {}", body);
if (StringUtils.isBlank(body)) {
continue;
}
WinPriceLog wpl = JsonUtil.jsonStr2Obj(body, WinPriceLog.class);
list.add(wpl);
deliveryList.add(delivery);
if (deliveryList.size() % 2500 == 0) {
int affectedRow = taskBidLogService.insertWinPriceBatchJDBC(list);
totalEffectedRows += affectedRow;
channel.basicAck(messageId, true);
Loggers.AUDIT_LOG.info("{} winPriceLog out queue. consume using {}ms, insert:{} ", deliveryList.size(), System.currentTimeMillis()
- startTime, affectedRow);
list.clear();
deliveryList.clear();
messageId = -1;
}
}
long consumeCost = System.currentTimeMillis() - startTime;
int effectRow = taskBidLogService.insertWinPriceBatchJDBC(list);
totalEffectedRows += effectRow;
Loggers.WIN_LOG.info("win price log dequeue insert db size {}", totalEffectedRows);
//ack
if (messageId > 0) {
channel.basicAck(messageId, true);
}
Loggers.AUDIT_LOG.info("{} winPriceLogs out queue. totally using {}ms, consume using {}ms, effectRow: {}",
deliveryList.size(), (System.currentTimeMillis() - s), consumeCost, effectRow);
return null;
}
});
}
好处
- MQ 相关的资源 Connection , Channel 等, 全交给 Spring RabbitTemplate 来管理了. 避免手动管理不当而导致 OOM .
代码问题
但是, 经过测试, 发现它会有 数据丢失
的问题!. RabbitMQ 一向是以稳定可靠, 以及保证数据不会丢失而著名的. 而我们在测试过程中, 竟然发现每次都会有数据丢失!
经过Debug, 发现导致的原因如下
- 设置了 prefetch (相当于在 consumer 里设置了一个消息缓存池, RabbitMQ Server 会不断一条一条地发送消息过来, 直到达到 prefetch 大小为止, 不断这样子循环)
- 批量处理完后(参见以上代码) 虽然代码是处理了当时快照时的队列大小的消息:
DeclareOk declareOk = channel.queueDeclare(winPriceLogQueue, true, false, false, null); int messageCount = declareOk.getMessageCount();
但是, 生产者还是会不断发送消息过来, 然后 MQ 还是会不断发送消息到 consumer - 虽然我们 ack 了已经正确插入DB 的数据. 但是该 Channel 中, 它的 messageID ( messageID 在每个 channel 中是独立的, 即每一个 channel 中, 都有一个 messageID 计数器, 并且是从1开始不断递增的) 还是在不断地递增.
- 假设, 第一轮时, 我们处理了 100 消息( messageId 1~100), 但该 channel 中的 messageID 已经累计到 250 了(因为它还会不断接收 MQ Server 发送过来的消息, 只是我们还没有确认)
- 现在开始第二轮(由于 Spring amqp 默认情况下, 会缓存 channel 的), 它可能复用了第一轮的 Channel , 这时出队时的 messageID 是 251 了! 最后, 我们处理该轮消息时, 假设又处理了 100条, 即 (messageID为 351). 这时, 批量确认所有 messageID <= 351 的消息, 就会导致中间的 (101~250) 这些消息, 没有被处理!但却被我们确认了!
重现代码:
写一个不断发送消息的代码
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 500000; i++) {
rabbitTemplate.convertAndSend(queueName, "hello" + i);
}
System.out.println("done =======> 1000");
}
}).start();
还有一个批量出队的代码(模拟每次处理 100 条消息)
@Scheduled(cron = "*/5 * * * * *")
public void consumer() {
rabbitTemplate.execute(channel -> {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
final AMQP.Queue.DeclareOk ok = channel.queueDeclare(queueName, true, false, false, null);
int messageCount = ok.getMessageCount();
log.info("run consumer {}, msg count {}", sdf.format(new Date()), messageCount);
if (messageCount == 0) {
return null;
}
channel.basicQos(100);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
log.info("channel id {}", Integer.toHexString(System.identityHashCode(channel)));
final String inConsumerTag = "test consumer" + sdf.format(new Date());
channel.basicConsume(queueName, false, inConsumerTag, queueingConsumer);
long messageId = -1;
int dealedCount = 0;
int i = 100;
while (i-- > 0) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(100);
if (delivery == null) {
break;
}
String msg = new String(delivery.getBody());
messageId = delivery.getEnvelope().getDeliveryTag();
log.info("get message {} delivery id {}", msg, messageId);
dealedCount++;
if (dealedCount % 5 == 0) {
channel.basicAck(messageId, true);
log.info("batch ack message id =>{}", messageId);
messageId = -1;
}
}
if (messageId > 0) {
channel.basicAck(messageId, true);
log.info("last to ack message id =>{}", messageId);
}
} finally {
log.info("consumer done {}", sdf.format(new Date()));
}
return null;
});
}
这时就可以重现”数据丢失”的问题了!
- 可以看到, 它复用了 channel (它们的内存地址同样为 6d7d893)
第二轮时, 它的 delivery id 是从 201 开始的! (实际上, 我们期待它应该是从 1 开始的或 消息内容从 hello101 开始! 因为我们只 ack 了前100条, 那后续的处理应该接着上一轮继续处理的)
那为什么 template 处理完后 执行了
channel.close()
并没有真实关闭呢(因为缓存 channel, close 的时候, 只是放回缓存池)
解决
强制刷新 channel (即让 mq 每次都创建一个新的 channel, 虽然缓存模式为 channel, 但这种情况下, 看来只能强制关闭 channel, 让还没有真正处理的消息返回队列, 然后重新出队 ack了. 不知道还有没有更好的方式, 有的话, 请 email 我哈)
即在代码最后 return null;
之前, 添加以下方法
channel.abort()
这个方法, 会强制关闭 channel , 由于 consumer 设置了手动 ack, 所以那些未被插入DB的内存消息会重新出队.
这时, 可以看到
- channel 每次都是新的了
- 消息是接着上一轮开始的, 并且没有 “丢失”