目录
相关知识
什么是r2dbc
传统情况Java 使用 JDBC 来操作关系型数据库,而 JDBC 是阻塞的、同步的,即使使用线程池进行改善也是有限的。基于此,Spring官方(Pivotal)提出了R2DBC(Reactive Relational Database Connectivity)。R2DBC是一项API规范计划,它声明了一个反应式API,该方法将由数据库厂商实现以访问其关系数据库。
目前所有项目都是基于JDBC的,Mybatis就是基于JDBC的。
什么是r2dbc-proxy
https://github.com/ttddyy/datasource-proxy-r2dbc
r2dbc-proxy是基于r2dbc-spi实现的代理程序,可基于java agent或结合Sleuth的方式对基于r2dbc的程序进行代理,修改原程序的r2dbc连接工厂,在数据库操作的各个阶段插入自定义逻辑
r2dbc的核心是连接工厂
使用r2dbc访问关系型数据库的核心是创建一个io.r2dbc.spi.ConnectionFactory接口的实例(通常使用单例)
数据库支持
目前实现了R2DBC的数据库驱动有:H2、MariaDB、Microsoft SQL Server、MySQL 、jasync-sql MySQL、Postgres
r2dbc对Web容器的要求
r2dbc是为WebFLux定制的,要改数据库操作方式,最好把Web容器从Tomcat、Jetty等也改成WebFLux
R2DBC 是 Spring 官方在 Spring5 发布了响应式 Web 框架 Spring WebFlux 之后急需能够满足异步响应的数据库交互 API,不过由于缺乏标准和驱动,Pivotal 团队开始自己研究响应式关系型数据库连接 Reactive Relational Database Connectivity,并提出了 R2DBC 规范 API 用来评估可行性并讨论数据库厂商是否有兴趣支持响应式的异步非阻塞驱动程序。最早只有 PostgreSQL 、H2、MSSQL 三家数据库厂商,现在 MySQL也加入进来了。
代理程序-r2dbc-proxy
使用Java agent,将代理程序写在被代理程序的启动参数中,达到修改源程序数据库连接工厂、实现代理的目的
程序示例-修改连接工厂进行代理
public class R2dbcProxyAgent {
private static ProxyConfig proxyConfig = createProxyConfig();
/**
* Configure the given {@link ProxyConfig}.
*/
private static ProxyConfig createProxyConfig() {
// as an example, printing out any method interactions and executed query.
QueryExecutionInfoFormatter queryFormatter = QueryExecutionInfoFormatter.showAll();
MethodExecutionInfoFormatter formatter = MethodExecutionInfoFormatter.withDefault();
ProxyConfig.Builder builder = ProxyConfig.builder();
builder.listener(new ProxyExecutionListener() {
@Override
public void beforeMethod(MethodExecutionInfo executionInfo) {
System.out.println("Before >> " + formatter.format(executionInfo));
}
@Override
public void afterMethod(MethodExecutionInfo executionInfo) {
System.out.println("After >> " + formatter.format(executionInfo));
}
@Override
public void afterQuery(QueryExecutionInfo execInfo) {
System.out.println("afterQuery" + queryFormatter.format(execInfo));
}
});
// To add LifeCycleListener, it needs to be wrapped by LifeCycleExecutionListener
builder.listener(new LifeCycleListener() {
@Override
public void afterCreateOnConnectionFactory(MethodExecutionInfo methodExecutionInfo) {
String msg = format(">> Connection acquired. took=%sms", methodExecutionInfo.getExecuteDuration().toMillis());
System.out.println(msg);
}
});
// Optional: use ByteBuddy to create proxies
builder.proxyFactoryFactory(ByteBuddyProxyFactory::new);
return builder.build();
}
public static void premain(String arg, Instrumentation inst) {
System.out.println("\n\n\n");
System.out.println("*****************************");
System.out.println(">>> Java Agent Activated <<<");
System.out.println("*****************************");
System.out.println("\n\n\n");
instrument(inst);
}
private static void instrument(Instrumentation inst) {
// intercept methods defined on ConnectionFactory
new AgentBuilder.Default()
.type(isSubTypeOf(ConnectionFactory.class))
.transform((builder, typeDescription, classLoader, module) -> builder
.method(named("create").or(named("getMetadata")))
.intercept(to(ConnectionFactoryInterceptor.class))
)
.installOn(inst);
}
/**
* Interceptor implementation.
* <p>
* Intercept {@link ConnectionFactory#create()} and{@link ConnectionFactory#getMetadata()}
* methods. Then, perform proxy invocation logic.
* The returned object is a proxy object and any interaction to it triggers callback
* for listeners from r2dbc-proxy framework.
* In other words, this is the entry point to the r2dbc-proxy framework.
*/
@SuppressWarnings("unchecked")
public static class ConnectionFactoryInterceptor {
@RuntimeType
public static Object intercept(@AllArguments Object[] args,
@This ConnectionFactory connectionFactory, @Origin Method method,
@SuperCall Callable<?> callable) throws Throwable {
ConnectionFactoryCallbackHandler handler = new ConnectionFactoryCallbackHandler(connectionFactory, proxyConfig);
handler.setMethodInvocationStrategy((invokedMethod, invokedTarget, invokedArgs) -> {
return callable.call(); // retrieve original result
});
Object result = handler.invoke("", method, args);
String methodName = method.getName();
if ("getMetadata".equals(methodName)) {
return result; // result is ConnectionFactoryMetadata
}
if (Mono.class.equals(method.getReturnType())) {
return Mono.from((Publisher<? extends Connection>) result);
}
return result; // return as Flux
}
}
}
连接工厂的使用
此处使用的连接工厂是io.r2dbc.spi.ConnectionFactory
ConnectionFactoryCallbackHandler handler = new ConnectionFactoryCallbackHandler(connectionFactory, proxyConfig);
配置监听器
设置每次SQL执行时,监听到执行方法后的动作
builder.listener(new ProxyExecutionListener() {
@Override
public void beforeMethod(MethodExecutionInfo executionInfo) {
System.out.println("Before >> " + formatter.format(executionInfo));
}
@Override
public void afterMethod(MethodExecutionInfo executionInfo) {
System.out.println("After >> " + formatter.format(executionInfo));
}
@Override
public void afterQuery(QueryExecutionInfo execInfo) {
System.out.println("afterQuery" + queryFormatter.format(execInfo));
}
});
被代理程序-基于r2dbc的程序
相当于我们自己写的产品,要使用r2dbc进行数据库操作
程序示例-使用R2DBC方式编程
/**
* R2DBC proxy sample application
*/
@SpringBootApplication
@RestController
public class Application {
public static void main(String[] args) {
ApplicationContext ctx = SpringApplication.run(Application.class, args);
}
@Autowired
DatabaseClient databaseClient;
@Autowired
TransactionalOperator operator;
@RequestMapping("/")
Flux<?> select() {
return this.databaseClient.execute("SELECT value FROM test;")
.map(row -> row.get("value", Integer.class))
.all();
}
@RequestMapping("/transaction")
Mono<?> transaction() {
return this.databaseClient.execute("INSERT INTO test VALUES (:value)")
.bind("value", 200)
.fetch().rowsUpdated().as(this.operator::transactional);
}
@RequestMapping("/slow")
Flux<?> slow() {
return this.databaseClient.execute("CALL SLEEP(700);").map(row -> Mono.just(-1)).all();
}
@Bean
DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.build();
}
@Bean
CommandLineRunner bootstrap(DataSource dataSource) {
return args -> {
JdbcOperations jdbcOperations = new JdbcTemplate(dataSource);
jdbcOperations.execute("DROP TABLE IF EXISTS test");
jdbcOperations.execute("CREATE TABLE test ( value INTEGER )");
jdbcOperations.execute("INSERT INTO test VALUES (100)");
jdbcOperations.execute("INSERT INTO test VALUES (200)");
// create sleep function for slow query
jdbcOperations.execute("CREATE ALIAS SLEEP FOR \"java.lang.Thread.sleep(long)\"");
};
}
@Bean
ConnectionFactory connectionFactory() {
H2ConnectionConfiguration h2Configuration = H2ConnectionConfiguration.builder()
.username("sa")
.password("")
.inMemory("testdb")
.build();
z
ConnectionFactory connectionFactory = new H2ConnectionFactory(h2Configuration);
return connectionFactory;
}
@Bean
DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
return DatabaseClient.create(connectionFactory);
}
}
H2的连接工厂
上面示例使用的H2内存数据库,ConnectionFactory的实现类直接使用的H2ConnectionFactory类
@Bean
ConnectionFactory connectionFactory() {
H2ConnectionConfiguration h2Configuration = H2ConnectionConfiguration.builder()
.username("sa")
.password("")
.inMemory("testdb")
.build();
z
ConnectionFactory connectionFactory = new H2ConnectionFactory(h2Configuration);
return connectionFactory;
}
MySQL的连接工厂
也可以通过参数配置,不指定实现类,创建MySQL的ConnectionFactory实例
@Bean
ConnectionFactory connectionFactory() {
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(DRIVER, "mysql")
.option(HOST, "127.0.0.1")
.option(USER, "root")
.option(PORT, 3306)
.option(PASSWORD, "")
.option(DATABASE, "r2dbc_stu")
.option(CONNECT_TIMEOUT, Duration.ofSeconds(3))
.build();
return ConnectionFactories.get(options);
}
代理测试
启动方式
-javaagent后是代理程序,-jar后是被代理程序
java -javaagent:package-agent/target/examples-agent-1.0-SNAPSHOT-jar-with-dependencies.jar \
-jar package-application/target/examples-application-1.0-SNAPSHOT.jar
输出结果
Before >> 31: Thread:52 Connection:n/a Time:0 H2ConnectionFactory#create()
After >> 32: Thread:52 Connection:4 Time:1 H2ConnectionFactory#create()
>> Connection acquired. took=1ms
Before >> 33: Thread:52 Connection:4 Time:0 H2Connection#createStatement()
After >> 34: Thread:52 Connection:4 Time:0 H2Connection#createStatement()
Before >> 35: Thread:52 Connection:4 Time:0 H2Statement#execute()
Before >> 36: Thread:52 Connection:4 Time:0 H2Result#map()
After >> 37: Thread:52 Connection:4 Time:1 H2Result#map()
Before >> 38: Thread:52 Connection:4 Time:0 H2Connection#close()
After >> 39: Thread:52 Connection:4 Time:0 H2Connection#close()
Thread:http-nio-8080-exec-6(52) Connection:4 Transaction:{Create:0 Rollback:0 Commit:0} Success:True Time:22381 Type:Statement BatchSize:0 BindingsSize:0 Query:["SELECT value FROM test;"] Bindings:[]
After >> 40: Thread:52 Connection:4 Time:25249 H2Statement#execute()
结论
当前阶段研究发现,r2dbc-proxy可以通过java agent的方式,无侵入地实现数据库操作的拦截
但是被代理程序需要是基于r2dbc这种响应式关系型数据库连接,目前程序都是jdbc连接,需要改造
单从程序没有r2dbc的ConnectionFactory这点看,无法直接使用r2dbc-proxy
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/93671.html