Queue (队列)
FIFO(先进先出)队列非常常用,只要场景 强调“顺序处理”、“先来的先处理”、“排队”,都是典型使用场景。
以下是分类+具体实例,让你一眼就懂。
生产者放任务,消费者按顺序处理,例如:
- 爬虫任务队列(URL 先加入先爬)
- 日志写入队列
- 消息处理(如 RabbitMQ / Redis List)
from queue import Queue
q = Queue()
q.put("task1")
q.put("task2")
print(q.get()) # task1
系统会按请求顺序处理读写任务。
先下单的订单要先处理中。
用户工单按提交顺序处理。
例如:
- API 限流:请求进入队列,按顺序处理
- 秒杀系统:抢购请求先到先处理
例如图遍历、最短路径算法 Dijkstra(简化场景)、树层次遍历。
from collections import deque
queue = deque([start])
典型如:
- 游戏循环事件队列
- 事件驱动模型(事件先来先处理)
- GUI/前端的事件循环
日志按时间实时进入队列 -> 后端处理。
Kafka / Flink / Spark 都使用类似 FIFO 的结构。
比如:
- FIFO Cache:最早进入的最先淘汰
但注意:真实系统更常用 LRU,因为 FIFO 并不考虑访问频率。
例如:
- 前端 Ajax 请求排队
- JS 动画队列(jQuery)
| 场景 | 为什么 FIFO? |
|---|---|
| 订单/工单排队 | 公平,按时间顺序处理 |
| 消息队列 MQ | 事件顺序不能乱 |
| 爬虫任务队列 | URL 按先后爬取 |
| 日志写入 | 日志是时间序列 |
| BFS 遍历 | 层序扫描需要先入先出 |
| 缓存 FIFO | 按加入顺序淘汰 |
| 限流排队 | 防止请求爆炸 |
下面用最清晰、最实战的方式告诉你:
for 循环顺序处理列表 vs Queue(队列)处理顺序队列,到底有什么区别、什么时候用哪个。
| 场景/特性 | for 处理列表 | queue.Queue / deque |
|---|---|---|
| 是否线程安全 | ❌否 | ✅是(queue.Queue) |
| 是否动态添加任务 | ❌遍历期间不能安全变动 | ✅可以边消费边追加 |
| 是否适合生产者-消费者模式 | ❌不适合 | ✅非常适合 |
| 是否阻塞等待任务 | ❌不会阻塞 | ✅可以 get(block=True) 等待 |
| 是否自动处理同步锁 | ❌没有 | ✅内置锁机制 |
| 适合数据规模 | 小到中 | 中到大,并用于并发 |
| 适用场景 | 简单顺序处理 | 工作队列、调度、线程池、自定义任务队列 |
tasks = [1, 2, 3]
for t in tasks:
process(t)
特点:
- tasks 是写死的,遍历开始后不适合修改(删/增可能导致越界、漏处理)
- 不能动态接收新任务
from queue import Queue
import threading
q = Queue()
def worker():
while True:
t = q.get()
process(t)
q.task_done()
# 可以随时添加新任务
q.put(1)
q.put(2)
q.put(3)
特点:
- 可动态追加任务
- worker 永远等着接任务(阻塞等待)
❌ for 循环不支持多线程并发处理
多个线程同时操作列表会:
- 内容被修改
- 顺序无法保证
- 没有锁保护
✅ Queue 专为多线程设计(内置锁)
q = Queue()
def worker():
while True:
item = q.get()
# do work
q.task_done()
for _ in range(4):
threading.Thread(target=worker).start()
Queue 可以安全高效实现“4 个线程同时取任务”的场景。
for 是非阻塞的 —— 数据没了就结束
Queue 可以阻塞等待任务
task = q.get(block=True) # 如果队列为空,会等待新的任务
适用于:
- 守护线程
- 消息队列
- 后台工作服务(持续处理)
❌ for 不能实现真实的生产者-消费者模式
因为消费者无法等待生产者不断生产。
✅ Queue 完美支持
生产者
q.put(task)
消费者
q.get()
Queue 提供:
q.task_done()q.join()- maxsize 限制
- 阻塞行为控制
for 循环完全做不到。
适用于:
- 数据量小
- 任务固定、不变
- 单线程
- 无需动态追加任务
例如:
for file in file_list:
process_file(file)
- 需要并发处理任务(多线程 / 多进程)
- 需要实时追加任务
- 需要任务等待机制
- 需要生产者-消费者模型
- 任务数量未知、可能无限增长
- 任务执行时间不均衡(队列自动平衡)
十分常见在:
- 抓取爬虫(生产 URL -> worker 请求)
- 日志收集系统
- 后台任务执行器
- 消息队列
- Worker 池
- 调度系统、作业系统
- for 遍历的是“固定列表”
- Queue 处理的是“动态任务流”
for 适合一次性小任务。
Queue 适合需要持续运行、动态增加、并发执行的任务系统。
以下是一份最清晰、最实战的对比总结:
queue.Queue vs collections.deque —— 两者都是“队列”,但用法和目的差别非常大。
| 特性 | queue.Queue | collections.deque |
|---|---|---|
| 是否线程安全 | ✅是(内置锁) | ⚠️否(非线程安全) |
| 是否支持阻塞/超时 | ✅支持 get(block=True)、put(timeout=) | ❌不支持阻塞等待 |
| 适合多线程工作队列吗 | ⭐最佳选择 | ❌不适合 |
| 性能 | 较慢(锁开销) | ⭐极高(O(1) 双端操作) |
| 允许从两端操作 | ❌只支持 put/get | ✅支持 append/appendleft/pop/popleft |
| 是否支持 maxlen | ❌不支持 | ✅支持固定长度自动丢弃旧数据 |
| 适合单线程队列吗? | 可用但不必要 | ⭐最佳选择 |
| 典型用途 | 线程池、消费者模型 | 缓冲区、任务列表、LRU 缓存、简单队列 |
特点:
- 内置锁(thread-safe)
- 自带阻塞机制(生产者-消费者必用)
- 提供 put/get、task_done/join
- 速度比 deque 慢
示例:线程安全队列(生产者-消费者经典写法)
from queue import Queue
import threading
q = Queue()
def worker():
while True:
task = q.get() # 会阻塞等待
print("work:", task)
q.task_done()
threading.Thread(target=worker, daemon=True).start()
q.put(1)
q.put(2)
q.put(3)
q.join()
适用于:
- 多线程任务调度
- 后台 worker 任务队列
- 消息队列
- 优雅的线程同步
特点:
- 超快(C 实现,O(1) 操作)
- 支持从两端操作
- 默认不安全(非线程安全)
- 可设定 maxlen 用作环形缓冲区
示例:单线程 FIFO 队列
from collections import deque
dq = deque()
dq.append(1)
dq.append(2)
print(dq.popleft()) # 1
print(dq.popleft()) # 2
用作固定长度缓存
dq = deque(maxlen=3)
dq.extend([1,2,3,4])
print(dq) # deque([2,3,4], maxlen=3)
适用于:
- 单线程的任务列表
- LRU 缓存
- 滑动窗口数据统计
- 高性能队列处理
- 双端队列结构
Queue
q.get() # 会阻塞等待新任务
deque
dq.popleft() # 如果空:报错 IndexError,而不是阻塞
Queue
# 多线程读写安全
q.put(x)
deque(不安全)
# 多线程读写要自己加锁
lock = threading.Lock()
with lock:
dq.append(x)
- deque 远快于 Queue
- Queue 因为加锁和阻塞机制而慢
如果你写的是单线程队列 -> 用 deque
如果你写的是多线程队列 -> 用 Queue
| 使用场景 | 推荐 | | 多线程任务队列 | ⭐queue.Queue | | 需要阻塞等待任务 | ⭐queue.Queue | | 单线程高性能 FIFO | ⭐deque | | 双端操作的数据结构 | ⭐deque | | 实现 LRU 缓存 | deque(maxlen=) | | 实时流数据、滑动窗口 | deque | | 需要 task_done/join | queue.Queue |
Queue 是 “安全的慢队列”,deque 是 “快但不安全的队列”。
- 多线程 -> Queue
- 单线程高性能 -> deque
- 双端结构 -> deque
- 消费者模型 -> Queue
下面给你一份 最实战、可直接用在项目中的 Redis 队列(FIFO/LIFO/延时/可靠队列)方案,适合你的 FastAPI / Python 环境。
Redis 原生支持:
- 列表型队列(list -> LPUSH / RPUSH / LPOP / BRPOP)
- 阻塞队列(BRPOP、BLPOP)
- 延时队列(sorted set)
- 可靠队列(ack 模式)(RPOPLPUSH)
并且:
- 无锁(单线程执行)
- 超快(内存)
- 集群可扩展
- 服务间通信最佳选择之一
使用 Redis list:
LPUSH queue key
RPOP queue
推任务(生产者)
import redis
r = redis.Redis()
def push_task(task: str):
r.lpush("task_queue", task)
取任务(消费者)
def pop_task():
return r.rpop("task_queue")
特点:
- FIFO
- 不阻塞
- 简单高效
BLPOP queue timeout
BRPOP queue timeout
消费者示例(阻塞模式)
def worker():
while True:
task = r.brpop("task_queue", timeout=0) # 永久阻塞
key, value = task
print("处理任务:", value.decode())
特点:
- 不需要 while + sleep
- 高性能
- 消费者等任务的时候不会浪费 CPU
通过 Redis 命令:
RPOPLPUSH source processing_queue
消费失败时再把任务放回 source。
生产者
def push_task(task: str):
r.lpush("task_queue", task)
消费者(带 ACK)
def worker():
while True:
task = r.rpoplpush("task_queue", "processing_queue")
try:
# 处理任务
print(f"处理任务:{task.decode()}")
# 成功后移除处理队列中的任务
r.lrem("processing_queue", 1, task)
except Exception:
# 不删除,留下处理队列,稍后恢复
print("任务失败,将在之后重试")
特点:
- worker 崩溃时任务不会丢
- 可用一个定时器扫描 processing_queue 恢复失败任务
使用 ZSET:
ZADD delay_queue score(timestamp) value
def delay_push(task: str, delay_seconds: int):
timestamp = time.time() + delay_seconds
r.zadd("delay_queue", {task: timestamp})
def delay_worker():
while True:
now = time.time()
tasks = r.zrangebyscore("delay_queue", 0, now, start=0, num=1)
if not tasks:
time.sleep(0.5)
continue
task = tasks[0]
if r.zrem("delay_queue", task): # 防止并发重复删除
print("执行延时任务:", task.decode())
| 功能 | Redis | queue.Queue | deque |
|---|---|---|---|
| 多进程/多机器共享 | ⭐支持 | ❌不支持 | ❌不支持 |
| 超高性能 | ⭐⭐ | 中 | ⭐⭐ |
| 阻塞等待 | BRPOP | get(block=True) | ❌ |
| 可靠队列 | ⭐支持 | ❌ | ❌ |
| 延时队列 | ⭐支持 | ❌ | ❌ |
| 适用场景 | 分布式、多服务 | 单机多线程 | 单线程结构 |
Redis 适合你的 爬虫分布式调度、任务队列、异步服务。
import redis
import json
from typing import Any
class RedisQueue:
def __init__(self, name: str, host="localhost", port=6379, db=0):
self.name = name
self.r = redis.Redis(host=host, port=port, db=db)
def push(self, data: Any):
self.r.lpush(self.name, json.dumps(data))
def pop(self, block=True, timeout=0):
if block:
result = self.r.brpop(self.name, timeout=timeout)
if result:
_, value = result
return json.loads(value)
return None
else:
value = self.r.rpop(self.name)
return json.loads(value) if value else None
def size(self):
return self.r.llen(self.name)
使用
q = RedisQueue("task_queue")
q.push({"id": 1, "action": "crawl"})
task = q.pop()
print(task)