• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

自己动手实现 RPC 框架

武飞扬头像
buyulian
帮助1

为了加深对 RPC 框架的理解,自己动手做了个简单的 RPC 框架,名字随便起个,就叫 lsf 吧。

lsf GitHub 地址:https://github.com/buyulian/lsf

目录

一、整体架构

二、各模块含义

三、提供方demo

1、引入客户端 jar 包

2、api 包定义

3、 接口实现

 4、提供者 spring bean 配置

5、启动类

四、调用方 demo

 1、引入客户端 jar 包和提供者的 api 包

2、消费者 spring bean 配置

3、启动类

 五、具体实现

1、注册中心、消费者、生产证 spring bean标签定义

(1) lsf.xsd 文件

(2) 标签解析

2、核心数据结构

(1) rpc参数

 (2) 消费者bean

3、核心处理逻辑

(1) 消费核心处理逻辑

(2) 生产者核心处理逻辑

(3) 序列化接口定义

(4) FastJson autoType 序列化实现

六、代码启动方式


一、整体架构

学新通

二、各模块含义

 学新通

三、提供方demo

1、引入客户端 jar 包

  1.  
    <dependency>
  2.  
    <groupId>com.me</groupId>
  3.  
    <artifactId>lsf-client</artifactId>
  4.  
    <version>1.0-SNAPSHOT</version>
  5.  
    </dependency>

2、api 包定义

学新通

3、 接口实现

学新通

 4、提供者 spring bean 配置

  1.  
    <lsf:registry id="registry" host="127.0.0.1" port="25000"/>
  2.  
     
  3.  
    <lsf:provider id="helloWorldServiceLsf" alias="test"
  4.  
    interface="com.me.lsf.provider.api.HelloWorldService"
  5.  
    registry="registry"
  6.  
    ref="helloWorldService"/>

5、启动类

学新通

四、调用方 demo

 1、引入客户端 jar 包和提供者的 api 包

  1.  
    <dependency>
  2.  
    <groupId>com.me</groupId>
  3.  
    <artifactId>lsf-provider-api</artifactId>
  4.  
    <version>1.0-SNAPSHOT</version>
  5.  
    </dependency>
  6.  
    <dependency>
  7.  
    <groupId>com.me</groupId>
  8.  
    <artifactId>lsf-client</artifactId>
  9.  
    <version>1.0-SNAPSHOT</version>
  10.  
    </dependency>

2、消费者 spring bean 配置

  1.  
    <lsf:registry id="registry" host="127.0.0.1" port="25000"/>
  2.  
     
  3.  
    <lsf:consumer id="helloWorldService"
  4.  
    interface="com.me.lsf.provider.api.HelloWorldService"
  5.  
    registry="registry"
  6.  
    alias="test"/>

3、启动类

学新通

 五、具体实现

1、注册中心、消费者、生产证 spring bean标签定义

(1) lsf.xsd 文件

  1.  
    <?xml version="1.0" encoding="UTF-8" ?>
  2.  
    <schema xmlns="http://www.w3.org/2001/XMLSchema"
  3.  
    targetNamespace="http://www.me.com/schema/lsf"
  4.  
    elementFormDefault="qualified">
  5.  
     
  6.  
    <element name="registry">
  7.  
    <complexType>
  8.  
    <attribute name="id" type="string"/>
  9.  
    <attribute name="host" type="string"/>
  10.  
    <attribute name="port" type="string"/>
  11.  
    </complexType>
  12.  
    </element>
  13.  
    <element name="provider">
  14.  
    <complexType>
  15.  
    <attribute name="id" type="string"/>
  16.  
    <attribute name="alias" type="string"/>
  17.  
    <attribute name="interface" type="string"/>
  18.  
    <attribute name="ref" type="string"/>
  19.  
    <attribute name="registry" type="string"/>
  20.  
    </complexType>
  21.  
    </element>
  22.  
    <element name="consumer">
  23.  
    <complexType>
  24.  
    <attribute name="id" type="string"/>
  25.  
    <attribute name="alias" type="string"/>
  26.  
    <attribute name="interface" type="string"/>
  27.  
    <attribute name="registry" type="string"/>
  28.  
    </complexType>
  29.  
    </element>
  30.  
    </schema>
学新通

(2) 标签解析

学新通

2、核心数据结构

