DBMS 查询优化

查询优化技术

查询优化技术:求解给定查询语句的高效执行计划过程

  • 目标:在数据库查询优化引擎生成执行策略的过程中,尽量减小 查询总开销
  • SQL层面上的局部优化,区别于数据库调优的全局优化

  • 广义数据库查询优化

    • 查询重用技术
    • 查询重写规则
    • 查询算法优化技术
    • 并行查询优化技术
    • 分布式查询优化技术
  • 狭义数据库查询优化

    • 查询重写规则:代数/逻辑优化,RBO
    • 查询算法优化技术:非代数/物理优化,CBO

代数/逻辑优化

代数/逻辑优化:依据关系代数的等价变换做逻辑变换

  • 语法级:查询语句层、基于语法进行优化
  • 代数级:使用形式逻辑、关系代数原理进行优化
  • 语义级:根据完整性约束,对查询语句进行语义理解,推知可 优化操作

非代数/物理优化

非代数/物理优化:根据数据读取、表连接方式、排序等技术对查询 进行优化

  • 物理级:物理优化技术,基于代价估计模型,比较得出各执行 方式中代价最小者
    • 查询算法优化:运用基于代价估算的多表连接算法求解最小 花费计算

查询重用技术

查询重用:尽可能利用先前执行的结果,以节约全过程时间、减少 资源消耗

  • 查询结果的重用:分配缓冲块存放SQL语句、最后结果集
  • 查询计划的重用:缓存查询语句执行计划、相应语法树结构
  • 优势:节约CPU、IO消耗
  • 弊端
    • 结果集很大回消耗放大内存资源
    • 同样SQL不同用户获取的结果集可能不完全相同

查询重写规则

查询重写:查询语句的等价转换

  • 基于关系代数,关系代数的等价变换规则为查询重写提供了理论 支持
  • 查询重写后,查询优化器可能生成多个连接路径,可以从候选者 中择优

目标

  • 将查询转换为等价、效率更高的形式
    • 低效率谓词转换为高效率谓词
    • 消除重复条件
  • 将查询重写为等价、简单、不受表顺序限制的形式,为 物理查询阶段提供更多选择

优化方向

  • 过程性查询转换为描述性查询:视图重写
  • 复杂查询尽可能转换为多表连接查询:嵌套子查询、外连接、 嵌套连接等
  • 低效率谓词转换为高效率谓词:等价谓词重写
  • 利用(不)等式性质简化wherehavingon条件

查询算法优化技术

todo

Rule-Based Optimizer

RBO:基于规则的优化器

  • 对AST/LP进行遍历,模式匹配能够满足特定规则的结点,进行 等价转换,得到等价的另一棵树

    • 剪枝:删除一些无用计算
    • 合并:合并多个计算步骤
  • 经验式、启发式的固定transformation,手动设置(硬编码) 在数据库中规则决定SQL执行计划

经典优化规则

  • predicate pushdown:谓词下推

    sql_optimization_predicate_pushdown

  • constant folding:常量累加

    sql_optimization_constant_folding

  • column pruning:列值裁剪

    sql_optimization_column_pruning

  • combine limits:Limits合并

  • inner join只访问单表:降为semi join

特点

  • 操作简单、能快速确定连接方式

  • 规则虽然有效但不敏感

    • 数据分布发生变化时,RBO是不感知的
  • 基于RBO生成的执行计划不能确保是最优的

    • 启发式规则只能排除一些明显不好的存取路径

Cost-Base Optimizer

CBO:基于成本的优化器

  • 根据SQL的执行成本制定、优化查询作业执行计划,生成可能 的执行计划中代价最小的计划

    • 数据表统计数据
      • 基/势
      • 唯一值数量
      • 空值数量
      • 平均、最大长度
    • SQL执行路径I/O
    • 网络资源
    • CPU使用情况
  • 以上执行信息获取方式取决于不同平台、数据库

    • 执行SQL前抽样分析数据
    • 每次执行SQL都会记录统计信息
  • 特殊概念

    • cardinality:集的势,结果集的行数

      • 表示SQL执行成本值
      • SQL执行返回的结果集包含的行数越多,成本越大
    • selectivity:可选择率,施加指定谓语条件后返回 结果集的记录数占未施加任何谓语条件的原始结果集记录数 的比率

      • 值越小,说明可选择性越好
      • 值越大,说明可选择性越差,成本值越大

常见优化规则

  • hash-join
    • 选择正确hash建表方
    • 选择正确join类型:广播hash、全洗牌hash
    • join reorder:调整多路join顺序
      • 尽量减小中间shuffle数据集大小,达到最优输出

特点

  • 对各种可能情况进行量化比较,可以得到花费最小的情况
  • CBO本身需要耗费一定资源,需要平衡CBO和查询计划优化程度
    • 数据表的数据统计资源耗费
    • 优化查询计划即时资源耗费,如果组合情况比较多则花费 判断时间较多

并行查询优化技术

并行数据库系统中查询优化的目标:寻找具有最小响应时间的查询 执行计划

  • 具有最小执行代价的计划不一定具有最快相应时间,需要考虑 把查询工作分解为可以并行运行的子工作

  • 查询能否并行取决于

    • 系统中可用资源
    • CPU数目
    • 运算中特定代数运算符
  • 查询并行可以分为

    • 操作内并行:将同一操作如单表扫描、两表连接、排序操作 等分解为多个独立子操作

    • 操作间并行:一条SQL查询语句分解为多个子操作

分布式查询优化技术

分布式数据库系统:查询策略优化、局部处理优化是查询优化重点

  • 查询策略优化:主要是数据传输策略优化

    • 主要考虑因素:数据的通信开销
    • 主要目标:以减少传输次数、数据量
  • 局部处理优化:传统单结点数据库的查询优化技术

  • 代价估计模型:总代价 = IO代价 + CPU代价 + 通信代价

Database Parser

Parser综述

Parser:分析器,将SQL语句切分为token,根据一定语义规则解析 成AST

todo

查询计划/树

sql_parser_ast

查询计划:由一系列内部操作符组成,操作符按照一定运算关系构成 查询的一个执行方案

  • 形式上:二叉树

    • 树叶是每个单表对象
    • 两个树叶的父节点是连接操作符连接后的中间结果
    • 每个结点即临时“关系”
  • 查询的基本操作:选择、投影、连接

    • 选择、投影的优化规则适用于select-projection-join 操作和非SPY(SPY+Groupby)操作
    • 连接操作包括两表连接、多表连接

结点类型

  • 单表结点:从物理存储到内存解析称逻辑字段的过程

    • 考虑数据获取方式
      • 直接IO获取
      • 索引获取
      • 通过索引定位数据位置后再经过IO获取相应数据块
  • 两表结点:内存中元组进行连接的过程

    • 完成用户语义的局部逻辑操作,完成用户全部语义需要配合 多表连接顺序的操作
    • 不同连接算法导致连接效率不同
    • 考虑两表
      • 连接方式
      • 代价
      • 连接路径
  • 多表中间结点:多个表按照“最优”顺序连接过程

    • 考虑代价最小的“执行计划”的多表连接顺序

Schema Catalog

元数据信息:表的模式信息

  • 表的基本定义:表名、列名、数据类型
  • 表的数据格式:json、text、parquet、压缩格式
  • 表的物理位置

解释型语言

Abstarct Syntax Tree

AST:抽象语法树,源代码的抽象语法结构的树状表示形式

js_parser_ast js_parser_ast_seq

  • 基本上语言中每种结构都与一种AST对象相对应

    • 不同解释器会有独有的AST格式
  • AST定义了代码的结构,可以通过操纵AST,精准定位到各种语句 ,实现

    • 代码分析、检查:语法、风格、错误提示
    • 代码结构优化:代码格式化、高亮、自动补全
    • 优化变更代码:改变代码结构

代码

基础数据类型

整形值

无额外空交换

  • 异或

    1
    2
    3
    a = a^b
    b = a^b
    a = a^b
  • 加减

    1
    2
    3
    a = a+b
    b = a-b
    a = a-b
  • 当然,对某些语言可以同时交换,如:python

取整

  • 向下取整:mid = (left + right) // 2
  • 向上取整:mid = (left + right + 1) // 2

