RocketMQ 詳解系列
什么是RocketMQ
RocketMQ作為一款純java、分布式、隊(duì)列模型的開源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時(shí)消息、消息回溯等。主要功能是異步解耦和流量削峰:。
常見的MQ主要有:ActiveMQ、RabbitMQ、Kafka、RocketMQ
四種消息中間件的基本介紹:
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
單機(jī)吞吐量 | 萬級(jí),比RocketMQ和Kafka第一個(gè)級(jí)別 | 同ActiveMQ | 10萬級(jí),支撐高吞吐 | 10萬級(jí),高吞吐,一般配合大數(shù)據(jù)類的系統(tǒng)進(jìn)行實(shí)時(shí)數(shù)據(jù)計(jì)算、日志采集等場景 |
topic數(shù)量對(duì)吞吐量的影響 | topic可以達(dá)到幾百/幾千級(jí)別,吞吐量會(huì)有較小幅度的下降,這是RocketMQ的一大優(yōu)勢,在同等機(jī)器下,可以支撐大量的topic | topic從幾十到幾百時(shí),吞吐量會(huì)大幅度下降,在同等機(jī)器下,kafka盡量保證topic數(shù)量不要過多,如果要支撐大規(guī)模的topic,需要增加更多的機(jī)器資源 | ||
時(shí)效性 | ms級(jí) | 微秒級(jí)別,RabbitMQ的特性,延遲最低 | ms級(jí)別 | 延遲在ms級(jí)別以內(nèi) |
可用性 | 高,基于主從架構(gòu)實(shí)現(xiàn)高可用 | 同ActiveMQ | 非常高,分布式架構(gòu) | 非常高,分布式一個(gè)數(shù)據(jù)多個(gè)副本,少數(shù)機(jī)器宕機(jī),不會(huì)丟失數(shù)據(jù),不會(huì)導(dǎo)致不可用 |
消息可靠性 | 有較低的概率丟失數(shù)據(jù) | 基本不丟 | 經(jīng)過參數(shù)優(yōu)化配置,可以做到0丟失 | 經(jīng)過參數(shù)優(yōu)化配置,可以做到0丟失 |
功能支持 | MQ領(lǐng)域的功能機(jī)器完備 | 基于erlang開發(fā),并發(fā)能力很強(qiáng),性能極好,延時(shí)很低 | MQ功能較為完善,基本分布式,擴(kuò)展性好 | 功能較簡單,主要支持簡單的MQ功能,在大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算以及日志采集被大規(guī)模使用 |
其他 | Apache開發(fā),起步早,沒有經(jīng)過高吞吐場景驗(yàn)證,社區(qū)不活躍 | 開源、穩(wěn)定、社區(qū)活躍度高 | 阿里開源,交給Apache,社區(qū)活躍度低 | Apache開發(fā),開源、高吞吐量、社區(qū)活躍度高 |
消息中間件的使用場景:
異步與解耦:
當(dāng)我們下了一個(gè)訂單之后,訂單系統(tǒng)會(huì)進(jìn)行RPC同步調(diào)用 支付系統(tǒng)、庫存系統(tǒng)、物流系統(tǒng)等,那么系統(tǒng)之間就會(huì)有耦合性,耦合性越高的話,容錯(cuò)性就越低,比如我們的支付系統(tǒng)如果宕機(jī)了,就會(huì)導(dǎo)致我們整個(gè)交易的異常,從而影響用戶的體驗(yàn)。
如果我們中間加入了消息中間件,不管是支付還是庫存等系統(tǒng),都是通過異步的方式進(jìn)行調(diào)用的,如果其中一個(gè)系統(tǒng)宕機(jī)了,不會(huì)影響我們用戶下單的使用。
本質(zhì)上MQ第一步完成了 異步 ,第二步完成了 解耦 。那么系統(tǒng)的容錯(cuò)性就越高。
流量削峰:
流量削峰也可以叫削峰填谷,比如一些互聯(lián)網(wǎng)公司大促場景,雙十一、店慶或者秒殺活動(dòng),都會(huì)使用到消息中間件。
如果在不使用消息中間件或者沒有流量削峰,每秒是很高的并發(fā),這個(gè)時(shí)候如果我們的A系統(tǒng),如果要將數(shù)據(jù)寫入到我們的MYSQL中,受限于MYSQL本身服務(wù)的上限,最大我們只能每秒處理200請求,這個(gè)時(shí)候會(huì)有大量的消息進(jìn)行堆積,從而導(dǎo)致A系統(tǒng)的奔潰。
這個(gè)時(shí)候我們可以將用戶的請求消息通過MQ進(jìn)行寫入,因?yàn)橄⒅虚g件本身是對(duì)數(shù)據(jù)量處理比較高的一個(gè)系統(tǒng),所以對(duì)于每秒2000請求,消息中間件可以處理,然后A系統(tǒng)作為消息中間件的一個(gè)消費(fèi)者,以固定的速度從MQ中拉取200個(gè)消息,完成我們的業(yè)務(wù)操作,用時(shí)間換空間 從而確保我們A系統(tǒng)的穩(wěn)定性。
數(shù)據(jù)分發(fā):
如果S系統(tǒng),在對(duì)系統(tǒng)進(jìn)行開發(fā)的時(shí)候,需要對(duì)接多個(gè)(A、B、C、D)系統(tǒng),使用傳統(tǒng)的接口調(diào)用,中間有改動(dòng)就需要修改我們的代碼,當(dāng)新增了A系統(tǒng),我們需要去修改代碼去調(diào)用A系統(tǒng)來完成對(duì)應(yīng)的業(yè)務(wù)邏輯,如果我們當(dāng)中的D系統(tǒng)需要移除, 同樣也需要修改代碼刪除對(duì)應(yīng)的接口調(diào)用。
如果S系統(tǒng)使用了消息中間件,我們S系統(tǒng)只要將消息交給MQ,剩下的不論是新增還是移除,還是原有的,他們都只是消息中間件的一個(gè)消費(fèi)者,這個(gè)時(shí)候我們就便于數(shù)據(jù)的分發(fā)。
比如我們新增一個(gè)系統(tǒng),我們只需要新增一個(gè)MQ的消費(fèi)者,直接從MQ里面拿消息就可以,當(dāng)我們需要移除一個(gè)系統(tǒng)的時(shí)候,只需要取消對(duì)MQ消息的監(jiān)聽即可。對(duì)于我們原有的S系統(tǒng)不需要進(jìn)行額外的修改。如果使用MQ作為數(shù)據(jù)分發(fā),減少數(shù)據(jù)的修改,提高開發(fā)的效率。
RocketMQ 基本概念
RocketMQ主要有四大核心組成部分:NameServer、Broker、Producer以及Consumer四部分
。這些角色通常以集群的方式存在,RocketMQ 基于純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。
對(duì)于 RockerMQ
而言,我們想要啟動(dòng),必須首先啟動(dòng) NameServer
,在啟動(dòng) Brober
主機(jī), Brober
會(huì)向 NameServer 注冊對(duì)應(yīng)的路由和服務(wù)(Broker 地址、主體和),Producer會(huì)進(jìn)行路由的發(fā)現(xiàn),向NameServer請求Broker路由信息,進(jìn)行消息的發(fā)送。
作為Consumer要連通NameServer,獲取到相關(guān)的路由信息,方便我們進(jìn)行消息的訂閱。
Broker 也是一個(gè)很重要的角色,主要負(fù)責(zé)消息的存儲(chǔ),不管是生產(chǎn)消息還是訂閱消息,消息的來源都是 Broker,一般來說消息的發(fā)送(Producer)只會(huì)發(fā)到主節(jié)點(diǎn),然后Broker會(huì)進(jìn)行消息的同步,同步到從節(jié)點(diǎn),作為消費(fèi)者(Consumer)也只會(huì)優(yōu)先從Master節(jié)點(diǎn),獲取消息,進(jìn)行消費(fèi),除非主節(jié)點(diǎn)不可用或者非常繁忙,才會(huì)從從節(jié)點(diǎn)進(jìn)行消費(fèi),Broker除了消息的中轉(zhuǎn),還負(fù)責(zé)消息的持久化以及主從數(shù)據(jù)之間的復(fù)制
NameServer:
NameServer
是一個(gè)服務(wù)與注冊的發(fā)現(xiàn)中心。也是整個(gè) RocketMQ 的“大腦”,所以 RocketMQ 需要先啟動(dòng) NameServer
再啟動(dòng) RocketMQ 中的 Broker
NameServer
是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn),可集群部署,節(jié)點(diǎn)之間無任何信息同步。NameServer
底層由 Netty 實(shí)現(xiàn),是內(nèi)存式存儲(chǔ),所以 NameServer
中的 broker、topic不會(huì)持久化。
NameServer
其角色類似Dubbo和zookeeper,主要負(fù)責(zé)Broker的動(dòng)態(tài)注冊與發(fā)現(xiàn)。為什么不使用zookeeper?rocketmq主要是在分布式情況下使用追求性能,因?yàn)閦ookeeper最求最終一致性,所以在性能上會(huì)有所折扣。
Broker:
消息服務(wù)器(Broker
)是消息存儲(chǔ)中心,主要作用是接收來自 Producer
的消息并存儲(chǔ),Consumer
從這里取得消息。存儲(chǔ)與消息相關(guān)的元數(shù)據(jù),包括用戶組、消費(fèi)進(jìn)度偏移量、隊(duì)列信息等。從部署結(jié)構(gòu)圖中可以看出 Broker
有 Master
和 Slave
兩種類型, Master
既可以寫又可以讀,Slave
不可以寫只可以讀。
Producer:
Producer
也稱為消息發(fā)布者(生產(chǎn)者),負(fù)責(zé)生產(chǎn)并發(fā)送消息至 Topic
。生產(chǎn)者向 broker
發(fā)送由業(yè)務(wù)應(yīng)用程序系統(tǒng)生成的消息。RocketMQ
提供了發(fā)送:同步、異步和單向(one-way)的多種范例。
Consumer:
也稱為消息訂閱者,負(fù)責(zé)從 Topic 接收并消費(fèi)消息。消費(fèi)者從 brokers
那里拉取信息并將其輸入應(yīng)用程序。從Master拿到消息,執(zhí)行完成后,會(huì)發(fā)送一個(gè)消息給Broker進(jìn)行確認(rèn),這個(gè)就是ACK確認(rèn)
RocketMQ 基本概念
分組(Group)
Group 分為兩個(gè)部分 生產(chǎn)者和消費(fèi)者
-
生產(chǎn)者: 表示發(fā)送同一類消息的 Producer,通常情況下發(fā)送邏輯是一致的。發(fā)送普通消息時(shí),用于標(biāo)識(shí)使用,沒有特別的用處。
主要用來作用于事務(wù)消息,當(dāng)事務(wù)消息中某條消息一直處于等待狀態(tài)并超時(shí),Broker會(huì)回查同一個(gè)Group下的其他producer,確定該消息是 commit 還是 rollback
-
消費(fèi)者: 消費(fèi)者的分組就非常有意義了,消費(fèi)者是標(biāo)識(shí)一類
Consumer
的集合名稱,這類Consumer
通常消費(fèi)一類消息,且消費(fèi)邏輯一致。同一個(gè)Consumer Group
下的各個(gè)實(shí)例將共同消費(fèi) topic 的消息,起到負(fù)載均衡的作用。消費(fèi)進(jìn)度以
Consumer Group
為粒度管理,不同Consumer Group
之間消費(fèi)進(jìn)度彼此不受影響,即消息 A 被Consumer Group1
消費(fèi)過,也會(huì)再給Consumer Group2
消費(fèi)。
主體(Topic)
用來區(qū)分消息的種類,表示一類消息的邏輯名字,消息的邏輯管理單位,無論生產(chǎn)還是消費(fèi)消息,都需要執(zhí)行Topic。
一個(gè)發(fā)送者可以發(fā)送消息給一個(gè)或者多個(gè)Topic;
一個(gè)消息接受者可以訂閱一個(gè)或多個(gè)Topic消息;
消息隊(duì)列(Message Queue)
消息隊(duì)列 簡稱 Queue ,消息物理管理單位。用來并行發(fā)送和接收消息,相當(dāng)于是Topic的分區(qū)。
一個(gè)Topic會(huì)有若干個(gè)Queue,消息的生產(chǎn)一般會(huì)比消息消費(fèi)的速度要快,消息進(jìn)行消費(fèi)的時(shí)會(huì)有對(duì)應(yīng)的業(yè)務(wù)邏輯進(jìn)行處理,這個(gè)時(shí)候就會(huì)降低消息消費(fèi)的速度。所有一般Topic會(huì)有若干個(gè)Queue。主要用來解決生產(chǎn)很快,消費(fèi)很慢。
如果同一個(gè)Topic創(chuàng)建在不同的Broker,那么不同的Broker有不同的Queue,將物理存儲(chǔ)在不同的Broker節(jié)點(diǎn)之上,具有水平擴(kuò)展的能力。無論是生產(chǎn)者還是消費(fèi)者,實(shí)際的操作都是針對(duì)Queue級(jí)別。
標(biāo)簽(Tag)
RocketMQ 支持在發(fā)送時(shí)給 topic 的消息設(shè)置 tag,用于同一主題下區(qū)分不同類型的消息。
來自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。比如有一個(gè) Topic 消息為水果,那么水果可以有其他的標(biāo)簽 可以是 香蕉、西瓜、草莓等等,我們可以把對(duì)應(yīng)的消息,打上對(duì)應(yīng)的標(biāo)簽(Tag),這個(gè)就是方便我們在消費(fèi)的時(shí)候做對(duì)應(yīng)的篩選。
標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化 RocketMQ 提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。
偏移量(Offset)
在 RocketMQ 中,有很多 offset 的概念。一般我們只關(guān)心暴露到客戶端的 offset。不指定的話,一般指的是消費(fèi)者消息的偏移量(ConsumerOffset)
Message queue
是無限長的數(shù)組。一條消息進(jìn)來下標(biāo)就會(huì)漲 1,而這個(gè)數(shù)組的下標(biāo)就是 offset。
Message queue
中的 max offset 表示消息的最大 offset,Consumer offset
可以理解為標(biāo)記 Consumer Group 在一條邏輯 Message Queue
上,消息消費(fèi)到哪里即消費(fèi)進(jìn)度。
RocketMQ 下載安裝
下載地址:https://rocketmq.apache.org/dowloading/releases/
環(huán)境要求:
- Windows/Linux 64位系統(tǒng)
- JDK1.8(64位)
- 源碼安裝需要安裝Maven 3.2.x
這里我們用 rocketmq-4.9.2
來做演示案例。
設(shè)置環(huán)境變量:
變量名: ROCKETMQ_HOME
變量值: MQ解壓路徑\MQ文件夾名
啟動(dòng)
在rocketmq-4.9.2\bin
目錄下,打開cmd窗口
先啟動(dòng) nameServer,啟動(dòng)命令:start mqnamesrv.cmd
然后在啟動(dòng) Broker,啟動(dòng)命令:start mqbroker.cmd -n 127.0.0.1:7906 autoCreateTopicEnable=true
管理端插件安裝:
老版本地址下載:https://codeload.github.com/apache/rocketmq-externals/zip/master
新版本地址:https://github.com/apache/rocketmq-dashboard
啟動(dòng)完成之后,瀏覽器中輸入‘127.0.0.1:8089’,成功后即可進(jìn)行管理端查看。
消息發(fā)送
RocketMQ提供的原生客戶端的API,當(dāng)然除了原生客戶端外,SpringBoot、SpringCloudStream也進(jìn)行了集成,但本質(zhì)上這些也是基于原生API的封裝,所以只需掌握原生API,其他的也會(huì)水到渠成。
導(dǎo)入MQ客戶端依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
消息發(fā)送:
/**
* 同步發(fā)送
*/
public class SyncProducer {
public static void main(String[] args) throws Exception{
// 實(shí)例化消息生產(chǎn)者Producer
DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 設(shè)置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//producer.setSendLatencyFaultEnable(true);
// 啟動(dòng)Producer實(shí)例
producer.start();
for (int i = 0; i < 10; i++) {
// 創(chuàng)建消息,并指定Topic,Tag和消息體
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發(fā)送消息到一個(gè)Broker
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//如果不再發(fā)送消息,關(guān)閉Producer實(shí)例。
producer.shutdown();
}
}
總結(jié)
這篇主要是帶大家了解RocketMQ的基本原理和介紹,在后面的章節(jié)中,會(huì)帶大家深入了解和使用RocketMQ,如果覺得文章有幫助的,記得點(diǎn)贊關(guān)注,您的支持是我創(chuàng)作的最大動(dòng)力。
作者:牧小農(nóng)
公眾號(hào):牧小農(nóng),微信掃碼關(guān)注或搜索公眾號(hào)名稱