knight_ka | 生活及学习笔记

JMS及它的一个实现ActiveMQ使用实例

JMS及它的一个实现ActiveMQ使用实例

JMS是什么

首先,JMS的全称是Java Message Service,Java消息服务。它是Java为所有的消息队列而创建的一种规范,目的是使开发者可以使用同样的API去调用不同的消息中间件。

在WEB应用开发中,会有很多的消息中间件,它们之间各有各的特点,但是在Java中使用这些消息中间件的时候如果没有一种同一个规范,可能就会导致ActiveMQ有一套自己的API,RabbitMQ有一套自己的API 等等,不同的消息中间件使用的是不同的API,这对于Java开发者来说是一件非常痛苦的事情,因为你要根据不同的消息中间件,去学习不同使用方法,所以Java就为我们提供了JMS这种通用的接口,它规定了,所有的消息中间件要想在Java中使用,都要按照JMS的规定去开发自己的API,这样Java开发者就可以非常轻松的使用同样的API去调用不同的消息中间件。

JMS中的其中角色

  • JMS提供者:也就是ActiveMQ,RabbitMQ,Kafka等这些实现了JMS的提供商
  • JMSk客户: 也就是使用JMS去发布消息或者消费消息的应用程序。如JMS生产者和消费者都是客户。
  • JMS生产者: 也就是专门用来生产消息的客户。
  • JMS消费者: 也就是专门用来消费消息的客户。
  • JMS消息: JMS客户之间传递的数据
  • JMS队列: 容纳发送数据的一个容器。队列中的消息一旦被阅读,就会从该队列中移除
  • JMS主题: 一种支持,一条消息可以使多个消费者接受的机制

JMS中的两种模型

一、 点对点模型Queue(或称队列模型)

二、 发布订阅模型Topic(或称主题模型)

这两种模型的区别,将在下面用ActiveMQ使用示例中用代码说明。

ActiveMQ实例

Topic 发布订阅模型消息消费方代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package io.github.dearas.Activemq.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* AvtiveMQ 消息队列主题模式消息消费方Demo
* 主题模式与点对点(一对一)模式的区别:
* 1.一对一模式,一条消息只能被一个消费者消费,也就是消息生产者生产消息到Queue中,只有一个消费方可以消费到这个消息。如果有多个消息消费者指向同一个队列,那么最先`抢`到这个消息的消费者可以消费此消息,其他的消费者只能拿到队列里的下个条消息。也就是说:同一条消息只可能被消费一次。
* 2.发布订阅模式(主题模式),一条消息可以被多个消费者消费。也就是说,消息生产者生产一条消息到一个主题中,则提供者ActiveMQ就会把这条消息推送到所有已经订阅了这个主题的消息消费者的队列中。(请区分消息提供者和消息消费者的区别)
*
* 3.一对一模式,生产者设置消息传递模式为持久化DeliverMode.PERSISTENT后,生产消息到消费者队列中,如果此时消费者不在线,在消费者上线后同样可以接受到队列里的消息。如果设置为DeliverMode.NON_PERSISTENT非持久化模式,则消息消费者上线后接收不到离线这段时间发送过来的消息。
* 4.主题模式,生产者设置消息传递模式为持久化DeliverMode.PERSISTENT后,生产者生产消息到主题中,而主题订阅者可以分为普通订阅者和持久订阅者,持久订阅者就是在消费者下线期间,所产生的消息,在上线后同样可以收到(前提:1.注册一个ClientId.并且clientId必须是唯一的,不然会报错。2.消息生产者设置消息传递模式为持久模式)。
* Created by tnp on 22/04/2017.
*/
public class TopicCustomerTest implements MessageListener,ExceptionListener{
public static void main(String[] args) throws JMSException, InterruptedException {
TopicCustomerTest test = new TopicCustomerTest();
//1.初始化activeMQ
test.initialize();
//2.消费消息
test.consumerMessage();
Thread.sleep(Long.MAX_VALUE);
//3.关闭连接 释放资源
test.close();
}
// 默认用户名
private String username = ActiveMQConnection.DEFAULT_USER;
// 默认密码
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
// 默认ActiveMQ地址
private String brokerurl = "failover://tcp://127.0.0.1:61616";
// private String brokerurl = ActiveMQConnection.DEFAULT_BROKER_URL;
private MessageConsumer consumer;
private Session session;
private Connection connection;
/**
* 1.创建一个ActiveMQ的连接工厂
* 2.从连接工厂中拿出一个MQ连接。这些MQ连接都实现了JMS的Connection接口。
* 3.利用MQ连接对象获取一个本次会话对象Session。可以利用session创建Message,MessageCustomer,MessageProducer
* 4.利用session创建一个消息目的地Destination。Queue或者Topic
* 5.利用session创建一个消息消费者对象MessageCustomer,需要传入目的地。
* @throws JMSException
*/
private void initialize() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, brokerurl);
// MQ连接
connection = factory.createConnection();
connection.setClientID("tnp"); // 必须在创建会话之前设置clientID
// MQ会话对象 设置不开启事务,自动接收消息
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 目的地 Destination. Topic Queue都实现了JMS的Destination
Topic mytopic = session.createTopic("mytopic-2");
// 创建普通消息消费者
//consumer = session.createConsumer(mytopic);
// 创建持久消息订阅者
// 如果不配置clientID 会报错:You cannot create a durable subscriber without specifying a unique clientID on a Connection
// 所以需要设置一个ClientID 并且是唯一的
consumer = session.createDurableSubscriber(mytopic,"tnp");
}
/**
* 消费消息:
* 注意:消费消息有两种方式,一种是使用JMS提供的消息监听器MessageListener,重写onMessage方法
* 另一种是直接调用消息消费者MessageConsumer对象的receiver()方法。
* @throws JMSException
*/
private void consumerMessage() throws JMSException {
// 开启连接
connection.start();
// 设置消息监听器
consumer.setMessageListener(this);
// 设置异常监听器
connection.setExceptionListener(this);
//如果设置了MessageListener,再使用receive则会报错。Cannot synchronously receive a message when a MessageListener is set
//同一个session下只能使用一中方式接受消息
// TextMessage message = (TextMessage)consumer.receive();
// System.out.println("消息消费中,消息id为: " + message.getJMSMessageID());
}
/**
* 释放资源
*/
private void close() throws JMSException {
if(consumer!=null) consumer.close();
if(session!=null) session.close();
if(connection!=null) connection.close();
}
/**
* 消息监听器:
* 消息监听器的重写方法。当有消息进入队列或主题时自动回调本方法。
* @param message
*/
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
// 打印消息内容
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 异常监听器
* @param exception
*/
@Override
public void onException(JMSException exception) {
System.out.println("程序抛出异常,err: ");
exception.printStackTrace();
}
}

