Asyncio
asyncio 是 Python 内置的 异步 I/O 框架,支持:
- 并发网络请求
- 并发任务执行(单线程、多 I/O)
- 高性能爬虫
- 异步数据库(如 asyncpg、SQLAlchemy async)
- 异步 Web 接口(如 FastAPI)
核心思想:协程(coroutine)+ 事件循环(event loop)
| 关键字 | 含义 |
|---|---|
| async def | 定义一个协程函数 |
| await | 暂停当前协程,将控制权交回事件循环 |
| asyncio.create_task() | 创建一个可并发运行的任务 |
| asyncio.gather() | 并发执行多个任务 |
| await asyncio.sleep() | 异步睡眠,不阻塞事件循环 |
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(hello())
await 表示:
遇到 I/O 时,把执行权交出去,让其他协程继续跑。
示例:
await asyncio.sleep(3)
意味“让出 CPU,不阻塞线程”。
import asyncio
async def task(n):
await asyncio.sleep(1)
return f"task {n}"
async def main():
results = await asyncio.gather(
task(1),
task(2),
task(3),
)
print(results)
asyncio.run(main())
输出:
['task 1', 'task 2', 'task 3']
🔹 3 个任务并发执行,耗时 ≈ 1 秒。
async def main():
t1 = asyncio.create_task(task(1))
t2 = asyncio.create_task(task(2))
print(await t1)
print(await t2)
import aiohttp
import asyncio
async def fetch(url, session):
async with session.get(url) as resp:
return await resp.text()
async def main():
urls = [
"https://example.com",
"https://python.org",
"https://httpbin.org/get",
]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*(fetch(url, session) for url in urls)
)
print(len(results))
asyncio.run(main())
| 场景 | 推荐 |
|---|---|
| I/O 密集(网络、数据库) | asyncio |
| CPU 密集(计算、加密、视频编码) | multiprocessing |
| 两者结合 | 异步 + 多进程(你前面问过的 run_cpu_async 就属于此类) |
async def safe_task():
try:
await risky()
except Exception as e:
print("Error:", e)
asyncio.create_task(safe_task())
await asyncio.wait_for(task(), timeout=3)
sem = asyncio.Semaphore(10) # 最多 10 并发
async def fetch(url):
async with sem:
await asyncio.sleep(1)
return url
⚠️ 若你写了以下代码,会阻塞整个事件循环:
time.sleep(3) # ❌ 不要这么写
要用:
await asyncio.sleep(3) # ✅ 异步睡眠
同理:
| 阻塞函数 | 替代 |
|---|---|
| requests | aiohttp |
| mysqlclient | asyncmy / asyncpg |
| SQLAlchemy sync | SQLAlchemy async |
| 文件 I/O | aiofiles |
async with aiohttp.ClientSession() as s:
...
async for row in async_generator():
print(row)
async def agen():
for i in range(3):
await asyncio.sleep(1)
yield i
@app.get("/items")
async def get_items():
data = await service.fetch_items()
return data
数据库 ORM 如果支持 async,例如:
async with async_session() as session:
result = await session.execute(stmt)
| 场景 | 是否适合 asyncio |
|---|---|
| 高频 I/O 请求 | ✅ 强烈推荐 |
| HTTP 爬虫、并发下载 | ✅ |
| 异步数据库 | ✅ |
| FastAPI / aiohttp Web 后端 | ✅ |
| CPU 密集计算 | ❌ 不适合,用 multiprocessing |
| 读写大文件 | ✅ 用 aiofiles |
async def func(): <- 定义协程
await something <- 让出执行权
asyncio.run() <- 启动事件循环
asyncio.create_task() <- 并发运行
asyncio.gather() <- 并发收集结果
asyncio.sleep() <- 异步睡眠
async with <- 异步上下文
async for <- 异步迭代器
Semaphore <- 限制并发
wait_for <- 超时控制
```text