Python进程学习笔记

Python进程学习笔记

进程

◆ 是一个执行中的程序

◆ 每个进程都拥有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据

◆ 操作系统管理其上所有进程的执行,并为这些进程合理地分配时间

◆ 进成也可通过派生(fork 或 spawn)新的进程来执行其他任务

PS:fork在windows上的支持不是太好,代码只可能在Linux上运行的话,建议用fork。

进程模块

◆ 使用multiprocessing实现多进程代码

◆ 用multiprocessing.Process创建进程

◆ start()启动进程

◆ join()挂起进程

◆ os.getpid()获得进程的ID

进程的实现

函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import os, time
from multiprocessing import Process


def do_sth(name):
"""
进程要做的事情
:param name: str 进程的名称
:return:
"""
print('进程名称:{0}, pid:{1}'.format(name, os.getpid()))
time.sleep(150)
print('进程要做的事情')


if __name__ == '__main__':
p = Process(target=do_sth, args=('my process', ))
# 启动进程
p.start()
# 挂起进程
p.join()

面向对象实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import os, time
from multiprocessing import Process


class MyProcess(Process):
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_name = name

def run(self):
print('MyProcess进程的名称:{0},pid: {1}'.format(self.my_name, os.getpid()))
time.sleep(150)
print('MyProcess进程要做的事情')


if __name__ == '__main__':
p = MyProcess('my process class')
# 启动进程
p.start()
# 挂起进程
p.join()

可以在任务管理器中找到对应的进程。

进程之间的通信

◆ 通过Queue、Pipes等实现进程之间的通信

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import time
from multiprocessing import Process, Queue, current_process

import random
import time

class WriteProcess(Process):
""" 写的进程 """
def __init__(self, q, *args, **kwargs):
super().__init__(*args, **kwargs)
self.q = q

def run(self):
""" 实现进程的业务逻辑 """
# 要写的内容
ls = [
"第1行内容",
"第2行内容",
"第3行内容",
"第4行内容",
]
for line in ls:
print('写入内容:{0} - {1}'.format(line, current_process().name))
self.q.put(line)
# 每写入一次,休息1-5秒
time.sleep(random.randint(1, 5))


class ReadProcess(Process):
""" 读取内容进程 """
def __init__(self, q, *args, **kwargs):
super().__init__(*args, **kwargs)
self.q = q

def run(self):
while True:
content = self.q.get()
print('读取到的内容:{0} - {1}'.format(content, self.name))


if __name__ == '__main__':
# 通过Queue共享数据
q = Queue()
# 写入内容的进程
t_write = WriteProcess(q)
t_write.start()
# 读取进程启动
t_read = ReadProcess(q)
t_read.start()
t_write.join()
# t_read.join()

# 因为读的进程是死循环,无法等待其结束,只能强制终止
t_read.terminate()

即使用数据对象进行通信

多进程中的锁

◆ Lock()

◆ Rlock()

◆ Condition()

PS:进程的锁是为了防止共享数据产生错误,从而加锁保证每次操作数据只有一个进程,且牺牲了多进程的并行执行效率。

Lock()

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import random
import time
from multiprocessing import Process, Lock


class WriteProcess(Process):
""" 写入文件 """

def __init__(self, file_name, num, lock, *args, **kwargs):
# 文件的名称
super().__init__(*args, **kwargs)
self.file_name = file_name
self.num = num
# 锁对象
self.lock = lock

def run(self):
""" 写入文件的主要业务逻辑 """
try:
# 添加锁
self.lock.acquire()
for i in range(5):
content = '现在是:{0} : {1} - {2}\n'.format(
self.name,
self.pid,
self.num
)
with open(self.file_name, 'a+', encoding='utf-8') as f:
f.write(content)
time.sleep(random.randint(1,5))
print(content)
finally:
self.lock.release()

if __name__ == '__main__':
file_name = 'test.txt'
# 锁的对象
lock = Lock()
for x in range(5):
p = WriteProcess(file_name, x, lock)
p.start()

Rlock()

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import random
import time
from multiprocessing import Process, RLock


class WriteProcess(Process):
""" 写入文件 """

def __init__(self, file_name, num, rlock, *args, **kwargs):
# 文件的名称
super().__init__(*args, **kwargs)
self.file_name = file_name
self.num = num
# 锁对象
self.rlock = rlock

def run(self):
""" 写入文件的主要业务逻辑 """
try:
# 添加锁
self.rlock.acquire()
print('locked')
self.rlock.acquire()
print('relocked')
for i in range(5):
content = '现在是:{0} : {1} - {2}\n'.format(
self.name,
self.pid,
self.num
)
with open(self.file_name, 'a+', encoding='utf-8') as f:
f.write(content)
time.sleep(random.randint(1,5))
print(content)
finally:
self.rlock.release()
self.rlock.release()


if __name__ == '__main__':
file_name = 'test.txt'
# 锁的对象
rlock = RLock()
for x in range(5):
p = WriteProcess(file_name, x, rlock)
p.start()

RLock可重复锁,而Lock能重复锁,反而会导致死锁,导致程序无法正常运行,同线程的锁,有共同之处。

同样,可以使用with进行锁的添加和释放。

使用进程池

同步添加任务

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import random
import time
from multiprocessing import Process, RLock


class WriteProcess(Process):
""" 写入文件 """

def __init__(self, file_name, num, rlock, *args, **kwargs):
# 文件的名称
super().__init__(*args, **kwargs)
self.file_name = file_name
self.num = num
# 锁对象
self.rlock = rlock

def run(self):
""" 写入文件的主要业务逻辑 """
try:
# 添加锁
self.rlock.acquire()
print('locked')
self.rlock.acquire()
print('relocked')
for i in range(5):
content = '现在是:{0} : {1} - {2}\n'.format(
self.name,
self.pid,
self.num
)
with open(self.file_name, 'a+', encoding='utf-8') as f:
f.write(content)
time.sleep(random.randint(1,5))
print(content)
finally:
self.rlock.release()
self.rlock.release()


if __name__ == '__main__':
file_name = 'test.txt'
# 锁的对象
rlock = RLock()
for x in range(5):
p = WriteProcess(file_name, x, rlock)
p.start()

异步添加任务

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import random
import time
from multiprocessing import current_process, Pool


def run(file_name, num):
"""
进程执行的业务逻辑
往文件中写入数据
:param file_name: str 文件名称
:param num: int 写入的数字
:return: str 写入的结果
"""
with open(file_name, 'a+', encoding='utf-8') as f:
# 当前的进程
now_process = current_process()
# 写入的内容
content = '{0} - {1} - {2}'.format(
now_process.name,
now_process.pid,
num
)
f.write(content)
f.write('\n')
# 写完之后随机休息1-5秒
time.sleep(random.randint(1, 5))
print(content)
return 'ok'

if __name__ == '__main__':
file_name = 'test_pool.txt'
# 进程池
pool = Pool(2)
rest_list = []
for i in range(20):
# 异步添加任务
rest = pool.apply_async(run, args=(file_name, i))
rest_list.append(rest)
print('{0} --- {1}'.format(i ,rest))
# 关闭池子
pool.close()
pool.join()
# 查看异步执行的结果
print(rest_list[0].get())

PS:如果要用进程池的话,最好是用函数的方法实现,面向对象的方法相对的会比较麻烦。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×