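SQLAlchemy Basics Tutorial
Below is a production-oriented tutorial based on SQLAlchemy 2.0 syntax. It covers the ORM, Core, async usage, session management, and the query model, which together account for the operations you use day to day when building on FastAPI, PostgreSQL, TimescaleDB, and RBAC.
All code follows the style recommended for SQLAlchemy 2.0: Session(engine), select(), async with AsyncSession(), mappings(), and so on.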
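Contents
- 🍃 Installation
- 🚀 Creating an Engine
- 🧱 Defining ORM models
- 📀 Creating tables
- 🧩 Session (sync & async)
- 🔍 ORM queries (select / where / order)
- ✏️ Inserts (ORM / raw SQL)
- 🛠 Update and delete
- 📌 Parsing a Result (scalars / mappings / fetchall)
- 🔗 JOIN queries
- 📂 Core-style execution
- ⚡ Async SQLAlchemy usage
- 🧪 Transaction management
- 🔒 Best practices (how your production project should be written)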
🍃 Installation
pip install sqlalchemy psycopg2-binary asyncpg
🚀 Creating an Engine
Synchronous engine
from sqlalchemy import create_engine

# SQLAlchemy 2.0 always uses the 2.0-style engine, so future=True can be omitted.
engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost/dbname",
)
Asynchronous engine (asyncpg)
from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/dbname",
)
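🧱 Defining ORM models
from typing import Optional

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    # 2.0-style typed mapping: Mapped[...] annotations plus mapped_column()
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[Optional[str]] = mapped_column(String(50), unique=True)
    role: Mapped[Optional[str]] = mapped_column(String(20))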
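📀 Creating tables
# Creates only the tables that do not exist yet; existing tables are not altered.
Base.metadata.create_all(engine)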
🧩 Session (sync & async)
Synchronous
from sqlalchemy.orm import Session

with Session(engine) as session:
    ...
Asynchronous
from sqlalchemy.ext.asyncio import AsyncSession

# Must run inside a coroutine (async def)
async with AsyncSession(async_engine) as session:
    ...
🔍 ORM queries (select / where / order)
from sqlalchemy import select

stmt = select(User).where(User.role == "admin")

with Session(engine) as session:
    result = session.execute(stmt)
    users = result.scalars().all()  # list of User objects
✏️ Inserts (ORM / raw SQL)
ORM insert (recommended)
user = User(username="martin", role="admin")

with Session(engine) as s:
    s.add(user)
    s.commit()
Raw SQL insert
from sqlalchemy import text

with engine.connect() as conn:
    # Bound parameters (:u) keep user input out of the SQL string
    conn.execute(text("INSERT INTO users (username) VALUES (:u)"), {"u": "martin"})
    conn.commit()
🛠 Update and delete
Update (ORM)
from sqlalchemy import update

stmt = (
    update(User)
    .where(User.id == 1)
    .values(role="superadmin")
)

with Session(engine) as s:
    s.execute(stmt)
    s.commit()
Delete (ORM)
from sqlalchemy import delete

stmt = delete(User).where(User.id == 1)

with Session(engine) as s:
    s.execute(stmt)
    s.commit()
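📌 Parsing a Result (scalars / mappings / fetchall)
# A Result can be consumed only once; pick one of these per execute() call.
ORM objects
result.scalars().all()
Dictionary-like rows (the most commonly used)
result.mappings().all()
Tuple-like rows
result.fetchall()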
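🔗 JOIN queries
# Assumes a Role model and a User.role_id foreign key; neither is part of the User model defined earlier.
stmt = (
    select(User.username, Role.name)
    .select_from(User)
    .join(Role, User.role_id == Role.id)
)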
📂 Core-style execution
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users WHERE id=:id"), {"id": 1})
    rows = result.mappings().all()
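Core is not limited to text(): the same Connection can execute statements built from Table objects. The sketch below is one possible illustration, assuming in-memory SQLite and a users table declared with MetaData instead of the ORM model; the sample rows are made up.

```python
from sqlalchemy import (
    Column,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    insert,
    select,
    text,
)

metadata = MetaData()

# Core table definition, equivalent in shape to the ORM User model.
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String(50), unique=True),
    Column("role", String(20)),
)

engine = create_engine("sqlite://")  # assumption: in-memory SQLite
metadata.create_all(engine)

# engine.begin() commits when the block exits normally and rolls back on error.
with engine.begin() as conn:
    conn.execute(
        insert(users),
        [
            {"username": "alice", "role": "admin"},
            {"username": "bob", "role": "viewer"},
        ],
    )

with engine.connect() as conn:
    # Expression-language query: columns are reached through users.c
    stmt = (
        select(users.c.username)
        .where(users.c.role == "admin")
        .order_by(users.c.username)
    )
    admin_names = conn.execute(stmt).scalars().all()

    # Textual query with a bound parameter, read back as a dictionary
    row = conn.execute(
        text("SELECT username, role FROM users WHERE username = :name"),
        {"name": "bob"},
    ).mappings().one()
    bob = dict(row)
```

The expression form is composable and dialect-aware, while text() is convenient for hand-written SQL such as database-specific functions.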
⚡ Async SQLAlchemy usage
async def get_users():
    async with AsyncSession(async_engine) as session:
        result = await session.execute(select(User))
        return result.scalars().all()
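🧪 Transaction management
# session.begin() commits when the block exits normally and rolls back if it raises.
with Session(engine) as session, session.begin():
    session.execute(...)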
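🔒 Best practices (how your production project should be written)
- Create the engine once and share it as a singleton
- Wrap session creation in a SessionLocal factory
- Wrap CRUD operations in helper functions
- Return results in one consistent format
- Avoid connection leaks by always opening sessions and connections in with blocks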