asyncio并发编程

《Python3高级核心技术97讲,bobby》学习笔记,第十二章:asynio的使用和案例。
307阅读 · 2020-7-11 22:52发布

asyncio定义

  • astncio是用于解决异步IO并发编程的核心模块。
  • 包含各种特定系统实现的模块化事件循环(例如支持linux的epoll和windows的select)
  • 传输和协议抽象。
  • 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持。
  • 模仿futures模块但适用于事件循环的Future类。
  • 基于yield from的协议和任务,让顺序的方式编写并发代码。
  • 使用一个将产生阻塞IO的调用时,可以通过接口将这个事件放到线程池执行。

12.1 事件循环的基本使用

  • 协程中如果需要使用sleep,可以使用asyncio.sleep。同时需要加上await关键字(会立即返回结果)。
  • asyncio.get_event_loop()可以循环获取事件。
  • run_until_complete()相当于多线程中的join方法,可以传递一个协程作为参数。把参数注册到loop中。代码示例:

    import asyncio
      import time
    
      async def get_html(url):
          print('start get url')
          await asyncio.sleep(2)
          print('end get url')
    
      if __name__ == '__main__':
          start_time = time.time()
          loop = asyncio.get_event_loop()
          loop.run_until_complete(get_html('http://www.baidu.com'))
          print(time.time() - start_time)
    
  • asyncio.ensure_future()可以提交一个任务,返回Future对象。
  • loop.create_task()也可以提交一个任务。
  • future和task(task是future的子类)的result()方法可以获取返回值。代码示例:

    import asyncio
      import time
    
      async def get_html(url):
          print('start get url')
          await asyncio.sleep(2)
          return 'return get url'
    
      if __name__ == '__main__':
          start_time = time.time()
          loop = asyncio.get_event_loop()
          get_future = asyncio.ensure_future(get_html('http://www.baidu.com'))
          # loop.create_task(get_html('http://www.baidu.com'))
          loop.run_until_complete(get_future)
          print(get_future.result())
          print(time.time() - start_time)
    
  • asyncio.ensure_future内部实际还是调用loop的create_task方法,loop对象是从events.get_event_loop()中获取(一个线程只有一个loop)。
  • add_done_callback()可以向调用的函数最后加入要执行的函数。代码示例:

    import asyncio
      import time
    
      async def get_html(url):
          print('start get url')
          await asyncio.sleep(2)
          return 'return get url'
    
      def callback(future):
          print("callback")
    
      if __name__ == '__main__':
          start_time = time.time()
          loop = asyncio.get_event_loop()
          # get_future = asyncio.ensure_future(get_html('http://www.baidu.com'))
          task = loop.create_task(get_html('http://www.baidu.com'))
          task.add_done_callback(callback)
          loop.run_until_complete(task)
          print(task.result())
          print(time.time() - start_time)
    
  • callback中有参数时,可以使用functools的partial,用于将函数包裹成另外一个函数,然后将新函数放入add_done_callback中。代码示例:

    import asyncio
      import time
      from functools import partial
    
      async def get_html(url):
          print('start get url')
          await asyncio.sleep(2)
          return 'return get url'
    
      def callback(url,future):
          print(url)
    
      if __name__ == '__main__':
          start_time = time.time()
          loop = asyncio.get_event_loop()
          # get_future = asyncio.ensure_future(get_html('http://www.baidu.com'))
          task = loop.create_task(get_html('http://www.baidu.com'))
          task.add_done_callback(partial(callback,'http://www.baidu.com'))
          loop.run_until_complete(task)
          print(task.result())
          print(time.time() - start_time)
    
  • wait可以指定继续执行的条件,例如全部执行完后再往下执行。代码示例:

    import asyncio
      import time
    
      async def get_html(url):
          print('start get url')
          await asyncio.sleep(2)
          print('end get url')
    
      if __name__ == '__main__':
          start_time = time.time()
          loop = asyncio.get_event_loop()
          tasks = [get_html('http://www.baidu.com') for i in range(10)]
          loop.run_until_complete(asyncio.wait(tasks))  # 默认全部执行
          print(time.time() - start_time)
    
  • gather和wait几乎相同,可以直接互相替代使用。代码示例:

    import asyncio
      import time
    
      async def get_html(url):
          print('start get url')
          await asyncio.sleep(2)
          print('end get url')
    
      if __name__ == '__main__':
          start_time = time.time()
          loop = asyncio.get_event_loop()
          tasks = [get_html('http://www.baidu.com') for i in range(10)]
          loop.run_until_complete(asyncio.gather(*tasks))
          print(time.time() - start_time)
    
  • gather还可以对tasks进行分组,或者取消(cancel)。代码示例:

    import asyncio
      import time
    
      async def get_html(url):
          print(url)
          await asyncio.sleep(1)
          print('end get %s' % url)
    
      if __name__ == '__main__':
          start_time = time.time()
          loop = asyncio.get_event_loop()
          tasks1 = [get_html('http://www.baidu.com') for i in range(5)]
          tasks2 = [get_html('http://www.test.com') for i in range(5)]
          group1 = asyncio.gather(*tasks1)
          group2 = asyncio.gather(*tasks2)
          loop.run_until_complete(asyncio.gather(group1,group2))
          print(time.time() - start_time)
    

