Skip to main content
Documents
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Engine

下面给你一个 全面、清晰、实战向 的 SQLAlchemy Engine 讲解,包括:

  • 什么是 Engine
  • Engine 的职责
  • Engine 的最佳使用方式(同步 / 异步)
  • Engine 的创建方式
  • Engine 相关的常见坑
  • 结合你正在做的 FastAPI + SQLAlchemy + PostgreSQL 的推荐架构

🚀 1. 什么是 SQLAlchemy Engine?

Engine 是 SQLAlchemy 的核心对象,代表你的数据库连接底层接口。

它本质上做 3 件事情:

1. 管理数据库连接池(Connection Pool)

Engine 内部包含一个连接池(默认是 QueuePool)。

这使得 SQLAlchemy 不会每次查询都创建新连接,速度大幅提升。

2. 管理方言(Dialect)

Engine 决定你连接的是 PostgreSQL、MySQL、SQLite……

负责把 SQLAlchemy 的语句转换成对应数据库的 SQL。

3. 提供 connect() 方法获取底层连接

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

它是所有 ORM / SQL / Async Session 的“根源”。

🚀 2. 如何创建 Engine?

SQLAlchemy 2.0 推荐方式:

同步 engine(常规 Web 后端使用不推荐)

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://user:pass@localhost:5432/dbname",
    echo=False,
    pool_size=10,
    max_overflow=20,
)

异步 engine(FastAPI 推荐)

from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost:5432/dbname",
    echo=False,
    pool_size=10,
    max_overflow=20,
)

FastAPI 推荐使用 asyncpg + async engine,性能更强。

🚀 3. Engine 和 Session 的关系(重点!)

你要记住:

  • Engine = 连接池管理者
  • Session = ORM 查询对象

不应该在代码中随便执行:

engine.connect().execute(...)

除非写原生 SQL 工具函数(你现在正在做的)。

正常 ORM 应该这样:

async with async_session() as session:
    result = await session.execute(select(User))

🚀 4. Engine 的最佳使用方式

Engine 应该是“全局单例”

你不应该每次请求都创建 engine(非常昂贵)。

正确方式:

db.py

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:pass@127.0.0.1/db"

engine = create_async_engine(DATABASE_URL, echo=False)

AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

在 FastAPI 依赖项中使用:

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

🚀 5. 常见坑

❌ 5.1 连接耗尽

如果你没有关闭连接,会导致 pool 被耗尽:

conn = engine.connect()  # 忘记 close()

应始终使用 with:

with engine.connect() as conn:
    ...

异步:

async with engine.connect() as conn:
    ...

❌ 5.2 创建过多 Engine

以下错误方式:

def get_engine():
    return create_async_engine(...)

每个请求新建 engine -> 性能极差。

❌ 5.3 Engine 执行 ORM 语句(错误用法)

ORM 语句必须通过 Session,而不是 Engine。

🚀 6. 结合你的项目:推荐架构

你现在使用:

  • FastAPI
  • PostgreSQL
  • SQLAlchemy 2.0
  • Async(你将添加权限模块、Casbin 等)

最佳结构如下:

app/
 ├── db/
 │    ├── engine.py          # 创建全局 engine、sessionmaker
 │    ├── base.py            # Base = declarative_base()
 │    ├── models/            # ORM models
 │    └── utils.py           # 原生 SQL 工具函数
 ├── api/
 │    └── ...                # 路由
 └── main.py                 # FastAPI 启动入口

你的 engine.py:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"

engine = create_async_engine(
    DATABASE_URL,
    echo=False,
    pool_size=10,
    max_overflow=20,
)

async_session = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
)

你的“工具方法”可以注入 engine:

class SQLTool:
    def __init__(self, engine):
        self.engine = engine

    async def run(self, sql):
        async with self.engine.connect() as conn:
            result = await conn.execute(text(sql))
            return result.mappings().all()

🚀 7. 你问的重点:Engine 应该放在哪里?

强烈建议:

  • 使用“外部传入 Engine”的方式

因为:

  • 可测试性更好(你可以传入测试数据库 engine)
  • 可复用(多个工具类共享同一个 engine)
  • 避免意外创建多个 engine
  • 按你的项目结构(多模块)更易扩展

例如:

