• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Android WorkManager :任务取消

武飞扬头像
巴黎没有摩天轮Li
帮助252

前言

在应用开发中我们经常有一些功能场景需要在指定时间(时机)去触发,例如在启动流程时对缓存信息的更新,以及触发某些优先级低的行为逻辑,又或者空闲时上传图片不影响占用过多的用户行为。 在Jetpack还没有提供WorkMananger之前,我们经常采取 长生命周期的线程池AlarmManager BroadcastReceiverJobScheduler等待去处理异步任务,往往有时这些方法在某种场景下具有稳定性问题导致不可靠,并且需要我们去处理事件调度比如优先级等等,因此WorkManager的出现解决了这些问题,不过也带来了一些问题,让我们来探索一下。

取消api

学新通技术网

目前根据源码以及文档发现,对于取消任务提供了四个比较明显方法,举个其中某一个方法尝试取消任务。

WorkManager创建任务(自行封装)

学新通技术网

WorkManager取消任务

学新通技术网

通过注释以及日常使用实践来看,发现取消方法其实是一种可能取消不了的方法,可能存在申请取消但实际上内部耗时任务还是在执行的情况。

学新通技术网

例举其中一个方法我们看下实现逻辑。

@Override
public @NonNull Operation cancelWorkById(@NonNull UUID id) {
    // 1
    CancelWorkRunnable runnable = CancelWorkRunnable.forId(id, this);
    // 2
    mWorkTaskExecutor.executeOnBackgroundThread(runnable);
    return runnable.getOperation();
}

从代码1中我们可以看到根据UUID从CancelWorkRunnable找到对应的task的Runnable,代码2处我们可以看到直接从线程池中执行这个Runnable。

public static CancelWorkRunnable forId(
        @NonNull final UUID id,
        @NonNull final WorkManagerImpl workManagerImpl) {
    return new CancelWorkRunnable() {
        @WorkerThread
        @Override
        void runInternal() {
            // 1
            WorkDatabase workDatabase = workManagerImpl.getWorkDatabase();
            workDatabase.beginTransaction();
            try {
                // 2
                cancel(workManagerImpl, id.toString());
                workDatabase.setTransactionSuccessful();
            } finally {
                workDatabase.endTransaction();
            }
            reschedulePendingWorkers(workManagerImpl);
        }
    };
}

从代码1中看到task的状态信息是保存在数据库中的,因此获取了Room数据库对象,并开启了事务。

从代码2中看到调用了cancel方法,具体内部是如何实现的我们探究一下。

void cancel(WorkManagerImpl workManagerImpl, String workSpecId) {
    // 1
    iterativelyCancelWorkAndDependents(workManagerImpl.getWorkDatabase(), workSpecId);

    // 2
    Processor processor = workManagerImpl.getProcessor();
    // 3
    processor.stopAndCancelWork(workSpecId);

    for (Scheduler scheduler : workManagerImpl.getSchedulers()) {
        scheduler.cancel(workSpecId);
    }
}

代码1从方法名称可知目的是迭代的取消工作与相关依赖于当前task的其他tasks.

private void iterativelyCancelWorkAndDependents(WorkDatabase workDatabase, String workSpecId) {
    WorkSpecDao workSpecDao = workDatabase.workSpecDao();
    DependencyDao dependencyDao = workDatabase.dependencyDao();
    LinkedList<String> idsToProcess = new LinkedList<>();
    idsToProcess.add(workSpecId);
    while (!idsToProcess.isEmpty()) {
        String id = idsToProcess.remove();
        // Don't fail already cancelled work.
        // 1
        WorkInfo.State state = workSpecDao.getState(id);
        if (state != SUCCEEDED && state != FAILED) {
            workSpecDao.setState(CANCELLED, id);
        }
        idsToProcess.addAll(dependencyDao.getDependentWorkIds(id));
    }
}

iterativelyCancelWorkAndDependents()方法代码1处对根据UUID从数据库查找的task 以及 依赖tasks进行标记取消,workSpecDao.setState(CANCELLED, id), 并存入数据库中。

我们可以看到依赖表,表是记录的依赖于当前task的所有子tasks。

@Dao
public interface DependencyDao {
    /**
     * Attempts to insert a {@link Dependency} into the database.
     *
     * @param dependency The {@link Dependency}s to insert
     */
    @Insert(onConflict = IGNORE)
    void insertDependency(Dependency dependency);

