Skip to main content
Documents
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Queue (队列)

什么场景下会用到先进先出的队列?

FIFO(先进先出)队列非常常用,只要场景 强调“顺序处理”、“先来的先处理”、“排队”,都是典型使用场景。

以下是分类+具体实例,让你一眼就懂。

✅ 一、操作系统 & 网络中的 FIFO

1.1 多线程/多进程任务队列(典型)

生产者放任务,消费者按顺序处理,例如:

  • 爬虫任务队列(URL 先加入先爬)
  • 日志写入队列
  • 消息处理(如 RabbitMQ / Redis List)
from queue import Queue
q = Queue()
q.put("task1")
q.put("task2")
print(q.get())  # task1

1.2 IO 请求队列(磁盘、网络)

系统会按请求顺序处理读写任务。

✅ 二、业务逻辑中的“排队场景”

2.1 订单处理系统

先下单的订单要先处理中。

2.2 客服工单排队

用户工单按提交顺序处理。

2.3 接口限流、排队等候

例如:

  • API 限流:请求进入队列,按顺序处理
  • 秒杀系统:抢购请求先到先处理

✅ 三、数据结构与算法中的 FIFO

3.1 广度优先搜索(BFS)必用队列

例如图遍历、最短路径算法 Dijkstra(简化场景)、树层次遍历。

from collections import deque
queue = deque([start])

3.2 消息、事件、任务调度器

典型如:

  • 游戏循环事件队列
  • 事件驱动模型(事件先来先处理)
  • GUI/前端的事件循环

✅ 四、流式数据、实时系统

4.1 日志收集(log pipeline)

日志按时间实时进入队列 -> 后端处理。

4.2 数据流管道(streaming)

Kafka / Flink / Spark 都使用类似 FIFO 的结构。

✅ 五、缓存中的 FIFO 策略(缓存淘汰算法)

比如:

  • FIFO Cache:最早进入的最先淘汰

但注意:真实系统更常用 LRU,因为 FIFO 并不考虑访问频率。

✅ 六、前端 & Web:请求队列、动画队列

例如:

  • 前端 Ajax 请求排队
  • JS 动画队列(jQuery)

📌 总结:FIFO 适用于 -> “顺序不能乱” 的场景

场景 为什么 FIFO?
订单/工单排队 公平,按时间顺序处理
消息队列 MQ 事件顺序不能乱
爬虫任务队列 URL 按先后爬取
日志写入 日志是时间序列
BFS 遍历 层序扫描需要先入先出
缓存 FIFO 按加入顺序淘汰
限流排队 防止请求爆炸

使用 for 和 queue 处理顺序队列有什么区别?

下面用最清晰、最实战的方式告诉你:

for 循环顺序处理列表 vs Queue(队列)处理顺序队列,到底有什么区别、什么时候用哪个。

✅ 核心区别总结表

场景/特性 for 处理列表 queue.Queue / deque
是否线程安全 ❌否 ✅是(queue.Queue)
是否动态添加任务 ❌遍历期间不能安全变动 ✅可以边消费边追加
是否适合生产者-消费者模式 ❌不适合 ✅非常适合
是否阻塞等待任务 ❌不会阻塞 ✅可以 get(block=True) 等待
是否自动处理同步锁 ❌没有 ✅内置锁机制
适合数据规模 小到中 中到大,并用于并发
适用场景 简单顺序处理 工作队列、调度、线程池、自定义任务队列

🧩 区别 1:执行模型不同

for:处理一个固定的静态序列

tasks = [1, 2, 3]
for t in tasks:
    process(t)

特点:

  • tasks 是写死的,遍历开始后不适合修改(删/增可能导致越界、漏处理)
  • 不能动态接收新任务

Queue:动态任务流,执行过程中不断加入

from queue import Queue
import threading

q = Queue()

def worker():
    while True:
        t = q.get()
        process(t)
        q.task_done()

# 可以随时添加新任务
q.put(1)
q.put(2)
q.put(3)

特点:

  • 可动态追加任务
  • worker 永远等着接任务(阻塞等待)

🧩 区别 2:并发与线程安全

❌ for 循环不支持多线程并发处理

多个线程同时操作列表会:

  • 内容被修改
  • 顺序无法保证
  • 没有锁保护

✅ Queue 专为多线程设计(内置锁)

q = Queue()

def worker():
    while True:
        item = q.get()
        # do work
        q.task_done()

for _ in range(4):
    threading.Thread(target=worker).start()

Queue 可以安全高效实现“4 个线程同时取任务”的场景。

🧩 区别 3:阻塞机制

for 是非阻塞的 —— 数据没了就结束

Queue 可以阻塞等待任务

task = q.get(block=True)   # 如果队列为空,会等待新的任务