12.2 task取消和子协程调用

task取消

  • loop.run_forever()运行后永远不会停止(loop.run_until_complete运行完指定future后会停止)
  • asyncio.Task.all_tasks()获取所有task。它在源码中会从events.get_event_loop()获取loop对象,从而获取所有任务。
  • task.cancel()取消任务。
  • 代码示例:

    import asyncio
    
      async def get_html(sleep_time,url):
          print('get %s' % url)
          await asyncio.sleep(sleep_time)
          print('done')
    
      if __name__ == '__main__':
          url = 'http://baidu.com'
          task1 = get_html(2,url)
          task2 = get_html(3,url)
          task3 = get_html(3,url)
    
          tasks = [task1,task2,task3]
    
          loop = asyncio.get_event_loop()
          try:
              loop.run_until_complete(asyncio.wait(tasks))
          except KeyboardInterrupt as e:
              all_tasks = asyncio.Task.all_tasks()
              for task in all_tasks:
                  print('cancel task')
                  print(task.cancel())
              loop.stop()  # 标记stopping为True
              loop.run_forever()  # 取消会报错
          finally:
              loop.close()  # 清空队列,executor关闭等。
    

协程中嵌套协程(子协程调用)

  • 官方文档示例代码:

    import asyncio
      async def compute(x,y):
          print("Compute %s + %s ..." % (x,y))
          await asyncio.sleep(1)
          return x+y
    
      async def print_sum(x,y):
          result = await compute(x,y)
          print("%s + %s = %s" % (x,y,result))
    
      loop = asyncio.get_event_loop()
      loop.run_until_complete(print_sum(1,2))
      loop.close()
    

12.3 asyncio中其他函数

  • loop.call_soon():队列中等待到下一个循环时执行指定函数。传递函数和函数的参数。代码示例:

    import asyncio
    
      def callback(sleep_time):
          print("sleep {} success".format(sleep_time))
    
      def stoploop(loop):
          loop.stop()
    
      if __name__ == '__main__':
          loop = asyncio.get_event_loop()
          loop.call_soon(callback,2)
          loop.call_soon(stoploop,loop)
          loop.run_forever()
    
  • loop.call_later():指定时间执行指定函数,当有多个时会根据时间排序执行。代码示例:

    import asyncio
    
      def callback(sleep_time):
          print("sleep {} success".format(sleep_time))
    
      def stoploop(loop):
          loop.stop()
    
      if __name__ == '__main__':
          loop = asyncio.get_event_loop()
          loop.call_later(2,callback,2)
          loop.call_later(5,stoploop,loop)
          loop.call_soon(callback,3)
          loop.run_forever()
    
  • loop.call_at():指定时间运行函数,但时间是loop中的单调时间。代码示例:

    import asyncio
    
      def callback(sleep_time,loop):
          print("sleep {} success: time {}".format(sleep_time,loop.time()))
    
      def stoploop(loop):
          loop.stop()
    
      if __name__ == '__main__':
          loop = asyncio.get_event_loop()
          now = loop.time()
          loop.call_at(now+2,callback,2,loop)
          loop.call_at(now+1,callback,1,loop)
          loop.call_at(now+3,callback,3,loop)
          loop.call_soon(callback,4,loop)
          loop.run_forever()
    
  • loop.call_soon_threadsafe():它是线程安全的方法。和call_soon的逻辑几乎一致,但它多了write_to_self方法,用于解决线程安全的问题。

