Java秒殺系統(tǒng)(八):整合RabbitMQ實(shí)現(xiàn)消息異步發(fā)送
作者:
修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。
摘要:本篇博文是“Java秒殺系統(tǒng)實(shí)戰(zhàn)系列文章”的第八篇,在這篇文章中我們將整合消息中間件RabbitMQ,包括添加依賴、加入配置信息以及自定義注入相關(guān)操作組件,比如RabbitTemplate等等,最終初步實(shí)現(xiàn)消息的發(fā)送和接收,并在下一篇章將其與郵件服務(wù)整合,實(shí)現(xiàn)“用戶秒殺成功發(fā)送郵件通知消息”的功能!
內(nèi)容:對(duì)于消息中間件RabbitMQ,想必各位小伙伴沒(méi)有用過(guò)、也該有聽(tīng)過(guò),它是一款目前市面上應(yīng)用相當(dāng)廣泛的消息中間件,可以實(shí)現(xiàn)消息異步通信、業(yè)務(wù)服務(wù)模塊解耦、接口限流、消息分發(fā)等功能,在微服務(wù)、分布式系統(tǒng)架構(gòu)中可以說(shuō)是充當(dāng)著一名了不起的角色?。ㄔ敿?xì)的介紹,Debug在這里就不贅述了,各位小伙伴可以上官網(wǎng)看看其更多的介紹及其典型的應(yīng)用場(chǎng)景)!
在本篇博文中,我們將使用RabbitMQ充當(dāng)消息發(fā)送的組件,將它與后面篇章介紹的“郵件服務(wù)”結(jié)合實(shí)現(xiàn)“用戶秒殺成功后異步發(fā)送郵件通知消息,告知用戶秒殺已經(jīng)成功!”,下面我們一起進(jìn)入代碼實(shí)戰(zhàn)吧。
(1) 要使用RabbitMQ,前提得在本地開(kāi)發(fā)環(huán)境或者服務(wù)器安裝RabbitMQ服務(wù),如下圖所示為Debug在本地安裝RabbitMQ服務(wù)成功后訪問(wèn)其后端控制臺(tái)應(yīng)用的首頁(yè):
之后我們開(kāi)始將其與SpringBoot進(jìn)行整合。首先需要加入其依賴,其版本號(hào)跟SpringBoot的版本一致,版本號(hào)為1.5.7.RELEASE:
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot.version}</version>
</dependency>
然后需要在配置文件application.properties中加入RabbitMQ服務(wù)相關(guān)的配置,比如其服務(wù)所在的Host、端口Port等等:
#rabbitmq
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.prefetch=10
(2) 緊接著,我們借助SpringBoot天然具有的一些特性,自動(dòng)注入RabbitMQ一些組件的配置,包括其“單一實(shí)例消費(fèi)者”配置、“多實(shí)例消費(fèi)者”配置以及用于發(fā)送消息的操作組件實(shí)例“RabbitTemplate”的配置:
//通用化 Rabbitmq 配置
@Configuration
public class RabbitmqConfig {
private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
//單一消費(fèi)者
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
return factory;
}
//多個(gè)消費(fèi)者
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//確認(rèn)消費(fèi)模式-NONE
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.warn("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
}
在RabbitMQ的消息發(fā)送組件RabbitTemplate的配置中,我們還特意加入了“消息發(fā)送確認(rèn)”、“消息丟失回調(diào)”的輸出配置,即當(dāng)消息正確進(jìn)入到隊(duì)列后,即代表消息發(fā)送成功;當(dāng)消息找不到對(duì)應(yīng)的隊(duì)列(在某種程度上,其實(shí)也就是找不到交換機(jī)和路由)時(shí),會(huì)輸出消息丟失。
(3) 完了之后,我們準(zhǔn)備開(kāi)始使用RabbitMQ實(shí)現(xiàn)消息的發(fā)送和接收。首先,我們需要在RabbitmqConfig配置類中創(chuàng)建隊(duì)列、交換機(jī)、路由以及綁定等Bean組件,如下所示:
//構(gòu)建異步發(fā)送郵箱通知的消息模型
@Bean
public Queue successEmailQueue(){
return new Queue(env.getProperty("mq.kill.item.success.email.queue"),true);
}
@Bean
public TopicExchange successEmailExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.email.exchange"),true,false);
}
@Bean
public Binding successEmailBinding(){
return BindingBuilder.bind(successEmailQueue()).to(successEmailExchange()).with(env.getProperty("mq.kill.item.success.email.routing.key"));
}
其中,環(huán)境變量實(shí)例env讀取的那些屬性我們是配置在application.properties文件中的,如下所示:
mq.env=test
#秒殺成功異步發(fā)送郵件的消息模型
mq.kill.item.success.email.queue=${mq.env}.kill.item.success.email.queue
mq.kill.item.success.email.exchange=${mq.env}.kill.item.success.email.exchange
mq.kill.item.success.email.routing.key=${mq.env}.kill.item.success.email.routing.key
緊接著,我們需要在通用的消息發(fā)送服務(wù)類 RabbitSenderService 中寫(xiě)一段發(fā)送消息的方法,該方法用于接收“訂單編號(hào)”參數(shù),然后在數(shù)據(jù)庫(kù)中查詢其對(duì)應(yīng)的詳細(xì)訂單記錄,將該記錄充當(dāng)“消息”并發(fā)送至RabbitMQ的隊(duì)列中,等待被監(jiān)聽(tīng)消費(fèi):
/**
* RabbitMQ通用的消息發(fā)送服務(wù)
* @Author:debug (SteadyJack)
* @Date: 2019/6/21 21:47
**/
@Service
public class RabbitSenderService {
public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
@Autowired
private ItemKillSuccessMapper itemKillSuccessMapper;
//秒殺成功異步發(fā)送郵件通知消息
public void sendKillSuccessEmailMsg(String orderNo){
log.info("秒殺成功異步發(fā)送郵件通知消息-準(zhǔn)備發(fā)送消息:{}",orderNo);
try {
if (StringUtils.isNotBlank(orderNo)){
KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderNo);
if (info!=null){
//TODO:rabbitmq發(fā)送消息的邏輯
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.email.exchange"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.email.routing.key"));
//TODO:將info充當(dāng)消息發(fā)送至隊(duì)列
rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties=message.getMessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);
return message;
}
});
}
}
}catch (Exception e){
log.error("秒殺成功異步發(fā)送郵件通知消息-發(fā)生異常,消息為:{}",orderNo,e.fillInStackTrace());
}
}
}
(4) 最后,是在通用的消息接收服務(wù)類RabbitReceiverService中實(shí)現(xiàn)消息的接收,其完整的源代碼如下所示:
/**
* RabbitMQ通用的消息接收服務(wù)
* @Author:debug (SteadyJack)
* @Date: 2019/6/21 21:47
**/
@Service
public class RabbitReceiverService {
public static final Logger log= LoggerFactory.getLogger(RabbitReceiverService.class);
@Autowired
private MailService mailService;
@Autowired
private Environment env;
@Autowired
private ItemKillSuccessMapper itemKillSuccessMapper;
//秒殺異步郵件通知-接收消息
@RabbitListener(queues = {"${mq.kill.item.success.email.queue}"},containerFactory = "singleListenerContainer")
public void consumeEmailMsg(KillSuccessUserInfo info){
try {
log.info("秒殺異步郵件通知-接收消息:{}",info);
//到時(shí)候這里將整合郵件服務(wù)發(fā)送郵件通知消息的邏輯
}catch (Exception e){
log.error("秒殺異步郵件通知-接收消息-發(fā)生異常:",e.fillInStackTrace());
}
}
}
至此,關(guān)于SpringBoot整合消息中間件RabbitMQ的代碼實(shí)戰(zhàn),本篇文章就介紹到這里了。
最后一點(diǎn),我們需要進(jìn)行測(cè)試,即用戶在界面發(fā)起“搶購(gòu)”的請(qǐng)求操作之后,如果能秒殺成功,則RabbitMQ會(huì)發(fā)送、接收一條消息,如下所示:
好了,關(guān)于RabbitMQ的使用,本文到此就暫且告一段落了,在下一篇文章中我們將把它與郵件服務(wù)進(jìn)行整合,實(shí)現(xiàn)“用戶秒殺成功后異步發(fā)送郵件通知消息給到用戶郵箱”的功能!除此之外,我們還將在后面的篇章介紹“如何使用RabbitMQ的死信隊(duì)列,處理用戶下單成功后卻超時(shí)未支付的訂單~在那里我們將采取失效的操作”。
補(bǔ)充:
1、目前,這一秒殺系統(tǒng)的整體構(gòu)建與代碼實(shí)戰(zhàn)已經(jīng)全部完成了,該秒殺系統(tǒng)對(duì)應(yīng)的視頻教程的鏈接地址為:https://www.fightjava.com/web/index/course/detail/6,可以點(diǎn)擊鏈接進(jìn)行試看以及學(xué)習(xí),實(shí)戰(zhàn)期間有任何問(wèn)題都可以留言或者與Debug聯(lián)系、交流!
2、另外,Debug也開(kāi)源了該秒殺系統(tǒng)對(duì)應(yīng)的完整的源代碼以及數(shù)據(jù)庫(kù),其地址可以來(lái)這里下載:https://gitee.com/steadyjack/SpringBoot-SecondKill 記得Fork跟Star?。。?!
3、最后,不要忘記了關(guān)注一下Debug的技術(shù)微信公眾號(hào):