消息队列
一、RabbitMQ 在项目中的使用
1.1 基本配置
1.1.1 两个交换机配置
(1) 订单实际消费队列(死信队列)绑定的交换机。也即,延迟队列中的消息过期后,会经该交换机转发到死信队列中去:
DirectExchange orderDirect() { |
directExchange 代表这是一个直连交换机,名称从枚举类 QueueEnum 中获取(按顺序分别是交换机名称,队列名称,路由键):
QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"); |
durable(true) 表示该交换机是持久化的,即 RabbitMQ 重启后交换机依然存在。
(2) 订单延迟队列所绑定的交换机。也即,订单生成后会生成一个消息发送到该延迟队列:
|
1.1.2 两个消息队列配置
(1) 订单实际消费队列(死信队列)
|
(2) 订单延迟队列
|
withArgument 方法设置队列的参数:
- x-dead-letter-exchange:指定当消息成为死信时,将其转发到的交换机,这里是订单实际消费队列(死信队列)所绑定的交换机。
- x-dead-letter-routing-key:指定消息成为死信后转发时使用的路由键。
1.1.3 队列绑定到交换机
(1) 订单队列(死信队列)绑定到交换机
|
BindingBuilder.bind 方法将订单实际消费队列绑定到订单消息实际消费队列所绑定的交换机上。with 方法指定绑定的路由键。
(2) 订单延迟队列绑定到交换机
|
1.2 生产者和消费者
二、RabbitMQ 相关场景题目
2.1 基础场景题
2.1.1 交换机和队列之间的关系是什么?
DirectExchange(直连交换机):是 RabbitMQ 中的一种交换机类型,它根据消息的路由键将消息路由到与之绑定的队列。直连交换机的路由规则是简单的键值匹配,当消息的路由键与绑定键相匹配时,消息就会被发送到对应的队列。
Queue(队列):是 RabbitMQ 中存储消息的地方,消费者从队列中获取消息进行处理。
二者的关系:队列通过绑定键(路由键)与直连交换机进行绑定,当消息被发送到交换机时,交换机会根据消息的路由键找到与之匹配的绑定队列,并将消息发送到该队列。
2.1.2 x-dead-letter-exchange 和 x-dead-letter-routing-key 的作用是什么?
x-dead-letter-exchange:这是队列的一个参数,用于指定当队列中的消息成为死信时,将其转发到的交换机(死信交换机)。死信是由于消息过期、队列达到最大长度、消息被拒绝等原因产生的。
x-dead-letter-routing-key:当消息成为死信并被转发到指定的交换机时,使用该路由键来确定消息最终要被路由到哪个队列。
2.2 高并发高可用多线程场景题
2.2.1 在高并发场景下,如何确保消息队列的性能和可靠性?
1.性能优化
- 增加队列数量:可以创建多个订单实际消费队列和延迟队列,将消息均匀地分发到这些队列中,避免单个队列成为性能瓶颈。
- 批量消费:消费者可以采用批量消费的方式,一次性处理多个消息,减少与 RabbitMQ 的交互次数,提高消费效率。
2.可靠性优化 - 消息确认机制:使用消息确认机制,确保消息被消费者成功处理后才被标记为已消费,避免消息丢失。
- 备份交换机:为交换机配置备份交换机,当消息无法被路由到任何队列时,将消息发送到备份交换机,避免消息丢失。
2.2.2 如何处理 RabbitMQ 节点故障以保证高可用性?
1.集群部署:将 RabbitMQ 部署为集群,通过多个节点来提供服务。当某个节点出现故障时,其他节点可以继续提供服务,保证系统的可用性。
2.镜像队列:使用镜像队列将队列的消息复制到多个节点上,当一个节点出现故障时,其他节点上的镜像队列可以继续提供服务,确保消息不会丢失。
3.负载均衡:在集群前面添加负载均衡器,将客户端的请求均匀地分发到各个节点上,提高系统的性能和可用性。
2.2.3 在代码中是否可以使用多线程来消费消息队列中的消息?
可以。通过 @RabbitListener 注解指定了要消费的队列和线程池。concurrency 参数指定了并发消费的线程数。
|
- 当 concurrency 小于等于核心线程数:线程池会使用核心线程来执行这些并发的消费者任务。例如,线程池核心线程数为 5,concurrency = “3”,那么线程池会从 5 个核心线程中选取 3 个来执行消费者任务。
- 当 concurrency 大于核心线程数但小于等于最大线程数:如果核心线程都在忙碌,线程池会创建新的线程来满足 concurrency 的要求。比如核心线程数为 5,最大线程数为 10,concurrency = “8”,线程池会先使用 5 个核心线程,然后再创建 3 个新线程来执行剩下的消费者任务。
- 当 concurrency 大于最大线程数:此时线程池无法满足 concurrency 的要求,会根据线程池的拒绝策略处理多余的任务。例如,核心线程数为 5,最大线程数为 10,concurrency = “15”,线程池最多只能创建 10 个线程来执行消费者任务,剩下的 5 个任务可能会被拒绝或者放入队列等待。
