Android Architecture ComponentのRoomのDao で、RxJavaのFlowable
を戻り値とした場合と、Kotlin CoroutinesのFlow
を戻り値とした場合の挙動差異についてメモします。
前提
検証は以下の環境で行っています。
- Android Studio 4.1.1
- Android Gradle Plugin 4.0.2
- Room 2.2.5
検証コードはこちらです。
GitHub - n-seki/SampleCompareRxCoroutinesRoom: Sample App to compare rx and coroutines room behavior
Flowable と Flow
以下のようなDaoメソッドを考えます。
@Query("SELECT * FROM Books WHERE id = :id") fun findById(id: Long): Flowable<BookEntity>
id
が一致するBooksテーブルのレコードを1件取得していて、テーブルデータに変更があった場合にはFlowableに新しいデータが通知されるようなメソッドです。
(id
はPrimaryKey
としているため重複は考えません)
これをKotlin CoroutinesのFlow
に書き換えます。Flowable
がFlow
に変わっただけで、挙動的な差異はないように見えます。
@Query("SELECT * FROM Books WHERE id = :id") fun findById(id: Long): Flow<BookEntity>
が、Booksテーブルにid
が一致するレコードが存在しない場合、
Flowable
-> データは何も通知されないFlow
-> nullが通知される
と、異なる挙動となります。
RxJava
ではnullが扱えないことを考えると当然の結果ではありますが、なぜこんな差異が生まれるのを確認しておきます。
Roomが生成するコード
テーブルから取得したデータをFlowable
に流す処理を見ます。
return RxRoom.createFlowable(__db, false, new String[]{"Books"}, new Callable<BookEntity>() { @Override public BookEntity call() throws Exception { final Cursor _cursor = DBUtil.query(__db, _statement, false, null); try { final int _cursorIndexOfId = CursorUtil.getColumnIndexOrThrow(_cursor, "id"); final int _cursorIndexOfTitle = CursorUtil.getColumnIndexOrThrow(_cursor, "title"); final int _cursorIndexOfAmount = CursorUtil.getColumnIndexOrThrow(_cursor, "amount"); final BookEntity _result; if(_cursor.moveToFirst()) { final String _tmpId; _tmpId = _cursor.getString(_cursorIndexOfId); final String _tmpTitle; _tmpTitle = _cursor.getString(_cursorIndexOfTitle); final double _tmpAmount; _tmpAmount = _cursor.getDouble(_cursorIndexOfAmount); _result = new BookEntity(_tmpId,_tmpTitle,_tmpAmount); } else { _result = null; } return _result; } finally { _cursor.close(); } }
Flow
の場合にはRxRoom.createFlowable
の部分がCoroutinesRoom.createFlow
に変わりますが、ロジック自体に差異はありません。
ここから、クエリ結果がnullの場合にも処理は中断しないことが分かります。RxRoom.createFlowable
はこんな実装になっていて、クエリを実行するCallable
をMaybe
に変換しflatMapMaybe
の戻り値にしています。
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX) public static <T> Flowable<T> createFlowable(final RoomDatabase database, final boolean inTransaction, final String[] tableNames, final Callable<T> callable) { Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction)); final Maybe<T> maybe = Maybe.fromCallable(callable); return createFlowable(database, tableNames) .subscribeOn(scheduler) .unsubscribeOn(scheduler) .observeOn(scheduler) .flatMapMaybe(new Function<Object, MaybeSource<T>>() { @Override public MaybeSource<T> apply(Object o) throws Exception { return maybe; } }); }
ここでやっていることは、テーブル(ここではBooks
テーブル)に変更があったときにMaybe
を介してCallableの処理(ここではfindById
クエリ処理)を実行して結果を流すFlowable
の作成です。
Maybe.fromCallable
では、callableの実行結果がnullの場合にはMaybeObserver::onSuccess
ではなく、MaybeObserver::onComplete
が呼ばれます。これによりnullがFlowable
に流れないため、後続処理ではnullが流れてくることを考慮する必要はありません。
一方のCoroutinesRoom.createFlow
は以下のような実装になっています。
@JvmStatic fun <R> createFlow( db: RoomDatabase, inTransaction: Boolean, tableNames: Array<String>, callable: Callable<R> ): Flow<@JvmSuppressWildcards R> = flow { // Observer channel receives signals from the invalidation tracker to emit queries. val observerChannel = Channel<Unit>(Channel.CONFLATED) val observer = object : InvalidationTracker.Observer(tableNames) { override fun onInvalidated(tables: MutableSet<String>) { observerChannel.offer(Unit) } } observerChannel.offer(Unit) // Initial signal to perform first query. val flowContext = coroutineContext val queryContext = if (inTransaction) db.transactionDispatcher else db.queryDispatcher withContext(queryContext) { db.invalidationTracker.addObserver(observer) try { // Iterate until cancelled, transforming observer signals to query results to // be emitted to the flow. for (signal in observerChannel) { val result = callable.call() withContext(flowContext) { emit(result) } } } finally { db.invalidationTracker.removeObserver(observer) } } }
ロジックの大枠は同じで、テーブルに変更があったらcallableを実行して、その結果をFlow
に流しています。callableの実行結果(つまりクエリ実行結果)がnullの場合にはnullをFlow
に流すことになります。
しかしDao
のメソッドの戻り値型はFlow<BookEntity>
と定義されているので、例えば後続処理で以下のような実装をしているとnull参照エラーが発生します。
return booksDao.findById("id").map { it.title } // it.titleでitがnullのために例外が発生します
Flow
のcatch
オペレーターを使えばアプリ全体のクラッシュは回避できますがFlow
はキャンセルされるので、このあとテーブルにレコードがインサートされてもデータは流れてきません。何にせよ、意図した挙動ではないでしょう。
修正
クエリ結果がnullになる可能性があるのならば、nullを許容する実装をするべきなので、上記Flowを使ったDaoメソッドは以下のようにすると実態と合うと思います。
@Query("SELECT * FROM Books WHERE id = :id") fun findById(id: Long): Flow<BookEntity?>
その上で、後続処理でFlow::mapNotNull
やFlow::filterNotNull
でnullを除外する、nullの場合には適切な例外処理を行うなどすれば良いでしょう。
まとめ
- RxJavaはそもそもnullを扱えないため、Daoのメソッドで
Flowable
を戻り値型にしている場合にはnullのことを考える必要はない(null値が流れてくることはない) Flow
を使った場合も一見同じ挙動になるように思えるが、nullは流れてくる- クエリ結果がnullになる可能性があるのならば
Flow<Entity?>
とnull許容するのが正しい
- クエリ結果がnullになる可能性があるのならば