12.4 ThreadPoolExecutor和asyncio完成阻塞IO请求

  • 在协程中如果需要使用阻塞IO时,则可以使用多线程(线程池)实现。
  • loop.run_in_executor():可以将阻塞IO放入executor中运行。传入线程池和需要执行的函数以及参数。代码示例:

    import asyncio
      from concurrent.futures import ThreadPollExecutor
    
      def get_url(url):
          pass
    
      if __name__ == "__main__":
          loop asyncio.get_event_loop()
          executor = ThreadPollExecutor()
          tasks = []
          for url in range(20):
              url = "http://baidu.com/{}/".format(url)
              task = loop.run_in_executor(executor,get_url,url)
              tasks.append(task)
          loop.run_until_complete(asyncio.wait(tasks))
    

12.5 asyncio模拟http请求

  • asyncio没有提供http协议的接口。(aiohttp可以)
  • asyncio.open_connection():建立socket连接,返回reader和writer。
  • async for语法:将for异步化。
  • asyncio.ensure_future():将task变为future对象。
  • asyncio.as_completed():传入任务列表,完成一个返回一个。代码示例:
    for task in asyncio.as_completed(tasks):
          result = await task
    

12.6 asyncio的同步和通信

  • asyncio基于单线程,但是其中如果使用了异步的操作,异步操作返回过慢时,会出现多次调用的的情况。此时如果没有锁,则会重复执行代码。
  • from asyncio import Lock:是asyncio中可以使用的锁。代码示例:

    import asyncio
      from asyncio import Lock
      cache = {}
      lock = Lock()
    
      async def get_stuff(url):
          # 也可以使用with await lock:或async with lock:
          awaitlock.acquire()
          if url in cache:
              return cache[url]
          stuff = await aiohttp.request('GET',url)
          cache[url] = stuff
          return stuff
          lock.release()
    
  • asyncio中还有Queue(在协程中使用有限流的效果)等,使用方式和线程中相同,但前面需要加上await关键字。
  • 本章节的视频内容主要讲解Lock和Queue的源码执行过程,如果有需要请自行查看源码。

