Python线程学习笔记

Python线程学习笔记

实现一个线程

◆ 用threading模块代替thread模块

◆ 用threading.Tread创建线程

◆ start()启动线程

◆ join()挂起线程

threading 模块的对象

对象 描述
Thread 表示一个执行线程的对象
Lock 锁原语对象(和thread模块中的锁一样)
RLock 可重入锁对象,使单一线程可以(再次)获得已持有的锁(递归锁)
Condition 条件变量对象,使得一个线程等待另一个线程满足特定的”条件”,比如改变状态或某个数据值
Event 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程将被激活
Semaphore 为线程间共享的有限资源提供了一个”计数器”,如果没有可用资源时会被阻塞
BoundedSemaphore 与Semaphore相似,不过它不允许超过初始值
Timer 与Thread相似,不过它不允许超过初始值
Barrier 创建一个”障碍”,必须达到指定数量的线程后才可以继续

Thread 对象数据属性

属性 描述
name 线程名
ident 线程的标识符
daemon 布尔标志,表示这个线程是否是守护线程

Thread 对象方法

属性 描述
init() 实例化一个进程对象,需要有一个可以调用的target,以及其参数args或kwargs。
start() 开始执行该线程
run() 定义线程功能的方法(通常在子类中被应用开发者重写)
join(timeout=None) 直至启动的线程终止之前一直挂起;除非给出了timeout(秒),否者会一直阻塞
getName() 返回线程名
setName(name) 设定线程名
isAlivel/is_alive() 布尔标志,表示这个线程是否还存活
isDaemon() 如果是守护线程,则返回True;否则,返回False
setDaemon() 把线程的守护标志设定为布尔值daemonic(必须在线程start()之前调用)

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time


def loop():
""" 新的线程执行的代码 """
now_thread = threading.current_thread()
print('[loop]now thread name: {0}'.format(now_thread.name))
n = 0
while n < 5:
print(n)
time.sleep(1)
n += 1


def use_thread():
""" 使用线程来实现 """
# 当前正在执行的线程名称
now_thread = threading.current_thread()
print('now thread name:{0}'.format(now_thread.name))
#设置线程
t = threading.Thread(target=loop, name='loop_thread')
# 启动线程
t.start()
# 挂起线程
t.join()


if __name__ == '__main__':
use_thread()

输出如下:

1
2
3
4
5
6
7
now thread name:MainThread
[loop]now thread name: loop_thread
0
1
2
3
4

观察上面的代码和其输出可以发现,本来是由主线程进行的运行,经过新设置的线程后,变成由新线程进行运行

类形式代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time


class LoopThread(threading.Thread):
""" 自定义线程 """
n = 0

def run(self):
while self.n < 5:
print(self.n)
now_thread = threading.current_thread()
print('[loop]now thread name: {0}'.format(now_thread.name))
time.sleep(1)
self.n += 1


if __name__ == '__main__':
t = LoopThread(name='loop_thread_oop')
t.start()
t.join()

实现多个线程

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading

# 我的银行账户
import time

balance = 0


def change_it(n):
""" 改变我的余额 """
global balance
balance = balance + n
time.sleep(2)
balance = balance - n
time.sleep(1)
print('-N---> {0}; balance:{1}'.format(n, balance))


class ChangeBalanceThread(threading.Thread):
"""
改变银行余额的线程
"""
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num

def run(self):
for i in range(1000000):
change_it(self.num)


if __name__ == '__main__':
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
t1.start()
t2.start()
t1.join()
t2.join()
print('the last: {0}'.format(balance))

输出如下:

1
2
3
4
5
6
7
8
9
-N---> 5; balance:0
-N---> 8; balance:5
-N---> 5; balance:0
-N---> 8; balance:5
-N---> 5; balance:0
-N---> 8; balance:5
-N---> 5; balance:0
-N---> 8; balance:5
......

PS: global是全局变量的声明,为了让范围比我大的别人用我的东西,使用的时候要和作用域关联起来。

多线程中的锁实现

◆ Lock()

◆ Rlock()

◆ Condition()

Lock()

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import threading
import time

# 获得一把锁
my_lock = threading.Lock()

# 我的银行账户
balance = 0