(1) rpc参数

  1.  
    public class RpcParam {
  2.  
     
  3.  
    /**
  4.  
    * 调用类
  5.  
    */
  6.  
    private String rClass;
  7.  
     
  8.  
    /**
  9.  
    * 调用方法
  10.  
    */
  11.  
    private String method;
  12.  
     
  13.  
    /**
  14.  
    * 参数列表
  15.  
    */
  16.  
    private String[] args;
  17.  
     
  18.  
    /**
  19.  
    * 序列化方式
  20.  
    */
  21.  
    private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
  22.  
    }
学新通

 (2) 消费者bean

  1.  
     
  2.  
    public class Consumerbean {
  3.  
     
  4.  
    /**
  5.  
    * 调用接口
  6.  
    */
  7.  
    private Class interfaceClass;
  8.  
     
  9.  
    /**
  10.  
    * 调用接口名
  11.  
    */
  12.  
    private String interfaceName;
  13.  
     
  14.  
    /**
  15.  
    * 别名
  16.  
    */
  17.  
    private String alias;
  18.  
     
  19.  
    /**
  20.  
    * 预留
  21.  
    */
  22.  
    private Boolean register;
  23.  
     
  24.  
    /**
  25.  
    * 存活生产者连接
  26.  
    */
  27.  
    private List<LsfConnection> aliveConnectionList;
  28.  
     
  29.  
    /**
  30.  
    * 手工指定生产者连接
  31.  
    */
  32.  
    private List<LsfConnection> fixedConnectionList;
  33.  
     
  34.  
    /**
  35.  
    * 父对象
  36.  
    */
  37.  
    private ParentObject parentObject;
  38.  
     
  39.  
    /**
  40.  
    * 序列化方式
  41.  
    */
  42.  
    private String serializeType = SerializeTypeEnum.JSON_AUTO_TYPE.getCode();
  43.  
     
  44.  
    /**
  45.  
    * 注册中心 bean
  46.  
    */
  47.  
    private RegistryBean registryBean;
  48.  
    }
学新通

3、核心处理逻辑

(1) 消费核心处理逻辑

  1.  
    public class ConsumerBeanInvocationHandler implements InvocationHandler {
  2.  
     
  3.  
    private static Logger logger = LoggerFactory.getLogger(ConsumerBeanInvocationHandler.class);
  4.  
     
  5.  
    private Consumerbean consumerbean;
  6.  
     
  7.  
    @Override
  8.  
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  9.  
     
  10.  
    //获取调用的类
  11.  
    Class tClass = consumerbean.getInterfaceClass();
  12.  
     
  13.  
    String canonicalName = tClass.getCanonicalName();
  14.  
    String methodName = method.getName();
  15.  
     
  16.  
    ParentObject parentObject = consumerbean.getParentObject();
  17.  
     
  18.  
    boolean isNoRpc = isNoRpc(methodName, parentObject);
  19.  
     
  20.  
    if (isNoRpc) {
  21.  
    return method.invoke(parentObject, args);
  22.  
    }
  23.  
    logger.debug("执行了 rpc 调用, class {}, method {}, args {}",canonicalName, methodName, Arrays.toString(args));
  24.  
     
  25.  
    //获取序列化方式
  26.  
    String serializeType = consumerbean.getSerializeType();
  27.  
     
  28.  
    //组装 rpc 参数
  29.  
    RpcParam rpcParam = getRpcParam(args, canonicalName, methodName, serializeType, method);
  30.  
     
  31.  
    //获取可用的生产者连接
  32.  
    LsfConnection connection = consumerbean.getConnection();
  33.  
     
  34.  
    //调用并得到字符串结果
  35.  
    String rpcResponseParamStr = getBody(rpcParam, connection);
  36.  
     
  37.  
    RpcResponseParam rpcResponseParam = JSON.parseObject(rpcResponseParamStr, RpcResponseParam.class);
  38.  
     
  39.  
    if (ErrorCodeEnum.SUCCESS.getCode().equals(rpcResponseParam.getCode())) {
  40.  
    //获取序列化处理类
  41.  
    LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
  42.  
    //反序列化结果
  43.  
    Object result = lsfSerialize.deSerializeResult(method, rpcResponseParam.getResult());
  44.  
    return result;
  45.  
    } else {
  46.  
    //生产者抛出的异常处理
  47.  
    throw new RuntimeException(rpcResponseParam.getException());
  48.  
    }
  49.  
     
  50.  
    }
  51.  
     
  52.  
    private String getBody(RpcParam rpcParam, LsfConnection connection) {
  53.  
    LsfClient client = LsfHttpClientFactory.getClient();
  54.  
    String host = connection.getHost();
  55.  
    int port = connection.getPort();
  56.  
    ClientParam clientParam = new ClientParam();
  57.  
    clientParam.setHost(host);
  58.  
    clientParam.setPort(port);
  59.  
    clientParam.setUrl("/");
  60.  
     
  61.  
    String rpcBody = JSON.toJSONString(rpcParam);
  62.  
    clientParam.setBody(rpcBody);
  63.  
    // netty 执行网络调用
  64.  
    return client.post(clientParam);
  65.  
    }
  66.  
     
  67.  
     
  68.  
    private RpcParam getRpcParam(Object[] args, String canonicalName, String methodName, String serializeType, Method method) {
  69.  
    RpcParam rpcParam = new RpcParam();
  70.  
     
  71.  
    rpcParam.setrClass(canonicalName);
  72.  
    rpcParam.setMethod(methodName);
  73.  
    rpcParam.setSerializeType(serializeType);
  74.  
     
  75.  
    //获取序列化方式
  76.  
    LsfSerialize lsfSerialize = LsfSerializeFactory.get(serializeType);
  77.  
     
  78.  
    //序列化参数
  79.  
    String[] argsStrs = lsfSerialize.serializeParam(method, args);
  80.  
     
  81.  
    rpcParam.setArgs(argsStrs);
  82.  
    return rpcParam;
  83.  
    }
  84.  
     
  85.  
    private boolean isNoRpc(String methodName, ParentObject parentObject) {
  86.  
    boolean isNoRpc = false;
  87.  
    Method[] declaredMethods = parentObject.getClass().getDeclaredMethods();
  88.  
    for (Method declaredMethod : declaredMethods) {
  89.  
    if (declaredMethod.getName().equals(methodName)) {
  90.  
    isNoRpc = true;
  91.  
    break;
  92.  
    }
  93.  
    }
  94.  
    return isNoRpc;
  95.  
    }
  96.  
    }
