Python3教程:queue 模块用法

Python3教程:queue 模块用法

1. 引言

当我们需要在一个线程中将数据传递给另一个线程时,可以使用多种方式实现,其中一种方式是使用队列 (queue)。在 Python 中,自带一个 queue 模块,提供了 FIFO 队列 (Queue)、LIFO 队列 (LifoQueue)、优先队列 (PriorityQueue) 等多种队列。本文将重点介绍 Python 中 queue 模块的使用,并且会提供一些示例代码,帮助读者更好地理解。

2. 基本操作

2.1 创建队列

首先需要导入 queue 模块,然后就可以创建队列了。创建队列时,需要指定队列的类型,包括 FIFO、LIFO 和优先队列。FIFO 队列是一种先进先出的队列,LIFO 队列是一种后进先出的队列,而优先队列则是基于优先级进行排序的队列。以下是使用 queue 模块创建队列的基本步骤:

import queue

# 创建 FIFO 队列

fifo_queue = queue.Queue()

# 创建 LIFO 队列

lifo_queue = queue.LifoQueue()

# 创建优先队列

priority_queue = queue.PriorityQueue()

2.2 放入数据

向队列中添加数据需要使用 put() 方法。put() 方法可以有一个可选的 timeout 参数,用于设置阻塞时间,如果队列满了,会等待 timeout 秒后再继续执行操作。

# 向队列中添加数据

fifo_queue.put("A")

fifo_queue.put("B")

fifo_queue.put("C")

lifo_queue.put("A")

lifo_queue.put("B")

lifo_queue.put("C")

priority_queue.put((3, "A")) # 添加一个优先级为 3 的元素 "A"

priority_queue.put((1, "B")) # 添加一个优先级为 1 的元素 "B"

priority_queue.put((2, "C")) # 添加一个优先级为 2 的元素 "C"

2.3 获取数据

从队列中获取数据可以使用 get() 方法。get() 方法也可以有一个可选的 timeout 参数,用于设置阻塞时间,如果队列为空,会等待 timeout 秒后再继续执行操作。

# 获取队列中的数据

print(fifo_queue.get()) # 输出:A

print(fifo_queue.get()) # 输出:B

print(fifo_queue.get()) # 输出:C

print(lifo_queue.get()) # 输出:C

print(lifo_queue.get()) # 输出:B

print(lifo_queue.get()) # 输出:A

print(priority_queue.get()) # 输出:(1, 'B')

print(priority_queue.get()) # 输出:(2, 'C')

print(priority_queue.get()) # 输出:(3, 'A')

3. 队列的属性和方法

3.1 队列的大小

可以使用 qsize() 方法获取队列的大小。

# 获取队列的大小

print(fifo_queue.qsize()) # 输出:0

print(lifo_queue.qsize()) # 输出:0

print(priority_queue.qsize()) # 输出:0

3.2 判断队列是否为空

可以使用 empty() 方法判断队列是否为空。

# 判断队列是否为空

print(fifo_queue.empty()) # 输出:True

print(lifo_queue.empty()) # 输出:True

print(priority_queue.empty()) # 输出:True

3.3 判断队列是否已满

可以使用 full() 方法判断队列是否已满。注意,LIFO 队列不支持 full() 方法。

# 判断队列是否已满

print(fifo_queue.full()) # 输出:False

print(priority_queue.full()) # 输出:False

3.4 清空队列

可以使用 queue.clear() 方法清空队列。

# 清空队列

fifo_queue.clear()

lifo_queue.clear()

priority_queue.clear()

3.5 队列的阻塞操作

当队列为空时,调用 get() 方法会阻塞代码。当队列满时,调用 put() 方法也会阻塞代码。以下是一些常用的阻塞操作:

get(block=True, timeout=None):当队列为空时,调用该方法会一直阻塞代码,直到有新的数据入队或超时。

put(item, block=True, timeout=None):当队列已满时,调用该方法会一直阻塞代码,直到有数据出队或超时。

