Java面試題~消息中間件RabbitMQ如何保證消息不丟失且100%投遞成功

作者: 修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接和本聲明。

摘要

RabbitMQ是一款可以用于實(shí)現(xiàn)消息通信、服務(wù)解耦的分布式中間件,在實(shí)際項(xiàng)目中具有許多典型的應(yīng)用場(chǎng)景,如異步記錄日志、業(yè)務(wù)服務(wù)模塊異步通信解耦、高并發(fā)搶購(gòu)場(chǎng)景下流量削峰、限流等等,本文我們將以實(shí)際項(xiàng)目中典型的業(yè)務(wù)場(chǎng)景:“發(fā)送郵件” 為案例,一起探討RabbitMQ在發(fā)送消息時(shí)如何保證消息不丟失且保證其100%投遞成功。

題外話~Java技術(shù)面時(shí)這無(wú)疑是一道高頻必問(wèn)之題

內(nèi)容

一、 整體業(yè)務(wù)流程介紹

“發(fā)送郵件”這一業(yè)務(wù)場(chǎng)景想必各位小伙伴都經(jīng)歷過(guò),在業(yè)務(wù)要求不是很嚴(yán)格的情況下,“郵件發(fā)送”可以采用異步的方式加以實(shí)現(xiàn),傳統(tǒng)的做法是直接在發(fā)送郵件的方法上加上注解 @Async 即可實(shí)現(xiàn);

而在微服務(wù)、分布式時(shí)代,更多的是利用中間件加以實(shí)現(xiàn),此時(shí)MQ便排上了用場(chǎng),目前市面上比較典型的產(chǎn)品包括:ActiveMQ、RabbitMQ、Kafka 以及 RocketMQ等等,在這里我們以RabbitMQ為主,并由此介紹一下RabbitMQ在發(fā)送消息期間如何保證消息不丟失以及保證其100%投遞成功呢?

對(duì)這一問(wèn)題,我們還是先給出答案吧,后續(xù)我們還會(huì)采用代碼實(shí)戰(zhàn)一一驗(yàn)證這些答案:

1)“發(fā)送確認(rèn)”模式:即生產(chǎn)者通過(guò)MQ發(fā)送消息后,MQ需要將“已發(fā)送成功/失敗”反饋給生產(chǎn)者,告知生產(chǎn)者消息已投遞成功,此方式可確保消息正確地發(fā)送至RabbitMQ

 

2)“消費(fèi)確認(rèn)”模式:即消費(fèi)者監(jiān)聽到MQ中隊(duì)列的消息并執(zhí)行完對(duì)應(yīng)的業(yè)務(wù)邏輯后,需要發(fā)送“消息已被成功監(jiān)聽、消費(fèi)”反饋給MQ,此方式可保證接收方正確接收并消費(fèi)了消息,消費(fèi)成功后消息將從隊(duì)列中移除

 

3)“避免消息重復(fù)投遞”:生產(chǎn)者在生產(chǎn)消息時(shí),MQ內(nèi)部會(huì)針對(duì)每條消息生成一個(gè)MsgId,該標(biāo)識(shí)可以作為去重的依據(jù)(消息投遞失敗并重傳),避免重復(fù)的消息進(jìn)入隊(duì)列;

 

4)“消息消費(fèi)時(shí)保證冪等性”:這一點(diǎn)可以利用業(yè)務(wù)本身的特性來(lái)實(shí)現(xiàn),即每個(gè)業(yè)務(wù)實(shí)體一般都會(huì)有一個(gè)唯一的ID,就像數(shù)據(jù)庫(kù)表中唯一的主鍵一樣,在監(jiān)聽消費(fèi)處理時(shí)根據(jù)ID作為去重的依據(jù);

 

5)“持久化”:將隊(duì)列、交換機(jī)、消息都設(shè)置為持久化模式,確保消息在投遞、發(fā)送期間出現(xiàn)MQ服務(wù)宕機(jī)后重啟恢復(fù)過(guò)來(lái)時(shí)消息依舊存在;

 

