熬夜整理的RabbitMQ知識點相當(dāng)齊全的文章

作者: 修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接和本聲明。


在如今微服務(wù)、分布式時代,不懂一點消息隊列、服務(wù)解耦、異步通信,都不好意思說自己做過Java高并發(fā)、分布式項目了;趁著放假,debug熬夜整理了消息中間件RabbitMQ相關(guān)的知識點,對于沒擼過這一技術(shù)棧的小伙伴而言可以說是福利了,而對于已經(jīng)擼過的小伙伴而言不妨再過一遍,畢竟溫故而知新嘛!


以下為本文的目錄,先一睹為快:

  • 一、基本概念
  • 二、RabbitMQ底層架構(gòu)
  • 三、如何在Spring Boot項目中使用
  • 四、福利奉上!


話不多說,咱們直接進(jìn)入正題!

一、簡介

1.RabbitMQ,一款由Erlang語言開發(fā)的、基于AMQP(高級消息隊列協(xié)議)實現(xiàn)的開源消息代理軟件,俗稱“消息中間件”;

2.那何謂“AMQP”呢,解釋為中文:高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),專門面向消息而設(shè)計的,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品、開發(fā)語言等條件的限制;

3.據(jù)說這RabbitMQ最初起源于金融系統(tǒng),用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)還挺不俗,具體特點有:

(1)可靠性:RabbitMQ使用一些機制來保證可靠性,如持久化、消費確認(rèn)、發(fā)布確認(rèn);

(2)靈活的路由:在消息進(jìn)入隊列之前,通過交換器Exchange 來路由消息的;對于典型的路由功能,RabbitMQ 已經(jīng)提供了一些內(nèi)置Exchange來實現(xiàn);針對更復(fù)雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現(xiàn)自己的 Exchange

(3)消息集群:多個RabbitMQ 服務(wù)器可以組成一個集群,形成一個邏輯上的Broker(消息代理服務(wù)器);

(4)高可用:隊列可以在集群中的機器節(jié)點進(jìn)行鏡像備份,當(dāng)部分節(jié)點出問題時隊列仍然可用;

(5)多種協(xié)議支持;RabbitMQ 支持多種消息隊列協(xié)議,比如 STOMPMQTT等等;

(6)多語言客戶端:RabbitMQ 幾乎支持所有常用的開發(fā)語言,比如 Java、.NET、Ruby 等;

(7)Web控制臺管理界面:RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監(jiān)控和管理消息代理服務(wù)器 Broker 中的方方面面(包括隊列、交換器、路由、消息消費情況、隊列綁定情況、消費者實例情況、使用賬戶等等)

(8)跟蹤機制:RabbitMQ 提供了消息跟蹤機制,在消息傳輸期間出現(xiàn)異常時使用者可以找出到底發(fā)生了什么事!

(9)插件機制:RabbitMQ 提供了許多插件,來從多方面進(jìn)行擴展,也可以編寫自己的插件,比如RabbitMQ自帶的“延遲隊列插件”就相當(dāng)好用!


二、RabbitMQ底層架構(gòu)與基本概念

1.如下圖所示為RabbitMQ底層的架構(gòu)圖,消息在底層傳輸期間幾乎經(jīng)歷了其中所涉及的每個組件!


2.下面介紹下這些組件的具體含義:

1Message:消息,就像萬物皆對象一樣,萬物皆可充當(dāng)消息!因此消息可以理解為任意的事物、任意的數(shù)據(jù);它由消息頭Header和消息體Body組成;

其中消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲)等;

2Publisher:消息的生產(chǎn)者,也是一個向交換器發(fā)布消息的客戶端應(yīng)用程序;

3Exchange:交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列,簡而言之:即將消息路由給隊列;

4Binding:綁定,用于將消息隊列綁定、關(guān)聯(lián)到指定的交換器;即用于消息隊列和交換器之間的關(guān)聯(lián);那如何將其關(guān)聯(lián)、綁定在一起呢,通過路由鍵RoutingKey實現(xiàn);

即通過 路由鍵RoutingKey 將 交換器Exchange 和 消息隊列 Queue 連接起來的,從另外一個角度上看,也可以將交換器Exchange理解成一個由綁定構(gòu)成的路由表。

5Queue:消息隊列,用于保存消息直到發(fā)送給消費者;可以理解為它是消息的容器;一個消息可投入一個或多個隊列;消息一直在隊列里面,直到消費者連接到這個隊列并將其取走 這個 消息才算真正走到了最后!

6Connection:網(wǎng)絡(luò)連接,比如一個 TCP 連接;

7Channel:信道,多路復(fù)用連接中的一條獨立的雙向數(shù)據(jù)流通道;信道是建立在真實TCP 連接之上的虛擬連接;AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成,因為對于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接

8Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應(yīng)用程序;

9Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關(guān)對象,虛擬主機是共享相同的身份認(rèn)證和加密環(huán)境的獨立服務(wù)器域;

10Broker:表示消息隊列服務(wù)器實體;


3. 交換器Exchange的類型: 交換器分發(fā)消息時根據(jù)類型的不同,分發(fā)策略也有所不同,目前共四種類型:directfanout、topicheaders ;headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:

1)基于Direct類型交換器的傳輸模式:消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致,交換器就將消息發(fā)到對應(yīng)的隊列中,它是完全匹配、單播的模式;如下圖所示:














2)基于Fanout類型交換器的廣播分發(fā)模式:每個分發(fā)到 Fanout類型交換器的消息都會分到所有綁定的隊列上去,很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機都獲得了一份復(fù)制的消息,Fanout 類型轉(zhuǎn)發(fā)消息是最快的; 















3)基于Topic類型交換器的模式匹配分發(fā)模式:Topic交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進(jìn)行匹配,此時隊列需要綁定到一個模式上,它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開,它同樣也會識別兩個通配符:

   符號“#”和符號“*”,其中 # 匹配 0 個或多個單詞,而 * 匹配不多不少一個單詞,形象如下圖:














三、如何在Spring Boot項目中使用

  說一千道一百,最終還是得將其用到實際的項目并用實際的開發(fā)語言將它用起來,下面將基于Java Spring Boot介紹下如何在Spring Boot搭建的項目使用RabbitMQ,其實,很簡單,只要把握住上面RabbitMQ的系統(tǒng)架構(gòu)圖即可!

1)消息(在Java中就是一實體類):

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BlogIndexMsg implements Serializable{
private Integer type;
private Integer blogId;
}


2)生產(chǎn)者(發(fā)送消息):

@Component
@Slf4j
public class BlogMqService {
@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private Environment env;

@Autowired
private ObjectMapper objectMapper;

//用于首頁微博/朋友圈的相關(guān)操作對應(yīng)的mq服務(wù)
public void sendIndexBlogMsg(final BlogIndexMsg msg){
try {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq. exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("mq. routing.key.name"));
Message message= MessageBuilder.withBody(objectMapper.writeValueAsBytes(msg))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend(message);
}catch (Exception e){
log.error("用于首頁微博/朋友圈的相關(guān)操作對應(yīng)的mq服務(wù)-發(fā)生異常:{} ",msg,e);
}
}
}


3)消費者:

@Component
@Slf4j
public class BlogMqListener {

@Autowired
private ObjectMapper objectMapper;

//消息監(jiān)聽器
@RabbitListener(queues = {"${mq. queue.name}"})
public void consumeBlogIndexMsg(@Payload Message msg){
try {
BlogIndexMsg indexMsg=objectMapper.readValue(msg.getBody(),BlogIndexMsg.class);
log.info("消息監(jiān)聽器監(jiān)聽到的消息:{}",indexMsg);

//執(zhí)行具體的業(yè)務(wù)邏輯代碼

}catch (Exception e){
log.error("消息監(jiān)聽器-發(fā)生異常:",e);
}
}
}


4)消息隊列與交換器的綁定(即消息模型的構(gòu)建):

@Configuration
public class RabbitConfig {

private static final Logger log= LoggerFactory.getLogger(RabbitConfig.class);

@Autowired
private Environment env;

//構(gòu)建消息模型(消息綁定)
@Bean
public Queue blogIndexQueue(){
return new Queue(env.getProperty("mq.queue.name"),true);
}

@Bean
public TopicExchange blogIndexExchange(){
return new TopicExchange(env.getProperty("mq.exchange.name"),true,false);
}

@Bean
public Binding blogIndexBinding(){
return BindingBuilder.bind(blogIndexQueue()).to(blogIndexExchange()).with(env.getProperty("mq. routing.key.name"));
}
}


然后在代碼的某個地方調(diào)用 new BlogMqService().sendIndexBlogMsg()即可?。?!


四、福利奉上!

關(guān)于RabbitMQ更多詳細(xì)的介紹,諸位可以觀看debug錄制的 “RabbitMQ實戰(zhàn)視頻教程”,地址如下所示:https://www.fightjava.com/web/index/course/detail/4


在那里不僅僅介紹了本文所提及的所有知識點,還包括了:消費確認(rèn)模式、高并發(fā)下消息限流、服務(wù)解耦、異步通信、死信隊列等技術(shù)干貨,課程目錄如下圖所示:


總結(jié):

好了,本文就介紹到這里吧,快過春節(jié)了,提前祝大家新春快樂,身體健康,來年擼碼更硬?。?!

如果本文對你有幫助,請關(guān)注公眾號,并動動手指收藏、點贊、以及轉(zhuǎn)發(fā)哦?。。?/span>