    /**
     * Determines if a {@link WorkSpec} has completed all prerequisites.
     *
     * @param id The identifier for the {@link WorkSpec}
     * @return {@code true} if the {@link WorkSpec} has no pending prerequisites.
     */
    @Query("SELECT COUNT(*)=0 FROM dependency WHERE work_spec_id=:id AND prerequisite_id IN "
              "(SELECT id FROM workspec WHERE state!="
              WorkTypeConverters.StateIds.SUCCEEDED   ")")
    boolean hasCompletedAllPrerequisites(String id);

    /**
     * Gets all the direct prerequisites for a particular {@link WorkSpec}.
     *
     * @param id The {@link WorkSpec} identifier
     * @return A list of all prerequisites for {@code id}
     */
    @Query("SELECT prerequisite_id FROM dependency WHERE work_spec_id=:id")
    List<String> getPrerequisites(String id);

    /**
     * Gets all {@link WorkSpec} id's dependent on a given id
     *
     * @param id A {@link WorkSpec} identifier
     * @return A list of all identifiers that depend on the input
     */
    @Query("SELECT work_spec_id FROM dependency WHERE prerequisite_id=:id")
    List<String> getDependentWorkIds(String id);

    /**
     * Determines if a {@link WorkSpec} has any dependents.
     *
     * @param id A {@link WorkSpec} identifier
     * @return {@code true} if the {@link WorkSpec} has WorkSpecs that depend on it
     */
    @Query("SELECT COUNT(*)>0 FROM dependency WHERE prerequisite_id=:id")
    boolean hasDependents(String id);
}

此时我们回过头再看下cancel方法,代码2处从WorkManagerImpl中获取了processor对象, 并调用了stopAndCancelWork()方法。

public boolean stopAndCancelWork(@NonNull String id) {
    synchronized (mLock) {
        Logger.get().debug(TAG, String.format("Processor cancelling %s", id));
        mCancelledIds.add(id);
        WorkerWrapper wrapper;
        // Check if running in the context of a foreground service
        // 1
        wrapper = mForegroundWorkMap.remove(id);
        boolean isForegroundWork = wrapper != null;
        if (wrapper == null) {
            // Fallback to enqueued Work
            wrapper = mEnqueuedWorkMap.remove(id);
        }
        // 2
        boolean interrupted = interrupt(id, wrapper);
        if (isForegroundWork) {
            stopForegroundService();
        }
        return interrupted;
    }
}

代码1处从前台工作队列中根据UUID移除,然后获取到移除的WorkerWrapper(本质上就是task实现了Runnable接口),如果wrapper不为null,则认为是前台工作,后续会停止前台服务。如果不是前台wrapper,就会从mEnqueuedWorkMap队列中根据UUID寻找task并移除,最终会调用wrapper自身的interrupt中断方法。

@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public void interrupt() {
    mInterrupted = true;
    // 1
    tryCheckForInterruptionAndResolve();
    boolean isDone = false;
    if (mInnerFuture != null) {
        // Propagate the cancellations to the inner future.
        // 2
        isDone = mInnerFuture.isDone();
        // 3
        mInnerFuture.cancel(true);
    }
    // Worker can be null if run() hasn't been called yet
    if (mWorker != null && !isDone) {
        // 4
        mWorker.stop();
    } else {
        String message =
                String.format("WorkSpec %s is already done. Not interrupting.", mWorkSpec);
        Logger.get().debug(TAG, message);
    }
}

code1处是对worker的取消逻辑。

private boolean tryCheckForInterruptionAndResolve() {
    if (mInterrupted) {
        WorkInfo.State currentState = mWorkSpecDao.getState(mWorkSpecId);
        if (currentState == null) {
            resolve(false);
        } else {
            resolve(!currentState.isFinished());
        }
        return true;
    }
    return false;
}

根据当前worker的状态来进行的不同处理。若当前的状态为null,说明当用户再次调用beginUniqueWork方法时并且设置的是 Replace 的模式,则对woker状态进行清理,所以才会有null这种情况。

