多线程、多进程和线程池编程

《Python3高级核心技术97讲》学习笔记第十章:多线程、多进程和线程池编程
443阅读 · 2020-6-24 23:50发布

0# 第十章 多线程、多进程和线程池编程

10.1 python中的GIL

  • GIL(global interpreter lock),全局解释器锁。
  • python中的一个线程对应于c语言中的一个线程。
  • gil使得同一个时刻只有一个线程在一个cpu上执行字节码。无法将多个线程映射到多个CPU上。
  • gil释放条件:1、字节码执行一定行数后释放。2、通过时间片划分(例如a线程执行15秒后切换到另一个线程)。3、遇到IO操作时会主动释放。
  • gil锁会释放的示例代码:

    total = 0
      def add():
          global total
          for i in range(1000000):
              total += 1
    
      def desc():
          global total
          for i in range(1000000):
              total -= 1
    
      import threading
      thread1 = threading.Thread(target=add)
      thread2 = threading.Thread(target=desc)
      thread1.start()
      thread2.start()
    
      thread1.join()
      thread2.join()
    
      print(total)
      # 输出结果每次都不一样
    

10.2 多线程编程-threading

  • thread1.setDaemon(True):将thread1设置为守护线程,主线程退出后守护线程立即退出。
  • thread1.join():执行到这后会一直阻塞,直到程执行完成后才会继续执行。
  • 通过Thread对象执行多线程。示例:

    import threading
      import time
      def get_detail_html(url):
          print("get detail html started")
          time.sleep(2)
          print("get detail html end")
    
      def get_detail_url(url):
          print("get detail url started")
          time.sleep(2)
          print("get detail url end")
    
      if __name__ == "__main__":
          thread1 = threading.Thread(target=get_detail_html,args=("",))
          thread2 = threading.Thread(target=get_detail_url,args=("",))
          start_time = time.time()
          thread1.start()
          thread2.start()
          thread1.join()
          thread2.join()
          print("last time:{}".format(time.time()-start_time))
    
  • 通过继承Thread实现多线程。示例:

    import threading
      import time
      class GetDetailHtml(threading.Thread):
          def __init__(self,name):
              super().__init__(name=name)
    
          def run(self):
              print("get detail html started")
              time.sleep(2)
              print("get detail html end")
    
      class GetDetailUrl(threading.Thread):
          def __init__(self,name):
              super().__init__(name=name)
    
          def run(self):
              print("get detail url started")
              time.sleep(2)
              print("get detail url end")
    
      if __name__ == "__main__":
          thread1 = GetDetailHtml("t1")
          thread2 = GetDetailUrl("t2")
          start_time = time.time()
          thread1.start()
          thread2.start()
          thread1.join()
          thread2.join()
          print("last time:{}".format(time.time()-start_time))
    

10.3 线程间通信-Queue

  • queue.get时实际会是调用dequeue(双端队列),它是线程安全的。
  • queue也可使用join阻塞线程,调用queue的task_done()则join会继续执行。
  • 线程通信时,如果直接import变量,那么该进程看不到其他线程修改的值。示例:
    # 其他线程改变detail_url_list时,该线程无法察觉
      from chaptel1.variables import detail_url_list
      # 其他线程修改variables时,该线程中该变量也会被修改
      from chaptel1 import variables
    
  • 使用queue通信,示例:

    from queue import Queue
      import threading
      import time
    
      def get_detail_html(queue):
          while True:
              url = queue.get()
              print("get detail html started")
              time.sleep(2)
              print("get detail html end")
    
      def get_detail_url(queue):
          while True:
              print("get detail url started")
              time.sleep(2)
              for i in range(20):
                  queue.put("http://www.baidu.com/{id}".format(id=1))
              print("get detail url end")
    
      if __name__ == "__main__":
          detail_url_queue = Queue(maxsize=1000)
          thread_detail_url = threading.Thread(target=get_detail_url,args=(detail_url_queue,))
          thread_detail_url.start()
          for i in range(10):
              html_thread = threading.Thread(target=get_detail_html,args=(detail_url_queue,))
              html_thread.start()
          start_time = time.time()
          print("last time:{}".format(time.time()-start_time))
    

10.4 线程同步-Lock、RLock

  • 为什么需要线程同步?

