好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

queue.task_done()与 queue.join() 的相爱相杀

笔者在工作中经常接到这样的需求:利用现有的 redis 快照生成一个新的 redis 实例,并提供新实例的域名解析;由于生成 redis 和域名解析记录都是耗时操作;因此,顺理成章的把它构造成一个简单的生产者 - 消费者模型:

生产者利用 redis 快照生成新的实例 将新实例 id 传入队列

消费者接受队列消息中 id 并更新 dns 解析到新的实例上

代码如下:

# 省略具体的业务逻辑代码,主要实现生产者-消费者的逻辑
import json
import threading
import time
import queue

class Producer(threading.Thread):
    def __init__(self, source_clusterid_queue, target_clusterid_queue):
        threading.Thread.__init__(self)
        self.source_clusterid_queue = source_clusterid_queue
        self.target_clusterid_queue = target_clusterid_queue

    def _restore_redis(self,clusterid):
        time.sleep(10)
        print(f"{clusterid} has restored")

    def run(self):
        source_clusterid = self.source_clusterid_queue.get()
        self._restore_redis(source_clusterid)
        self.target_clusterid_queue.put(source_clusterid)

class Consumer(threading.Thread):
    def __init__(self, target_clusterid_queue):
        threading.Thread.__init__(self)
        self.clusterid_queue = target_clusterid_queue

    def _dns_update(self,cluster_id):
        time.sleep(2)
        print(f"{cluster_id} has been consumed")

    def run(self):
        while True:  
            ClusterId = self.clusterid_queue.get()
            self._dns_update(ClusterId)

if __name__=='__main__':
    source_clusterid_queue = queue.Queue()
    target_clusterid_queue = queue.Queue()
    clusterid_list = ['a','b']

    restore_th_list=[]
	
    #启动生产者和消费者线程
    for _ in clusterid_list:
        dns_update_th = Consumer(target_clusterid_queue)
        dns_update_th.start()
        restore_th = Producer(source_clusterid_queue,target_clusterid_queue)
        restore_th_list.append(restore_th)
        restore_th.start()

    for clusterid in clusterid_list:
        source_clusterid_queue.put(clusterid)

这里定义了两个队列,source_clusterid_queue接受外部元素,生产者获取该队列的中的元素,完成生产任务后,将clusterid 扔到 target_cluster_id_queue中,消费者从中获取元素进行消费

但是,运行该脚本你会发现,在消费完队列中的所有元素后,脚本会处于无限挂起的状态

原因在于消费者线程使用的 queue.get() 默认是一个阻塞方法,等到队列中的元素消费完毕,队列为空,消费者线程就会一直处于阻塞状态

阻塞改造

既然queue.get()会产生阻塞问题,那我们能否在消费完毕后,向队列中加入一个结束标记,消费者线程get()到该标记后主动退出?  于是有了如下改动:

# 仅显示改动部分
class Consumer(threading.Thread):
    def __init__(self, target_clusterid_queue):
        threading.Thread.__init__(self)
        self.clusterid_queue = target_clusterid_queue
        
    def _dns_update(self,cluster_id):
        if cluster_id == '_finish':
            pass  
        time.sleep(2)
        print(f"{cluster_id} has consumed")
        
    def run(self):
        while True:  
            ClusterId = self.clusterid_queue.get()
            self._dns_update(ClusterId)
            if ClusterId == '_finish':   
               break
               
if __name__=='__main__':
    source_clusterid_queue = queue.Queue()
    target_clusterid_queue = queue.Queue()
    clusterid_list = ['a','b']
    restore_th_list=[]
    #启动生产者和消费者线程
    for _ in clusterid_list:
        dns_update_th = Consumer(target_clusterid_queue)
        dns_update_th.start()
        restore_th = Producer(source_clusterid_queue,target_clusterid_queue)
        restore_th_list.append(restore_th)
        restore_th.start()
    for clusterid in clusterid_list:
        source_clusterid_queue.put(clusterid)
    #向queue发送结束结束信号,解除消费者队列阻塞
    for clusterid in clusterid_list:
        target_clusterid_queue.put('_finish')

