Hadoop 计算模型

分布式计算模型

分布式系统,不像一般的数据库、文件系统,无法从上至下、从头 到尾进行求和等操作,需要由分散的节点不断向一个点聚拢计算过程 ,考虑将分布式计算模型可以考虑分为两部分

  • 分布:操作原语
  • 聚合:处理流程

MapReduce

MapReduce计算模型主要描述是分布:操作原语部分, 但是此也包含了聚合:处理流程

操作原语

  • map:映射
    • 以键值对二元组为主要处理对象
    • map过程相互独立、各mapper相互不通信的
    • 所以mapper比较适合独立计算任务
  • reduce:规约
    • 直接处理map输出,reduce中二元组键位map过程中输出值

处理流程

以hadoop中MapReduce为例

  • 需要把任何计算任务转换为一系列MapReduce作业,然后依次 执行这些作业

  • 计算过程的各个步骤之间,各个作业输出的中间结果需要存盘, 然后才能被下个步骤使用(因为各个步骤之间没有明确流程)

缺陷

操作原语部分
  • 提供原语少:需要用户处理更多逻辑,易用性差

  • 所以抽象层次低:数据处理逻辑隐藏在用户代码中,可读性差

  • 所以其表达能力有限:

    • 复杂数据处理任务,如:机器学习算法、SQL连接查询很难 表示用MapReduce计算默认表达
处理流程部分
  • MapReduce需要通过把中间结果存盘实现同步,I/O延迟高

  • reduce任务需要等待map任务全部完成才能继续,同步Barrier 大

适合场景

  • One Pass Computation:只需要一遍扫描处理的计算任务 MapReduce计算模型非常有效

    • 批数据处理
  • Multi Pass Computation:需要在数据上进行多遍扫描、处理 的计算任务,需要执行多个MapReduce作业计算任务,因为 多副本复制、磁盘存取,其效率不高

DAG

Directed Acyclic Graph:只是表示数据处理流程的有向无环图

  • 顶点:数据处理任务,反映一定的业务逻辑,即如何对数据进行 转换和分析
  • 边:数据在不同的顶点间的传递

应用

  • DAG本身并不涉及如何处理数据,只是对数据数据流程的规划

  • 所以DAG并不能改进MapReduce计算模型中原语部分的缺陷,只能 优化数据处理流程

    • 减少MapReduce作业中间结果存盘,减少磁盘I/O
    • 优化map、reduce任务执行,减少同步Barrier

这也正是Tez所做的事情

RDD

Hadoop概述

  • Hadoop(核心):HDFSMapReduce/YARN
  • Hadoop家族:建立在Hadoop基础上的一系列开源工具

hadoop_relations

Hadoop

HadoopApache的一个分布式计算、java语言实现的开源框架, 实现在大量计算机组成的集群中对海量数据进行分布式计算。相比于 依赖硬件的可靠性,Hadoop被设计为可以检测、处理应用层面的 failures,能够提供构建于电脑集群上的可靠服务。

HadoopApache的分布式计算开源框架,提供分布式文件系统 HDFSMapReduce/YARN分布式计算的软件架构

Hadoop Common

支持其它Hadoop模块的公用组件

Hadoop Distributed File System(HDFS)

虚拟文件系统,让整个系统表面上看起来是一个空间,实际上是很多 服务器的磁盘构成的

Hadoop YARN

Yet Another Resource Negotiator,通用任务、集群资源分配框架 ,面向Hadoop的编程模型

  • YARN将classic/MapReduce1中Jobtracker职能划分为多个独立 实体,改善了其面临的扩展瓶颈问题

  • YARN比MapReduce更具一般性,MapReduce只是YARN应用的一种 形式,可以运行Spark、Storm等其他通用计算框架

  • YARN精妙的设计可以让不同的YARN应用在同一个集群上共存, 如一个MapReduce应用可以同时作为MPI应用运行,提高可管理性 和集群利用率

Hadoop MapReduce

YARN基础上的大数据集并行处理系统(框架)

  • 包括两个阶段

    • Map:映射
    • Reduce:归一
  • 在分布式系统上进行计算操作基本都是由Map、Reduce概念步骤 组成

    • 分布式系统,不像一般的数据库、文件系统,无法从上至下 、从头到尾进行求和等操作
    • 需要由分散的节点不断向一个点聚拢的计算过程
  • 不适合实时性要求的应用,只适合大数据离线处理