6)“消息消費(fèi)重試機(jī)制”:指的是消費(fèi)者在監(jiān)聽、消費(fèi)、處理消息的過(guò)程中出現(xiàn)了異常,導(dǎo)致業(yè)務(wù)邏輯沒(méi)有處理成功,此時(shí)可以開啟“消息重入隊(duì)列”機(jī)制,設(shè)置消息重入隊(duì)列N 進(jìn)行 重試消費(fèi);

 

7)“消息投遞補(bǔ)償機(jī)制”:指的是消息在生產(chǎn)、投遞期間出現(xiàn)“投遞失敗”,也就是“發(fā)送不成功”的情況,此時(shí)可以將其加入到DB中,并開啟定時(shí)任務(wù),拉取那些投遞不成功的消息,重新投遞入隊(duì)列,如此一來(lái)便可以保證消息不丟失且準(zhǔn)備被投遞。

 

交代完了答案,接下來(lái)我們以“發(fā)送郵件”這一業(yè)務(wù)場(chǎng)景為案例,結(jié)合實(shí)際的代碼實(shí)戰(zhàn)一一驗(yàn)證上述的答案,如下圖所示為這一業(yè)務(wù)場(chǎng)景以及“RabbitMQ保證消息不丟失和100%投遞成功”的整體實(shí)現(xiàn)流程圖:   


二、整體業(yè)務(wù)流程核心代碼實(shí)戰(zhàn)

1)首先,需要在數(shù)據(jù)庫(kù)中創(chuàng)建一數(shù)據(jù)庫(kù)表msg_log,用于記錄RabbitMQ消息的投遞以及消費(fèi)過(guò)程,其DDL如下所示:

CREATE TABLE `msg_log` (
`msg_id` varchar(155) NOT NULL DEFAULT '' COMMENT '消息唯一標(biāo)識(shí)',
`msg` text COMMENT '消息體, json格式化',
`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交換機(jī)',
`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由鍵',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '狀態(tài): 0投遞中 1投遞成功 2投遞失敗 3已消費(fèi)',
`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重試次數(shù)',
`next_try_time` datetime DEFAULT NULL COMMENT '下一次重試時(shí)間',
`create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時(shí)間',
`update_time` datetime DEFAULT NULL COMMENT '更新時(shí)間',
PRIMARY KEY (`msg_id`),
UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投遞日志';

緊接著,需要利用Mybatis逆向工程生成該數(shù)據(jù)庫(kù)表對(duì)應(yīng)的實(shí)體類EntitySQL操作接口Mapper以及Mapper.xml,在這里就不貼出來(lái)了,各位小伙伴自行生成即可(也可以在文末下載對(duì)應(yīng)的源碼直接拷貝出來(lái)使用即可)

2)接下來(lái)是創(chuàng)建用于發(fā)送郵件的控制器MailController,其完整源碼如下所示:

@RestController
@RequestMapping("mail")
public class MailController {

@Autowired
private MailService mailService;

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private Environment env;

@Autowired
private MsgLogMapper msgLogMapper;

private static final Snowflake snowflake=new Snowflake(3,2);

//發(fā)送郵件
@RequestMapping(value = "send/v2",method = RequestMethod.POST)
public BaseResponse sendMailV2(@RequestBody @Validated MailDto dto, BindingResult result){
String checkRes=ValidatorUtil.checkResult(result);
if (StringUtils.isNotBlank(checkRes)){
return new BaseResponse(StatusCode.InvalidParams.getCode(),checkRes);
}
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
MsgLog entity=new MsgLog();
String msgId=snowflake.nextIdStr();

entity.setMsgId(msgId);
entity.setExchange(env.getProperty("mq.email.v2.exchange"));
entity.setRoutingKey(env.getProperty("mq.email.v2.routing.key"));
entity.setStatus(Constant.MsgStatus.Sending.getStatus());
entity.setTryCount(0);
entity.setCreateTime(DateTime.now().toDate());

dto.setMsgId(msgId);
final String msg=new Gson().toJson(dto);

entity.setMsg(msg);
//在發(fā)送消息之前先創(chuàng)建并入庫(kù)
msgLogMapper.insertSelective(entity);

//發(fā)送消息
rabbitTemplate.convertAndSend(entity.getExchange(), entity.getRoutingKey(),msg,new CorrelationData(entity.getMsgId()));

}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}
}

從該代碼中可以得知在RabbitMQ真正發(fā)送消息之前,需要將該消息插入數(shù)據(jù)庫(kù),并設(shè)置其狀態(tài)為 “投遞中”;與此同時(shí),需要利用“雪花算法”工具生成該消息的全局唯一標(biāo)識(shí)MsgId,并用其充當(dāng)消息的標(biāo)識(shí);

之后便是將消息交給了RabbitMQ 執(zhí)行發(fā)送邏輯,而為了能讓RabbitMQ真正發(fā)送消息,我們需要在RabbitmqConfig配置類中創(chuàng)建相應(yīng)的交換機(jī)、路由以及隊(duì)列綁定;除此之外,需要在自定義注入RabbitTemplate  Bean組件中加入“發(fā)送確認(rèn)”的處理邏輯,即監(jiān)聽 “消息是否真的投遞成功”;

同時(shí),也需要在自定義配置“監(jiān)聽器容器工廠” Bean組件中加入“消息確認(rèn)消費(fèi)模式”,在這里,我們采用的是“手動(dòng)確認(rèn)消費(fèi)”模式,其完整源碼如下所示:

//通用化 Rabbitmq 配置
@Configuration
public class RabbitmqConfig {

private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);

@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

@Autowired
private MsgLogMapper msgLogMapper;

//多實(shí)例消費(fèi)者 – 設(shè)定確認(rèn)消費(fèi)模式為手動(dòng)確認(rèn)
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//確認(rèn)消費(fèi)模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
return factory;
}

//消息發(fā)送操作組件自定義注入配置:設(shè)置 "生產(chǎn)需要確認(rèn)"
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 觸發(fā)setReturnCallback回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒(méi)有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息發(fā)送成功:消息唯一標(biāo)識(shí)correlationData({}),消息確認(rèn)結(jié)果:ack({}),失敗原因:cause({})",correlationData,ack,cause);

//發(fā)送確認(rèn)結(jié)果
if (correlationData!=null && StringUtils.isNotBlank(correlationData.getId())){
if (ack) {
//消息投遞成功
msgLogMapper.updateMsgSendStatus(correlationData.getId(), Constant.MsgStatus.SendSuccess.getStatus());
} else {
//消息投遞失敗
msgLogMapper.updateMsgSendStatus(correlationData.getId(), Constant.MsgStatus.SendFail.getStatus());
}
}
}
});
// 消息是否從Exchange路由到Queue, 注意: 這是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法
// 啟動(dòng)消息失敗返回,比如路由不到隊(duì)列時(shí)觸發(fā)回調(diào)
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.warn("消息丟失:交換器exchange({}),路由route({}),響應(yīng)結(jié)果replyCode({}),響應(yīng)信息replyText({}),消息message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}

//構(gòu)建發(fā)送郵件消息模型-隊(duì)列、交換機(jī)做持久化,消息到時(shí)候也設(shè)置持久化,將意味著MQ在發(fā)送消息的整個(gè)過(guò)程,將會(huì)被記錄到持久化日志文件中
//可靠性投遞
@Bean
public Queue emailQueueV2(){
return new Queue(env.getProperty("mq.email.v2.queue"),true);
}
@Bean
public DirectExchange emailExchangeV2(){
return new DirectExchange(env.getProperty("mq.email.v2.exchange"),true,false);
}
@Bean
public Binding emailBindingV2(){
return BindingBuilder.bind(emailQueueV2()).to(emailExchangeV2()).with(env.getProperty("mq.email.v2.routing.key"));
}
}

3)然后是創(chuàng)建用于“監(jiān)聽消費(fèi)消息”的消費(fèi)者MqListener,其監(jiān)聽消費(fèi)處理消息的邏輯為:獲取并解析出消息體的內(nèi)容,根據(jù)MsgId查詢消息當(dāng)前的發(fā)送狀態(tài),如果處于“未投遞”或者“投遞失敗”且“重試次數(shù)還沒(méi)超過(guò)最大次數(shù)”時(shí),則獲取該消息體的真實(shí)內(nèi)容,并將其交給MailService執(zhí)行“發(fā)送郵件”的邏輯,其完整源代碼如下所示:

