RabbitMQ 入门
相关概念

AMQP协议
AMQP 是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务,主要实现有 RabbitMQ 。
生产者、消费者、消息
- 生产者:消息的创建者,发送到 RabbitMQ 。
- 消费者:连接到 RabbitMQ,订阅到队列上,消费消息。
- 消息:包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且 RabbitMQ 用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁。
连接
首先作为客户端,无论是生产者还是消费者,你如果要与 RabbitMQ 通讯的话,你们之间必须创建一条 TCP 连接,连接在 RabbitMQ 原生客户端(5.0.0)版本中默认使用 Java 的原生 socket,但是也支持 NIO,需要手动设置修改。
信道
信道是生产者/消费者与 RabbitMQ 通信的渠道。信道是建立在 TCP 连接上的虚拟连接,什么意思呢?就是说 RabbitMQ 在一条 TCP 上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个线程对应一个信道,信道在 RabbitMQ 都有唯一的 ID ,保证了信道私有性,对应上唯一的线程使用。
为什么不建立多个 TCP 连接呢?
为了保证性能,系统为每个线程开辟一个 TCP 是非常消耗性能,每秒成百上千的建立销毁 TCP 会严重消耗系统,所以 RabbitMQ 选择建立多个信道(建立在 TCP 的虚拟连接)连接到 RabbitMQ 上。
虚拟主机
虚拟消息服务器,vhost,本质上就是一个 mini 版的 mq 服务器,有自己的队列、交换器和绑定,最重要的,自己的权限机制,Vhost 提供了逻辑上的分离,可以将众多客户端进行区分,又可以避免队列和交换器的命名冲突。Vhost 必须在连接时指定,RabbitMQ 包含缺省 vhost:“/”,通过缺省用户和口令 guest 进行访问。
RabbitMQ 里创建用户,必须要被指派给至少一个 vhost,并且只能访问被指派内的队列、交换器和绑定,Vhost 必须通过 RabbitMQ 的管理控制工具创建。
简单来说类似于 window 系统下的分盘,不同的盘存储不同的内容。
交换器、队列、绑定、路由键
队列通过路由键(routing key,某种确定的规则)绑定到交换器,生产者将消息发布到交换器,交换器根据绑定的路由键将消息路由到特定队列,然后由订阅这个队列的消费者进行接收(routing_key 和 绑定键 binding_key 的最大长度是 255 个字节)。
交换器类型
共有四种 Direct,Fanout,Topic,Headers,我们主要关注前3种。
Direct Exchange
直连交换器,路由键完全匹配,消息被投递到对应的队列,Direct 交换器是默认交换器,声明一个队列时,会自动绑定到默认交换器,例如 A 队列绑定路由键 routing key = juzi,那么只有路由键为 juzi 的消息可以进入A队列。
Fanout Exchange
广播交换器,消息广播到绑定的队列,不管队列绑定了什么路由键,消息经过交换器,每个队列都有一份。
Topic Exchange
主题交换器,通过路由键配置规则转发到队列,使用 * 和 # 通配符进行处理,使来自不同源头的消息到达同一个队列,. 将路由键分为了几个标识符,* 匹配 1 个,# 匹配一个或多个。
Topic Exchange 说明
- A队列绑定了路由键
routing key = juzi.*。 - B队列绑定了路由键
routing key = juzi.#。 - 那么路由键为
juzi.t1的消息会同时进入 A,B 队列。 - 路由键为
juzi.t1.t2的消息只会进入 B 队列。
安装
使用 Docker 快速安装。
docker run -d --restart=always --hostname rabbitmq-host \
--name rabbitmq \
-e RABBITMQ_DEFAULT_USER='你的用户名' \
-e RABBITMQ_DEFAULT_PASS='你的密码' \
-v /juzi/dockerData/rabbitmq:/var/lib/rabbitmq \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3.11-managementjava客户端基础使用

