1. 简介
Python的多线程管理使用threading模块。多线程可以帮助我们在处理任务时增加程序的运行速度,尤其在IO密集型任务上非常有用。
Python的全局解释器锁(GIL)也会对多线程的效率造成一定的影响,但并不代表Python的多线程就没有用处了。
在本文中,我们将会介绍如何使用Python 3.x中的threading模块进行多线程管理。
2. 创建线程
2.1 threading.Thread()
在Python中使用threading模块创建线程非常简单,只需要使用threading.Thread()函数,并传入一个函数作为线程的主体即可。
下面是一个例子,我们创建了一个函数print_hello()并把它作为参数传入了threading.Thread()函数中,创建了一个名为t的线程:
import threading
def print_hello():
print("Hello")
t = threading.Thread(target=print_hello)
t.start()
这里使用了Thread()函数来创建一个线程t,它的主体函数是print_hello(),线程通过start()方法来启动。
2.2 继承 threading.Thread
为了更好的管理线程,我们可以继承threading.Thread类并重写它的run()方法。
例子如下:
import threading
class PrintThread(threading.Thread):
def run(self):
print("Thread started")
t = PrintThread()
t.start()
这里我们创建了一个PrintThread类,继承自threading.Thread。这个类中,我们重写了run()方法来实现线程的主体内容。我们可以通过t.start()来启动线程,并默认会调用run()方法。
3. 线程池
使用线程池可以一次性创建多个线程,并且重复利用这些线程来执行任务,可以提高效率。
3.1 创建线程池
可以使用concurrent.futures中的ThreadPoolExecutor()来创建线程池。
下面是创建一个5个线程的线程池的例子:
from concurrent.futures import ThreadPoolExecutor
def task(num):
return num*2
# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务
for i in range(10):
future = executor.submit(task, i)
print(future.result())
这里我们首先定义了一个任务task(),它返回输入的数值乘以2。然后我们通过ThreadPoolExecutor()函数来创建一个线程池,max_workers参数指定了线程池中所含线程的数量。在使用with语句控制with代码块的上下文时创建线程池,这样可以自动管理线程的关闭操作。
之后我们可以通过执行executor.submit()来向线程池中提交任务,submit()返回一个future对象,它可以用来获取任务的执行结果。
3.2 批量提交任务
如果需要一次性提交多个任务到线程池中,我们可以使用map()函数来批量提交。
下面是一个例子:
from concurrent.futures import ThreadPoolExecutor
def task(num):
return num*2
# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
# 批量提交任务
results = executor.map(task, range(10))
for result in results:
print(result)
这里我们通过executor.map()来批量提交任务。map()函数会返回一个迭代器,我们可以直接使用for循环来遍历每个任务的结果。
4. 线程同步
4.1 Lock
在多线程的环境中,有时需要让多个线程访问同一个资源,这时候可能会出现多个线程同时修改同一个变量导致数据混乱的问题。为了避免这种问题,我们可以使用线程同步机制。
使用threading.Lock()可以很好地保证线程同步,下面是一个例子:
import threading
balance = 0
lock = threading.Lock()
def update(n):
global balance
balance += n
def task(n):
for i in range(1000):
lock.acquire()
update(n)
lock.release()
# 创建线程
threads = [threading.Thread(target=task, args=(i,)) for i in range(10)]
# 启动线程
for t in threads:
t.start()
# 等待线程结束
for t in threads:
t.join()
print(balance)
这个例子中,我们首先定义了一个全局变量balance,并创建了一个线程锁lock。
接着,我们定义了一个函数update()用来修改balance的值。
task()函数在这里是我们要执行的任务,它会调用update()函数来修改balance。
在task()函数中,我们通过lock.acquire()来获取线程锁,然后在修改balance之后通过lock.release()来释放线程锁,这样就可以保证每次只有一个线程在访问balance变量了。
最后我们创建了10个线程来执行这个任务,并且等待所有线程执行完毕后输出balance变量的值。
4.2 RLock
在使用Lock锁定时,只有一个线程可以获得锁并执行等待的临界区代码。而RLock可以让一个线程多次获得锁,可以解决其中一个线程嵌套执行任务时,死锁的问题。
下面是一个例子:
import threading
balance = 0
lock = threading.RLock()
def task(n):
global balance
if n:
with lock:
balance += n
task(n-1)
else:
return None
# 创建线程
threads = [threading.Thread(target=task, args=(5,)) for i in range(10)]
# 启动线程
for t in threads:
t.start()
# 等待线程结束
for t in threads:
t.join()
print(balance)
在这个例子中,我们定义了一个递归函数task()来修改balance变量的值,同时使用了RLock来保证线程同步。
在task()函数中,我们在递归调用task()函数之前获取锁,在递归调用之后释放锁。
这样可以保证递归调用过程中不会因为锁定问题出现死锁。
4.3 Condition
Condition可以通过wait()和notify()方法协调多个线程的执行顺序,特别适合于制定需要复杂的线程同步协议的情况。
下面是一个例子:
import threading
queue = []
lock = threading.Lock()
condition = threading.Condition(lock)
def produce(num):
global queue
for i in range(num):
with lock:
queue.append("Task %d" % i)
print("Task %d added, notify consumers..." % i)
condition.notify()
time.sleep(1)
def consume():
global queue
while True:
with lock:
while not queue:
print("No task, wait...")
condition.wait()
task = queue.pop(0)
print("Got task: %s" % task)
time.sleep(1)
# 创建线程
p = threading.Thread(target=produce, args=(2,))
c = threading.Thread(target=consume)
# 启动线程
p.start()
c.start()
# 等待线程结束
p.join()
c.join()
这里我们定义了一个队列queue,同时创建了一个Condition的对象。
生产者函数produce()会向queue中添加任务,并且每次添加后通过notify()方法通知消费者任务已经添加完成。
消费者函数consume()会循环检测队列是否为空,如果不为空就会从队列中取走一个任务并执行。
如果队列为空,它会通过调用condition.wait()来阻塞线程,并等待生产者的通知。
这样可以避免在队列为空时消费者线程不停地进行空循环损耗cpu资源。
5. 小结
本文主要介绍了Python 3.x中的threading模块来进行多线程管理的方法。我们讨论了如何创建线程和线程池,以及如何使用锁和条件来保证线程同步。多线程的使用虽然需要注意许多问题,但是掌握好相关技巧可以大大增加程序的执行效率。