• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

在前端 Javascript 中的 p-limit 学习前端限制并发数的方法方法理论,理解微任务和async/await

武飞扬头像
Luke
帮助747

前言

弄懂并发的限制实现,并剖析p-limit库中的实现,最终能够自己实现对应的并发限制函数

并发和并行

这里摘抄百科的一段内容

当有多个线程在操作时,如果系统只有一个CPU,则它根本不可能真正同时进行一个以上的线程,它只能把CPU运行时间划分成若干个时间段,再将时间 段分配给各个线程执行,在一个时间段的线程代码运行时,其它线程处于挂起状。.这种方式我们称之为并发

与并发不同的是,并行特指多核CPU的情况下

当系统有一个以上CPU时,则线程的操作有可能非并发。当一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行

区别

并发的关键是你有处理多个任务的能力,但不一定同时处理,只是不需要像串行那样必须把事情做完了才能做下一件。 并行的关键是你有同时处理多个任务的能力

作用

如果你同时并发多个异步操作,比如成百上千(Promise.all([n])),对于性能来说也是一种考验,此时我们需要限制并发的上限以保证资源的分配, 也就是 一部分 一部分的去执行,但最终的返回的结果还是一样的

前置知识

了解p-limit的源码,需要有队列和链表结构的基础,可以先看这一期内容 # yocto-queue

简化代码

为方便学习,这里将链表队列结构换成数组将代码简洁出来 方便没有看过队列的童鞋

function pLimit(concurrency) {
    const queue = []; //任务队列
    let activeCount = 0; //

    const next = () => {
            activeCount--;
            if (ququeueeue.size > 0) {
                    queue.shift()(); //如果队列存在元素则移除 第一位的元素 并将返回的元素的 函数执行
            }
    };

    //异步函数
    const run = async (fn, resolve, args) => {
            activeCount  ; //增加activeCount

            const result = (async () => fn(...args))(); /

            resolve(result);

            try {
                    await result;
            } catch {}

            next();
    };

    const enqueue = (fn, resolve, args) => {
            queue.push(run.bind(undefined, fn, resolve, args));

            (async () => {
                    // This function needs to wait until the next microtask before comparing
                    // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
                    // when the run function is dequeued and called. The comparison in the if-statement
                    // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
                    await Promise.resolve();
                    if (activeCount < concurrency && queue.size > 0) {
                            queue.shift()(); //run.bind(undefined, fn, resolve, args) 对应的就是执行 run(fn, resolve, args)
                    }
            })();
    };

    const generator = (fn, ...args) =>
            new Promise((resolve) => {
                    enqueue(fn, resolve, args);
            });
    return generator;
}


const limit = pLimit(3);

function sleep(sec) {
return new Promise((resolve, reject) => {
    console.log('本函数的执行时要等待'  sec   '秒')
    setTimeout(() => {
        console.log(`等待了${sec}秒执行`)
        resolve()
    }, sec * 1000)
})
}
limit(sleep, 1)
limit(sleep, 1)
limit(sleep, 1)
limit(sleep, 3)
limit(sleep, 3)
limit(sleep, 3)

结构浅析

pLimit.png

流程和原理

  1. 先生成一个指定最大并发数的生成器函数 用于接收异步函数的传入
  2. 将所有的异步函数加入到队列中,主线程执行后也就是加入完成后,微任务进行执行 此时会判断 当前并发数 是否小于最大并发数,小于的话 就会将 队列中的 任务 移出一个 并进行执行
  3. 被移出进行执行的异步函数 会增加 当前并发数 并将执行结果 返回给一开始 生成器函数的Promise中,在run中会看到这一行代码resolve(result), 然后在下一个微任务中执行next函数
  4. next函数 减少当前并发数 并判断是否需要继续 移出队列中的任务进行执行

一些疑惑点

