19. multiprocessing

multiprocessing,基于进程的并行。

19.1. Process 类

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

进程对象表示在单独进程中运行的活动。

Parameters
  • group – 应该始终是 None ,它仅用于兼容 threading.Threadtarget 是由 run() 方法调用的可调用对象;默认为 None ,意味着什么都没有被调用。

  • name – 是进程名称。

  • args – 是目标调用的参数元组;kwargs 是目标调用的关键字参数字典。

  • daemon – 是进程的守护标志,一个布尔值,必须在 start() 被调用之前设置;初始值继承自创建进程。

run()

表示进程活动的方法,可以在子类中重载此方法。标准 run() 方法调用传递给对象构造函数的可调用对象( target )。

start()

启动进程活动,每个进程对象最多只能调用一次;它会将对象的 run() 方法安排在一个单独的进程中调用。

join([timeout])

使主调进程(包含 p.join() 语句的进程)阻塞,直至被调用进程 p 运行结束或超时(指定 timeout )。

is_alive()

返回进程是否处于活动状态。粗略地说,从 start() 方法返回到子进程终止之前,进程对象仍处于活动状态。

name()

进程的名称,是一个字符串,仅用于识别目的;可以为多个进程指定相同的名称。

pid()

进程ID,在生成该进程之前为 None

19.2. 进程池

Pool 类表示一个工作进程池,它具有允许以几种不同方式将任务分配到工作进程的方法。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
Parameters
  • processes – 是要使用的工作进程数目。如果 processesNone ,则使用 os.cpu_count() 返回的值。

  • context – 是用于指定启动工作进程的上下文。通常一个进程池是使用函数 multiprocessing.Pool() 或者一个上下文对象的 Pool() 方法创建的。

apply(func[, args[, kwds]])

对应的子进程是排队执行的,实际非并行(阻塞的,即上一个子进程完成了才能进行下一个子进程;注意是单个子进程执行的,而不是按批执行的)。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

对应的每个子进程是异步执行的;异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成(进程数 < 任务数,后面的任务只能等前面的进程结束才能开始执行)。如果指定了 callback , 它必须是一个接受单个参数的可调用对象。当执行成功时, callback 会被用于处理执行后的返回结果,否则,调用 error_callback 。如果指定了 error_callback , 它必须是一个接受单个参数的可调用对象。当目标函数执行失败时,会将抛出的异常对象作为参数传递给 error_callback 执行。

map(func, iterable[, chunksize])

内置 map() 函数的并行版本(但它只支持一个 iterable 参数,对于多个可迭代对象请参阅 starmap() )。 它会保持阻塞直到所有进程都获得结果。这个方法会将可迭代对象分割为许多块,然后提交给进程池。可以将 chunksize 设置为一个正整数从而(近似)指定每个块的大小。注意对于很长的迭代对象,可能消耗很多内存。可以考虑使用 imap()imap_unordered() 并且显示指定 chunksize 以提升效率。

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

map() 的异步(非阻塞)版本,返回 MapResult 实例(其具有 get() 方法,获取结果组成的 list)。

imap(func, iterable[, chunksize])

map() 的迭代器版本,返回迭代器实例。 imap() 速度远慢于 map() ,但是对内存需求非常小。

starmap(func, iterable[, chunksize])

子进程活动 func 允许包含多个参数,也即 iterable 的每个元素也是 iterable (其每个元素作为 func 的参数),返回结果组成 list。

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

starmap() 的异步(并行)版本,返回 MapResult 实例(其具有 get() 方法,获取结果组成的 list)。

close()

关闭进程池,关闭后不能往池中增加新的子进程,然后可以调用 join() 函数等待已有子进程执行完毕。

terminate()

不必等待未完成的任务,立即停止工作进程。当进程池对象被垃圾回收时,会立即调用 terminate()

join()

等待进程池中的子进程执行完毕,需在 close() 函数后调用。

get([timeout])

用于获取执行结果。

wait([timeout])

阻塞,直到返回结果或超时。

Note

多进程并行执行,它们结束的顺序是不定的,但是最终返回的结果列表顺序是与参数args的顺序一一对应的。

19.3. 进程通信

19.3.1. 数据共享:Manager

Manager() 返回的管理器对象控制一个服务进程,该进程保存 Python 对象并允许其他进程使用代理操作它们。支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

注意:在操作共享对象的元素(子对象)时,除了赋值操作,其他的方法都作用在共享对象的拷贝上,并不会对共享对象生效。

19.3.2. 数据传递:Queue

class multiprocessing.Queue([maxsize])

共享队列,允许多个进程放入、多个进程从队列取出对象。

qsize()

返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。

empty()

如果队列是空的,返回 True。 由于多线程或多进程的环境,该状态是不可靠的。

full()

如果队列是满的,返回 True ,反之返回 False 。 由于多线程或多进程的环境,该状态是不可靠的。

put(obj[, block[, timeout]])