适用于:

  • 守护线程
  • 消息队列
  • 后台工作服务(持续处理)

🧩 区别 4:生产者-消费者模型是否适用

❌ for 不能实现真实的生产者-消费者模式

因为消费者无法等待生产者不断生产。

✅ Queue 完美支持

生产者

q.put(task)

消费者

q.get()

🧩 区别 5:是否能重点控制任务流程

Queue 提供:

  • q.task_done()
  • q.join()
  • maxsize 限制
  • 阻塞行为控制

for 循环完全做不到。

📌 什么时候用 for 循环?

适用于:

  • 数据量小
  • 任务固定、不变
  • 单线程
  • 无需动态追加任务

例如:

for file in file_list:
    process_file(file)

📌 什么时候必须用 Queue?(推荐使用)

  • 需要并发处理任务(多线程 / 多进程)
  • 需要实时追加任务
  • 需要任务等待机制
  • 需要生产者-消费者模型
  • 任务数量未知、可能无限增长
  • 任务执行时间不均衡(队列自动平衡)

十分常见在:

  • 抓取爬虫(生产 URL -> worker 请求)
  • 日志收集系统
  • 后台任务执行器
  • 消息队列
  • Worker 池
  • 调度系统、作业系统

🎯 一句话总结

  • for 遍历的是“固定列表”
  • Queue 处理的是“动态任务流”

for 适合一次性小任务。

Queue 适合需要持续运行、动态增加、并发执行的任务系统。


queue 和 collections.deque 有什么区别?

以下是一份最清晰、最实战的对比总结:

queue.Queue vs collections.deque —— 两者都是“队列”,但用法和目的差别非常大。

✅ 一张表总结区别

特性 queue.Queue collections.deque
是否线程安全 ✅是(内置锁) ⚠️否(非线程安全)
是否支持阻塞/超时 ✅支持 get(block=True)、put(timeout=) ❌不支持阻塞等待
适合多线程工作队列吗 ⭐最佳选择 ❌不适合
性能 较慢(锁开销) ⭐极高(O(1) 双端操作)
允许从两端操作 ❌只支持 put/get ✅支持 append/appendleft/pop/popleft
是否支持 maxlen ❌不支持 ✅支持固定长度自动丢弃旧数据
适合单线程队列吗? 可用但不必要 ⭐最佳选择
典型用途 线程池、消费者模型 缓冲区、任务列表、LRU 缓存、简单队列

🧩 1. queue.Queue:给多线程准备的“安全队列”

特点:

  • 内置锁(thread-safe)
  • 自带阻塞机制(生产者-消费者必用)
  • 提供 put/get、task_done/join
  • 速度比 deque 慢

示例:线程安全队列(生产者-消费者经典写法)

from queue import Queue
import threading

q = Queue()

def worker():
    while True:
        task = q.get()    # 会阻塞等待
        print("work:", task)
        q.task_done()

threading.Thread(target=worker, daemon=True).start()

q.put(1)
q.put(2)
q.put(3)
q.join()

适用于:

  • 多线程任务调度
  • 后台 worker 任务队列
  • 消息队列
  • 优雅的线程同步

🧩 2. collections.deque:最快速的双端队列(无锁)

特点:

  • 超快(C 实现,O(1) 操作)
  • 支持从两端操作
  • 默认不安全(非线程安全)
  • 可设定 maxlen 用作环形缓冲区

示例:单线程 FIFO 队列

from collections import deque

dq = deque()
dq.append(1)
dq.append(2)
print(dq.popleft())   # 1
print(dq.popleft())   # 2

用作固定长度缓存

dq = deque(maxlen=3)
dq.extend([1,2,3,4])
print(dq)   # deque([2,3,4], maxlen=3)

适用于:

  • 单线程的任务列表
  • LRU 缓存
  • 滑动窗口数据统计
  • 高性能队列处理
  • 双端队列结构

🧩 3. 功能对比(示例对比说明)

🔸阻塞等待?

Queue

q.get()  # 会阻塞等待新任务

deque

dq.popleft()  # 如果空:报错 IndexError,而不是阻塞

🔸线程安全?

Queue

# 多线程读写安全
q.put(x)

deque(不安全)

# 多线程读写要自己加锁
lock = threading.Lock()
with lock:
    dq.append(x)

🔸速度?

  • deque 远快于 Queue
  • Queue 因为加锁和阻塞机制而慢

如果你写的是单线程队列 -> 用 deque

如果你写的是多线程队列 -> 用 Queue

🧩 4. 到底怎么选择?(最实战建议)

| 使用场景 | 推荐 | | 多线程任务队列 | ⭐queue.Queue | | 需要阻塞等待任务 | ⭐queue.Queue | | 单线程高性能 FIFO | ⭐deque | | 双端操作的数据结构 | ⭐deque | | 实现 LRU 缓存 | deque(maxlen=) | | 实时流数据、滑动窗口 | deque | | 需要 task_done/join | queue.Queue |

