0%

SpringBoot2.x(十三)整合ActiveMQ

前言


JMS简介

常用场景

  • 跨平台
  • 多语言
  • 多项目
  • 解耦
  • 分布式事务
  • 流量控制
  • 最终一致性
  • RPC调用
    • 上下游对接,数据源变动->通知下属

基础概念

  • JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
  • JMS生产者(Message Producer):监听、接收消息
  • JMS消费者(Message Consumer):生产、发送消息
  • JMS消息:如订单号
  • JMS队列:消息生产后的去处、接收消息的来源
  • 点对点:消息只能从一端到另一端
  • 发布订阅:消息可以从一端到另多端

SpringBoot2.x整合ActiveMQ之点对点

环境搭建

如果你的IDE是IDEA,直接通过 new module->spring initializer搜索 jms并选中JMS(ActiveMQ)即可快速创建一个带 ActiveMQ依赖的SpringBoot应用。

当然你也可以直接 new empty module之后引入如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<!-- 整合消息队列ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!-- 如果配置线程池则加入 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

application.properties中添加SpringBoot整合ActiveMQ的属性配置

1
2
3
4
5
6
7
8
9
10
spring.activemq.broker-url=tcp://127.0.0.1:61616

#集群配置
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)

spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加activemq-pool依赖
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

值得注意的是,连接非本机部署的ActiveMQ服务时要开启其访问端口(ActiveMQweb控制台端口:8161,客户端tcp连接端口:61616)

按照以上步骤创建两个工程 jms-producerjms-consumer模拟微服务通信

jms-producer

编写业务类

  1. 创建消息生产业务类 SmsProducer(这里以短信验证码业务为例):
  2. SmsProducer上添加 @Component使其纳入spring容器管理
  3. 在类中注入 JmsMessageTemplate,其相比 JmsTemplate而言封装力度更大,操作更简便
  4. 使用 jmsMessageTemplate发送消息,同时指定消息要发往的目的地
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package top.zhenganwen.jmsproducer.jms.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Queue;
import java.util.HashMap;
import java.util.Map;

/**
* SmsProducer class
* <p>
* 模拟短信服务生产者:
* 将手机号和短信发送至队列,短信服务消费者调用第三方短信服务接口(如阿里大于)发送短信
* 由于只需发送一次,因此采用点对点模式
*
* @author zhenganwen
* @date 2018/7/23
*/
@Component
public class SmsProducer {

/**
* 该类继承JmsTemplate,封装好了许多方法,更加便捷
*/
@Autowired
private JmsMessagingTemplate jmsTemplate;

/**
*
* @param phone 用户手机号
* @param code 短信验证码
*/
public void sendSmsCode(String phone, String code) {
Map msgMap = new HashMap();
msgMap.put("phone", phone);
msgMap.put("code", code);
jmsTemplate.convertAndSend("sms",msgMap);
}
}

测试业务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package top.zhenganwen.jmsproducer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import top.zhenganwen.jmsproducer.jms.producer.SmsProducer;

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsProducerApplicationTests {

@Autowired
private SmsProducer smsProducer;

@Test
public void testSendSms() {
smsProducer.sendSmsCode("15797260000","679158");
}

}

访问 web控制台:172.0.0.1:8161/admin,查看 Queues标签页查看Number Of Pending Messages(入队消息数量)

jms-consumer

编写业务类

  1. 创建业务类 如SmsConsumer,并标明为组件纳入spring管理
  2. 在业务方法上添加 @JmsListener监听入队消息并通过 destination属性标明监听的队列名称
  3. 在业务方法形参中注入入队消息
  4. 对消息数据根据业务需求加工处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package top.zhenganwen.jmsconsumer.jms.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.Queue;
import java.util.Map;

/**
* SmsConsumer class
* <p>
* 短信服务消费者
* 接收来自短信服务生产者传递的手机号和验证码调用第三方短信服务接口给用户发送验证码短信
*
* @author zhenganwen
* @date 2018/7/23
*/
@Component
public class SmsConsumer {

/**
* sms 短信消息队列
* msgMap 消息数据
*/
@JmsListener(destination = "sms")
public void sendSmsCode(Map msgMap) {
String phone = (String) msgMap.get("phone");
String code = (String) msgMap.get("code");
System.out.println("接收到的消息:");
System.out.println("code:" + code);
System.out.println("phone:"+phone);
try {
//模拟调用第三方短信服务接口给用户发送短信
Thread.sleep(1000);
//发送成功
System.out.println("发送成功");
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("发送失败");
}
}
}

