问题

在生产环境中,突然发现有个Tomcat报如下类似错误:

Caused by: org.springframework.amqp.UncategorizedAmqpException: java.util.concurrent.TimeoutException: Timed out waiting for startup
	at org.springframework.amqp.rabbit.connection.RabbitUtils.convertRabbitAccessException(RabbitUtils.java:118)
	at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:106)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:365)
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:167)
	... 59 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Timed out waiting for startup
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.getStartupException(SimpleMessageListenerContainer.java:512)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:337)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:363)
	... 60 common frames omitted

原因

项目里的rabbitmq的配置大致如下:


<task:annotation-driven  scheduler="myScheduler"/>
<task:executor id="myExecutor" pool-size="8" />
<task:scheduler id="myScheduler" pool-size="5" />

<rabbit:listener-container connection-factory="rabbitConnectionFactory" error-handler="MessageErrorHandler"  concurrency="10" task-executor="myExecutor">
...
</rabbit:listener>

刚开始,我以为这个task-executor只是负责调用listener方法的(即,只有真正执行listener的时候,才会提交该task到task-executor线程池去执行的.

但是,如果真的是按预期的来执行的话,是不应该报这个错误的。可知,真正执行的与理解的,还是有一定的差距。这时,只好看看JVM底层的线程情况才能真正知道它到底是如何执行的了。

情况1

pool-size: 8 concurrency: 8 并且使用task-executorlistener-container里只有一个listener

这时可以正常启动,也可以正常出队.

情况2

pool-size: 7 concurrency: 8 并且使用task-executorlistener-container里只有一个listener

启动就会报上面的那些类似错误。

推论1:*concurrency*的线程,是包含在task-executor内部的.而且是会一直使用的,并不会释放的. 推论2:pool-size >= concurrency(所有配置了concurrency的总和 <= 使用同一个executor线程池大小),这只是最低限度.

情况3

使用concurrency,但是没有task-executor,它使用的是默认的SimpleAsyncTaskExecutor来适应concurrency(默认是1,即对于每个listener的并发都是1,注意,它是对每个listener都生效的,这时要特别注意线程池的大小) SimpleAsyncTaskExecutor,注意它并不是线程池,它是在启动的时候,由concurrency的参数来决定数量的.

解决

方法1

可以添加concurrency,然后计算一下,所有使用该executor的配置估计下至少要使用多少条线程。最好使用弹性的线程池(pool-size=“3-5”)这种配置,不过这样子的话,就一定要配置execuotr的queue-capacity.

方法2

可以添加concurrency,但去掉executor即可.这样了,Spring就会按需自动new线程了.

参考资料

  1. http://docs.spring.io/spring-amqp/reference/htmlsingle/

更新

2015-12-18 : 这情况,只是适用于listener的方法没有添加@Async的情况下.