运算溢出

  • 正数:边界值10x7FFF FFFF0xFFFF FFFF
  • 负数:边界值0x8000 00000xFFFF FFFF
  • 忽略语言特性:如long类型常量不加LL

初值选取

0值未考虑

浮点值

  • 浮点取整:尽量避免混用使用向上取整、向下取整,容易混淆

  • 浮点型相等比较:不应使用==,应该使用相差<某常数

    1
    2
    3
    a, b = 1.11111, 1.11111111
    if abs(a - b) < 0.00001:
    print("equal")

数据结构

线性表

  • 常用初值

    • 数值型:0-1sys.maxsizefloat('inf')
    • 字符串型:""
    • 迭代过程中可能取值:输出列表首个元素
  • 判断条件

    • 是否为初值
    • 是否越界
  • 对比迭代技巧

    • 从左至右、从右至左分别遍历
    • 原始列表、排序后列表分别遍历
  • 边界条件限制

    • 判断语句中:先列出边界条件,利用短路求值简化代码

双指针

  • 迭代操作注意事项

    • 保证退出循环
    • 所有值均被检验
    • 数据处理、移动指针的顺序
  • 相向双指针:两指针均向中间靠拢不断缩小搜索空间

    • 明确切片范围:是否包括端点,并尽量保持不变
    • 据此明确搜索空间范围,则搜索空间为空时结束循环
  • 滑动窗口:两指针同时向一侧移动,检验窗口内切片

    • 分类
      • 固定间隔双指针
      • 不同条件移动双指针
    • 示例
  • 快慢指针:两指针同时迭代、但运动速度不同

    • 示例
      • 判断单链表是否成环

端点利用

  • 两端限位器:在数据端点添加标记数据,便于

    • 指示“指针”已经到达数据端点
    • 简化特殊情况代码
      • 将循环中的判断条件合并
      • 端点标记数据符合一般情况,无需特殊处理
    • 示例
      • 数组末尾添加查找键(查询问题):在顺序查找中可以 将判断数组越界、查找键比较合并
  • 末端点为空闲位置:存储有效值

    • 队列、栈插入元素:末端点处为空闲位置,可直接使用
    • 数组迭代比较:末端点处存储有效值,先比较再更新指针
  • 末端点为非空闲位置

    • 队列、栈:可以直接使用其末尾元素作为上次添加的元素, 简化代码

链表特性

  • 头指针/节点:添加链表头,保证特殊情况链表正常工作

    • 示例
      • 删除只包含一个元素的链表
  • 自设外部指针

    • 使用时注意有时会改变内部节点值

      1
      2
      // 修改链表内节点值
      outer.next.val = 4

HashXXX

  • hash数据结构查询是常数时间,非常合适缓冲记录结果

    • HashSet:常数时间判断元素存在性
    • HashDict:键元素、值元素出现次数,记录次数
  • 特殊、常用键

    • 有重复元素:有序tuple、字符串
    • 无重复元素:frozen set

逻辑

运算符

  • 注意运算符优先级

  • =结合不等号

    • 同时使用<=>=,容易造成死循环、遗漏等
    • 尽量只使用>=号,不再使用<=

递归终止条件

递归终止条件主要有两种设计方案

  • 最后元素:判断元素是否是最后元素,终止递归调用

  • 空(无效)元素:判断元素是空(无效)元素,终止递归调用

    • 需要确保最终能够进入该分支,而不是停止在最后一步

预处理方法

排序

  • 预排序是简化问题的好方法

分治

  • 缩小搜索空间:按特征排序后,根据该特征是否满足条件 即可缩小搜索空间

组合

  • 组合后剔除重复:可重复组合排序后唯一,方便剔除重复元素
  • 组合前保证唯一:对组合候选集“预排序”,保证取用元素位序 不减(可重复)、单增(不可重复)
    • 相当于提前对结果排序,保证得到组合结果唯一
    • “预排序”指候选集中顺序有意义,无需真正排序

特殊测试用例

字符串

  • 空字符串
  • 长度1字符串、长度2字符串
  • 字符相同字符串

普通列表

  • 空列表

    • 若在循环外即已取值,应该提前判断列表是否空
  • 长度1列表、长度2列表

  • 元素相同列表

树、二叉树

  • 空树、只有根元素

    1
    2
    node.val = value;
    # 未考虑空树
  • 只有左子树、右子树

文件边界条件

  • 首个字符
  • 最末字符、倒数第二个字符

代码优化考量

  • 保持程序设计风格:把经常使用的工具性代码编辑成已验证
  • 用规范的格式处理、保存数据
  • 区分代码与数据:与代码无关的数据应该尽可能区分开来,尽量 把数据保存在常量数组中

代码执行速度

  • 输入输出方式过慢:cin等高级输入、输出方式比较慢

程序结构优化

  • 栈溢出:数组等大局部变量保存到栈,少量递归即会发生栈溢出

输入、输出

将输入、输出流重定向到文件中,避免频繁测试时频繁输入

  • 输入放在in.txt文件中
  • 输出到out.txt
  • 输出执行时间

C/C++

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#ifdef SUBMIT
freopen("in.txt", "r", stdin);
// input data
freopen("out.txt", "w", stdout);
// output
long _begin_time = clock();
#endif

// put code here

#ifdef SUBMIT
long _end_time = clock();
printf("time = %ld ms\n", _end_time - begin_time);
#endif

Python

1
2
3
4
5
6
7
8
9
10
import sys
import time
sys.stdin = open("in.txt", "r")
sys.stdout = open("out.txt", "w")
__tstart = time.time()

# code here

__trange = time.time() - __tstart
print("time used: %f" % __trange)

博弈论

总述

约瑟夫斯问题

n个人围成圈编号{0..n-1},从1号开始每次消去第m个人直到最后一个 人,计算最后人编号$J(n)$。

减1法

考虑每次消去1人后剩余人的编号情况

  • 还剩k人时,消去1人后,以下个人编号为0开始,将剩余人重新 编号,得到每个人在剩k-1人时的编号

  • 相较于剩k人时,剩k-1人时每个人编号都减少m,即每个人在剩 k人时编号满足

  • 考虑只剩1人时,其编号为0,则可以递推求解

算法

1
2
3
4
5
6
7
Joseph_1(n, m):
// 减1法求解Joseph问题
// 输入:人数n、消去间隔m
// 输出:最后留下者编号
j_n = 0
for k from 2 to n:
j_n = (j_n + m) % k

特点

  • 算法效率
    • 时间效率$\in O(n)$

减常因子

剩余人数$k >= m$时考虑所有人都报数一次后剩余人的编号变化情况

  • 还剩k人时,报数一圈后消去k//m人,以最后被消去人的下个人 编号为0开始,将剩余人重新编号,得到剩k-k/m人时的编号

  • 相较于剩k人时,剩k-k//m人时每个人编号满足

    • $d = k // m$
    • $s = J_{k - d} - n\%m$
  • $k < m$时,使用减1法计算

    • m很大时,以$k < m$作为调用减1法很容易使得递归超出 最大值
    • 另外$m < k <= d * m$时,每轮也不过消去$d$个人,而 递推式复杂许多、需要递归调用
    • 所以具体代码实现中应该选择较大的$d$值,如5

算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Joseph_factor(n, m):
// 减常因子法求解Joseph问题
// 输入:人数n、消去间隔m
// 输出:最后留下者编号
if n < 5 * m:
j_n = 0
for k from 2 to n
j_n = (j_n + m) % k
return j_n

s = Joseph(n-n/m, m) - k % m
if s < 0:
retrun s + n
else:
return s + s // (m-1)
return j_n

特点

  • 算法效率

    • 时间效率$\in O(log n) + m$
  • 特别的,对$m=2$时

    • $n=2k$为偶数时,$J(2k)=2J(k)-1$
    • $n=2k+1$为奇数时,$J(2k+1)=2J(k)+1$

任意第k个

  • 考虑报数不重置,则第k个被消去的人报数为$k * m - 1$

  • 对报数为$p = k * m + a, 0 \leq a < m$的人

    • 此时已经有k个人被消去,剩余n-k个人

    • 则经过$n - k$个剩余人报数之后,该人才继续报数,则 其下次报数为$q = p + n - k = n + k*(m-1) + a$

  • 若该人报数$p$时未被消去,则$a \neq m-1$,则可以得到 $p = (q - n) // (m-1) * m + (q-n) \% (m-1)$

