介绍

RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它具有消息队列的所有基本属性特征和优点,比如可以用来做异步、削峰、解耦,可以用来做定时任务,事务顺序等等,可以用于大型企业通信,支持高并发、支持可拓展。

RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。


rabbitMq核心概念

前面概念于MessageQueue记录了


死信队列

什么是死信队列?

当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。

1
2
3
生产者 -> 交换机 -> Queue --- --->消费者
|
--->死信交换机 ---> 死信队列 ---> 死信消费者

延迟队列

延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:

  1. 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
  2. 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。

也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。


RabbitMQ工作模式

模式 使用的交换机 路由规则 消息分发特点 典型场景 可能出现的问题 解决方法
Simple(简单模式) 默认交换机 "" 不需要 Routing Key(直接用队列名) 一个队列,一个消费者(多个消费者竞争时,只能一个拿到消息) 入门示例,简单任务队列 队列只有一个消费者,容易成为性能瓶颈;消息丢失风险高 启用消息持久化(durable queue + persistent message);为队列增加消费者(改造成 Work Queue)
Work Queue(工作队列模式) 默认交换机 "" 不需要 Routing Key 多个消费者绑定同一个队列,消息轮询分发;可用 prefetch 实现公平分发 异步任务处理(发邮件、图片处理) 默认是轮询分发,消费者性能差异可能导致慢的消费者积压消息;消息可能丢失 开启 ACK 手动确认,设置 basicQos(prefetch=1) 实现公平调度;开启消息持久化,或者添加一些消费者
Fanout(发布/订阅模式) Fanout Exchange 忽略 Routing Key 消息广播到所有绑定的队列 广播场景:日志系统、消息通知 如果消费者不在线,消息会直接丢弃;队列过多会造成性能问题 配置 持久化队列临时存储(如镜像队列);使用消息持久化;为消费者添加消息堆积监控
Routing(路由模式) Direct Exchange Routing Key 必须和 Binding Key 精确匹配 消息只发送到匹配的队列 分类处理:不同级别日志(info/warn/error) Routing Key 与 Binding Key 不匹配时,消息会丢失;绑定关系过多时管理复杂 设置 备用交换机(Alternate Exchange) 处理未路由消息;管理好 Binding Key,避免过度复杂
Topic(主题模式) Topic Exchange Routing Key 支持 *(一个词)、#(多个词) 模糊匹配,把消息分发给满足条件的队列 复杂消息路由:电商订单(order.create / order.pay / order.cancel) 模糊匹配可能导致消息被多个队列同时接收(重复消费);Routing Key 规则过于复杂,难以维护 设计好 Routing Key 规范(如 系统.业务.动作);在消费者侧去重或加上唯一 ID;尽量避免过度泛化的 #

RabbitMQ 消息怎么传输

由于 TCP 链接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈,所以 RabbitMQ 使用信道的方式来传输数据。信道(Channel)是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接,且每条 TCP 链接上的信道数量没有限制。就是说 RabbitMQ 在一条 TCP 链接上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性,每个信道对应一个线程使用。


如何保证消息的可靠性?

消息到 MQ 的过程中搞丢,MQ 自己搞丢,MQ 到消费过程中搞丢。

  • 生产者到 RabbitMQ:事务机制和 Confirm 机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
  • RabbitMQ 自身:持久化、集群、普通模式、镜像模式。
  • RabbitMQ 到消费者:basicAck 机制、死信队列、消息补偿机制。

如何保证 RabbitMQ 消息的顺序性

  • 拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;
  • 或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

如何保证RabbitMQ高可用?

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

Demo 级别的,入门学习用的

普通集群模式

意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。

你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式

这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据


解决延时消息丢失问题

在队列里如果时间过长消息会被清理,此时如果有1w条消息堆积,其中1k条消息过期了,那么这1k条消息实际上会丢失,解决这个办法可以采用死信队列,让消息后续重新处理或者重试或者先写入文件延时处理