一、简介
Excutor是Mybatis的最核心的接口之一。其中,SqlSession接口中的功能就是基于Excutor来完成的。
//DefaultSqlSession.java
@Override
public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
try {
MappedStatement ms = configuration.getMappedStatement(statement);
executor.query(ms, wrapCollection(parameter), rowBounds, handler);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
上述方法,是SqlSession接口的Mybatis默认实现类DefaultSqlSession中的一个select方法。通过代码可以知道,最终是通过Excutor的query方法实现了查询。
二、Executor接口及其层级结构
在Mybatis中,Executor接口的实现由两类,一种是接口的真正实现类,用于操作数据库;一种是包装器类,主要是扩展Excutor的功能,比如缓存装饰器CacheExecutor。Executor接口层级结构如下图所示:
其中,BaseExecutor是Excutor接口的抽象实现类,主要提供了缓存管理和事务管理的基本功能;SimpleExecutor继承了BaseExecutor抽象类,是Mybatis提供的最简单的Executor接口实现;ReuseExecutor也继承了BaseExecutor抽象类,提供了对Statement重用的功能;BatchExecutor同样继承了BaseExecutor抽象类,实现了批处理多条 SQL 语句的功能;ClosedExecutor也继承了BaseExecutor抽象类,且是ResultLoaderMap类中的一个内部类,用于实现懒加载相关逻辑。CachingExecutor类直接实现了Excutor接口,是装饰器类,主要增强缓存相关功能。
Executor接口:
public interface Executor {
/**
* 默认的结果处理器,默认为空,主要是在查询的时候用于处理查询结果
*/
ResultHandler NO_RESULT_HANDLER = null;
/**
* 实现对数据表的增删改(insert/update/delete)操作
* @param ms
* @param parameter
* @return
* @throws SQLException
*/
int update(MappedStatement ms, Object parameter) throws SQLException;
/**
* 查询,回调处理查询结果,带分页
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param cacheKey
* @param boundSql
* @return
* @throws SQLException
*/
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
/**
* 查询,回调处理查询结果,
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param cacheKey
* @param boundSql
* @return
* @throws SQLException
*/
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
/**
* 查询,返回Cursor<E>类型
* @param ms
* @param parameter
* @param rowBounds
* @return
* @throws SQLException
*/
<E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;
/**
* 批量处理SQL语句
* @return
* @throws SQLException
*/
List<BatchResult> flushStatements() throws SQLException;
/**
* 提交事务
* @param required
* @throws SQLException
*/
void commit(boolean required) throws SQLException;
/**
* 回滚事务
* @param required
* @throws SQLException
*/
void rollback(boolean required) throws SQLException;
/**
* 创建缓存中使用的CacheKey,即缓存中使用的key
* @param ms
* @param parameterObject
* @param rowBounds
* @param boundSql
* @return
*/
CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
/**
* 根据对应的key,查找是否存在缓存
* @param ms
* @param key
* @return
*/
boolean isCached(MappedStatement ms, CacheKey key);
/**
* 清理一级缓存
*/
void clearLocalCache();
/**
* 延迟加载一级缓存中的数据
* @param ms
* @param resultObject
* @param property
* @param key
* @param targetType
*/
void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
/**
* 获取事务对象
* @return
*/
Transaction getTransaction();
/**
* 关闭Executor对象
* @param forceRollback
*/
void close(boolean forceRollback);
/**
* 检查Exceutor对象是否关闭
* @return
*/
boolean isClosed();
/**
* 设置包装类
* @param executor
*/
void setExecutorWrapper(Executor executor);
}
三、BaseExecutor抽象类
BaseExecutor是Excutor接口的抽象实现类,主要提供了缓存管理和事务管理的基本功能。
1、 属性
/**
* Transaction对象,实现事务的提交、回滚和关闭操作
*/
protected Transaction transaction;
/**
* 封装了真正的 Executor对象
*/
protected Executor wrapper;
/**
* 延迟加载队列。一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。
* 当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null元素。
* 该变量主要是存储一些可以延时加载的变量对象,且可供多线程使用
*/
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
/**
* 一级缓存,用于缓存该 Executor对象查询结果集映射得到的结果对象,
*/
protected PerpetualCache localCache;
/**
* 一级缓存,用于缓存输出类型的参数
*/
protected PerpetualCache localOutputParameterCache;
/**
* 全局唯一配置对象
*/
protected Configuration configuration;
/**
* 用来记录嵌套查询的层数,分析 DefaultResultSetHandler时介绍过的嵌套查询
*/
protected int queryStack;
/**
* 标识该执行器是否已经关闭
*/
private boolean closed;
2、 构造函数
主要实现了对上述的属性字段进行初始化工作。其中,locaCache、localOutputParameterCache都使用了PerpetualCache类型的缓存。
/**
* 构造函数,初始化变量
* @param configuration
* @param transaction
*/
protected BaseExecutor(Configuration configuration, Transaction transaction) {
this.transaction = transaction;
this.deferredLoads = new ConcurrentLinkedQueue<DeferredLoad>();
this.localCache = new PerpetualCache("LocalCache");
this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
this.closed = false;
this.configuration = configuration;
this.wrapper = this;
}
3、 缓存相关操作
- createCacheKey()方法
创建缓存中使用的key,即CacheKey实例对象。创建的CacheKey实例对象,由下列参数决定其唯一性,即相同的查询,相关的参数,生成的CacheKey实例唯一且不变。依据的参数如下:
1、MappedStatement实例的ID
2、RowBounds实例的offset和limit属性
3、BoundSql实例的sql属性
4、查询条件的参数值
5、连接数据库的环境ID
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
CacheKey cacheKey = new CacheKey();
cacheKey.update(ms.getId());
cacheKey.update(rowBounds.getOffset());
cacheKey.update(rowBounds.getLimit());
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
for (ParameterMapping parameterMapping : parameterMappings) {
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
if (configuration.getEnvironment() != null) {
// issue #176
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}
- isCached()
判断该查询结果是否被缓存。
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return localCache.getObject(key) != null;
}
- clearLocalCache()
清除本地缓存的方法,如下所示:
@Override
public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
}
4、 事务相关方法
其中,commit()、rollback()方法中都执行了clearLocalCache()、flushStatements()方法,及刷新了缓存和Statement。
/**
* 获取执行器中的事务对象
*/
@Override
public Transaction getTransaction() {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
return transaction;
}
/**
* 事务提交,真正提交操作是有事务实例transaction来完成
* 该方法实现了:
* 1、判断执行器是否被关闭,如果关闭就直接抛出异常
* 2、清理缓存和执行预操作
* 3、根据参数判断是否执行提交操作
*/
@Override
public void commit(boolean required) throws SQLException {
if (closed) {
throw new ExecutorException("Cannot commit, transaction is already closed");
}
clearLocalCache();
flushStatements();
if (required) {//如果
transaction.commit();
}
}
/**
* 回滚操作
*/
@Override
public void rollback(boolean required) throws SQLException {
if (!closed) {//如果该执行器没有关闭才执行回滚操作,否则就忽略,不做任何操作
try {
//清除缓存内容
clearLocalCache();
//刷新,预处理
flushStatements(true);
} finally {
if (required) {//需要回滚,就执行事务的回滚操作
transaction.rollback();
}
}
}
}
5、 关闭方法
/**
* 关闭java的Statement实例
* @param statement
*/
protected void closeStatement(Statement statement) {
if (statement != null) {
try {
if (!statement.isClosed()) {
statement.close();
}
} catch (SQLException e) {
// ignore
}
}
}
/**
* 关闭当前执行器,如果需要回滚,就执行回滚操作
* 需要把相应的资源(比如事务transaction等),同步关闭
* 把其他变量设置成NULL
*/
@Override
public void close(boolean forceRollback) {
try {
try {
rollback(forceRollback);
} finally {
if (transaction != null) {
transaction.close();
}
}
} catch (SQLException e) {
// Ignore. There's nothing that can be done at this point.
log.warn("Unexpected exception on closing transaction. Cause: " + e);
} finally {
transaction = null;
deferredLoads = null;
localCache = null;
localOutputParameterCache = null;
closed = true;
}
}
/**
* 获取当前执行器是否关闭的标识
*/
@Override
public boolean isClosed() {
return closed;
}
6、 延迟加载方法
延迟加载的方法由deferLoad()方法及内部类DeferredLoad实现了,后续会专门分析。
7、 更新操作 update()
更新操作,该方法处理了通用的情况,真正实现由子类通过实现doUpdate方法完成。该方法实现了:
1、判断该执行器是否已经关闭,如果关闭,就直接抛出异常
2、因为该操作是更新操作,所以该操作会让其中的缓存就全部失效,即清理缓存
3、调用抽象方法doUpdate实现数据更新,由子类完成。
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
clearLocalCache();
return doUpdate(ms, parameter);
}
protected abstract int doUpdate(MappedStatement ms, Object parameter)
throws SQLException;
8、 刷新Statement
刷新Statement。真正的实现由具体的子类完成,且在不同的实现类中的逻辑不一样,后续再具体分析。该通用方法主要实现了:
1、判断执行器是否被关闭,如果关闭就直接抛出异常
2、调用抽象方法doFlushStatements,真正实现由子类完成
@Override
public List<BatchResult> flushStatements() throws SQLException {
return flushStatements(false);
}
public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
return doFlushStatements(isRollBack);
}
protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
throws SQLException;
9、 query()方法
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameter);
CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
在上述方法中,主要是根据MappedStatement获取BoundSql实例对象,还有就是生成CacheKey实例对象,然后调用重载的query()方法实现查询。
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
//如果结果处理器==null,查询一级缓存中的数据
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
//针对存储过程调用的处理 其功能是 在一级缓存命中时,获取缓存中保存的输出类型参数,
//并设豆到用户传入的实参( parameter )对象中。
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {//否则,执行下面语句,从数据库查询数据
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
//根据缓存作用域,判断是否要清空本地缓存
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
在该方法中,首先判断当前执行器是否已经关闭;然后再根据queryStack查询层数和flushCache属性判断,是否需要清除一级缓存;然后再判断结果处理器resultHandler是否为空,为空的话,尝试从缓存中查询数据,否则直接为空,后续从数据库查询数据;然后再根据缓存中是否有数据,如果存在数据,且是存储过程或函数类型则执行handleLocallyCachedOutputParameters()方法,如果存在数据,不是存储过程或函数类型就直接返回,如果缓存中没有数据就直接通过queryFromDatabase()方法从数据库查询数据,其中又通过doQuery()方法实现查询逻辑;后续在通过DeferredLoad类实现嵌套查询的延时加载功能。
handleLocallyCachedOutputParameters()方法:
/**
* 针对存储过程调用的处理 其功能是 在一级缓存命中时,获取缓存中保存的输出类型参数,并设豆到用户传入的实参( parameter )对象中。
* @param ms
* @param key
* @param parameter
* @param boundSql
*/
private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {//CALLABLE类型,即存储过程、函数类型,否则不做任何处理
//根据key获取缓存的对象
final Object cachedParameter = localOutputParameterCache.getObject(key);
if (cachedParameter != null && parameter != null) {//如果缓存对象和参数都不为null
final MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter);
final MetaObject metaParameter = configuration.newMetaObject(parameter);
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
final String parameterName = parameterMapping.getProperty();
final Object cachedValue = metaCachedParameter.getValue(parameterName);
metaParameter.setValue(parameterName, cachedValue);
}
}
}
}
}
queryFromDatabase()方法:
其中,又通过调用需要子类实现的抽象方法doQuery()实现真正的查询操作逻辑。
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
10、 queryCursor()方法
查询,返回Cursor类型。该方法主要完成了根据参数生成BoundSql实例的工作,真正的查询工作还是由子类实现doQueryCursor方法来完成。
@Override
public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameter);
return doQueryCursor(ms, parameter, rowBounds, boundSql);
}
protected abstract <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql)
throws SQLException;
四、SimpleExecutor类
SimpleExecutor继承了BaseExecutor抽象类,是Mybatis提供的简单的Executor接口实现。在SimpleExecutor类中,主要实现了BaseExecutor抽象类中的四个抽象方法。
1、doUpdate()方法
在该方法中,首先通过configuration的newStatementHandler()方法,创建StatementHandler实例对象,StatementHandler实例对象用来完成真正数据库打交道的工作,即Mybatis与JDBC之间关于Statement的交互工作,然后再通过prepareStatement()方法构建了java.sql.Statement实例,最后通过StatementHandler的update()方法实现了与数据的update交互。
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
} finally {
closeStatement(stmt);
}
}
prepareStatement()方法,用于构建java.sql.Statement实例对象,并绑定相应的参数。
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
2、doQuery()、doQueryCursor()方法
这两个方法,和doUpdate()方法逻辑类似。
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
return handler.<E>queryCursor(stmt);
}
3、doFlushStatements()方法
在SimpleExecutor类中的doFlushStatements()方法不做任何处理,只是返回了一个空的集合对象。
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
return Collections.emptyList();
}
四、ReuseExecutor类
ReuseExecutor也继承了BaseExecutor抽象类,提供了对Statement重用的功能。在ReuseExecutor类中,doUpdate()、doQuery()、doQueryCursor()三个方法的实现和SimpleExecutor类中类似,主要区别在prepareStatement()方法的实现。
- prepareStatement()方法
在prepareStatement()方法中,会先尝试从statementMap中获取缓存的 Statement 对象,否则重新创建,并放到statementMap变量中进行缓存。
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
BoundSql boundSql = handler.getBoundSql();
String sql = boundSql.getSql();
if (hasStatementFor(sql)) {
stmt = getStatement(sql);
applyTransactionTimeout(stmt);
} else {
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
putStatement(sql, stmt);
}
handler.parameterize(stmt);
return stmt;
}
- doFlushStatements()方法
在该方法中,主要实现了缓存的Statement对象的关闭。
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
for (Statement stmt : statementMap.values()) {
closeStatement(stmt);
}
statementMap.clear();
return Collections.emptyList();
}
因为篇幅原因,BatchExecutor类和CachingExecutor类在《》中分析。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/68869.html