继续Spring Webflux,WebClient实现非阻塞模式的远程调用
WebClient是从Spring WebFlux 5.0版本开始提供的一个非阻塞的基于响应式编程的进行Http请求的客户端工具。它的响应式编程的基于Reactor的。WebClient中提供了标准Http请求方式对应的get、post、put、delete等方法,可以用来发起相应的请求。
个人认为目前Spring Webflux比较适合的应用场景就是网关,比如Spring Cloud Gateway就是一个例子,因为目前它对于关系型数据库尚不支持,目前已支持的数据库包括Redis、MongoDB等Nosql数据库。
而今天的这个Demo就是基于Spring Webflux实践的服务端的远程调用,类似于Openfeign,但是由于它是基于Webflux的,因此是非阻塞的,性能比较好。
预期效果
利用Webclient,实现webflux-client对于p-webflux的远程调用。
设计
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
编码
注释接口
/**
* @author: yunho
* @description: 服务器相关信息
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ApiServer {
String value() default "";
}
实体类
/**
* @author: yunho
* @description:
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServerInfo {
private String url;
}
/**
* @author: yunho
* @description:方法调用信息类
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MethodInfo {
private String url;
private HttpMethod httpMethod;
private Map<String,Object> params;
private Mono body;
/**
* 请求body的类型
*/
private Class<?> bodyClass;
/**
* 是否flux,否则为mono
*/
private Boolean isFlux;
/**
* 返回对象类型
*/
private Class<?> returnElementType;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
private String id;
private String name;
private int age;
}
FactoryBean配置类
@Configuration
public class FactoryBeanConfig {
@Bean
public ProxyCreator proxyCreator(){
return new JdkProxyCreator();
}
@Bean
FactoryBean<IUser> userApi(ProxyCreator proxyCreator){
return new FactoryBean<IUser>() {
//返回代理对象
@Override
public IUser getObject() throws Exception {
return (IUser) proxyCreator.createProxy(this.getObjectType());
}
@Override
public Class<?> getObjectType() {
return IUser.class;
}
};
}
}
接口
public interface ProxyCreator {
Object createProxy(Class<?> type);
}
/**
* @author: yunho
* @date: 2021/5/14 16:21
* @description:Rest 请求调用
*/
public interface RestHandler {
/**
* 初始化服务器信息
* @param serverInfo
*/
void init(ServerInfo serverInfo);
/**
* 调用rest接口,返回接口
* @param methodInfo
* @return
*/
Object invokeRest(MethodInfo methodInfo);
}
@ApiServer("http://localhost:8089/ruser")
public interface IUser {
@GetMapping("/")
Flux<User> getAllUser();
@GetMapping("/{id}")
Mono<User> getUserById(@PathVariable("id") String id);
@DeleteMapping("/{id}")
Mono<Void> deleteUser(@PathVariable("id") String id);
@PostMapping("/")
Mono<User> putUser(@RequestBody Mono<User> userMono);
}
代理类
/**
* @author: yunho
* @description: 使用jdk动态代理
*/
@Slf4j
public class JdkProxyCreator implements ProxyCreator {
@Override
public Object createProxy(Class<?> type) {
log.info("createProxy*****"+type);
//根据接口得到API信息
ServerInfo serverInfo = extractServerInfo(type);
log.info("serverInfo*****"+serverInfo);
//给每一个代理类一个实现
RestHandler restHandler = new WebClientRestHandler();
//初始化服务器信息 初始化web client
restHandler.init(serverInfo);
return Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class[]{type}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//根据方法和参数得到调用信息
MethodInfo methodInfo = extractMethodInfo(method,args);
log.info("methodInfo*****"+methodInfo);
//调用rest
//这种写法 发现serverInfo是每次调用都会传,而且它是一样的,因此可以将它在类初始化时定义。
// return restHandler.invokeRest(serverInfo,methodInfo);
return restHandler.invokeRest(methodInfo);
}
/**
* 根据方法定义和调用参数得到相关信息
* @param method
* @param args
* @return
*/
private MethodInfo extractMethodInfo(Method method, Object[] args) {
MethodInfo methodinfo = new MethodInfo();
extractHttpMethod(method, methodinfo);
extractRequestParams(method, args, methodinfo);
//提取返回对象信息
extractReturnInfo(method,methodinfo);
return methodinfo;
}
/**
* 提取返回对象信息
* @param method
* @param methodinfo
*/
private void extractReturnInfo(Method method, MethodInfo methodinfo) {
//isAssignableFrom 判断是否某个类的子类,instanceof 判断实例
boolean assignableFrom = method.getReturnType().isAssignableFrom(Flux.class);
methodinfo.setIsFlux(assignableFrom);
//得到返回对象的实际类型
Class<?> elementType = extractElementType(method.getGenericReturnType());
methodinfo.setReturnElementType(elementType);
}
/**
* 得到泛型类型的实际类型
* @param genericReturnType
* @return
*/
private Class<?> extractElementType(Type genericReturnType) {
Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments();
return (Class<?>) actualTypeArguments[0];
}
/**
* 获取请求参数
* @param method
* @param args
* @param methodinfo
*/
private void extractRequestParams(Method method, Object[] args, MethodInfo methodinfo) {
//得到参数和body
Parameter[] parameters = method.getParameters();
Map<String,Object> params = new LinkedHashMap<>();
methodinfo.setParams(params);
for (int i = 0; i < parameters.length; i++) {
PathVariable pathVariable = parameters[i].getAnnotation(PathVariable.class);
if(pathVariable !=null){
params.put(pathVariable.value(), args[i]);
}
//是否带了requestBody
RequestBody requestBody = parameters[i].getAnnotation(RequestBody.class);
if(requestBody!=null){
methodinfo.setBody((Mono<?>) args[i]);
//请求对象的实际类型
methodinfo.setBodyClass(extractElementType(parameters[i].getParameterizedType()));
}
}
}
/**
* 获取url和httpmethod
* @param method
* @param methodinfo
*/
private void extractHttpMethod(Method method, MethodInfo methodinfo) {
Annotation[] annotations = method.getAnnotations();
for (Annotation a:annotations
) {
//GET
if(a instanceof GetMapping){
GetMapping getMapping = (GetMapping) a;
methodinfo.setUrl(getMapping.value()[0]);
methodinfo.setHttpMethod(HttpMethod.GET);
}//POST
else if(a instanceof PostMapping){
PostMapping m = (PostMapping) a;
methodinfo.setUrl(m.value()[0]);
methodinfo.setHttpMethod(HttpMethod.POST);
}//DELETE
else if(a instanceof DeleteMapping){
DeleteMapping m = (DeleteMapping) a;
methodinfo.setUrl(m.value()[0]);
methodinfo.setHttpMethod(HttpMethod.DELETE);
}
}
}
});
}
/**
* 返回服务器信息
* @param type
* @return
*/
private ServerInfo extractServerInfo(Class<?> type) {
ServerInfo serverinfo = new ServerInfo();
ApiServer annotation = type.getAnnotation(ApiServer.class);
serverinfo.setUrl(annotation.value());
return serverinfo;
}
}
核心业务处理类
/**
* @author: yunho
* @description:
*/
public class WebClientRestHandler implements RestHandler {
private WebClient webClient;
//初始化Webclient端
@Override
public void init(ServerInfo serverInfo) {
this.webClient = WebClient.create(serverInfo.getUrl());
}
/**
* 处理Rest请求
* @param methodInfo
* @return
*/
@Override
public Object invokeRest(MethodInfo methodInfo) {
Object result =null;
WebClient.RequestBodySpec accept = this.webClient
.method(methodInfo.getHttpMethod())
//url和参数
.uri(methodInfo.getUrl(), methodInfo.getParams())
.accept(MediaType.APPLICATION_JSON);
WebClient.ResponseSpec retrieve =null;
//判断是否有body
if(methodInfo.getBody()!=null){
//发出请求
retrieve= accept.body(methodInfo.getBody(), methodInfo.getBodyClass()).retrieve();
}else{
retrieve = accept.retrieve();
}
retrieve.onStatus(status -> status.value()== HttpStatus.NOT_FOUND.value()
, response -> Mono.just(new RuntimeException("未找到指定对象")));
//处理flux还是mono
if(methodInfo.getIsFlux()){
result= retrieve.bodyToFlux(methodInfo.getReturnElementType());
}else{
result=retrieve.bodyToMono(methodInfo.getReturnElementType());
}
//处理body
return result;
}
}
总结
整个代码量很小,但是实现了预期的效果,而且具备良好的扩展性
数据结构:ServerInfo和MethodInfo作为routerfunction 动态代理的实体类,与其他框架无耦合
接口:IUser 业务接口、ProxyCreator 动态代理接口、RestHandler Rest请求处理接口(核心业务)
实现类:JdkProxyCreator 基于JDK的动态代理实现。
分享一些原则
程序=数据结构+算法
设计的最重要的是解耦
实现解耦的关键是设计自己的数据结构+抽象接口
只要有改变的可能就应该定义一个接口
原文始发于微信公众号(云户):继续Spring Webflux,WebClient实现非阻塞模式的远程调用
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/25875.html