测试业务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package top.zhenganwen.jmsconsumer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsConsumerApplicationTests {

@Test
public void testSendSms() throws InterruptedException {
//启动容器即收到阻塞在sms队列中的消息
Thread.sleep(100*1000);
}

}

测试,控制台输出如下:

1
2
3
4
接收到的消息:
code:679158
phone:15797260000
发送成功

SpringBoot2.x整合ActiveMQ之发布订阅

开启发布订阅模式

你可能注意到了,上例中无论是生产者还是消费者我们都只是表明了 destinationName(sms),而并未指明是 Queue的形式还是 Topic的形式。

这是因为SpringBoot默认开启点对点模式而关闭发布订阅模式(convertAndSend("sms",message)默认以 Queue的形式发送)。因此如果需要开启(以 Topic的形式发送),则需在 application.properties中添加如下配置:

1
2
#default point to point
spring.jms.pub-sub-domain=true

配置Topic监听容器

即使你通过 spring.jms.pub-sub-domain=true解决了将消息发布到 Topics中的问题,但你又会发现 @JmsListener只指定 destinationName="sms"的话,它是只监听 Queues中的队列的。

因此你需要给Topic定义独立的 JmsListenerContainer

1
2
3
4
5
6
7
8
9
10
@Configuration
public class ActiveMQConfiguration{
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}

这时你可注释掉 spring.jms.pub-sub-domain=true

消费者

这里发布订阅模式以天气预报为例,生产者是气象台,消费者是百姓。

@JmsListener中指定 containerFactory为自定义的方法名 jmsListenerContainerTopic

1
2
3
4
5
6
7
8
9
10
11
12
13
package top.zhenganwen.jmsconsumer.jms.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class WeatherConsumer {

@JmsListener(destination = "weather",containerFactory = "jmsListenerContainerTopic")
public void listenWeatherReport(String report) {
System.out.println(report);
}
}

启动消费者监听消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package top.zhenganwen.jmsconsumer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsConsumerApplicationTests {

@Test
public void testTopicMsg() throws InterruptedException {
//启动容器,监听来自Topics中weather队列的消息
Thread.sleep(1000*1000);
}

}

生产者

配置消息发往地

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package top.zhenganwen.jmsproducer.jms.producer;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;
import javax.jms.Topic;

@Configuration
public class DestinationConfiguration {
@Bean
public Topic getTopic() {
return new ActiveMQTopic("weather");
}
}

编写生产业务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package top.zhenganwen.jmsproducer.jms.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Topic;

@Component
public class WeatherProducer {

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic weatherTopic;

/**
* 发送天气预报
*/
public void sendWeatherReport(String message) {
jmsMessagingTemplate.convertAndSend(weatherTopic, message);
}
}

启动生产者生产消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package top.zhenganwen.jmsproducer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import top.zhenganwen.jmsproducer.jms.producer.SmsProducer;
import top.zhenganwen.jmsproducer.jms.producer.WeatherProducer;

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsProducerApplicationTests {

@Autowired
private WeatherProducer weatherProducer;
@Test
public void testTopicMsg() {
weatherProducer.sendWeatherReport("明天有雨,请备好伞");
}
}

查看 jms-consumer工程的控制台:

1
明天有雨,请备好伞

配置多消费者监听

1
2
3
4
5
6
7
8
9
10
11
12
@JmsListener(destination = "weather",containerFactory = "jmsListenerContainerTopic")
public void listenWeatherReport(String report) {
System.out.println(report);
}
@JmsListener(destination = "weather",containerFactory = "jmsListenerContainerTopic")
public void listenWeatherReport1(String report) {
System.out.println(report);
}
@JmsListener(destination = "weather",containerFactory = "jmsListenerContainerTopic")
public void listenWeatherReport2(String report) {
System.out.println(report);
}
1
2
3
明天有雨,请备好伞
明天有雨,请备好伞
明天有雨,请备好伞
鼓励一下~