join():等待队列中的所有任务执行完毕。

task_done():通知队列已完成一个任务。

4. 示例代码

下面是一些示例代码,演示了如何使用 queue 模块。

4.1 生产者-消费者模型

生产者-消费者模型是一种常见的多线程模型。本示例演示了如何使用队列实现生产者-消费者模型。

import queue

import threading

import random

import time

class Producer(threading.Thread):

def __init__(self, queue):

threading.Thread.__init__(self)

self.queue = queue

def run(self):

while True:

# 随机生成一个数字

item = random.randint(1, 100)

# 放入队列中

self.queue.put(item)

# 打印提示信息

print("[Producer] Added item: {} [queue size={}]".format(item, self.queue.qsize()))

# 随机等待一段时间

time.sleep(random.uniform(0.5, 1.5))

class Consumer(threading.Thread):

def __init__(self, queue):

threading.Thread.__init__(self)

self.queue = queue

def run(self):

while True:

# 获取数据

item = self.queue.get()

# 打印提示信息

print("[Consumer] Removed item: {} [queue size={}]".format(item, self.queue.qsize()))

# 通知任务已完成

self.queue.task_done()

# 随机等待一段时间

time.sleep(random.uniform(0.5, 1.5))

if __name__ == "__main__":

q = queue.Queue()

# 启动生产者线程

t1 = Producer(q)

t1.start()

# 启动消费者线程

t2 = Consumer(q)

t2.start()

# 等待队列中的所有任务执行完毕

q.join()

重点提示:在生产者-消费者模型中,生产者线程会不断地向队列中添加数据,而消费者线程会不断地取出队列中的数据,这两个操作会不断地交替进行。在代码中,每当生产者线程添加了一个数据时,都会打印一个提示信息。同样,在消费者线程取出一个数据时,也会打印一个提示信息。我们可以看到,即使在多线程的情况下,队列的使用也非常方便。

4.2 使用队列实现多线程下载

以下示例演示了如何使用队列实现多线程下载。在本例中,下载任务被分成若干个子任务,每个子任务由一个线程执行。每个线程都从队列中取出一个子任务进行下载,下载完成后继续从队列中取出下一个子任务。当队列为空时,表示所有任务已完成。

import os

import queue

import threading

import requests

class Downloader(threading.Thread):

def __init__(self, queue, path):

threading.Thread.__init__(self)

self.queue = queue

self.path = path

def run(self):

while True:

# 从队列中获取一个子任务

url = self.queue.get()

# 组装数据的URL链接和本地文件的保存路径

filename = os.path.join(self.path, url.split("/")[-1])

# 下载数据

response = requests.get(url, stream=True)

with open(filename, "wb") as f:

for chunk in response.iter_content(chunk_size=1024):

f.write(chunk)

# 触发任务完成事件

self.queue.task_done()

if __name__ == "__main__":

url_list = [

"https://cdn.pixabay.com/photo/2021/03/18/09/16/cat-6103309__340.jpg",

"https://cdn.pixabay.com/photo/2015/04/23/22/00/tree-736885__340.jpg",

"https://cdn.pixabay.com/photo/2021/03/15/13/53/cat-6091805__340.jpg",

"https://cdn.pixabay.com/photo/2016/03/27/22/10/girl-1284451__340.jpg"

]

# 创建队列

q = queue.Queue()

# 设置本地文件保存路径

path = os.path.join(os.getcwd(), "downloads")

if not os.path.exists(path):

os.mkdir(path)

# 创建下载任务线程

for i in range(4):

d = Downloader(q, path)

d.setDaemon(True)

d.start()

# 将下载任务分解成小任务并放入队列中

for url in url_list:

q.put(url)

# 等待所有任务完成

q.join()

print("所有下载任务已完成!")

重点提示:在多线程下载中,将下载任务分解成若干个子任务,并将子任务放入队列中。创建多个线程,在每个线程中不断从队列中取出一个子任务并执行下载操作。当队列为空时,表示所有任务已完成。

后端开发标签