Engine
下面给你一个 全面、清晰、实战向 的 SQLAlchemy Engine 讲解,包括:
- 什么是 Engine
- Engine 的职责
- Engine 的最佳使用方式(同步 / 异步)
- Engine 的创建方式
- Engine 相关的常见坑
- 结合你正在做的 FastAPI + SQLAlchemy + PostgreSQL 的推荐架构
Engine 是 SQLAlchemy 的核心对象,代表你的数据库连接底层接口。
它本质上做 3 件事情:
Engine 内部包含一个连接池(默认是 QueuePool)。
这使得 SQLAlchemy 不会每次查询都创建新连接,速度大幅提升。
Engine 决定你连接的是 PostgreSQL、MySQL、SQLite……
负责把 SQLAlchemy 的语句转换成对应数据库的 SQL。
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
它是所有 ORM / SQL / Async Session 的“根源”。
SQLAlchemy 2.0 推荐方式:
from sqlalchemy import create_engine
engine = create_engine(
"postgresql://user:pass@localhost:5432/dbname",
echo=False,
pool_size=10,
max_overflow=20,
)
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost:5432/dbname",
echo=False,
pool_size=10,
max_overflow=20,
)
FastAPI 推荐使用 asyncpg + async engine,性能更强。
你要记住:
- Engine = 连接池管理者
- Session = ORM 查询对象
不应该在代码中随便执行:
engine.connect().execute(...)
除非写原生 SQL 工具函数(你现在正在做的)。
正常 ORM 应该这样:
async with async_session() as session:
result = await session.execute(select(User))
❗Engine 应该是“全局单例”
你不应该每次请求都创建 engine(非常昂贵)。
正确方式:
db.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:pass@127.0.0.1/db"
engine = create_async_engine(DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
在 FastAPI 依赖项中使用:
async def get_db():
async with AsyncSessionLocal() as session:
yield session
如果你没有关闭连接,会导致 pool 被耗尽:
conn = engine.connect() # 忘记 close()
应始终使用 with:
with engine.connect() as conn:
...
异步:
async with engine.connect() as conn:
...
以下错误方式:
def get_engine():
return create_async_engine(...)
每个请求新建 engine -> 性能极差。
ORM 语句必须通过 Session,而不是 Engine。
你现在使用:
- FastAPI
- PostgreSQL
- SQLAlchemy 2.0
- Async(你将添加权限模块、Casbin 等)
最佳结构如下:
app/
├── db/
│ ├── engine.py # 创建全局 engine、sessionmaker
│ ├── base.py # Base = declarative_base()
│ ├── models/ # ORM models
│ └── utils.py # 原生 SQL 工具函数
├── api/
│ └── ... # 路由
└── main.py # FastAPI 启动入口
你的 engine.py:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=10,
max_overflow=20,
)
async_session = async_sessionmaker(
bind=engine,
expire_on_commit=False,
)
你的“工具方法”可以注入 engine:
class SQLTool:
def __init__(self, engine):
self.engine = engine
async def run(self, sql):
async with self.engine.connect() as conn:
result = await conn.execute(text(sql))
return result.mappings().all()
强烈建议:
- 使用“外部传入 Engine”的方式
因为:
- 可测试性更好(你可以传入测试数据库 engine)
- 可复用(多个工具类共享同一个 engine)
- 避免意外创建多个 engine
- 按你的项目结构(多模块)更易扩展
例如:
from app.db.engine import engine
sql_tool = SQLTool(engine)
不要在类里面初始化:
# ❌ 不推荐
class SQLTool:
def __init__(self):
self.engine = create_async_engine(...)
| 项目 | 推荐方式 |
|---|---|
| Engine 是否单例? | ✅ 必须是全局单例 |
| Engine 放哪里? | ✅ 在 db/engine.py 中统一管理 |
| 工具类需要 Engine 吗? | ✅ 外部传入,不要内部创建 |
| ORM 操作用 Session? | ✅ 是 |
| 原生 SQL 用 Engine? | ✅ 可以 |
| FastAPI 配合方式 | ✅ async_engine + async_sessionmaker |
下面给你一个架构级别的答案,覆盖可维护性、可测试性、性能、强耦合等所有关键点。
结论非常明确:
✅ 最佳实践:engine 在外部初始化,然后注入(传入)自定义类中
也就是:
engine = create_engine(...)
db = MyDB(engine)
而不是:
class MyDB:
def __init__(...):
self.engine = create_engine(...)
如果类内部创建 engine,你无法替换为:
- SQLite 内存库
- 测试数据库
- Mock 对象
而依赖注入可以轻松做到:
test_engine = create_engine("sqlite://")
db = MyDB(test_engine)
这是单元测试、自动化测试、CI/CD 必须的设计。
如果类内部写死:
create_engine("postgresql+asyncpg://...")
那么:
- 你无法切换数据库
- 无法连接多个数据库
- 无法复用该类(强耦合)
如果传入 engine,你可以复用:
db1 = MyDB(engine1) # 连接数据库A
db2 = MyDB(engine2) # 连接数据库B
通常推荐:
app
⬇
db_engine / db_session 管理器
⬇
repository / service 层(你的自定义类)
- 数据库连接属于 基础设施层
- 而你的类属于 业务逻辑层
如果两者混在一起 -> 架构混乱、难以维护。
engine 是一个重量级对象,并且:
- 它维护连接池
- 是线程安全的
- 应该是单例
外部创建 engine -> 共享一个连接池 -> 性能最佳。
如果每个类都自己初始化 engine:
- 多个连接池浪费资源
- 性能更差
- 池上限难以管理
未来你切换到 async 时:
create_async_engine(...)
只需替换创建 engine 的地方,而不是修改所有类。
class UserRepository:
def __init__(self):
self.engine = create_engine("postgresql://...")
缺点:
- 无法测试
- 无法配置
- 多个类重复创建 engine
- 难以扩展到 async
class UserRepository:
def __init__(self, engine):
self.engine = engine
外部:
engine = create_engine("postgresql://...")
user_repo = UserRepository(engine)
更灵活、更清晰、更可维护。
将 engine 封装在一个统一的数据库管理器中:
class Database:
def __init__(self, url: str):
self.engine = create_engine(url)
self.SessionLocal = sessionmaker(bind=self.engine)
db = Database("postgresql://...")
user_repo = UserRepository(db)
甚至使用依赖注入容器(FastAPI 原生支持)。
你的技术栈:
- FastAPI
- SQLAlchemy 2.x
- RBAC 系统
- 多模块结构(users, roles, permissions, menus…)
最佳方案是:
engine 单例
↓
sessionmaker 单例
↓
注入到 Repository / Service 层
这样结构最清晰、测试最方便、性能最佳。
下面给你一份 极完整、极清晰、实战级的 SQLAlchemy 2.0 create_engine / create_async_engine 详细教程,包含用法、参数、最佳实践、注意事项、ORM + Core 使用模式等。
内容适用于 PostgreSQL / MySQL / SQLite / Oracle / MSSQL 全部数据库。
| 项目 | 同步 | 异步 |
|---|---|---|
| Engine | create_engine() | create_async_engine() |
| Session | Session | AsyncSession |
| Run query | 同步 I/O | 原生 async I/O |
| 适用场景 | 脚本、批处理、后台服务 | 高并发 API、FastAPI |
同一项目可同时用同步和异步,但不建议混在同一套 ORM 对象中。
from sqlalchemy import create_engine
engine = create_engine(
"postgresql+psycopg://user:pass@localhost:5432/dbname",
echo=False,
future=True,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=1800,
)
格式:
dialect+driver://username:password@host:port/database
例如:
| 数据库 | URL 示例 |
|---|---|
| PostgreSQL | postgresql+psycopg://user:pwd@127.0.0.1/db |
| MySQL | mysql+pymysql://user:pwd@127.0.0.1/db |
| SQLite | sqlite:///local.db |
是否打印 SQL:
engine = create_engine(url, echo=True)
SQLAlchemy 2.0 推荐,总是开启。
| 参数 | 作用 |
|---|---|
| pool_size | 保持的连接数 |
| max_overflow | 超出 pool_size 时额外创建的连接 |
| pool_timeout | 等待连接的超时时间 |
| pool_recycle | 连接回收时间(防止 MySQL “gone away”) |
示例:
engine = create_engine(
url,
pool_size=10,
max_overflow=20,
pool_recycle=1800,
)
防止数据库断开连接(强烈建议 PostgreSQL/MySQL 生产开启)
create_engine(url, pool_pre_ping=True)
脚本/短任务:
engine = create_engine(url, poolclass=NullPool)
ORM session
from sqlalchemy.orm import Session
with Session(engine) as session:
result = session.execute(select(User))
rows = result.scalars().all()
Core
with engine.connect() as conn:
result = conn.execute(text("SELECT 1"))
print(result.all())
异步 engine 由:
from sqlalchemy.ext.asyncio import create_async_engine
创建:
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/dbname",
echo=False,
future=True,
pool_size=10,
max_overflow=20,
pool_recycle=1800,
)
| 数据库 | 驱动 |
|---|---|
| PostgreSQL | asyncpg |
| MySQL | aiomysql |
| SQLite | aiosqlite |
示例:
postgresql+asyncpg://user:pwd@host/db
不能写:
postgresql+psycopg://user:pwd@host/db ❌
例如:
create_async_engine(
url,
echo=False,
pool_size=20,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True,
)
ORM AsyncSession
from sqlalchemy.ext.asyncio import AsyncSession
async with AsyncSession(async_engine) as session:
result = await session.execute(select(User))
rows = result.scalars().all()
Core 方式
async with async_engine.connect() as conn:
result = await conn.execute(text("SELECT 1"))
print(result.all())
Session(async_engine) # 错误
应该:
AsyncSession(async_engine)
例如:
session.commit() # 错误
正确:
await session.commit()
比如 PostgreSQL:
postgresql+psycopg://... ❌ 这是同步
postgresql+asyncpg://... ✅ 这是异步
engine = create_engine(url)
SessionLocal = sessionmaker(engine, expire_on_commit=False)
def get_session():
with SessionLocal() as session:
yield session
async_engine = create_async_engine(url)
AsyncSessionLocal = async_sessionmaker(
async_engine, expire_on_commit=False
)
async def get_session():
async with AsyncSessionLocal() as session:
yield session
🔥 PostgreSQL + FastAPI(异步推荐)
from sqlalchemy.ext.asyncio import create_async_engine
DATABASE_URL = "postgresql+asyncpg://user:pwd@127.0.0.1/mydb"
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=1800,
future=True,
)
🔥 后台任务 / 管理脚本(同步)
from sqlalchemy import create_engine
DATABASE_URL = "postgresql+psycopg://user:pwd@127.0.0.1/mydb"
engine = create_engine(
DATABASE_URL,
echo=False,
pool_pre_ping=True,
pool_recycle=1800,
)
但建议你封装一个全局数据库管理器(单例)
原因:
👍 优点
- 全局共享 engine(连接池才能发挥作用)
- session factory 统一管理
- 配置集中,便于调试
- 避免多 engine 导致连接池爆炸
非常好的问题——这一点确实容易让人困惑。 关键区别在于:
✅ “类中初始化 engine” 和 “数据库管理器统一封装 engine” 不是一回事!
它们的目的不同、风险不同、设计哲学也不同。
我来给你讲非常清楚👇
示例(不推荐):
class SQLTool:
def __init__(self):
self.engine = create_async_engine("postgres://...")
问题:
Engine 内部有:
- 连接池
- 多线程锁
- 连接初始化
- 方言加载
这是昂贵对象,应该全局共享。
如果你创建多个 SQLTool 实例,就会有多个 engine,性能会暴死。
你无法在测试中这样做:
SQLTool(test_engine)
因为 SQLTool 把 engine 写死了。
例如你想调整:
pool_size=20
必须修改每个类的内部代码 -> 非常糟糕。
例如应用关闭时:
await engine.dispose()
如果 engine 分散在 N 个类中,你根本不知道在哪里关闭。
⚠️ 核心:
把 engine 创建写死在某个类内部 = 最不灵活、最不好维护的模式。
因为统一管理 ≠ 在类中初始化 而是:
例如:
db/engine.py
engine = create_async_engine(DATABASE_URL, echo=False)
SessionLocal = async_sessionmaker(engine)
class SQLTool:
def __init__(self, engine):
self.engine = engine
或 FastAPI 中:
async def get_db():
async with SessionLocal() as session:
yield session
因为它解决了所有问题:
所有组件共享同一个 engine
- -> 性能最佳
- -> 连接管理统一
更灵活
测试可以这样:
SQLTool(test_engine)
例如 pool_size、max_overflow、timeout
修改一次,全系统更新。
FastAPI 可在 shutdown 事件关闭 engine:
@app.on_event("shutdown")
async def shutdown():
await engine.dispose()
- db.py 负责数据库底座
- models/ 负责 ORM 模型
- crud/ 或 service/ 负责业务逻辑
- 工具类、任务、脚本都始终用同一个 engine
不推荐“每个类自己初始化 engine”。
推荐“全局只初始化一次 engine”,在其他类中通过参数传入或依赖注入使用。
app/
├── db/
│ ├── engine.py # 这里创建唯一 engine
│ ├── session.py
│ ├── base.py
│ └── models/
├── services/
│ └── sql_tool.py # 接收 engine 作为参数
└── api/
engine.py
engine = create_async_engine(DATABASE_URL, echo=False)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
sql_tool.py
class SQLTool:
def __init__(self, engine):
self.engine = engine
async def execute(self, sql):
async with self.engine.connect() as conn:
result = await conn.execute(text(sql))
return result.mappings().all()