官网介绍了以上几种队列的使用,我这里先选取几个常用的来学习一下。
创建 maven 项目,目录结构如下。
📂 rabbitmq-base
├─ 📂 src
│ └─ 📂 main
│ ├─ 📂 java
│ │ └─ 📦 com.juzicoding
│ │ ├─ 📦 config
│ │ │ └─ ☕ RabbitConstant.java
│ │ ├─ 📦 confirms
│ │ │ ├─ ☕ Producer1.java
│ │ │ ├─ ☕ Producer2.java
│ │ │ └─ ☕ Producer3.java
│ │ ├─ 📦 direct
│ │ │ ├─ ☕ Consumer.java
│ │ │ └─ ☕ Producer.java
│ │ ├─ 📦 fanout
│ │ │ ├─ ☕ Consumer1.java
│ │ │ ├─ ☕ Consumer2.java
│ │ │ └─ ☕ Producer.java
│ │ ├─ 📦 topic
│ │ │ ├─ ☕ Consumer.java
│ │ │ └─ ☕ Producer.java
│ │ └─ 📦 work
│ │ ├─ ☕ Producer.java
│ │ ├─ ☕ PullModeConsumer.java
│ │ └─ ☕ PushModeConsumer.java
│ └─ 📂 resources
└─ 📄 pom.xmlpom.xml 引入依赖,amqp-client 为我们需要连接使用 RabbitMQ 的依赖,其他的为个人使用习惯。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- 项目基本信息 -->
<groupId>com.juzicoding</groupId>
<artifactId>rabbitmq-base</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 全局属性配置 -->
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<!-- 依赖管理 -->
<dependencies>
<!-- RabbitMQ Java客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.25.0</version>
</dependency>
<!-- 日志接口 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.16</version>
</dependency>
<!-- 日志实现 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.18</version>
<scope>compile</scope>
</dependency>
<!-- 简化代码开发 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.40</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>然后通用配置常量放在一个类里面。
public class RabbitConstant {
// RabbitMQ ip
public static final String HOST_NAME = "127.0.0.1";
// RabbitMQ 端口
public static final int HOST_PORT = 5672;
// RabbitMQ 账号
public static final String USER_NAME = "juzi";
// RabbitMQ 密码
public static final String PASSWORD = "juzi123456";
// 虚拟空间
public static final String VIRTUAL_HOST = "/juzi";
// Work Queues
public static final String WORK_QUEUE_NAME = "workQueue";
// Publish/Subscribe
public static final String FANOUT_QUEUE1_NAME = "fanoutQueue1";
public static final String FANOUT_QUEUE2_NAME = "fanoutQueue2";
public static final String FANOUT_EXCHANGE_NAME = "fanoutExchange";
// Direct
public static final String DIRECT_QUEUE_NAME = "directQueue";
public static final String DIRECT_EXCHANGE_NAME = "directExchange";
public static final String DIRECT_ROUTE_KEY = "directRouteKey";
// Topics
public static final String TOPIC_QUEUE_NAME = "topicQueue";
public static final String TOPIC_EXCHANGE_NAME = "topicExchange";
public static final String TOPIC_ROUTE_KEY = "juzi.*";
// Publisher Confirms
public static final String CONFIRMS_QUEUE = "confirmsQueue";
}简单来说,有以下几个步骤。
- 首先创建连接,获取 Channel。
- 声明 Exchange。
- 声明 Queue。
- 声明 Exchange 与 Queue 的绑定关系。
- Producer 发送消息到 Queue。
- Consumer 消费消息。
- 完成以后关闭连接,释放资源。
Work Queue 模型

