queue消息队列与守护进程

CSDN博客地址:https://blog.csdn.net/superDE009/article/details/107847070

queue消息队列与守护进程

在上一篇博客中大致讲以一些多线程编程的入门概念以及如何创建启动线程。但是要讲多线程实际使用在实际任务中,我们需要了解一些更加深入的概念:queue消息队列与守护进程

一、消息队列(queue)

在上一篇文章中,我们成功的创建的多线线程,并让他们同步运行了。但是有一个问题,这多个线程都运行的是同一个方法,而在正常的python编程中,通常将不同功能的代码放入不同的方法,多个方法相互配合,最终完成任务。在多线程编程时,我们也可以由多个不同功能的方法创建多个线程同步运行,将原来要顺序执行的方法,改为了同步执行,每个方法都在同步的获取传入数据,处理数据,将完成的数据交给下一个方法处理,如流水线一般。这就是生产者消费者模型的雏形

1.生产者消费者模型

当多个不同功能的线程同步执行时,其中部分线程负责获取(生产)数据,另一部分则负责分析(消费)获取到的数据,实现流水线般的数据操作。这就是生产者消费者模型的雏形。
其中获取数据的线程即为生产者,分析数据的线程即为消费者

以python中最常见的爬虫举例

爬虫的代码主要分为爬取数据,分析数据,整合输出数据三大块,在多线程编程中,我们通常将这三个功能分别放入不同方法,启用三个不同线程来执行。
在三个线程同步执行时,

  • 爬取数据线程输出爬到的数据
  • 分析数据线程获取爬到的数据,输出分析后的数据
  • 整合数据线程获取分析后的数据,输出结果。

这里三个线程是同步运行的,所以,在分析线程开始分析第一批数据时,爬取线程可同步爬取第二批数据,流水线操作,达到提高效率的目的
在这个例子中,爬取线程就是生产者,整合线程即消费者,而分析线程既是爬取线程的消费者,也是整合线程的生产者。

两个问题

要实现这样高效率的生产消费模型,我们需要解决两个重大难题

  1. 如何实现两个不同线程之间的数据通讯,让生产者线程的数据可以交给消费者线程
  2. 若生产者产出的效率低于消费者消耗的效率的话,就会出现消费者等待生产者产出数据,反之,则出现生产者等待消费者消耗完上一批数据,而当线程处于等待状态时,整体的运行效率都会降低。

好在,这些问题都有一个终极解决方案:消息队列(queue)

2.消息队列(queue)

线程间数据传输

在python的多线程编程中,我们可以通过queue消息队列来在线程之间进行数据交换。

queue本身类似于一种数据容器,其中可包含指定数量的数据,同一进程下的任一线程都能向该进程中的queue中添加或读取数据,由此解决了多线程之间相互通讯的问题。

解决生产消费效率不同

至于生产消费效率不同的问题,由于所有产出的数据都被放入到了queue中,各线程之间不直接交换数据。生产者产出数据后不用等消费者处理,直接放入消息队列中,消费者也不用向生产者要数据,而是直接从queue中取数据,从而避免不必要的等待,提高程序效率。

3.queue实例演示

概念上的基本就这么多了,接下来开始实际演示。
以下代码中,

from queue import Queue

导入消息队列模块

in_q.put(i)queue.put()方法向消息队列中放入数据
out_q.get(i)queue.get()方法从消息队列中取出数据
consumer方法中必须使用死循环,否则由该方法创建的线程将会在方法运行一次后直接退出,而我们的目的是要求consumer不断处理producer产生的数据。
在producer全部生产结束后,我们可以在消息队列中添加一个结束标志来结束consumer进程的死循环。

from queue import Queue#引入消息队列
from threading import Thread
def producer(in_q):#生产者
    for i in range(10):
        print('生产数据')
        in_q.put(i)
    in_q.put(False)#添加结束标志
    print('生产完成')

def consumer(out_q):#消费者
    while True:
        j=out_q.get()
        if j is False:#检测到结束标志,退出循环
            break
        else:
            print("消费",j)

q=Queue()#创建消息队列
T_producer = Thread(target=producer,args=(q,))
T_consumer = Thread(target=consumer,args=(q,))
T_consumer.start()
T_producer.start()

