10分鐘帶你了解開發(fā)說的Mq到底是什么?

以下文章來源于懂Java的測試 ,作者懂Java的測試

前言

日常測試工作中,開發(fā)提到的Mq到底什么意思?本篇文章就分享一下所謂的Mq究竟是什么技術(shù)、有什么好處,以及常見的Mq組件有那些。

Mq的優(yōu)點(diǎn)有哪些

Mq(Message Queue)是消息隊(duì)列,主要有三大用途,我們拿一個電商系統(tǒng)的下單舉例:
解耦:引入消息隊(duì)列之前,下單完成之后,需要訂單服務(wù)去調(diào)用庫存服務(wù)減庫存,調(diào)用營銷服務(wù)加營銷數(shù)據(jù)……引入消息隊(duì)列之后,可以把訂單完成的消息丟進(jìn)隊(duì)列里,下游服務(wù)自己去調(diào)用就行了,這樣就完成了訂單服務(wù)和其它服務(wù)的解耦合。



異步:訂單支付之后,我們要扣減庫存、增加積分、發(fā)送消息等等,這樣一來這個鏈路就長了,鏈路一長,響應(yīng)時間就變長了。引入消息隊(duì)列,除了更新訂單狀態(tài),其它的都可以異步去做,這樣一來就來,就能降低響應(yīng)時間。



削峰:消息隊(duì)列合一用來削峰,例如秒殺系統(tǒng),平時流量很低,但是要做秒殺活動,秒殺的時候流量瘋狂懟進(jìn)來,我們的服務(wù)器,Redis,MySQL各自的承受能力都不一樣,直接全部流量照單全收肯定有問題啊,嚴(yán)重點(diǎn)可能直接打掛了。
我們可以把請求扔到隊(duì)列里面,只放出我們服務(wù)能處理的流量,這樣就能抗住短時間的大流量了。



解耦、異步、削峰,是消息隊(duì)列最主要的三大作用。

Mq的缺點(diǎn)有哪些
互聯(lián)網(wǎng)很多技術(shù)都是雙刃劍,有利益有弊,同樣Mq也是,它有以下幾個缺點(diǎn):

系統(tǒng)可用性降低
本來系統(tǒng)運(yùn)行好好的,現(xiàn)在你非要加入個消息隊(duì)列進(jìn)去,那消息隊(duì)列掛了,你的系統(tǒng)不是呵呵了。
比如說一個核心鏈路里面:系統(tǒng)A -> 系統(tǒng)B -> 系統(tǒng)C,然后系統(tǒng)C是通過MQ異步調(diào)用系統(tǒng)D的。



看起來很好,你用這個MQ異步化的手段解決了一個核心鏈路執(zhí)行性能過差的問題。
但是你有沒有考慮另外一個問題,就是萬一你依賴的那個MQ中間件突然掛掉了怎么辦?這個還真的不是異想天開,MQ、Redis、MySQL這些組件都有可能會掛掉。
一旦你的MQ掛了,就導(dǎo)致你的系統(tǒng)的核心業(yè)務(wù)流程中斷了。本來你要是不引入MQ中間件,那其實(shí)就是一些系統(tǒng)之間的調(diào)用,但是現(xiàn)在你引入了MQ,就導(dǎo)致你多了一個依賴。一旦多了一個依賴,就會導(dǎo)致你的可用性降低。
因此,一旦引入了MQ中間件,你就必須去考慮這個MQ是如何部署的,如何保證高可用性。
甚至在復(fù)雜的高可用的場景下,你還要考慮如果MQ一旦掛了以后,你的系統(tǒng)有沒有備用兜底的技術(shù)方案,可以保證系統(tǒng)繼續(xù)運(yùn)行下去
系統(tǒng)復(fù)雜度提高
還是上面那張圖,大家再來看一下:



不知道大家有沒有發(fā)現(xiàn)一個問題,這個鏈路除了MQ中間件掛掉這個可能存在的隱患之外,可能還有一些其他的技術(shù)問題。
比如說,莫名其妙的,系統(tǒng)C發(fā)了一個消息到MQ,結(jié)果那個消息因?yàn)榫W(wǎng)絡(luò)故障等問題,就丟失了。這就導(dǎo)致系統(tǒng)D沒有收到那條消息。
這可就慘了,這樣會導(dǎo)致系統(tǒng)D沒完成自己該做的任務(wù),此時可能整個系統(tǒng)會出現(xiàn)業(yè)務(wù)錯亂,數(shù)據(jù)丟失,嚴(yán)重的bug,用戶體驗(yàn)很差等各種問題。
這還只是其中之一,萬一說系統(tǒng)C給MQ發(fā)送消息,不小心一抽風(fēng)重復(fù)發(fā)了一條一模一樣的,導(dǎo)致消息重復(fù)了,這個時候該怎么辦?





