Java秒殺系統(tǒng)(十七):秒殺邏輯優(yōu)化之RabbitMQ接口限流一

作者: 修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。



摘要:本篇博文是“Java秒殺系統(tǒng)實(shí)戰(zhàn)系列文章”的第十七篇,我們將繼續(xù)秒殺系統(tǒng)的優(yōu)化之路。在本篇文章中我們將基于RabbitMQ異步通信、FIFO(先進(jìn)先出)、接口限流的特性,在執(zhí)行秒殺核心的處理邏輯之前架上一層“限流”的處理邏輯,從而讓瞬時(shí)產(chǎn)生的,猶如波濤洶涌、潮水般的請(qǐng)求流量變得井井有條、有序性地到達(dá)后端的秒殺接口! 

內(nèi)容:在前面的篇章中,我們主要是從秒殺的核心處理邏輯著手進(jìn)行優(yōu)化,先后從數(shù)據(jù)庫(kù)級(jí)別Sql的優(yōu)化、分布式鎖的引入、分布式唯一ID算法的引入以及業(yè)務(wù)服務(wù)模塊的異步通信、服務(wù)解耦等方式對(duì)秒殺核心業(yè)務(wù)邏輯的處理進(jìn)行了大幅度的調(diào)整、優(yōu)化,本文Debug將介紹一種獨(dú)立于“秒殺核心業(yè)務(wù)邏輯”的方式對(duì)秒殺系統(tǒng)進(jìn)行優(yōu)化。 

首先,我們先來(lái)回顧一下秒殺系統(tǒng)整體的秒殺業(yè)務(wù)邏輯的處理:


當(dāng)前端高并發(fā)產(chǎn)生多線程請(qǐng)求時(shí),正常情況下,前端會(huì)在瞬時(shí)產(chǎn)生猶如波濤洶涌、潮水般的請(qǐng)求流量到達(dá)后端的秒殺接口,此時(shí)如果我們的代碼處理邏輯并不那么迅速,那么很有可能將應(yīng)對(duì)不了這股蜂擁而至的高并發(fā)流量,最終可能出現(xiàn)各種各樣亂七八糟的問(wèn)題,前面所講的“庫(kù)存超賣”,“重復(fù)秒殺”便是一種體現(xiàn)。

而且,在某種程度上,這些請(qǐng)求流量對(duì)于我們來(lái)講有兩點(diǎn)我們需要去注意的:

(1) 這些請(qǐng)求流量是透明的,我們后端接口壓根不知道、也不需要知道請(qǐng)求對(duì)應(yīng)的用戶是哪位;

(2) 這些請(qǐng)求最終并非全部都是有效的,即如果待秒殺的商品數(shù)量為N,而請(qǐng)求的流量遠(yuǎn)遠(yuǎn)大于N,則很明顯,在秒殺開(kāi)始后將有很大一部分的請(qǐng)求流量對(duì)于我們后端接口而言是木有多大用處的,因?yàn)槊霘⑦M(jìn)行到一定時(shí)間時(shí),N很有可能已經(jīng)等于0了。

基于這兩點(diǎn)設(shè)想,我們希望對(duì)這些請(qǐng)求流量進(jìn)行“規(guī)范化”,當(dāng)前端高并發(fā)產(chǎn)生多線程請(qǐng)求流量時(shí),我們希望將這些請(qǐng)求壓入“隊(duì)列”,使得這些請(qǐng)求可以“乖乖的”等待被處理,即變得井井有條、有序性地到達(dá)后端的秒殺接口,而不是像無(wú)頭蒼蠅般、一窩蜂的插隊(duì)處理! 

眾所周知,RabbitMQ是一款MQ中間件,MQ正是Message Queue,即消息隊(duì)列的簡(jiǎn)稱,而我們都知道隊(duì)列的特點(diǎn)是先進(jìn)先出,即FIFO;它可以實(shí)現(xiàn)先進(jìn)入隊(duì)列的消息先被消費(fèi)處理、后進(jìn)入隊(duì)列的消息后被消費(fèi)處理,因此,RabbitMQ的這一特性將可以助我們實(shí)現(xiàn)“接口限流”的作用,其最終的效果如下圖所示:   


下面我們就進(jìn)入代碼實(shí)戰(zhàn)環(huán)節(jié):

(1) 首先,我們需要在RabbitmqConfig配置類中創(chuàng)建限流用的隊(duì)列、交換機(jī)和路由消息模型,其代碼如下所示:

