Python3多线程编程

1.CPython中的GIL

在开始多线程编程之前,我们应该了解Python的GIL锁。GIL全称是Global Interpreter Lock,Python在设计之初,为了简单及线程安全,对整个解释器加了一把全局锁,这把锁就叫GIL。GIL的存在,只能让Python同一时刻只能有一个线程运行在一个CPU上,无法将多个线程映射到CPU上。这意味着,在如今的多核CPU时代,使用Python多线程编程,只能在一个核心上运行所有线程,无法将所有核心跑满。

2.GIL锁能保证我们编写线程的安全吗?

并不是有了GIL锁,我们编写的线程就安全了。GIL锁有一套自己的释放机制,当多个线程运行的时候,GIL会根据每个线程运行的字节码行数,时间片,IO操作等因素释放GIL,把CPU资源让给其他线程。

比如下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# coding=utf-8
'''
Created on 2021年8月5日

@author: Jason Wang
'''
import threading

total = 0

def fun1():
global total
for i in range(1000000):
total += 1

def fun2():
global total
for i in range(1000000):
total -= 1

if __name__ == "__main__":
thread1 = threading.Thread(target=fun1)
thread2 = threading.Thread(target=fun2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)

上面的代码,每次的运行结果并不是都是0,这说明了GIL并不能保证我们编写线程的安全。

3.如何进行多线程编程

对于IO操作密集的应用,采用多线程和多进程性能差别不大。要进行多线程编程,可以使用Thread类实例化。参考下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# coding=utf-8
'''
Created on 2021年8月5日

@author: Jason Wang
'''
import threading, time

def getHtml(url):
print("get html {} started...\n".format(url))
time.sleep(2)
print("get html {} done...\n".format(url))

def getDetail(url):
print("get detail {} started...\n".format(url))
time.sleep(2)
print("get detail {} done...\n".format(url))

if __name__ == "__main__":
#使用Thread对象创建线程,target为函数名,args为函数参数,name为线程名称。其中args和name是可选的
thread1 = threading.Thread(target=getHtml, args=("heyboy.tech",), name="html_thread")
thread2 = threading.Thread(target=getDetail, args=("jason-w.cn",), name="detail_thread")
current_time = time.time()
thread1.setDaemon(True) #设置为守护线程
thread1.start()
thread2.start()
print("spend time is {}\n".format(time.time()-current_time))

除了使用函数方式,还可以使用面向对象方式。继承Thread类,重写run方法。如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# coding=utf-8
'''
Created on 2021年8月5日

@author: Jason Wang
'''
import threading, time

class GetHtml(threading.Thread):
def __init__(self, name, url):
super().__init__(name=name)
self.url = url

def run(self):
print("get html {} started...\n".format(self.url))
time.sleep(2)
print("get html {} done...\n".format(self.url))

class GetDetail(threading.Thread):
def __init__(self, name, url):
super().__init__(name=name)
self.url = url

def run(self):
print("get detail {} started...\n".format(self.url))
time.sleep(2)
print("get detail {} done...\n".format(self.url))

if __name__ == "__main__":
Thread1 = GetHtml("html_thread", "heyboy.tech")
Thread2 = GetDetail("deail_thread", "jason-w.cn")
Thread1.start()
Thread2.start()

4.使用共享变量及Queue进行线程间通信

线程间比较原始的通信方式是共享变量,比如第2小节的例子。但会有问题,比如共享的变量是线程不安全的。要实现变量的线程安全问题,需要对变量的操作加锁,需要对锁比较了解。所以一般来说,我们是不推荐的。其实我们可以直接用线程安全的队列Queue来进行线程间通信。

Queue的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# coding=utf-8
'''
Created on 2021年8月5日

@author: Jason Wang
'''
import threading
from queue import Queue

def fun1(que):
for i in range(10000):
que.put(i) #put方法是阻塞的

def fun2(que):
for i in range(10000):
que.get() #get方法是阻塞的

if __name__ == "__main__":
que = Queue(maxsize=10000) #设置最大长度为1000,防止内存占用过多
thread1 = threading.Thread(target=fun1, args=(que,), name="fun1_thread")
thread2 = threading.Thread(target=fun2, args=(que,), name="fun2_thread")
thread1.start()
thread2.start()
thread1.join()
thread2.join()
#que.task_done() #Queue支持等待唤醒机制
#que.join() #当Queue进行等待执行完成时,需要接收上面一行代码的信号,才会继续执行。
print(que.qsize())

