Async + Multiprocessing
下面给你一份 最清晰、最实用的 async + multiprocessing 结合方案。
这是 Python 最强组合:
async 处理 IO,multiprocessing 处理 CPU,协同工作、互不阻塞。
大原则:
- CPU 密集 -> multiprocessing(绕开 GIL)
- IO 密集 -> async/await(高并发)
- 将 CPU 任务丢给进程池,让主事件循环继续跑 async
关键工具:
| 工具 | 说明 |
|---|---|
| asyncio.get_running_loop().run_in_executor() | 最推荐(官方) |
| ProcessPoolExecutor | 使用进程池跑 CPU 任务 |
| multiprocessing.Pool + asyncio.to_thread() | 组合用法 |
这是最优雅、最安全、FastAPI 官方推荐的写法。
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_task(x):
# 大计算任务
s = 0
for i in range(10_000_000):
s += i + x
return s
async def main():
loop = asyncio.get_running_loop()
# 创建进程池(根据 CPU 核心)
with ProcessPoolExecutor() as pool:
tasks = [
loop.run_in_executor(pool, cpu_task, i)
for i in range(5)
]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
运行效果:
- 主 async loop 不被阻塞
- CPU 计算在多个进程中并行执行
- async 继续做自己的 IO 操作
from fastapi import FastAPI
from concurrent.futures import ProcessPoolExecutor
import asyncio
app = FastAPI()
pool = ProcessPoolExecutor()
def heavy_calc(x):
return sum(i * x for i in range(10_000_000))
@app.get("/calc")
async def calc(x: int):
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(pool, heavy_calc, x)
return {"result": result}
特点:
- HTTP API 是 async,可高并发
- CPU 运算丢给进程池
- 全服务不会被阻塞
例如你有一个 multiprocessing 模块 utils.process_pool:
# utils/process_pool.py
from multiprocessing import Pool, cpu_count
def parallel_map(func, items):
with Pool(cpu_count()) as p:
return p.map(func, items)
async 中调用:
import asyncio
from utils.process_pool import parallel_map
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, # 默认 ThreadPool,但函数内部是 multiprocessing
parallel_map,
lambda x: x * x,
list(range(10))
)
print(result)
asyncio.run(main())
注意:
- 这里 async 进入线程池,而线程池内部启动多进程池
- 适合大规模 CPU 并行任务
- 但启动成本比 ProcessPoolExecutor 大
需要 CPU 完成后通知 async:
import asyncio
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor()
def cpu_task(x):
return x * x
async def main():
loop = asyncio.get_running_loop()
future = loop.run_in_executor(pool, cpu_task, 10)
# 可使用 callback 方式
def on_done(f):
print("Done:", f.result())
future.add_done_callback(on_done)
await future
asyncio.run(main())
例如你想每个进程内执行 async/await:
import asyncio
from concurrent.futures import ProcessPoolExecutor
async def async_subtask(x):
await asyncio.sleep(1)
return x * 2
def cpu_process():
# 每个进程内运行 async
return asyncio.run(async_subtask(10))
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_process)
print(result)
asyncio.run(main())
用途:
- 每个进程内跑 IO + CPU 混合任务
- 例如大文件分段下载 + 校验
asyncio loop
(处理 IO、网络、API)
│
│ run_in_executor
│
▼
ProcessPoolExecutor
(多个进程并行跑 CPU 任务)
优势:
- async 不阻塞
- CPU 真并行
- 平滑扩展到 FastAPI 等框架
你的项目使用 FastAPI + SQLAlchemy + 异步 -> 最佳组合:
async -> CPU-heavy -> ProcessPoolExecutor
async -> IO-heavy -> await function()
只有一个函数:
result = await run_cpu_async(cpu_func, x)
即可自动切到多进程池执行 CPU 密集函数。
采用 asyncio + ProcessPoolExecutor,这是最稳、最通用的方式。
⸻
# utils/async_process.py
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial
# 全局的进程池(避免频繁创建进程)
_PROCESS_POOL = ProcessPoolExecutor()
async def run_cpu_async(func, *args, **kwargs):
"""
在异步环境中执行 CPU 密集函数,自动切换到多进程。
用法:
result = await run_cpu_async(cpu_func, x, y)
"""
loop = asyncio.get_running_loop()
# 通过 partial 打包参数
fn = partial(func, *args, **kwargs)
# 在进程池执行
return await loop.run_in_executor(_PROCESS_POOL, fn)
def cpu_heavy(n):
s = 0
for i in range(10_000_000):
s += (i * n) % 7
return s
import asyncio
from utils.async_process import run_cpu_async
async def main():
result = await run_cpu_async(cpu_heavy, 5)
print("结果:", result)
asyncio.run(main())
| 技术 | 用途 |
|---|---|
| asyncio | 异步调度 |
| ProcessPoolExecutor | 多进程池,用来跑 CPU-heavy 函数 |
| loop.run_in_executor | 把同步 CPU 任务扔进进程池,不阻塞事件循环 |
| partial | 打包函数和参数 |
你只需写:
await run_cpu_async(func, ...)
完全不用关心多进程创建、队列通信、worker 管理等。
FastAPI / aiohttp / 机器人 框架中非常适用。
无需改 async。