//TODO:RabbitMQ限流專用

@Bean
public Queue executeLimitQueue(){
Map<String, Object> argsMap=Maps.newHashMap();
//限制channel中隊(duì)列同一時(shí)刻通過(guò)的消息數(shù)量
argsMap.put("x-max-length", env.getProperty("spring.rabbitmq.listener.simple.prefetch",Integer.class));
return new Queue(env.getProperty("mq.kill.item.execute.limit.queue.name"),true,false,false,argsMap);
}

@Bean
public TopicExchange executeLimitExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.execute.limit.queue.exchange"),true,false);
}

@Bean
public Binding executeLimitBinding(){
return BindingBuilder.bind(executeLimitQueue()).to(executeLimitExchange()).with(env.getProperty("mq.kill.item.execute.limit.queue.routing.key"));
}

其中,讀取環(huán)境變量的對(duì)象實(shí)例讀取的配置參數(shù)是配置為配置文件application.properties中的,如下所示:   

#RabbitMQ限流專用
mq.kill.item.execute.limit.queue.name=${mq.env}.kill.item.execute.limit.queue
mq.kill.item.execute.limit.queue.exchange=${mq.env}.kill.item.execute.limit.exchange
mq.kill.item.execute.limit.queue.routing.key=${mq.env}.kill.item.execute.limit.routing.key

(2) 接著,我們需要在RabbitSenderService服務(wù)類中開(kāi)發(fā)一個(gè)用于轉(zhuǎn)移巨大用戶流量的、將請(qǐng)求流量或者消息發(fā)送至RabbitMQ的功能方法,其完整源代碼如下所示:  

//秒殺時(shí)異步發(fā)送Mq消息
public void sendKillExecuteMqMsg(final KillDto killDto){
try {
if (killDto!=null){
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.kill.item.execute.limit.queue.exchange"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.execute.limit.queue.routing.key"));
rabbitTemplate.convertAndSend(killDto, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillDto.class);
return message;
}
});
}
}catch (Exception e){
log.error("秒殺時(shí)異步發(fā)送Mq消息-發(fā)生異常,消息為:{}",killDto,e.fillInStackTrace());
}
}

(3) 轉(zhuǎn)移消息進(jìn)入隊(duì)列之后,消息將變得井井有序、規(guī)范化地等待被監(jiān)聽(tīng),消費(fèi)處理,其處理邏輯即為“秒殺接口的核心處理邏輯”,完整源代碼于RabbitReceiverService服務(wù)類中,如下所示:  

//秒殺時(shí)異步接收Mq消息-監(jiān)聽(tīng)者
@RabbitListener(queues = {"${mq.kill.item.execute.limit.queue.name}"},containerFactory = "multiListenerContainer")
public void consumeKillExecuteMqMsg(KillDto dto){
try {
if (dto!=null){
//采用任何一種加分布鎖的處理方法都是可行的 killItemV5也行
killService.killItemV4(dto.getKillId(),dto.getUserId());
}
}catch (Exception e){
log.error("用戶秒殺成功后超時(shí)未支付-監(jiān)聽(tīng)者-發(fā)生異常:",e.fillInStackTrace());
}
}

RabbitMQ限流、轉(zhuǎn)移以及在接收處理層面的代碼開(kāi)發(fā)已經(jīng)完成了,下篇文章我們將將其整合至秒殺的業(yè)務(wù)邏輯當(dāng)中!  

補(bǔ)充:

1、目前,這一秒殺系統(tǒng)的整體構(gòu)建與代碼實(shí)戰(zhàn)已經(jīng)全部完成了,該秒殺系統(tǒng)對(duì)應(yīng)的視頻教程的鏈接地址為:https://www.fightjava.com/web/index/course/detail/6,可以點(diǎn)擊鏈接進(jìn)行試看以及學(xué)習(xí),實(shí)戰(zhàn)期間有任何問(wèn)題都可以留言或者與Debug聯(lián)系、交流!

2、另外,Debug也開(kāi)源了該秒殺系統(tǒng)對(duì)應(yīng)的完整的源代碼以及數(shù)據(jù)庫(kù),其地址可以來(lái)這里下載:https://gitee.com/steadyjack/SpringBoot-SecondKill 記得Fork跟Star?。。?!

3、最后,不要忘記了關(guān)注一下Debug的技術(shù)微信公眾號(hào):