笔者在工作中经常接到这样的需求:利用现有的 redis 快照生成一个新的 redis 实例,并提供新实例的域名解析;由于生成 redis 和域名解析记录都是耗时操作;因此,顺理成章的把它构造成一个简单的生产者 - 消费者模型:
生产者利用 redis 快照生成新的实例 将新实例 id 传入队列消费者接受队列消息中 id 并更新 dns 解析到新的实例上
代码如下:
# 省略具体的业务逻辑代码,主要实现生产者-消费者的逻辑 import json import threading import time import queue class Producer(threading.Thread): def __init__(self, source_clusterid_queue, target_clusterid_queue): threading.Thread.__init__(self) self.source_clusterid_queue = source_clusterid_queue self.target_clusterid_queue = target_clusterid_queue def _restore_redis(self,clusterid): time.sleep(10) print(f"{clusterid} has restored") def run(self): source_clusterid = self.source_clusterid_queue.get() self._restore_redis(source_clusterid) self.target_clusterid_queue.put(source_clusterid) class Consumer(threading.Thread): def __init__(self, target_clusterid_queue): threading.Thread.__init__(self) self.clusterid_queue = target_clusterid_queue def _dns_update(self,cluster_id): time.sleep(2) print(f"{cluster_id} has been consumed") def run(self): while True: ClusterId = self.clusterid_queue.get() self._dns_update(ClusterId) if __name__=='__main__': source_clusterid_queue = queue.Queue() target_clusterid_queue = queue.Queue() clusterid_list = ['a','b'] restore_th_list=[] #启动生产者和消费者线程 for _ in clusterid_list: dns_update_th = Consumer(target_clusterid_queue) dns_update_th.start() restore_th = Producer(source_clusterid_queue,target_clusterid_queue) restore_th_list.append(restore_th) restore_th.start() for clusterid in clusterid_list: source_clusterid_queue.put(clusterid)
这里定义了两个队列,source_clusterid_queue接受外部元素,生产者获取该队列的中的元素,完成生产任务后,将clusterid 扔到 target_cluster_id_queue中,消费者从中获取元素进行消费
但是,运行该脚本你会发现,在消费完队列中的所有元素后,脚本会处于无限挂起的状态
原因在于消费者线程使用的 queue.get() 默认是一个阻塞方法,等到队列中的元素消费完毕,队列为空,消费者线程就会一直处于阻塞状态
阻塞改造
既然queue.get()会产生阻塞问题,那我们能否在消费完毕后,向队列中加入一个结束标记,消费者线程get()到该标记后主动退出? 于是有了如下改动:
# 仅显示改动部分 class Consumer(threading.Thread): def __init__(self, target_clusterid_queue): threading.Thread.__init__(self) self.clusterid_queue = target_clusterid_queue def _dns_update(self,cluster_id): if cluster_id == '_finish': pass time.sleep(2) print(f"{cluster_id} has consumed") def run(self): while True: ClusterId = self.clusterid_queue.get() self._dns_update(ClusterId) if ClusterId == '_finish': break if __name__=='__main__': source_clusterid_queue = queue.Queue() target_clusterid_queue = queue.Queue() clusterid_list = ['a','b'] restore_th_list=[] #启动生产者和消费者线程 for _ in clusterid_list: dns_update_th = Consumer(target_clusterid_queue) dns_update_th.start() restore_th = Producer(source_clusterid_queue,target_clusterid_queue) restore_th_list.append(restore_th) restore_th.start() for clusterid in clusterid_list: source_clusterid_queue.put(clusterid) #向queue发送结束结束信号,解除消费者队列阻塞 for clusterid in clusterid_list: target_clusterid_queue.put('_finish')
如上所示,我们在主线程中主动向队列添加"_finish"标记;在消费者线程中判断元素值,如果等于"_finish"就跳出循环,如此,就能解决get()的阻塞问题
再次运行脚本,如你所愿,脚本确实不会挂起了,但是执行逻辑却与初衷背道而驰了,消费者线程提前消费了"_finish"标记,却没有消费真正的业务信息就草草结束了
join 和 task_done的引入
原因在于, 消费者消费队列元素需要一定的时间,但我们的主线程并没有等待消费者线程消费结束,就提前结束消费者线程了;解决措施显而易见,我们需要在执行逻辑中等待队列完成,才能发起结束标记;因此,引入 queue.join()和 queue.task_done() 就水到渠成了。
官方对于两者的定义: queue.join(): 阻塞当前队列,只有队列中的任务都完成后,才往下运行
queue.task_done(): 表明入队的任务已经完成,一般被消费者线程调用; 消费者线程完成任务后,向队列告知任务已经完成
再次做改动:
class Producer(threading.Thread): def __init__(self, source_clusterid_queue, target_clusterid_queue): threading.Thread.__init__(self) self.source_clusterid_queue = source_clusterid_queue self.target_clusterid_queue = target_clusterid_queue def _restore_redis(self,clusterid): time.sleep(10) print(f"{clusterid} has restored") def run(self): source_clusterid = self.source_clusterid_queue.get() self._restore_redis(source_clusterid) self.target_clusterid_queue.put(source_clusterid) # 告知队列任务已完成,解除阻塞 self.source_clusterid_queue.task_done() class Consumer(threading.Thread): def __init__(self, target_clusterid_queue): threading.Thread.__init__(self) self.clusterid_queue = target_clusterid_queue def _dns_update(self,cluster_id): if cluster_id == '_finish': pass time.sleep(2) print(f"{cluster_id} has consumed") def run(self): while True: ClusterId = self.clusterid_queue.get() self._dns_update(ClusterId) # 告知队列任务已完成,解除阻塞 self.clusterid_queue.task_done() if ClusterId == '_finish': break if __name__=='__main__': source_clusterid_queue = queue.Queue() target_clusterid_queue = queue.Queue() clusterid_list = ['a','b'] restore_th_list=[] # 启动生产者和消费者线程 for _ in clusterid_list: dns_update_th = Consumer(target_clusterid_queue) dns_update_th.start() restore_th = Producer(source_clusterid_queue,target_clusterid_queue) restore_th_list.append(restore_th) restore_th.start() for clusterid in clusterid_list: source_clusterid_queue.put(clusterid) # 阻塞生产者队列,等待生产完成 source_clusterid_queue.join() # 向queue发送结束结束信号,解除消费者队列阻塞 for clusterid in clusterid_list: target_clusterid_queue.put('_finish') #等待所有的消费完成 target_clusterid_queue.join() print('All restore work done')
如上所示,我们主线程中分别加入了两个队列的join()方法:
第一个join()方法的含义是等待生产线程生产完成,发送完所有的消息后,才开始向消费者发送结束消息标记;
第二个join()方法等待消费者线程结束后打印日志;
分别对应了消费者和生产者线程中的task_done()方法;在最后一个task_done()执行完后,join()就会解除阻塞,从而执行下面的逻辑,这样我们就完整的实现了消费者线程结束的功能
如图所示,消费者正确执行了所有的消费任务,才结束了脚本
查看更多关于queue.task_done()与 queue.join() 的相爱相杀的详细内容...