RabbitMQ与Spring AMQP(2)
00 分钟
2024-7-26
2024-7-27
type
status
date
Jul 27, 2024 01:09 PM
slug
summary
category
tags
password
icon

背景

RabbitMQ与Spring AMQP
RabbitMQ与Spring AMQP
中,介绍了消息队列的相关模型,以及如何使用Spring AMQP去解决RabbitMQ的消息模型。本节,我们继续来看消息的队列的高级应用:批量消息,消费重试,定时消息,并发消息,顺序消息等等~~~

批量消息

在一些业务场景下,我们希望使用 Publisher 批量发送消息,提高发送性能。Spring-AMQP 提供的批量发送消息,它提供了一个 MessageBatch 消息收集器,将发送给相同 Exchange + RoutingKey 的消息们,“偷偷”收集在一起,当满足条件时候,一次性批量发送提交给 RabbitMQ Broker 。
Spring-AMQP 通过 BatchingRabbitTemplate 提供批量发送消息的功能。如下是三个条件,满足任一即会批量发送
  • 【数量】batchSize :超过收集的消息数量的最大条数。
  • 【空间】bufferLimit :超过收集的消息占用的最大内存。
  • 【时间】timeout :超过收集的时间的最大等待时长,单位:毫秒。不过要注意,这里的超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达 timeout 毫秒才算超时。
另外,BatchingRabbitTemplate 提供的批量发送消息的能力比较弱。对于同一个 BatchingRabbitTemplate 对象来说,同一时刻只能有一个批次(保证 Exchange + RoutingKey 相同),否则会报错。

批量发送消息

引入Maven依赖

配置文件

常量类

自定义消息

publisher

consumer

RabbitConfig

BatchingRabbitTemplate 通过重写 #send(String exchange, String routingKey, Message message, CorrelationData correlationData) 核心方法,实现批量发送的功能。感兴趣可以自己去研究下源码。

测试

日志输出如下,可以看到:
  1. 因为使用 BatchingRabbitTemplate 批量发送消息,所以在 Publisher 成功发送完第一条消息后,Consumer 并未消费到这条消息。
  1. 每条消息间隔10s发送,在最后一条消息发送完成之后,再等30s(大于超时时间)会立即消费消息。

优雅监听消息发布:基于注解声明队列和交换机

DemoBatchConsumer类中使用注解的方式来声明队列和交换机,并且同时进行绑定,不用在RabbitConfig 类中注入@Bean
RabbitConfig的代码如下,就很清晰简单了。
日志输出如下:

批量消费消息1

在上一节,我们已经实现批量发送消息到 RabbitMQ Broker 中。那么,我们来思考一个问题,这批消息在 RabbitMQ Broker 到底是存储一条消息,还是多条消息?
  • 如果使用过 Kafka、RocketMQ 这两个消息队列,那么判断肯定会是多条消息。
  • 从输出的日志中,我们可以看到逐条消息的消费,也会认为是多条消息。
💡
😭 实际上,RabbitMQ Broker 存储的是一条消息。又或者说,RabbitMQ 并没有提供批量接收消息的 API 接口。
那么,为什么我们能够实现呢?答案是批量发送消息是 Spring-AMQP 的 SimpleBatchingStrategy 所封装提供:
  • 在 Pubisher 最终批量发送消息时,SimpleBatchingStrategy 会通过 #assembleMessage() 方法,将批量发送的多条消息组装成一条“批量”消息,然后进行发送。
在一些业务场景下,我们希望使用 Consumer 批量消费消息,提高消费速度。在 Spring-AMQP 中,提供了两种批量消费消息的方式。
  1. 基于批量发送消息之上实现,在 SimpleBatchingStrategy 将一条“批量”消息拆开,变成多条消息后,直接批量交给 Consumer 进行消费处理。本节先来实现第一种方式。
  1. 批量消费消息2
下面代码如果不特殊说明,与上文保持一致。

RabbitConfig

  • 在 RabbitAnnotationDrivenConfiguration 自动化配置类中,它会默认创建一个名字为 rabbitListenerContainerFactory 的 SimpleRabbitListenerContainerFactory Bean ,可用于消费者的监听器是单个消费消费的
  • 我们自定义创建的一个名字为"consumerBatchContainerFactory" 的 SimpleRabbitListenerContainerFactory Bean ,可用于消费者的监听器是批量消费消费的。重点是factory.setBatchListener(true),配置消费者的监听器是批量消费消息的类型。

Consumer

  • 在类上的 @RabbitListener 注解的 containerFactory 属性,设置了SimpleRabbitListenerContainerFactory Bean ,表示它要批量消费消息。
  • 在 #onMessage(...) 消费方法上,修改方法入参的类型为 List 数组。

