基于Netty手写RPC框架进阶版(上)——注解驱动

导读:本篇文章讲解 基于Netty手写RPC框架进阶版(上)——注解驱动,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

本文代码主要基于 基于Netty手写RPC框架,在他的基础上新增一些复杂的功能。

在上一个基础版本里面,我们有很多业务代码还是在生产者和消费者端编码的,这样是不是很不合理?

使用过Dubbo的读者们应该都知道会直接使用DubboReferenceDubboService等注解来简化操作,不需要在客户端来编写相关的动态代理代码。

所以我们首先也为我们的框架添加注解驱动的功能,简化客户端的代码,可以实现直接通过注解的方式来实现远程通信。

1. 注解

首先我们在框架模块中创建注解,类似与DubboReferenceDubboService

该注解用在service上,暴露给消费端调用。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component
public @interface RemoteService {

}

该注解用在参数上,使消费端能直接调用注入的类型参数。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RemoteReference {

}

2. 生产者代码修改

创建完注解之后,按照之前的管理,我们只需要在需要暴露的接口上添加相关注解即可。

//自定义注解 远程接口暴露
@RemoteService
@Service
public class UserServiceImpl implements IUserService {
    @Override
    public String saveUser(String name) {
        System.out.println("begin save user:" + name);
        return "save user success:" + name;
    }
}

之前我们生产者端启用的时候,会在启动方法中进行netty服务端的开启,这有点不合情理,所以我们需要进行相关的修改,首先将生产端启动的代码注释。

@ComponentScan(basePackages = {"com.rpc.example.service", "com.rpc.example.spring.service"})
@SpringBootApplication
public class NettyRpcProvider {

    public static void main(String[] args) {
        SpringApplication.run(NettyRpcProvider.class, args);
        // 启动netty服务端
//        new NettyServer("127.0.0.1", 8081).startNettyServer();
    }
}

接下来就是到框架模块进行服务端的相关编码了。

由于我们在生产端关闭了启动Netty,所以我们需要让他自动启动;同时我们之前Netty服务端的事件处理器,是直接在处理器里面通过反射调用对应的方法的,我们也对其进行一定的优化。

首先我们需要创建SpringRpcProviderBean类实现InitializingBean,BeanPostProcessor接口,并重写相关方法。

  • InitializingBean

    InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。

  • BeanPostProcessor

    该接口我们也叫后置处理器,作用是在Bean对象在实例化和依赖注入完毕后,在显示调用初始化方法的前后添加我们自己的逻辑。注意是任何Bean实例化完毕后及依赖注入完成后都会触发的。

该类中主要是三个方法:

  • 构造方法

    构造方法中注入端口号以及设置好IP地址;

  • afterPropertiesSet

    初始化方法这里通过线程异步启动Netty服务,在这里就解决了不需要在生产端手动启动服务了。

  • postProcessAfterInitialization

    后置处理器这里我们将所有带有RemoteService注解的,需要暴露的接口方法都保存起来,方便服务端直接调用,不需要每次在服务端事件处理器中反射调用方法。

public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;

    private final String serverAddress;

    public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
        this.serverPort = serverPort;
        // 获取服务器的IP地址
        InetAddress address = InetAddress.getLocalHost();
        System.out.println("网络地址:" + address.getHostAddress());
        this.serverAddress = address.getHostAddress();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("begin deploy netty server");
        new Thread(() -> {
            new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
        }).start();
    }

    // 任何Bean装载到spring 容器的时候都会回调
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 只要bean声明了RemoteService注解,则需要把该服务发布到网络上 允许被调用
        if (bean.getClass().isAnnotationPresent(RemoteService.class)) {
            // 得到所有方法
            Method[] methods = bean.getClass().getMethods();

            //我们这里需要把具体的发布映射关系保存下来
            for (Method method : methods) {
                String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key, beanMethod);
            }
        }
        return bean;
    }
}

为了保存映射关系,我们需要创建相关的实体类以及委托对象。

实体对象只需要包含实体类以及对应的方法即可。

@Data
public class BeanMethod {

    private Object bean;

    private Method method;

}

委托对象这里主要通过ConcurrentHashMap来保存我们的映射关系,同时问了保证安全性,我们这里使用单例方法对外提供调用;最后还要对外提供一个调用方法,供事件处理器调用:根据传入的类名、方法名找到对应的暴露接口,然后直接执行即可将结果返回。

public class Mediator {

    // 定义好映射关系
    // 管理所有bean 和 method的关系
    public static Map<String, BeanMethod> beanMethodMap = new ConcurrentHashMap<>();

