进程、线程、作业

Linux进程、线程

进程发展

  • Linux2.2内核

    • 进程通过系统调用fork()创建,新进程是原进程子进程
    • 不存在真正意义上的线程
    • 只默认允许4096个进程/线程同时运行
  • Linux2.4内核

    • 运行系统运行中动态调整进程数上限,进程数仅受制于物理 内存大小,最大16000
  • Linux2.6内核

    • 进程调度重新编写,引入slab分配器动态生成 task_struct
    • 最大进程数量提升至10亿
    • 线程框架重写
      • 引入tgid、线程组、线程各自的本地存储区
      • 得以支持NPTL线程库

线程/轻量级进程

  • Linux未真正实现、区分线程,在内核层面是特殊进程,是 “真正的轻量级进程”

    • “线程”和“进程”共享
      • 相同调度策略,处于同一调度层次
      • 相同数据结构进程标识符,位于相同进程标识符空间
    • “线程”与“进程”的区别在于
      • 线程没有独立的存储空间
  • 多线程即创建多个进程并分配相应的进程描述符 task_struct、指定其共享某些资源

    • 创建线程不会复制某些内存空间,较进程创建快
    • 在专门线程支持系统多线程中,系统会创建包含指向所有 线程的进程描述符,各线程再描述独占资源
  • 尽管Linux支持轻量级进程,但不能说其支持核心级线程

    • 则不可能在Linux上实现完全意义上的POSIX线程机制
    • 所以Linux线程库只能尽可能实现POSIX绝大部分语义,尽量 逼近功能
  • 线程在进程内共享某些资源

    • 打开的文件
    • 文件系统信息
    • 地址空间
    • 信号处理函数
  • 这里讨论的线程都是内核线程,即内核可感知、调度线程,不 包括程序自建线程

内核守护线程

kthreads pthreads
资源 无用户空间 共享完整虚拟寻址空间
状态 只工作在内核态 可在内核态、用户态之间切换
目的 维护内核正常工作 用户分配任务

内核守护线程:内核为维护正常运行创建、仅工作在内核态线程

  • 按作用可以分类

    • 周期性间隔运行,检测特定资源的使用,在用量超出或低于 阈值时采取行动
    • 启动后一直等待,直到系统调用请求执行某特定操作
  • 执行以下任务

    • 周期性将dirty内存页与页来源块设备同步:bpflush线程
    • 将不频繁使用的内存写入交换区:kswapd线程
    • 管理延时动作:kthreadd线程接手内核守护线程创建
    • 实现文件系统的事务日志
  • 内核守护线程只能工作在内核态

    • 没有用户空间,和内核共用一张内核页表
    • 只能使用大于PAGE_OFFSET部分的虚拟寻址空间,即进程 描述符中current->mm始终为空
    • 对4G主存的X86_32机器,只能使用最后1G,而普通pthreads 可以使用完整虚拟寻址空间
  • 内核守护线程名往往为k开头、d结尾

特殊内核守护线程

  • Linux内核启动的最后阶段,系统会创建两个内核线程
  • init:运行文件系统上一系列init脚本,并启动shell 进程

    • 是所有用户进程的祖先,pid为1
  • kthreadd:内核启动完成之后接手内核守护线程的创建

    • 内核正常工作时永不退出,是死循环,pid为2
    • 载入内核模块时即需要调用其创建新内核守护线程

进程状态

1
2
3
4
5
6
7
8
// <kernel/include/linux/sched.h>
#define TASK_RUNNING 0
#define TASK_INTERRUPTIBLE 1
#define TASK_UNINTERRUPTIBLE 2
#define __TASK_STOPPED 4
#define __TASK_TRACED 8
#define EXIT_ZOMBIE 16
#define TASK_DEAD 64

process_status

  • 状态虽然有很多种,但总是TASK_RUNNING -> 非, 即使进程在TASK_INTERRUPTIBLE状态被kill,也需要先唤醒 进入TASK_RUNNING状态再响应kill信号进入TASK_DEAD
  • TASK_RUNNING:可执行,正在执行或在运行队列中等待执行

    • 同一时刻可能有多个进程处于可执行态,位于运行队列中 等待进程调度器调度
  • TASK_INTERRUPTIBLE:正在阻塞,等待某些条件达成

    • 条件达成后内核会把进程状态设置为运行
    • 此状态进程也会因为接收到信号而提前唤醒准备运行
    • 系统中大部分进程都此状态
  • TASK_UNINTERRUPTILBE:不响应异步信号,即使接收到信号 也不会被唤醒或准备投入运行

    • 不可中断是指进程不响应异步信号,而不是指CPU不响应 中断
    • 内核某些处理流程是不可被打断的,如:内核和硬件设备 交互被打断会导致设备进入不可控状态,因此需要此状态
  • __TASK_TRACED:被其他进程跟踪

    • 开发中进程停留在断点状态就是此状态,如:通过ptrace 对调试程序进行跟踪
    • 此状态进程只能等待调试进程通过ptrace系统调用执行 PTRACE_CONTPTRACE_DETACH等操作才能恢复到 TASK_RUNNING状态
  • __TASK_STOPPED:停止执行,没有也不能投入运行

    • 通常发生在接收到SIGSTOPSIGSTPSIGTTINSIGTTOU等信号
    • 向此状态进程发送SIGCONT信号可以让其恢复到 TASK_RUNNING状态
  • TASK_DEAD:退出状态,即将被销毁

  • EXIT_ZOMBIE/TASK_ZOMBIE:进程已结束但task_struct未 注销

    • 进程退出过程中处于TASK_DEAD状态,进程占有的资源将 被回收,但父进程可能会关心进程的信息,所以 task_struct未被销毁

内核态、用户态

  • 系统设计角度:为不同的操作赋予不同的执行等级,与系统相关 的特别关键的操作必须有最高特权程序来完成

    • 运行于用户态:进程可执行操作、可访问资源受到限制
    • 运行于内核态:进程可执行任何操作、使用资源无限制
  • 内存使用角度(虚拟寻址空间,X86_32位系统,最大4GB主存)

    • 内核空间:最高的1G,所有进程共享
      • 包含系统堆栈:2页面,即8K内存,低地址中存放 task_struct
      • 进程运行于内核空间时使用系统堆栈、处于内核态
    • 用户空间:剩余3G
      • 包含用户堆栈
      • 进程运行于用户空间时使用用户堆栈、处于用户态

    virtual_address_space

  • 内核态的逻辑

    • 进程功能和内核密切相关,进程需要进入内核态才能实现 功能
    • 应用程序在内核空间运行、内核运行于进程上下文、陷入 内核空间,这种交互方式是程序基本行为方式
  • 用户态进入内核态的方式

    • 系统调用,如:printf函数中就是调用write函数
    • 软中断,如:系统发生异常
    • 硬件中断,通常是外部设备的中断

    process_calling_structure

  • 进程或者CPU在任何指定时间点上活动必然为

    • 运行于用户空间,执行用户进程
    • 运行于内核空间,处于进程上下文,代表某特定进程执行
    • 运行于内核空间,处于中断上下文,与任何进程无关,处理 特点中断

Linux进程数据结构

task_struct

1
2
3
4
5
6
7
8
9
10
11
12
13
// <kernel/include/linux/sched.h>
struct task_struct{
volatile long state; // -1:不可运行,0:可运行,>0:已中断
int lock_depth; // 锁深度
unsigned int policy; // 调度策略:FIFO,RR,CFS
pid_t pid; // 线程ID
pid_t tgid; // 线程组ID,2.6内核中引入
struct task_struct *parent; // 父进程
struct list_head children; // 子进程
struct list_head sibling; // 兄弟进程
struct task_struct *group_leader;
struct list_head thread_group;
}

task_struct

  • 内核使用任务队列(双向循环链表)维护进程(描述符)

  • task_struct:进程描述符,包含进程的所有信息,包括

    • 进程状态
    • 打开的文件
    • 挂起信号
    • 父子进程

ID

  • pid:字面意思为process id,但逻辑上为线程ID
  • tgid:字面意思为thread group id,但逻辑上为 进程ID
1
2
3
4
5
6
7
// <kernel/timer.c>
asmlinkage long sys_getpid(void){
return current->tgid;
}
asmlinakge long sys_gettid(void){
return current->pid;
}

线程关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// <kernel/fork.c>
copy_process{
// some code
p->tgid = p->pid;

// 创建线程时
if (clone_flags & CLONE_THREAD)
// 从父进程获取`tgid`,归属同一线程组
p->tgid = current->tgid;

// some code
// 初始化`group_leader`、`thread_group`
p->group_leader = p;
INIT_LIST_HEAD(&p->thread_group);

// some code

// 创建线程时
if (clone_flags & CLONE_THREAD){
// `group_leader`设置为父进程`group_leader`
// 即保证`group_leader`指向首个线程`task_struct`
p->group_leader = current->group_leader;
// 通过`thread_group`字段挂到首个线程的`thread_group`队列中
list_add_tail_rcu(&p->thread_group, &p->group_leader->thread_group);

// some code
}

if(likely(p->pid)){
// some code
// 仅首个线程才会通过`tasks`字段挂入`init_task`队列中
if(thread_group_leader(p)){
//...
list_add_tail_rcu(&p->tasks, &init_task, tasks);
}
}
}

线程组退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// <kernel/exit.c>
NORET_TYPE void do_group_exit(int exit_code){
BUG_ON(exit_code & 0x80);

// `current->signal`由线程组中所有线程共享
// 若调用此方法线程`SIGNAL_GROUP_EXIT`标志已被设置,说明
// 其他线程已经执行过此方法,已通知线程组中所有线程
// 退出,则可以直接执行`do_exit`
if(current->signal->flags & SIGNAL_GROUP_EXIT)
exit_code = current->signal->group_exit_code;

// 否则通知线程组中所有线程退出
else if(!thread_gropu_empty(current)){
struct signal_struct * const sig = current->signal;
struct sighand_struct * const sighand = current->sighand;
spin_lock_irq(&sighand->siglock);

// another thread got here before we took the lock
if(sig->flags & SIGNAL_GROUP_EXIT)
exit_code = sig->group_exit_code;
else{
sig->group_exit_code = exit_code;
zap_other_threads(current);
}
spin_unlock_irq(&sighand->sigloc);
}

do_exit(exit_code);
}

// <kernel/signal.c>
void zap_other_threads(struct task_struct *p){
struct task_struct *t;

// 设置`SIGNAL_GROUP_EXTI`标志
p->signal->flags = SIGNAL_GROUP_EXIT;
p->signal->group_stop_count = 0;

if(thread_group_empty(p))
return;

for (t=next_thread(p); t != p; t=next_thread(t)){
// don't bohter with already dead threads
if (t->exit_state)
continue;

// 为每个线程设置`SIGKILL`信号
sigaddset(&t->pending.signal, SIGKILL);
signal_wake_up(t, 1);
}
}

// <include/linux/sched.h>
static inline struct task_struct *next_thread(const struct task_struct *p){
return list_entry(rcu_dereference(p->thread_group.next),
struct task_struct, thread_group);
}

Slab分配器

process_slab

  • slab分配器把不同对象类型划分为不同高速缓存组,如: task_structinode分别存放

    • 高速缓存又会被划分为slab
    • slab由一个或多个物理上连续的页组成
  • 申请数据结构时

    • 先从半满的slabs_partial中申请
    • 若没有半满,就从空的slabs_empty中申请,直至填满 所有
    • 最后申请新的空slab
  • slab分配器策略优点

    • 减少频繁的内存申请和内存释放的内存碎片
    • 由于缓存,分配和释放迅速

thread_info