from app.db.engine import engine
sql_tool = SQLTool(engine)

不要在类里面初始化:

# ❌ 不推荐
class SQLTool:
    def __init__(self):
        self.engine = create_async_engine(...)

📌 总结

项目 推荐方式
Engine 是否单例? ✅ 必须是全局单例
Engine 放哪里? ✅ 在 db/engine.py 中统一管理
工具类需要 Engine 吗? ✅ 外部传入,不要内部创建
ORM 操作用 Session? ✅ 是
原生 SQL 用 Engine? ✅ 可以
FastAPI 配合方式 ✅ async_engine + async_sessionmaker

自定义类,是在类中初始化 engine 好,还是将初始化好的 engine 传入到类中?

下面给你一个架构级别的答案,覆盖可维护性、可测试性、性能、强耦合等所有关键点。

结论非常明确:

最佳实践:engine 在外部初始化,然后注入(传入)自定义类中

也就是:

engine = create_engine(...)
db = MyDB(engine)

而不是:

class MyDB:
    def __init__(...):
        self.engine = create_engine(...)

📌 为什么要把 engine 传入类(依赖注入)?

1. 可测试性更强(最重要)

如果类内部创建 engine,你无法替换为:

  • SQLite 内存库
  • 测试数据库
  • Mock 对象

而依赖注入可以轻松做到:

test_engine = create_engine("sqlite://")
db = MyDB(test_engine)

这是单元测试、自动化测试、CI/CD 必须的设计。

2. 减少全局耦合,类更加通用

如果类内部写死:

create_engine("postgresql+asyncpg://...")

那么:

  • 你无法切换数据库
  • 无法连接多个数据库
  • 无法复用该类(强耦合)

如果传入 engine,你可以复用:

db1 = MyDB(engine1)   # 连接数据库A
db2 = MyDB(engine2)   # 连接数据库B

3. 提升程序架构的“分层清晰度”

通常推荐:

app


db_engine / db_session 管理器


repository / service 层(你的自定义类)
  • 数据库连接属于 基础设施层
  • 而你的类属于 业务逻辑层

如果两者混在一起 -> 架构混乱、难以维护。

4. 可以在多个类中共享 engine(性能更好)

engine 是一个重量级对象,并且:

  • 它维护连接池
  • 是线程安全的
  • 应该是单例

外部创建 engine -> 共享一个连接池 -> 性能最佳。

如果每个类都自己初始化 engine:

  • 多个连接池浪费资源
  • 性能更差
  • 池上限难以管理

5. 灵活替换为 AsyncEngine

未来你切换到 async 时:

create_async_engine(...)

只需替换创建 engine 的地方,而不是修改所有类。

🆚 对比示例

❌ 反模式(不推荐)

class UserRepository:
    def __init__(self):
        self.engine = create_engine("postgresql://...")

缺点:

  • 无法测试
  • 无法配置
  • 多个类重复创建 engine
  • 难以扩展到 async

✅ 正确模式(推荐)

class UserRepository:
    def __init__(self, engine):
        self.engine = engine

外部:

engine = create_engine("postgresql://...")
user_repo = UserRepository(engine)

更灵活、更清晰、更可维护。

⭐ 更高级的方式(你一定会用得到)

将 engine 封装在一个统一的数据库管理器中:

class Database:

    def __init__(self, url: str):
        self.engine = create_engine(url)
        self.SessionLocal = sessionmaker(bind=self.engine)

db = Database("postgresql://...")

user_repo = UserRepository(db)

甚至使用依赖注入容器(FastAPI 原生支持)。

📌 最终建议(针对你当前的项目架构)

你的技术栈:

  • FastAPI
  • SQLAlchemy 2.x
  • RBAC 系统
  • 多模块结构(users, roles, permissions, menus…)

最佳方案是:

engine 单例
sessionmaker 单例
注入到 Repository / Service 层

这样结构最清晰、测试最方便、性能最佳。


下面给你一份 极完整、极清晰、实战级的 SQLAlchemy 2.0 create_engine / create_async_engine 详细教程,包含用法、参数、最佳实践、注意事项、ORM + Core 使用模式等。

内容适用于 PostgreSQL / MySQL / SQLite / Oracle / MSSQL 全部数据库。