可能會導(dǎo)致系統(tǒng)D一下子把一條數(shù)據(jù)插入了兩次,導(dǎo)致數(shù)據(jù)錯誤,臟數(shù)據(jù)的產(chǎn)生,最后一樣會導(dǎo)致各種問題。
或者說如果系統(tǒng)D突然宕機(jī)了幾個小時,導(dǎo)致無法消費(fèi)消息,結(jié)果大量的消息在MQ中間件里積壓了很久,這個時候怎么辦?
即使系統(tǒng)D恢復(fù)了,也需要慢慢的消費(fèi)數(shù)據(jù)來進(jìn)行處理。
所以這就是引入MQ中間件的第二個大問題,系統(tǒng)穩(wěn)定性可能會下降,故障會增多,各種各樣亂七八糟的問題都可能產(chǎn)生。而且一旦產(chǎn)生了一個問題,就會導(dǎo)致系統(tǒng)整體出問題。就需要為了解決各種MQ引發(fā)的技術(shù)問題,采取很多的技術(shù)方案,系統(tǒng)的復(fù)雜性就會提升好幾個層級。
一致性問題
A 系統(tǒng)處理完了直接返回成功了,人都以為你這個請求就成功了;但是問題是,要是 BCD 三個系統(tǒng)那里,BD 兩個系統(tǒng)寫庫成功了,結(jié)果 C 系統(tǒng)寫庫失敗了,咋整?你這數(shù)據(jù)就不一致了
所以消息隊(duì)列實(shí)際是一種非常復(fù)雜的架構(gòu),你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術(shù)方案和架構(gòu)來規(guī)避掉,做好之后,你會發(fā)現(xiàn),媽呀,系統(tǒng)復(fù)雜度提升了一個數(shù)量級,也許是復(fù)雜了 10 倍。但是關(guān)鍵時刻,用,還是得用的。
你們公司項(xiàng)目用的是什么消息中間件?
Mq的組件有哪些?

Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么優(yōu)缺點(diǎn)?



Mq的組件有哪些常見的問題?

消息的順序問題
消息有序指的是可以按照消息的發(fā)送順序來消費(fèi)。
假如生產(chǎn)者產(chǎn)生了 2 條消息:M1、M2,假定 M1 發(fā)送到 S1,M2 發(fā)送到 S2,如果要保證 M1 先于 M2 被消費(fèi),怎么做?



解決方案:
保證生產(chǎn)者 - MQServer - 消費(fèi)者是一對一對一的關(guān)系



消息的重復(fù)問題
造成消息重復(fù)的根本原因是:網(wǎng)絡(luò)不可達(dá)。
所以解決這個問題的辦法就是繞過這個問題。那么問題就變成了:如果消費(fèi)端收到兩條一樣的消息,應(yīng)該怎樣處理?
消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性。只要保持冪等性,不管來多少條重復(fù)消息,最后處理的結(jié)果都一樣。保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現(xiàn)。利用一張日志表來記錄已經(jīng)處理成功的消息的 ID,如果新到的消息 ID 已經(jīng)在日志表中,那么就不再處理這條消息。
具體可參考之前的文章測試工程師都能看懂的冪等性問題
消息的丟失問題

每個MQ組件都有自己的方案,由于篇幅有限,我就簡單聊聊RocketMq(互聯(lián)網(wǎng)項(xiàng)目常用的Mq技術(shù),經(jīng)歷過阿里雙十一高并發(fā)的考驗(yàn))防丟失方案吧。

一般RocketMq會有以下幾種場景丟失消息:



1、生產(chǎn)者將消息發(fā)送給Rocket MQ的時候,如果出現(xiàn)了網(wǎng)絡(luò)抖動或者通信異常等問題,消息就有可能會丟失
2、消息需要持久化到磁盤中,這時會有兩種情況導(dǎo)致消息丟失
①RocketMQ為了減少磁盤的IO,會先將消息寫入到os cache中,而不是直接寫入到磁盤中,消費(fèi)者從os cache中獲取消息類似于直接從內(nèi)存中獲取消息,速度更快,過一段時間會由os線程異步的將消息刷入磁盤中,此時才算真正完成了消息的持久化。在這個過程中,如果消息還沒有完成異步刷盤,RocketMQ中的Broker宕機(jī)的話,就會導(dǎo)致消息丟失





