消息隊列:第三章:在 Java 中使用消息隊列

在項目中導入依賴坐標

        <!-- Spring Boot starter for ActiveMQ. The slf4j-log4j12 binding is excluded,
             presumably to avoid clashing with the project's own SLF4J binding (confirm). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
 
        <!-- ActiveMQ connection pooling, pinned to 5.15.2; presumably the source of the
             PooledConnectionFactory used by ActiveMQUtil (confirm). The slf4j-log4j12
             binding is excluded here as well. -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

[點(diǎn)擊并拖拽以移動]

使用隊列(queue)的情況

producer端

    /**
     * Queue producer: sends one persistent text message to the "boss drink" queue
     * inside a JMS transaction.
     *
     * <p>The connection is closed in a {@code finally} block so that it is released
     * even when creating the session, sending, or committing fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            // Create and start the connection.
            connection = connect.createConnection();
            connection.start();
            // The first argument enables transactions; when it is true the second
            // argument is effectively SESSION_TRANSACTED (0).
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Point-to-point: use a queue.
            Queue testqueue = session.createQueue("boss drink");
            // Create the producer.
            MessageProducer producer = session.createProducer(testqueue);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("我渴了,我要喝水!");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(textMessage);
            // Transacted send: the message only takes effect after commit.
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Always release the connection, including on failure paths.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

consumer端1

    /**
     * Queue consumer 1: listens on the "boss drink" queue with a transacted session
     * and commits after each text message is handled.
     *
     * <p>If handling or committing fails, the session is rolled back so the message
     * is redelivered instead of being acknowledged by a later, unrelated commit.
     * The connection is intentionally left open on success so the listener keeps
     * receiving; it is closed only when setup fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect with the default ActiveMQ credentials.
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            connection = connect.createConnection();
            connection.start();
            // The first argument enables transactions; when it is true the acknowledge
            // mode is ignored, so SESSION_TRANSACTED is passed to make that explicit.
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination testqueue = session.createQueue("boss drink");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text+",員工1馬上拿起杯子打水。。。");
                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                            // Undo the failed unit of work so the broker redelivers it.
                            try {
                                session.rollback();
                            } catch (JMSException rollbackFailure) {
                                rollbackFailure.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaving it open.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

consumer端2

    /**
     * Queue consumer 2: listens on the "boss drink" queue with a transacted session
     * and commits after each text message is handled.
     *
     * <p>If handling or committing fails, the session is rolled back so the message
     * is redelivered instead of being acknowledged by a later, unrelated commit.
     * The connection is intentionally left open on success so the listener keeps
     * receiving; it is closed only when setup fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect with the default ActiveMQ credentials.
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            connection = connect.createConnection();
            connection.start();
            // The first argument enables transactions; when it is true the acknowledge
            // mode is ignored, so SESSION_TRANSACTED is passed to make that explicit.
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination testqueue = session.createQueue("boss drink");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text+",員工2馬上拿起杯子打水。。。");
                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                            // Undo the failed unit of work so the broker redelivers it.
                            try {
                                session.rollback();
                            } catch (JMSException rollbackFailure) {
                                rollbackFailure.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaving it open.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

使用topic話題的情況:

producer端:

    /**
     * Topic producer: publishes one persistent text message to the "boss speak"
     * topic inside a JMS transaction.
     *
     * <p>The connection is closed in a {@code finally} block so that it is released
     * even when creating the session, sending, or committing fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            // Create and start the connection.
            connection = connect.createConnection();
            connection.start();
            // The first argument enables transactions; when it is true the second
            // argument is effectively SESSION_TRANSACTED (0).
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Publish/subscribe: use a topic.
            Topic topic = session.createTopic("boss speak");
            // Create the producer.
            MessageProducer producer = session.createProducer(topic);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("我們要為中國的偉大復(fù)興而努力奮斗!");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(textMessage);
            // Transacted send: the message only takes effect after commit.
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Always release the connection, including on failure paths.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

consumer端1:

    /**
     * Topic consumer 1: durable subscriber "userOne" on the "boss speak" topic,
     * using a transacted session that commits after each text message is handled.
     *
     * <p>If handling or committing fails, the session is rolled back so the message
     * is redelivered instead of being acknowledged by a later, unrelated commit.
     * The connection is intentionally left open on success so the listener keeps
     * receiving; it is closed only when setup fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect with the default ActiveMQ credentials.
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            connection = connect.createConnection();
            // Set the client id (done before the connection is started).
            connection.setClientID("userOne");

            connection.start();
            // The first argument enables transactions; when it is true the acknowledge
            // mode is ignored, so SESSION_TRANSACTED is passed to make that explicit.
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Topic topic = session.createTopic("boss speak");
            // Session.createDurableSubscriber(Topic topic, String name) sets up the
            // durable publish/subscribe receiver:
            //   topic -> the same topic the producer publishes to
            //   name  -> the subscriber's identifier (acts as its id)
            // Use session.createConsumer(topic) instead for a non-durable subscription.
            MessageConsumer consumer = session.createDurableSubscriber(topic,"userOne");
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text+",員工1這個(gè)月工資不要了。。。");
                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                            // Undo the failed unit of work so the broker redelivers it.
                            try {
                                session.rollback();
                            } catch (JMSException rollbackFailure) {
                                rollbackFailure.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaving it open.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

consumer端2:

    /**
     * Topic consumer 2: non-durable subscriber on the "boss speak" topic, using a
     * transacted session that commits after each text message is handled.
     *
     * <p>If handling or committing fails, the session is rolled back so the message
     * is redelivered instead of being acknowledged by a later, unrelated commit.
     * The connection is intentionally left open on success so the listener keeps
     * receiving; it is closed only when setup fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect with the default ActiveMQ credentials.
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            connection = connect.createConnection();
            connection.start();
            // The first argument enables transactions; when it is true the acknowledge
            // mode is ignored, so SESSION_TRANSACTED is passed to make that explicit.
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Destination: the target the consumer reads from.
            Destination topic = session.createTopic("boss speak");

            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text+",員工2周末主動來加班。。。");
                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                            // Undo the failed unit of work so the broker redelivers it.
                            try {
                                session.rollback();
                            } catch (JMSException rollbackFailure) {
                                rollbackFailure.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaving it open.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

關於事務控制

在這里插入圖片描述在這里插入圖片描述

持久化與非持久化

通過 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 進行設置

持久化的好處是:當 ActiveMQ 宕機時,消息隊列中的消息不會丟失;非持久化的消息則會丟失。但持久化會消耗一定的性能。
與springboot整合

在 application.properties 文件中加入 spring.activemq.broker-url=tcp://mq.server.com:61616;如需啟用下面的監聽器容器工廠,還要加入 activemq.listener.enable=true(未設置時該工廠不會被創建)。
配置類 ActiveMQConfig:項目啟動的時候,Spring 會加載該類並執行其中所有帶 @Bean 註解的方法

/**
 * Spring configuration that declares the ActiveMQ-related beans.
 *
 * <p>Two properties steer it: {@code spring.activemq.broker-url} (defaults to
 * "disabled") and {@code activemq.listener.enable} (defaults to "disabled").
 */