算法

1
2
3
4
5
6
7
8
Joseph_k(n, m, k):
// 计算Joseph问题中第k个被消去人编号
// 输入:人数n、间隔m、被消去次序k
// 输出:第k个被消去人编号
j_k = k*m - 1
while j_k >= n:
j_k = (j_k-n) // (m-1) * m - (j_k-n)%(m-1)
return j_k

算法特点

  • 算法效率

    • 时间效率$\in O(log n)$
  • 特别的,m=2时对n做一次向左循环移位就是最后者编号

双人游戏

  • 双人游戏中往往涉及两个概念
    • state:状态,当前游戏状态、数据
    • move:走子,游戏中所有可能发生的状态改变
  • 状态、走子彼此之间相互“调用”
    • 状态调用走子转化为下个状态
    • 走子调用状态评价当前状态
1
2
3
4
5
6
7
8
9
10
11
12
13
make_move(state, move):
switch move:
case move_1:
state = move_1(state)
evaluate_state(state)
...other cases...

evaluate_state(state):
switch state:
case state_1:
make_move(state, move_1)
...other cases...
end game

拈游戏

同样局面,每个玩家都有同样可选走法,每种步数有限的走法都能 形成游戏的一个较小实例,最后能移动的玩家就是胜者。

  • 拈游戏(单堆版):只有一堆棋子n个,两个玩家轮流拿走最少 1个,最多m个棋子
  • 拈游戏(多堆版):有I堆棋子,每堆棋子个数分别为 ${n_1,\cdots,n_I}$,可以从任意一堆棋子中拿走任意允许数量 棋子,甚至拿走全部一堆

减可变规模算法

算法

(单堆)从较小的n开始考虑胜负(标准流程)

  • n=0:下个人失败
  • 1<=n<=m:下个人胜利(可以拿走全部)
  • n=m+1:下个人失败(无论拿走几个,对方符合1<=n<=m 胜利条件)
  • 数学归纳法可以证明:n=k(m+1)时为败局,其余为胜局
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 两个函数轮流递归调用
find_good_move(coins):
// 判断当前是否有成功步骤
// 输入:棋子数目
// 输出:成功策略或没有成功策略
for taken=1 to limit do
if(is_bad_position(coins-taken))
// 对手没有成功策略
return taken
return NO_GOOD_MOVE

is_bad_position(coins):
// 判断当前是否是good position
// 输入:棋子数量
// 输出:是否有成功策略
if (coins == 0)
return true
return find_good_move(coins) == NO_GOOD_MOVE
// 没有成功策略

特点

  • 堆为2时,需要对两堆是否相同分别考虑

  • 对更一般的I堆时

    • 对每堆数量的位串计算二进制数位和
    • 结果中包含至少一个1则对下个人为胜局,全为0则为负局
    • 则玩家下步要拿走的棋子数量要使得位串二进制数位和全0 ,则对方陷入负局

    • todo又是二进制???和约瑟夫斯问题一样了

    • 但是这里没有涉及最多能拿几个啊,不一定能够成功拿到 使拈和全为0啊
  • 二进制数位和(拈和):每位求和并忽略进位(奇或)

并行开发

综述

python并行多任务均依赖操作系统底层服务并行执行python代码

  • 线程派生:基本所有主流平台均支持
  • 多进程
    • shell命令进程
    • 子python进程

跨平台多进程实现

  • 创建子python进程
    • 类Unix:fork系统调用实现进程分支,分支进程运行时 环境同主进程完全一致
    • Win:创建python进程,import当前调用代码得到类似 主进程运行时环境
  • pickle序列化被调函数,传递给子进程执行
  • 因为Win下分支进程需要导入当前调用,所以多进程代码必须 在__main__内,否则无限循环创建新进程
  • 进程池调用函数要声明在进程池创建之前,否则启动进程会报错

进程通信

  • python进程间数据传递基本都是通过pickle序列化传递,所以 需要传递的数据要支持pickle序列化

  • multiprocessing等模块派生进程需要传递被调函数,所以 不支持

    • lambda匿名函数
    • 绑定对象方法

其他相关模块、包

  • 多进程:pathospp
  • 进程通信:signal

注意事项

  • 子进程报错直接死亡,错误信息默认不会输出到任何地方,所以 子进程中,多使用try catch

os模块

派生进程

1
2
3
4
5
6
os.startfile(filename)
# 调用系统默认程序打开文件,类似于鼠标双击
# linux下没有实现
int = os.system(command(str))
# 运行shell命令,返回命令执行退出状态
# 和普通shell命令一样默认阻塞,除非后台运算符`&`

os.popen

1
2
pipe = os.popen(cmd(str), mode="r"/"w", buffering=-1)
# 运行shell命令并重定向其输出、输入流到管道开放
  • 用途:运行shell命令并重定向其输出、输入流到管道开放

  • 返回值:返回管道流对象

    • 类似shell中管道语法
    • 管道流对象类似普通文件流,支持一般读、写、迭代 (但是文档中只有close方法)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # `r`模式执行,读取子进程标准输出
      pipe.readlines()
      pipe.readline()
      pipe.read()
      for i in pipe:
      # 迭代器语法
      # `w`模式执行,写入子进程标准输入
      pipe.write()
      pipe.close()
      # 返回退出状态
      # 历史原因,退出状态为0时返回None
  • os.popen一般不会阻塞,但是python执行需要整个命令行程序 完成的语句时仍然会阻塞

    • 关闭管道对象
    • 一次性读取所有输出流
  • subprocess模块可以实现与os.systemos.popen相同的 效果,使用更复杂,但是对流的连接、使用提供更完善的控制

os.fork

进程分支是构建平行任务的传统做法,是Unix工具集的基本组成部分

  • 分支是开始独立程序的直接做法,无论调用程序是否相同
  • 分支想法基于复制
    • 程序调用分支例行程序时,操作系统会创建在该进程副本
    • 和进程并行的运行副本
    • 有些系统实现并不真的复制原有程序,以减少资源消耗, 但是新副本会像真实副本一样运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import os

def child():
print("hello from child", os.getpid())
os._exit(0)

def parent():
while True:
newpid = os.fork()
# 子进程返回0
# 父进程返回子进程进程号
if newpid == 0:
child()
else:
print("hello from parent", os.getpid(), newpid)
if input() == "q":
break
  • 返回值:据此区分父、子进程,执行不同任务

    • 子进程中返回0
    • 父进程中返回子进程ID
  • os.fork仅仅是系统代码库中标准进程分支调用简单封装

    • 和C共用代码库
    • 在win下标准python版本中无法运行,和win模型冲突过多
    • Cygwin中python可以运行,虽然行为同真正Unix分支不完全 相同,但是足够接近
  • os.fork实际复制整个python解释器

    • 除非python脚本被编译为二进制机器码

