协程和异步io
《Python3高级核心技术97讲》学习笔记第十一章:协程和异步IO
335阅读 · 2020-7-1 23:28发布
11.1 并发/并行、同步/异步、阻塞/非阻塞概念
- 并发:一个时间段内,x个程序在同一个cpu运行,就可以描述为x并发。
- 并行:在任意时刻点上,有多个程序同时运行在多个cpu上。并行数量和cpu数量一致。
- 同步:指代码调用IO操作时,必须等待IO操作完成才返回的调用方式。
- 异步:指代码调用IO操作时,不必等IO操作完成就返回的调用方式。
- 阻塞:指调用函数的时候当前线程被挂起。
- 非阻塞:指调用函数的时候当前线程不会被挂起,会立即返回。
11.2 IO多路复用:select、poll和epoll
C10K问题
C10k是一个1999年被提出的技术挑战:如何在1Ghz CPU,2G内存,1gbps网络下,让单台服务器同时为1万个客户端提供FTP服务。
如果使用线程的方式,单台服务器能支持的线程数有限。
所以这里就会用到同步/异步、阻塞/非阻塞来解决该问题(IO多路复用)。
Unix下五种I/O模型
- 阻塞式I/O
- 非阻塞式I/O
- I/O多路复用
- 信号驱动式I/O
- 异步I/O(POSIX的aio_系列函数)
I/O多路复用
- 调用select后操作系统会返回已经准备好的socke或文件句柄。
- select也是阻塞式的方法(如果没有准备好的,会一直阻塞)。
- I/O多路复用的select可以同时监听多个。
select、poll、epoll
- select,poll,epoll都是IO多路复用的机制。
- I/O多路复用是通过一种机制,使一个进程(或线程)可以监视多个描述符,一旦某个描述符就绪(可读或可写就绪),能够通知程序进行相应的读写操作。
- select,poll,epoll本质上是同步I/O。
select
- select函数监视的描述符分3类:writefds、readfds和exceptfds。
- 调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写或者有except),或者超时,函数返回。
- select返回后,可以通过遍历fdset来找到就绪描述符。
- 优点:几乎在所有的平台上都支持,具有良好的跨平台支持。
- 缺点:单个进程能够监视的文件描述符存在最大限制,Linux上一般为1024。(修改宏定义或者重新编译内核对会造成效率降低)
poll
- poll是select的增强版本。
- select使用三个位图不表示三个fdset,而poll使用一个pollfd的指针实现。
- select使用“参数-值”传递方式,而pollfd结构包含了要监视的event和发生的event。
- pollfd监视的文件描述符没有最大数量限制,但是数据量过大后性能也会下降。
- poll返回后,和select一样,需要轮训获取就绪的描述符。
epoll
- epoll是poll的增强版本。
- epoll只支持linux系统,不支持windows。
- epoll没有监视描述符数量的限制。
- epoll使用一个文件描述符管理多个描述符。
- epoll将用户关系的文件描述符的事件存放到内核的一个事件表中,使得在用户空间和内核空间的copy只需一次。
- epoll的查询使用了红黑树。
select和epoll的应用比较
- 并发高的情况下,连接活跃度不是很高(连接后随时断开),epoll比select好。
- 并发性不高,同时连接很活跃(长期处于连接状态),select比epoll好。
11.3 IO多路复用:select+回调+事件循环实现获取HTTP请求
- IO多路复用并发性高,它是单线程模式,驱动运行的是事件循环,它不会阻塞在建立连接或等待请求,实际都是CPU的操作。相比多线程,它没有切换线程的开销和内存消耗较小。
- select一般是指import selete。
- 编程时通常使用from selectors import DefaultSelector,它包装了select,会更加好用。(DefaultSelector会自动根据平台选择使用select或者epool)
- select提供了一种注册机制。
- selector.register(),注册函数,需要传递文件描述符、事件(EVENT_READ、EVENT_WRITE等)和回调函数。
- selector.unregister(),注销掉监控事件。
- 事件循环:需要手动去监控事件状态,然后调用对应回调函数。
- 大致流程:创建多个回调函数,使用selector注册事件和回调函数的映射关系,通过事件循环执行回调函数。(其中通过select注册和注销监控事件)
select+回调+事件循环实现获取HTTP请求,代码示例:
import socket from urllib.parse import urlparse from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE selector = DefaultSelector() urls = ["http://www.baidu.com"] stop = False class Fetcher: def connect(self,key): selector.unregister(key.fd) # 注销监控事件,self.client.fileno()的返回值 self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8")) selector.register(self.client.fileno(),EVENT_READ,self.readable) def readable(self,key): d = self.client.recv(1024) if d: self.data += d else: selector.unregister(key.fd) data = self.data.decode("utf8") html_data = data.split("\r\n\r\n")[1] # 去掉响应头信息 print(html_data) self.client.close() urls.remove(self.spider_url) if not urls: global stop stop = True def get_url(self,url): self.data = b"" self.spider_url = url # 用于解决windows下支持select的问题 url = urlparse(url) self.host = url.netloc self.path = url.path if self.path == "": self.path = "/" self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setblocking(False) # 使用非阻塞IO try: self.client.connect((self.host,80)) except BlockingIOError as e: pass selector.register(self.client.fileno(),EVENT_WRITE,self.connect) # 注册文件描述符,事件,回调函数 def loop(): while not stop: ready = selector.select() for key,mask in ready: call_back = key.data call_back(key) if __name__ == '__main__': fetcher = Fetcher() fetcher.get_url("http://www.baidu.com") loop()
11.4 协程是什么
C10M问题
如何利用8核心CPU,64G内存,在10gbps的网络上保持1000万并发连接。
- 使用协程可以解决C10M问题。
- 协程就是解决回调编写难的问题。
- 协程可以采用同步的方式写一部的代码。
- 协程可以单线程切换任务。
- 协程又称为有多个入口的函数,或者可以暂停的函数。
11.5 生成器进阶-send、close和throw
- 生成器的特点是可以实现暂停,从而可以使用生成器完成协程。
- 将yield放在赋值符号的右侧,可以获取调用方传递进来的值。
- send():可以传递值传递进入生成器内部,并同时重启生成器执行到下一步。
- 调用send()方法之前,必须至少启动一次生成器(例如执行一次next),否则只能传递None值。(不然会报错)
- throw():扔一个异常给生成器(同时会执行下一步),生成器会在上一次的yield中抛出扔进的异常。
- close():使用close方法后,会抛出GeneratorExit异常。(会自动处理该异常,如果需要捕获的话,建议捕获后使用raise StopIteration,否则RuntimeError异常)。
代码示例:
def gen_fun(): html = yield 'http://www.datatest.org' print(html) try: yield 2 except Exception as e: print(e) yield 3 yield 4 yield 5 yield 6 return 'test' if __name__ == '__main__': gen = gen_fun() url = next(gen) aaa = 'aaa' gen.send(aaa) gen.throw(Exception,"test error") gen.close()
11.6 生成器进阶-yield from
- python3.3添加了yield from语法。
yield from后面跟可迭代对象(iterable),可以将可迭代对象的值一个一个yield出来。代码示例:
# for 写法 my_list = [1,2,3,4,5,6,7] def my_chian(*args,**kwargs): for my_iterable in args: for value in my_iterable: yield value for value in my_chian(my_list,range(5,10)): print(value) # 相当于如下写法 my_list = [1,2,3,4,5,6,7] def my_chian(*args,**kwargs): for my_iterable in args: yield from my_iterable for value in my_chian(my_list,range(5,10)): print(value)
当一个生成器A被另一个生成器B使用yield from,并且C调用B。此时C被称为调用方,B是委托生成器,C是子生成器。示例代码:
def g1(gen): # 委托生成器 yield from gen # gen是子生成器 def main(): # 调用方 g = g1() g.send(None)
- yield from会在调用方与子生成器之间建立一个双向通道。(调用发执行close或者throw,会直接发送到子生成器,该特点是实现协程的重要环节)
通过yield from实现统计商品销销量。代码示例:
result = {} def sales_sum(key): total = 0 nums = [] while True: num = yield if not num: break total += num nums.append(num) return total,nums def middle(key): while True: result[key] = yield from sales_sum(key) print(key+'统计完成') def main(): data_sets = { 'aaa':[1000,2000], 'bbb':[20,30], 'ccc':[500,300] } for key,value in data_sets.items(): m = middle(key) m.send(None) # 预激 for i in value: m.send(i) m.send(None) print(result) if __name__ == '__main__': main() #输出: #aaa统计完成 #bbb统计完成 #ccc统计完成 #{'aaa': (3000, [1000, 2000]), 'bbb': (50, [20, 30]), 'ccc': (800, [500, 300])}
- yield from内部处理了各种异常信息等。(建议参考官方文档)
- inspect.getgeneratorstate可以获取生成器状态(例如GEN_CREATED等待开始执行、 GEN_RUNNING:解释器正在执行、GEN_SUSPENDED在yield表达式处暂停、GEN_CLOSE执行结束)。
11.7 async和await
- async和await是原生协程,是Python3.5以后引入的两个关键词。
- 原生协程便于后期维护。
- await相当于生成器的yield from。
- async中不允许使用yield关键字。
原生协程需要使用send调度。示例代码:
async def downloader(url): return 'test' async def download_url(url): html = await downloader(url) return html if __name__ == "__main__": coro = download_url("http://www.baidu.com") coro.send(None)
使用@types.coroutine装饰器装饰生成器(会实现__await__魔法方法),则可以使用await调用。示例代码:
import types @types.coroutine def downloader(url): yield 'test' # async def downloader(url): # return 'test' async def download_url(url): html = await downloader(url) return html if __name__ == "__main__": coro = download_url("http://www.baidu.com") a = coro.send(None) print(a)