Python系统编程

综述

主要标准模块

  • os:与Python所在底层操作系统相对应变量、函数

  • sys:与Python解释器本身相关的组件

  • 文件、目录

    • glob:文件名扩展
    • stat:文件信息
  • 并行开发

    • threading_threadqueue:运行、同步并发线程
    • subprocessmultiprocessing:启动、控制并行进程
    • socket:网络连接、进程间通信
  • 系统

    • timetimeit:获取系统时间等相关细节
    • signalselectshutiltempfile:多种系统 相关任务

说明

  • Python中大部分系统级接口都集中在模块sysos

  • 以上模块之间具体实现不总是遵循规则

    • 标准输入、输出流位于sys中,但可以将视为与操作系统 模式相关
  • 一些内建函数实际上也是系统接口

    • open

Sys模块

平台、版本

1
2
3
4
5
6
7
8
9
10
11
12
import sys
sys.platform
# 操作系统名称
sys.maxsize
# 当前计算机最大容纳“原生”整型
# 一般就是字长
sys.version
# python解释器版本号
sys.byteorder
# 平台字节序
sys.hash_info
# 数值类型hash信息

sys.xxxcheckinterval

1
2
3
4
sys.getcheckinterval()
# 查看解释器检查线程切换、信号处理器等的频率
sys.setcheckinterval(N)
# 设置解释器检查线程切换、信号处理器等的频率
  • 参数

    • N:线程切换前执行指令的数量
  • 对大多数程序无需更改此设置,但是可以用于调试线程性能

    • 较大值表示切换频率较低,切换线程开销降低,但是对事件 的应答能力变弱
    • 较小值表示切换频率较高,切换线程开销增加,对事件应答 能力提升

sys.hash_info

1
2
sys.hash_info.width
# `hash()`函数截取hash值长度

模块搜索路径

1
sys.path
  • 返回值:目录名称字符串组成的列表
    • 每个目录名称代表正在运行python解释器的运行时模块 搜索路径
    • 可以类似普通列表在运行时被修改、生效

sys.path初始化顺序

  • 脚本主目录指示器:空字符串

    • 脚本主目录是指脚本所在目录,不是os.getcwd() 获取的当前工作目录
  • PYTHONPATH环境变量

    1
    2
    # .bashrc
    export PYTHONPATH=$PYTHONPATH:/path/to/fold/contains/module
  • 标准库目录

  • .pth路径文件:在扫描以上目录过程中,遇到.pth文件会 将其中路径加入sys.path

    1
    2
    # extras.pth
    /path/to/fold/contains/module

导入模块顺序

导入模块时,python解释器

  1. 搜索内置模块,即内置模块优先级最高
  2. 从左至右扫描sys.path列表,在列表目录下搜索模块文件

嵌入解释器的钩子

1
2
3
4
5
6
sys.modules
# 已加载模块字典
sys.builtin_module_names
# 可执行程序的内置模块
sys.getrefcount()
# 查看对象引用次数

异常

1
sys.exc_info()
  • 返回值:(type, value, trackback)
    • 最近异常的类型、值、追踪对象元组
    • 处理该异常的except执行之后,sys.exc_info被恢复 为原始值
  • 追踪对象可以使用traceback模块处理

命令行参数

1
sys.argv
  • 返回值:命令行参数列表
    • 首项始终为执行脚本名称,交互式python时为空字符串
  • 参数可以自行解析,也可以使用以下标准库中模块
    • getopt:类似Unix/C同名工具
    • optparse:功能更加强大

标准流

1
2
3
4
5
6
sys.stdin
# 标准输入流
sys.stdout
# 标准输出流
sys.stderr
# 标准错误流
  • 标准流是预先打开的python文件对象

    • 在python启动时自动链接到程序上、绑定至终端
    • shell会将相应流链接到指定数据源:用户标准输入、文件

重定向

  • 可以将sys.stdinsys.stdout重置到文件类的对象,实现 python内部的普遍的重定向方式

    • 外部:cmd输入输出重定向
    • 局部:指定print参数
  • 任何方法上与文件类似的对象都可以充当标准流,与对象类型 无关,只取决于接口

    • 任何提供了类似文件read方法的对象可以指定给 sys.stdin,以从该对象read读取输入

    • 任何提供了类似文件write方法的对象可以指定给 sys.write,将所有标准输出发送至该对象方法上

  • 标准库io提供可以用于重定向的类StringIOByteIO
  • 重定向之后printinput方法将应用在重定向之后的流

stdin

1
2
3
4
5
6
7
8
stdin.read()
# 从标准输入流引用对象读取数据
input("input a word")
sys.stdin.readlines()[-1]
# 以上两行语句类似

stdin.isatty()
# 判断stdin是否连接到终端(是否被重定向)
  • 在stdin被重定向时,若需要接受用户终端输入,需要使用 特殊接口从键盘直接读取用户输入
    • win:msvcrt模块
    • linux:读取/dev/tty设备文件

退出

1
sys.exit(N)
  • 用途:当前线程以状态N退出
    • 实际上只是抛出一个内建的SystemExit异常,可以被正常 捕获
    • 等价于显式raise SystemExit
  • 进程退出参见os._exit()

sys.exitfuncs

1
sys.exitfuncs

编码

1
2
3
4
5
sys.getdefaulencoding()
# 文件内容编码,平台默认值
# 默认输入解码、输出编码方案
sys.getfilesystemencoding()
# 文件名编码,平台默认体系
  • win10中二者都是utf-8,win7中文件名编码是mbcs

os模块

  • os模块提供了POSIX工具

    • 操作系统调用的跨平台移至标准
    • 不依赖平台的目录处理工具
      • os.path
  • 包含在C程序、shell脚本中经常用到的所有操作系统调用,涉及 目录、进程、shell变量

  • 实践中,os基本可以作为计算机系统调用的可移植接口 使用

    • 只要技术上可行,os模块都能跨平台
    • 但在某些平台,os提供专属该平台的工具

Shell变量

1
2
3
4
5
os.environ
# 获取、设置shell环境变量,类似字典
os.putenv()
# 修改进程对应shell环境变量
os.getenv()

os.environ

os.environ可以向普通字典一样键索引、赋值

  • 默认继承系统所有环境变量、命令行临时环境变量

  • 在最新的python中,对os.environ的键值修改将自动导出 到应用的其他部分

    • os.environ对象
    • 进程对应shell环境变量:通过后台调用os.putenv生效, 反之不会更新os.environ
  • python进程、链入C模块、该进程派生子进程都可以获取新的 赋值

    • 子进程一般会继承父进程的环境变量设定
    • 可以作为传递信息的方式

os.putenv

  • os.putenv同时会调用C库中的putenv(若在系统中可用) 导出设置到python链接的C库

    • 底层C库没有putenv则可将os.environ作为参数传递

管理工具

1
2
3
4
5
6
os.getpid()
# 调用函数的进程id
os.getcwd()
# 当前工作目录CWD
os.chdir(r"C:\Users")
# 更改当前工作目录CWD

移植工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
os.sep
# python底层运行平台采用的**目录组**分隔符号
# linux: `/`、win:`\`、某些mac:`:`
os.pathsep
# 目录列表(字符串形式)中分隔目录的字符
# posix机:`:`、win:`;`
os.curdir
# 当前目录代表
# linux:`.`
os.pardir
# 父目录代表
# linux:`..`
os.linesep
# 换行符
# linux:`\n`