@Component
public class MqListener {

private static final Logger log= LoggerFactory.getLogger(MqListener.class);

@Autowired
private MailService mailService;

@Autowired
private MsgLogMapper msgLogMapper;

//監(jiān)聽消費(fèi)發(fā)送郵件消息-v2
@RabbitListener(queues = {"${mq.email.v2.queue}"},containerFactory = "multiListenerContainer")
public void consumeMailV2(Message message, Channel channel) throws Exception{
log.info("---監(jiān)聽消費(fèi)發(fā)送郵件消息-v2---開始處理中...");
Long deliverTag=message.getMessageProperties().getDeliveryTag();

Integer tryCount=0;
String msgId="";
try {
MailDto dto=new Gson().fromJson(new String(message.getBody()), MailDto.class);
msgId=dto.getMsgId();
log.info("---監(jiān)聽到消息-v2:{}",dto);

//maxTry=msgLogMapper.selectMaxTry(dto.getMsgId());

/*此處可以加上一層分布式鎖,保證超高并發(fā)時(shí)處理消息的原理操作 ~ 讀者自行實(shí)現(xiàn),有問(wèn)題可以隨時(shí)交流*/

//防止重復(fù)投遞 - 保證冪等性
MsgLog msgLog=msgLogMapper.selectByPrimaryKey(dto.getMsgId());
if (msgLog!=null && !Constant.MsgStatus.ConsumeSuccess.getStatus().equals(msgLog.getStatus()) && msgLog.getTryCount()<Constant.MsgTryCount){
tryCount=msgLog.getTryCount();

//核心業(yè)務(wù)邏輯【消息處理~發(fā)送郵件(同步;異步)】
boolean res=mailService.sendSimpleEmail(msgId,dto.getSubject(),dto.getContent(),dto.getTos());
if (res){
//確認(rèn)消費(fèi)
channel.basicAck(deliverTag,false);
return;
}
}
}catch (Exception e){
e.printStackTrace();

//出現(xiàn)異常時(shí)可以走重試機(jī)制 ~ 重試次數(shù)為3次,每次間隔8s(自己靈活設(shè)定即可)
if (tryCount<Constant.MsgTryCount){
Thread.sleep(8000);

channel.basicNack(deliverTag,false,true);
msgLogMapper.updateMaxTry(msgId);
return;
}
}

//當(dāng)走到這一步時(shí)代表消息已被消費(fèi)(status=3)或者 重試次數(shù)達(dá)到上限 等情況-但不管是哪種情況,都需要將消息從隊(duì)列中移除,防止下次項(xiàng)目重啟時(shí)重新監(jiān)聽消費(fèi)
channel.basicAck(deliverTag,false);
}
}

4)而mailServicesendSimpleEmail方法則主要用于執(zhí)行真正的“發(fā)送郵件”邏輯,同時(shí)將最終的發(fā)送結(jié)果(業(yè)務(wù)邏輯處理結(jié)果)及時(shí)更新至數(shù)據(jù)庫(kù)表“消息的發(fā)送狀態(tài)”中,其完整的源碼如下所示:

@Service
public class MailService {
private static final Logger log= LoggerFactory.getLogger(MailService.class);

@Autowired
private Environment env;

@Autowired
private JavaMailSender mailSender;

@Autowired
private MsgLogMapper msgLogMapper;

//TODO:發(fā)送簡(jiǎn)單的郵件消息
//@Async("threadPoolTaskExecutor")
public Boolean sendSimpleEmail(final String msgId,final String subject,final String content,final String ... tos) throws Exception{
Boolean res=true;
try {
SimpleMailMessage message=new SimpleMailMessage();
message.setSubject(subject);
message.setText(content);
message.setTo(tos);
message.setFrom(env.getProperty("mail.send.from"));
mailSender.send(message);

//int i=1/0;

log.info("----發(fā)送簡(jiǎn)單的郵件消息完畢--->");
}catch (Exception e){
log.error("--發(fā)送簡(jiǎn)單的郵件消息,發(fā)生異常:",e.fillInStackTrace());
res=false;
throw e;
}finally {
this.updateMsgSendStatus(msgId,res);
}
return res;
}

//TODO:更新消息處理的結(jié)果
private void updateMsgSendStatus(final String msgId,Boolean res){
if (StringUtils.isNotBlank(msgId)){
if (res){
msgLogMapper.updateMsgManageSuccess(msgId, Constant.MsgStatus.ConsumeSuccess.getStatus());
}else{
msgLogMapper.updateMsgManageSuccess(msgId, Constant.MsgStatus.ConsumeFail.getStatus());
}
}
}
}

