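Python 3 Multithreaded Programming

1. The GIL in CPython

Before writing multithreaded code, we should understand Python's GIL. GIL stands for Global Interpreter Lock. When CPython was designed, a single global lock was placed over the whole interpreter to keep things simple and thread-safe. This lock is called the GIL. Because of the GIL, only one thread at a time can execute Python bytecode in an interpreter process, so Python threads cannot run Python code in parallel on several CPU cores. On today's multi-core CPUs this means that CPU-bound multithreaded Python code cannot keep all cores busy.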
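A minimal sketch of the effect, assuming a pure-Python CPU-bound loop (the function names `count_down`, `run_sequential`, `run_threaded` and the loop size are arbitrary choices for illustration). On a standard GIL-enabled CPython build, running the work in two threads usually takes about as long as running it twice sequentially, or longer, rather than half as long. The exact timings depend on the machine and the Python version.

```python
import threading
import time


def count_down(n):
    # Pure-Python CPU-bound loop: it holds the GIL while it runs
    while n > 0:
        n -= 1


def run_sequential(n):
    start = time.perf_counter()
    count_down(n)
    count_down(n)
    return time.perf_counter() - start


def run_threaded(n):
    start = time.perf_counter()
    threads = [threading.Thread(target=count_down, args=(n,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    N = 2_000_000
    print("sequential:  {:.2f}s".format(run_sequential(N)))
    print("two threads: {:.2f}s".format(run_threaded(N)))
```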
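2. Does the GIL make our threaded code safe?

Having a GIL does not make the threads we write safe. The GIL has its own release mechanism: while several threads are running, the interpreter periodically makes the running thread release the GIL and hand the CPU to another thread. In Python 3 this is driven mainly by a time-based switch interval, and a thread also releases the GIL while it is blocked on I/O. (Python 2 instead switched after a fixed number of bytecode instructions.)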
Consider the following example:
```python
'''
Created on August 5, 2021
@author: Jason Wang
'''
import threading

total = 0


def fun1():
    global total
    for i in range(1000000):
        total += 1


def fun2():
    global total
    for i in range(1000000):
        total -= 1


if __name__ == "__main__":
    thread1 = threading.Thread(target=fun1)
    thread2 = threading.Thread(target=fun2)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(total)
```
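The code above does not print 0 on every run, which shows that the GIL does not make our threaded code safe. (On recent CPython versions the wrong result may show up less often than on older ones, but the code is still racy.)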
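In CPython 3 the switch interval mentioned above can be inspected and tuned through the `sys` module. The sketch below only reads the value, changes it temporarily and restores it; the value 0.001 is an arbitrary choice for illustration. Changing the interval only affects how often threads are switched, not whether code like the counter above is thread-safe.

```python
import sys

# CPython 3's default is 0.005 seconds (5 ms)
original = sys.getswitchinterval()
print("switch interval: {} s".format(original))

# Ask the interpreter to switch threads more often (illustrative value)
sys.setswitchinterval(0.001)
print("new switch interval: {} s".format(sys.getswitchinterval()))

# Restore the original setting
sys.setswitchinterval(original)
```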
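3. How to write multithreaded code

For I/O-bound applications, multithreading and multiprocessing perform roughly the same, because threads release the GIL while they wait on I/O. To write multithreaded code, you can instantiate the Thread class directly, as in the code below: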
```python
'''
Created on August 5, 2021
@author: Jason Wang
'''
import threading, time


def getHtml(url):
    print("get html {} started...\n".format(url))
    time.sleep(2)
    print("get html {} done...\n".format(url))


def getDetail(url):
    print("get detail {} started...\n".format(url))
    time.sleep(2)
    print("get detail {} done...\n".format(url))


if __name__ == "__main__":
    thread1 = threading.Thread(target=getHtml, args=("heyboy.tech",), name="html_thread")
    thread2 = threading.Thread(target=getDetail, args=("jason-w.cn",), name="detail_thread")
    current_time = time.time()
    # A daemon thread is killed when the program exits; setDaemon(True) is
    # deprecated since Python 3.10, so set the attribute instead
    thread1.daemon = True
    thread1.start()
    thread2.start()
    # No join() here, so this prints almost immediately; the interpreter
    # still waits for the non-daemon thread2 before exiting
    print("spend time is {}\n".format(time.time() - current_time))
```
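In the code above the main thread does not wait for its workers, so the printed time is close to zero. A minimal variant, assuming we actually want to measure how long both "downloads" take, calls `join()` on each thread. The helper names `fake_io` and `run_both` are hypothetical, and the sleep is shortened to an arbitrary 0.5 s to simulate I/O. Because `time.sleep` releases the GIL, the two threads wait concurrently and the total is roughly 0.5 s rather than 1 s.

```python
import threading
import time


def fake_io(label, seconds):
    # time.sleep stands in for a blocking network call; it releases the GIL
    print("{} started".format(label))
    time.sleep(seconds)
    print("{} done".format(label))


def run_both(seconds=0.5):
    threads = [
        threading.Thread(target=fake_io, args=("html", seconds), name="html_thread"),
        threading.Thread(target=fake_io, args=("detail", seconds), name="detail_thread"),
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    # join() blocks the main thread until each worker has finished
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print("spend time is {:.2f}s".format(run_both()))
```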
Besides the function-based approach, you can use an object-oriented one: subclass Thread and override its run method, as in the following code:
```python
'''
Created on August 5, 2021
@author: Jason Wang
'''
import threading, time


class GetHtml(threading.Thread):
    def __init__(self, name, url):
        super().__init__(name=name)
        self.url = url

    def run(self):
        print("get html {} started...\n".format(self.url))
        time.sleep(2)
        print("get html {} done...\n".format(self.url))


class GetDetail(threading.Thread):
    def __init__(self, name, url):
        super().__init__(name=name)
        self.url = url

    def run(self):
        print("get detail {} started...\n".format(self.url))
        time.sleep(2)
        print("get detail {} done...\n".format(self.url))


if __name__ == "__main__":
    thread1 = GetHtml("html_thread", "heyboy.tech")
    thread2 = GetDetail("detail_thread", "jason-w.cn")
    thread1.start()
    thread2.start()
```
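`Thread.run()` has no built-in way to hand a return value back to the caller. One common pattern with the subclass approach, sketched here under the assumption that the result fits in an instance attribute, is to store the result on the thread object and read it after `join()`. The class name `FetchLength` and its `result` attribute are hypothetical.

```python
import threading
import time


class FetchLength(threading.Thread):
    # Hypothetical worker: "fetches" a URL and records the length of the string
    def __init__(self, url):
        super().__init__(name="fetch_" + url)
        self.url = url
        self.result = None

    def run(self):
        time.sleep(0.1)  # stand-in for network I/O
        self.result = len(self.url)


if __name__ == "__main__":
    workers = [FetchLength(u) for u in ("heyboy.tech", "jason-w.cn")]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # only read the result after join() has returned
        print(w.url, w.result)
```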
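4. Inter-thread communication with shared variables and Queue

The most primitive way for threads to communicate is through shared variables, as in the example in Section 2. This causes problems, however, because operations on a shared variable are not thread-safe. Making them safe requires a lock around every access, and using locks correctly takes a good understanding of them, so this approach is generally not recommended. Instead, we can use the thread-safe Queue class for communication between threads.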
Using Queue:
```python
'''
Created on August 5, 2021
@author: Jason Wang
'''
import threading
from queue import Queue


def fun1(que):
    for i in range(10000):
        que.put(i)


def fun2(que):
    for i in range(10000):
        que.get()


if __name__ == "__main__":
    que = Queue(maxsize=10000)
    thread1 = threading.Thread(target=fun1, args=(que,), name="fun1_thread")
    thread2 = threading.Thread(target=fun2, args=(que,), name="fun2_thread")
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(que.qsize())
```

Every time the code above runs, the final size of the shared queue que is 0. This is consistent with Queue being thread-safe: we do not need to add any locking ourselves.
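In the example above the consumer knows in advance how many items to take. A more typical producer/consumer setup, sketched below under the assumption that the producer marks the end of the stream with a sentinel value (`None` here, an arbitrary choice), lets the consumer stop on its own. `Queue.get()` blocks while the queue is empty and `Queue.put()` blocks while a bounded queue is full, so no busy-waiting or extra lock is needed. The function names are hypothetical.

```python
import threading
from queue import Queue

SENTINEL = None  # marks the end of the stream (illustrative choice)


def producer(que, count):
    for i in range(count):
        que.put(i)  # blocks while the bounded queue is full
    que.put(SENTINEL)


def consumer(que, results):
    while True:
        item = que.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        results.append(item * 2)


def run(count=10000):
    que = Queue(maxsize=100)
    results = []
    threads = [
        threading.Thread(target=producer, args=(que, count)),
        threading.Thread(target=consumer, args=(que, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    results = run()
    print(len(results), results[:5])
```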
5. Thread synchronization with Lock and RLock

Why is the result of the example in Section 2 not 0?
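The statement total += 1 is not atomic: it reads total, computes the new value, and writes it back. Suppose the global variable total is 0, thread A performs total + 1, and thread B performs total - 1. Say thread A runs first and computes total + 1 = 1, but its time slice runs out before it can assign 1 back to total, so total is still 0. Thread B then runs, computes total - 1 = -1, and assigns -1 to total, so total is now -1. When B is switched out, A resumes and assigns its 1 to total, so the final result is 1. To guarantee a result of 0, we need to lock the operation so that only after thread A has finished its read-modify-write can thread B perform its own.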
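The read, compute and write steps can be seen directly in the bytecode. The sketch below uses the standard `dis` module on a function shaped like the loop body of fun1 (the name `increment` is hypothetical). The exact opcode names vary between CPython versions (for example INPLACE_ADD in older versions versus BINARY_OP in 3.11+), but the load and the store are always separate instructions, and a thread switch can in principle happen between them.

```python
import dis

total = 0


def increment():
    global total
    total += 1


if __name__ == "__main__":
    # Prints a load of total, the add, and a separate store back to total
    dis.dis(increment)
    ops = [ins.opname for ins in dis.get_instructions(increment)]
    print(ops)
```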
Using a Lock object:
```python
'''
Created on August 5, 2021
@author: Jason Wang
'''
import threading
from threading import Lock

total = 0
lock = Lock()


def fun1():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total += 1
        lock.release()


def fun2():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()


if __name__ == "__main__":
    thread1 = threading.Thread(target=fun1, name="fun1_thread")
    thread2 = threading.Thread(target=fun2, name="fun2_thread")
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(total)
```
No matter how many times the code above is run, the result is always 0.
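Lock also supports the `with` statement, which acquires the lock on entry and releases it on exit, even if the body raises an exception. A minimal rewrite of the two functions above in that style; the names `add`, `sub`, `run` and the iteration count are illustrative.

```python
import threading

total = 0
lock = threading.Lock()


def add(n):
    global total
    for _ in range(n):
        with lock:  # acquire() on entry, release() on exit
            total += 1


def sub(n):
    global total
    for _ in range(n):
        with lock:
            total -= 1


def run(n=100000):
    global total
    total = 0
    t1 = threading.Thread(target=add, args=(n,))
    t2 = threading.Thread(target=sub, args=(n,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return total


if __name__ == "__main__":
    print(run())  # 0
```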
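A Lock is not reentrant: a thread that already holds it cannot call acquire again before releasing it (the second call blocks forever, i.e. a deadlock). For this reason Python provides the reentrant lock RLock. However, RLock reentrancy only works within the same thread: the thread that acquired it must release it, as many times as it acquired it. For example: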
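```python
import threading
from threading import RLock

total = 0
rlock = RLock()


def fun2():
    for i in range(1000000):
        rlock.acquire()
        # fun3 acquires the same RLock again in the same thread;
        # with a plain Lock this second acquire would deadlock
        fun3(rlock)
        rlock.release()


def fun3(rlock):
    global total
    rlock.acquire()
    total -= 1
    rlock.release()


if __name__ == "__main__":
    thread = threading.Thread(target=fun2, name="fun2_thread")
    thread.start()
    thread.join()
    print(total)  # -1000000
```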
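The difference can be checked without risking a deadlock by using `acquire(blocking=False)`, which returns False instead of waiting when the lock cannot be taken. In the sketch below (the helper name `try_reenter` is hypothetical), a second acquire by the same thread fails on a Lock but succeeds on an RLock. Each successful acquire is balanced by a release.

```python
import threading


def try_reenter(lock):
    # The first acquire succeeds because no other thread holds the lock
    lock.acquire()
    try:
        # Second, non-blocking acquire from the same thread
        reentered = lock.acquire(blocking=False)
        if reentered:
            lock.release()  # balance the second acquire
        return reentered
    finally:
        lock.release()  # balance the first acquire


if __name__ == "__main__":
    print("Lock reentrant: ", try_reenter(threading.Lock()))   # False
    print("RLock reentrant:", try_reenter(threading.RLock()))  # True
```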
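6. Complex inter-thread communication with the Condition variable

Suppose one thread is the smart speaker Tmall Genie (天猫精灵) and another is XiaoAi (小爱). We might then have the following scenario: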
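Only after Tmall Genie has spoken may XiaoAi speak, and only after XiaoAi has spoken may Tmall Genie speak again. In other words, the Tmall Genie thread runs, then the XiaoAi thread, then Tmall Genie again, taking turns.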
Without a Condition, we might try a lock, as in the following code:
```python
'''
Created on August 5, 2021
@author: Jason Wang
'''
import threading


class TianMao(threading.Thread):
    def __init__(self, lock):
        super().__init__(name="tianmao_thread")
        self.lock = lock
        self.name = "Tmall Genie"

    def run(self):
        self.lock.acquire()
        print("{name}: XiaoAi!".format(name=self.name))
        self.lock.release()

        self.lock.acquire()
        print("{name}: Let's play a classical poetry game".format(name=self.name))
        self.lock.release()


class XiaoAi(threading.Thread):
    def __init__(self, lock):
        super().__init__(name="xiaoai_thread")
        self.lock = lock
        self.name = "XiaoAi"

    def run(self):
        self.lock.acquire()
        print("{name}: I'm here".format(name=self.name))
        self.lock.release()

        self.lock.acquire()
        print("{name}: Sure".format(name=self.name))
        self.lock.release()


if __name__ == "__main__":
    lock = threading.Lock()
    tianmao = TianMao(lock)
    xiaoai = XiaoAi(lock)
    tianmao.start()
    xiaoai.start()
```
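Running it shows that the output does not meet our expectations: sometimes Tmall Genie or XiaoAi says both of its lines in a row. A lock only guarantees that one thread at a time holds it; it does not decide which thread gets it next.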
Java has a wait/notify mechanism, and Python has one too: the condition variable Condition. Rewritten with a Condition, the code looks like this:
```python
'''
Created on August 5, 2021
@author: Jason Wang
'''
import threading


class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="tianmao_thread")
        self.cond = cond
        self.name = "Tmall Genie"

    def run(self):
        with self.cond:
            print("{name}: XiaoAi!".format(name=self.name))
            self.cond.notify()  # wake XiaoAi
            self.cond.wait()    # wait for XiaoAi's reply
            print("{name}: Let's play a classical poetry game".format(name=self.name))
            self.cond.notify()  # let XiaoAi answer


class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="xiaoai_thread")
        self.cond = cond
        self.name = "XiaoAi"

    def run(self):
        with self.cond:
            self.cond.wait()    # wait for Tmall Genie to speak first
            print("{name}: I'm here".format(name=self.name))
            self.cond.notify()
            self.cond.wait()    # wait for Tmall Genie's next line
            print("{name}: Sure".format(name=self.name))


if __name__ == "__main__":
    cond = threading.Condition()
    tianmao = TianMao(cond)
    xiaoai = XiaoAi(cond)
    # XiaoAi is started first so that it is already waiting
    # when Tmall Genie calls notify()
    xiaoai.start()
    tianmao.start()
```
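After calling wait, a thread sleeps until it receives a notify signal and then continues. Two things are worth noting. First, pay attention to the order in which the threads are started: XiaoAi must already be waiting when Tmall Genie calls notify, otherwise the notification is lost and both threads hang. Starting XiaoAi first makes this very likely but does not strictly guarantee it. Second, wait and notify may only be called while holding the condition's underlying lock, for example inside a with block; otherwise they raise RuntimeError.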
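Because a `notify()` sent before the other thread is waiting is simply lost, a more robust pattern keeps whose turn it is in shared state and has each thread wait with `Condition.wait_for()` until that state says it may speak. The sketch below assumes a fixed alternating dialogue; `DIALOGUE`, `run_dialogue`, `speaker` and the `state` dictionary are hypothetical names. It produces the same order whichever thread starts first.

```python
import threading

DIALOGUE = [
    ("Tmall Genie", "XiaoAi!"),
    ("XiaoAi", "I'm here"),
    ("Tmall Genie", "Let's play a classical poetry game"),
    ("XiaoAi", "Sure"),
]


def run_dialogue(start_order=("Tmall Genie", "XiaoAi")):
    cond = threading.Condition()
    state = {"turn": 0}  # index of the next line to be spoken
    transcript = []

    def speaker(name):
        for i, (who, text) in enumerate(DIALOGUE):
            if who != name:
                continue
            with cond:
                # Sleep until it is line i's turn; the predicate is re-checked on every wake-up
                cond.wait_for(lambda: state["turn"] == i)
                transcript.append("{}: {}".format(who, text))
                state["turn"] += 1
                cond.notify_all()  # let the other speaker re-check its predicate

    threads = [threading.Thread(target=speaker, args=(n,)) for n in start_order]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return transcript


if __name__ == "__main__":
    for line in run_dialogue():
        print(line)
```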
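7. Semaphore

A semaphore is a counter: calling acquire decrements it and calling release increments it, and when the counter is 0, acquire blocks until another thread calls release. This is especially useful for limiting how many threads run concurrently, as in the following code: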
```python
'''
Created on August 6, 2021
@author: Jason Wang
'''
import threading, time


class GetHtml(threading.Thread):
    def __init__(self, url, sema):
        super().__init__(name="gethtml_thread")
        self.url = url
        self.sema = sema

    def run(self):
        time.sleep(2)
        print("I've got the {url} html code".format(url=self.url))
        # Give the slot back so the producer can start another thread
        self.sema.release()


class SpiderProducer(threading.Thread):
    def __init__(self, sema):
        super().__init__(name="spiderproducer_thread")
        self.sema = sema

    def run(self):
        for i in range(20):
            # Blocks while 3 GetHtml threads are already running
            self.sema.acquire()
            getHtml_thread = GetHtml("https://heyboy.tech/{id}".format(id=i), self.sema)
            getHtml_thread.start()


if __name__ == "__main__":
    sema = threading.Semaphore(3)
    spiderProducer_thread = SpiderProducer(sema)
    spiderProducer_thread.start()
```
The code above keeps the number of concurrently running GetHtml threads at no more than 3.
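A semaphore can also be used as a context manager, which guarantees that `release()` is called even if the worker raises. The sketch below is a variant of the spider above under a few assumptions: downloads are simulated with `time.sleep`, `run_spider` and `fetch` are hypothetical names, and, unlike the producer above, all threads are created up front while the semaphore gates the work section. It uses `threading.BoundedSemaphore`, which raises ValueError if `release()` is called more times than `acquire()`, and records the highest number of workers inside the semaphore at once to show that the limit holds.

```python
import threading
import time


def run_spider(urls, limit=3, delay=0.05):
    sema = threading.BoundedSemaphore(limit)
    counter_lock = threading.Lock()
    stats = {"active": 0, "peak": 0}

    def fetch(url):
        with sema:  # acquire() on entry, release() on exit
            with counter_lock:
                stats["active"] += 1
                stats["peak"] = max(stats["peak"], stats["active"])
            time.sleep(delay)  # stand-in for downloading url
            with counter_lock:
                stats["active"] -= 1

    threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return stats["peak"]


if __name__ == "__main__":
    urls = ["https://heyboy.tech/{}".format(i) for i in range(20)]
    print("peak concurrency:", run_spider(urls))
```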
8. ThreadPoolExecutor thread pools

The thread pool lives in the futures module of the concurrent package (concurrent.futures).
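If we need to limit the number of concurrent threads as in Section 7, we can use a thread pool instead of a semaphore. With a thread pool, the main thread can obtain each task's status and return value; for example, it can find out as soon as a task finishes, which a semaphore does not provide. In addition, concurrent.futures gives multithreaded and multiprocess code the same interface (ThreadPoolExecutor and ProcessPoolExecutor).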
Below is the basic usage of ThreadPoolExecutor:
```python
'''
Created on August 6, 2021
@author: Jason Wang
'''
import time
from concurrent.futures import ThreadPoolExecutor


def getHtml(times):
    time.sleep(times)
    print("got done for {} times".format(times))
    return times


if __name__ == "__main__":
    threadPool = ThreadPoolExecutor(max_workers=2)
    # submit() schedules the call and immediately returns a Future
    task1 = threadPool.submit(getHtml, 3)
    task2 = threadPool.submit(getHtml, 1)
    print(task1.done())    # False: task1 is still sleeping
    print(task1.result())  # blocks until task1 finishes, then prints 3
    print(task1.cancel())  # False: a finished task cannot be cancelled
```
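ThreadPoolExecutor can also be used as a context manager, which calls `shutdown(wait=True)` on exit so the main thread waits for every submitted task. The sketch below additionally registers a callback with `Future.add_done_callback()`, which is invoked with the finished future. The names `run` and `on_done` and the sleep lengths are illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def getHtml(times):
    time.sleep(times)
    return times


def run(times_list):
    finished = []

    def on_done(future):
        # Runs in the worker thread that completed the future (or right away
        # in the calling thread if the future was already done)
        finished.append(future.result())

    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(getHtml, t) for t in times_list]
        for f in futures:
            f.add_done_callback(on_done)
    # Leaving the with block waited for all tasks, so every callback has run
    return [f.result() for f in futures], finished


if __name__ == "__main__":
    results, finished = run([0.3, 0.1])
    print("results in submit order:", results)
    print("results in completion order:", finished)
```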
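If you need to submit tasks in batches and collect their results in batches, with the main thread waiting for them, see the following code: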
```python
'''
Created on August 6, 2021
@author: Jason Wang
'''
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def getHtml(times):
    time.sleep(times)
    print("got done for {} times".format(times))
    return times


if __name__ == "__main__":
    threadPool = ThreadPoolExecutor(max_workers=2)
    time_list = [5, 3, 2]
    all_task = [threadPool.submit(getHtml, t) for t in time_list]
    # as_completed yields each future as soon as it finishes, in completion order
    for task in as_completed(all_task):
        print("time is {}".format(task.result()))

    # map returns results in the order of time_list, not in completion order
    for data in threadPool.map(getHtml, time_list):
        print(data)
```
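The code above submits three tasks. Each time a task finishes, as_completed hands it to the main thread, which prints its result. The second loop uses executor.map, which yields results in the order the inputs were given rather than in the order the tasks complete.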
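Besides `as_completed`, concurrent.futures also provides `wait()`, which blocks until a condition is met and returns two sets, `(done, not_done)`. A minimal sketch, assuming the same sleeping getHtml task and a hypothetical helper `first_finished`, that returns as soon as the first task finishes by passing `return_when=FIRST_COMPLETED`. Note that leaving the `with` block still waits for the remaining tasks.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def getHtml(times):
    time.sleep(times)
    return times


def first_finished(time_list):
    with ThreadPoolExecutor(max_workers=len(time_list)) as pool:
        futures = [pool.submit(getHtml, t) for t in time_list]
        # Returns as soon as at least one future is done
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        print("still running:", len(not_done))
        return [f.result() for f in done]


if __name__ == "__main__":
    print(first_finished([0.5, 0.3, 0.1]))
```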
Here is an example from the official documentation:
```python
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


# Retrieve a single page and return its contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


# The with statement ensures the pool's threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and map each future to its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
```