并行开发
综述
python并行多任务均依赖操作系统底层服务并行执行python代码
- 线程派生:基本所有主流平台均支持
- 多进程
- shell命令进程
- 子python进程
跨平台多进程实现
- 创建子python进程
- 类Unix:
fork
系统调用实现进程分支,分支进程运行时 环境同主进程完全一致 - Win:创建python进程,
import
当前调用代码得到类似 主进程运行时环境
- 类Unix:
pickle
序列化被调函数,传递给子进程执行
- 因为Win下分支进程需要导入当前调用,所以多进程代码必须 在
__main__
内,否则无限循环创建新进程- 进程池调用函数要声明在进程池创建之前,否则启动进程会报错
进程通信
python进程间数据传递基本都是通过
pickle
序列化传递,所以 需要传递的数据要支持pickle
序列化multiprocessing
等模块派生进程需要传递被调函数,所以 不支持lambda
匿名函数- 绑定对象方法
其他相关模块、包
- 多进程:
pathos
、pp
- 进程通信:
signal
注意事项
- 子进程报错直接死亡,错误信息默认不会输出到任何地方,所以
子进程中,多使用
try catch
os
模块
派生进程
1 | os.startfile(filename) |
os.popen
1 | pipe = os.popen(cmd(str), mode="r"/"w", buffering=-1) |
用途:运行shell命令并重定向其输出、输入流到管道开放
返回值:返回管道流对象
- 类似shell中管道语法
管道流对象类似普通文件流,支持一般读、写、迭代 (但是文档中只有
close
方法)1
2
3
4
5
6
7
8
9
10
11# `r`模式执行,读取子进程标准输出
pipe.readlines()
pipe.readline()
pipe.read()
for i in pipe:
# 迭代器语法
# `w`模式执行,写入子进程标准输入
pipe.write()
pipe.close()
# 返回退出状态
# 历史原因,退出状态为0时返回None
os.popen
一般不会阻塞,但是python执行需要整个命令行程序 完成的语句时仍然会阻塞- 关闭管道对象
- 一次性读取所有输出流
subprocess
模块可以实现与os.system
、os.popen
相同的 效果,使用更复杂,但是对流的连接、使用提供更完善的控制
os.fork
进程分支是构建平行任务的传统做法,是Unix工具集的基本组成部分
- 分支是开始独立程序的直接做法,无论调用程序是否相同
- 分支想法基于复制
- 程序调用分支例行程序时,操作系统会创建在该进程副本
- 和进程并行的运行副本
- 有些系统实现并不真的复制原有程序,以减少资源消耗, 但是新副本会像真实副本一样运行
1 | import os |
返回值:据此区分父、子进程,执行不同任务
- 子进程中返回0
- 父进程中返回子进程ID
os.fork
仅仅是系统代码库中标准进程分支调用简单封装- 和C共用代码库
- 在win下标准python版本中无法运行,和win模型冲突过多
- Cygwin中python可以运行,虽然行为同真正Unix分支不完全 相同,但是足够接近
os.fork
实际复制整个python解释器- 除非python脚本被编译为二进制机器码
os.exec
1 | def execv(path, args=tuple/list) |
os.exec
族的函数会覆盖当前进程- 所以该语句的代码都不会执行
- 不会更改进程号
1
2
3
4
5
6
7
8
9
10
11
12import os
param = 0
while True:
param += 1
newpid = os.fork()
if newpid == 0:
os.execlp("python", "child.py", str(param))
assert False, "starting new program error"
# 上句正常执行,这句永远无法被调用
else:
print("child is", pid)
if input() == "q": break
multiprocessing
模块的进程派生模型+os.exec
配合使用 可以在win下实现类似os.fork
的效果
spawn
1 | os.spawn |
进程通信
1 | read_fd, write_fd = os.pipe() |
subprocess
模块
Popen
1 | class Popen: |
用途
参数
args
:需要执行的命令executable
:备选执行命令stdin
/stdout
/stderr
:执行程序标准输入、输出、 错误流连接对象- 默认:当前进程标准输入、输出、错误
subprocess.PIPE=-1
:当前管道对象标准…
preexec_fn
:子进程执行前在子进程中调用的对象- POSIX only
close_fds
:控制关闭、继承文件描述符shell
:是否通过shell执行命令- 执行shell内置命令则必须置
True
- win:
type
- linux:
set
- win:
- Linux下
False
:由os.execvp
运行
- 执行shell内置命令则必须置
cwd
:子进程执行目录env
:子进程环境变量universal_newlines
:是否在3个标准流中使用行结尾符- 即是否按照文本处理3个标准流
startupinfo
:windows onlyrestore_signals
:POSIX onlystart_new_session
:POSIX onlypass_fds
:POSIX only
.communicate
1 | (stdout, stderr) = communicate(self, input=None, timeout=None) |
- 用途:和子进程交互,阻塞直到子进程终止
input
传递给子进程标准输入- 返回标准输出、错误输出
universal_newlines
为True
时,input
参数、返回值 应该为str
,否则为bytes
其他方法
1 | def kill(self): |
Popen
创建对象对象之后会立刻执行同时指定
stdout
、stdin
参数可以实现管道双工工作- 需要注意,读写时交替发送缓冲数据流可能导致死锁
call
1 | subprocess.call( |
_thread
- 为系统平台上的各种线程系统提供了可移植的接口
- 在安装了pthreads POSIX线程功能的系统上,接口工作方式 一致,无需修改源码即可正常工作
- 基本可以完全被
threading
模块替代了
start_new_thread
1 | def start_new_thread( |
用途:开启新线程,以参数调用
callable
返回值:应该是线程起始地址
派生线程在函数返回后退出
- 若在线程中函数抛出未捕捉异常,打印堆栈跟踪记录、退出 线程
- 程序其他部分继续运行
大多数系统平台上,整个程序主线程退出时,子线程随之退出
- 需要一些处理避免子线程意外退出
其他方法
1 | Lock = _thread.alloacate_lock() |
例子
例1
- 全局线程锁保护对输出流写入
- 全局线程锁实现主线程、子线程间通信,保证主线程在子线程 之后退出
1 | import _thread as thread |
例2
with
上下文管理器使用锁- 全局变量实现主线程、子线程通信,避免主线程在子线程之前 退出
1 | import _thread as thread |
threading
Thread
1 | class Thread: |
用途:可控线程类,有两种方法使用
- 传递callable参数创建新对象
- 继承、覆盖
run
方法:代码和Thread
深耦合,可能 不方便代码复用,如multiprocessing
模块
参数
group
:保留参数用于未来扩展target
:可执行对象,将被run
invokename
:线程名,缺省Thread-N
args
:用于invoketarget
参数tuplekwargs
:用于invoketarget
keyword参数dictdaemon
:是否是守护线程- 默认情况下,主进程(线程)会等待子进程、线程退出 后退出
- 主进程(线程)不等待守护进程、线程退出后再退出
- 注意:主进程退出之前,守护进程、线程会自动终止
- 若衍生类覆盖此构造器方法,务必首先调用此方法
.run
1 | def run(self): |
- 用途:代表线程活动
- 原
run
用于invoketarget
- 覆盖此方法设置线程活动
- 原
.start
1 | def start(self): |
- 用途:开始线程活动
- 线程创建完成后不会立即执行,需要手动调用
.start
启动 - 多次调用
raise RuntimeError
- 线程创建完成后不会立即执行,需要手动调用
.join
1 | def join(self, |
用途:等待直到线程结束
- join:将线程加入当前线程
- 可以多次调用
- 试图导致死锁时,将会
raise RuntimeError
参数
timeout
:指定超时时间,单位秒- 缺省否则阻塞直到线程结束
其他方法
1 | bool = is_alive(self): |
Event
1 | class Event(): |
用途
- 发送信号:
is_set
触发事件 - 接收信号:
wait
阻塞直到事件发生
- 发送信号:
Event
中包含信标,可在线程中设置、接收,实现线程 间同步Event
对象信标默认设置为False
,等待Event对象线程 会阻塞直到信标设置为真若有线程设置信标为真,则将唤醒所有等待该
Event
对象线程
- 若只想唤醒单个线程,用信号量、
Condition
代替
.clear
.clear
可以重置Event
对象
难以确保安全清理、重新赋值
Event
对象,可能导致错过事件 、死锁且无法保证重置
Event
对象的代码能在线程再次等待此Event
信号之前执行所以
Event
对象最好单次使用,即其信标设置为真应立刻丢弃
- 若线程需不停重复使用
Event
对象,使用Condition
代替
Condition
1 | class Condition(): |
用途:
Condition
对象wait
等待信号、notify
唤醒一定 数量线程实现线程同步说明
- 以上所有方法执行前均需要已获得底层锁,否则
raise RuntimeError
- 因此以上方法一般都需要放在
with
代码块中,保证已经 获取了内部锁
- 以上所有方法执行前均需要已获得底层锁,否则
with
上下文管理器
1 | with c: |
with
进入:获取condition底层锁,保证调用方法前已经获得 底层锁with
退出:释放condition底层锁
Condition
支持with
上下文管理器,而且非常必须,- 在
help
里面看不到.acquire
、.release
方法,但是是有 而且可以调用的,应该是官方不建议使用
.wait
用途
- 方法先释放底层锁,阻塞,使得其他等待 获取此对象底层锁获得锁
- 等待被
notify
唤醒,再次获取锁,继续执行
底层锁是
RLock
时.wait
不是调用其.release()
方法,而是调用RLock
内部方法确保真正释放锁,即使RLock
被递归的获取多次再次获取锁时,调用另一个内部接口恢复递归层次,即
RLock
内部计数RLock
本身性质:在没有被持有时,其内部计数被重置 为1,其他线程可以自由获取
.notify
.notify
并不释放condition底层锁- 只是控制能够获取底层锁的线程数量
Semaphore
1 | class Semaphore(builtins.object): |
用途:
Semaphore
对象release
方法生产、acquire
方法 消耗信号量,实现线程通信- 可以像标准锁一样使用信号量作线程同步,但会增加复杂性 影响性能
- 更适用于需要在线程间引入信号、限制的程序,如限制代码 的并发访问量
- 信号量对象是建立在共享计数器基础上的同步原语
.acquire
用途:获取信号量,内部计数(许可证)大于0则立刻减1
- 内部计数
>0
,-1
立即返回True
- 内部计数
=0
,阻塞、等待,直到其他线程调用release
- 内部计数
返回值:成功获取许可证则返回
True
,否则返回False
.release
- 用途:释放信号量,内部计数(许可证)加1
- 内部计数
=0
,表明有线程阻塞,随机唤醒线程
- 内部计数
BoundedSemaphore
1 | class BoundedSemaphore(Semaphore): |
Lock
Threading.Lock
等价于_thread.allocate_lock
,二者 都是工厂方法,返回lock
类的实例
1 | class lock: |
用途:同步原语,排他性使用某资源
说明
- 支持
with
上下文语法,代码块执行前自动获取锁,执行 结束后自动释放 - 为了避免出现死锁,每个线程应该一次只允许获取一个锁, 否则应该使用更高级死锁避免机制
- 适合简单的锁定可变对象
- 支持
lock
对象应该是C实现,里面的方法是没有self
参数的
其他
Lock
锁可以视为使用更加方便的全局变量
可以用于线程之间的通信:给每个子线程分配单独一个锁, 主线程、子线程可以通过锁状态通信
很大程度上可以被全局变量“替换”
获得锁:不断检查全局变量状态,阻塞直到全局变量 状态代表锁可获得,修改全局变量状态代表锁被获取
释放锁:修改全局变量状态代表锁可获得
- 不断检查变量状态会无意义占用CPU时间,可以在检查
间隙使用
time.sleep()
暂停线程
RLock
- 工厂方法,返回
RLock
实例
1 | class RLock: |
用途:可重入锁,可以被同一线程多次获取
- 若锁被当前线程获取多次,则需要被释放同样次数才能被 其他线程获取
- 即只有内部计数器回到初始状态才能被任意线程获取
没有线程持有锁时,
RLock
内部计数被重置为1- 应该是
RLock
就是通过内部计数记录被获取次数
- 应该是
常用于给类整体上锁
- 类内部每个方法都获取锁
- 类内部方法之间相互调用
- 这样类实例方法每次只能有一个线程完整调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17import Threading
class SharedCounter:
_lock = threading.RLock()
# 被所有实例共享的类级锁
# 需要大量使用计数器时,内存效率更高
def __init__(self, intial_value=0):
self._value = initial_value
def incr(self, delta=1):
with ShareCounter:
self._value += delta
def decr(self, deta=1):
with SharedCounter:
# 获取锁之后,调用也需要获取锁的`.incr`方法
self.incr(-delta)
local
- 用途:创建本地线程存储对象,该对象属性保存、读取操作只对
当前线程可见
- 可以用于保存当前运行线程状态,隔离不同线程间数据
- 锁
- 套接字对象
- 可以用于保存当前运行线程状态,隔离不同线程间数据
例子
提前终止线程
轮询方法
- 将函数封装在类中,在类中设置成员变量作为轮询点
- 方法
terminate
改变轮询点状态,用于外部调用结束线程 - 线程方法
run
检查轮询点状态判断是否结束自身- 线程执行类似IO的阻塞操作时,无法返回自身、无法检查 轮询点,通过轮询终止线程难以协调
1 | from threading import Thread |
超时循环
- 设置任务超时,超时自动返回
- 任务只要超时就会返回,不会出现线程阻塞
1 | class IOTask: |
线程同步、通信
Event
方式
1 | from threading import Event, Threading |
Condition
实现queue.Queue
- 自定义数据类型,封装
Condition
实例进行线程间同步 put
方法生产,notify
通知消费者get
方法消费,阻塞直到被唤醒
1 | import heapq |
防死锁机制
严格升序使用锁
local
保存当前线程状态,隔离不同线程锁数据
1 | from threading import local |
1 | def thread_1(x_lock, y_lock): |
queue
模块
Queue
1 | class Queue(builtins.object): |
queue.Queue
:创建被多个线程共享的Queue
对象
线程安全的数据交换方式,基于
collections.deque
Queue
对象已经包含必要的锁,可以在多个线程间安全的 共享数据Queue
实际上是在线程间传递对象引用,不会复制 数据项,如果担心对象共享状态,可以传递不可修改数据 结构、深拷贝
- 还可以使用
Condition
变量包装数据结构,实现线程线程中 间通信
get
1 | obj = get(self, |
用途:从队列中移除、返回一个元素
参数
block
False
:没有空闲slot立即raise Empty exception
.task_done
1 | def task_done(self): |
- 说明
.join
阻塞时,要所有队列中元素都被告知task_done
, 才能解除阻塞- 即队列消费者每次
get
元素、处理后,要手动调用task_done
告知队列任务已完成
- 也可以将
Event
和队列元素封装,用Event
对象告知队列元素 处理完成
.put
1 | def put(self, |
用途:向队列中追加元素
参数
block
False
:没有空闲slot立即raise Full exception
.qsize
1 | int = qsize(self): |
例子
队列元素消费通知
Event
进队列
1 | from queue import Queue |
协调生产、消费线程终止
队列中添加特殊值
1 | _sentinel = object() |
Queue
实现线程池
1 | from socket import socket, AF_INET, SOCK_STREAM |
multiprocessing
模块
multiprocessing
支持一个基本与平台无关的进程派生模型, 在Unix、Windows下均可正常工作实现方式
- 启动一个新的Python进程,
import
当前模块 pickle
序列化需要调用的函数,传递至新进程执行
- 启动一个新的Python进程,
- 其中
Pool
、Queue
、Pipe
等实际都是其封装其子模块类 的工厂方法
Process
1 | class Process(): |
用途:
Multiprocessing
核心,类似于Thread
,实现多进程 创建、启动、关闭成员方法基本类似
Thread
1 | from multiprocessing import Process |
Pool
1 | class Pool: |
- 用途:创建管理进程池,提供指定数量进程供用户调用
- 新请求提交到pool中时
- 若进程池没有满,将创建新进程执行请求
- 否则请求等待直到池中有进程结束,然后创建新进程
- 适合子进程多且需要控制子进程数量时
- 新请求提交到pool中时
apply_async
1 | def apply(self, func, args=(), kwds={}): |
- 返回值:异步非阻塞调用返回结果操作句柄
ApplyResult
- 回调函数要有返回值,否则
ApplyResult.ready()=False
, 回调函数永远无法完成
ApplyResult
1 | class ApplyResult: |
map_async
1 | def map(self, func, iterable, chunksize=None): |
终止
1 | def close(self): |
Queue
1 | class SimpleQueue(): |
用途:进程安全队列
multiprocessing.Queue
基于multiprocessing.Pipe
构建数据传递时不是直接写入
Pipe
,而是写入本地buffer, 通过feeder线程写入底层Pipe
,从而实现超时控制、 非阻塞put/get
所以提供了
.join_thread
、cancel_join_thread
、close
函数控制feeder流行为相当于线程安全的
queue.Queue
的多进程克隆版
multiprocessing.SimpleQueue
:简化队列- 没有
Queue
中的buffer,没有使用Queue
可能有的问题, 但是put/get
方法都是阻塞的、没有超时控制
- 没有
Pipe
mltiprocessing.Pipe()
返回两个用管道相连的、读写双工Connection
对象
1 | class Connection(): |
对象会在每条信息前添加标志字节串,可以自动处理多条信息 堆叠
- 可以通过
os.read(connect.fileno())
获取
- 可以通过
共享内存
1 | class SynchronizedBase: |
Value
1 | def Value(typecode_or_type, *args, lock=True): |
Array
1 | def Array(typecode_or_type, size_or_initializer, *, |
Manager
常与Pool
模块一起使用,共享资源,不能使用Queue
、Array
其他方法
1 | import multiprocessing as mltp |
concurrent.futures
- 异步并发模块
ThreadPoolExecutor
1 | class ThreadPoolExecutor: |
- 用途:创建线程池
.shutdown
用途:关闭执行器,清理和该执行器相关的资源
- 可以多次调用,调用之后不能进行其他操作
参数
wait
:阻塞,直到所有运行futures执行完毕,所有 资源被释放
ProcessPoolExecutor
1 | class ProcessPoolExecutor: |
用途:创建进程池
- 参考
ThreadPoolExecutor
,.map
方法支持chunksize
参数
- 参考
常使用
with
语句使用1
with ProcessPoolExecutor() as Pool:
- 处理池执行完
with
语句块中后,处理池被关闭 - 程序会一直等待直到所有提交工作被完成
- 处理池执行完
注意事项
被提交的任务必须是简单函数形式,不支持方法、闭包和 其他类型可执行
函数参数、返回值必须兼容
pickle
模块,因为进程间通信 交换数据使用其序列化函数不要修改环境
- 被提交的任务函数出打印日志之类等简单行为外,不应该 有保留状态、副作用
混合使用进程池、多线程时
- 创建任何线程之前先创建、激活进程池
- 然后线程使用同样的进程池进行计算密集型工作
Future
1 | class Future: |
- 用途:代表一个异步计算的结果
- 直接创建对象无价值