上面的代码,每次运行的结果,19行的共享变量que的长度始终为0。说明Queue是线程安全的,不需要我们额外的进行加锁。

5.使用Lock及RLock进行线程同步

第2小节的例子中,为什么结果不为0呢。

假设全局变量total=0,线程A进行对total+1操作,线程B对total-1操作。假设线程A先运行,通过线程计算,total+1后的值为1,但还未来得及把1赋值给total,total还是等于0。于是时间片就用完了。此时线程B执行,通过线程计算,total-1的值为-1,然后把-1赋值给total,此时total为-1。线程B结束,时间片给了线程A,然后A继续执行,把1赋值给total。于是最终结果,total为1。要保证结果为0,我们需要对线程的操作加锁,线程A操作执行完后,才让其他线程B运行。

使用Lock锁对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# coding=utf-8
'''
Created on 2021年8月5日

@author: Jason Wang
'''
import threading
from threading import Lock

total = 0
lock = Lock() #声明锁对象

def fun1():
global total
global lock
for i in range(1000000):
lock.acquire() #获取锁
total += 1
lock.release() #释放锁

def fun2():
global total
global lock
for i in range(1000000):
lock.acquire() #获取锁
total -= 1
lock.release() #释放锁

if __name__ == "__main__":
thread1 = threading.Thread(target=fun1, name="fun1_thread")
thread2 = threading.Thread(target=fun2, name="fun2_thread")
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)

上面的代码,无论运行多少次,结果始终为0。

Lock锁不能重入,即不能调用两次acquire,所以Python为我们提供了可重入的锁RLock。但RLock重入的时候,只能用在同一个线程中,比如下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from threading import RLock
total = 0
rlock = RLock()
def fun2():
global rlock
for i in range(1000000):
rlock.acquire() #获取锁
fun3(rlock)
rlock.release() #释放锁

def fun3(rlock):
global total
rlock.acquire()
total -= 1
rlock.release()

6.复杂的线程间通信condition条件变量

假设一个线程是天猫精灵,一个线程是小爱,那么可能会有如下的场景:

天猫说完小爱才能说,小爱说完天猫才能说。也就相当于天猫精灵运行完接着小爱运行,小爱运行完再运行天猫精灵。

在不考虑使用condition的情况下,我们也许会使用锁,如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# coding=utf-8
'''
Created on 2021年8月5日

@author: Jason Wang
'''
import threading

class TianMao(threading.Thread):
def __init__(self, lock):
super().__init__(name="tianmao_thread")
self.lock = lock
self.name = "天猫精灵"

def run(self):
self.lock.acquire()
print("{name}:小爱同学".format(name=self.name))
self.lock.release()

self.lock.acquire()
print("{name}:我们来对古诗吧".format(name=self.name))
self.lock.release()

class XiaoAi(threading.Thread):
def __init__(self, lock):
super().__init__(name="xiaoai_thread")
self.lock = lock
self.name = "小爱"

def run(self):
self.lock.acquire()
print("{name}:在".format(name=self.name))
self.lock.release()

self.lock.acquire()
print("{name}:好啊".format(name=self.name))
self.lock.release()

if __name__ == "__main__":
lock = threading.Lock()
tianmao = TianMao(lock)
xiaoai = XiaoAi(lock)
tianmao.start()
xiaoai.start()

经运行,实际结果并不满足预期。结果显示有时候天猫或小爱会一次性把话说完。

在Java中,有等待唤醒机制,在Python中同样也有,叫条件变量Condition。使用条件变量改造后,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# coding=utf-8
'''
Created on 2021年8月5日

@author: Jason Wang
'''
import threading

class TianMao(threading.Thread):
def __init__(self, cond):
super().__init__(name="tianmao_thread")
self.cond = cond
self.name = "天猫精灵"

def run(self):
with self.cond:
print("{name}:小爱同学".format(name=self.name))
self.cond.notify()
self.cond.wait()
print("{name}:我们来对古诗吧".format(name=self.name))

class XiaoAi(threading.Thread):
def __init__(self, cond):
super().__init__(name="xiaoai_thread")
self.cond = cond
self.name = "小爱"

def run(self):
with self.cond:
self.cond.wait()
print("{name}:在".format(name=self.name))
self.cond.notify()
print("{name}:好啊".format(name=self.name))

