主要是listener-container配置

<rabbit:listener-container connection-factory="rabbitConnectionFactory" error-handler="MessageErrorHandler" task-executor="myExecutor" concurrency="10">
</rabbit:listener-container>

属性说明

task-executor

这个属性表示,在执行listener时,使用的线程行为。默认为SimpleAsyncTaskExecutor,即每执行一个listener,都创建一条新的线程。

concurrency

这个表示每个listener创建多少个消费者(会创建多少个线程来消费)

所以,如果想要多线程执行,就要配置concurrency,因为默认情况下,它是1.

如果还配置了task-executor,就要特别注意task-executor要有足够的线程来满足执行.

建议配置这个为>1,因为如果只有一个进程的话,会阻塞后面的消息队列。但如果有多个消费者的话,那一个线程的阻塞,并不会导致其他线程的阻塞.

task-executor

<task:executor id="myExecutor" pool-size="20"  />

这个表示固定的线程池.属性有:

属性说明

keep-alive

单位是.这个主要在pool-size为弹性的时候才有效.

pool-size

如果只是写一个数字,那就表示是固定的。如果是N-M,表示最少会保持N条活动线程池,然后最大可以扩展到M条线程. 然后根据keep-alive来销毁不活动的线程.

如果设置为N-M的形式,那就一定要设置queue-capacity这个属性.

queue-capacity

表示队列容量大小.如果pool-size为可变的,那么这个属性就一定要设置.

rejection-policy

当队列满时,进行的策略.它是一个java.util.concurrent.RejectedExecutionHandler类型.

一共有:

  1. ABORT(缺省):抛出TaskRejectedException异常,然后不执行
  2. DISCARD:不执行,也不抛出异常
  3. DISCARD_OLDEST:丢弃queue中最旧的那个任务
  4. CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行

建议设置为CALLER_RUNS.

建议配置为


<task:executor id="myExecutor" pool-size="20-100" keep-alive="600" queue-capacity="2000" rejection-policy="CALLER_RUNS" />

<rabbit:listener-container connection-factory="rabbitConnectionFactory" error-handler="MessageErrorHandler" concurrency="10" task-executor="myExecutor">
</rabbit:listener-container>

关于排查问题

最好在 rabbitmq 的 connection 和 channel 上,都注册一个 addShutdownListener,例如下面的例子:


try {
           rabbitConnection = rabbitConnectionFactory.newConnection();
            rabbitConnection.addShutdownListener(new ShutdownListener() {
               @Override
               public void shutdownCompleted(ShutdownSignalException cause) {
                   Loggers.ERROR_LOG.error("rabbitmq connection already close. reason: {}", cause);
               }
           });
       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       }

让它打印出关闭的原因.

遇到过的问题

Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown.

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number xxx

解决思路

为每个 Connection 和 Channel 注册相应的生命周期的事件通知。然后打印相应的线程栈,最后定位问题代码和排查问题。

比如,我们遇到了 Unknown channel number xxx 的异常,一开始时,虽然打印出这些问题代码,但是还是比如模糊。最后,我们为每个 Connection 添加相应的 listener ,然后打印相应的线程栈。(按上面那个 shutdownListener 的伪代码即可)

最后,我们发现是我们的同事,在使用底层的 Channel 时,没有正确关闭资源,导致这些问题的。错误的伪代码如下:


        try {
            AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(monitorQueue, true, false, false, null);
            if (declareOk.getMessageCount() == 0) { //over if there's no message in queue
                channel.close();
                return;
            }
        } finally {
                try {
                    channel.close();
                } catch (IOException e) {
                    Loggers.ERROR_LOG.error("monitor log dequeue close channel IOException. ", e);
                } catch (TimeoutException e) {
                    Loggers.ERROR_LOG.error("monitor log dequeue close channel TimeoutException. ", e);
                }
        }

最后修改为了如下:


        try {
            AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(monitorQueue, true, false, false, null);
            if (declareOk.getMessageCount() == 0) { //over if there's no message in queue
                return;
            }
        } finally {
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    Loggers.ERROR_LOG.error("monitor log dequeue close channel IOException. ", e);
                } catch (TimeoutException e) {
                    Loggers.ERROR_LOG.error("monitor log dequeue close channel TimeoutException. ", e);
                }
            }
        }            

这样子线上就没有再因为这个原因,而不断报类似上面的异常了。