消息隊(duì)列:第六章:ObjectMessage與MapMessage

javax.jms.jmsexception:無(wú)法從內(nèi)容生成正文。可序列化類不可用于代理原因:java.lang.ClassNotFoundException:
禁止類com.javaliao.portal.model.TbLogVisit不信任將此類序列化為objectMessage負(fù)載。
有關(guān)如何配置受信任類的詳細(xì)信息,請(qǐng)查看http://activemq.apache.org/objectmessage.html。

    控制臺(tái)打?。?br>     javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker.
    Reason: java.lang.ClassNotFoundException: Forbidden class com.javaliao.portal.model.TbLogVisit!
    This class is not trusted to be serialized as ObjectMessage payload. Please take a look at
    http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
        at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)
        at com.javaliao.portal.listener.LogVisitListener.consumePaymentResult(LogVisitListener.java:50)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)

出現(xiàn)這個(gè)問(wèn)題是因?yàn)椋?/p>

盡管通常不鼓勵(lì)使用ObjectMessage,因?yàn)樗谏a(chǎn)者和消費(fèi)者之間引入了類路徑的耦合,但ActiveMQ支持它們作為JMS規(guī)范的一部分。
安防
objectMessage對(duì)象依賴于marshal/unmarshal對(duì)象負(fù)載的Java序列化。此進(jìn)程通常被認(rèn)為是不安全的,因?yàn)閻阂庳?fù)載可以利用主機(jī)系統(tǒng)進(jìn)行攻擊。這就是為什么從版本5.12.2和5.13.0開(kāi)始,ActiveMQ強(qiáng)制用戶顯式地列出可以使用ObjectMessages交換的包的白名單。
如果需要交換對(duì)象消息,則需要添加應(yīng)用程序正在使用的包。通過(guò)使用org.apache.activemq.serializable_packages系統(tǒng)屬性(由代理和activemq客戶機(jī)庫(kù)解釋)可以做到這一點(diǎn)。您可以將此系統(tǒng)屬性添加到${activemq_home}/bin/env腳本中的activemq_opts變量。
例如
-dorg.apache.activemq.serializable_packages=java.lang,javax.security,java.util,org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper,com.mycompany.myapp
將com.mycompany.myapp包添加到受信任包列表中注意,這里列出的其他包在默認(rèn)情況下是啟用的,因?yàn)樗鼈兪浅R?guī)代理工作所必需的。如果您想簡(jiǎn)化此機(jī)制,可以使用*通配符來(lái)信任所有包,如
-dorg.apache.activemq.serializable_包=*
客戶
在客戶機(jī)端,您需要使用與在objectMessage.getObject()調(diào)用上反序列化惡意代碼相同的機(jī)制,從而破壞應(yīng)用程序的環(huán)境。您可以在代理上使用相同的配置機(jī)制,并使用系統(tǒng)屬性配置受信任的類但是,這在客戶端應(yīng)用程序中通常不方便,因此在5.12.2和5.13.1中,我們引入了使用activemqconnectionfactory的附加配置機(jī)制。定義了兩種附加方法:
setTrustedPackages()方法允許您設(shè)置要取消序列化的受信任包的列表,如
activemqconnectionfactory=new activemqconnectionfactory(“tcp://localhost:61616”);
factory.setTrustedPackages(新的ArrayList(Arrays.asList(“org.apache.activemq.test,org.apache.camel.test.split(“,”)));
setTrustAllPackages()允許關(guān)閉安全檢查并信任所有類。它對(duì)測(cè)試很有用。
activemqconnectionfactory=new activemqconnectionfactory(“tcp://localhost:61616”);
factory.setTrustAllPackages(真);
您可以在camel上下文中設(shè)置相同的屬性,如:```
org.apache.activemq.test org.apache.camel.test測(cè)試

```如果設(shè)置了系統(tǒng)屬性,則此配置將覆蓋這些屬性。

我的代碼:

發(fā)消息代碼:

        @Autowired
        ActiveMQUtil activeMQUtil;
     
        @Override
        public void insertLogVisit(TbLogVisit tbLogVisit) {
            //使用信息隊(duì)列發(fā)信息異步執(zhí)行保存到數(shù)據(jù)庫(kù)中
            try {
                // 連接消息服務(wù)器
                Connection connection = activeMQUtil.getConnection();
                connection.start();
                //第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                // 發(fā)送消息
                Queue testqueue = session.createQueue("LOG_VISIT_QUEUE");
                MessageProducer producer = session.createProducer(testqueue);
                ObjectMessage objectMessage = session.createObjectMessage();
                /*AMQ_SCHEDULED_DELAY     long     延遲投遞的時(shí)間
                AMQ_SCHEDULED_PERIOD     long     重復(fù)投遞的時(shí)間間隔
                AMQ_SCHEDULED_REPEAT     int     重復(fù)投遞次數(shù)
                AMQ_SCHEDULED_CRON     String     Cron表達(dá)式*/
                //設(shè)置時(shí)間為30s
                objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,1000*30);
                objectMessage.setObject(tbLogVisit);
                // 設(shè)置持久化傳輸
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.send(objectMessage);
                session.commit();// 事務(wù)型消息,必須提交后才生效
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

 連接工具類:

    package com.javaliao.portal.util;
     
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.pool.PooledConnectionFactory;
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import java.util.ArrayList;
    import java.util.Arrays;
     
    public class ActiveMQUtil {
        PooledConnectionFactory pooledConnectionFactory=null;
        public  void init(String brokerUrl){
            ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(brokerUrl);
            //設(shè)置白名單包
            activeMQConnectionFactory.setTrustedPackages(
                    new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));
            activeMQConnectionFactory.setTrustAllPackages(true);
            pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
            pooledConnectionFactory.setExpiryTimeout(2000);
            pooledConnectionFactory.setMaximumActiveSessionPerConnection(10);
            pooledConnectionFactory.setMaxConnections(30);
            pooledConnectionFactory.setReconnectOnException(true);
        }
     
        public Connection getConnection(){
            Connection connection = null;
            try {
                connection = pooledConnectionFactory.createConnection();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            return connection;
        }
    }

配置類:

    package com.javaliao.portal.config;
     
    import com.javaliao.portal.util.ActiveMQUtil;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
     
    import javax.jms.JMSException;
    import javax.jms.Session;
     
    @Configuration
    public class ActiveMQConfig {
     
        @Value("${spring.activemq.broker-url:disabled}")
        String brokerURL ;
     
        @Value("${activemq.listener.enable:disabled}")
        String listenerEnable;
     
        @Bean
        public ActiveMQUtil getActiveMQUtil() throws JMSException {
            if(brokerURL.equals("disabled")){
                return null;
            }
            ActiveMQUtil activeMQUtil=new ActiveMQUtil();
            activeMQUtil.init(brokerURL);
            return  activeMQUtil;
        }
     
        @Bean
        public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
    /*        if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
                url=brokerURL;
            }*/
            ActiveMQConnectionFactory activeMQConnectionFactory =
                    new ActiveMQConnectionFactory(  brokerURL);
            return activeMQConnectionFactory;
        }
     
        //定義一個(gè)消息監(jiān)聽(tīng)器連接工廠,這里定義的是點(diǎn)對(duì)點(diǎn)模式的監(jiān)聽(tīng)器連接工廠
        @Bean(name = "jmsQueueListener")
        public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            if(!listenerEnable.equals("true")){
                return null;
        }
            factory.setConnectionFactory(activeMQConnectionFactory);
        //設(shè)置并發(fā)數(shù)
            factory.setConcurrency("5");
        //重連間隔時(shí)間
           factory.setRecoveryInterval(5000L);
           factory.setSessionTransacted(false);
           factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
            return factory;
        }
     
     
     
    }

 實(shí)體類:

    import java.io.Serializable;
    import java.util.Date;
     
    public class TbLogVisit implements Serializable {
     
        private static final long serialVersionUID = 2659644940173539668L;
     
        private Long id;
     
        private String visitIpAddress;
     
        private String visitHostName;
     
        private String visitChannel;
     
        private String visitDescription;
     
        private String visitApi;
     
        private String visitParams;
     
        private String visitUrl;
     
        private String visitTimeConsuming;
     
        private String visitResult;
     
        private Long visitNum;
     
        private String visitThrowingErro;
     
        private Date visitStartTime;
     
        private Date visitEndTime;
     
        private Date createTime;
     
        private Date updateTime;

 接收消息:

    package com.javaliao.portal.listener;
     
    import com.javaliao.portal.log4j.BaseLogger;
    import com.javaliao.portal.model.TbLogVisit;
    import com.javaliao.portal.service.ActionService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.ObjectMessage;
     
    @Component
    public class LogVisitListener {
     
        @Autowired
        ActionService actionService;
     
        /**
         * 為了體現(xiàn)差距,專門做了個(gè)很大的class,使用json轉(zhuǎn)換后大概35MB左右
         * 使用json傳輸?shù)那闆r,單位均為ms:
         * 總時(shí)間:17366
         * 傳輸時(shí)間:220
         * 發(fā)送者把object轉(zhuǎn)為json的時(shí)間:6271
         * 發(fā)送總共時(shí)間:10000
         * 接收者把message轉(zhuǎn)換為textMessage時(shí)間:0
         * 接收者把json轉(zhuǎn)換為object時(shí)間:7146。
         * ****************************************
         * 使用ObjectMessage進(jìn)行傳輸?shù)那闆r:
         * 總時(shí)間:6742
         * 傳輸時(shí)間:173
         * 發(fā)送總時(shí)間:4836
         * 接收者把message轉(zhuǎn)換為ObjectMessage時(shí)間:1733
         * ******************************************
         * 結(jié)論
         * 雖然沒(méi)做壓力測(cè)試,雖然只測(cè)了一次,雖然測(cè)試環(huán)境僅僅是我的筆記本,但我想我已經(jīng)可以做出結(jié)論了。
         * 在server之間的異步通信時(shí),object優(yōu)于json。
         * 優(yōu)勢(shì)主要集中于java序列化和對(duì)象之間轉(zhuǎn)換的效率遠(yuǎn)高于json盒對(duì)象轉(zhuǎn)換的效率,
         * 另外序列化后對(duì)象的大小比json的小也是有利傳輸?shù)脑颉?br>          */
     
        @JmsListener(containerFactory = "jmsQueueListener" ,destination = "LOG_VISIT_QUEUE")
        public void consumePaymentResult(Message mapMessage){
            try {
                ObjectMessage tbLogVisitObject = (ObjectMessage) mapMessage;
                TbLogVisit object = (TbLogVisit) tbLogVisitObject.getObject();
                int count = actionService.insertLog(object);
                if(count < 1){
                    BaseLogger.info("日志更新失敗");
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
     
    }

 application.preperties:

    #activemq信息隊(duì)列
    spring.activemq.broker-url=tcp://192.168.134.100:61616
    activemq.listener.enable=true

 

 配置那邊:

 

 我的是配置在linux系統(tǒng)下

 

 然并卵,沒(méi)什么用,然后強(qiáng)迫我使用String類型,改后代碼:

        @Override
        public void insertLogVisit(TbLogVisit tbLogVisit) {
            try {
                // 連接消息服務(wù)器
                Connection connection = activeMQUtil.getConnection();
                connection.start();
                //第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                // 發(fā)送消息
                Queue testqueue = session.createQueue("LOG_VISIT_QUEUE");
                MessageProducer producer = session.createProducer(testqueue);
                MapMessage mapMessage=new ActiveMQMapMessage();
                String toString = JSONObject.fromObject(tbLogVisit).toString();
                mapMessage.setString("tbLogVisit",toString);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.send(mapMessage);
                session.commit();// 事務(wù)型消息,必須提交后才生效
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

 接收消息:

        @JmsListener(containerFactory = "jmsQueueListener" ,destination = "LOG_VISIT_QUEUE")
        public void consumeLogResult(MapMessage mapMessage){
            try {
                String object = mapMessage.getString("tbLogVisit");
                JSONObject jsonObject = new JSONObject().fromObject(object);
                TbLogVisit logVisit = (TbLogVisit) JSONObject.toBean(jsonObject, TbLogVisit.class);
                int count = actionService.insertLog(logVisit);
                if(count < 1){
                    BaseLogger.info("日志更新失敗");
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

 然后控制臺(tái):