自己动手实现 RPC 框架
为了加深对 RPC 框架的理解,自己动手做了个简单的 RPC 框架,名字随便起个,就叫 lsf 吧。
lsf GitHub 地址:https://github.com/buyulian/lsf
目录
1、注册中心、消费者、生产证 spring bean标签定义
一、整体架构
二、各模块含义
三、提供方demo
1、引入客户端 jar 包
-
<dependency>
-
<groupId>com.me</groupId>
-
<artifactId>lsf-client</artifactId>
-
<version>1.0-SNAPSHOT</version>
-
</dependency>
2、api 包定义
3、 接口实现
4、提供者 spring bean 配置
-
<lsf:registry id="registry" host="127.0.0.1" port="25000"/>
-
-
<lsf:provider id="helloWorldServiceLsf" alias="test"
-
interface="com.me.lsf.provider.api.HelloWorldService"
-
registry="registry"
-
ref="helloWorldService"/>
5、启动类
四、调用方 demo
1、引入客户端 jar 包和提供者的 api 包
-
<dependency>
-
<groupId>com.me</groupId>
-
<artifactId>lsf-provider-api</artifactId>
-
<version>1.0-SNAPSHOT</version>
-
</dependency>
-
<dependency>
-
<groupId>com.me</groupId>
-
<artifactId>lsf-client</artifactId>
-
<version>1.0-SNAPSHOT</version>
-
</dependency>
2、消费者 spring bean 配置
-
<lsf:registry id="registry" host="127.0.0.1" port="25000"/>
-
-
<lsf:consumer id="helloWorldService"
-
interface="com.me.lsf.provider.api.HelloWorldService"
-
registry="registry"
-
alias="test"/>
3、启动类
五、具体实现
1、注册中心、消费者、生产证 spring bean标签定义
(1) lsf.xsd 文件
-
-
<schema xmlns="http://www.w3.org/2001/XMLSchema"
-
targetNamespace="http://www.me.com/schema/lsf"
-
elementFormDefault="qualified">
-
-
<element name="registry">
-
<complexType>
-
<attribute name="id" type="string"/>
-
<attribute name="host" type="string"/>
-
<attribute name="port" type="string"/>
-
</complexType>
-
</element>
-
<element name="provider">
-
<complexType>
-
<attribute name="id" type="string"/>
-
<attribute name="alias" type="string"/>
-
<attribute name="interface" type="string"/>
-
<attribute name="ref" type="string"/>
-
<attribute name="registry" type="string"/>
-
</complexType>
-
</element>
-
<element name="consumer">
-
<complexType>
-
<attribute name="id" type="string"/>
-
<attribute name="alias" type="string"/>
-
<attribute name="interface" type="string"/>
-
<attribute name="registry" type="string"/>
-
</complexType>
-
</element>
-
</schema>
(2) 标签解析
2、核心数据结构
(1) rpc参数
-
public class RpcParam {
-
-
/**
-
* 调用类
-
*/
-
private String rClass;
-
-
/**
-
* 调用方法
-
*/
-
private String method;
-
-
/**
-
* 参数列表
-
*/
-
private String[] args;
-
-
/**
-
* 序列化方式
-
*/
-
private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
-
}
(2) 消费者bean
-
-
public class Consumerbean {
-
-
/**
-
* 调用接口
-
*/
-
private Class interfaceClass;
-
-
/**
-
* 调用接口名
-
*/
-
private String interfaceName;
-
-
/**
-
* 别名
-
*/
-
private String alias;
-
-
/**
-
* 预留
-
*/
-
private Boolean register;
-
-
/**
-
* 存活生产者连接
-
*/
-
private List<LsfConnection> aliveConnectionList;
-
-
/**
-
* 手工指定生产者连接
-
*/
-
private List<LsfConnection> fixedConnectionList;
-
-
/**
-
* 父对象
-
*/
-
private ParentObject parentObject;
-
-
/**
-
* 序列化方式
-
*/
-
private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
-
-
/**
-
* 注册中心 bean
-
*/
-
private RegistryBean registryBean;
-
}
3、核心处理逻辑
(1) 消费核心处理逻辑
-
public class ConsumerBeanInvocationHandler implements InvocationHandler {
-
-
private static Logger logger = LoggerFactory.getLogger(ConsumerBeanInvocationHandler.class);
-
-
private Consumerbean consumerbean;
-
-
-
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-
-
//获取调用的类
-
Class tClass = consumerbean.getInterfaceClass();
-
-
String canonicalName = tClass.getCanonicalName();
-
String methodName = method.getName();
-
-
ParentObject parentObject = consumerbean.getParentObject();
-
-
boolean isNoRpc = isNoRpc(methodName, parentObject);
-
-
if (isNoRpc) {
-
return method.invoke(parentObject, args);
-
}
-
logger.debug("执行了 rpc 调用, class {}, method {}, args {}",canonicalName, methodName, Arrays.toString(args));
-
-
//获取序列化方式
-
String serializeType = consumerbean.getSerializeType();
-
-
//组装 rpc 参数
-
RpcParam rpcParam = getRpcParam(args, canonicalName, methodName, serializeType, method);
-
-
//获取可用的生产者连接
-
LsfConnection connection = consumerbean.getConnection();
-
-
//调用并得到字符串结果
-
String rpcResponseParamStr = getBody(rpcParam, connection);
-
-
RpcResponseParam rpcResponseParam = JSON.parseObject(rpcResponseParamStr, RpcResponseParam.class);
-
-
if (ErrorCodeEnum.SUCCESS.getCode().equals(rpcResponseParam.getCode())) {
-
//获取序列化处理类
-
LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
-
//反序列化结果
-
Object result = lsfSerialize.deSerializeResult(method, rpcResponseParam.getResult());
-
return result;
-
} else {
-
//生产者抛出的异常处理
-
throw new RuntimeException(rpcResponseParam.getException());
-
}
-
-
}
-
-
private String getBody(RpcParam rpcParam, LsfConnection connection) {
-
LsfClient client = LsfHttpClientFactory.getClient();
-
String host = connection.getHost();
-
int port = connection.getPort();
-
ClientParam clientParam = new ClientParam();
-
clientParam.setHost(host);
-
clientParam.setPort(port);
-
clientParam.setUrl("/");
-
-
String rpcBody = JSON.toJSONString(rpcParam);
-
clientParam.setBody(rpcBody);
-
// netty 执行网络调用
-
return client.post(clientParam);
-
}
-
-
-
private RpcParam getRpcParam(Object[] args, String canonicalName, String methodName, String serializeType, Method method) {
-
RpcParam rpcParam = new RpcParam();
-
-
rpcParam.setrClass(canonicalName);
-
rpcParam.setMethod(methodName);
-
rpcParam.setSerializeType(serializeType);
-
-
//获取序列化方式
-
LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
-
-
//序列化参数
-
String[] argsStrs = lsfSerialize.serializeParam(method, args);
-
-
rpcParam.setArgs(argsStrs);
-
return rpcParam;
-
}
-
-
private boolean isNoRpc(String methodName, ParentObject parentObject) {
-
boolean isNoRpc = false;
-
Method[] declaredMethods = parentObject.getClass().getDeclaredMethods();
-
for (Method declaredMethod : declaredMethods) {
-
if (declaredMethod.getName().equals(methodName)) {
-
isNoRpc = true;
-
break;
-
}
-
}
-
return isNoRpc;
-
}
-
}
(2) 生产者核心处理逻辑
-
public static String dealRequest(String body) {
-
-
logger.info("center asyncDeal request {}",body);
-
-
//解析 rpc 调用参数
-
RpcParam rpcParam = JSON.parseObject(body, RpcParam.class);
-
-
String rClassStr = rpcParam.getrClass();
-
Object provider = getProvider(rClassStr);
-
-
if (provider == null) {
-
throw new RuntimeException("没有 对应的 provider");
-
}
-
-
String method = rpcParam.getMethod();
-
String[] argsStr = rpcParam.getArgs();
-
-
Class<?> aClass = provider.getClass();
-
-
String resultStr = "error";
-
RpcResponseParam rpcResponseParam = new RpcResponseParam();
-
try {
-
Method declaredMethod = null;
-
-
//通过反射获取rpc调用的方法
-
Method[] declaredMethods = aClass.getDeclaredMethods();
-
for (Method declaredMethod1 : declaredMethods) {
-
if (declaredMethod1.getName().equals(method)) {
-
declaredMethod = declaredMethod1;
-
break;
-
}
-
}
-
-
if (declaredMethod == null) {
-
throw new RuntimeException("没有这个方法 " method);
-
}
-
-
//获取序列化实现类
-
LsfSerialize lsfSerialize = LsfSerializeFactory.get(rpcParam.getSerializeType());
-
-
//反序列参数
-
Object[] inArgs = lsfSerialize.deSerializeParam(declaredMethod, argsStr);
-
-
//调用实现类
-
Object result = declaredMethod.invoke(provider, inArgs);
-
-
//序列化执行结果
-
String result1 = lsfSerialize.serializeResult(declaredMethod, result);
-
rpcResponseParam.setCode(ErrorCodeEnum.SUCCESS.getCode());
-
rpcResponseParam.setResult(result1);
-
-
} catch (Exception e) {
-
//若原始方法发生异常,则封装异常信息并返回给消费者
-
rpcResponseParam.setCode(ErrorCodeEnum.EXCEPTION.getCode());
-
rpcResponseParam.setException(e.toString());
-
logger.error("lsf rpc exception rpc param {}", JSON.toJSONString(rpcParam), e);
-
}
-
resultStr = JSON.toJSONString(rpcResponseParam);
-
-
logger.info("center asyncDeal result {}", resultStr);
-
return resultStr;
-
}
(3) 序列化接口定义
对扩展开放。新的的序列化方式可通过实现这个接口,并注册到序列化工厂里去实现。
-
public interface LsfSerialize {
-
-
String[] serializeParam(Method method, Object[] args);
-
-
Object[] deSerializeParam(Method method, String[] contents);
-
-
String serializeResult(Method method, Object result);
-
-
Object deSerializeResult(Method method, String content);
-
-
}
(4) FastJson autoType 序列化实现
-
-
public class JsonAutoTypeSerialize implements LsfSerialize {
-
-
private static Logger logger = LoggerFactory.getLogger(JsonAutoTypeSerialize.class);
-
-
{
-
ParserConfig.getGlobalInstance().setSafeMode(false);
-
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
-
}
-
-
-
public String[] serializeParam(Method method, Object[] args) {
-
-
String[] argsStrs = null;
-
if (args != null) {
-
argsStrs = new String[args.length];
-
for (int i = 0; i < args.length; i ) {
-
argsStrs[i] = JSON.toJSONString(args[i], SerializerFeature.WriteClassName);
-
}
-
}
-
return argsStrs;
-
}
-
-
-
public Object[] deSerializeParam(Method method, String[] contents) {
-
Class<?>[] parameterTypes = method.getParameterTypes();
-
return getInArgs(contents, parameterTypes, method);
-
}
-
-
private Object[] getInArgs(String[] strs, Class<?>[] parameterTypes, Method method) {
-
-
if (strs == null) {
-
return null;
-
}
-
Type[] genericParameterTypes = method.getGenericParameterTypes();
-
Object[] inArgs = new Object[strs.length];
-
for (int i = 0; i < parameterTypes.length; i ) {
-
Class<?> parameterType = parameterTypes[i];
-
inArgs[i] = getObjectSuper(strs[i], parameterType, genericParameterTypes[i]);
-
}
-
return inArgs;
-
}
-
-
-
public String serializeResult(Method method, Object result) {
-
return JSON.toJSONString(result, SerializerFeature.WriteClassName);
-
}
-
-
-
public Object deSerializeResult(Method method, String content) {
-
Type genericReturnType = method.getGenericReturnType();
-
Class<?> returnType = method.getReturnType();
-
return getObjectSuper(content, returnType, genericReturnType);
-
}
-
-
private Object getObjectSuper(String content, Class<?> returnType, Type genericReturnType) {
-
Object result = JSON.parseObject(content, returnType);
-
return result;
-
}
-
}
六、代码启动方式
先启动注册中心,然后启动生产者,最后启动消费者。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfjhieg
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01