type
status
date
Jul 27, 2024 01:09 PM
slug
summary
category
tags
password
icon
背景批量消息批量发送消息引入Maven依赖配置文件常量类自定义消息publisherconsumerRabbitConfig测试优雅监听消息发布:基于注解声明队列和交换机批量消费消息1RabbitConfigConsumer测试批量消费消息2RabbitConfig测试消费重试配置文件常量类RabbitConfigConsumerDeadConsumer测试定时消息RabbitConfigPublisher测试RabbitMQ Delayed Message Plugin消息模式两种经典的消息模式点对点:Queue,不可重复消费发布/订阅:Topic,可以重复消费区别集群消费ClusterMessageRabbitConfigClusterPublisherClusterConsumer测试广播消费BroadcastMessageBroadcastPublisherBroadcastConsumer测试并发消费配置文件Consumer测试顺序消息DemoMessageDemoPublisherDemoConsumer测试事务消息RabbitConfigDemoPublisher测试消费者的消息确认配置文件DemoConsumer测试生产者的发送确认同步 Confirm 模式配置文件DemoPublisher测试异步 Confirm 模式配置文件RabbitPublisherConfirmCallbackDemoPublisher测试ReturnCallbackRabbitPublisherReturnCallbackDemoPublisher测试RPC 远程调用DemoPublisherDemoConsumer测试消费异常处理器RabbitListenerErrorHandlerImplRabbitLoggingErrorHandlerDemoConsumer测试参考
背景
在 RabbitMQ与Spring AMQP 中,介绍了消息队列的相关模型,以及如何使用Spring AMQP去解决RabbitMQ的消息模型。本节,我们继续来看消息的队列的高级应用:批量消息,消费重试,定时消息,并发消息,顺序消息等等~~~
批量消息
在一些业务场景下,我们希望使用 Publisher 批量发送消息,提高发送性能。Spring-AMQP 提供的批量发送消息,它提供了一个 MessageBatch 消息收集器,将发送给相同 Exchange + RoutingKey 的消息们,“偷偷”收集在一起,当满足条件时候,一次性批量发送提交给 RabbitMQ Broker 。
Spring-AMQP 通过 BatchingRabbitTemplate 提供批量发送消息的功能。如下是三个条件,满足任一即会批量发送:
- 【数量】
batchSize
:超过收集的消息数量的最大条数。
- 【空间】
bufferLimit
:超过收集的消息占用的最大内存。
- 【时间】
timeout
:超过收集的时间的最大等待时长,单位:毫秒。不过要注意,这里的超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达timeout
毫秒才算超时。
另外,BatchingRabbitTemplate 提供的批量发送消息的能力比较弱。对于同一个 BatchingRabbitTemplate 对象来说,同一时刻只能有一个批次(保证 Exchange + RoutingKey 相同),否则会报错。
批量发送消息
引入Maven依赖
配置文件
常量类
自定义消息
publisher
consumer
RabbitConfig
BatchingRabbitTemplate
通过重写 #send(String exchange, String routingKey, Message message, CorrelationData correlationData)
核心方法,实现批量发送的功能。感兴趣可以自己去研究下源码。测试
日志输出如下,可以看到:
- 因为使用 BatchingRabbitTemplate 批量发送消息,所以在 Publisher 成功发送完第一条消息后,Consumer 并未消费到这条消息。
- 每条消息间隔10s发送,在最后一条消息发送完成之后,再等30s(大于超时时间)会立即消费消息。
优雅监听消息发布:基于注解声明队列和交换机
在
DemoBatchConsumer
类中使用注解的方式来声明队列和交换机,并且同时进行绑定,不用在RabbitConfig
类中注入@Bean
。RabbitConfig
的代码如下,就很清晰简单了。日志输出如下:
批量消费消息1
在上一节,我们已经实现批量发送消息到 RabbitMQ Broker 中。那么,我们来思考一个问题,这批消息在 RabbitMQ Broker 到底是存储一条消息,还是多条消息?
- 如果使用过 Kafka、RocketMQ 这两个消息队列,那么判断肯定会是多条消息。
- 从输出的日志中,我们可以看到逐条消息的消费,也会认为是多条消息。
😭 实际上,RabbitMQ Broker 存储的是一条消息。又或者说,RabbitMQ 并没有提供批量接收消息的 API 接口。
那么,为什么我们能够实现呢?答案是批量发送消息是 Spring-AMQP 的 SimpleBatchingStrategy 所封装提供:
- 在 Pubisher 最终批量发送消息时,SimpleBatchingStrategy 会通过
#assembleMessage()
方法,将批量发送的多条消息组装成一条“批量”消息,然后进行发送。
- 在 Consumer 拉取到消息时,会根据
#canDebatch(MessageProperties properties)
方法,判断该消息是否为一条“批量”消息?如果是,则调用# deBatch(Message message, Consumer<Message> fragmentConsumer)
方法,将一条“批量”消息拆开,变成多条消息。
在一些业务场景下,我们希望使用 Consumer 批量消费消息,提高消费速度。在 Spring-AMQP 中,提供了两种批量消费消息的方式。
- 基于批量发送消息之上实现,在 SimpleBatchingStrategy 将一条“批量”消息拆开,变成多条消息后,直接批量交给 Consumer 进行消费处理。本节先来实现第一种方式。
下面代码如果不特殊说明,与上文保持一致。
RabbitConfig
- 在 RabbitAnnotationDrivenConfiguration 自动化配置类中,它会默认创建一个名字为
rabbitListenerContainerFactory
的 SimpleRabbitListenerContainerFactory Bean ,可用于消费者的监听器是单个消费消费的。
- 我们自定义创建的一个名字为
"consumerBatchContainerFactory"
的 SimpleRabbitListenerContainerFactory Bean ,可用于消费者的监听器是批量消费消费的。重点是factory.setBatchListener(true)
,配置消费者的监听器是批量消费消息的类型。
Consumer
- 在类上的
@RabbitListener
注解的containerFactory
属性,设置了SimpleRabbitListenerContainerFactory Bean
,表示它要批量消费消息。
- 在
#onMessage(...)
消费方法上,修改方法入参的类型为 List 数组。
测试
- 额外添加了
@EnableAsync
注解,因为我们稍后要使用 Spring 提供的异步调用的功能。
输出日志如下:
批量消费消息2
批量消费消息的第一种方式基于批量发送消息实现,有点过于苛刻,所以,Spring-AMQP 提供了第二种批量消费消息的方式。其实现方式是,阻塞等待最多
receiveTimeout
秒,拉取 batchSize
条消息,进行批量消费。- 如果在
receiveTimeout
秒内已经成功拉取到batchSize
条消息,则直接进行批量消费消息。
- 如果在
receiveTimeout
秒还没拉取到batchSize
条消息,不再等待,而是进行批量消费消息。
不过 Spring-AMQP 的阻塞等待时长
receiveTimeout
的设计有点“神奇”。- 它代表的是,每次拉取一条消息,最多阻塞等待
receiveTimeout
时长。如果等待不到下一条消息,则进入已获取到的消息的批量消费。也就是说,极端情况下,可能等待receiveTimeout * batchSize
时长,才会进行批量消费。
感兴趣的朋友,可以点击SimpleMessageListenerContainer#doReceiveAndExecute(BlockingQueueConsumer consumer)
方法,简单阅读源码,即可快速理解。
RabbitConfig
相比RabbitConfig来说,额外增加了
batchSize = 10
、receiveTimeout = 30 * 1000L
、consumerBatchEnabled = 30 * 1000L
属性。严格意义上来说,这才是真正意义上的批量消费消息。测试
#testSyncSend01()
方法,发送 3 条消息,测试 DemoBatchConsumer 获取数量为batchSize = 10
消息,超时情况下的批量消费。
#testSyncSend02()
方法,发送 10 条消息,测试 DemoBatchConsumer 获取数量为batchSize = 10
消息,未超时情况下的批量消费。
- 我们来执行
#testSyncSend01()
方法,超时情况下的批量消费。控制台输出如下,Consumer 30 秒超时等待后,批量消费到 3 条消息,符合预期。
- 执行
#testSyncSend02()
方法,未超时情况下的批量消费。控制台输出如下,Consumer 拉取到 10 条消息后,立即批量消费到 10 条消息,符合预期。
消费重试
在开始之前,首先要对 RabbitMQ 的死信队列的机制有一定的了解。
在消息消费失败的时候,Spring-AMQP 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,Spring-AMQP 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。
注意:
- 消费重试和死信队列,是 RocketMQ 自带的功能。
- 而在 RabbitMQ 中,消费重试是由 Spring-AMQP 所封装提供的,死信队列是 RabbitMQ 自带的功能。
那么消费失败到达最大次数的消息,是怎么进入到死信队列的呢?Spring-AMQP 在消息到达最大消费次数的时候,会将该消息进行否定(
basic.nack
),并且 requeue=false
,这样后续就可以利用 RabbitMQ 的死信队列的机制,将该消息转发到死信队列。另外,每条消息的失败重试,是可以配置一定的间隔时间。
配置文件
- 相比应用配置文件来说,我们通过新增
spring.rabbitmq.simple.retry.enable=true
配置项,来开启 Spring-AMQP 的消费重试的功能。同时,通过新增max-attempts
和initial-interval
配置项,设置重试次数和间隔。 max-attempts
配置项,是一条消息总共尝试消费max-attempts
次,包括首次的正常消费。- 通过添加
spring.rabbitmq.listener.simple.retry.multiplier
配置项来实现递乘的时间间隔,添加spring.rabbitmq.listener.simple.retry.max-interval
配置项来实现最大的时间间隔。
注意:
在 Spring-AMQP 的消费重试机制中,在消费失败到达最大次数后,会自动抛出 AmqpRejectAndDontRequeueException 异常,从而结束该消息的消费重试。这意味着什么呢?如果我们在消费消息的逻辑中,主动抛出 AmqpRejectAndDontRequeueException 异常,也能结束该消息的消费重试。结束的方式,Spring-AMQP 是通过我们在上文中提到的
basic.nack
+ requeue=false
,从而实现转发该消息到死信队列中。另外,默认情况下,
spring.rabbitmq.simple.retry.enable=false
,关闭 Spring-AMQP 的消费重试功能。但是实际上,消费发生异常的消息,还是会一直重新消费。这是为什么呢?Spring-AMQP 会将该消息通过 basic.nack
+ requeue=true
,重新投递回原队列的尾巴。如此,我们便会不断拉取到该消息,不断“重试”消费该消息。当然在这种情况下,我们一样可以主动抛出 AmqpRejectAndDontRequeueException 异常,也能结束该消息的消费重试。结束的方式,Spring-AMQP 也是通过我们在上文中提到的 basic.nack
+ requeue=false
,从而实现转发该消息到死信队列中。这里,我们再来简单说说 Spring-AMQP 是怎么提供消费重试的功能的。
- Spring-AMQP 基于 spring-retry 项目提供的 RetryTemplate ,实现重试功能。Spring-AMQP 在获取到消息时,会交给 RetryTemplate 来调用消费者 Consumer 的监听器 Listener(就是我们实现的),实现该消息的多次消费重试。
- 在该消息的每次消费失败后,RetryTemplate 会通过 BackOffPolicy 来进行计算,该消息的下一次重新消费的时间,通过
Thread#sleep(...)
方法,实现重新消费的时间间隔。到达时间间隔后,RetryTemplate 又会调用消费者 Consumer 的监听器 Listener 来消费该消息。
- 当该消息的重试消费到达上限后,RetryTemplate 会调用 MethodInvocationRecoverer 回调来实现恢复。而 Spring-AMQP 自定义实现了 RejectAndDontRequeueRecoverer 来自动抛出 AmqpRejectAndDontRequeueException 异常,从而结束该消息的消费重试。😈 结束的方式,Spring-AMQP 是通过我们在上文中提到的
basic.nack
+requeue=false
,从而实现转发该消息到死信队列中。
- 有一点需要注意,Spring-AMQP 提供的消费重试的计数是客户端级别的,重启 JVM 应用后,计数是会丢失的。所以,如果想要计数进行持久化,需要自己重新实现下。
注意:RocketMQ 提供的消费重试的计数,目前是服务端级别,已经进行持久化。
常量类
RabbitConfig
主要有以下两点变化:
- 创建的正常 Queue 额外设置了,当消息成为死信时,RabbitMQ 自动转发到 Exchange 为
MsgConstant.EXCHANGE
,RoutingKey 为MsgConstant.DEAD_ROUTING_KEY
的死信队列中。
- 通过
#demoDeadQueue()
方法来创建死信队列的 Queue ,通过#demoDeadBinding()
方法来创建死信队列的 Binding 。 因为我们重用了 Exchange 为MsgConstant.EXCHANGE
,所以无需创建。当然也可以根据自己的需要,创建死信队列的 Exchange 。
Consumer
DeadConsumer
测试
输出日志如下,首先
Publisher
成功同步发送了 1 条消息,Consumer
每次间隔1s
尝试消费这条消息,但是每次都出现异常,没能消费掉,总共尝试了3次,该消息消费重试到达上限,全部都失败,最终该消息转发到死信队列中,同时打印异常堆栈,最后由DeadConsumer
消费死信队列的该条消息。定时消息
RabbitMQ 提供了过期时间 TTL 机制,可以设置消息在队列中的存活时长。在消息到达过期时间时,会从当前队列中删除,并被 RabbitMQ 自动转发到对应的死信队列中。
那么,如果我们创建消费者 Consumer ,来消费该死信队列,是不是就实现了延迟队列的效果。如此,我们便实现了定时消息的功能。
RabbitConfig
Publisher
测试
#testSyncSend01()
方法,不设置消息的过期时间,使用队列默认的消息过期时间(10s),输出日志如下:
#testSyncSend02()
方法,发送消息的过期时间为 5000 毫秒(5s),输出日志如下:
RabbitMQ Delayed Message Plugin
RabbitMQ 目前提供了 RabbitMQ Delayed Message Plugin 插件,提供更加通用的定时消息的功能,使用起来比较简单。
生产环境下,还是推荐直接使用 RabbitMQ Delayed Message Plugin 插件的方式。毕竟这是 RabbitMQ 官方认可的插件,使用起来肯定是没错的。
消息模式
在消息队列中,有两种经典的消息模式:「点对点」和「发布订阅」。
如果胖友有使用过 RocketMQ 或者 Kafka 消息队列,可能比较习惯的叫法是:
- 集群消费(Clustering):对应「点对点」 集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
- 广播消费(Broadcasting):对应「发布订阅」 广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
两种经典的消息模式
点对点:Queue,不可重复消费
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
发布/订阅:Topic,可以重复消费
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
支持订阅组的发布订阅模式: 发布订阅模式下,当发布者消息量很大时,显然单个订阅者的处理能力是不足的。实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,这样订阅者很容易实现消费能力线性扩展。可以看成是一个topic下有多个Queue,每个Queue是点对点的方式,Queue之间是发布订阅方式。
区别
点对点模式 | 发布订阅模式 |
生产者发送一条消息到queue,一个queue可以有很多消费者,但是一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,所以Queue实现了一个可靠的负载均衡。 | 发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。 |
集群消费
在 RabbitMQ 中,如果多个 Consumer 订阅相同的 Queue ,那么每一条消息有且仅会被一个 Consumer 所消费。这个特性,就为我们实现集群消费提供了基础。
在本示例中,我们会把一个 Queue 作为一个 Consumer Group ,同时创建消费该 Queue 的 Consumer 。这样,在我们启动多个 JVM 进程时,就会有多个 Consumer 消费该 Queue ,从而实现集群消费的效果。
ClusterMessage
RabbitConfig
ClusterPublisher
ClusterConsumer
- 在这里,我们创建了 Exchange 类型是 Topic 。
- 为什么不选择 Exchange 类型是 Direct 呢?考虑到集群消费的模式,会存在多 Consumer Group 消费的情况,显然我们要支持一条消息投递到多个 Queue 中,所以 Direct Exchange 基本就被排除了。
- 为什么不选择 Exchange 类型是 Fanout 或者 Headers 呢?实际是可以的,不过Spring Cloud Stream RabbitMQ 中默认是使用 Topic Exchange 的,所以这里也就使用 Topic Exchange 类型了。
解释
- 通过
key
属性,设置使用的 RoutingKey 为#
,匹配所有。这就是为什么我们在ClusterMessage未定义 RoutingKey ,以及在ClusterPublisher中使用routingKey = null
的原因。
测试
- 执行
#mock1()
测试方法,先启动一个消费"QUEUE_CLUSTER-GROUP-01"
这个 Queue 的 Consumer 节点。
- 执行
#mock2()
测试方法,再启动一个消费"QUEUE_CLUSTER-GROUP-01"
这个 Queue 的 Consumer 节点。
- 执行
#testSyncSend()
测试方法,再启动一个消费"QUEUE_CLUSTER-GROUP-01"
这个 Queue 的 Consumer 节点。同时,该测试方法,调用ClusterPublisher#syncSend(id)
方法,同步发送了 3 条消息。
输出日志如下,可以看到3 条消息,都仅被 三个 Consumer 节点的一个进行消费,符合集群消费的预期。
广播消费
在实现集群消费时,我们通过“在 RabbitMQ 中,如果多个 Consumer 订阅相同的 Queue ,那么每一条消息有且仅会被一个 Consumer 所消费”特性,来实现了集群消费。但是,在实现广播消费时,这个特性恰恰成为了一种阻碍。
不过我们可以通过给每个 Consumer 创建一个其独有 Queue,从而保证都能接收到全量的消息。同时,RabbitMQ 支持队列的自动删除,所以我们可以在 Consumer 关闭的时候,通过该功能删除其独有的 Queue 。
BroadcastMessage
BroadcastPublisher
BroadcastConsumer
变化主要有以下两点:
- 在
@Queue
注解的name
属性,我们通过Spring EL
表达式,在 Queue 的名字上,使用 UUID 生成其后缀。这样,我们就能保证每个项目启动的 Consumer 的 Queue 不同,以达到广播消费的目的。
- 在
@Queue
注解的autoDelete
属性,我们设置为"true"
,这样在 Consumer 关闭时,该队列就可以被自动删除了。
测试
- 首先,执行
#mock()
测试方法,先启动一个消费"QUEUE_BROADCAST-${UUID1}"
这个 Queue 的 Consumer 节点。
- 再执行
#testSyncSend()
测试方法,再启动一个消费"QUEUE_BROADCAST-${UUID2}"
这个 Queue 的 Consumer 节点。同时,该测试方法,调用BroadcastPublisher#syncSend(id)
方法,同步发送了 3 条消息。控制台输出如下,两个 Consumer 节点,都消费了这条发送的消息。符合广播消费的预期。
并发消费
在上述的示例中,我们配置的每一个 Spring-AMQP
@RabbitListener
,都是串行消费的。显然,这在监听的 Queue 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。虽然说,我们可以通过启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度。但是问题是,否能够实现多线程的并发消费呢?答案是有。
在
@RabbitListener
注解中,有 concurrency
属性,它可以指定并发消费的线程数。例如说,如果设置 concurrency=4
时,Spring-AMQP 就会为该 @RabbitListener
创建 4 个线程,进行并发消费。为了更好的理解
concurrency
属性,我们来简单说说 Spring-AMQP 在这块的实现方式。我们来举个例子:- 首先,我们来创建一个 Queue 为
QUEUE_CONCURRENCY_DEMO
。
- 然后,我们创建一个 DemoConsumer 类,并在其消费方法上,添加
@RabbitListener(concurrency=2)
注解。
- 再然后,我们启动项目。Spring-AMQP 会根据
@RabbitListener(concurrency=2)
注解,创建 2 个 RabbitMQ Consumer !后续,每个 RabbitMQ Consumer 会被单独分配到一个线程中,进行拉取消息,消费消息。
配置文件
在开始看具体的应用配置文件之前,我们先来了了解下 Spring-AMQP 的两个 ContainerType 容器类型,枚举如下:
- 第一种类型,
SIMPLE
对应 SimpleMessageListenerContainer 消息监听器容器。它一共有两类线程: - Consumer 线程,负责从 RabbitMQ Broker 获取 Queue 中的消息,存储到内存中的 BlockingQueue 阻塞队列中。
- Listener 线程,负责从内存中的 BlockingQueue 获取消息,进行消费逻辑。
注意,每一个
Consumer
线程,对应一个 RabbitMQ Consumer
,对应一个 Listener
线程。也就是说,它们三者是一一对应的。- 第二种类型,
DIRECT
对应 DirectMessageListenerContainer 消息监听器容器。它只有一类线程,即做SIMPLE
的 Consumer 线程的工作,也做SIMPLE
的 Listener 线程工作。
注意,因为只有一类线程,所以它要么正在获取消息,要么正在消费消息,也就是串行的。
🔥 默认情况下,Spring-AMQP 选择使用第一种类型,即SIMPLE
容器类型。
- 比 配置文件 来说,额外三个参数:
spring.rabbitmq.listener.type
spring.rabbitmq.listener.simple.concurrency
spring.rabbitmq.listener.simple.max-concurrency
要注意,是
spring.rabbitmq.listener.simple.max-concurrency
配置,是限制每个 @RabbitListener
的允许配置的 concurrency
最大大小。如果超过,则会抛出 IllegalArgumentException 异常。Consumer
- 通过
@RabbitListener
注解,设置并发数。优先级最高,可覆盖配置文件中的spring.rabbitmq.listener.simple.concurrency
配置项。
不过个人建议,还是每个@RabbitListener
各自配置,毕竟每个 Queue 的消息数量,都是不同的。当然,也可以结合使用。
RabbitConfig与config相同
测试
输入日志如下,可以看到,线程28 和线程29在交替消费
QUEUE_CONCURRENCY_DEMO
下的消息。顺序消息
- 我们先来一起了解下顺序消息的定义:
- 普通顺序消息 :Publisher 将相关联的消息发送到相同的消息队列。
- 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。
- 那么,让我们来思考下,如果我们希望在 RabbitMQ 上,实现顺序消息需要做两个事情。
- 我们需要保证 RabbitMQ Publisher 发送相关联的消息发送到相同的 Queue 中。例如说,我们要发送用户信息发生变更的 Message ,那么如果我们希望使用顺序消息的情况下,可以将用户编号相同的消息发送到相同的 Queue 中。
- 我们在有且仅启动一个 Consumer 消费该队列,保证 Consumer 严格顺序消费。
- 不过如果这样做,会存在两个问题,我们逐个来看看。
- 正如我们在「并发消费」中提到,如果我们将消息仅仅投递到一个 Queue 中,并且采用单个 Consumer 串行消费,在监听的 Queue 每秒消息量比较大的时候,会导致消费不及时,导致消息积压的问题。
- 如果我们启动相同 Consumer 的多个进程,会导致相同 Queue 的消息被分配到多个 Consumer 进行消费,破坏 Consumer 严格顺序消费。
- 对于问题一,我们有两种方案来解决:
- 方案一,在 Publisher 端,将 Queue 拆成多个子 Queue 。假设原先 Queue 是
QUEUE_USER
,那么我们就分拆成QUEUE_USER_00
至QUEUE_USER_..${N-1}
这样 N 个队列,然后基于消息的用户编号取余,路由到对应的子 Queue 中。 - 方案二,在 Consumer 端,将 Queue 拉取到的消息,将相关联的消息发送到相同的线程中来消费。例如说,还是 Queue 是
QUEUE_USER
的例子,我们创建 N 个线程池大小为 1 的 ExecutorService 数组,然后基于消息的用户编号取余,提交到对应的 ExecutorService 中的单个线程来执行。
两个方案,并不冲突,可以结合使用。
- 对于问题二,我们也有两种方案来解决:
- 方案一,引入 ZooKeeper 来协调,动态设置多个进程中的相同的 Consumer 的开关,保证有且仅有一个 Consumer 开启对同一个 Queue 的消费。
- 方案二,仅适用于【问题一】的【方案一】。还是引入 ZooKeeper 来协调,动态设置多个进程中的相同的 Consumer 消费的 Queue 的分配,保证有且仅有一个 Consumer 开启对同一个 Queue 的消费。
本小节解决:
- 对于问题一,我们采用方案一。因为在 Spring-AMQP 中,自己定义线程来消费消息,无法和现有的 MessageListenerContainer 的实现所结合,除非自定义一个 MessageListenerContainer 实现类。
- 对于问题二,因为实现起来比较复杂,暂时先不提供。
DemoMessage
- 定义了
QUEUE_DEMO_
的四个子 Queue 。
- 定义了统一的 Exchange 。
- 暂未定义 RoutingKey 的名字,我们会使用“队列编号”作为 RoutingKey ,然后路由消息到每个子 Queue 中。
DemoPublisher
创建 DemoPublisher 类,它会使用 Spring-AMQP 封装提供的 RabbitTemplate ,实现发送消息到子 Queue 中。代码如下:
发送消息时,我们对
DemoMessage.id % 队列编号
进行取余,获得队列编号作为 RoutingKey ,从而路由消息到对应的子 Queue 中。DemoConsumer
- 为了实现每个子 Queue 能够被每个 Consumer 串行消费,从而实现基于子 Queue 的并行的严格消费顺序消息,我们只好在类上添了四个
@RabbitListener
注解,每个对应一个子 Queue 。
- 如果使用一个
@RabbitListener
注解,并添加四个子 Queue ,然后设置concurrency = 4
时,实际是并发四个线程,消费四个子 Queue 的消息,无法保证严格消费顺序消息。
测试
测试一共发送 8 条消息:发送 2 轮消息,每一轮消息的编号都是 0 至 3 。
输出日志如下,可以看到,相同编号的消息,被投递到相同的子 Queue ,被相同的线程所消费,符合预期。
事务消息
RabbitMQ 内置提供事务消息的支持。不过 RabbitMQ 提供的并不是完整的的事务消息的支持,缺少了回查机制。目前,常用的分布式消息队列,只有 RocketMQ 提供了完整的事务消息的支持。
RabbitConfig
- 在类上,添加
@EnableTransactionManagement
注解,开启Spring Transaction 的支持。
- 在
#rabbitTransactionManager()
方法,创建 RabbitTransactionManager 事务管理器 Bean 。
- 标记创建的 RabbitMQ Channel 是事务性的,从而可以使用 RabbitMQ 的事务消息。
因为 Spring-AMQP 通过 RabbitTransactionManager 来实现对 Spring Transaction 的集成,所以在实际开发中,我们只需要配合使用
@Transactional
注解,来声明事务即可。DemoPublisher
- 在发送消息方法上添加了
@Transactional
注解,声明事务。因为创建了 RabbitTransactionManager 事务管理器,所以这里会创建 RabbitMQ 事务。
- 故意等待
Thread#sleep(long millis)
10 秒,判断 RabbitMQ 事务是否生效。 - 如果同步发送消息成功后,Consumer 立即消费到该消息,说明未生效。
- 如果 Consumer 是 10 秒之后,才消费到该消息,说明已生效。
测试
输出日志如下,可以看到,Publisher 成功同步发送了 1 条消息。此时,事务并未提交。10 秒后,Publisher 提交事务。此时,Consumer 消费到该消息。
消费者的消息确认
在 RabbitMQ 中,Consumer 有两种消息确认的方式:
- 自动确认:RabbitMQ Broker 只要将消息写入到 TCP Socket 中成功,就认为该消息投递成功,而无需 Consumer 手动确认。
- 手动确认:RabbitMQ Broker 将消息发送给 Consumer 之后,由 Consumer 手动确认之后,才算任务消息投递成功。
实际场景下,因为自动确认存在可能丢失消息的情况,所以在对可靠性有要求的场景下,我们基本采用手动确认。当然,如果允许消息有一定的丢失,对性能有更高的产经下,我们可以考虑采用自动确认。
更多关于消费者的消息确认的内容,胖友可以阅读如下的文章:
在 Spring-AMQP 中,在 AcknowledgeMode 中,定义了三种消息确认的方式:
- 实际上,就是将手动确认进一步细分,提供了由 Spring-AMQP 提供 Consumer 级别的自动确认。
在上述的示例中,我们都采用了 Spring-AMQP 默认的
AUTO
模式。下面,我们来搭建一个 MANUAL
模式,手动确认的示例。配置文件
新增
spring.rabbitmq.listener.simple.acknowledge-mode=true
配置项,来配置 Consumer 手动提交。DemoConsumer
- 在消费方法上,我们增加类型为 Channel 的方法参数,和
deliveryTag
。通过调用其Channel#basicAck(deliveryTag, multiple)
方法,可以进行消息的确认。
- 在
@RabbitListener
注解的ackMode
属性,我们可以设置自定义的 AcknowledgeMode 模式,这里的优先级最高,可以覆盖配置文件的内容。
- 在消费逻辑中,我们故意只提交消费的消息的
DemoMessage.id
为奇数的消息。这样,我们只需要发送一条id=1
,一条id=2
的消息,如果第二条的消费进度没有被提交,就可以说明手动提交消费进度成功。
测试
执行
#testSyncSend()
单元测试,输出日志如下,可以看到,Publisher
成功同步发送 2 条消息,DemoConsumer
成功消费 2 条消息。生产者的发送确认
在 RabbitMQ 中,默认情况下,Publisher 发送消息的方法,只保证将消息写入到 TCP Socket 中成功,并不保证消息发送到 RabbitMQ Broker 成功,并且持久化消息到磁盘成功。也就是说,我们上述的示例,Publisher 在发送消息都不是绝对可靠,存在丢失消息的可能性。
不过不用担心,在 RabbitMQ 中,Publisher 采用 Confirm 模式,实现发送消息的确认机制,以保证消息发送的可靠性。实现原理如下:
- 首先,Publisher 通过调用
Channel#confirmSelect()
方法,将 Channel 设置为 Confirm 模式。
- 然后,在该 Channel 发送的消息时,需要先通过
Channel#getNextPublishSeqNo()
方法,给发送的消息分配一个唯一的 ID 编号(seqNo
从 1 开始递增),再发送该消息给 RabbitMQ Broker 。
- 之后,RabbitMQ Broker 在接收到该消息,并被路由到相应的队列之后,会发送一个包含消息的唯一编号(
deliveryTag
)的确认给Publisher。
通过
seqNo
编号,将 Publisher 发送消息的“请求”,和 RabbitMQ Broker 确认消息的“响应”串联在一起。通过这样的方式,Publisher 就可以知道消息是否成功发送到 RabbitMQ Broker 之中,保证消息发送的可靠性。不过要注意,整个执行的过程实际是异步,需要我们调用
Channel#waitForConfirms()
方法,同步阻塞等待 RabbitMQ Broker 确认消息的“响应”。也因此,Publisher 采用 Confirm 模式时,有三种编程方式:
- 【同步】普通 Confirm 模式:Publisher 每发送一条消息后,调用
Channel#waitForConfirms()
方法,等待服务器端 Confirm。
- 【同步】批量 Confirm 模式:Publisher 每发送一批消息后,调用
Channel#waitForConfirms()
方法,等待服务器端 Confirm。
本质上,和「普通 Confirm 模式」是一样的。
- 【异步】异步 Confirm 模式:Publisher 提供一个回调方法,RabbitMQ Broker 在 Confirm 了一条或者多条消息后,Publisher 会回调这个方法。
更多关于 Producer 的 Confirm 模式的内容,胖友可以阅读如下的文章:
- 《Consumer Acknowledgements and Publisher Confirms》 的生产者部分的内容,对应中文翻译为 《消费者应答(ACK)和发布者确认(Confirm)》 。
在 Spring-AMQP 中,在 ConfirmType 中,定义了三种消息确认的方式:
在上述的示例中,我们都采用了 Spring-AMQP 默认的
NONE
模式。下面,我们来搭建两个示例:- 在「同步 Confirm 模式」 中,我们会使用
SIMPLE
类型,实现同步的 Confirm 模式。
注意:这里的同步,指的是我们通过调用Channel#waitForConfirms()
方法,同步阻塞等待 RabbitMQ Broker 确认消息的“响应”。
- 在「异步 Confirm 模式」 中,我们会使用
CORRELATED
类型,使用异步的 Confirm 模式。
同步 Confirm 模式
配置文件
- 新增
spring.rabbitmq.publisher-confirm-type=simple
配置项,设置 Confirm 类型为ConfirmType.SIMPLE
。
在该类型下,Spring-AMQP 在创建完 RabbitMQ Channel 之后,会自动调用
Channel#confirmSelect()
方法,将 Channel 设置为 Confirm 模式。DemoPublisher
在 RabbitTemplate 提供的 API 方法中,如果 Producer 要使用同步的 Confirm 模式,需要调用
#invoke(action, acks, nacks)
方法。代码如下:- 因为 Confirm 模式需要基于相同 Channel ,所以我们需要使用该方法。
- 在方法参数
action
中,我们可以自定义操作。
- 在方法参数
acks
中,定义接收到 RabbitMQ Broker 的成功“响应”时的成回调。
- 在方法参数
nacks
中,定义接收到 RabbitMQ Broker 的失败“响应”时的成回调。
当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
测试
输出日志如下:
异步 Confirm 模式
配置文件
- 新增
spring.rabbitmq.publisher-confirm-type=correlated
配置项,设置 Confirm 类型为ConfirmType.CORRELATED
。
在该类型下,Spring-AMQP 在创建完 RabbitMQ Channel 之后,也会自动调用
Channel#confirmSelect()
方法,将 Channel 设置为 Confirm 模式。RabbitPublisherConfirmCallback
实现 RabbitTemplate.ConfirmCallback 接口,提供 Producer 收到 RabbitMQ 确认消息的“响应”的回调。
- 在构造方法中,把自己设置到 RabbitTemplate 中,作为 Confirm 的回调。
- 在
#confirm(...)
方法中,根据是否ack
成功,打印不同的日志。
DemoPublisher
测试
执行
#testSyncSend()
单元测试,输出日志如下:ReturnCallback
当 Producer 成功发送消息到 RabbitMQ Broker 时,但是在通过 Exchange 进行匹配不到 Queue 时,Broker 会将该消息回退给 Producer 。
RabbitPublisherReturnCallback
实现 RabbitTemplate.ReturnCallback 接口,提供 Producer 收到 RabbitMQ Broker 回退消息的的回调
- 在构造方法中,把自己设置到 RabbitTemplate 中,作为消息 Return 的回调。
- 在
#returnedMessage(...)
方法中,打印错误日志。当然,具体怎么处理,可以根据实际需求实现。
DemoPublisher
增加一个发送无法匹配到 Queue 的消息的方法
测试
执行
#testSyncSendReturn()
单元测试,输出日志如下:RPC 远程调用
在 RabbitMQ 中,我们可以通过 Direct Reply-to 特性,实现 RPC 远程调用的功能。
DemoPublisher
主要有两点说明:
- 创建 CorrelationData 对象,使用 UUID 作为唯一标识。
- 调用
RabbitTemplate#convertSendAndReceive(exchange, routingKey, message, correlationData)
方法,Producer 发送消息,并等待结果。该结果,是 Consumer 消费消息,返回的结果。
DemoConsumer
设置队列里的消息无需持久化,毕竟 RPC 是个瞬态操作
测试
执行
#testSyncSend()
单元测试,输出日志如下:消费异常处理器
在「消费重试」中,我们一起看了下,在 Consumer 消费异常时,Spring-AMQP 提供的消费重试机制。除此之外,在 Spring-AMQP 中可以自定义消费异常时的处理器。目前有两个接口,可以实现对 Consumer 消费异常的处理:
下面,我们来搭建一个 RabbitListenerErrorHandler 和 ErrorHandler 的使用示例。
RabbitListenerErrorHandlerImpl
- 实现 RabbitListenerErrorHandler 接口。
- 在类上,添加
@Component
注解,并设置其 Bean 名为"rabbitListenerErrorHandler"
。稍后会使用到该 Bean 名字。
- 在
#handleError(...)
方法中,先打印异常日志,并继续抛出 ListenerExecutionFailedException 异常。要注意,如果此时我们不继续抛出异常,而是return
结果,意味着 Consumer 消息成功。如果我们结合「消费重试」一起使用的时候,一定要继续抛出该异常,否则消费重试机制将失效。
RabbitLoggingErrorHandler
- 在构造方法中,把自己设置到 SimpleRabbitListenerContainerFactory 中,作为其 ErrorHandler 异常处理器。
- 在
#handleError(...)
方法中,打印错误日志。当然,具体怎么处理,可以根据自己的需要。
- 在执行顺序上,
RabbitListenerErrorHandler
先于ErrorHandler
执行。不过这个需要建立在一个前提上,RabbitListenerErrorHandler 需要继续抛出异常。
- 另外,
RabbitListenerErrorHandler
在每个@RabbitListener
注解上,需要每个手动设置下errorHandler
属性。而ErrorHandler
是相对全局的,所有SimpleRabbitListenerContainerFactory
创建的SimpleMessageListenerContainer
都会生效。
- 具体选择
ErrorHandler
还是RabbitLoggingErrorHandler
,暂时没有答案。不过个人感觉,如果不需要对 Consumer 消费的结果(包括成功和异常)做进一步处理,还是考虑ErrorHandler
即可。在ErrorHandler
中,我们可以通过判断 Throwable 异常是不是ListenerExecutionFailedException
异常,从而拿到Message
相关的信息。
DemoConsumer
- 在
@RabbitListener
注解上,我们通过设置errorHandler
属性为「RabbitListenerErrorHandlerImpl」的名字。
- 在
#onMessage(...)
方法中,我们通过抛出 RuntimeException 异常,模拟消费异常。
测试
执行
#testSyncSend()
单元测试,输出日志如下:参考
- 作者:Frank
- 链接:https://blog.franksteven.me//article/rabbitmq2
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。