因为线上项目因为 RabbitMQ 连接数过多(代码不良导致), 而导致 Redis 的 pub/sub 机制失效, 所以才有了这篇排查问题的文章. 记于 2018-1-17 事故日期: 2017-1-15

Spring Data Redis 中的 pub/sub

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new ChannelTopic("chat"));
        return container;
    }

topic 类型

默认情况下, Spring 提供了两种 topic 的实现:

# 这种是完全匹配的 channel
ChannelTopic

# 这种是正则匹配的 channel
PatternTopic

重连机制

Spring Data Redis 的 pub/sub 是提供了重试机制的. 默认的行为是:

每间隔5秒钟重试一次. 不过要注意, 重试的前提是线程是因为 RedisConnectionFailureException 的异常时, 才会自动再重试, 如果是其他的异常, 则不会重试了的. 这个通过源码可知:

我测试的版本是: spring-data-redis 1.8.9

	/**
	 * Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
	 * failure (for example, Redis was restarted).
	 * 
	 * @param ex Throwable exception
	 */
	protected void handleSubscriptionException(Throwable ex) {
		listening = false;
		subscriptionTask.closeConnection();
		if (ex instanceof RedisConnectionFailureException) {
			if (isRunning()) {
				logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
				sleepBeforeRecoveryAttempt();
				lazyListen();
			}
		} else {
			logger.error("SubscriptionTask aborted with exception:", ex);
		}
	}

	/**
	 * Sleep according to the specified recovery interval. Called between recovery attempts.
	 */
	protected void sleepBeforeRecoveryAttempt() {
		if (this.recoveryInterval > 0) {
			try {
				Thread.sleep(this.recoveryInterval);
			} catch (InterruptedException interEx) {
				logger.debug("Thread interrupted while sleeping the recovery interval");
				Thread.currentThread().interrupt();
			}
		}
	}

比较典型的情况是:(这里摘取的是线上项目出现的异常, 哈哈~)

o.s.d.r.l.RedisMessageListenerContainer.handleSubscriptionException 652 [2018-01-15 12:09:29] [ERROR]: SubscriptionTask aborted with exception:java.lang.OutOfMemoryError: unable to create new native thread
o.s.d.r.l.RedisMessageListenerContainer.handleSubscriptionException 652 [2018-01-15 12:10:00] [ERROR]: SubscriptionTask aborted with exception:java.lang.OutOfMemoryError: unable to create new native thread

可以看到, OOM 的时候, 它就不会重试了.