字符串、字符处理

<cctype>

检验字符类型

  • isalpha(ch)
  • isupper(ch)
  • islower(ch)
  • isdigit(ch)
  • isxdigit(ch)
  • isalnum(ch)
  • ispunct(ch)
  • isspace(ch)
  • isprint(ch)

大小写转换

  • toupper(ch)
  • tolower(ch)

I/O

MPI

总述

Message Passing Interface:消息传递函数库的标准规范

  • 标准、规范,不是特指其某个具体实现
  • 库描述,包括上百个函数调用接口,支持FortranCppC
  • 消息传递编程模型,并且是这种编程模型的代表、事实上的标准

说明

  • 高移植性

    • 在所有主流并行机上得到实现
    • 使用MPI做消息传递的CppCFortran并行程序可以不加 改变移植
  • 常用版本:所有版本遵循MPI标准,MPI程序可以不加修改的运行

    • MPICH:最流行的非专利实现,具有更好的可移植性
    • OpenMPILAMMPI的下代MPI实现
    • 其他商用版本MPI:HP-MPIMS-MPI

编译运行

1
2
3
$ mpicc -o hello hello.c
$ mpirun np 4 hello
# 指定进程数

概念

通信组

通信子:进程组集合,所有参与并行计算的进程可以组合为一个或 多个通信组

  • MPI通信操作函数中必要参数,用于限定参加通信的进程范围

  • MPI_COMM_WORLDMPI_Init后,MPI程序的所有进程形成的 缺省组

MPI接口

  • 头文件为mpi.h
  • MPI函数均以MPI_为前缀,其后第一个字符大写
  • MPI函数返回出错代码或成功代码MPI_SUCCESS

MPI数据类型

MPI(C Binding) C MPI(Fortran Binding) Fortran
MPI_BYTE MPI_BYTE
MPI_CHAR signed char MPI_CHARACTER CHARACTER(1)
MPI_COMPLEX COMPLEX
MPI_DOUBLE double MPI_DOUBLE_PRECISION DOUBLE_PRECISION
MPI_FLOAT float MPI_REAL REAL
MPI_INT int MPI_INTEGER INTEGER
MPI_LOGICAL LOGICAL
MPI_LONG long
MPI_LONG_DOUBLE long double
MPI_PACKED MPI_PACKED
MPI_SHORT short
MPI_UNSIGNED_CHAR unsigned char
MPI_UNSIGNED unsigned int
MPI_UNSIGNED_LONG unsigned long
MPI_UNSIGNED_SHORT unsigned short

用途

  • 异构计算:不同系统架构有不同数据表示格式,MPI预定义一些 基本数据类型,实现过程中以这些基本数据类型为桥梁进行转换

  • 派生数据类型:允许消息来自不连续、类型不一致存储区域,如 数组散元、结构类型等

开始结束

MPI_Init

