sqlbrite源码解析

sqlbrite是square公司又一开源产品,当前版本是0.6.3。正如其简介所说,sqlbrite是对Android中SQLiteOpenHelper的轻量级包装,可以响应式的对SQL语句进行操作。本文关注响应式这个词,来对sqlbrite的源码进行解析。

上帝BriteDatabase

BriteDatabase sqlbrite中对外的主要接口。我们需要这样来创建它:

BriteDatabase db =SqlBrite.create().wrapDatabaseHelper(dbOpenHelper)

其中的dbOpenHelper 就是Android原生提供的SQLiteOpenHelper类。我们追下源码:

public static SqlBrite create() {
	 return create(new Logger() {
	     @Override
	     public void log(String message) {
	        Log.d("SqlBrite", message);
	     }
	  });
}

SqlBrite.create()只是一个简单的工厂,返回了Sqlbrite类的实例,值得注意的是这个实例中默认添加了logger,我们可以实现Sqlbrite.Logger来添加自己的log输出方式。继续追wrapDatabaseHelper方法:

@CheckResult @NonNull
public BriteDatabase wrapDatabaseHelper(@NonNull SQLiteOpenHelper helper) {
    return new BriteDatabase(helper, logger);
}

可见它也只是返回了BriteDatabase 对象实例,我们看下构造函数:

BriteDatabase(@NonNull SQLiteOpenHelper helper, @NonNull SqlBrite.Logger logger) {
	this.helper = helper;
	this.logger = logger;
}

也只是记录了SQLiteOpenHelperLogger ,这里并没有什么高级的代码。高级的是BriteDatabase中对事务的管理与数据查询的处理方式。

事务

事务是BriteDatabase中比较核心处理的概念,因为要保证前文说的响应式,必须对事务也是原子性的响应。我们来看BriteDatabase是如何做到的。

要想开启一个事务,只需调用newTransaction方法即可:

public Transaction newTransaction() {
	 SqliteTransaction transaction = new SqliteTransaction(transactions.get());
	    transactions.set(transaction);
	    if (logging) log("TXN BEGIN %s", transaction);
	     	 getWriteableDatabase().beginTransactionWithListener(transaction);
	return this.transaction;
}

这个函数短短四行,但理解起来不是特别容易,其中SqliteTransactionBriteDatabase 的内部类,它继承了LinkedHashSet,实现了SQLiteTransactionListener,构造函数如下:

SqliteTransaction(SqliteTransaction parent) {
    this.parent = parent;
}

它内部维护了一个parent变量,这个变量再后文中会提到作用。SqliteTransaction中也维护了一个bool变量commit,这个值在SQLiteTransactionListener的onCommit方法回调中被置为true,用于表示这个transaction是否成功。

transactions是BriteDatabase的成员变量,使用ThreadLocal维护了一个SqliteTransaction,保证线程间的隔离。所以前两句就是生成SqliteTransaction对象,并置于transactions中。

第四行首先调用了getWriteableDatabase 函数,这个函数如下:

SQLiteDatabase getWriteableDatabase() {
	  SQLiteDatabase db = writeableDatabase;
	  if (db == null) {
	      synchronized (databaseLock) {
	        db = writeableDatabase;
	        if (db == null) {
	          if (logging) log("Creating writeable database");
	          db = writeableDatabase = helper.getWritableDatabase();
	        }
	      }
	  }
	  return db;
}

这个函数使用了双重校验锁机制来返回dbhelper中的SQLiteDatabase对象。

第四行又继续调用了SQLiteDatabase 的beginTransactionWithListener函数,并传入了transaction对象,前面说过SqliteTransaction实现了SQLiteTransactionListener,这里将其作为事务的listener。

最后是返回this.transaction,等等,是this.transaction,不是刚刚new出的transaction对象,那这个乱入的this.transaction是个什么鬼呢?

首先,transaction是Transaction的一个实现,Transaction是BriteDatabase中的一个接口,拥有end()markSuccessful()yieldIfContendedSafely()yieldIfContendedSafely()close()五个方法,实际上就是SQLiteDatabase原生事务的一个抽象。 其中markSuccessful()yieldIfContendedSafely()yieldIfContendedSafely()三个方法都只是拿到SQLiteOpenHelper中的db进行对应的操作。这里有意思的是end函数:

 @Override
 public void end() {
      SqliteTransaction transaction = transactions.get();
      if (transaction == null) {
        throw new IllegalStateException("Not in transaction.");
      }
      SqliteTransaction newTransaction = transaction.parent;
      transactions.set(newTransaction);
      if (logging) log("TXN END %s", transaction);
      getWriteableDatabase().endTransaction();
      // Send the triggers after ending the transaction in the DB.
      if (transaction.commit) {
        sendTableTrigger(transaction);
      }
 }