②如果消息已經(jīng)被刷入了磁盤中,但是數(shù)據(jù)沒有做任何備份,一旦磁盤損壞,那么消息也會丟失
3、消費(fèi)者成功從RocketMQ中獲取到了消息,還沒有將消息完全消費(fèi)完的時候,就通知RocketMQ我已經(jīng)將消息消費(fèi)了,然后消費(fèi)者宕機(jī),但是RocketMQ認(rèn)為消費(fèi)者已經(jīng)成功消費(fèi)了數(shù)據(jù),所以數(shù)據(jù)依舊丟失了
那么如何保證消息的零丟失呢?



1、生產(chǎn)者保證消息不丟失的方案是使用RocketMQ自帶的事務(wù)機(jī)制來發(fā)送消息,大致流程為
①首先生產(chǎn)者發(fā)送half消息到RocketMQ中,此時消費(fèi)者是無法消費(fèi)half消息的,若half消息就發(fā)送失敗了,則執(zhí)行相應(yīng)的回滾邏輯
②half消息發(fā)送成功之后,且RocketMQ返回成功響應(yīng),則執(zhí)行生產(chǎn)者的核心鏈路
③如果生產(chǎn)者自己的核心鏈路執(zhí)行失敗,則回滾,并通知RocketMQ刪除half消息
④如果生產(chǎn)者的核心鏈路執(zhí)行成功,則通知RocketMQ commit half消息,讓消費(fèi)者可以消費(fèi)這條數(shù)據(jù)
其中還有一些RocketMQ長時間沒有收到生產(chǎn)者是要commit/rollback操作的響應(yīng),回調(diào)生產(chǎn)者接口的細(xì)節(jié),感興趣的可以參考我的這篇博文 RocketMQ分布式事務(wù)原理
在使用了RocketMQ事務(wù)將生產(chǎn)者的消息成功發(fā)送給RocketMQ,就可以保證在這個階段消息不會丟失
消息中間件要保證消息不丟失,首先需要將os cache的異步刷盤策略改為同步刷盤,這一步需要修改Broker的配置文件,將flushDiskType改為SYNC_FLUSH同步刷盤策略,默認(rèn)的是ASYNC_FLUSH異步刷盤。一旦同步刷盤返回成功,那么就一定保證消息已經(jīng)持久化到磁盤中了;為了保證磁盤損壞不會丟失數(shù)據(jù),我們需要對RocketMQ采用主從機(jī)構(gòu),集群部署,Leader中的數(shù)據(jù)在多個Follower中都存有備份,防止單點(diǎn)故障。
消費(fèi)者如何保證不丟失消息呢?消息到達(dá)了消費(fèi)者,RocketMQ在代碼中就能保證消息不會丟失。
//注冊消息監(jiān)聽器處理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){                                          
//對消息進(jìn)行處理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
上面這段代碼中,RocketMQ在消費(fèi)者中注冊了一個監(jiān)聽器,當(dāng)消費(fèi)者獲取到了消息,就會去回調(diào)這個監(jiān)聽器函數(shù),去處理里面的消息
當(dāng)你的消息處理完畢之后,才會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
只有返回了CONSUME_SUCCESS,消費(fèi)者才會告訴RocketMQ我已經(jīng)消費(fèi)完了,此時如果消費(fèi)者宕機(jī),消息已經(jīng)處理完了,也就不會丟失消息了
如果消費(fèi)者還沒有返回CONSUME_SUCCESS時就宕機(jī)了,那么RocketMQ就會認(rèn)為你這個消費(fèi)者節(jié)點(diǎn)掛掉了,會自動故障轉(zhuǎn)移,將消息交給消費(fèi)者組的其他消費(fèi)者去消費(fèi)這個消息,保證消息不會丟失。
使用上面一整套的方案就可以在使用RocketMQ時保證消息零丟失,但是性能和吞吐量也將大幅下降
使用事務(wù)機(jī)制傳輸消息,會比普通的消息傳輸多出很多步驟,耗費(fèi)性能
同步刷盤相比異步刷盤,一個是存儲在磁盤中,一個存儲在內(nèi)存中,速度完全不是一個數(shù)量級,主從架構(gòu)的話,需要Leader將數(shù)據(jù)同步給Follower,消費(fèi)時無法異步消費(fèi),只能等待消費(fèi)完成再通知RocketMQ消費(fèi)完成。
消息零丟失是一把雙刃劍,要想用好,還是要視具體的業(yè)務(wù)場景而定,選擇合適的方案才是最好的

怎么測試Mq的組件?
最后一點(diǎn)特別關(guān)鍵,也是很多面試官愛問的面試題,你怎么測試mq組件的?主要分為以下幾個方面:
Mq異常測試