测试

  • 额外添加了 @EnableAsync 注解,因为我们稍后要使用 Spring 提供的异步调用的功能。
输出日志如下:

批量消费消息2

批量消费消息的第一种方式基于批量发送消息实现,有点过于苛刻,所以,Spring-AMQP 提供了第二种批量消费消息的方式。其实现方式是,阻塞等待最多 receiveTimeout 秒,拉取 batchSize 条消息,进行批量消费。
  1. 如果在 receiveTimeout 秒内已经成功拉取到 batchSize 条消息,则直接进行批量消费消息。
  1. 如果在 receiveTimeout 秒还没拉取到 batchSize 条消息,不再等待,而是进行批量消费消息。
不过 Spring-AMQP 的阻塞等待时长 receiveTimeout 的设计有点“神奇”。
  • 它代表的是,每次拉取一条消息,最多阻塞等待 receiveTimeout 时长。如果等待不到下一条消息,则进入已获取到的消息的批量消费。也就是说,极端情况下,可能等待 receiveTimeout * batchSize 时长,才会进行批量消费。
感兴趣的朋友,可以点击 SimpleMessageListenerContainer#doReceiveAndExecute(BlockingQueueConsumer consumer) 方法,简单阅读源码,即可快速理解。

RabbitConfig

相比RabbitConfig来说,额外增加了 batchSize = 10receiveTimeout = 30 * 1000LconsumerBatchEnabled = 30 * 1000L 属性。严格意义上来说,这才是真正意义上的批量消费消息

