前言
在应用开发中我们经常有一些功能场景需要在指定时间(时机)去触发,例如在启动流程时对缓存信息的更新,以及触发某些优先级低的行为逻辑,又或者空闲时上传图片不影响占用过多的用户行为。 在Jetpack还没有提供WorkMananger之前,我们经常采取 长生命周期的线程池
、AlarmManager BroadcastReceiver
、JobScheduler
等待去处理异步任务,往往有时这些方法在某种场景下具有稳定性问题导致不可靠,并且需要我们去处理事件调度比如优先级等等,因此WorkManager的出现解决了这些问题,不过也带来了一些问题,让我们来探索一下。
取消api
目前根据源码以及文档发现,对于取消任务提供了四个比较明显方法,举个其中某一个方法尝试取消任务。
WorkManager创建任务(自行封装)
WorkManager取消任务
通过注释以及日常使用实践来看,发现取消方法其实是一种可能取消不了的方法,可能存在申请取消但实际上内部耗时任务还是在执行的情况。
例举其中一个方法我们看下实现逻辑。
@Override
public @NonNull Operation cancelWorkById(@NonNull UUID id) {
// 1
CancelWorkRunnable runnable = CancelWorkRunnable.forId(id, this);
// 2
mWorkTaskExecutor.executeOnBackgroundThread(runnable);
return runnable.getOperation();
}
从代码1中我们可以看到根据UUID从CancelWorkRunnable找到对应的task的Runnable,代码2处我们可以看到直接从线程池中执行这个Runnable。
public static CancelWorkRunnable forId(
@NonNull final UUID id,
@NonNull final WorkManagerImpl workManagerImpl) {
return new CancelWorkRunnable() {
@WorkerThread
@Override
void runInternal() {
// 1
WorkDatabase workDatabase = workManagerImpl.getWorkDatabase();
workDatabase.beginTransaction();
try {
// 2
cancel(workManagerImpl, id.toString());
workDatabase.setTransactionSuccessful();
} finally {
workDatabase.endTransaction();
}
reschedulePendingWorkers(workManagerImpl);
}
};
}
从代码1中看到task的状态信息是保存在数据库中的,因此获取了Room数据库对象,并开启了事务。
从代码2中看到调用了cancel方法,具体内部是如何实现的我们探究一下。
void cancel(WorkManagerImpl workManagerImpl, String workSpecId) {
// 1
iterativelyCancelWorkAndDependents(workManagerImpl.getWorkDatabase(), workSpecId);
// 2
Processor processor = workManagerImpl.getProcessor();
// 3
processor.stopAndCancelWork(workSpecId);
for (Scheduler scheduler : workManagerImpl.getSchedulers()) {
scheduler.cancel(workSpecId);
}
}
代码1从方法名称可知目的是迭代的取消工作与相关依赖于当前task的其他tasks.
private void iterativelyCancelWorkAndDependents(WorkDatabase workDatabase, String workSpecId) {
WorkSpecDao workSpecDao = workDatabase.workSpecDao();
DependencyDao dependencyDao = workDatabase.dependencyDao();
LinkedList<String> idsToProcess = new LinkedList<>();
idsToProcess.add(workSpecId);
while (!idsToProcess.isEmpty()) {
String id = idsToProcess.remove();
// Don't fail already cancelled work.
// 1
WorkInfo.State state = workSpecDao.getState(id);
if (state != SUCCEEDED && state != FAILED) {
workSpecDao.setState(CANCELLED, id);
}
idsToProcess.addAll(dependencyDao.getDependentWorkIds(id));
}
}
iterativelyCancelWorkAndDependents()方法代码1处对根据UUID从数据库查找的task 以及 依赖tasks进行标记取消,workSpecDao.setState(CANCELLED, id), 并存入数据库中。
我们可以看到依赖表,表是记录的依赖于当前task的所有子tasks。
@Dao
public interface DependencyDao {
/**
* Attempts to insert a {@link Dependency} into the database.
*
* @param dependency The {@link Dependency}s to insert
*/
@Insert(onConflict = IGNORE)
void insertDependency(Dependency dependency);
/**
* Determines if a {@link WorkSpec} has completed all prerequisites.
*
* @param id The identifier for the {@link WorkSpec}
* @return {@code true} if the {@link WorkSpec} has no pending prerequisites.
*/
@Query("SELECT COUNT(*)=0 FROM dependency WHERE work_spec_id=:id AND prerequisite_id IN "
"(SELECT id FROM workspec WHERE state!="
WorkTypeConverters.StateIds.SUCCEEDED ")")
boolean hasCompletedAllPrerequisites(String id);
/**
* Gets all the direct prerequisites for a particular {@link WorkSpec}.
*
* @param id The {@link WorkSpec} identifier
* @return A list of all prerequisites for {@code id}
*/
@Query("SELECT prerequisite_id FROM dependency WHERE work_spec_id=:id")
List<String> getPrerequisites(String id);
/**
* Gets all {@link WorkSpec} id's dependent on a given id
*
* @param id A {@link WorkSpec} identifier
* @return A list of all identifiers that depend on the input
*/
@Query("SELECT work_spec_id FROM dependency WHERE prerequisite_id=:id")
List<String> getDependentWorkIds(String id);
/**
* Determines if a {@link WorkSpec} has any dependents.
*
* @param id A {@link WorkSpec} identifier
* @return {@code true} if the {@link WorkSpec} has WorkSpecs that depend on it
*/
@Query("SELECT COUNT(*)>0 FROM dependency WHERE prerequisite_id=:id")
boolean hasDependents(String id);
}
此时我们回过头再看下cancel方法,代码2处从WorkManagerImpl中获取了processor对象, 并调用了stopAndCancelWork()方法。
public boolean stopAndCancelWork(@NonNull String id) {
synchronized (mLock) {
Logger.get().debug(TAG, String.format("Processor cancelling %s", id));
mCancelledIds.add(id);
WorkerWrapper wrapper;
// Check if running in the context of a foreground service
// 1
wrapper = mForegroundWorkMap.remove(id);
boolean isForegroundWork = wrapper != null;
if (wrapper == null) {
// Fallback to enqueued Work
wrapper = mEnqueuedWorkMap.remove(id);
}
// 2
boolean interrupted = interrupt(id, wrapper);
if (isForegroundWork) {
stopForegroundService();
}
return interrupted;
}
}
代码1处从前台工作队列中根据UUID移除,然后获取到移除的WorkerWrapper(本质上就是task实现了Runnable接口),如果wrapper不为null,则认为是前台工作,后续会停止前台服务。如果不是前台wrapper,就会从mEnqueuedWorkMap队列中根据UUID寻找task并移除,最终会调用wrapper自身的interrupt中断方法。
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public void interrupt() {
mInterrupted = true;
// 1
tryCheckForInterruptionAndResolve();
boolean isDone = false;
if (mInnerFuture != null) {
// Propagate the cancellations to the inner future.
// 2
isDone = mInnerFuture.isDone();
// 3
mInnerFuture.cancel(true);
}
// Worker can be null if run() hasn't been called yet
if (mWorker != null && !isDone) {
// 4
mWorker.stop();
} else {
String message =
String.format("WorkSpec %s is already done. Not interrupting.", mWorkSpec);
Logger.get().debug(TAG, message);
}
}
code1处是对worker的取消逻辑。
private boolean tryCheckForInterruptionAndResolve() {
if (mInterrupted) {
WorkInfo.State currentState = mWorkSpecDao.getState(mWorkSpecId);
if (currentState == null) {
resolve(false);
} else {
resolve(!currentState.isFinished());
}
return true;
}
return false;
}
根据当前worker的状态来进行的不同处理。若当前的状态为null,说明当用户再次调用beginUniqueWork方法时并且设置的是 Replace 的模式,则对woker状态进行清理,所以才会有null这种情况。
private void resolve(final boolean needsReschedule) {
mWorkDatabase.beginTransaction();
try {
boolean hasUnfinishedWork = mWorkDatabase.workSpecDao().hasUnfinishedWork();
if (!hasUnfinishedWork) {
// 1
PackageManagerHelper.setComponentEnabled(
mAppContext, RescheduleReceiver.class, false);
}
if (needsReschedule) {
// 2
mWorkSpecDao.setState(ENQUEUED, mWorkSpecId);
mWorkSpecDao.markWorkSpecScheduled(mWorkSpecId, SCHEDULE_NOT_REQUESTED_YET);
}
if (mWorkSpec != null && mWorker != null && mWorker.isRunInForeground()) {
mForegroundProcessor.stopForeground(mWorkSpecId);
}
mWorkDatabase.setTransactionSuccessful();
} finally {
mWorkDatabase.endTransaction();
}
mFuture.set(needsReschedule);
}
code1当没有未完成的工作的时候,会将RescheduleReceiver设置为false, 这个RescheduleReceiver是一个广播,用于由于还有队列没有处理完成,唤起workmananger的作用,或者对一些场景做告警,这个广播的逻辑我们以后有时间再详勘。code2处就比较好理解了,由于我们要直接取消WorkMananger的执行,势必有些工作是需要再次排期执行的,只是说在当前过程中我们需要中止WorkManager,因此将对数据库对worker的状态重新设置为ENQUEUED,并对执行的时刻表设置为SCHEDULE_NOT_REQUESTED_YET(-1).
此时我们回到interrupt()方法,code2处这里的mInnerFuture是用来监听work的异步任务结果的,这里多说一嘴,我们知道创建线程的几种方式,其中的一种方式是通过 Future 接口来调用线程池的execute(future: Future)方法来获取线程任务的结果的,也就是说FutureTask Callable的方式。但是Future接口并未提供当线程执行完毕自动回调,所以Google针对这种需求在原有的基础之上又自行封装了一份ListenableFuture,在code3处调用了cancel(true)方法是为了切断ListenableFuture中的任务执行。
code4处ListenableWorker这里他的实现类是CoroutineWorker,在调用stop方法时实际是调用了onStopped()方法,回到CorotineWorker#onStop()方法,他内部依然存在一个ListenableFuture去监听doWork()最终的状态,用于对协程调度的取消操作。
internal val job = Job()
internal val future: SettableFuture<Result> = SettableFuture.create()
init {
future.addListener(
Runnable {
if (future.isCancelled) {
job.cancel()
}
},
taskExecutor.backgroundExecutor
)
}
/**
* The coroutine context on which [doWork] will run. By default, this is [Dispatchers.Default].
*/
@Deprecated(message = "use withContext(...) inside doWork() instead.")
public open val coroutineContext: CoroutineDispatcher = Dispatchers.Default
@Suppress("DEPRECATION")
public final override fun startWork(): ListenableFuture<Result> {
val coroutineScope = CoroutineScope(coroutineContext job)
coroutineScope.launch {
try {
val result = doWork()
future.set(result)
} catch (t: Throwable) {
future.setException(t)
}
}
return future
}
public final override fun onStopped() {
super.onStopped()
future.cancel(false)
}
他的实现类是AbstractFuture.
所以对于为什么我们去调用workManager自带的取消api只是尽力去取消,是因为Future的cancel(true)方法只是一个尽力而为的行为,并不能确定一定会将异步任务完全的取消执行。
解决方案
自己思考了一下,其实WorkerManager本身定位就是轻量级异步任务,我们其实不用过多的去考虑完全取消的问题,但我为了这个问题并简化模板代码自己封装了一下,大家可以参考一下,由于使用定位问题,没有应用到项目中,不过我觉得是一个思路。
使用方式
Step1
@Singleton
class CityInfoCacheTask @Inject constructor(private val cityRepository: CityRepository) : AsyncTask {
override suspend fun call(): ListenableWorker.Result {
return cityRepository.refreshSelectedCityInfo().toWorkerResult()
}
companion object {
const val TAG: String = "CityInfoCacheTask"
}
}
Step2
taskHelper.apply {
addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
addTask(TaskEntity(task = cacheTask, retryTimes = 1, tag = CityInfoCacheTask.TAG))
// ....
}
// start task
workerHelper.startOnce()
Cancel Work
taskHelper.cancelTaskByTag(CityInfoCacheTask.TAG)
// Now, I think we don't need support to canceledAllTaskByTag(), because use workermanager we should be canceled specific task.
taskHelper.cancelAllTasksByTag()
就这么简单。
实现方式
串行执行
internal class SingleRunnableStrategy(private val taskQueue: TaskQueue, val coDispatcherProvider: CoDispatcherProvider)
: BaseRunnableStrategy(taskQueue) {
private var cancelLoopJob: Job? = null
@VisibleForTesting
internal var runningJobInfo = Pair<Job?, String?>(null, null)
override suspend fun run() {
supervisorScope {
cancelLoopJob = async { cancelTaskLooper() }
do {
val loopTask = getTask()
if (loopTask != null) {
val runningJob = async {
val taskResult = loopTask.task.call()
handleResult(loopTask, taskResult)
}
runningJobInfo = runningJob to loopTask.tag
runningJob.join()
}
} while (loopTask != null)
cancelLoopJob?.cancel()
}
}
@VisibleForTesting
internal suspend fun cancelTaskLooper() {
while (true) {
if (runningJobInfo.first == null || runningJobInfo.second == null){
continue
}
val runningJob = runningJobInfo.first!!
val runningTag = runningJobInfo.second!!
if (taskQueue.isCanceledTask(runningTag) && runningJob.isActive) {
taskQueue.removeCanceledTaskByTag(runningTag)
runningJob.cancel()
}
}
}
}
并行执行
internal class ParallelRunnableStrategy(val taskQueue: TaskQueue, val coDispatcherProvider: CoDispatcherProvider) :
BaseRunnableStrategy(taskQueue) {
@VisibleForTesting
var isFinished = false
@VisibleForTesting
var parallelRunningTasks = ConcurrentHashMap<String?, Job>()
@VisibleForTesting
var releaseChannel = Channel<TaskEntity>()
@VisibleForTesting
var blockingJob: Job? = null
private var loopChannelJob: Job? = null
private val lock = Mutex()
// available cpu core 1 = parallel threshold
private val taskThreshold = Runtime.getRuntime().availableProcessors() 1
override suspend fun run() {
prepareRun()
supervisorScope {
loopChannelJob = async {
loopChannel()
}
do {
val loopTask = getTask()
loopTask?.let { currentTask ->
lock.withLock {
parallelRunningTasks[currentTask.tag] = async {
runTask(currentTask)
}
}
}
if (taskThreshold <= parallelRunningTasks.size) {
blockingJob = async { delay(Long.MAX_VALUE) }
blockingJob?.join()
}
} while (loopTask != null)
release()
}
}
private suspend fun runTask(currentTask: TaskEntity) {
val taskResult = currentTask.task.call()
handleResult(currentTask, taskResult)
// task finished send a message, to check blocking and whether need to await.
releaseChannel.send(currentTask)
}
@VisibleForTesting
internal suspend fun loopChannel() {
while (true) {
if (isFinished) {
break
}
verifyCancelTask()
// check to release block if parallel number < DEFAULT_PARALLEL_NUMBER
releaseChannel.tryReceive().getOrNull()?.let {
lock.withLock {
parallelRunningTasks.remove(it.tag)
if (parallelRunningTasks.size < taskThreshold && blockingJob?.isActive == true) {
blockingJob?.cancel()
}
}
}
}
}
@VisibleForTesting
internal fun verifyCancelTask() {
// check cancel.
val taskIterator = parallelRunningTasks.iterator()
while (taskIterator.hasNext()) {
val runningTask = taskIterator.next()
runningTask.key?.let {
if (taskQueue.isCanceledTask(it) && runningTask.value.isActive) {
runningTask.value.cancel()
taskQueue.removeCanceledTaskByTag(it)
taskIterator.remove()
}
}
}
}
private fun prepareRun() {
parallelRunningTasks.filter { it.value.isActive }.map { it.value.cancel() }
parallelRunningTasks.clear()
loopChannelJob?.cancel()
}
private fun release() {
isFinished = true
releaseChannel.close()
loopChannelJob?.cancel()
}
}
核心的取消流程是创建了一个loop,不停地去判断当前任务的运行态,若运行态是取消状态,则去cancel当前执行任务的job。 这样的话其实 WorkManager只开辟一次,内部细分成多个子task,这样我们的执行与取消逻辑由自己去控制。
本文出至:学新通技术网
标签: