彻底了解Spring 中 RabbitMQ配置的concurrency 和 task-executor
Contents
问题
在生产环境中,突然发现有个Tomcat报如下类似错误:
Caused by: org.springframework.amqp.UncategorizedAmqpException: java.util.concurrent.TimeoutException: Timed out waiting for startup
at org.springframework.amqp.rabbit.connection.RabbitUtils.convertRabbitAccessException(RabbitUtils.java:118)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:106)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:365)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:167)
... 59 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Timed out waiting for startup
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.getStartupException(SimpleMessageListenerContainer.java:512)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:363)
... 60 common frames omitted
原因
项目里的rabbitmq的配置大致如下:
<task:annotation-driven scheduler="myScheduler"/>
<task:executor id="myExecutor" pool-size="8" />
<task:scheduler id="myScheduler" pool-size="5" />
<rabbit:listener-container connection-factory="rabbitConnectionFactory" error-handler="MessageErrorHandler" concurrency="10" task-executor="myExecutor">
...
</rabbit:listener>
刚开始,我以为这个task-executor
只是负责调用listener
方法的(即,只有真正执行listener
的时候,才会提交该task到task-executor
线程池去执行的.
但是,如果真的是按预期的来执行的话,是不应该报这个错误的。可知,真正执行的与理解的,还是有一定的差距。这时,只好看看JVM底层的线程情况才能真正知道它到底是如何执行的了。
情况1
pool-size
: 8
concurrency
: 8
并且使用task-executor
,listener-container
里只有一个listener
这时可以正常启动,也可以正常出队.
情况2
pool-size
: 7
concurrency
: 8
并且使用task-executor
,listener-container
里只有一个listener
启动就会报上面的那些类似错误。
推论1:*concurrency*的线程,是包含在task-executor
内部的.而且是会一直使用的,并不会释放的.
推论2:pool-size
>= concurrency
(所有配置了concurrency
的总和 <= 使用同一个executor
线程池大小),这只是最低限度.
情况3
使用concurrency
,但是没有task-executor
,它使用的是默认的SimpleAsyncTaskExecutor
来适应concurrency
(默认是1,即对于每个listener的并发都是1,注意,它是对每个listener都生效的,这时要特别注意线程池的大小)
SimpleAsyncTaskExecutor
,注意它并不是线程池,它是在启动的时候,由concurrency
的参数来决定数量的.
解决
方法1
可以添加concurrency
,然后计算一下,所有使用该executor
的配置估计下至少要使用多少条线程。最好使用弹性的线程池(pool-size=“3-5”)这种配置,不过这样子的话,就一定要配置execuotr的queue-capacity
.
方法2
可以添加concurrency
,但去掉executor
即可.这样了,Spring就会按需自动new线程了.
参考资料
更新
2015-12-18 : 这情况,只是适用于listener的方法没有添加@Async
的情况下.