在代码执行过程中,每一步操作都会被解析成字节码,例如a += 1转为字节码后操作有四步。那么在多线程运行时,四步操作中任意一步都有可能切换到其他线程去,而其他线程中对相同的变量也做了赋值操作,此时就会导致变量值被重复覆盖(不是预期的情况)。如果应用线程同步锁,可以保证某个代码段在运行时不会切换到其他线程。

  • 通常使用threading的Lock做线程同步。
  • lock.acquire(),获取锁。
  • lock.release(),释放锁。
  • 示例代码:

    from threading import Lock
      total = 0
      lock = Lock()
      def add():
          global total
          global lock
          for i in range(1000000):
              lock.acquire()
              total += 1
              lock.release()
    
      def desc():
          global total
          global lock
          for i in range(1000000):
              lock.acquire()
              total -= 1
              lock.release()
    
      import threading
      thread1 = threading.Thread(target=add)
      thread2 = threading.Thread(target=desc)
      thread1.start()
      thread2.start()
      thread1.join()
      thread2.join()
      print(total)
    
  • 死锁情景一:在a.acquire()后,未释放锁的情况下再次a.acquire()会导致永远不会释放锁,从而造成死锁。

  • 死锁情景二:(资源竞争)X线程获取a.acquire()后接着获取b.acquire(),此时如有有其他线程先获取b.acquire(),再获取a.acquire()。那么也会造成死锁。
  • 解决死锁:使用RLock(可重入的锁),可以在同一个线程中多次调用acquire(),但最终acquire()的次数需要和release()相等。

10.5 线程同步-Condition

  • Condition(条件变量):用于复杂线程间通信的锁,位于threading.Condition。
  • Condition实现了enter和exit方法,所以可以使用with语句。
  • acquire方法:调用RLock或者Lock的acquire方法。
  • exit方法:实际调用了lock的release方法。
  • wait方法:允许等待某个条件变量的通知。执行时会生成一个锁放进双端队列(_waiters)中,同时释放主锁。
  • notify方法:通知调用wait方法的线程启动。执行时会从队列(_waiters)中弹出一个锁,执行release()方法释放锁。
  • notify_all方法:释放队列(_waiters)中所有锁。
  • 代码示例:

    import threading
    
      class XiaoAi(threading.Thread):
          def __init__(self,cond):
              super().__init__(name="小爱同学")
              self.cond = cond
    
          def run(self):
              with self.cond:
                  self.cond.wait()
                  print("{} : 天猫精灵".format(self.name))
                  self.cond.notify()
    
                  self.cond.wait()
                  print("{} : bbbbbb".format(self.name))
                  self.cond.notify()
    
                  self.cond.wait()
                  print("{} : dddddd".format(self.name))
                  self.cond.notify()
    
      class TianMao(threading.Thread):
          def __init__(self,cond):
              super().__init__(name="天猫精灵")
              self.cond = cond
    
          def run(self):
              with self.cond:
                  print("{} : 小爱同学".format(self.name))
                  self.cond.notify()
                  self.cond.wait()
    
                  print("{} : aaaaaa".format(self.name))
                  self.cond.notify()
                  self.cond.wait()
    
                  print("{} : cccccc".format(self.name))
                  self.cond.notify()
                  self.cond.wait()
    
      if __name__ == "__main__":
          cond = threading.Condition()
          xiaoai = XiaoAi(cond)
          tianmao = TianMao(cond)
          xiaoai.start()
          tianmao.start()
    

10.6 线程同步-Semaphore

  • Semaphore(信号量):用于控制进入数量的锁。存在于threading.Semaphore。
  • 初始化方法:可以传递一个参数value,表示允许进入数量。
  • acquire()方法:调用一次,对应的value数量减一,当为0时会阻塞。
  • release()方法:调用一次,对应的value数量加一。
  • 内部使用的是condition实现。
  • 示例代码:

    import threading
      import time
      class HtmlSpider(threading.Thread):
          def __init__(self,url,sem):
              super().__init__()
              self.url = url
              self.sem = sem
    
          def run(self):
              time.sleep(2)
              print("get html success")
              self.sem.release()
    
      class UrlProducer(threading.Thread):
          def __init__(self,sem):
              super().__init__()
              self.sem = sem
    
          def run(self):
              for i in range(20):
                  self.sem.acquire()
                  html_thread = HtmlSpider("https://baidu.com/{}".format(i),self.sem)
                  html_thread.start()
    
      if __name__ == "__main__":
          sem = threading.Semaphore(3)
          url_producer = UrlProducer(sem)
          url_producer.start()
    