🚀 一、SQLAlchemy 2.0:同步 vs 异步

项目 同步 异步
Engine create_engine() create_async_engine()
Session Session AsyncSession
Run query 同步 I/O 原生 async I/O
适用场景 脚本、批处理、后台服务 高并发 API、FastAPI

同一项目可同时用同步和异步,但不建议混在同一套 ORM 对象中。

🏗 二、同步 engine:create_engine

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg://user:pass@localhost:5432/dbname",
    echo=False,
    future=True,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,
)

2.1 参数详解(同步)

🔹 url(连接字符串)

格式:

dialect+driver://username:password@host:port/database

例如:

数据库 URL 示例
PostgreSQL postgresql+psycopg://user:pwd@127.0.0.1/db
MySQL mysql+pymysql://user:pwd@127.0.0.1/db
SQLite sqlite:///local.db

🔹 echo

是否打印 SQL:

engine = create_engine(url, echo=True)

🔹 future=True

SQLAlchemy 2.0 推荐,总是开启。

🔹 连接池参数

参数 作用
pool_size 保持的连接数
max_overflow 超出 pool_size 时额外创建的连接
pool_timeout 等待连接的超时时间
pool_recycle 连接回收时间(防止 MySQL “gone away”)

示例:

engine = create_engine(
    url,
    pool_size=10,
    max_overflow=20,
    pool_recycle=1800,
)

🔹 pool_pre_ping=True

防止数据库断开连接(强烈建议 PostgreSQL/MySQL 生产开启)

create_engine(url, pool_pre_ping=True)

🔹 禁用 pool(脚本类)

脚本/短任务:

engine = create_engine(url, poolclass=NullPool)

2.2 同步 engine 使用示例

ORM session

from sqlalchemy.orm import Session

with Session(engine) as session:
    result = session.execute(select(User))
    rows = result.scalars().all()

Core

with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    print(result.all())

🌀 三、异步 engine:create_async_engine

异步 engine 由:

from sqlalchemy.ext.asyncio import create_async_engine

创建:

async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/dbname",
    echo=False,
    future=True,
    pool_size=10,
    max_overflow=20,
    pool_recycle=1800,
)

3.1 数据库驱动必须是 async 版本

数据库 驱动
PostgreSQL asyncpg
MySQL aiomysql
SQLite aiosqlite

示例:

postgresql+asyncpg://user:pwd@host/db

不能写:

postgresql+psycopg://user:pwd@host/db   

3.2 异步 engine 参数完全与同步 engine 相同

例如:

create_async_engine(
    url,
    echo=False,
    pool_size=20,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
)

3.3 异步执行示例

ORM AsyncSession

from sqlalchemy.ext.asyncio import AsyncSession

async with AsyncSession(async_engine) as session:
    result = await session.execute(select(User))
    rows = result.scalars().all()

Core 方式

async with async_engine.connect() as conn:
    result = await conn.execute(text("SELECT 1"))
    print(result.all())

3.4 异步 engine 常见错误

❌ 错误:使用同步 Session

Session(async_engine)  # 错误

应该:

AsyncSession(async_engine)

❌ 错误:直接对 AsyncSession 调用非 async 方法

例如:

session.commit()  # 错误

正确:

await session.commit()

❌ 错误:URL 使用错误驱动

比如 PostgreSQL:

postgresql+psycopg://...  这是同步
postgresql+asyncpg://...  这是异步

🧱 四、正确的 Engine + Session 组合方式

同步

engine = create_engine(url)

SessionLocal = sessionmaker(engine, expire_on_commit=False)

def get_session():
    with SessionLocal() as session:
        yield session

异步

async_engine = create_async_engine(url)

AsyncSessionLocal = async_sessionmaker(
    async_engine, expire_on_commit=False
)

async def get_session():
    async with AsyncSessionLocal() as session:
        yield session

⭐ 五、Engine 创建最佳实践(你项目可直接使用)

🔥 PostgreSQL + FastAPI(异步推荐)

from sqlalchemy.ext.asyncio import create_async_engine

DATABASE_URL = "postgresql+asyncpg://user:pwd@127.0.0.1/mydb"

engine = create_async_engine(
    DATABASE_URL,
    echo=False,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=1800,
    future=True,
)