@Configuration
public class ActiveMQConfig {

    /** Broker URL; holds the literal "disabled" when the property is not set. */
    @Value("${spring.activemq.broker-url:disabled}")
    String brokerURL ;

    /** Listener switch; the listener factory is only built when this equals "true". */
    @Value("${activemq.listener.enable:disabled}")
    String listenerEnable;

    /**
     * Builds the pooled-connection helper for the configured broker.
     *
     * @return an initialised {@code ActiveMQUtil}, or {@code null} when the broker
     *         URL is "disabled"
     * @throws JMSException declared for callers; kept from the original signature
     */
    @Bean
    public ActiveMQUtil getActiveMQUtil() throws JMSException {
        if (!brokerURL.equals("disabled")) {
            ActiveMQUtil util = new ActiveMQUtil();
            util.init(brokerURL);
            return util;
        }
        return null;
    }

    /**
     * Defines the listener container factory for point-to-point (queue) listeners.
     *
     * @param activeMQConnectionFactory connection factory the listeners connect through
     * @return the configured factory, or {@code null} when listeners are not enabled
     */
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
        boolean listenersEnabled = listenerEnable.equals("true");
        if (!listenersEnabled) {
            return null;
        }

        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(activeMQConnectionFactory);
        // Concurrency setting.
        containerFactory.setConcurrency("5");
        // Interval between reconnect attempts.
        containerFactory.setRecoveryInterval(5000L);
        // Non-transacted sessions with client acknowledgement.
        containerFactory.setSessionTransacted(false);
        containerFactory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return containerFactory;
    }

    /**
     * Creates the plain ActiveMQ connection factory for the configured broker URL.
     *
     * @return a new connection factory built from {@code brokerURL}
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
        return new ActiveMQConnectionFactory(brokerURL);
    }

}

工具類ActiveMQUtil

/**
 * Helper that owns a pooled ActiveMQ connection factory.
 *
 * <p>{@link #init(String)} must be called before {@link #getConnectionFactory()}
 * returns anything other than {@code null}.
 */
public class ActiveMQUtil {
    PooledConnectionFactory pooledConnectionFactory=null;

