python 并发执行之多线程

正常情况下,我们在启动一个程序的时候。这个程序会先启动一个进程,启动之后这个进程会拉起来一个线程。这个线程再去处理事务。也就是说真正干活的是线程,进程这玩意只负责向系统要内存,要资源但是进程自己是不干活的。默认情况下只有一个进程只会拉起来一个线程。

    多线程顾名思义,就是同样在一个进程的情况同时拉起来多个线程。上面说了,真正干活的是线程。进程与线程的关系就像是工厂和工人的关系。那么现在工厂还是一个,但是干活的工人多了。那么效率自然就提高了。因为只有一个进程,所以多线程在提高效率的同时,并没有向系统伸手要更多的内存资源。因此使用起来性价比还是很高的。但是多线程虽然不更多的消耗内存,但是每个线程却需要CPU的的参与。

相当于工厂虽然厂房就一间,可以有很多的工人干活。但是这些工人怎么干活还得靠厂长来指挥。工人太多了,厂长忙不过来安排一样效率不高。所以工人(线程)的数量最好还是在厂长(cpu)的能力(内核数)范围之内比较好。

    在python中多线程的实现方式有两种,我的总结就是一种是函数形式的。一种是通过自己创建一个类并继承threading.Thread类来实现的。其实关于多线程用到模块,也是有两种。一种是thread。这个模块是最原始的多线程模块,但是这个模块据说是比较low的。threading模块封装了thread模块,反正就是比较高级,反正就是没人用thread写程序,都用threading!!记住就好~

下面先来介绍第一种,也是我认为比较简单的一种函数形式的。

    先举个例子看下面的代码

import time
def haha(max_num):
    """
    随便定义一个函数,要求用户输入一个要打印数字的最大范围
    输入之后就会从0开始打印,直到用户输入的范围值
    """
    for i in range(max_num):
        """
        每次打印一个数字前要间隔1秒,那么打印10个数就要耗时10秒
        """
        time.sleep(1)
        print i
for x in range(3):
    haha(10)

上面的代码没什么难度,只是展现一下如果顺序执行函数haha()。执行三遍需要耗时30秒。因为程序要执行完第一个循环之后才会执行第二个循环。时间是累加的。

    现在我们引入多线程的方式执行。看看会不会有什么变化。

import threading
import time
def haha(max_num):
    """
    随便定义一个函数,要求用户输入一个要打印数字的最大范围
    输入之后就会从0开始打印,直到用户输入的最大范围
    """
    for i in range(max_num):
        """
        每次打印一个数字要间隔1秒,那么打印10个数就要耗时10秒
        """
        time.sleep(1)
        print i
for x in range(3):
    """
    这里的rang(3)是要依次启动三个线程,每个线程都调用函数haha()
    第一个线程启动执行之后,马上启动第二个线程再次执行。最后也相当
    函数执行了3次
    """
    #通过threading.Thread方法实例化多线程类
    #target后面跟的是函数的名称但是不要带括号也不填写参数
    #args后面的内容才是要传递给函数haha()的参数。切记参数一定要以数组的形式填写不然会报错。
    t=threading.Thread(target=haha,args=(10,))
    #将线程设置为守护线程
    t.setDaemon(True)
    #线程准备就绪,随时等候cpu调度
    t.start()

执行的结果是。。。。。。。。。。。。。。什么都没有发生!!!!没有任何输出。什么情况??!!!是不是代码有错误??!

其实问题就出在t.setDaemon(True)  这一句上。默认不写这句或者说默认设置的情况这一句应该是

t.setDaemon(False)这样子的。那这一句是什么意思呢?

setDaemon   设置为后台线程或前台线程(默认)

            如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止

            如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

这些什么前台、后台、主线程都是什么玩意?听着是不是特别晕?其实没有这么复杂。简单理解就是如果这个参数是True,就表示程序流程跑完之后直接就关闭线程然后退出了,根本不管线程是否执行完。从上面的例子可以看出来,我们每执行一遍函数haha()最少也得耗时10秒,哪怕是打印第一个数字出来也得停顿1秒之后才会输出。但是程序流程就是拉起来三个线程就结束了。执行启动线程的3次for循环可用不了10秒,1秒都用不到就结束了。所以就出现了我们看到的结果,程序拉起来3个线程,就结束了主线程但是此时线程调用的函数haha()还没来得及输出呢,就被迫跟着程序一起结束了。

    既然找到了原因,我们就来修改一下代码。把碍事的那部分置为默认值或者干脆不写这一行

