Skip to main content
Documents
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Multiprocessing (多进程)

下面是 最清晰、最实用、完全够用的 Python multiprocessing(多进程)指南,从入门到进阶,一次讲明白。

multiprocessing 是 Python 用来 绕过 GIL、充分利用多核 CPU 的官方库。

适用于:

  • CPU 密集型(计算、压缩、图像处理、加密、矩阵运算)
  • 并行跑多个独立任务
  • 并发处理大型数据集
  • 加快模型训练前的准备任务

1. 为什么需要 multiprocessing?

因为 Python 有 GIL(全局解释器锁):

  • 多线程不能利用多核 CPU
  • CPU 密集运算用线程不升反降

multiprocessing 的每个进程都有 独立的 Python 解释器和 GIL -> 真正并行运行。

2. 最简单的 multiprocessing 示例

from multiprocessing import Process

def worker(n):
    print("Working:", n)

if __name__ == '__main__':
    p = Process(target=worker, args=(1,))
    p.start()
    p.join()

要点:

  • Process() 创建一个进程
  • start() 启动
  • join() 等待结束
  • 必须放在 if __name__ == '__main__' 下(Windows / macOS 必须要,不然会无限递归启动进程)

3. 用 Pool 更优雅地创建进程池

Pool 是使用最多的方式:

固定数量进程池(推荐)

from multiprocessing import Pool

def task(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(task, range(10))
        print(results)

输出:

[0, 1, 4, 9, 16, ...]

map() -> 自动拆分任务、多进程并行、收集结果。

4. apply_async:带回调的异步提交任务

非常适合大批量 CPU 任务:

from multiprocessing import Pool

def task(x):
    return x * x

def done(result):
    print("Result:", result)

if __name__ == '__main__':
    pool = Pool(4)
    for i in range(10):
        pool.apply_async(task, args=(i,), callback=done)
    pool.close()
    pool.join()

多进程 + 异步 + 回调。

5. 进程间通信:Queue / Pipe

5.1 Queue(最常用)

from multiprocessing import Process, Queue

def worker(q):
    q.put("hello")

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # 从子进程取值
    p.join()

5.2 Pipe(点对点更快)

from multiprocessing import Process, Pipe

def worker(conn):
    conn.send("hi")
    conn.close()

if __name__ == '__main__':
    parent, child = Pipe()
    p = Process(target=worker, args=(child,))
    p.start()
    print(parent.recv())
    p.join()

6. 共享数据(Value / Array / Manager)

6.1 Value(单个变量共享)

from multiprocessing import Process, Value

def worker(v):
    v.value += 1

if __name__ == '__main__':
    val = Value('i', 0)
    ps = [Process(target=worker, args=(val,)) for _ in range(5)]
    for p in ps: p.start()
    for p in ps: p.join()
    print(val.value)  # 5

6.2 Array(共享数组)

6.3 Manager(跨进程共享 dict / list)

from multiprocessing import Process, Manager

def worker(lst):
    lst.append(1)

if __name__ == '__main__':
    mgr = Manager()
    lst = mgr.list()

    ps = [Process(target=worker, args=(lst,)) for _ in range(5)]
    for p in ps: p.start()
    for p in ps: p.join()

    print(lst)     # [1, 1, 1, 1, 1]

7. 进程锁(Lock)

防止多个进程同时修改同一资源。

from multiprocessing import Process, Lock

def worker(lock, n):
    with lock:
        print("Hello", n)

if __name__ == '__main__':
    lock = Lock()
    ps = [Process(target=worker, args=(lock, i)) for i in range(5)]
    for p in ps: p.start()
    for p in ps: p.join()

8. multiprocessing vs threading

特性 multiprocessing threading
CPU 密集 ✅ 极佳 ❌ 受 GIL 限制
IO 密集 ❌ 不如 async ✅ 可以
内存占用
数据共享 复杂(IPC) 简单(内存共享)
启动速度

总结:

  • CPU 密集 -> multiprocessing
  • IO 密集 -> async / await
  • 轻量混合任务 -> threading

9. multiprocessing 最佳实践

9.1 必须加主保护

if __name__ == '__main__':

9.2 推荐使用 Pool 而不是手动开进程

9.3 不要共享普通对象(进程不共享内存)

必须使用:

  • Queue
  • Manager
  • Pipe
  • Value / Array

9.4 长任务优先 multiprocessing

9.5 大批量小任务先合并成大任务(减少 IPC 负担)

10. multiprocessing + async(专家级)

你可以在 子进程里运行异步任务

def run_async():
    asyncio.run(main_async_function())

多进程 + 异步 -> CPU + IO 双加速。

11. 完整推荐模板(你可直接放 utils/processes.py)

from multiprocessing import Pool, cpu_count

def run_in_parallel(func, data, worker=None):
    worker = worker or cpu_count()
    with Pool(worker) as pool:
        return pool.map(func, data)

调用:

from utils.processes import run_in_parallel

results = run_in_parallel(task, items)

12. 总结(最关键的三句话)

  • 多进程用于 CPU 密集型任务
  • 每个进程有自己独立的 GIL -> 真正并行
  • 使用 Pool 是最推荐的写法