    // 构造单例方法
    private volatile static Mediator instance = null;

    private Mediator() {
    }

    public static Mediator getInstance() {
        if (instance == null) {
            synchronized (Mediator.class) {
                if (instance == null) {
                    instance = new Mediator();
                }
            }
        }

        return instance;
    }

    // 根据请求从map里面获取关系进行调用
    public Object processor(RpcRequest request) throws InvocationTargetException, IllegalAccessException {
        String key = request.getClassName() + "." + request.getMethodName();
        BeanMethod beanMethod = beanMethodMap.get(key);
        if (beanMethod == null) {
            return null;
        }
        Object object = beanMethod.getBean();
        Method method = beanMethod.getMethod();
        return method.invoke(object, request.getParams());
    }
}

服务端事件处理器只需要进行如下的修改即可。

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
        // 处理服务端返回对象
        Header header = msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        // 通过反射调用对应方法
//        Object result = invoke(msg.getContent());
        Object result = Mediator.getInstance().processor(msg.getContent());
        resProtocol.setHeader(header);
        RpcResponse response = new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);

        ctx.writeAndFlush(resProtocol);
    }
}

以上便是我们生产端相关代码的修改,但到这里就结束了吗?没错,我们还没将SpringRpcProviderBean类交给Spring进行管理。同时我们还可以给其注入外部的配置信息,比如我们的端口号等。

我们这里想创建一个配置文件,这样便可以设置任意的端口号了。

@Data
@ConfigurationProperties("rpc")
public class RpcServerProperties {

//    private String serviceAddress;

    private int servicePort;

}

然后将其交给Spring容器管理:

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean springRpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
        return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
    }
    
}

至此,生产端的代码全部修改结束。

在生产端资源文件中设置端口号:

rpc.servicePort=20880

我们启动项目看下效果:

image-20220429182950595

上图可以看出客户端成功启动,同时设置了Netty的端口号并将其开启。

3. 消费端代码修改

生产端的代码修改完成了,那消费端的代码该如何修改呢?

之前的基础版本,我们是直接在消费端通过动态代理获取对象调用方法的,这种代码理论上是不应该出现在客户端的,所以我们需要将其移动到框架模块中。

消费端的代码只应该如下方代码一样:

直接通过注解引入对应,然后调用对应方法即可,这才是我们平常接触到的写法。

@RestController
public class HelloController {

    @RemoteReference
    private IUserService userService;

    @GetMapping("/say")
    public String say() {
        return userService.saveUser("cc");
    }
    
}

那么接下来我们便对其进行修改。

首先创建SpringRpcReferenceBean类实现FactoryBean接口。

  • FactoryBean

    FactoryBean通常是用来创建比较复杂的Bean,一般的bean使用XML配置即可,但若是一个Bean的创建过程中涉及到很多其他Bean 和 复杂的逻辑,使用XML配置比较复杂的话,这时可以考虑使用FactoryBean。

由于我们这里是消费端调用的代码,需要使用到动态代理,所以相关的类需要我们自行生成。

该类我们主要定义了一个初始方法,用来返回动态代理执行的结果;以及相关的属性;

RpcInvokerProxy还是我们之前的执行类,只需要移到框架模块即可。

public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Object object;

    private String serviceAddress;

    private int servicePort;

    private Class<?> interfaceClass;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    public void init() {
        this.object = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new RpcInvokerProxy(serviceAddress, servicePort));
    }

    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

    public String getServiceAddress() {
        return serviceAddress;
    }

    public void setServiceAddress(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public int getServicePort() {
        return servicePort;
    }

    public void setServicePort(int servicePort) {
        this.servicePort = servicePort;
    }

    public Class<?> getInterfaceClass() {
        return interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }
}

接下来我们创建具体的实现类SpringRpcReferencePostProcessor,需要实现ApplicationContextAwareBeanClassLoaderAwareBeanFactoryPostProcessor接口,然后重写相关方法。

  • ApplicationContextAware

    ApplicationContextAware 通过它Spring容器会自动把上下文环境对象调用ApplicationContextAware接口中的setApplicationContext方法。

    我们在ApplicationContextAware的实现类中,就可以通过这个上下文环境对象得到Spring容器中的Bean。

  • BeanClassLoaderAware

    调用setBeanClassLoader方法,获取相关类加载器。

  • BeanFactoryPostProcessor

    之前我们了解过BeanPostProcessor:bean后置处理器,bean创建对象初始化前后进行拦截工作的

    BeanFactoryPostProcessor:是beanFactory的后置处理器;

    调用时机:在BeanFactory标准初始化之后调用,这时所有的bean定义已经保存加载到beanFactory,但是bean的实例还未创建;

    能干什么:来定制和修改BeanFactory的内容,如覆盖或添加属性