为什么enqueue函数中 对于 if的判断 要包装到 一个async函数中? 如果你弄明白了 async/await中的执行流程以及微任务的概念就可以理解其中的原由

(async () => {
    await Promise.resolve();
    //包装在异步函数 下面的代码 会在主线程执行完后 微任务中去处理
    //这里的代码不会被立即执行
    //下面的代码会在下一个微任务中执行  主要是为了保证 activeCount 的更新
    //因为activeCount 的增加 是在 run这个异步函数中
    if (activeCount < concurrency && queue.size > 0) {
            queue.dequeue()();
        }
})();
const run = async (fn, resolve, args) => {
        activeCount  ; 
        const result = (async () => fn(...args))();
        resolve(result); 
        try {
                await result;
        } catch {}
        next(); // await后的 非 await代码,被包装成Promise 推入到微任务队列
};
  1. 作者的注释中 告诉你是为了等待 activeCount 的 异步更新, 也就是 先把所有的异步函数先加入到队列中但不执行,需要留意的是, 因为enqueque函数 会产生对应函数调用次数的微任务, 然后每次微任务执行的时候就会判断activeCount , 默认为0 ,小于concurrency, 那么就会移出队列中第一个异步任务进行执行, 执行时 会增加 activeCount, 异步任务会依次执行直到 activeCount达到了 限定值

  2. 同时run函数中的 next函数执行是在 await result之后,所以 这一步 next函数执行也会被加入到 微任务队列中,所以不难想象出 整体的 执行流程是这样的:

微任务中判断并发数 -> 还在最大范围内 则执行 dequeuqe()返回的函数, 对应的就是run函数 -> run函数中 会将 next 的执行 推入到 下一次的微任务 -> next执行的时候则会判断队列中是否还有任务 然后继续执行队列中的任务 对应的就是一个个的 run函数

普通情况下你也可以不把这个if 判断 放入到 async 自执行函数,但为了保证activeCount的 异步更新 作者并没有这么做,因为 run 函数中的 next 函数执行 始终会进入到微任务队列中 保证任务执行不会出问题

实现

我们根据这一思路 实现一个 限制并发数的 函数 ,核心原理即 判断并发数 决定 异步任务的执行 还是 加入队列,并在异步任务完成后 再次对并发和 任务队列做一次判断

const harexsLimit = (maxCount) => {
    let activeCount = 0
    let waitTask = []

    const execute = (asyncFn,...args)=>{
        return new Promise((resolve,reject)=>{
            const task = create(asyncFn,args,resolve,reject)
            if(activeCount >= maxCount){
                waitTask.push(task)
            }else{
                task()
            }
        })
    }

    const create = (asyncFn,args,resolve,reject) => {
        return ()=>{
            asyncFn(...args).then(resolve).catch(reject).finally(()=>{
                activeCount--
                if(waitTask.length){
                    waitTask.shift()()
                }
            })
            activeCount  
        }
    }

    return execute
}


let limitP = harexsLimit(3)

function sleep(sec){
    return new Promise((resolve,reject)=>{
        console.log('本函数的执行时要等待'  sec   '秒')
        setTimeout(()=>{
            console.log('等待了'  sec   '秒')
            resolve()
        },sec*1000)
    })
}

limitP(sleep,1)
limitP(sleep,1.1)
limitP(sleep,1.2)
limitP(sleep,3)
limitP(sleep,1.3)

发布

根据之前mitt那一期发布的流程 再发布一个包~ harexs-limit

npm i harexs-limit

感想

这一期 在工作之余抽空看了好几次, 没有人指点自己琢磨着看 有点吃力,主要是感觉有点绕~ 我觉得还是基础不扎实, 只有理解了 微任务 以及 async 函数中 的执行流程后, 才突然理解明白了, 不理解微任务 宏任务 以及 async/await 的执行流程 看多少遍 都是迷糊的哈哈!

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanefjh
系列文章
更多 icon
同类精品
更多 icon
继续加载