Redis實戰(zhàn)(4)-數(shù)據(jù)結(jié)構(gòu)List實戰(zhàn)之隊列特性實現(xiàn)消息多線程 廣播通知

作者: 修羅debug
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循 CC 4.0 by-sa 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接和本聲明。



摘要:電商平臺的管理后端一般有兩大角色的用戶可以使用,一個是系統(tǒng)管理員,一個是平臺的賣家/商家,對于商家而言,管理自個兒的商品是日常工作中再為普通不過的事情了,而對于系統(tǒng)管理員而言,有時候需要發(fā)布一些活動公告通知商家進行報名參加,本文我們將基于List的隊列特性實現(xiàn)公告消息的廣播通知功能!

內(nèi)容:在上篇文章中我們介紹了Redis的數(shù)據(jù)結(jié)構(gòu)~列表List,簡單介紹了其基本特性及其在電商應(yīng)用后端管理平臺下如何實現(xiàn)“商家”添加商品時的有序存儲,以及如何以有序列表的形式進行展示!

在其中,我們給大家展示了列表List在存儲和獲取數(shù)據(jù)時的流程圖,不曉得大伙兒還記不記得,如下圖所示:


從該圖中可以看出,當我們往Redis的列表List中添加數(shù)據(jù)時,數(shù)據(jù)的流動是具有“先進先出”的特性,即所謂的“FIFO”(有點隊列Queue的特性?。┑?,而且數(shù)據(jù)是緊湊、一個挨著一個存儲的!

即當我們在往緩存Redis的列表List添加數(shù)據(jù)時,可以采用“LPush 即從左邊的方向添加”的方式往緩存Redis的List中添加,然后再采用“LPop 即從左邊的方向彈出數(shù)據(jù)”或者“RPop 即從右邊的方向彈出數(shù)據(jù)”的方式獲取這一有序存儲的列表數(shù)據(jù)!

知道了列表List的數(shù)據(jù)存儲和讀取流程,其實我們也就幾乎知曉了在實際的項目實戰(zhàn)開發(fā)中的代碼實現(xiàn)了。

下面我們以“電商應(yīng)用~平臺管理員在平臺發(fā)布活動公告信息之后,除了將公告信息塞入數(shù)據(jù)庫DB之外,同時以LPush的方式將其塞入緩存Redis的列表List中,并在接口的另一端開啟定時檢測的方式,隨時檢測緩存中指定的列表Redis是否有通告信息過來,如果有,則采取RPop的方式彈出該公告信息,并以郵件的形式發(fā)送給商戶!”,如下圖所示:


下面,我們就進入代碼實戰(zhàn)環(huán)節(jié)吧!

(1)首先,當然是需要來個“通告信息表”啦,其完整的DDL(即數(shù)據(jù)定義語言)如下所示:

CREATE TABLE `notice` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`title` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '通告標題',
`content` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '內(nèi)容',
`is_active` tinyint(4) DEFAULT '1',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=18 DEFAULT CHARSET=utf8 COMMENT='通告';

(2)然后,當然是需要開發(fā)一個Controller啦(上文我們已經(jīng)開發(fā)過了)!在該Controller中我們需要開設(shè)一個請求方法,給平臺管理員添加“通告信息”,該請求方法在接收到公告信息之后需要將其塞入數(shù)據(jù)庫DB中,同時也需要往緩存Redis的列表List中LPush一條公告信息,準備被監(jiān)聽檢測!其完整的源代碼如下所示:  

    //平臺發(fā)送通知給到各位商戶
@RequestMapping(value = "/notice/put",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
public BaseResponse putNotice(@RequestBody @Validated Notice notice, BindingResult result){
String checkRes=ValidatorUtil.checkResult(result);
if (StrUtil.isNotBlank(checkRes)){
return new BaseResponse(StatusCode.Fail.getCode(),checkRes);
}
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
log.info("--平臺發(fā)送通知給到各位商戶:{}",notice);

listService.pushNotice(notice);
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}

(3)緊接著,我們需要開發(fā)Controller對應(yīng)的Service,其職責當然是用來處理真正的業(yè)務(wù)邏輯,即“添加完成通告信息”時,它負責將該商品信息添加進DB數(shù)據(jù)庫,并添加進緩存Redis的列表List中,其完整的源代碼如下所示:  

    @Autowired
private NoticeMapper noticeMapper;

//創(chuàng)建通告
@Transactional(rollbackFor = Exception.class)
public void pushNotice(Notice notice) throws Exception{
if (notice!=null){
notice.setId(null);
//TODO:將通告信息塞入數(shù)據(jù)庫DB中
noticeMapper.insertSelective(notice);
final Integer id=notice.getId();

if (id>0){
//TODO:塞入List列表中(隊列),準備被拉取異步通知至不同的商戶的郵箱 - applicationEvent&Listener;Rabbitmq;jms
ListOperations<String,Notice> listOperations=redisTemplate.opsForList();
listOperations.leftPush(Constant.RedisListNoticeKey,notice);
}
}
}

(4)之后,我們需要創(chuàng)建一個“定時任務(wù)調(diào)度器”,用于“近實時”的檢測緩存Redis中的列表List是否有通知公告信息,如果有,則將其RPop取出來,然后采取多線程的形式將其發(fā)送給“平臺的商家”,讓他們趕緊報名參加相關(guān)的活動!其完整的源代碼如下所示:  

/**
* Redis列表-隊列的消費者監(jiān)聽器
* @Author:debug (SteadyJack)
* @Link: weixin-> debug0868 qq-> 1948831260
* @Date: 2019/10/30 14:51
**/
@Component
@EnableScheduling
public class ListListenerScheduler {

private static final Logger log=
LoggerFactory.getLogger(ListListenerScheduler.class);

private static final String listenKey= Constant.RedisListNoticeKey;

@Autowired
private RedisTemplate redisTemplate;

@Autowired
private UserMapper userMapper;

@Autowired
private EmailService emailService;

//TODO:近實時的定時任務(wù)檢測
//@Scheduled(cron = "0/10 * * * * ?")
@Scheduled(cron = "0/59 * * * * ?")
public void schedulerListenNotice(){
log.info("----定時任務(wù)調(diào)度隊列監(jiān)聽、檢測通告消息,監(jiān)聽list中的數(shù)據(jù)");

ListOperations<String,Notice> listOperations=redisTemplate.opsForList();
Notice notice=listOperations.rightPop(listenKey);
while (notice!=null){
//TODO:發(fā)送給到所有的商戶的郵箱
this.noticeUser(notice);

notice=listOperations.rightPop(listenKey);
}
}

//TODO:發(fā)送通知給到不同的商戶
@Async("threadPoolTaskExecutor")
private void noticeUser(Notice notice){
if (notice!=null){
//TODO:查詢獲取所有商戶信息
List<User> list=userMapper.selectList();

//TODO:線程池/多線程觸發(fā)群發(fā)郵件
try {
if (list!=null && !list.isEmpty()){
ExecutorService executorService=Executors.newFixedThreadPool(4);
List<NoticeThread> threads= Lists.newLinkedList();

list.forEach(user -> {
threads.add(new NoticeThread(user,notice,emailService));
});

executorService.invokeAll(threads);
}
}catch (Exception e){
log.error("近實時的定時任務(wù)檢測-發(fā)送通知給到不同的商戶-法二-線程池/多線程觸發(fā)-發(fā)生異常:",e.fillInStackTrace());
}
}
}
}

(5)至此,我們的代碼實戰(zhàn)就完畢了,最后我們就基于Postman進入測試環(huán)節(jié)吧,幾張圖加以概括吧:



最后,上一下郵箱看看吧,可以發(fā)現(xiàn)確實收到了郵件:


好了,本篇文章我們就介紹到這里了,建議各位小伙伴一定要照著文章提供的樣例代碼擼一擼,只有擼過才能知道這玩意是咋用的,否則就成了“空談?wù)摺保?br>

對Redis相關(guān)技術(shù)棧以及實際應(yīng)用場景實戰(zhàn)感興趣的小伙伴可以前往Debug搭建的技術(shù)社區(qū)的課程中心進行學習觀看:https://www.fightjava.com/web/index/course/detail/12 !

其他相關(guān)的技術(shù),感興趣的小伙伴可以關(guān)注底部Debug的技術(shù)公眾號,或者加Debug的微信,拉你進“微信版”的真正技術(shù)交流群!一起學習、共同成長!

補充:

1、本文涉及到的相關(guān)的源代碼可以到此地址,check出來進行查看學習:

https://gitee.com/steadyjack/SpringBootRedis

2、目前Debug已將本文所涉及的內(nèi)容整理錄制成視頻教程,感興趣的小伙伴可以前往觀看學習:https://www.fightjava.com/web/index/course/detail/12

3、關(guān)注一下Debug的技術(shù)微信公眾號,最新的技術(shù)文章、課程以及技術(shù)專欄將會第一時間在公眾號發(fā)布哦!