import threading
import time
def haha(max_num):
    for i in range(max_num):
        time.sleep(1)
        print i
for x in range(3):
    t=threading.Thread(target=haha,args=(5,))
    #也可以干脆不写这一行
    t.setDaemon(False)
    t.start()

现在运行,就可以看到看起来很乱的执行结果

0
00
1
 11
2
2
2
3
3
3
4
4
4

其实这就是三个线程并行运行同时输出,所以把结果都输出到一起引起。正是这种乱才整明白了确实三个函数haha()在同时运行。

    如果想让结果看起来规则一些可以考虑使用join()方法

import threading
import time
def haha(max_num):
    for i in range(max_num):
        time.sleep(1)
        print i
for x in range(3):
    t=threading.Thread(target=haha,args=(5,))
    t.start()
    #通过join方法让线程逐条执行
    t.join()

这样执行的结果看起来就美观了

0
1
2
3
4
0
1
2
3
4
0
1
2
3
4

就像注释所说的那样,美观是没问题了。可是这样的话虽然创建了多个线程,每个线程却是依次执行的。没有了并行还要多线程干嘛。这样和最上面写的串行执行例子就一个效果了。因此join方法不能随便乱用的。

    可是既然有了join()方法它总得有用吧?设计出来肯定不是为了摆着看的。现在我们再修改一下代码,看看join()方法到底怎么正确使用。

import threading
import time
def haha(max_num):
    for i in range(max_num):
        time.sleep(1)
        print i
"""
创建一个列表,用于存储要启动多线程的实例
"""
threads=[]
for x in range(3):
    t=threading.Thread(target=haha,args=(5,))
    #把多线程的实例追加入列表,要启动几个线程就追加几个实例
    threads.append(t)
for thr in threads:
    #把列表中的实例遍历出来后,调用start()方法以线程启动运行
    thr.start()
for thr in threads:
    """
    isAlive()方法可以返回True或False,用来判断是否还有没有运行结束
    的线程。如果有的话就让主线程等待线程结束之后最后再结束。
    """
    if thr.isAlive():
        thr.join()

上面学习setDaemon()方法的时候我们知道,主线程其实就相当于程序的主运行流程。那么程序运行的时候最先启动的一定就是主线程,主线程负责拉起子线程用于干活。我们的例子中运行函数haha()线程其实都是子线程。因此可以说多线程其实就是多个子线程。那么程序运行完最后一个退出的也肯定就是主线程。因此上例中最后再遍历一个遍threads列表的目的就是查看还是否有没有退出的子线程,只要还有子线程是活的,没有退出。就通过join()方法强制程序流程不可以走到主线程退出的那个步骤。只有等子线程都退出之后,才能根据join()方法的规则顺序执行到主线程退出的步骤。

  第二种创建多线程的方式就是通过自定义一个类来实现的。

import threading
import time
class haha(threading.Thread):
    """
    自定义一个类haha,必须要继承threading.Thread,下面必须要重写一个run()方法。
    把要执行的函数写到run()方法里。如果没有run()方法就会报错。其实这个类的作用就是
    通过haha类里面的run()方法来定义每个启动的子线程要执行的函数内容。
    """
    def __init__(self,max_num):
        threading.Thread.__init__(self)
        self.max_num=max_num
    def run(self):
        for i in range(self.max_num):
            time.sleep(1)
            print i
if __name__=='__main__':
    threads=[]
    for x in range(3):
        """
        只是这里和函数方式有点区别,因为haha类继承了threading.Thread,所以通过haha类的实例化
        就相当于调用了多线程的实例化。剩下的操作就和函数方式一个样子了。
        """
        t=haha(5)
        threads.append(t)
    for thr in threads:
        thr.start()
    for thr in threads:
        if thr.isAlive():
            thr.join()

以上就是实现多线程的两种方式,根据个人喜好选择就好。没什么本质区别。

下面介绍一下线程锁,先看下面一段代码

import threading
#定义一个变量
gnum=0
def work(max_number):
    for i in range(max_number):
        print i
        
def mylock():
    global gnum
    """
    这个函数运行的时候需要先运行一下函数work()
    执行完之后将全局的gnum+1 
    """
    work(10)
    #将变量声明为全局变量
    gnum=gnum+1
    print 'gnum is ',gnum
for x in range(5):
    """
    同时启动5个现成运行mylock()函数
    """
    t=threading.Thread(target=mylock)
    t.start()

