RabbitMQ
RabbitMQ架构设计

生产者(Producer): 生产者是消息的发送方,负责产生并发送消息到 RabbitMQ。生产者通常将消息发送到交换机(Exchange)。
交换机(Exchange): 交换机是消息的分发中心,负责将接收到的消息路由到一个或多个队列。它定义了消息的传递规则,可以根据规则将消息发送到一个或多个队列。
直连交换机(Direct Exchange): 将消息路由到与消息中的路由键(Routing Key)完全匹配的队列。
主题交换机(Topic Exchange): 根据通配符匹配路由键,将消息路由到一个或多个队列。
扇出交换机(Fanout Exchange): 将消息广播到所有与交换机绑定的队列,忽略路由键。
头部交换机(Headers Exchange): 根据消息头中的属性进行匹配,将消息路由到与消息头匹配的队列。
队列(Queue): 队列是消息的存储区,用于存储生产者发送的消息。消息最终会被消费者从队列中取出并处理。每个队列都有一个名称,并且可以绑定到一个或多个交换机。
消费者(Consumer): 消费者是消息的接收方,负责从队列中获取消息并进行处理。消费者通过订阅队列来接收消息。
绑定(Binding): 绑定是交换机和队列之间的关联关系。生产者将消息发送到交换机,而队列通过绑定与交换机关联,从而接收到消息。
虚拟主机(Virtual Host): 虚拟主机是 RabbitMQ 的基本工作单元,每个虚拟主机拥有自己独立的用户、权限、交换机、队列等资源,完全隔离于其他虚拟主机。
连接(Connection): 连接是指生产者、消费者与 RabbitMQ 之间的网络连接。每个连接可以包含多个信道(Channel),每个信道是一个独立的会话通道,可以进行独立的消息传递。
消息: 消息是生产者和消费者之间传递的数据单元。消息通常包含消息体和可选的属性,如路由键等。
RabbitMQ交换机类型
direct(直连交换机)
作为默认交换机。路由键与队列名完全匹配交换机,此种类型交换机,通过RoutingKey路由键将交换机和队列进行绑定, 消息被发送到exchange时,需要根据消息的RoutingKey,来进行匹配,只将消息发送到完全匹配到此RoutingKey的队列。 比如:如果一个队列绑定到交换机要求路由键为“key”,则只转发RoutingKey标记为“key”的消息,不会转发"key1",也不会转发“key.1”等等。它是完全匹配、单播的模式

fanout(扇型交换机,广播)
Fanout,扇出类型交换机,此种交换机,会将消息分发给所有绑定了此交换机的队列,此时RoutingKey参数无效。 fanout类型交换机下发送消息一条,无论RoutingKey是什么,直接广播消息,queue1,queue2,queue3,queue4都可以收到消息。

topic(主题交换机)
Topic,主题类型交换机,此种交换机与Direct类似,也是需要通过routingkey路由键进行匹配分发,区别在于Topic可以进行模糊匹配,Direct是完全匹配。
Topic中,将routingkey通过"."来分为多个部分
"*":代表一个部分
"#":代表0个或多个部分(如果绑定的路由键为 "#" 时,则接受所有消息,因为路由键所有都匹配)

headers(头部交换机)
headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了 消费方指定的headers中必须包含一个"x-match"的键。 键"x-match"的值有2个
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息

