Room Daoの戻り値 Flowable と Flow の違い

Android Architecture ComponentのRoomのDao で、RxJavaのFlowableを戻り値とした場合と、Kotlin CoroutinesのFlowを戻り値とした場合の挙動差異についてメモします。

前提

検証は以下の環境で行っています。

検証コードはこちらです。

GitHub - n-seki/SampleCompareRxCoroutinesRoom: Sample App to compare rx and coroutines room behavior

Flowable と Flow

以下のようなDaoメソッドを考えます。

@Query("SELECT * FROM Books WHERE id = :id")
fun findById(id: Long): Flowable<BookEntity>

idが一致するBooksテーブルのレコードを1件取得していて、テーブルデータに変更があった場合にはFlowableに新しいデータが通知されるようなメソッドです。 (idPrimaryKeyとしているため重複は考えません)

これをKotlin CoroutinesのFlowに書き換えます。FlowableFlowに変わっただけで、挙動的な差異はないように見えます。

@Query("SELECT * FROM Books WHERE id = :id")
fun findById(id: Long): Flow<BookEntity>

が、Booksテーブルにidが一致するレコードが存在しない場合、

  • Flowable -> データは何も通知されない
  • Flow -> nullが通知される

と、異なる挙動となります。

RxJavaではnullが扱えないことを考えると当然の結果ではありますが、なぜこんな差異が生まれるのを確認しておきます。

Roomが生成するコード

テーブルから取得したデータをFlowableに流す処理を見ます。

return RxRoom.createFlowable(__db, false, new String[]{"Books"}, new Callable<BookEntity>() {
  @Override
  public BookEntity call() throws Exception {
    final Cursor _cursor = DBUtil.query(__db, _statement, false, null);
    try {
      final int _cursorIndexOfId = CursorUtil.getColumnIndexOrThrow(_cursor, "id");
      final int _cursorIndexOfTitle = CursorUtil.getColumnIndexOrThrow(_cursor, "title");
      final int _cursorIndexOfAmount = CursorUtil.getColumnIndexOrThrow(_cursor, "amount");
      final BookEntity _result;
      if(_cursor.moveToFirst()) {
        final String _tmpId;
        _tmpId = _cursor.getString(_cursorIndexOfId);
        final String _tmpTitle;
        _tmpTitle = _cursor.getString(_cursorIndexOfTitle);
        final double _tmpAmount;
        _tmpAmount = _cursor.getDouble(_cursorIndexOfAmount);
        _result = new BookEntity(_tmpId,_tmpTitle,_tmpAmount);
      } else {
        _result = null;
      }
      return _result;
    } finally {
      _cursor.close();
    }
  }

Flowの場合にはRxRoom.createFlowableの部分がCoroutinesRoom.createFlowに変わりますが、ロジック自体に差異はありません。

ここから、クエリ結果がnullの場合にも処理は中断しないことが分かります。RxRoom.createFlowableはこんな実装になっていて、クエリを実行するCallableMaybeに変換しflatMapMaybeの戻り値にしています。

https://android.googlesource.com/platform/frameworks/support/+/refs/heads/androidx-master-dev/room/rxjava2/src/main/java/androidx/room/RxRoom.java?autodive=0%2F%2F%2F%2F#68

@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
public static <T> Flowable<T> createFlowable(final RoomDatabase database,
        final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
    Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
    final Maybe<T> maybe = Maybe.fromCallable(callable);
    return createFlowable(database, tableNames)
            .subscribeOn(scheduler)
            .unsubscribeOn(scheduler)
            .observeOn(scheduler)
            .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                @Override
                public MaybeSource<T> apply(Object o) throws Exception {
                    return maybe;
                }
            });
}

ここでやっていることは、テーブル(ここではBooksテーブル)に変更があったときにMaybeを介してCallableの処理(ここではfindByIdクエリ処理)を実行して結果を流すFlowableの作成です。

Maybe.fromCallableでは、callableの実行結果がnullの場合にはMaybeObserver::onSuccessではなく、MaybeObserver::onCompleteが呼ばれます。これによりnullがFlowableに流れないため、後続処理ではnullが流れてくることを考慮する必要はありません。

一方のCoroutinesRoom.createFlowは以下のような実装になっています。

https://android.googlesource.com/platform/frameworks/support/+/refs/heads/androidx-master-dev/room/ktx/src/main/java/androidx/room/CoroutinesRoom.kt#99

@JvmStatic
fun <R> createFlow(
    db: RoomDatabase,
    inTransaction: Boolean,
    tableNames: Array<String>,
    callable: Callable<R>
): Flow<@JvmSuppressWildcards R> = flow {
    // Observer channel receives signals from the invalidation tracker to emit queries.
    val observerChannel = Channel<Unit>(Channel.CONFLATED)
    val observer = object : InvalidationTracker.Observer(tableNames) {
        override fun onInvalidated(tables: MutableSet<String>) {
            observerChannel.offer(Unit)
        }
    }
    observerChannel.offer(Unit) // Initial signal to perform first query.
    val flowContext = coroutineContext
    val queryContext = if (inTransaction) db.transactionDispatcher else db.queryDispatcher
    withContext(queryContext) {
        db.invalidationTracker.addObserver(observer)
        try {
            // Iterate until cancelled, transforming observer signals to query results to
            // be emitted to the flow.
            for (signal in observerChannel) {
                val result = callable.call()
                withContext(flowContext) { emit(result) }
            }
        } finally {
            db.invalidationTracker.removeObserver(observer)
        }
    }
}

ロジックの大枠は同じで、テーブルに変更があったらcallableを実行して、その結果をFlowに流しています。callableの実行結果(つまりクエリ実行結果)がnullの場合にはnullをFlowに流すことになります。

しかしDaoのメソッドの戻り値型はFlow<BookEntity>と定義されているので、例えば後続処理で以下のような実装をしているとnull参照エラーが発生します。

return booksDao.findById("id").map { it.title }  // it.titleでitがnullのために例外が発生します

Flowcatchオペレーターを使えばアプリ全体のクラッシュは回避できますがFlowはキャンセルされるので、このあとテーブルにレコードがインサートされてもデータは流れてきません。何にせよ、意図した挙動ではないでしょう。

修正

クエリ結果がnullになる可能性があるのならば、nullを許容する実装をするべきなので、上記Flowを使ったDaoメソッドは以下のようにすると実態と合うと思います。

@Query("SELECT * FROM Books WHERE id = :id")
fun findById(id: Long): Flow<BookEntity?>

その上で、後続処理でFlow::mapNotNullFlow::filterNotNullでnullを除外する、nullの場合には適切な例外処理を行うなどすれば良いでしょう。

まとめ

  • RxJavaはそもそもnullを扱えないため、DaoのメソッドでFlowableを戻り値型にしている場合にはnullのことを考える必要はない(null値が流れてくることはない)
  • Flowを使った場合も一見同じ挙動になるように思えるが、nullは流れてくる
    • クエリ結果がnullになる可能性があるのならばFlow<Entity?>とnull許容するのが正しい