消息中间件

消息队列使用场景

消息队列常见的使用场景,核心的有3个:解耦、异步、削峰。

解耦

看这么个场景。A系统发送数据到BCD三个系统,通过接口调用发送。如果E系统也要这个数据呢?那如果C系统现在不需要了呢?A系统负责人几乎崩溃……

解耦1

在这个场景,A系统跟其他各种乱七八糟的系统严重耦合,A系统产生一条比较关键的数据,很多系统都需要A系统将这个数据发送过来。A系统要时时刻刻考虑BCDE四个系统如果挂了该怎么办?要不要重发,要不要把消息存起来?

如果使用MQ,A系统产生一条数据,发送到MQ里面去,哪个系统需要数据自己去MQ里消费。如果新系统需要数据,直接从MQ里消费即可;如果某个系统不需要这条数据了,就取消对MQ的消费即可。这样下来,A系统就不需要考虑要给谁发送数据,不需要维护这个代码,也不需要考虑是否调用成功、失败超时等情况。

解耦2

总结:通过一个MQ,Pub/Sub发布订阅消息这么一个模型,A系统就跟其它系统彻底解耦了。

异步

再来看一个场景,A系统接收一个请求,需要在自己本地写库,还需要再BCD三个系统写库,自己本地写库要3ms,BCD三个系统分别写库要300ms、450ms、200ms。最终请求总延时3 + 300 + 450 + 200 = 953ms,接近1s。用户通过浏览器发起请求,等待个1s,这几乎是不可接受的。

异步1

一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在200ms以内完成,对用户几乎是无感知的。

如果使用MQ,那么A系统连续发送3条消息到MQ队列中,加入耗时5ms,A系统从接受一个请求到返回响应给用户,总时长是3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms以后就直接返回了,爽!网站做得真好,真快!

异步2

削峰

每天00:00到12:00系统风平浪静,每秒并发请求数量就50个。结果每次一到12:00~13:00,每秒并发请求数量突然会暴增到5k+条。但是系统是直接基于MySQL的,大量的请求涌入MySQL,每秒钟对MySQL执行约5k条SQL。

一般的MySQL,扛到每秒2k个请求就差不多了,如果每秒请求到5k的话,可能就直接把MySQL给打死了,导致系统崩溃,用户也就没法再使用系统了。

但是高峰期一过,到了下午的时候,就成了低峰期,可能也就1w的用户同时再网站上操作,每秒钟的请求数量可能也就50个,对整个系统几乎没有任何的压力。

削峰1

如果使用MQ,每秒5k个请求写入MQ,A系统每秒钟最多处理2k个请求,因为MySQL每秒钟最多处理2k个。A系统从MQ中慢慢拉取需求,每秒钟就拉取2k个请求,不要超过自己每秒能处理的最大请求数量就ok,这样下来,哪怕是高峰期的时候,A系统也绝对不会挂掉。而MQ每秒钟5k个请求进来,就2k个请求出去,结果就导致在中午高峰期(1个小时),可能有几十万甚至几百万的请求积压在MQ中。

削峰2

这个短暂的高峰期积压是ok的,因为高峰期过了之后,每秒钟就50个请求进MQ,但是A系统依然会按照每秒2k个请求的速度在处理。所以说,只要高峰期一过,A系统就会快速将积压的消息给解决掉。

消息队列的优缺点

优点就是在上述的特殊场景下对应的好处,解耦、异步、削峰。

缺点有以下几个:

  • 系统可用性降低

    系统引入的外部依赖越多,越容易挂掉。本来你就是A系统调用BCD三个系统的接口就好了,ABCD四个系统还好好的,没啥问题,但是加了个MQ进来之后,万一MQ挂了怎么办?MQ一挂,整套系统就崩溃了,所以在使用消息队列时,要保证消息队列的高可用性。

  • 系统复杂度提高

    加了MQ之后,如果保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?

  • 一致性问题

    A系统处理完了直接返回成功,如果BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,就会导致数据不一致。

所以消息队列实际是一种非常复杂的结构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,系统复杂度可能提升了一个数量级,也许是复杂了10倍。但是关键时刻,用还是要用的。

常用消息队列的对比

