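Processes
◆ A process is a program in execution
◆ Each process has its own address space, memory, data stack, and other auxiliary data used to track its execution
◆ The operating system manages the execution of all processes running on it and allocates CPU time among them
◆ A process can also create new processes (via fork or spawn) to carry out other tasks
PS: fork is not available on Windows, where multiprocessing uses spawn instead. If the code will only ever run on Linux, fork is the recommended choice.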
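The note above can be turned into a small sketch that picks the start method at runtime. The helper names choose_start_method and report are made up for this illustration; get_all_start_methods() and get_context() are part of the standard multiprocessing module.

```python
import multiprocessing
import os


def choose_start_method():
    """Prefer fork where the platform offers it, otherwise fall back to spawn."""
    methods = multiprocessing.get_all_start_methods()
    return 'fork' if 'fork' in methods else 'spawn'


def report(label):
    # Runs in the child process
    print('{0}: child pid {1}, parent pid {2}'.format(
        label, os.getpid(), os.getppid()
    ))


if __name__ == '__main__':
    method = choose_start_method()
    # get_context() returns an object with the same API as the
    # multiprocessing module, bound to the chosen start method
    ctx = multiprocessing.get_context(method)
    p = ctx.Process(target=report, args=(method,))
    p.start()
    p.join()
```

Using a context object rather than set_start_method() keeps the choice local to this code, so it does not affect other parts of a larger program.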
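The process module
◆ Use multiprocessing to write multi-process code
◆ Create a process with multiprocessing.Process
◆ start() starts the process
◆ join() blocks the caller until the process has finished
◆ os.getpid() returns the ID of the current process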
Implementing a process
Function-based implementation:

import os
import time
from multiprocessing import Process


def do_sth(name):
    """
    The work the process performs.
    :param name: str, the name of the process
    :return: None
    """
    print('Process name: {0}, pid: {1}'.format(name, os.getpid()))
    # Sleep long enough for the process to be found in the task manager
    time.sleep(150)
    print('Work the process has to do')


if __name__ == '__main__':
    p = Process(target=do_sth, args=('my process', ))
    p.start()
    p.join()

Object-oriented implementation:

import os
import time
from multiprocessing import Process


class MyProcess(Process):

    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_name = name

    def run(self):
        # run() is what executes in the child process after start()
        print('MyProcess name: {0}, pid: {1}'.format(self.my_name, os.getpid()))
        time.sleep(150)
        print('Work MyProcess has to do')


if __name__ == '__main__':
    p = MyProcess('my process class')
    p.start()
    p.join()

While the program is running, the corresponding process can be found in the task manager.
Communication between processes
◆ Processes communicate through Queue, Pipe, and similar objects
Code:

import random
import time
from multiprocessing import Process, Queue, current_process


class WriteProcess(Process):
    """ Writer process """

    def __init__(self, q, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.q = q

    def run(self):
        """ Business logic of the process """
        ls = [
            'Line 1',
            'Line 2',
            'Line 3',
            'Line 4',
        ]
        for line in ls:
            print('Writing: {0} - {1}'.format(line, current_process().name))
            self.q.put(line)
            time.sleep(random.randint(1, 5))


class ReadProcess(Process):
    """ Reader process """

    def __init__(self, q, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.q = q

    def run(self):
        while True:
            # get() blocks until an item is available
            content = self.q.get()
            print('Read: {0} - {1}'.format(content, self.name))


if __name__ == '__main__':
    q = Queue()
    t_write = WriteProcess(q)
    t_write.start()
    t_read = ReadProcess(q)
    t_read.start()
    t_write.join()
    # The reader loops forever, so stop it once the writer has finished
    t_read.terminate()

In other words, the processes communicate through a shared data object.
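The list above also mentions pipes, which the Queue listing does not show. Here is a minimal sketch using multiprocessing.Pipe; the function name count_lines and the None sentinel are choices made for this illustration, not part of the original example. The sentinel lets the child exit on its own instead of being stopped with terminate().

```python
from multiprocessing import Pipe, Process


def count_lines(conn):
    """Receive lines until the None sentinel arrives, then send back the count."""
    count = 0
    while True:
        line = conn.recv()  # blocks until the other end sends something
        if line is None:
            break
        print('Received: {0}'.format(line))
        count += 1
    conn.send(count)
    conn.close()


if __name__ == '__main__':
    # Pipe() returns two connection objects; it is duplex by default,
    # so both ends can send and receive
    parent_conn, child_conn = Pipe()
    p = Process(target=count_lines, args=(child_conn,))
    p.start()
    for line in ['Line 1', 'Line 2', 'Line 3']:
        parent_conn.send(line)
    parent_conn.send(None)  # sentinel: no more data
    print('Child counted {0} lines'.format(parent_conn.recv()))
    p.join()
```

A pipe connects exactly two endpoints, whereas a Queue can be shared by several producers and consumers, so a Queue is usually the simpler choice when more than two processes are involved.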
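Locks in multi-process code
◆ Lock()
◆ RLock()
◆ Condition()
PS: Process locks protect shared data from corruption. While a process holds the lock, it is the only one operating on the data; the price is that this part of the program no longer runs in parallel.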
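To make the note above concrete, here is a small sketch with a shared counter. It assumes a multiprocessing.Value as the shared data; the function name add_many and the numbers are made up for this illustration. Incrementing counter.value is a read followed by a write, so without the lock two processes could overwrite each other's update.

```python
from multiprocessing import Lock, Process, Value


def add_many(counter, lock, times):
    """Increment the shared counter `times` times, one process at a time."""
    for _ in range(times):
        lock.acquire()
        try:
            # Read-modify-write on shared memory: only safe while holding the lock
            counter.value += 1
        finally:
            lock.release()


if __name__ == '__main__':
    counter = Value('i', 0)  # a C int stored in shared memory
    lock = Lock()
    workers = [
        Process(target=add_many, args=(counter, lock, 1000))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 4000: every increment was applied
```

The lock serializes the increments, which is exactly the loss of parallelism the note describes; in real code the locked region should be kept as small as possible.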
Lock()
Code:

import random
import time
from multiprocessing import Process, Lock


class WriteProcess(Process):
    """ Writes to a file """

    def __init__(self, file_name, num, lock, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.file_name = file_name
        self.num = num
        self.lock = lock

    def run(self):
        """ Main business logic for writing to the file """
        # Acquire before the try block so release() only runs once the lock is held
        self.lock.acquire()
        try:
            for i in range(5):
                content = 'Now: {0} : {1} - {2}\n'.format(
                    self.name, self.pid, self.num
                )
                with open(self.file_name, 'a+', encoding='utf-8') as f:
                    f.write(content)
                time.sleep(random.randint(1, 5))
                print(content)
        finally:
            self.lock.release()


if __name__ == '__main__':
    file_name = 'test.txt'
    lock = Lock()
    for x in range(5):
        p = WriteProcess(file_name, x, lock)
        p.start()

RLock()
Code:

import random
import time
from multiprocessing import Process, RLock


class WriteProcess(Process):
    """ Writes to a file """

    def __init__(self, file_name, num, rlock, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.file_name = file_name
        self.num = num
        self.rlock = rlock

    def run(self):
        """ Main business logic for writing to the file """
        self.rlock.acquire()
        print('locked')
        # The process that already holds an RLock can acquire it again
        self.rlock.acquire()
        print('relocked')
        try:
            for i in range(5):
                content = 'Now: {0} : {1} - {2}\n'.format(
                    self.name, self.pid, self.num
                )
                with open(self.file_name, 'a+', encoding='utf-8') as f:
                    f.write(content)
                time.sleep(random.randint(1, 5))
                print(content)
        finally:
            # Release once for every acquire
            self.rlock.release()
            self.rlock.release()


if __name__ == '__main__':
    file_name = 'test.txt'
    rlock = RLock()
    for x in range(5):
        p = WriteProcess(file_name, x, rlock)
        p.start()

An RLock is reentrant: the process that holds it can acquire it again. A Lock is not; acquiring it a second time without releasing it first causes a deadlock, and the program hangs. In this respect process locks behave like thread locks.
As with thread locks, the with statement can be used to acquire and release the lock.
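A minimal sketch of the with form, assuming a helper called append_line and a temporary file, both made up for this illustration. Entering the with block acquires the lock and leaving it releases the lock, even when the body raises an exception.

```python
import os
import tempfile
from multiprocessing import Lock, Process


def append_line(lock, file_name, text):
    """Append one line to the file while holding the lock."""
    # Equivalent to lock.acquire() ... try/finally ... lock.release()
    with lock:
        with open(file_name, 'a', encoding='utf-8') as f:
            f.write(text + '\n')


if __name__ == '__main__':
    file_name = os.path.join(tempfile.mkdtemp(), 'test_with.txt')
    lock = Lock()
    workers = [
        Process(target=append_line, args=(lock, file_name, 'line {0}'.format(x)))
        for x in range(5)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    with open(file_name, encoding='utf-8') as f:
        print(f.read())
```

The same form works with RLock, since both lock types support the context manager protocol.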
Using a process pool
Adding tasks synchronously
Code:

import random
import time
from multiprocessing import current_process, Pool


def run(file_name, num):
    """
    Business logic executed by a worker process:
    write data to a file.
    :param file_name: str, the file name
    :param num: int, the number to write
    :return: str, the result of the write
    """
    with open(file_name, 'a+', encoding='utf-8') as f:
        now_process = current_process()
        content = '{0} - {1} - {2}'.format(
            now_process.name, now_process.pid, num
        )
        f.write(content)
        f.write('\n')
    time.sleep(random.randint(1, 5))
    print(content)
    return 'ok'


if __name__ == '__main__':
    file_name = 'test_pool.txt'
    pool = Pool(2)
    for i in range(20):
        # Assumed synchronous counterpart of apply_async: apply() blocks
        # until the task has finished and returns its result directly
        rest = pool.apply(run, args=(file_name, i))
        print('{0} --- {1}'.format(i, rest))
    pool.close()
    pool.join()

Adding tasks asynchronously
Code:

import random
import time
from multiprocessing import current_process, Pool


def run(file_name, num):
    """
    Business logic executed by a worker process:
    write data to a file.
    :param file_name: str, the file name
    :param num: int, the number to write
    :return: str, the result of the write
    """
    with open(file_name, 'a+', encoding='utf-8') as f:
        now_process = current_process()
        content = '{0} - {1} - {2}'.format(
            now_process.name, now_process.pid, num
        )
        f.write(content)
        f.write('\n')
    time.sleep(random.randint(1, 5))
    print(content)
    return 'ok'


if __name__ == '__main__':
    file_name = 'test_pool.txt'
    pool = Pool(2)
    rest_list = []
    for i in range(20):
        # apply_async() returns an AsyncResult immediately, without waiting
        rest = pool.apply_async(run, args=(file_name, i))
        rest_list.append(rest)
        print('{0} --- {1}'.format(i, rest))
    # No more tasks will be submitted; wait for the workers to finish
    pool.close()
    pool.join()
    # get() returns the value the task function returned
    print(rest_list[0].get())

PS: When using a process pool, it is best to implement the tasks as functions; the object-oriented approach is comparatively cumbersome.
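One likely reason functions are easier here: the pool hands tasks to its workers by pickling them, and a module-level function is pickled simply by reference. The sketch below shows the function style with Pool.map; the task function square is made up for this illustration.

```python
from multiprocessing import Pool


def square(n):
    """Task function, defined at module level so the workers can import it."""
    return n * n


if __name__ == '__main__':
    # Leaving the with block shuts the pool down; map() has already
    # collected every result by then
    with Pool(2) as pool:
        results = pool.map(square, range(5))
    print(results)  # [0, 1, 4, 9, 16]
```

map() blocks until all tasks are done and returns the results in input order, so no separate list of AsyncResult objects is needed.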