Spring 与 RabbitMQ 结合配置以及注意事项
Contents
主要是listener-container
配置
<rabbit:listener-container connection-factory="rabbitConnectionFactory" error-handler="MessageErrorHandler" task-executor="myExecutor" concurrency="10">
</rabbit:listener-container>
属性说明
task-executor
这个属性表示,在执行listener
时,使用的线程行为。默认为SimpleAsyncTaskExecutor
,即每执行一个listener
,都创建一条新的线程。
concurrency
这个表示每个listener
创建多少个消费者(会创建多少个线程来消费)
所以,如果想要多线程执行,就要配置concurrency
,因为默认情况下,它是1
.
如果还配置了task-executor
,就要特别注意task-executor
要有足够的线程来满足执行.
建议配置这个为>1
,因为如果只有一个进程的话,会阻塞后面的消息队列。但如果有多个消费者的话,那一个线程的阻塞,并不会导致其他线程的阻塞.
task-executor
<task:executor id="myExecutor" pool-size="20" />
这个表示固定的线程池.属性有:
属性说明
keep-alive
单位是秒
.这个主要在pool-size
为弹性的时候才有效.
pool-size
如果只是写一个数字,那就表示是固定的。如果是N-M
,表示最少会保持N
条活动线程池,然后最大可以扩展到M
条线程.
然后根据keep-alive
来销毁不活动的线程.
如果设置为N-M
的形式,那就一定要设置queue-capacity
这个属性.
queue-capacity
表示队列容量大小.如果pool-size
为可变的,那么这个属性就一定要设置.
rejection-policy
当队列满时,进行的策略.它是一个java.util.concurrent.RejectedExecutionHandler
类型.
一共有:
ABORT
(缺省):抛出TaskRejectedException异常,然后不执行DISCARD
:不执行,也不抛出异常DISCARD_OLDEST
:丢弃queue中最旧的那个任务CALLER_RUNS
:不在新线程中执行任务,而是有调用者所在的线程来执行
建议设置为CALLER_RUNS
.
建议配置为
<task:executor id="myExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" />
<rabbit:listener-container connection-factory="rabbitConnectionFactory" error-handler="MessageErrorHandler" concurrency="10" task-executor="myExecutor">
</rabbit:listener-container>
关于排查问题
最好在 rabbitmq 的 connection 和 channel 上,都注册一个 addShutdownListener
,例如下面的例子:
try {
rabbitConnection = rabbitConnectionFactory.newConnection();
rabbitConnection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
Loggers.ERROR_LOG.error("rabbitmq connection already close. reason: {}", cause);
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
让它打印出关闭的原因.
遇到过的问题
Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown.
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number xxx
解决思路
为每个 Connection 和 Channel 注册相应的生命周期的事件通知。然后打印相应的线程栈,最后定位问题代码和排查问题。
比如,我们遇到了 Unknown channel number xxx
的异常,一开始时,虽然打印出这些问题代码,但是还是比如模糊。最后,我们为每个 Connection 添加相应的 listener ,然后打印相应的线程栈。(按上面那个 shutdownListener 的伪代码即可)
最后,我们发现是我们的同事,在使用底层的 Channel 时,没有正确关闭资源,导致这些问题的。错误的伪代码如下:
try {
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(monitorQueue, true, false, false, null);
if (declareOk.getMessageCount() == 0) { //over if there's no message in queue
channel.close();
return;
}
} finally {
try {
channel.close();
} catch (IOException e) {
Loggers.ERROR_LOG.error("monitor log dequeue close channel IOException. ", e);
} catch (TimeoutException e) {
Loggers.ERROR_LOG.error("monitor log dequeue close channel TimeoutException. ", e);
}
}
最后修改为了如下:
try {
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(monitorQueue, true, false, false, null);
if (declareOk.getMessageCount() == 0) { //over if there's no message in queue
return;
}
} finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
Loggers.ERROR_LOG.error("monitor log dequeue close channel IOException. ", e);
} catch (TimeoutException e) {
Loggers.ERROR_LOG.error("monitor log dequeue close channel TimeoutException. ", e);
}
}
}
这样子线上就没有再因为这个原因,而不断报类似上面的异常了。
publish confirm 与 publish returns
要开启的话, 可以设置
cachingConnectionFactory.setPublisherReturns(true);
cachingConnectionFactory.setPublisherConfirms(true);
//然后在 RabbitTemplate 中设置
rabbitTemplate.setConfirmCallback();
rabbitTemplate.setReturnCallback();
rabbitTemplate.setMandatory(true);