10.7 ThreadPoolExecutor线程池

  • 线程池包:concurrent.futures用于线程池和进程池编程(futures的多线程和多进程编码接口一致)。
  • 线程池可以用于控制线程数量,获取任务状态和返回值等。
  • 引入线程池:from concurrent.futures import ThreadPoolExecutor。使用示例:

    from concurrent.futures import ThreadPoolExecutor
      import time
    
      def get_html(times):
          time.sleep(times)
          print("get page {} success".format(times))
          return times
    
      executor = ThreadPoolExecutor(max_workers=2)
    
      task1 = executor.submit(get_html,(3))
      task2 = executor.submit(get_html,(2))
    
      print(task1.done())  # 判断当前是否执行完成
      time.sleep(3)
      print(task1.done())
    
      print(task1.result())  # 获取返回值,会阻塞
      print(task2.cancel())  # 用于取消任务,但只能在执行之前使用(例如线程池满,等待线程池给线程时)。执行中和执行完时会返回False
    
  • 引入判断批量完成状态:from concurrent.futures import as_completed
  • as_completed每次会把已经完成的通过yield返回,谁先执行完就先返回谁。使用示例:

    from concurrent.futures import as_completed
      urls = [2,3,2]
      all_task = [executor.submit(get_html,(i)) for i in urls]
    
      for future in as_completed(all_task):
          data = future.result()
          print("get {} page ok".format(data))
    
  • executor.map也可以获取已经完成的,按顺序返回结果。使用示例:
    urls = [2,3,2]
      for data in executor.map(get_html,urls):
          print("get {} page ok".format(data))
    
  • 阻塞线程,指定某个或者一些task完成后才能继续执行。from concurrent.futures import wait。使用示例:
    urls = [3,2,4]
      all_task = [executor.submit(get_html,(i)) for i in urls]
      wait(all_task)
      for future in as_completed(all_task):
          data = future.result()
          print("get {} page ok".format(data))
    
  • wait还可以传递return_when等参数,指定执行条件后再往下执行。示例:
    from concurrent.futures import wait,FIRST_COMPLETED
      all_task = [executor.submit(get_html,(i)) for i in urls]
      wait(all_task, return_when=FIRST_COMPLETED)  # 第一个执行后继续执行
    

10.8 ThreadPoolExecutor源码分析

  • ThreadPoolExecutor中最重要的是Futures这个类,它位于concurrent.futures下。
  • Futures又称为未来对象(也可以理解成task返回容器),它会在submit之后被返回,它当时可能未完成,但可能未来某一时刻会完成。
  • Futures是多线程、多进程、协程中相通的概念。

线程池执行步骤:

  1. 执行submit方法。
  2. 会先获取一把锁:with self._shutdown_lock。
  3. 加锁后的代码中,会生成Futures对象,最后会将该futures对象返回。
  4. 然后将futures放入线程池执行单元_WorkItem中,生成workitem对象。
  5. workitem对象会被放入_work_queue队列中。
  6. 使用self._adjust_thread_count()调整线程数量和执行线程。
  7. self._adjust_thread_count()中如果线程足够,会创建一个线程,传入_work_queue作为参数。
  8. 当线程执行指定方法后,会将result通过future.set_result放入Futures对象。
  9. 最终Futures就可以获取线程状态。

  10. ThreadPoolExecutor的submit源码:

    def submit(*args, **kwargs):
              if len(args) >= 2:
                  self, fn, *args = args
              elif not args:
                  raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                                  "needs an argument")
              elif 'fn' in kwargs:
                  fn = kwargs.pop('fn')
                  self, *args = args
                  import warnings
                  warnings.warn("Passing 'fn' as keyword argument is deprecated",
                                DeprecationWarning, stacklevel=2)
              else:
                  raise TypeError('submit expected at least 1 positional argument, '
                                  'got %d' % (len(args)-1))
    
              with self._shutdown_lock:
                  if self._broken:
                      raise BrokenThreadPool(self._broken)
    
                  if self._shutdown:
                      raise RuntimeError('cannot schedule new futures after shutdown')
                  if _shutdown:
                      raise RuntimeError('cannot schedule new futures after '
                                         'interpreter shutdown')
    
                  f = _base.Future()
                  w = _WorkItem(f, fn, args, kwargs)
    
                  self._work_queue.put(w)
                  self._adjust_thread_count()
                  return f
    

