0%

SpringBoot2.x(十六)响应式编程webflux和服务器端主动推送SSE

何为响应式

想象一个排队买奶茶的场景:

  • 场景一

客户们买奶茶需要到奶茶店前台排队,前台服务人员收到客户请求(如买一杯柠檬奶茶)后通知后台去做,后台做完后给前台,前台再给客户,该客户消费完成,轮到下一位客户购买

  • 场景二

客户们买奶茶只需将请求报给前台服务人员(如买一杯巧克力味奶茶),前台服务人员马上打出一张付费票据给客户并告诉客户可在周围稍作等待(可能是十分钟左右),奶茶做好后会通过喇叭通知客户来领取。

​ 场景一就对应servlet开发以及基于 servlet API封装的strutsspringmvc。他们的特点就是在客户发送请求到请求被处理完毕期间,客户和后台都是阻塞的:客户只能站在原地排队(等待浏览器响应)、后台只能做这一份奶茶,而在做这份奶茶的空余时间(如煮奶茶)只能等待(后台其他的资源如内存等没有完全利用起来)。这种模式下即使增加了排队的队伍,在横向上提高了请求处理效率,但阻塞的本质并没有变。

​ 场景二就对应 webflux响应式编程。这种模式是基于事件驱动的,如后台做好奶茶后递给前台,前台便通过喇叭通知客户领取,这里的后台将做好的奶茶递给前台就是请求处理过程的重要事件。这种模式下:客户发送请求后可在已知的等候时间内(如前台服务员告诉客户大概十分钟后做好)去做自己的事件,同时后台也可以在煮柠檬茶时开始做巧克力奶茶,这种模式便是典型的非阻塞,好处是实现了资源利用最大化(后台)和用户体验友好化(客户)。

SpringBoot2.x是依赖于Spring5来做响应式编程,而Spring5又是基于Reactor来开发响应式编程。

学习资料

reactive-streams学习资料

web-flux相关资料

springboot-webflux

webflux实战

依赖

使用webflux可以开发脱离 Servlet API的web应用,使用 Netty做内嵌服务器,性能很好。

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Mono&Flux

  • pojo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package top.zhenganwen.webflux.pojo;

import lombok.Data;
import java.io.Serializable;

@Data
public class User implements Serializable {

private String username;
private String pwd;
private String phone;

public User(String username, String pwd, String phone) {
this.username = username;
this.pwd = pwd;
this.phone = phone;
}
public User() {
}
}
  • Service

通常使用 Mono封装单一数据对象而使用Flux封装集合数据对象,返回给前端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package top.zhenganwen.webflux.service;

import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import top.zhenganwen.webflux.pojo.User;
import java.util.HashMap;
import java.util.Map;

@Service
public class UserService {

/**
* 模拟数据库数据
*/
private static Map<Integer,User> dataMap;
static {
dataMap = new HashMap();
dataMap.put(1, new User("tom", "123"));
dataMap.put(2, new User("jack", "456"));
dataMap.put(3, new User("alice", "789"));
dataMap.put(4, new User("john", "123"));
}

public Flux list() {
return Flux.fromIterable(dataMap.values());
}

public Mono getById(Integer id) {
return Mono.justOrEmpty(dataMap.get(id));
}

public Mono del(Integer id) {
return Mono.justOrEmpty(dataMap.remove(id));
}

}
  • Controller

Controller写法沿袭 SpringMvc:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package top.zhenganwen.webflux.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import top.zhenganwen.webflux.service.UserService;

@RestController
public class UserController {

@Autowired
private UserService userService;

@GetMapping("getById")
public Mono getById(Integer id) {
return userService.getById(id);
}

@GetMapping("list")
public Flux list() {
return userService.list();
}

@GetMapping("/del")
public Mono del(Integer id) {
return userService.del(id);
}
}

响应流

一系列事件称为,如客户们买咖啡

下面以陆续通知客户领取咖啡为例测试一下异步响应(不是等所有客户的咖啡都做好了再通知,而是做好了一个便通知相应的客户):

1
2
3
4
@GetMapping(value = "list", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux list() {
return userService.list().delayElements(Duration.ofSeconds(1));
}

注意设置响应头为响应

webflux客户端webclient

webclient作为 webflux客户端可以用来抓取接口响应的 json数据

1
2
3
4
5
6
7
8
9
@Test
public void testWebClient() {
Mono<User> bodyMono = WebClient.create().get()
.uri("http://localhost:8080/getById?id=1")
.accept(MediaType.APPLICATION_JSON)
.retrieve().bodyToMono(User.class);

System.out.println(bodyMono.block()); //阻塞获取结果
}

输出结果:

1
User(username=tom, pwd=123)

还可以通过 uri(String uri,Object... params)灵活设置请求参数:

1
2
3
4
Mono<String> bodyMono = WebClient.create().get()
.uri("http://localhost:8080/api/v1/user/find?id={id}",2)
.accept(MediaType.APPLICATION_JSON)
.retrieve().bodyToMono(String.class);

服务器端主动推送SSE

服务端推送常用技术介绍

  • 客户端轮询:ajax定时拉取
    • 缺点是每个客户端都会不断发出请求,消耗资源和带宽
  • 服务端主动推送:WebSocket
    • 全双工的,本质上是一个额外的tcp连接建立和关闭时握手使用http协议其他数据传输不使用http协议。这样使得服务端有新的数据时才推送个客户端,而不用不断轮询抓取。
    • 更加复杂一些,适用于需要进行复杂双向数据通讯的场景
  • 服务端主动推送:SSE (Server Send Event)
    • H5新标准,用来从服务端实时推送数据到浏览器端
    • 直接建立在当前http连接上,本质上是保持一个http长连接轻量协议
    • 简单的服务器数据推送的场景,使用服务器推送事件

实战

  1. 设置响应头
  2. 前端通过 EventSource对象设置抓取接口和消息监听
  3. 监听到消息后通过回调参数获取消息数据
1
2
3
4
5
6
7
8
9
10
11
@RequestMapping(value = "/retrieve",produces = "text/event-stream;charset=UTF-8")
public double retrieve() {
try {
//每0.5秒刷新数据
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//模拟股票实时变动数据
return Math.random();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>

<div id="data"></div>
<script type="text/javascript">
var source = new EventSource("retrieve");
//当抓取到消息
source.onmessage = function (evt) {
document.getElementById("data").innerHTML = "股票行情:" + evt.data;
};
</script>
</body>
</html>

SSE本质上也是不断轮询请求接口获取数据。

鼓励一下~