1
2
3
4
5
6
7
8
9
10
// <asm/thread_info.h>
struct thread_info{
struct task_struct *task;
struct exec_domain *exec_domain;
usigned long flags;
__u32 cpu;
int preempt_count;
mm_segment_t addr_limit;
struct restart_block restart_block;
}
  • 内核中对进程操作都需要获得进程描述符task_struct指针, 所以获取速度非常重要
    • 寄存器富余的体系会拿出专门的寄存器存放当前 task_struct的指针
    • 寄存器不富余的体系只能在栈尾创建thread_info结构, 通过计算间接查找

进程创建

  • 继承于Unix,Linux进程创建使用两个函数分别完成,其他如Win 可能都是通过一个方法完成
  • fork函数:拷贝当前进程创建子进程

    • 子进程、父进程区别仅在于PID、PPID和少量资源
  • exec函数(族):载入可执行文件至地址空间开始运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
SYSCALL_DEFINE0(fork){
return do_fork(SIGCHLD, 0, 0, NULL, NULL);
}
SYSCALL_DEFINE0(vfork){
return _do_fork(CLONE_VFORK | CLONE_VM | SIGCHLD,
0, 0, NULL, NULL, 0);
}
long do_fork(unsigned long clone_flags,
unsigned long stack_start,
unsigned long stack_size,
int __user *parent_tidptr,
int __user *child_tidptr){
return _do_fork(clone_flags, stack_start, stack_size,
parent_tidptr, child_tidptr, 0);
}
long _do_fork(unsigned long clone_flags,
unsigned long stack_start,
unsigned long stack_size,
int __user *parent_tidptr,
int __user *child_tidptr,
unsigned long tls){
// some code
p = copy_process(clone_flags, stack_start, stack_size,
child_tidptr, NULL, trace, tls);
// some code
}
  • forkvfork最终都是通过调用_do_fork实现,仅传参 不一致

    • 首个参数为clone_flags,最终被copy_process用于 真正的拷贝执行
  • 通过系统调用clone()创建线程

    • 同创建进程系统调用fork()vfork()一样,最终调用 do_fork方法,但传递和进程创建时不同的flag,指明 需要共享的资源

      1
      CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGNAND

fork

fork():子进程是父进程的完整副本,复制了父进程的资源, 包括内存内容、task_struct

  • 子进程拷贝父进程的数据段、代码段

    • 同一变量的虚拟地址相同(物理地址不同)
  • 利用copy-on-write优化效率

    • 内核创建子进程时不复制父进程的地址空间,而是只读共享 父进程空间数据
    • 只有子进程需要写数据时才会拷贝到子进程
  • 页表:存放给从逻辑页号到物理页帧/块号地址的映射

Unix傻瓜式进程创建

  • 内核原样复制父进程整个地址空间,并分配给子进程,效率低

    • 为子进程页表分配页帧
    • 为子进程页分配页帧
    • 初始化子进程页表
    • 把父进程页复制到子进程相应页中
  • 大部分情况下复制父进程页无意义

    • 子进程会载入新的程序开始运行
    • 丢弃所继承的地址空间

Copy-on-Write

copy-on-write思想简单:父进程、子进程共享页帧

  • 共享页帧不能被修改,父进程、子进程试图写共享页帧时产生 page_fault异常中断

  • CPU执行异常处理函数do_wp_page()解决此异常

    • 对导致异常中断的页帧取消共享操作
    • 为写进程复制新的物理页帧,使父、子进程各自拥有内容 相同的物理页帧
    • 原页帧仍然为写保护:其他进程试图写入时,内核检查进程 是否是页帧的唯一属主,如果是则将页帧标记为对此进程 可写
  • 异常处理函数返回时,CPU重新执行导致异常的写入操作指令

  • copy-on-write:多个呼叫者同时要求相同资源时,会共同 取得相同指针指向相同资源,直到某个呼叫者尝试修改资源时, 系统才给出private copy,避免被修改资源被直接察觉,此 过程对其他呼叫者transparent

vfork

vfork():子进程直接共享父进程的虚拟地址空间、物理空间

  • vfork被设计用以启动新程序

    • 内核不创建子进程的虚拟寻址空间结构
    • 进程创建后应立即执行exec族系统调用加载新程序,替换 当前进程
    • exec不创建新进程,仅用新程序替换当前进程正文、 数据、堆、栈
  • 在子进程调用exec函数族、_exit()exit()前,子进程 在父进程的地址空间中运行

    • 二者共享数据段,子进程可能破坏父进程数据结构、栈
    • 父进程地址空间被占用,因此内核会保证父进程被阻塞, 即vfork会保证子进程先运行
  • 应确保一旦调用vfork

    • 子进程不应使用return返回调用处,否则父进程又会 vfork子进程
    • 子进程不应依赖父进程进一步动作,否则会导致死锁
    • 子进程需避免改变全局数据
    • 若子进程改变了父进程数据结构就不能调用exit函数

clone

clone:可有选择地继承父进程资源

1
int clone(int (fn)(void), void * child_stack, int flags, void * args);
  • clone通过众多参数有选择地创建进程

    • 创建LWP/线程
    • 创建兄弟进程
    • 类似vfork创建和父进程共享虚拟寻址空间
  • 参数说明

    • fn:函数指针
    • child_stack:为子进程分配的系统堆栈空间
    • flags:描述需要从父进程继承的资源,如下
    • args:传给子进程的参数

Flags

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#define CSIGNAL		0x000000ff		// signal mask to be setn at exit
#define CLONE_VM 0x00000100 // set if VM shared between process
#define CLONE_FS 0x00000200 // set if fs info shared between processes
#define CLONE_FILES 0x00000400 // set if open files shared between processes
#define CLONE_SIGHAND 0x00000800 // set if signal handlers and blocked signals shared
#define CLONE_PTRACE 0x00002000 // set if we want to let tracing continue on the child too
#define CLONE_VFORK 0x00004000 // set if the parent wants the child to wake it up on mm_release
#define CLONE_PARENT 0x00008000 // set if we want to have the same parent as the cloner
#define CLONE_THREAD 0x00010000 // same thread group?
#define CLONE_NEWS 0x00020000 // new namespace group?
#define CLONE_SYSVSEM 0x00040000 // share system V SEM_UNDO semantics
#define CLONE_SETTLS 0x00080000 // create a new TLS for the child
#define CLONE_PARENT_SETTID 0x00100000 // set the TID in the parent
#define CLONE_CHILD_CLEARTID 0x00200000 // clear TID in the child
#define CLONE_DETEACHED 0x00400000 // unused
#define CLONE_UNTRACED 0x00800000 // set if the tracing process can't force `CLONE_PTRACE` on this clone
#define CLONE_CHILD_SETTID 0x01000000 // set the TID in the child
#define CLONE_STOPPED 0x02000000 // start in stopped state
#define CLONE_NEWUTS 0x04000000 // new utsname group
#define CLONE_NEWIPC 0x08000000 // new ipcs
#define CLONE_NEWUSER 0x10000000 // new user namespace
#define CLONE_NEWPID 0x20000000 // new pid namespace
#define CLONE_NEWNET 0x40000000 // new network namespace
#define CLONE_IO 0x80000000 // clone into context

线程库

  • POSIX标准要求:线程不仅仅是共享资源即可,其需要被视为 整体
    • 查看进程列表时,一组task_struct需要被展示为列表中 一个节点
    • 发送给进程信号,将被一组task_struct共享,并被其中 任意一个线程处理
    • 发送给线程信号,将只被对应task_struct接收、处理
    • 进程被停止、继续时,一组task_struct状态发生改变
    • 进程收到致命信号SIGSEGV,一组task_struct全部退出

LinuxThread线程库

LinuxThread线程库:Linux2.6内核前,pthread线程库对应实现

  • 特点
    • 采用1对1线程模型
    • 通过轻量级进程模拟线程
    • 线程调用由内核完成,其他线程操作同步、取消等由核外 线程库完成
    • 仅通过管理线程实现POSIX以上5点要求中最后一点

管理线程

管理线程:为每个进程构造、负责处理线程相关管理工作

  • 管理线程是主线程的首个子线程

    • 进程首次调用pthread_create创建线程时会创建、启动 管理线程
  • 管理线程负责创建、销毁除主线程外线程,成为LinuxThread 的性能瓶颈

    • 从pipe接收命令创建线程
    • 子线程退出时将收到SIGUSER1信号(clone时指定), 若不是正常退出,则杀死所有子线程并自杀
    • 主循环中不断检查父进程ID,若为1说明原父线程退出并 被托管给init线程,则杀死所有子进程并自杀
  • 通过LWP模拟线程存在的问题

    • LWP不共享进程ID
    • 某些缺省信号难以做到对所有线程有效,如:SIGSTOPSIGCONT无法将整个进程挂起
    • 线程最大数量收到系统总进程数限制
    • 管理线程是性能瓶颈,一旦死亡需要用户手动清理线程、 无人处理线程创建请求
    • 同步效率低,通过复杂的信号处理机制进行同步
    • 与POSIX兼容性差

Naive POSIX Thread Library

NPTL:Linux2.6内核重写线程框架的基础上引入的pthread线程库

  • 本质上还是利用LWP实现线程的1对1线程模型,但结合新的线程 框架实现了POSIX的全部5点要求

    • 线程组tgid引入体现task_struct代表进程还是线程
    • task_struct维护两套signal_pending
      • 线程组共享signal_pending:存放kill发送信号, 任意线程可以处理其中信号
      • 线程独有signal_pending:存放pthread_kill发送 信号,只能由线程自身处理
    • 收到致命信号时,内核会将处理动作施加到线程组/进程中
  • 但也存在一些问题

    • kill未展示的LWP会杀死整个进程
  • RedHat开发,性能远高于LinuxThreads,需要内核支持

Next Generation Posix Threads for Linux

NGPT:基于GNU Portable Threads项目的实现多对多线程模型

  • IBM开发,性能介于LinuxThread、NPTL间,2003年终止开发

Python并行

threding

_thread

_dummy_thread

dummy_threading

multiprocessing

concurrent

concurrent.futures

subprocess

sched

queue

Parallel

并发和并行

  • Parallel:并行,同时做多件事情,关于执行、实现
  • Concurrent:并发,能够处理多件事情,关于结构、逻辑
  • 并发问题可以使用并行方式解决,也可以串行解决

    • 100并发任务同时运行在4核CPU上,最多可能有4个并发任务 并行处理,其余只能是串行处理
  • 并行角度:硬件技术的物理限制瓶颈

    • 单计算核心能力不足,所以需要多核并行运算
    • 进程、线程可认为是实现并行的基本逻辑实体
  • 并发角度:程序执行的逻辑重用需求

    • 程序要求重用一组逻辑,所以需要将一组指令集打包,重复 调用该组指令集
    • 子程序、协程可认为是方便重用的基本逻辑实体,因此更 应是语言内建机制
      • 子程序:无状态保存,同样重入得到同样结果
      • 协程:有保存状态,重入会改变协程状态,结果可能 不同
  • 线程作为任务执行的实体,可以认为是子程序、协程的具体执行

    • 内核线程作为可以独立执行的实体,逻辑上更会被设计为 完成独立任务,即没有保存状态需求,因此多是子程序的 具体执行
    • 用户线程则用程序负责调度,二者执行实例均可
    • 某种意义上线程、子程序是对应的执行实体、逻辑实体

子程序、协程

  • 子程序可以看作时协程的特例
    • 只有一个状态,每次进入时局部状态重置
    • 唯一入口点
  • 协程可视为子程序的组成
    • 维护自身状态,所以逻辑上不独立 ,应该是作为被 调用对象
    • 对于每次返回部分结果值的协程(也称生成器迭代器), 可以直接视为类似链表之类的数据结构 (在某些语言中可以所有数据都是类,从这个角度这也都是 统一的)