||Linux|Win|Unix| |———|——————|———|———| |sep|/|\|/(某些MAC:)| |pathsep|:|;|| |curdir|.||| |pardir|..||| |linesep|\n|\r\n||

  • 借助这些变量可以系统相关字符串操作的跨平台
  • win下目录组分隔符是\,大部分情况下看到\\是作为\ 转义字符,防止\和之后字符转义
    • 确认不会转义时,直接使用\也是可以的
    • 使用r''表示不转义也可以直接使用\

路径名工具

判断存在

1
2
3
4
5
os.path.isdir(r"C:\Users")
os.path.isfile(r"C:\Users")
# 判断路径名是简单文件、目录
os.path.exists(r"C:\Users")
# 判断路径名是否存在
  • os.stat配合stat模块有更丰富的功能

路径操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
pfile = os.path.join(r"C:\temp", "output.txt")
# 连接文件名、目录

os.path.split(pfile)
# 分离文件名、目录

os.path.dirname(pfile)
# 返回路径中目录
os.path.basename(pfile)
# 返回路径中
os.path.splitext(pfile)
# 返回文件扩展名

os.path.normpath(r"C:\temp/index.html")
# 调整路径为当前平台标准,尤其是分隔符混用时
os.path.abspath("index.html")
# 返回文件的**完整绝对路径名**
# 扩展`.`、`..`等语法
  • os.sep配合字符串.join.split方法可以实现基本相同 效果

目录、文件操作

1
2
3
4
5
6
7
8
9
10
11
12
os.mkdir(dirname)
os.rename(ori_name, dest_name)
os.remove(filename)
os.unlink(filename)
# unix下文件删除,同`os.remove`
os.chmod(filename, previlideges)
info = os.stat(filename)
# 命名元组表示的文件底层信息
# 可使用`stat`模块处理、解释信息
os.listdir(dirpath)
os.walk(rootdir, topdown=True/False)
# 遍历根目录下的整个目录树

os.listdir

  • 返回值:包含目录中所有条目名称的列表

    • 名称不带目录路径前缀
  • 需要注意的是:文件名同样有编码

    • 若参数为字节串,返回文件名列表也是字节串
    • 参数为字符串,返回文件名列表也是字符串
    • open函数也可以类似使用字节串确定需要打开的文件
    • glob.globos.walk内部都是通过调用os.listdir 实现,行为相同
  • glob模块也有遍历目录的能力

os.walk

  • 返回值:返回迭代器

    • 每个元素为(dirname, subdirs, subfile)
  • 参数

    • topdown:默认True,自顶向下返回

文件描述符、文件锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
descriptor = os.open(path, flags, mode)
# 打开文件并返回底层描述符
os.read(descriptor, N)
# 最多读取N个字节,返回一个字节串
os.write(descriptor, string)
# 将字节串写入文件
os.lseek(descriptor, position, how)
# 移动文件游标位置
descriptor.flush()
# 强制刷出缓冲

new_fd = os.dup(fd)
# 创建文件描述符副本
os.dup2(fd_src, fd_dest)
# 将文件描述符`fd_src`复制至`fd_dest`
  • os通过调用文件的描述符来处理文件

  • 基于文件描述符的文件以字节流形式处理

    • 没有字符解码、编码、换行符转换
    • 除了缓冲等额外性能,基于描述符的文件和二进制文件模式 对象类似
  • 文件流对象、工具仅仅是在基于描述符的文件的封装

    • 可以通过.fileno()获得文件流对象对应文件描述符, sys.stdinsys.stdoutsys.stderr对应文件 描述符是:0、1、2

      1
      2
      os.write(1, b"hello world\n")
      sys.stdout.write("hello world\n")
    • 可以通过os.fdopen把文件描述符封装进文件流对象

      1
      2
      3
      fdfile = os.open("filename", (os.O_RDWR|os.O_BINARY))
      filstream = os.fdopen(fdfile, "r", encoding="utf-8",
      closefd=False)

os.open

1
2
3
4
5
def os.open(path,
flags,
mode=511, *,
dir_fd=None
)
  • 参数

    • mode:需要模式标识符进行二进制操作以得到需要的模式

      • os.O_RDWR
      • os.O_RDONLY
      • os.O_WRONLY
      • os.O_BINARY
      • os.O_EXCL:唯一访问权,是python在并发、进程 同步情况下锁定文件最便捷的方法
      • os.O_NONBLOCK:非阻塞访问
      • 其他模式选项参见os模块
  • 返回值:文件描述符

    • 整数代码、句柄,代表操作系统的中文件

退出进程

1
2
os._exit(0)
# 调用进程立即退出,不输出流缓冲、不运行清理处理器

异常信息

trackback模块

1
2
3
4
5
6
7
8
import traceback, sys

try:
...
except:
exc_info = sys.exec_info()
print(exec_info[0], exec_info[1])
traceback.print_tb(exec_info[2])

参数处理

getopt模块

optparse模块

文件、目录

stat模块

  • 包含os.stat信息相关常量、函数以便跨平台使用
1
2
3
4
5
6
7
8
import stat

info = os.stat(filename)
info[stat.ST_MODE]
# `stat.ST_MODE`就是字符串
# 只是这样封装易于跨平台
stat.S_ISDIR(info.st_mode)
# 通过整数`info.st_mode`判断是否是目录
  • os.path中包含常用部分相同功能函数

glob模块

glob.glob

1
2
3
import glob

def glob.glob(pathname,*,recursive=False)
  • 参数

    • pathname:文件名模式
      • 接受shell常用文件名模式语法
        • ?:单个字符
        • *:任意字符
        • []:字符选集
      • .开头路径不被以上?*匹配
    • recursive
      • False:默认
      • True**将递归匹配所有子目录、文件
  • 返回值:匹配文件名列表

    • 目录前缀层次同参数
  • glob.glob是利用glob.fnmatch模块匹配名称模式

struct模块

struct模块:用于打包、解压二进制数据的调用

  • 类似于C语言中struct声明,需要指定二进制中数据类型
  • 可以使用任何一种字节序(大、小端)进行组合、分解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import struct
data = struct.pack(">i4shf", 2, "spam", 3, 1.234)
# `>`:高位字节优先,大端
# `i`:整形数据
# `4s`:4字符字符串
# `h`:半整数
# `f`:浮点数
file = open("data.bin", "wb")
file.write(data)
# 二进制写入字节串
file.close()

file = open("data.bin", "rb")
bytes = file.read()
values = struct.unpack(">i4shf", data)
# 需要给出字节串存储格式

shutil模块

shutil模块:包含文件操作相关

todo

系统、信息

locale模块

1
2
3
4
import locale

locale.getpreferredencoding()
# 获取平台默认编码方案

dis模块

1
2
def dis.dis(func)
# 打印可拆解函数语句对应机器指令

atexit模块

atexit:主要用于在程序结束前执行代码

  • 类似于析构,主要做资源清理工作

atexit.register

1
2
3
4
5
def register(
func,
*arg,
**kwargs
)
  • 用途:注册回调函数
    • 在程序退出之前,按照注册顺序反向调用已注册回调函数
    • 如果程序非正常crash、通过os._exit()退出,注册回调 函数不会被调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import atexit

df func1():
print("atexit func 1, last out")

def func2(name, age):
print("atexit func 2")

atexit.register(func1)
atexit.register(func2, "john", 20)

@atexit.register
def func3():
print("atexit func 3, first out")

实现

atexit内部是通过sys.exitfunc实现的

  • 将注册函数放到列表中,当程序退出时按照先进后出方式 调用注册的回调函数,

  • 若回调函数执行过程中抛出异常,atexit捕获异常然后继续 执行之后回调函数,知道所有回调函数执行完毕再抛出异常

  • 二者同时使用,通过atexit.register注册回调函数可能不会 被正常调用

