技術(shù)干貨實(shí)戰(zhàn)(1)- RabbitMQ死信與延遲隊(duì)列的區(qū)別與實(shí)現(xiàn)

作者: 修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。





摘要:對(duì)于消息中間件RabbitMQ,想必各位小伙伴并不陌生,其廣泛應(yīng)用程度不言而喻,此前我們也在許多課程以及諸多專欄文章中介紹了它的應(yīng)用,其應(yīng)用場景也是相當(dāng)廣泛的,像什么消息異步通信、服務(wù)模塊解耦、高并發(fā)流量削峰、訂單超時(shí)未支付自動(dòng)失效等等都是實(shí)際項(xiàng)目中最為常見的場景。本文我們將重點(diǎn)介紹并實(shí)現(xiàn)RabbitMQ的死信與延時(shí)隊(duì)列,并將兩者做一個(gè)簡單的對(duì)比!


內(nèi)容:對(duì)于RabbitMQ的死信隊(duì)列,此前我們?cè)凇癑ava秒殺系統(tǒng)”這一技術(shù)專欄中已經(jīng)有重點(diǎn)介紹過了,在那里我們是將其應(yīng)用于 “訂單超時(shí)未支付自動(dòng)失效”這一業(yè)務(wù)場景中,簡而言之,“死信隊(duì)列”是一種特殊的“隊(duì)列”,跟普通的隊(duì)列相比,具有“延遲處理任務(wù)”的特性。


而在消息中間件RabbitMQ的架構(gòu)組件中,也存在著跟“死信隊(duì)列”在功能特性方面幾乎相同的組件,那就是“延遲隊(duì)列/延時(shí)隊(duì)列”,同樣也具有“延遲、延時(shí)處理任務(wù)”的功效!

當(dāng)然啦,這兩者還是有一丟丟區(qū)別的,最直觀的當(dāng)然是名字上啦,從名字上你就可以看出來兩者的“處事風(fēng)格”是不一樣的,具體體現(xiàn)在:


一、創(chuàng)建上的差異:

(1)RabbitMQ的死信隊(duì)列DeadQueue是由“死信交換機(jī)DLX”+“死信路由DLK”組成的,當(dāng)然,可能還會(huì)有“TTL”,而DLX和DLK又可以綁定指向真正的隊(duì)列RealQueue,這個(gè)隊(duì)列RealQueue便是“消費(fèi)者”真正監(jiān)聽的對(duì)象.

(2)而RabbitMQ的延遲/延時(shí)隊(duì)列DelayedQueue 則是由普通的隊(duì)列來創(chuàng)建即可,唯一不同的地方在于其綁定的交換機(jī)為自定義的交換機(jī),即“CustomExchange”,在創(chuàng)建該交換機(jī)時(shí)只需要指定其消息的類型為 “x-delayed-message”即可.“消費(fèi)者”真正監(jiān)聽的隊(duì)列也是它本人,即DelayedQueue

畫外音:從這一點(diǎn)上看,延遲/延時(shí)隊(duì)列的創(chuàng)建相對(duì)而言簡單一些?。?/span>


二、功能特性上的差異:

(1)死信隊(duì)列在實(shí)際應(yīng)用時(shí)雖然可以實(shí)現(xiàn)“延時(shí)、延遲處理任務(wù)”的功效,但進(jìn)入死信中的消息卻依然保留了隊(duì)列的特性,即“FIFO” ~ 先進(jìn)先出,而不管先后進(jìn)入隊(duì)列中消息的TTL的值. 即假設(shè)先后進(jìn)入死信的消息為A、B、C,各自的TTL分別為:10s、3s、5s,理論上TTL先后到達(dá)的順序是:B、C、A,然后從死信出來,最終被路由到真正的隊(duì)列中,即消息被消費(fèi)的先后順序應(yīng)該為:B、C、A,然而現(xiàn)實(shí)卻是殘酷的,其最終消費(fèi)的消息的順序?yàn)椋篈、B、C,即“消息是怎么進(jìn)去的,就怎么出來”,保留了所謂的FIFO特性.

(2)或許是因?yàn)樗佬庞羞@種缺陷,所以RabbitMQ提供了另一種組件,即“延遲隊(duì)列”,它可以很完美的解決上面死信出現(xiàn)的問題,即最終消費(fèi)的消息的順序?yàn)椋築、C、A,我們將在下面用實(shí)際的代碼進(jìn)行實(shí)戰(zhàn)實(shí)現(xiàn)與演練.


三、插件安裝上的差異:

(1)死信不需要額外的插件

(2)但是延遲隊(duì)列在實(shí)際項(xiàng)目使用時(shí)卻需要在Mq Server中安裝一個(gè)插件,它的名字叫做:“rabbitmq_delayed_message_exchange”,其安裝過程可以參考鏈接:https://www.cnblogs.com/isunsine/p/11572457.html  里面就提供了Windows環(huán)境和Linux環(huán)境下的插件的安裝過程(很簡單,只需要不到3步的步驟.)


