学新通技术网

Python实现多线程、多协程的并发爬取

juejin 5 1
Python实现多线程、多协程的并发爬取

线程

线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

单线程实现

from lxml import etree
import requests
import json
import time
header = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'

}

local_file = open('duanzi.json','a',encoding='utf-8')

def parse_html(html):
    text = etree.HTML(html)
    # 返回所有段子的节点
    node_list = text.xpath('/html/body/div[2]/div[3]/ul[2]/li')
    for node in node_list:
        try:
            id = node.xpath('./span[1]/text()')[0]
            state = node.xpath('./span[2]/text()')[0].strip()


            items = {
                'id':id,
                'state':state
            }
            # print(items)
            local_file.write(json.dumps(items)+'\n')

        except:
            pass

url = 'https://wz.sun0769.com/political/index/politicsNewest?id=1&page=2'

def main():
    for i in range(1,20):
        # 每一页的网址
        url = f'https://wz.sun0769.com/political/index/politicsNewest?id=1&page={i}'
        html = requests.get(url=url,headers=header).text
        parse_html(html)

if __name__ == '__main__':
    t1= time.time()
    main()
    print(time.time()-t1)

多线程实现的流程

  1. 使用一个pageQueue队列保存要访问的网页
  2. 同时启动多个采集线程,每个线程都要从网页页码队列pageQueue中取出要访问的页码,构建网址,访问网址并爬取数据,操作完一个网页后再从网页队列中选取下一个页码,依次进行,直到所有的页码都已访问完毕,所有采集线程保存在threadCrawls中
  3. 使用一个dataCode来保存所有的网页代码,每个线程获取到的数据都应该放入队列中
  4. 同时启动多个解析线程,每个线程都从网页源代码dataQueue中取出一个网页源代码,并进行解析获得想要的数据,解析完成以后再选取下一个进行同样的操作,直至所有的解析完成。将所有的解析线程存储在列表threadParses中
  5. 将解析的json数据存储在本地文件中
import json
import threading
from queue import Queue
from lxml import etree
import time
import random
import requests

crawl = False  # 全局变量,标识pageQueue队列是否为空


class ThreadCrawl(threading.Thread):  # 采集线程
    def __init__(self, threadName, pageQueue, dataQueue):
        threading.Thread.__init__(self)
        self.threadName = threadName
        self.pageQueue = pageQueue
        self.dataQueue = dataQueue
        self.headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'
        }
    # 重写run函数
    def run(self):
        print("启动" + self.threadName)
        while not crawl:
            try:
                # 从dataQueue中取出一个页码数字,先进先出
                page = self.pageQueue.get(False)
                url = f'https://wz.sun0769.com/political/index/politicsNewest?id=1&page={page}'
                time.sleep(random.uniform(1,3))  # 降低访问频率,防止ip被封
                content = requests.get(url, headers=self.headers).text
                # 将爬到的网页源代码放入dataQueue队列中
                self.dataQueue.put(content)
            except:
                pass
        print("结束" + self.threadName)

PARSE_EXIT = False

class ThreadParse(threading.Thread):    # 解析线程
    def __init__(self, threadName, dataQueue, localFile, lock):
        super(ThreadParse, self).__init__()
        self.threadName = threadName
        self.dataQueue = dataQueue
        self.localFile = localFile  # 文件夹
        self.lock = lock  # 互斥锁

    def run(self):
        print("启动" + self.threadName)
        while not PARSE_EXIT:
            try:
                html = self.dataQueue.get(False)
                self.parse(html)
            except:
                pass
        print("结束" + self.threadName)
    def parse(self, html):
        text = etree.HTML(html)
        # 返回所有段子的节点
        node_list = text.xpath('/html/body/div[2]/div[3]/ul[2]/li')
        for node in node_list:
            try:
                id = node.xpath('./span[1]/text()')[0]
                state = node.xpath('./span[2]/text()')[0].strip()
                items = {
                    'id': id,
                    'state': state
                }
                # print(items)
                with self.lock:  #
                    print(json.dumps(items))
                    self.localFile.write(json.dumps(items) + '\n')

                # 在多线程开发中,为了保护资源的完整性,在访问共享资源时 需要使用共享锁,线程获得共享锁以后才可以访问文件中的localFile
                # 并往里添加数据,写入完毕以后将锁释放,这样其他线程就可以访问这个文件了
            except:
                pass