以下是常用的消息中间件Kafka、ActiveMQ、RabbitMQ、RocketMQ的对比:

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比RocketMQ、Kafka低一个数量级 同ActiveMQ 10万级,支撑高吞吐 10万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic数量对吞吐量的影响 topic可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic topic从几十到几百个的时候,吞吐量会大幅度下降,在同等机器下,Kafka尽量保证topic数量不要过多,如果要支撑大规模的topic,需要增加更多的机器资源
时效性 ms级 微秒级,这是RabbitMQ的一大特点,延迟最低 ms级 延迟在ms级以内
可用性 高,基于主从架构实现高可用 同ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到0丢失 同RocketMQ
功能支持 MQ领域的功能及其完备 基于erlang开发,并发能力很强,性能极好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

一般的业务系统要引入MQ,最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐场景的验证,社区也不是很活跃,不推荐使用。

后来大家开始用RabbitMQ,但事实是erlang语言阻止了大量的Java工程师去深入研究和掌控它,对公司而言,几乎处于不可控状态,但是确实人家是开源的,有比较稳定的支持,活跃度也高。

现在越来越多的公司会取用RocketMQ,确实很不错,阿里出品,但社区可能有突然黄掉的风险(目前RocketMQ已捐给Apache,但GitHub上的活跃度其实不算高),对自己公司技术实力有绝对自信的,推荐用RocketMQ,否则老老实实用RabbitMQ吧。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;大型公司,基础架构研发实力较强,用RocketMQ是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准,绝对没问题,社区活跃度很高,绝对不会黄,几乎是全世界这个领域的事实性规范。

消息队列的高可用

RabbitMQ的高可用

RabbitMQ是基于主从(非分布式)做高可用的,比较有代表性。RabbitMQ有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

单机模式,就是Demo级别的,生产不会采用单机模式。

普通集群模式(无高可用性)

普通集群模式,意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。你创建的queue,指挥放在一个RabbitMQ实例上,但是每个实例都同步queue的元数据(元数据可以认为是queue的一些配置信息,通过元数据,可以找到queue所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。

RabbitMQ普通集群模式

这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这种模式导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个queue所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。

而且如果那个放queue的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让RabbitMQ落地存储消息的话,消息不一定会丢失,得等这个实例恢复了,才可以继续从这个queue拉取数据。

所以这个模式其实没有所谓的高可用性,主要是为了提高吞吐量,就是说让集群中多个节点来服务于某个queue的读写操作。

镜像集群模式(高可用性)

这种模式,才是所谓的RabbtiMQ的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,就是说,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据的意思。然后你每次写消息到queue的时候,都会自动把消息同步到多个实例的queue上。

RabbitMQ镜像集群模式

那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的话,好处在于,你任何一个机器宕机了,没事儿,其他机器(节点)还包含了这个queue的完整数据,别的consumer都可以到其他节点上去消费数据。坏处在于,第一,性能开销有些大,消息需要同步到所有机器上,导致网络带块压力和消耗很重!第二,不是分布式的,就没有扩展性可言了,如果某个queue负载很重,你加机器,新增的机器也包含了这个queue的所有数据,并且没有办法线性扩展你的queue。如果这个queue的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?

Kafka的高可用性

Kafka有一个最基本的架构认识:由多个broker组成,每个broker是一个节点;你创建一个topic,这个topic可以划分为多个partition,每个partition可以存在于不同的broker上,每个partition就放一部分数据。

这就是天然的分布式消息队列,就是说一个topic的数据,是分散放在多个机器上的,每个机器就放一部分数据。

实际上RabbitMQ之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Available,高可用性)的机制而已,因为无论怎么玩,RabbitMQ一个queue的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个queue的完整数据。

Kafka0.8版本以前,是没有HA机制的,就是任何一个broker宕机了,那个broker上的partition就废了,没法写也没法读,没有什么高可用性可言。

比如说,我们假设创建了一个topic,指定其partition数量是3个,分别在三台机器上。但是,如果第二台机器宕机了,会导致这个topic 1/3 的数据就丢了,因此这个是做不到高可用的。

Kafka基础架构

Kafka0.8以后,提供了HA机制,就是replica(复制品)副本机制。每个partition的数据都会同步到其他机器上,形成自己的多个replica副本。所有replica会选举一个leader出来,那么生产和消费都跟这个leader打交道,然后其他replica就是follower。写的时候,leader会负责把数据同步到所有follower上去,读的时候就直接读leader上的数据即可。只能读写leader?很简单,要是你可以随意读写每个follower,那么就要关注数据一致性的问题,系统复杂度太高,很容易出问题。Kafka会均匀地将一个partition的所有replica分布在不同的机器上,这样才可以提高容错性。

Kafka高可用

这样一来就有所谓的高可用性了,因为如果某个broker宕机了,那个broker上面的partition在其他机器上都有副本的。如果这个宕机的broker上面由某个partition的leader,那么此时会从follower中重新选举一个新的leader出来,在新的leader上继续进行读写操作。这样就可以实现高可用性了。

写数据的时候,生产者就写leader,然后leader将数据落地写本地磁盘,接着其他follower自己主动从leader拉pull数据。一旦所有follower同步好数据了,就会发送ack给leader,leader收到所哟follower的ack之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以调整这个行为)

消费的时候,只会从leader去读,但是只有当一个消息已经被所有follower都同步成功返回ack的时候,这个消息才会被消费者读到。

消息的幂等性

消费消息的时候,我们要考虑会不会重复消费?能不能避免重复消费?或者重复消费了也不能造成系统异常?

首先,我们要明确,使用RabbitMQ、RocketMQ、Kafka的时候,都有可能会出现消息重复消费的问题。用Kafka来举个例子,Kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表消息的序号,consumer消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的offset提交一下,表示“我已经消费过了,下次我要是重启的话,你就让我继续从上次消费到的offset来继续消费吧”。

但是凡事都有意外,比如在重启consumer服务的时候,是直接kill进程再重启,就会导致consumer有些消息处理了,但是没来得及提交offset,重启之后,部分消息会再次消费一次。

有这么个场景,数据1、2、3一次进入Kafka,Kafka会给这三条数据每条分配一个offset,代表这条数据的序号,我们假设分配的offset依次是152、153、154。消费者从Kafka去消费的时候,也是按照这个顺序去消费。假如当消费者消费了offset=153这条数据,刚准备去提交offset到zookeeper,此时消费者进程被重启了,那么此时消费过的数据1、2的offset并没有提交,Kafka也就不知道你已经消费了offset=153这条数据。那么重启之后,由于之前的offset没有提交成功,那么数据1、2会再次传过来,如果此时消费者没有去重的话,就会导致重复消费。

注意:新版的Kafka已经将offset的存储从zookeeper转移至Kafka brokers,并使用内部位移主题 __consumer_offset 进行存储。

Kafka提交offset

如果消费是拿一条数据就往数据库里写一条,会导致数据1、2再数据库插入了两次,那么数据就出错了。

其实重复消费并不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

举个例子,假设你有一个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,就插入了两条,数据就错了;但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,如果是就直接跳过,这样就指挥插入一条数据,从而保证了数据的正确性。

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就是一条数据、或者一个请求,重复来多次,得确保对应的数据是不会改变的,不能出错。

所以问题来了,怎么保证消息队列消费的幂等性?

其实还是得集合业务来思考,这里给几个思路:

  • 如果你取到数据要写库,你先根据主键查一下,如果这条数据已经有了,就别插入了,update一下。
  • 如果你是写Redis,那没问题,反正每次都是set,天然幂等性。
  • 如果你不是上面两个场景,那要做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加上一个全局唯一的id,类似订单id之类的,然后再消费的是时候,先根据这个id去Redis查一下,之前是不是消费过了?如果没有消费过,就正常处理,然后把这个id写入Redis;如果已经消费过了,就跳过不处理,保证不重复处理相同的消息即可。
  • 也可以基于数据库的唯一索引来保证重复数据不会重复插入多条。因为有唯一索引约束了,重复数据插入会报错,不会导致数据库中出现脏数据。

消息队列消费幂等性处理

消息可靠性传输

使用MQ的时候有个基本原则,就是数据不能多一条,也不能少一条。不能多,就是前面说的重复消费和幂等性问题。不能少,就是说数据不能丢失。

如果用MQ传递过来的是非常核心的消息,比如说计费、扣费的一些消息,那必须确保这个MQ传递过程中绝对不会把计费消息给弄丢。

数据的丢失问题,可能出现在生产者、MQ、消费者中,以下内容从RabbitMQ和Kafka分别来进行说明。

