消息隊(duì)列:第三章:在Java中使用消息隊(duì)列
在項(xiàng)目中導(dǎo)入依賴坐標(biāo)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
[點(diǎn)擊并拖拽以移動]
使用隊(duì)列queue的情況
producer端
public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616");
try {
//創(chuàng)建連接對象
Connection connection = connect.createConnection();
connection.start();
//第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
//使用隊(duì)列queue
Queue testqueue = session.createQueue("boss drink");
//創(chuàng)建提供者
MessageProducer producer = session.createProducer(testqueue);
TextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("我渴了,我要喝水!");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(textMessage);
session.commit();// 事務(wù)型消息,必須提交后才生效
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
consumer端1
public static void main(String[] args) {
// 連接
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination testqueue = session.createQueue("boss drink");
MessageConsumer consumer = session.createConsumer(testqueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text+",員工1馬上拿起杯子打水。。。");
//session.rollback();
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
consumer端2
public static void main(String[] args) {
// 連接
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination testqueue = session.createQueue("boss drink");
MessageConsumer consumer = session.createConsumer(testqueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text+",員工2馬上拿起杯子打水。。。");
//session.rollback();
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
使用topic話題的情況:
producer端:
public static void main(String[] args) {
ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616");
try {
//創(chuàng)建連接對象
Connection connection = connect.createConnection();
connection.start();
//第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
//使用topic話題(訂閱)
Topic topic = session.createTopic("boss speak");
//創(chuàng)建提供者
MessageProducer producer = session.createProducer(topic);
TextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("我們要為中國的偉大復(fù)興而努力奮斗!");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(textMessage);
session.commit();// 事務(wù)型消息,必須提交后才生效
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
consumer端1:
public static void main(String[] args) {
// 連接
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
try {
Connection connection = connect.createConnection();
//設(shè)置客戶端id
connection.setClientID("userOne");
connection.start();
//第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("boss speak");
//在消費(fèi)端標(biāo)記,Session.createDurableSubscriber(Topic topic, String name)
//是發(fā)布-訂閱持久化的接收端的設(shè)置。
//參數(shù) topic -> 與發(fā)送端的topic 對應(yīng),建立連接
//參數(shù) name -> 為訂閱者的標(biāo)識(相當(dāng)于id)
MessageConsumer consumer =session.createDurableSubscriber(topic,"userOne");
//MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text+",員工1這個(gè)月工資不要了。。。");
//session.rollback();
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
consumer端2:
public static void main(String[] args) {
// 連接
ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.0.100:61616");
try {
Connection connection = connect.createConnection();
connection.start();
//第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//Destination :目標(biāo)
Destination topic = session.createTopic("boss speak");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text+",員工2周末主動來加班。。。");
//session.rollback();
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}catch (Exception e){
e.printStackTrace();;
}
}
關(guān)于事務(wù)控制
持久化與非持久化
通過producer.setDeliveryMode(DeliveryMode.PERSISTENT) 進(jìn)行設(shè)置
持久化的好處就是當(dāng)activemq宕機(jī)的話,消息隊(duì)列中的消息不會丟失。非持久化會丟失。但是會消耗一定的性能。
與springboot整合
在application.properties文件中加入spring.activemq.broker-url=tcp://mq.server.com:61616
配置類ActiveMQConfig:項(xiàng)目啟動的時(shí)候加載并執(zhí)行里面所有的方法
@Configuration
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url:disabled}")
String brokerURL ;
@Value("${activemq.listener.enable:disabled}")
String listenerEnable;
@Bean
public ActiveMQUtil getActiveMQUtil() throws JMSException {
if(brokerURL.equals("disabled")){
return null;
}
ActiveMQUtil activeMQUtil=new ActiveMQUtil();
activeMQUtil.init(brokerURL);
return activeMQUtil;
}
//定義一個(gè)消息監(jiān)聽器連接工廠,這里定義的是點(diǎn)對點(diǎn)模式的監(jiān)聽器連接工廠
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
if(!listenerEnable.equals("true")){
return null;
}
factory.setConnectionFactory(activeMQConnectionFactory);
//設(shè)置并發(fā)數(shù)
factory.setConcurrency("5");
//重連間隔時(shí)間
factory.setRecoveryInterval(5000L);
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
/* if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
url=brokerURL;
}*/
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory( brokerURL);
return activeMQConnectionFactory;
}
}
工具類ActiveMQUtil
public class ActiveMQUtil {
PooledConnectionFactory pooledConnectionFactory=null;
public ConnectionFactory init(String brokerUrl) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
//加入連接池
pooledConnectionFactory=new PooledConnectionFactory(factory);
//出現(xiàn)異常時(shí)重新連接
pooledConnectionFactory.setReconnectOnException(true);
//
pooledConnectionFactory.setMaxConnections(5);
pooledConnectionFactory.setExpiryTimeout(10000);
return pooledConnectionFactory;
}
public ConnectionFactory getConnectionFactory(){
return pooledConnectionFactory;
}
}
案例:
controller:
@RequestMapping("/alipay/callback/return")
public String callBackReturn(HttpServletRequest request,Map<String,String> paramsMap){// 頁面同步反轉(zhuǎn)的回調(diào)
String out_trade_no = request.getParameter("out_trade_no");
String trade_no = request.getParameter("trade_no");
String sign = request.getParameter("sign");
try {
boolean b = AlipaySignature.rsaCheckV1(paramsMap,AlipayConfig.alipay_public_key,AlipayConfig.charset,AlipayConfig.sign_type);// 對支付寶回調(diào)簽名的校驗(yàn)
} catch (AlipayApiException e) {
e.printStackTrace();
}
// 修改支付信息
PaymentInfo paymentInfo = new PaymentInfo();
paymentInfo.setPaymentStatus("已支付");
paymentInfo.setCallbackContent(request.getQueryString());
paymentInfo.setOutTradeNo(out_trade_no);
paymentInfo.setAlipayTradeNo(trade_no);
paymentInfo.setCallbackTime(new Date());
//這里使用Queue隊(duì)列
// 發(fā)送系統(tǒng)消息,出發(fā)并發(fā)商品支付業(yè)務(wù)服務(wù)O2O消息隊(duì)列
paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),trade_no);
paymentService.updatePayment(paymentInfo);
return "finish";
}
servcieimpl:
@Override
public void sendPaymentSuccess(String outTradeNo, String paymentStatus,String trackingNo) {
try {
// 連接消息服務(wù)器
ConnectionFactory connect = activeMQUtil.getConnectionFactory();
Connection connection = connect.createConnection();
connection.start();
//第一個(gè)值表示是否使用事務(wù),如果選擇true,第二個(gè)值相當(dāng)于選擇0
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// 發(fā)送消息
Queue testqueue = session.createQueue("PAYMENT_SUCCESS_QUEUE");
MessageProducer producer = session.createProducer(testqueue);
MapMessage mapMessage=new ActiveMQMapMessage();
mapMessage.setString("out_trade_no",outTradeNo);
mapMessage.setString("payment_status",paymentStatus);
mapMessage.setString("tracking_no",trackingNo);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(mapMessage);
session.commit();// 事務(wù)型消息,必須提交后才生效
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
?