输出结果

生产数据
生产数据
消费 0
生产数据
消费 1
生产数据
消费 2
生产数据
消费 3
生产数据
消费 4
生产数据
消费 5
生产数据
消费 6
生产数据
消费 7
生产数据
消费 8
生产完成
消费 9

至此,我们实现了多个线程同步相互配合地处理不同任务。

  • queue模块中的常用方法: :

queue.qsize() 返回队列的大小
queue.empty() 如果队列为空,返回True,反之False
queue.full() 如果队列满了,返回True,反之False
queue.full 与maxsize 大小对应
queue.get([block[, timeout]]) 获取队列,timeout等待时间
queue.get_nowait() 相当queue.get(False)
queue.put(item) 写入队列,timeout等待时间
queue.put_nowait(item) 相当queue.put(item, False)
queue.task_done() 在完成一项工作之后,queue.task_done()函数向任务已经完成的队列发送一个信号
queue.join() 实际上意味着等到队列为空,再执行别的操作

二、守护线程与queue.join(),queue.task_done()方法

在上一个queue的实例中,consumer方法的死循环的停止是需要靠producer的放入queue中的停止标志,而若是在多consumer,producer的情况下,利用queue中的停止标志来控制consumer线程退出是十分繁杂的,我们需要一种更好的方式来控制consumer线程退出。
答案就是守护线程与queue.join(),queue.task_done()方法的联合使用。

1.守护线程(Daemon Thread)

原理

我们都知道,普通的线程是在执行完目标方法后,才会结束,这也就导致了consumer线程无法自动退出。
而守护线程(Daemon Thread),则是在进程中所有非守护线程的线程全部结束后,无论有没有守护线程,主进程都将强制结束,即所有的线程结束(包括守护线程)。

守护线程定义上是指为其他线程服务的线程,即consumer为producer服务那样,当被服务的线程(非守护线程)全部结束后,所有的守护进程都没有了服务对象,也就没有存在的必要了,于是全部结束。

举例

以上的概念十分的抽象,让我们举个例子来理解一下吧。
下面的语句用于将线程指定为守护线程

Threadname.daemon = True#将线程指定为守护线程

在下面的代码中,我将consumer线程指定为守护线程,运行

from queue import Queue
from threading import Thread
def producer(in_q):
    for i in range(10):
        print('生产数据')
        in_q.put(i)
    print('生产完成')

def consumer(out_q):
    while True:
        j=out_q.get()
        print("消费",j)

q=Queue()
T_producer = Thread(target=producer,args=(q,))
T_consumer = Thread(target=consumer,args=(q,))
T_consumer.daemon = True#将consumer设置为守护进程
T_producer.start()
T_consumer.start()

结果输出:
通过结果可以看出,在输出“生产结束”后,(即代表producer线程运行结束),整个进程直接结束,consumer也一起自动退出了。
至此,我们实现的consumer的自动退出功能。

生产数据 0
生产数据 1
生产数据 2
消费 0
生产数据 3
消费 1
生产数据 4
消费 2
生产数据 5
消费 3
生产数据 6
消费 4
生产数据 7
消费 5
生产数据 8
消费 6
生产数据 9
消费 7
生产完成

在执行以上程序后,可能会报以下错误,这是由于consumer在被强制退出时,print语句调用内存失败所以报错,这里暂时不管。
Fatal Python error: could not acquire lock for <_io.BufferedWriter name=”> at interpreter shutdown, possibly due to daemon threads

由以上例子我们可以看出,守护进程成功解决了线程无法自动退出的情况。
但是,又出现了一个新的问题,守护进程(consumer)会在非守护进程(producer)结束的瞬间同时结束,而通常来说,此时守护进程对数据的处理还未完成,如上例结果中,consumer消费完第7个数据时,producer已经完成生产,整个进程结束了。
所以我们需要找到一种新的方法来保证主进程在消息队列中所有数据处理完成后,再退出。
这里我们就要用到queue.join(),queue.task_done()方法来实现了

2.queue.join(),queue.task_done()方法

Queue.task_done()

