返回首页

RabbitMQ学习笔记

说明: 本笔记为本人学习过程中随手写的笔记,为复习使用,笔记中可能存在遗漏或错误,具体请以官方文档和权威书籍为准!谢谢! 笔记中的一些图片等元素因路径配置问题,可能会发生丢失。 笔记中展示的知识点仅为部分内容,完整内容请查阅官方开发文档内容!

 

------基础篇------

RabbitMQ介绍

RabbitMQ 是一个消息代理:它接收和转发消息。您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确信邮递员最终会将邮件投递给您的收件人。在这个类比中,RabbitMQ 是一个邮箱、一个邮局和一个邮递员。

RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息

RabbitMQ 和消息传递通常使用一些术语。

请注意,生产者、消费者和代理不必驻留在同一主机上;实际上,在大多数应用程序中它们都不是。一个应用程序也可以既是生产者又是消费者。

 

微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?

我们先来看看什么是同步通讯和异步通讯。如图:

img

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

同步调用

image-20250510152758234

同步调用的优势

同步调用的问题

异步调用

异步调用方式就是基于消息通知的方式,一般分为三个角色

image-20250509205103953

例如在支付服务中,不在同步调用业务关联度低的服务,而发送消息通知到Broker

image-20250509205608811

这种方式具有以下优势

异步调用的问题

适用于异步调用的场景

 

技术选型

MQ(MessageQueue),消息队列,字面意思就是存放消息的队列,也就是异步调用中的Broker

 RabbitMQActiveMQRecketMQKafka
开发公司/社区RabbitApache阿里Apache
开发语言ErlangjavajavaScala,java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

 

RabbitMQ安装

安装(Docker方式)

官网:RabbitMQ: One broker to queue them all | RabbitMQ

Installing on Debian and Ubuntu | RabbitMQ

  1. 安装Erlang环境

  2. 安装Docker

     

  3. 拉取docker镜像

    注意:拉取镜像前请先更改为国内加速镜像,否则会导致拉取失败

     

  4. 执行以下指令启动容器

     

  5. 测试访问

    输入设置好的用户名和密码,进入后台 image-20250510211155049

 

  1. 关机后再次启动

     

 

基本介绍

RabbitMQ的整体架构及核心概念:

image-20250510153857201

快速入门

需求:在RabbitMQ的控制台下完成下列操作

 

  1. 新建两个队列 image-20250512204036071

    image-20250512204108123

  2. 选一个交换机,配置交换机与队列的关系 image-20250512204623550 image-20250512204753435

  3. 使用交换机发送一条消息 image-20250512205105118

    可以看到消息发送成功 image-20250512205019302

 

 

数据隔离

需求:在rabbitMQ控制台下完成以下操作

 

  1. 新建用户 image-20250513135210764

  2. 新建虚拟主机 image-20250513141540233

 

Java客户端操作

快速入门

由于RabbitMq默认的API操作十分繁琐,在Java中,使用Spring AMQP中提供的API操作消息队列,更加方便快捷

 

关于Spring AMQP

Spring AMQP是一个基于AMQP协议的消息中间件框架,它提供了一个简单的API来发送和接收异步、可靠的消息。它是Spring框架的一部分,可以与Spring Boot和其他Spring项目一起使用。Spring AMQP支持多种消息协议,包括RabbitMQ、Apache ActiveMQ和Qpid等。它提供了一个高级的消息模型,包括消息确认、事务和消息监听器等功能,使得开发者可以轻松地编写可靠的消息应用程序。Spring AMQP还提供了一些高级特性,如消息转换器、消息路由、消息过滤和消息拦截等。这些特性可以帮助开发者更加灵活地处理不同类型的消息,同时也提高了应用程序的性能和可靠性。总之,Spring AMQP是一个强大的消息中间件框架,它可以帮助开发者轻松地构建可靠的消息应用程序,并提供了许多高级特性来满足不同的需求。

Spring AMQP

 

AMQP介绍

Advanced MessageQueuingProtocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

 

需求

  1. 利用控制台创建队列simple.queue image-20250515095558932

  2. 在publisher服务中,利用Spring AMQP向simple.queue发送消息

    • 引入Spring AMQP依赖

    • 在每个微服务引入MQ服务端信息

    • 发送消息 SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:

       

  3. 在consumer服务中,利用Spring AMQP编写消费者,监听simple.queue队列消息 image-20250515094904764

    SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

 

work模型

Work queues,任务模型。让多个消费者绑定到一个队列,共同消费队列中的消息

image-20250515112008169

该模式下,在消息队列中一条消息只被消费一次

在默认情况下RabbitMQ会将消息依次轮询投递到队列上的每一个消费者,但这并没有考虑到消费者是否已经处理完消息,可能会出现消息堆积问题

可以在消费者的配置文件中添加以下配置,设置preFetch值为1,确保同一时刻最多投递一条消息给消费者:

 

 

Fanout交换机

在真实的生产环境下,都是通过exchange来发送消息的,而不是直接发送到队列

交换机的种类分为以下三种

FanoutExchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式 image-20250515224137486

案例实现:利用SpringAMQP演示FanoutExchange的使用

  1. 在RabbitMQ控制台中,新建两个队列 image-20250516174700942

  2. 在RabbitMQ控制台中,新建一个Fanout交换机 image-20250516174736042

  3. 将两个队列与交换机进行绑定 image-20250516181828278

  4. 在consumer服务中编写两个消费者方法

     

  5. 在publisher中编写测试方法,向交换机发送消息

  6. 效果展示 image-20250516182259928

 