1
int MPI_Init(int *argc, char **argv)
  • MPI程序的第一个调用(除MPI_Initialize

    • 完成MPI程序的所有初始化工作
    • 启动MPI环境
    • 标志并行代码开始
  • main必须带参数运行

MPI_Finalize (void)

1
int MPI_Finalize(void)
  • MPI程序的最后一个调用

    • 结束MPI程序的运行
    • 必须是MPI程序的最后一条可执行语句,否则程序运行结果 不可预知
    • 标志并行代码结束,结束除主进程外其他进程
  • 串行代码之后仍然可在主进程上运行

进程判断

MPI_Common_size

1
int MPI_Common_size(MPI_Comm comm, int *size)
  • 说明:获得进程个数p存储于size

  • 参数

    • comm:通信组

MPI_Common_rank

1
int MPI_Comm_rank(MPI_Comm comm, int *rank)
  • 说明:获得0~p-1间进程rank值,相当于进程ID

MPI_Get_processor_name

1
2
3
4
int MPI_Get_processor_name(
char *processor_name,
int *namelen
)
  • 说明:获得进程名称

  • 参数

    • processor_name:存储进程名称
    • namelen:存储进程名称长度

其他

MPI_Get_count

1
2
3
4
5
init MPI_Get_count(
MPI_Status status,
MPI_Datatype datatype,
int *count
)
  • 说明:返回实际接收到的消息长度

  • 参数

    • status:接收操作返回值
    • datatype:接收缓冲区中元素类型
    • countOUT,接受区元素个数

P2P通信

  • 一对一通信
  • 注意
    • MPI_Send/Recv匹配,避免死锁

同步P2P

MPI_Send

1
2
3
4
5
6
7
8
int MPI_Send(
void *buf,
int count,
MPI_Datatype datatype,
int dest,
int tag,
MPI_Comm comm
)
  • 说明:阻塞发送缓冲区中countdatatype数据类型 数据至目的进程

  • 参数

    • buf:发送缓冲区起始地址
    • count:发送元素个数
    • datatype:发送信息元素类型
    • dest:目标进程rank值
      • 阻塞式消息传递中不允许source == dest,即自身 作为接收者,会导致死锁
    • tag:消息标签
    • comm:通信组
      • 缺省MPI_COMM_WORLD
      • 消息传递必须限制在同一个消息组内

MPI_Recv

1
2
3
4
5
6
7
8
9
int MPI_Recv(
void *buf,
int count,
MPI_Datatype datatype,
int source,
int tag,
MPI_Common comm,
MPI_Status *status
)
  • 说明:阻塞从发送源进程获取countdatatype 数据类型至数据缓冲区buf

  • 参数

    • bufOUT,接收缓冲区起始地址

      • 必须至少可以容纳countdatatype类型数据, 否则溢出、出错
    • count:最多可接收数据个数

    • datatype:接收数据类型,必须同MPI_Send匹配

      • 有类型数据:发送、接收两者数据类型相同
      • 无类型数据:使用MPI_BYTE数据类型
      • 打包数据通信:使用MPI_PACKED
    • source:接收数据源进程rank值

      • 发送进程隐式确定,由进程rank值唯一标识
      • MPI_ANY_SOURCE:接收任意进程来源
    • tag:消息标签

      • MPI_ANY_TAG:匹配任意tag值
    • comm:通信组

      • 缺省MPI_COMM_WORLD
      • 消息传递必须限制在同一个消息组内
    • statusOUT,包含实际接收到消息的有关信息

      • status.MPI_SOURCEMPI_ANY_SOURCE时,确定 消息来源
      • status.MPI_TAGMPI_ANY_TAG时,确定消息tag
      • MPI_Get_count获取接收到的消息长度

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include<stdio.h>
#include "mpi.h"

void maini(int argc, char *argv[]){
int myid, numprocs, source;
MPI_Status status;
char message[100];

MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

if (myid != 0){
sprintf(message, "Hello! From process %d", myid);
MPI_Send(message, strlen(message)+1, MPI_CHAR, 0,
99, MPI_COMM_WORLD);
}else{
for (source = 1; source < numproces; source++){
MPI_Recv(message, 100, MPI_CHAR, source, 99,
MPI_COMM_WORLD, &status);
printf("%s\n", message);
}
}
MPI_Finalize()

异步P2P

MPI_Isend

1
2
3
4
5
6
7
8
9
int MPI_Isend(
void *buf,
int count,
MPI_Datatype datatype,
int dest,
int tag,
MPI_Comm comm,
MPI_Request *request
)
  • 说明:非阻塞发送缓冲区中countdatatype数据类型 数据至目的进程

  • 参数

    • requestOUT,非阻塞通信完成对象(句柄、指针)

MPI_Irecv

1
2
3
4
5
6
7
8
9
int MPI_Recv(
void *buf,
int count,
MPI_Datatype datatype,
int source,
int tag,
MPI_Common comm,
MPI_Request *request
)
  • 说明:非阻塞从发送源进程获取countdatatype 数据类型至数据缓冲区buf

Collective Communication

集合通信

  • 通信空间中所有进程都参与通信操作
  • 每个进程都需要调用操作函数
  • 多对一或一对多、同步通信

数据移动

  • All-:结果到所有进程
  • -v:variety,被操作的对象、操作更灵活
    • 通信元素块大小可变
    • 发送、接收时数据位置可以不连续

mpi_data_migration

MPI_Bcast

1
2
3
4
5
6
7
int MPI_Bcast(
void *buffer,
int count,
MPI_Datatype datatype,
int root,
MPI_Comm comm
)
  • 说明:数据广播,一到多

  • 参数

    • root:根进程

MPI_Gather

1
2
3
4
5
6
7
8
9
10
int MPI_Gather(
void *sendbuf,
int sendcnt,
MPI_Datatype sendtype,
void *recvbuf,
int recvcount,
MPI_Datatype recvtype,
int root,
MPI_Comm comm
)
  • 说明:数据聚合,多到一

  • 参数

    • sendbuf:发送缓冲区起始位置
    • sendcnt:发送数据元素个数
    • sendtype:发送数据类型
    • recvcount:接收元素个数
      • 所有进程相同
      • 仅对根进程有效
    • recvtype:接收数据类型
      • 仅对根进程有效
    • root:接收进程rank值
    • comm:通信组

MPI_Gatherv

1
2
3
4
5
6
7
8
9
10
11
int MPI_Gatherv(
void *sendbuf,
int sendcnt,
MPI_Datatype sendtype,
void *recvbuf,
int *recvcnts,
int *displs,
MPI Datatype recvtype,
int root,
MPI_Comm comm
)
  • 说明:MPI_Gather的一般形式

  • 参数

    • recvcnts:从各进程分别要接受的元素个数数组
      • 大小等于通信组大小
      • 仅对根进程有效
    • displs:从各进程要接受存放位置相对于接收缓冲区 起始位置的偏移量
      • 仅对根进程有效

MPI_Allgather

  • 说明:MPI_Gather特殊形式

MPI_Allgatherv

  • 说明:MPI_Allgather一般形式

MPI_Scatter

1
2
3
4
5
6
7
8
9
10
int MPI_Scatter(
void *sendbuf,
int sendcnt,
MPI_Datatype sendtype,
void *recvbuf,
int recvbuf,
MPI_Datatype recvtype,
int root,
MPI_Comm comm
)
  • 说明:数据分散,一对多

MPI_Scatterv

  • 说明:MPI_Scatter一般形式

MPI_Alltoall

  • 说明:多对多,全交换置换数据

MPI_Alltoallv

  • 说明:MPI_Alltoall一般形式

数据聚集

mpi_data_reduction

MPI_Reduce

1
2
3
4
5
6
7
8
9
int MPI_Reduce(
void *sendbuf,
void *recvbuf,
int count,
MPI_Datatype datatype,
MPI_Op op,
int root,
MPI_Comm comm
_
  • 说明:数据规约,多对一

MPI_Allreduce

  • 说明:MPI_Allreduce特殊形式,结果在所有进程

MPI_Reduce_scatter

  • 说明:结果分散到每个进程

MPI_Scan

  • 说明:前缀操作

同步

MPI_Barrier

  • 说明:同步操作

Hadoop 计算模型

分布式计算模型

分布式系统,不像一般的数据库、文件系统,无法从上至下、从头 到尾进行求和等操作,需要由分散的节点不断向一个点聚拢计算过程 ,考虑将分布式计算模型可以考虑分为两部分

  • 分布:操作原语
  • 聚合:处理流程

MapReduce

MapReduce计算模型主要描述是分布:操作原语部分, 但是此也包含了聚合:处理流程

操作原语

  • map:映射
    • 以键值对二元组为主要处理对象
    • map过程相互独立、各mapper相互不通信的
    • 所以mapper比较适合独立计算任务
  • reduce:规约
    • 直接处理map输出,reduce中二元组键位map过程中输出值

处理流程

以hadoop中MapReduce为例

  • 需要把任何计算任务转换为一系列MapReduce作业,然后依次 执行这些作业

  • 计算过程的各个步骤之间,各个作业输出的中间结果需要存盘, 然后才能被下个步骤使用(因为各个步骤之间没有明确流程)

缺陷

操作原语部分
  • 提供原语少:需要用户处理更多逻辑,易用性差

  • 所以抽象层次低:数据处理逻辑隐藏在用户代码中,可读性差

  • 所以其表达能力有限:

    • 复杂数据处理任务,如:机器学习算法、SQL连接查询很难 表示用MapReduce计算默认表达
处理流程部分
  • MapReduce需要通过把中间结果存盘实现同步,I/O延迟高

  • reduce任务需要等待map任务全部完成才能继续,同步Barrier 大

适合场景

  • One Pass Computation:只需要一遍扫描处理的计算任务 MapReduce计算模型非常有效

    • 批数据处理
  • Multi Pass Computation:需要在数据上进行多遍扫描、处理 的计算任务,需要执行多个MapReduce作业计算任务,因为 多副本复制、磁盘存取,其效率不高

DAG

Directed Acyclic Graph:只是表示数据处理流程的有向无环图

  • 顶点:数据处理任务,反映一定的业务逻辑,即如何对数据进行 转换和分析
  • 边:数据在不同的顶点间的传递

应用

  • DAG本身并不涉及如何处理数据,只是对数据数据流程的规划

  • 所以DAG并不能改进MapReduce计算模型中原语部分的缺陷,只能 优化数据处理流程

    • 减少MapReduce作业中间结果存盘,减少磁盘I/O
    • 优化map、reduce任务执行,减少同步Barrier

这也正是Tez所做的事情

RDD

HBase

HBase简介

HBase是高可靠、高性能、面向列、可伸缩的分布式数据库系统

  • 利用HBase技术可以在廉价硬件上搭建大规模非结构化数据管理 集群

  • HBase借鉴Google Bigtable技术实现的开源软件

    ||HBase|Bigtable| |——-|——-|——-| |存储系统|HDFS|GFS| |数据处理|Hadoop MapReduce|MapReduce| |协同服务|Zookeeper|Chubby| |RDBMS数据导入|Sqoop|-|

  • HBase访问接口

    • Native Java API:常用、高效的访问方式
    • HBase Shell:HBase命令行工具,适合用于管理HBase
    • Thrift Gateway:利用Thrift序列化技术,支持C++、PHP、 Python多种语言异构系统访问HBase表数据
    • REST Gateway:支持REST风格的Http API访问HBase
    • Pig:支持Pig Latin语言操作HBase中数据
      • 最终被转换为MapReduce Job处理HBase表数据
      • 适合做数据统计
    • Hive:支持用户使用HiveQL访问HBase
  • 可以在HBase系统上运行MapReduce作业,实现数据批处理 hbase_mapreduce

HBase数据结构

hbase_storage_structure

Table

HBase的表格,类似关系型数据库中的表格,但有所不同

特殊Table

HBase中有两张特殊的Table

  • .META.:记录用户表Region信息,自身可以有多个region
  • -ROOT-:记录.META.表Region信息的,自身只能有一个 region

Row Key

行键,Table行主键,Table记录按照此排序

Column、Column Family

  • Table在水平方向由一个或多个列簇组成
  • 一个列簇可以由任意多个Column组成
  • 列簇支持动态扩展,无需预先定义列数量、类型
  • 所有列均义二进制格式存储,用户需要自行进行类型转换

Timestamp

时间戳:每次数据操作对应的时间戳,可视为是数据的版本号

Region

Table记录数不断增加而变大后,逐渐分裂出的多个split

  • 每个region由[startkey, endkey)表示
  • 不同region被Master分配给相应RegionServer进行管理(存储)

HBase系统架构

hbase_structure

Client

  • HBase Client使用HBase RPC机制同HMaster、HRegionServer 进行通信
    • 对于管理类操作,通过RPC机制访问HMaster
    • 对于读写操作,通过RPC机制访问HRegionServer

Zookeeper

  • Zookeeper Quorum中记录-ROOT表的位置

    • 客户端访问数据之前首先访问zookeeper
    • 然访问-ROOT-
    • 然后访问.META.
    • 最后根据用户数据位置,访问具体数据
  • Zookeeper Quorum中存储有HMaster地址

  • HRegionServer把自己义Ephemeral方式注册到Zookeeper中, 使得HMaster可以随时感知各个HRegionServer健康状态

  • 引入Zookeeper,避免了HMaster单点失败问题

    • HBase中可以启动多个HMaster
    • 通过Zookeeper的Master Election机制保证总有一个Master 运行

HMaster

HMaster在功能上主要负责Table、Region管理工作

  • 管理用户对Table增、删、查、找操作???
  • 管理HRegionServer负载均衡,调整Region分布
  • 在Region分裂后,负责新Region分配
  • 在HRegionServer停机后,负责失效HRegionServer上region迁移

HRegionServer

HRegionServer负责响应用户I/O请求,向HDFS文件系统写数据,是 HBase中最核心的模块

hbase_hregion_server_structure

HRegion

HRegionServer内部管理一系列HRegion对象

  • HRegion对象对应Table中一个Region
  • HRegion由多个HStore组成

HStore

每个HStore对应Table中一个列簇的存储,是HBase存储核心模块

  • 由此可以看出列簇就是一个集中存储单元

    • 因此最好将具备共同IO特性的列放在同一个列簇中,可以 提高IO效率
  • HStore由两部分构成

    • MemStore
    • StoreFile:底层实现是HFile,是对HFile的轻量级包装
MemStore

Sorted memory buffer,用户写入数据首先放入MemStore中,满了 后Flush成一个StoreFile

StoreFile
  • 文件数量增长到一定阈值时会触发Compact合并操作,将多个 StoreFile合并成一个StoreFile

    • 合并过程中会进行版本合并、数据删除
    • 即HBase其实只有增加数据,所有更新、删除操作都是后续 Compact过程中进行的
    • 这使得用户写操作只要进入内存就可以立即返回,保证 HBase IO高性能
  • Compact操作会逐步形成越来越大的StoreFile,超过阈值之后 会触发Split操作

    • 当前Region分裂成2个Region
    • 父Region下线
    • 新分裂出的2个子Region会被HMaster分配到相应的 HRegionServer上,实现负载均衡

HLog

每个HRegionServer中都有一个HLog对象,避免因为分布式系统 中节点宕机导致的MemStore中内存数据丢失

  • HLog是实现WriteAheadLog的类

  • HLog作用

    • 每次用户写入MemStore时,也会写入一份数据至HLog文件中
    • HLog定时删除已持久化到StoreFile中的数据
  • HRegion意外终止后,HMaster会通过zookeeper感知到

    • HMaster首先处理遗留的HLog文件,将其中不同Region的Log 数据进行拆分,分别放到相应Region目录下
    • 然后将失效Region重新分配
    • 领取到Region的HRegionServer在Load Region过程中,会 发现有历史HLog需要处理,会Replay HLog中的数据到 MemStore中,然后flush到StoreFile中,完成数据恢复

HBase存储

HBase中所有数据存储在HDFS中

HFile

HFile是Hadoop二进制格式文件,实现HBase中Key-Value数据存储

  • HFile是不定长的,长度固定的只有:Trailer、FileInfo

hbase_hfile_storage_formation

Trailer

含有指针指向其他数据块起点

FileInfo

记录文件的一些元信息,如

  • AVG_KEY_LEN
  • AVG_VALUE_LEN
  • LAST_KEY
  • COMPARATOR
  • MAX_SEQ_ID_KEY

Data Index

记录每个Data块起始点

Meta Index

记录每个Meta块起始点

Data Block

Data Block是HBase IO基本单元

  • 为了提高效率,HRegionServer中实现了基于LRU的Block Cache 机制

  • Data块大小可以在创建Table时通过参数指定

    • 较大的块有利于顺序Scan
    • 较小的块有利于随机查询
  • Data块除了开头的Magic信息外,就是一个个<key, value> 键值对拼接而成

  • Magic内容就是一些随机数字,防止数据损坏

  • 每个键值对就是简单的byte array,但是包含很多项,且有固定 结构 hbase_hfile_datablock_kv

    • 开头是两个固定长度的数值,分别表示key、value长度
    • 然后是key部分
      • 固定长度数值表示RowKey长度
      • RowKey
      • 固定长度数值表示Column Family的长度
      • Column Family
      • Qualifier
      • 固定长度数值表示:Timestamp、KeyType(Put/Delete)
    • Value部分就是二进制数据

HLogFile

HBase中Write Ahead Log存储格式,本质上是Hadoop Sequence File

  • Sequence File的Key是HLogKey对象,记录了写入数据的归属信息

    • table
    • region
    • squecence number:起始值为0,或最近一次存入文件系统 中的squence number
    • timestamp:写入时间
  • Squence File的Value是KeyValue对象,即对应HFile中KeyValue

e

Zookeeper

Zookeeper是一个协调软件服务,用于构建可靠的、分布式群组

  • 提供群组成员维护、领导人选举、工作流协同、分布式系统同步 、命名、配置信息维护等服务

  • 提供广义的分布式数据结构:锁、队列、屏障、锁存器

  • Zookeeper促进户端间的松耦合,提供最终一致的、类似传统 文件系统中文件、目录的Znode视图,提供基本操作,如:创建 、删除、检查Znode是否存在

  • 提供事件驱动模型,客户端能观察到Znode的变化

  • Zookeeper运行多个Zookeeper Ensemble以获得高可用性,每个 服务器上的Ensemble都持有分布式系统内存副本,为客户端读取 请求提供服务

zookeeper_structure

Flume

Flume是分布式日志收集系统,收集日志、事件等数据资源,并集中 存储

Flume组件、结构

  • 旧版本组件、结构 flume_struture_old_version

  • 新版本组件、结构:每个Flume整体称为Agent flume_dataflow

    • 两个版本的组件功能、数据流结构都有区别
    • 但是3大组件基本可以一一对应(功能略有差异)
  • Agent是一组独立的JVM守护进程,从客户端、其他Agent接收 数据、迅速传递给下个目的节点

  • 支持多路径流量、多管道接入流量、多管道接出流量、上下文 路由

Source(Agent)

采集数据,是Flume产生数据流的地方

  • 运行在数据发生器所在的服务器上,接收数据发生器接受数据, 将数据以event格式传递给一个或多个Channel

  • 支持多种数据接收方式

    • Avro Source:支持Avro RPC协议,内置支持
    • Thrift Source:支持Thrift协议
    • Exec Source:支持Unix标准输出
    • JMS Source:从JMS(消息、主题)读取数据
    • Spooling Directory Source:监控指定目录内数据变更
    • Twitter 1% firehose Source:通过API持续下载Twitter 数据
    • Netcat Source:监控端口,将流经端口的每个文本行数据 作为Event输入
    • Sequence Generator Source:序列生成器数据源
    • HTTP Source:基于POST、GET方式数据源,支持JSON、BLOB 格式
  • 收集数据模式

    • Push Source:外部系统主动将数据推送到Flume中,如 RPC、syslog
    • Polling Source:Flume主动从外部系统获取数据,如 text、exec

Channel (Collector)

暂时的存储容器,缓存接收到的event格式的数据,直到被sink消费

  • 在source、sink间起桥梁作用

  • Channel基于事务传递Event,保证数据在收发时的一致性

  • Channel可以和任意数量source、sink连接

  • 主要Channel类型有

    • JDBC channel:数据持久化在数据库中,内置支持Derby
    • File Channel:数据存储在磁盘文件中
    • Memory Channel:数据存储在内存中
    • Spillable Meemory Channel:优先存在内存中,内存队列 满则持久到磁盘中
    • Custom Channel:自定义Channel实现

Sink(Storage Tier)

将从Channel接收数据存储到集中存储器中(HDFS、HBase)

Flume Event

Flume事件是内部数据传输最小基本单元、事务处理基本单位

  • 由一个装载数据的byte array、可选header构成
    • 数据对Flume是不透明的
    • header是容纳键值对字符串的无需集合,键在集合内唯一
    • header可以在上下文路由中使用扩展,如:数据清洗
  • Event将传输数据进行封装

Flume架构特性(旧版)

Reliablity

Flume提供了3种数据可靠性选项

  • End-to-End:使用磁盘日志、接受端Ack的方式,保证Flume接收 数据最终到导致目的地

  • Store on Failure:目的地不可用时,将数据保存在本地硬盘, 但进程如果出问题,可能丢失部分数据(发送后目的地不可用)

  • Best Effort:不做任何QoS保证

Scalability

易扩展性

  • Flume三大组件都是可伸缩的
  • Flume对事件的处理不需要带状态,Scalability容易实现

Avaliablity

高可用性:Flume引入Zookeeper用于保存配置数据

  • Zookeeper本身可以保证配置数据一致性、高可用
  • 在配置数据发生变化时,Zookeeper通知Flume Master节点 Flume Master节点通过gossip协议同步数据

flume_distributed_deployment

Manageablity

易管理性

  • 多个Master,保证可以管理大量节点

Extensibility

可开发性:可以基于Java为Flume添加各种新功能

  • 实现Source子类,自定义数据接入方式
  • 实现Sink子类,将数据写入特定目标
  • 实现SinkDecorator子类,对数据进行一定的预处理

适合场景

  • 高效率的从多个网站服务器收集日志信息存储在HDFS上
  • 将从多个服务器获取的数据迅速移交给Hadoop
  • 可以收集社交网络站点事件数据,如:facebook、amazon

Kafka

分布式、分区的、可复制的Message System(提交日志服务), 得益于特有的设计,Kafka具有高性能、高可扩展的特点

  • 完全分布式系统,易于横向扩展、处理极大规模数据
  • 同时为发布、订阅提供极高吞吐能力
  • 支持多订阅,出现失败状态时,可以自动平衡消费者
  • 将消息持久化到磁盘,保证消息系统可靠性,可用于消息 批量消费应用(ETL系统)、实时应用

Kafka组件

kafka_structure

Topic

话题:特定类型的消息流

  • 话题是消息的分类机制

    • 消息产生器向Kafka发布消息必须指定话题
  • Kafka安照Topic维护接收到的消息

    • 话题被划分为一系列分区
    • Kafka集群为每个Topic维护一个分区日志文件存储消息

消息是字节的Payload(有效载荷)

Producer

生产者:向Kafka发布消息的进程

  • 生产者需要指定消息分配至哪个分区
    • 采用Round-Robin方式方便均衡负载
    • 根据应用的语义要求,设置专用Partition Function进行 消息分区

Broker

代理:AMQP客户端,保存已经发布消息的服务器进程

AMQP:the Advanced Message Queuing Protocal,标准开放 的应用层消息中间件协议。AMQP定义了通过网络发送的字节流 的数据格式,兼容性非常好,任何实现AMQP协议的程序可以和 兼容AMQP协议兼容的其他应用程序交互,容易做到跨语言、 跨平台。

  • 一组代理服务器构成Kafka集群

  • Kafka代理是无状态的,消费者需要自行维护已消费状态信息

    • 因此Kafka无法知晓信息是否已经被消费、应该删除,因此 代理使用简单的、基于时间的Serice Level Agreement应用 于保留策略,消息在代理中超过一定时间自动删除

    • 这种设计允许消费者可以重复消费已消费数据

      • 虽然违反队列常见约定
      • 但是实际应用中很多消费者有这种特征
  • 消息代理将紧密耦合的系统设计解耦,可以对未及时处理的消息 进行缓存

    • 提高了吞吐能力
    • 提供了分区、复制、容错支持
  • Kafka代理通过Zookeeper与其他Kafka代理协同 kafka_with_zookeeper

    • 系统中新增代理或代理故障失效时,Zookeeper通知生产者 、消费者
    • 生产者、消费者据此开始同其他代理协同工作

Consumer

消费者:向Kafka subscribe话题,以处理Kafka消息的进程

  • 消费者可以订阅一个或多个话题,从代理拉取数据,消费已经 发布的消息

  • 消费者获取消息系统一般采用两种模型

    • Queuing:队列模型,一组消费者从一个服务器读取信息, 每个消息仅可被其中一个消费者消费

    • Publish Subscribe:发布订阅模型,消息被广播给所有 消费者

  • Kafka采用一种抽象方法:消费者组Consumer Group提供对上述 两种消息系统模型的支持 kafka_comsumer_group

    • 给每个消费者打上属于某个消费者组的标签(这里组只是 表示同组内消费者只能有一个消费信息)

    • 每个发布到话题的消息分发给消费者组的其中一个消费者

    • 一般情况下每个话题下有多个消费者组,每个组中有多个 消费者实例,以达到扩展处理能力、容错

    • 极端情况:如果所有消费者实例都隶属于同一个消费者组, Kafka工作模式类似于队列模型;所有消费者实例隶属于 不同的消费者组,Kafka工作模式类似于发布-订阅模型

消息分区、存储、分发

分区日志

每个分区是有序的不可更改可在末尾不断追加的 消息序列

分区优势

  • 允许Kafka处理超过一台服务器容量的日志规模

  • 分区作为并行处理基本单元,允许Kafka进行并行处理

  • 通过保证每个分区仅仅由一个消费者消费,可以保证同一 分区内消息消费的有序

    • 由于可以设置很多分区,仍然可以保证在不同消费者之间 实现负载均衡
    • 分区内外保证消息有序、数据分区处理对大部分实际应用 已经足够

分区管理

每个分区由单独的(一组)服务器处理,负责该分区数据管理、消息 请求,支持多个副本以支持容错

  • 每个分区中有一台服务器作为leader、若干服务器作为follower

  • 领导者负责分区读、写请求,跟随者以被动的方式领导者数据 进行复制

  • 领导者失败,则追随者之一在Zookeeper协调下成为新领导者

  • 为保证负载均衡,每个服务器担任部分分区领导者、其他分区 追随者

存储布局

Kafka存储布局非常简单

kafka_storage

分区存储

  • 话题每个分区对应一个逻辑日志

  • 每个日志为相同的大小的一组分段文件

  • 生产者发布的消息被代理追加到对应分区最后一个段文件中

  • 发布消息数量达到设定值、经过一段时间后,段文件真正写入 磁盘,然后公开给消费者

Offset

分区中每个消息的Sequential ID Number(Offset),唯一标识 分区中消息,并没有明确的消息ID

  • 偏移量是增量的但不连续,下个消息ID通过在其偏移量加上 消息长度得到

  • 偏移量标识每个消费者目前处理到某分区消息队列的位置, 对分区消息队列处理依赖于其(消息通过日志偏移量公开)

  • 偏移量由消费者控制,所以消费者可以以任何顺序消费消息

    • 可以回推偏移量重复消费消息
    • 设计消费者仅仅查看分区末尾若干消息,不改变消息, 其他消费者可以正常的消费

从消息分区机制、消费者基于偏移量消费机制,可以看出Kafka消息 消费机制不会对集群、其他消费者造成影响

适合场景

  • Messaging:消息传递,作为传递消息队列(ActiveMQ、 RabbitMQ等)替代品,提供高吞吐能力、高容错、低延迟

  • Website Activity Tracking:网站活动跟踪,要求系统必须 快速处理产生消息

  • Metric:度量,把分布式各个应用程序的运营数据集中,进行 汇总统计

  • Streaming Processing:流数据处理

  • Event Sourcing:事件溯源,把应用程序状态变化以时间顺序 存储,需要支持大量数据

  • Commit Log:日志提交,作为分布式系统提交日志的外部存储 服务

Storm

Storm是分布式、高容错的实时流数据处理的开源系统

  • Storm为流数据处理设计,具有很高的容错性
  • Storm保证每个消息只能得到一次完整处理,任务失败时会负责 从消息源重试消息,从而支持可靠的消息处理
  • 可以通过实现Storm通讯协议,提供其他语言支持

Storm架构

storm_structure

  • 主节点的运行Nimbus守护进程

    • 分配代码
    • 布置任务
    • 故障检测
  • 工作节点运行Supervisor守护进程

    • 监听、开始、终止工作进程
  • Nimbus、Supervisor都是无状态的(不负责维护客户端两次调用 之间状态维护)

    • 这使得两者十分健壮
    • 两者之间的协调由Zookeeper完成
  • Storm在ZeorMQ内部传递消息

Nimbus

Supervisor

Worker

Storm编程模型

Stream

数据流:没有边界的tuple序列

  • 这些tuple以分布式的方式,并行的创建、处理

Topology

计算拓扑:实时计算应用程序处理逻辑封装成的Topology对象

  • 相当于Mapreduce作业,但是MapReduce作业最终会结束、而 Topology会一直运行直到被杀死
  • Topology由Spout、Bolt组成

Spout

消息源:消息tuple生产者

  • 消息源可以是可靠的、不可靠的
  • 可靠的消息源可在tuple没有被storm成功处理时,可以重新发送
  • 不可靠的消息源则在发送tuple之后彻底丢弃

Bolt

消息处理者:封装所有的消息处理逻辑

  • Bolt可以做很多事情,包括过滤、聚集
  • Bolt一般数据处理流程
    • 处理一个输入tuple,发送0个、多个tuple
    • 调用ack接口,通知storm子集已经处理过了

Task、Executor

Topology每个Spout、Bolt转换为若干个任务在整个集群里执行

  • 默认情况下,每个Task对应一个线程Executor,线程用于执行 task
  • 同一个Spout/Bolt里的Task共享一个物理线程

Stream Grouping

数据分发策略:定义Spout、Bolt间Tasks的数据分发

  • Shuffle Grouping:洗牌式分组,上游Spout数据流tuples随机 分发到下游Bolt的Task

  • Fields Grouping:按指定字段进行分组

  • All Grouping:Spout数据tuple分发给所有下Bolt

  • Global Grouping:Spout数据tuple分发给最小id的task

  • Non-Grouping:类似shuffle Grouping,把具有Non-Grouping 设置Bolt推到其订阅的上游Spout、Bolt

  • Direct Grouping:tuple生产者决定接收tuple下游bolt中的task

  • Local or Shuffle Grouping:如果目标bolt中由一个或多个 task工作在同一进程中,tuple分配给这些task,否则同洗牌式 分组

  • Partial Key Grouping:类似Fields Grouping,但是在下游 Bolt中做负载均衡,提高资源利用率

消息处理保证

Storm追踪由每个SpoutTuple产生的Tuple树

  • 每个从Spout发出tuple,可能会生成成千上万个tuple

    • 根据血缘关系形成一棵tuple树
    • 当tuple树中所有节点都被成功处理了,才说明tuple被完全 处理
  • 每个Topology都有一个消息超时设置,如果Storm在时间内无法 检验tuple树是否完全执行,该tuple标记为执行失败,之后重发

重发