Skip to main content
Documents
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

最佳实践

下面给你一份 真正生产可用的 Python Redis 最佳实践(结合你的场景:FastAPI + SQLAlchemy + PostgreSQL + Kubernetes)。

内容极简但专业,可直接复制进你项目。

🚀 Python Redis 最佳实践(生产级)

📌 目录

  • 连接方式最佳实践
  • 键名(Key)命名规范
  • 数据结构使用最佳实践
  • 原子操作最佳实践
  • 高并发性能优化
  • 高可靠性用法
  • FastAPI 项目结构推荐
  • 可直接使用的封装工具类

1. 连接方式最佳实践

使用 连接池(必选)

避免高并发时 Redis 被打爆:

import redis

redis_pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    db=0,
    max_connections=200,
    decode_responses=True,
)

r = redis.Redis(connection_pool=redis_pool)

⚠️ 不要在每次请求里创建 Redis(),那会导致连接泄漏、CPU 激增。

2. Key 命名最佳实践

统一使用 冒号 : 分隔层级

user:1:profile
user:1:token
cache:stock:600519
rate_limit:login:192.168.1.1

规则:

  • 一律小写
  • 层级使用冒号
  • 业务模块前缀,如 cache:、session:、token:
  • 不要用大写、大段英文、中文

3. 数据结构选型最佳实践

字符串 String -> 最稳选择

  • 计数器
  • 分布式锁
  • 简单 KV 缓存

使用:

r.set("user:1:name", "martin", ex=3600)

Hash -> 多字段对象(推荐)

更节省内存,适合用户信息、配置等。

r.hset("user:1", mapping={
    "name": "martin",
    "vip": "1"
})

List -> 队列(轻量)

适合:

  • 简单消息队列
  • 待处理任务

不适合超大列表。

Set -> 去重集合

如:

r.sadd("online_users", "martin")

ZSet -> 排序排行榜

如积分榜、热度榜。

4. 原子操作最佳实践(非常关键)

必须使用原子指令,而不是 get + set。

计数器

r.incr("counter")

获取并增加

value = r.incrby("counter", 1)

分布式锁

lock = r.lock("my_lock", timeout=5)

with lock:
    # 安全执行

5. 高并发性能优化

使用 Pipeline 批量提交(减少 RTT)

pipe = r.pipeline()
pipe.incr("views")
pipe.sadd("active_user", 1)
result = pipe.execute()

使用 SCAN 替代 KEYS

⭐⭐⭐ 超级重要,避免 Redis 卡死

for key in r.scan_iter("user:*"):
    print(key)

6. 高可靠性使用方式

设置 TTL 防止缓存炸库

r.set("session:xxx", "data", ex=3600)

缓存雪崩保护(随机 TTL)

import random
r.set("user:1", "data", ex=3600 + random.randint(0, 300))

缓存击穿保护(热点 key 锁)

lock = r.lock("lock:hot_key", timeout=3)

7. 与 FastAPI 集成最佳实践(你强烈需要)

目录结构推荐:

app/
  ├─ core/
  │    ├─ config.py
  │    └─ redis.py
  ├─ api/
  │    └─ users.py
  └─ main.py

redis.py(生产级依赖注入)

import redis

redis_pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True,
    max_connections=200,
)

redis_client = redis.Redis(connection_pool=redis_pool)

def get_redis():
    return redis_client

FastAPI 依赖:

from fastapi import Depends

@app.get("/counter")
def counter(r = Depends(get_redis)):
    return r.incr("counter")

8. 生产可用 Redis 工具类(你可以直接用)

import redis
import json
import random
from typing import Any, Optional


class RedisClient:
    def __init__(self, host="localhost", port=6379, db=0, max_conn=200):
        pool = redis.ConnectionPool(
            host=host,
            port=port,
            db=db,
            decode_responses=True,
            max_connections=max_conn,
        )
        self.r = redis.Redis(connection_pool=pool)

    # ---------- String ----------
    def set(self, key: str, value: Any, ex: int | None = None, random_ttl: bool = False):
        if random_ttl and ex:
            ex = ex + random.randint(1, 300)
        if not isinstance(value, str):
            value = json.dumps(value)
        return self.r.set(key, value, ex=ex)

    def get(self, key: str) -> Optional[str]:
        return self.r.get(key)

    # ---------- Hash ----------
    def hset(self, key: str, mapping: dict):
        return self.r.hset(key, mapping=mapping)

    def hgetall(self, key: str):
        return self.r.hgetall(key)

    # ---------- List ----------
    def push(self, key: str, *values):
        return self.r.rpush(key, *values)

    def pop(self, key: str):
        return self.r.lpop(key)

    # ---------- Set ----------
    def sadd(self, key: str, *values):
        return self.r.sadd(key, *values)

    def sismember(self, key: str, value):
        return self.r.sismember(key, value)

    # ---------- Atomic ops ----------
    def incr(self, key: str, step: int = 1):
        return self.r.incrby(key, step)

    # ---------- Lock ----------
    def lock(self, key: str, timeout=5):
        return self.r.lock(key, timeout=timeout)

    # ---------- Scan ----------
    def scan(self, pattern: str):
        for key in self.r.scan_iter(pattern):
            yield key

使用:

rc = RedisClient()

rc.set("user:1:name", "martin", ex=3600)
print(rc.get("user:1:name"))

🎯 总结:Python Redis 最佳实践核心要点

最佳实践 必须执行? 说明
使用 ConnectionPool ✅ 必须 避免高并发下卡死
使用 decode_responses 推荐 避免大量 bytes
用 incr 代替 get+set 超级关键 防止竞态
使用 Pipeline 推荐 降低 RTT
禁用 KEYS,用 SCAN 必须 防止卡死
随机 TTL 防雪崩 推荐 企业常用
分布式锁用 redis.lock 推荐 避免并发问题
key 名称规范化 推荐 代码清晰稳定

扩展

  • Redis + FastAPI 完整封装(适用于你项目)
  • Redis 分布式锁最佳实践
  • Redis 缓存设计(穿透、击穿、雪崩防御)
  • Redis Stream 消息队列最佳实践
  • Redis 在 Kubernetes 上生产部署 YAML