RabbitMQ 进阶
消息的可靠性
首先,我们分析消息可能丢失的环节:
- 生产者发送阶段:连接 MQ 失败;消息发送后找不到对应 Exchange;Exchange 无法路由到目标 Queue。
- MQ 自身阶段:消息已存入队列,但 MQ 宕机导致数据丢失。
- 消费者处理阶段:消费者接收消息后宕机,未开始处理,或者说处理过程中发生异常,未能正常完成业务。
所以,要确保消息可靠投递,需从三方面着手:
- 保证生产者将消息成功送达 MQ。
- 保证 MQ 自身不丢失消息。
- 保证消费者正确消费并处理消息。
生产者的可靠性
生产者重试机制
在生产者发送消息阶段,由于网络故障导致的连接 MQ 失败,可以通过重试机制提高送达成功率。SpringAMQP 支持为 RabbitTemplate 配置发送重试,在连接超时后自动进行多次尝试。
在生产者 application.yml 中增加配置:
spring:
rabbitmq:
# 设置MQ的连接超时时间
connection-timeout: 2s
template:
retry:
# 开启超时重试机制
enabled: true
# 失败后的初始等待时间
initial-interval: 2000ms
# 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
multiplier: 1
# 最大重试次数
max-attempts: 3停掉 RabbitMQ 服务后测试消息发送,观察日志,将会触发重试:每间隔 2 秒重试一次,最多重试 3 次,大致日志如下。
2026-01-26T17:17:28.741+08:00 INFO 29260 --- [io-18080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2026-01-26T17:17:30.757+08:00 INFO 29260 --- [io-18080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2026-01-26T17:17:32.768+08:00 INFO 29260 --- [io-18080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]注意:该重试机制是同步阻塞的,重试期间会阻塞当前线程,建议关闭重试或合理设置等待时间与重试次数;并且网络故障统通常也不是短时间自动恢复的。
生产者消息确认
在大多数网络连接正常的情况下,生产者发送消息到 RabbitMQ 的消息丢失概率较低,但在少数场景中,消息仍可能因为 MQ 内部处理异常或路由问题而丢失,例如:
- MQ 内部处理消息的进程发生异常
- 生产者发送消息到 MQ 后未找到对应的 Exchange
- 消息到达 Exchange 后无法路由到合适的 Queue
为了应对这些情况,RabbitMQ 提供了生产者消息确认机制,主要包括两种形式:
- Publisher Confirm:确认消息是否成功到达 Exchange(即MQ是否成功接收到消息)。
- Publisher Return:确认消息是否从 Exchange 成功路由到 Queue。如果消息因路由失败无法进入队列,MQ会通过 Return 机制将消息返回给生产者。
在入门中已经知道 Publisher Confirm 有 3 种确认方式而没有提过 Publisher Return,接下来看看在 SpringBoot 中怎么实现。
在生产者服务 application.yml 中增加配置:
spring:
rabbitmq:
# 开启 publisher confirm 机制,并设置 confirm 类型
publisher-confirm-type: correlated
# 开启 publisher return 机制
publisher-returns: true在生产者服务 RabbitMQConfig.java 增加配置。
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置 JSON 消息转换器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 设置开启Mandatory, 才能触发回调函数, 无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
// 交换机收到消息回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("ConfirmCallback: correlationData({}),ack({}),cause({})", correlationData, ack, cause);
// 如果需要具体消息可以自己扩展,比如继承 CorrelationData 存储更多信息
});
// 队列收到消息回调, 如果失败的话会进行 returnCallback 的回调处理, 反之成功就不会回调
rabbitTemplate.setReturnsCallback(returned -> {
log.info("returnCallback: 消息:{}", returned.getMessage());
log.info("returnCallback: 回应码:{}", returned.getReplyCode());
log.info("returnCallback: 回应信息:{}", returned.getReplyText());
log.info("returnCallback: 交换机:{}", returned.getExchange());
log.info("returnCallback: 路由键:{}", returned.getRoutingKey());
// 其他兜底逻辑
});
return rabbitTemplate;
}测试 Publisher Confirm 只需要发送消息到一个不存在的交换机就可以看到效果;测试 Publisher Return 只需要发送消息到一个存在的交换机,但是路由 key 路由不到队列就可以看到效果。
开启生产者确认比较消耗 MQ 性能,需要根据实际业务场景进行权衡,除了订单业务等一定不能丢失的场景,通常情况如果要开启消费者确认的话,开启 Publisher Confirm 而不开启 Publisher Return。
我们可以分析一下触发消息确认的几种典型场景:
- 交换机不存在:属于代码层面的配置问题,属于可预防的编程错误。
- 路由失败:通常是由于 RoutingKey 配置错误导致,这类问题往往能在开发测试阶段发现。
- MQ内部异常:虽然需要容错处理,但实际发生概率相对较低,如果存在开发也修改不了不了,要么是版本 BUG,要么安装姿势有问题。
MQ 的可靠性
持久化
RabbitMQ 为了追求更高的性能,默认情况下 MQ 的数据都是在内存存储的临时数据,重启后就会消失。因此,要构建一个真正可靠的消息系统,我们必须主动配置持久化,将关键数据落盘。持久化主要包含三个层面:交换机持久化、队列持久化、消息持久化,前面章节已经说明过,不再赘述怎么配置持久化。
惰性队列
在默认情况下,RabbitMQ 会将接收到的消息保存在内存中,以此来降低消息收发的延迟。但在某些场景下,这种做法容易引发消息积压问题,例如:
- 消费者服务宕机或出现网络故障
- 消息生产速率突然激增,超过消费者处理能力
- 消费者业务处理发生阻塞
一旦消息开始堆积,RabbitMQ 的内存占用会持续升高,直至触发内存预警阈值。此时,RabbitMQ 会将内存中的消息刷入磁盘,这个动作被称为 PageOut。PageOut 不仅本身耗时较长,还会阻塞对应队列的进程,在此期间 RabbitMQ 将无法处理新消息,导致生产者的请求全部被阻塞。
为解决这一问题,RabbitMQ 自 3.6.0 版本起引入了 Lazy Queue(惰性队列) 模式。惰性队列的核心特点如下:
- 接收到消息后直接持久化至磁盘,而非内存
- 当消费者准备消费消息时,才从磁盘读取并加载到内存(实现懒加载)
- 能够支持数百万甚至更多消息的存储
从 3.12 版本开始,Lazy Queue 已成为所有队列的默认存储方式。因此,官方建议将 RabbitMQ 升级至 3.12 或更高版本,或主动将现有队列设置为惰性队列模式,以从根本上避免内存堆积带来的性能阻塞问题。当然后面还出现了仲裁队列来解决这个问题,后面有机会再详解其他新出现的特性。
需要配置队列为惰性队列,仅需要增加一个参数 "x-queue-mode" = "lazy"。
@Bean
public Queue smsQueue() {
Map<String, Object> arguments = new HashMap<>(2);
arguments.put("x-queue-mode", "lazy");
return new Queue(RabbitConstant.SMS_QUEUE_NAME, true, false, false, arguments);
}删除队列后重新启动项目发送消息,在控制台可以看到已经设置为惰性多列了。