private void resolve(final boolean needsReschedule) {
    mWorkDatabase.beginTransaction();
    try {
        boolean hasUnfinishedWork = mWorkDatabase.workSpecDao().hasUnfinishedWork();
        if (!hasUnfinishedWork) {
            // 1
            PackageManagerHelper.setComponentEnabled(
                    mAppContext, RescheduleReceiver.class, false);
        }
        if (needsReschedule) {
            // 2
            mWorkSpecDao.setState(ENQUEUED, mWorkSpecId);
            mWorkSpecDao.markWorkSpecScheduled(mWorkSpecId, SCHEDULE_NOT_REQUESTED_YET);
        }
        if (mWorkSpec != null && mWorker != null && mWorker.isRunInForeground()) {
            mForegroundProcessor.stopForeground(mWorkSpecId);
        }
        mWorkDatabase.setTransactionSuccessful();
    } finally {
        mWorkDatabase.endTransaction();
    }
    mFuture.set(needsReschedule);
}

code1当没有未完成的工作的时候,会将RescheduleReceiver设置为false, 这个RescheduleReceiver是一个广播,用于由于还有队列没有处理完成,唤起workmananger的作用,或者对一些场景做告警,这个广播的逻辑我们以后有时间再详勘。code2处就比较好理解了,由于我们要直接取消WorkMananger的执行,势必有些工作是需要再次排期执行的,只是说在当前过程中我们需要中止WorkManager,因此将对数据库对worker的状态重新设置为ENQUEUED,并对执行的时刻表设置为SCHEDULE_NOT_REQUESTED_YET(-1).

此时我们回到interrupt()方法,code2处这里的mInnerFuture是用来监听work的异步任务结果的,这里多说一嘴,我们知道创建线程的几种方式,其中的一种方式是通过 Future 接口来调用线程池的execute(future: Future)方法来获取线程任务的结果的,也就是说FutureTask Callable的方式。但是Future接口并未提供当线程执行完毕自动回调,所以Google针对这种需求在原有的基础之上又自行封装了一份ListenableFuture,在code3处调用了cancel(true)方法是为了切断ListenableFuture中的任务执行。

code4处ListenableWorker这里他的实现类是CoroutineWorker,在调用stop方法时实际是调用了onStopped()方法,回到CorotineWorker#onStop()方法,他内部依然存在一个ListenableFuture去监听doWork()最终的状态,用于对协程调度的取消操作。

internal val job = Job()
internal val future: SettableFuture<Result> = SettableFuture.create()

init {
    future.addListener(
        Runnable {
            if (future.isCancelled) {
                job.cancel()
            }
        },
        taskExecutor.backgroundExecutor
    )
}

/**
 * The coroutine context on which [doWork] will run. By default, this is [Dispatchers.Default].
 */
@Deprecated(message = "use withContext(...) inside doWork() instead.")
public open val coroutineContext: CoroutineDispatcher = Dispatchers.Default

@Suppress("DEPRECATION")
public final override fun startWork(): ListenableFuture<Result> {
    val coroutineScope = CoroutineScope(coroutineContext   job)
    coroutineScope.launch {
        try {
            val result = doWork()
            future.set(result)
        } catch (t: Throwable) {
            future.setException(t)
        }
    }
    return future
}

public final override fun onStopped() {
    super.onStopped()
    future.cancel(false)
}

学新通技术网

他的实现类是AbstractFuture.

学新通技术网

所以对于为什么我们去调用workManager自带的取消api只是尽力去取消,是因为Future的cancel(true)方法只是一个尽力而为的行为,并不能确定一定会将异步任务完全的取消执行。

解决方案

自己思考了一下,其实WorkerManager本身定位就是轻量级异步任务,我们其实不用过多的去考虑完全取消的问题,但我为了这个问题并简化模板代码自己封装了一下,大家可以参考一下,由于使用定位问题,没有应用到项目中,不过我觉得是一个思路。

使用方式
Step1
@Singleton
class CityInfoCacheTask @Inject constructor(private val cityRepository: CityRepository) : AsyncTask {
    override suspend fun call(): ListenableWorker.Result {
        return cityRepository.refreshSelectedCityInfo().toWorkerResult()
    }
    companion object {
        const val TAG: String = "CityInfoCacheTask"
    }
}
Step2
taskHelper.apply {
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
    // ....
}
// start task
workerHelper.startOnce()
Cancel Work
taskHelper.cancelTaskByTag(CityInfoCacheTask.TAG)
// Now, I think we don't need support to canceledAllTaskByTag(), because use workermanager we should be canceled specific task. 
taskHelper.cancelAllTasksByTag()

就这么简单。

实现方式

学新通技术网

学新通技术网