上面的例子看起来也不难,目的就是在执行gnum+1之前先运行另外一个耗时的函数而已。因为我们启动了5个线程同时运行,理论上运行流程应该是第一个线程运行完成之后gnum+1=1,此时第二个线程也运行完了在gnum=1的基础上再加1,使gnum=2。以此类推,最后当5个线程运行完了的时候gnum应该等于5。但是实际运行的时候并不是我们想象的那个样子!!!!!

    真实的情况是当我们第一个线程运行的时候gnum=0,运行一个耗时的work()函数。因为线程是并发执行的,那这时候在第一个work()还没运行完的情况下,第二个线程又启动开始运行了。第一个线程没有运行完的情况下,是不会执行gnum+1操作的。此时对第二个线程来说依旧是gnum=0。之后第一个线程结束的时候gnum经过自加1变成了gnum=1,可是第二个线程还是当初取值的时候还是按照gnum=0来进行的自加运算。所以第二次运算的结果很有可能还是gnum=1。没有达到我们理想的gnum=2的效果。

    从这里就可以看出来,如果多线程执行的任务互不相干那自然什么事情都没有。一旦要利用多线程多同一个变量进行操作的时候,因为线程是并发执行的。所以很有很可能同时修改变量,导致最终结果不符合我们的预期。

    遇到这种情况一个方案就是用我们上面跳到join方法,让线程依次运行。这样同时就只有一个线程在修改变量,不会出现混乱。但是问题还是一样多线程并发的效果就没有了。肯定不可取。第二个

方案就是使用线程锁。什么是线程锁呢?就是在多个线程同时操作一个资源的时候,哪个线程先操作。哪个线程就先锁定这个资源。直到这个线程操作结束打开锁之后,其他的线程才能再操作。这就叫做线程安全,也就是线程锁。听起来好像和join()方法有点类似。其实还是有区别的,先来看看加了线程锁的代码。

import threading
gnum=0
lock=threading.RLock()
def work(max_number):
    for i in range(max_number):
        print i
def mylock():
    work(10)
    #在操作gnum之前先上锁
    #acquire()的括号里可以定义锁定的timeout时间,超过这个时间就自动打开锁
    lock.acquire()
    global gnum
    gnum=gnum+1
    #操作结束之后再打开锁
    lock.release()
    print 'gnum is ',gnum
for x in range(5):
    t=threading.Thread(target=mylock)
    t.start()

上从面的代码可以看出区别,join()方法是对整个线程做限制。而线程锁lock.acquire是在线程执行过程中对某一部分进行锁限制。例子中被启动的各个线程还是可以并行运行work()这个比较耗时的函数,只是在gnum的处理上才会受到锁的限制而已。这样就解决了多线程同时操作一个资源引发错误数据的问题。另外一个要注意的就是threading.RLock()也是Lock()的高级用法,用这个高级的就可以了。

 多线程的event事件

一般情况下,多线程在创建之后就开始立即投入工作。没有任何停顿。但是有时候我们也许并不希望如此。比如我们要写一个爬虫程序。在爬取网页之前,我希望先ping一下这个网页。看看这个网页网页是否可以ping通。如果通了就释放线程去爬取内容。如果不通就去测试下一个网页。所以python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。其中event.wait()相当于一个全局的标识,程序根据event.set()和event.clear()两个方法分别定制这个全局Flag的值为True或者Flase。当Flag=True的时候就相当于收到释放所有线程的信号。看下面一个列子

import threading
def do(event):
    print 'start'
    #函数执行到这里等待信号放行信号
    event.wait()
    #收到放行信号后执行下面的语句
    print 'execute'
#实例化threading.Event()事件
event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()
#先将Flag标识置为False
event_obj.clear()
inp = raw_input('input:')
#如果用户输入'true'就像wait()发送放行信号
if inp == 'true':
    event_obj.set()

这样就完成通过set()和clear()方法控制线程运行的目的

最后再简单介绍一下GIL

GIL是python的全局解释器锁的简称。这个锁是干什么用的呢?说白了就是限制python解释调用cpu内核之用的。多线程理论上可以同时调用多个cpu内核同时工作,比如java语言就可以做到。但是python因为GIL的存在,同一时间只有一条进程在cpu内核中进行处理。虽然我们可以看到多线程并发运行,但是那只是因为cpu内核通过上下文的切换快速将多个线程来回执行造成的假象。python和java那种可以真正调用多核心多线程的语言,在效率上还是有差异的。这个就是python一直被人诟病的GIL锁

Top