RabbitMQ如何保证消息不丢失
整个流程中可能会出现三种消息丢失场景:
生产者发送消息到 RabbitMQ 服务器的过程中出现消息丢失。 可能是网络波动未收到消息,又或者是服务器宕机。
RabbitMQ 服务器消息持久化出现消息丢失。 消息发送到 RabbitMQ 之后,未能及时存储完成持久化,RabbitMQ 服务器出现宕机重启,消息出现丢失。
消费者拉取消息过程以及拿到消息后出现消息丢失。 消费者从 RabbitMQ 服务器获取到消息过程出现网络波动等问题可能出现消息丢失;消费者拿到消息后但是消费者未能正常消费,导致丢失,可能是消费者出现处理异常又或者是消费者宕机。
针对上述三种消息丢失场景,RabbitMQ 提供了相应的解决方案,confirm 消息确认机制(生产者),消息持久化机制(RabbitMQ 服务),ACK 事务机制(消费者)
confirm 消息确认机制(生产者)
Confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 Confirm 模式发送消息时,它会等待 RabbitMQ 的确认,确保消息已经被正确地投递到了指定的 Exchange 中。 消息正确投递到 queue 时,会返回 ack。 消息没有正确投递到 queue 时,会返回 nack。如果 exchange 没有绑定 queue,也会出现消息丢失。
生产者通过 confirm.select 方法将 Channel 设置为 Confirm 模式。
发送消息后,通过添加 add_confirm_listener 方法,监听消息的确认状态。
1.开启消息确认机制
spring:
rabbitmq:
# 开启消息确认机制
publisher-confirms: true
# 消息在未被队列收到的情况下返回
#publisher-returns: true
#publisher-confirm-type: correlated
2.消息未接收时调用ReturnCallback
rabbitTemplate.setMandatory(true);
3.生产者投递消息
@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("确认了这条消息:"+correlationData);
}else{
System.out.println("确认失败了:"+correlationData+";出现异常:"+cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("这条消息发送失败了"+message+",请处理");
}
public void publisMessage(String message){
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatrip",message);
}
}
4.如果消息确认失败后,我们可以进行消息补偿,也就是消息的重试机制。当未收到确认信息时进行消息的重新投递。设置如下配置即可完成。
spring:
rabbitmq:
# 支持消息发送失败后重返队列
publisher-returns: true
# 开启消息确认机制
# publisher-confirm-type: correlated
listener:
simple:
retry:
# 开启重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试时间间隔
消息持久化机制(RabbitMQ 服务)
持久化机制是指将消息存储到磁盘,以保证在 RabbitMQ 服务器宕机或重启时,消息不会丢失。
生产者通过将消息的 delivery_mode 属性设置为 2(SpringBoot中deliveryMode默认为MessageDeliveryMode.PERSISTENT 等于2),将消息标记为持久化。
队列也需要进行持久化设置,确保队列在 RabbitMQ 服务器重启后仍然存在。需要将durable属性设置为true,且需配合autoDelete设置为false
持久化机制会影响性能,因此在需要确保消息不丢失的场景下使用。
@Queue(value = "TianMingQ",durable = "true",autoDelete = "false") //或者注入时
return new Queue("TianMingQ", true, false, false, map);
ACK 事务机制(消费者)
ACK 事务机制用于确保消息被正确消费。当消息被消费者成功处理后,消费者发送确认(ACK)给 RabbitMQ,告知消息可以被移除。这个过程是自动处理的,也可以关闭进行手工发送 ACK。
在 RabbitMQ 中,ACK 机制默认是开启的。当消息被消费者接收后,会立即从队列中删除,除非消费者发生异常。
可以手动开启 ACK 机制,通过将 auto_ack 参数设置为 False,手动控制消息的 ACK (acknowledge-mode: manual)。
1.修改yml为手动签收模式
spring:
rabbitmq:
listener:
simple:
# 手动签收模式
acknowledge-mode: manual
# 每次签收一条消息
prefetch: 1
2.消费者手动签收
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "TianMingQ", durable = "true"))
public class SecondConsumer {
@RabbitHandler
public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println(message);
// 唯一的消息ID
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 确认该条消息
if(...){
channel.basicAck(deliverTag,false);
}else{
// 消费失败,消息重返队列
channel.basicNack(deliverTag,false,true);
}
}
RabbitMQ中如何解决消息堆积问题
消费者处理消息的速度太慢
增加消费者数量:通过水平扩展,增加消费者的数量来提高处理能力。
优化消费者性能:提高消费者处理消息的效率,例如优化代码、增加资源。
消息预取限制(prefetch count):调整消费者的预取数量以避免一次处理过多消息而导致处理缓慢。
队列的容量太小
增加队列的容量:调整队列设置以允许更多消息存储。
网络故障
监控和告警:通过监控网络状况并设置告警,确保在网络故障时快速发现并解决问题。
持久化和高可用性:确保消息和队列的持久化以避免消息丢失,并使用镜像队列提高可用性。
消费者故障
使用死信队列:将无法处理的消息转移到死信队列,防止堵塞主队列。
容错机制:实现消费者的自动重启和错误处理逻辑。
队列配置不当
优化队列配置:检查并优化消息确认模式、队列长度限制和其他相关配置。
消息大小
消息分片:将大型消息分割成小的消息片段,加快处理速度。
业务逻辑复杂或耗时
优化业务逻辑:简化消费者中的业务逻辑,减少处理每个消息所需的时间。
消息产生速度快于消费速度
使用消息限流:控制消息的生产速度,确保它不会超过消费者的处理能力。
负载均衡:确保消息在消费者之间公平分配,避免个别消费者过载。
其他配置优化
消息优先级:使用消息优先级确保高优先级消息优先处理。
调整RabbitMQ配置:优化RabbitMQ服务的配置,如文件描述符限制、内存使用限制等。
RabbitMQ中如何保证消息不被重复消费
什么情况会导致消息被重复消费
生产者:生产者可能会重复推送一条数据到 MQ 中,比如 Controller 接口被重复调用了 2 次,没有做接口幂等性导致的;
MQ:在消费者消费完准备响应 ack 消息消费成功时,MQ 突然挂了,导致 MQ 以为消费者还未消费该条数据,MQ 恢复后再次推送了该条消息,导致了重复消费。
消费者:消费者已经消费完消息,正准备但是还未响应给ack消息到时,此时消费者挂了,服务重启后 MQ 以为消费者还没有消费该消息,再次推送了该条消息。
解决方案
使用乐观锁 假设是更新订单状态,在发送的消息的时候带上修改字段的版本号
简单的消息去重,插入消费记录,增加数据库判断
这个消费者的代码执行需要1秒,重复消息在执行期间(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),增加校验的地方是不是还是没数据(因为上一条消息还没消费完,没有记录)
那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题
最后更新于
这有帮助吗?