串行执行
internal class SingleRunnableStrategy(private val taskQueue: TaskQueue, val coDispatcherProvider: CoDispatcherProvider)
    : BaseRunnableStrategy(taskQueue) {

    private var cancelLoopJob: Job? = null
    @VisibleForTesting
    internal var runningJobInfo = Pair<Job?, String?>(null, null)

    override suspend fun run() {
        supervisorScope {
            cancelLoopJob = async { cancelTaskLooper() }
            do {
                val loopTask = getTask()
                if (loopTask != null) {
                    val runningJob = async {
                        val taskResult = loopTask.task.call()
                        handleResult(loopTask, taskResult)
                    }
                    runningJobInfo = runningJob to loopTask.tag
                    runningJob.join()
                }
            } while (loopTask != null)
            cancelLoopJob?.cancel()
        }
    }

    @VisibleForTesting
    internal suspend fun cancelTaskLooper() {
        while (true) {
            if (runningJobInfo.first == null || runningJobInfo.second == null){
                continue
            }
            val runningJob = runningJobInfo.first!!
            val runningTag = runningJobInfo.second!!
            if (taskQueue.isCanceledTask(runningTag) && runningJob.isActive) {
                taskQueue.removeCanceledTaskByTag(runningTag)
                runningJob.cancel()
            }
        }
    }
}
并行执行
internal class ParallelRunnableStrategy(val taskQueue: TaskQueue, val coDispatcherProvider: CoDispatcherProvider) :
    BaseRunnableStrategy(taskQueue) {

    @VisibleForTesting
    var isFinished = false

    @VisibleForTesting
    var parallelRunningTasks = ConcurrentHashMap<String?, Job>()

    @VisibleForTesting
    var releaseChannel = Channel<TaskEntity>()

    @VisibleForTesting
    var blockingJob: Job? = null

    private var loopChannelJob: Job? = null
    private val lock = Mutex()

    // available cpu core   1 = parallel threshold
    private val taskThreshold = Runtime.getRuntime().availableProcessors()   1

    override suspend fun run() {
        prepareRun()
        supervisorScope {
            loopChannelJob = async {
                loopChannel()
            }
            do {
                val loopTask = getTask()
                loopTask?.let { currentTask ->
                    lock.withLock {
                        parallelRunningTasks[currentTask.tag] = async {
                            runTask(currentTask)
                        }
                    }
                }
                if (taskThreshold <= parallelRunningTasks.size) {
                    blockingJob = async { delay(Long.MAX_VALUE) }
                    blockingJob?.join()
                }
            } while (loopTask != null)
            release()
        }
    }

    private suspend fun runTask(currentTask: TaskEntity) {
        val taskResult = currentTask.task.call()
        handleResult(currentTask, taskResult)
        // task finished send a message, to check blocking and whether need to await.
        releaseChannel.send(currentTask)
    }

    @VisibleForTesting
    internal suspend fun loopChannel() {
        while (true) {
            if (isFinished) {
                break
            }
            verifyCancelTask()
            // check to release block if parallel number < DEFAULT_PARALLEL_NUMBER
            releaseChannel.tryReceive().getOrNull()?.let {
                lock.withLock {
                    parallelRunningTasks.remove(it.tag)
                    if (parallelRunningTasks.size < taskThreshold && blockingJob?.isActive == true) {
                        blockingJob?.cancel()
                    }
                }
            }
        }
    }

    @VisibleForTesting
    internal fun verifyCancelTask() {
        // check cancel.
        val taskIterator = parallelRunningTasks.iterator()
        while (taskIterator.hasNext()) {
            val runningTask = taskIterator.next()
            runningTask.key?.let {
                if (taskQueue.isCanceledTask(it) && runningTask.value.isActive) {
                    runningTask.value.cancel()
                    taskQueue.removeCanceledTaskByTag(it)
                    taskIterator.remove()
                }
            }
        }
    }

    private fun prepareRun() {
        parallelRunningTasks.filter { it.value.isActive }.map { it.value.cancel() }
        parallelRunningTasks.clear()
        loopChannelJob?.cancel()
    }

    private fun release() {
        isFinished = true
        releaseChannel.close()
        loopChannelJob?.cancel()
    }
}

核心的取消流程是创建了一个loop,不停地去判断当前任务的运行态,若运行态是取消状态,则去cancel当前执行任务的job。 这样的话其实 WorkManager只开辟一次,内部细分成多个子task,这样我们的执行与取消逻辑由自己去控制。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanffhga
系列文章
更多 icon
同类精品
更多 icon
继续加载