并行开发
综述
python并行多任务均依赖操作系统底层服务并行执行python代码
- 线程派生:基本所有主流平台均支持
- 多进程
- shell命令进程
- 子python进程
跨平台多进程实现
- 创建子python进程
- 类Unix:
fork系统调用实现进程分支,分支进程运行时 环境同主进程完全一致 - Win:创建python进程,
import当前调用代码得到类似 主进程运行时环境
- 类Unix:
pickle序列化被调函数,传递给子进程执行
- 因为Win下分支进程需要导入当前调用,所以多进程代码必须 在
__main__内,否则无限循环创建新进程- 进程池调用函数要声明在进程池创建之前,否则启动进程会报错
进程通信
python进程间数据传递基本都是通过
pickle序列化传递,所以 需要传递的数据要支持pickle序列化multiprocessing等模块派生进程需要传递被调函数,所以 不支持lambda匿名函数- 绑定对象方法
其他相关模块、包
- 多进程:
pathos、pp - 进程通信:
signal
注意事项
- 子进程报错直接死亡,错误信息默认不会输出到任何地方,所以
子进程中,多使用
try catch
os模块
派生进程
1 | os.startfile(filename) |
os.popen
1 | pipe = os.popen(cmd(str), mode="r"/"w", buffering=-1) |
用途:运行shell命令并重定向其输出、输入流到管道开放
返回值:返回管道流对象
- 类似shell中管道语法
管道流对象类似普通文件流,支持一般读、写、迭代 (但是文档中只有
close方法)1
2
3
4
5
6
7
8
9
10
11# `r`模式执行,读取子进程标准输出
pipe.readlines()
pipe.readline()
pipe.read()
for i in pipe:
# 迭代器语法
# `w`模式执行,写入子进程标准输入
pipe.write()
pipe.close()
# 返回退出状态
# 历史原因,退出状态为0时返回None
os.popen一般不会阻塞,但是python执行需要整个命令行程序 完成的语句时仍然会阻塞- 关闭管道对象
- 一次性读取所有输出流
subprocess模块可以实现与os.system、os.popen相同的 效果,使用更复杂,但是对流的连接、使用提供更完善的控制
os.fork
进程分支是构建平行任务的传统做法,是Unix工具集的基本组成部分
- 分支是开始独立程序的直接做法,无论调用程序是否相同
- 分支想法基于复制
- 程序调用分支例行程序时,操作系统会创建在该进程副本
- 和进程并行的运行副本
- 有些系统实现并不真的复制原有程序,以减少资源消耗, 但是新副本会像真实副本一样运行
1 | import os |
返回值:据此区分父、子进程,执行不同任务
- 子进程中返回0
- 父进程中返回子进程ID
os.fork仅仅是系统代码库中标准进程分支调用简单封装- 和C共用代码库
- 在win下标准python版本中无法运行,和win模型冲突过多
- Cygwin中python可以运行,虽然行为同真正Unix分支不完全 相同,但是足够接近
os.fork实际复制整个python解释器- 除非python脚本被编译为二进制机器码
os.exec
1 | def execv(path, args=tuple/list) |
os.exec族的函数会覆盖当前进程- 所以该语句的代码都不会执行
- 不会更改进程号
1
2
3
4
5
6
7
8
9
10
11
12import os
param = 0
while True:
param += 1
newpid = os.fork()
if newpid == 0:
os.execlp("python", "child.py", str(param))
assert False, "starting new program error"
# 上句正常执行,这句永远无法被调用
else:
print("child is", pid)
if input() == "q": break
multiprocessing模块的进程派生模型+os.exec配合使用 可以在win下实现类似os.fork的效果
spawn
1 | os.spawn |
进程通信
1 | read_fd, write_fd = os.pipe() |
subprocess模块
Popen
1 | class Popen: |
用途
参数
args:需要执行的命令executable:备选执行命令stdin/stdout/stderr:执行程序标准输入、输出、 错误流连接对象- 默认:当前进程标准输入、输出、错误
subprocess.PIPE=-1:当前管道对象标准…
preexec_fn:子进程执行前在子进程中调用的对象- POSIX only
close_fds:控制关闭、继承文件描述符shell:是否通过shell执行命令- 执行shell内置命令则必须置
True- win:
type - linux:
set
- win:
- Linux下
False:由os.execvp运行
- 执行shell内置命令则必须置
cwd:子进程执行目录env:子进程环境变量universal_newlines:是否在3个标准流中使用行结尾符- 即是否按照文本处理3个标准流
startupinfo:windows onlyrestore_signals:POSIX onlystart_new_session:POSIX onlypass_fds:POSIX only
.communicate
1 | (stdout, stderr) = communicate(self, input=None, timeout=None) |
- 用途:和子进程交互,阻塞直到子进程终止
input传递给子进程标准输入- 返回标准输出、错误输出
universal_newlines为True时,input参数、返回值 应该为str,否则为bytes
其他方法
1 | def kill(self): |
Popen创建对象对象之后会立刻执行同时指定
stdout、stdin参数可以实现管道双工工作- 需要注意,读写时交替发送缓冲数据流可能导致死锁
call
1 | subprocess.call( |
_thread
- 为系统平台上的各种线程系统提供了可移植的接口
- 在安装了pthreads POSIX线程功能的系统上,接口工作方式 一致,无需修改源码即可正常工作
- 基本可以完全被
threading模块替代了
start_new_thread
1 | def start_new_thread( |
用途:开启新线程,以参数调用
callable返回值:应该是线程起始地址
派生线程在函数返回后退出
- 若在线程中函数抛出未捕捉异常,打印堆栈跟踪记录、退出 线程
- 程序其他部分继续运行
大多数系统平台上,整个程序主线程退出时,子线程随之退出
- 需要一些处理避免子线程意外退出
其他方法
1 | Lock = _thread.alloacate_lock() |
例子
例1
- 全局线程锁保护对输出流写入
- 全局线程锁实现主线程、子线程间通信,保证主线程在子线程 之后退出
1 | import _thread as thread |
例2
with上下文管理器使用锁- 全局变量实现主线程、子线程通信,避免主线程在子线程之前 退出
1 | import _thread as thread |
threading
Thread
1 | class Thread: |
用途:可控线程类,有两种方法使用
- 传递callable参数创建新对象
- 继承、覆盖
run方法:代码和Thread深耦合,可能 不方便代码复用,如multiprocessing模块
参数
group:保留参数用于未来扩展target:可执行对象,将被runinvokename:线程名,缺省Thread-Nargs:用于invoketarget参数tuplekwargs:用于invoketargetkeyword参数dictdaemon:是否是守护线程- 默认情况下,主进程(线程)会等待子进程、线程退出 后退出
- 主进程(线程)不等待守护进程、线程退出后再退出
- 注意:主进程退出之前,守护进程、线程会自动终止
- 若衍生类覆盖此构造器方法,务必首先调用此方法
.run
1 | def run(self): |
- 用途:代表线程活动
- 原
run用于invoketarget - 覆盖此方法设置线程活动
- 原
.start
1 | def start(self): |
- 用途:开始线程活动
- 线程创建完成后不会立即执行,需要手动调用
.start启动 - 多次调用
raise RuntimeError
- 线程创建完成后不会立即执行,需要手动调用
.join
1 | def join(self, |
用途:等待直到线程结束
- join:将线程加入当前线程
- 可以多次调用
- 试图导致死锁时,将会
raise RuntimeError
参数
timeout:指定超时时间,单位秒- 缺省否则阻塞直到线程结束
其他方法
1 | bool = is_alive(self): |
Event
1 | class Event(): |
用途
- 发送信号:
is_set触发事件 - 接收信号:
wait阻塞直到事件发生
- 发送信号:
Event中包含信标,可在线程中设置、接收,实现线程 间同步Event对象信标默认设置为False,等待Event对象线程 会阻塞直到信标设置为真若有线程设置信标为真,则将唤醒所有等待该
Event对象线程
- 若只想唤醒单个线程,用信号量、
Condition代替
.clear
.clear可以重置Event对象
难以确保安全清理、重新赋值
Event对象,可能导致错过事件 、死锁且无法保证重置
Event对象的代码能在线程再次等待此Event信号之前执行所以
Event对象最好单次使用,即其信标设置为真应立刻丢弃
- 若线程需不停重复使用
Event对象,使用Condition代替
Condition
1 | class Condition(): |
用途:
Condition对象wait等待信号、notify唤醒一定 数量线程实现线程同步说明
- 以上所有方法执行前均需要已获得底层锁,否则
raise RuntimeError - 因此以上方法一般都需要放在
with代码块中,保证已经 获取了内部锁
- 以上所有方法执行前均需要已获得底层锁,否则
with上下文管理器
1 | with c: |
with进入:获取condition底层锁,保证调用方法前已经获得 底层锁with退出:释放condition底层锁
Condition支持with上下文管理器,而且非常必须,- 在
help里面看不到.acquire、.release方法,但是是有 而且可以调用的,应该是官方不建议使用
.wait
用途
- 方法先释放底层锁,阻塞,使得其他等待 获取此对象底层锁获得锁
- 等待被
notify唤醒,再次获取锁,继续执行
底层锁是
RLock时.wait不是调用其.release()方法,而是调用RLock内部方法确保真正释放锁,即使RLock被递归的获取多次再次获取锁时,调用另一个内部接口恢复递归层次,即
RLock内部计数RLock本身性质:在没有被持有时,其内部计数被重置 为1,其他线程可以自由获取
.notify
.notify并不释放condition底层锁- 只是控制能够获取底层锁的线程数量
Semaphore
1 | class Semaphore(builtins.object): |
用途:
Semaphore对象release方法生产、acquire方法 消耗信号量,实现线程通信- 可以像标准锁一样使用信号量作线程同步,但会增加复杂性 影响性能
- 更适用于需要在线程间引入信号、限制的程序,如限制代码 的并发访问量
- 信号量对象是建立在共享计数器基础上的同步原语
.acquire
用途:获取信号量,内部计数(许可证)大于0则立刻减1
- 内部计数
>0,-1立即返回True - 内部计数
=0,阻塞、等待,直到其他线程调用release
- 内部计数
返回值:成功获取许可证则返回
True,否则返回False
.release
- 用途:释放信号量,内部计数(许可证)加1
- 内部计数
=0,表明有线程阻塞,随机唤醒线程
- 内部计数
BoundedSemaphore
1 | class BoundedSemaphore(Semaphore): |
Lock
Threading.Lock等价于_thread.allocate_lock,二者 都是工厂方法,返回lock类的实例
1 | class lock: |
用途:同步原语,排他性使用某资源
说明
- 支持
with上下文语法,代码块执行前自动获取锁,执行 结束后自动释放 - 为了避免出现死锁,每个线程应该一次只允许获取一个锁, 否则应该使用更高级死锁避免机制
- 适合简单的锁定可变对象
- 支持
lock对象应该是C实现,里面的方法是没有self参数的
其他
Lock锁可以视为使用更加方便的全局变量
可以用于线程之间的通信:给每个子线程分配单独一个锁, 主线程、子线程可以通过锁状态通信
很大程度上可以被全局变量“替换”
获得锁:不断检查全局变量状态,阻塞直到全局变量 状态代表锁可获得,修改全局变量状态代表锁被获取
释放锁:修改全局变量状态代表锁可获得
- 不断检查变量状态会无意义占用CPU时间,可以在检查
间隙使用
time.sleep()暂停线程
RLock
- 工厂方法,返回
RLock实例
1 | class RLock: |
用途:可重入锁,可以被同一线程多次获取
- 若锁被当前线程获取多次,则需要被释放同样次数才能被 其他线程获取
- 即只有内部计数器回到初始状态才能被任意线程获取
没有线程持有锁时,
RLock内部计数被重置为1- 应该是
RLock就是通过内部计数记录被获取次数
- 应该是
常用于给类整体上锁
- 类内部每个方法都获取锁
- 类内部方法之间相互调用
- 这样类实例方法每次只能有一个线程完整调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17import Threading
class SharedCounter:
_lock = threading.RLock()
# 被所有实例共享的类级锁
# 需要大量使用计数器时,内存效率更高
def __init__(self, intial_value=0):
self._value = initial_value
def incr(self, delta=1):
with ShareCounter:
self._value += delta
def decr(self, deta=1):
with SharedCounter:
# 获取锁之后,调用也需要获取锁的`.incr`方法
self.incr(-delta)
local
- 用途:创建本地线程存储对象,该对象属性保存、读取操作只对
当前线程可见
- 可以用于保存当前运行线程状态,隔离不同线程间数据
- 锁
- 套接字对象
- 可以用于保存当前运行线程状态,隔离不同线程间数据
例子
提前终止线程
轮询方法
- 将函数封装在类中,在类中设置成员变量作为轮询点
- 方法
terminate改变轮询点状态,用于外部调用结束线程 - 线程方法
run检查轮询点状态判断是否结束自身- 线程执行类似IO的阻塞操作时,无法返回自身、无法检查 轮询点,通过轮询终止线程难以协调
1 | from threading import Thread |
超时循环
- 设置任务超时,超时自动返回
- 任务只要超时就会返回,不会出现线程阻塞
1 | class IOTask: |
线程同步、通信
Event方式
1 | from threading import Event, Threading |
Condition实现queue.Queue
- 自定义数据类型,封装
Condition实例进行线程间同步 put方法生产,notify通知消费者get方法消费,阻塞直到被唤醒
1 | import heapq |
防死锁机制
严格升序使用锁
local保存当前线程状态,隔离不同线程锁数据
1 | from threading import local |
1 | def thread_1(x_lock, y_lock): |
queue模块
Queue
1 | class Queue(builtins.object): |
queue.Queue:创建被多个线程共享的Queue对象
线程安全的数据交换方式,基于
collections.dequeQueue对象已经包含必要的锁,可以在多个线程间安全的 共享数据Queue实际上是在线程间传递对象引用,不会复制 数据项,如果担心对象共享状态,可以传递不可修改数据 结构、深拷贝
- 还可以使用
Condition变量包装数据结构,实现线程线程中 间通信
get
1 | obj = get(self, |
用途:从队列中移除、返回一个元素
参数
blockFalse:没有空闲slot立即raise Empty exception
.task_done
1 | def task_done(self): |
- 说明
.join阻塞时,要所有队列中元素都被告知task_done, 才能解除阻塞- 即队列消费者每次
get元素、处理后,要手动调用task_done告知队列任务已完成
- 也可以将
Event和队列元素封装,用Event对象告知队列元素 处理完成
.put
1 | def put(self, |
用途:向队列中追加元素
参数
blockFalse:没有空闲slot立即raise Full exception
.qsize
1 | int = qsize(self): |
例子
队列元素消费通知
Event进队列
1 | from queue import Queue |
协调生产、消费线程终止
队列中添加特殊值
1 | _sentinel = object() |
Queue实现线程池
1 | from socket import socket, AF_INET, SOCK_STREAM |
multiprocessing模块
multiprocessing支持一个基本与平台无关的进程派生模型, 在Unix、Windows下均可正常工作实现方式
- 启动一个新的Python进程,
import当前模块 pickle序列化需要调用的函数,传递至新进程执行
- 启动一个新的Python进程,
- 其中
Pool、Queue、Pipe等实际都是其封装其子模块类 的工厂方法
Process
1 | class Process(): |
用途:
Multiprocessing核心,类似于Thread,实现多进程 创建、启动、关闭成员方法基本类似
Thread
1 | from multiprocessing import Process |
Pool
1 | class Pool: |
- 用途:创建管理进程池,提供指定数量进程供用户调用
- 新请求提交到pool中时
- 若进程池没有满,将创建新进程执行请求
- 否则请求等待直到池中有进程结束,然后创建新进程
- 适合子进程多且需要控制子进程数量时
- 新请求提交到pool中时
apply_async
1 | def apply(self, func, args=(), kwds={}): |
- 返回值:异步非阻塞调用返回结果操作句柄
ApplyResult - 回调函数要有返回值,否则
ApplyResult.ready()=False, 回调函数永远无法完成
ApplyResult
1 | class ApplyResult: |
map_async
1 | def map(self, func, iterable, chunksize=None): |
终止
1 | def close(self): |
Queue
1 | class SimpleQueue(): |
用途:进程安全队列
multiprocessing.Queue基于multiprocessing.Pipe构建数据传递时不是直接写入
Pipe,而是写入本地buffer, 通过feeder线程写入底层Pipe,从而实现超时控制、 非阻塞put/get所以提供了
.join_thread、cancel_join_thread、close函数控制feeder流行为相当于线程安全的
queue.Queue的多进程克隆版
multiprocessing.SimpleQueue:简化队列- 没有
Queue中的buffer,没有使用Queue可能有的问题, 但是put/get方法都是阻塞的、没有超时控制
- 没有
Pipe
mltiprocessing.Pipe()返回两个用管道相连的、读写双工Connection对象
1 | class Connection(): |
对象会在每条信息前添加标志字节串,可以自动处理多条信息 堆叠
- 可以通过
os.read(connect.fileno())获取
- 可以通过
共享内存
1 | class SynchronizedBase: |
Value
1 | def Value(typecode_or_type, *args, lock=True): |
Array
1 | def Array(typecode_or_type, size_or_initializer, *, |
Manager
常与Pool模块一起使用,共享资源,不能使用Queue、Array
其他方法
1 | import multiprocessing as mltp |
concurrent.futures
- 异步并发模块
ThreadPoolExecutor
1 | class ThreadPoolExecutor: |
- 用途:创建线程池
.shutdown
用途:关闭执行器,清理和该执行器相关的资源
- 可以多次调用,调用之后不能进行其他操作
参数
wait:阻塞,直到所有运行futures执行完毕,所有 资源被释放
ProcessPoolExecutor
1 | class ProcessPoolExecutor: |
用途:创建进程池
- 参考
ThreadPoolExecutor,.map方法支持chunksize参数
- 参考
常使用
with语句使用1
with ProcessPoolExecutor() as Pool:
- 处理池执行完
with语句块中后,处理池被关闭 - 程序会一直等待直到所有提交工作被完成
- 处理池执行完
注意事项
被提交的任务必须是简单函数形式,不支持方法、闭包和 其他类型可执行
函数参数、返回值必须兼容
pickle模块,因为进程间通信 交换数据使用其序列化函数不要修改环境
- 被提交的任务函数出打印日志之类等简单行为外,不应该 有保留状态、副作用
混合使用进程池、多线程时
- 创建任何线程之前先创建、激活进程池
- 然后线程使用同样的进程池进行计算密集型工作
Future
1 | class Future: |
- 用途:代表一个异步计算的结果
- 直接创建对象无价值

