Java秒殺系統(tǒng)(十):RabbitMQ死信隊列處理超時未支付的訂單
作者:
修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接和本聲明。
摘要:本篇博文是“Java秒殺系統(tǒng)實戰(zhàn)系列文章”的第十篇,本篇博文我們將采用RabbitMQ的死信隊列的方式處理“用戶秒殺成功生成訂單后,卻遲遲沒有支付”的情況,一起來見識一下RabbitMQ死信隊列在實際業(yè)務(wù)環(huán)境下的強(qiáng)大之處!
內(nèi)容:對于消息中間件RabbitMQ,Debug其實在前面的篇章中已經(jīng)簡單分享介紹過了,在這里就不再贅述了!在本文我們將采用RabbitMQ的死信隊列實現(xiàn)這樣的業(yè)務(wù)需求:“用戶在秒殺成功并成功創(chuàng)建一筆訂單記錄后,理論上應(yīng)該是執(zhí)行去支付的操作,但是卻存在著一種情況是用戶遲遲不肯去支付~至于原因,不得而知!”
對于這種場景,各位小伙伴可以在一些商城平臺體驗一下,即挑選完商品,加入購物車后,點擊去結(jié)算,這個時候會有個倒計時,提醒你需要在指定的時間內(nèi)完成付款,否則訂單將失效!
對于這種業(yè)務(wù)邏輯的處理,傳統(tǒng)的做法是采用“定時器的方式”,定時輪詢獲取已經(jīng)超過指定時間的訂單,然后執(zhí)行一系列的處理措施(比如再爭取給用戶發(fā)送短信,提醒超過多長時間訂單就要失效了等等。。。),在這個秒殺系統(tǒng)中,我們將借助RabbitMQ死信隊列這一組件,對該訂單執(zhí)行“失效”的措施!
“死信隊列”,顧明思議,是可以延時、延遲一定的時間再處理消息的一種特殊隊列,它相對于“普通的隊列”而言,可以實現(xiàn)“進(jìn)入死信隊列的消息不立即處理,而是可以等待一定的時間再進(jìn)行處理”的功能!而普通的隊列則不行,即進(jìn)入隊列后的消息會立即被對應(yīng)的消費者監(jiān)聽消費,如下圖所示為普通隊列的基本消息模型:
而對于“死信隊列”,它的構(gòu)成以及使用相對而言比較復(fù)雜一點,在正常情況,死信隊列由三大核心組件組成:死信交換機(jī)+死信路由+TTL(消息存活時間~非必需的),而死信隊列又可以由“面向生產(chǎn)者的基本交換機(jī)+基本路由”綁定而成,故而生產(chǎn)者首先是將消息發(fā)送至“基本交換機(jī)+基本路由”所綁定而成的消息模型中,即間接性地進(jìn)入到死信隊列中,當(dāng)過了TTL,消息將“掛掉”,從而進(jìn)入下一個中轉(zhuǎn)站,即“面下那個消費者的死信交換機(jī)+死信路由”所綁定而成的消息模型中。如下圖所示:
下面,我們以實際的代碼來構(gòu)建死信隊列的消息模型,并將此消息模型應(yīng)用到秒殺系統(tǒng)的上述功能模塊中。
(1) 首先,需要在RabbitmqConfig配置類創(chuàng)建死信隊列的消息模型,其完整的源代碼如下所示:
//構(gòu)建秒殺成功之后-訂單超時未支付的死信隊列消息模型
@Bean
public Queue successKillDeadQueue(){
Map<String, Object> argsMap= Maps.newHashMap();
argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}
//基本交換機(jī)
@Bean
public TopicExchange successKillDeadProdExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}
//創(chuàng)建基本交換機(jī)+基本路由 -> 死信隊列 的綁定
@Bean
public Binding successKillDeadProdBinding(){
return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}
//真正的隊列
@Bean
public Queue successKillRealQueue(){
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}
//死信交換機(jī)
@Bean
public TopicExchange successKillDeadExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}
//死信交換機(jī)+死信路由->真正隊列 的綁定
@Bean
public Binding successKillDeadBinding(){
return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}
其中,環(huán)境變量對象實例env讀取的變量是配置在application.properties配置文件中的,取值如下所示:
#訂單超時未支付自動失效-死信隊列消息模型
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key
mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key
#單位為ms
mq.kill.item.success.kill.expire=20000
(2)
成功創(chuàng)建了消息模型之后,緊接著,我們需要在通用的RabbitMQ發(fā)送消息服務(wù)類RabbitSenderService中開發(fā)“發(fā)送消息入死信隊列”的功能,在該功能方法中,我們指定了消息的存活時間TTL,取值為配置的變量:mq.kill.item.success.kill.expire
的值,即20s;其完整的源代碼如下所示:
//秒殺成功后生成搶購訂單-發(fā)送信息入死信隊列,等待著一定時間失效超時未支付的訂單
public void sendKillSuccessOrderExpireMsg(final String orderCode){
try {
if (StringUtils.isNotBlank(orderCode)){
KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderCode);
if (info!=null){
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);
//TODO:動態(tài)設(shè)置TTL(為了測試方便,暫且設(shè)置20s)
mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
return message;
}
});
}
}
}catch (Exception e){
log.error("秒殺成功后生成搶購訂單-發(fā)送信息入死信隊列,等待著一定時間失效超時未支付的訂單-發(fā)生異常,消息為:{}",orderCode,e.fillInStackTrace());
}
}
從該“發(fā)送消息入死信隊列”的代碼中,我們可以看到,消息首先是先入到“基本交換機(jī)+基本路由”所綁定的死信隊列的消息模型中的!當(dāng)消息到了TTL,自然會從死信隊列中出來(即“解脫了”),然后進(jìn)入下一個中轉(zhuǎn)站,即:“死信交換機(jī)+死信路由” 所綁定而成的真正隊列的消息模型中,最終真正被消費者監(jiān)聽消費!
此時,可以將整個項目、系統(tǒng)運行在外置的tomcat服務(wù)器中,然后打開RabbitMQ后端控制臺應(yīng)用,找到該死信隊列,可以看到該死信隊列的詳細(xì)信息,如下圖所示:
(3) 最后,是需要在RabbitMQ通用的消息監(jiān)聽服務(wù)類RabbitReceiverService 中監(jiān)聽“真正隊列”中的消息并進(jìn)行處理:在這里我們是對該訂單進(jìn)行失效處理(前提是還沒付款的情況下?。渫暾脑创a如下所示:
//用戶秒殺成功后超時未支付-監(jiān)聽者
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(KillSuccessUserInfo info){
try {
log.info("用戶秒殺成功后超時未支付-監(jiān)聽者-接收消息:{}",info);
if (info!=null){
ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
if (entity!=null && entity.getStatus().intValue()==0){
itemKillSuccessMapper.expireOrder(info.getCode());
}
}
}catch (Exception e){
log.error("用戶秒殺成功后超時未支付-監(jiān)聽者-發(fā)生異常:",e.fillInStackTrace());
}
}
其中,失效更新訂單的記錄的操作由 itemKillSuccessMapper.expireOrder(info.getCode()); 來實現(xiàn),其對應(yīng)的動態(tài)Sql的寫法如下所示:
<!--失效更新訂單信息-->
<update id="expireOrder">
UPDATE item_kill_success
SET status = -1
WHERE code = #{code} AND status = 0
</update>
(4) 至此,關(guān)于RabbitMQ死信隊列消息模型的代碼實戰(zhàn)已經(jīng)完畢了!最后我只需要在“用戶秒殺成功創(chuàng)建訂單的那一刻,發(fā)送消息入死信隊列”的地方調(diào)用即可,其調(diào)用代碼如下所示:
/**
* 通用的方法-記錄用戶秒殺成功后生成的訂單-并進(jìn)行異步郵件消息的通知
* @param kill
* @param userId
* @throws Exception
*/
private void commonRecordKillSuccessInfo(ItemKill kill, Integer userId) throws Exception{
//TODO:記錄搶購成功后生成的秒殺訂單記錄
ItemKillSuccess entity=new ItemKillSuccess();
String orderNo=String.valueOf(snowFlake.nextId());
//entity.setCode(RandomUtil.generateOrderCode()); //傳統(tǒng)時間戳+N位隨機(jī)數(shù)
entity.setCode(orderNo); //雪花算法
entity.setItemId(kill.getItemId());
entity.setKillId(kill.getId());
entity.setUserId(userId.toString());
entity.setStatus(SysConstant.OrderStatus.SuccessNotPayed.getCode().byteValue());
entity.setCreateTime(DateTime.now().toDate());
//TODO:學(xué)以致用,舉一反三 -> 仿照單例模式的雙重檢驗鎖寫法
if (itemKillSuccessMapper.countByKillUserId(kill.getId(),userId) <= 0){
int res=itemKillSuccessMapper.insertSelective(entity);
if (res>0){
//TODO:進(jìn)行異步郵件消息的通知=rabbitmq+mail
rabbitSenderService.sendKillSuccessEmailMsg(orderNo);
//TODO:入死信隊列,用于 “失效” 超過指定的TTL時間時仍然未支付的訂單
rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
}
}
}
最后,是進(jìn)行自測:點擊“搶購”按鈕,用戶秒殺成功后,會發(fā)送一條消息入死信隊列(這一點可以在RabbitMQ后端控制臺中可以看到一條正Ready好的消息),等待20s,即可看到消息轉(zhuǎn)移到真正的隊列,并被真正的消費者監(jiān)聽消費,如下所示:
好了,關(guān)于“RabbitMQ死信隊列”的介紹以及應(yīng)用實戰(zhàn)本文就暫且介紹到這里了,此種方式可以很靈活對“超時未支付的訂單”,進(jìn)行很好的處理,而且整個過程是“自動、自然”的,而無需人為去手動點擊按鈕觸發(fā)了!當(dāng)然啦,萬事萬物都并非十全十美的,死信隊列也是如此,在一篇文章中我們將介紹此種方式的瑕疵之處,并采用相應(yīng)的解決方案進(jìn)行處理!
補(bǔ)充:
1、目前,這一秒殺系統(tǒng)的整體構(gòu)建與代碼實戰(zhàn)已經(jīng)全部完成了,該秒殺系統(tǒng)對應(yīng)的視頻教程的鏈接地址為:https://www.fightjava.com/web/index/course/detail/6,可以點擊鏈接進(jìn)行試看以及學(xué)習(xí),實戰(zhàn)期間有任何問題都可以留言或者與Debug聯(lián)系、交流!
2、另外,Debug也開源了該秒殺系統(tǒng)對應(yīng)的完整的源代碼以及數(shù)據(jù)庫,其地址可以來這里下載:https://gitee.com/steadyjack/SpringBoot-SecondKill 記得Fork跟Star?。。?!
3、最后,不要忘記了關(guān)注一下Debug的技術(shù)微信公眾號: