好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Python实现线程池之线程安全队列

本文实例为大家分享了Python实现线程池之线程安全队列的具体代码,供大家参考,具体内容如下

一、线程池组成

一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。

二、线程安全队列的实现

包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。

class ThreadSafeQueue(object):

? ? def __init__(self, max_size=0):
? ? ? ? self.queue = []
? ? ? ? self.max_size = max_size ?# max_size为0表示无限大
? ? ? ? self.lock = threading.Lock() ?# 互斥量
? ? ? ? self.condition = threading.Condition() ?# 条件变量

? ? def size(self):
? ? ? ? """
? ? ? ? 获取当前队列的大小
? ? ? ? :return: 队列长度
? ? ? ? """
? ? ? ? # 加锁
? ? ? ? self.lock.acquire()
? ? ? ? size = len(self.queue)
? ? ? ? self.lock.release()
? ? ? ? return size

? ? def put(self, item):
? ? ? ? """
? ? ? ? 将单个元素放入队列
? ? ? ? :param item:
? ? ? ? :return:
? ? ? ? """
? ? ? ? # 队列已满 max_size为0表示无限大
? ? ? ? if self.max_size != 0 and self.size() >= self.max_size:
? ? ? ? ? ? return ThreadSafeException()

? ? ? ? # 加锁
? ? ? ? self.lock.acquire()
? ? ? ? self.queue.append(item)
? ? ? ? self.lock.release()
? ? ? ? self.condition.acquire()
? ? ? ? # 通知等待读取的线程
? ? ? ? self.condition.notify()
? ? ? ? self.condition.release()

? ? ? ? return item

? ? def batch_put(self, item_list):
? ? ? ? """
? ? ? ? 批量添加元素
? ? ? ? :param item_list:
? ? ? ? :return:
? ? ? ? """
? ? ? ? if not isinstance(item_list, list):
? ? ? ? ? ? item_list = list(item_list)

? ? ? ? res = [self.put(item) for item in item_list]

? ? ? ? return res

? ? def pop(self, block=False, timeout=0):
? ? ? ? """
? ? ? ? 从队列头部取出元素
? ? ? ? :param block: 是否阻塞线程
? ? ? ? :param timeout: 等待时间
? ? ? ? :return:
? ? ? ? """
? ? ? ? if self.size() == 0:
? ? ? ? ? ? if block:
? ? ? ? ? ? ? ? self.condition.acquire()
? ? ? ? ? ? ? ? self.condition.wait(timeout)
? ? ? ? ? ? ? ? self.condition.release()
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? return None

? ? ? ? # 加锁
? ? ? ? self.lock.acquire()
? ? ? ? item = None
? ? ? ? if len(self.queue):
? ? ? ? ? ? item = self.queue.pop()
? ? ? ? self.lock.release()

? ? ? ? return item

? ? def get(self, index):
? ? ? ? """
? ? ? ? 获取指定位置的元素
? ? ? ? :param index:
? ? ? ? :return:
? ? ? ? """
? ? ? ? if self.size() == 0 or index >= self.size():
? ? ? ? ? ? return None

? ? ? ? # 加锁
? ? ? ? self.lock.acquire()
? ? ? ? item = self.queue[index]
? ? ? ? self.lock.release()

? ? ? ? return item


class ThreadSafeException(Exception):
? ? pass

三、测试逻辑

3.1、测试阻塞逻辑

def thread_queue_test_1():
? ? thread_queue = ThreadSafeQueue(10)

? ? def producer():
? ? ? ? while True:
? ? ? ? ? ? thread_queue.put(random.randint(0, 10))
? ? ? ? ? ? time.sleep(2)

? ? def consumer():
? ? ? ? while True:
? ? ? ? ? ? print('current time before pop is %d' % time.time())
? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=3)
? ? ? ? ? ? # item = thread_queue.get(2)
? ? ? ? ? ? if item is not None:
? ? ? ? ? ? ? ? print('get value from queue is %s' % item)
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print(item)
? ? ? ? ? ? print('current time after pop is %d' % time.time())

? ? t1 = threading.Thread(target=producer)
? ? t2 = threading.Thread(target=consumer)
? ? t1.start()
? ? t2.start()
? ? t1.join()
? ? t2.join()

测试结果:

我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。

3.2、测试读写加锁逻辑

def thread_queue_test_2():
? ? thread_queue = ThreadSafeQueue(10)

? ? def producer():
? ? ? ? while True:
? ? ? ? ? ? thread_queue.put(random.randint(0, 10))
? ? ? ? ? ? time.sleep(2)

? ? def consumer(name):
? ? ? ? while True:
? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=1)
? ? ? ? ? ? # item = thread_queue.get(2)
? ? ? ? ? ? if item is not None:
? ? ? ? ? ? ? ? print('%s get value from queue is %s' % (name, item))
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print('%s get value from queue is None' % name)

? ? t1 = threading.Thread(target=producer)
? ? t2 = threading.Thread(target=consumer, args=('thread1',))
? ? t3 = threading.Thread(target=consumer, args=('thread2',))
? ? t1.start()
? ? t2.start()
? ? t3.start()
? ? t1.join()
? ? t2.join()
? ? t3.join()

测试结果:

生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

查看更多关于Python实现线程池之线程安全队列的详细内容...

  阅读:40次