🔥 后台任务 / 管理脚本(同步)

from sqlalchemy import create_engine

DATABASE_URL = "postgresql+psycopg://user:pwd@127.0.0.1/mydb"

engine = create_engine(
    DATABASE_URL,
    echo=False,
    pool_pre_ping=True,
    pool_recycle=1800,
)

🧩 六、补充:Engine 不建议放在类内部

但建议你封装一个全局数据库管理器(单例)

原因:

👍 优点

  • 全局共享 engine(连接池才能发挥作用)
  • session factory 统一管理
  • 配置集中,便于调试
  • 避免多 engine 导致连接池爆炸

既然不建议在类中初始化,为什么还建议将 engine 封装在一个统一的数据库管理器中?

非常好的问题——这一点确实容易让人困惑。 关键区别在于:

✅ “类中初始化 engine” 和 “数据库管理器统一封装 engine” 不是一回事!

它们的目的不同、风险不同、设计哲学也不同。

我来给你讲非常清楚👇

❌ 1. 为什么不建议 在类内部初始化 engine?

示例(不推荐):

class SQLTool:
    def __init__(self):
        self.engine = create_async_engine("postgres://...")

问题:

❌ 1.1 每个类实例都重新创建一个 engine(巨大开销)

Engine 内部有:

  • 连接池
  • 多线程锁
  • 连接初始化
  • 方言加载

这是昂贵对象,应该全局共享。

如果你创建多个 SQLTool 实例,就会有多个 engine,性能会暴死。

❌ 1.2 难以测试(不能替换为 mock 数据库)

你无法在测试中这样做:

SQLTool(test_engine)

因为 SQLTool 把 engine 写死了。

❌ 1.3 难以控制连接池大小

例如你想调整:

pool_size=20

必须修改每个类的内部代码 -> 非常糟糕。

❌ 1.4 不方便统一管理生命周期

例如应用关闭时:

await engine.dispose()

如果 engine 分散在 N 个类中,你根本不知道在哪里关闭。

⚠️ 核心:

把 engine 创建写死在某个类内部 = 最不灵活、最不好维护的模式。

✅ 2. 为什么建议将 engine 放在“统一的数据库管理器(db.py)”中?

因为统一管理 ≠ 在类中初始化 而是:

在全局只创建一次 engine

例如:

db/engine.py

engine = create_async_engine(DATABASE_URL, echo=False)
SessionLocal = async_sessionmaker(engine)

然后由其他类/模块依赖注入使用(Dependency Injection)

class SQLTool:
    def __init__(self, engine):
        self.engine = engine

或 FastAPI 中:

async def get_db():
    async with SessionLocal() as session:
        yield session

☘️ 为什么这种方式是最佳实践?

因为它解决了所有问题:

1) Engine 全局唯一(不重复创建)

所有组件共享同一个 engine

  • -> 性能最佳
  • -> 连接管理统一

2) 所有类能从外部接收 engine(依赖注入)

更灵活

测试可以这样:

SQLTool(test_engine)

3) 连接池配置统一

例如 pool_size、max_overflow、timeout

修改一次,全系统更新。

4) 生命周期明确

FastAPI 可在 shutdown 事件关闭 engine:

@app.on_event("shutdown")
async def shutdown():
    await engine.dispose()

5) 代码结构清晰可维护

  • db.py 负责数据库底座
  • models/ 负责 ORM 模型
  • crud/ 或 service/ 负责业务逻辑
  • 工具类、任务、脚本都始终用同一个 engine

📌 用一句话总结

不推荐“每个类自己初始化 engine”。

推荐“全局只初始化一次 engine”,在其他类中通过参数传入或依赖注入使用。

最终结构(最佳实践示例)

app/
 ├── db/
 │    ├── engine.py     # 这里创建唯一 engine
 │    ├── session.py
 │    ├── base.py
 │    └── models/
 ├── services/
 │    └── sql_tool.py   # 接收 engine 作为参数
 └── api/

engine.py

engine = create_async_engine(DATABASE_URL, echo=False)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

sql_tool.py

class SQLTool:
    def __init__(self, engine):
        self.engine = engine

    async def execute(self, sql):
        async with self.engine.connect() as conn:
            result = await conn.execute(text(sql))
            return result.mappings().all()