signal模块

信号模块

Python 函数式编程

functools

total_ordering

total_ordering:允许类只定义__eq__和其他中的一个,其他 富比较方法由装饰器自动填充

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from functools import total_ordering

class Room:
def __init__(self, name, length, width):
self.name = name
self.length = length
self.width = width
self.square_feet = self.length * self.width

@total_ordering
class House:
def __init__(self, name, style):
self.name = name
self.style = style
self.rooms = list()

@property
def living_space_footage(self):
return sum(r.square_feet for r in self.rooms)

def add_room(self, room):
self.rooms.append(room)

def __str__(str):
return "{}: {} squre foot {}".format(
self.name,
self.living_space_footage,
self.style)

def __eq__(self, other):
return self.living_space_footage == other.living_space_footage

def __lt__(self, other):
return self.living_space_footage < other.living_space_footage

并行开发

综述

python并行多任务均依赖操作系统底层服务并行执行python代码

  • 线程派生:基本所有主流平台均支持
  • 多进程
    • shell命令进程
    • 子python进程

跨平台多进程实现

  • 创建子python进程
    • 类Unix:fork系统调用实现进程分支,分支进程运行时 环境同主进程完全一致
    • Win:创建python进程,import当前调用代码得到类似 主进程运行时环境
  • pickle序列化被调函数,传递给子进程执行
  • 因为Win下分支进程需要导入当前调用,所以多进程代码必须 在__main__内,否则无限循环创建新进程
  • 进程池调用函数要声明在进程池创建之前,否则启动进程会报错

进程通信

  • python进程间数据传递基本都是通过pickle序列化传递,所以 需要传递的数据要支持pickle序列化

  • multiprocessing等模块派生进程需要传递被调函数,所以 不支持

    • lambda匿名函数
    • 绑定对象方法

其他相关模块、包

  • 多进程:pathospp
  • 进程通信:signal

注意事项

  • 子进程报错直接死亡,错误信息默认不会输出到任何地方,所以 子进程中,多使用try catch

os模块

派生进程

1
2
3
4
5
6
os.startfile(filename)
# 调用系统默认程序打开文件,类似于鼠标双击
# linux下没有实现
int = os.system(command(str))
# 运行shell命令,返回命令执行退出状态
# 和普通shell命令一样默认阻塞,除非后台运算符`&`

os.popen

1
2
pipe = os.popen(cmd(str), mode="r"/"w", buffering=-1)
# 运行shell命令并重定向其输出、输入流到管道开放
  • 用途:运行shell命令并重定向其输出、输入流到管道开放

  • 返回值:返回管道流对象

    • 类似shell中管道语法
    • 管道流对象类似普通文件流,支持一般读、写、迭代 (但是文档中只有close方法)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # `r`模式执行,读取子进程标准输出
      pipe.readlines()
      pipe.readline()
      pipe.read()
      for i in pipe:
      # 迭代器语法
      # `w`模式执行,写入子进程标准输入
      pipe.write()
      pipe.close()
      # 返回退出状态
      # 历史原因,退出状态为0时返回None
  • os.popen一般不会阻塞,但是python执行需要整个命令行程序 完成的语句时仍然会阻塞

    • 关闭管道对象
    • 一次性读取所有输出流
  • subprocess模块可以实现与os.systemos.popen相同的 效果,使用更复杂,但是对流的连接、使用提供更完善的控制

os.fork

进程分支是构建平行任务的传统做法,是Unix工具集的基本组成部分

  • 分支是开始独立程序的直接做法,无论调用程序是否相同
  • 分支想法基于复制
    • 程序调用分支例行程序时,操作系统会创建在该进程副本
    • 和进程并行的运行副本
    • 有些系统实现并不真的复制原有程序,以减少资源消耗, 但是新副本会像真实副本一样运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import os

def child():
print("hello from child", os.getpid())
os._exit(0)

def parent():
while True:
newpid = os.fork()
# 子进程返回0
# 父进程返回子进程进程号
if newpid == 0:
child()
else:
print("hello from parent", os.getpid(), newpid)
if input() == "q":
break
  • 返回值:据此区分父、子进程,执行不同任务

    • 子进程中返回0
    • 父进程中返回子进程ID
  • os.fork仅仅是系统代码库中标准进程分支调用简单封装

    • 和C共用代码库
    • 在win下标准python版本中无法运行,和win模型冲突过多
    • Cygwin中python可以运行,虽然行为同真正Unix分支不完全 相同,但是足够接近
  • os.fork实际复制整个python解释器

    • 除非python脚本被编译为二进制机器码

