Multiprocessing (多进程)
下面是 最清晰、最实用、完全够用的 Python multiprocessing(多进程)指南,从入门到进阶,一次讲明白。
multiprocessing 是 Python 用来 绕过 GIL、充分利用多核 CPU 的官方库。
适用于:
- CPU 密集型(计算、压缩、图像处理、加密、矩阵运算)
- 并行跑多个独立任务
- 并发处理大型数据集
- 加快模型训练前的准备任务
因为 Python 有 GIL(全局解释器锁):
- 多线程不能利用多核 CPU
- CPU 密集运算用线程不升反降
multiprocessing 的每个进程都有 独立的 Python 解释器和 GIL -> 真正并行运行。
from multiprocessing import Process
def worker(n):
print("Working:", n)
if __name__ == '__main__':
p = Process(target=worker, args=(1,))
p.start()
p.join()
要点:
- Process() 创建一个进程
- start() 启动
- join() 等待结束
- 必须放在
if __name__ == '__main__'下(Windows / macOS 必须要,不然会无限递归启动进程)
Pool 是使用最多的方式:
固定数量进程池(推荐)
from multiprocessing import Pool
def task(x):
return x * x
if __name__ == '__main__':
with Pool(4) as pool:
results = pool.map(task, range(10))
print(results)
输出:
[0, 1, 4, 9, 16, ...]
map() -> 自动拆分任务、多进程并行、收集结果。
非常适合大批量 CPU 任务:
from multiprocessing import Pool
def task(x):
return x * x
def done(result):
print("Result:", result)
if __name__ == '__main__':
pool = Pool(4)
for i in range(10):
pool.apply_async(task, args=(i,), callback=done)
pool.close()
pool.join()
多进程 + 异步 + 回调。
from multiprocessing import Process, Queue
def worker(q):
q.put("hello")
if __name__ == '__main__':
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
print(q.get()) # 从子进程取值
p.join()
from multiprocessing import Process, Pipe
def worker(conn):
conn.send("hi")
conn.close()
if __name__ == '__main__':
parent, child = Pipe()
p = Process(target=worker, args=(child,))
p.start()
print(parent.recv())
p.join()
from multiprocessing import Process, Value
def worker(v):
v.value += 1
if __name__ == '__main__':
val = Value('i', 0)
ps = [Process(target=worker, args=(val,)) for _ in range(5)]
for p in ps: p.start()
for p in ps: p.join()
print(val.value) # 5
from multiprocessing import Process, Manager
def worker(lst):
lst.append(1)
if __name__ == '__main__':
mgr = Manager()
lst = mgr.list()
ps = [Process(target=worker, args=(lst,)) for _ in range(5)]
for p in ps: p.start()
for p in ps: p.join()
print(lst) # [1, 1, 1, 1, 1]
防止多个进程同时修改同一资源。
from multiprocessing import Process, Lock
def worker(lock, n):
with lock:
print("Hello", n)
if __name__ == '__main__':
lock = Lock()
ps = [Process(target=worker, args=(lock, i)) for i in range(5)]
for p in ps: p.start()
for p in ps: p.join()
| 特性 | multiprocessing | threading |
|---|---|---|
| CPU 密集 | ✅ 极佳 | ❌ 受 GIL 限制 |
| IO 密集 | ❌ 不如 async | ✅ 可以 |
| 内存占用 | 大 | 小 |
| 数据共享 | 复杂(IPC) | 简单(内存共享) |
| 启动速度 | 慢 | 快 |
总结:
- CPU 密集 -> multiprocessing
- IO 密集 -> async / await
- 轻量混合任务 -> threading
if __name__ == '__main__':
必须使用:
- Queue
- Manager
- Pipe
- Value / Array
你可以在 子进程里运行异步任务:
def run_async():
asyncio.run(main_async_function())
多进程 + 异步 -> CPU + IO 双加速。
from multiprocessing import Pool, cpu_count
def run_in_parallel(func, data, worker=None):
worker = worker or cpu_count()
with Pool(worker) as pool:
return pool.map(func, data)
调用:
from utils.processes import run_in_parallel
results = run_in_parallel(task, items)
- 多进程用于 CPU 密集型任务
- 每个进程有自己独立的 GIL -> 真正并行
- 使用 Pool 是最推荐的写法