obj 放入队列。如果可选参数 block 是 True(默认值)且 timeout 是 None(默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full 异常。反之( block 是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常(在这种情形下 timeout 参数会被忽略)。

get([block[, timeout]])

从队列中取出并返回对象。如果可选参数 block 是 True(默认值)且 timeout 是 None(默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之( block 是 False 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常(在这种情形下 timeout 参数会被忽略)。总而言之,在默认参数下,即使队列已经为空也不会抛出异常,会一直阻塞。

Note

使用 multiprocessing.Queue 可能会因为队列中的数据未flush而造成死锁、进程无法退出,建议使用 multiprocessing.Manager().Queue

19.3.3. 数据传递:Pipe

class multiprocessing.Pipe([duplex])

返回一对 Connection 对象 (conn1, conn2) , 分别表示管道的两端。

如果 duplex 被置为 True(默认值),那么该管道是双向的。如果 duplex 被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。

send(obj)

发送数据。只能发送可 pickle 的数据。

recv()

读取管道中接收到的数据。

close()

关闭连接对象。当连接对象被垃圾回收时会自动调用。

poll([timeout])

判断管道对象是否有收到数据待读取。

19.4. 示例

19.4.1. 创建进程

 1from multiprocessing import Process
 2import os
 3
 4def info(title):
 5    print(title)
 6    print('module name:', __name__)
 7    print('parent process:', os.getppid())
 8    print('process id:', os.getpid())
 9
10def f(name):
11    info('function f')
12    print('hello', name)
13
14if __name__ == '__main__':
15    info('main line')
16    p = Process(target=f, args=('bob',))
17    p.start()
18    p.join()
19
20## if __name__ == '__main__' 是必需的

19.4.2.

使用 Lock 同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。

 1from multiprocessing import Process, Lock
 2
 3def f(l, i):
 4    l.acquire()
 5    try:
 6        print('hello world', i)
 7    finally:
 8        l.release()
 9
10if __name__ == '__main__':
11    lock = Lock()
12
13    for num in range(10):
14        Process(target=f, args=(lock, num)).start()

19.4.3. Pool

 1def f(a): ## map方法只允许1个参数
 2    return a
 3
 4if __name__ == "__main__":
 5    pool = multiprocessing.Pool()
 6    result = [pool.apply_async(f, (a,)) for a in [10,20]]
 7    pool.close()
 8    pool.join()
 9
10    for item in result:
11        print(item.get())
1def f(a, b): ## starmap方法允许多个参数
2    return a
3
4if __name__ == "__main__":
5    pool = multiprocessing.Pool()
6    result = pool.starmap_async(f, ((a0, b0), (a1, b1), )).get()
7    pool.close()
8    pool.join()
 1import multiprocessing
 2import time
 3
 4def f(t):
 5    time.sleep(t)
 6    print(t)
 7    return t
 8
 9if __name__ == "__main__":
10    pool = multiprocessing.Pool(3)
11    ## 阻塞
12    pool.map(f, [0.1, 5, 0.1, 2, 1])
13    print("===")
14    ## 非阻塞
15    result = pool.map_async(f, [0.1, 5, 0.1, 2, 1])
16    print("here")
17    print("more")
18    ## 人为阻塞,直到获得所有的结果
19    result.wait()
20    print("end")
21    pool.close()
22    pool.join()
23    print(result.get())

结果是:

0.1
0.1
1
2
5
0.1
0.1
===
here
more
1
2
5
end
[0.1, 5, 0.1, 2, 1]

可以看出: map 是阻塞的,直到所有任务都结束; map_async 是非阻塞的,中间可以插入其他结果。

19.4.4. Manager

 1from multiprocessing import Process, Manager
 2import os
 3
 4def f(d, l):
 5    d[1] = 'Python'
 6    d[2] = "Java"
 7    d[3] = str(os.getpid())
 8    l.append(os.getpid()) # 获得当前的进程号
 9    print(l)
10
11
12if __name__ == '__main__':
13    with Manager() as manager:
14        d = manager.dict()
15        l = manager.list()
16        p_list = []
17        for i in range(3):
18            p = Process(target=f, args=(d, l))
19            p.start()
20            p_list.append(p)
21        for res in p_list:
22            res.join()
23        print(d)

输出:

[14168]
[14168, 14108]
[14168, 14108, 5412]
{1: 'Python', 2: 'Java', 3: '5412'}

19.4.5. Pipe

 1import time, random
 2from multiprocessing import Process, Pipe, current_process
 3from multiprocessing.connection import wait
 4
 5'''
 6wait(object_list) :
 7可以一次轮询多个连接对象,一直等待直到 object_list 中某个对象处于就绪状态。
 8返回 object_list 中处于就绪状态的对象。
 9当一个连接或者套接字对象拥有有效的数据可被读取的时候,或者另一端关闭后,这个对象就处于就绪状态。
10'''
11
12def foo(w):
13    for i in range(5):
14        w.send((i, current_process().name))
15    w.close()
16
17if __name__ == '__main__':
18    readers = []
19
20    for i in range(2):
21        r, w = Pipe(duplex=False)
22        readers.append(r)
23        p = Process(target=foo, args=(w,))
24        p.start()
25        w.close()
26
27    while readers:
28        for r in wait(readers):
29            try:
30                msg = r.recv()
31            except EOFError:
32                readers.remove(r)
33            else:
34                print(msg)

输出:

(0, 'Process-1')
(1, 'Process-1')
(2, 'Process-1')
(3, 'Process-1')
(4, 'Process-1')
(0, 'Process-2')
(1, 'Process-2')
(2, 'Process-2')
(3, 'Process-2')
(4, 'Process-2')

Note

在多进程任务中,如果每个进程都要从一个共享队列中读数据,而这个队列需要存储的列表很长, 把数据完整地填充(put)到队列中需要花费很长时间,在这种情况下,不要等到队列中填充完了所有的数据才启动进程, 而是先启动(start)进程任务,再填充队列,直到进程结束(join):

...
p.start()
fill_queue()
p.join()
...

这样一来,总体上看,一边填充队列,任务一边执行,效率大大提高。但是,进程读队列的时候队列可能为空, 这时候需要处理异常,读到空的次数达到一定阈值的时候任务结束:

## python3
import queue

## process task
while True:
    empty_cnt = 0
    try:
    ...
    except queue.Empty as e:
        empty_cnt += 1
        if empty_cnt > th_cnt:
            break

19.5. 参考资料

  1. multiprocessing — Process-based parallelism

  1. python并行计算(上):multiprocessing、multiprocess模块