end函数是必须被调用的,相当于对db的endTransaction()函数做了一层代理。它首先拿出transaction对象,并将transactions对象设为transaction的parent,然后调用真实的endTransaction(),最后,当事务成功了,会调用sendTableTrigger(transaction),这个函数是什么呢?看名字应该可以知道它是一个触发器,没错,它就是sqlbrite实现响应式的重要一步!

触发器

sendTableTrigger函数的源码如下:

private void sendTableTrigger(Set<String> tables) {
    SqliteTransaction transaction = transactions.get();
    if (transaction != null) {
      transaction.addAll(tables);
    } else {
      if (logging) log("TRIGGER %s", tables);
      triggers.onNext(tables);
    }
}

它首先会先判断当前的transactions是否有值,有值说明当前还有事务正在执行,此时只将数据进行存储。等事务结束后一同出发。如果transactions没有值,就触发triggers的onNext函数,那这个triggers是什么呢?它就是Rxjava中大名鼎鼎的PublishSubject:

private final PublishSubject<Set<String>> triggers = PublishSubject.create();

原来sqlbrite是用PublishSubject来实现数据的及时更新,PublishSubject是一种冷的Observable,每次调用它的onNext时就会主动触发观察者。

其实不仅事务,BriteDatabase中对insert、delete、update等函数的调用也会触发sendTableTrigger函数。

所以我们可以想象,当我们订阅了这个subject,每当有db有数据更新,我们就会收到新数据的推送。实现一次订阅,永久查收的效果。

查询

那如何实现订阅呢?对于数据库而言这就是查询操作。 sqlbrite中创建查询的方法是createQuery,源码如下:

private QueryObservable createQuery(final Func1<Set<String>, Boolean> tableFilter,
      final String sql, final String... args) {
    if (transactions.get() != null) {
      throw new IllegalStateException("Cannot create observable query in transaction. "
          + "Use query() for a query inside a transaction.");
    }

    final Query query = new Query() {
      @Override public Cursor run() {
        if (transactions.get() != null) {
          throw new IllegalStateException("Cannot execute observable query in a transaction.");
        }
        return getReadableDatabase().rawQuery(sql, args);
      }

      @Override public String toString() {
        return sql;
      }
    };

    Observable<Query> queryObservable = triggers //
        .filter(tableFilter) // Only trigger on tables we care about.
        .startWith(INITIAL_TRIGGER) // Immediately execute the query for initial value.
        .map(new Func1<Set<String>, Query>() {
          @Override public Query call(Set<String> trigger) {
            if (transactions.get() != null) {
              throw new IllegalStateException(
                  "Cannot subscribe to observable query in a transaction.");
            }
            if (logging) {
              log("QUERY\n  trigger: %s\n  tables: %s\n  sql: %s\n  args: %s", trigger, tableFilter,
                  sql, Arrays.toString(args));
            }
            return query;
          }
        }) //
        .onBackpressureLatest();
    return new QueryObservable(queryObservable);
}

代码稍多,但比较好理解,首先,构造了一个Query变量,这个Query变量就是sqlbrite对查询的一个抽象,接着我们看到了熟悉的triggers对象,通过一系列的变换,得到Observable对象queryObservable,最后,将queryObservable置于QueryObservable的构造函数中,并返回了一个QueryObservable对象。那这个QueryObservable是何方神圣呢?

其实它就是Observable的子类,但对Observable做了一些本土化的扩展,增加了如下几个方法:

1.public final <T> Observable<T> mapToOne(@NonNull Func1<Cursor, T> mapper)

2.public final <T> Observable<T> mapToOneOrDefault(@NonNull Func1<Cursor, T> mapper,
      T defaultValue)

3.public final <T> Observable<List<T>> mapToList(@NonNull Func1<Cursor, T> mapper)

简单来说,这三个函数就是通过lift操作来将cursor变换成我们想要的数据,可以是业务实体亦或数据代理。

综上,一个比较标准的查询方法如下:

mDb.createQuery("TABLE_NAME", "SELECT * FROM TABLE_NAME")
        .mapToList((cursor, entity) -> cursor2entity(cursor))
        .distinct()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(() -> {
        });

如此,就是sqlbrite实现响应式的主要思路了。

优缺点