python3 threading模块

python threading example

python中使用线程

使用threading模块和_thread模块

_thread and hello world

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#!/usr/bin/env python3
import _thread
import time

def print_hello(thread_name, msg):
    print("{thread_name}: hello, {msg}".format(thread_name=thread_name, msg=msg))

_thread.start_new_thread(print_hello, ("world", "world!"))
_thread.start_new_thread(print_hello, ("boy", "boy!"))

time.sleep(1)

执行2次的结果发现是随机的 liuliancao@liuliancao-dev:~/temp$ python3 test.py boy: hello, boy! world: hello, world! liuliancao@liuliancao-dev:~/temp$ python3 test.py world: hello, world! boy: hello, boy!

threading and hello world

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
#!/usr/bin/env python3
import time
import threading

def print_hello(thread_name, msg):
    print("{thread_name}: hello, {msg}".format(thread_name=thread_name, msg=msg))

threading.Thread(target=print_hello, kwargs={"thread_name":"world","msg":"world!"}).start()
threading.Thread(target=print_hello, kwargs={"thread_name":"boy","msg":"boy!"}).start()
time.sleep(1)

执行100次发现,结果是有顺序的, boy: hello, boy! world: hello, world! boy: hello, boy! world: hello, world! …

threading starts with a class definition hello world

简单实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python3
import time
import threading

class HelloWorld(threading.Thread):

    def __init__(self, thread_name, msg):
        threading.Thread.__init__(self)
        self.thread_name = thread_name
        self.msg = msg

    def run(self):
        print("{thread_name}: hello, {msg}".format(thread_name=self.thread_name, msg=self.msg))

thread_world = HelloWorld(thread_name='world', msg="world!")
thread_boy = HelloWorld(thread_name="boy", msg="boy!")

thread_world.start()
thread_boy.start()

需要注意,run里面加参数并不会让start能传进去,所以这里建议把要定义的参数都塞进__init__里面 执行结果 world: hello, world! boy: hello, boy!

what's join

官方介绍

join(timeout=None) 等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 – 不管是正常终结还是抛出未处理异常 – 或者直到发生超时,超时选项是可选的。 当 timeout 参数存在而且不是 None 时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为 join() 总是返回 None ,所以你一定要在 join() 后调用 is_alive() 才能判断是否发生超时 – 如果线程仍然存活,则 join() 超时。 当 timeout 参数不存在或者是 None ,这个操作会阻塞直到线程终结。 一个线程可以被 join() 很多次。 如果尝试加入当前线程会导致死锁, join() 会引起 RuntimeError 异常。如果尝试 join() 一个尚未开始的线程,也会抛出相同的异常。

看不懂没关系,看下如下两个代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python3
import time
import threading

class HelloWorld(threading.Thread):

    def __init__(self, thread_name, msg):
        threading.Thread.__init__(self)
        self.thread_name = thread_name
        self.msg = msg

    def run(self):
        if self.thread_name == 'world':
            time.sleep(3)
        else:
            time.sleep(1)
        print("{thread_name}: hello, {msg}".format(thread_name=self.thread_name, msg=self.msg))

thread_world = HelloWorld(thread_name='world', msg="world!")
thread_boy = HelloWorld(thread_name="boy", msg="boy!")

thread_world.start()
thread_boy.start()

#thread_world.join()
#thread_boy.join()
print("ending.")

结果是啥,再把最后的join注释掉看看执行啥…

从结果我们可以发现, join会阻塞当前进程,而不加join则不会阻塞,想象两个场景 a)写日志 b)查询 很明显,查询结果的时候应该block当前程序,等待返回结果,这个在后端逻辑很需要,而写日志这种则可以不加join 另外一个发现是thread执行的顺序其实是两个start后都开始的,为什么打印会有延迟,是因为我们已经执行了sleep

threading锁

不加锁的时候

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python3
import time
import threading

class HelloWorld(threading.Thread):

    def __init__(self, thread_name, msg):
        threading.Thread.__init__(self)
        self.thread_name = thread_name
        self.msg = msg

    def run(self):
        if self.thread_name == 'world':
            time.sleep(3)
        else:
            time.sleep(1)
        print("{thread_name}: hello, {msg}".format(thread_name=self.thread_name, msg=self.msg))

thread_world = HelloWorld(thread_name='world', msg="world!")
thread_boy = HelloWorld(thread_name="boy", msg="boy!")

thread_world.start()
thread_boy.start()


thread_world.join()
thread_boy.join()
print("ending.")

结果为 boy: hello, boy! world: hello, world! ending.

加锁的情况

看下官方对于锁的说明

原始锁是一个在锁定时不属于特定线程的同步基元组件。在Python中,它是能用的最低级的同步基元组件,由 _thread 扩展模块直接实现。

原始锁处于 "锁定" 或者 "非锁定" 两种状态之一。它被创建时为非锁定状态。它有两个基本方法, acquire() 和 release() 。当状态为非锁定时, acquire() 将状态改为 锁定 并立即返回。当状态是锁定时, acquire() 将阻塞至其他线程调用 release() 将其改为非锁定状态,然后 acquire() 调用重置其为锁定状态并返回。 release() 只在锁定状态下调用; 它将状态改为非锁定并立即返回。如果尝试释放一个非锁定的锁,则会引发 RuntimeError 异常。