Topic 发布订阅模型消息发送方代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package io.github.dearas.Activemq.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Tpoic 主题模式,消息生产者。
* 测试:
* 1.生产者使用非持久化模式生产消息。消费者是普通消费者时,测试消费者能否接收到下线期间的消息: 接受不到
* 2.生产者使用持久化模式生产消息。消费者是普通消费者时,测试消费者能否接收到下线期间的消息: 接受不到
* 3.生产者使用非持久化模式生产消息,消费者是持久订阅者,测试消费者能否接收到下线期间的消息: 接受不到
* 4.生产者使用持久化模式生产消息,消费者是持久订阅者,测试消费者能否接收到下线期间的消息: 接受到了!!
* 5.在订阅者为订阅之前,发布一条主题消息,当订阅者订阅后,测试是否能够接收到未订阅之前的消息: 接受不到
* 结论:
* 1.在发布订阅模式下,不管是普通订阅者还是持久订阅者,订阅者都只能接受到订阅了该主题之后,主题中发布的消息。无法接受未订阅之前的消息。
* 2.在发布订阅模式下,只有发布者使用持久化消息模式发布,并且订阅者是持久化订阅者,这时候订阅者才能接受到在下线期间所产生的消息。
* 订阅者注意事项:
* 1.同一个Session下消息的接受方式只能使用一种,要么使用Listener,要么手动receive。
* 2.持久订阅者必须设置一个唯一的ClientID,并且必须在没有创建会话之前设置。
* Created by tnp on 23/04/2017.
*/
public class TopicProducerTest {
public static void main(String[] args) throws JMSException {
TopicProducerTest test = new TopicProducerTest();
// 1.初始化ActiveMQ
test.initialize();
// 2.生产消息
test.messsageProducer();
//3.释放资源 关闭连接
test.close();
}
// 默认用户名
private String username = ActiveMQConnection.DEFAULT_USER;
// 默认密码
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
// 默认ActiveMQ地址
private String brokerurl = "failover://tcp://127.0.0.1:61616";
private Connection connection;
private Session session;
private MessageProducer producer;
private void initialize() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, brokerurl);
//1.获取一个MQ链接
connection = factory.createConnection();
//2.获取一个会话
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//3.获取目的地
Topic mytopic = session.createTopic("mytopic-2");
//4.获取消息生产者 此步骤会产生以下Topic或者Queue
producer = session.createProducer(mytopic);
// 设置消息传递模式为持久化模式 默认就是持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
private void messsageProducer() throws JMSException {
// 1.开启链接
connection.start();
// 2.创建一个Text类型的消息进行发送。 还有ObjectMessage
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Everything will be ok! +++ 4");
// 3.发送消息
producer.send(textMessage);
}
//释放资源
private void close() throws JMSException {
if(producer!=null) producer.close();
if(session!=null) session.close();
if(connection!=null) connection.close();
}
}

