在 基于Netty手写RPC框架进阶版(上)——注解驱动 中,我们搭建好了注解驱动的RPC框架,接下来在本文中我们就来逐步实现注册中心以及动态扩容的功能。
1. 注册中心模块
接下来我们便来引入注册中心,这里使用zookeeper来实现,首先创建netyy-rpc-registry
模块。
首先引入zk的相关依赖:
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<!-- 服务注册及发现-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>
然后我们定义相关接口,在其中实现注册方法以及发现方法。
public interface IRegistryService {
/**
* 服务注册
*
* @param serviceInfo
*/
void register(ServiceInfo serviceInfo);
/**
* 服务发现
*
* @param serviceName
* @return
*/
ServiceInfo discovery(String serviceName);
}
这里的话我们还需要定义一个实体类封装一些相关的服务信息。
@Data
public class ServiceInfo {
private String serviceName;
private String serviceAddress;
private int servicePort;
}
接下来就是编写实现类了,这里我们可以自行选择使用zookeeper还是eureka,我这里使用zookeeper进行服务的注册。
整个实现类主要通过Curator
客户端及ServiceInstance
来实现服务的注册以及发现。这里的zookeeper相关代码在之前的文档里面由讲解过,可以自行查看。
public class ZookeeperRegistryService implements IRegistryService {
private static final String REGISTRY_PATH = "/registry";
// curator 中提供的服务注册与发现的封装
private final ServiceDiscovery<ServiceInfo> serviceDiscovery;
// 负载均衡
private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;
public ZookeeperRegistryService(String registryAddress) throws Exception {
// 创建curator客户端
CuratorFramework client = CuratorFrameworkFactory
.newClient(registryAddress, new ExponentialBackoffRetry(1000, 3));
// 启动客户端
client.start();
// 实例化注册发现工具
JsonInstanceSerializer<ServiceInfo> serializer = new JsonInstanceSerializer<>(ServiceInfo.class);
this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
.client(client)
.serializer(serializer)
.basePath(REGISTRY_PATH)
.build();
this.serviceDiscovery.start();
// 实例化负载均衡算法 直接写死,因为这里只有一个
this.loadBalance = new RandomLoadBalance();
}
@Override
public void register(ServiceInfo serviceInfo) throws Exception {
System.out.println("begin registry serviceInfo to zookeeper server");
//将服务端元数据保存到注册中心上
ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
.name(serviceInfo.getServiceName())
.address(serviceInfo.getServiceAddress())
.port(serviceInfo.getServicePort())
.payload(serviceInfo)
.build();
//注册
this.serviceDiscovery.registerService(serviceInstance);
}
@Override
public ServiceInfo discovery(String serviceName) throws Exception {
System.out.println("begin discovery serviceInfo from zookeeper server");
Collection<ServiceInstance<ServiceInfo>> serviceInstances = this.serviceDiscovery.queryForInstances(serviceName);
//动态路由
ServiceInstance<ServiceInfo> serviceInstance = this.loadBalance.select((List<ServiceInstance<ServiceInfo>>) serviceInstances);
if (serviceInstance == null) {
return null;
}
return serviceInstance.getPayload();
}
}
在发现服务的代码中,我们需要实现动态路由,这里我们可以实现一个随机路由的算法。
首先创建负载均衡接口。
public interface ILoadBalance<T> {
T select(List<T> servers);
}
然后创建一个抽象类作为模板方法:
public abstract class AbstractLoadBalance implements ILoadBalance<ServiceInstance<ServiceInfo>> {
// 模板方法
@Override
public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers) {
if (servers == null || servers.size() == 0) {
return null;
}
if (servers.size() == 1) {
return servers.get(0);
}
// 其他的由具体的子类实现
return doSelect(servers);
}
protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
}
不同的路由算法由各个子类单独实现:
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {
int len = servers.size();
Random random = new Random();
return servers.get(random.nextInt(len));
}
}
服务注册这里处理完之后,我们来处理一些如何供外部调用。
首先创建类型枚举,让客户端可以选择不同的注册中心。
public enum RegistryType {
ZOOKEEPER((byte) 0),
EUREKA((byte) 1);
private byte code;
RegistryType(byte code) {
this.code = code;
}
public static RegistryType findByCode(int code) {
for (RegistryType value : RegistryType.values()) {
if (value.code == code) {
return value;
}
}
return null;
}
}
创建简易工厂类,根据不同的类型返回不同的注册服务。
public class RegistryFactory {
public static IRegistryService createRegistryService(String address, RegistryType registryType) {
IRegistryService service = null;
try {
switch (registryType) {
case EUREKA:
// todo
break;
case ZOOKEEPER:
service = new ZookeeperRegistryService(address);
break;
default:
service = new ZookeeperRegistryService(address);
break;
}
} catch (Exception e) {
e.printStackTrace();
}
return service;
}
}
2. 服务注册
注册中心模块完成之后,我们就可以到框架模块将相关的服务注册上去了。
首先引入相关依赖:
<dependency>
<groupId>org.example</groupId>
<artifactId>netty-rpc-registry</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
然后修改SpringRpcProviderBean
的相关代码:
首先引入注册中心,然后将相关服务注册上去即可。
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {
private final int serverPort;
private final String serverAddress;
// 服务注册相关
private final IRegistryService registryService; // 服务注册中心
public SpringRpcProviderBean(int serverPort, IRegistryService registryService) throws UnknownHostException {
this.serverPort = serverPort;
// 获取服务器的IP地址
InetAddress address = InetAddress.getLocalHost();
System.out.println("网络地址:" + address.getHostAddress());
this.serverAddress = address.getHostAddress();
this.registryService = registryService;
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("begin deploy netty server");
new Thread(() -> {
new NettyServer(this.serverAddress, this.serverPort).startNettyServer();
}).start();
}
// 任何Bean装载到spring 容器的时候都会回调
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 只要bean声明了RemoteService注解,则需要把该服务发布到网络上 允许被调用
if (bean.getClass().isAnnotationPresent(RemoteService.class)) {
// 得到所有方法
Method[] methods = bean.getClass().getMethods();
//我们这里需要把具体的发布映射关系保存下来
for (Method method : methods) {
String serviceName = bean.getClass().getInterfaces()[0].getName();
String key = serviceName + "." + method.getName();
BeanMethod beanMethod = new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
Mediator.beanMethodMap.put(key, beanMethod);
//服务注册相关
//发布到远程服务端
ServiceInfo serviceInfo = new ServiceInfo();
serviceInfo.setServiceAddress(this.serverAddress);
serviceInfo.setServicePort(this.serverPort);
serviceInfo.setServiceName(serviceName);
try {
registryService.register(serviceInfo); // 注册服务
} catch (Exception e) {
System.out.println(serviceName + "registry failed");
e.printStackTrace();
}
}
}
return bean;
}
}
我们需要通过配置文件信息来控制注册中心的地址以及类型:
@Data
@ConfigurationProperties("rpc")
public class RpcServerProperties {
// private String serviceAddress;
private int servicePort;
// 服务注册相关
private String registryAddress;
private byte registryType;
}
然后在注入SpringRpcProviderBean
的时候引入对应的注册中心。
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {
@Bean
public SpringRpcProviderBean springRpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
// 服务注册的时候需要传递注册中心
IRegistryService registryService = RegistryFactory.createRegistryService(rpcServerProperties.getRegistryAddress(), RegistryType.findByCode(rpcServerProperties.getRegistryType()));
return new SpringRpcProviderBean(rpcServerProperties.getServicePort(), registryService);
}
}
最后在生产者端添加对应配置信息并启动项目查看。
server.port=8081
rpc.servicePort=20880
rpc.registryType=0
rpc.registryAddress=127.0.0.1:2181
打开我们的zookeeper客户端查看服务是否注册上去。
3. 消费端处理
既然我们的服务已经注册到注册中心了,那我们的消费端应该如何完善呢?还记得我们消费端Netty连接的地址和端口号都是我们在配置文件中写死的吗?如果是多个服务的话,是不是就不合理了,所以既然我们已经将服务放到注册中心了,那我们能不能直接连接到服务对应的地址上去呢?
没错,我们只需要在服务端根据名字找到对应的服务,然后将Netty客户端的地址设置成服务的地址和端口即可。
接下来逐步修改完善,首先从Netty客户端开始修改。
客户端这里我们不需要从构造方法中传入地址和端口号了,而是在发送消息的时候传入注册中心,然后注册中心根据类名获取到对应的服务,直接使用服务的地址和端口号进行连接。
public class NettyClient {
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
// private String serviceAddress;
//
// private int servicePort;
// public NettyClient(String serviceAddress, int servicePort) {
// System.out.printf("begin init Netty Client,{},{}", serviceAddress, servicePort);
public NettyClient() {
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new RpcClientInitializer());
// this.serviceAddress = serviceAddress;
// this.servicePort = servicePort;
}
// 发送数据包
// 通过注册服务来连接对应地址
public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception {
// 服务发现
ServiceInfo discovery = registryService.discovery(protocol.getContent().getClassName());
// 地址都来自注册注册中心 ,不需要通过外部传递了
final ChannelFuture future = bootstrap.connect(discovery.getServiceAddress(), discovery.getServicePort()).sync();
//监听是否连接成功
future.addListener(listener -> {
if (future.isSuccess()) {
// System.out.printf("connect rpc server {} success.", this.serviceAddress);
} else {
// System.out.printf("connect rpc server {} failed. ", this.serviceAddress);
future.cause().printStackTrace();
eventLoopGroup.shutdownGracefully();
}
});
System.out.println("begin transfer data");
future.channel().writeAndFlush(protocol);
}
}
修改完客户端后,接下来就是对动态代理部分的代码进行修改,也是将地址和端口号改为服务的地址和端口。
首先是RpcInvokerProxy
类,同样的这里不在传递地址和端口号,改为传递注册中心。
public class RpcInvokerProxy implements InvocationHandler {
// 不需要手动传入了,直接从注册中心获取
private IRegistryService registryService;
// private String host;
//
// private int port;
//
// public RpcInvokerProxy(String host, int port) {
// this.host = host;
// this.port = port;
// }
public RpcInvokerProxy(IRegistryService registryService) {
this.registryService = registryService;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("begin invoke target server");
RpcProtocol<RpcRequest> reqProtocol = new RpcProtocol<>();
long requestId = RequestHolder.REQUEST_ID.incrementAndGet();
Header header = new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(),
ReqType.REQUEST.code(), requestId, 0);
reqProtocol.setHeader(header);
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamsTypes(method.getParameterTypes());
request.setParams(args);
reqProtocol.setContent(request);
//短链接
// NettyClient nettyClient = new NettyClient(host, port);
// 不需要传递地址及端口了
NettyClient nettyClient = new NettyClient();
// 通过设置DefaultEventLoop进行轮询获取结果
RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<RpcResponse>(new DefaultEventLoop()));
// 保存请求ID和返回数据的对应关系
RequestHolder.REQUEST_MAP.put(requestId, future);
nettyClient.sendRequest(reqProtocol, registryService);
// 返回异步回调的数据
return future.getPromise().get().getData();
}
}
然后是SpringRpcReferenceBean
类,我们需要在这里给RpcInvokerProxy
类传递注册中心,这里的话需要我们设置注册中心的地址和端口号。
public class SpringRpcReferenceBean implements FactoryBean<Object> {
private Object object;
// private String serviceAddress;
//
// private int servicePort;
private Class<?> interfaceClass;
//服务注册地址
private String registryAddress;
private byte registryType;
@Override
public Object getObject() throws Exception {
return object;
}
public void init() {
// 传入注册中心
IRegistryService registryService = RegistryFactory.createRegistryService(this.registryAddress, RegistryType.findByCode(this.registryType));
this.object = Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RpcInvokerProxy(registryService));
// new RpcInvokerProxy(serviceAddress, servicePort));
}
@Override
public Class<?> getObjectType() {
return this.interfaceClass;
}
// public String getServiceAddress() {
// return serviceAddress;
// }
//
// public void setServiceAddress(String serviceAddress) {
// this.serviceAddress = serviceAddress;
// }
//
// public int getServicePort() {
// return servicePort;
// }
//
// public void setServicePort(int servicePort) {
// this.servicePort = servicePort;
// }
public Class<?> getInterfaceClass() {
return interfaceClass;
}
public void setInterfaceClass(Class<?> interfaceClass) {
this.interfaceClass = interfaceClass;
}
public void setRegistryAddress(String registryAddress) {
this.registryAddress = registryAddress;
}
public void setRegistryType(byte registryType) {
this.registryType = registryType;
}
}
然后在SpringRpcReferencePostProcessor
类中为SpringRpcReferenceBean
类设置注册中心的地址及端口号。
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
private ApplicationContext applicationContext;
private ClassLoader classLoader;
private RpcClientProperties rpcClientProperties;
public SpringRpcReferencePostProcessor(RpcClientProperties rpcClientProperties) {
this.rpcClientProperties = rpcClientProperties;
}
// 保存发布的引用Bean的信息
private final Map<String, BeanDefinition> rpcRefBeanDefinition = new ConcurrentHashMap<>();
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
// spring 容器加载了bean的定义文件之后,在bean实例化之前执行
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
String className = beanDefinition.getBeanClassName();
if (className != null) {
Class<?> clazz = ClassUtils.resolveClassName(className, this.classLoader);
// 遍历字段解析
ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
}
}
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
this.rpcRefBeanDefinition.forEach((beanName, beanDefinition) -> {
if (applicationContext.containsBean(beanName)) {
System.out.println("SpringContext already registry: " + beanName);
}
registry.registerBeanDefinition(beanName, beanDefinition);
System.out.println(" registry bean: " + beanName);
});
}
// 遍历字段解析
private void parseRpcReference(Field field) {
RemoteReference annotation = AnnotationUtils.getAnnotation(field, RemoteReference.class);
if (annotation != null) {
// 不为空则构建实例注入
BeanDefinitionBuilder builder = BeanDefinitionBuilder.
genericBeanDefinition(SpringRpcReferenceBean.class);
builder.setInitMethodName("init");
builder.addPropertyValue("interfaceClass", field.getType());
// 使用注册中心的地址,不需要手动注入地址及端口号
// builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
// builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
builder.addPropertyValue("registryAddress", rpcClientProperties.getRegistryAddress());
builder.addPropertyValue("registryType", rpcClientProperties.getRegistryType());
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcRefBeanDefinition.put(field.getName(), beanDefinition);
}
}
}
接下来就是获取消费端配置文件中的信息注入。
@Data
public class RpcClientProperties {
private String serviceAddress = "192.168.183.1";
private int servicePort = 20880;
private byte registryType;
private String registryAddress;
}
@Configuration
public class RpcReferenceAutoConfiguration implements EnvironmentAware {
private Environment environment;
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Bean
public SpringRpcReferencePostProcessor postProcessor() {
RpcClientProperties rpcClientProperties = new RpcClientProperties();
// 使用注册中心就不需要手动获取对应的服务端口号及地址了
// rpcClientProperties.setServiceAddress(this.environment.getProperty("rpc.client.serviceAddress"));
// rpcClientProperties.setServicePort(Integer.parseInt(this.environment.getProperty("rpc.client.servicePort")));
rpcClientProperties.setRegistryAddress(this.environment.getProperty("rpc.client.registryAddress"));
rpcClientProperties.setRegistryType(Byte.parseByte(this.environment.getProperty("rpc.client.registryType")));
return new SpringRpcReferencePostProcessor(rpcClientProperties);
}
}
最后我们在消费端添加配置信息并启动查看效果。
server.port=8088
# rpc.client.serviceAddress=192.168.183.1
# rpc.client.servicePort=20880
rpc.client.registryAddress=127.0.0.1:2181
rpc.client.registryType=0
可以发现成功调用,说明我们的改造是成功。
4. 动态扩容
接下来我们测试一下动态扩容。
首先复制一份生产者服务并修改netty端口号。
这个时候启动一个消费端及生产端,可以发现接口是可以正常调用的。
然后启动第二个生产端。这个时候我们将两个生产端控制台清空。
多次调用接口后,可以发现两个生产端的控制台都会有打印信息,这就是我们的动态扩容功能。
5. 项目地址
以上便是基于Netty手写RPC框架的全部内容,现在来看,我们的消费端以及生产端的代码是非常干净的,从头到尾我们只需要填写注册中心的配置信息以及使用相应的注解即可,所有的业务内容我们都放在了框架模块中。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16702.html