锁同样支持 上下文管理协议。

当多个线程在 acquire() 等待状态转变为未锁定被阻塞,然后 release() 重置状态为未锁定时,只有一个线程能继续执行;至于哪个等待线程继续执行没有定义,并且会根据实现而不同。

所有方法的执行都是原子性的。

class threading.Lock 实现原始锁对象的类。一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。

需要注意的是 Lock 其实是一个工厂函数,返回平台支持的具体锁类中最有效的版本的实例。

acquire(blocking=True, timeout=-1) 可以阻塞或非阻塞地获得锁。

当调用时参数 blocking 设置为 True (缺省值),阻塞直到锁被释放,然后将锁锁定并返回 True 。

在参数 blocking 被设置为 False 的情况下调用,将不会发生阻塞。如果调用时 blocking 设为 True 会阻塞,并立即返回 False ;否则,将锁锁定并返回 True。

当浮点型 timeout 参数被设置为正值调用时,只要无法获得锁,将最多阻塞 timeout 设定的秒数。timeout 参数被设置为 -1 时将无限等待。当 blocking 为 false 时,timeout 指定的值将被忽略。

如果成功获得锁,则返回 True,否则返回 False (例如发生 超时 的时候)。

在 3.2 版更改: 新的 timeout 形参。

在 3.2 版更改: 现在如果底层线程实现支持,则可以通过POSIX上的信号中断锁的获取。

release() 释放一个锁。这个方法可以在任何线程中调用,不单指获得锁的线程。

当锁被锁定,将它重置为未锁定,并返回。如果其他线程正在等待这个锁解锁而被阻塞,只允许其中一个允许。

在未锁定的锁调用时,会引发 RuntimeError 异常。

没有返回值。

locked() 如果获得了锁则返回真值。

写几行代码看看
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python3
import time
import threading

my_lock = threading.Lock()

class HelloWorld(threading.Thread):

    def __init__(self, thread_name):
        threading.Thread.__init__(self)
        self.thread_name = thread_name

    def run(self):
        #my_lock.acquire()
        # first with world
        if self.thread_name == 'world':
            #my_lock.release()
            with open('test.org', 'a') as f:
                print("hello, world!")
                f.write("hello,world!")
        if self.thread_name == 'boy':
            with open('test.org', 'a') as f:
                print("hello,boy!")
                f.write("hello,boy!")

thread_world = HelloWorld(thread_name='world')
thread_boy = HelloWorld(thread_name="boy")


thread_world.start()
thread_boy.start()

虽然都在写同一个文件,其实执行顺序是未知的,哪个线程都可能先拿到文件的句柄 想象如下场景

  • 我想把监控数据拿到并写到influxdb里面,但是我需要先拿到所有监控数据, 再执行写Influxdb
  • 我希望第一行必须是我写的world,然后开始其他写入
  • 我希望我的这个资源不能被同时写入,如果我写入较长

这时候在代码里面定义一个锁threading.Lock()(相当于全局的一个开关),在实现函数之前lock.acquire()(这个是被锁的线程) 当我执行完一部分代码或者前置thread完成达到解锁lock的时候,执行lock.release()

所以当把两行反注释掉后,发现,确实先写的是world, 这里只是一个非常简单的例子

threading CONDITION

上面写锁的方法理解不方便,直接用condtion会更明朗

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python3
import threading
import time

my_lock = threading.Lock()
condition = threading.Condition(lock=my_lock)

class HelloWorld(threading.Thread):

    def __init__(self, thread_name):
        threading.Thread.__init__(self)
        self.thread_name = thread_name

    def run(self):
        with condition:
            while self.thread_name != 'world':
                condition.wait()
                print("hello, boy!")
            print("hello, world")

            #condition.notify()


thread_boy = HelloWorld(thread_name="boy")
thread_world = HelloWorld(thread_name='world')

thread_boy.start()
thread_world.start()

这个例子可能不是很恰当,在这个condition下,执行满足条件不是world的会被block,notify才会唤醒block的thread 这里官方的例子是用在生产消费模型里面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

threading Barrier

设置一个栅栏,当满足栅栏条件的时候可以执行一个action,可以为栅栏设置timeout 栅栏可以通过reset重置 wait可以冲出栅栏,当满足冲出栅栏的wait,就会执行对应的action 看一个例子(也可以想一下亡羊补牢的例子) 一个场景一个招财猫,它的任务是hello, 它每天只hello 2次就去睡觉了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python3
import time
import threading

def print_ok():
    print('tired, go to sleep')

barrier = threading.Barrier(parties=2, action=print_ok, timeout=2)

class HelloCat(threading.Thread):

    def __init__(self, thread_name):
        threading.Thread.__init__(self)
        self.thread_name = thread_name

    def run(self):
        print("helloing...")
        print("hello, {name}".format(name=self.thread_name))
        barrier.wait()


thread_boy = HelloCat(thread_name="boy")
thread_world = HelloCat(thread_name='world')

thread_boy.start()
thread_world.start()

执行结果如下: helloing… hello, boy helloing… hello, world tired, go to sleep 所以栅栏对象是一种阈值,当达到这个阈值次数,就会执行一个action 而在我们wait是一个计数器,之前像是一个常态的服务,比如服务接受请求,会一直接受,当栅栏坏的越来越多,最终全坏了, 羊就跑了