消费者的可靠性
在 RabbitMQ 中,消息成功投递给消费者并不代表已被正确处理。网络异常、消费者宕机或业务处理失败等都可能导致消息丢失。为了保证消息被可靠消费,RabbitMQ 需要一种机制来获知消费者的处理状态,并在消费失败时能够重新投递消息,在前面的入门中已经通过手动确认的方式来处理消费异常的情况,这里看下在 springboot 中有那些处理方式。
消费者确认
从入门篇中,我们知道当消费者处理完消息后,必须向 RabbitMQ 发送一个明确回执,告知处理结果。回执分为三种类型:
- ack:确认成功,RabbitMQ 将该消息从队列删除。
- nack:确认失败,要求 RabbitMQ 重新投递消息。
- reject:拒绝消息,通常表示消息无法处理,将从队列直接删除。
在实际开发中,通常通过 try-catch 捕获业务异常:处理成功时返回 ack,失败则返回 nack。SpringAMQP 对该机制进行了封装,支持以下确认模式:
- none:不处理回执,消息投递后自动确认删除(存在丢失风险,不推荐)。
- manual:手动模式,在代码中显式调用 API 发送回执(灵活但侵入业务)。
- auto:自动模式(默认),基于 AOP 环绕处理:
- 业务正常执行 → 自动 ack
- 发生业务异常 → 自动 nack(重新入队)
- 发生消息格式等错误 → 自动 reject(丢弃消息)
在正常开发中,推荐 auto 配合 try-catch 使用,失败后记录日志统一补偿。消费者项目 application.yml 增加配置如下,具体效果可以自己手动写个异常查看效果。
spring:
rabbitmq:
listener:
simple:
# 自动确认模式
acknowledge-mode: auto幂等消费
幂等消费的思路非常简单,借助唯一消息id + 中间件来实现,比如 MySQL 中建立一个消息 id 表建立唯一主键,插入不进去就是消费过了;判断 Redis 是否存在唯一消息 id,不存在写一个带消息 id 时效的 key。唯一消息 id 实现思路在入门中已经涉及,不再赘述。
消息消费顺序性
需要保证顺序性有两种方案。版本 3.8 之前使用单消费者(也就是只有一个服务实例当消费者)+ 每次只取一条消息;3.8+ 使用 Single Active Consumer,此方案可以多实例,如果活跃消费者下线,其他等待的消费者会接手消费,当然单消费者也可以启动时通过参数控制某个服务为监听者实现类似的效果。
单消费者方案
消费者项目 application.yml 增加配置如下:
spring:
rabbitmq:
listener:
simple:
# 一次只消费一条
prefetch: 1
# 固定一个消费者
concurrency: 1
# 最大一个消费者
max-concurrency: 1Single Active Consumer 方案
消费者项目 application.yml 增加配置如下:
spring:
rabbitmq:
listener:
simple:
# 一次只消费一条
prefetch: 1定义队列的时候,增加参数 "x-single-active-consumer"=true,例如:
@Configuration
public class SACConfig {
@Bean
public Queue sacQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-single-active-consumer", true);
return new Queue("sac.queue", true, false, false, arguments);
}
}启动多个消费者,在控制台查看是否只有一个消费者效果,active 被选中的就是活跃的消费者,关闭活跃的消费者实例,会发现活跃实例会自动漂移。

