public class

RxRoom

extends java.lang.Object

 java.lang.Object

↳androidx.room.RxRoom

Gradle dependencies

compile group: 'androidx.room', name: 'room-rxjava2', version: '2.5.0-alpha01'

  • groupId: androidx.room
  • artifactId: room-rxjava2
  • version: 2.5.0-alpha01

Artifact androidx.room:room-rxjava2:2.5.0-alpha01 it located at Google repository (https://maven.google.com/)

Androidx artifact mapping:

androidx.room:room-rxjava2 android.arch.persistence.room:rxjava2

Androidx class mapping:

androidx.room.RxRoom android.arch.persistence.room.RxRoom

Overview

Helper class to add RxJava2 support to Room.

Summary

Fields
public static final java.lang.ObjectNOTHING

Data dispatched by the publisher created by RxRoom.createFlowable(RoomDatabase, String...).

Constructors
publicRxRoom()

Methods
public static <any>createFlowable(RoomDatabase database, boolean inTransaction, java.lang.String tableNames[], java.util.concurrent.Callable<java.lang.Object> callable)

Helper method used by generated code to bind a Callable such that it will be run in our disk io thread and will automatically block null values since RxJava2 does not like null.

public static <any>createFlowable(RoomDatabase database, java.lang.String tableNames[])

Creates a that emits at least once and also re-emits whenever one of the observed tables is updated.

public static <any>createFlowable(RoomDatabase database, java.lang.String tableNames[], java.util.concurrent.Callable<java.lang.Object> callable)

Helper method used by generated code to bind a Callable such that it will be run in our disk io thread and will automatically block null values since RxJava2 does not like null.

public static <any>createObservable(RoomDatabase database, boolean inTransaction, java.lang.String tableNames[], java.util.concurrent.Callable<java.lang.Object> callable)

Helper method used by generated code to bind a Callable such that it will be run in our disk io thread and will automatically block null values since RxJava2 does not like null.

public static <any>createObservable(RoomDatabase database, java.lang.String tableNames[])

Creates a that emits at least once and also re-emits whenever one of the observed tables is updated.

public static <any>createObservable(RoomDatabase database, java.lang.String tableNames[], java.util.concurrent.Callable<java.lang.Object> callable)

Helper method used by generated code to bind a Callable such that it will be run in our disk io thread and will automatically block null values since RxJava2 does not like null.

public static <any>createSingle(java.util.concurrent.Callable<java.lang.Object> callable)

Helper method used by generated code to create a Single from a Callable that will ignore the EmptyResultSetException if the stream is already disposed.

from java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Fields

public static final java.lang.Object NOTHING

Data dispatched by the publisher created by RxRoom.createFlowable(RoomDatabase, String...).

Constructors

public RxRoom()

Deprecated: This type should not be instantiated as it contains only static methods.

Methods

public static <any> createFlowable(RoomDatabase database, java.lang.String tableNames[])

Creates a that emits at least once and also re-emits whenever one of the observed tables is updated.

You can easily chain a database operation to downstream of this to ensure that it re-runs when database is modified.

Since database invalidation is batched, multiple changes in the database may results in just 1 emission.

Parameters:

database: The database instance
tableNames: The list of table names that should be observed

Returns:

A which emits RxRoom.NOTHING when one of the observed tables is modified (also once when the invalidation tracker connection is established).

public static <any> createFlowable(RoomDatabase database, java.lang.String tableNames[], java.util.concurrent.Callable<java.lang.Object> callable)

Deprecated: Use RxRoom.createFlowable(RoomDatabase, boolean, String[], Callable)

Helper method used by generated code to bind a Callable such that it will be run in our disk io thread and will automatically block null values since RxJava2 does not like null.

public static <any> createFlowable(RoomDatabase database, boolean inTransaction, java.lang.String tableNames[], java.util.concurrent.Callable<java.lang.Object> callable)

Helper method used by generated code to bind a Callable such that it will be run in our disk io thread and will automatically block null values since RxJava2 does not like null.

public static <any> createObservable(RoomDatabase database, java.lang.String tableNames[])

Creates a that emits at least once and also re-emits whenever one of the observed tables is updated.

You can easily chain a database operation to downstream of this to ensure that it re-runs when database is modified.

Since database invalidation is batched, multiple changes in the database may results in just 1 emission.

Parameters:

database: The database instance
tableNames: The list of table names that should be observed

Returns:

A which emits RxRoom.NOTHING when one of the observed tables is modified (also once when the invalidation tracker connection is established).

public static <any> createObservable(RoomDatabase database, java.lang.String tableNames[], java.util.concurrent.Callable<java.lang.Object> callable)

Deprecated: Use RxRoom.createObservable(RoomDatabase, boolean, String[], Callable)

Helper method used by generated code to bind a Callable such that it will be run in our disk io thread and will automatically block null values since RxJava2 does not like null.

public static <any> createObservable(RoomDatabase database, boolean inTransaction, java.lang.String tableNames[], java.util.concurrent.Callable<java.lang.Object> callable)

Helper method used by generated code to bind a Callable such that it will be run in our disk io thread and will automatically block null values since RxJava2 does not like null.

public static <any> createSingle(java.util.concurrent.Callable<java.lang.Object> callable)

Helper method used by generated code to create a Single from a Callable that will ignore the EmptyResultSetException if the stream is already disposed.

Source

/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.room;

import androidx.annotation.RestrictTo;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.Action;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

/**
 * Helper class to add RxJava2 support to Room.
 */
@SuppressWarnings("WeakerAccess")
public class RxRoom {
    /**
     * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}.
     */
    public static final Object NOTHING = new Object();

    /**
     * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the
     * observed tables is updated.
     * <p>
     * You can easily chain a database operation to downstream of this {@link Flowable} to ensure
     * that it re-runs when database is modified.
     * <p>
     * Since database invalidation is batched, multiple changes in the database may results in just
     * 1 emission.
     *
     * @param database   The database instance
     * @param tableNames The list of table names that should be observed
     * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables
     * is modified (also once when the invalidation tracker connection is established).
     */
    public static Flowable<Object> createFlowable(final RoomDatabase database,
            final String... tableNames) {
        return Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(final FlowableEmitter<Object> emitter) throws Exception {
                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                        tableNames) {
                    @Override
                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                        if (!emitter.isCancelled()) {
                            emitter.onNext(NOTHING);
                        }
                    }
                };
                if (!emitter.isCancelled()) {
                    database.getInvalidationTracker().addObserver(observer);
                    emitter.setDisposable(Disposables.fromAction(new Action() {
                        @Override
                        public void run() throws Exception {
                            database.getInvalidationTracker().removeObserver(observer);
                        }
                    }));
                }

                // emit once to avoid missing any data and also easy chaining
                if (!emitter.isCancelled()) {
                    emitter.onNext(NOTHING);
                }
            }
        }, BackpressureStrategy.LATEST);
    }

    /**
     * Helper method used by generated code to bind a Callable such that it will be run in
     * our disk io thread and will automatically block null values since RxJava2 does not like null.
     *
     * @deprecated Use {@link #createFlowable(RoomDatabase, boolean, String[], Callable)}
     *
     * @hide
     */
    @Deprecated
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
    public static <T> Flowable<T> createFlowable(final RoomDatabase database,
            final String[] tableNames, final Callable<T> callable) {
        return createFlowable(database, false, tableNames, callable);
    }

    /**
     * Helper method used by generated code to bind a Callable such that it will be run in
     * our disk io thread and will automatically block null values since RxJava2 does not like null.
     *
     * @hide
     */
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
    public static <T> Flowable<T> createFlowable(final RoomDatabase database,
            final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
        Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createFlowable(database, tableNames)
                .subscribeOn(scheduler)
                .unsubscribeOn(scheduler)
                .observeOn(scheduler)
                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                    @Override
                    public MaybeSource<T> apply(Object o) throws Exception {
                        return maybe;
                    }
                });
    }

    /**
     * Creates a {@link Observable} that emits at least once and also re-emits whenever one of the
     * observed tables is updated.
     * <p>
     * You can easily chain a database operation to downstream of this {@link Observable} to ensure
     * that it re-runs when database is modified.
     * <p>
     * Since database invalidation is batched, multiple changes in the database may results in just
     * 1 emission.
     *
     * @param database   The database instance
     * @param tableNames The list of table names that should be observed
     * @return A {@link Observable} which emits {@link #NOTHING} when one of the observed tables
     * is modified (also once when the invalidation tracker connection is established).
     */
    public static Observable<Object> createObservable(final RoomDatabase database,
            final String... tableNames) {
        return Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                        tableNames) {
                    @Override
                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                        emitter.onNext(NOTHING);
                    }
                };
                database.getInvalidationTracker().addObserver(observer);
                emitter.setDisposable(Disposables.fromAction(new Action() {
                    @Override
                    public void run() throws Exception {
                        database.getInvalidationTracker().removeObserver(observer);
                    }
                }));

                // emit once to avoid missing any data and also easy chaining
                emitter.onNext(NOTHING);
            }
        });
    }

    /**
     * Helper method used by generated code to bind a Callable such that it will be run in
     * our disk io thread and will automatically block null values since RxJava2 does not like null.
     *
     * @deprecated Use {@link #createObservable(RoomDatabase, boolean, String[], Callable)}
     *
     * @hide
     */
    @Deprecated
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
    public static <T> Observable<T> createObservable(final RoomDatabase database,
            final String[] tableNames, final Callable<T> callable) {
        return createObservable(database, false, tableNames, callable);
    }

    /**
     * Helper method used by generated code to bind a Callable such that it will be run in
     * our disk io thread and will automatically block null values since RxJava2 does not like null.
     *
     * @hide
     */
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
    public static <T> Observable<T> createObservable(final RoomDatabase database,
            final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
        Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createObservable(database, tableNames)
                .subscribeOn(scheduler)
                .unsubscribeOn(scheduler)
                .observeOn(scheduler)
                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                    @Override
                    public MaybeSource<T> apply(Object o) throws Exception {
                        return maybe;
                    }
                });
    }

    /**
     * Helper method used by generated code to create a Single from a Callable that will ignore
     * the EmptyResultSetException if the stream is already disposed.
     *
     * @hide
     */
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
    public static <T> Single<T> createSingle(final Callable<T> callable) {
        return Single.create(new SingleOnSubscribe<T>() {
            @Override
            public void subscribe(SingleEmitter<T> emitter) throws Exception {
                try {
                    emitter.onSuccess(callable.call());
                } catch (EmptyResultSetException e) {
                    emitter.tryOnError(e);
                }
            }
        });
    }

    private static Executor getExecutor(RoomDatabase database, boolean inTransaction) {
        if (inTransaction) {
            return database.getTransactionExecutor();
        } else {
            return database.getQueryExecutor();
        }
    }

    /** @deprecated This type should not be instantiated as it contains only static methods. */
    @Deprecated
    @SuppressWarnings("PrivateConstructorForUtilityClass")
    public RxRoom() {
    }
}