协程和异步io

《Python3高级核心技术97讲》学习笔记第十一章:协程和异步IO
335阅读 · 2020-7-1 23:28发布

11.1 并发/并行、同步/异步、阻塞/非阻塞概念

  • 并发:一个时间段内,x个程序在同一个cpu运行,就可以描述为x并发。
  • 并行:在任意时刻点上,有多个程序同时运行在多个cpu上。并行数量和cpu数量一致。
  • 同步:指代码调用IO操作时,必须等待IO操作完成才返回的调用方式。
  • 异步:指代码调用IO操作时,不必等IO操作完成就返回的调用方式。
  • 阻塞:指调用函数的时候当前线程被挂起。
  • 非阻塞:指调用函数的时候当前线程不会被挂起,会立即返回。

11.2 IO多路复用:select、poll和epoll

C10K问题

C10k是一个1999年被提出的技术挑战:如何在1Ghz CPU,2G内存,1gbps网络下,让单台服务器同时为1万个客户端提供FTP服务。

如果使用线程的方式,单台服务器能支持的线程数有限。

所以这里就会用到同步/异步、阻塞/非阻塞来解决该问题(IO多路复用)。

Unix下五种I/O模型

  • 阻塞式I/O
  • 非阻塞式I/O
  • I/O多路复用
  • 信号驱动式I/O
  • 异步I/O(POSIX的aio_系列函数)

I/O多路复用

  • 调用select后操作系统会返回已经准备好的socke或文件句柄。
  • select也是阻塞式的方法(如果没有准备好的,会一直阻塞)。
  • I/O多路复用的select可以同时监听多个。

select、poll、epoll

  • select,poll,epoll都是IO多路复用的机制。
  • I/O多路复用是通过一种机制,使一个进程(或线程)可以监视多个描述符,一旦某个描述符就绪(可读或可写就绪),能够通知程序进行相应的读写操作。
  • select,poll,epoll本质上是同步I/O。

select

  • select函数监视的描述符分3类:writefds、readfds和exceptfds。
  • 调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写或者有except),或者超时,函数返回。
  • select返回后,可以通过遍历fdset来找到就绪描述符。
  • 优点:几乎在所有的平台上都支持,具有良好的跨平台支持。
  • 缺点:单个进程能够监视的文件描述符存在最大限制,Linux上一般为1024。(修改宏定义或者重新编译内核对会造成效率降低)

poll

  • poll是select的增强版本。
  • select使用三个位图不表示三个fdset,而poll使用一个pollfd的指针实现。
  • select使用“参数-值”传递方式,而pollfd结构包含了要监视的event和发生的event。
  • pollfd监视的文件描述符没有最大数量限制,但是数据量过大后性能也会下降。
  • poll返回后,和select一样,需要轮训获取就绪的描述符。

epoll

  • epoll是poll的增强版本。
  • epoll只支持linux系统,不支持windows。
  • epoll没有监视描述符数量的限制。
  • epoll使用一个文件描述符管理多个描述符。
  • epoll将用户关系的文件描述符的事件存放到内核的一个事件表中,使得在用户空间和内核空间的copy只需一次。
  • epoll的查询使用了红黑树。

select和epoll的应用比较

  • 并发高的情况下,连接活跃度不是很高(连接后随时断开),epoll比select好。
  • 并发性不高,同时连接很活跃(长期处于连接状态),select比epoll好。

11.3 IO多路复用:select+回调+事件循环实现获取HTTP请求

  • IO多路复用并发性高,它是单线程模式,驱动运行的是事件循环,它不会阻塞在建立连接或等待请求,实际都是CPU的操作。相比多线程,它没有切换线程的开销和内存消耗较小。
  • select一般是指import selete。
  • 编程时通常使用from selectors import DefaultSelector,它包装了select,会更加好用。(DefaultSelector会自动根据平台选择使用select或者epool)
  • select提供了一种注册机制。
  • selector.register(),注册函数,需要传递文件描述符、事件(EVENT_READ、EVENT_WRITE等)和回调函数。
  • selector.unregister(),注销掉监控事件。
  • 事件循环:需要手动去监控事件状态,然后调用对应回调函数。
  • 大致流程:创建多个回调函数,使用selector注册事件和回调函数的映射关系,通过事件循环执行回调函数。(其中通过select注册和注销监控事件)
  • select+回调+事件循环实现获取HTTP请求,代码示例:

    import socket
      from urllib.parse import urlparse
      from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
    
      selector = DefaultSelector()
      urls = ["http://www.baidu.com"]
      stop = False
    
      class Fetcher:
          def connect(self,key):
              selector.unregister(key.fd)  # 注销监控事件,self.client.fileno()的返回值
              self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
              selector.register(self.client.fileno(),EVENT_READ,self.readable)
    
          def readable(self,key):
              d = self.client.recv(1024)
              if d:
                  self.data += d
              else:
                  selector.unregister(key.fd)
                  data = self.data.decode("utf8")
                  html_data = data.split("\r\n\r\n")[1]  # 去掉响应头信息
                  print(html_data)
                  self.client.close()
                  urls.remove(self.spider_url)
                  if not urls:
                      global stop
                      stop = True
    
          def get_url(self,url):
              self.data = b""
              self.spider_url = url  # 用于解决windows下支持select的问题
              url = urlparse(url)
              self.host = url.netloc
              self.path = url.path
              if self.path == "":
                  self.path = "/"
    
              self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
              self.client.setblocking(False)  # 使用非阻塞IO
              try:
                  self.client.connect((self.host,80))
              except BlockingIOError as e:
                  pass
              selector.register(self.client.fileno(),EVENT_WRITE,self.connect)  # 注册文件描述符,事件,回调函数
    
      def loop():
          while not stop:
              ready = selector.select()
              for key,mask in ready:
                  call_back = key.data
                  call_back(key)
    
      if __name__ == '__main__':
          fetcher = Fetcher()
          fetcher.get_url("http://www.baidu.com")
          loop()
    

