1.为什么要用多进程编程

由于GIL的存在,同一时刻,只有一个线程能运行在一个CPU的上。所以对于CPU密集型的需求,多线程对CPU的利用率不高。而多进程则能解决这个问题。需要注意的是,对于操作系统来讲,进程切换的开销比线程要高。所以对于IO密集型的需求,多线程和多进程的差别不大的情况下,应当使用多线程。

2.父进程和子进程间的关系

先看下如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import os, time

if __name__ == "__main__":
pid = os.fork() #Fork出一个子进程。在子进程中返回0,在父进程中返回子进程的进程号。只能用于Linux下。
print("hello")
if pid == 0:
print("我是子进程,子进程id为:{},父进程id为:{}".format(os.getpid(), os.getppid()))
else:
print("我是父进程,id为:{}".format(os.getpid()))
time.sleep(5)

运行上述代码,运行结果为:

1
2
3
4
5
6
root@iZj6c1j9xpl70pdhc93y54Z:~# python3 test.py
hello
我是父进程,id为:65522
hello
我是子进程,子进程id为:65523,父进程id为:65522
root@iZj6c1j9xpl70pdhc93y54Z:~#

发现hello输出了两次,这是为什么呢?因为子进程创建后,会从父进程进行数据的拷贝(fork之后的代码段)。子进程和父进程是独立的,互不影响。如果去掉最后一行的sleep,把fork和print换下位置,如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import os, time

if __name__ == "__main__":
print("hello")
pid = os.fork() #fork只能用于Linux下,作用是从当前进程生成一个子进程
if pid == 0:
print("我是子进程,子进程id为:{},父进程id为:{}".format(os.getpid(), os.getppid()))
else:
print("我是父进程,id为:{}".format(os.getpid()))

会发现运行结果为:

1
2
3
hello
我是父进程,id为:65675
root@iZj6c1j9xpl70pdhc93y54Z:~# 我是子进程,子进程id为:65676,父进程id为:1

这是为什么呢?因为父进程运行结束后,子进程依旧在运行。线程和进程有一个不一样的地方是:kill掉进程,进程中的所有线程都会被kill。但kill掉进程,进程中的子进程不会被kill掉,而是被init进程接管,继续运行,直到结束。kill掉进程,如果进程中只有守护进程,该进程和守护进程都会被kill掉。加上sleep的时候,由于父进程没有退出,所以输出的结果不会错行。

3.使用ProcessPoolExcutor进程池

ProcessPoolExcutor位于concurrent包内的futures模块。

使用方法和多线程的ThreadPoolExecutor一样,参考多线程编程的相关章节。

4.使用multiprocessing进行多进程编程

该接口比ProcessPoolExcutor更加底层,ProcessPoolExcutor是用的该接口进行的封装。平时进行多进程编程,建议使用更改层次的ProcessPoolExcutor。

使用函数式的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import multiprocessing, time

def getHtml(seconds):
time.sleep(seconds)
print("sub process sleep {}".format(seconds))
return seconds

if __name__ == "__main__":
process1 = multiprocessing.Process(target=getHtml, args=(2,))
process1.start()
print("sub process pid is {}".format(process1.pid))
process1.join()
print("main process done")

使用面向对象的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import multiprocessing, time

class MyProcess(multiprocessing.Process):
def __init__(self, seconds):
super().__init__(name = "MyProcess")
self.seconds = seconds

def run(self):
time.sleep(self.seconds)
print("sub process sleep {}".format(self.seconds))
return self.seconds

if __name__ == "__main__":
process1 = MyProcess(2)
process1.start()
print("sub process pid is {}".format(process1.pid))
process1.join()
print("main process done")

使用线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import multiprocessing, time

def getHtml(seconds):
time.sleep(seconds)
print("sub process sleep {}".format(seconds))
return seconds

if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count()) #参数为同时执行最大的进程数
result = pool.apply_async(getHtml, args=(2,)) #添加进程
pool.close()
pool.join() #调用join前必须先关闭pool
print(result.get()) #获取进程的返回结果

进程池中批量添加任务并获取返回结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import multiprocessing, time

def getHtml(seconds):
time.sleep(seconds)
print("sub process sleep {}".format(seconds))
return seconds

if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count()) #参数为同时执行最大的进程数
#imap是按照添加的顺序返回迭代对象,使用imap_unordered,可以按照进程执行完毕的先后顺序返回迭代对象
for result in pool.imap(getHtml, [1,5,3]):
print(result)

5.使用Queue进程间的通信

queue包的Queue,不能用于进程间的通信,只能用于线程间。用于进程间的Queue,位于multiprocessing包中,使用方法和queue包的Queue一样,但不能用于multiprocessing线程池中。例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import multiprocessing, time
from multiprocessing import Queue

def producer(queue):
time.sleep(2)
queue.put("a")

def consumer(queue):
time.sleep(2)
data = queue.get()
print(data)

if __name__ == "__main__":
queue = Queue(10)
prod = multiprocessing.Process(target=producer, args=(queue, ))
cons = multiprocessing.Process(target=consumer, args=(queue, ))
prod.start()
cons.start()
prod.join()
cons.join()

6.使用共享变量进行进程间通信

进程间通信,共享变量的方式,无法用于进程间通信。因为进程间通信的上下文,是独立的。即传递给进程中的共享变量,是独立的副本。但要实现共享变量,可以使用Manager对象特有的数据结构。例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import multiprocessing, time
from multiprocessing import Manager

def producer(list):
time.sleep(2)
list.append("A")

def consumer(list):
time.sleep(2)
list.append("B")

if __name__ == "__main__":
list = Manager().list()
proc = multiprocessing.Process(target=producer, args=(list, ))
cons = multiprocessing.Process(target=consumer, args=(list, ))
proc.start()
cons.start()
proc.join()
proc.join()
for item in list:
print(item)

除了list,manager对象中还有Lock,RLock,Tuple,dict等。

7.使用Manager进行进程池间的通信

multiprocessing包中Queue无法用于进程池间的通信,要在进程池中使用Queue,需要使用Manager对象。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import multiprocessing, time
from multiprocessing import Manager

def producer(queue):
time.sleep(2)
queue.put("a")

def consumer(queue):
time.sleep(2)
data = queue.get()
print(data)

if __name__ == "__main__":
queue = Manager().Queue(10)
pool = multiprocessing.Pool(2)
pool.apply_async(producer, args=(queue, ))
pool.apply_async(consumer, args=(queue, ))
pool.close()
pool.join()

8.使用pipe进行进程间通信

pipe是一个管道,是简化的队列。适用于只有两个线程的情况。效率高于Queue。例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# coding=utf-8
'''
Created on 2021年8月25日

@author: Jason Wang
'''
import multiprocessing, time
from multiprocessing import Pipe

def producer(pipe):
time.sleep(2)
pipe.send("a")

def consumer(pipe):
time.sleep(2)
data = pipe.recv()
print(data)

if __name__ == "__main__":
receive_pipe, send_pipe = Pipe()
proc = multiprocessing.Process(target=producer, args=(send_pipe, ))
cons = multiprocessing.Process(target=consumer, args=(receive_pipe, ))
proc.start()
cons.start()
proc.join()
proc.join()

9.进程间同步

进程间同步,使用方法和线程一样。

官方文档:

1
2
Manager() 返回的管理器支持类型: listdict 、 Namespace 、 Lock 、 
RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

https://docs.python.org/zh-cn/3/library/multiprocessing.html