子程序 协程
生命周期 后进先出 完全取决于需要
入口点 起始处 起始处、yield返回出口点
返回值 调用结束后返回全部 可每次yield返回部分值
  • 现代指令集通常提供对调用栈的指令支持,便于实现可递归 调用的子程序,在提供续体的语言环境(如Scheme),恰好 可用此抽象状态表示实现协程

Subroutine/Procedure/Function/Routine/Method

子程序:打包为整体、用于执行特定任务的指令集序列

  • 子程序是依赖可重入能力的弱化版本

    • 一旦唤醒,于起始点开始执行
    • 一旦退出,子程序结束
    • 子程序实例只返回一次,两次激活间不保存状态
  • 子程序中局部变量在每次调用/重入函数时都是相同的

    • 相同输入得到相同输出
  • procedure:过程,有时特指无返回值、仅有副作用

线程安全

线程安全:子程序在多线程环境调用时,能够正确处理多个线程之间 的共享变量,使程序功能能正确完成

  • 线程安全函数应该为每个调用其的线程分配专门空间,存储需要 单独保存的状态

  • Atomicity:原子性,操作不会被线程调度机制打断,一旦 开始就会运行到结束,中间不会有任何线程切换

    • 可以通过locksynchronized确保原子性
  • Visibility:可见性,某线程修改变量值后,其他线程能够 立刻感知
    • 一般可以通过volatile保证可见性,强制要求被修改值 从寄存器同步至主存
    • locksynchronized也可以通过限制其他线程访问变量 的方式保证可见性
  • Ordering:有序性/一致性,程序按照代码顺序执行
    • 可以通过volatile保证一定的有序性
    • 也可通过locksynchronized提供单线程执行环境保证 有序性

Instruction Reorder

指令重排:编译器对无相互依赖的指令重新排序执行

  • as-if-serial语义:指令可以为优化而重排序,但是必须保证 最终执行结果不变

    • 规则:重排序过程不破坏数据依赖关系
    • 只能保证单线程执行结果有效,但不保证多线程并发执行 的正确性

    instruction_reorder_asif

  • happens-before原则:保证前后两个操作间不会被重排序,

    • 程序次序规则:线程中每个操作happens-before该线程中 任意后续操作
    • 锁定规则:锁的解锁happens-before加锁
    • volatile变量规则:volatile变量写操作happens-before 其读操作
    • 传递规则:若A happens-before B、B happens-before C,则A happens-before C
    • 线程启动规则:线程对象启动happens-before线程中每个 动作
    • 线程中断规则:线程中断方法的调用happens-before被 中断线程代码检测到的中断事件的发生
    • 线程终结规则:线程中所有操作happens-before线程的 终止检测
    • 对象终结规则:对象的初始化happens-beforefinal 方法的开始
  • happens-before原则被JVM用于规定(跨线程)操作之间偏序 关系,若操作之间的关系可以由此原则退出,则两个操作有序

Reentrant

  • A computer program or routine is described as reentrant if it can be safely executed concorrently; that is, the routine can be re-entered while it is already running

可重入函数:对于相同(合法)的函数参数,多次重复调用(包括 执行过程中被中断再重入)结果总是可预期的

  • 可重入需要满足条件

    • 不在函数内部使用静态或全局数据,所有数据都由函数 调用者提供
      • 全局变量区
      • 中断向量表
    • 使用本地数据,或制作全局数据的本地拷贝保护全局数据
    • 不返回静态或全局数据
    • 不调用不可重入函数
  • 不可重入后果主要体现在信号处理函数这样需要重入情况中, 若在信号处理函数中使用了不可重入函数,则可能导致程序错误

  • 可重入函数总是线程安全的,反之不一定成立

    • 线程安全可以通过“并发不冲突”实现
    • 可重入则要求“并行不冲突”

Coroutine

协程:为非抢占式多任务产生子程序的程序组件,允许执行过程 中挂起、恢复

  • 挂起、恢复:协程可以通过yield(让步)调用其他协程暂时 退出,之后可在退出位置恢复执行

    • 从协程角度看,这是调用其他协程而不是退出
    • 但实际是各协程之间是对称的,而不像子程序调用 中主调-被调关系
    • 这即暗含
      • 协程可包含多个入口点
      • 允许在不同入口点暂停、开始执行程序
  • 局部状态维护:协程实例保持上次退出时状态

    • 则协程被唤醒时状态可能不同
    • 可能同时有多个给定协程实例
  • 协程将原在子程序外、输入状态管理工作交由自身逻辑维护
  • 原生不支持协程的语言也可以使用循环等构建
  • 经典状态机、对象已经具有协程特性

用途

  • 协程可以简化异步代码的实现,使得需要使用异步+回调的代码 可以使用看似同步方式写出

    • 协程本身只涉及状态保存、过程重入,和并发/异步无关系
    • 但协程本身蕴含的状态保存使得状态切换几乎无成本,适合 高并发任务
  • 协程在线程中调度完全由用户控制,可以视为用户态轻量级线程

    • 避免陷入无效内核级别上下文切换造成的性能损失
    • 较线程在IO密集任务上性能上更好

进程、线程

  • 这里仅讨论理论上的异同,不考虑各平台具体实现
  • 进程:具有独立功能的程序关于某数据集合的一次运行活动
  • 线程/子程序:进程内某特定任务的执行活动
  • 协程:推广协作式多任务的子程序
Process Thread Coroutine
调度、创建、切换、维护 内核(系统) 内核、自身 自身
切换开销、速度 大、慢 小、快
易用性 需要考虑进程退出、僵尸进程 只需要管理进程即可 无需管理
资源共享 独立 同进程内线程共享资源 除局部变量外均共享
通信 IPC较复杂:环境变量、文件、系统端口 较简单:共享内存 结果调用
移植性
健壮性 好,进程死亡不影响其他进程 差,线程死亡会导致进程(及线程)死亡
  • 进程、线程、协程可以看作是控制权逐渐从系统已经移交到自身 的过程
  • 这里的协程强调其在实现方面的资源、调度特点,其与子程序间 功能差异参加cs_program/parallel/implementation
  • 通信:参见cs_program/parallel/#todo
  • 移植性:基于进程分支多进程和windows模型有很大冲突,往往 不能在windows平台上使用

Process

进程:具有独立功能的程序关于某数据集合的一次运行活动

  • 进程是处于运行期的程序和相关资源的总称

    • 程序:代码、指令
    • 运行:对CPU的占用,逐渐发展为线程,标识进程中指令的执行
    • 资源:执行上下文,由进程内线程共享
  • 从系统调度方面看

    • 进程是系统进行资源分配、调度的独立单位
    • 在系统中有进程控制块(进程描述符)描述进程相关信息, 系统通过此控制块控制系统相关行为
  • 从资源分配方面看

    • 有独立的存储空间(虚拟寻址空间)
      • 独享的用户空间
      • 进程专用的“共享内核空间”
    • 可执行的程序代码
  • 线程可能对系统是可感知的,则进程不定是资源分配的基本 单位
  • Linux线程实现即类似进程,但不包含独立存储空间

调度角度

  • 内核跟踪进程运行所需的状态信息(上下文)

    • 主存、虚拟内存内容
    • 寄存器文件值
    • 文件句柄
  • 调度:分配CPU执行进程

    • 内核决定CPU控制权在进程间的转移
  • 上下文切换:进程状态的记录、恢复、切换

    • 保存当前进程上下文、恢复新进程上下文
    • 通过处理器在进程间切换,实现单个CPU“看似”并发执行 多个进程
    • 上下文进程间切换开销比较大,但相对比较稳定安全

资源角度

  • 独立内存空间/虚拟地址空间:每个进程“独占的”使用 内存、看到一致的存储器

process_virtual_address_space_structure

  • 用户空间

    • 程序代码、数据:对所有进程,代码从同一固定位置开始, 直接按照可执行目标文件的内容初始化
    • (运行时)堆:可在运行时动态扩展、收缩
    • 共享库:共享库代码和数据,如:C标准库、数学库
    • 栈:用于实现函数调用,运行时动态扩展、收缩,位于虚拟 地址空间顶部
  • 内核空间

    • 内核虚拟存储器:内核总是驻留在内存中,为其保留地址 空间顶部
    • 不允许程序读写区域内容或直接调用内核代码定义的函数

Thread

线程:进程执行实体,进程中包含指令的执行活动

调度角度

  • 线程是CPU调度、分派的基本单位,有时被称为轻量级进程 (在Linux系统中也是按照轻量级进程实现)
  • 线程引入逻辑

    • 进程内部可能存在多个不同task,task需要共享进程数据
    • 同时task操作的数据具有独立性,多个task不需要按照时序 执行
    • task间需根据不同策略进行调度,因此产生了线程概念, 并被引入作为内核调度基本单位
  • 线程:比进程更小的、能独立运行的基本单位

    • 线程能/是“独立运行”,但不一定能被内核感知到,也一定由 内核调度
    • 只能说线程是针对某个task的执行活动

资源角度

thread_virtual_address_space_structure

  • 线程运行在进程的上下文中,共享同样代码和全局数据

    • 进程代码段、公有数据
    • 进程打开的文件描述符、信号的处理器
    • 进程当前目录
    • 进程用户ID、进程组ID
  • 线程还独享某些个性以实现并发性

    • 线程ID:进程中唯一标识
    • 寄存器组值:线程间并发运行,线程有不同运行线索, 切换时需要保存当前线程的寄存器集合状态
    • 线程堆栈:独立函数堆栈保证线程内函数调用可以正常 执行,不受其他线程影响
    • 错误返回码:线程的系统调用错误可能未及时处理,独立 错误返回码避免其被其他线程修改
    • 线程的信号屏蔽码:线程感兴趣的信号不同,因此信号 屏蔽码应由自己管理
    • 线程优先级:线程需要像进程被调度,需要有相应的优先级

线程实现理论

User-Level Thread

用户级线程:由用户程序自身负责支持、调度

  • 特点

    • 相当于实现自己的线程调度内核,实现线程数据结构、 创建、销毁、调度维护
    • 线程运行在内核(可感知的)进程内
  • 优点

    • 即使系统不支持线程,也可通过库函数支持在系统中实现 真实的多线程
    • 线程只在用户态,减少内核态到用户态切换开销
  • 缺点:线程对系统透明,对系统每个进程只有一个线程, 系统直接调用进程

    • 当线程进行系统调用而阻塞时,系统会阻塞整个进程
    • 用户空间没有时钟中断机制,若线程长时间不释放CPU, 会导致阻塞其他线程

Kernel-Level Thread

内核级线程:系统内核支持的线程,通过内核完成线程切换

  • 优点/特点:系统负责线程的创建、销毁、调度、维护

    • 内核通过操纵调度器对线程进行调度,并负责将线程 的任务映射到各个处理器上
    • 程序可以直接使用系统调用已实现线程,无需实现线程 调度、对CPU资源抢占使用
  • 缺点:内核线程需要内核支持

    • 创建、销毁、调度、维护往往都需要系统调用,代价较高
    • 需要消耗内核资源,不能大量创建

线程调度模型

  • X对Y是指X task对应Y内核调度单位

N对1模型

thread_model_n_versus_1

N对1模型:线程库有实现用户级线程,内核未实现内核级线程

  • 系统只负责调用进程
  • 线程对系统透明,由进程自身负责调用
  • 此模型中内核没有实现内核级线程,所以内核调度单位就是进程 S

    1对1模型

thread_model_1_versus_1

1对1模型:线程库未实现用户级线程,内核实现有内核线程

  • 程序(逻辑)层面

    • 创建执行独立task的线程
    • 程序创建的线程直接由内核负责调度
  • 内核(实现)层面

    • 每次创建线程都是调用系统调用创建内核级线程
    • 可视为每个“用户创建的线程”同内核级线程绑定,二者一一 对应
  • 此模型中内核调度单位就是内核级线程
  • 此模型中不存在上述定义的用户级线程,图中Thread实际上 应该就是Kernel Thread,仅拆分出来表示是程序创建的线程

