熬夜整理的RabbitMQ知識(shí)點(diǎn)相當(dāng)齊全的文章
作者:
修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。
在如今微服務(wù)、分布式時(shí)代,不懂一點(diǎn)消息隊(duì)列、服務(wù)解耦、異步通信,都不好意思說(shuō)自己做過(guò)Java高并發(fā)、分布式項(xiàng)目了;趁著放假,debug熬夜整理了消息中間件RabbitMQ相關(guān)的知識(shí)點(diǎn),對(duì)于沒(méi)擼過(guò)這一技術(shù)棧的小伙伴而言可以說(shuō)是福利了,而對(duì)于已經(jīng)擼過(guò)的小伙伴而言不妨再過(guò)一遍,畢竟溫故而知新嘛!
以下為本文的目錄,先一睹為快:
- 一、基本概念
- 二、RabbitMQ底層架構(gòu)
- 三、如何在Spring Boot項(xiàng)目中使用
- 四、福利奉上!
話不多說(shuō),咱們直接進(jìn)入正題!
一、簡(jiǎn)介
1.RabbitMQ,一款由Erlang語(yǔ)言開(kāi)發(fā)的、基于AMQP(高級(jí)消息隊(duì)列協(xié)議)實(shí)現(xiàn)的開(kāi)源消息代理軟件,俗稱(chēng)“消息中間件”;
2.那何謂“AMQP”呢,解釋為中文:高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),專(zhuān)門(mén)面向消息而設(shè)計(jì)的,基于此協(xié)議的客戶(hù)端與消息中間件可傳遞消息,并不受產(chǎn)品、開(kāi)發(fā)語(yǔ)言等條件的限制;
3.據(jù)說(shuō)這RabbitMQ最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)還挺不俗,具體特點(diǎn)有:
(1)可靠性:RabbitMQ使用一些機(jī)制來(lái)保證可靠性,如持久化、消費(fèi)確認(rèn)、發(fā)布確認(rèn);
(2)靈活的路由:在消息進(jìn)入隊(duì)列之前,通過(guò)交換器Exchange 來(lái)路由消息的;對(duì)于典型的路由功能,RabbitMQ 已經(jīng)提供了一些內(nèi)置Exchange來(lái)實(shí)現(xiàn);針對(duì)更復(fù)雜的路由功能,可以將多個(gè) Exchange 綁定在一起,也通過(guò)插件機(jī)制實(shí)現(xiàn)自己的 Exchange
(3)消息集群:多個(gè)RabbitMQ 服務(wù)器可以組成一個(gè)集群,形成一個(gè)邏輯上的Broker(消息代理服務(wù)器);
(4)高可用:隊(duì)列可以在集群中的機(jī)器節(jié)點(diǎn)進(jìn)行鏡像備份,當(dāng)部分節(jié)點(diǎn)出問(wèn)題時(shí)隊(duì)列仍然可用;
(5)多種協(xié)議支持;RabbitMQ 支持多種消息隊(duì)列協(xié)議,比如 STOMP、MQTT等等;
(6)多語(yǔ)言客戶(hù)端:RabbitMQ 幾乎支持所有常用的開(kāi)發(fā)語(yǔ)言,比如 Java、.NET、Ruby 等;
(7)Web控制臺(tái)管理界面:RabbitMQ 提供了一個(gè)易用的用戶(hù)界面,使得用戶(hù)可以監(jiān)控和管理消息代理服務(wù)器 Broker 中的方方面面(包括隊(duì)列、交換器、路由、消息消費(fèi)情況、隊(duì)列綁定情況、消費(fèi)者實(shí)例情況、使用賬戶(hù)等等)
(8)跟蹤機(jī)制:RabbitMQ 提供了消息跟蹤機(jī)制,在消息傳輸期間出現(xiàn)異常時(shí)使用者可以找出到底發(fā)生了什么事!
(9)插件機(jī)制:RabbitMQ 提供了許多插件,來(lái)從多方面進(jìn)行擴(kuò)展,也可以編寫(xiě)自己的插件,比如RabbitMQ自帶的“延遲隊(duì)列插件”就相當(dāng)好用!
二、RabbitMQ底層架構(gòu)與基本概念
1.如下圖所示為RabbitMQ底層的架構(gòu)圖,消息在底層傳輸期間幾乎經(jīng)歷了其中所涉及的每個(gè)組件!
2.下面介紹下這些組件的具體含義:
(1)Message:消息,就像萬(wàn)物皆對(duì)象一樣,萬(wàn)物皆可充當(dāng)消息!因此消息可以理解為任意的事物、任意的數(shù)據(jù);它由消息頭Header和消息體Body組成;
其中消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等;
(2)Publisher:消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶(hù)端應(yīng)用程序;
(3)Exchange:交換器,用來(lái)接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列,簡(jiǎn)而言之:即將消息路由給隊(duì)列;
(4)Binding:綁定,用于將消息隊(duì)列綁定、關(guān)聯(lián)到指定的交換器;即用于消息隊(duì)列和交換器之間的關(guān)聯(lián);那如何將其關(guān)聯(lián)、綁定在一起呢,通過(guò)路由鍵RoutingKey實(shí)現(xiàn);
即通過(guò) 路由鍵RoutingKey 將 交換器Exchange 和 消息隊(duì)列 Queue 連接起來(lái)的,從另外一個(gè)角度上看,也可以將交換器Exchange理解成一個(gè)由綁定構(gòu)成的路由表。
(5)Queue:消息隊(duì)列,用于保存消息直到發(fā)送給消費(fèi)者;可以理解為它是消息的容器;一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列;消息一直在隊(duì)列里面,直到消費(fèi)者連接到這個(gè)隊(duì)列并將其取走 這個(gè) 消息才算真正走到了最后!
(6)Connection:網(wǎng)絡(luò)連接,比如一個(gè) TCP 連接;
(7)Channel:信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道;信道是建立在真實(shí)TCP 連接之上的虛擬連接;AMQP 命令都是通過(guò)信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過(guò)信道完成,因?yàn)閷?duì)于操作系統(tǒng)來(lái)說(shuō)建立和銷(xiāo)毀 TCP 都是非常昂貴的開(kāi)銷(xiāo),所以引入了信道的概念,以復(fù)用一條 TCP 連接
(8)Consumer:消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶(hù)端應(yīng)用程序;
(9)Virtual Host:虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象,虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域;
(10)Broker:表示消息隊(duì)列服務(wù)器實(shí)體;
3. 交換器Exchange的類(lèi)型: 交換器分發(fā)消息時(shí)根據(jù)類(lèi)型的不同,分發(fā)策略也有所不同,目前共四種類(lèi)型:direct、fanout、topic、headers ;headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類(lèi)型:
(1)基于Direct類(lèi)型交換器的傳輸模式:消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致,交換器就將消息發(fā)到對(duì)應(yīng)的隊(duì)列中,它是完全匹配、單播的模式;如下圖所示:
(2)基于Fanout類(lèi)型交換器的廣播分發(fā)模式:每個(gè)分發(fā)到 Fanout類(lèi)型交換器的消息都會(huì)分到所有綁定的隊(duì)列上去,很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息,Fanout 類(lèi)型轉(zhuǎn)發(fā)消息是最快的;
(3)基于Topic類(lèi)型交換器的模式匹配分發(fā)模式:Topic交換器通過(guò)模式匹配分配消息的路由鍵屬性,將路由鍵和某個(gè)模式進(jìn)行匹配,此時(shí)隊(duì)列需要綁定到一個(gè)模式上,它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點(diǎn)隔開(kāi),它同樣也會(huì)識(shí)別兩個(gè)通配符:
符號(hào)“#”和符號(hào)“*”,其中 # 匹配 0 個(gè)或多個(gè)單詞,而 * 匹配不多不少一個(gè)單詞,形象如下圖:
三、如何在Spring Boot項(xiàng)目中使用
說(shuō)一千道一百,最終還是得將其用到實(shí)際的項(xiàng)目并用實(shí)際的開(kāi)發(fā)語(yǔ)言將它用起來(lái),下面將基于Java Spring Boot介紹下如何在Spring Boot搭建的項(xiàng)目使用RabbitMQ,其實(shí),很簡(jiǎn)單,只要把握住上面RabbitMQ的系統(tǒng)架構(gòu)圖即可!
(1)消息(在Java中就是一實(shí)體類(lèi)):
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BlogIndexMsg implements Serializable{
private Integer type;
private Integer blogId;
}
(2)生產(chǎn)者(發(fā)送消息):
@Component
@Slf4j
public class BlogMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
@Autowired
private ObjectMapper objectMapper;
//用于首頁(yè)微博/朋友圈的相關(guān)操作對(duì)應(yīng)的mq服務(wù)
public void sendIndexBlogMsg(final BlogIndexMsg msg){
try {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq. exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("mq. routing.key.name"));
Message message= MessageBuilder.withBody(objectMapper.writeValueAsBytes(msg))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend(message);
}catch (Exception e){
log.error("用于首頁(yè)微博/朋友圈的相關(guān)操作對(duì)應(yīng)的mq服務(wù)-發(fā)生異常:{} ",msg,e);
}
}
}
(3)消費(fèi)者:
@Component
@Slf4j
public class BlogMqListener {
@Autowired
private ObjectMapper objectMapper;
//消息監(jiān)聽(tīng)器
@RabbitListener(queues = {"${mq. queue.name}"})
public void consumeBlogIndexMsg(@Payload Message msg){
try {
BlogIndexMsg indexMsg=objectMapper.readValue(msg.getBody(),BlogIndexMsg.class);
log.info("消息監(jiān)聽(tīng)器監(jiān)聽(tīng)到的消息:{}",indexMsg);
//執(zhí)行具體的業(yè)務(wù)邏輯代碼
}catch (Exception e){
log.error("消息監(jiān)聽(tīng)器-發(fā)生異常:",e);
}
}
}
(4)消息隊(duì)列與交換器的綁定(即消息模型的構(gòu)建):
@Configuration
public class RabbitConfig {
private static final Logger log= LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
private Environment env;
//構(gòu)建消息模型(消息綁定)
@Bean
public Queue blogIndexQueue(){
return new Queue(env.getProperty("mq.queue.name"),true);
}
@Bean
public TopicExchange blogIndexExchange(){
return new TopicExchange(env.getProperty("mq.exchange.name"),true,false);
}
@Bean
public Binding blogIndexBinding(){
return BindingBuilder.bind(blogIndexQueue()).to(blogIndexExchange()).with(env.getProperty("mq. routing.key.name"));
}
}
然后在代碼的某個(gè)地方調(diào)用 new BlogMqService().sendIndexBlogMsg()即可!?。?/span>
四、福利奉上!
關(guān)于RabbitMQ更多詳細(xì)的介紹,諸位可以觀看debug錄制的 “RabbitMQ實(shí)戰(zhàn)視頻教程”,地址如下所示:https://www.fightjava.com/web/index/course/detail/4
在那里不僅僅介紹了本文所提及的所有知識(shí)點(diǎn),還包括了:消費(fèi)確認(rèn)模式、高并發(fā)下消息限流、服務(wù)解耦、異步通信、死信隊(duì)列等技術(shù)干貨,課程目錄如下圖所示:
總結(jié):
好了,本文就介紹到這里吧,快過(guò)春節(jié)了,提前祝大家新春快樂(lè),身體健康,來(lái)年擼碼更硬?。?!
如果本文對(duì)你有幫助,請(qǐng)關(guān)注公眾號(hào),并動(dòng)動(dòng)手指收藏、點(diǎn)贊、以及轉(zhuǎn)發(fā)哦?。?!