19. multiprocessing
multiprocessing,基于进程的并行。
19.1. Process 类
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
进程对象表示在单独进程中运行的活动。
- Parameters
group – 应该始终是
None
,它仅用于兼容threading.Thread
。target
是由run()
方法调用的可调用对象;默认为None
,意味着什么都没有被调用。name – 是进程名称。
args – 是目标调用的参数元组;
kwargs
是目标调用的关键字参数字典。daemon – 是进程的守护标志,一个布尔值,必须在
start()
被调用之前设置;初始值继承自创建进程。
- run()
表示进程活动的方法,可以在子类中重载此方法。标准
run()
方法调用传递给对象构造函数的可调用对象(target
)。
- start()
启动进程活动,每个进程对象最多只能调用一次;它会将对象的
run()
方法安排在一个单独的进程中调用。
- join([timeout])
使主调进程(包含
p.join()
语句的进程)阻塞,直至被调用进程p
运行结束或超时(指定timeout
)。
- is_alive()
返回进程是否处于活动状态。粗略地说,从
start()
方法返回到子进程终止之前,进程对象仍处于活动状态。
- name()
进程的名称,是一个字符串,仅用于识别目的;可以为多个进程指定相同的名称。
- pid()
进程ID,在生成该进程之前为
None
。
19.2. 进程池
Pool
类表示一个工作进程池,它具有允许以几种不同方式将任务分配到工作进程的方法。
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- Parameters
processes – 是要使用的工作进程数目。如果
processes
为None
,则使用os.cpu_count()
返回的值。context – 是用于指定启动工作进程的上下文。通常一个进程池是使用函数
multiprocessing.Pool()
或者一个上下文对象的Pool()
方法创建的。
- apply(func[, args[, kwds]])
对应的子进程是排队执行的,实际非并行(阻塞的,即上一个子进程完成了才能进行下一个子进程;注意是单个子进程执行的,而不是按批执行的)。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
对应的每个子进程是异步执行的;异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成(进程数 < 任务数,后面的任务只能等前面的进程结束才能开始执行)。如果指定了
callback
, 它必须是一个接受单个参数的可调用对象。当执行成功时,callback
会被用于处理执行后的返回结果,否则,调用error_callback
。如果指定了error_callback
, 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时,会将抛出的异常对象作为参数传递给error_callback
执行。
- map(func, iterable[, chunksize])
内置
map()
函数的并行版本(但它只支持一个iterable
参数,对于多个可迭代对象请参阅starmap()
)。 它会保持阻塞直到所有进程都获得结果。这个方法会将可迭代对象分割为许多块,然后提交给进程池。可以将chunksize
设置为一个正整数从而(近似)指定每个块的大小。注意对于很长的迭代对象,可能消耗很多内存。可以考虑使用imap()
或imap_unordered()
并且显示指定chunksize
以提升效率。
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map()
的异步(非阻塞)版本,返回 MapResult 实例(其具有get()
方法,获取结果组成的 list)。
- imap(func, iterable[, chunksize])
map()
的迭代器版本,返回迭代器实例。imap()
速度远慢于map()
,但是对内存需求非常小。
- starmap(func, iterable[, chunksize])
子进程活动
func
允许包含多个参数,也即iterable
的每个元素也是iterable
(其每个元素作为func
的参数),返回结果组成 list。
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
starmap()
的异步(并行)版本,返回 MapResult 实例(其具有get()
方法,获取结果组成的 list)。
- close()
关闭进程池,关闭后不能往池中增加新的子进程,然后可以调用
join()
函数等待已有子进程执行完毕。
- terminate()
不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时,会立即调用
terminate()
。
- join()
等待进程池中的子进程执行完毕,需在
close()
函数后调用。
- get([timeout])
用于获取执行结果。
- wait([timeout])
阻塞,直到返回结果或超时。
Note
多进程并行执行,它们结束的顺序是不定的,但是最终返回的结果列表顺序是与参数args的顺序一一对应的。
19.3. 进程通信
19.3.1. 数据共享:Manager
由 Manager()
返回的管理器对象控制一个服务进程,该进程保存 Python 对象并允许其他进程使用代理操作它们。支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。
注意:在操作共享对象的元素(子对象)时,除了赋值操作,其他的方法都作用在共享对象的拷贝上,并不会对共享对象生效。
19.3.2. 数据传递:Queue
- class multiprocessing.Queue([maxsize])
共享队列,允许多个进程放入、多个进程从队列取出对象。
- qsize()
返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。
- empty()
如果队列是空的,返回 True。 由于多线程或多进程的环境,该状态是不可靠的。
- full()
如果队列是满的,返回 True ,反之返回 False 。 由于多线程或多进程的环境,该状态是不可靠的。
- put(obj[, block[, timeout]])
将
obj
放入队列。如果可选参数block
是 True(默认值)且timeout
是 None(默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果timeout
是正数,将会在阻塞了最多timeout
秒之后还是没有可用的缓冲槽时抛出queue.Full
异常。反之(block
是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出queue.Full
异常(在这种情形下timeout
参数会被忽略)。
- get([block[, timeout]])
从队列中取出并返回对象。如果可选参数
block
是 True(默认值)且timeout
是 None(默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果timeout
是正数,将会在阻塞了最多timeout
秒之后还是没有可用的对象时抛出queue.Empty
异常。反之(block
是 False 时),仅当有可用对象能够取出时返回,否则抛出queue.Empty
异常(在这种情形下timeout
参数会被忽略)。总而言之,在默认参数下,即使队列已经为空也不会抛出异常,会一直阻塞。
Note
使用 multiprocessing.Queue
可能会因为队列中的数据未flush而造成死锁、进程无法退出,建议使用 multiprocessing.Manager().Queue
。
19.3.3. 数据传递:Pipe
- class multiprocessing.Pipe([duplex])
返回一对 Connection 对象
(conn1, conn2)
, 分别表示管道的两端。如果
duplex
被置为 True(默认值),那么该管道是双向的。如果duplex
被置为 False ,那么该管道是单向的,即conn1
只能用于接收消息,而conn2
仅能用于发送消息。- send(obj)
发送数据。只能发送可 pickle 的数据。
- recv()
读取管道中接收到的数据。
- close()
关闭连接对象。当连接对象被垃圾回收时会自动调用。
- poll([timeout])
判断管道对象是否有收到数据待读取。
19.4. 示例
19.4.1. 创建进程
1from multiprocessing import Process
2import os
3
4def info(title):
5 print(title)
6 print('module name:', __name__)
7 print('parent process:', os.getppid())
8 print('process id:', os.getpid())
9
10def f(name):
11 info('function f')
12 print('hello', name)
13
14if __name__ == '__main__':
15 info('main line')
16 p = Process(target=f, args=('bob',))
17 p.start()
18 p.join()
19
20## if __name__ == '__main__' 是必需的
19.4.2. 锁
使用 Lock 同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。
1from multiprocessing import Process, Lock
2
3def f(l, i):
4 l.acquire()
5 try:
6 print('hello world', i)
7 finally:
8 l.release()
9
10if __name__ == '__main__':
11 lock = Lock()
12
13 for num in range(10):
14 Process(target=f, args=(lock, num)).start()
19.4.3. Pool
1def f(a): ## map方法只允许1个参数
2 return a
3
4if __name__ == "__main__":
5 pool = multiprocessing.Pool()
6 result = [pool.apply_async(f, (a,)) for a in [10,20]]
7 pool.close()
8 pool.join()
9
10 for item in result:
11 print(item.get())
1def f(a, b): ## starmap方法允许多个参数
2 return a
3
4if __name__ == "__main__":
5 pool = multiprocessing.Pool()
6 result = pool.starmap_async(f, ((a0, b0), (a1, b1), )).get()
7 pool.close()
8 pool.join()
1import multiprocessing
2import time
3
4def f(t):
5 time.sleep(t)
6 print(t)
7 return t
8
9if __name__ == "__main__":
10 pool = multiprocessing.Pool(3)
11 ## 阻塞
12 pool.map(f, [0.1, 5, 0.1, 2, 1])
13 print("===")
14 ## 非阻塞
15 result = pool.map_async(f, [0.1, 5, 0.1, 2, 1])
16 print("here")
17 print("more")
18 ## 人为阻塞,直到获得所有的结果
19 result.wait()
20 print("end")
21 pool.close()
22 pool.join()
23 print(result.get())
结果是:
0.1
0.1
1
2
5
0.1
0.1
===
here
more
1
2
5
end
[0.1, 5, 0.1, 2, 1]
可以看出: map
是阻塞的,直到所有任务都结束; map_async
是非阻塞的,中间可以插入其他结果。
19.4.4. Manager
1from multiprocessing import Process, Manager
2import os
3
4def f(d, l):
5 d[1] = 'Python'
6 d[2] = "Java"
7 d[3] = str(os.getpid())
8 l.append(os.getpid()) # 获得当前的进程号
9 print(l)
10
11
12if __name__ == '__main__':
13 with Manager() as manager:
14 d = manager.dict()
15 l = manager.list()
16 p_list = []
17 for i in range(3):
18 p = Process(target=f, args=(d, l))
19 p.start()
20 p_list.append(p)
21 for res in p_list:
22 res.join()
23 print(d)
输出:
[14168]
[14168, 14108]
[14168, 14108, 5412]
{1: 'Python', 2: 'Java', 3: '5412'}
19.4.5. Pipe
1import time, random
2from multiprocessing import Process, Pipe, current_process
3from multiprocessing.connection import wait
4
5'''
6wait(object_list) :
7可以一次轮询多个连接对象,一直等待直到 object_list 中某个对象处于就绪状态。
8返回 object_list 中处于就绪状态的对象。
9当一个连接或者套接字对象拥有有效的数据可被读取的时候,或者另一端关闭后,这个对象就处于就绪状态。
10'''
11
12def foo(w):
13 for i in range(5):
14 w.send((i, current_process().name))
15 w.close()
16
17if __name__ == '__main__':
18 readers = []
19
20 for i in range(2):
21 r, w = Pipe(duplex=False)
22 readers.append(r)
23 p = Process(target=foo, args=(w,))
24 p.start()
25 w.close()
26
27 while readers:
28 for r in wait(readers):
29 try:
30 msg = r.recv()
31 except EOFError:
32 readers.remove(r)
33 else:
34 print(msg)
输出:
(0, 'Process-1')
(1, 'Process-1')
(2, 'Process-1')
(3, 'Process-1')
(4, 'Process-1')
(0, 'Process-2')
(1, 'Process-2')
(2, 'Process-2')
(3, 'Process-2')
(4, 'Process-2')
Note
在多进程任务中,如果每个进程都要从一个共享队列中读数据,而这个队列需要存储的列表很长, 把数据完整地填充(put)到队列中需要花费很长时间,在这种情况下,不要等到队列中填充完了所有的数据才启动进程, 而是先启动(start)进程任务,再填充队列,直到进程结束(join):
...
p.start()
fill_queue()
p.join()
...
这样一来,总体上看,一边填充队列,任务一边执行,效率大大提高。但是,进程读队列的时候队列可能为空, 这时候需要处理异常,读到空的次数达到一定阈值的时候任务结束:
## python3
import queue
## process task
while True:
empty_cnt = 0
try:
...
except queue.Empty as e:
empty_cnt += 1
if empty_cnt > th_cnt:
break
19.5. 参考资料
multiprocessing — Process-based parallelism
python并行计算(上):multiprocessing、multiprocess模块