M对N模型

thread_model_m_versus_n

M对N模型:线程库实现有用户级线程,系统也实现有内核级线程

  • 程序(逻辑)层面

    • 创建执行独立task的用户级线程
    • 创建可独立被内核调度的内核级线程
    • 将若干和用户线程同内核线程相关联,即task组总体由内核 负责调度,task组内task由程序自身负责调度
  • 内核(实现)层面

    • 调用系统调用创建内核级线程
    • 内核线程执行指令中包含用户线程创建、调度指令
  • 优点

    • 用户级线程创建、切换、析构等均在用户空间中,依然 廉价,支持大规模用户线程并发
    • 内核级线程作为用户线程在内核中调度、执行的桥梁, 降低整个进程被完全阻塞的风险
  • 此模型中内核调度单位为内核级线程、task单位为用户线程, 二者比例不定
  • 有的说法里面会使用Light Weighted Process表示内核级线程

通用调度算法

  • 耗时相差不大的task队列总是比较好处理,难以调度的task是

    • 耗时相差大
    • IO任务、计算任务混合
  • 内核调度除考虑任务(线程)外,还会考虑进程因素

    • Gang Scheduling:尽量将同进程中线程同时调度,而非 随机从多个进程中挑选CPU数量线程调度
    • Space Sharing:将CPU划分,各进程仅允许占用部分CPU 执行并发
  • 从任务角度,调度需要考虑

    • Responsiveness
    • Schedule Overload
    • Starvation-Freedom:饥饿避免
    • Fairness

First-In-First-Out

先进先出:按task在队列中的顺序依次调用,执行完task再执行下个 task,仅在task结束后才会切换task

  • 优点

    • 最少task切换开销
    • 最大吞吐量(总处理效率)
    • 朴实公平
  • 缺点

    • 平均相应时间高

Shortest task First/Shortest Remained Time task

最短耗时task优先:有限调度耗时短task

  • 优点

    • 平均相应时间短:长耗时task不断推移,必然统计出较短 平均响应时间
  • 缺点

    • 不公平,长耗时task难被调度,容易饥饿
    • 频繁task切换,调度额外开销大

Round Robin

时间片轮转:给队列中每个task时间片,时间片结束之后task移至 队列末尾,切换到执行下个task

  • 优点

    • 每个task可以得到公平调度
    • 耗时短task即使在耗时长task之后也可以较快得到执行
  • 缺点

    • task切换引起的调度开销大,需要多次切换task上下文
    • 时间片不好设置
      • 时间片足够小则退化为SFJ
      • 时间片足够大则退化为FIFO
    • 需要知道task(剩余)执行时间

(Weighted) Max-Min Fairness

(带权重的)最大最小公平:资源按照需求递增的顺序分配,不存在 需求得到资源超过自身需求,未得到满足得到需求等价分享资源

  • 具体方案
    • 每轮开始将资源按照权重分配
    • 若需求大于被分配资源则推迟执行,进入下轮
    • 若需求小于被分配资源则执行,并将多余资源继续按照权重 分配给无法执行资源

Multi-level Feedback Queue

多级反馈队列:监控task处理耗时,若task未用尽分配资源则提高 优先级,否则降低其优先级

  • 具体方案

    • task、分片时长具有相对应的不同优先级
      • 分片时长越长优先级越低
      • 高级优先级task可以抢占低优先级task
      • 新task位于高优先级task
    • 同一优先级task使用Round Robin (事实上仅有最低优先级task使用Round Robin算法, 其他优先级都是FIFO)
      • 时间片用完后task结束则正常退出系统,否则优先级 下滑一等级
      • 若是主动让出CPU(IO等),则停留在当前优先级或 提升
  • 多核场景

    • 应该为每个CPU分配单独MFQ,同时采用 Affinity Scheduling保证task尽量同一CPU核上执行, 避免CPU cache频繁失效
    • 若共用MFQ,则容易出现
      • 随CPU增长,对MFQ锁争抢严重
      • task每次执行位于的CPU不同,CPU Cache需要在不同 CPU间转移

同步问题

生产者-消费者问题/缓存绑定问题

  • 生产者生成数据放入缓存,消费者从缓存获取、移除、消费数据 ,问题核心在于保证不让生产者在缓存已满时放入数据、不让 消费者在缓存为空时读取数据
  • 若缓存满:生产者者停止工作
  • 若缓存空:消费者停止消费
  • 消费者从缓存中取走数据,通知生产者工作
  • 生产者向缓存中放入数据,通知消费者消费
  • 不完善的解决方案会造成死锁:生产者、消费者均等待对方唤醒

信号量解决方案

  • mutex信号量:互斥信号量,确保只有一个生产者、消费者 操作缓存区,单一消费者、生产者可省略此信号量
  • fill_count信号量:已使用缓存区数量
  • empty_count信号量:空闲缓存区数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
semaphore mutex = 1
semaphore fill_count = 0
semaphore empty_count = BUFFER_SIZE

producer():
while true:
item = produce_item()
down(empty_count)
down(mutex)
put_item_into_buffer(item)
up(mutex)
up(fillcount)

consumer():
while true:
down(fill_count)
down(mutex)
item = remove_item_from_buffer()
up(mutex)
up(empty_count)
consume_item(item)

状态监控解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
item_count = 0
condition full
condition empty

add(item):
while item_count == BUFFER_SIZE:
wait(full)

item_count = item_count + 1
put_item_into_buffer(item)

if item_count == 1:
notify(empty)

remove():
while item_count == 0:
wait(empty)

item_count = item_count - 1
item = remove_item_from_buffer()

if item_count == BUFFER_SIZE - 1:
notify(full)

produer():
while true:
item = produce_item()
add(item)

consumer():
while true:
item = remove()
consume_item(item)
  • 互斥信号没有保护关键区,监控方法更好

其他

  • 协调生产者、消费者的关闭

    • 可在在队列中放置特殊的值,消费者读到时终止执行,结束 消费者线程
    • 有多个消费者时,消费者读取特殊值之后可将特殊值放回 队列中继续传递,直至关闭所有消费者

哲学家就餐问题

  • 哲学家围坐在圆桌旁,只能吃饭或者思考,每两个哲学家 之间只有一根筷子,只有同时拿到左右两根筷子才能正常吃饭
  • 实际计算机问题中,筷子视为共享资源

服务生

  • 引入服务生判断资源是否能被获取
  • 引入服务生,哲学家必须经过其允许才能拿起筷子

  • 服务生知道有哪些筷子正在被使用,能够判断是否会死锁

资源分级

  • 为资源(筷子)分配偏序关系
    • 约定所有资源都按此偏序获取、相反顺序释放
    • 且保证不会有无关资源同时被同一工作获取
  • 哲学家只能拿左右侧筷子:不会有无关资源被同一工作获取

  • 将筷子按顺序编号:资源分配偏序关系

    • 哲学家只能先拿左右筷子中编号较小者
    • 哲学家需要先放下筷子中编号较大者
  • 最实用的解法:为锁指定常量分级,强制获取顺序的顺序
  • 策略不总是实用的,尤其是所需资源列表事先不知道,可能需要 先释放已获取资源、获取低编号资源、重新获取资源,效率不高

Chandy/Misra解法

  • 标记资源,保留未使用资源、交出已使用资源,初始所以资源 已使用
  • 每根筷子分为干净、脏,最初所有筷子都脏

  • 对每对竞争同一筷子的哲学家,新拿筷子给编号较低者

  • 当哲学家需要某筷子时

    • 向其竞争对手发送请求
    • 拥有筷子的哲学家收到请求
      • 若筷子干净则保留
      • 否则擦干净交出
  • 哲学家吃完东西后,筷子变脏

    • 若有哲学家之前请求过该筷子,擦干净交出
  • 有很大并行性,适合任意大问题

读者-写者问题

  • 多线程同时访问共享内存地址,线程写入时其他线程不能读取、 写入,多个线程可以同时读取
  • 一般使用readers-writer lock解决问题

读者优先

  • 若共享内存被读取,其他读者可以立即、同时读取
  • 若一直有读者开始读取,则写者会一直被插队、无法修改

写者优先

  • 如果写者在排队,应该尽快写入共享内存
  • 若一直有写者准备写入,则读者会一直被插队、无法读取

限定时间

  • 共享内存区的锁定权要在限定时间内结束
  • 能避免读者、写者一直排队

熟睡的理发师问题

  • 理发店只有一名理发师、一张理发时坐的椅子、若干普通椅子 供顾客等待
    • 没有顾客时理发师在理发椅子上睡觉,顾客到达后离开、 或者叫醒理发师
    • 有顾客时,理发师为别人立法,顾客达到后若有空闲普通 椅子则坐下休息、否则离开
    • 理完发后,任选顾客开始理发
  • 理发师等待顾客、顾客等待理发师,造成死锁
  • 有顾客不按顺序等待,让某些顾客永远不能理发

3信标解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
semaphore customer = 0
semaphore barber = 0
semaphore mutex = 1
empty_chairs = BUFFER_SIZE

barber():
while true:
if empty_chairs == BUFFER_SIZE:
sleep()

down(mutex)
item = get_customer_from_chairs()
empty_chairs += 1
up(mutex)

down(barber)
cut_hair(item)
up(barber)

customer():
while true:
down(mutex)
if empty_chairs > 0:
empty_chairs -= 1

else:
wait() or leave()

up(mutex)

三个烟鬼问题

  • 香烟需要:烟草、卷烟纸、火柴,三个烟鬼分别有无限各一种, 不吸烟协调人会随机安排两个烟鬼各拿出一份材料放在桌上, 另外一个烟鬼拿到材料卷烟、抽
    • 桌上空了后,协调人就随机要求烟鬼拿出材料
    • 烟鬼只会在抽完手中烟后才会卷另一只
    • 若烟草、卷烟纸在桌上,有火柴的烟鬼在吸烟,直到该烟鬼 吸完烟拿走桌上材料才会继续
  • 问题模拟程序中4种角色,展示信标方法作用有限

并行开发

综述

python并行多任务均依赖操作系统底层服务并行执行python代码

  • 线程派生:基本所有主流平台均支持
  • 多进程
    • shell命令进程
    • 子python进程

跨平台多进程实现

  • 创建子python进程
    • 类Unix:fork系统调用实现进程分支,分支进程运行时 环境同主进程完全一致
    • Win:创建python进程,import当前调用代码得到类似 主进程运行时环境
  • pickle序列化被调函数,传递给子进程执行
  • 因为Win下分支进程需要导入当前调用,所以多进程代码必须 在__main__内,否则无限循环创建新进程
  • 进程池调用函数要声明在进程池创建之前,否则启动进程会报错

进程通信

  • python进程间数据传递基本都是通过pickle序列化传递,所以 需要传递的数据要支持pickle序列化

  • multiprocessing等模块派生进程需要传递被调函数,所以 不支持

    • lambda匿名函数
    • 绑定对象方法

其他相关模块、包

  • 多进程:pathospp
  • 进程通信:signal

注意事项

  • 子进程报错直接死亡,错误信息默认不会输出到任何地方,所以 子进程中,多使用try catch

os模块

派生进程

1
2
3
4
5
6
os.startfile(filename)
# 调用系统默认程序打开文件,类似于鼠标双击
# linux下没有实现
int = os.system(command(str))
# 运行shell命令,返回命令执行退出状态
# 和普通shell命令一样默认阻塞,除非后台运算符`&`

os.popen