如上所示,我们在主线程中主动向队列添加"_finish"标记;在消费者线程中判断元素值,如果等于"_finish"就跳出循环,如此,就能解决get()的阻塞问题

再次运行脚本,如你所愿,脚本确实不会挂起了,但是执行逻辑却与初衷背道而驰了,消费者线程提前消费了"_finish"标记,却没有消费真正的业务信息就草草结束了

join 和 task_done的引入

原因在于, 消费者消费队列元素需要一定的时间,但我们的主线程并没有等待消费者线程消费结束,就提前结束消费者线程了;解决措施显而易见,我们需要在执行逻辑中等待队列完成,才能发起结束标记;因此,引入 queue.join()和 queue.task_done() 就水到渠成了。

官方对于两者的定义: queue.join(): 阻塞当前队列,只有队列中的任务都完成后,才往下运行

queue.task_done(): 表明入队的任务已经完成,一般被消费者线程调用; 消费者线程完成任务后,向队列告知任务已经完成

再次做改动:

class Producer(threading.Thread):
    def __init__(self, source_clusterid_queue, target_clusterid_queue):
        threading.Thread.__init__(self)
        self.source_clusterid_queue = source_clusterid_queue
        self.target_clusterid_queue = target_clusterid_queue
    def _restore_redis(self,clusterid):
        time.sleep(10)
        print(f"{clusterid} has restored")
    def run(self):
        source_clusterid = self.source_clusterid_queue.get()
        self._restore_redis(source_clusterid)
        self.target_clusterid_queue.put(source_clusterid)
        # 告知队列任务已完成,解除阻塞
        self.source_clusterid_queue.task_done()          
        
class Consumer(threading.Thread):
    def __init__(self, target_clusterid_queue):
        threading.Thread.__init__(self)
        self.clusterid_queue = target_clusterid_queue
        
    def _dns_update(self,cluster_id):
        if cluster_id == '_finish':
            pass  
        time.sleep(2)
        print(f"{cluster_id} has consumed")
    def run(self):
        while True:
            ClusterId = self.clusterid_queue.get()
            self._dns_update(ClusterId)
            # 告知队列任务已完成,解除阻塞
            self.clusterid_queue.task_done()  
            if ClusterId == '_finish':
                break
                
if __name__=='__main__':
    source_clusterid_queue = queue.Queue()
    target_clusterid_queue = queue.Queue()
    clusterid_list = ['a','b']
    restore_th_list=[]
    # 启动生产者和消费者线程
    for _ in clusterid_list:
        dns_update_th = Consumer(target_clusterid_queue)
        dns_update_th.start()
        restore_th = Producer(source_clusterid_queue,target_clusterid_queue)
        restore_th_list.append(restore_th)
        restore_th.start()
    for clusterid in clusterid_list:
        source_clusterid_queue.put(clusterid)
    # 阻塞生产者队列,等待生产完成
    source_clusterid_queue.join()
    # 向queue发送结束结束信号,解除消费者队列阻塞
    for clusterid in clusterid_list:
        target_clusterid_queue.put('_finish')
 
    #等待所有的消费完成
    target_clusterid_queue.join()  
    print('All restore work done')

如上所示,我们主线程中分别加入了两个队列的join()方法:

第一个join()方法的含义是等待生产线程生产完成,发送完所有的消息后,才开始向消费者发送结束消息标记;

第二个join()方法等待消费者线程结束后打印日志;

分别对应了消费者和生产者线程中的task_done()方法;在最后一个task_done()执行完后,join()就会解除阻塞,从而执行下面的逻辑,这样我们就完整的实现了消费者线程结束的功能

如图所示,消费者正确执行了所有的消费任务,才结束了脚本

查看更多关于queue.task_done()与 queue.join() 的相爱相杀的详细内容...

  阅读:37次