实现一个线程
◆ 用threading模块代替thread模块
◆ 用threading.Tread创建线程
◆ start()启动线程
◆ join()挂起线程
threading 模块的对象
对象 |
描述 |
Thread |
表示一个执行线程的对象 |
Lock |
锁原语对象(和thread模块中的锁一样) |
RLock |
可重入锁对象,使单一线程可以(再次)获得已持有的锁(递归锁) |
Condition |
条件变量对象,使得一个线程等待另一个线程满足特定的”条件”,比如改变状态或某个数据值 |
Event |
条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程将被激活 |
Semaphore |
为线程间共享的有限资源提供了一个”计数器”,如果没有可用资源时会被阻塞 |
BoundedSemaphore |
与Semaphore相似,不过它不允许超过初始值 |
Timer |
与Thread相似,不过它不允许超过初始值 |
Barrier |
创建一个”障碍”,必须达到指定数量的线程后才可以继续 |
Thread 对象数据属性
属性 |
描述 |
name |
线程名 |
ident |
线程的标识符 |
daemon |
布尔标志,表示这个线程是否是守护线程 |
Thread 对象方法
属性 |
描述 |
init() |
实例化一个进程对象,需要有一个可以调用的target,以及其参数args或kwargs。 |
start() |
开始执行该线程 |
run() |
定义线程功能的方法(通常在子类中被应用开发者重写) |
join(timeout=None) |
直至启动的线程终止之前一直挂起;除非给出了timeout(秒),否者会一直阻塞 |
getName() |
返回线程名 |
setName(name) |
设定线程名 |
isAlivel/is_alive() |
布尔标志,表示这个线程是否还存活 |
isDaemon() |
如果是守护线程,则返回True;否则,返回False |
setDaemon() |
把线程的守护标志设定为布尔值daemonic(必须在线程start()之前调用) |
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import threading import time
def loop(): """ 新的线程执行的代码 """ now_thread = threading.current_thread() print('[loop]now thread name: {0}'.format(now_thread.name)) n = 0 while n < 5: print(n) time.sleep(1) n += 1
def use_thread(): """ 使用线程来实现 """ now_thread = threading.current_thread() print('now thread name:{0}'.format(now_thread.name)) t = threading.Thread(target=loop, name='loop_thread') t.start() t.join()
if __name__ == '__main__': use_thread()
|
输出如下:
1 2 3 4 5 6 7
| now thread name:MainThread [loop]now thread name: loop_thread 0 1 2 3 4
|
观察上面的代码和其输出可以发现,本来是由主线程进行的运行,经过新设置的线程后,变成由新线程进行运行
类形式代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import threading import time
class LoopThread(threading.Thread): """ 自定义线程 """ n = 0 def run(self): while self.n < 5: print(self.n) now_thread = threading.current_thread() print('[loop]now thread name: {0}'.format(now_thread.name)) time.sleep(1) self.n += 1
if __name__ == '__main__': t = LoopThread(name='loop_thread_oop') t.start() t.join()
|
实现多个线程
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import threading
import time
balance = 0
def change_it(n): """ 改变我的余额 """ global balance balance = balance + n time.sleep(2) balance = balance - n time.sleep(1) print('-N---> {0}; balance:{1}'.format(n, balance))
class ChangeBalanceThread(threading.Thread): """ 改变银行余额的线程 """ def __init__(self, num, *args, **kwargs): super().__init__(*args, **kwargs) self.num = num
def run(self): for i in range(1000000): change_it(self.num)
if __name__ == '__main__': t1 = ChangeBalanceThread(5) t2 = ChangeBalanceThread(8) t1.start() t2.start() t1.join() t2.join() print('the last: {0}'.format(balance))
|
输出如下:
1 2 3 4 5 6 7 8 9
| -N---> 5; balance:0 -N---> 8; balance:5 -N---> 5; balance:0 -N---> 8; balance:5 -N---> 5; balance:0 -N---> 8; balance:5 -N---> 5; balance:0 -N---> 8; balance:5 ......
|
PS: global是全局变量的声明,为了让范围比我大的别人用我的东西,使用的时候要和作用域关联起来。
多线程中的锁实现
◆ Lock()
◆ Rlock()
◆ Condition()
Lock()
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import threading import time
my_lock = threading.Lock()
balance = 0
def change_it(n): """ 改变我的余额 """ global balance try: my_lock.acquire() balance = balance + n time.sleep(2) balance = balance - n time.sleep(1) print('-N---> {0}; balance:{1}'.format(n, balance)) finally: my_lock.release()
class ChangeBalanceThread(threading.Thread): """ 改变银行余额的线程 """ def __init__(self, num, *args, **kwargs): super().__init__(*args, **kwargs) self.num = num
def run(self): for i in range(1000000): change_it(self.num)
if __name__ == '__main__': t1 = ChangeBalanceThread(5) t2 = ChangeBalanceThread(8) t1.start() t2.start() t1.join() t2.join() print('the last: {0}'.format(balance))
|
输出如下:
1 2 3 4 5 6 7 8
| -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 ......
|
添加:
1 2 3 4
| my_lock.acquire()
my_lock.acquire()
|
资源已经被锁住了,不能重复锁定,产生死锁
RLock()
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import threading import time
your_lock = threading.RLock()
balance = 0
def change_it(n): """ 改变我的余额 """ global balance try: your_lock.acquire() your_lock.acquire() balance = balance + n time.sleep(2) balance = balance - n time.sleep(1) print('-N---> {0}; balance:{1}'.format(n, balance)) finally: your_lock.release() your_lock.release()
class ChangeBalanceThread(threading.Thread): """ 改变银行余额的线程 """ def __init__(self, num, *args, **kwargs): super().__init__(*args, **kwargs) self.num = num
def run(self): for i in range(1000000): change_it(self.num)
if __name__ == '__main__': t1 = ChangeBalanceThread(5) t2 = ChangeBalanceThread(8) t1.start() t2.start() t1.join() t2.join() print('the last: {0}'.format(balance))
|
输出如下:
1 2 3 4 5 6 7 8
| -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 -N---> 5; balance:0 ......
|
在一个线程中锁住,还能再锁。
对锁使用的优化(自动释放,效率会变慢,但安全性提升):
1 2 3 4 5 6
| with your_lock: balance = balance + n time.sleep(2) balance = balance - n time.sleep(1) print('-N---> {0}; balance:{1}'.format(n, balance))
|
线程的调度和优化
我们的电脑和服务器是有限制的,线程是有上限的,不会无限制的暴力使用,所以在这里就需要进行优化,于是就有了内存池的概念。即建立一个池,当其中的线程完成自己的任务后,让其继续完成池子中未完成的任务,而不是直接销毁掉。
下面使用两种线程的调度和优化
的方法
使用multiprocessing模块
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import time import threading from multiprocessing.dummy import Pool
def run(n): """ 线程要做的事情 """ time.sleep(2) print(threading.current_thread().name, n)
def main_use_pool(): """ 使用线程池来优化 """ t1 = time.time() n_list = range(100) pool = Pool(10) pool.map(run, n_list) pool.close() pool.join() print(time.time() - t1)
if __name__ == '__main__': main_use_pool()
|
输出如下:
1 2 3 4 5 6 7 8
| ······ Thread-6 91 Thread-3 94 Thread-8 97 Thread-6 92 Thread-3 95 Thread-8 98 24.017276763916016
|
PS: multiprocessing模块中有dummy.pool()和pool.Pool,前者是给线程用的,后者是给进程用的。
使用ThreadPoolExecutor类
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import time import threading from concurrent.futures.thread import ThreadPoolExecutor
def run(n): """ 线程要做的事情 """ time.sleep(2) print(threading.current_thread().name, n)
def main_use_executor(): """ 使用 ThreadPoolExecutor 来优化""" t1 = time.time() n_list = range(100) with ThreadPoolExecutor(max_workers=10) as executor: executor.map(run, n_list) print(time.time() - t1)
if __name__ == '__main__': main_use_executor()
|
输出如下:
1 2 3 4 5 6 7 8 9
| ······ ThreadPoolExecutor-0_4 93 ThreadPoolExecutor-0_1 92 ThreadPoolExecutor-0_6 95 ThreadPoolExecutor-0_3 97 ThreadPoolExecutor-0_8 98 ThreadPoolExecutor-0_5 96 ThreadPoolExecutor-0_7 99 20.01082682609558
|
对比两种方法的运行的时间,确实ThreadPoolExecutor效率相对比较高一些