四、代碼的實(shí)戰(zhàn)實(shí)現(xiàn)~RabbitMQ的死信隊(duì)列

       說了這么多,想必有些小伙伴有點(diǎn)不耐煩了,下面我將采用實(shí)際的代碼對(duì)上面所介紹的幾點(diǎn)區(qū)別進(jìn)行實(shí)現(xiàn)與演練(代碼都是基于Spring Boot2.0搭建的項(xiàng)目環(huán)境實(shí)現(xiàn)與測試的)

(1)首先,我們需要?jiǎng)?chuàng)建死信隊(duì)列以及真正的隊(duì)列,并實(shí)現(xiàn)相關(guān)的綁定:   

   //構(gòu)建訂單超時(shí)未支付的死信隊(duì)列消息模型
@Bean
public Queue successKillDeadQueue(){
Map<String, Object> argsMap= Maps.newHashMap();
argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}

//基本交換機(jī)
@Bean
public TopicExchange successKillDeadProdExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}

//創(chuàng)建基本交換機(jī)+基本路由 -> 死信隊(duì)列 的綁定
@Bean
public Binding successKillDeadProdBinding(){
return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}

//真正的隊(duì)列
@Bean
public Queue successKillRealQueue(){
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}

//死信交換機(jī)
@Bean
public TopicExchange successKillDeadExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}

//死信交換機(jī)+死信路由->真正隊(duì)列 的綁定
@Bean
public Binding successKillDeadBinding(){
return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}


(2)將項(xiàng)目運(yùn)行起來,登錄RabbitMQ的后端控制臺(tái),可以看到成功創(chuàng)建了相應(yīng)的死信隊(duì)列和真正的隊(duì)列等組件,如下圖所示:



(3)緊接著,我們?cè)贑ontroller中建立一個(gè)請(qǐng)求方法,用于接收前端請(qǐng)求過來的消息,并將該消息附以TTL值,塞入死信隊(duì)列中,如下所示:

    //死信隊(duì)列-生產(chǎn)者
@RequestMapping(value = "dead/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDQMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
Message realMsg=MessageBuilder.withBody(msg.getBytes("UTF-8")).build();

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"), env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"), realMsg, message -> {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

//TODO:動(dòng)態(tài)設(shè)置TTL
mp.setExpiration(String.valueOf(ttl));

log.info("死信隊(duì)列生產(chǎn)者-發(fā)出消息:{} TTL:{}",msg,ttl);
return message;
});
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}


(4)最后是寫一個(gè)Spring Bean類充當(dāng)消費(fèi)者,在其中監(jiān)聽“實(shí)際隊(duì)列”的消息:  

@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(@Payload byte[] msg){
try {
log.info("死信隊(duì)列-監(jiān)聽者-接收消息:{}",new String(msg,"UTF-8"));

}catch (Exception e){
log.error("死信隊(duì)列-監(jiān)聽者-發(fā)生異常:",e.fillInStackTrace());
}
}


最后,我們進(jìn)入測試環(huán)節(jié),打開Postman,前后輸入3次不同的請(qǐng)求信息,其中各自的TTL也不盡相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最終在Console控制臺(tái)等待,你會(huì)發(fā)現(xiàn)消費(fèi)者監(jiān)聽的消息的順序?yàn)椋篈、B、C,而不是C、B、A,如下圖所示:




五、代碼的實(shí)戰(zhàn)實(shí)現(xiàn)~RabbitMQ的延遲/延時(shí)隊(duì)列

很明顯,由于死信存在的這個(gè)缺陷,故而其在上面的應(yīng)用場景中是不太適用的!即死信隊(duì)列在 消息的TTL不一致,且后入死信的消息TTL小于前入的消息TTL的應(yīng)用場景中是不適用的,而像“訂單超時(shí)未支付”的應(yīng)用場景,因?yàn)榇蠹叶家粯?,都是固定?0min或者 1h,故而這種場景,死信是相當(dāng)適合的

因此,為了解決實(shí)際項(xiàng)目中“TTL不一致且不固定”的應(yīng)用場景,我們需要搬上“延遲/延時(shí)隊(duì)列”(當(dāng)然啦,Redisson的延遲/延遲隊(duì)列也是可以實(shí)現(xiàn)的!),下面我們用代碼加以實(shí)現(xiàn)!

(1)首先是創(chuàng)建“延遲/延時(shí)隊(duì)列”等相關(guān)的組件,如下所示:

    //TODO:RabbitMQ延遲隊(duì)列
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(env.getProperty("mq.kill.delay.queue")).build();
}

@Bean
public CustomExchange delayExchange(){
Map<String,Object> map=Maps.newHashMap();
map.put("x-delayed-type","direct");
return new CustomExchange(env.getProperty("mq.kill.delay.exchange"),"x-delayed-message",true,false,map);
}

@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(env.getProperty("mq.kill.delay.routingKey")).noargs();
}

(2)其生產(chǎn)者發(fā)送消息的代碼我們?nèi)匀皇欠旁谝粋€(gè)Controller的請(qǐng)求方法中,如下所示:  

