Spring Data Redis 发布订阅
Contents
因为线上项目因为 RabbitMQ 连接数过多(代码不良导致), 而导致 Redis 的 pub/sub 机制失效, 所以才有了这篇排查问题的文章. 记于 2018-1-17 事故日期: 2017-1-15
Spring Data Redis 中的 pub/sub
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new ChannelTopic("chat"));
return container;
}
topic 类型
默认情况下, Spring 提供了两种 topic 的实现:
# 这种是完全匹配的 channel
ChannelTopic
# 这种是正则匹配的 channel
PatternTopic
重连机制
Spring Data Redis 的 pub/sub 是提供了重试机制的. 默认的行为是:
每间隔5秒钟重试一次. 不过要注意, 重试的前提是线程是因为 RedisConnectionFailureException
的异常时, 才会自动再重试, 如果是其他的异常, 则不会重试了的. 这个通过源码可知:
我测试的版本是: spring-data-redis 1.8.9
/**
* Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
* failure (for example, Redis was restarted).
*
* @param ex Throwable exception
*/
protected void handleSubscriptionException(Throwable ex) {
listening = false;
subscriptionTask.closeConnection();
if (ex instanceof RedisConnectionFailureException) {
if (isRunning()) {
logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
sleepBeforeRecoveryAttempt();
lazyListen();
}
} else {
logger.error("SubscriptionTask aborted with exception:", ex);
}
}
/**
* Sleep according to the specified recovery interval. Called between recovery attempts.
*/
protected void sleepBeforeRecoveryAttempt() {
if (this.recoveryInterval > 0) {
try {
Thread.sleep(this.recoveryInterval);
} catch (InterruptedException interEx) {
logger.debug("Thread interrupted while sleeping the recovery interval");
Thread.currentThread().interrupt();
}
}
}
比较典型的情况是:(这里摘取的是线上项目出现的异常, 哈哈~)
o.s.d.r.l.RedisMessageListenerContainer.handleSubscriptionException 652 [2018-01-15 12:09:29] [ERROR]: SubscriptionTask aborted with exception:java.lang.OutOfMemoryError: unable to create new native thread
o.s.d.r.l.RedisMessageListenerContainer.handleSubscriptionException 652 [2018-01-15 12:10:00] [ERROR]: SubscriptionTask aborted with exception:java.lang.OutOfMemoryError: unable to create new native thread
可以看到, OOM
的时候, 它就不会重试了.