os.exec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def execv(path, args=tuple/list)
def execl(path, *args=*list/*tuple)
# 以参数执行指定可执行文件,**代替**当前进程

def execvp(file, args)
def execlp(file, *args)
# `p` for `$PATH`
# 在系统搜索路径`$PATH`中定位可执行程序

def execve(path, args=tuple/list, env=dict)
def execle(path, args=*tuple/*list, env=dict)
# `e` for `environ`
# 将`env`字典作为环境环境变量传递

def execvpe(path, args=tuple/list, env=dict)
def execlpe(path, args=*tuple/*list, env=dict)
# 在系统搜索路径`$PATH`中定位可执行程序
# 将`env`字典作为环境环境变量传递
  • os.exec族的函数会覆盖当前进程

    • 所以该语句的代码都不会执行
    • 不会更改进程号
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import os
    param = 0
    while True:
    param += 1
    newpid = os.fork()
    if newpid == 0:
    os.execlp("python", "child.py", str(param))
    assert False, "starting new program error"
    # 上句正常执行,这句永远无法被调用
    else:
    print("child is", pid)
    if input() == "q": break
  • multiprocessing模块的进程派生模型+os.exec配合使用 可以在win下实现类似os.fork的效果

spawn

1
2
os.spawn
# 启动带有底层控制的新程序

进程通信

1
2
3
4
5
6
read_fd, write_fd = os.pipe()
# 创建管道,返回管道写入、读取描述符
# 可以使用`os.fdopen()`封装管道描述符,方便读、写
os.mkfifo(path, mode=438, *, dir_fd=None)
# 创建命名管道,win不支持
# 仅创建外部文件,使用需要像普通文件一样打开、处理

subprocess模块

Popen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Popen:
def __init__(self,
args(str,[str]),
bufsize=-1/0/1/int,
executable=None/str,
stdin=None/stream,
stdout=None/stream,
stderr=None/stream,
preexec_fn=None/callable,
close_fd=obj,
shell=False/True,
cwd=None/path,
env=None/dict,
universal_newlines=False/True,
startupinfo=None,
creationflags=0,
restore_signals=True/False,
start_new_session=False/True,
pass_fds=(),
encoding=None/str,
errors=None
)
  • 用途

  • 参数

    • args:需要执行的命令

    • executable:备选执行命令

    • stdin/stdout/stderr:执行程序标准输入、输出、 错误流连接对象

      • 默认:当前进程标准输入、输出、错误
      • subprocess.PIPE=-1:当前管道对象标准…
    • preexec_fn:子进程执行前在子进程中调用的对象

      • POSIX only
    • close_fds:控制关闭、继承文件描述符

    • shell:是否通过shell执行命令

      • 执行shell内置命令则必须置True
        • win:type
        • linux:set
      • Linux下False:由os.execvp运行
    • cwd:子进程执行目录

    • env:子进程环境变量

    • universal_newlines:是否在3个标准流中使用行结尾符

      • 即是否按照文本处理3个标准流
    • startupinfo:windows only

    • restore_signals:POSIX only
    • start_new_session:POSIX only
    • pass_fds:POSIX only

.communicate

1
(stdout, stderr) = communicate(self, input=None, timeout=None)
  • 用途:和子进程交互,阻塞直到子进程终止
    • input传递给子进程标准输入
    • 返回标准输出、错误输出
    • universal_newlinesTrue时,input参数、返回值 应该为str,否则为bytes

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def kill(self):
# kill进程通过*SIGKILL*信号

def terminate(self):
# 终止进程通过*ISGTERM*信号

def send_signal(self, sig):
# 向进程传递信号

def poll(self):
# 检查子进程是否终止,设置、返回`.returncode`属性

def wait(self, timeout=None, endtime=None):
# 等待直到子进程终止,返回`.returncode`属性

pipe.stdin
pipe.stdout
pipe.stderr
# 管道标准输入、输出、错误流
# 创建子进程时,可以选择和子进程相应流连接
# 支持读、写、迭代
pipe.returncode
# 子进程退出状态
  • Popen创建对象对象之后会立刻执行

  • 同时指定stdoutstdin参数可以实现管道双工工作

    • 需要注意,读写时交替发送缓冲数据流可能导致死锁

call

1
2
3
4
subprocess.call(
"type hello.py",
shell=True/False
)

_thread

  • 为系统平台上的各种线程系统提供了可移植的接口
  • 在安装了pthreads POSIX线程功能的系统上,接口工作方式 一致,无需修改源码即可正常工作
  • 基本可以完全被threading模块替代了

start_new_thread

1
2
3
4
5
6
7
def start_new_thread(
callable,
args=tuple/list,
kwargs=dict
)
def start_new():
# deprecated,同上
  • 用途:开启新线程,以参数调用callable

  • 返回值:应该是线程起始地址

  • 派生线程在函数返回后退出

    • 若在线程中函数抛出未捕捉异常,打印堆栈跟踪记录、退出 线程
    • 程序其他部分继续运行
  • 大多数系统平台上,整个程序主线程退出时,子线程随之退出

    • 需要一些处理避免子线程意外退出

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Lock = _thread.alloacate_lock()
# 获取一个`Lock`锁
# 等价于`threading.Lock()`
Lock = _thread.allocate()
# deprecated,同上

RLock = _thread.RLock()
# 获取一个`RLock`可重入锁
# 等价于`threading.RLock()`

def _thread.exit()
# 退出当前线程,可捕捉
# 等价于显式`raise SystemExit`、`sys.exit()`
def _thread.exit_thread()
# 同上

例子

例1

  • 全局线程锁保护对输出流写入
  • 全局线程锁实现主线程、子线程间通信,保证主线程在子线程 之后退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import _thread as thread

stdoutmutex = thread.allocate_lock()
# 创建全局标准输出锁锁对象
exitmutexes = [thread.allocate_lock() for _ in range(10)]
# 为每个线程创建锁
exitmutexes_bool = [False] * 10
# 线程共享内存,同样可以使用全局变量作为信号量,而不用
# 额外开销

def counter(myId, count):
for i in range(count):

stdoutmutex.acquire()
# 线程向标准输出流写入时,获得锁
print("[%s] => %s" % (myId, i))
stdoutmutex.release()
# 向标准输出流写入完毕后,释放锁

with stdoutmutex:
# 线程锁同样支持`with`上下文管理器
print("[%s] => %s again" % (myId, i))

exitmutexes[myID].acquire()
# 线程执行完毕后获取对应自身id的锁,通知主线程

for i in range(10):
thread.start_new_thread(counter, (i, 100))
# 创建、启动新线程

for mutex in existmutexes:
# 检查所有信号锁
while not mutex.locked():
# 直到信号锁被获取,结束死循环
pass
print("main thread exiting")

例2

  • with上下文管理器使用锁
  • 全局变量实现主线程、子线程通信,避免主线程在子线程之前 退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import _thread as thread
import time

stdoutmutex = thread.allocate_lock()
exitmutexes_bool = [False] * 10

def counter(myId, count):
for i in range(count):
with stdoutmutex:
# 线程锁同样支持`with`上下文管理器
print("[%s] => %s again" % (myId, i))
exitmutexes[myID] = True

for i in range(10):
thread.start_new_thread(counter, (i, 100))

while not all(exitmutexes):
time.sleep(0.25)
# 暂停主线程,减少占用CPU进行无价值循环
print("main thread exiting")

threading

Thread

1
2
3
4
5
6
7
8
9
10
class Thread:
def __init__(self,
group=None,
target=callable,
name=None/str,
args=(),
kwargs={},
*,
daemon=None/True/daemon):
pass
  • 用途:可控线程类,有两种方法使用

    • 传递callable参数创建新对象
    • 继承、覆盖run方法:代码和Thread深耦合,可能 不方便代码复用,如multiprocessing模块
  • 参数

    • group:保留参数用于未来扩展
    • target:可执行对象,将被run invoke
    • name:线程名,缺省Thread-N
    • args:用于invoke target参数tuple
    • kwargs:用于invoke target keyword参数dict
    • daemon:是否是守护线程
      • 默认情况下,主进程(线程)会等待子进程、线程退出 后退出
      • 主进程(线程)不等待守护进程、线程退出后再退出
      • 注意:主进程退出之前,守护进程、线程会自动终止
  • 若衍生类覆盖此构造器方法,务必首先调用此方法

.run

1
2
def run(self):
pass
  • 用途:代表线程活动
    • run用于invoke target
    • 覆盖此方法设置线程活动

.start

1
2
def start(self):
pass
  • 用途:开始线程活动
    • 线程创建完成后不会立即执行,需要手动调用.start启动
    • 多次调用raise RuntimeError

.join

1
2
3
def join(self,
timeout=None/float):
pass
  • 用途:等待直到线程结束

    • join:将线程加入当前线程
    • 可以多次调用
    • 试图导致死锁时,将会raise RuntimeError
  • 参数

    • timeout:指定超时时间,单位秒
      • 缺省否则阻塞直到线程结束

其他方法

1
2
3
4
5
6
7
8
9
10
bool = is_alive(self):
# 返回线程是否存活

def setDaemon(self, daemonic):
# 设置守护进程
bool = isDaemon(self):

def setName(self, name):
# 设置线程名
def getName(self):

Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Event():
def set(self):
# 设置信标值为`True`,发送信号

bool = is_set(self):
# 查看信标是否被设置

bool = wait(self, timeout):
# 阻塞,直到超时、信标被设置为`True`
# 返回信标值,即因超时返回时返回`False`

def clear():
# 重置Event对象,设置信标值为`False`

  • 用途

    • 发送信号:is_set触发事件
    • 接收信号:wait阻塞直到事件发生
  • Event中包含信标,可在线程中设置、接收,实现线程 间同步

    • Event对象信标默认设置为False,等待Event对象线程 会阻塞直到信标设置为真

    • 若有线程设置信标为真,则将唤醒所有等待该Event 对象线程

  • 若只想唤醒单个线程,用信号量、Condition代替

.clear

.clear可以重置Event对象

  • 难以确保安全清理、重新赋值Event对象,可能导致错过事件 、死锁

  • 且无法保证重置Event对象的代码能在线程再次等待此Event 信号之前执行

  • 所以Event对象最好单次使用,即其信标设置为真应立刻丢弃

  • 若线程需不停重复使用Event对象,使用Condition代替

Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Condition():
def __init__(self, lock=None)
# `lock`:`Lock`、`RLock`对象,被用作底层锁
# 缺省创建新`RLock`对象作为底层锁

bool accquire():
# 获取一次condition内部锁

bool release():
# 释放一次condition内部锁

def notify(self, n=1):
# 唤醒至多`n`个on this condition的线程

def notify_all(self):
# 唤醒所有on this condition的线程

bool = wait(self, timeout=None):
# 释放底层锁,阻塞
# 直到被唤醒再次获取底层锁、超时,返回

bool = wait_for(self, predicate(callable), timeout=None):
# `wait`直到`predicate`返回`True`
  • 用途:Condition对象wait等待信号、notify唤醒一定 数量线程实现线程同步

  • 说明

    • 以上所有方法执行前均需要已获得底层锁,否则 raise RuntimeError
    • 因此以上方法一般都需要放在with代码块中,保证已经 获取了内部锁

with上下文管理器

1
2
3
4
5
with c:
c.wait()

with c:
c.notify()
  • with进入:获取condition底层锁,保证调用方法前已经获得 底层锁
  • with退出:释放condition底层锁
  • Condition支持with上下文管理器,而且非常必须
  • help里面看不到.acquire.release方法,但是是有 而且可以调用的,应该是官方不建议使用

.wait

  • 用途

    • 方法先释放底层锁,阻塞,使得其他等待 获取此对象底层锁获得锁
    • 等待被notify唤醒,再次获取锁,继续执行
  • 底层锁是RLock

    • .wait不是调用其.release()方法,而是调用RLock 内部方法确保真正释放锁,即使RLock被递归的获取多次

    • 再次获取锁时,调用另一个内部接口恢复递归层次,即 RLock内部计数

    • RLock本身性质:在没有被持有时,其内部计数被重置 为1,其他线程可以自由获取

.notify

  • .notify并不释放condition底层锁
  • 只是控制能够获取底层锁的线程数量

Semaphore

1
2
3
4
5
6
7
8
9
class Semaphore(builtins.object):
def __init__(self, value):
# `value`:起始许可证数量(最多允许同时执行线程数目)

bool = acquire(self, blocking=True, timeout=None):
# 获取信号量,内部计数(许可证)减1

def release():
# 释放信号量,内部计数(许可证)加1
  • 用途:Semaphore对象release方法生产、acquire方法 消耗信号量,实现线程通信

    • 可以像标准锁一样使用信号量作线程同步,但会增加复杂性 影响性能
    • 更适用于需要在线程间引入信号、限制的程序,如限制代码 的并发访问量
  • 信号量对象是建立在共享计数器基础上的同步原语

.acquire

  • 用途:获取信号量,内部计数(许可证)大于0则立刻减1

    • 内部计数>0-1立即返回True
    • 内部计数=0,阻塞、等待,直到其他线程调用release
  • 返回值:成功获取许可证则返回True,否则返回False

.release

  • 用途:释放信号量,内部计数(许可证)加1
    • 内部计数=0,表明有线程阻塞,随机唤醒线程

BoundedSemaphore

1
2
3
4
class BoundedSemaphore(Semaphore):
def release(self):
# 释放信号量,内部计数加1
# 当信号量总数超过初始化值时`raise ValueError`

Lock

  • Threading.Lock等价于_thread.allocate_lock,二者 都是工厂方法,返回lock类的实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class lock:
bool = acquire(blocking=True/False, timeout=-1)
# 尝试获得锁,返回是否获得锁
bool = acquire_lock
# deprecated,同`acquire`

bool = locked()
# 返回锁状态
bool = lockec_lock()
# deprecated,同`lock`

bool = release()
# 释放锁,若锁本身未获取`raise RuntimeError`
bool = release_lock()
# deprected,同`release`
  • 用途:同步原语,排他性使用某资源

  • 说明

    • 支持with上下文语法,代码块执行前自动获取锁,执行 结束后自动释放
    • 为了避免出现死锁,每个线程应该一次只允许获取一个锁, 否则应该使用更高级死锁避免机制
    • 适合简单的锁定可变对象
  • lock对象应该是C实现,里面的方法是没有self参数的

其他

Lock锁可以视为使用更加方便的全局变量

  • 可以用于线程之间的通信:给每个子线程分配单独一个锁, 主线程、子线程可以通过锁状态通信

  • 很大程度上可以被全局变量“替换”

    • 获得锁:不断检查全局变量状态,阻塞直到全局变量 状态代表锁可获得,修改全局变量状态代表锁被获取

    • 释放锁:修改全局变量状态代表锁可获得

    • 不断检查变量状态会无意义占用CPU时间,可以在检查 间隙使用time.sleep()暂停线程

RLock

  • 工厂方法,返回RLock实例
1
2
3
4
5
6
7
class RLock:
bool = acquire(block=True):
# 尝试获取锁,返回是否获取锁
# 每次获取锁,内部计数器加1
bool = release()
# 释放锁,若锁本身未获取`raise RuntimeError`
# 每次释放锁,内部计数器减1
  • 用途:可重入锁,可以被同一线程多次获取

    • 若锁被当前线程获取多次,则需要被释放同样次数才能被 其他线程获取
    • 即只有内部计数器回到初始状态才能被任意线程获取
  • 没有线程持有锁时,RLock内部计数被重置为1

    • 应该是RLock就是通过内部计数记录被获取次数
  • 常用于给类整体上锁

    • 类内部每个方法都获取锁
    • 类内部方法之间相互调用
    • 这样类实例方法每次只能有一个线程完整调用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import Threading

    class SharedCounter:
    _lock = threading.RLock()
    # 被所有实例共享的类级锁
    # 需要大量使用计数器时,内存效率更高
    def __init__(self, intial_value=0):
    self._value = initial_value

    def incr(self, delta=1):
    with ShareCounter:
    self._value += delta

    def decr(self, deta=1):
    with SharedCounter:
    # 获取锁之后,调用也需要获取锁的`.incr`方法
    self.incr(-delta)

local

  • 用途:创建本地线程存储对象,该对象属性保存、读取操作只对 当前线程可见
    • 可以用于保存当前运行线程状态,隔离不同线程间数据
      • 套接字对象

例子

提前终止线程

轮询方法
  • 函数封装在类中,在类中设置成员变量作为轮询点
  • 方法terminate改变轮询点状态,用于外部调用结束线程
  • 线程方法run检查轮询点状态判断是否结束自身
    • 线程执行类似IO的阻塞操作时,无法返回自身、无法检查 轮询点,通过轮询终止线程难以协调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from threading import Thread
import time

class CountDownTask:

def __init__(self):
self._running = True

def terminate(self):
self._runing = False

def run(self, n):
while self._running and n > 0:
# 设置轮询点,告诉线程何时应该终止
print("T-minus", n)
n -= 1
time.sleep(5)

def test():
c = CountdownTask()
t = Thread(karget=c.run, args=(10,0))
# 创建Thread
t.start()
# 启动线程
c.terminate()
c.join()
超时循环
  • 设置任务超时,超时自动返回
    • 任务只要超时就会返回,不会出现线程阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class IOTask:
def terminate(self):
self._running = False

def run(self, sock):
sock.settimeout(5)
# 设置超时
while self._running:
try:
data = sock.recv(8192)
break
except socket.timeout:
continue
...continued processing...
...terminated...
return

线程同步、通信

Event方式
1
2
3
4
5
6
7
8
9
10
11
12
from threading import Event, Threading

def send_signal(start_evt):
start_evt.set()
# 设置事件

def recv_signal():
start_evt = Event()
# 创建新事件
t = Thread.threading(target=send_signal, start_evt)
start_evt.wait()
# 阻塞直到接收到信号
Condition实现queue.Queue
  • 自定义数据类型,封装Condition实例进行线程间同步
  • put方法生产,notify通知消费者
  • get方法消费,阻塞直到被唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import heapq
from threading import Condition

class PriortyQueue:
def __init__(self):
self._queue = [ ]
self._count = 0
self._cv = Condition()
# 封装`Condition`实例实现线程间同步

def put(self, item, priority):
with self._cv:
heapq.heappush(self._queue, (-priority, self._count, item))
self._count += 1
self._cv.notify()

def get(self):
with self._cv:
# 阻塞,直到空闲
while len(self._queue) == 0:
self._cv.wait()
# `Condition`默认使用`RLock`,可重入
return heapq.heappop(self._queue)[-1]

防死锁机制

严格升序使用锁
  • local保存当前线程状态,隔离不同线程锁数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from threading import local
from contextlib import contextmanager

_local = local()

@contextmanager
# 使支持`with`上下文管理器语法
def acquire(*locks):

locks = sorted(locks, key=lambda x: id(x))
# 根据*object identifier*对locks排序
# 之后根据此list请求锁都会按照固定顺序获取

acquired = getattr(_local, "acquired", [ ])
# 为每个线程创建独立属性保存锁
if acquired and max(id(lock)) for lock in acquired >= id(locks[0]):
# `_local`中已有锁不能比新锁`id`大,否则有顺序问题
raise RuntimeError("lock order violation")

acquired.extend(locks)
_local.acquired = acquired
# 更新线程环境中锁

try:
for lock in locks:
# 只允许升序获取锁
lock.acquire()
yield
# `with`语句进入处
finally:
# `with`语句退出处
for lock in reversed(locks):
# 只允许降序释放锁
lock.release()
del acquired[-len(locks):]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def thread_1(x_lock, y_lock):
while True:
with acquire(y_xlock, y_lock):
print("Thread_1")

def thread_2(x_lock, y_lock):
while True:
with acquire(y_lock, x_lock):
print("Thread_2")

def test():
x_lock = threading.Lock()
y_lock = threading.Lock()

t1 = threading.Thread(target=thread_1, args=(x_lock, y_lock)):
t1.daemon = True
t1.start()

t1 = threading.Thread(target=thread_2, args=(x_lock, y_lock))
t2.daemon = True
t2.start()

queue模块

Queue

1
2
3
class Queue(builtins.object):
def __init__(self, maxsize=0):
# `maxsize`;限制可以添加到队列中的元素数量

queue.Queue:创建被多个线程共享的Queue对象

  • 线程安全的数据交换方式,基于collections.deque

    • Queue对象已经包含必要的锁,可以在多个线程间安全的 共享数据
    • Queue实际上是在线程间传递对象引用,不会复制 数据项,如果担心对象共享状态,可以传递不可修改数据 结构、深拷贝
  • 还可以使用Condition变量包装数据结构,实现线程线程中 间通信

get

1
2
3
4
5
6
obj =  get(self,
block=True,
timeout=None/num)

obj = get_nowait(self):
# 等同于`get(block=False)`
  • 用途:从队列中移除、返回一个元素

  • 参数

    • block
      • False:没有空闲slot立即raise Empty exception

.task_done

1
2
3
4
def task_done(self):
# 指示前一个队列“任务”完成
def join(self):
# 阻塞直到队列中所有元素被获取(消费)、**处理**
  • 说明
    • .join阻塞时,要所有队列中元素都被告知task_done, 才能解除阻塞
    • 即队列消费者每次get元素、处理后,要手动调用 task_done告知队列任务已完成
  • 也可以将Event和队列元素封装,用Event对象告知队列元素 处理完成

.put

1
2
3
4
5
6
7
8
def put(self,
item,
block=True,
timeout=None/num):
pass

def put_nowait(self, item):
# 等价于`put(self, item, block=False)`
  • 用途:向队列中追加元素

  • 参数

    • block
      • False:没有空闲slot立即raise Full exception

.qsize

1
2
3
4
5
6
7
8
9
10
int = qsize(self):
# 返回队列大概大小(不可靠)
# 非线程安全,在其使用结果时队列状态可能已经改变

bool = empty(self):
# deprecated,使用`qsize() == 0`替代
# 和`qsize()`一样非线程安全

bool = full(self):
# deprecated,使用`qsize() > n`替代,非线程安全

例子

队列元素消费通知

Event进队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from queue import Queue
from threading import Thread, Event

def producer(out_q):
while running:
evt = Event()
out_q.put((data, evt))
# 将`Event`实例放入队列
evt.wait()
# 阻塞直到收到消费者信号

# A thread that consumes data
def consumer(in_q):
while True:
data, evt = in_q.get()
evt.set()
# 告知生产者消费完成

协调生产、消费线程终止

队列中添加特殊值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
_sentinel = object()

def producer(out_q):
while running:
out_q.put(data)
out_q.put(_sentinel)
# 将特殊值放入队列,结束生产、通知消费者
def consumer(in_q):
while True:
data = in_q.get()
if data is _sentinel:
# 消费者接收到特殊值,结束生产
in_q.put(_sentinel)
# 特殊信号放回队列,继续传递
break

Queue实现线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
sock, client_addr = q.get()
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print("closed")

def echo_server(addr, nworkers):
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()
socket = socket(AF_INET, SOCK_STREAM)
socket.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))

multiprocessing模块

  • multiprocessing支持一个基本与平台无关的进程派生模型, 在Unix、Windows下均可正常工作

  • 实现方式

    • 启动一个新的Python进程,import当前模块
    • pickle序列化需要调用的函数,传递至新进程执行
  • 其中PoolQueuePipe等实际都是其封装其子模块类 的工厂方法

Process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Process():
def __init__(self,
group=None,
target=None/callable,
name=None/str,
args=()/list,
kwargs={}/dict,
daemon=None
):
self.authkey
self.daemon
self.exitcode
self.name
self.pid

def is_alive(self):

def join(self,
timeout=None/num
):

def start(self):
# 启动进程

def run(self):
# `start`方法调用`run`方法,若进程实例未传入`target`,
# `start`默认执行`run`方法

def terminate():
# 立即结束进程
  • 用途:Multiprocessing核心,类似于Thread,实现多进程 创建、启动、关闭

  • 成员方法基本类似Thread

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process
import os

def test(name):
print("Process ID: %s" % (os.getpid())
print("Parent Process ID: %s" % (os.getppid()))

if __name__ == "__main__":
proc = Process(target=test, args=("nmask",))
proc.start()
proc.join()

Pool

1
2
3
4
5
6
7
8
class Pool:
def __init__(self,
processes=None/int,
initializer=None,
initargs=(),
maxstacksperchild=None/int,
context=None
):
  • 用途:创建管理进程池,提供指定数量进程供用户调用
    • 新请求提交到pool中时
      • 若进程池没有满,将创建新进程执行请求
      • 否则请求等待直到池中有进程结束,然后创建新进程
    • 适合子进程多且需要控制子进程数量时

apply_async

1
2
3
4
5
6
7
8
def apply(self, func, args=(), kwds={}):
# 分配进程执行`func(*args, **kwds)`,阻塞
# 返回值:`func(item)`返回值

def apply_async(self, func, args=(), kwds={},
callback=None, error_callback=None)
# 异步分配进程执行调用,非阻塞
# 返回值:`ApplyResult`对象
  • 返回值:异步非阻塞调用返回结果操作句柄ApplyResult
  • 回调函数要有返回值,否则ApplyResult.ready()=False, 回调函数永远无法完成
ApplyResult
1
2
3
4
5
6
7
8
9
10
11
class ApplyResult:
def __init__(self, cache, chunksize, length,
callback, error_callback):

def get(self, timeout=None):

bool = ready(self):

bool = successful(self):

bool = wait(self, timeout=None):

map_async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def map(self, func, iterable, chunksize=None):
# 同步多进程`map(func, iterable)`,阻塞直到全部完成
# 返回值:结果列表,结果按调用顺序

def map_async(self, func, iterable, chunksize=None,
callback=None, error_callback=None):
# 异步多进程`map`,非阻塞
# 返回值:`MapResult(ApplyResult)`对象

def imap(self, func, iterable, chunksize=1):
# 迭代器版`map`,更慢、耗内存更少
def imap_unordered(self, func, iterable, chunksize=1):
# 返回结果无序版`imap`

def starmap(self, func, iterable, chunksize=1):
# 同步`map`,参数被解构`func(*item)`,阻塞

def starmap_async(self, func, iterable, chuncksize=None,
callback=None, error_callback=None):
# 异步`startmap`,阻塞

终止

1
2
3
4
5
6
7
8
9
def close(self):
# 关闭进程池,不允许添加新进程

def join(self):
# 阻塞,直至进程池中所有进程执行完毕
# 必须先`.close`进程池

def terminate(self):
# 终止进程池

Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SimpleQueue():
bool empty(self):
def get(self):
def put(self):
class Queue:
def __init__(self, maxsize=0, *, ctx):
def join_thread(self):
# join feeder线程
def cancel_join_thread(self):
# 控制在进程退出时,不自动join feeder线程
# 有可能导致部分数据在feeder中未被写入pipe而丢失
def close(self):
# 关闭feeder线程
def qsize(self):
def empty(self):
def full(self):
def get(self, block=True, timeout=None):
def get_nowait(self):
def put(self, obj, block=True, timeout=None):
def put(self):
class JoinableQueue(Queue):
def task_done():
def join():
  • 用途:进程安全队列

  • multiprocessing.Queue基于multiprocessing.Pipe构建

    • 数据传递时不是直接写入Pipe,而是写入本地buffer, 通过feeder线程写入底层Pipe,从而实现超时控制、 非阻塞put/get

    • 所以提供了.join_threadcancel_join_threadclose函数控制feeder流行为

    • 相当于线程安全的queue.Queue的多进程克隆版

  • multiprocessing.SimpleQueue:简化队列

    • 没有Queue中的buffer,没有使用Queue可能有的问题, 但是put/get方法都是阻塞的、没有超时控制

Pipe

  • mltiprocessing.Pipe()返回两个用管道相连的、读写双工 Connection对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Connection():
def __init__(self, handle,
readable=True,
writable=True):

def close(self):
def fileno(self):
# 返回描述符或连接处理器
bool = poll(self, timeout=0):
# 是否有输入可以读取
obj = recv(self):
# 接收pickable对象
# 若接收字节流不能被pickle解析则报错
bytes = recv_bytes(self, maxlength=None):
# 接收字节流作为`bytes`
int = recv_bytes_into(self, buf, offset=0):
# 接收字节流存入可写`buf`
# 返回读取的字节数量
def send(self, obj):
# 发送对象
def send_bytes(self, buf, offset=0, size=None):
# 发送bytes-like对象中字节数据
  • 对象会在每条信息前添加标志字节串,可以自动处理多条信息 堆叠

    • 可以通过os.read(connect.fileno())获取

共享内存

1
2
3
4
5
6
7
8
class SynchronizedBase:
def __init__(self, obj, lock=None, ctx=None):

def get_lock(self):
# 获取`multiprocessing.synchronize.RLock`

def get_obj(self):
# 获取数据对象,C数据结构`ctypes.`

Value

1
2
3
4
def Value(typecode_or_type, *args, lock=True):
# 返回同步共享对象
class Synchronized(SynchronizedBase):
value

Array

1
2
3
4
5
6
7
8
9
def Array(typecode_or_type, size_or_initializer, *,
lock=True)

def SynchronizedArray(SynchronizedBase):
def __getitem__(self, i):
def __getslice__(self, start, stop):
def __len__(self):
def __setitem(self, i, value):
def __setslice__(self, start, stop, values):

Manager

常与Pool模块一起使用,共享资源,不能使用QueueArray

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import multiprocessing as mltp
l1 = mltp.Lock()
# 获取进程锁
rl1 = mltp.RLock()
# 获取可重入锁
s1 = mltp.Semaphore(value=int)
# 获取信号量对象
bs1 = mltp.BoundedSemaphore(value=int)
# 获取有上限信号量对象
e1 = mltp.Event()
# 获取事件对象
cv1 = mltp.Condition(lock=None)
# 获取Condition对象
cp = mltp.current_process()
# 获取当前Process对象

concurrent.futures

  • 异步并发模块

ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ThreadPoolExecutor:
def __init__(self,
max_workers=None,
thread_name_prefix=''):

def shutdown(self, wait=True):
# 清理和该执行器相关资源

def submit(self, fn(callable), *args, **kwargs):
# 以指定参数执行`fn`
# 返回:代表调用的future,可以用于获取结果

def map(self, fn, *iterables,
timeout=None,
chunksize=1/int)
# 并行`map(fn, iterables)`
# 返回:按调用顺序的结果迭代器
  • 用途:创建线程池

.shutdown

  • 用途:关闭执行器,清理和该执行器相关的资源

    • 可以多次调用,调用之后不能进行其他操作
  • 参数

    • wait:阻塞,直到所有运行futures执行完毕,所有 资源被释放

ProcessPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ProcessPoolExecutor:
def __init__(self,
max_workers=None):

def shutdown(self, wait=True):
# 清理和该执行器相关资源

def submit(self, fn(callable), *args, **kwargs):
# 以指定参数执行`fn`
# 返回:代表调用的`Future`实例,可以用于后续处理

def map(self, fn,
*iterables,
timeout=None,
chunksize=1/int)
# 并行`map(fn, iterables)`
# 返回:按调用顺序的结果迭代器
  • 用途:创建进程池

    • 参考ThreadPoolExecutor.map方法支持chunksize 参数
  • 常使用with语句使用

    1
    with ProcessPoolExecutor() as Pool:
    • 处理池执行完with语句块中后,处理池被关闭
    • 程序会一直等待直到所有提交工作被完成

注意事项

  • 被提交的任务必须是简单函数形式,不支持方法、闭包和 其他类型可执行

  • 函数参数、返回值必须兼容pickle模块,因为进程间通信 交换数据使用其序列化

  • 函数不要修改环境

    • 被提交的任务函数出打印日志之类等简单行为外,不应该 有保留状态、副作用
  • 混合使用进程池、多线程时

    • 创建任何线程之前先创建、激活进程池
    • 然后线程使用同样的进程池进行计算密集型工作

Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Future:
def add_done_callback(self, fn):
# 添加future完成的回调函数
# 多次调用添加的回调函数按添加顺序执行

bool = cancel(self):
# 尝试取消当前future,返回是否成功取消
# 正在运行、执行完成的future不能被取消

bool = cancelled(self):
# 查看当前future是否被取消

bool = done(self):
# 查看当前future是否被取消、完成

def exception(self, timeout=None):
# 返回当前future代表的调用的exception

def result(self, timeout=None):
# 返回当前future代表调用的返回值
  • 用途:代表一个异步计算的结果
    • 直接创建对象无价值

socket模块

Python类说明

元类说明

type

type:元类,python中所有类都由type创建

1
2
3
4
5
class = type(
name=cls_name,
bases=(pls_name,..),
dict={attr1: val1,...}
)
  • 参数

    • cls_name:类名称
    • bases:父类元组
    • dict:类方法、属性
  • 返回值:类的别名

元类作用

  • 拦截类创建->修改类->返回修改后的类(创建类对象

  • 元类的作用和函数相似

    • python并不关心类对象是否是由真正的元类创建
    • 可以指定元类为一个函数,而非继承自type的元类
  • 但仍应尽量将元类指定为继承自type的对象

    • 元类应用场景一般比较复杂,使用类可以更好的管理代码
    • 默认元类是type类,类继承保持一致性意图比较明显,且 可以使用type中的方法
    • 元类可以使用一些类中特有的方法:__new____init__
  • 如果不确定是否需要用元类,就不应该使用

自定义元类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class UpperAttrMeta(type):
def __new__(cls, cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val)
for name,val in attrs.items()
if not name.startswith('__')
);
return super(UpperAttrMeta,cls).__new__(
cls,
cls_name,
bases,
upper_attrs);
// 使用元类创建新类

class Foo(metaclass=UpperAttrMeta):
b=1;

使用自定义元类UppAttrMeta创建的类Foo中定义的__init____new__等函数无意义,因为该类不仅是通过元类创建,也是 通过元类初始化

  • Foo通过UpperAttrMeta创建,而UppAttrMeta本身没有 实现自定义__init__,默认继承于object

    因此Foo类的创建就有object的init完成 segmentfault.com/q/1010000004438156 这个是原话,不明白什么意思了

  • 但是如果元类仅仅是pass,如下:

    1
    2
    class MetaCls(type):
    pass;

    使用此自定义元类,类定义中的__init____new__有效

类创建

py2自定义元类

python创建类对象步骤

  • __metaclass__指定创建类使用的元类

    • 按照优先级:类定义内 > 祖先类内 > 模块内 > type, 查找__metaclass__,并使用其创建类对象,若前三者 均未定义__metaclass__,则使用type创建

    • 自定义元类就是为__metaclass__指定自定义值

    • python只是将创建类的参数传递给__metaclass__,并不 关心__metaclass__是否是一个

      • cls()返回一个类对象,是相当于调用cls.__new__
      • 所以可以指定__metaclass__为一个函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
		
def upper_attr(cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val) for name,val in attrs.items());
return type(cls_name, bases, upper_attrs);

class Foo():
bar=1;
__metaclass__=upper_attr;

# 函数版本

class UpperAttrMeta(type):
def __new__(clsf, cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val) for name,val in attrs.items());
return type(cls_name, bases, upper_attrs);

class Foo():
bar=1;
__metaclass__=UpperAttrMeta;

# 类版本1

class UpperAttrMeta(type):
def __new__(cls, cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val) for name,val in attrs.items());
return type.__new__(cls, cls_name, bases, upper_attrs);

# 类版本2

class UpperAttrMeta(type):
def __new__(cls, cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val) for name,val in attrs.items());
return super(UpperAttrMeta,cls).__new__(cls, cls_name, bases, upper_attrs);

# 类版本3

元类示例

缓存实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import weakref

class Cached(type):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__cache = weakref.WeakValueDictionary()

def __call__(self, *args):
if args in self.__cache:
return self.__cache[args]
else:
obj = super().__call__(*args)
self.__cache[args] = obj
return obj

class Spam(metaclass=Cached):
def __init__(self, name):
print("Creating Spam({!r})".format(name))
self.name = name

捕获类属性定义顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from collection import OrderedDict

class Typed:
_expected_type = type(None)
def __init__(self, name=None):
self._name = name

def __set__(self, instance, value):
if not instance(value, self_expected_type):
raise TypeError("expected" + str(self._expected_type))
instance.__dict__[self._name] = value

class Integer(Typed):
_expected_type = int

class Float(Typed):
_expected_type = float

class String(Typed):
_expected_type = str

class OrderedMate(type):
def __new__(cls, clsname, bases, clsdict):
d = dict(clsdict)
order = [ ]
for name, value in clsdict.items():
if isinstance(value, Typed):
value._name = name
order.append(name)
d["_order"] = order
return type.__new__(cls, clsname, bases, d)

@classmethod
def __prepare__(cls, clsname, bases):
# 此方法会在开始定义类、其父类时执行
# 必须返回一个映射对象,以便在类定义体中使用
return OrderedDict()

class Structure(metaclass=OrderedMeta):
def as_csv(self):
return ",".join(str(getattr(self, name)) for name in self._order)

class Stock(Structure):
name = String()
shares = Integer()
price = Float()

def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price

有可选参数元类

为了使元类支持关键字参数,必须在__prepare____new____init__方法中使用KEYWORDS_ONLY关键字参数

1
2
3
4
5
6
7
8
9
10
11
12
13
class MyMeta(type):
@classmethod
def __prepare__(cls, name, bases, *, debug=False, synchronize=False):
pass
return super().__prepare(naeme, bases)

def __new__(cls, name, bases, *, debug=False, synchronize=False):
pass
return super().__new__(cls, name, bases, ns)

def __init__(self, name, bases, ns, *, debug=False, synchronize=False):
pass
super().__init__(name, base, ns)

Python IO、持久化

DBM

DBM文件是python库中数据库管理的标准工具之一

  • 实现了数据的随机访问
    • 可以使用键访问存储的文本字符串
  • DBM文件有多个实现
    • python标准库中dbm/dbm.py

使用

  • 使用DBM文件和使用内存字典类型非常相似
    • 支持通过键抓取、测试、删除对象

pickle

  • 将内存中的python对象转换为序列化的字节流,可以写入任何 输出流中
  • 根据序列化的字节流重新构建原来内存中的对象
  • 感觉上比较像XML的表示方式,但是是python专用
1
2
3
4
5
6
7
import pickle
dbfile = open("people.pkl", "wb")
pickle.dump(db, dbfile)
dbfile.close()
dbfile = open("people.pkl", "rb")
db = pickle.load(dbfile)
dbfile.close()
  • 不支持pickle序列化的数据类型
    • 套接字

shelves

  • 就像能必须打开着的、存储持久化对象的词典
    • 自动处理内容、文件之间的映射
    • 在程序退出时进行持久化,自动分隔存储记录,只获取、 更新被访问、修改的记录
  • 使用像一堆只存储一条记录的pickle文件
    • 会自动在当前目录下创建许多文件
1
2
3
4
5
6
import shelves
db = shelves.open("people-shelves", writeback=True)
// `writeback`:载入所有数据进内存缓存,关闭时再写回,
// 能避免手动写回,但会消耗内存,关闭变慢
db["bob"] = "Bob"
db.close()

SQLite

SQLAlchemy

通信、锁

函数通信

  • 简单通信方式:允许进程向其他进程发送简单信息

    • 命令行参数
    • 信号:受控于操作系统的非同步事件机制
    • shell环境变量
    • 程序退出状态码
    • 简单文件
  • 匿名管道:允许共享文件描述符的线程、进程传递数据

    • 仅在进程内部存在,通常和进程分支合用作为父进程、 子进程之间的通信手段、线程之间通信

    • 依赖于类Unix下进程分支模型,可移植性差

  • 具名管道FIFO:映射到系统的文件系统,允许不相关程序 进行交流

    • 是真正的外部文件,通过标准文件接口实现
    • 可以独立启动,局限于本地进程通信时可以替代套接字
    • 相较于普通外部文件,操作系统同步化FIFO访问,使之 适合IPC
  • 套接字socket:映射到系统级别端口号

    • 允许远程联网机器程序之间交流
    • 可移植性较好,几乎所有平台都支持
  • 共享内存:消息保存在函数外部,不随着函数栈清除消失

    • 全局变量
    • 类中变量

  • 线程/进程调度本质上是不确定的

  • 并行编程中错误使用锁机制可能会导致随机数据损坏、其他 异常行为,即竞争条件

  • 最好在临界区(对临界资源进行操作的部分代码)使用锁

死锁、预防

  • 线程/进程同时获取多个锁

避免、预防方案

  • 尽可能保证每个线程/进程只能同时保持一个锁

  • 为程序中每个锁分配唯一ID,然后只允许按照升序规则使用 多个锁

    • 单纯按照对象ID递增顺序加锁不会产生循环依赖,而循环 依赖时死锁必要,从而避免进入死锁状态
    • 可以数学上证明,这样能保证程序不会进入死锁状态

检测、恢复方案

  • 引入看门狗计数器:
    • 线程正常运行时每隔一段时间重置计数器
    • 没有发生死锁时正常运行
    • 一旦发生死锁,无法重置计数器导致计数器超时,程序通过 重启恢复自身状态

Python

contextlib

contextlib.contextmanager

  • 用途:上下文实现装饰器

    • 实现try...finally...语句的生成器上下文管理器语法
    • try部分:生成器部分,with语句进入时执行
    • finally部分:清理部分,with语句退出时执行
  • 用法

    • 定义

      1
      2
      3
      4
      5
      6
      7
      @contextmanager
      def some_generator(<parameters>):
      <setup>
      try:
      yield <value>
      finally:
      <cleanup>
    • 用法

      1
      2
      with some_generator(<argrument>) as <variable>:
      <body>
    • 等价于

      1
      2
      3
      4
      5
      6
      <setup>
      try:
      <variable> = <value>
      <body>
      finally:
      <cleanup>