用于通知当前queue,queue中先前的一个数据已经被consumer线程使用完成了,
如果join()调用正在阻塞,当队列中所有的数据都被获取并处理后它将恢复执行
如果调用次数超过了队列中放置的条目数目,将抛出ValueError异常。

Queue.join()

阻塞直到queue中所有的数据都被consumer线程获取并处理,
当一个条目被增加到队列时,未完成任务的计数将增加。当一个消费者线程调用task_done()时,未完成任务的计数将减少。当未完成任务的计数减少到0时,join()解锁。

原理(看完再看上面的理解起来更加方便)

在queue模块中,存在一个名为self.unfinished_tasks的变量,默认值为0,每当有一个数据被queue.put()方法放入queue中时,self.unfinished_tasks的值+1,而每调用一次Queue.task_done()方法,self.unfinished_tasks的值-1,当self.unfinished_tasks的值不为0时,则Queue.join()方法会阻塞整个进程,等待直到self.unfinished_tasks的值变为0后,被Queue.join()阻塞的进程才会解锁,继续执行。

实例

在上面的理论中,我们已经了解了queue.join()queue.task_done()方法的功能
之前在使用守护线程的过程中,我们遇到了consumer线程提前退出的问题,为了解决这个问题,我们只要让整个进程等待到queue中的所有数据都被consumer线程获取并处理完成之后再退出。
Queue.task_done()的功能是通知queue数据是否处理完成,Queue.join()功能能够让线程等待。
由这两个方法配合使用即可解决以上问题。
这么说的还是太抽象,我们上代码:

producer线程中每调用一次put方法,那么未完成的任务数量+1
在consumer方法中加入Queue.task_done()方法,让每一次consumer线程处理完成后,都通知queue任务处理完成。(未完成的任务数量-1)
在整个程序末尾加入Queue.join()方法,让程序顺序运行启动两个线程后,进入等待状态,直到所有的任务都完成后(未完成的任务数量归零),再退出。

from queue import Queue
from threading import Thread
def producer(in_q):
    for i in range(10):
        print('生产数据',i)
        in_q.put(i)
    print('生产完成')

def consumer(out_q):
    while True:
        j=out_q.get()
        print("消费",j)
        out_q.task_done()#通知queue该本次获取的数据已经处理完成
q=Queue()
T_producer = Thread(target=producer,args=(q,))
T_consumer = Thread(target=consumer,args=(q,))
T_consumer.daemon = True
T_producer.start()
T_consumer.start()
q.join()#阻塞整个进程,直到所有的数据都返回task_done后再继续执行。

输出结果
这一次在producer完成生产后,进程会等到consumer线程处理完所有queue中的数据后,再退出。

生产数据 0
生产数据 1
生产数据 2
生产数据 3
消费 0
消费 1
消费 2
生产数据 4
消费 3
生产数据 5
生产数据 6
生产数据 7
消费 4
消费 5
消费 6
消费 7
生产数据 8
生产数据 9
生产完成
消费 8
消费 9

由此,我们就完成让多线程之间通讯的功能。这样一来线程之间就能够共同合作完成一项任务了,实现流水线操作。

总结

在掌握了多线程之间通讯和控制同步运行的方法后,在一些需要处理大量数据的python脚本中,我们就可以使用多线程来提高处理的效率了。
但是多线程的学习之路还远未结束,在真正的运用多线程时,在很多情况下,部分线程时不允许同步运行的,如当两个线程需要同时写入一个文件时,这是我们就将用到”锁(Lock)“的概念来控制线程之间的同步运行。该知识点会在该系列的下篇博客中继续讲解。

同系列其他文章

进程与线程:https://blog.csdn.net/superDE009/article/details/107816034
queue与守护线程:https://blog.csdn.net/superDE009/article/details/107847070
互斥锁和GIL:https://blog.csdn.net/superDE009/article/details/107881944

本人的一个多线程爬虫,地址:https://github.com/DE009/DaZhongDianPing-mutiThread-
有需要可以拿去看看。
此篇博客为学习总结,如若有错,还请大佬多多指正。


发表评论

邮箱地址不会被公开。 必填项已用*标注