Apache下Hadoop相关项目

高频

Ambari

用于部署(供应)、管理、监控Hadoop集群的Web工具

  • 支持HDFSMapReduceHiveHCatalogHBaseOozieZooKeeperPigSqoop

  • 提供dashboard用于查看集群健康程度,如:热度图

  • 能够直观的查看MapReducePigHive应用特点,提供 易用的方式考察其执行情况

HBase

Hadoop项目子项目,高可靠、高性能、面向列、可伸缩的分布式 存储系统

  • 该技术源于Fay Chang撰写的Google论文《Bigtable:一个 结构化数据的分布式存储系统》,类似于Bigtable在Google 文件系统上提供的分布式数据存储一样,HBaseHadoop的 基础上提供了类似于Bigtable的能力

  • 适合非结构化数据存储

  • 可用于在廉价PC Server上搭建大规模结构化存储集群,是 NoSQL数据库的两个首选项目(MongoDB

Hive

基于Hadoop的数据仓库工具

  • Hive中建立表,将表映射为结构化数据文件

  • 可以通过类SQL语句直接查询数据实现简单的MapReduce统计, 而不必开发专门的MapReduce应用

    • Hive会将SQL语句转换为MapReduce任务查询Hadoop
    • 速度很慢
    • 适合数据仓库的统计分析
    • 支持SQL语法有限

Pig

基于Hadoop的大规模数据高层分析工具(类似于Hive

  • 提供SQL-Like语言PigLatin

    • 其编译器会把类SQL的数据分析请求,转换为一系列经过 优化处理的MapReduce运算

    • 是一种过程语言,和Hive中的类SQL语句相比,更适合写 脚本,而Hive的类SQL语句适合直接在命令行执行

Zookeeper

Hadoop正式子项目,针对大型分布式应用设计的分布式、开源协调 系统

  • 提供功能:配置维护、名字服务、分布式同步、组服务

  • 封装好复杂、易出错的关键服务,提供简单易用、功能稳定、 性能高效的接口(系统),解决分布式应用中经常遇到的数据 管理问题,简化分布式应用协调及管理难度,提供高性能分布式 服务

  • 通常为HBase提供节点间的协调,部署HDFSHA模式时是 必须的

Spark

基于内存计算的开源集群计算系统,目的是让数据分析更加快速

低频

Mahout

基于Hadoop的机器学习、数据挖掘的分布式框架

  • 使用MapReduce实现了部分数据挖掘算法,解决了并行挖掘问题

    • 包括聚类、分类、推荐过滤、频繁子项挖掘
  • 通过使用Hadoop库,Mahout可以有效扩展至云端

Cassandra

开源分布式NoSQL数据库系统,最初由Facebook开发,用于存储 简单格式数据,集Google BigTable数据模型和Amazon Dynamo 的完全分布式架构于一身

Avro

数据序列化系统,设计用于支持数据密集型、大批量数据交换应用, 是新的数据序列化格式、传输工具,将逐步取代Hadoop原有的 IPC机制

Chukwa

用于监控大型分布式系统的开源数据收集系统,可以将各种类型的 数据收集成适合Hadoop处理的文件,保存在HDFS中供MapReduce 操作

Tez

基于YARN的泛用数据流编程平台

  • 提供强力、灵活的引擎用于执行任何DAG任务,为批处理和 交互用例处理数据

Tez正逐渐被HivePigHadoop生态框架采用,甚至被一些 商业公司用于替代MapReduce作为底层执行引擎

其他Hadoop相关项目

高频

Sqoop

用于将Hadoop和关系型数据库中数据相互转移的开源工具

  • 可以将关系型数据库(MySQLOraclePostgres)中 数据转移至HadoopHDFS

  • 也可以将HDFS的数据转移进关系型数据库中

Impala

Cloudera发布的实时查询开源项目

  • 模仿Google Dremel

  • 称比基于MapReduceHive SQL查询速度提升3~30倍,更加 灵活易用

Phoenix

apache顶级项目,在HBase上构建了一层关系型数据库,可以用 SQL查询HBase数据库,且速度比Impala更快,还支持包括 二级索引在内的丰富特性,借鉴了很多关系型数据库优化查询方法

Oozie

工作流引擎服务器,用于管理、协调运行在Hadoop平台 (HDFSPigMapReduce)的任务

Cloudera Hue

基于Web的监控、管理系统,实现对HDFSMapReduce/YARNHBaseHivePigWeb化操作和管理

低频

Hama

基于HDFSBSP(Bulk Synchronous Parallel)并行 计算框架,可以用包括图、矩阵、网络算法在内的大规模、 大数据计算

Flume

分布的、可靠的、高可用的海量日志聚合系统,可用于日志数据 收集、处理、传输

Giraph

基于Hadoop的可伸缩的分布式迭代图处理系统,灵感来自于BSPGoogle Pregel

Crunch

基于Google FlumeJava库编写的Java库,用于创建MapReduce 流水线(程序)

  • 类似于HivePig,提供了用于实现如连接数据、执行聚合 、排序记录等常见任务的模式库

    • 但是Crunch不强制所有输入遵循同一数据类型

    • 其使用一种定制的类型系统,非常灵活,能直接处理复杂 数据类型,如:时间序列、HDF5文件、HBase、序列化 对象(protocol bufferAvro记录)

  • 尝试简化MapReduce的思考方式

    • MapReduce有很多优点,但是对很多问题,并不是合适的 抽象级别

    • 出于性能考虑,需要将逻辑上独立的操作(数据过滤、投影 、变换)组合为一个物理上的MapReduce操作

Whirr

运行于云服务的类库(包括Hadoop),提供高度互补性

  • 相对中立
  • 支持AmazonEC2Rackspace的服务

Bigtop

Hadoop及其周边生态打包、分发、测试的工具

HCatalog

基于Hadoop的数据表、存储管理,实现中央的元数据、模式管理, 跨越HadoopRDBMS,利用PigHive提供关系视图

Llama

让外部服务器从YARN获取资源的框架

CDH组件

Fuse

HDFS系统看起来像普通文件系统

Hadoop Streamin

MapReduce代码其他语言支持,包括:C/C++PerlPythonBash

MapReduce YARN

MapReduce1.0组件

MapReduce1.0是指Hadoop1.0中组件,不是指MapReduce计算模型

优势

  • 方便扩展:能够运行在普通服务器构成的超大规模集群上

  • IO瓶颈:通过将IO分散在大规模集群的各个节点上,可以提高 数据装载速度(大规模数据时只有部分数据可以状态在内存中)

局限

  • MapReduce计算模型问题(参见MapReduce计算模型

  • 数据处理延迟大

    • MapReduce作业在Map阶段、Reduce阶段执行过程中,需要 把中间结果存盘
    • 在MR作业间也需要通过磁盘实现作业间的数据交换
  • 资源利用率低

    • 任务调度方法远未达到优化资源利用率的效果,给每个 TaskTracker分配任务的过程比较简单

资源分配

  • 每个TaskTracker拥有一定数量的slots,每个活动的Map、 Reduce任务占用一个slot

  • JobTracker把任务分配给最靠近数据、有slot空闲TT

  • 不考虑Task运算量大小,所有Task视为相同,如果有某个TT 当前负载过高,会影响整体的执行

  • 也可以通过Speculative Execution模式,在多个slave上 启动同一个任务,只要其中有一个任务完成即可

执行引擎

MapReduce执行引擎运行在HDFS上

  • JobTracker:运行在NameNode上

    • 分解客户端提交的job为数据处理tasks,分发给集群里相关 节点上的TaskTacker运行
    • 发送任务原则:尽量把任务推送到离数据最近的节点上, 甚至推送到数据所在的节点上运行
  • TaskTracker:运行在DataNode上

    • 在节点上执行数据处理map、reduce tasks
    • 可能需要从其他DataNode中获取需要数据

MapRedudce2.0

Shuffle

Shuffle:系统执行排序的过程

  • 为了确保MapReduce的输入是按键排序的

Map端

每个Map Task都有一个内存缓冲区用于存储map输出结果,缓冲区 快满时需要将缓冲区数据以临时文件方式存放到磁盘中,整个Task 结束后再对此Map Task产生所有临时作合并,生成最终正式输出文件 ,等待Reduce Task拉数据

YARN

Yet Another Resource Negotiator,通用任务、集群资源分配框架 ,面向Hadoop的编程模型

YARN优势

扩展性

  • YARN将classic/MapReduce1中Jobtracker职能划分为多个独立 实体,改善了其面临的扩展瓶颈问题

  • MapReduce现在只是批数据处理框架,是YARN支持的数据处理 框架的一种,独立于资源管理层,单独演化、改进

  • YARN精妙的设计可以让不同的YARN应用在同一个集群上共存, 如一个MapReduce应用可以同时作为MPI应用运行,提高可管理性 、集群利用率

高效率

  • ResourceManager是单独的资源管理器

  • Job Scheduler指负责作业调度

  • 根据资源预留要求、公平性、Service Level Agreement等标准 ,优化整个集群的资源利用

一般性

YARN是通用资源管理框架,在其上可以搭建多种数据处理框架

  • 批处理:MapReduce
  • 交互式处理:Tez
  • 迭代处理:Spark
  • 实时流处理:Storm
  • 图数据处理:GraphLab/Giraph

YARN中实体

ResourceManager

RM物理上对应主节点,逻辑上管理集群上的资源使用,其功能由 Scheduler、ApplicationManager协调完成

  • AppplicatonManager:接受、监控任务

    • 接受客户端提交的job

    • 判断启动该job的ApplicationMaster所需的资源

    • 监控ApplicationMaster的状态,并在失败时重启其

  • Scheduler:分配资源、调度

    • Schedular计算启动ApplicationManager提交的job的AM所需 资源,将资源封装成Container

    • 然后根据调度算法调度,在某个NM上启动job的AM

    • 不提供失败重启、监控功能

    • Scheduler收到AM任务完成汇报之后,回收资源、向RM返回 执行结果

    • 调度算法可自定以,YARN根据不同场景提供

      • FIFO Scheduler
      • Capacity Scheduler
      • Fair Scheduler

NodeManager

NM物理上对应计算节点,逻辑上监控、管理当前节点资源

  • 仅仅抽象本节点资源(cpu、内存、磁盘、网络等),并且定时 像RM的Scheduler汇报

  • 接受并处理AM的tasks启动、停止等请求

ApplicationMaster

AM管理集群上运行任务生命周期

  • 每个job都有一个专用的AM

  • AM启动后会计算job所需资源,并向Scheduler申请资源

  • AM运行在job运行期间,负责整个job执行过程的监控

    • NM分配完任务container后,AM开始监控这些containers 、tasks状态
    • 任务失败则回收资源重新生成
    • 成功则释放资源
    • 任务执行完毕后回报Scheduler

Containers

YARN为将来的资源隔离提出的框架,是一组资源的集合,每个task 对应一个container,只能在container中运行

  • 容器有特定的内存分配范围

    • 容器内存最小值即为内存分配单位,内存最大值也应该是 内存分配单位整数倍

    • 根据任务所需资源多少分配给容器整数倍内存单位,但是 如果任务所需内存大于容器内存最大值,运行时可能会报错

  • 由NM确保task使用的资源不会超过分配的资源

  • 注意

    • AM并不运行于container中,真正的task才运行在container

yarn_procedure

Job运行过程

作业提交

  • 从RM获取新的作业ID
  • 作业客户端检查作业输出说明,计算输入分片(也可以配置 在集群中产生分片)
  • 将作业资源复制到HDFS
  • 调用RM上的submitApplication方法提交作业

作业初始化

  1. RM收到调用submitApplication消息后,将请求传递给 内部scheduler,scheduler分配一个container
  2. NM在RM的管理下在容器中启动应用程序的master进程AM, 其对作业进行初始化
  3. AM创建多个簿记对象用于接受任务进度、完成报告,保持 对作业进度的跟踪
  4. AM接受来自共享文件系统的在客户端计算的输入分片,对 每个分片创建一个map对象,及由mapreduce.job.reduces 属性确定的多个reduce任务对象
  5. AM根据任务大小决定如何运行job,如果在新容器中分配、 运行任务的开销大于并行运行时的开销,AM会在单个节点 上运行,这样的作业称为uberized
  6. AM在任何tasks执行之前通过job的setup方法设置job的 OutputCommiter,建立作业输出目录

任务分配

  1. 若作业不适合作为uber任务运行,AM为该作业中所有map 、reduce任务向RM请求容器
  2. 请求附着heart beat,包括每个map任务的数据本地化信息 ,特别是输入分片所在的主机、机架信息,scheduler据此 做调度决策
    • 理想化情况下任务分配到数据本地化节点
    • 否则优先使用机架本地化
  3. 请求同时指定了任务内存需求,YARN中的资源分为更细粒度 ,task可以请求最小到最大限制范围、任意最小值倍数的 内存容量

任务执行

  1. 当NM的scheduler为task分配了container,AM就可以通过 与NM通信启动容器
  2. 任务由YarnChild执行,在执行任务之前,需要将任务 所需资源本地化,包括作业的配置、JAR文件、所有来自 分布式缓存的文件,然后运行map、reduce任务
  3. 对于Streaming、Pipes程序,YarnChild启动Streaming、 Pipes进程,使用标准输入输出、socket与其通信(以 MapReduce1方式运行)

进度和状态更新

  1. task每3s通过umbilical接口向AM汇报进度、状态(包括 计数器),作为job的aggregate view
  2. 客户端则默认没1s查询AM接受进度更新

作业完成

  1. 客户端每5s通过调用job的waitForCompletion检查作业 是否完成,也可以通过HTTP callback完成作业
  2. 作业完成后AM和task容器清理工作状态,OutputCommiter 作业清理方法被调用

todo:这里逻辑有问题,要删

MapReduce计算模型

  • 分布式系统,不像一般的数据库、文件系统,无法从上至下 、从头到尾进行求和等操作
  • 需要由分散的节点不断向一个点聚拢的计算过程,即分布式系统 上计算模型基本都是由map、reduce步骤组成

MapReduce

MapReduce每步数据处理流程包括两个阶段

  • Map:映射

    • map过程相互独立、各mapper见不通信,所以mapreduce 只适合处理独立计算的任务
  • Reduce:归一

    • reduce直接处理map的输出,reduce的为map输出

数据处理过程

  • 需要把任何计算任务转换为一系列MapReduce作业,然后依次 执行这些作业

  • 计算过程的各个步骤之间,各个作业输出的中间结果需要存盘, 然后才能被下个步骤使用(因为各个步骤之间没有明确流程)

  • One Pass Computation:只需要一遍扫描处理的计算任务 MapReduce计算模型非常有效

  • Multi Pass Computation:需要在数据上进行多遍扫描、处理 的计算任务,需要执行多个MapReduce作业计算任务,因为 多副本复制、磁盘存取,其效率不高

Mapred on DAG

Directed Acyclic Graph;表示数据处理流程的有向无环图

  • 顶点:数据处理任务,反映一定的业务逻辑,即如何对数据进行 转换和分析
  • 边:数据在不同的顶点间的传递

比较

比较方面 MapReduce DAG
操作原语 map、reduce 较多
抽象层次
表达能力
易用性 要手动处理job之间依赖关系,易用性差 DAG本身体现数据处理流程
可读性 处理逻辑隐藏在代码中,没有整体逻辑 较好
  • 正是MapReduce提供操作原语少、抽象层次低,所以其表达能力 差,同时需要用户处理更多的逻辑,易用性、可读性差

    • 复杂数据处理任务,如:机器学习算法、SQL连接查询很难 表示用MapReduce计算默认表达

    • 操作原语多并不是DAG本身的要求,DAG本身只是有向无环图, 只是使用DAG计算模型可以提供更多的操作原语

  • 由于DAG的表达能力强于MapReduce,对某些处理逻辑,DAG 所需作业数目小于MapReduce,消除不必要的任务

    • DAG显著提高了数据处理效率,对小规模、低延迟和大规模、 高吞吐量的负载均有效

    • MapReduce需要通过把中间结果存盘实现同步,而DAG整合 部分MapReduce作业,减少磁盘I/O

    • reduce任务需要等待map任务全部完成才能继续,DAG优化 数据处理流程,减少同步Barrier

  • DAG部分计算模型也由map、reduce任务构成,只是不像传统 MapReduce计算模型中map、reduce必须成对出现

    • 或者说DAG只有一次map任务(扫描数据时),其余都是 reduce任务?

    • 从MapReduce配置也可以看出,MapReduce可以选择基于 yarnyarn-tez