消息隊列:第五章:RabbitMQ的使用

第一步:使用之前先安裝好RabbitMQ,建議安裝在linux系統(tǒng)下

安裝配置RabbitMQ:https://blog.csdn.net/qq_33450681/article/details/85339315
第二步:在配置文件下配置

    rabbitmq:
      host: 192.168.0.100
      port: 5672
      virtual-host: /mall
      username: mall
      password: mall
      publisher-confirms: true #如果對異步消息需要回調(diào)必須設(shè)置為true

瀏覽器訪問http://192.168.0.100:15672/#/

 

第三步:業(yè)務(wù)中使用發(fā)送消息

        @Autowired
        private OmsOrderSettingMapper orderSettingMapper;
     
        @Autowired
        private AmqpTemplate amqpTemplate;
     
    /**
         * 發(fā)送檢查支付結(jié)果的消息隊列
         * @param orderSn
         * @param count
         */
        @Override
        public void sendDelayPaymentCheck(String orderSn, int count) {
            //獲取訂單超時時間
            OmsOrderSetting orderSetting = orderSettingMapper.selectByPrimaryKey(1L);
            long delayTimes = orderSetting.getNormalOrderOvertime() * 60 * 1000;
            //將需要發(fā)送的數(shù)據(jù)封裝到hashmap中
            HashMap<Object, Object> hashMap = new HashMap<>();
            hashMap.put("out_trade_no",orderSn);
            hashMap.put("count",count);
     
            //給延遲隊列發(fā)送消息
            amqpTemplate.convertAndSend(QueueEnum.QUEUE_PAY_CANCEL.getExchange(), QueueEnum.QUEUE_PAY_CANCEL.getRouteKey(), hashMap, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //給消息設(shè)置延遲毫秒值
                    message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                    return message;
                }
            });
        }

第四步:定義QueueEnum枚舉

        /**
         * 支付通知隊列
         */
        QUEUE_PAY_CANCEL("mall.pay.direct","mall.pay.cancel","mall.pay.cancel")
     
        /**
         * 交換名稱
         */
        private String exchange;
        /**
         * 隊列名稱
         */
        private String name;
        /**
         * 路由鍵
         */
        private String routeKey;
     
        QueueEnum(String exchange, String name, String routeKey) {
            this.exchange = exchange;
            this.name = name;
            this.routeKey = routeKey;
        }
     
        public String getExchange() {
            return exchange;
        }
     
        public String getName() {
            return name;
        }
     
        public String getRouteKey() {
            return routeKey;
        }

第五步:配置

RabbitMQ參數(shù)配置:

使用一個RabbitMQ需要配置以下幾個重要的參數(shù)

1.虛擬主機名稱(Virtual host name),這個參數(shù)不是真正的IP地址或者域名,它是RabbitMQ內(nèi)部的一個虛擬主機,就像是電腦安裝了N臺虛擬機,對外的名稱一般是“/xxxx".

2.交換機名(Exchanges name):顧名思義,就是把生產(chǎn)者送來的消息來進行分發(fā)給下游的多個消費者,相當(dāng)一個內(nèi)部軟交換機。交換機的類型有fanout,direct,topic,header,fanout類型類似以太網(wǎng)交換機的廣播模式,把送來的消息給每個下游隊列。direct類似單播(使用routingkey來指定目的隊列),topic交換機類似組播,把消息傳遞給下面同一主題的隊列,header交換機則忽略掉routingkey,使用hash數(shù)據(jù)結(jié)構(gòu)來進行匹配和轉(zhuǎn)發(fā)。

3.routingkey :前面講過了,交換機在進行消息轉(zhuǎn)發(fā)時候,要使用routingkey為關(guān)鍵字進行轉(zhuǎn)發(fā)。

4.隊列名稱:可以為不同的消費者指定不同的隊列,可以對消息進行分類到不同的隊列進行轉(zhuǎn)發(fā)。
配置類

    /**
     * 消息隊列配置
     * Created by macro on 2018/9/14.
     */
    @Configuration
    public class RabbitMqConfig {
     
     
        /**
         * 支付隊列
         * @return
         */
        @Bean
        public Queue payQueue() {
            return new Queue(QueueEnum.QUEUE_PAY_CANCEL.getName());
        }
     
        /**
         * 綁定支付交互機
         * @return
         */
        @Bean
        DirectExchange payDirect() {
            return (DirectExchange) ExchangeBuilder
                    .directExchange(QueueEnum.QUEUE_PAY_CANCEL.getExchange())
                    .durable(true)
                    .build();
        }
     
        /**
         * 將支付隊列綁定到支付交互機
         * @param payDirect
         * @param payQueue
         * @return
         */
        @Bean
        Binding payBinding(DirectExchange payDirect,Queue payQueue){
            return BindingBuilder
                    .bind(payQueue)
                    .to(payDirect)
                    .with(QueueEnum.QUEUE_PAY_CANCEL.getRouteKey());
        }

第六步:處理支付信息

    package com.macro.mall.portal.component;
     
    import com.macro.mall.model.PaymentInfo;
    import com.macro.mall.portal.service.PaymentService;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import java.util.HashMap;
     
    /**
     * 支付的處理者
     */
    @Component
    @RabbitListener(queues = "mall.pay.cancel")
    public class PayReceiver {
     
        @Autowired
        PaymentService paymentService;
     
        @RabbitHandler
        public void handle(HashMap mapMessage){
            try {
                String outTradeNo = mapMessage.get("out_trade_no").toString();
                int count = Integer.parseInt(mapMessage.get("count").toString());
                // 如果沒有支付成功,再次發(fā)送延遲檢查隊列
                if (count > 0) {
                    // 進行支付狀態(tài)檢查
                    System.out.println("正在進行第" + (6 - count) + "支付結(jié)果次檢查");
                    //調(diào)用alipayClient接口,根據(jù)out_trade_no查詢支付信息
                    PaymentInfo paymentInfo = paymentService.checkPaymentResult(outTradeNo);
                    Thread thread = new Thread();
                    thread.start();
                    Thread.sleep(10000);
                    //判斷是否已經(jīng)支付成功
                    if (paymentInfo.getPaymentStatus()!=null&&(paymentInfo.getPaymentStatus().equals("TRADE_SUCCESS") || paymentInfo.getPaymentStatus().equals("TRADE_FINISHED"))) {
                        // 交易成功或者失敗,記錄交易狀態(tài)
                        System.out.println("檢查交易結(jié)果成功,記錄交易狀態(tài)。。。");// 修改支付的狀態(tài)信息
                        // 修改支付信息
                        boolean b = paymentService.checkPaymentStatus(outTradeNo);
                        if(!b){
                            //修改為已支付
                            paymentService.updatePayment(paymentInfo.getCallbackContent(),outTradeNo,paymentInfo.getAlipayTradeNo());
                            // 發(fā)送系統(tǒng)消息,出發(fā)并發(fā)商品支付業(yè)務(wù)消息隊列
                            paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),paymentInfo.getAlipayTradeNo());
                        }
                    } else {//未支付
                        // 再次進行延遲檢查
                        System.out.println("正在進行第" + (6 - count) + "支付結(jié)果次檢查,檢查用戶尚未付款成功,繼續(xù)巡檢");
                        paymentService.sendDelayPaymentCheck(outTradeNo, count - 1);
                    }
                } else {
                    System.out.println("支付結(jié)果次檢查次數(shù)耗盡,支付未果。。。");
                }
            } catch (Exception e) {
     
            }
        }
    }