if __name__ == "__main__":
cond = threading.Condition()
tianmao = TianMao(cond)
xiaoai = XiaoAi(cond)
xiaoai.start()
tianmao.start()

wait后,线程就会休眠,当接收到notify信号后,线程继续运行。需要提一下的是,使用condition应当注意启动顺序,wait和notify只能用在with语句块中。

7.信号量semaphore

semaphore信号量是一个计数器,当执行acquire方法的时候,计数器会减一。当执行release方法的时候,计数器会加一。这一点在控制线程并发数量的时候特别有用。比如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# coding=utf-8
'''
Created on 2021年8月6日

@author: Jason Wang
'''
import threading, time

class GetHtml(threading.Thread):
def __init__(self, url, sema):
super().__init__(name="gethtml_thread")
self.url = url
self.sema = sema

def run(self):
time.sleep(2) #模拟爬取网页的操作
print("I've got the {url} html code".format(url=self.url))
self.sema.release()

class SpiderProducer(threading.Thread):
def __init__(self,sema):
super().__init__(name="spiderproducer_thread")
self.sema = sema

def run(self):
for i in range(20):
self.sema.acquire()
getHtml_thread = GetHtml("https://heyboy.tech/{id}".format(id=i), self.sema)
getHtml_thread.start()
#该行不能进行semaphore的释放操作,因为无法确定爬虫是否爬取完毕

if __name__ == "__main__":
sema = threading.Semaphore(3) #最大三个线程并发
spiderProducer_thread = SpiderProducer(sema)
spiderProducer_thread.start()

上述代码,控制了GetHtml线程的并发数始终为3个。

8.ThreadPoolExecutor线程池

线程池位于concurrent包内的futures模块。

如果我们需要完成第7小节这种控制线程并发的数量,除了使用semaphore信号量,还可以使用线程池。使用线程池,主线程能够获取线程的运行状态及返回值,比如线程完成时,我们主线程立即知道,这是信号量做不到的。除此之外,使用futures可以让多线程和多进程的编码接口一致。

下面是ThreadPoolExecutor的基础用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# coding=utf-8
'''
Created on 2021年8月6日

@author: Jason Wang
'''
import time
from concurrent.futures.thread import ThreadPoolExecutor

def getHtml(times):
time.sleep(times)
print("got done for {} times".format(times))
return times

if __name__ == "__main__":
threadPool = ThreadPoolExecutor(max_workers=2) #最大两个线程同时工作
task1 = threadPool.submit(getHtml, 3)
task2 = threadPool.submit(getHtml, 1)
print(task1.done()) #线程是否完成,立即返回,非阻塞
print(task1.result()) #获取线程的返回值,阻塞
print(task1.cancel()) #取消线程的运行,只可用于尚未开始运行的线程

如果需要批量添加任务,批量获取结果,需要主线程等待,见如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# coding=utf-8
'''
Created on 2021年8月6日

@author: Jason Wang
'''
from concurrent.futures import as_completed, wait, FIRST_COMPLETED
from concurrent.futures.thread import ThreadPoolExecutor
import time


def getHtml(times):
time.sleep(times)
print("got done for {} times".format(times))
return times

if __name__ == "__main__":
threadPool = ThreadPoolExecutor(max_workers=2) #最大两个线程同时工作
time_list = [5,3,2]
all_task = [threadPool.submit(getHtml, t) for t in time_list]
#wait(all_task) #让主线程阻塞,等全部任务执行完后再继续执行主线程
#wait(all_task, return_when=FIRST_COMPLETED) #让主线程阻塞,等第一个任务运行完毕后主线程继续执行
#wait(all_task, return_when=FIRST_EXCEPTION) #让主线程阻塞,等第一个抛出异常的任务抛出异常后,主线程继续执行
#获取已经成功任务的返回结果
for task in as_completed(all_task):
#as_completed()传一个Future列表,在Future都完成之后返回一个迭代器
print("time is {}".format(task.result()))

#第二种方法获取已经成功任务的返回结果,上面这种方法哪个任务先完成哪个任务返回。这个方法返回顺序和time_list一致
for data in threadPool.map(getHtml, time_list):
print(data)

上面的代码,添加了三个任务。每当一个任务完成,主线程就迭代一个任务并显示任务的执行结果。

附带一个官方的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))