Python3教程:queue 模块用法
1. 引言
当我们需要在一个线程中将数据传递给另一个线程时,可以使用多种方式实现,其中一种方式是使用队列 (queue)。在 Python 中,自带一个 queue 模块,提供了 FIFO 队列 (Queue)、LIFO 队列 (LifoQueue)、优先队列 (PriorityQueue) 等多种队列。本文将重点介绍 Python 中 queue 模块的使用,并且会提供一些示例代码,帮助读者更好地理解。
2. 基本操作
2.1 创建队列
首先需要导入 queue 模块,然后就可以创建队列了。创建队列时,需要指定队列的类型,包括 FIFO、LIFO 和优先队列。FIFO 队列是一种先进先出的队列,LIFO 队列是一种后进先出的队列,而优先队列则是基于优先级进行排序的队列。以下是使用 queue 模块创建队列的基本步骤:
import queue
# 创建 FIFO 队列
fifo_queue = queue.Queue()
# 创建 LIFO 队列
lifo_queue = queue.LifoQueue()
# 创建优先队列
priority_queue = queue.PriorityQueue()
2.2 放入数据
向队列中添加数据需要使用 put() 方法。put() 方法可以有一个可选的 timeout 参数,用于设置阻塞时间,如果队列满了,会等待 timeout 秒后再继续执行操作。
# 向队列中添加数据
fifo_queue.put("A")
fifo_queue.put("B")
fifo_queue.put("C")
lifo_queue.put("A")
lifo_queue.put("B")
lifo_queue.put("C")
priority_queue.put((3, "A")) # 添加一个优先级为 3 的元素 "A"
priority_queue.put((1, "B")) # 添加一个优先级为 1 的元素 "B"
priority_queue.put((2, "C")) # 添加一个优先级为 2 的元素 "C"
2.3 获取数据
从队列中获取数据可以使用 get() 方法。get() 方法也可以有一个可选的 timeout 参数,用于设置阻塞时间,如果队列为空,会等待 timeout 秒后再继续执行操作。
# 获取队列中的数据
print(fifo_queue.get()) # 输出:A
print(fifo_queue.get()) # 输出:B
print(fifo_queue.get()) # 输出:C
print(lifo_queue.get()) # 输出:C
print(lifo_queue.get()) # 输出:B
print(lifo_queue.get()) # 输出:A
print(priority_queue.get()) # 输出:(1, 'B')
print(priority_queue.get()) # 输出:(2, 'C')
print(priority_queue.get()) # 输出:(3, 'A')
3. 队列的属性和方法
3.1 队列的大小
可以使用 qsize() 方法获取队列的大小。
# 获取队列的大小
print(fifo_queue.qsize()) # 输出:0
print(lifo_queue.qsize()) # 输出:0
print(priority_queue.qsize()) # 输出:0
3.2 判断队列是否为空
可以使用 empty() 方法判断队列是否为空。
# 判断队列是否为空
print(fifo_queue.empty()) # 输出:True
print(lifo_queue.empty()) # 输出:True
print(priority_queue.empty()) # 输出:True
3.3 判断队列是否已满
可以使用 full() 方法判断队列是否已满。注意,LIFO 队列不支持 full() 方法。
# 判断队列是否已满
print(fifo_queue.full()) # 输出:False
print(priority_queue.full()) # 输出:False
3.4 清空队列
可以使用 queue.clear() 方法清空队列。
# 清空队列
fifo_queue.clear()
lifo_queue.clear()
priority_queue.clear()
3.5 队列的阻塞操作
当队列为空时,调用 get() 方法会阻塞代码。当队列满时,调用 put() 方法也会阻塞代码。以下是一些常用的阻塞操作:
get(block=True, timeout=None):当队列为空时,调用该方法会一直阻塞代码,直到有新的数据入队或超时。
put(item, block=True, timeout=None):当队列已满时,调用该方法会一直阻塞代码,直到有数据出队或超时。
join():等待队列中的所有任务执行完毕。
task_done():通知队列已完成一个任务。
4. 示例代码
下面是一些示例代码,演示了如何使用 queue 模块。
4.1 生产者-消费者模型
生产者-消费者模型是一种常见的多线程模型。本示例演示了如何使用队列实现生产者-消费者模型。
import queue
import threading
import random
import time
class Producer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# 随机生成一个数字
item = random.randint(1, 100)
# 放入队列中
self.queue.put(item)
# 打印提示信息
print("[Producer] Added item: {} [queue size={}]".format(item, self.queue.qsize()))
# 随机等待一段时间
time.sleep(random.uniform(0.5, 1.5))
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# 获取数据
item = self.queue.get()
# 打印提示信息
print("[Consumer] Removed item: {} [queue size={}]".format(item, self.queue.qsize()))
# 通知任务已完成
self.queue.task_done()
# 随机等待一段时间
time.sleep(random.uniform(0.5, 1.5))
if __name__ == "__main__":
q = queue.Queue()
# 启动生产者线程
t1 = Producer(q)
t1.start()
# 启动消费者线程
t2 = Consumer(q)
t2.start()
# 等待队列中的所有任务执行完毕
q.join()
重点提示:在生产者-消费者模型中,生产者线程会不断地向队列中添加数据,而消费者线程会不断地取出队列中的数据,这两个操作会不断地交替进行。在代码中,每当生产者线程添加了一个数据时,都会打印一个提示信息。同样,在消费者线程取出一个数据时,也会打印一个提示信息。我们可以看到,即使在多线程的情况下,队列的使用也非常方便。
4.2 使用队列实现多线程下载
以下示例演示了如何使用队列实现多线程下载。在本例中,下载任务被分成若干个子任务,每个子任务由一个线程执行。每个线程都从队列中取出一个子任务进行下载,下载完成后继续从队列中取出下一个子任务。当队列为空时,表示所有任务已完成。
import os
import queue
import threading
import requests
class Downloader(threading.Thread):
def __init__(self, queue, path):
threading.Thread.__init__(self)
self.queue = queue
self.path = path
def run(self):
while True:
# 从队列中获取一个子任务
url = self.queue.get()
# 组装数据的URL链接和本地文件的保存路径
filename = os.path.join(self.path, url.split("/")[-1])
# 下载数据
response = requests.get(url, stream=True)
with open(filename, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
f.write(chunk)
# 触发任务完成事件
self.queue.task_done()
if __name__ == "__main__":
url_list = [
"https://cdn.pixabay.com/photo/2021/03/18/09/16/cat-6103309__340.jpg",
"https://cdn.pixabay.com/photo/2015/04/23/22/00/tree-736885__340.jpg",
"https://cdn.pixabay.com/photo/2021/03/15/13/53/cat-6091805__340.jpg",
"https://cdn.pixabay.com/photo/2016/03/27/22/10/girl-1284451__340.jpg"
]
# 创建队列
q = queue.Queue()
# 设置本地文件保存路径
path = os.path.join(os.getcwd(), "downloads")
if not os.path.exists(path):
os.mkdir(path)
# 创建下载任务线程
for i in range(4):
d = Downloader(q, path)
d.setDaemon(True)
d.start()
# 将下载任务分解成小任务并放入队列中
for url in url_list:
q.put(url)
# 等待所有任务完成
q.join()
print("所有下载任务已完成!")
重点提示:在多线程下载中,将下载任务分解成若干个子任务,并将子任务放入队列中。创建多个线程,在每个线程中不断从队列中取出一个子任务并执行下载操作。当队列为空时,表示所有任务已完成。