12.7 aiohttp实现高并发爬虫

  • aiohttp是第三方库,github地址:https://github.com/aio-libs/aiohttp
  • 它是基于asyncio实现的http client/server。
  • 可以在pypi中查看它支持的Python版本(目前20200709支持Python3.6),pypi地址:https://pypi.org/project/aiohttp/
  • 基于asyncio的另一个web服务器:sanic。号称性能可以和go语言媲美。
  • aiomysql:基于asyncio的mysql的驱动。
  • pyquery:用于解析html的包,类似jquery。(如果安装提示lxml出错,可以访问如下地址下载安装http://www.lfd.ucl.edu/~gohlke/pythonlibs/)

实现步骤如下:

  1. 定义一个传入url,获取response的函数。示例:
    async def fetch(url):
         async with aiohttp.ClientSession() as session:
             try:
                 async with session.get(url) as response:
                     print("url status:{}".format(response.status))
                     if response.status in [200,201]:
                         data = await response.text()
                         return data
             except Exception as e:
                 print(e)
    
  2. 定义main函数,程序执行的主流程。使用aiomysql创建mysql的链接对象。示例:
    async def main(loop):
         # 等待mysql链接建立好
         pool = await aiomysql.create_pool(host='127.0.0.1',port=3306,user='root',password='',db='mysql',loop=loop,charset="utf8",autocommit=True)
    
  3. 将使用list(也可以用asyncio的队列)传递url,把爬取的url放入list等待解析。
  4. 将使用set存放已爬取url(set去重)。
  5. 创建初始化url的函数,用于调用方法从第一个url开始执行。
  6. 创建解析返回值,获取其中url的函数。示例:

    def extract_urls(html):
         urls = []
         pq = PyQuery(html)
         # 获取所有a标签
         for link in pq.items("a"):
             url = link.attr("href")  # 获取属性href的值
             if url and url not in seen_urls:
                 urls.append(url)
                 waitting_urls.append(url)
         return urls
    
     async def init_urls():
         html = await fetch(start_url)
         extract_urls(html)
    
  7. 创建consumer函数,用于循环查看待爬取的list中有无数据,有就判断url是否是详情页面,是就在详情页面中获取指定数据。示例:
    async def consumer():
         while not stopping:
             # 因为是异步,需要判断waitting_urls是否为空,否则可能会报错
             if len(waitting_urls) == 0:
                 await asyncio.sleep(0.5)
                 continue
             url = waitting_urls.pop()
             print('start get url: {}'.format(url))
             if re.match('https://www.datatest.org/\w+/',url):
                 if url not in seen_urls:
                     pass
    
  8. 创建解析url详情页的函数,用于从详情页面获取指定数据。代码示例:
    async def article_handler(url,session):
         # 获取文章详情并且入库
         html = await fetch(url,session)
         extract_urls(html)  # 将页面拿去提取可爬取url
         pq = PyQuery()
         title = pq("title").text()
    
  9. 进行mysql存储数据。示例代码:
    async def article_handler(url,session,pool):
         # 获取文章详情并且入库
         html = await fetch(url,session)
         seen_urls.add(url)
         extract_urls(html)  # 将页面拿去提取可爬取url
         pq = PyQuery()
         title = pq("title").text()
         async with pool.acquire() as conn:
             async with conn.cursor() as cur:
                 await cur.execute("SELECT 42;")
                 insert_sql = "insert into article(title) value('{}','{}')".format(title)
                 await cur.execute(insert_sql)
    
  10. 使用asyncio.Semaphore()控制并发数量。代码示例:
    sem = asyncio.Semaphore(1)  # 1并发
    async with sem:  # 将需要控制并发的代码放到该行下即可
    
  11. 编写调用代码,检查整个流程。大致流程如下:
    • 调用主函数main()
    • main函数中创建mysql连接对象,获取aiohttp的session获取第一个返回值,传入extract_urls解析html代码中的a标签值。调用consumer循环对抓取到的url进行处理。
    • fetch()函数用于获取http返回值。
    • extract_urls()函数用于解析html中的a标签值,将可爬取的url放入waitting_urls中。
    • init_urls()函数用于处理非详情页url时进行的逻辑操作。
    • article_handler()函数用于获取详情页的内容,并入库。
    • consumer()函数用于循环读取waitting_urls中的数据。
  12. 完成代码示例:

    import asyncio
    import re
    
    import aiohttp
    import aiomysql
    from pyquery import PyQuery
    
    start_url = "https://www.datatest.org"  # 起始路径
    waitting_urls = []
    seen_urls = set()
    stopping = False
    
    sem = asyncio.Semaphore(1)
    
    # 获取url的返回值
    async def fetch(url,session):
        async with sem:
            await asyncio.sleep(5)
            print("-------------------sleep-----------------")
            try:
                async with session.get(url) as response:
                    print("url status:{}".format(response.status))
                    if response.status in [200,201]:
                        data = await response.text()
                        return data
            except Exception as e:
                print(e)
    
    # 从html中获取a标签的值
    def extract_urls(html):
        urls = []
        pq = PyQuery(html)
        # 获取所有a标签
        for link in pq.items("a"):
            url = "https://www.datatest.org" + link.attr("href")  # 获取属性href的值
            if url and url not in seen_urls:
                urls.append(url)
                waitting_urls.append(url)
        return urls
    
    async def init_urls(url,session):
        html = await fetch(url,session)
        seen_urls.add(url)
        extract_urls(html)
    
    # 用于解析详情页面,把需要抓取的内容存入mysql
    async def article_handler(url,session,pool):
        # 获取文章详情并且入库
        html = await fetch(url,session)
        seen_urls.add(url)
        extract_urls(html)  # 将页面拿去提取可爬取url
        pq = PyQuery(html)
        title = pq("title").text()
        print("-----------")
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 42;")
                insert_sql = "insert into article(title) value('{}')".format(title)
                print(insert_sql)
                await cur.execute(insert_sql)
    
    # 用于循环监控是否有需要处理的url
    async def consumer(pool):
        async with aiohttp.ClientSession() as session:
            while not stopping:
                # 因为是异步,需要判断waitting_urls是否为空,否则可能会报错
                if len(waitting_urls) == 0:
                    await asyncio.sleep(0.5)
                    continue
                url = waitting_urls.pop()
                print('start get url: {}'.format(url))
                if re.match('https://www.datatest.org/blog/\w+',url):
                    if url not in seen_urls:
                        asyncio.ensure_future(article_handler(url,session,pool))
                        await asyncio.sleep(0.5)
                # else:  # 如果不是详情页
                #     if url not in seen_urls:
                #         asyncio.ensure_future(init_urls(url,session))
    
    async def main(loop):
        # 等待mysql链接建立好
        pool = await aiomysql.create_pool(host='127.0.0.1',port=3306,user='root',password='root',db='testcode',loop=loop,charset="utf8",autocommit=True)
        async with aiohttp.ClientSession() as session:
            html = await fetch(start_url,session)
            seen_urls.add(start_url)
            extract_urls(html)
        asyncio.ensure_future(consumer(pool))
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))
        loop.run_forever()
    

爬取后的数据截图