1、消息重復(fù)發(fā)送
消息重復(fù)發(fā)送,冪等性測試,可參考文章測試工程師都能看懂的冪等性問題
2、消息到達(dá)順序不一致
消息到達(dá)順序不一致,導(dǎo)致業(yè)務(wù)異常。
比如:訂單下單后再取消,如果先收到取消的消息,再收到下單消息,就會有問題。
3、消費(fèi)失敗生產(chǎn)者、消費(fèi)者重試機(jī)制

4、Mq性能測試

線上即將投入使用的Mq集群,通常都會做個性能摸底,在Mq的TPS和機(jī)器的資源使用率之間取得一個平衡。

舉個例子,mq在資源利用率極高的情況下可以到10萬TPS,但是機(jī)器的內(nèi)存、cpu、io負(fù)載特別高,瀕臨宕機(jī),這個是極其不安全的,可是當(dāng)TPS在8萬的時候,機(jī)器的內(nèi)存、cpu、io負(fù)載較高,但是還在可接受范圍內(nèi),這批機(jī)器就比較安全,不至于宕機(jī)。

所以我們的目標(biāo)就是綜合TPS和機(jī)器負(fù)載,盡量找到一個最好的Tps,并且機(jī)器各項(xiàng)負(fù)載都在可接受范圍內(nèi),這才是我們的測試目的。為此要性能壓測要進(jìn)行以下幾個步驟。

1、代碼中創(chuàng)建幾個生產(chǎn)者,投遞消息

我們可以在代碼里讓兩個生產(chǎn)者,不停的往集群中發(fā)送消息,每個生成者啟動多個80個線程(具體參考服務(wù)器配置),相當(dāng)于每臺機(jī)器有80個線程并發(fā)往Mq集群發(fā)送消息。每條消息的大小固定。

2、實(shí)時查看集群中服務(wù)器的cpu、內(nèi)存負(fù)載情況、磁盤io、JVM GC頻率、網(wǎng)卡流量等。

可以使用命令,也可以使用監(jiān)控軟件prometheus 等監(jiān)控服務(wù)器設(shè)備的負(fù)載。

3、當(dāng)設(shè)備負(fù)載任何一個指標(biāo)已經(jīng)超過安全閾值以后,立即查看Mq管理頁面的tps峰值,譬如機(jī)器網(wǎng)卡是128M,網(wǎng)卡實(shí)際流量有100M,接近千兆網(wǎng)流量,我們mq的Tps峰值是7萬左右,我們就認(rèn)為7萬tps是新集群最佳Tps.

其他

Producer(生產(chǎn)者)
所謂的生產(chǎn)者,就是產(chǎn)生消息的應(yīng)用方,在進(jìn)行生產(chǎn)者端的測試時,需要注意如何接入對應(yīng)的MQ,需要哪些信息,可以提前確認(rèn)(包括但不僅限入接入賬號、接入的主題、消息格式等)。
測試注意點(diǎn)有:
1、數(shù)據(jù)是否真正推送到隊(duì)列中
2、數(shù)據(jù)是否推送到正確的topic下
3、如果一次推送的數(shù)據(jù)過多,前面推送的數(shù)據(jù)如何處理(超過隊(duì)列)
4、同時需要注意每個topic下的queue如何分布數(shù)據(jù)

Consumer(消費(fèi)者) 在MQ的世界里,消費(fèi)者(從MQ隊(duì)列里獲取數(shù)據(jù)的應(yīng)用方)主要有兩種,PUSH和PULL,簡單來說,就是主動拉取消息和被動接收消息(還有一種消費(fèi)方式是廣播消息,應(yīng)用場景較少)。不管哪種獲取消息方式,首先都要訂閱消息,即先指定需要消費(fèi)哪個topic下的消息。
測試注意點(diǎn):
1、確認(rèn)應(yīng)用的消費(fèi)是哪種
2、測試消費(fèi)者的消費(fèi)信息源是否正確(能否從正確的topic中拿到正確的消息)
3、測試Topic的消費(fèi)隊(duì)列策略是什么
4、數(shù)據(jù)被消費(fèi)者使用后,有沒有及時的被清除
5、當(dāng)消息隊(duì)列過長(消費(fèi)速度過慢)時,MQ會溢出的數(shù)據(jù)如何處理
6、是否會越權(quán)消費(fèi)別的 topic中的信息
7、如果是PULL類型的消費(fèi)者,需要測試?yán)〉臅r間間隔,如果是push的類型,需要測試當(dāng)有生產(chǎn)者生成消息時,消費(fèi)者是否能及時得到信息并消費(fèi)
總結(jié)

Mq在互聯(lián)網(wǎng)項(xiàng)目中使用特別廣泛,如果想在面試中增加自己的亮點(diǎn),拿高薪,個人覺得可以在這方面突破。

作者:懂Java的測試


歡迎關(guān)注微信公眾號 :Python測試社區(qū)