测试

  • #testSyncSend01() 方法,发送 3 条消息,测试 DemoBatchConsumer 获取数量为 batchSize = 10 消息,超时情况下的批量消费。
  • #testSyncSend02() 方法,发送 10 条消息,测试 DemoBatchConsumer 获取数量为 batchSize = 10 消息,未超时情况下的批量消费。
  1. 我们来执行 #testSyncSend01() 方法,超时情况下的批量消费。控制台输出如下,Consumer 30 秒超时等待后,批量消费到 3 条消息,符合预期。
    1. 执行 #testSyncSend02() 方法,未超时情况下的批量消费。控制台输出如下,Consumer 拉取到 10 条消息后,立即批量消费到 10 条消息,符合预期。

      消费重试

      在开始之前,首先要对 RabbitMQ 的死信队列的机制有一定的了解。
      在消息消费失败的时候,Spring-AMQP 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
      当然,Spring-AMQP 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。
      💡
      注意:
      • 消费重试和死信队列,是 RocketMQ 自带的功能。
      • 而在 RabbitMQ 中,消费重试是由 Spring-AMQP 所封装提供的,死信队列是 RabbitMQ 自带的功能。
      那么消费失败到达最大次数的消息,是怎么进入到死信队列的呢?Spring-AMQP 在消息到达最大消费次数的时候,会将该消息进行否定(basic.nack),并且 requeue=false ,这样后续就可以利用 RabbitMQ 的死信队列的机制,将该消息转发到死信队列。
      另外,每条消息的失败重试,是可以配置一定的间隔时间

      配置文件

      • 相比应用配置文件来说,我们通过新增 spring.rabbitmq.simple.retry.enable=true 配置项,来开启 Spring-AMQP 的消费重试的功能。同时,通过新增 max-attempts 和 initial-interval 配置项,设置重试次数和间隔。
        • 注意:
          1. max-attempts 配置项,是一条消息总共尝试消费 max-attempts次,包括首次的正常消费。
          1. 通过添加 spring.rabbitmq.listener.simple.retry.multiplier 配置项来实现递乘的时间间隔,添加 spring.rabbitmq.listener.simple.retry.max-interval 配置项来实现最大的时间间隔。
      在 Spring-AMQP 的消费重试机制中,在消费失败到达最大次数后,会自动抛出 AmqpRejectAndDontRequeueException 异常,从而结束该消息的消费重试。这意味着什么呢?如果我们在消费消息的逻辑中,主动抛出 AmqpRejectAndDontRequeueException 异常,也能结束该消息的消费重试。结束的方式,Spring-AMQP 是通过我们在上文中提到的 basic.nack + requeue=false ,从而实现转发该消息到死信队列中。
      另外,默认情况下,spring.rabbitmq.simple.retry.enable=false ,关闭 Spring-AMQP 的消费重试功能。但是实际上,消费发生异常的消息,还是会一直重新消费。这是为什么呢?Spring-AMQP 会将该消息通过 basic.nack + requeue=true ,重新投递回原队列的尾巴。如此,我们便会不断拉取到该消息,不断“重试”消费该消息。当然在这种情况下,我们一样可以主动抛出 AmqpRejectAndDontRequeueException 异常,也能结束该消息的消费重试。结束的方式,Spring-AMQP 也是通过我们在上文中提到的 basic.nack + requeue=false ,从而实现转发该消息到死信队列中。
      这里,我们再来简单说说 Spring-AMQP 是怎么提供消费重试的功能的。
      • Spring-AMQP 基于 spring-retry 项目提供的 RetryTemplate ,实现重试功能。Spring-AMQP 在获取到消息时,会交给 RetryTemplate 来调用消费者 Consumer 的监听器 Listener(就是我们实现的),实现该消息的多次消费重试。
      • 在该消息的每次消费失败后,RetryTemplate 会通过 BackOffPolicy 来进行计算,该消息的下一次重新消费的时间,通过 Thread#sleep(...) 方法,实现重新消费的时间间隔。到达时间间隔后,RetryTemplate 又会调用消费者 Consumer 的监听器 Listener 来消费该消息。
      • 当该消息的重试消费到达上限后,RetryTemplate 会调用 MethodInvocationRecoverer 回调来实现恢复。而 Spring-AMQP 自定义实现了 RejectAndDontRequeueRecoverer 来自动抛出 AmqpRejectAndDontRequeueException 异常,从而结束该消息的消费重试。😈 结束的方式,Spring-AMQP 是通过我们在上文中提到的 basic.nack + requeue=false ,从而实现转发该消息到死信队列中。
      • 有一点需要注意,Spring-AMQP 提供的消费重试的计数客户端级别的,重启 JVM 应用后,计数是会丢失的。所以,如果想要计数进行持久化,需要自己重新实现下。
        • 注意:RocketMQ 提供的消费重试的计数,目前是服务端级别,已经进行持久化。

      常量类

      RabbitConfig

      主要有以下两点变化:
      1. 创建的正常 Queue 额外设置了,当消息成为死信时,RabbitMQ 自动转发到 Exchange 为MsgConstant.EXCHANGE,RoutingKey 为 MsgConstant.DEAD_ROUTING_KEY 的死信队列中。
      1. 通过 #demoDeadQueue() 方法来创建死信队列的 Queue ,通过 #demoDeadBinding() 方法来创建死信队列的 Binding 。 因为我们重用了 Exchange 为 MsgConstant.EXCHANGE ,所以无需创建。当然也可以根据自己的需要,创建死信队列的 Exchange 。

      Consumer

      DeadConsumer

      测试

      输出日志如下,首先 Publisher 成功同步发送了 1 条消息,Consumer每次间隔1s尝试消费这条消息,但是每次都出现异常,没能消费掉,总共尝试了3次,该消息消费重试到达上限,全部都失败,最终该消息转发到死信队列中,同时打印异常堆栈,最后由DeadConsumer消费死信队列的该条消息。

      定时消息

      RabbitMQ 提供了过期时间 TTL 机制,可以设置消息在队列中的存活时长。在消息到达过期时间时,会从当前队列中删除,并被 RabbitMQ 自动转发到对应的死信队列中。
      那么,如果我们创建消费者 Consumer ,来消费该死信队列,是不是就实现了延迟队列的效果。如此,我们便实现了定时消息的功能。

      RabbitConfig

      Publisher

      测试

      1. #testSyncSend01() 方法,不设置消息的过期时间,使用队列默认的消息过期时间(10s),输出日志如下:
        1. #testSyncSend02() 方法,发送消息的过期时间为 5000 毫秒(5s),输出日志如下:

          RabbitMQ Delayed Message Plugin

          RabbitMQ 目前提供了 RabbitMQ Delayed Message Plugin 插件,提供更加通用的定时消息的功能,使用起来比较简单。
          生产环境下,还是推荐直接使用 RabbitMQ Delayed Message Plugin 插件的方式。毕竟这是 RabbitMQ 官方认可的插件,使用起来肯定是没错的。

          消息模式

          在消息队列中,有两种经典的消息模式:「点对点」和「发布订阅」。
          如果胖友有使用过 RocketMQ 或者 Kafka 消息队列,可能比较习惯的叫法是:
          • 集群消费(Clustering):对应「点对点」 集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
          • 广播消费(Broadcasting):对应「发布订阅」 广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

          两种经典的消息模式

          点对点:Queue,不可重复消费

          消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
          notion image

          发布/订阅:Topic,可以重复消费

          消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
          notion image
          支持订阅组的发布订阅模式: 发布订阅模式下,当发布者消息量很大时,显然单个订阅者的处理能力是不足的。实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,这样订阅者很容易实现消费能力线性扩展。可以看成是一个topic下有多个Queue,每个Queue是点对点的方式,Queue之间是发布订阅方式。
          notion image

          区别

          点对点模式
          发布订阅模式
          生产者发送一条消息到queue,一个queue可以有很多消费者,但是一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,所以Queue实现了一个可靠的负载均衡。
          发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

          集群消费

          在 RabbitMQ 中,如果多个 Consumer 订阅相同的 Queue ,那么每一条消息有且仅会被一个 Consumer 所消费。这个特性,就为我们实现集群消费提供了基础。
          在本示例中,我们会把一个 Queue 作为一个 Consumer Group ,同时创建消费该 Queue 的 Consumer 。这样,在我们启动多个 JVM 进程时,就会有多个 Consumer 消费该 Queue ,从而实现集群消费的效果。

          ClusterMessage

          RabbitConfig

          ClusterPublisher

          ClusterConsumer

          • 在这里,我们创建了 Exchange 类型是 Topic 。
            • 解释
              1. 为什么不选择 Exchange 类型是 Direct 呢?考虑到集群消费的模式,会存在多 Consumer Group 消费的情况,显然我们要支持一条消息投递到多个 Queue 中,所以 Direct Exchange 基本就被排除了。
              1. 为什么不选择 Exchange 类型是 Fanout 或者 Headers 呢?实际是可以的,不过Spring Cloud Stream RabbitMQ 中默认是使用 Topic Exchange 的,所以这里也就使用 Topic Exchange 类型了。
          • 通过 key 属性,设置使用的 RoutingKey 为 # ,匹配所有。这就是为什么我们在ClusterMessage未定义 RoutingKey ,以及在ClusterPublisher中使用 routingKey = null 的原因。

          测试

          1. 执行 #mock1() 测试方法,先启动一个消费 "QUEUE_CLUSTER-GROUP-01" 这个 Queue 的 Consumer 节点。
          1. 执行 #mock2() 测试方法,再启动一个消费 "QUEUE_CLUSTER-GROUP-01" 这个 Queue 的 Consumer 节点。
          1. 执行 #testSyncSend() 测试方法,再启动一个消费 "QUEUE_CLUSTER-GROUP-01" 这个 Queue 的 Consumer 节点。同时,该测试方法,调用 ClusterPublisher#syncSend(id) 方法,同步发送了 3 条消息。
          输出日志如下,可以看到3 条消息,都仅被 三个 Consumer 节点的一个进行消费符合集群消费的预期。

          广播消费

          在实现集群消费时,我们通过“在 RabbitMQ 中,如果多个 Consumer 订阅相同的 Queue ,那么每一条消息有且仅会被一个 Consumer 所消费”特性,来实现了集群消费。但是,在实现广播消费时,这个特性恰恰成为了一种阻碍。
          不过我们可以通过给每个 Consumer 创建一个其独有 Queue,从而保证都能接收到全量的消息。同时,RabbitMQ 支持队列的自动删除,所以我们可以在 Consumer 关闭的时候,通过该功能删除其独有的 Queue 。

          BroadcastMessage

          BroadcastPublisher

          BroadcastConsumer

          变化主要有以下两点:
          1. 在 @Queue 注解的 name 属性,我们通过 Spring EL 表达式,在 Queue 的名字上,使用 UUID 生成其后缀。这样,我们就能保证每个项目启动的 Consumer 的 Queue 不同,以达到广播消费的目的。
          1. 在 @Queue 注解的 autoDelete 属性,我们设置为 "true" ,这样在 Consumer 关闭时,该队列就可以被自动删除了。

          测试

          1. 首先,执行 #mock() 测试方法,先启动一个消费 "QUEUE_BROADCAST-${UUID1}" 这个 Queue 的 Consumer 节点。
          1. 再执行 #testSyncSend() 测试方法,再启动一个消费 "QUEUE_BROADCAST-${UUID2}" 这个 Queue 的 Consumer 节点。同时,该测试方法,调用 BroadcastPublisher#syncSend(id) 方法,同步发送了 3 条消息。控制台输出如下,两个 Consumer 节点,都消费了这条发送的消息。符合广播消费的预期。

            并发消费

            在上述的示例中,我们配置的每一个 Spring-AMQP @RabbitListener ,都是串行消费的。显然,这在监听的 Queue 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。
            虽然说,我们可以通过启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度。但是问题是,否能够实现多线程的并发消费呢?答案是
            在 @RabbitListener 注解中,有 concurrency 属性,它可以指定并发消费的线程数。例如说,如果设置 concurrency=4 时,Spring-AMQP 就会为 @RabbitListener 创建 4 个线程,进行并发消费。
            为了更好的理解 concurrency 属性,我们来简单说说 Spring-AMQP 在这块的实现方式。我们来举个例子:
            • 首先,我们来创建一个 Queue 为 QUEUE_CONCURRENCY_DEMO 。
            • 然后,我们创建一个 DemoConsumer 类,并在其消费方法上,添加 @RabbitListener(concurrency=2) 注解。
            • 再然后,我们启动项目。Spring-AMQP 会根据 @RabbitListener(concurrency=2) 注解,创建 2 个 RabbitMQ Consumer !后续,每个 RabbitMQ Consumer 会被单独分配到一个线程中,进行拉取消息,消费消息。

            配置文件

            在开始看具体的应用配置文件之前,我们先来了了解下 Spring-AMQP 的两个 ContainerType 容器类型,枚举如下:
            1. 第一种类型,SIMPLE 对应 SimpleMessageListenerContainer 消息监听器容器。它一共有两线程:
                • Consumer 线程,负责从 RabbitMQ Broker 获取 Queue 中的消息,存储到内存中的 BlockingQueue 阻塞队列中。
                • Listener 线程,负责从内存中的 BlockingQueue 获取消息,进行消费逻辑。
                注意,每一个 Consumer 线程,对应一个 RabbitMQ Consumer ,对应一个 Listener 线程。也就是说,它们三者是一一对应的。
            1. 第二种类型,DIRECT 对应 DirectMessageListenerContainer 消息监听器容器。它只有一类线程,即做 SIMPLE 的 Consumer 线程的工作,也做 SIMPLE 的 Listener 线程工作。
              1. 注意,因为只有一类线程,所以它要么正在获取消息,要么正在消费消息,也就是串行的。
            🔥 默认情况下,Spring-AMQP 选择使用第一种类型,即 SIMPLE 容器类型。
            • 比 配置文件 来说,额外三个参数:
              • spring.rabbitmq.listener.type
              • spring.rabbitmq.listener.simple.concurrency
              • spring.rabbitmq.listener.simple.max-concurrency
            注意,是 spring.rabbitmq.listener.simple.max-concurrency 配置,是限制每个 @RabbitListener 的允许配置的 concurrency 最大大小。如果超过,则会抛出 IllegalArgumentException 异常。

            Consumer

            • 通过 @RabbitListener 注解,设置并发数。优先级最高,可覆盖配置文件中的 spring.rabbitmq.listener.simple.concurrency 配置项。
            不过个人建议,还是每个 @RabbitListener 各自配置,毕竟每个 Queue 的消息数量,都是不同的。当然,也可以结合使用。
            RabbitConfig与config相同

            测试

            输入日志如下,可以看到,线程28 和线程29在交替消费QUEUE_CONCURRENCY_DEMO下的消息。

            顺序消息

            • 我们先来一起了解下顺序消息的定义:
              • 普通顺序消息 :Publisher 将相关联的消息发送到相同的消息队列。
              • 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。
            • 那么,让我们来思考下,如果我们希望在 RabbitMQ 上,实现顺序消息需要做两个事情。
                1. 我们需要保证 RabbitMQ Publisher 发送相关联的消息发送到相同的 Queue 中。例如说,我们要发送用户信息发生变更的 Message ,那么如果我们希望使用顺序消息的情况下,可以将用户编号相同的消息发送到相同的 Queue 中。
                1. 我们在有且仅启动一个 Consumer 消费该队列,保证 Consumer 严格顺序消费。
            • 不过如果这样做,会存在两个问题,我们逐个来看看。
                1. 正如我们在「并发消费」中提到,如果我们将消息仅仅投递到一个 Queue 中,并且采用单个 Consumer 串行消费,在监听的 Queue 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。
                1. 如果我们启动相同 Consumer 的多个进程,会导致相同 Queue 的消息被分配到多个 Consumer 进行消费,破坏 Consumer 严格顺序消费。
            • 对于问题一,我们有两种方案来解决:
                1. 方案一,在 Publisher 端,将 Queue 拆成多个 Queue 。假设原先 Queue 是 QUEUE_USER ,那么我们就分拆成 QUEUE_USER_00 至 QUEUE_USER_..${N-1} 这样 N 个队列,然后基于消息的用户编号取余,路由到对应的 Queue 中。
                1. 方案二,在 Consumer 端,将 Queue 拉取到的消息,将相关联的消息发送到相同的线程中来消费。例如说,还是 Queue 是 QUEUE_USER 的例子,我们创建 N 个线程池大小为 1 的 ExecutorService 数组,然后基于消息的用户编号取余,提交到对应的 ExecutorService 中的单个线程来执行。
                两个方案,并不冲突,可以结合使用。
            • 对于问题二,我们也有两种方案来解决:
                1. 方案一,引入 ZooKeeper 来协调,动态设置多个进程中的相同的 Consumer 的开关,保证有且仅有一个 Consumer 开启对同一个 Queue 的消费。
                1. 方案二,仅适用于【问题一】的【方案一】。还是引入 ZooKeeper 来协调,动态设置多个进程中的相同的 Consumer 消费的 Queue 的分配,保证有且仅有一个 Consumer 开启对同一个 Queue 的消费。
            本小节解决:
            • 对于问题一,我们采用方案一。因为在 Spring-AMQP 中,自己定义线程来消费消息,无法和现有的 MessageListenerContainer 的实现所结合,除非自定义一个 MessageListenerContainer 实现类。
            • 对于问题二,因为实现起来比较复杂,暂时先不提供。

            DemoMessage

            • 定义了 QUEUE_DEMO_ 的四个 Queue 。
            • 定义了统一的 Exchange 。
            • 暂未定义 RoutingKey 的名字,我们会使用“队列编号”作为 RoutingKey ,然后路由消息到每个子 Queue 中。

            DemoPublisher

            创建 DemoPublisher  类,它会使用 Spring-AMQP 封装提供的 RabbitTemplate ,实现发送消息到 Queue 中。代码如下:
            发送消息时,我们对 DemoMessage.id % 队列编号 进行取余,获得队列编号作为 RoutingKey ,从而路由消息到对应的 Queue 中。

            DemoConsumer

            • 为了实现每个 Queue 能够被每个 Consumer 串行消费,从而实现基于 Queue 的并行严格消费顺序消息,我们只好在类上添了四个 @RabbitListener 注解,每个对应一个 Queue 。
            • 如果使用一个 @RabbitListener 注解,并添加四个 Queue ,然后设置 concurrency = 4 时,实际是并发四个线程,消费四个 Queue 的消息,无法保证严格消费顺序消息

            测试

            测试一共发送 8 条消息:发送 2 轮消息,每一轮消息的编号都是 0 至 3 。
            输出日志如下,可以看到,相同编号的消息,被投递到相同的 Queue ,被相同的线程所消费,符合预期。

            事务消息

            RabbitMQ 内置提供事务消息的支持。不过 RabbitMQ 提供的并不是完整的的事务消息的支持,缺少了回查机制。目前,常用的分布式消息队列,只有 RocketMQ 提供了完整的事务消息的支持。

            RabbitConfig

            • 在类上,添加 @EnableTransactionManagement 注解,开启Spring Transaction 的支持。
            • 标记创建的 RabbitMQ Channel 是事务性的,从而可以使用 RabbitMQ 的事务消息。
            因为 Spring-AMQP 通过 RabbitTransactionManager 来实现对 Spring Transaction 的集成,所以在实际开发中,我们只需要配合使用 @Transactional 注解,来声明事务即可。

            DemoPublisher

            • 在发送消息方法上添加了 @Transactional 注解,声明事务。因为创建了 RabbitTransactionManager 事务管理器,所以这里会创建 RabbitMQ 事务。
            • 故意等待 Thread#sleep(long millis) 10 秒,判断 RabbitMQ 事务是否生效。
              • 如果同步发送消息成功后,Consumer 立即消费到该消息,说明未生效。
              • 如果 Consumer 是 10 秒之后,才消费到该消息,说明已生效。

            测试

            输出日志如下,可以看到,Publisher 成功同步发送了 1 条消息。此时,事务并未提交。10 秒后,Publisher 提交事务。此时,Consumer 消费到该消息。

            消费者的消息确认

            在 RabbitMQ 中,Consumer 有两种消息确认的方式:
            1. 自动确认:RabbitMQ Broker 只要将消息写入到 TCP Socket 中成功,就认为该消息投递成功,而无需 Consumer 手动确认
            1. 手动确认:RabbitMQ Broker 将消息发送给 Consumer 之后,由 Consumer 手动确认之后,才算任务消息投递成功。
            实际场景下,因为自动确认存在可能丢失消息的情况,所以在对可靠性有要求的场景下,我们基本采用手动确认。当然,如果允许消息有一定的丢失,对性能有更高的产经下,我们可以考虑采用自动确认。
            更多关于消费者的消息确认的内容,胖友可以阅读如下的文章:
            在 Spring-AMQP 中,在 AcknowledgeMode 中,定义了三种消息确认的方式:
            • 实际上,就是将手动确认进一步细分,提供了由 Spring-AMQP 提供 Consumer 级别的自动确认。
            在上述的示例中,我们都采用了 Spring-AMQP 默认的 AUTO 模式。下面,我们来搭建一个 MANUAL 模式,手动确认的示例。

            配置文件

            新增 spring.rabbitmq.listener.simple.acknowledge-mode=true 配置项,来配置 Consumer 手动提交。

            DemoConsumer

            • 在消费方法上,我们增加类型为 Channel 的方法参数,和 deliveryTag 。通过调用其 Channel#basicAck(deliveryTag, multiple) 方法,可以进行消息的确认。
            • 在 @RabbitListener 注解的 ackMode 属性,我们可以设置自定义的 AcknowledgeMode 模式,这里的优先级最高,可以覆盖配置文件的内容。
            • 在消费逻辑中,我们故意只提交消费的消息的 DemoMessage.id 为奇数的消息。这样,我们只需要发送一条 id=1 ,一条 id=2 的消息,如果第二条的消费进度没有被提交,就可以说明手动提交消费进度成功。

            测试

            执行 #testSyncSend() 单元测试,输出日志如下,可以看到,Publisher成功同步发送 2 条消息,DemoConsumer成功消费 2 条消息。
             
            notion image

            生产者的发送确认

            在 RabbitMQ 中,默认情况下,Publisher 发送消息的方法,只保证将消息写入到 TCP Socket 中成功,并不保证消息发送到 RabbitMQ Broker 成功,并且持久化消息到磁盘成功。也就是说,我们上述的示例,Publisher 在发送消息都不是绝对可靠,存在丢失消息的可能性。
            不过不用担心,在 RabbitMQ 中,Publisher 采用 Confirm 模式,实现发送消息的确认机制,以保证消息发送的可靠性。实现原理如下:
            1. 首先,Publisher 通过调用 Channel#confirmSelect() 方法,将 Channel 设置为 Confirm 模式。
            1. 然后,在该 Channel 发送的消息时,需要先通过 Channel#getNextPublishSeqNo() 方法,给发送的消息分配一个唯一的 ID 编号(seqNo 从 1 开始递增),再发送该消息给 RabbitMQ Broker 。
            1. 之后,RabbitMQ Broker 在接收到该消息,并被路由到相应的队列之后,会发送一个包含消息的唯一编号(deliveryTag)的确认给Publisher。
            通过 seqNo 编号,将 Publisher 发送消息的“请求”,和 RabbitMQ Broker 确认消息的“响应”串联在一起。
            通过这样的方式,Publisher 就可以知道消息是否成功发送到 RabbitMQ Broker 之中,保证消息发送的可靠性。不过要注意,整个执行的过程实际是异步,需要我们调用 Channel#waitForConfirms() 方法,同步阻塞等待 RabbitMQ Broker 确认消息的“响应”。
            也因此,Publisher 采用 Confirm 模式时,有三种编程方式:
            1. 【同步】普通 Confirm 模式:Publisher 每发送一条消息后,调用 Channel#waitForConfirms() 方法,等待服务器端 Confirm。
            1. 【同步】批量 Confirm 模式:Publisher 每发送一批消息后,调用Channel#waitForConfirms() 方法,等待服务器端 Confirm。
              1. 本质上,和「普通 Confirm 模式」是一样的。
            1. 【异步】异步 Confirm 模式:Publisher 提供一个回调方法,RabbitMQ Broker 在 Confirm 了一条或者多条消息后,Publisher 会回调这个方法。
            更多关于 Producer 的 Confirm 模式的内容,胖友可以阅读如下的文章:
            在 Spring-AMQP 中,在 ConfirmType 中,定义了三种消息确认的方式:
            在上述的示例中,我们都采用了 Spring-AMQP 默认的 NONE 模式。下面,我们来搭建两个示例:
            • 「同步 Confirm 模式」 中,我们会使用 SIMPLE 类型,实现同步的 Confirm 模式。
              • 注意:这里的同步,指的是我们通过调用 Channel#waitForConfirms() 方法,同步阻塞等待 RabbitMQ Broker 确认消息的“响应”。

            同步 Confirm 模式

            配置文件

            • 新增spring.rabbitmq.publisher-confirm-type=simple 配置项,设置 Confirm 类型为 ConfirmType.SIMPLE 。
            在该类型下,Spring-AMQP 在创建完 RabbitMQ Channel 之后,会自动调用 Channel#confirmSelect() 方法,将 Channel 设置为 Confirm 模式。

            DemoPublisher

            在 RabbitTemplate 提供的 API 方法中,如果 Producer 要使用同步的 Confirm 模式,需要调用 #invoke(action, acks, nacks) 方法。代码如下:
            • 因为 Confirm 模式需要基于相同 Channel ,所以我们需要使用该方法。
            • 在方法参数 action 中,我们可以自定义操作。
            • 在方法参数 acks 中,定义接收到 RabbitMQ Broker 的成功“响应”时的成回调。
            • 在方法参数 nacks 中,定义接收到 RabbitMQ Broker 的失败“响应”时的成回调。
              • 当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

            测试

            输出日志如下:

            异步 Confirm 模式

            配置文件

            • 新增 spring.rabbitmq.publisher-confirm-type=correlated 配置项,设置 Confirm 类型为 ConfirmType.CORRELATED 。
            在该类型下,Spring-AMQP 在创建完 RabbitMQ Channel 之后,也会自动调用 Channel#confirmSelect() 方法,将 Channel 设置为 Confirm 模式。

            RabbitPublisherConfirmCallback

            实现 RabbitTemplate.ConfirmCallback 接口,提供 Producer 收到 RabbitMQ 确认消息的“响应”的回调。
            • 在构造方法中,把自己设置到 RabbitTemplate 中,作为 Confirm 的回调。
            • 在 #confirm(...) 方法中,根据是否 ack 成功,打印不同的日志。

            DemoPublisher

            测试

            执行 #testSyncSend() 单元测试,输出日志如下:

            ReturnCallback

            当 Producer 成功发送消息到 RabbitMQ Broker 时,但是在通过 Exchange 进行匹配不到 Queue 时,Broker 会将该消息回退给 Producer 。

            RabbitPublisherReturnCallback

            实现 RabbitTemplate.ReturnCallback 接口,提供 Producer 收到 RabbitMQ Broker 回退消息的的回调
            • 在构造方法中,把自己设置到 RabbitTemplate 中,作为消息 Return 的回调。
            • 在 #returnedMessage(...) 方法中,打印错误日志。当然,具体怎么处理,可以根据实际需求实现。

            DemoPublisher

            增加一个发送无法匹配到 Queue 的消息的方法

            测试

            执行 #testSyncSendReturn() 单元测试,输出日志如下:

            RPC 远程调用

            在 RabbitMQ 中,我们可以通过 Direct Reply-to 特性,实现 RPC 远程调用的功能。

            DemoPublisher

            主要有两点说明:
            • 创建 CorrelationData 对象,使用 UUID 作为唯一标识。
            • 调用 RabbitTemplate#convertSendAndReceive(exchange, routingKey, message, correlationData) 方法,Producer 发送消息,并等待结果。该结果,是 Consumer 消费消息,返回的结果。

            DemoConsumer

            设置队列里的消息无需持久化,毕竟 RPC 是个瞬态操作

            测试

            执行 #testSyncSend() 单元测试,输出日志如下:

            消费异常处理器

            「消费重试」中,我们一起看了下,在 Consumer 消费异常时,Spring-AMQP 提供的消费重试机制。除此之外,在 Spring-AMQP 中可以自定义消费异常时的处理器。目前有两个接口,可以实现对 Consumer 消费异常的处理:
            下面,我们来搭建一个 RabbitListenerErrorHandler 和 ErrorHandler 的使用示例。

            RabbitListenerErrorHandlerImpl

            1. 实现 RabbitListenerErrorHandler 接口。
            1. 在类上,添加 @Component 注解,并设置其 Bean 名为 "rabbitListenerErrorHandler" 。稍后会使用到该 Bean 名字。
            1. 在 #handleError(...) 方法中,先打印异常日志,并继续抛出 ListenerExecutionFailedException 异常。要注意,如果此时我们不继续抛出异常,而是 return 结果,意味着 Consumer 消息成功。如果我们结合「消费重试」一起使用的时候,一定要继续抛出该异常,否则消费重试机制将失效。

            RabbitLoggingErrorHandler

            • 在构造方法中,把自己设置到 SimpleRabbitListenerContainerFactory 中,作为其 ErrorHandler 异常处理器。
            • 在 #handleError(...) 方法中,打印错误日志。当然,具体怎么处理,可以根据自己的需要。
            1. 执行顺序上,RabbitListenerErrorHandler ErrorHandler 执行。不过这个需要建立在一个前提上,RabbitListenerErrorHandler 需要继续抛出异常。
            1. 另外,RabbitListenerErrorHandler 在每个 @RabbitListener 注解上,需要每个手动设置下 errorHandler 属性。而 ErrorHandler 是相对全局的,所有 SimpleRabbitListenerContainerFactory 创建的 SimpleMessageListenerContainer 都会生效。
            1. 具体选择 ErrorHandler 还是 RabbitLoggingErrorHandler ,暂时没有答案。不过个人感觉,如果不需要对 Consumer 消费的结果(包括成功和异常)做进一步处理,还是考虑 ErrorHandler 即可。在 ErrorHandler 中,我们可以通过判断 Throwable 异常是不是 ListenerExecutionFailedException 异常,从而拿到 Message 相关的信息。

            DemoConsumer

            • 在 #onMessage(...) 方法中,我们通过抛出 RuntimeException 异常,模拟消费异常。

            测试

            执行 #testSyncSend() 单元测试,输出日志如下:

            参考

             

            评论
            Loading...