学新通

(2) 生产者核心处理逻辑

  1.  
    public static String dealRequest(String body) {
  2.  
     
  3.  
    logger.info("center asyncDeal request {}",body);
  4.  
     
  5.  
    //解析 rpc 调用参数
  6.  
    RpcParam rpcParam = JSON.parseObject(body, RpcParam.class);
  7.  
     
  8.  
    String rClassStr = rpcParam.getrClass();
  9.  
    Object provider = getProvider(rClassStr);
  10.  
     
  11.  
    if (provider == null) {
  12.  
    throw new RuntimeException("没有 对应的 provider");
  13.  
    }
  14.  
     
  15.  
    String method = rpcParam.getMethod();
  16.  
    String[] argsStr = rpcParam.getArgs();
  17.  
     
  18.  
    Class<?> aClass = provider.getClass();
  19.  
     
  20.  
    String resultStr = "error";
  21.  
    RpcResponseParam rpcResponseParam = new RpcResponseParam();
  22.  
    try {
  23.  
    Method declaredMethod = null;
  24.  
     
  25.  
    //通过反射获取rpc调用的方法
  26.  
    Method[] declaredMethods = aClass.getDeclaredMethods();
  27.  
    for (Method declaredMethod1 : declaredMethods) {
  28.  
    if (declaredMethod1.getName().equals(method)) {
  29.  
    declaredMethod = declaredMethod1;
  30.  
    break;
  31.  
    }
  32.  
    }
  33.  
     
  34.  
    if (declaredMethod == null) {
  35.  
    throw new RuntimeException("没有这个方法 " method);
  36.  
    }
  37.  
     
  38.  
    //获取序列化实现类
  39.  
    LsfSerialize lsfSerialize = LsfSerializeFactory.get(rpcParam.getSerializeType());
  40.  
     
  41.  
    //反序列参数
  42.  
    Object[] inArgs = lsfSerialize.deSerializeParam(declaredMethod, argsStr);
  43.  
     
  44.  
    //调用实现类
  45.  
    Object result = declaredMethod.invoke(provider, inArgs);
  46.  
     
  47.  
    //序列化执行结果
  48.  
    String result1 = lsfSerialize.serializeResult(declaredMethod, result);
  49.  
    rpcResponseParam.setCode(ErrorCodeEnum.SUCCESS.getCode());
  50.  
    rpcResponseParam.setResult(result1);
  51.  
     
  52.  
    } catch (Exception e) {
  53.  
    //若原始方法发生异常,则封装异常信息并返回给消费者
  54.  
    rpcResponseParam.setCode(ErrorCodeEnum.EXCEPTION.getCode());
  55.  
    rpcResponseParam.setException(e.toString());
  56.  
    logger.error("lsf rpc exception rpc param {}", JSON.toJSONString(rpcParam), e);
  57.  
    }
  58.  
    resultStr = JSON.toJSONString(rpcResponseParam);
  59.  
     
  60.  
    logger.info("center asyncDeal result {}", resultStr);
  61.  
    return resultStr;
  62.  
    }
