[翻译]RabbitMQ中的消费者确认和发布者确认
Contents
介绍
使用消息 Broker 的系统,例如 RabbitMQ 是被定义为分布式的。由于协议的方法(消息)的发送并不能保证到达对应目的地或者被成功地处理掉这些消息,因此,发布者和消费者都需要一个进行分发和处理的确认机制。RabbitMQ 支持的几种消息协议都支持这种特性。本指南介绍了AMQP 0-9-1的功能,但其他协议(STOMP,MQTT等)的思想基本相同。
消费者对RabbitMQ的分发处理确认称为 AMQP 0-9-1 说明中的确认; Broker 对发布者的确认是一种协议扩展,称为发布者确认。
消费者分发确认
当 RabbitMQ 分发一条消息给消费者时,它需要知道什么时候认为消息已经被成功发送了。至于是什么样的逻辑优化,取决于系统。因此主要由应用层来决定。在 AMQP 0-9-1 中,它是通过当消费者使用 basic.consume 方法来注册或者通过 basic.get 方法按需获取消息时实现的。
如果你更喜欢采用面向例子并且循序渐进的资料,消费者确认在 RabbitMQ 教程#2 有介绍。
分发标识符(Delivery Identifiers):分发标签(Delivery Tags)
在我们继续讨论其他主题之前,重要的是要解释分发是如何被标识的(并且确认指明他们各自的分发)。当消费者(订阅)注册时,消费将被 RabbitMQ 通过使用 basic.deliver 方法来分发(推送)。该方法携带一个分发标签(devlivery tag),它唯一地标识了在 Channel 上的分发。分发标签(devlivery tag) 因此是每个 channel 范围私有的。
分发标签(devlivery tag) 是单调递增的正整数,并由客户端库来表示。客户端库中的确认分发的方法,将带上一个分发标签(devlivery tag)作为参数。
确认方法
根据所使用的确认模式,RabbitMQ可以在发送(写入TCP套接字)后立即认为消息已经被成功地分发或者当接收到客户端显式(“手动”)确认时才认为它已经被成功地分发。手动发送的确认消息可以为主动或被动,并且使用以下协议方法之一:
basic.ack :用于主动确认
basic.nack : 用于被动确认(注意,这是一个 RabbitMQ 对 AMQP 0-9-1 的扩展 )
basic.reject : 用于被动确认,但与 basic.nack 比较而言,它有一些限制。
主动确认就简单地指示 RabbitMQ 记录一条消息已经被成功地分发了。
使用 basic.reject 的被动确认也有同样的效果。差异主要在于主义方面:主动确认假设消息已经被成功地处理了,而被动确认对应的是分发的消息并没有被成功处理,但消息仍然应该被删除。
一次性确认多条分发
手动确认可以被批量处理,以减少网络传输。这可以通过设置确认方法(参见上面)中的 multiple 字段为 true 来实现。请注意, basic.reject 以前并没有该字段,这就是为什么 basic.nack 被 RabbitMQ 作为一个协议扩展来引入。
当 multiple 字段设置时,RabbitMQ 将确认所有未完成的分发标签 直到并包括 确认中的指定的标签。像其他与确认相关的东西一样,它也是 Channel 私有的。例如,在 Channel ch 中给定了分发标签为 5,6,7,8 是未确认的, 当一个带有 delivery_tag 为8, multiple 为 true 的确认帧到达 channel ch 后,所有从5到8(包括)的标签都会被确认。如果 multiple 为 false ,则分发的 5,6,7将仍然是未确认的。
Channel 的 prefetch 设置(QoS)
由于消息发送(推送)到客户端是异步的,因此Channel中,在给定的一瞬间,通常有超过一条消息是 in flight 状态的。此外,从客户端手动确认本质上也是异步的。因此,会有一个没有被确认的分发标签的滑动窗口。开发者通会倾向于限制此窗口的大小,以避免消费者端无限缓冲的问题。这是通过使用 basic.qos 方法设置 预取计数 值来完成的。该值定义了Channel上允许的未确认的分发的最大数值。一旦数量达到配置的计数,RabbitMQ 将停止在该Channel 上分发更多消息,除非至少有一个未确认的消息被确认了。
例如,在给定的Channel ch 上有分发标签为 5,6,7,8 的未确认消息,并且 Channel ch 的 prefetch 计数设置为4,则 RabbitMQ 将不会推送更多的分发给 Channel ch,除非至少有一个未完成的分发被确认了。当该Channel上有一个带有 delivery_tag 为 8的确认帧到达时,RabbitMQ 将通知和分发多一条消息给该Channel。
值得重申的是,分发流程和客户端手动确认完全是异步的。因此,如果 prefetch 的值在分发已经在 in flight 状态时被改变了,则会出现自然竞态条件,并且可能会暂时超过 Channel 上 prefetch 计数的未确认消息。
可以为Channel或消费者设置 QoS 。详情参看 Consumer Prefetch
客户端错误:双重确认以及未知标签(double acking and unknow tags)
如果客户端多次对同一个分发标签进行确认,RabbitMQ 将导致一个 Channel 错误,例如 PRECONDITION_FAILED - unknown delivery tag 100 。如果使用了一个未知的分发标签,也会抛出同样的 Channel 异常。
发布者确认
使用标准的 AMQP 0-9-1 ,保证消息不丢失的唯一方法是使用事务——用 Channel 事务,发布消息,提交。在这种情况下,事务是不必须的,而且它是重置级的,并且会降低 250 倍左右的吞吐量。为了弥补这一点,引入了确认机制。它模仿了协议中已经存在的消费者确认机制。
为了开启确认机制,客户端发送 confirm.select 方法。根据是否设置 no-wait , Broker 可以使用 confirm.select-ok 来响应。一旦 confirm.select 在Channel上使用了,就被认为处于确认模式。事务Channel 不能进入确认模式,并且一旦Channel处于确认模式,则不能进行事务处理。
一旦Channel处于确认模式, Broker 和 客户端都会计数消息(在第一个 confirm.select 时,从1开始计数). Broker 通过在同一个Channel上发送一个 basic.ack 来确认消息。 delivery-tag 字段包含已经确认消息的序列号。Broker 也可以在 basic.ack 设置 multiple 以指示所有到达并包含具有序列号的消息已经被处理。
Java中的一个发布大量的消息到一个确认模式的Channel并等待确认的例子,可以在 这里 找到.
被动确认
在特殊情况下,当 Broker 无法成功处理消息时,Broker 会发送一个 basic.nack 而不是 basic.ack 。在这种情况下,basic.nack 的字段具有与 basic.ack 相应字段的相同含义,并且 requeue 字段应该被忽略。nack 一条或多条消息时,Broker 指示它无法处理消息,并拒绝为它们承担责任;在这点上,客户端可以选择重新发布消息。
Channel处理确认模式后,所有后续发布的消息将会被确认或 nack 一次。并不保证消息被确认要多久。不会存在消息既是确认又是 nack情况。
如果在负责该队列的Erlang进程中,发生内部错误时,则仅会分发 basic.nack 。
什么时候消息会被确认?
对于不可路由的消息,一旦交换机验证一条消息将不会被路由到任何队列(返回一个空的队列列表),Broker 将发出确认。如果消息也被发布为 mandatory 时,则在 basic.ack 之前将发送一个 basic.return 给客户端。被动确认( basic.nack )也如此。
对于可路由的消息,当消息被所有队列接受时,就发送 basic.ack 。对于路由到持久化队列的持久化消息,这意味着已经持久化到磁盘了。对于镜像队列,这意味着所有镜像都已经接受了该消息。
对于持久化消息的 ACK 延迟
对于持久化消息路由到一个持久化队列时,将在消息持久化到磁盘后发送 basic.ack 。RabbitMQ 消息存储间隔(几百毫秒)后,将消息分批存储,以最小化调用 fsync(2) 次数,或当队列空间时。这意味着,在一直恒定的负载下, basic.ack 的延迟可以达到几百毫秒。为了提高吞吐量,强烈建议应用程序进行异步处理确认(作为流),或者批量发布消息,并等待未完成的确认。不同的客户端库之间的具体API有所不同。
发布者确认的顺序注意事项
在大多数情况下,RabbitMQ 将与发布的相同顺序向发布者确认消息(这适用于在单个Channel上发布的消息。但是,发布者确认是异步发出的,可以确认单条消息或一组消息。发出确认的确切时刻取决于消息的分发模式( persistent VS transient) 以及消息被路由到队列的属性(见上面)。也就是说,不同的消息可以被认为是准备好在不同的时间进行确认的。这意味着与其各自的消息相比,确认可以以不同的顺序到达。应用程序不应该依赖于确认的顺序。
发布者确认和保证分发
如果在所有消息写入磁盘之前崩溃,Broker 将丢失持久的消息。
在某些情况下,这些会导致 Broker 会有意外的行为。
例如,考虑这种情况:
- 客户端向持久化队列发送一条持久化消息
- 客户端从队列中消费消息(注意,消息是持久化的,队列也是持久化的),但并没有 ack 它
- Broker 挂了,并且重启
- 客户端重连,并且开始消费消息
在这一点上,客户端可以合理地假设该消息将被再次分发。这并不是以下情况:重启导致 Broker 丢失消息。为了保证持久性,客户端应该使用确认。如果发布者的 Channel 已经处在确认模式,发布者将不会收到丢失的消息的 ACK(因为消息还没有写到磁盘)
限制
最大的分发标签(maximum delivery tag)
分发标签是一个 64位的长整型数值,因此它的最大值为 9223372036854775807 。由于分发标签是 Channel 私有的,因此在实践中,发布者或消费者不太可能会运行到这个值。