def main():
    pageQueue = Queue(20)
    for i in range(1, 21):
        pageQueue.put(i)
    dataQueue = Queue()
    localFile = open('多线程.json', 'a')
    lock = threading.Lock()  # 互斥锁
    crawlList = ['采集线程1', '采集线程2', '采集线程3']
    # 创建,启动和存储3个采集线程
    threadCrawls = []
    for thredName in crawlList:
        thread = ThreadCrawl(thredName, pageQueue, dataQueue)
        thread.start()  # 启动线程
        threadCrawls.append(thread)
    # 创建三个解析线程
    parseList = ['解析线程1', '解析线程2', '解析线程3']
    theradParses = []
    for threadName in parseList:
        thread = ThreadParse(threadName, dataQueue, localFile, lock)
        thread.start()
        theradParses.append(thread)
    while not pageQueue.empty():
        pass
    # 为空,采集线程退出循环
    global crawl
    crawl = True
    print("pageQueue为空")
    for thread in threadCrawls:
        thread.join()  # 阻塞线程
    while not dataQueue.empty():
        pass
    print('dataQueue为空')
    global PARSE_EXIT
    PARSE_EXIT = True
    for thread in theradParses:
        thread.join()
    with lock:
        localFile.close()


if __name__ == '__main__':
 	t1= time.time()
    main()
    print(time.time()-t1)

协程

  • 协程是一种比线程更小的执行单元,又称微线程(用户态的线程)。在一个线程中可以有多个协程,但是一次只能只能执行一个协程,当所执行的协程遭遇阻塞时,就会切换到下一个任务继续执行,从而提高CPU的利用率,适用于IO密集的场景,可以避免线程过多,减少线程切换之间浪费的时间

协程爬虫的流程分析

由于协程的切换不像多线程那样调度耗费资源,所以不用严格的限制协程的数量

  1. 将要爬取的网址存储在一个列表中,由于针对每一个网址都需要创建一个协程,所以需要准备一个待爬取的网址列表
  2. 为每一个网址创建一个协程并启动该协程。协程会依次执行,爬取对应的网页内容,如果一个协程在执行过程中出现网络阻塞或者其他异常情况,则会立马切换到下一个协程,由于协程的切换不用切换线程,消耗资源较小,所以不用严格限制协程的大小(分情况对待),每个协程协程负责爬取网页,并且将网页中的目标数据解析出来
  3. 将爬取到的目标数据存储到一个列表中
  4. 遍历数据列表,将数据存储到本地文件中

gevent

gevent是一个基于协程的Python网络库,是一个第三方库 安装

pip install genvent

协程实现流程

  1. 定义一个负责负责爬虫的类,所有爬虫的工作完全交给该类负责
  2. 使用一个队列data_queue保存所有的数据
  3. 创建多个协程任务,每一个协程都会使用页码构建完整的网址,访问网址爬取和提取有用的数据,并将数据保存到数据队列中
  4. 将dataQueue队列中的数据保存到本地文件中
import time
import gevent
from lxml import etree
import requests
from queue import Queue

class Spider(object):
    def __init__(self):
        self.headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'
        }
        self.url = 'https://wz.sun0769.com/political/index/politicsNewest?id=1&page='
        self.dataQueue = Queue()
        self.count = 0

    def send_raquest(self,url):
        print("正在爬取"+url)
        html = requests.get(url,self.headers).text
        time.sleep(1)
        self.parse_page(html)

    def parse_page(self,html):
        text = etree.HTML(html)
        node_list = text.xpath('/html/body/div[2]/div[3]/ul[2]/li')
        for node in node_list:
            try:
                id = node.xpath('./span[1]/text()')[0]
                state = node.xpath('./span[2]/text()')[0].strip()
                items = {
                    'id': id,
                    'state': state
                }
                self.count+=1
                self.dataQueue.put(items)
            except:
                pass
    def start_work(self):
        arr = []
        for page in range(1,20):
            # 创建一个协程任务对象
            url = self.url+str(page)
            job = gevent.spawn(self.send_raquest,url)
            arr.append(job)
        # joinall()接受一个列表,将列表中的所有协程任务添加到任务队列里执行
        gevent.joinall(arr)
        local_file = open("协程.json",'wb+')
        while not self.dataQueue.empty():
            content = self.dataQueue.get()
            result = str(content).encode('utf-8')
            local_file.write(result+b"\n")
        local_file.close()
        print(self.count)


if __name__ == '__main__':
	t1= time.time()
    spider = Spider()
    spider.start_work()
    print(time.time()-t1)

本文出至:学新通技术网

标签: