最佳实践
下面给你一份 真正生产可用的 Python Redis 最佳实践(结合你的场景:FastAPI + SQLAlchemy + PostgreSQL + Kubernetes)。
内容极简但专业,可直接复制进你项目。
📌 目录
- 连接方式最佳实践
- 键名(Key)命名规范
- 数据结构使用最佳实践
- 原子操作最佳实践
- 高并发性能优化
- 高可靠性用法
- FastAPI 项目结构推荐
- 可直接使用的封装工具类
避免高并发时 Redis 被打爆:
import redis
redis_pool = redis.ConnectionPool(
host="localhost",
port=6379,
db=0,
max_connections=200,
decode_responses=True,
)
r = redis.Redis(connection_pool=redis_pool)
⚠️ 不要在每次请求里创建 Redis(),那会导致连接泄漏、CPU 激增。
统一使用 冒号 : 分隔层级:
user:1:profile
user:1:token
cache:stock:600519
rate_limit:login:192.168.1.1
规则:
- 一律小写
- 层级使用冒号
- 业务模块前缀,如 cache:、session:、token:
- 不要用大写、大段英文、中文
- 计数器
- 分布式锁
- 简单 KV 缓存
使用:
r.set("user:1:name", "martin", ex=3600)
更节省内存,适合用户信息、配置等。
r.hset("user:1", mapping={
"name": "martin",
"vip": "1"
})
适合:
- 简单消息队列
- 待处理任务
不适合超大列表。
如:
r.sadd("online_users", "martin")
如积分榜、热度榜。
必须使用原子指令,而不是 get + set。
计数器
r.incr("counter")
获取并增加
value = r.incrby("counter", 1)
分布式锁
lock = r.lock("my_lock", timeout=5)
with lock:
# 安全执行
pipe = r.pipeline()
pipe.incr("views")
pipe.sadd("active_user", 1)
result = pipe.execute()
⭐⭐⭐ 超级重要,避免 Redis 卡死
for key in r.scan_iter("user:*"):
print(key)
r.set("session:xxx", "data", ex=3600)
import random
r.set("user:1", "data", ex=3600 + random.randint(0, 300))
lock = r.lock("lock:hot_key", timeout=3)
目录结构推荐:
app/
├─ core/
│ ├─ config.py
│ └─ redis.py
├─ api/
│ └─ users.py
└─ main.py
redis.py(生产级依赖注入)
import redis
redis_pool = redis.ConnectionPool(
host="localhost",
port=6379,
db=0,
decode_responses=True,
max_connections=200,
)
redis_client = redis.Redis(connection_pool=redis_pool)
def get_redis():
return redis_client
FastAPI 依赖:
from fastapi import Depends
@app.get("/counter")
def counter(r = Depends(get_redis)):
return r.incr("counter")
import redis
import json
import random
from typing import Any, Optional
class RedisClient:
def __init__(self, host="localhost", port=6379, db=0, max_conn=200):
pool = redis.ConnectionPool(
host=host,
port=port,
db=db,
decode_responses=True,
max_connections=max_conn,
)
self.r = redis.Redis(connection_pool=pool)
# ---------- String ----------
def set(self, key: str, value: Any, ex: int | None = None, random_ttl: bool = False):
if random_ttl and ex:
ex = ex + random.randint(1, 300)
if not isinstance(value, str):
value = json.dumps(value)
return self.r.set(key, value, ex=ex)
def get(self, key: str) -> Optional[str]:
return self.r.get(key)
# ---------- Hash ----------
def hset(self, key: str, mapping: dict):
return self.r.hset(key, mapping=mapping)
def hgetall(self, key: str):
return self.r.hgetall(key)
# ---------- List ----------
def push(self, key: str, *values):
return self.r.rpush(key, *values)
def pop(self, key: str):
return self.r.lpop(key)
# ---------- Set ----------
def sadd(self, key: str, *values):
return self.r.sadd(key, *values)
def sismember(self, key: str, value):
return self.r.sismember(key, value)
# ---------- Atomic ops ----------
def incr(self, key: str, step: int = 1):
return self.r.incrby(key, step)
# ---------- Lock ----------
def lock(self, key: str, timeout=5):
return self.r.lock(key, timeout=timeout)
# ---------- Scan ----------
def scan(self, pattern: str):
for key in self.r.scan_iter(pattern):
yield key
使用:
rc = RedisClient()
rc.set("user:1:name", "martin", ex=3600)
print(rc.get("user:1:name"))
| 最佳实践 | 必须执行? | 说明 |
|---|---|---|
| 使用 ConnectionPool | ✅ 必须 | 避免高并发下卡死 |
| 使用 decode_responses | 推荐 | 避免大量 bytes |
| 用 incr 代替 get+set | 超级关键 | 防止竞态 |
| 使用 Pipeline | 推荐 | 降低 RTT |
| 禁用 KEYS,用 SCAN | 必须 | 防止卡死 |
| 随机 TTL 防雪崩 | 推荐 | 企业常用 |
| 分布式锁用 redis.lock | 推荐 | 避免并发问题 |
| key 名称规范化 | 推荐 | 代码清晰稳定 |
扩展
- Redis + FastAPI 完整封装(适用于你项目)
- Redis 分布式锁最佳实践
- Redis 缓存设计(穿透、击穿、雪崩防御)
- Redis Stream 消息队列最佳实践
- Redis 在 Kubernetes 上生产部署 YAML