Direct交换机

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

image-20250516182708524

 

案例实现:利用SpringAMQP演示DirectExchange的使用

  1. 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2 image-20250517101232155

  2. 在RabbitMQ控制台中,声明交换机direct.exchange,将两个队列与其绑定,设置好key image-20250517101305693 image-20250517101420805

  3. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  4. 在publisher中编写测试方法,利用不同的RoutingKey向direct.exchange发送消息

     

测试结果

image-20250517102208743

 

 

Topic交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割。

image-20250517133820890

 

案例实现:利用Spring AMQP演示DirectExchange的使用

image-20250517134027560

需求如下:

  1. 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2 image-20250517134118145

  2. 在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定 image-20250517134158045 image-20250517134318163

  3. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

     

  4. 在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息

  5. 效果展示 image-20250517135638424

 

 

声明队列和交换机

在真实的生产环境和开发环境中,一个项目可能需要很多个交换机,如果这些交换机全部通过上述后台管理界面进行创建,会非常繁琐复杂,在切换环境时,还可能会出错,导致项目出现问题。

在日常开发中,使用更多的时在项目Java代码的配置中进行声明,创建交换机,队列等

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

声明交换机一般在消费者端进行声明

方式一

基于Bean声明

image-20250518095852112

但是这种方法对于TopicExchange和DirectExchange这类需要绑定KEY的交换机就非常不友好,每次绑定一个Key都需要重新声明一个Bean(一个bean一次只能绑定一个key),如果绑定的key非常多,就需要声明很多key,使得业务代码非常冗余复杂,重复。

 

方式二

基于注解声明

SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:

image-20250518103947538

 

消息转换器

image-20250519130037173

在这段消息数据中,通过默认的消息转换器转换后,消息大小为181 bytes,消息体积变大了很多。

 

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDk的ObjectOutputStream完成序列化。 存在下列问题:

 

建议采用JSON序列化代替默认的JDK序列化,要做两件事情:

  1. 在publisher和consumer中都要引l入jackson依赖:

  2. 在publisher和consumer中都要配置MessageConverter:

image-20250522220853205

对比两次消息可以看出,通过消息转换器转换的消息体积明显变小了

------高级篇------

生产者可靠性

生产者重连

有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

 

注意:

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

 

生产者确认

RabbitMQ 提供了PublisherConfirm和PublisherReturn两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

image-20250519195422246

 

生产者确认代码实现

  1. 在publisher微服务中的application.yml中添加以下配置

    配置说明: publisher-confirm-type有三种模式可选:

    • none:关闭confirm机制

    • simple:同步阻塞等待MQ的回执消息

    • correlated:MQ异步回调方式返回回执消息

  2. 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

  3. 测试发送消息

     

如何处理生产者的确认消息?

 

MQ可靠性

数据持久化

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

RabbitMQ实现数据持久化包括3个方面:

 

LazyQueue

数据持久化虽然保证了数据的安全性问题,但同时我们也发现,这种方法的性能比较一般。

从RabbitMQ的3.6.0版本开始,就增加了LazyQueue的概念,也就是惰性队列。

惰性队列的特征如下:

在3.12版本后,所有队列都是LazyQueue模式,无法更改。

image-20250521191634375

 

消息堆积问题的解决方案?

 

惰性队列的优点有哪些?

 

惰性队列的缺点有哪些?

 

消费者可靠性

消费者确认

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(ConsumerAcknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

 

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

 

配置方法

在消费者的配置文件中添加以下配置项

 

失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理升,带来不必要的压力。

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列:

在消费者服务中添加以下配置

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

 

在一般情况下,我们会使用RepublishMessageRecoverer策略

操作步骤:

  1. 首先,定义接收失败消息的交换机、队列及其绑定关系

  2. 然后,定义RepublishMessageRecoverer

     

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x))。在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

image-20250522215811358

方案一:唯一消息ID

给每个消息都设置一个唯一id,利用id区分是否是重复消息:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。

  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库

  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

image-20250522222336584

问题:这种方法需要改造业务代码。例如:在业务代码的基础上,需要增加读写ID记录数据库代码,性能会有所降低。

 

方案二:结合业务逻辑,基于业务本身做判断

但是,该方法并不具备通用性,需要根据实际情况进行选择。

例如:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理:

image-20250523222603842

 

延迟消息

延迟消息介绍

生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。

 

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(deadletter):

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(DeadLetterExchange,简称DLx)。

案例实现:

使用死信交换机完成延迟消息效果

  1. 按照下图的展示创建和绑定交换机和队列 image-20250524100903757

  2. 创建消费者,监听死信队列

     

  3. 创建生产者,发送消息,设置消息过期时间

     

  4. 效果展示 image-20250524164622226

 

延迟消息插件

RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

插件下载:Releases · rabbitmq/rabbitmq-delayed-message-exchange

image-20250524164926027

  1. 将插件上传到服务器,并导入到容器的插件目录

  2. 进入容器

  3. 启用插件

    启动成功 image-20250524174600745

 

使用方法:

  1. 声明交换机时添加delayed="true"

    • 注解方法

       

    • @Bean方法

       

  2. 发送延迟消息测试

  3. 效果展示,10秒后死信队列显示输出

    image-20250524183614820