11.4 协程是什么

C10M问题

如何利用8核心CPU,64G内存,在10gbps的网络上保持1000万并发连接。

  • 使用协程可以解决C10M问题。
  • 协程就是解决回调编写难的问题。
  • 协程可以采用同步的方式写一部的代码。
  • 协程可以单线程切换任务。
  • 协程又称为有多个入口的函数,或者可以暂停的函数。

11.5 生成器进阶-send、close和throw

  • 生成器的特点是可以实现暂停,从而可以使用生成器完成协程。
  • 将yield放在赋值符号的右侧,可以获取调用方传递进来的值。
  • send():可以传递值传递进入生成器内部,并同时重启生成器执行到下一步。
  • 调用send()方法之前,必须至少启动一次生成器(例如执行一次next),否则只能传递None值。(不然会报错)
  • throw():扔一个异常给生成器(同时会执行下一步),生成器会在上一次的yield中抛出扔进的异常。
  • close():使用close方法后,会抛出GeneratorExit异常。(会自动处理该异常,如果需要捕获的话,建议捕获后使用raise StopIteration,否则RuntimeError异常)。
  • 代码示例:

    def gen_fun():
          html = yield 'http://www.datatest.org'
          print(html)
          try:
              yield 2
          except Exception as e:
              print(e)
          yield 3
          yield 4
          yield 5
          yield 6
          return 'test'
    
      if __name__ == '__main__':
          gen = gen_fun()
          url = next(gen)
          aaa = 'aaa'
          gen.send(aaa)
          gen.throw(Exception,"test error")
          gen.close()
    

11.6 生成器进阶-yield from

  • python3.3添加了yield from语法。
  • yield from后面跟可迭代对象(iterable),可以将可迭代对象的值一个一个yield出来。代码示例:

    # for 写法
      my_list = [1,2,3,4,5,6,7]
    
      def my_chian(*args,**kwargs):
          for my_iterable in args:
              for value in my_iterable:
                  yield value
    
      for value in my_chian(my_list,range(5,10)):
          print(value)
    
      # 相当于如下写法
      my_list = [1,2,3,4,5,6,7]
    
      def my_chian(*args,**kwargs):
          for my_iterable in args:
              yield from my_iterable
    
      for value in my_chian(my_list,range(5,10)):
          print(value)
    
  • 当一个生成器A被另一个生成器B使用yield from,并且C调用B。此时C被称为调用方,B是委托生成器,C是子生成器。示例代码:

    def g1(gen):  # 委托生成器
          yield from gen  # gen是子生成器
    
      def main():  # 调用方
          g = g1()
          g.send(None)
    
  • yield from会在调用方与子生成器之间建立一个双向通道。(调用发执行close或者throw,会直接发送到子生成器,该特点是实现协程的重要环节)
  • 通过yield from实现统计商品销销量。代码示例:

    result = {}
    
      def sales_sum(key):
          total = 0
          nums = []
          while True:
              num = yield
              if not num:
                  break
              total += num
              nums.append(num)
          return total,nums
    
      def middle(key):
          while True:
              result[key] = yield from sales_sum(key)
              print(key+'统计完成')
    
      def main():
          data_sets = {
              'aaa':[1000,2000],
              'bbb':[20,30],
              'ccc':[500,300]
          }
          for key,value in data_sets.items():
              m = middle(key)
              m.send(None)  # 预激
              for i in value:
                  m.send(i)
              m.send(None)
    
          print(result)
    
      if __name__ == '__main__':
          main()
      #输出:
      #aaa统计完成
      #bbb统计完成
      #ccc统计完成
      #{'aaa': (3000, [1000, 2000]), 'bbb': (50, [20, 30]), 'ccc': (800, [500, 300])}
    
  • yield from内部处理了各种异常信息等。(建议参考官方文档)
  • inspect.getgeneratorstate可以获取生成器状态(例如GEN_CREATED等待开始执行、 GEN_RUNNING:解释器正在执行、GEN_SUSPENDED在yield表达式处暂停、GEN_CLOSE执行结束)。

11.7 async和await

  • async和await是原生协程,是Python3.5以后引入的两个关键词。
  • 原生协程便于后期维护。
  • await相当于生成器的yield from。
  • async中不允许使用yield关键字。
  • 原生协程需要使用send调度。示例代码:

    async def downloader(url):
          return 'test'
    
      async def download_url(url):
          html = await downloader(url)
          return html
    
      if __name__ == "__main__":
          coro = download_url("http://www.baidu.com")
          coro.send(None)
    
  • 使用@types.coroutine装饰器装饰生成器(会实现__await__魔法方法),则可以使用await调用。示例代码:

    import types
    
      @types.coroutine
      def downloader(url):
          yield 'test'
    
      # async def downloader(url):
          # return 'test'
    
      async def download_url(url):
          html = await downloader(url)
          return html
    
      if __name__ == "__main__":
          coro = download_url("http://www.baidu.com")
          a = coro.send(None)
          print(a)