本文代码主要基于 基于Netty手写RPC框架,在他的基础上新增一些复杂的功能。
在上一个基础版本里面,我们有很多业务代码还是在生产者和消费者端编码的,这样是不是很不合理?
使用过Dubbo的读者们应该都知道会直接使用DubboReference
、DubboService
等注解来简化操作,不需要在客户端来编写相关的动态代理代码。
所以我们首先也为我们的框架添加注解驱动的功能,简化客户端的代码,可以实现直接通过注解的方式来实现远程通信。
1. 注解
首先我们在框架模块中创建注解,类似与DubboReference
、DubboService
。
该注解用在service上,暴露给消费端调用。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component
public @interface RemoteService {
}
该注解用在参数上,使消费端能直接调用注入的类型参数。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RemoteReference {
}
2. 生产者代码修改
创建完注解之后,按照之前的管理,我们只需要在需要暴露的接口上添加相关注解即可。
//自定义注解 远程接口暴露
@RemoteService
@Service
public class UserServiceImpl implements IUserService {
@Override
public String saveUser(String name) {
System.out.println("begin save user:" + name);
return "save user success:" + name;
}
}
之前我们生产者端启用的时候,会在启动方法中进行netty服务端的开启,这有点不合情理,所以我们需要进行相关的修改,首先将生产端启动的代码注释。
@ComponentScan(basePackages = {"com.rpc.example.service", "com.rpc.example.spring.service"})
@SpringBootApplication
public class NettyRpcProvider {
public static void main(String[] args) {
SpringApplication.run(NettyRpcProvider.class, args);
// 启动netty服务端
// new NettyServer("127.0.0.1", 8081).startNettyServer();
}
}
接下来就是到框架模块进行服务端的相关编码了。
由于我们在生产端关闭了启动Netty,所以我们需要让他自动启动;同时我们之前Netty服务端的事件处理器,是直接在处理器里面通过反射调用对应的方法的,我们也对其进行一定的优化。
首先我们需要创建SpringRpcProviderBean
类实现InitializingBean
,BeanPostProcessor
接口,并重写相关方法。
-
InitializingBean
InitializingBean
接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet
方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。 -
BeanPostProcessor
该接口我们也叫后置处理器,作用是在Bean对象在实例化和依赖注入完毕后,在显示调用初始化方法的前后添加我们自己的逻辑。注意是任何Bean实例化完毕后及依赖注入完成后都会触发的。
该类中主要是三个方法:
-
构造方法
构造方法中注入端口号以及设置好IP地址;
-
afterPropertiesSet
初始化方法这里通过线程异步启动Netty服务,在这里就解决了不需要在生产端手动启动服务了。
-
postProcessAfterInitialization
后置处理器这里我们将所有带有
RemoteService
注解的,需要暴露的接口方法都保存起来,方便服务端直接调用,不需要每次在服务端事件处理器中反射调用方法。
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {
private final int serverPort;
private final String serverAddress;
public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
this.serverPort = serverPort;
// 获取服务器的IP地址
InetAddress address = InetAddress.getLocalHost();
System.out.println("网络地址:" + address.getHostAddress());
this.serverAddress = address.getHostAddress();
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("begin deploy netty server");
new Thread(() -> {
new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
}).start();
}
// 任何Bean装载到spring 容器的时候都会回调
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 只要bean声明了RemoteService注解,则需要把该服务发布到网络上 允许被调用
if (bean.getClass().isAnnotationPresent(RemoteService.class)) {
// 得到所有方法
Method[] methods = bean.getClass().getMethods();
//我们这里需要把具体的发布映射关系保存下来
for (Method method : methods) {
String key = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
BeanMethod beanMethod = new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
Mediator.beanMethodMap.put(key, beanMethod);
}
}
return bean;
}
}
为了保存映射关系,我们需要创建相关的实体类以及委托对象。
实体对象只需要包含实体类以及对应的方法即可。
@Data
public class BeanMethod {
private Object bean;
private Method method;
}
委托对象这里主要通过ConcurrentHashMap
来保存我们的映射关系,同时问了保证安全性,我们这里使用单例方法对外提供调用;最后还要对外提供一个调用方法,供事件处理器调用:根据传入的类名、方法名找到对应的暴露接口,然后直接执行即可将结果返回。
public class Mediator {
// 定义好映射关系
// 管理所有bean 和 method的关系
public static Map<String, BeanMethod> beanMethodMap = new ConcurrentHashMap<>();
// 构造单例方法
private volatile static Mediator instance = null;
private Mediator() {
}
public static Mediator getInstance() {
if (instance == null) {
synchronized (Mediator.class) {
if (instance == null) {
instance = new Mediator();
}
}
}
return instance;
}
// 根据请求从map里面获取关系进行调用
public Object processor(RpcRequest request) throws InvocationTargetException, IllegalAccessException {
String key = request.getClassName() + "." + request.getMethodName();
BeanMethod beanMethod = beanMethodMap.get(key);
if (beanMethod == null) {
return null;
}
Object object = beanMethod.getBean();
Method method = beanMethod.getMethod();
return method.invoke(object, request.getParams());
}
}
服务端事件处理器只需要进行如下的修改即可。
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
// 处理服务端返回对象
Header header = msg.getHeader();
header.setReqType(ReqType.RESPONSE.code());
// 通过反射调用对应方法
// Object result = invoke(msg.getContent());
Object result = Mediator.getInstance().processor(msg.getContent());
resProtocol.setHeader(header);
RpcResponse response = new RpcResponse();
response.setData(result);
response.setMsg("success");
resProtocol.setContent(response);
ctx.writeAndFlush(resProtocol);
}
}
以上便是我们生产端相关代码的修改,但到这里就结束了吗?没错,我们还没将SpringRpcProviderBean
类交给Spring进行管理。同时我们还可以给其注入外部的配置信息,比如我们的端口号等。
我们这里想创建一个配置文件,这样便可以设置任意的端口号了。
@Data
@ConfigurationProperties("rpc")
public class RpcServerProperties {
// private String serviceAddress;
private int servicePort;
}
然后将其交给Spring容器管理:
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {
@Bean
public SpringRpcProviderBean springRpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
}
}
至此,生产端的代码全部修改结束。
在生产端资源文件中设置端口号:
rpc.servicePort=20880
我们启动项目看下效果:
上图可以看出客户端成功启动,同时设置了Netty的端口号并将其开启。
3. 消费端代码修改
生产端的代码修改完成了,那消费端的代码该如何修改呢?
之前的基础版本,我们是直接在消费端通过动态代理获取对象调用方法的,这种代码理论上是不应该出现在客户端的,所以我们需要将其移动到框架模块中。
消费端的代码只应该如下方代码一样:
直接通过注解引入对应,然后调用对应方法即可,这才是我们平常接触到的写法。
@RestController
public class HelloController {
@RemoteReference
private IUserService userService;
@GetMapping("/say")
public String say() {
return userService.saveUser("cc");
}
}
那么接下来我们便对其进行修改。
首先创建SpringRpcReferenceBean
类实现FactoryBean
接口。
-
FactoryBean
FactoryBean通常是用来创建比较复杂的Bean,一般的bean使用XML配置即可,但若是一个Bean的创建过程中涉及到很多其他Bean 和 复杂的逻辑,使用XML配置比较复杂的话,这时可以考虑使用FactoryBean。
由于我们这里是消费端调用的代码,需要使用到动态代理,所以相关的类需要我们自行生成。
该类我们主要定义了一个初始方法,用来返回动态代理执行的结果;以及相关的属性;
RpcInvokerProxy
还是我们之前的执行类,只需要移到框架模块即可。
public class SpringRpcReferenceBean implements FactoryBean<Object> {
private Object object;
private String serviceAddress;
private int servicePort;
private Class<?> interfaceClass;
@Override
public Object getObject() throws Exception {
return object;
}
public void init() {
this.object = Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RpcInvokerProxy(serviceAddress, servicePort));
}
@Override
public Class<?> getObjectType() {
return this.interfaceClass;
}
public String getServiceAddress() {
return serviceAddress;
}
public void setServiceAddress(String serviceAddress) {
this.serviceAddress = serviceAddress;
}
public int getServicePort() {
return servicePort;
}
public void setServicePort(int servicePort) {
this.servicePort = servicePort;
}
public Class<?> getInterfaceClass() {
return interfaceClass;
}
public void setInterfaceClass(Class<?> interfaceClass) {
this.interfaceClass = interfaceClass;
}
}
接下来我们创建具体的实现类SpringRpcReferencePostProcessor
,需要实现ApplicationContextAware
,BeanClassLoaderAware
,BeanFactoryPostProcessor
接口,然后重写相关方法。
-
ApplicationContextAware
ApplicationContextAware 通过它Spring容器会自动把上下文环境对象调用ApplicationContextAware接口中的setApplicationContext方法。
我们在ApplicationContextAware的实现类中,就可以通过这个上下文环境对象得到Spring容器中的Bean。
-
BeanClassLoaderAware
调用setBeanClassLoader方法,获取相关类加载器。
-
BeanFactoryPostProcessor
之前我们了解过
BeanPostProcessor
:bean后置处理器,bean创建对象初始化前后进行拦截工作的而
BeanFactoryPostProcessor
:是beanFactory的后置处理器;调用时机:在
BeanFactory
标准初始化之后调用,这时所有的bean定义已经保存加载到beanFactory,但是bean的实例还未创建;能干什么:来定制和修改
BeanFactory
的内容,如覆盖或添加属性
该类首先通过setBeanClassLoader
,setApplicationContext
两个方法获取类加载器以及上下文,然后通过构造方法获得外部的配置信息。
这里定义了一个ConcurrentHashMap
用来保存发布的引用Bean的信息。什么是BeanDefinition
?
BeanDefinition
用于保存 Bean 的相关信息,包括属性、构造方法参数、依赖的 Bean 名称及是否单例、延迟加载等,它是实例化 Bean 的原材料,Spring 就是根据 BeanDefinition
中的信息实例化 Bean。
postProcessBeanFactory
方法是我们的主要实现。在该方法中我们首先遍历ConfigurableListableBeanFactory
中的所有BeanDefinition
,通过反射获取实例对象,然后遍历分析其字段是否具有RemoteReference
注解,如果有的话就创建实例并注入。
遍历完之后,我们将ConcurrentHashMap
中的数据都注册到BeanDefinitionRegistry
即可。
这样我们就可以将使用了RemoteReference
注解的字段对象注入到Spring容器中直接调用获取了。
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
private ApplicationContext applicationContext;
private ClassLoader classLoader;
private RpcClientProperties rpcClientProperties;
public SpringRpcReferencePostProcessor(RpcClientProperties rpcClientProperties) {
this.rpcClientProperties = rpcClientProperties;
}
// 保存发布的引用Bean的信息
private final Map<String, BeanDefinition> rpcRefBeanDefinition = new ConcurrentHashMap<>();
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
// spring 容器加载了bean的定义文件之后,在bean实例化之前执行
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
String className = beanDefinition.getBeanClassName();
if (className != null) {
Class<?> clazz = ClassUtils.resolveClassName(className, this.classLoader);
// 遍历字段解析
ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
}
}
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
this.rpcRefBeanDefinition.forEach((beanName, beanDefinition) -> {
if (applicationContext.containsBean(beanName)) {
System.out.println("SpringContext already registry: " + beanName);
}
registry.registerBeanDefinition(beanName, beanDefinition);
System.out.println(" registry bean: " + beanName);
});
}
// 遍历字段解析
private void parseRpcReference(Field field) {
RemoteReference annotation = AnnotationUtils.getAnnotation(field, RemoteReference.class);
if (annotation != null) {
// 不为空则构建实例注入
BeanDefinitionBuilder builder = BeanDefinitionBuilder.
genericBeanDefinition(SpringRpcReferenceBean.class);
builder.setInitMethodName("init");
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcRefBeanDefinition.put(field.getName(), beanDefinition);
}
}
}
以上代码结束后,同样的,我们也需要将其交给Spring管理。
配置信息类:
@Data
public class RpcClientProperties {
private String serviceAddress = "192.168.183.1";
private int servicePort = 20880;
}
自动注入类:
@Configuration
public class RpcReferenceAutoConfiguration implements EnvironmentAware {
private Environment environment;
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Bean
public SpringRpcReferencePostProcessor postProcessor() {
RpcClientProperties rpcClientProperties = new RpcClientProperties();
rpcClientProperties.setServiceAddress(this.environment.getProperty("rpc.client.serviceAddress"));
rpcClientProperties.setServicePort(Integer.parseInt(this.environment.getProperty("rpc.client.servicePort")));
return new SpringRpcReferencePostProcessor(rpcClientProperties);
}
}
至此,消费端功能代码全部完成,我们可以在资源文件中添加相关属性,然后将消费端改为springboot项目启动即可。
server.port=8088
rpc.client.serviceAddress=192.168.183.1
rpc.client.servicePort=20880
@ComponentScan(basePackages = {"com.rpc.example.spring.reference","com.rpc.example.controller","com.rpc.example.annotation"})
@SpringBootApplication
public class NettyConsumerMain {
public static void main(String[] args) {
SpringApplication.run(NettyConsumerMain.class, args);
}
}
访问相关接口进行测试:
可以发现调用成功。
同时生产端以及消费端也变成了一个我们最基础的Spring boot项目。
4. 代码地址
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16703.html