5)最后是“定時(shí)任務(wù)補(bǔ)償投遞”機(jī)制,即開啟一定時(shí)任務(wù),拉取投遞失敗的消息重新進(jìn)行投遞,其源碼如下所示:

@Component
public class MqScheduler {

private static final Logger log= LoggerFactory.getLogger(MqScheduler.class);

@Autowired
private MsgLogMapper msgLogMapper;

@Autowired
private RabbitTemplate rabbitTemplate;

//定時(shí)任務(wù)重新拉取并投遞
@Scheduled(cron = "0/10 * * * * ?")
public void reSendMsg(){
try {
String status= Constant.MsgStatus.Sending.getStatus()+","+Constant.MsgStatus.SendFail.getStatus();
List<MsgLog> msgLogs=msgLogMapper.selectSendFailMsg(status);
if (msgLogs!=null && !msgLogs.isEmpty()){
msgLogs.forEach(msgLog -> {
if (StringUtils.isNotBlank(msgLog.getMsg()) && StringUtils.isNotBlank(msgLog.getExchange())
&& StringUtils.isNotBlank(msgLog.getRoutingKey())){
//發(fā)送消息
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(),msgLog.getMsg(),
new CorrelationData(msgLog.getMsgId()));
log.info("----已成功重新投遞消息,消息id={}",msgLog.getMsgId());
}
});
}
}catch (Exception e){
log.error("定時(shí)任務(wù)重新拉取投遞失敗的消息~重新進(jìn)行投遞~發(fā)生異常:",e.fillInStackTrace());
}
}
}

做完這些之后,便可以開始整個(gè)業(yè)務(wù)流程的測(cè)試了,在此期間需要分多種情況進(jìn)行測(cè)試,第一種是正常投遞、正常消費(fèi)的情況;第二種是投遞失敗,定時(shí)任務(wù)定時(shí)拉取重新投遞的情況;最后一種是消息確認(rèn)消費(fèi)失敗進(jìn)入重試的情況。  

三、整個(gè)業(yè)務(wù)流程的測(cè)試

1)正常投遞、正常消費(fèi)的情況

A.點(diǎn)擊運(yùn)行項(xiàng)目,在Postman輸入相應(yīng)的請(qǐng)求URL、請(qǐng)求體、請(qǐng)求方式 以及請(qǐng)求體的內(nèi)容類型,如下圖所示:


B.觀察IDEA控制臺(tái)的輸出信息,會(huì)發(fā)現(xiàn)消息已經(jīng)成功插入數(shù)據(jù)庫(kù),并投遞成功,稍等一兩秒后可以看到消息已經(jīng)被監(jiān)聽消費(fèi),如下圖所示:




C.稍等片刻,會(huì)發(fā)現(xiàn)郵件已經(jīng)成功,與此同時(shí),數(shù)據(jù)庫(kù)中該條消息的狀態(tài)已變更為“消費(fèi)成功”,如下圖所示:





D.打開自己的郵箱,會(huì)發(fā)現(xiàn)也已經(jīng)成功收到了該郵件信息,如下圖所示:


2)投遞失敗,定時(shí)任務(wù)定時(shí)拉取重新投遞的情況

A.為了演示“消息投遞失敗/不成功”的情況,我們可以將Controller中的sendMailV2()方法里的“發(fā)送MQ消息”的邏輯注釋起來(lái),如下所示:

//發(fā)送消息
//rabbitTemplate.convertAndSend(entity.getExchange(), entity.getRoutingKey(),msg,new CorrelationData(entity.getMsgId()));