os.exec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def execv(path, args=tuple/list)
def execl(path, *args=*list/*tuple)
# 以参数执行指定可执行文件,**代替**当前进程

def execvp(file, args)
def execlp(file, *args)
# `p` for `$PATH`
# 在系统搜索路径`$PATH`中定位可执行程序

def execve(path, args=tuple/list, env=dict)
def execle(path, args=*tuple/*list, env=dict)
# `e` for `environ`
# 将`env`字典作为环境环境变量传递

def execvpe(path, args=tuple/list, env=dict)
def execlpe(path, args=*tuple/*list, env=dict)
# 在系统搜索路径`$PATH`中定位可执行程序
# 将`env`字典作为环境环境变量传递
  • os.exec族的函数会覆盖当前进程

    • 所以该语句的代码都不会执行
    • 不会更改进程号
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import os
    param = 0
    while True:
    param += 1
    newpid = os.fork()
    if newpid == 0:
    os.execlp("python", "child.py", str(param))
    assert False, "starting new program error"
    # 上句正常执行,这句永远无法被调用
    else:
    print("child is", pid)
    if input() == "q": break
  • multiprocessing模块的进程派生模型+os.exec配合使用 可以在win下实现类似os.fork的效果

spawn

1
2
os.spawn
# 启动带有底层控制的新程序

进程通信

1
2
3
4
5
6
read_fd, write_fd = os.pipe()
# 创建管道,返回管道写入、读取描述符
# 可以使用`os.fdopen()`封装管道描述符,方便读、写
os.mkfifo(path, mode=438, *, dir_fd=None)
# 创建命名管道,win不支持
# 仅创建外部文件,使用需要像普通文件一样打开、处理

subprocess模块

Popen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Popen:
def __init__(self,
args(str,[str]),
bufsize=-1/0/1/int,
executable=None/str,
stdin=None/stream,
stdout=None/stream,
stderr=None/stream,
preexec_fn=None/callable,
close_fd=obj,
shell=False/True,
cwd=None/path,
env=None/dict,
universal_newlines=False/True,
startupinfo=None,
creationflags=0,
restore_signals=True/False,
start_new_session=False/True,
pass_fds=(),
encoding=None/str,
errors=None
)
  • 用途

  • 参数

    • args:需要执行的命令

    • executable:备选执行命令

    • stdin/stdout/stderr:执行程序标准输入、输出、 错误流连接对象

      • 默认:当前进程标准输入、输出、错误
      • subprocess.PIPE=-1:当前管道对象标准…
    • preexec_fn:子进程执行前在子进程中调用的对象

      • POSIX only
    • close_fds:控制关闭、继承文件描述符

    • shell:是否通过shell执行命令

      • 执行shell内置命令则必须置True
        • win:type
        • linux:set
      • Linux下False:由os.execvp运行
    • cwd:子进程执行目录

    • env:子进程环境变量

    • universal_newlines:是否在3个标准流中使用行结尾符

      • 即是否按照文本处理3个标准流
    • startupinfo:windows only

    • restore_signals:POSIX only
    • start_new_session:POSIX only
    • pass_fds:POSIX only

.communicate

1
(stdout, stderr) = communicate(self, input=None, timeout=None)
  • 用途:和子进程交互,阻塞直到子进程终止
    • input传递给子进程标准输入
    • 返回标准输出、错误输出
    • universal_newlinesTrue时,input参数、返回值 应该为str,否则为bytes

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def kill(self):
# kill进程通过*SIGKILL*信号

def terminate(self):
# 终止进程通过*ISGTERM*信号

def send_signal(self, sig):
# 向进程传递信号

def poll(self):
# 检查子进程是否终止,设置、返回`.returncode`属性

def wait(self, timeout=None, endtime=None):
# 等待直到子进程终止,返回`.returncode`属性

pipe.stdin
pipe.stdout
pipe.stderr
# 管道标准输入、输出、错误流
# 创建子进程时,可以选择和子进程相应流连接
# 支持读、写、迭代
pipe.returncode
# 子进程退出状态
  • Popen创建对象对象之后会立刻执行

  • 同时指定stdoutstdin参数可以实现管道双工工作

    • 需要注意,读写时交替发送缓冲数据流可能导致死锁

call

1
2
3
4
subprocess.call(
"type hello.py",
shell=True/False
)

_thread

  • 为系统平台上的各种线程系统提供了可移植的接口
  • 在安装了pthreads POSIX线程功能的系统上,接口工作方式 一致,无需修改源码即可正常工作
  • 基本可以完全被threading模块替代了

start_new_thread

1
2
3
4
5
6
7
def start_new_thread(
callable,
args=tuple/list,
kwargs=dict
)
def start_new():
# deprecated,同上
  • 用途:开启新线程,以参数调用callable

  • 返回值:应该是线程起始地址

  • 派生线程在函数返回后退出

    • 若在线程中函数抛出未捕捉异常,打印堆栈跟踪记录、退出 线程
    • 程序其他部分继续运行
  • 大多数系统平台上,整个程序主线程退出时,子线程随之退出

    • 需要一些处理避免子线程意外退出

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Lock = _thread.alloacate_lock()
# 获取一个`Lock`锁
# 等价于`threading.Lock()`
Lock = _thread.allocate()
# deprecated,同上

RLock = _thread.RLock()
# 获取一个`RLock`可重入锁
# 等价于`threading.RLock()`

def _thread.exit()
# 退出当前线程,可捕捉
# 等价于显式`raise SystemExit`、`sys.exit()`
def _thread.exit_thread()
# 同上

例子

例1

  • 全局线程锁保护对输出流写入
  • 全局线程锁实现主线程、子线程间通信,保证主线程在子线程 之后退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import _thread as thread

stdoutmutex = thread.allocate_lock()
# 创建全局标准输出锁锁对象
exitmutexes = [thread.allocate_lock() for _ in range(10)]
# 为每个线程创建锁
exitmutexes_bool = [False] * 10
# 线程共享内存,同样可以使用全局变量作为信号量,而不用
# 额外开销

def counter(myId, count):
for i in range(count):

stdoutmutex.acquire()
# 线程向标准输出流写入时,获得锁
print("[%s] => %s" % (myId, i))
stdoutmutex.release()
# 向标准输出流写入完毕后,释放锁

with stdoutmutex:
# 线程锁同样支持`with`上下文管理器
print("[%s] => %s again" % (myId, i))

exitmutexes[myID].acquire()
# 线程执行完毕后获取对应自身id的锁,通知主线程

for i in range(10):
thread.start_new_thread(counter, (i, 100))
# 创建、启动新线程

for mutex in existmutexes:
# 检查所有信号锁
while not mutex.locked():
# 直到信号锁被获取,结束死循环
pass
print("main thread exiting")

例2

  • with上下文管理器使用锁
  • 全局变量实现主线程、子线程通信,避免主线程在子线程之前 退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import _thread as thread
import time

stdoutmutex = thread.allocate_lock()
exitmutexes_bool = [False] * 10

def counter(myId, count):
for i in range(count):
with stdoutmutex:
# 线程锁同样支持`with`上下文管理器
print("[%s] => %s again" % (myId, i))
exitmutexes[myID] = True

for i in range(10):
thread.start_new_thread(counter, (i, 100))

while not all(exitmutexes):
time.sleep(0.25)
# 暂停主线程,减少占用CPU进行无价值循环
print("main thread exiting")

threading

Thread

1
2
3
4
5
6
7
8
9
10
class Thread:
def __init__(self,
group=None,
target=callable,
name=None/str,
args=(),
kwargs={},
*,
daemon=None/True/daemon):
pass
  • 用途:可控线程类,有两种方法使用

    • 传递callable参数创建新对象
    • 继承、覆盖run方法:代码和Thread深耦合,可能 不方便代码复用,如multiprocessing模块
  • 参数

    • group:保留参数用于未来扩展
    • target:可执行对象,将被run invoke
    • name:线程名,缺省Thread-N
    • args:用于invoke target参数tuple
    • kwargs:用于invoke target keyword参数dict
    • daemon:是否是守护线程
      • 默认情况下,主进程(线程)会等待子进程、线程退出 后退出
      • 主进程(线程)不等待守护进程、线程退出后再退出
      • 注意:主进程退出之前,守护进程、线程会自动终止
  • 若衍生类覆盖此构造器方法,务必首先调用此方法

.run

1
2
def run(self):
pass
  • 用途:代表线程活动
    • run用于invoke target
    • 覆盖此方法设置线程活动

.start

1
2
def start(self):
pass
  • 用途:开始线程活动
    • 线程创建完成后不会立即执行,需要手动调用.start启动
    • 多次调用raise RuntimeError

.join

1
2
3
def join(self,
timeout=None/float):
pass
  • 用途:等待直到线程结束

    • join:将线程加入当前线程
    • 可以多次调用
    • 试图导致死锁时,将会raise RuntimeError
  • 参数

    • timeout:指定超时时间,单位秒
      • 缺省否则阻塞直到线程结束

其他方法

1
2
3
4
5
6
7
8
9
10
bool = is_alive(self):
# 返回线程是否存活

def setDaemon(self, daemonic):
# 设置守护进程
bool = isDaemon(self):

def setName(self, name):
# 设置线程名
def getName(self):

Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Event():
def set(self):
# 设置信标值为`True`,发送信号

bool = is_set(self):
# 查看信标是否被设置

bool = wait(self, timeout):
# 阻塞,直到超时、信标被设置为`True`
# 返回信标值,即因超时返回时返回`False`

def clear():
# 重置Event对象,设置信标值为`False`

  • 用途

    • 发送信号:is_set触发事件
    • 接收信号:wait阻塞直到事件发生
  • Event中包含信标,可在线程中设置、接收,实现线程 间同步

    • Event对象信标默认设置为False,等待Event对象线程 会阻塞直到信标设置为真

    • 若有线程设置信标为真,则将唤醒所有等待该Event 对象线程

  • 若只想唤醒单个线程,用信号量、Condition代替

.clear

.clear可以重置Event对象

  • 难以确保安全清理、重新赋值Event对象,可能导致错过事件 、死锁

  • 且无法保证重置Event对象的代码能在线程再次等待此Event 信号之前执行

  • 所以Event对象最好单次使用,即其信标设置为真应立刻丢弃

  • 若线程需不停重复使用Event对象,使用Condition代替

Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Condition():
def __init__(self, lock=None)
# `lock`:`Lock`、`RLock`对象,被用作底层锁
# 缺省创建新`RLock`对象作为底层锁

bool accquire():
# 获取一次condition内部锁

bool release():
# 释放一次condition内部锁

def notify(self, n=1):
# 唤醒至多`n`个on this condition的线程

def notify_all(self):
# 唤醒所有on this condition的线程

bool = wait(self, timeout=None):
# 释放底层锁,阻塞
# 直到被唤醒再次获取底层锁、超时,返回

bool = wait_for(self, predicate(callable), timeout=None):
# `wait`直到`predicate`返回`True`
  • 用途:Condition对象wait等待信号、notify唤醒一定 数量线程实现线程同步

  • 说明

    • 以上所有方法执行前均需要已获得底层锁,否则 raise RuntimeError
    • 因此以上方法一般都需要放在with代码块中,保证已经 获取了内部锁

with上下文管理器

1
2
3
4
5
with c:
c.wait()

with c:
c.notify()
  • with进入:获取condition底层锁,保证调用方法前已经获得 底层锁
  • with退出:释放condition底层锁
  • Condition支持with上下文管理器,而且非常必须
  • help里面看不到.acquire.release方法,但是是有 而且可以调用的,应该是官方不建议使用

.wait

  • 用途

    • 方法先释放底层锁,阻塞,使得其他等待 获取此对象底层锁获得锁
    • 等待被notify唤醒,再次获取锁,继续执行
  • 底层锁是RLock

    • .wait不是调用其.release()方法,而是调用RLock 内部方法确保真正释放锁,即使RLock被递归的获取多次

    • 再次获取锁时,调用另一个内部接口恢复递归层次,即 RLock内部计数

    • RLock本身性质:在没有被持有时,其内部计数被重置 为1,其他线程可以自由获取

.notify

  • .notify并不释放condition底层锁
  • 只是控制能够获取底层锁的线程数量

Semaphore

1
2
3
4
5
6
7
8
9
class Semaphore(builtins.object):
def __init__(self, value):
# `value`:起始许可证数量(最多允许同时执行线程数目)

bool = acquire(self, blocking=True, timeout=None):
# 获取信号量,内部计数(许可证)减1

def release():
# 释放信号量,内部计数(许可证)加1
  • 用途:Semaphore对象release方法生产、acquire方法 消耗信号量,实现线程通信

    • 可以像标准锁一样使用信号量作线程同步,但会增加复杂性 影响性能
    • 更适用于需要在线程间引入信号、限制的程序,如限制代码 的并发访问量
  • 信号量对象是建立在共享计数器基础上的同步原语

.acquire

  • 用途:获取信号量,内部计数(许可证)大于0则立刻减1

    • 内部计数>0-1立即返回True
    • 内部计数=0,阻塞、等待,直到其他线程调用release
  • 返回值:成功获取许可证则返回True,否则返回False

.release

  • 用途:释放信号量,内部计数(许可证)加1
    • 内部计数=0,表明有线程阻塞,随机唤醒线程

BoundedSemaphore

1
2
3
4
class BoundedSemaphore(Semaphore):
def release(self):
# 释放信号量,内部计数加1
# 当信号量总数超过初始化值时`raise ValueError`

Lock

  • Threading.Lock等价于_thread.allocate_lock,二者 都是工厂方法,返回lock类的实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class lock:
bool = acquire(blocking=True/False, timeout=-1)
# 尝试获得锁,返回是否获得锁
bool = acquire_lock
# deprecated,同`acquire`

bool = locked()
# 返回锁状态
bool = lockec_lock()
# deprecated,同`lock`

bool = release()
# 释放锁,若锁本身未获取`raise RuntimeError`
bool = release_lock()
# deprected,同`release`
  • 用途:同步原语,排他性使用某资源

  • 说明

    • 支持with上下文语法,代码块执行前自动获取锁,执行 结束后自动释放
    • 为了避免出现死锁,每个线程应该一次只允许获取一个锁, 否则应该使用更高级死锁避免机制
    • 适合简单的锁定可变对象
  • lock对象应该是C实现,里面的方法是没有self参数的

其他

Lock锁可以视为使用更加方便的全局变量

  • 可以用于线程之间的通信:给每个子线程分配单独一个锁, 主线程、子线程可以通过锁状态通信

  • 很大程度上可以被全局变量“替换”

    • 获得锁:不断检查全局变量状态,阻塞直到全局变量 状态代表锁可获得,修改全局变量状态代表锁被获取

    • 释放锁:修改全局变量状态代表锁可获得

    • 不断检查变量状态会无意义占用CPU时间,可以在检查 间隙使用time.sleep()暂停线程

RLock

  • 工厂方法,返回RLock实例
1
2
3
4
5
6
7
class RLock:
bool = acquire(block=True):
# 尝试获取锁,返回是否获取锁
# 每次获取锁,内部计数器加1
bool = release()
# 释放锁,若锁本身未获取`raise RuntimeError`
# 每次释放锁,内部计数器减1
  • 用途:可重入锁,可以被同一线程多次获取

    • 若锁被当前线程获取多次,则需要被释放同样次数才能被 其他线程获取
    • 即只有内部计数器回到初始状态才能被任意线程获取
  • 没有线程持有锁时,RLock内部计数被重置为1

    • 应该是RLock就是通过内部计数记录被获取次数
  • 常用于给类整体上锁

    • 类内部每个方法都获取锁
    • 类内部方法之间相互调用
    • 这样类实例方法每次只能有一个线程完整调用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import Threading

    class SharedCounter:
    _lock = threading.RLock()
    # 被所有实例共享的类级锁
    # 需要大量使用计数器时,内存效率更高
    def __init__(self, intial_value=0):
    self._value = initial_value

    def incr(self, delta=1):
    with ShareCounter:
    self._value += delta

    def decr(self, deta=1):
    with SharedCounter:
    # 获取锁之后,调用也需要获取锁的`.incr`方法
    self.incr(-delta)

local

  • 用途:创建本地线程存储对象,该对象属性保存、读取操作只对 当前线程可见
    • 可以用于保存当前运行线程状态,隔离不同线程间数据
      • 套接字对象

例子

提前终止线程

轮询方法
  • 函数封装在类中,在类中设置成员变量作为轮询点
  • 方法terminate改变轮询点状态,用于外部调用结束线程
  • 线程方法run检查轮询点状态判断是否结束自身
    • 线程执行类似IO的阻塞操作时,无法返回自身、无法检查 轮询点,通过轮询终止线程难以协调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from threading import Thread
import time

class CountDownTask:

def __init__(self):
self._running = True

def terminate(self):
self._runing = False

def run(self, n):
while self._running and n > 0:
# 设置轮询点,告诉线程何时应该终止
print("T-minus", n)
n -= 1
time.sleep(5)

def test():
c = CountdownTask()
t = Thread(karget=c.run, args=(10,0))
# 创建Thread
t.start()
# 启动线程
c.terminate()
c.join()
超时循环
  • 设置任务超时,超时自动返回
    • 任务只要超时就会返回,不会出现线程阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class IOTask:
def terminate(self):
self._running = False

def run(self, sock):
sock.settimeout(5)
# 设置超时
while self._running:
try:
data = sock.recv(8192)
break
except socket.timeout:
continue
...continued processing...
...terminated...
return

线程同步、通信

Event方式
1
2
3
4
5
6
7
8
9
10
11
12
from threading import Event, Threading

def send_signal(start_evt):
start_evt.set()
# 设置事件

def recv_signal():
start_evt = Event()
# 创建新事件
t = Thread.threading(target=send_signal, start_evt)
start_evt.wait()
# 阻塞直到接收到信号
Condition实现queue.Queue
  • 自定义数据类型,封装Condition实例进行线程间同步
  • put方法生产,notify通知消费者
  • get方法消费,阻塞直到被唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import heapq
from threading import Condition

class PriortyQueue:
def __init__(self):
self._queue = [ ]
self._count = 0
self._cv = Condition()
# 封装`Condition`实例实现线程间同步

def put(self, item, priority):
with self._cv:
heapq.heappush(self._queue, (-priority, self._count, item))
self._count += 1
self._cv.notify()

def get(self):
with self._cv:
# 阻塞,直到空闲
while len(self._queue) == 0:
self._cv.wait()
# `Condition`默认使用`RLock`,可重入
return heapq.heappop(self._queue)[-1]

防死锁机制

严格升序使用锁
  • local保存当前线程状态,隔离不同线程锁数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from threading import local
from contextlib import contextmanager

_local = local()

@contextmanager
# 使支持`with`上下文管理器语法
def acquire(*locks):

locks = sorted(locks, key=lambda x: id(x))
# 根据*object identifier*对locks排序
# 之后根据此list请求锁都会按照固定顺序获取

acquired = getattr(_local, "acquired", [ ])
# 为每个线程创建独立属性保存锁
if acquired and max(id(lock)) for lock in acquired >= id(locks[0]):
# `_local`中已有锁不能比新锁`id`大,否则有顺序问题
raise RuntimeError("lock order violation")

acquired.extend(locks)
_local.acquired = acquired
# 更新线程环境中锁

try:
for lock in locks:
# 只允许升序获取锁
lock.acquire()
yield
# `with`语句进入处
finally:
# `with`语句退出处
for lock in reversed(locks):
# 只允许降序释放锁
lock.release()
del acquired[-len(locks):]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def thread_1(x_lock, y_lock):
while True:
with acquire(y_xlock, y_lock):
print("Thread_1")

def thread_2(x_lock, y_lock):
while True:
with acquire(y_lock, x_lock):
print("Thread_2")

def test():
x_lock = threading.Lock()
y_lock = threading.Lock()

t1 = threading.Thread(target=thread_1, args=(x_lock, y_lock)):
t1.daemon = True
t1.start()

t1 = threading.Thread(target=thread_2, args=(x_lock, y_lock))
t2.daemon = True
t2.start()

queue模块

Queue

1
2
3
class Queue(builtins.object):
def __init__(self, maxsize=0):
# `maxsize`;限制可以添加到队列中的元素数量

queue.Queue:创建被多个线程共享的Queue对象

  • 线程安全的数据交换方式,基于collections.deque

    • Queue对象已经包含必要的锁,可以在多个线程间安全的 共享数据
    • Queue实际上是在线程间传递对象引用,不会复制 数据项,如果担心对象共享状态,可以传递不可修改数据 结构、深拷贝
  • 还可以使用Condition变量包装数据结构,实现线程线程中 间通信

get

1
2
3
4
5
6
obj =  get(self,
block=True,
timeout=None/num)

obj = get_nowait(self):
# 等同于`get(block=False)`
  • 用途:从队列中移除、返回一个元素

  • 参数

    • block
      • False:没有空闲slot立即raise Empty exception

.task_done

1
2
3
4
def task_done(self):
# 指示前一个队列“任务”完成
def join(self):
# 阻塞直到队列中所有元素被获取(消费)、**处理**
  • 说明
    • .join阻塞时,要所有队列中元素都被告知task_done, 才能解除阻塞
    • 即队列消费者每次get元素、处理后,要手动调用 task_done告知队列任务已完成
  • 也可以将Event和队列元素封装,用Event对象告知队列元素 处理完成

.put

1
2
3
4
5
6
7
8
def put(self,
item,
block=True,
timeout=None/num):
pass

def put_nowait(self, item):
# 等价于`put(self, item, block=False)`
  • 用途:向队列中追加元素

  • 参数

    • block
      • False:没有空闲slot立即raise Full exception

.qsize

1
2
3
4
5
6
7
8
9
10
int = qsize(self):
# 返回队列大概大小(不可靠)
# 非线程安全,在其使用结果时队列状态可能已经改变

bool = empty(self):
# deprecated,使用`qsize() == 0`替代
# 和`qsize()`一样非线程安全

bool = full(self):
# deprecated,使用`qsize() > n`替代,非线程安全

例子

队列元素消费通知

Event进队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from queue import Queue
from threading import Thread, Event

def producer(out_q):
while running:
evt = Event()
out_q.put((data, evt))
# 将`Event`实例放入队列
evt.wait()
# 阻塞直到收到消费者信号

# A thread that consumes data
def consumer(in_q):
while True:
data, evt = in_q.get()
evt.set()
# 告知生产者消费完成

协调生产、消费线程终止

队列中添加特殊值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
_sentinel = object()

def producer(out_q):
while running:
out_q.put(data)
out_q.put(_sentinel)
# 将特殊值放入队列,结束生产、通知消费者
def consumer(in_q):
while True:
data = in_q.get()
if data is _sentinel:
# 消费者接收到特殊值,结束生产
in_q.put(_sentinel)
# 特殊信号放回队列,继续传递
break

Queue实现线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
sock, client_addr = q.get()
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print("closed")

def echo_server(addr, nworkers):
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()
socket = socket(AF_INET, SOCK_STREAM)
socket.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))

multiprocessing模块

  • multiprocessing支持一个基本与平台无关的进程派生模型, 在Unix、Windows下均可正常工作

  • 实现方式

    • 启动一个新的Python进程,import当前模块
    • pickle序列化需要调用的函数,传递至新进程执行
  • 其中PoolQueuePipe等实际都是其封装其子模块类 的工厂方法

Process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Process():
def __init__(self,
group=None,
target=None/callable,
name=None/str,
args=()/list,
kwargs={}/dict,
daemon=None
):
self.authkey
self.daemon
self.exitcode
self.name
self.pid

def is_alive(self):

def join(self,
timeout=None/num
):

def start(self):
# 启动进程

def run(self):
# `start`方法调用`run`方法,若进程实例未传入`target`,
# `start`默认执行`run`方法

def terminate():
# 立即结束进程
  • 用途:Multiprocessing核心,类似于Thread,实现多进程 创建、启动、关闭

  • 成员方法基本类似Thread

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process
import os

def test(name):
print("Process ID: %s" % (os.getpid())
print("Parent Process ID: %s" % (os.getppid()))

if __name__ == "__main__":
proc = Process(target=test, args=("nmask",))
proc.start()
proc.join()

Pool

1
2
3
4
5
6
7
8
class Pool:
def __init__(self,
processes=None/int,
initializer=None,
initargs=(),
maxstacksperchild=None/int,
context=None
):
  • 用途:创建管理进程池,提供指定数量进程供用户调用
    • 新请求提交到pool中时
      • 若进程池没有满,将创建新进程执行请求
      • 否则请求等待直到池中有进程结束,然后创建新进程
    • 适合子进程多且需要控制子进程数量时

apply_async

1
2
3
4
5
6
7
8
def apply(self, func, args=(), kwds={}):
# 分配进程执行`func(*args, **kwds)`,阻塞
# 返回值:`func(item)`返回值

def apply_async(self, func, args=(), kwds={},
callback=None, error_callback=None)
# 异步分配进程执行调用,非阻塞
# 返回值:`ApplyResult`对象
  • 返回值:异步非阻塞调用返回结果操作句柄ApplyResult
  • 回调函数要有返回值,否则ApplyResult.ready()=False, 回调函数永远无法完成
ApplyResult
1
2
3
4
5
6
7
8
9
10
11
class ApplyResult:
def __init__(self, cache, chunksize, length,
callback, error_callback):

def get(self, timeout=None):

bool = ready(self):

bool = successful(self):

bool = wait(self, timeout=None):

map_async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def map(self, func, iterable, chunksize=None):
# 同步多进程`map(func, iterable)`,阻塞直到全部完成
# 返回值:结果列表,结果按调用顺序

def map_async(self, func, iterable, chunksize=None,
callback=None, error_callback=None):
# 异步多进程`map`,非阻塞
# 返回值:`MapResult(ApplyResult)`对象

def imap(self, func, iterable, chunksize=1):
# 迭代器版`map`,更慢、耗内存更少
def imap_unordered(self, func, iterable, chunksize=1):
# 返回结果无序版`imap`

def starmap(self, func, iterable, chunksize=1):
# 同步`map`,参数被解构`func(*item)`,阻塞

def starmap_async(self, func, iterable, chuncksize=None,
callback=None, error_callback=None):
# 异步`startmap`,阻塞

终止

1
2
3
4
5
6
7
8
9
def close(self):
# 关闭进程池,不允许添加新进程

def join(self):
# 阻塞,直至进程池中所有进程执行完毕
# 必须先`.close`进程池

def terminate(self):
# 终止进程池

Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SimpleQueue():
bool empty(self):
def get(self):
def put(self):
class Queue:
def __init__(self, maxsize=0, *, ctx):
def join_thread(self):
# join feeder线程
def cancel_join_thread(self):
# 控制在进程退出时,不自动join feeder线程
# 有可能导致部分数据在feeder中未被写入pipe而丢失
def close(self):
# 关闭feeder线程
def qsize(self):
def empty(self):
def full(self):
def get(self, block=True, timeout=None):
def get_nowait(self):
def put(self, obj, block=True, timeout=None):
def put(self):
class JoinableQueue(Queue):
def task_done():
def join():
  • 用途:进程安全队列

  • multiprocessing.Queue基于multiprocessing.Pipe构建

    • 数据传递时不是直接写入Pipe,而是写入本地buffer, 通过feeder线程写入底层Pipe,从而实现超时控制、 非阻塞put/get

    • 所以提供了.join_threadcancel_join_threadclose函数控制feeder流行为

    • 相当于线程安全的queue.Queue的多进程克隆版

  • multiprocessing.SimpleQueue:简化队列

    • 没有Queue中的buffer,没有使用Queue可能有的问题, 但是put/get方法都是阻塞的、没有超时控制

Pipe

  • mltiprocessing.Pipe()返回两个用管道相连的、读写双工 Connection对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Connection():
def __init__(self, handle,
readable=True,
writable=True):

def close(self):
def fileno(self):
# 返回描述符或连接处理器
bool = poll(self, timeout=0):
# 是否有输入可以读取
obj = recv(self):
# 接收pickable对象
# 若接收字节流不能被pickle解析则报错
bytes = recv_bytes(self, maxlength=None):
# 接收字节流作为`bytes`
int = recv_bytes_into(self, buf, offset=0):
# 接收字节流存入可写`buf`
# 返回读取的字节数量
def send(self, obj):
# 发送对象
def send_bytes(self, buf, offset=0, size=None):
# 发送bytes-like对象中字节数据
  • 对象会在每条信息前添加标志字节串,可以自动处理多条信息 堆叠

    • 可以通过os.read(connect.fileno())获取

共享内存

1
2
3
4
5
6
7
8
class SynchronizedBase:
def __init__(self, obj, lock=None, ctx=None):

def get_lock(self):
# 获取`multiprocessing.synchronize.RLock`

def get_obj(self):
# 获取数据对象,C数据结构`ctypes.`

Value

1
2
3
4
def Value(typecode_or_type, *args, lock=True):
# 返回同步共享对象
class Synchronized(SynchronizedBase):
value

Array

1
2
3
4
5
6
7
8
9
def Array(typecode_or_type, size_or_initializer, *,
lock=True)

def SynchronizedArray(SynchronizedBase):
def __getitem__(self, i):
def __getslice__(self, start, stop):
def __len__(self):
def __setitem(self, i, value):
def __setslice__(self, start, stop, values):

Manager

常与Pool模块一起使用,共享资源,不能使用QueueArray

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import multiprocessing as mltp
l1 = mltp.Lock()
# 获取进程锁
rl1 = mltp.RLock()
# 获取可重入锁
s1 = mltp.Semaphore(value=int)
# 获取信号量对象
bs1 = mltp.BoundedSemaphore(value=int)
# 获取有上限信号量对象
e1 = mltp.Event()
# 获取事件对象
cv1 = mltp.Condition(lock=None)
# 获取Condition对象
cp = mltp.current_process()
# 获取当前Process对象

concurrent.futures

  • 异步并发模块

ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ThreadPoolExecutor:
def __init__(self,
max_workers=None,
thread_name_prefix=''):

def shutdown(self, wait=True):
# 清理和该执行器相关资源

def submit(self, fn(callable), *args, **kwargs):
# 以指定参数执行`fn`
# 返回:代表调用的future,可以用于获取结果

def map(self, fn, *iterables,
timeout=None,
chunksize=1/int)
# 并行`map(fn, iterables)`
# 返回:按调用顺序的结果迭代器
  • 用途:创建线程池

.shutdown

  • 用途:关闭执行器,清理和该执行器相关的资源

    • 可以多次调用,调用之后不能进行其他操作
  • 参数

    • wait:阻塞,直到所有运行futures执行完毕,所有 资源被释放

ProcessPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ProcessPoolExecutor:
def __init__(self,
max_workers=None):

def shutdown(self, wait=True):
# 清理和该执行器相关资源

def submit(self, fn(callable), *args, **kwargs):
# 以指定参数执行`fn`
# 返回:代表调用的`Future`实例,可以用于后续处理

def map(self, fn,
*iterables,
timeout=None,
chunksize=1/int)
# 并行`map(fn, iterables)`
# 返回:按调用顺序的结果迭代器
  • 用途:创建进程池

    • 参考ThreadPoolExecutor.map方法支持chunksize 参数
  • 常使用with语句使用

    1
    with ProcessPoolExecutor() as Pool:
    • 处理池执行完with语句块中后,处理池被关闭
    • 程序会一直等待直到所有提交工作被完成

注意事项

  • 被提交的任务必须是简单函数形式,不支持方法、闭包和 其他类型可执行

  • 函数参数、返回值必须兼容pickle模块,因为进程间通信 交换数据使用其序列化

  • 函数不要修改环境

    • 被提交的任务函数出打印日志之类等简单行为外,不应该 有保留状态、副作用
  • 混合使用进程池、多线程时

    • 创建任何线程之前先创建、激活进程池
    • 然后线程使用同样的进程池进行计算密集型工作

Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Future:
def add_done_callback(self, fn):
# 添加future完成的回调函数
# 多次调用添加的回调函数按添加顺序执行

bool = cancel(self):
# 尝试取消当前future,返回是否成功取消
# 正在运行、执行完成的future不能被取消

bool = cancelled(self):
# 查看当前future是否被取消

bool = done(self):
# 查看当前future是否被取消、完成

def exception(self, timeout=None):
# 返回当前future代表的调用的exception

def result(self, timeout=None):
# 返回当前future代表调用的返回值
  • 用途:代表一个异步计算的结果
    • 直接创建对象无价值

socket模块

Python类说明

元类说明

type

type:元类,python中所有类都由type创建

1
2
3
4
5
class = type(
name=cls_name,
bases=(pls_name,..),
dict={attr1: val1,...}
)
  • 参数

    • cls_name:类名称
    • bases:父类元组
    • dict:类方法、属性
  • 返回值:类的别名

元类作用

  • 拦截类创建->修改类->返回修改后的类(创建类对象

  • 元类的作用和函数相似

    • python并不关心类对象是否是由真正的元类创建
    • 可以指定元类为一个函数,而非继承自type的元类
  • 但仍应尽量将元类指定为继承自type的对象

    • 元类应用场景一般比较复杂,使用类可以更好的管理代码
    • 默认元类是type类,类继承保持一致性意图比较明显,且 可以使用type中的方法
    • 元类可以使用一些类中特有的方法:__new____init__
  • 如果不确定是否需要用元类,就不应该使用

自定义元类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class UpperAttrMeta(type):
def __new__(cls, cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val)
for name,val in attrs.items()
if not name.startswith('__')
);
return super(UpperAttrMeta,cls).__new__(
cls,
cls_name,
bases,
upper_attrs);
// 使用元类创建新类

class Foo(metaclass=UpperAttrMeta):
b=1;

使用自定义元类UppAttrMeta创建的类Foo中定义的__init____new__等函数无意义,因为该类不仅是通过元类创建,也是 通过元类初始化

  • Foo通过UpperAttrMeta创建,而UppAttrMeta本身没有 实现自定义__init__,默认继承于object

    因此Foo类的创建就有object的init完成 segmentfault.com/q/1010000004438156 这个是原话,不明白什么意思了

  • 但是如果元类仅仅是pass,如下:

    1
    2
    class MetaCls(type):
    pass;

    使用此自定义元类,类定义中的__init____new__有效

类创建

py2自定义元类

python创建类对象步骤

  • __metaclass__指定创建类使用的元类

    • 按照优先级:类定义内 > 祖先类内 > 模块内 > type, 查找__metaclass__,并使用其创建类对象,若前三者 均未定义__metaclass__,则使用type创建

    • 自定义元类就是为__metaclass__指定自定义值

    • python只是将创建类的参数传递给__metaclass__,并不 关心__metaclass__是否是一个

      • cls()返回一个类对象,是相当于调用cls.__new__
      • 所以可以指定__metaclass__为一个函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
		
def upper_attr(cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val) for name,val in attrs.items());
return type(cls_name, bases, upper_attrs);

class Foo():
bar=1;
__metaclass__=upper_attr;

# 函数版本

class UpperAttrMeta(type):
def __new__(clsf, cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val) for name,val in attrs.items());
return type(cls_name, bases, upper_attrs);

class Foo():
bar=1;
__metaclass__=UpperAttrMeta;

# 类版本1

class UpperAttrMeta(type):
def __new__(cls, cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val) for name,val in attrs.items());
return type.__new__(cls, cls_name, bases, upper_attrs);

# 类版本2

class UpperAttrMeta(type):
def __new__(cls, cls_name, bases, attrs):
upper_attrs=dict((name.upper(), val) for name,val in attrs.items());
return super(UpperAttrMeta,cls).__new__(cls, cls_name, bases, upper_attrs);

# 类版本3

元类示例

缓存实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import weakref

class Cached(type):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__cache = weakref.WeakValueDictionary()

def __call__(self, *args):
if args in self.__cache:
return self.__cache[args]
else:
obj = super().__call__(*args)
self.__cache[args] = obj
return obj

class Spam(metaclass=Cached):
def __init__(self, name):
print("Creating Spam({!r})".format(name))
self.name = name

捕获类属性定义顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from collection import OrderedDict

class Typed:
_expected_type = type(None)
def __init__(self, name=None):
self._name = name

def __set__(self, instance, value):
if not instance(value, self_expected_type):
raise TypeError("expected" + str(self._expected_type))
instance.__dict__[self._name] = value

class Integer(Typed):
_expected_type = int

class Float(Typed):
_expected_type = float

class String(Typed):
_expected_type = str

class OrderedMate(type):
def __new__(cls, clsname, bases, clsdict):
d = dict(clsdict)
order = [ ]
for name, value in clsdict.items():
if isinstance(value, Typed):
value._name = name
order.append(name)
d["_order"] = order
return type.__new__(cls, clsname, bases, d)

@classmethod
def __prepare__(cls, clsname, bases):
# 此方法会在开始定义类、其父类时执行
# 必须返回一个映射对象,以便在类定义体中使用
return OrderedDict()

class Structure(metaclass=OrderedMeta):
def as_csv(self):
return ",".join(str(getattr(self, name)) for name in self._order)

class Stock(Structure):
name = String()
shares = Integer()
price = Float()

def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price

有可选参数元类

为了使元类支持关键字参数,必须在__prepare____new____init__方法中使用KEYWORDS_ONLY关键字参数

1
2
3
4
5
6
7
8
9
10
11
12
13
class MyMeta(type):
@classmethod
def __prepare__(cls, name, bases, *, debug=False, synchronize=False):
pass
return super().__prepare(naeme, bases)

def __new__(cls, name, bases, *, debug=False, synchronize=False):
pass
return super().__new__(cls, name, bases, ns)

def __init__(self, name, bases, ns, *, debug=False, synchronize=False):
pass
super().__init__(name, base, ns)

Python IO、持久化

DBM

DBM文件是python库中数据库管理的标准工具之一

  • 实现了数据的随机访问
    • 可以使用键访问存储的文本字符串
  • DBM文件有多个实现
    • python标准库中dbm/dbm.py

使用

  • 使用DBM文件和使用内存字典类型非常相似
    • 支持通过键抓取、测试、删除对象

pickle

  • 将内存中的python对象转换为序列化的字节流,可以写入任何 输出流中
  • 根据序列化的字节流重新构建原来内存中的对象
  • 感觉上比较像XML的表示方式,但是是python专用
1
2
3
4
5
6
7
import pickle
dbfile = open("people.pkl", "wb")
pickle.dump(db, dbfile)
dbfile.close()
dbfile = open("people.pkl", "rb")
db = pickle.load(dbfile)
dbfile.close()
  • 不支持pickle序列化的数据类型
    • 套接字

shelves

  • 就像能必须打开着的、存储持久化对象的词典
    • 自动处理内容、文件之间的映射
    • 在程序退出时进行持久化,自动分隔存储记录,只获取、 更新被访问、修改的记录
  • 使用像一堆只存储一条记录的pickle文件
    • 会自动在当前目录下创建许多文件
1
2
3
4
5
6
import shelves
db = shelves.open("people-shelves", writeback=True)
// `writeback`:载入所有数据进内存缓存,关闭时再写回,
// 能避免手动写回,但会消耗内存,关闭变慢
db["bob"] = "Bob"
db.close()

SQLite

SQLAlchemy

Python

contextlib

contextlib.contextmanager

  • 用途:上下文实现装饰器

    • 实现try...finally...语句的生成器上下文管理器语法
    • try部分:生成器部分,with语句进入时执行
    • finally部分:清理部分,with语句退出时执行
  • 用法

    • 定义

      1
      2
      3
      4
      5
      6
      7
      @contextmanager
      def some_generator(<parameters>):
      <setup>
      try:
      yield <value>
      finally:
      <cleanup>
    • 用法

      1
      2
      with some_generator(<argrument>) as <variable>:
      <body>
    • 等价于

      1
      2
      3
      4
      5
      6
      <setup>
      try:
      <variable> = <value>
      <body>
      finally:
      <cleanup>