def change_it(n):
""" 改变我的余额 """
global balance
try:
# 添加锁
my_lock.acquire()
balance = balance + n
time.sleep(2)
balance = balance - n
time.sleep(1)
print('-N---> {0}; balance:{1}'.format(n, balance))
finally:
# 释放掉锁
my_lock.release()


class ChangeBalanceThread(threading.Thread):
"""
改变银行余额的线程
"""
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num

def run(self):
for i in range(1000000):
change_it(self.num)


if __name__ == '__main__':
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
t1.start()
t2.start()
t1.join()
t2.join()
print('the last: {0}'.format(balance))

输出如下:

1
2
3
4
5
6
7
8
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
......

添加:

1
2
3
4
# 添加锁
my_lock.acquire()
# 资源已经被锁住了,不能重复锁定,产生死锁
my_lock.acquire()

资源已经被锁住了,不能重复锁定,产生死锁

RLock()

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import threading
import time

# 获得一把锁
your_lock = threading.RLock()

# 我的银行账户
balance = 0


def change_it(n):
""" 改变我的余额 """
global balance
try:
# 添加锁
your_lock.acquire()
# 资源已经被锁住了,不能重复锁定,产生死锁
your_lock.acquire()
balance = balance + n
time.sleep(2)
balance = balance - n
time.sleep(1)
print('-N---> {0}; balance:{1}'.format(n, balance))
finally:
# 释放掉锁
your_lock.release()
your_lock.release()


class ChangeBalanceThread(threading.Thread):
"""
改变银行余额的线程
"""
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num

def run(self):
for i in range(1000000):
change_it(self.num)


if __name__ == '__main__':
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
t1.start()
t2.start()
t1.join()
t2.join()
print('the last: {0}'.format(balance))

输出如下:

1
2
3
4
5
6
7
8
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
-N---> 5; balance:0
......

在一个线程中锁住,还能再锁。

对锁使用的优化(自动释放,效率会变慢,但安全性提升):

1
2
3
4
5
6
with your_lock:
balance = balance + n
time.sleep(2)
balance = balance - n
time.sleep(1)
print('-N---> {0}; balance:{1}'.format(n, balance))

线程的调度和优化

我们的电脑和服务器是有限制的,线程是有上限的,不会无限制的暴力使用,所以在这里就需要进行优化,于是就有了内存池的概念。即建立一个池,当其中的线程完成自己的任务后,让其继续完成池子中未完成的任务,而不是直接销毁掉。

下面使用两种线程的调度和优化的方法

使用multiprocessing模块

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
import threading
from multiprocessing.dummy import Pool


def run(n):
""" 线程要做的事情 """
time.sleep(2)
print(threading.current_thread().name, n)


def main_use_pool():
""" 使用线程池来优化 """
t1 = time.time()
n_list = range(100)
pool = Pool(10)
pool.map(run, n_list)
pool.close()
pool.join()
print(time.time() - t1)


if __name__ == '__main__':
main_use_pool()

输出如下:

1
2
3
4
5
6
7
8
······
Thread-6 91
Thread-3 94
Thread-8 97
Thread-6 92
Thread-3 95
Thread-8 98
24.017276763916016

PS: multiprocessing模块中有dummy.pool()和pool.Pool,前者是给线程用的,后者是给进程用的。

使用ThreadPoolExecutor类

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
import threading
from concurrent.futures.thread import ThreadPoolExecutor


def run(n):
""" 线程要做的事情 """
time.sleep(2)
print(threading.current_thread().name, n)


def main_use_executor():
""" 使用 ThreadPoolExecutor 来优化"""
t1 = time.time()
n_list = range(100)
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(run, n_list)
print(time.time() - t1)


if __name__ == '__main__':
main_use_executor()

输出如下:

1
2
3
4
5
6
7
8
9
······
ThreadPoolExecutor-0_4 93
ThreadPoolExecutor-0_1 92
ThreadPoolExecutor-0_6 95
ThreadPoolExecutor-0_3 97
ThreadPoolExecutor-0_8 98
ThreadPoolExecutor-0_5 96
ThreadPoolExecutor-0_7 99
20.01082682609558

对比两种方法的运行的时间,确实ThreadPoolExecutor效率相对比较高一些

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×