0%

使用Netty三分钟手写一个RPC

undefined

流程概览

image

项目结构

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>

<!-- 服务提供方根据调用信息反射获取实现类时需要 -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>

通用模块

image

ClassInfo

实体类,封装了服务调用信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package top.zhenganwen.rpc.common;

import lombok.Data;

import java.io.Serializable;

/**
* ClassInfo class
* 使用JDK的序列化技术必须实现接口Serializable
*
* @author : zaw
* @date : 2019/3/30
*/
@Data
public class ClassInfo implements Serializable {

/**
* 调用服务的接口名
*/
private String className;
/**
* 调用服务的方法名
*/
private String methodName;
/**
* 调用方法的参数列表类型
*/
private Class[] paramTypes;
/**
* 调用服务传参
*/
private Object[] params;
}

需要注意的是客户端在发送调用信息时会将该类对象序列化并发送给服务端,而服务的则需要反序列化回来,如果使用的是JDK的序列化技术则需要将此类实现Serializable接口

服务接口

为了便于维护,服务接口通常会被独立出来到通用模块中,以jar包的形式被服务调用方和服务提供方依赖。这里简单的写了两个接口,一个包含无參服务,一个包含有参服务。

1
2
3
4
5
6
7
public interface HasArgsHelloService {
String hello(String msg);
}

public interface NoArgsHelloService {
String hello();
}

服务调用方

image

client

这个包中是依赖Service接口的一些类,RPC服务的调用对于他们来说是透明的,他们仅通过client_stub中的ServiceProxy来获取服务实现类并调用服务。

1
2
3
4
5
6
7
8
9
10
11
public class RPCClient {

public static void main(String[] args){
NoArgsHelloService noArgsHelloService = (NoArgsHelloService) ServiceProxy.create(NoArgsHelloService.class);
System.out.println(noArgsHelloService.hello());

HasArgsHelloService hasArgsHelloService = (HasArgsHelloService) ServiceProxy.create(HasArgsHelloService.class);
System.out.println(hasArgsHelloService.hello("hello netty rpc"));
}

}

client_stub

真正处理RPC调用逻辑的包,ServiceProxy通过JDK代理Proxy.newProxyInstance来代理所有的服务,所有client中调用服务的动作都将被该代理逻辑中设置的InvocationHandler拦截,拦截后获取调用信息(接口名、方法名、方法参列类型、实参列表)并通过Netty与服务端建立连接发送调用信息,然后阻塞等待连接关闭事件(RPCClientHandler在收到服务端返回的调用结果时会保存该结果并关闭连接),若此事件被触发说明RPCClientHandler已拿到调用结果,于是此次InvocationHandler的拦截可以返回了。

  • ServiceProxy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package top.zhenganwen.rpc.client_stub;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import top.zhenganwen.rpc.common.ClassInfo;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
* ServiceProxy class
*
* @author : zaw
* @date : 2019/3/30
*/
public class ServiceProxy {

public static Object create(Class clazz) {
return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

//构造调用信息
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(clazz.getName());
classInfo.setMethodName(method.getName());
classInfo.setParamTypes(method.getParameterTypes());
classInfo.setParams(args);

//使用netty发送调用信息给服务提供方
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
RPCClientHandler rpcClientHandler = new RPCClientHandler();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
//反序列化对象时指定类解析器,null表示使用默认的类加载器
ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(rpcClientHandler);

}
});
//connect是异步的,但调用其future的sync则是同步等待连接成功
ChannelFuture future = bootstrap.connect("127.0.0.1", 80).sync();
//同步等待调用信息发送成功
future.channel().writeAndFlush(classInfo).sync();
//同步等待RPCClientHandler的channelRead被触发后(意味着收到了调用结果)
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}

//返回调用结果
return rpcClientHandler.getRpcResult();
}
});
}

}
  • PRCClientHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package top.zhenganwen.rpc.client_stub;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
* RPCClientHandler class
*
* @author : zaw
* @date : 2019/3/30
*/
public class RPCClientHandler extends ChannelHandlerAdapter {

/**
* RPC调用返回的结果
*/
private Object rpcResult;

public Object getRpcResult() {
return rpcResult;
}

public void setRpcResult(Object rpcResult) {
this.rpcResult = rpcResult;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
setRpcResult(msg);
ctx.close();
}
}

服务提供方

image

server

首先服务提供方有具体的服务实现类,然后它通过RPCServer建立Netty服务端24小时监听客户端的服务调用请求。请求将被RPCServerHandler处理,它根据请求中的调用信息通过反射找到实现类和服务方法并反射调用获取结果,并立即将结果发送给客户端。

  • 服务实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class NoArgsHelloServiceImpl implements NoArgsHelloService {

@Override
public String hello() {
return "hello";
}
}

public class HasArgsHelloServiceImpl implements HasArgsHelloService {

@Override
public String hello(String msg) {
return msg;
}
}
  • PRCServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package top.zhenganwen.rpc.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import top.zhenganwen.rpc.server_stub.RPCServerHandler;

/**
* RPCServer class
*
* @author : zaw
* @date : 2019/3/30
*/
public class RPCServer {

public static void main(String[] args){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new RPCServerHandler());
}
});
//bind初始化端口是异步的,但调用sync则会同步阻塞等待端口绑定成功
ChannelFuture future = bootstrap.bind(80).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}

}

server_stub

真正根据调用请求反射调用的业务处理类

  • RPCServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package top.zhenganwen.rpc.server_stub;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.reflections.Reflections;
import top.zhenganwen.rpc.common.ClassInfo;

import java.lang.reflect.Method;
import java.util.Set;

/**
* RPCServerHandler class
*
* @author : zaw
* @date : 2019/3/30
*/
public class RPCServerHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取调用信息,寻找服务实现类
ClassInfo classInfo = (ClassInfo) msg;
String implName = getImplClassName(classInfo.getClassName());
Class<?> clazz = Class.forName(implName);
Method method = clazz.getMethod(classInfo.getMethodName(), classInfo.getParamTypes());
Object result = method.invoke(clazz.newInstance(), classInfo.getParams());
ctx.writeAndFlush(result);
}

private String getImplClassName(String interfaceName) throws ClassNotFoundException {
Class interClass = Class.forName(interfaceName);
String servicePath = "top.zhenganwen.rpc.server";
Reflections reflections = new Reflections(servicePath);
Set<Class> implClasses = reflections.getSubTypesOf(interClass);
if (implClasses.isEmpty()) {
System.err.println("impl class is not found!");
} else if (implClasses.size() > 1) {
System.err.println("there are many impl classes, not sure invoke which");
} else {
Class[] classes = implClasses.toArray(new Class[1]);
return classes[0].getName();
}
return null;
}
}
鼓励一下~