RabbitMQ

RabbitMQ消息丢失

生产者弄丢了消息

生产者将数据发送到RabbitMQ的时候,可能会因为网络等问题,导致数据在半路给搞丢了。

此时可以选择用RabbitMQ提供的事务功能,在生产者发送数据之前开启RabbitMQ事务 channel.txSelect,然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时可以回滚事务 channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//开启事务
channel.txSelect

try {
//发送消息

} catch (Exception e) {
channel.txRollback

//再次重发消息
...
}

//提交事务
channel.txCommit

开启RabbitMQ的事务机制(同步)时,吞吐量会下来,因为太耗性能了。

所以一般来说,如果你要确保RabbitMQ的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,每次写的消息都会分配一个唯一的id,然后如果成功写入RabbitMQ中,RabbitMQ会回传一个ack消息,告诉你这个消息写入ok。如果RabbitMQ没能处理这个消息,会回调你的nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没有接收到这个消息的回调,可以将消息进行重发。

事务机制和 confirm 机制最大的不同在于,事务机制时同步的,你提交一个事务之后会阻塞住,但是 confirm 机制是异步的,你发送消息之后可以接着发送下一个消息,RabbitMQ接收到消息之后会异步回调接口通知你消息接收到了。

所以一般在生产者避免数据丢失,都是用 confirm 机制的。

RabbitMQ弄丢了消息

就是RabbitMQ自己弄丢了消息,这个你必须开启RabbitMQ的持久化,就是消息写入之后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。极其罕见的是,RabbitMQ还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

设置持久化有两个步骤:

  • 创建queue的时候将其设置为持久化

    这样就可以保证RabbitMQ持久化queue的元数据,但是它是不会持久化queue里的数据的。

  • 发送消息的时候将消息的 deliveryMode 设置为2

    就是将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。

必须同时设置这两个持久化才行,RabbitMQ哪怕是挂了,再次重启,也会从磁盘上恢复queue,恢复这个queue里的数据。

注意,哪怕是你给RabbitMQ开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ中,但是还没来得及持久化到磁盘上,结果不巧,此时RabbitMQ挂了,就会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕实在持久化到磁盘之前,RabbtiMQ挂了,数据丢了,生产者收不到 ack,你野是可以自己重发的。

消费端弄丢了消息

RabbitMQ如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,RabbitMQ认为你消费了,这数据就丢了。

这个时候得用RabbitMQ提供的 ack 机制,简单来说,就是你必须关闭RabbitMQ的自动 ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack。这样的话,如果你还没处理完,就不会 ack,那RabbitMQ就认为你还没处理完,这个时候RabbtiMQ会把这个消息分配给别的consumer去处理,消息是不会丢的。

RabbitMQ消息丢失及对应解决方案

Kafka

消费端弄丢了消息

唯一可能导致消费者丢失数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了offset,让Kafka以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,还没处理自己就挂了,这种情况下这条消息就丢了。

这不是跟RabbitMQ差不多吗,大家都只打Kafka会自动提交offset,那么只要关闭自动提交offset,在处理完之后自己手动提交offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

生产环境碰到的一个问题,就是说我们的Kafka消费者消费到了数据之后是写到一个内存的queue里先缓冲一下,结果有的时候,你刚把消息写入内存queue,然后消费者会自动提交offset,此时我们重启了系统,就会导致内存queue里还没来得及处理的数据就丢失了。

Kafka弄丢了消息

这块比较常见的一个场景,就是Kafka某个broker宕机,然后重新选举partition的leader。如果此时其他的follower刚好还有些数据没有同步,结果此时leader挂了,接着选举某个follower成为leader之后,就会少了一些数据。