学新通

(3) 序列化接口定义

对扩展开放。新的的序列化方式可通过实现这个接口,并注册到序列化工厂里去实现。

  1.  
    public interface LsfSerialize {
  2.  
     
  3.  
    String[] serializeParam(Method method, Object[] args);
  4.  
     
  5.  
    Object[] deSerializeParam(Method method, String[] contents);
  6.  
     
  7.  
    String serializeResult(Method method, Object result);
  8.  
     
  9.  
    Object deSerializeResult(Method method, String content);
  10.  
     
  11.  
    }

(4) FastJson autoType 序列化实现

  1.  
     
  2.  
    public class JsonAutoTypeSerialize implements LsfSerialize {
  3.  
     
  4.  
    private static Logger logger = LoggerFactory.getLogger(JsonAutoTypeSerialize.class);
  5.  
     
  6.  
    {
  7.  
    ParserConfig.getGlobalInstance().setSafeMode(false);
  8.  
    ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
  9.  
    }
  10.  
     
  11.  
    @Override
  12.  
    public String[] serializeParam(Method method, Object[] args) {
  13.  
     
  14.  
    String[] argsStrs = null;
  15.  
    if (args != null) {
  16.  
    argsStrs = new String[args.length];
  17.  
    for (int i = 0; i < args.length; i ) {
  18.  
    argsStrs[i] = JSON.toJSONString(args[i], SerializerFeature.WriteClassName);
  19.  
    }
  20.  
    }
  21.  
    return argsStrs;
  22.  
    }
  23.  
     
  24.  
    @Override
  25.  
    public Object[] deSerializeParam(Method method, String[] contents) {
  26.  
    Class<?>[] parameterTypes = method.getParameterTypes();
  27.  
    return getInArgs(contents, parameterTypes, method);
  28.  
    }
  29.  
     
  30.  
    private Object[] getInArgs(String[] strs, Class<?>[] parameterTypes, Method method) {
  31.  
     
  32.  
    if (strs == null) {
  33.  
    return null;
  34.  
    }
  35.  
    Type[] genericParameterTypes = method.getGenericParameterTypes();
  36.  
    Object[] inArgs = new Object[strs.length];
  37.  
    for (int i = 0; i < parameterTypes.length; i ) {
  38.  
    Class<?> parameterType = parameterTypes[i];
  39.  
    inArgs[i] = getObjectSuper(strs[i], parameterType, genericParameterTypes[i]);
  40.  
    }
  41.  
    return inArgs;
  42.  
    }
  43.  
     
  44.  
    @Override
  45.  
    public String serializeResult(Method method, Object result) {
  46.  
    return JSON.toJSONString(result, SerializerFeature.WriteClassName);
  47.  
    }
  48.  
     
  49.  
    @Override
  50.  
    public Object deSerializeResult(Method method, String content) {
  51.  
    Type genericReturnType = method.getGenericReturnType();
  52.  
    Class<?> returnType = method.getReturnType();
  53.  
    return getObjectSuper(content, returnType, genericReturnType);
  54.  
    }
  55.  
     
  56.  
    private Object getObjectSuper(String content, Class<?> returnType, Type genericReturnType) {
  57.  
    Object result = JSON.parseObject(content, returnType);
  58.  
    return result;
  59.  
    }
  60.  
    }
学新通

六、代码启动方式

先启动注册中心,然后启动生产者,最后启动消费者。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfjhieg
系列文章
更多 icon
同类精品
更多 icon
继续加载