🎯 一句话总结

Queue 是 “安全的慢队列”,deque 是 “快但不安全的队列”。

  • 多线程 -> Queue
  • 单线程高性能 -> deque
  • 双端结构 -> deque
  • 消费者模型 -> Queue

结合 Redis 实现队列操作

下面给你一份 最实战、可直接用在项目中的 Redis 队列(FIFO/LIFO/延时/可靠队列)方案,适合你的 FastAPI / Python 环境。

✅ 一、Redis 为什么适合作队列?

Redis 原生支持:

  • 列表型队列(list -> LPUSH / RPUSH / LPOP / BRPOP)
  • 阻塞队列(BRPOP、BLPOP)
  • 延时队列(sorted set)
  • 可靠队列(ack 模式)(RPOPLPUSH)

并且:

  • 无锁(单线程执行)
  • 超快(内存)
  • 集群可扩展
  • 服务间通信最佳选择之一

🔥 二、最推荐的三种 Redis 队列模式(带代码)

✅ 1. 简单 FIFO 队列(最常用)

使用 Redis list:

LPUSH queue key
RPOP queue

推任务(生产者)

import redis
r = redis.Redis()

def push_task(task: str):
    r.lpush("task_queue", task)

取任务(消费者)

def pop_task():
    return r.rpop("task_queue")

特点:

  • FIFO
  • 不阻塞
  • 简单高效

✅ 2. 阻塞队列(推荐) —— 消费者可以“等待任务”

BLPOP queue timeout
BRPOP queue timeout

消费者示例(阻塞模式)

def worker():
    while True:
        task = r.brpop("task_queue", timeout=0)  # 永久阻塞
        key, value = task
        print("处理任务:", value.decode())

特点:

  • 不需要 while + sleep
  • 高性能
  • 消费者等任务的时候不会浪费 CPU

✅ 3. 可靠队列(ack 机制) —— 消费失败任务不会丢

通过 Redis 命令:

RPOPLPUSH source processing_queue

消费失败时再把任务放回 source。

生产者

def push_task(task: str):
    r.lpush("task_queue", task)

消费者(带 ACK)

def worker():
    while True:
        task = r.rpoplpush("task_queue", "processing_queue")
        try:
            # 处理任务
            print(f"处理任务:{task.decode()}")

            # 成功后移除处理队列中的任务
            r.lrem("processing_queue", 1, task)

        except Exception:
            # 不删除,留下处理队列,稍后恢复
            print("任务失败,将在之后重试")

特点:

  • worker 崩溃时任务不会丢
  • 可用一个定时器扫描 processing_queue 恢复失败任务

🔥 三、Redis 延时队列(适合定时任务)

使用 ZSET:

ZADD delay_queue score(timestamp) value

1)生产者(设置未来执行的时间)

def delay_push(task: str, delay_seconds: int):
    timestamp = time.time() + delay_seconds
    r.zadd("delay_queue", {task: timestamp})

2)消费者(每秒检查可以执行的任务)

def delay_worker():
    while True:
        now = time.time()
        tasks = r.zrangebyscore("delay_queue", 0, now, start=0, num=1)

        if not tasks:
            time.sleep(0.5)
            continue

        task = tasks[0]
        if r.zrem("delay_queue", task):  # 防止并发重复删除
            print("执行延时任务:", task.decode())

🔥 四、Redis 队列 vs Python 内置 queue/deque

功能 Redis queue.Queue deque
多进程/多机器共享 ⭐支持 ❌不支持 ❌不支持
超高性能 ⭐⭐ ⭐⭐
阻塞等待 BRPOP get(block=True)
可靠队列 ⭐支持
延时队列 ⭐支持
适用场景 分布式、多服务 单机多线程 单线程结构

Redis 适合你的 爬虫分布式调度、任务队列、异步服务。

🔥 五、为你的项目定制:RedisQueue 类(可直接用)

import redis
import json
from typing import Any

class RedisQueue:
    def __init__(self, name: str, host="localhost", port=6379, db=0):
        self.name = name
        self.r = redis.Redis(host=host, port=port, db=db)

    def push(self, data: Any):
        self.r.lpush(self.name, json.dumps(data))

    def pop(self, block=True, timeout=0):
        if block:
            result = self.r.brpop(self.name, timeout=timeout)
            if result:
                _, value = result
                return json.loads(value)
            return None
        else:
            value = self.r.rpop(self.name)
            return json.loads(value) if value else None

    def size(self):
        return self.r.llen(self.name)

使用

q = RedisQueue("task_queue")

q.push({"id": 1, "action": "crawl"})

task = q.pop()
print(task)