Skip to main content
Documents
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Async + Multiprocessing

下面给你一份 最清晰、最实用的 async + multiprocessing 结合方案。

这是 Python 最强组合:

async 处理 IO,multiprocessing 处理 CPU,协同工作、互不阻塞。

🚀 async + multiprocessing 结合(最佳实践指南)

大原则:

  • CPU 密集 -> multiprocessing(绕开 GIL)
  • IO 密集 -> async/await(高并发)
  • 将 CPU 任务丢给进程池,让主事件循环继续跑 async

关键工具:

工具 说明
asyncio.get_running_loop().run_in_executor() 最推荐(官方)
ProcessPoolExecutor 使用进程池跑 CPU 任务
multiprocessing.Pool + asyncio.to_thread() 组合用法

1. 最推荐的写法:async + ProcessPoolExecutor

这是最优雅、最安全、FastAPI 官方推荐的写法。

示例:async 调用 CPU 多进程

import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_task(x):
    # 大计算任务
    s = 0
    for i in range(10_000_000):
        s += i + x
    return s


async def main():
    loop = asyncio.get_running_loop()
    # 创建进程池(根据 CPU 核心)
    with ProcessPoolExecutor() as pool:
        tasks = [
            loop.run_in_executor(pool, cpu_task, i)
            for i in range(5)
        ]
        results = await asyncio.gather(*tasks)
        print(results)


asyncio.run(main())

运行效果:

  • 主 async loop 不被阻塞
  • CPU 计算在多个进程中并行执行
  • async 继续做自己的 IO 操作

2. 和 FastAPI 完美结合

from fastapi import FastAPI
from concurrent.futures import ProcessPoolExecutor
import asyncio

app = FastAPI()
pool = ProcessPoolExecutor()


def heavy_calc(x):
    return sum(i * x for i in range(10_000_000))


@app.get("/calc")
async def calc(x: int):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(pool, heavy_calc, x)
    return {"result": result}

特点:

  • HTTP API 是 async,可高并发
  • CPU 运算丢给进程池
  • 全服务不会被阻塞

3. 如果你的 CPU 任务是批量 -> 结合 multiprocessing.Pool

例如你有一个 multiprocessing 模块 utils.process_pool:

# utils/process_pool.py
from multiprocessing import Pool, cpu_count

def parallel_map(func, items):
    with Pool(cpu_count()) as p:
        return p.map(func, items)

async 中调用:

import asyncio
from utils.process_pool import parallel_map

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,  # 默认 ThreadPool,但函数内部是 multiprocessing
        parallel_map,
        lambda x: x * x,
        list(range(10))
    )
    print(result)

asyncio.run(main())

注意:

  • 这里 async 进入线程池,而线程池内部启动多进程池
  • 适合大规模 CPU 并行任务
  • 但启动成本比 ProcessPoolExecutor 大

4. async + multiprocessing + 回调(高级用法)

需要 CPU 完成后通知 async:

import asyncio
from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor()

def cpu_task(x):
    return x * x

async def main():
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(pool, cpu_task, 10)
    
    # 可使用 callback 方式
    def on_done(f):
        print("Done:", f.result())
    
    future.add_done_callback(on_done)
    await future

asyncio.run(main())

5. 多进程内运行 async (极高级)

例如你想每个进程内执行 async/await:

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def async_subtask(x):
    await asyncio.sleep(1)
    return x * 2

def cpu_process():
    # 每个进程内运行 async
    return asyncio.run(async_subtask(10))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_process)
        print(result)

asyncio.run(main())

用途:

  • 每个进程内跑 IO + CPU 混合任务
  • 例如大文件分段下载 + 校验

6. async + multiprocessing 最终最佳架构图


        asyncio loop
     (处理 IO、网络、API)

            │ run_in_executor

      ProcessPoolExecutor
     (多个进程并行跑 CPU 任务)

优势:

  • async 不阻塞
  • CPU 真并行
  • 平滑扩展到 FastAPI 等框架

最推荐的组合方式

你的项目使用 FastAPI + SQLAlchemy + 异步 -> 最佳组合:

async -> CPU-heavy -> ProcessPoolExecutor
async -> IO-heavy  -> await function()

封装 utils/async_process.py

只有一个函数:

result = await run_cpu_async(cpu_func, x)

即可自动切到多进程池执行 CPU 密集函数。

采用 asyncio + ProcessPoolExecutor,这是最稳、最通用的方式。

✅ utils/async_process.py

# utils/async_process.py

import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial

# 全局的进程池(避免频繁创建进程)
_PROCESS_POOL = ProcessPoolExecutor()


async def run_cpu_async(func, *args, **kwargs):
    """
    在异步环境中执行 CPU 密集函数,自动切换到多进程。
    用法:
        result = await run_cpu_async(cpu_func, x, y)
    """

    loop = asyncio.get_running_loop()

    # 通过 partial 打包参数
    fn = partial(func, *args, **kwargs)

    # 在进程池执行
    return await loop.run_in_executor(_PROCESS_POOL, fn)

✅ 使用示例

1. CPU 密集函数(普通同步函数)

def cpu_heavy(n):
    s = 0
    for i in range(10_000_000):
        s += (i * n) % 7
    return s

2. 异步调用

import asyncio
from utils.async_process import run_cpu_async

async def main():
    result = await run_cpu_async(cpu_heavy, 5)
    print("结果:", result)

asyncio.run(main())

⭐ 技术解析(简明)

技术 用途
asyncio 异步调度
ProcessPoolExecutor 多进程池,用来跑 CPU-heavy 函数
loop.run_in_executor 把同步 CPU 任务扔进进程池,不阻塞事件循环
partial 打包函数和参数

🚀 这样封装的优势

✅ 自动切换到多进程

你只需写:

await run_cpu_async(func, ...)

完全不用关心多进程创建、队列通信、worker 管理等。

✅ 不会阻塞 event loop

FastAPI / aiohttp / 机器人 框架中非常适用。

✅ 支持任意同步 CPU 函数

无需改 async。

✅ 可长期作为系统公共工具