1
2
pipe = os.popen(cmd(str), mode="r"/"w", buffering=-1)
# 运行shell命令并重定向其输出、输入流到管道开放
  • 用途:运行shell命令并重定向其输出、输入流到管道开放

  • 返回值:返回管道流对象

    • 类似shell中管道语法
    • 管道流对象类似普通文件流,支持一般读、写、迭代 (但是文档中只有close方法)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # `r`模式执行,读取子进程标准输出
      pipe.readlines()
      pipe.readline()
      pipe.read()
      for i in pipe:
      # 迭代器语法
      # `w`模式执行,写入子进程标准输入
      pipe.write()
      pipe.close()
      # 返回退出状态
      # 历史原因,退出状态为0时返回None
  • os.popen一般不会阻塞,但是python执行需要整个命令行程序 完成的语句时仍然会阻塞

    • 关闭管道对象
    • 一次性读取所有输出流
  • subprocess模块可以实现与os.systemos.popen相同的 效果,使用更复杂,但是对流的连接、使用提供更完善的控制

os.fork

进程分支是构建平行任务的传统做法,是Unix工具集的基本组成部分

  • 分支是开始独立程序的直接做法,无论调用程序是否相同
  • 分支想法基于复制
    • 程序调用分支例行程序时,操作系统会创建在该进程副本
    • 和进程并行的运行副本
    • 有些系统实现并不真的复制原有程序,以减少资源消耗, 但是新副本会像真实副本一样运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import os

def child():
print("hello from child", os.getpid())
os._exit(0)

def parent():
while True:
newpid = os.fork()
# 子进程返回0
# 父进程返回子进程进程号
if newpid == 0:
child()
else:
print("hello from parent", os.getpid(), newpid)
if input() == "q":
break
  • 返回值:据此区分父、子进程,执行不同任务

    • 子进程中返回0
    • 父进程中返回子进程ID
  • os.fork仅仅是系统代码库中标准进程分支调用简单封装

    • 和C共用代码库
    • 在win下标准python版本中无法运行,和win模型冲突过多
    • Cygwin中python可以运行,虽然行为同真正Unix分支不完全 相同,但是足够接近
  • os.fork实际复制整个python解释器

    • 除非python脚本被编译为二进制机器码

