public abstract class RxWorker extends ListenableWorker

java.lang.Object

↳androidx.work.ListenableWorker

↳androidx.work.rxjava3.RxWorker

Gradle dependencies

compile group: 'androidx.work', name: 'work-rxjava3', version: '2.8.0-alpha02'

  • groupId: androidx.work
  • artifactId: work-rxjava3
  • version: 2.8.0-alpha02

Artifact androidx.work:work-rxjava3:2.8.0-alpha02 is located in the Google repository (https://maven.google.com/)

Overview

RxJava3 interoperability Worker implementation.

When invoked by the WorkManager, it will call RxWorker.createWork() to get a Single<Result> and subscribe to it.

By default, RxWorker will subscribe on the thread pool that runs WorkManager Workers. You can change this behavior by overriding the RxWorker.getBackgroundScheduler() method.

An RxWorker is given a maximum of ten minutes to finish its execution and return a ListenableWorker.Result. After this time has expired, the worker will be signalled to stop.

Summary

Constructors
public RxWorker(Context appContext, WorkerParameters workerParams)

Methods
public abstract Single<ListenableWorker.Result> createWork()

Override this method to define your actual work and return a Single of ListenableWorker.Result which will be subscribed by the WorkManager.

protected Scheduler getBackgroundScheduler()

Returns the default background scheduler that RxWorker will use to subscribe.

public Single<ForegroundInfo> getForegroundInfo()

Returns a Single with an instance of ForegroundInfo if the WorkRequest is important to the user.

public ListenableFuture<ForegroundInfo> getForegroundInfoAsync()

Return an instance of ForegroundInfo if the WorkRequest is important to the user.

public void onStopped()

This method is invoked when this Worker has been told to stop.

public final Completable setCompletableProgress(Data data)

Updates the progress for a RxWorker.

public final Completable setForeground(ForegroundInfo foregroundInfo)

This specifies that the WorkRequest is long-running or otherwise important.

public final ListenableFuture<ListenableWorker.Result> startWork()

Starts the actual background processing by subscribing to the Single returned by createWork().

from ListenableWorker: getApplicationContext, getBackgroundExecutor, getId, getInputData, getNetwork, getRunAttemptCount, getTags, getTaskExecutor, getTriggeredContentAuthorities, getTriggeredContentUris, getWorkerFactory, isStopped, isUsed, setForegroundAsync, setProgressAsync, setUsed, stop
from java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Constructors

public RxWorker(Context appContext, WorkerParameters workerParams)

Parameters:

appContext: The application Context
workerParams: Parameters to setup the internal state of this worker

Methods

public final ListenableFuture<ListenableWorker.Result> startWork()

Starts the actual background processing. RxWorker implements this method as final: it calls createWork() and subscribes to the returned Single. This method is called on the main thread.

A ListenableWorker has a well-defined execution window to finish its execution and return a ListenableWorker.Result. After this time has expired, the worker will be signalled to stop and its ListenableFuture will be cancelled.

The future will also be cancelled if this worker is stopped for any reason (see ListenableWorker.onStopped()).

Returns:

A ListenableFuture with the ListenableWorker.Result of the computation. If you cancel this Future, WorkManager will treat this unit of work as failed.

protected Scheduler getBackgroundScheduler()

Returns the default background scheduler that RxWorker will use to subscribe.

The default implementation returns a Scheduler that uses the java.util.concurrent.Executor which was provided in WorkManager's Configuration (or the default one it creates).

You can override this method to change the Scheduler used by RxWorker to start its subscription. It always observes the result of the Single in WorkManager's internal thread.

Returns:

The default Scheduler.
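
To illustrate the split between the scheduler used for subscription and the thread that observes the result, here is a minimal sketch that models the idea with plain java.util.concurrent types rather than the real RxJava or WorkManager classes. The class name SchedulerSplitSketch, the helper methods, and the thread names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model: NOT the WorkManager or RxJava API.
class SchedulerSplitSketch {

    // Creates a single-threaded executor whose only thread has a fixed name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Runs the "work" on the background executor and handles its result on a
    // separate serial executor. Returns "<work thread> -> <observer thread>".
    static String run() {
        // Stands in for the Scheduler returned by getBackgroundScheduler().
        ExecutorService background = named("background");
        // Stands in for WorkManager's internal thread.
        ExecutorService serial = named("wm-serial");
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), background)
                    .thenApplyAsync(
                            workThread -> workThread + " -> "
                                    + Thread.currentThread().getName(),
                            serial)
                    .join();
        } finally {
            background.shutdown();
            serial.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "background -> wm-serial"
    }
}
```

In this model, swapping the background executor changes where the work runs, while the result is still delivered on the serial executor, which mirrors the behavior described for overriding getBackgroundScheduler().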

public abstract Single<ListenableWorker.Result> createWork()

Override this method to define your actual work and return a Single of ListenableWorker.Result which will be subscribed by the WorkManager.

If the returned Single fails, the worker will be considered as failed.

If the RxWorker is cancelled by the WorkManager (e.g. due to a constraint change), WorkManager will dispose the subscription immediately.

By default, subscription happens on the shared Worker pool. You can change it by overriding RxWorker.getBackgroundScheduler().

An RxWorker is given a maximum of ten minutes to finish its execution and return a ListenableWorker.Result. After this time has expired, the worker will be signalled to stop.

Returns:

a Single<Result> that represents the work.
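
The disposal behavior described above can be pictured as a bridge between a single-result source and a future: success and failure complete the future, and cancelling the future releases the upstream work. The following is a minimal sketch of that pattern using only java.util.concurrent. It is an assumption-laden model, not the library's implementation; FutureBridgeSketch and all of its members are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a "single result to future" bridge; NOT the library class.
class FutureBridgeSketch<T> {
    final CompletableFuture<T> future = new CompletableFuture<>();
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    FutureBridgeSketch() {
        // When the consumer cancels the future, release the upstream work.
        future.whenComplete((value, error) -> {
            if (future.isCancelled()) {
                dispose();
            }
        });
    }

    // Upstream produced a value: complete the future with it.
    void onSuccess(T value) {
        future.complete(value);
    }

    // Upstream failed: the future fails with the same error.
    void onError(Throwable error) {
        future.completeExceptionally(error);
    }

    // Stands in for disposing the subscription.
    void dispose() {
        disposed.set(true);
    }

    boolean isDisposed() {
        return disposed.get();
    }

    static boolean disposedAfterCancel() {
        FutureBridgeSketch<String> bridge = new FutureBridgeSketch<>();
        bridge.future.cancel(true);
        return bridge.isDisposed();
    }

    static String valueAfterSuccess() {
        FutureBridgeSketch<String> bridge = new FutureBridgeSketch<>();
        bridge.onSuccess("done");
        return bridge.future.join();
    }

    static boolean failurePropagates() {
        FutureBridgeSketch<String> bridge = new FutureBridgeSketch<>();
        bridge.onError(new IllegalStateException("boom"));
        return bridge.future.isCompletedExceptionally() && !bridge.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(disposedAfterCancel()); // prints "true"
        System.out.println(valueAfterSuccess());   // prints "done"
        System.out.println(failurePropagates());   // prints "true"
    }
}
```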

public final Completable setCompletableProgress(Data data)

Updates the progress for an RxWorker. This method returns a Completable similar to the ListenableWorker.setProgressAsync(Data) API.

Parameters:

data: The progress Data

Returns:

The Completable

public void onStopped()

This method is invoked when this Worker has been told to stop. At this point, the ListenableFuture returned by the instance of ListenableWorker.startWork() is also cancelled. This could happen due to an explicit cancellation signal by the user, or because the system has decided to preempt the task. In these cases, the results of the work will be ignored by WorkManager. All processing in this method should be lightweight: there are no contractual guarantees about which thread will invoke this call, so this should not be a long-running or blocking operation.

public ListenableFuture<ForegroundInfo> getForegroundInfoAsync()

Return an instance of ForegroundInfo if the WorkRequest is important to the user. In this case, WorkManager provides a signal to the OS that the process should be kept alive while this work is executing.

Prior to Android S, WorkManager manages and runs a foreground service on your behalf to execute the WorkRequest, showing the notification provided in the ForegroundInfo. To update this notification subsequently, the application can use android.app.NotificationManager.

Starting in Android S and above, WorkManager manages this WorkRequest using an immediate job.

Returns:

A ListenableFuture of ForegroundInfo instance if the WorkRequest is marked immediate. For more information look at WorkRequest.Builder.setExpedited(OutOfQuotaPolicy).

public Single<ForegroundInfo> getForegroundInfo()

Returns a Single with an instance of ForegroundInfo if the WorkRequest is important to the user. In this case, WorkManager provides a signal to the OS that the process should be kept alive while this work is executing.

Prior to Android S, WorkManager manages and runs a foreground service on your behalf to execute the WorkRequest, showing the notification provided in the ForegroundInfo. To update this notification subsequently, the application can use android.app.NotificationManager.

Starting in Android S and above, WorkManager manages this WorkRequest using an immediate job.

Returns:

A Single of ForegroundInfo instance if the WorkRequest is marked immediate. For more information look at WorkRequest.Builder.setExpedited(OutOfQuotaPolicy).
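
The source listing at the end of this page shows that the default getForegroundInfo() returns a Single that fails with an IllegalStateException, so a worker used for expedited requests is expected to override it. The sketch below models that "failing default, overridden by subclasses" pattern with plain java.util.concurrent types. It does not use the real ForegroundInfo or Single classes; every name in it (ForegroundInfoDefaultSketch, BaseWorker, ExpeditedWorker, describe, and the placeholder strings) is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical model: NOT the WorkManager or RxJava API.
class ForegroundInfoDefaultSketch {

    // Base class whose default implementation always fails.
    static class BaseWorker {
        CompletableFuture<String> getForegroundInfo() {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                    "expedited work requires an implementation of getForegroundInfo()"));
            return failed;
        }
    }

    // Subclass that supplies a value; a String stands in for ForegroundInfo.
    static class ExpeditedWorker extends BaseWorker {
        @Override
        CompletableFuture<String> getForegroundInfo() {
            return CompletableFuture.completedFuture("notification #1");
        }
    }

    // Summarizes the outcome without throwing.
    static String describe(BaseWorker worker) {
        return worker.getForegroundInfo()
                .handle((info, error) -> {
                    if (error == null) {
                        return "ok: " + info;
                    }
                    // Unwrap in case the error arrives wrapped.
                    Throwable cause = error instanceof CompletionException
                            ? error.getCause() : error;
                    return "failed: " + cause.getClass().getSimpleName();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(new BaseWorker()));      // prints "failed: IllegalStateException"
        System.out.println(describe(new ExpeditedWorker())); // prints "ok: notification #1"
    }
}
```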

public final Completable setForeground(ForegroundInfo foregroundInfo)

This specifies that the WorkRequest is long-running or otherwise important. In this case, WorkManager provides a signal to the OS that the process should be kept alive if possible while this work is executing.

Calls to setForegroundAsync *must* complete before an RxWorker signals completion by returning a ListenableWorker.Result.

Under the hood, WorkManager manages and runs a foreground service on your behalf to execute this WorkRequest, showing the notification provided in ForegroundInfo.

Calling setForeground will fail with a java.lang.IllegalStateException when the process is subject to foreground service restrictions. Consider using WorkRequest.Builder.setExpedited(OutOfQuotaPolicy) and RxWorker.getForegroundInfo() instead.

Parameters:

foregroundInfo: The ForegroundInfo

Returns:

A Completable which resolves after the RxWorker transitions to running in the context of a foreground android.app.Service.
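
The ordering requirement above (the foreground call must complete before the worker reports its result) can be sketched as a simple dependency chain. With the real API this is typically expressed by chaining the returned Completable ahead of the Single, for example with RxJava's Completable.andThen. The model below uses only java.util.concurrent, and the class name ForegroundOrderingSketch and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model: NOT the WorkManager or RxJava API.
class ForegroundOrderingSketch {

    // Returns the sequence of events observed when the result is made to
    // depend on the completion of the foreground step.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stands in for the Completable returned by setForeground(...).
        CompletableFuture<Void> foreground = new CompletableFuture<>();

        // The result is produced only after the foreground step completes.
        CompletableFuture<String> result = foreground
                .thenRun(() -> events.add("foreground ready"))
                .thenApply(ignored -> {
                    events.add("work done");
                    return "success";
                });

        if (!result.isDone()) {
            events.add("result pending");
        }

        // Completing the foreground step releases the rest of the chain.
        foreground.complete(null);
        events.add("result: " + result.join());
        return events;
    }

    public static void main(String[] args) {
        // prints "[result pending, foreground ready, work done, result: success]"
        System.out.println(run());
    }
}
```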

Source

/*
 * Copyright 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.work.rxjava3;

import android.content.Context;

import androidx.annotation.CallSuper;
import androidx.annotation.MainThread;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.work.Configuration;
import androidx.work.Data;
import androidx.work.ForegroundInfo;
import androidx.work.ListenableWorker;
import androidx.work.OutOfQuotaPolicy;
import androidx.work.WorkManager;
import androidx.work.WorkRequest;
import androidx.work.Worker;
import androidx.work.WorkerParameters;
import androidx.work.impl.utils.SynchronousExecutor;
import androidx.work.impl.utils.futures.SettableFuture;

import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.Executor;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;

/**
 * RxJava3 interoperability Worker implementation.
 * <p>
 * When invoked by the {@link WorkManager}, it will call {@link #createWork()} to get a
 * {@code Single<Result>} and subscribe to it.
 * <p>
 * By default, RxWorker will subscribe on the thread pool that runs {@link WorkManager}
 * {@link Worker}s. You can change this behavior by overriding {@link #getBackgroundScheduler()}
 * method.
 * <p>
 * An RxWorker is given a maximum of ten minutes to finish its execution and return a
 * {@link androidx.work.ListenableWorker.Result}.  After this time has expired, the worker will be
 * signalled to stop.
 *
 * @see Worker
 */
public abstract class RxWorker extends ListenableWorker {
    @SuppressWarnings("WeakerAccess")
    static final Executor INSTANT_EXECUTOR = new SynchronousExecutor();

    @Nullable
    private SingleFutureAdapter<Result> mSingleFutureObserverAdapter;

    /**
     * @param appContext   The application {@link Context}
     * @param workerParams Parameters to setup the internal state of this worker
     */
    public RxWorker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) {
        super(appContext, workerParams);
    }

    @NonNull
    @Override
    public final ListenableFuture<Result> startWork() {
        mSingleFutureObserverAdapter = new SingleFutureAdapter<>();
        return convert(mSingleFutureObserverAdapter, createWork());
    }

    /**
     * Returns the default background scheduler that {@code RxWorker} will use to subscribe.
     * <p>
     * The default implementation returns a Scheduler that uses the {@link Executor} which was
     * provided in {@link WorkManager}'s {@link Configuration} (or the default one it creates).
     * <p>
     * You can override this method to change the Scheduler used by RxWorker to start its
     * subscription. It always observes the result of the {@link Single} in WorkManager's internal
     * thread.
     *
     * @return The default {@link Scheduler}.
     */
    protected @NonNull Scheduler getBackgroundScheduler() {
        return Schedulers.from(getBackgroundExecutor(), true, true);
    }

    /**
     * Override this method to define your actual work and return a {@code Single} of
     * {@link androidx.work.ListenableWorker.Result} which will be subscribed by the
     * {@link WorkManager}.
     * <p>
     * If the returned {@code Single} fails, the worker will be considered as failed.
     * <p>
     * If the {@link RxWorker} is cancelled by the {@link WorkManager} (e.g. due to a constraint
     * change), {@link WorkManager} will dispose the subscription immediately.
     * <p>
     * By default, subscription happens on the shared {@link Worker} pool. You can change it
     * by overriding {@link #getBackgroundScheduler()}.
     * <p>
     * An RxWorker is given a maximum of ten minutes to finish its execution and return a
     * {@link androidx.work.ListenableWorker.Result}.  After this time has expired, the worker will
     * be signalled to stop.
     *
     * @return a {@code Single<Result>} that represents the work.
     */
    @MainThread
    public abstract @NonNull Single<Result> createWork();

    /**
     * Updates the progress for a {@link RxWorker}. This method returns a {@link Completable}
     * similar to the {@link ListenableWorker#setProgressAsync(Data)} API.
     *
     * @param data The progress {@link Data}
     * @return The {@link Completable}
     */
    @NonNull
    public final Completable setCompletableProgress(@NonNull Data data) {
        return Completable.fromFuture(setProgressAsync(data));
    }

    @Override
    @CallSuper
    public void onStopped() {
        super.onStopped();
        final SingleFutureAdapter<Result> observer = mSingleFutureObserverAdapter;
        if (observer != null) {
            observer.dispose();
            mSingleFutureObserverAdapter = null;
        }
    }

    @NonNull
    @Override
    public ListenableFuture<ForegroundInfo> getForegroundInfoAsync() {
        return convert(new SingleFutureAdapter<>(), getForegroundInfo());
    }

    /**
     * Returns a {@code Single} with an instance of {@link ForegroundInfo} if the
     * {@link WorkRequest} is
     * important to the user.  In this case, WorkManager provides a signal to the OS that the
     * process should be kept alive while this work is executing.
     * <p>
     * Prior to Android S, WorkManager manages and runs a foreground service on your behalf to
     * execute the WorkRequest, showing the notification provided in the {@link ForegroundInfo}.
     * To update this notification subsequently, the application can use
     * {@link android.app.NotificationManager}.
     * <p>
     * Starting in Android S and above, WorkManager manages this WorkRequest using an immediate job.
     *
     * @return A {@link Single} of {@link ForegroundInfo} instance if the WorkRequest
     * is marked immediate. For more information look at
     * {@link WorkRequest.Builder#setExpedited(OutOfQuotaPolicy)}.
     */
    @NonNull
    public Single<ForegroundInfo> getForegroundInfo() {
        String message =
                "Expedited WorkRequests require a RxWorker to provide an implementation for"
                        + " `getForegroundInfo()`";
        return Single.error(new IllegalStateException(message));
    }

    /**
     * This specifies that the {@link WorkRequest} is long-running or otherwise important.  In
     * this case, WorkManager provides a signal to the OS that the process should be kept alive
     * if possible while this work is executing.
     * <p>
     * Calls to {@code setForegroundAsync} *must* complete before a {@link RxWorker}
     * signals completion by returning a {@link Result}.
     * <p>
     * Under the hood, WorkManager manages and runs a foreground service on your behalf to
     * execute this WorkRequest, showing the notification provided in
     * {@link ForegroundInfo}.
     * <p>
     * Calling {@code setForeground} will fail with an
     * {@link IllegalStateException} when the process is subject to foreground
     * service restrictions. Consider using
     * {@link WorkRequest.Builder#setExpedited(OutOfQuotaPolicy)} and
     * {@link RxWorker#getForegroundInfo()} instead.
     *
     * @param foregroundInfo The {@link ForegroundInfo}
     * @return A {@link Completable} which resolves after the {@link RxWorker}
     * transitions to running in the context of a foreground {@link android.app.Service}.
     */
    @NonNull
    public final Completable setForeground(@NonNull ForegroundInfo foregroundInfo) {
        return Completable.fromFuture(setForegroundAsync(foregroundInfo));
    }

    private <T> ListenableFuture<T> convert(SingleFutureAdapter<T> adapter, Single<T> single) {
        final Scheduler scheduler = getBackgroundScheduler();
        single.subscribeOn(scheduler)
                // observe on WM's private thread
                .observeOn(Schedulers.from(
                        getTaskExecutor().getSerialTaskExecutor(),
                        /* interruptible */true,
                        /* fair */true))
                .subscribe(adapter);
        return adapter.mFuture;
    }

    /**
     * An observer that can observe a single and provide it as a {@link ListenableFuture}.
     */
    static class SingleFutureAdapter<T> implements SingleObserver<T>, Runnable {
        final SettableFuture<T> mFuture = SettableFuture.create();
        @Nullable
        private Disposable mDisposable;

        SingleFutureAdapter() {
            mFuture.addListener(this, INSTANT_EXECUTOR);
        }

        @Override
        public void onSubscribe(Disposable disposable) {
            mDisposable = disposable;
        }

        @Override
        public void onSuccess(T t) {
            mFuture.set(t);
        }

        @Override
        public void onError(Throwable throwable) {
            mFuture.setException(throwable);
        }

        @Override
        public void run() { // Future listener
            if (mFuture.isCancelled()) {
                dispose();
            }
        }

        void dispose() {
            final Disposable disposable = mDisposable;
            if (disposable != null) {
                disposable.dispose();
            }
        }
    }
}