這將意味著,前端Postman觸發(fā)請(qǐng)求調(diào)用這個(gè)接口時(shí),插入數(shù)據(jù)庫(kù)中的消息的狀態(tài)為“投遞中”,即 status=0 的情況;除此之外,還可以偷偷地調(diào)整 消息對(duì)應(yīng)的“交換機(jī)”或者“路由”的值,這樣一來(lái),RabbitmqConfigRabbitTemplate方法將會(huì)觸發(fā)“消息投遞失敗 404”的錯(cuò)誤,也就是 status=2 的情況;


B.下面我們就以第一種為主吧,將發(fā)送消息的代碼邏輯注釋起來(lái),然后再次在Postman發(fā)起請(qǐng)求,此時(shí)會(huì)發(fā)現(xiàn)數(shù)據(jù)庫(kù)表插入了一條 status=0 的消息,如下圖所示:


與此同時(shí),我們的定時(shí)器因?yàn)?/span>cron設(shè)定時(shí)間為 10s 來(lái)一次輪詢,因此稍等片刻后,會(huì)發(fā)現(xiàn)該條消息重新被拉取出來(lái)重新進(jìn)入投遞,最終成功走到了最后了,如下圖所示:


 


3)消息確認(rèn)消費(fèi)失敗進(jìn)入重試的情況

A.這種情況只有在“監(jiān)聽消費(fèi)處理消息的過(guò)程出現(xiàn)異?!?/span> 才能觸發(fā)“消息重入隊(duì)列”機(jī)制,因此,我們只需要在真正處理消息的業(yè)務(wù)邏輯:“發(fā)送郵件”中嵌入一段 int i=1/0 的代碼即可,如下圖所示:



B.而我們都知道,在執(zhí)行這段代碼之后,sendSimpleEmail() 勢(shì)必會(huì)拋出異常,之后會(huì)更新數(shù)據(jù)庫(kù)表中該消息的狀態(tài),即“消費(fèi)失敗”,之后便會(huì)觸發(fā)外層MqListenercomsumeMailV2方法的catch邏輯,如下所示:   

catch (Exception e){
e.printStackTrace();

//出現(xiàn)異常時(shí)可以走重試機(jī)制
if (tryCount<Constant.MsgTryCount){
Thread.sleep(8000);

channel.basicNack(deliverTag,false,true);
msgLogMapper.updateMaxTry(msgId);
return;
}
}


C.之后,消息便會(huì)“重入隊(duì)列”,當(dāng)然啦,每次重入隊(duì)列時(shí),時(shí)間間隔是8s(這個(gè)按照實(shí)際情況設(shè)定即可,這種寫法還是比較粗糙~), 數(shù)據(jù)庫(kù)表該消息的“最大重試次數(shù)”將會(huì)加1,直到超過(guò) 最大的重試次數(shù)(這里是 3次),便不會(huì)再進(jìn)入隊(duì)列了,而是會(huì)從隊(duì)列中永久刪除;于此同時(shí),IDEA控制臺(tái)會(huì)報(bào)出相應(yīng)的異常信息,最終數(shù)據(jù)庫(kù)表中該條消息的try_count也達(dá)到了最大重試次數(shù):3次,如下圖所示:




四、總結(jié)

至此,我們已經(jīng)完成了整個(gè)業(yè)務(wù)流程的代碼實(shí)戰(zhàn)以及測(cè)試了,總結(jié)的話就不說(shuō)了,在本文的一開始我們就已經(jīng)給出了答案了,簡(jiǎn)而言之就是:發(fā)送和消費(fèi)需要做確認(rèn);隊(duì)列、交換機(jī)和消息需要做持久化;允許消息投遞失敗時(shí)定時(shí)任務(wù)重新投遞;允許消息消費(fèi)失敗時(shí)重入隊(duì)列進(jìn)行消費(fèi)。


最后想必是大伙兒關(guān)心的源碼下載了,關(guān)注下方微信公眾號(hào):“程序員實(shí)戰(zhàn)基地”,回復(fù):mq實(shí)戰(zhàn),即可下載該源碼;如果需要交流技術(shù),可以加下方debug的微信:debug0868