    /**
     * Creates the pooled connection factory for the given broker and stores it.
     *
     * @param brokerUrl URL of the ActiveMQ broker
     * @return the newly created pooled connection factory
     */
    public ConnectionFactory init(String brokerUrl) {
        // Wrap a plain ActiveMQ factory in a connection pool.
        PooledConnectionFactory pool =
                new PooledConnectionFactory(new ActiveMQConnectionFactory(brokerUrl));
        this.pooledConnectionFactory = pool;

        // Reconnect when an exception occurs.
        pool.setReconnectOnException(true);
        // Pool limits: maximum connections and expiry timeout.
        pool.setMaxConnections(5);
        pool.setExpiryTimeout(10000);
        return pool;
    }

    /**
     * @return the pooled connection factory created by {@code init}, or
     *         {@code null} if {@code init} has not been called
     */
    public ConnectionFactory getConnectionFactory(){
        return this.pooledConnectionFactory;
    }
}

案例:

controller:

/**
 * Handles Alipay's synchronous (browser-redirect) payment callback.
 *
 * <p>The Alipay signature is verified first; payment data is only recorded and the
 * payment-success message only sent when verification succeeds. Previously the
 * verification result was discarded, so a forged callback could mark an order as paid.
 *
 * @param request   the callback request carrying Alipay's parameters
 * @param paramsMap fallback parameter map, used only when the request itself carries
 *                  no parameters (an unannotated Map handler argument is bound to the
 *                  MVC model rather than to the request parameters)
 * @return "finish" when the callback was accepted, "fail" when the signature check failed
 */
@RequestMapping("/alipay/callback/return")
    public String callBackReturn(HttpServletRequest request,Map<String,String> paramsMap){
        String out_trade_no = request.getParameter("out_trade_no");
        String trade_no = request.getParameter("trade_no");

        // Verify against the parameters Alipay actually sent with this request.
        Map<String,String> signedParams = collectRequestParams(request);
        if (signedParams.isEmpty() && paramsMap != null) {
            signedParams = paramsMap;
        }

        boolean signatureValid = false;
        try {
            // Validate the signature of the Alipay callback.
            signatureValid = AlipaySignature.rsaCheckV1(signedParams,AlipayConfig.alipay_public_key,AlipayConfig.charset,AlipayConfig.sign_type);
        } catch (AlipayApiException e) {
            // A verification error is treated the same as an invalid signature.
            e.printStackTrace();
        }
        if (!signatureValid) {
            // NOTE(review): assumes a "fail" view exists — point this at the project's
            // real payment-failure page.
            return "fail";
        }

        // Update the payment record.
        PaymentInfo paymentInfo = new PaymentInfo();
        paymentInfo.setPaymentStatus("已支付");
        paymentInfo.setCallbackContent(request.getQueryString());
        paymentInfo.setOutTradeNo(out_trade_no);
        paymentInfo.setAlipayTradeNo(trade_no);
        paymentInfo.setCallbackTime(new Date());

        // Queue-based notification: tell downstream services the payment succeeded.
        paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),trade_no);

        paymentService.updatePayment(paymentInfo);

        return "finish";
    }

    /** Flattens the request parameters into a name -> comma-joined values map. */
    private Map<String,String> collectRequestParams(HttpServletRequest request) {
        Map<String,String> params = new java.util.HashMap<String,String>();
        java.util.Enumeration<?> names = request.getParameterNames();
        while (names.hasMoreElements()) {
            String name = (String) names.nextElement();
            String[] values = request.getParameterValues(name);
            StringBuilder joined = new StringBuilder();
            for (int i = 0; values != null && i < values.length; i++) {
                if (i > 0) {
                    joined.append(',');
                }
                joined.append(values[i]);
            }
            params.put(name, joined.toString());
        }
        return params;
    }

serviceImpl:

@Override
    public void sendPaymentSuccess(String outTradeNo, String paymentStatus,String trackingNo) {
        try {
            // 連接消息服務(wù)器
            ConnectionFactory connect = activeMQUtil.getConnectionFactory();
            Connection connection = connect.createConnection();
            connection.start();
            //第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // 發(fā)送消息
            Queue testqueue = session.createQueue("PAYMENT_SUCCESS_QUEUE");
 
            MessageProducer producer = session.createProducer(testqueue);
            MapMessage mapMessage=new ActiveMQMapMessage();
            mapMessage.setString("out_trade_no",outTradeNo);
            mapMessage.setString("payment_status",paymentStatus);
            mapMessage.setString("tracking_no",trackingNo);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(mapMessage);
            session.commit();// 事務(wù)型消息,必須提交后才生效
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }

?