延迟消息
在电商系统中,处理“订单下单未支付”是一个经典问题。例如,用户下单后会立刻锁定库存,但如果长时间不付款(比如30分钟),这些库存就会被无效占用,影响其他顾客购买,导致销售损失。
因此,我们需要一个机制:在用户下单后的第30分钟,自动检查订单状态,若未支付则取消订单、释放库存。
这类“过一段时间再执行”的任务,称为延迟任务。实现延迟任务的一种高效、可靠的方案,就是利用 MQ 的延迟消息功能。
通常实现延迟任务有两种方式,一种是利用 死信交换机+TTL 实现;另外一种是利用延迟插件。两种方案各有优劣,推荐使用延迟插件实现。
死信交换机
首先,来了解一下什么是死信交换机,在消息队列中,死信是指那些无法被正常消费的消息。通常,在以下三种情况下,消息会变成死信:
- 消费被拒绝:消费者使用
basic.reject或basic.nack拒绝消息,并且不要求重新入队; - 消息过期:消息在队列中等待时间超过设定的存活时间(TTL),未被消费;
- 队列已满:消息无法投递到已满的队列中。
一旦消息成为死信,如果该队列通过 dead-letter-exchange 属性指定了交换机,死信就会被转发到该交换机,这个交换机称为死信交换机。后续,如果有队列绑定到此死信交换机,死信最终会被投递到该队列中。
死信交换机的主要作用包括:
- 收集处理失败的消息:便于后续分析或人工处理;
- 应对队列满载的情况:避免消息丢失;
- 处理超时消息:自动转移因 TTL 到期未被消费的消息。
通过死信机制,可以提高消息系统的可靠性与可维护性,确保异常消息可追踪、可管理。
利用死信交换机的机制,我们就可以实现一个延时消息:消息发送到业务队列 → 设置TTL(过期时间) → 过期后消息从业务队列转到死信交换机路由到死信队列 → 消费者从死信队列消费 = 延迟消息完成。
在 SpringBoot 中的实现,首先在 RabbitConstant.java 中增加常量配置。
// 业务队列
public static final String BUSINESS_QUEUE = "business.queue";
public static final String BUSINESS_EXCHANGE = "business.exchange";
public static final String BUSINESS_ROUTE_KEY = "business.routing.key";
// 死信队列
public static final String DLX_QUEUE = "dlx.queue";
public static final String DLX_EXCHANGE = "dlx.exchange";
public static final String DLX_ROUTE_KEY = "dlx.routing.key";生产者项目声明业务队列和死信队列。
import com.juzicoding.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DLXDelayConfig {
/**
* 业务队列(带死信配置)
*/
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>();
// 死信交换机
args.put("x-dead-letter-exchange", RabbitConstant.DLX_EXCHANGE);
// 死信路由键
args.put("x-dead-letter-routing-key", RabbitConstant.DLX_ROUTE_KEY);
// 设置队列消息TTL,单位为毫秒
// args.put("x-message-ttl", 5000);
return new Queue(RabbitConstant.BUSINESS_QUEUE, true, false, false, args);
}
/**
* 业务交换机
*/
@Bean
public DirectExchange businessExchange() {
return new DirectExchange(RabbitConstant.BUSINESS_EXCHANGE);
}
/**
* 绑定业务队列
*/
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with(RabbitConstant.BUSINESS_ROUTE_KEY);
}
/**
* 死信队列(延迟消费队列)
*/
@Bean
public Queue dlxQueue() {
return new Queue(RabbitConstant.DLX_QUEUE, true);
}
/**
* 死信交换机
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(RabbitConstant.DLX_EXCHANGE);
}
/**
* 绑定死信队列
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(RabbitConstant.DLX_ROUTE_KEY);
}
}发送消息和消费消息代码,启动后发送消息,观察消费者是否 5 秒后消费消息。
@PostMapping("/sendDLXDelayMsg")
private void sendDLXDelayMsg() {
MsgBO<SmsMsg> smsMsgBO = MsgBO.of(new SmsMsg("1234567890", "请尽快支付"));
rabbitTemplate.convertAndSend(
RabbitConstant.BUSINESS_EXCHANGE,
RabbitConstant.BUSINESS_ROUTE_KEY,
smsMsgBO,
msg -> {
// 设置TTL,单位:毫秒
msg.getMessageProperties().setExpiration(String.valueOf(5000));
return msg;
}
);
log.info("发送消息: {}", smsMsgBO);
} @RabbitListener(queues = RabbitConstant.DLX_QUEUE)
public void dlxQueueConsumer(MsgBO<SmsMsg> smsMsgBO) {
log.info("{} 收到消息:{}", RabbitConstant.DLX_QUEUE, smsMsgBO);
}死信交换机实现延迟消息的方式虽然原理简单且由 RabbitMQ 原生支持,但通常不推荐在生产中大规模使用,主要存在以下几个明显缺点:
- 队头阻塞问题:延迟时间由队列的 TTL(Time-To-Live)统一设置,导致所有消息的延迟时间必须相同。如果先后放入两条消息,第一条延迟 10 秒,第二条延迟 5 秒,由于队列先进先出的特性,第二条消息必须等待第一条到期后才会被判断是否过期,从而导致其实际延迟时间被拉长,无法实现精准的按消息独立延迟。
- 延迟精度有限:RabbitMQ 通过定期扫描队列来判断消息是否过期,而非实时判断,因此延迟任务的触发时间存在一定的误差,不适合对时间精度要求严格的场景。
- 资源占用较大:大量延迟消息会长时间堆积在业务队列中,占用内存与存储资源,影响队列的整体吞吐性能,尤其在延迟时间较长、消息量大的情况下,系统资源利用率较低。
总结来说,死信交换机适用于延迟时间统一、精度要求不高、消息量不大的简单场景。对于复杂、精准、高并发的延迟任务需求,更推荐使用延迟消息插件方案来实现。
延迟插件
延迟插件项目:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
下载对应版本的插件,比如我的版本是 3.11,那么就选择对应的插件版本 3.11,同时启用插件。
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
# 复制插件到容器内
docker cp rabbitmq_delayed_message_exchange-3.11.1.ez rabbitmq:/plugins/
# 启用延迟消息插件
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 查看已安装插件
docker exec -it rabbitmq rabbitmq-plugins list | grep delay生产者项目声明延迟交换机。
// 延迟插件实现延迟消息
public static final String PLUGIN_DELAY_QUEUE = "plugin.delay.queue";
public static final String PLUGIN_DELAY_EXCHANGE = "plugin.delay.exchange";
public static final String PLUGIN_DELAY_ROUTE_KEY = "plugin.delay.routing.key";import com.juzicoding.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class PluginDelayConfig {
@Bean
public Queue delayQueue() {
return new Queue(RabbitConstant.PLUGIN_DELAY_QUEUE, true);
}
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
// 注意:交换机类型必须是 x-delayed-message
return new CustomExchange(RabbitConstant.PLUGIN_DELAY_EXCHANGE,
"x-delayed-message",
true,
false,
args);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with(RabbitConstant.PLUGIN_DELAY_ROUTE_KEY)
.noargs();
}
}发送消息和消费消息代码,启动后发送消息,观察消费者是否 5 秒后消费消息。
@PostMapping("/sendPluginDelayMsg")
private void sendPluginDelayMsg() {
MsgBO<SmsMsg> smsMsgBO = MsgBO.of(new SmsMsg("1234567890", "请尽快支付"));
rabbitTemplate.convertAndSend(
RabbitConstant.PLUGIN_DELAY_EXCHANGE,
RabbitConstant.PLUGIN_DELAY_ROUTE_KEY,
smsMsgBO,
msg -> {
// 设置延迟时间(毫秒)
msg.getMessageProperties().setDelayLong(5000L);
return msg;
}
);
log.info("发送消息: {}", smsMsgBO);
} @RabbitListener(queues = RabbitConstant.PLUGIN_DELAY_QUEUE)
public void pluginQueueConsumer(MsgBO<SmsMsg> smsMsgBO) {
log.info("{} 收到消息:{}", RabbitConstant.PLUGIN_DELAY_QUEUE, smsMsgBO);
}两种方案对比
| 特性 | 死信队列 | 延迟插件 |
|---|---|---|
| 精确度 | 低(秒级) | 高(毫秒级) |
| 灵活性 | 低(队列级别TTL) | 高(消息级别延迟) |
| 性能 | 一般 | 优秀 |
| 安装复杂度 | 无需安装 | 需要安装插件 |
| 队头阻塞 | 存在 | 不存在 |
| 资源占用 | 高 | 低 |