该类首先通过setBeanClassLoadersetApplicationContext两个方法获取类加载器以及上下文,然后通过构造方法获得外部的配置信息。

这里定义了一个ConcurrentHashMap用来保存发布的引用Bean的信息。什么是BeanDefinition ?

BeanDefinition 用于保存 Bean 的相关信息,包括属性、构造方法参数、依赖的 Bean 名称及是否单例、延迟加载等,它是实例化 Bean 的原材料,Spring 就是根据 BeanDefinition 中的信息实例化 Bean。

postProcessBeanFactory方法是我们的主要实现。在该方法中我们首先遍历ConfigurableListableBeanFactory中的所有BeanDefinition,通过反射获取实例对象,然后遍历分析其字段是否具有RemoteReference注解,如果有的话就创建实例并注入。

遍历完之后,我们将ConcurrentHashMap中的数据都注册到BeanDefinitionRegistry即可。

这样我们就可以将使用了RemoteReference注解的字段对象注入到Spring容器中直接调用获取了。

public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

    private ApplicationContext applicationContext;

    private ClassLoader classLoader;

    private RpcClientProperties rpcClientProperties;

    public SpringRpcReferencePostProcessor(RpcClientProperties rpcClientProperties) {
        this.rpcClientProperties = rpcClientProperties;
    }

    // 保存发布的引用Bean的信息
    private final Map<String, BeanDefinition> rpcRefBeanDefinition = new ConcurrentHashMap<>();

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    // spring 容器加载了bean的定义文件之后,在bean实例化之前执行
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String className = beanDefinition.getBeanClassName();
            if (className != null) {
                Class<?> clazz = ClassUtils.resolveClassName(className, this.classLoader);
                // 遍历字段解析
                ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
            }
        }

        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        this.rpcRefBeanDefinition.forEach((beanName, beanDefinition) -> {
            if (applicationContext.containsBean(beanName)) {
                System.out.println("SpringContext already registry: " + beanName);
            }
            registry.registerBeanDefinition(beanName, beanDefinition);
            System.out.println(" registry bean: " + beanName);
        });
    }

    // 遍历字段解析
    private void parseRpcReference(Field field) {
        RemoteReference annotation = AnnotationUtils.getAnnotation(field, RemoteReference.class);
        if (annotation != null) {
            // 不为空则构建实例注入
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.
                    genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName("init");
            builder.addPropertyValue("interfaceClass", field.getType());
            builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
            BeanDefinition beanDefinition = builder.getBeanDefinition();

            rpcRefBeanDefinition.put(field.getName(), beanDefinition);
        }
    }

}

以上代码结束后,同样的,我们也需要将其交给Spring管理。

配置信息类:

@Data
public class RpcClientProperties {

    private String serviceAddress = "192.168.183.1";

    private int servicePort = 20880;
}

自动注入类:

@Configuration
public class RpcReferenceAutoConfiguration implements EnvironmentAware {

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    @Bean
    public SpringRpcReferencePostProcessor postProcessor() {
        RpcClientProperties rpcClientProperties = new RpcClientProperties();
        rpcClientProperties.setServiceAddress(this.environment.getProperty("rpc.client.serviceAddress"));
        rpcClientProperties.setServicePort(Integer.parseInt(this.environment.getProperty("rpc.client.servicePort")));
        return new SpringRpcReferencePostProcessor(rpcClientProperties);
    }
}

至此,消费端功能代码全部完成,我们可以在资源文件中添加相关属性,然后将消费端改为springboot项目启动即可。

server.port=8088
rpc.client.serviceAddress=192.168.183.1
rpc.client.servicePort=20880
@ComponentScan(basePackages = {"com.rpc.example.spring.reference","com.rpc.example.controller","com.rpc.example.annotation"})
@SpringBootApplication
public class NettyConsumerMain {

    public static void main(String[] args) {
        SpringApplication.run(NettyConsumerMain.class, args);
    }
}

访问相关接口进行测试:

image-20220429192442402

可以发现调用成功。

同时生产端以及消费端也变成了一个我们最基础的Spring boot项目。

image-20220429192515765

4. 代码地址

Netty-RPC框架进阶版

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16703.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!