目录
RoomTrackingLiveData.mRefreshRunnable
InvalidationTracker.addObserver
InvalidationTracker.mObserverMap
InvalidationTracker.mRefreshRunnable
前言
众所周知,为防止查询阻止界面,Room 不允许在主线程上访问数据库。此限制意味着必须将 DAO 查询设为异步,当然如果你一定要在主线程中查数据库,可以配置DatabaseConfiguration的allowMainThreadQueries为true来规避。
Room 库包含与多个不同框架的集成,以提供异步查询执行功能。
DAO 查询分为三类:
- 单次写入查询,用于在数据库中插入数据或者更新或删除其中的数据。
- 单次读取查询,是指仅从数据库中读取一次数据,并在执行时返回带有数据库快照的结果。
- 可观察读取查询,是指每当底层数据库表发生变化时,都会从数据库中读取数据,并发出新值来反映这些更改。
虽然官方已经推荐从 LiveData 迁移到 Kotlin 数据流,但是某些开源的项目还依旧使用LiveData来观察数据表的变化,为了避免知其然不知其所以然带来的其它问题,本文从源码的角度分析,LiveData是如何观察Room数据表的。
将 LiveData 与 Room 一起使用
先回顾LiveData和Room如何一起使用。
Room 持久性库支持返回 LiveData 对象的可观察查询。可观察查询属于数据库访问对象 (DAO) 的一部分。
当数据库更新时,Room 会生成更新 LiveData 对象所需的所有代码。在需要时,生成的代码会在后台线程上异步运行查询。此模式有助于使界面中显示的数据与存储在数据库中的数据保持同步。您可以在 Room 持久性库指南中详细了解 Room 和 DAO。
假设有一个AudioRecordFile表,界面需要显示表里的所有数据,并且在数据表发生新增、删除,修改时自动同步到界面,相关代码如下:
@Entity(tableName = "AudioRecordFile")
data class AudioRecordFile constructor(
@PrimaryKey(autoGenerate = true)
@ColumnInfo(name = "id") var id: Long,
@NonNull @ColumnInfo(name = "createTime") var createTime: Long,
@NonNull @ColumnInfo(name = "duration") var duration: String,
@NonNull @ColumnInfo(name = "fileName") var fileName: String,
@NonNull @ColumnInfo(name = "filePath") var filePath: String
)
/**
* Data Access Object for the AudioRecordFile table.
*/
@Dao
interface AudioRecordFileDao {
/**
* Observes list of AudioRecordFile.
*
* @return all AudioRecordFile.
*/
@Query("SELECT * FROM AudioRecordFile order by createTime DESC ")
fun observeAudioRecordFileList(): LiveData<List<AudioRecordFile>>
}
可以看到,在DAO类中,定义了一个observeAudioRecordFileList方法,返回了一个LiveData对象,对应的Value是List<AudioRecordFile>。
然后在ViewModel中得到这个LiveData对象:
class AudioRecordFileLocalDataSource @Inject constructor(
private val audioRecordFileDao: AudioRecordFileDao,
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) :
IAudioRecordFileDataSource {
......
override fun observeAudioRecordFileList(): LiveData<Result<List<AudioRecordFile>>> {
return audioRecordFileDao.observeAudioRecordFileList().map {
Result.success(it)
}
}
......
}
class AudioRecordFileRepository constructor(
private val audioRecordLocalDataSource: IAudioRecordFileDataSource,
private val audioRecordRemoteDataSource: IAudioRecordFileDataSource,
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) : IAudioRecordFileRepository {
......
override fun observeAudioRecordFileList(): LiveData<Result<List<AudioRecordFile>>> {
return audioRecordLocalDataSource.observeAudioRecordFileList()
}
......
}
@HiltViewModel
class AudioRecordListViewModel @Inject constructor(
private val audioRecordFileRepository: IAudioRecordFileRepository,
private val audioTrackHandler: AudioTrackHandler
) :
ViewModel() {
private val _forceUpdate = MutableLiveData(false)
private val _items: LiveData<List<AudioRecordFile>> = _forceUpdate.switchMap { forceUpdate ->
if (forceUpdate) {
_dataLoading.value = true
viewModelScope.launch {
audioRecordFileRepository.refreshAudioRecordFileList()
_dataLoading.value = false
}
}
LogUtil.i(TAG, "observeAudioRecordFileList ")
audioRecordFileRepository.observeAudioRecordFileList().distinctUntilChanged()
.switchMap { handleAudioRecordFileList(it) }
}
val items: LiveData<List<AudioRecordFile>> = _items
private fun loadTasks(forceUpdate: Boolean) {
_forceUpdate.value = forceUpdate
}
fun refresh() {
loadTasks(true)
}
}
可以看到,ViewModel中是直接调用AudioRecordFileDao的observeAudioRecordFileList方法的,并且通过LiveData的扩展方法distinctUntilChange创建一个新的 LiveData 对象提供给外部,配合DataBinding或者直接Observe使用,当AudioRecordFile表有变动时,驱动界面更新。
观察数据表变化的本质是触发器
我们先思考一个问题,为什么DAO类的方法返回LiveData对象就能够实现数据表发生变化时通知界面更新?
实现这个功能,我们需要知道数据表何时发生变化,这样我们才能在观察变化时调用LiveData的setValue或postValue去更新数据。那关键就在于怎么判断数据表是否发生变化?
触发器
答案就是触发器(TRIGGER)。触发器是在指定的数据库事件发生时自动执行的数据库操作。可以指定一个触发器,只要发生特定数据库表的DELETE,INSERT或UPDATE,或者在表的一个或多个指定列上发生UPDATE时触发。
举个例子
假设客户记录存储在“customers”表中,并且该订单记录存储在“orders”表中,则以下UPDATE触发器可确保在客户更改其地址时重定向所有相关订单:
CREATE TRIGGER update_customer_address UPDATE OF address ON customers
BEGIN
UPDATE orders SET address = new.address WHERE customer_name = old.name;
END;
安装此触发器后,执行语句:
UPDATE customers SET address = '1 Main St.' WHERE name = 'Jack Jones';
导致以下内容被自动执行:
UPDATE orders SET address = '1 Main St.' WHERE customer_name = 'Jack Jones';
是不是很简单,也就是说,如果我们针对指定表的的DELETE,INSERT或UPDATE分别创建相应的触发器,触发后直接执行查询语句,将新的数据返回,这样我们就能够实现观察数据表发生变化并返回最新的数据了。
CREATE TRIGGER的扩展于阅读:
CREATE TRIGGER (Language) – Sqlite 中文开发手册 – 开发者手册 – 云+社区 – 腾讯云
还有其它思路吗?当然有,比如我们通过AOP监控AudioRecordFile表的每一次SQL语句执行,如果涉及到DELETE,INSERT或UPDATE等操作时,我们就在操作结束后查询最新数据并返回。但是这样很麻烦,一点也不优雅,是吧。
DAO实现类源码分析
那Room是使用触发器来实现观察数据表的吗?带着疑问,我们去看看DAO实现类编译后的源码。
observeAudioRecordFileList方法的源码实现如下:
@Override
public LiveData<List<AudioRecordFile>> observeAudioRecordFileList() {
final String _sql = "SELECT * FROM AudioRecordFile order by createTime DESC ";
final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0);
return __db.getInvalidationTracker().createLiveData(new String[]{"AudioRecordFile"}, false, new Callable<List<AudioRecordFile>>() {
@Override
public List<AudioRecordFile> call() throws Exception {
final Cursor _cursor = DBUtil.query(__db, _statement, false, null);
try {
final int _cursorIndexOfId = CursorUtil.getColumnIndexOrThrow(_cursor, "id");
final int _cursorIndexOfCreateTime = CursorUtil.getColumnIndexOrThrow(_cursor, "createTime");
final int _cursorIndexOfDuration = CursorUtil.getColumnIndexOrThrow(_cursor, "duration");
final int _cursorIndexOfFileName = CursorUtil.getColumnIndexOrThrow(_cursor, "fileName");
final int _cursorIndexOfFilePath = CursorUtil.getColumnIndexOrThrow(_cursor, "filePath");
final List<AudioRecordFile> _result = new ArrayList<AudioRecordFile>(_cursor.getCount());
while(_cursor.moveToNext()) {
final AudioRecordFile _item;
final long _tmpId;
_tmpId = _cursor.getLong(_cursorIndexOfId);
final long _tmpCreateTime;
_tmpCreateTime = _cursor.getLong(_cursorIndexOfCreateTime);
final String _tmpDuration;
if (_cursor.isNull(_cursorIndexOfDuration)) {
_tmpDuration = null;
} else {
_tmpDuration = _cursor.getString(_cursorIndexOfDuration);
}
final String _tmpFileName;
if (_cursor.isNull(_cursorIndexOfFileName)) {
_tmpFileName = null;
} else {
_tmpFileName = _cursor.getString(_cursorIndexOfFileName);
}
final String _tmpFilePath;
if (_cursor.isNull(_cursorIndexOfFilePath)) {
_tmpFilePath = null;
} else {
_tmpFilePath = _cursor.getString(_cursorIndexOfFilePath);
}
_item = new AudioRecordFile(_tmpId,_tmpCreateTime,_tmpDuration,_tmpFileName,_tmpFilePath);
_result.add(_item);
}
return _result;
} finally {
_cursor.close();
}
}
@Override
protected void finalize() {
_statement.release();
}
});
}
以上代码中,最终重要的是__db.getInvalidationTracker().createLiveData()
这一行,顾名思义,InvalidationTracker的意思是失效追踪器,所谓的失效应该就是新数据更新导致旧数据失效。
跟进去看下InvalidationTracker的相关源码。
InvalidationTracker
/**
* InvalidationTracker keeps a list of tables modified by queries and notifies its callbacks about
* these tables.
*/
// Some details on how the InvalidationTracker works:
// * An in memory table is created with (table_id, invalidated) table_id is a hardcoded int from
// initialization, while invalidated is a boolean bit to indicate if the table has been invalidated.
// * ObservedTableTracker tracks list of tables we should be watching (e.g. adding triggers for).
// * Before each beginTransaction, RoomDatabase invokes InvalidationTracker to sync trigger states.
// * After each endTransaction, RoomDatabase invokes InvalidationTracker to refresh invalidated
// tables.
// * Each update (write operation) on one of the observed tables triggers an update into the
// memory table table, flipping the invalidated flag ON.
// * When multi-instance invalidation is turned on, MultiInstanceInvalidationClient will be created.
// It works as an Observer, and notifies other instances of table invalidation.
public class InvalidationTracker {
private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
private static final String UPDATE_TABLE_NAME = "room_table_modification_log";
private static final String TABLE_ID_COLUMN_NAME = "table_id";
private static final String INVALIDATED_COLUMN_NAME = "invalidated";
private static final String CREATE_TRACKING_TABLE_SQL = "CREATE TEMP TABLE " + UPDATE_TABLE_NAME
+ "(" + TABLE_ID_COLUMN_NAME + " INTEGER PRIMARY KEY, "
+ INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0)";
@VisibleForTesting
static final String RESET_UPDATED_TABLES_SQL = "UPDATE " + UPDATE_TABLE_NAME
+ " SET " + INVALIDATED_COLUMN_NAME + " = 0 WHERE " + INVALIDATED_COLUMN_NAME + " = 1 ";
@VisibleForTesting
static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
+ " WHERE " + INVALIDATED_COLUMN_NAME + " = 1;";
@NonNull
final HashMap<String, Integer> mTableIdLookup;
final String[] mTableNames;
@NonNull
private Map<String, Set<String>> mViewTables;
@Nullable
AutoCloser mAutoCloser = null;
@SuppressWarnings("WeakerAccess") /* synthetic access */
final RoomDatabase mDatabase;
AtomicBoolean mPendingRefresh = new AtomicBoolean(false);
private volatile boolean mInitialized = false;
@SuppressWarnings("WeakerAccess") /* synthetic access */
volatile SupportSQLiteStatement mCleanupStatement;
private ObservedTableTracker mObservedTableTracker;
private final InvalidationLiveDataContainer mInvalidationLiveDataContainer;
// should be accessed with synchronization only.
@VisibleForTesting
@SuppressLint("RestrictedApi")
final SafeIterableMap<Observer, ObserverWrapper> mObserverMap = new SafeIterableMap<>();
private MultiInstanceInvalidationClient mMultiInstanceInvalidationClient;
/**
* Used by the generated code.
*
* @hide
*/
@SuppressWarnings("WeakerAccess")
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
public InvalidationTracker(RoomDatabase database, String... tableNames) {
this(database, new HashMap<String, String>(), Collections.<String, Set<String>>emptyMap(),
tableNames);
}
/**
* Used by the generated code.
*
* @hide
*/
@SuppressWarnings("WeakerAccess")
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
public InvalidationTracker(RoomDatabase database, Map<String, String> shadowTablesMap,
Map<String, Set<String>> viewTables, String... tableNames) {
mDatabase = database;
mObservedTableTracker = new ObservedTableTracker(tableNames.length);
mTableIdLookup = new HashMap<>();
mViewTables = viewTables;
mInvalidationLiveDataContainer = new InvalidationLiveDataContainer(mDatabase);
final int size = tableNames.length;
mTableNames = new String[size];
for (int id = 0; id < size; id++) {
final String tableName = tableNames[id].toLowerCase(Locale.US);
mTableIdLookup.put(tableName, id);
String shadowTableName = shadowTablesMap.get(tableNames[id]);
if (shadowTableName != null) {
mTableNames[id] = shadowTableName.toLowerCase(Locale.US);
} else {
mTableNames[id] = tableName;
}
}
// Adjust table id lookup for those tables whose shadow table is another already mapped
// table (e.g. external content fts tables).
for (Map.Entry<String, String> shadowTableEntry : shadowTablesMap.entrySet()) {
String shadowTableName = shadowTableEntry.getValue().toLowerCase(Locale.US);
if (mTableIdLookup.containsKey(shadowTableName)) {
String tableName = shadowTableEntry.getKey().toLowerCase(Locale.US);
mTableIdLookup.put(tableName, mTableIdLookup.get(shadowTableName));
}
}
}
private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("DROP TRIGGER IF EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
writableDb.execSQL(stringBuilder.toString());
}
}
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
writableDb.execSQL(
"INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
stringBuilder.append(" AFTER ")
.append(trigger)
.append(" ON `")
.append(tableName)
.append("` BEGIN UPDATE ")
.append(UPDATE_TABLE_NAME)
.append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
.append(" WHERE ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
.append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
.append("; END");
writableDb.execSQL(stringBuilder.toString());
}
}
}
可以看到,定义了三个触发器,对应修改,删除,插入:
private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
并且在startTrackingTable和stopTrackingTable,分别创建和删除这些触发器,验证了我们前面的想法。
但是,虽然有了触发器,但触发器的相关操作并不是去查询数据表,而是去更新了一个名为room_table_modification_log的表,这样的目的是什么?这个问题先卖个关子,我们继续往下看。
LiveData的使用
RoomTrackingLiveData对象
__db.getInvalidationTracker().createLiveData()代码中,createLiveData方法则最终调用InvalidationLiveDataContainer类的create的方法则返回了一个RoomTrackingLiveData对象。
/**
* A helper class that maintains {@link RoomTrackingLiveData} instances for an
* {@link InvalidationTracker}.
* <p>
* We keep a strong reference to active LiveData instances to avoid garbage collection in case
* developer does not hold onto the returned LiveData.
*/
class InvalidationLiveDataContainer {
@SuppressWarnings("WeakerAccess")
@VisibleForTesting
final Set<LiveData> mLiveDataSet = Collections.newSetFromMap(
new IdentityHashMap<LiveData, Boolean>()
);
private final RoomDatabase mDatabase;
InvalidationLiveDataContainer(RoomDatabase database) {
mDatabase = database;
}
<T> LiveData<T> create(String[] tableNames, boolean inTransaction,
Callable<T> computeFunction) {
return new RoomTrackingLiveData<>(mDatabase, this, inTransaction, computeFunction,
tableNames);
}
void onActive(LiveData liveData) {
mLiveDataSet.add(liveData);
}
void onInactive(LiveData liveData) {
mLiveDataSet.remove(liveData);
}
}
/**
* A helper class that maintains {@link RoomTrackingLiveData} instances for an
* {@link InvalidationTracker}.
* <p>
* We keep a strong reference to active LiveData instances to avoid garbage collection in case
* developer does not hold onto the returned LiveData.
*/
class InvalidationLiveDataContainer {
@SuppressWarnings("WeakerAccess")
@VisibleForTesting
final Set<LiveData> mLiveDataSet = Collections.newSetFromMap(
new IdentityHashMap<LiveData, Boolean>()
);
private final RoomDatabase mDatabase;
InvalidationLiveDataContainer(RoomDatabase database) {
mDatabase = database;
}
<T> LiveData<T> create(String[] tableNames, boolean inTransaction,
Callable<T> computeFunction) {
return new RoomTrackingLiveData<>(mDatabase, this, inTransaction, computeFunction,
tableNames);
}
void onActive(LiveData liveData) {
mLiveDataSet.add(liveData);
}
void onInactive(LiveData liveData) {
mLiveDataSet.remove(liveData);
}
}
@SuppressLint("RestrictedApi")
RoomTrackingLiveData(
RoomDatabase database,
InvalidationLiveDataContainer container,
boolean inTransaction,
Callable<T> computeFunction,
String[] tableNames) {
mDatabase = database;
mInTransaction = inTransaction;
mComputeFunction = computeFunction;
mContainer = container;
mObserver = new InvalidationTracker.Observer(tableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
ArchTaskExecutor.getInstance().executeOnMainThread(mInvalidationRunnable);
}
};
}
我们知道,更新LiveData的值,要么调用setValue,要么调用postValue,也就是说,触发器之后的操作需要更新最新的表数据时,就必须然要调用以上的setValue/postValue,顺着这个思路,我们去查看RoomTrackingLiveData的代码,发现mRefreshRunnable有调用postValue。
RoomTrackingLiveData.mRefreshRunnable
@SuppressWarnings("WeakerAccess")final Runnable mRefreshRunnable = new Runnable() { @WorkerThread @Override public void run() { if (mRegisteredObserver.compareAndSet(false, true)) { mDatabase.getInvalidationTracker().addWeakObserver(mObserver); } boolean computed; do { computed = false; // compute can happen only in 1 thread but no reason to lock others. if (mComputing.compareAndSet(false, true)) { // as long as it is invalid, keep computing. try { T value = null; while (mInvalid.compareAndSet(true, false)) { computed = true; try { value = mComputeFunction.call(); } catch (Exception e) { throw new RuntimeException("Exception while computing database" + " live data.", e); } } if (computed) { postValue(value); } } finally { // release compute lock mComputing.set(false); } } // check invalid after releasing compute lock to avoid the following scenario. // Thread A runs compute() // Thread A checks invalid, it is false // Main thread sets invalid to true // Thread B runs, fails to acquire compute lock and skips // Thread A releases compute lock // We've left invalid in set state. The check below recovers. } while (computed && mInvalid.get()); }};
先不管其它细枝末节,可以看到postValue的value是调用mComputeFunction.call();返回的,mComputeFunction则对应前面observeAudioRecordFileList源码中createLiveData的参数:
@Override
public LiveData<List<AudioRecordFile>> observeAudioRecordFileList() {
final String _sql = "SELECT * FROM AudioRecordFile order by createTime DESC ";
final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0);
return __db.getInvalidationTracker().createLiveData(new String[]{"AudioRecordFile"}, false, new Callable<List<AudioRecordFile>>() {
@Override
public List<AudioRecordFile> call() throws Exception {
final Cursor _cursor = DBUtil.query(__db, _statement, false, null);
try {
final int _cursorIndexOfId = CursorUtil.getColumnIndexOrThrow(_cursor, "id");
final int _cursorIndexOfCreateTime = CursorUtil.getColumnIndexOrThrow(_cursor, "createTime");
final int _cursorIndexOfDuration = CursorUtil.getColumnIndexOrThrow(_cursor, "duration");
final int _cursorIndexOfFileName = CursorUtil.getColumnIndexOrThrow(_cursor, "fileName");
final int _cursorIndexOfFilePath = CursorUtil.getColumnIndexOrThrow(_cursor, "filePath");
final List<AudioRecordFile> _result = new ArrayList<AudioRecordFile>(_cursor.getCount());
while(_cursor.moveToNext()) {
final AudioRecordFile _item;
final long _tmpId;
_tmpId = _cursor.getLong(_cursorIndexOfId);
final long _tmpCreateTime;
_tmpCreateTime = _cursor.getLong(_cursorIndexOfCreateTime);
final String _tmpDuration;
if (_cursor.isNull(_cursorIndexOfDuration)) {
_tmpDuration = null;
} else {
_tmpDuration = _cursor.getString(_cursorIndexOfDuration);
}
final String _tmpFileName;
if (_cursor.isNull(_cursorIndexOfFileName)) {
_tmpFileName = null;
} else {
_tmpFileName = _cursor.getString(_cursorIndexOfFileName);
}
final String _tmpFilePath;
if (_cursor.isNull(_cursorIndexOfFilePath)) {
_tmpFilePath = null;
} else {
_tmpFilePath = _cursor.getString(_cursorIndexOfFilePath);
}
_item = new AudioRecordFile(_tmpId,_tmpCreateTime,_tmpDuration,_tmpFileName,_tmpFilePath);
_result.add(_item);
}
return _result;
} finally {
_cursor.close();
}
}
@Override
protected void finalize() {
_statement.release();
}
});
可以看到value就是call方法的返回值,返回的是对应sql语句查询返回的数据。那剩下的关键就是,触发器的操作是怎么最终调用到mRefreshRunnable的。
继续看RoomTrackingLiveData源码,可以看到只有以下两个地方执行了mRefreshRunnable:
@SuppressWarnings("WeakerAccess")
final Runnable mInvalidationRunnable = new Runnable() {
@MainThread
@Override
public void run() {
boolean isActive = hasActiveObservers();
if (mInvalid.compareAndSet(false, true)) {
if (isActive) {
getQueryExecutor().execute(mRefreshRunnable);
}
}
}
};
@Override
protected void onActive() {
super.onActive();
mContainer.onActive(this);
getQueryExecutor().execute(mRefreshRunnable);
}
InvalidationTracker.Observer
onActive可以先不用管,我们看mInvalidationRunnable,发现是InvalidationTracker.Observer类中的onInvalidated方法中执行的mInvalidationRunnable:
RoomTrackingLiveData(
RoomDatabase database,
InvalidationLiveDataContainer container,
boolean inTransaction,
Callable<T> computeFunction,
String[] tableNames) {
mDatabase = database;
mInTransaction = inTransaction;
mComputeFunction = computeFunction;
mContainer = container;
mObserver = new InvalidationTracker.Observer(tableNames) {
@Override
public void onInvalidated(@NonNull Set<String> tables) {
ArchTaskExecutor.getInstance().executeOnMainThread(mInvalidationRunnable);
}
};
}
而这个mObserver又被add到mDatabase的InvalidationTracker中:
if (mRegisteredObserver.compareAndSet(false, true)) {
mDatabase.getInvalidationTracker().addWeakObserver(mObserver);
}
最终put进了mObserverMap。
InvalidationTracker.addObserver
/**
* Adds the given observer to the observers list and it will be notified if any table it
* observes changes.
* <p>
* Database changes are pulled on another thread so in some race conditions, the observer might
* be invoked for changes that were done before it is added.
* <p>
* If the observer already exists, this is a no-op call.
* <p>
* If one of the tables in the Observer does not exist in the database, this method throws an
* {@link IllegalArgumentException}.
* <p>
* This method should be called on a background/worker thread as it performs database
* operations.
*
* @param observer The observer which listens the database for changes.
*/
@SuppressLint("RestrictedApi")
@WorkerThread
public void addObserver(@NonNull Observer observer) {
final String[] tableNames = resolveViews(observer.mTables);
int[] tableIds = new int[tableNames.length];
final int size = tableNames.length;
for (int i = 0; i < size; i++) {
Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
if (tableId == null) {
throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
}
tableIds[i] = tableId;
}
ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames);
ObserverWrapper currentObserver;
synchronized (mObserverMap) {
currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
}
if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
syncTriggers();
}
}
这样一来,我们猜测触发器最终肯定直接或者间接的遍历mObserverMap,并调用对应对饮Observer的onInvalidated方法。
InvalidationTracker.mObserverMap
顺着思路继续继续查找遍历mObserverMap的代码,发现两个地方:
/**
* Notifies all the registered {@link Observer}s of table changes.
* <p>
* This can be used for notifying invalidation that cannot be detected by this
* {@link InvalidationTracker}, for example, invalidation from another process.
*
* @param tables The invalidated tables.
* @hide
*/
@RestrictTo(RestrictTo.Scope.LIBRARY)
@VisibleForTesting(otherwise = VisibleForTesting.PACKAGE_PRIVATE)
public void notifyObserversByTableNames(String... tables) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
if (!entry.getKey().isRemote()) {
entry.getValue().notifyByTableNames(tables);
}
}
}
}
根据注释,notifyObserversByTableNames主要是用于 InvalidationTracker 无法检测到数据表更新的场景,如其它进程更新了表数据,InvalidationTracker检测不到(为啥?触发器应该能触发吧?继续往下看吧),此时可以调用notifyObserversByTableNames强制遍历所有的Observer进行通知刷新。
notifyObserversByTableNames不是重点需要关心的,继续往下看。
InvalidationTracker.mRefreshRunnable
@VisibleForTesting
Runnable mRefreshRunnable = new Runnable() {
@Override
public void run() {
final Lock closeLock = mDatabase.getCloseLock();
Set<Integer> invalidatedTableIds = null;
closeLock.lock();
try {
if (!ensureInitialization()) {
return;
}
if (!mPendingRefresh.compareAndSet(true, false)) {
// no pending refresh
return;
}
if (mDatabase.inTransaction()) {
// current thread is in a transaction. when it ends, it will invoke
// refreshRunnable again. mPendingRefresh is left as false on purpose
// so that the last transaction can flip it on again.
return;
}
if (mDatabase.mWriteAheadLoggingEnabled) {
// This transaction has to be on the underlying DB rather than the RoomDatabase
// in order to avoid a recursive loop after endTransaction.
SupportSQLiteDatabase db = mDatabase.getOpenHelper().getWritableDatabase();
db.beginTransactionNonExclusive();
try {
invalidatedTableIds = checkUpdatedTable();
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
} else {
invalidatedTableIds = checkUpdatedTable();
}
} catch (IllegalStateException | SQLiteException exception) {
// may happen if db is closed. just log.
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
} finally {
closeLock.unlock();
if (mAutoCloser != null) {
mAutoCloser.decrementCountAndScheduleClose();
}
}
if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
}
}
}
}
private Set<Integer> checkUpdatedTable() {
HashSet<Integer> invalidatedTableIds = new HashSet<>();
Cursor cursor = mDatabase.query(new SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL));
//noinspection TryFinallyCanBeTryWithResources
try {
while (cursor.moveToNext()) {
final int tableId = cursor.getInt(0);
invalidatedTableIds.add(tableId);
}
} finally {
cursor.close();
}
if (!invalidatedTableIds.isEmpty()) {
mCleanupStatement.executeUpdateDelete();
}
return invalidatedTableIds;
}
};
mRefreshRunnable代码比较长,我们耐心看:
if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
}
}
}
/**
* Notifies the underlying {@link #mObserver} if any of the observed tables are invalidated
* based on the given invalid status set.
*
* @param invalidatedTablesIds The table ids of the tables that are invalidated.
*/
void notifyByTableInvalidStatus(Set<Integer> invalidatedTablesIds) {
Set<String> invalidatedTables = null;
final int size = mTableIds.length;
for (int index = 0; index < size; index++) {
final int tableId = mTableIds[index];
if (invalidatedTablesIds.contains(tableId)) {
if (size == 1) {
// Optimization for a single-table observer
invalidatedTables = mSingleTableSet;
} else {
if (invalidatedTables == null) {
invalidatedTables = new HashSet<>(size);
}
invalidatedTables.add(mTableNames[index]);
}
}
}
if (invalidatedTables != null) {
mObserver.onInvalidated(invalidatedTables);
}
}
可以看到mObserverMap遍历后会调用notifyByTableInvalidStatus方法,notifyByTableInvalidStatus方法会直接调用mObserver.onInvalidated(invalidatedTables),最终执行查询数据表的操作,并将返回值通过LiveData的postValue发出去。
前面我们提到room_table_modification_log的表,作用是什么?mRefreshRunnable告诉我们答案。
private Set<Integer> checkUpdatedTable() {
HashSet<Integer> invalidatedTableIds = new HashSet<>();
Cursor cursor = mDatabase.query(new SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL));
//noinspection TryFinallyCanBeTryWithResources
try {
while (cursor.moveToNext()) {
final int tableId = cursor.getInt(0);
invalidatedTableIds.add(tableId);
}
} finally {
cursor.close();
}
if (!invalidatedTableIds.isEmpty()) {
mCleanupStatement.executeUpdateDelete();
}
return invalidatedTableIds;
}
可以看到mRefreshRunnable会调用checkUpdatedTable方法,查询room_table_modification_log表中记录的发生变化的表得到invalidatedTablesIds,如果invalidatedTablesIds不为空,并且包含我们设置返回LiveData的表,则回调对应的Observer的onInvalidated方法。
接下来只需要搞清楚mRefreshRunnable什么时候会执行的即可。mRefreshRunnable有两个调用的地方,我们继续往下看。
refreshVersionsAsync
/**
* Enqueues a task to refresh the list of updated tables.
* <p>
* This method is automatically called when {@link RoomDatabase#endTransaction()} is called but
* if you have another connection to the database or directly use {@link
* SupportSQLiteDatabase}, you may need to call this manually.
*/
@SuppressWarnings("WeakerAccess")
public void refreshVersionsAsync() {
// TODO we should consider doing this sync instead of async.
if (mPendingRefresh.compareAndSet(false, true)) {
if (mAutoCloser != null) {
// refreshVersionsAsync is called with the ref count incremented from
// RoomDatabase, so the db can't be closed here, but we need to be sure that our
// db isn't closed until refresh is completed. This increment call must be
// matched with a corresponding call in mRefreshRunnable.
mAutoCloser.incrementCountAndEnsureDbIsOpen();
}
mDatabase.getQueryExecutor().execute(mRefreshRunnable);
}
}
refreshVersionsAsync明显是一个异步方法,根据注释以及代码调用跟踪,该方法在endTransaction后调用:
/**
* Wrapper for {@link SupportSQLiteDatabase#endTransaction()}.
*
* @deprecated Use {@link #runInTransaction(Runnable)}
*/
@Deprecated
public void endTransaction() {
if (mAutoCloser == null) {
internalEndTransaction();
} else {
mAutoCloser.executeRefCountingFunction(db -> {
internalEndTransaction();
return null;
});
}
}
private void internalEndTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
以新增一个Entity为例:
@Override
public Object insertAudioRecordFile(final AudioRecordFile audioRecordFile,
final Continuation<? super Long> continuation) {
return CoroutinesRoom.execute(__db, true, new Callable<Long>() {
@Override
public Long call() throws Exception {
__db.beginTransaction();
try {
long _result = __insertionAdapterOfAudioRecordFile.insertAndReturnId(audioRecordFile);
__db.setTransactionSuccessful();
return _result;
} finally {
__db.endTransaction();
}
}
}, continuation);
}
当我们向数据库中插入一条数据时,不管成功或者失败,最终都会调用__db.endTransaction(),这样就会触发refreshVersionsAsync执行,注意这里的__db.endTransaction()方法是RoomDatabase对象的,不是SupportSQLiteDatabase对象的。
对于触发器来说,当执行insertAudioRecordFile插入语句的时候,在真正插入数据之前触发器就会执行相关操作修改room_table_modification_log表中的记录,记录发生变动的表的id和对应状态,等插入完成执行__db.endTransaction();操作时,再去查询room_table_modification_log表执行相关LiveData对象的postValue的业务逻辑。
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
writableDb.execSQL(
"INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
stringBuilder.append(" AFTER ")
.append(trigger)
.append(" ON `")
.append(tableName)
.append("` BEGIN UPDATE ")
.append(UPDATE_TABLE_NAME)
.append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
.append(" WHERE ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
.append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
.append("; END");
writableDb.execSQL(stringBuilder.toString());
}
}
这样一来,完整的支持LiveData的观察数据库变化的功能实现思路就完全理通了。
refreshVersionsSync
第二个地方是refreshVersionsSync,明显是一个同步方法,目前只有LimitOffsetDataSource会调用,鉴于LimitOffsetDataSource没有实际使用过,就不分析了。
/**
* Check versions for tables, and run observers synchronously if tables have been updated.
*
* @hide
*/
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
@WorkerThread
public void refreshVersionsSync() {
if (mAutoCloser != null) {
// This increment call must be matched with a corresponding call in mRefreshRunnable.
mAutoCloser.incrementCountAndEnsureDbIsOpen();
}
syncTriggers();
mRefreshRunnable.run();
}
distinctUntilChanged
我们注意到,ViewModel中返回Room的LiveData时,还有个distinctUntilChanged方法,这是一个扩展函数,源码如下
/**
* Creates a new {@link LiveData} object that does not emit a value until the source LiveData
* value has been changed. The value is considered changed if {@code equals()} yields
* {@code false}.
*
* @param source the input {@link LiveData}
* @param <X> the generic type parameter of {@code source}
* @return a new {@link LiveData} of type {@code X}
*/
@MainThread
@NonNull
public static <X> LiveData<X> distinctUntilChanged(@NonNull LiveData<X> source) {
final MediatorLiveData<X> outputLiveData = new MediatorLiveData<>();
outputLiveData.addSource(source, new Observer<X>() {
boolean mFirstTime = true;
@Override
public void onChanged(X currentValue) {
final X previousValue = outputLiveData.getValue();
if (mFirstTime
|| (previousValue == null && currentValue != null)
|| (previousValue != null && !previousValue.equals(currentValue))) {
mFirstTime = false;
outputLiveData.setValue(currentValue);
}
}
});
return outputLiveData;
}
可以看到distinctUntilChanged使用MediatorLiveData返回一个新的LiveData对象,并且利用onChanged方法,在第一次或Old LiveData对象更新的value与当前New LiveData对象的值不同时,才调用New LiveData对象的setValue方法。
小结
Room除了支持LiveData,还支持Flow,RxJava,但实现观察数据库的功能本质上都是一样的,都是使用触发器实现,这里画几张图加深理解。
图1
图1表示,View去观察Room返回的LiveData,LiveData的值是XXX对应的数据表。
图2
图3
图2和图3表示触发器的创建(包含移除)过程。
图4
图4表示,执行数据库增删改等SQL时,会先触发触发器的相关操作(主要是在room_table_modification_log表中记录发生”UPDATE“, “DELETE“, “INSERT“操作的数据表的id和状态),待数据库事务结束后,查询room_table_modification_log中是否存在发生变动的表,如果有,则回调对应Observer的onInvaliDated方法,执行对应的mComputeFunction(XXXDao_Impl的createLiveData方法传入的computeFunction,一般是查询表数据),并将mComputeFunction.call()返回的结果通过LiveData的postValue更新。
2022年02月08日补充
为避免误导大家,摘抄了Android官方最新的LiveData文档以供参考:
您可能会想在数据层类中使用
LiveData
对象,但LiveData
并不适合用于处理异步数据流。虽然您可以使用LiveData
转换和 MediatorLiveData 来实现此目的,但此方法的缺点在于:用于组合数据流的功能非常有限,并且所有LiveData
对象(包括通过转换创建的对象)都会在主线程中观察到。如果您需要在应用的其他层中使用数据流,请考虑使用 Kotlin Flow,然后使用 asLiveData() 在
ViewModel
中将 Kotlin Flow 转换成LiveData
。如需详细了解如何搭配使用 KotlinFlow
与LiveData
,请学习此 Codelab。对于使用 Java 构建的代码库,请考虑将执行器与回调或RxJava
结合使用。
可以看到,官方不再推荐在界面层之外的地方使用LiveData, Java用户建议将执行器与回调或 RxJava
结合使用, Kotlin 用户请考虑使用 Kotlin Flow 。
当时还在Google的JakeWharton 大神也不推荐LiveData配合Room使用,因为 LiveData 无法处理查询过程中产生的异常,而且按照大神的说法,他自己Never used LiveData。类似还有Retrofit配合LiveData的使用场景,也不推荐这么干!
LiveData
具有生命周期感知能力,遵循 activity 和 fragment 等实体的生命周期。我们可以使用 LiveData
在这些生命周期所有者和生命周期不同的其他对象(例如 ViewModel
对象)之间传递数据。
ViewModel
的主要责任是加载和管理与界面相关的数据,因此非常适合作为用于保留 LiveData
对象的备选方法。我们可以在 ViewModel
中创建 LiveData
对象,然后使用这些对象向界面层公开状态。
activity 和 fragment 不应保留 LiveData
实例,因为它们的用途是显示数据,而不是保持状态。
综上所述,笔者只建议在ViewModel使用LiveData,然而,你有可能使用了基于LiveData实现的事件总线框架:比如SingleLiveEvent,LiveDataBus等,或者干脆在后台线程中调用用 MutableLiveData.postValue()
与界面层通信,无所谓了,生命在于折腾,只要你知其然知其所以然,LiveData用的舒服又有什么关系呢?
当然,有小伙伴已经在ViewModel上用上StateFlow了,那你很牛逼,不过LiveData和StateFlow不是二选一的,选择适合你的即可。
写在最后,首先非常感谢您耐心阅读完整篇文章,坚持写原创且基于实战的文章不是件容易的事,如果本文刚好对您有点帮助,欢迎您给文章点赞评论,您的鼓励是笔者坚持不懈的动力。若文章有不对之处也欢迎指正,再次感谢。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/116886.html