所以此时一般是要求起码设置如下4个参数:

  • 给topic设置 replication.factor 参数:这个值必须大于1,要求每个partition必须至少有2个副本。
  • 在Kafka服务端设置 min.insync.replicas 参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower可用。
  • 在producer端设置 acks=all:这个是要求每条数,必须是写入所有 replica 之后,才能认为是写成功了。
  • 在producer端设置 retries=MAX(很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试。

我们生产环境就是按照上述要求配置的,这样配置之后,至少在Kafka broker端就可以保证leader所在broker发生故障,进行leader切换时,数据不会丢失。

生产者会不会弄丢消息?

如果按照上述的思路设置了 acks=all,一定不会丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断地重试,重试无限次。

消息的顺序性

消息错乱的场景

先看看顺序会错乱的场景:

  • RabbitMQ:一个queue,多个consumer。比如,生产者想RabbitMQ里发送了三条数据,顺序依次是data1、data2、data3,压入的是RabbitMQ的一个内存队列。有三个消费者分别从MQ中消费者三条数据中的一条,结果消费者2先执行完操作,把data2存入数据库,然后是data1、data3,明显顺序乱了。

RabbitMQ消息顺序错乱

  • Kafka:比如说我们建了一个topic,有三个partition。生产者在写的时候,其实可以指定一个key,比如说我们制定了某个订单id作为key,那么这个订单相关的数据,一定会被分发到同一个partition中去,而且这个partition中的数据一定是有顺序的。

消费者从partition中取出来数据的时候,也一定是有顺序的。到这里,顺序还是ok的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十ms,那么1秒钟只能处理几十条消息,者吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

Kafka消息顺序错乱

解决方案

RabbitMQ

拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排列,然后分发给底层不同的worker来处理。

Rabbit消息顺序错乱解决方案

Kafka

  • 一个topic,一个partition,一个consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
  • 写N个内存queue,具有相同key的数据都到同一个内存queue;然后对于N个线程,每个线程分别消费一个内存queue即可,这样就能保证顺序性。

Kafka消息顺序错乱解决方案

解决消息队列的延时以及过期失效问题

核心问题:如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,该怎么解决?

大量消息在MQ中积压了几个消失了还没解决

几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上11点多。这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复consumer的问题,让它恢复消费速度,然后傻傻的等待几个小时消费完毕。

一个消费者一秒是1000条,3个消费者一秒是3000条,一分钟就是18万条。所以如果你积压了几百万到上千万的数据,几十消费者恢复了,也需要大概1小时的时间才能恢复过来。

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
  • 新建一个topic,partition是原来的10倍,临时建立好原先10倍的queue数量。
  • 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue。
  • 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据。这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer机器来消费消息。

MQ中的消息过期失效了

假设你用的是RabbitMQ,RabbitMQ是可以设置过期时间的,也就是TTL。如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了,这就不是数据会大量积压在MQ里,而是大量的数据会直接丢失。

这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有过类似的场景。就是大量积压的时候,我们当时就直接丢失数据了,然后等过了高峰期,比如晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入MQ里面去,把白天丢的数据给补回来。

假设1万个订单积压在MQ里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到MQ里去再补一次。

MQ都快写满了

如果消息积压在MQ里,你很长时间都没有处理掉,此时导致MQ都快写满了,怎么办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

RockerMQ的处理方式

对于RockerMQ,官方针对消息积压问题,提供了解决方案。

提高消费并行度

绝大部分消息消费行为都属于IO密集型,即可能是操作数据库,或者调用RPC,这类消费行为的消费速度在于后端数据库或者外部系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。如下有几种修改消费并行度的方法:

同一个ConsumerGroup下,通过增加consumer实例数量在提高并行度(需要注意的是超过订阅队列数的consuemr实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。提高单个consumer的消费并行线程,通过修改参数consumeThreadMin、consumeThreadMax实现。

批量方式消费

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时1s,一次处理10个订单可能也只耗时2s,这样即可大幅度提高消费的吞吐量,通过设置consumer的consumeMessageBatchMaxSize这个参数,默认是1,即一次之消费一条消息,例如设置为N,那么每次消费的消息数小于等于N。

跳过非重要消息

发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或者全部消息,这样就可以快速追上发送消息的速度。实例代码如下:

1
2
3
4
5
6
7
8
9
10
11
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
//TODO 消息堆积情况的特殊处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

优化每条消息消费过程

举例如下,某条消息的消费过程如下:

  • 根据消息从DB查询【数据1】
  • 根据消息从DB查询【数据2】
  • 复杂的业务计算
  • 向DB插入【数据3】
  • 向DB插入【数据4】

这条消息的消费过程有4次与DB的交互,如果按照每次5ms计算,那么总共耗时20ms,假设业务计算耗时5ms,那么总共耗时25ms,所以如果能把4次DB交互优化为2次,那么总耗时就可以优化到15ms,即总体性能提高了40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。