//延遲隊(duì)列-生產(chǎn)者
@RequestMapping(value = "delay/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDelayMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
String info=msg;

Message realMsg=MessageBuilder.withBody(info.getBytes("UTF-8")).build();
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.delay.exchange"),env.getProperty("mq.kill.delay.routingKey"),
realMsg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader("x-delay",ttl);

log.info("延遲隊(duì)列生產(chǎn)者-發(fā)出消息:{} TTL:{}",msg,ttl);
return message;
}
});

}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}

(3)最后是用于監(jiān)聽延遲隊(duì)列中消息的消費(fèi)者的代碼,如下所示:  

/**
* 延時(shí)隊(duì)列-消息監(jiān)聽器-消費(fèi)者
* @Author:debug (SteadyJack)
* @Link: weixin-> debug0868 qq-> 1948831260
**/
@Component
public class DelayQueueMqListener {
private static final Logger log= LoggerFactory.getLogger(DelayQueueMqListener.class);

//消息監(jiān)聽
@RabbitListener(queues = {"${mq.kill.delay.queue}"})
public void consumeMsg(@Payload byte[] msg){
try {
String info=new String(msg,"UTF-8");

log.info("延時(shí)隊(duì)列監(jiān)聽到消息:{} ",info);
}catch (Exception e){
log.error("延時(shí)隊(duì)列-消息監(jiān)聽器-消費(fèi)者-消息監(jiān)聽-發(fā)生異常:",e.fillInStackTrace());
}
}
}

(4)將項(xiàng)目跑起來,可以看到RabbitMQ的后端控制臺(tái)已經(jīng)建立了該隊(duì)列,如下圖所示:


(5)最后,我們打開postman,前后輸入3次不同的請(qǐng)求信息,其中各自的TTL也不盡相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最終在Console控制臺(tái)等待,你會(huì)發(fā)現(xiàn)消費(fèi)者監(jiān)聽的消息的順序?yàn)椋篊、B、A,而不是A、B、C,,即按照消息的TTL來決定消費(fèi)的先后順序,如下圖所示:


從該運(yùn)行結(jié)果上看,會(huì)發(fā)現(xiàn)這才是我們真正想要的結(jié)果,即按照時(shí)間TTL的大小來決定消息被消費(fèi)的先后順序,而且,你可以看出消費(fèi)時(shí)的時(shí)間跟發(fā)出的時(shí)間剛好差 TTL !

在文章的最后的,我們簡單總結(jié)一下本文所講的內(nèi)容,即主要介紹、對(duì)比并實(shí)戰(zhàn)了RabbitMQ中兩款具有“延時(shí)、延遲處理任務(wù)”功效的組件,即“死信隊(duì)列”和“延遲隊(duì)列”,其差異性主要體現(xiàn)在:創(chuàng)建上的不同、功能特性的不同、插件安裝上的不同等方面。

總體來說,如果是想追求消息傳輸?shù)姆€(wěn)定性、可靠性且TTL是固定的話,那么建議選擇“死信隊(duì)列”,因?yàn)橄囊婚_始就在隊(duì)列中待著,等到TTL一到才被路由到真正的隊(duì)列!而“延遲隊(duì)列”則不同,即發(fā)送出去的消息需要等待 TTL 的時(shí)間才進(jìn)入“延遲隊(duì)列”,如果在等待的期間,Mq Server 宕機(jī)了,那很可能消息就丟失了…..

好了,本文我們就介紹到這里了,最后打個(gè)小廣告,Debug最近上新了一門新課:Java分布式中間件大匯聚實(shí)戰(zhàn)第一季 (SpringBoot2.0+點(diǎn)贊系統(tǒng)+面試),課程所介紹的內(nèi)容正是基于企業(yè)級(jí)項(xiàng)目真實(shí)的應(yīng)用案例為出發(fā)點(diǎn),來實(shí)戰(zhàn)各種典型的主流技術(shù)棧,目前課程還處于優(yōu)惠期,原價(jià)是129,目前仍然是59.9而已哦(下個(gè)月就要漲價(jià)嘍…)


課程觀看:https://www.fightjava.com/web/index/course/detail/15 !

其他相關(guān)的技術(shù),感興趣的小伙伴可以關(guān)注底部Debug的技術(shù)公眾號(hào),或者加Debug的微信,拉你進(jìn)“微信版”的真正技術(shù)交流群!一起學(xué)習(xí)、共同成長!


補(bǔ)充

1、若想學(xué)習(xí)其他的技術(shù)干貨,可以前往Debug自建的技術(shù)社區(qū)進(jìn)行學(xué)習(xí)觀看,包括技術(shù)專欄、博客和課程哦:https://www.fightjava.com/

2、關(guān)注一下Debug的技術(shù)微信公眾號(hào),最新的技術(shù)文章、課程以及技術(shù)專欄將會(huì)第一時(shí)間在公眾號(hào)發(fā)布哦!