这是 RabbitMQ 最基础,也是最常用的一种工作机制,生产者直接将消息发送到队列,然后由多个消费者消费。
消息发送
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Producer {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
// 创建连接,创建通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列; 参数含义: 队列名, 是否持久化, 是否排他性, 是否自动删除, 其他参数
channel.queueDeclare(RabbitConstant.WORK_QUEUE_NAME, true, false, false, null);
// 发送消息
String msg = "Hello RabbitMQ!";
// String msg = "error msg.";
channel.basicPublish("", RabbitConstant.WORK_QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
log.info("发送成功:{}", msg);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}消费消费
消费者消费消息有两种模式,一种是推模式(MQ 把消息推送给消费者),一种是拉模式(消费者从 MQ 拿消息),这里来看看两种模式的使用。
推模式
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class PushModeConsumer {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
try {
// 创建连接,创建通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(RabbitConstant.WORK_QUEUE_NAME, true, false, false, null);
// 不自动确认
boolean autoAck = false;
// 消息处理逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
log.info("收到消息: {}", message);
// 模拟处理业务逻辑
if (message.contains("error")) {
throw new RuntimeException("模拟业务异常");
}
// 手动确认; 参数含义: 消息标识, 是否批量确认(false: 只确认当前这一条消息, true: 确认所有小于等于该 deliveryTag 的消息)
channel.basicAck(deliveryTag, false);
log.info("消息手动确认");
} catch (Exception e) {
log.error(e.getMessage(), e);
// 单条消息,重新投递 channel.basicNack(deliveryTag, false, true);
// 单条消息,直接丢弃 channel.basicNack(deliveryTag, false, false);
channel.basicNack(deliveryTag, false, true);
}
};
// 启动消费者
channel.basicConsume(RabbitConstant.WORK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}拉模式
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class PullModeConsumer {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
// 创建连接,创建通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(RabbitConstant.WORK_QUEUE_NAME, true, false, false, null);
// 拉取消息
GetResponse res = channel.basicGet(RabbitConstant.WORK_QUEUE_NAME, false);
if (res == null) {
log.info("没有需要消费的消息");
return;
}
String message = new String(res.getBody(), StandardCharsets.UTF_8);
log.info("收到消息: {}", message);
// 手动确认
channel.basicAck(res.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}通常情况下推荐使用推模式,RabbitMQ 的架构更侧重于高效的“推模式”,这也是其作为异步消息传递系统的标准用法。而“拉模式”更像是一个辅助功能,用于特定场景。
| 特性 | 推模式 (basic.consume) | 拉模式 (basic.get) |
|---|---|---|
| 主动性 | RabbitMQ 主动 | 消费者主动 |
| 实时性 | 高。消息一到立即推送。 | 低。取决于轮询间隔。 |
| 性能开销 | 长连接,推送即时,效率高,是推荐方式。 | 频繁轮询会产生不必要的请求(即使队列为空),效率较低。 |
| 流量控制 | 支持好。通过 prefetch_count 参数限制未确认消息的数量,实现背压。 | 不支持。每次 get 只能获取一条,流量控制需自行在应用层实现。 |
| 连接开销 | 一个长连接,持续接收。 | 可以按需建立短连接,但通常不建议频繁开关连接。 |
| 使用场景 | 绝大多数场景,持续处理消息。 | 特殊场景,如:批处理、按需补单、管理后台手动触发等不连续的任务。 |
Fanout 交换机

Fanout 翻译过来的意思是 “扇出”,但是个人认为叫广播交换机更合适,他的作用就是将收到的消息向所有绑定到 Fanout 交换机的队列路由。
消息发送
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Producer {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
// 创建连接,创建通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 fanout 交换机
channel.exchangeDeclare(RabbitConstant.FANOUT_EXCHANGE_NAME, "fanout");
// 发送消息
String msg = "fanout err msg!";
channel.basicPublish(RabbitConstant.FANOUT_EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
log.info("发送成功:{}", msg);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}消费消费
消费者1
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Consumer1 {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
try {
// 创建连接,创建通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(RabbitConstant.FANOUT_EXCHANGE_NAME, "fanout");
// 声明队列
channel.queueDeclare(RabbitConstant.FANOUT_QUEUE1_NAME, true, false, false, null);
// 声明交换机与队列的绑定关系; 参数含义: 队列名, 交换机名, 路由key
channel.queueBind(RabbitConstant.FANOUT_QUEUE1_NAME, RabbitConstant.FANOUT_EXCHANGE_NAME, "");
// 不自动确认
boolean autoAck = false;
// 消息处理逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
log.info("Consumer1 收到消息: {}", msg);
// 模拟处理业务逻辑
if (msg.contains("error")) {
throw new RuntimeException("模拟业务异常");
}
// 手动确认; 参数含义: 消息标识, 是否批量确认(false: 只确认当前这一条消息, true: 确认所有小于等于该 deliveryTag 的消息)
channel.basicAck(deliveryTag, false);
log.info("消息手动确认");
} catch (Exception e) {
log.error(e.getMessage(), e);
// channel.basicNack(deliveryTag, false, true); // 单条消息,重新投递
// channel.basicNack(deliveryTag, false, false); // 单条消息,直接丢弃
channel.basicNack(deliveryTag, false, false);
}
};
// 启动消费者
channel.basicConsume(RabbitConstant.FANOUT_QUEUE1_NAME, autoAck, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}消费者2
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Consumer2 {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
try {
// 创建连接,创建通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(RabbitConstant.FANOUT_EXCHANGE_NAME, "fanout");
// 声明队列
channel.queueDeclare(RabbitConstant.FANOUT_QUEUE2_NAME, true, false, false, null);
// 声明交换机与队列的绑定关系; 参数含义: 队列名, 交换机名, 路由key
channel.queueBind(RabbitConstant.FANOUT_QUEUE2_NAME, RabbitConstant.FANOUT_EXCHANGE_NAME, "");
// 不自动确认
boolean autoAck = false;
// 消息处理逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
log.info("Consumer2 收到消息: {}", msg);
// 模拟处理业务逻辑
if (msg.contains("error")) {
throw new RuntimeException("模拟业务异常");
}
// 手动确认; 参数含义: 消息标识, 是否批量确认(false: 只确认当前这一条消息, true: 确认所有小于等于该 deliveryTag 的消息)
channel.basicAck(deliveryTag, false);
log.info("消息手动确认");
} catch (Exception e) {
log.error(e.getMessage(), e);
// channel.basicNack(deliveryTag, false, true); // 单条消息,重新投递
// channel.basicNack(deliveryTag, false, false); // 单条消息,直接丢弃
channel.basicNack(deliveryTag, false, false);
}
};
// 启动消费者
channel.basicConsume(RabbitConstant.FANOUT_QUEUE2_NAME, autoAck, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}Direct 交换机

对比 Fanout 交换机的广播投递,Direct 交换机根据消息的路由键(Routing Key)将消息精确地投递到一个或多个队列。
消息发送
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Producer {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
// 创建连接,创建通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机; 参数含义: 交换机名称, 交换机类型, 是否持久化
channel.exchangeDeclare(RabbitConstant.DIRECT_EXCHANGE_NAME, "direct", true);
// 声明队列; 参数含义: 队列名, 是否持久化, 是否排他性, 是否自动删除, 其他参数
channel.queueDeclare(RabbitConstant.DIRECT_QUEUE_NAME, true, false, false, null);
// 声明交换机与队列的绑定关系; 参数含义: 队列名, 交换机名, 路由key
channel.queueBind(RabbitConstant.DIRECT_QUEUE_NAME, RabbitConstant.DIRECT_EXCHANGE_NAME, RabbitConstant.DIRECT_ROUTE_KEY);
// 发送消息
String msg = "direct msg!";
channel.basicPublish(RabbitConstant.DIRECT_EXCHANGE_NAME, RabbitConstant.DIRECT_ROUTE_KEY, null, msg.getBytes(StandardCharsets.UTF_8));
log.info("发送成功:{}", msg);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}消费消费
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Consumer {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
try {
// 创建连接,创建通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(RabbitConstant.DIRECT_QUEUE_NAME, true, false, false, null);
// 不自动确认
boolean autoAck = false;
// 消息处理逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
log.info("收到消息: {}", msg);
// 模拟处理业务逻辑
if (msg.contains("error")) {
throw new RuntimeException("模拟业务异常");
}
// 手动确认; 参数含义: 消息标识, 是否批量确认(false: 只确认当前这一条消息, true: 确认所有小于等于该 deliveryTag 的消息)
channel.basicAck(deliveryTag, false);
log.info("消息手动确认");
} catch (Exception e) {
log.error(e.getMessage(), e);
// channel.basicNack(deliveryTag, false, true); // 单条消息,重新投递
// channel.basicNack(deliveryTag, false, false); // 单条消息,直接丢弃
channel.basicNack(deliveryTag, false, false);
}
};
// 启动消费者
channel.basicConsume(RabbitConstant.DIRECT_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}Topic 交换机

Topic 与 Direct 相比都是可以根据路由键把消息路由到不同的队列,只不过 Topic 可以让队列在绑定路由键的时候使用通配符,这样就可以路由一类消息。
消息发送
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Producer {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
// 创建连接,创建通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机; 参数含义: 交换机名称, 交换机类型, 是否持久化
channel.exchangeDeclare(RabbitConstant.TOPIC_EXCHANGE_NAME, "topic", true);
// 声明队列; 参数含义: 队列名, 是否持久化, 是否排他性, 是否自动删除, 其他参数
channel.queueDeclare(RabbitConstant.TOPIC_QUEUE_NAME, true, false, false, null);
// 声明交换机与队列的绑定关系; 参数含义: 队列名, 交换机名, 路由key
channel.queueBind(RabbitConstant.TOPIC_QUEUE_NAME, RabbitConstant.TOPIC_EXCHANGE_NAME, RabbitConstant.TOPIC_ROUTE_KEY);
// 发送消息
String msg = "topic msg!";
channel.basicPublish(RabbitConstant.TOPIC_EXCHANGE_NAME, "juzi.coding", null, msg.getBytes(StandardCharsets.UTF_8));
log.info("发送成功:{}", msg);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}消费消费
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Consumer {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
try {
// 创建连接,创建通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(RabbitConstant.TOPIC_QUEUE_NAME, true, false, false, null);
// 不自动确认
boolean autoAck = false;
// 消息处理逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
log.info("收到消息: {}", msg);
// 模拟处理业务逻辑
if (msg.contains("error")) {
throw new RuntimeException("模拟业务异常");
}
// 手动确认; 参数含义: 消息标识, 是否批量确认(false: 只确认当前这一条消息, true: 确认所有小于等于该 deliveryTag 的消息)
channel.basicAck(deliveryTag, false);
log.info("消息手动确认");
} catch (Exception e) {
log.error(e.getMessage(), e);
// channel.basicNack(deliveryTag, false, true); // 单条消息,重新投递
// channel.basicNack(deliveryTag, false, false); // 单条消息,直接丢弃
channel.basicNack(deliveryTag, false, false);
}
};
// 启动消费者
channel.basicConsume(RabbitConstant.TOPIC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}生产者确认
RabbitMQ 虽然能保证消息到达队列后的可靠性,但在消息发送阶段可能存在风险。回顾基础的 channel.basicPublish 方法,你会发现它没有返回值——这意味着发送者无法确认消息是否成功到达 RabbitMQ。这在业务层面可能导致消息丢失,尤其是在网络不稳定或服务异常时。
为了填补这个可靠性缺口,RabbitMQ 提供了发送者确认机制(Publisher Confirms)。该机制通过异步确认的方式,让生产者能够可靠地知晓消息是否已被 MQ 服务器成功接收和处理。
这个机制默认情况下是关闭的,官网主要介绍了3种方式,接下来我们看看怎么开启这个机制的使用。
普通确认模式
简单来说,当发送一条消息后,调用 channel.waitForConfirmsOrDie(5000L); 方法会在 Channel 层面同步等待 RabbitMQ 服务端的确认响应,以确保消息已成功送达。需要注意的是,该方法会阻塞当前 Channel 直到收到确认或超时,在此期间 Channel 无法继续发送后续消息,因此会对消息吞吐量造成明显影响。
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Producer1 {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
// 创建连接,创建通道
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列; 参数含义: 队列名, 是否持久化, 是否排他性, 是否自动删除, 其他参数
channel.queueDeclare(RabbitConstant.CONFIRMS_QUEUE, true, false, false, null);
// 开启生产者确认模式
channel.confirmSelect();
// 发送消息
for (int i = 0; i < 10; i++) {
String msg = "Publisher Confirms " + i;
channel.basicPublish("", RabbitConstant.CONFIRMS_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));
channel.waitForConfirmsOrDie(5000L);
log.info("消息确认成功: {}", msg);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}批量确认模式
之前的单条确认机制会显著降低系统吞吐量,因此一种折中的做法是在发送一批消息后进行批量确认。
这种方式能在一定程度上缓解发送方确认模式对性能的影响,但也存在一个固有缺陷:当确认出现异常时,发送方只能感知到整批消息存在问题,而无法准确定位具体是哪一条消息发生了异常。
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class Producer2 {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
// 创建连接,创建通道
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列; 参数含义: 队列名, 是否持久化, 是否排他性, 是否自动删除, 其他参数
channel.queueDeclare(RabbitConstant.CONFIRMS_QUEUE, true, false, false, null);
// 开启生产者确认模式
channel.confirmSelect();
// 发送消息
int batchSize = 5;
int outstanding = 0;
for (int i = 0; i < 24; i++) {
String msg = "Publisher Confirms " + i;
channel.basicPublish("", RabbitConstant.CONFIRMS_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));
outstanding++;
if (outstanding == batchSize) {
// 批量等待确认
channel.waitForConfirmsOrDie(5000);
log.info("一批消息确认成功");
outstanding = 0;
}
}
// 剩余消息
if (outstanding > 0) {
channel.waitForConfirmsOrDie(5000);
log.info("最后一批确认成功");
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}异步确认模式
前面两种方式都存在一定的问题,所以一步确认模式就是最为推荐的一个方式,具体实现方式是在 Channel 中注册确认监听器来异步处理消息的发送结果。核心方法是:
channel.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);为何需要提供两个回调函数?这是由于 RabbitMQ 会分别对消息的成功送达和失败情况进行通知,因此我们需要同时为成功和失败两种情况分别注册处理逻辑。
ConfirmCallback 是一个监听器接口,其中只包含一个方法:
void handle(long sequenceNumber, boolean multiple) throws IOException;方法参数说明:
- sequenceNumber:表示消息的唯一序列号。由于 RabbitMQ 的消息体本身只是二进制数组,并不携带序列号信息,因此发送方需要自行维护消息与序列号之间的对应关系。RabbitMQ 提供了
channel.getNextPublishSeqNo()方法来生成一个全局递增的序列号,该序列号会被关联到下一条即将发送的消息上。应用程序必须负责记录和绑定这条消息与其序列号。 - multiple:这是一个布尔值参数。若为
false,表示当前仅确认了 sequenceNumber 对应的单条消息;若为true,则表示 RabbitMQ 一次性确认了一批消息,即序列号小于或等于当前 sequenceNumber 的所有消息都已被确认。
import com.juzicoding.config.RabbitConstant;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentSkipListMap;
@Slf4j
public class Producer3 {
public static void main(String[] args) {
// ⾸先创建连接,获取 Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitConstant.HOST_NAME);
factory.setPort(RabbitConstant.HOST_PORT);
factory.setUsername(RabbitConstant.USER_NAME);
factory.setPassword(RabbitConstant.PASSWORD);
factory.setVirtualHost(RabbitConstant.VIRTUAL_HOST);
// 创建连接,创建通道
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列; 参数含义: 队列名, 是否持久化, 是否排他性, 是否自动删除, 其他参数
channel.queueDeclare(RabbitConstant.CONFIRMS_QUEUE, true, false, false, null);
// 开启生产者确认模式
channel.confirmSelect();
// 保存未确认消息(deliveryTag -> msg)
ConcurrentSkipListMap<Long, String> confirmMap = new ConcurrentSkipListMap<>();
// 确认监听器
channel.addConfirmListener(
// ACK
(deliveryTag, multiple) -> {
if (multiple) {
confirmMap.headMap(deliveryTag + 1).clear();
log.info("ACK, tag <= {}", deliveryTag);
} else {
confirmMap.remove(deliveryTag);
log.info("ACK, tag = {}", deliveryTag);
}
},
// NACK
(deliveryTag, multiple) -> {
log.error("NACK, tag={}, msg={}", deliveryTag, confirmMap.get(deliveryTag));
// 这里可以做重发
}
);
// 发送消息
for (int i = 0; i < 100; i++) {
String msg = "async-confirm-" + i;
long nextSeqNo = channel.getNextPublishSeqNo();
confirmMap.put(nextSeqNo, msg);
channel.basicPublish("", RabbitConstant.CONFIRMS_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}实际上,在当前的版本中,Publisher 不仅能够确认消息是否成功到达 Exchange,还可以进一步确认消息是否从 Exchange 正确路由到了 Queue。这通过在 Channel 上添加一个 ReturnListener 来实现,它会监听到那些虽已成功发送但未能路由到任何队列的消息。
不过在实际开发中,这类情况通常源于代码层面的配置问题(如绑定键不匹配或队列不存在等),因此往往不会主动开启这一监听机制,后续在进阶内容中也会简单提到。
SpringBoot 集成
有了前面的基础,集成到 SpringBoot 就更简单了,也是差不多的步骤,我不写注释都能看懂的那种。
先创建项目,rabbitmq-springboot 为父项目,统一通用依赖及版本;mq-producer 为生产者项目,启动端口 8080;mq-consumer 为消费者项目,启动端口 8081。
📂 rabbitmq-springboot
├─ 📂 mq-consumer
│ ├─ 📂 src
│ │ ├─ 📂 main
│ │ │ ├─ 📂 java
│ │ │ │ └─ 📦 com.juzicoding
│ │ │ │ ├─ 📦 common
│ │ │ │ │ └─ ☕ MsgBO.java
│ │ │ │ ├─ 📦 config
│ │ │ │ │ └─ ☕ RabbitMQConfig.java
│ │ │ │ ├─ 📦 constant
│ │ │ │ │ └─ ☕ RabbitConstant.java
│ │ │ │ ├─ 📦 consumer
│ │ │ │ │ ├─ ☕ DirectConsumer.java
│ │ │ │ │ ├─ ☕ FanoutConsumer.java
│ │ │ │ │ ├─ ☕ SmsConsumer.java
│ │ │ │ │ ├─ ☕ TopicConsumer.java
│ │ │ │ │ └─ ☕ WorkQueueConsumer.java
│ │ │ │ ├─ 📦 entity
│ │ │ │ │ └─ ☕ SmsMsg.java
│ │ │ │ └─ ☕ MqConsumerApplication.java
│ │ │ └─ 📂 resources
│ │ │ └─ ⚙️ application.yml
│ └─ 📄 pom.xml
├─ 📂 mq-producer
│ ├─ 📂 src
│ │ ├─ 📂 main
│ │ │ ├─ 📂 java
│ │ │ │ └─ 📦 com.juzicoding
│ │ │ │ ├─ 📦 api
│ │ │ │ │ └─ ☕ RabbitTestApi.java
│ │ │ │ ├─ 📦 common
│ │ │ │ │ └─ ☕ MsgBO.java
│ │ │ │ ├─ 📦 config
│ │ │ │ │ ├─ ☕ DirectConfig.java
│ │ │ │ │ ├─ ☕ FanoutConfig.java
│ │ │ │ │ ├─ ☕ RabbitMQConfig.java
│ │ │ │ │ ├─ ☕ SmsConfig.java
│ │ │ │ │ ├─ ☕ TopicConfig.java
│ │ │ │ │ └─ ☕ WorkQueueConfig.java
│ │ │ │ ├─ 📦 constant
│ │ │ │ │ └─ ☕ RabbitConstant.java
│ │ │ │ ├─ 📦 entity
│ │ │ │ │ └─ ☕ SmsMsg.java
│ │ │ │ └─ ☕ MqProducerApplication.java
│ │ │ └─ 📂 resources
│ │ │ └─ ⚙️ application.yml
│ └─ 📄 pom.xml
└─ 📄 pom.xmlpom.xml 配置
rabbitmq-springboot -> pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.9</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.juzicoding</groupId>
<artifactId>rabbitmq-springboot</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>mq-producer</module>
<module>mq-consumer</module>
</modules>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 简化代码开发 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>3.5.9</version>
</plugin>
</plugins>
</build>
</project>mq-producer -> pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.juzicoding</groupId>
<artifactId>rabbitmq-springboot</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>mq-producer</artifactId>
</project>mq-consumer -> pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.juzicoding</groupId>
<artifactId>rabbitmq-springboot</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>mq-consumer</artifactId>
</project>aplication.yml 配置
两个服务的 application.yml 主要配置。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: juzi
password: juzi123456
virtual-host: /juziRabbitConstant 常量
public class RabbitConstant {
public static final String WORK_QUEUE_NAME = "workQueue";
public static final String FANOUT_QUEUE1_NAME = "fanoutQueue1";
public static final String FANOUT_QUEUE2_NAME = "fanoutQueue2";
public static final String FANOUT_EXCHANGE_NAME = "fanoutExchange";
public static final String DIRECT_QUEUE_NAME = "directQueue";
public static final String DIRECT_EXCHANGE_NAME = "directExchange";
public static final String DIRECT_ROUTE_KEY = "directRouteKey";
public static final String TOPIC_QUEUE_NAME = "topicQueue";
public static final String TOPIC_EXCHANGE_NAME = "topicExchange";
public static final String TOPIC_ROUTE_KEY = "juzi.*";
}消息发送测试 Api
import com.juzicoding.common.MsgBO;
import com.juzicoding.constant.RabbitConstant;
import com.juzicoding.entity.SmsMsg;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("/rabbitTest")
public class RabbitTestApi {
private final RabbitTemplate rabbitTemplate;
@PostMapping("/workQueueSendMsg")
private void workQueueSendMsg() {
String msg = """
{"templateId": "workQueue", "msg": ""}
""";
rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, msg);
log.info("发送消息: {}", msg);
}
@PostMapping("/fanoutSendMsg")
private void fanoutSendMsg() {
String msg = """
{"templateId": "fanout", "msg": "fanoutSendMsg"}
""";
rabbitTemplate.convertAndSend(RabbitConstant.FANOUT_EXCHANGE_NAME, "", msg);
log.info("发送消息: {}", msg);
}
@PostMapping("/directSendMsg")
private void directSendMsg() {
String msg = """
{"templateId": "direct", "code": "directSendMsg"}
""";
rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE_NAME, RabbitConstant.DIRECT_ROUTE_KEY, msg);
log.info("发送消息: {}", msg);
}
@PostMapping("/topicSendMsg")
private void topicSendMsg() {
String msg = """
{"templateId": "topic", "code": "topicSendMsg"}
""";
rabbitTemplate.convertAndSend(RabbitConstant.TOPIC_EXCHANGE_NAME, "juzi.coding.info", msg);
log.info("发送消息: {}", msg);
}
@PostMapping("/sendSmsMsg")
private void sendSmsMsg() {
MsgBO<SmsMsg> smsMsgBO = MsgBO.of(new SmsMsg("1234567890", "您好,您的验证码是 123456"));
rabbitTemplate.convertAndSend(RabbitConstant.SMS_QUEUE_NAME, smsMsgBO);
log.info("发送消息: {}", smsMsgBO);
}
}Work Queue 模型
消息发送
import com.juzicoding.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class WorkQueueConfig {
@Bean
public Queue workQueue() {
return new Queue(RabbitConstant.WORK_QUEUE_NAME, true);
}
}消息消费
import com.juzicoding.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WorkQueueConsumer {
@RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
public void workQueueConsumer(String textMessageStr) {
log.info("{} 收到消息:{}", RabbitConstant.WORK_QUEUE_NAME, textMessageStr);
}
}Fanout 交换机
消息发送
import com.juzicoding.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
@Bean
public Queue fanoutQueue1() {
return new Queue(RabbitConstant.FANOUT_QUEUE1_NAME, true);
}
@Bean
public Queue fanoutQueue2() {
return new Queue(RabbitConstant.FANOUT_QUEUE2_NAME, true);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitConstant.FANOUT_EXCHANGE_NAME, true, false);
}
@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}消息消费
import com.juzicoding.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class FanoutConsumer {
@RabbitListener(queues = RabbitConstant.FANOUT_QUEUE1_NAME)
public void fanoutQueue1Consumer(String textMessageStr) {
log.info("{} 收到消息:{}", RabbitConstant.FANOUT_QUEUE1_NAME, textMessageStr);
}
@RabbitListener(queues = RabbitConstant.FANOUT_QUEUE2_NAME)
public void fanoutQueue2Consumer(String textMessageStr) {
log.info("{} 收到消息:{}", RabbitConstant.FANOUT_QUEUE2_NAME, textMessageStr);
}
}Direct 交换机
消息发送
import com.juzicoding.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
@Bean
public Queue directQueue() {
return new Queue(RabbitConstant.DIRECT_QUEUE_NAME, true);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE_NAME, true, false);
}
@Bean
public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(RabbitConstant.DIRECT_ROUTE_KEY);
}
}消息消费
import com.juzicoding.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DirectConsumer {
@RabbitListener(queues = RabbitConstant.DIRECT_QUEUE_NAME)
public void handlerDirectQueueMsg(String textMessageStr) {
log.info("{} 收到消息:{}", RabbitConstant.DIRECT_QUEUE_NAME, textMessageStr);
}
}Topic 交换机
消息发送
import com.juzicoding.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicConfig {
@Bean
public Queue topicQueue() {
return new Queue(RabbitConstant.TOPIC_QUEUE_NAME, true);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(RabbitConstant.TOPIC_EXCHANGE_NAME, true, false);
}
@Bean
public Binding topicBinding(Queue topicQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue).to(topicExchange).with(RabbitConstant.TOPIC_ROUTE_KEY);
}
}消息消费
import com.juzicoding.constant.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TopicConsumer {
@RabbitListener(queues = RabbitConstant.TOPIC_QUEUE_NAME)
public void topicConsumer(String textMessageStr) {
log.info("{} 收到消息:{}", RabbitConstant.TOPIC_QUEUE_NAME, textMessageStr);
}
}消息转换器
默认情况下发送对象消息使用的是 jdk 的序列化方式,这样消息并不直接可读,还有各种各样的问题,实际使用会使用 json 作为序列化方式,同时通常会封装一个通用的消息类,用来自动生成一些通用的字段,比如消息唯一ID、生成时间等。
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.UUID;
@Data
public class MsgBO<T> implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 消息唯一ID,可用于幂等处理
*/
private String id;
/**
* 消息生成时间
*/
private LocalDateTime timestamp;
/**
* 消息体
*/
private T data;
/**
* 创建消息对象,自动生成 id 和 timestamp
*/
public static <T> MsgBO<T> of(T data) {
MsgBO<T> msgDTO = new MsgBO<>();
msgDTO.setId(UUID.randomUUID().toString());
msgDTO.setTimestamp(LocalDateTime.now());
msgDTO.setData(data);
return msgDTO;
}
}以发送短信验证码为例,创建 SmsMsg。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serial;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SmsMsg implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 接收手机号
*/
private String phone;
/**
* 短信内容
*/
private String content;
}配置 MessageConverter 转换器,使用自定义的 Jackson 来做序列化和反序列化。
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Locale;
import java.util.TimeZone;
@Slf4j
@Configuration
public class RabbitMQConfig {
/**
* RabbitMQ 专用 ObjectMapper
*/
@Bean
public ObjectMapper rabbitObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
// 统一时区与语言环境,避免不同服务器默认配置不一致
objectMapper.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
objectMapper.setLocale(Locale.CHINA);
// 序列化时忽略 NULL 字段,减少消息体积
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
// 反序列化时忽略未知字段,支持消息字段演进
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
// Long / long 序列化为 String,避免精度丢失
SimpleModule longModule = new SimpleModule();
longModule.addSerializer(Long.class, ToStringSerializer.instance);
longModule.addSerializer(Long.TYPE, ToStringSerializer.instance);
objectMapper.registerModule(longModule);
// 支持 Java 8 时间类型(LocalDateTime 等)
objectMapper.registerModule(new JavaTimeModule());
// 禁用时间戳格式,使用可读的日期格式
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return objectMapper;
}
/**
* RabbitMQ 消息转换器
*/
@Bean
public MessageConverter rabbitMessageConverter(ObjectMapper rabbitObjectMapper) {
return new Jackson2JsonMessageConverter(rabbitObjectMapper);
}
}消息发送
import com.juzicoding.constant.RabbitConstant;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SmsConfig {
@Bean
public Queue smsQueue() {
return new Queue(RabbitConstant.SMS_QUEUE_NAME, true);
}
}消息消费
import com.juzicoding.common.MsgBO;
import com.juzicoding.constant.RabbitConstant;
import com.juzicoding.entity.SmsMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SmsConsumer {
@RabbitListener(queues = RabbitConstant.SMS_QUEUE_NAME)
public void workQueueConsumer(MsgBO<SmsMsg> smsMsgBO) {
log.info("{} 收到消息:{}", RabbitConstant.SMS_QUEUE_NAME, smsMsgBO);
}
}