os.exec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def execv(path, args=tuple/list)
def execl(path, *args=*list/*tuple)
# 以参数执行指定可执行文件,**代替**当前进程

def execvp(file, args)
def execlp(file, *args)
# `p` for `$PATH`
# 在系统搜索路径`$PATH`中定位可执行程序

def execve(path, args=tuple/list, env=dict)
def execle(path, args=*tuple/*list, env=dict)
# `e` for `environ`
# 将`env`字典作为环境环境变量传递

def execvpe(path, args=tuple/list, env=dict)
def execlpe(path, args=*tuple/*list, env=dict)
# 在系统搜索路径`$PATH`中定位可执行程序
# 将`env`字典作为环境环境变量传递
  • os.exec族的函数会覆盖当前进程

    • 所以该语句的代码都不会执行
    • 不会更改进程号
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import os
    param = 0
    while True:
    param += 1
    newpid = os.fork()
    if newpid == 0:
    os.execlp("python", "child.py", str(param))
    assert False, "starting new program error"
    # 上句正常执行,这句永远无法被调用
    else:
    print("child is", pid)
    if input() == "q": break
  • multiprocessing模块的进程派生模型+os.exec配合使用 可以在win下实现类似os.fork的效果

spawn

1
2
os.spawn
# 启动带有底层控制的新程序

进程通信

1
2
3
4
5
6
read_fd, write_fd = os.pipe()
# 创建管道,返回管道写入、读取描述符
# 可以使用`os.fdopen()`封装管道描述符,方便读、写
os.mkfifo(path, mode=438, *, dir_fd=None)
# 创建命名管道,win不支持
# 仅创建外部文件,使用需要像普通文件一样打开、处理

subprocess模块

Popen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Popen:
def __init__(self,
args(str,[str]),
bufsize=-1/0/1/int,
executable=None/str,
stdin=None/stream,
stdout=None/stream,
stderr=None/stream,
preexec_fn=None/callable,
close_fd=obj,
shell=False/True,
cwd=None/path,
env=None/dict,
universal_newlines=False/True,
startupinfo=None,
creationflags=0,
restore_signals=True/False,
start_new_session=False/True,
pass_fds=(),
encoding=None/str,
errors=None
)
  • 用途

  • 参数

    • args:需要执行的命令

    • executable:备选执行命令

    • stdin/stdout/stderr:执行程序标准输入、输出、 错误流连接对象

      • 默认:当前进程标准输入、输出、错误
      • subprocess.PIPE=-1:当前管道对象标准…
    • preexec_fn:子进程执行前在子进程中调用的对象

      • POSIX only
    • close_fds:控制关闭、继承文件描述符

    • shell:是否通过shell执行命令

      • 执行shell内置命令则必须置True
        • win:type
        • linux:set
      • Linux下False:由os.execvp运行
    • cwd:子进程执行目录

    • env:子进程环境变量

    • universal_newlines:是否在3个标准流中使用行结尾符

      • 即是否按照文本处理3个标准流
    • startupinfo:windows only

    • restore_signals:POSIX only
    • start_new_session:POSIX only
    • pass_fds:POSIX only

.communicate

1
(stdout, stderr) = communicate(self, input=None, timeout=None)
  • 用途:和子进程交互,阻塞直到子进程终止
    • input传递给子进程标准输入
    • 返回标准输出、错误输出
    • universal_newlinesTrue时,input参数、返回值 应该为str,否则为bytes

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def kill(self):
# kill进程通过*SIGKILL*信号

def terminate(self):
# 终止进程通过*ISGTERM*信号

def send_signal(self, sig):
# 向进程传递信号

def poll(self):
# 检查子进程是否终止,设置、返回`.returncode`属性

def wait(self, timeout=None, endtime=None):
# 等待直到子进程终止,返回`.returncode`属性

pipe.stdin
pipe.stdout
pipe.stderr
# 管道标准输入、输出、错误流
# 创建子进程时,可以选择和子进程相应流连接
# 支持读、写、迭代
pipe.returncode
# 子进程退出状态
  • Popen创建对象对象之后会立刻执行

  • 同时指定stdoutstdin参数可以实现管道双工工作

    • 需要注意,读写时交替发送缓冲数据流可能导致死锁

call

1
2
3
4
subprocess.call(
"type hello.py",
shell=True/False
)

_thread

  • 为系统平台上的各种线程系统提供了可移植的接口
  • 在安装了pthreads POSIX线程功能的系统上,接口工作方式 一致,无需修改源码即可正常工作
  • 基本可以完全被threading模块替代了

start_new_thread

1
2
3
4
5
6
7
def start_new_thread(
callable,
args=tuple/list,
kwargs=dict
)
def start_new():
# deprecated,同上
  • 用途:开启新线程,以参数调用callable

  • 返回值:应该是线程起始地址

  • 派生线程在函数返回后退出

    • 若在线程中函数抛出未捕捉异常,打印堆栈跟踪记录、退出 线程
    • 程序其他部分继续运行
  • 大多数系统平台上,整个程序主线程退出时,子线程随之退出

    • 需要一些处理避免子线程意外退出

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Lock = _thread.alloacate_lock()
# 获取一个`Lock`锁
# 等价于`threading.Lock()`
Lock = _thread.allocate()
# deprecated,同上

RLock = _thread.RLock()
# 获取一个`RLock`可重入锁
# 等价于`threading.RLock()`

def _thread.exit()
# 退出当前线程,可捕捉
# 等价于显式`raise SystemExit`、`sys.exit()`
def _thread.exit_thread()
# 同上

例子

例1

  • 全局线程锁保护对输出流写入
  • 全局线程锁实现主线程、子线程间通信,保证主线程在子线程 之后退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import _thread as thread

stdoutmutex = thread.allocate_lock()
# 创建全局标准输出锁锁对象
exitmutexes = [thread.allocate_lock() for _ in range(10)]
# 为每个线程创建锁
exitmutexes_bool = [False] * 10
# 线程共享内存,同样可以使用全局变量作为信号量,而不用
# 额外开销

def counter(myId, count):
for i in range(count):

stdoutmutex.acquire()
# 线程向标准输出流写入时,获得锁
print("[%s] => %s" % (myId, i))
stdoutmutex.release()
# 向标准输出流写入完毕后,释放锁

with stdoutmutex:
# 线程锁同样支持`with`上下文管理器
print("[%s] => %s again" % (myId, i))

exitmutexes[myID].acquire()
# 线程执行完毕后获取对应自身id的锁,通知主线程

for i in range(10):
thread.start_new_thread(counter, (i, 100))
# 创建、启动新线程

for mutex in existmutexes:
# 检查所有信号锁
while not mutex.locked():
# 直到信号锁被获取,结束死循环
pass
print("main thread exiting")

例2

  • with上下文管理器使用锁
  • 全局变量实现主线程、子线程通信,避免主线程在子线程之前 退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import _thread as thread
import time

stdoutmutex = thread.allocate_lock()
exitmutexes_bool = [False] * 10

def counter(myId, count):
for i in range(count):
with stdoutmutex:
# 线程锁同样支持`with`上下文管理器
print("[%s] => %s again" % (myId, i))
exitmutexes[myID] = True

for i in range(10):
thread.start_new_thread(counter, (i, 100))

while not all(exitmutexes):
time.sleep(0.25)
# 暂停主线程,减少占用CPU进行无价值循环
print("main thread exiting")

threading

Thread

1
2
3
4
5
6
7
8
9
10
class Thread:
def __init__(self,
group=None,
target=callable,
name=None/str,
args=(),
kwargs={},
*,
daemon=None/True/daemon):
pass
  • 用途:可控线程类,有两种方法使用

    • 传递callable参数创建新对象
    • 继承、覆盖run方法:代码和Thread深耦合,可能 不方便代码复用,如multiprocessing模块
  • 参数

    • group:保留参数用于未来扩展
    • target:可执行对象,将被run invoke
    • name:线程名,缺省Thread-N
    • args:用于invoke target参数tuple
    • kwargs:用于invoke target keyword参数dict
    • daemon:是否是守护线程
      • 默认情况下,主进程(线程)会等待子进程、线程退出 后退出
      • 主进程(线程)不等待守护进程、线程退出后再退出
      • 注意:主进程退出之前,守护进程、线程会自动终止
  • 若衍生类覆盖此构造器方法,务必首先调用此方法

.run

1
2
def run(self):
pass
  • 用途:代表线程活动
    • run用于invoke target
    • 覆盖此方法设置线程活动

.start

1
2
def start(self):
pass
  • 用途:开始线程活动
    • 线程创建完成后不会立即执行,需要手动调用.start启动
    • 多次调用raise RuntimeError

.join

1
2
3
def join(self,
timeout=None/float):
pass
  • 用途:等待直到线程结束

    • join:将线程加入当前线程
    • 可以多次调用
    • 试图导致死锁时,将会raise RuntimeError
  • 参数

    • timeout:指定超时时间,单位秒
      • 缺省否则阻塞直到线程结束

其他方法

1
2
3
4
5
6
7
8
9
10
bool = is_alive(self):
# 返回线程是否存活

def setDaemon(self, daemonic):
# 设置守护进程
bool = isDaemon(self):

def setName(self, name):
# 设置线程名
def getName(self):

Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Event():
def set(self):
# 设置信标值为`True`,发送信号

bool = is_set(self):
# 查看信标是否被设置

bool = wait(self, timeout):
# 阻塞,直到超时、信标被设置为`True`
# 返回信标值,即因超时返回时返回`False`

def clear():
# 重置Event对象,设置信标值为`False`

  • 用途

    • 发送信号:is_set触发事件
    • 接收信号:wait阻塞直到事件发生
  • Event中包含信标,可在线程中设置、接收,实现线程 间同步

    • Event对象信标默认设置为False,等待Event对象线程 会阻塞直到信标设置为真

    • 若有线程设置信标为真,则将唤醒所有等待该Event 对象线程

  • 若只想唤醒单个线程,用信号量、Condition代替

.clear

.clear可以重置Event对象

  • 难以确保安全清理、重新赋值Event对象,可能导致错过事件 、死锁

  • 且无法保证重置Event对象的代码能在线程再次等待此Event 信号之前执行

  • 所以Event对象最好单次使用,即其信标设置为真应立刻丢弃

  • 若线程需不停重复使用Event对象,使用Condition代替

Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Condition():
def __init__(self, lock=None)
# `lock`:`Lock`、`RLock`对象,被用作底层锁
# 缺省创建新`RLock`对象作为底层锁

bool accquire():
# 获取一次condition内部锁

bool release():
# 释放一次condition内部锁

def notify(self, n=1):
# 唤醒至多`n`个on this condition的线程

def notify_all(self):
# 唤醒所有on this condition的线程

bool = wait(self, timeout=None):
# 释放底层锁,阻塞
# 直到被唤醒再次获取底层锁、超时,返回

bool = wait_for(self, predicate(callable), timeout=None):
# `wait`直到`predicate`返回`True`
  • 用途:Condition对象wait等待信号、notify唤醒一定 数量线程实现线程同步

  • 说明

    • 以上所有方法执行前均需要已获得底层锁,否则 raise RuntimeError
    • 因此以上方法一般都需要放在with代码块中,保证已经 获取了内部锁

with上下文管理器

1
2
3
4
5
with c:
c.wait()

with c:
c.notify()
  • with进入:获取condition底层锁,保证调用方法前已经获得 底层锁
  • with退出:释放condition底层锁
  • Condition支持with上下文管理器,而且非常必须
  • help里面看不到.acquire.release方法,但是是有 而且可以调用的,应该是官方不建议使用

.wait

  • 用途

    • 方法先释放底层锁,阻塞,使得其他等待 获取此对象底层锁获得锁
    • 等待被notify唤醒,再次获取锁,继续执行
  • 底层锁是RLock

    • .wait不是调用其.release()方法,而是调用RLock 内部方法确保真正释放锁,即使RLock被递归的获取多次

    • 再次获取锁时,调用另一个内部接口恢复递归层次,即 RLock内部计数

    • RLock本身性质:在没有被持有时,其内部计数被重置 为1,其他线程可以自由获取

.notify

  • .notify并不释放condition底层锁
  • 只是控制能够获取底层锁的线程数量

Semaphore

1
2
3
4
5
6
7
8
9
class Semaphore(builtins.object):
def __init__(self, value):
# `value`:起始许可证数量(最多允许同时执行线程数目)

bool = acquire(self, blocking=True, timeout=None):
# 获取信号量,内部计数(许可证)减1

def release():
# 释放信号量,内部计数(许可证)加1
  • 用途:Semaphore对象release方法生产、acquire方法 消耗信号量,实现线程通信

    • 可以像标准锁一样使用信号量作线程同步,但会增加复杂性 影响性能
    • 更适用于需要在线程间引入信号、限制的程序,如限制代码 的并发访问量
  • 信号量对象是建立在共享计数器基础上的同步原语

.acquire

  • 用途:获取信号量,内部计数(许可证)大于0则立刻减1

    • 内部计数>0-1立即返回True
    • 内部计数=0,阻塞、等待,直到其他线程调用release
  • 返回值:成功获取许可证则返回True,否则返回False

.release

  • 用途:释放信号量,内部计数(许可证)加1
    • 内部计数=0,表明有线程阻塞,随机唤醒线程

BoundedSemaphore

1
2
3
4
class BoundedSemaphore(Semaphore):
def release(self):
# 释放信号量,内部计数加1
# 当信号量总数超过初始化值时`raise ValueError`

Lock

  • Threading.Lock等价于_thread.allocate_lock,二者 都是工厂方法,返回lock类的实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class lock:
bool = acquire(blocking=True/False, timeout=-1)
# 尝试获得锁,返回是否获得锁
bool = acquire_lock
# deprecated,同`acquire`

bool = locked()
# 返回锁状态
bool = lockec_lock()
# deprecated,同`lock`

bool = release()
# 释放锁,若锁本身未获取`raise RuntimeError`
bool = release_lock()
# deprected,同`release`
  • 用途:同步原语,排他性使用某资源

  • 说明

    • 支持with上下文语法,代码块执行前自动获取锁,执行 结束后自动释放
    • 为了避免出现死锁,每个线程应该一次只允许获取一个锁, 否则应该使用更高级死锁避免机制
    • 适合简单的锁定可变对象
  • lock对象应该是C实现,里面的方法是没有self参数的

其他

Lock锁可以视为使用更加方便的全局变量

  • 可以用于线程之间的通信:给每个子线程分配单独一个锁, 主线程、子线程可以通过锁状态通信

  • 很大程度上可以被全局变量“替换”

    • 获得锁:不断检查全局变量状态,阻塞直到全局变量 状态代表锁可获得,修改全局变量状态代表锁被获取

    • 释放锁:修改全局变量状态代表锁可获得

    • 不断检查变量状态会无意义占用CPU时间,可以在检查 间隙使用time.sleep()暂停线程

RLock

  • 工厂方法,返回RLock实例
1
2
3
4
5
6
7
class RLock:
bool = acquire(block=True):
# 尝试获取锁,返回是否获取锁
# 每次获取锁,内部计数器加1
bool = release()
# 释放锁,若锁本身未获取`raise RuntimeError`
# 每次释放锁,内部计数器减1
  • 用途:可重入锁,可以被同一线程多次获取

    • 若锁被当前线程获取多次,则需要被释放同样次数才能被 其他线程获取
    • 即只有内部计数器回到初始状态才能被任意线程获取
  • 没有线程持有锁时,RLock内部计数被重置为1

    • 应该是RLock就是通过内部计数记录被获取次数
  • 常用于给类整体上锁

    • 类内部每个方法都获取锁
    • 类内部方法之间相互调用
    • 这样类实例方法每次只能有一个线程完整调用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import Threading

    class SharedCounter:
    _lock = threading.RLock()
    # 被所有实例共享的类级锁
    # 需要大量使用计数器时,内存效率更高
    def __init__(self, intial_value=0):
    self._value = initial_value

    def incr(self, delta=1):
    with ShareCounter:
    self._value += delta

    def decr(self, deta=1):
    with SharedCounter:
    # 获取锁之后,调用也需要获取锁的`.incr`方法
    self.incr(-delta)

local

  • 用途:创建本地线程存储对象,该对象属性保存、读取操作只对 当前线程可见
    • 可以用于保存当前运行线程状态,隔离不同线程间数据
      • 套接字对象

例子

提前终止线程

轮询方法
  • 函数封装在类中,在类中设置成员变量作为轮询点
  • 方法terminate改变轮询点状态,用于外部调用结束线程
  • 线程方法run检查轮询点状态判断是否结束自身
    • 线程执行类似IO的阻塞操作时,无法返回自身、无法检查 轮询点,通过轮询终止线程难以协调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from threading import Thread
import time

class CountDownTask:

def __init__(self):
self._running = True

def terminate(self):
self._runing = False

def run(self, n):
while self._running and n > 0:
# 设置轮询点,告诉线程何时应该终止
print("T-minus", n)
n -= 1
time.sleep(5)

def test():
c = CountdownTask()
t = Thread(karget=c.run, args=(10,0))
# 创建Thread
t.start()
# 启动线程
c.terminate()
c.join()
超时循环
  • 设置任务超时,超时自动返回
    • 任务只要超时就会返回,不会出现线程阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class IOTask:
def terminate(self):
self._running = False

def run(self, sock):
sock.settimeout(5)
# 设置超时
while self._running:
try:
data = sock.recv(8192)
break
except socket.timeout:
continue
...continued processing...
...terminated...
return

线程同步、通信

Event方式
1
2
3
4
5
6
7
8
9
10
11
12
from threading import Event, Threading

def send_signal(start_evt):
start_evt.set()
# 设置事件

def recv_signal():
start_evt = Event()
# 创建新事件
t = Thread.threading(target=send_signal, start_evt)
start_evt.wait()
# 阻塞直到接收到信号
Condition实现queue.Queue
  • 自定义数据类型,封装Condition实例进行线程间同步
  • put方法生产,notify通知消费者
  • get方法消费,阻塞直到被唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import heapq
from threading import Condition

class PriortyQueue:
def __init__(self):
self._queue = [ ]
self._count = 0
self._cv = Condition()
# 封装`Condition`实例实现线程间同步

def put(self, item, priority):
with self._cv:
heapq.heappush(self._queue, (-priority, self._count, item))
self._count += 1
self._cv.notify()

def get(self):
with self._cv:
# 阻塞,直到空闲
while len(self._queue) == 0:
self._cv.wait()
# `Condition`默认使用`RLock`,可重入
return heapq.heappop(self._queue)[-1]

防死锁机制

严格升序使用锁
  • local保存当前线程状态,隔离不同线程锁数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from threading import local
from contextlib import contextmanager

_local = local()

@contextmanager
# 使支持`with`上下文管理器语法
def acquire(*locks):

locks = sorted(locks, key=lambda x: id(x))
# 根据*object identifier*对locks排序
# 之后根据此list请求锁都会按照固定顺序获取

acquired = getattr(_local, "acquired", [ ])
# 为每个线程创建独立属性保存锁
if acquired and max(id(lock)) for lock in acquired >= id(locks[0]):
# `_local`中已有锁不能比新锁`id`大,否则有顺序问题
raise RuntimeError("lock order violation")

acquired.extend(locks)
_local.acquired = acquired
# 更新线程环境中锁

try:
for lock in locks:
# 只允许升序获取锁
lock.acquire()
yield
# `with`语句进入处
finally:
# `with`语句退出处
for lock in reversed(locks):
# 只允许降序释放锁
lock.release()
del acquired[-len(locks):]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def thread_1(x_lock, y_lock):
while True:
with acquire(y_xlock, y_lock):
print("Thread_1")

def thread_2(x_lock, y_lock):
while True:
with acquire(y_lock, x_lock):
print("Thread_2")

def test():
x_lock = threading.Lock()
y_lock = threading.Lock()

t1 = threading.Thread(target=thread_1, args=(x_lock, y_lock)):
t1.daemon = True
t1.start()

t1 = threading.Thread(target=thread_2, args=(x_lock, y_lock))
t2.daemon = True
t2.start()

queue模块

Queue

1
2
3
class Queue(builtins.object):
def __init__(self, maxsize=0):
# `maxsize`;限制可以添加到队列中的元素数量

queue.Queue:创建被多个线程共享的Queue对象

  • 线程安全的数据交换方式,基于collections.deque

    • Queue对象已经包含必要的锁,可以在多个线程间安全的 共享数据
    • Queue实际上是在线程间传递对象引用,不会复制 数据项,如果担心对象共享状态,可以传递不可修改数据 结构、深拷贝
  • 还可以使用Condition变量包装数据结构,实现线程线程中 间通信

get

1
2
3
4
5
6
obj =  get(self,
block=True,
timeout=None/num)

obj = get_nowait(self):
# 等同于`get(block=False)`
  • 用途:从队列中移除、返回一个元素

  • 参数

    • block
      • False:没有空闲slot立即raise Empty exception

.task_done

1
2
3
4
def task_done(self):
# 指示前一个队列“任务”完成
def join(self):
# 阻塞直到队列中所有元素被获取(消费)、**处理**
  • 说明
    • .join阻塞时,要所有队列中元素都被告知task_done, 才能解除阻塞
    • 即队列消费者每次get元素、处理后,要手动调用 task_done告知队列任务已完成
  • 也可以将Event和队列元素封装,用Event对象告知队列元素 处理完成

.put

1
2
3
4
5
6
7
8
def put(self,
item,
block=True,
timeout=None/num):
pass

def put_nowait(self, item):
# 等价于`put(self, item, block=False)`
  • 用途:向队列中追加元素

  • 参数

    • block
      • False:没有空闲slot立即raise Full exception

.qsize

1
2
3
4
5
6
7
8
9
10
int = qsize(self):
# 返回队列大概大小(不可靠)
# 非线程安全,在其使用结果时队列状态可能已经改变

bool = empty(self):
# deprecated,使用`qsize() == 0`替代
# 和`qsize()`一样非线程安全

bool = full(self):
# deprecated,使用`qsize() > n`替代,非线程安全

例子

队列元素消费通知

Event进队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from queue import Queue
from threading import Thread, Event

def producer(out_q):
while running:
evt = Event()
out_q.put((data, evt))
# 将`Event`实例放入队列
evt.wait()
# 阻塞直到收到消费者信号

# A thread that consumes data
def consumer(in_q):
while True:
data, evt = in_q.get()
evt.set()
# 告知生产者消费完成

协调生产、消费线程终止

队列中添加特殊值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
_sentinel = object()

def producer(out_q):
while running:
out_q.put(data)
out_q.put(_sentinel)
# 将特殊值放入队列,结束生产、通知消费者
def consumer(in_q):
while True:
data = in_q.get()
if data is _sentinel:
# 消费者接收到特殊值,结束生产
in_q.put(_sentinel)
# 特殊信号放回队列,继续传递
break

Queue实现线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
sock, client_addr = q.get()
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print("closed")

def echo_server(addr, nworkers):
q = Queue()
for n in range(nworkers):
t = Thread(target=echo_client, args=(q,))
t.daemon = True
t.start()
socket = socket(AF_INET, SOCK_STREAM)
socket.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
q.put((client_sock, client_addr))

multiprocessing模块

  • multiprocessing支持一个基本与平台无关的进程派生模型, 在Unix、Windows下均可正常工作

  • 实现方式

    • 启动一个新的Python进程,import当前模块
    • pickle序列化需要调用的函数,传递至新进程执行
  • 其中PoolQueuePipe等实际都是其封装其子模块类 的工厂方法

Process

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Process():
def __init__(self,
group=None,
target=None/callable,
name=None/str,
args=()/list,
kwargs={}/dict,
daemon=None
):
self.authkey
self.daemon
self.exitcode
self.name
self.pid

def is_alive(self):

def join(self,
timeout=None/num
):

def start(self):
# 启动进程

def run(self):
# `start`方法调用`run`方法,若进程实例未传入`target`,
# `start`默认执行`run`方法

def terminate():
# 立即结束进程
  • 用途:Multiprocessing核心,类似于Thread,实现多进程 创建、启动、关闭

  • 成员方法基本类似Thread

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process
import os

def test(name):
print("Process ID: %s" % (os.getpid())
print("Parent Process ID: %s" % (os.getppid()))

if __name__ == "__main__":
proc = Process(target=test, args=("nmask",))
proc.start()
proc.join()

Pool

1
2
3
4
5
6
7
8
class Pool:
def __init__(self,
processes=None/int,
initializer=None,
initargs=(),
maxstacksperchild=None/int,
context=None
):
  • 用途:创建管理进程池,提供指定数量进程供用户调用
    • 新请求提交到pool中时
      • 若进程池没有满,将创建新进程执行请求
      • 否则请求等待直到池中有进程结束,然后创建新进程
    • 适合子进程多且需要控制子进程数量时

apply_async

1
2
3
4
5
6
7
8
def apply(self, func, args=(), kwds={}):
# 分配进程执行`func(*args, **kwds)`,阻塞
# 返回值:`func(item)`返回值

def apply_async(self, func, args=(), kwds={},
callback=None, error_callback=None)
# 异步分配进程执行调用,非阻塞
# 返回值:`ApplyResult`对象
  • 返回值:异步非阻塞调用返回结果操作句柄ApplyResult
  • 回调函数要有返回值,否则ApplyResult.ready()=False, 回调函数永远无法完成
ApplyResult
1
2
3
4
5
6
7
8
9
10
11
class ApplyResult:
def __init__(self, cache, chunksize, length,
callback, error_callback):

def get(self, timeout=None):

bool = ready(self):

bool = successful(self):

bool = wait(self, timeout=None):

map_async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def map(self, func, iterable, chunksize=None):
# 同步多进程`map(func, iterable)`,阻塞直到全部完成
# 返回值:结果列表,结果按调用顺序

def map_async(self, func, iterable, chunksize=None,
callback=None, error_callback=None):
# 异步多进程`map`,非阻塞
# 返回值:`MapResult(ApplyResult)`对象

def imap(self, func, iterable, chunksize=1):
# 迭代器版`map`,更慢、耗内存更少
def imap_unordered(self, func, iterable, chunksize=1):
# 返回结果无序版`imap`

def starmap(self, func, iterable, chunksize=1):
# 同步`map`,参数被解构`func(*item)`,阻塞

def starmap_async(self, func, iterable, chuncksize=None,
callback=None, error_callback=None):
# 异步`startmap`,阻塞

终止

1
2
3
4
5
6
7
8
9
def close(self):
# 关闭进程池,不允许添加新进程

def join(self):
# 阻塞,直至进程池中所有进程执行完毕
# 必须先`.close`进程池

def terminate(self):
# 终止进程池

Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SimpleQueue():
bool empty(self):
def get(self):
def put(self):
class Queue:
def __init__(self, maxsize=0, *, ctx):
def join_thread(self):
# join feeder线程
def cancel_join_thread(self):
# 控制在进程退出时,不自动join feeder线程
# 有可能导致部分数据在feeder中未被写入pipe而丢失
def close(self):
# 关闭feeder线程
def qsize(self):
def empty(self):
def full(self):
def get(self, block=True, timeout=None):
def get_nowait(self):
def put(self, obj, block=True, timeout=None):
def put(self):
class JoinableQueue(Queue):
def task_done():
def join():
  • 用途:进程安全队列

  • multiprocessing.Queue基于multiprocessing.Pipe构建

    • 数据传递时不是直接写入Pipe,而是写入本地buffer, 通过feeder线程写入底层Pipe,从而实现超时控制、 非阻塞put/get

    • 所以提供了.join_threadcancel_join_threadclose函数控制feeder流行为

    • 相当于线程安全的queue.Queue的多进程克隆版

  • multiprocessing.SimpleQueue:简化队列

    • 没有Queue中的buffer,没有使用Queue可能有的问题, 但是put/get方法都是阻塞的、没有超时控制

Pipe

  • mltiprocessing.Pipe()返回两个用管道相连的、读写双工 Connection对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Connection():
def __init__(self, handle,
readable=True,
writable=True):

def close(self):
def fileno(self):
# 返回描述符或连接处理器
bool = poll(self, timeout=0):
# 是否有输入可以读取
obj = recv(self):
# 接收pickable对象
# 若接收字节流不能被pickle解析则报错
bytes = recv_bytes(self, maxlength=None):
# 接收字节流作为`bytes`
int = recv_bytes_into(self, buf, offset=0):
# 接收字节流存入可写`buf`
# 返回读取的字节数量
def send(self, obj):
# 发送对象
def send_bytes(self, buf, offset=0, size=None):
# 发送bytes-like对象中字节数据
  • 对象会在每条信息前添加标志字节串,可以自动处理多条信息 堆叠

    • 可以通过os.read(connect.fileno())获取

共享内存

1
2
3
4
5
6
7
8
class SynchronizedBase:
def __init__(self, obj, lock=None, ctx=None):

def get_lock(self):
# 获取`multiprocessing.synchronize.RLock`

def get_obj(self):
# 获取数据对象,C数据结构`ctypes.`

Value

1
2
3
4
def Value(typecode_or_type, *args, lock=True):
# 返回同步共享对象
class Synchronized(SynchronizedBase):
value

Array

1
2
3
4
5
6
7
8
9
def Array(typecode_or_type, size_or_initializer, *,
lock=True)

def SynchronizedArray(SynchronizedBase):
def __getitem__(self, i):
def __getslice__(self, start, stop):
def __len__(self):
def __setitem(self, i, value):
def __setslice__(self, start, stop, values):

Manager

常与Pool模块一起使用,共享资源,不能使用QueueArray

其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import multiprocessing as mltp
l1 = mltp.Lock()
# 获取进程锁
rl1 = mltp.RLock()
# 获取可重入锁
s1 = mltp.Semaphore(value=int)
# 获取信号量对象
bs1 = mltp.BoundedSemaphore(value=int)
# 获取有上限信号量对象
e1 = mltp.Event()
# 获取事件对象
cv1 = mltp.Condition(lock=None)
# 获取Condition对象
cp = mltp.current_process()
# 获取当前Process对象

concurrent.futures

  • 异步并发模块

ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ThreadPoolExecutor:
def __init__(self,
max_workers=None,
thread_name_prefix=''):

def shutdown(self, wait=True):
# 清理和该执行器相关资源

def submit(self, fn(callable), *args, **kwargs):
# 以指定参数执行`fn`
# 返回:代表调用的future,可以用于获取结果

def map(self, fn, *iterables,
timeout=None,
chunksize=1/int)
# 并行`map(fn, iterables)`
# 返回:按调用顺序的结果迭代器
  • 用途:创建线程池

.shutdown

  • 用途:关闭执行器,清理和该执行器相关的资源

    • 可以多次调用,调用之后不能进行其他操作
  • 参数

    • wait:阻塞,直到所有运行futures执行完毕,所有 资源被释放

ProcessPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ProcessPoolExecutor:
def __init__(self,
max_workers=None):

def shutdown(self, wait=True):
# 清理和该执行器相关资源

def submit(self, fn(callable), *args, **kwargs):
# 以指定参数执行`fn`
# 返回:代表调用的`Future`实例,可以用于后续处理

def map(self, fn,
*iterables,
timeout=None,
chunksize=1/int)
# 并行`map(fn, iterables)`
# 返回:按调用顺序的结果迭代器
  • 用途:创建进程池

    • 参考ThreadPoolExecutor.map方法支持chunksize 参数
  • 常使用with语句使用

    1
    with ProcessPoolExecutor() as Pool:
    • 处理池执行完with语句块中后,处理池被关闭
    • 程序会一直等待直到所有提交工作被完成

注意事项

  • 被提交的任务必须是简单函数形式,不支持方法、闭包和 其他类型可执行

  • 函数参数、返回值必须兼容pickle模块,因为进程间通信 交换数据使用其序列化

  • 函数不要修改环境

    • 被提交的任务函数出打印日志之类等简单行为外,不应该 有保留状态、副作用
  • 混合使用进程池、多线程时

    • 创建任何线程之前先创建、激活进程池
    • 然后线程使用同样的进程池进行计算密集型工作

Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Future:
def add_done_callback(self, fn):
# 添加future完成的回调函数
# 多次调用添加的回调函数按添加顺序执行

bool = cancel(self):
# 尝试取消当前future,返回是否成功取消
# 正在运行、执行完成的future不能被取消

bool = cancelled(self):
# 查看当前future是否被取消

bool = done(self):
# 查看当前future是否被取消、完成

def exception(self, timeout=None):
# 返回当前future代表的调用的exception

def result(self, timeout=None):
# 返回当前future代表调用的返回值
  • 用途:代表一个异步计算的结果
    • 直接创建对象无价值

socket模块

通信、锁

函数通信

  • 简单通信方式:允许进程向其他进程发送简单信息

    • 命令行参数
    • 信号:受控于操作系统的非同步事件机制
    • shell环境变量
    • 程序退出状态码
    • 简单文件
  • 匿名管道:允许共享文件描述符的线程、进程传递数据

    • 仅在进程内部存在,通常和进程分支合用作为父进程、 子进程之间的通信手段、线程之间通信

    • 依赖于类Unix下进程分支模型,可移植性差

  • 具名管道FIFO:映射到系统的文件系统,允许不相关程序 进行交流

    • 是真正的外部文件,通过标准文件接口实现
    • 可以独立启动,局限于本地进程通信时可以替代套接字
    • 相较于普通外部文件,操作系统同步化FIFO访问,使之 适合IPC
  • 套接字socket:映射到系统级别端口号

    • 允许远程联网机器程序之间交流
    • 可移植性较好,几乎所有平台都支持
  • 共享内存:消息保存在函数外部,不随着函数栈清除消失

    • 全局变量
    • 类中变量

  • 线程/进程调度本质上是不确定的

  • 并行编程中错误使用锁机制可能会导致随机数据损坏、其他 异常行为,即竞争条件

  • 最好在临界区(对临界资源进行操作的部分代码)使用锁

死锁、预防

  • 线程/进程同时获取多个锁

避免、预防方案

  • 尽可能保证每个线程/进程只能同时保持一个锁

  • 为程序中每个锁分配唯一ID,然后只允许按照升序规则使用 多个锁

    • 单纯按照对象ID递增顺序加锁不会产生循环依赖,而循环 依赖时死锁必要,从而避免进入死锁状态
    • 可以数学上证明,这样能保证程序不会进入死锁状态

检测、恢复方案

  • 引入看门狗计数器:
    • 线程正常运行时每隔一段时间重置计数器
    • 没有发生死锁时正常运行
    • 一旦发生死锁,无法重置计数器导致计数器超时,程序通过 重启恢复自身状态

并行计算简介

并行计算模型

PRAM

Parallel Random Access Machine:随机存取并行机器模型,也称 共享存储的SIMD模型,从串行的RAM模型直接发展起来

假定

  • 容量无限大的共享存储器

  • 有限个或无限个功能相同的处理器,具有简单的算术运算和逻辑 判断功能

  • 任何时刻各处理器都可以通过共享存储单元相互交互数据

分类

根据处理器对共享存储单元同时读、同时写的限制,PRAM模型可以 分为下面几种

  • PRAM-EREW:Exclusive-Read and Exclusive-Write,不允许同时 读、写

  • PRAM-CREW:Concurrent-Read and Exclusive-Write,允许同时 读、不允许同时写

  • PRAM-CRCW:Concurrent-Read and Concurrent-Write,允许 同时读和同时写,允许同时写是不现实的,进一步约定

    • CPRAM-CRCW:Common PRAM-CRCN,只允许所有的处理器同时 写相同的数

    • PPRAM-CRCW:Priority PRAM-CRCN,只允许最优先的处理器 先写

    • APRAM-CRCW:Aribitrary PRAM-CRCN,允许任意处理器 自由写

    • SPRAM-CRCW:Sum PRAM-CRCN,往存储器中写的实际内容是 所有处理器写的数的和

优点

  • 适合于并行算法的表达、分析和比较

  • 使用简单,很多关于并行计算机的底层细节,比如处理器间通信 、存储系统管理和进程同步都被隐含在模型中

  • 易于设计算法和稍加修改便可以运行在不同的并行计算机系统上

缺点

  • 模型中使用了全局、单一共享存储器、局存容量较小

    • 不足以描述分布主存多处理机的性能瓶颈
    • 共享单一存储器的假定,不适合分布存储结构的MIMD机器
  • PRAM模型是同步的

    • 意味着所有的指令都按照锁步的方式操作
    • 耗时长、不能反映现实中很多系统的异步性;
  • 假设不现实

    • 模型假设每个处理器可在单位时间访问共享存储器的任一 单元,要求处理机间通信无延迟、无限带宽和无开销,忽略 资源竞争和有限带宽
    • 假设处理机有限或无限,对并行任务的增大无开销

BSP模型

异步MIMD-DM(Distributed Memory)模型

  • BSP模型支持消息传递系统,块内异步并行,块间显式同步

  • 模型基于一个master协调,所有worker同步(lock-step)执行, 数据从输入的队列中读取

模型描述

模型可以用 p/s/g/i 4个参数进行描述

  • p:处理器的数目(带有存储器)。

  • s:处理器的计算速度。

  • g:选路器吞吐率

    • 定义为:time_steps / packet
    • time_steps:每秒本地完成的局部计算数目
    • packet:通信网络每秒传送的数据量
  • i:全局同步时间开销,Barrier synchronization time

同步和通信的开销都规格化为处理器的指定条数,p台处理器同时 传送h个字节信息,则gh就是通信的开销

模型结构

BSP程序同时具有水平和垂直两个方面的结构

  • 垂直上:BSP程序由一系列串行的超步(superstep)组成

    super_step_line

  • 从水平上看:在一个超步中,所有的进程并行执行局部计算

  • 超步可分为三个阶段

    super_step

    • 本地计算阶段:每个处理器只对存储本地内存中的数据进行 本地计算
    • 全局通信阶段:对任何非本地数据进行操作
    • 栅栏同步阶段:等待所有通信行为的结束

特点

  • 模型将计算划分为一个一个的超步(superstep),有效避免死锁。

  • 处理器和路由器分开,强调了计算任务和通信任务的分开,且 路由器仅仅完成点到点的消息传递,不提供组合、复制和广播等 功能,掩盖具体的互连网络拓扑、简化了通信协议

  • 一般分布存储的MIMD模型的可编程性比较差,但BSP模型中,若 计算和通信可以合适的平衡(例如g=1),则它在可编程方面 呈现出主要的优点。

  • 采用障碍同步的方式以硬件实现的全局同步是在可控的粗粒度级 ,从而提供了执行紧耦合同步式并行算法的有效方式,而程序员 并无过分的负担

  • BSP模型起到为软件和硬件之间架起一座类似于冯·诺伊曼机的 桥梁的作业,因此BSP模型也常叫做桥模型

  • BSP模型上曾直接实现了一些重要的算法(如矩阵乘、并行前序 运算、FFT和排序等),均避免自动存储管理的额外开销

  • 为PRAM模型所设计的算法,都可以采用在每个BSP处理器上模拟 一些PRAM处理器的方法来实现。

不足

  • 模型中,在超级步开始发送的消息,即使网络延迟时间比超级步 长度短,该消息也只能在下一个超级步才能被使用

  • 全局障碍同步假定是用特殊的硬件支持的,但很多并行机中可能 没有相应的硬件

LogP模型

分布存储、点到点的多处理机模型

模型描述

通信网络由4个主要参数描述

  • L:Latency,源处理机与目的处理机进行消息通信所需要的 等待或延迟时间的上限,表示网络中消息的延迟

  • O:Overhead,处理机准备发送或接收每个消息的时间开销

    • 包括操作系统核心开销和网络软件开销
    • 在这段时间里处理不能执行其它操作
  • G:Gap,一台处理机连续两次发送或接收消息时的最小时间 间隔,其倒数即微处理机的通信带宽。

  • P:Processor,处理机/存储器模块个数

以处理器周期为时间单位,Log可以表示成处理器周期 整数倍

特点

  • 抓住了网络与处理机之间的性能瓶颈:带宽

    • g反映了通信带宽,单位时间内最多有L/g个消息能进行处理机间传送。
  • 处理机之间异步工作,并通过处理机间的消息传送来完成同步

  • 对多线程技术有一定反映。每个物理处理机可以模拟多个虚拟 处理机(VP)

    • 某个VP有访问请求时,计算不会终止
    • VP的个数受限于通信带宽和上下文交换的开销、网络容量
    • 至多有L/g个VP。
  • 消息延迟不确定,但延迟不大于L

    • 消息经历的等待时间是不可预测的
    • 但在没有阻塞的情况下,最大不超过L。
  • 可以预估算法的实际运行时间。

不足

  • 对网络中的通信模式描述的不够深入,有些现象未描述、考虑

    • 重发消息可能占满带宽
    • 中间路由器缓存饱和等未加描述
  • 主要适用于消息传递算法设计

    • 对于共享存储模式,则简单地认为远地读操作相当于两次 消息传递
    • 未考虑流水线预取技术、Cache引起的数据不一致性以及 Cache命中率对计算的影响
  • 未考虑多线程技术的上下文开销

  • 用点对点消息路由器进行通信,这增加了编程者考虑路由器上 相关通信操作的负担

背景

  • 根据技术发展的趋势,20世纪90年代末和未来的并行计算机发展 的主流之一是巨量并行机,即MPC(Massively Parallel Computers), 它由成千个功能强大的处理器/存储器节点,通过具有有限带宽 和相当大的延迟的互连网络构成。所以我们建立并行计算模型 应该充分考虑到这个情况,这样基于模型的并行算法才能在现有 和将来的并行计算机上有效的运行。

  • 根据已有的编程经验,现有的共享存储、消息传递和数据并行 等编程方式都很流行,但还没有一个公认的和占支配地位的编程方式, 因此应该寻求一种与上面的编程方式无关的计算模型。而根据 现有的理论模型,共享存储PRAM模型和互连网络的SIMD模型对 开发并行算法还不够合适,因为它们既没有包含分布存储的情况, 也没有考虑通信和同步等实际因素,从而也不能精确的反映运行 在真实的并行计算机上的算法的行为,所以,1993年D.Culer等人 在分析了分布式存储计算机特点的基础上,提出了点对点通信 的多计算机模型,它充分说明了互联网络的性能特性,而不涉 及到具体的网络结构,也不假定算法一定要用现实的消息传递 操作进行描述

并行算法基本设计策略

串改并

发掘和利用现有串行算法中的并行性,直接将串行算法改造为并行 算法

  • 最常用的设计思路但并不普适
  • 好的串行算法一般无法并行化(数值串行算法可以)

全新设计

从问题本身描述出发,不考虑相应的串行算法,设计全新并行算法

借用法

找出求解问题和某个已解决问题之间的联系,改造或利用已知算法 应用到求解问题上

并行算法常用设计技术

划分设计技术

使用划分法把问题求解分成两步:

  • 把给定问题划分成p个几乎等尺寸的子问题
  • 用p台处理器并行求解子问题

分治设计技术

  • 将复杂问题划分成较小规模特性相同的子问题
  • 且子问题类型和原问题类型相同
  • 通常用递归完成分治算法

平衡树设计技术

  • 以树的叶结点为输入,中间结点为处理结点
  • 由叶向根或由根向叶逐层进行并行处理

倍增设计技术

  • 递归调用时,所要处理数据之间的距离逐步加倍
  • 经过k步后即可完成距离为2^k的所有数据的计算

流水线技术

  • 将算法路程分成p个前后衔接的任务片段,一个任务片段完成后 ,其后继任务片段可以立即开始

  • 则可以引入流水线的思想来处理多条数据

并行计算机体系架构

Shared Memory

shared_mem_image

Distributed Memory

distributed_mem_image

Hybrid

hybrid_mem_image

并行编程模型

特征 数据并行 共享变量 消息传递
代表 HPF OpenMP MPI、PVM
可移植性 SMP、DSM、MPP SMP、DSM 所有流行并行计算机
并行力度 进程级细粒度 线程级细粒度 进程级粗粒度
并行操作方式 松散同步 异步 异步
数据存储 共享存储 共享存储 分布式存储
数据分配方式 半隐式 隐式 显示
难度 较简单 简单
可扩展性 一般 较差

数据并行模型

相同操作同时作用于不同数据

共享变量模型

用共享变量实现并行进程间通信

消息传递模型

驻留在不同节点上的进程通过网络传递消息相互通信,实现进程之间 信息交换、协调步伐、控制执行等

?