10.9 多线程和多进程对比

  • 多CPU操作适合使用多进程,多IO操作适合使用多线程(进程切换代价较高)。
  • 通过斐波拉契计算,对比多线程和多进程的执行时间,代码示例:

    import time
      from concurrent.futures import ThreadPoolExecutor,as_completed
      from concurrent.futures import ProcessPoolExecutor
    
      def fib(n):
          if n<=2:
              return 1
          return fib(n-1) + fib(n-2)
    
      if __name__ == '__main__':
          # 多线程
          # with ThreadPoolExecutor(3) as executor:
          #     start_time = time.time()
          #     all_taks = [executor.submit(fib,(num)) for num in range(25,35)]
          #     for future in as_completed(all_taks):
          #         data = future.result()
          #         print("data {} ok".format(data))
          #     print("last time is :{}".format(time.time()-start_time))
          #last time is :16.157923936843872
    
          # 多进程
          with ProcessPoolExecutor(3) as executor:
              start_time = time.time()
              all_taks = [executor.submit(fib,(num)) for num in range(25,35)]
              for future in as_completed(all_taks):
                  data = future.result()
                  print("data {} ok".format(data))
              print("last time is :{}".format(time.time()-start_time))
          # last time is :5.383307695388794
    

10.10 multiprocessing多进程编程

  • (推荐使用)多进程编程使用concurrent.futures的ProcessPoolExecutor,它和多线程使用方式几乎一样。
  • multiprocessing也可以实现多进程,它比ProcessPollExcutor更加底层。多进程代码示例:

    import multiprocessing
      import time
    
      def get_html(n):
          time.sleep(n)
          print("get_html sucess")
          return n
    
      class MyProgress(multiprocessing.Process):
          def run(self):
              get_html(2)
    
      if __name__ == '__main__':
          # 使用多进程函数方式
          # progress = multiprocessing.Process(target=get_html,args=(2,))
          # progress.start()
          # print(progress.pid)  # 获取进程的pid
          # progress.join()
          # print("main progress end")
    
          # 使用多进程类的方式
          my = MyProgress()
          my.start()
          my.join()
          print("main progress end")
    
  • multiprocessing.Pool可以实现多进程池。示例代码:

    import multiprocessing
      import time
    
      def get_html(n):
          time.sleep(n)
          print("get_html sucess")
          return n
    
      if __name__ == '__main__':
          # 使用进程池
          pool = multiprocessing.Pool(multiprocessing.cpu_count())  # 如果不传参,会默认使用cpu数量
          result = pool.apply_async(get_html,args=(3,))  # 异步提交任务,返回结果
          pool.close()  # 在join前必须关闭,表示不再接收新任务
          pool.join()  # 等待所有任务完成
          print(result.get())
    
  • pool对象的imap对应多线程中的map方法。获取已完成的,按顺序返回结果。
  • pool对象的imap_unordered对应多线程中的as_completed。获取已完成的,谁先完成就先返回谁。

10.11 进程间通信:Queue、Pipe、Manager

  • 多进程中的线程间通信和锁,在多进程中无法使用。
  • 共享全局变量不适用于多进程编程(可以在多线程中使用)。
  • multiprocessing.Queue用于多进程间通信(无法用在进程池中)。代码示例:

    import time
      from multiprocessing import Process, Queue
      def producer(queue):
          queue.put('a')
          time.sleep(2)
    
      def consumer(queue):
          time.sleep(2)
          data = queue.get()
          print(data)
    
      if __name__ == '__main__':
          queue = Queue(10)
          my_producer = Process(target=producer,args=(queue,))
          my_consumer = Process(target=consumer,args=(queue,))
          my_producer.start()
          my_consumer.start()
          my_producer.join()
          my_consumer.join()
    
  • multiprocessing.Manager下的Queue用于线程池间通信。代码示例:

    import time
      from multiprocessing import Pool,Manager
    
      def producer(queue):
          queue.put('a')
          time.sleep(2)
    
      def consumer(queue):
          time.sleep(2)
          data = queue.get()
          print(data)
    
      if __name__ == '__main__':
          queue =Manager().Queue(2)
          pool = Pool(2)
          pool.apply_async(producer,args=(queue,))
          pool.apply_async(consumer,args=(queue,))
    
          pool.close()
          pool.join()
    
  • multiprocessing.Pipe也可以实现进程间通信,它只适用于两个进程,它性能高于Queue。代码示例:

    import time
      from multiprocessing import Pipe,Process
    
      def producer(pipe):
          pipe.send('aaa')
    
      def consumer(pipe):
          print(pipe.recv())
    
      if __name__ == '__main__':
          recevie_pipe,send_pipe = Pipe()
    
          my_producer = Process(target=producer, args=(send_pipe,))
          my_consumerr = Process(target=consumer, args=(recevie_pipe,))
    
          my_producer.start()
          my_consumerr.start()
          my_producer.join()
          my_consumerr.join()