Queue 点对点模型消息消费方代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package io.github.dearas.Activemq.Queue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* ActiveMQ 消息模型:一对一 Queue队列模式消息
* Queue一对一队列:
* 生产者向固定的消费者队列发送消息。
* 消息生产者.
* Created by tnp on 22/04/2017.
*/
public class ConsumerTest implements MessageListener{
//ActiveMQ连接工厂
private ActiveMQConnectionFactory connectionFactory;
// JMS 连接对象
private Connection activeMQConnection;
// JMS 会话 session是一个发送和接受消息的线程。可以用它来创建MessageProducer MessageConsumer Message
private Session session;
// JMS 目的地对象 获取是队列,还是主题
private Destination destination;
// JMS 从目的地中取出消息的消息消费者
private MessageConsumer messageConsumer;
// 初始化ActiveMQ
private void initialize() throws JMSException {
// 创建一个MQ的连接工厂
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "failover://tcp://192.168.1.99:61616");
// 从连接工厂中获取一个MQ连接
activeMQConnection = connectionFactory.createConnection();
/**
* 通过连接Connection获取一个会话对象Session。
* transacted 为事务标识 acknowledgeMode 为签收模式
* AUTO_ACKNOWLEDGE : 自动签收消息
* CLIENT_ACKNOWLEDGE : 客户端手动确认消息
* DUPS_OK_ACKNOWLEDGE : 自动批量确认消息
* SESSION_TRANSACTED : 事务提交并确认
*/
session = activeMQConnection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 利用Session创建一个队列(一对一队列)
destination = session.createQueue("tnp.queue");
// 目的地消息消费者
messageConsumer = session.createConsumer(destination);
}
/**
* 消费消息
* 同一个session下面,要么都是用receive()接受消息。要么都使用MessageListener接受消息
* @throws JMSException
*/
private void consumeMessage() throws JMSException {
// 开启连接
activeMQConnection.start();
// 设置消息监听器-当有消息时触发
messageConsumer.setMessageListener(this);
// 用连接对象设置异常监听器 当有异常发生时触发
activeMQConnection.setExceptionListener(null);
// 接受消息
// Message receive = messageConsumer.receive();
// System.out.println(receive.getJMSMessageID());
}
private void close() throws JMSException {
if (messageConsumer != null)
messageConsumer.close();
if (session != null)
session.close();
if (activeMQConnection != null)
activeMQConnection.close();
}
public static void main(String[] args) throws JMSException, InterruptedException {
ConsumerTest test = new ConsumerTest();
test.initialize();
test.consumeMessage();
Thread.sleep(5000000);
test.close();
}
/**
* 消息监听器,如果有消息进入队列,则自动触发。
* 消息监听器不能和received共存。同一个session下要么都用监听器接受消息,要么都用receive接受。
* 如果既用监听器又用receive接受,程序会抛异常。
* @param message
*/
@Override
public void onMessage(Message message) {
try {
System.out.println(message.getJMSMessageID());
System.out.println(((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

Queue 点对点模型消息发送方代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package io.github.dearas.Activemq.Queue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 一对一生产消费模式:消息生产者。
*
* Created by tnp on 22/04/2017.
*/
public class ProducerTest {
// activeMq链接
private Connection connection;
// 消息生产者
private MessageProducer messageProducer;
// 会话者
private Session session;
private void initialize() throws JMSException {
//1.获取一个ActiveMQ连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"failover://tcp://192.168.1.99:61616");
//2.获取一个ActiveMQ链接
connection = activeMQConnectionFactory.createConnection();
//3.获取一个会话 不开启事务,默认自动接收消息
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.通过session获取一个队列的目的地 Queue 或者 Topic
Destination destination = session.createQueue("tnp.queue");
//5.通过session获取一个消息创建者
messageProducer = session.createProducer(destination);
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //持久化
}
// 生产消息
private void producerMessage() throws JMSException {
connection.start();
//TextMessage textMessage = session.createTextMessage()
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Vinci Come On !");
messageProducer.send(textMessage);
}
private void close() throws JMSException {
if (messageProducer != null)
messageProducer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
public static void main(String[] args) throws JMSException {
ProducerTest test = new ProducerTest();
test.initialize();
test.producerMessage();
test.close();
}
}

ActiveMQ实例使用总结

点对点模式 与 发布订阅模式的区别

1.点对点模式,一条消息只能被一个消费者消费; 而发布订阅模式,一条消息可以被多个消费者消费。

2.点对点模式,消息发送者设置消息模式为持久模式Delivery.PERSISTENT后,消费者即使不在线。同样能够接受到下线期间的消息; 而发布订阅模式下,只有消息发送者设置为持久下次,并且,消息订阅者(也就是消息消费者)设置为持久订阅者才可以接受到下线期间的消息(配置方式见代码)。

值得注意的地方

1.同一个Session下消息的接受方式只能使用一种,要么使用Listener,要么手动receive。

2.持久订阅者必须设置一个唯一的ClientID,并且必须在没有创建会话之前设置。因为持久订阅者是根据clientID来发送消息的,如果没有clientid,mq则不知道该向谁发送这条消息。