Hive

Hive简介

Hive是Hadoop平台上的数据仓库,面向结构化数据分析

  • 结构化数据文件映射为一张数据库表

  • 提供完整的SQL查询功能,所用语言称为HiveQL

    • Hive将HiveQL转换为MapReduce作业,在hadoop平台运行
    • Hive相当于一个在hadoop平台上的SQL Shell
    • 方便用户使用HiveQL快速实现简单数据分析、统计,而不必 开发专用MapReduce程序,学习成本低
  • 相较于传统关系数据库,Hive具有如下特点

    ||Hive|传统关系型数据库| |———|———|———-| |数据存储|HDFS分布式文件系统|服务器本地文件系统| |查询处理|MapReduce计算模型|自行设计的查询处理模型| |应用场景|海量数据分析处理|高性能查询,实时性好| |数据更新|不支持对具体数据行修改,只能覆盖、追加|支持| |事务处理|不支持|支持| |索引支持|不支持,一般需要对数据进行全部扫描|支持,多种索引| |扩展能力|基于Hadoop平台,存储、计算强大的扩展能力|扩展性较差| |数据加载|Writing Time Schema:数据加载时无需进行模式检查,在读取数据时对数据以一定模式进行解释|Reading Time Schema:要求数据必须符合数据库表结构|

Hive服务端组件

Driver

负责将用户的编写的HiveQL查询语句进行解析、编译、优化、生成 执行计划,然后调用底层MapReduce计算模型执行,包括

  • Compiler:编译器
  • Optimizer:优化器
  • Executor:执行器

MetaStore

元信息管理器,对Hive正确运行举足轻重

  • MetaStore实际上就是Thrift服务

    • MetaStore客户端(hive、spark shell等)和服务端通过 thrift协议进行通信
    • 客户端通过连接metastore服务,实现对元数据的存取
    • 通过Thrift获取元数据,屏蔽了访问MetaStore Database 所需的驱动、url、用户名、密码等细节
  • 负责存储元数据在关系型数据库(称为MetaStore Database)

    • 元数据包括Hive创建的database、table等元信息
    • 支持的关系型数据库
      • Derby:Apache旗下Java数据库
      • MySQL
  • MetaStore服务可以独立运行,可以让多个客户端同时连接、 甚至安装到远程服务器集群,保持Hive运行的健壮性

Embedded Metastore Server(Database Derby)

内嵌模式:使用内嵌的Derby数据库存储元数据

  • 不需要额外起Metastore服务
  • 一次只能一个客户端连接,使用做实验,不适合生产环境
  • Derby默认会在调用hive命令所在目录的metastore_db文件中 持久化元数据

embeded_metastore_database

Local Metastore Server

本地元存储

  • 采用外部数据库,支持

    • MySQL
    • Postgres
    • Orcale
    • MSSQL
  • 数据库独立于hive部署,hive服务使用JDBC访问元数据,多个 服务可以同时进行

  • 本地元存储不需要单独起metastore服务,用的是跟hive在同一 进程metastore服务

local_metastore_server

Remote Metastore Server

远程元存储

  • 类似于本地元存储,只是需要单独启动metastore服务,和hive 运行在不同的进程(甚至主机)中

  • 需要在每个客户端配置文件配置连接到该metastore服务

    • hive通过thrift访问metastore
  • 此模式可以控制到数据库的连接

remote_metastore_server

hiveserver2

基于的Thrift RPC实现

  • 远程客户端可以通过hiveserver2执行对hive的查询并返回结果

    • 支持多客户端并发、身份验证
  • 可以使用JDBC、ODBC、Thrift连接hiveserver2(Thrift Server 特性)

  • hiveserver2也能访问元数据,不依赖于metastore服务

Hive客户端组件

CLI

Command Line Interface

  • 允许用户交互式的使用Hive

THrift Client/beeline

基于Thrift的JDBC Client

  • 包括JDBC/ODBC驱动程序

WEB GUI

允许用户通过WEB GUI图形界面访问Hive

  • 需要首先启动Hive Web Interface服务

Hive查询处理

过程

  1. 用户提交HQL至Driver
  2. Driver把查询交给Compiler,Compiler使用MetaStore中元信息 检查、编译
  3. 查询经过Optimizer优化交由Executor Engine执行,转换为 MapReduce作业后调用MapReduce执行
  4. MapReduce存取HDFS,对数据进行处理,查询结果返回Driver

数据类型

  • 基础数据类型

    • Integer
    • Float
    • Double
    • String
  • 复杂数据类型:通过嵌套表达复杂类型

    • Map
    • List
    • Struct
  • 还允许用户自定以类型、函数扩展系统

数据存储模型

使用传统数据库:Table、Row、Column、Partition等概念,易于 理解

Database

相当于关系型数据库中的Namespace

  • 将不同用户数据隔离到不同的数据库、模式中

Table

表格

  • 逻辑上由存储的数据、描述数据格式的相关元数据组成

    • 表格数据存放在分布式文件系统(HDFS)中
    • 元数据存储在MetaStore服务指定关系型数据库中
  • 创建表格、加载数据之前,表格在HDFS中就是一个目录, 表格分为两种类型

    • 托管表:数据文件存放在Hive数据仓库中,即HDFS中的一个 目录,是Hive数据文件默认存放路径
    • 外部表:数据文件可以存放在其他文件系统中

Partition

根据“分区列”的值,对表格数据进行粗略划分的极值

  • 存储上:是Hive中表格主目录的子目录,名字即为定义的分区列 名字

  • 逻辑上:分区不是表中的实际字段,是虚拟列

    • 根据虚拟列(可能包含多个实际字段)划分、存储表格数据
    • 同一虚拟列中字段通常应该经常一起被查询,这样在需要 存取部分数据字段时,可以只扫描部分表

Bucket

Table、Partition都是目录级别的数据拆分,指定Bucket的表格, 数据文件将按照规律拆分成多个文件

  • 每个桶就是table、partition目录中的文件

  • 一般使用Hash函数实现数据分桶,创建表时,需要指定桶数量、 分桶操作依据的列

  • 用户执行Sample查询时,Hive可以使用分桶信息,有效的Prune Data,如:对每个目录下单个桶文件进行查询

Hadoop 计算模型

分布式计算模型

分布式系统,不像一般的数据库、文件系统,无法从上至下、从头 到尾进行求和等操作,需要由分散的节点不断向一个点聚拢计算过程 ,考虑将分布式计算模型可以考虑分为两部分

  • 分布:操作原语
  • 聚合:处理流程

MapReduce

MapReduce计算模型主要描述是分布:操作原语部分, 但是此也包含了聚合:处理流程

操作原语

  • map:映射
    • 以键值对二元组为主要处理对象
    • map过程相互独立、各mapper相互不通信的
    • 所以mapper比较适合独立计算任务
  • reduce:规约
    • 直接处理map输出,reduce中二元组键位map过程中输出值

处理流程

以hadoop中MapReduce为例

  • 需要把任何计算任务转换为一系列MapReduce作业,然后依次 执行这些作业

  • 计算过程的各个步骤之间,各个作业输出的中间结果需要存盘, 然后才能被下个步骤使用(因为各个步骤之间没有明确流程)

缺陷

操作原语部分
  • 提供原语少:需要用户处理更多逻辑,易用性差

  • 所以抽象层次低:数据处理逻辑隐藏在用户代码中,可读性差

  • 所以其表达能力有限:

    • 复杂数据处理任务,如:机器学习算法、SQL连接查询很难 表示用MapReduce计算默认表达
处理流程部分
  • MapReduce需要通过把中间结果存盘实现同步,I/O延迟高

  • reduce任务需要等待map任务全部完成才能继续,同步Barrier 大

适合场景

  • One Pass Computation:只需要一遍扫描处理的计算任务 MapReduce计算模型非常有效

    • 批数据处理
  • Multi Pass Computation:需要在数据上进行多遍扫描、处理 的计算任务,需要执行多个MapReduce作业计算任务,因为 多副本复制、磁盘存取,其效率不高

DAG

Directed Acyclic Graph:只是表示数据处理流程的有向无环图

  • 顶点:数据处理任务,反映一定的业务逻辑,即如何对数据进行 转换和分析
  • 边:数据在不同的顶点间的传递

应用

  • DAG本身并不涉及如何处理数据,只是对数据数据流程的规划

  • 所以DAG并不能改进MapReduce计算模型中原语部分的缺陷,只能 优化数据处理流程

    • 减少MapReduce作业中间结果存盘,减少磁盘I/O
    • 优化map、reduce任务执行,减少同步Barrier

这也正是Tez所做的事情

RDD

HBase

HBase简介

HBase是高可靠、高性能、面向列、可伸缩的分布式数据库系统

  • 利用HBase技术可以在廉价硬件上搭建大规模非结构化数据管理 集群

  • HBase借鉴Google Bigtable技术实现的开源软件

    ||HBase|Bigtable| |——-|——-|——-| |存储系统|HDFS|GFS| |数据处理|Hadoop MapReduce|MapReduce| |协同服务|Zookeeper|Chubby| |RDBMS数据导入|Sqoop|-|

  • HBase访问接口

    • Native Java API:常用、高效的访问方式
    • HBase Shell:HBase命令行工具,适合用于管理HBase
    • Thrift Gateway:利用Thrift序列化技术,支持C++、PHP、 Python多种语言异构系统访问HBase表数据
    • REST Gateway:支持REST风格的Http API访问HBase
    • Pig:支持Pig Latin语言操作HBase中数据
      • 最终被转换为MapReduce Job处理HBase表数据
      • 适合做数据统计
    • Hive:支持用户使用HiveQL访问HBase
  • 可以在HBase系统上运行MapReduce作业,实现数据批处理 hbase_mapreduce

HBase数据结构

hbase_storage_structure

Table

HBase的表格,类似关系型数据库中的表格,但有所不同

特殊Table

HBase中有两张特殊的Table

  • .META.:记录用户表Region信息,自身可以有多个region
  • -ROOT-:记录.META.表Region信息的,自身只能有一个 region

Row Key

行键,Table行主键,Table记录按照此排序

Column、Column Family

  • Table在水平方向由一个或多个列簇组成
  • 一个列簇可以由任意多个Column组成
  • 列簇支持动态扩展,无需预先定义列数量、类型
  • 所有列均义二进制格式存储,用户需要自行进行类型转换

Timestamp

时间戳:每次数据操作对应的时间戳,可视为是数据的版本号

Region

Table记录数不断增加而变大后,逐渐分裂出的多个split

  • 每个region由[startkey, endkey)表示
  • 不同region被Master分配给相应RegionServer进行管理(存储)

HBase系统架构

hbase_structure

Client

  • HBase Client使用HBase RPC机制同HMaster、HRegionServer 进行通信
    • 对于管理类操作,通过RPC机制访问HMaster
    • 对于读写操作,通过RPC机制访问HRegionServer

Zookeeper

  • Zookeeper Quorum中记录-ROOT表的位置

    • 客户端访问数据之前首先访问zookeeper
    • 然访问-ROOT-
    • 然后访问.META.
    • 最后根据用户数据位置,访问具体数据
  • Zookeeper Quorum中存储有HMaster地址

  • HRegionServer把自己义Ephemeral方式注册到Zookeeper中, 使得HMaster可以随时感知各个HRegionServer健康状态

  • 引入Zookeeper,避免了HMaster单点失败问题

    • HBase中可以启动多个HMaster
    • 通过Zookeeper的Master Election机制保证总有一个Master 运行

HMaster

HMaster在功能上主要负责Table、Region管理工作

  • 管理用户对Table增、删、查、找操作???
  • 管理HRegionServer负载均衡,调整Region分布
  • 在Region分裂后,负责新Region分配
  • 在HRegionServer停机后,负责失效HRegionServer上region迁移

HRegionServer

HRegionServer负责响应用户I/O请求,向HDFS文件系统写数据,是 HBase中最核心的模块

hbase_hregion_server_structure

HRegion

HRegionServer内部管理一系列HRegion对象

  • HRegion对象对应Table中一个Region
  • HRegion由多个HStore组成

HStore

每个HStore对应Table中一个列簇的存储,是HBase存储核心模块

  • 由此可以看出列簇就是一个集中存储单元

    • 因此最好将具备共同IO特性的列放在同一个列簇中,可以 提高IO效率
  • HStore由两部分构成

    • MemStore
    • StoreFile:底层实现是HFile,是对HFile的轻量级包装
MemStore

Sorted memory buffer,用户写入数据首先放入MemStore中,满了 后Flush成一个StoreFile

StoreFile
  • 文件数量增长到一定阈值时会触发Compact合并操作,将多个 StoreFile合并成一个StoreFile

    • 合并过程中会进行版本合并、数据删除
    • 即HBase其实只有增加数据,所有更新、删除操作都是后续 Compact过程中进行的
    • 这使得用户写操作只要进入内存就可以立即返回,保证 HBase IO高性能
  • Compact操作会逐步形成越来越大的StoreFile,超过阈值之后 会触发Split操作

    • 当前Region分裂成2个Region
    • 父Region下线
    • 新分裂出的2个子Region会被HMaster分配到相应的 HRegionServer上,实现负载均衡

HLog

每个HRegionServer中都有一个HLog对象,避免因为分布式系统 中节点宕机导致的MemStore中内存数据丢失

  • HLog是实现WriteAheadLog的类

  • HLog作用

    • 每次用户写入MemStore时,也会写入一份数据至HLog文件中
    • HLog定时删除已持久化到StoreFile中的数据
  • HRegion意外终止后,HMaster会通过zookeeper感知到

    • HMaster首先处理遗留的HLog文件,将其中不同Region的Log 数据进行拆分,分别放到相应Region目录下
    • 然后将失效Region重新分配
    • 领取到Region的HRegionServer在Load Region过程中,会 发现有历史HLog需要处理,会Replay HLog中的数据到 MemStore中,然后flush到StoreFile中,完成数据恢复

HBase存储

HBase中所有数据存储在HDFS中

HFile

HFile是Hadoop二进制格式文件,实现HBase中Key-Value数据存储

  • HFile是不定长的,长度固定的只有:Trailer、FileInfo

hbase_hfile_storage_formation

Trailer

含有指针指向其他数据块起点

FileInfo

记录文件的一些元信息,如

  • AVG_KEY_LEN
  • AVG_VALUE_LEN
  • LAST_KEY
  • COMPARATOR
  • MAX_SEQ_ID_KEY

Data Index

记录每个Data块起始点

Meta Index

记录每个Meta块起始点

Data Block

Data Block是HBase IO基本单元

  • 为了提高效率,HRegionServer中实现了基于LRU的Block Cache 机制

  • Data块大小可以在创建Table时通过参数指定

    • 较大的块有利于顺序Scan
    • 较小的块有利于随机查询
  • Data块除了开头的Magic信息外,就是一个个<key, value> 键值对拼接而成

  • Magic内容就是一些随机数字,防止数据损坏

  • 每个键值对就是简单的byte array,但是包含很多项,且有固定 结构 hbase_hfile_datablock_kv

    • 开头是两个固定长度的数值,分别表示key、value长度
    • 然后是key部分
      • 固定长度数值表示RowKey长度
      • RowKey
      • 固定长度数值表示Column Family的长度
      • Column Family
      • Qualifier
      • 固定长度数值表示:Timestamp、KeyType(Put/Delete)
    • Value部分就是二进制数据

HLogFile

HBase中Write Ahead Log存储格式,本质上是Hadoop Sequence File

  • Sequence File的Key是HLogKey对象,记录了写入数据的归属信息

    • table
    • region
    • squecence number:起始值为0,或最近一次存入文件系统 中的squence number
    • timestamp:写入时间
  • Squence File的Value是KeyValue对象,即对应HFile中KeyValue

e

Zookeeper

Zookeeper是一个协调软件服务,用于构建可靠的、分布式群组

  • 提供群组成员维护、领导人选举、工作流协同、分布式系统同步 、命名、配置信息维护等服务

  • 提供广义的分布式数据结构:锁、队列、屏障、锁存器

  • Zookeeper促进户端间的松耦合,提供最终一致的、类似传统 文件系统中文件、目录的Znode视图,提供基本操作,如:创建 、删除、检查Znode是否存在

  • 提供事件驱动模型,客户端能观察到Znode的变化

  • Zookeeper运行多个Zookeeper Ensemble以获得高可用性,每个 服务器上的Ensemble都持有分布式系统内存副本,为客户端读取 请求提供服务

zookeeper_structure

Flume

Flume是分布式日志收集系统,收集日志、事件等数据资源,并集中 存储

Flume组件、结构

  • 旧版本组件、结构 flume_struture_old_version

  • 新版本组件、结构:每个Flume整体称为Agent flume_dataflow

    • 两个版本的组件功能、数据流结构都有区别
    • 但是3大组件基本可以一一对应(功能略有差异)
  • Agent是一组独立的JVM守护进程,从客户端、其他Agent接收 数据、迅速传递给下个目的节点

  • 支持多路径流量、多管道接入流量、多管道接出流量、上下文 路由

Source(Agent)

采集数据,是Flume产生数据流的地方

  • 运行在数据发生器所在的服务器上,接收数据发生器接受数据, 将数据以event格式传递给一个或多个Channel

  • 支持多种数据接收方式

    • Avro Source:支持Avro RPC协议,内置支持
    • Thrift Source:支持Thrift协议
    • Exec Source:支持Unix标准输出
    • JMS Source:从JMS(消息、主题)读取数据
    • Spooling Directory Source:监控指定目录内数据变更
    • Twitter 1% firehose Source:通过API持续下载Twitter 数据
    • Netcat Source:监控端口,将流经端口的每个文本行数据 作为Event输入
    • Sequence Generator Source:序列生成器数据源
    • HTTP Source:基于POST、GET方式数据源,支持JSON、BLOB 格式
  • 收集数据模式

    • Push Source:外部系统主动将数据推送到Flume中,如 RPC、syslog
    • Polling Source:Flume主动从外部系统获取数据,如 text、exec

Channel (Collector)

暂时的存储容器,缓存接收到的event格式的数据,直到被sink消费

  • 在source、sink间起桥梁作用

  • Channel基于事务传递Event,保证数据在收发时的一致性

  • Channel可以和任意数量source、sink连接

  • 主要Channel类型有

    • JDBC channel:数据持久化在数据库中,内置支持Derby
    • File Channel:数据存储在磁盘文件中
    • Memory Channel:数据存储在内存中
    • Spillable Meemory Channel:优先存在内存中,内存队列 满则持久到磁盘中
    • Custom Channel:自定义Channel实现

Sink(Storage Tier)

将从Channel接收数据存储到集中存储器中(HDFS、HBase)

Flume Event

Flume事件是内部数据传输最小基本单元、事务处理基本单位

  • 由一个装载数据的byte array、可选header构成
    • 数据对Flume是不透明的
    • header是容纳键值对字符串的无需集合,键在集合内唯一
    • header可以在上下文路由中使用扩展,如:数据清洗
  • Event将传输数据进行封装

Flume架构特性(旧版)

Reliablity

Flume提供了3种数据可靠性选项

  • End-to-End:使用磁盘日志、接受端Ack的方式,保证Flume接收 数据最终到导致目的地

  • Store on Failure:目的地不可用时,将数据保存在本地硬盘, 但进程如果出问题,可能丢失部分数据(发送后目的地不可用)

  • Best Effort:不做任何QoS保证

Scalability

易扩展性

  • Flume三大组件都是可伸缩的
  • Flume对事件的处理不需要带状态,Scalability容易实现

Avaliablity

高可用性:Flume引入Zookeeper用于保存配置数据

  • Zookeeper本身可以保证配置数据一致性、高可用
  • 在配置数据发生变化时,Zookeeper通知Flume Master节点 Flume Master节点通过gossip协议同步数据

flume_distributed_deployment

Manageablity

易管理性

  • 多个Master,保证可以管理大量节点

Extensibility

可开发性:可以基于Java为Flume添加各种新功能

  • 实现Source子类,自定义数据接入方式
  • 实现Sink子类,将数据写入特定目标
  • 实现SinkDecorator子类,对数据进行一定的预处理

适合场景

  • 高效率的从多个网站服务器收集日志信息存储在HDFS上
  • 将从多个服务器获取的数据迅速移交给Hadoop
  • 可以收集社交网络站点事件数据,如:facebook、amazon

Kafka

分布式、分区的、可复制的Message System(提交日志服务), 得益于特有的设计,Kafka具有高性能、高可扩展的特点

  • 完全分布式系统,易于横向扩展、处理极大规模数据
  • 同时为发布、订阅提供极高吞吐能力
  • 支持多订阅,出现失败状态时,可以自动平衡消费者
  • 将消息持久化到磁盘,保证消息系统可靠性,可用于消息 批量消费应用(ETL系统)、实时应用

Kafka组件

kafka_structure

Topic

话题:特定类型的消息流

  • 话题是消息的分类机制

    • 消息产生器向Kafka发布消息必须指定话题
  • Kafka安照Topic维护接收到的消息

    • 话题被划分为一系列分区
    • Kafka集群为每个Topic维护一个分区日志文件存储消息

消息是字节的Payload(有效载荷)

Producer

生产者:向Kafka发布消息的进程

  • 生产者需要指定消息分配至哪个分区
    • 采用Round-Robin方式方便均衡负载
    • 根据应用的语义要求,设置专用Partition Function进行 消息分区

Broker

代理:AMQP客户端,保存已经发布消息的服务器进程

AMQP:the Advanced Message Queuing Protocal,标准开放 的应用层消息中间件协议。AMQP定义了通过网络发送的字节流 的数据格式,兼容性非常好,任何实现AMQP协议的程序可以和 兼容AMQP协议兼容的其他应用程序交互,容易做到跨语言、 跨平台。

  • 一组代理服务器构成Kafka集群

  • Kafka代理是无状态的,消费者需要自行维护已消费状态信息

    • 因此Kafka无法知晓信息是否已经被消费、应该删除,因此 代理使用简单的、基于时间的Serice Level Agreement应用 于保留策略,消息在代理中超过一定时间自动删除

    • 这种设计允许消费者可以重复消费已消费数据

      • 虽然违反队列常见约定
      • 但是实际应用中很多消费者有这种特征
  • 消息代理将紧密耦合的系统设计解耦,可以对未及时处理的消息 进行缓存

    • 提高了吞吐能力
    • 提供了分区、复制、容错支持
  • Kafka代理通过Zookeeper与其他Kafka代理协同 kafka_with_zookeeper

    • 系统中新增代理或代理故障失效时,Zookeeper通知生产者 、消费者
    • 生产者、消费者据此开始同其他代理协同工作

Consumer

消费者:向Kafka subscribe话题,以处理Kafka消息的进程

  • 消费者可以订阅一个或多个话题,从代理拉取数据,消费已经 发布的消息

  • 消费者获取消息系统一般采用两种模型

    • Queuing:队列模型,一组消费者从一个服务器读取信息, 每个消息仅可被其中一个消费者消费

    • Publish Subscribe:发布订阅模型,消息被广播给所有 消费者

  • Kafka采用一种抽象方法:消费者组Consumer Group提供对上述 两种消息系统模型的支持 kafka_comsumer_group

    • 给每个消费者打上属于某个消费者组的标签(这里组只是 表示同组内消费者只能有一个消费信息)

    • 每个发布到话题的消息分发给消费者组的其中一个消费者

    • 一般情况下每个话题下有多个消费者组,每个组中有多个 消费者实例,以达到扩展处理能力、容错

    • 极端情况:如果所有消费者实例都隶属于同一个消费者组, Kafka工作模式类似于队列模型;所有消费者实例隶属于 不同的消费者组,Kafka工作模式类似于发布-订阅模型

消息分区、存储、分发

分区日志

每个分区是有序的不可更改可在末尾不断追加的 消息序列

分区优势

  • 允许Kafka处理超过一台服务器容量的日志规模

  • 分区作为并行处理基本单元,允许Kafka进行并行处理

  • 通过保证每个分区仅仅由一个消费者消费,可以保证同一 分区内消息消费的有序

    • 由于可以设置很多分区,仍然可以保证在不同消费者之间 实现负载均衡
    • 分区内外保证消息有序、数据分区处理对大部分实际应用 已经足够

分区管理

每个分区由单独的(一组)服务器处理,负责该分区数据管理、消息 请求,支持多个副本以支持容错

  • 每个分区中有一台服务器作为leader、若干服务器作为follower

  • 领导者负责分区读、写请求,跟随者以被动的方式领导者数据 进行复制

  • 领导者失败,则追随者之一在Zookeeper协调下成为新领导者

  • 为保证负载均衡,每个服务器担任部分分区领导者、其他分区 追随者

存储布局

Kafka存储布局非常简单

kafka_storage

分区存储

  • 话题每个分区对应一个逻辑日志

  • 每个日志为相同的大小的一组分段文件

  • 生产者发布的消息被代理追加到对应分区最后一个段文件中

  • 发布消息数量达到设定值、经过一段时间后,段文件真正写入 磁盘,然后公开给消费者

Offset

分区中每个消息的Sequential ID Number(Offset),唯一标识 分区中消息,并没有明确的消息ID

  • 偏移量是增量的但不连续,下个消息ID通过在其偏移量加上 消息长度得到

  • 偏移量标识每个消费者目前处理到某分区消息队列的位置, 对分区消息队列处理依赖于其(消息通过日志偏移量公开)

  • 偏移量由消费者控制,所以消费者可以以任何顺序消费消息

    • 可以回推偏移量重复消费消息
    • 设计消费者仅仅查看分区末尾若干消息,不改变消息, 其他消费者可以正常的消费

从消息分区机制、消费者基于偏移量消费机制,可以看出Kafka消息 消费机制不会对集群、其他消费者造成影响

适合场景

  • Messaging:消息传递,作为传递消息队列(ActiveMQ、 RabbitMQ等)替代品,提供高吞吐能力、高容错、低延迟

  • Website Activity Tracking:网站活动跟踪,要求系统必须 快速处理产生消息

  • Metric:度量,把分布式各个应用程序的运营数据集中,进行 汇总统计

  • Streaming Processing:流数据处理

  • Event Sourcing:事件溯源,把应用程序状态变化以时间顺序 存储,需要支持大量数据

  • Commit Log:日志提交,作为分布式系统提交日志的外部存储 服务

Storm

Storm是分布式、高容错的实时流数据处理的开源系统

  • Storm为流数据处理设计,具有很高的容错性
  • Storm保证每个消息只能得到一次完整处理,任务失败时会负责 从消息源重试消息,从而支持可靠的消息处理
  • 可以通过实现Storm通讯协议,提供其他语言支持

Storm架构

storm_structure

  • 主节点的运行Nimbus守护进程

    • 分配代码
    • 布置任务
    • 故障检测
  • 工作节点运行Supervisor守护进程

    • 监听、开始、终止工作进程
  • Nimbus、Supervisor都是无状态的(不负责维护客户端两次调用 之间状态维护)

    • 这使得两者十分健壮
    • 两者之间的协调由Zookeeper完成
  • Storm在ZeorMQ内部传递消息

Nimbus

Supervisor

Worker

Storm编程模型

Stream

数据流:没有边界的tuple序列

  • 这些tuple以分布式的方式,并行的创建、处理

Topology

计算拓扑:实时计算应用程序处理逻辑封装成的Topology对象

  • 相当于Mapreduce作业,但是MapReduce作业最终会结束、而 Topology会一直运行直到被杀死
  • Topology由Spout、Bolt组成

Spout

消息源:消息tuple生产者

  • 消息源可以是可靠的、不可靠的
  • 可靠的消息源可在tuple没有被storm成功处理时,可以重新发送
  • 不可靠的消息源则在发送tuple之后彻底丢弃

Bolt

消息处理者:封装所有的消息处理逻辑

  • Bolt可以做很多事情,包括过滤、聚集
  • Bolt一般数据处理流程
    • 处理一个输入tuple,发送0个、多个tuple
    • 调用ack接口,通知storm子集已经处理过了

Task、Executor

Topology每个Spout、Bolt转换为若干个任务在整个集群里执行

  • 默认情况下,每个Task对应一个线程Executor,线程用于执行 task
  • 同一个Spout/Bolt里的Task共享一个物理线程

Stream Grouping

数据分发策略:定义Spout、Bolt间Tasks的数据分发

  • Shuffle Grouping:洗牌式分组,上游Spout数据流tuples随机 分发到下游Bolt的Task

  • Fields Grouping:按指定字段进行分组

  • All Grouping:Spout数据tuple分发给所有下Bolt

  • Global Grouping:Spout数据tuple分发给最小id的task

  • Non-Grouping:类似shuffle Grouping,把具有Non-Grouping 设置Bolt推到其订阅的上游Spout、Bolt

  • Direct Grouping:tuple生产者决定接收tuple下游bolt中的task

  • Local or Shuffle Grouping:如果目标bolt中由一个或多个 task工作在同一进程中,tuple分配给这些task,否则同洗牌式 分组

  • Partial Key Grouping:类似Fields Grouping,但是在下游 Bolt中做负载均衡,提高资源利用率

消息处理保证

Storm追踪由每个SpoutTuple产生的Tuple树

  • 每个从Spout发出tuple,可能会生成成千上万个tuple

    • 根据血缘关系形成一棵tuple树
    • 当tuple树中所有节点都被成功处理了,才说明tuple被完全 处理
  • 每个Topology都有一个消息超时设置,如果Storm在时间内无法 检验tuple树是否完全执行,该tuple标记为执行失败,之后重发

重发

Hadoop概述

  • Hadoop(核心):HDFSMapReduce/YARN
  • Hadoop家族:建立在Hadoop基础上的一系列开源工具

hadoop_relations

Hadoop

HadoopApache的一个分布式计算、java语言实现的开源框架, 实现在大量计算机组成的集群中对海量数据进行分布式计算。相比于 依赖硬件的可靠性,Hadoop被设计为可以检测、处理应用层面的 failures,能够提供构建于电脑集群上的可靠服务。

HadoopApache的分布式计算开源框架,提供分布式文件系统 HDFSMapReduce/YARN分布式计算的软件架构

Hadoop Common

支持其它Hadoop模块的公用组件

Hadoop Distributed File System(HDFS)

虚拟文件系统,让整个系统表面上看起来是一个空间,实际上是很多 服务器的磁盘构成的

Hadoop YARN

Yet Another Resource Negotiator,通用任务、集群资源分配框架 ,面向Hadoop的编程模型

  • YARN将classic/MapReduce1中Jobtracker职能划分为多个独立 实体,改善了其面临的扩展瓶颈问题

  • YARN比MapReduce更具一般性,MapReduce只是YARN应用的一种 形式,可以运行Spark、Storm等其他通用计算框架

  • YARN精妙的设计可以让不同的YARN应用在同一个集群上共存, 如一个MapReduce应用可以同时作为MPI应用运行,提高可管理性 和集群利用率

Hadoop MapReduce

YARN基础上的大数据集并行处理系统(框架)

  • 包括两个阶段

    • Map:映射
    • Reduce:归一
  • 在分布式系统上进行计算操作基本都是由Map、Reduce概念步骤 组成

    • 分布式系统,不像一般的数据库、文件系统,无法从上至下 、从头到尾进行求和等操作
    • 需要由分散的节点不断向一个点聚拢的计算过程
  • 不适合实时性要求的应用,只适合大数据离线处理

Apache下Hadoop相关项目

高频

Ambari

用于部署(供应)、管理、监控Hadoop集群的Web工具

  • 支持HDFSMapReduceHiveHCatalogHBaseOozieZooKeeperPigSqoop

  • 提供dashboard用于查看集群健康程度,如:热度图

  • 能够直观的查看MapReducePigHive应用特点,提供 易用的方式考察其执行情况

HBase

Hadoop项目子项目,高可靠、高性能、面向列、可伸缩的分布式 存储系统

  • 该技术源于Fay Chang撰写的Google论文《Bigtable:一个 结构化数据的分布式存储系统》,类似于Bigtable在Google 文件系统上提供的分布式数据存储一样,HBaseHadoop的 基础上提供了类似于Bigtable的能力

  • 适合非结构化数据存储

  • 可用于在廉价PC Server上搭建大规模结构化存储集群,是 NoSQL数据库的两个首选项目(MongoDB

Hive

基于Hadoop的数据仓库工具

  • Hive中建立表,将表映射为结构化数据文件

  • 可以通过类SQL语句直接查询数据实现简单的MapReduce统计, 而不必开发专门的MapReduce应用

    • Hive会将SQL语句转换为MapReduce任务查询Hadoop
    • 速度很慢
    • 适合数据仓库的统计分析
    • 支持SQL语法有限

Pig

基于Hadoop的大规模数据高层分析工具(类似于Hive

  • 提供SQL-Like语言PigLatin

    • 其编译器会把类SQL的数据分析请求,转换为一系列经过 优化处理的MapReduce运算

    • 是一种过程语言,和Hive中的类SQL语句相比,更适合写 脚本,而Hive的类SQL语句适合直接在命令行执行

Zookeeper

Hadoop正式子项目,针对大型分布式应用设计的分布式、开源协调 系统

  • 提供功能:配置维护、名字服务、分布式同步、组服务

  • 封装好复杂、易出错的关键服务,提供简单易用、功能稳定、 性能高效的接口(系统),解决分布式应用中经常遇到的数据 管理问题,简化分布式应用协调及管理难度,提供高性能分布式 服务

  • 通常为HBase提供节点间的协调,部署HDFSHA模式时是 必须的

Spark

基于内存计算的开源集群计算系统,目的是让数据分析更加快速

低频

Mahout

基于Hadoop的机器学习、数据挖掘的分布式框架

  • 使用MapReduce实现了部分数据挖掘算法,解决了并行挖掘问题

    • 包括聚类、分类、推荐过滤、频繁子项挖掘
  • 通过使用Hadoop库,Mahout可以有效扩展至云端

Cassandra

开源分布式NoSQL数据库系统,最初由Facebook开发,用于存储 简单格式数据,集Google BigTable数据模型和Amazon Dynamo 的完全分布式架构于一身

Avro

数据序列化系统,设计用于支持数据密集型、大批量数据交换应用, 是新的数据序列化格式、传输工具,将逐步取代Hadoop原有的 IPC机制

Chukwa

用于监控大型分布式系统的开源数据收集系统,可以将各种类型的 数据收集成适合Hadoop处理的文件,保存在HDFS中供MapReduce 操作

Tez

基于YARN的泛用数据流编程平台

  • 提供强力、灵活的引擎用于执行任何DAG任务,为批处理和 交互用例处理数据

Tez正逐渐被HivePigHadoop生态框架采用,甚至被一些 商业公司用于替代MapReduce作为底层执行引擎

其他Hadoop相关项目

高频

Sqoop

用于将Hadoop和关系型数据库中数据相互转移的开源工具

  • 可以将关系型数据库(MySQLOraclePostgres)中 数据转移至HadoopHDFS

  • 也可以将HDFS的数据转移进关系型数据库中

Impala

Cloudera发布的实时查询开源项目

  • 模仿Google Dremel

  • 称比基于MapReduceHive SQL查询速度提升3~30倍,更加 灵活易用

Phoenix

apache顶级项目,在HBase上构建了一层关系型数据库,可以用 SQL查询HBase数据库,且速度比Impala更快,还支持包括 二级索引在内的丰富特性,借鉴了很多关系型数据库优化查询方法

Oozie

工作流引擎服务器,用于管理、协调运行在Hadoop平台 (HDFSPigMapReduce)的任务

Cloudera Hue

基于Web的监控、管理系统,实现对HDFSMapReduce/YARNHBaseHivePigWeb化操作和管理

低频

Hama

基于HDFSBSP(Bulk Synchronous Parallel)并行 计算框架,可以用包括图、矩阵、网络算法在内的大规模、 大数据计算

Flume

分布的、可靠的、高可用的海量日志聚合系统,可用于日志数据 收集、处理、传输

Giraph

基于Hadoop的可伸缩的分布式迭代图处理系统,灵感来自于BSPGoogle Pregel

Crunch

基于Google FlumeJava库编写的Java库,用于创建MapReduce 流水线(程序)

  • 类似于HivePig,提供了用于实现如连接数据、执行聚合 、排序记录等常见任务的模式库

    • 但是Crunch不强制所有输入遵循同一数据类型

    • 其使用一种定制的类型系统,非常灵活,能直接处理复杂 数据类型,如:时间序列、HDF5文件、HBase、序列化 对象(protocol bufferAvro记录)

  • 尝试简化MapReduce的思考方式

    • MapReduce有很多优点,但是对很多问题,并不是合适的 抽象级别

    • 出于性能考虑,需要将逻辑上独立的操作(数据过滤、投影 、变换)组合为一个物理上的MapReduce操作

Whirr

运行于云服务的类库(包括Hadoop),提供高度互补性

  • 相对中立
  • 支持AmazonEC2Rackspace的服务

Bigtop

Hadoop及其周边生态打包、分发、测试的工具

HCatalog

基于Hadoop的数据表、存储管理,实现中央的元数据、模式管理, 跨越HadoopRDBMS,利用PigHive提供关系视图

Llama

让外部服务器从YARN获取资源的框架

CDH组件

Fuse

HDFS系统看起来像普通文件系统

Hadoop Streamin

MapReduce代码其他语言支持,包括:C/C++PerlPythonBash

MapReduce YARN

MapReduce1.0组件

MapReduce1.0是指Hadoop1.0中组件,不是指MapReduce计算模型

优势

  • 方便扩展:能够运行在普通服务器构成的超大规模集群上

  • IO瓶颈:通过将IO分散在大规模集群的各个节点上,可以提高 数据装载速度(大规模数据时只有部分数据可以状态在内存中)

局限

  • MapReduce计算模型问题(参见MapReduce计算模型

  • 数据处理延迟大

    • MapReduce作业在Map阶段、Reduce阶段执行过程中,需要 把中间结果存盘
    • 在MR作业间也需要通过磁盘实现作业间的数据交换
  • 资源利用率低

    • 任务调度方法远未达到优化资源利用率的效果,给每个 TaskTracker分配任务的过程比较简单

资源分配

  • 每个TaskTracker拥有一定数量的slots,每个活动的Map、 Reduce任务占用一个slot

  • JobTracker把任务分配给最靠近数据、有slot空闲TT

  • 不考虑Task运算量大小,所有Task视为相同,如果有某个TT 当前负载过高,会影响整体的执行

  • 也可以通过Speculative Execution模式,在多个slave上 启动同一个任务,只要其中有一个任务完成即可

执行引擎

MapReduce执行引擎运行在HDFS上

  • JobTracker:运行在NameNode上

    • 分解客户端提交的job为数据处理tasks,分发给集群里相关 节点上的TaskTacker运行
    • 发送任务原则:尽量把任务推送到离数据最近的节点上, 甚至推送到数据所在的节点上运行
  • TaskTracker:运行在DataNode上

    • 在节点上执行数据处理map、reduce tasks
    • 可能需要从其他DataNode中获取需要数据

MapRedudce2.0

Shuffle

Shuffle:系统执行排序的过程

  • 为了确保MapReduce的输入是按键排序的

Map端

每个Map Task都有一个内存缓冲区用于存储map输出结果,缓冲区 快满时需要将缓冲区数据以临时文件方式存放到磁盘中,整个Task 结束后再对此Map Task产生所有临时作合并,生成最终正式输出文件 ,等待Reduce Task拉数据

YARN

Yet Another Resource Negotiator,通用任务、集群资源分配框架 ,面向Hadoop的编程模型

YARN优势

扩展性

  • YARN将classic/MapReduce1中Jobtracker职能划分为多个独立 实体,改善了其面临的扩展瓶颈问题

  • MapReduce现在只是批数据处理框架,是YARN支持的数据处理 框架的一种,独立于资源管理层,单独演化、改进

  • YARN精妙的设计可以让不同的YARN应用在同一个集群上共存, 如一个MapReduce应用可以同时作为MPI应用运行,提高可管理性 、集群利用率

高效率

  • ResourceManager是单独的资源管理器

  • Job Scheduler指负责作业调度

  • 根据资源预留要求、公平性、Service Level Agreement等标准 ,优化整个集群的资源利用

一般性

YARN是通用资源管理框架,在其上可以搭建多种数据处理框架

  • 批处理:MapReduce
  • 交互式处理:Tez
  • 迭代处理:Spark
  • 实时流处理:Storm
  • 图数据处理:GraphLab/Giraph

YARN中实体

ResourceManager

RM物理上对应主节点,逻辑上管理集群上的资源使用,其功能由 Scheduler、ApplicationManager协调完成

  • AppplicatonManager:接受、监控任务

    • 接受客户端提交的job

    • 判断启动该job的ApplicationMaster所需的资源

    • 监控ApplicationMaster的状态,并在失败时重启其

  • Scheduler:分配资源、调度

    • Schedular计算启动ApplicationManager提交的job的AM所需 资源,将资源封装成Container

    • 然后根据调度算法调度,在某个NM上启动job的AM

    • 不提供失败重启、监控功能

    • Scheduler收到AM任务完成汇报之后,回收资源、向RM返回 执行结果

    • 调度算法可自定以,YARN根据不同场景提供

      • FIFO Scheduler
      • Capacity Scheduler
      • Fair Scheduler

NodeManager

NM物理上对应计算节点,逻辑上监控、管理当前节点资源

  • 仅仅抽象本节点资源(cpu、内存、磁盘、网络等),并且定时 像RM的Scheduler汇报

  • 接受并处理AM的tasks启动、停止等请求

ApplicationMaster

AM管理集群上运行任务生命周期

  • 每个job都有一个专用的AM

  • AM启动后会计算job所需资源,并向Scheduler申请资源

  • AM运行在job运行期间,负责整个job执行过程的监控

    • NM分配完任务container后,AM开始监控这些containers 、tasks状态
    • 任务失败则回收资源重新生成
    • 成功则释放资源
    • 任务执行完毕后回报Scheduler

Containers

YARN为将来的资源隔离提出的框架,是一组资源的集合,每个task 对应一个container,只能在container中运行

  • 容器有特定的内存分配范围

    • 容器内存最小值即为内存分配单位,内存最大值也应该是 内存分配单位整数倍

    • 根据任务所需资源多少分配给容器整数倍内存单位,但是 如果任务所需内存大于容器内存最大值,运行时可能会报错

  • 由NM确保task使用的资源不会超过分配的资源

  • 注意

    • AM并不运行于container中,真正的task才运行在container

yarn_procedure

Job运行过程

作业提交

  • 从RM获取新的作业ID
  • 作业客户端检查作业输出说明,计算输入分片(也可以配置 在集群中产生分片)
  • 将作业资源复制到HDFS
  • 调用RM上的submitApplication方法提交作业

作业初始化

  1. RM收到调用submitApplication消息后,将请求传递给 内部scheduler,scheduler分配一个container
  2. NM在RM的管理下在容器中启动应用程序的master进程AM, 其对作业进行初始化
  3. AM创建多个簿记对象用于接受任务进度、完成报告,保持 对作业进度的跟踪
  4. AM接受来自共享文件系统的在客户端计算的输入分片,对 每个分片创建一个map对象,及由mapreduce.job.reduces 属性确定的多个reduce任务对象
  5. AM根据任务大小决定如何运行job,如果在新容器中分配、 运行任务的开销大于并行运行时的开销,AM会在单个节点 上运行,这样的作业称为uberized
  6. AM在任何tasks执行之前通过job的setup方法设置job的 OutputCommiter,建立作业输出目录

任务分配

  1. 若作业不适合作为uber任务运行,AM为该作业中所有map 、reduce任务向RM请求容器
  2. 请求附着heart beat,包括每个map任务的数据本地化信息 ,特别是输入分片所在的主机、机架信息,scheduler据此 做调度决策
    • 理想化情况下任务分配到数据本地化节点
    • 否则优先使用机架本地化
  3. 请求同时指定了任务内存需求,YARN中的资源分为更细粒度 ,task可以请求最小到最大限制范围、任意最小值倍数的 内存容量

任务执行

  1. 当NM的scheduler为task分配了container,AM就可以通过 与NM通信启动容器
  2. 任务由YarnChild执行,在执行任务之前,需要将任务 所需资源本地化,包括作业的配置、JAR文件、所有来自 分布式缓存的文件,然后运行map、reduce任务
  3. 对于Streaming、Pipes程序,YarnChild启动Streaming、 Pipes进程,使用标准输入输出、socket与其通信(以 MapReduce1方式运行)

进度和状态更新

  1. task每3s通过umbilical接口向AM汇报进度、状态(包括 计数器),作为job的aggregate view
  2. 客户端则默认没1s查询AM接受进度更新

作业完成

  1. 客户端每5s通过调用job的waitForCompletion检查作业 是否完成,也可以通过HTTP callback完成作业
  2. 作业完成后AM和task容器清理工作状态,OutputCommiter 作业清理方法被调用

todo:这里逻辑有问题,要删

MapReduce计算模型

  • 分布式系统,不像一般的数据库、文件系统,无法从上至下 、从头到尾进行求和等操作
  • 需要由分散的节点不断向一个点聚拢的计算过程,即分布式系统 上计算模型基本都是由map、reduce步骤组成

MapReduce

MapReduce每步数据处理流程包括两个阶段

  • Map:映射

    • map过程相互独立、各mapper见不通信,所以mapreduce 只适合处理独立计算的任务
  • Reduce:归一

    • reduce直接处理map的输出,reduce的为map输出

数据处理过程

  • 需要把任何计算任务转换为一系列MapReduce作业,然后依次 执行这些作业

  • 计算过程的各个步骤之间,各个作业输出的中间结果需要存盘, 然后才能被下个步骤使用(因为各个步骤之间没有明确流程)

  • One Pass Computation:只需要一遍扫描处理的计算任务 MapReduce计算模型非常有效

  • Multi Pass Computation:需要在数据上进行多遍扫描、处理 的计算任务,需要执行多个MapReduce作业计算任务,因为 多副本复制、磁盘存取,其效率不高

Mapred on DAG

Directed Acyclic Graph;表示数据处理流程的有向无环图

  • 顶点:数据处理任务,反映一定的业务逻辑,即如何对数据进行 转换和分析
  • 边:数据在不同的顶点间的传递

比较

比较方面 MapReduce DAG
操作原语 map、reduce 较多
抽象层次
表达能力
易用性 要手动处理job之间依赖关系,易用性差 DAG本身体现数据处理流程
可读性 处理逻辑隐藏在代码中,没有整体逻辑 较好
  • 正是MapReduce提供操作原语少、抽象层次低,所以其表达能力 差,同时需要用户处理更多的逻辑,易用性、可读性差

    • 复杂数据处理任务,如:机器学习算法、SQL连接查询很难 表示用MapReduce计算默认表达

    • 操作原语多并不是DAG本身的要求,DAG本身只是有向无环图, 只是使用DAG计算模型可以提供更多的操作原语

  • 由于DAG的表达能力强于MapReduce,对某些处理逻辑,DAG 所需作业数目小于MapReduce,消除不必要的任务

    • DAG显著提高了数据处理效率,对小规模、低延迟和大规模、 高吞吐量的负载均有效

    • MapReduce需要通过把中间结果存盘实现同步,而DAG整合 部分MapReduce作业,减少磁盘I/O

    • reduce任务需要等待map任务全部完成才能继续,DAG优化 数据处理流程,减少同步Barrier

  • DAG部分计算模型也由map、reduce任务构成,只是不像传统 MapReduce计算模型中map、reduce必须成对出现

    • 或者说DAG只有一次map任务(扫描数据时),其余都是 reduce任务?

    • 从MapReduce配置也可以看出,MapReduce可以选择基于 yarnyarn-tez

Tez

Tez简介

Tezm目标就是建立执行框架,支持大数据上DAG表达的作业处理

  • YARN将资源管理功能从数据处理模型中独立出来,使得在Hadoop 执行DAG表达的作业处理成为可能,Tez成为可扩展、高效的执行 引擎

    • Tez在YARN和Hive、Pig之间提供一种通用数据处理模型DAG
    • Hive、Pig、Cascading作业在Tez上执行更快,提供交互式 查询响应
  • Tez把DAG的每个顶点建模为Input、Processer、Output模块的 组合

    • Input、Output决定数据格式、输入、输出
    • Processor包装了数据转换、处理逻辑
    • Processor通过Input从其他顶点、管道获取数据输入,通过 Output向其他顶点、管道传送生成数据
    • 通过把不同的Input、Processor、Output模块组合成顶点, 建立DAG数据处理工作流,执行特定数据处理逻辑
  • Tez自动把DAG映射到物理资源,将其逻辑表示扩展为物理表示, 并执行其中的业务逻辑

    • Tez能为每个节点增加并行性,即使用多个任务执行节点 的计算任务

    • Tez能动态地优化DAG执行过程,能够根据执行过程中获得地 信息,如:数据采样,优化DAG执行计划,优化资源使用、 提高性能

      • 去除了连续作业之间的写屏障
      • 去除了工作流中多余的Map阶段

Tez执行过程

  • 初始化例程,提供context/configuration信息给Tez runtime

  • 对每个顶点的每个任务(任务数量根据并行度创建)进行初始化

  • 执行任务Processor直到所有任务完成,则节点完成

  • Output把从Processor接收到的数据,通过管道传递给下游顶点 的Input

  • 直到整个DAG所有节点任务执行完毕

Hadoop HDFS

HDFS设计模式

  • 数据读取、写入

    • HDFS一般存储不可更新的文件,只能对文件进行数据追加 ,也不支持多个写入者的操作
    • 认为一次写入、多次读取是最高效的访问模式
    • namenode将metadata存储在内存中,所以文件系统所能 存储的文件总数受限于NameNode的内存
  • 适合模式

    • 每次分析都涉及数据集大部分数据甚至全部,因此读取整个 数据集的大部分数据、甚至全部,因此读取整个数据集的 时间比读取第一条记录时间延迟更重要
    • HDFS不适合要求地时间延迟的数据访问应用,HDFS是为 高数据吞吐量应用优化的,可能会提高时间延迟
  • 硬件:HDFS无需高可靠硬件,HDFS被设计为即使节点故障也能 继续运行,且不让用户察觉

数据块

和普通文件系统一样,HDFS同样也有块的概念,默认为64MB

  • HDFS上的文件也被划分为块大小的多个chunk,作为独立的存储 单元,但是HDFS中小于块大小的文件不会占据整个块空间

  • 对分布式文件系统块进行抽象的好处

    • 文件大小可以大于网络中任意一个磁盘的容量
    • 使用抽象块而非整个文件作为存储单元,简化了存储子系统 的设计
      • 简化存储管理,块大小固定,计算磁盘能存储块数目 相对容易
      • 消除了对元素见的顾虑,文件的元信息(权限等), 不需要和块一起存储,可由其他系统单独管理
  • 块非常适合用于数据备份,进而提供数据容错能力、提高可用性

NameNode

HDFS系统中的管理者

  • 集中存储了HDFS的元信息Metadata

    • 维护文件系统的文件树、全部的文件和文件夹的元数据
    • 管理文件系统的Namespace:创建、删除、修改、列出所有 文件、目录
  • 执行数据块的管理操作

    • 把文件映射到所有的数据块
    • 创建、删除数据块
    • 管理副本的Placement、Replication
  • 负责DataNode的成员管理

    • 接受DataNode的Registration
    • 接受DataNode周期性的Heart Beat

Hadoop上层模块,根据NameNode上的元信息,就可以知道每个数据块 有多少副本、存放在哪些节点上,据此分配计算任务,通过 Move Computation to Data,而不是移动数据本身,减少数据 移动开销,加快计算过程

Metadata的保存

为了支持高效的存取操作,NameNode把所有的元信息保存在主内存, 包括文件和数据块的命名空间、文件到数据块的映射、数据块副本 的位置信息。文件命名空间、文件到数据块的映射信息也会持久化 到NameNode本地文件系统

  • FsImage:命名空间镜像文件,保存整个文件系统命名空间、 文件到数据块的映射信息
  • EditLog:编辑日志文件,是一个Transaction Log文件,记录了 对文件系统元信息的所有更新操作:创建文件、改变文件的 Replication Factor

NameNode启动时,读取FsImage、EditLog文件,把EditLog的所有 事务日志应用到从FsImage文件中装载的旧版本元信息上,生成新的 FsImage并保存,然后截短EditLog

NameNode可恢复性

多个文件系统备份

备份文件系统元信息的持久化版本

  • 在NameNode写入元信息的持久化版本时,同步、atomic写入多个 文件系统(一般是本地磁盘、mount为本地目录的NFS)
Secondary NameNode

运行Secondary NameNode:负责周期性的使用EditLog更新 FsImage,保持EditLog在一定规模内

  • Seconadary NameNode保存FsImage、EditLog文件副本, 每个一段时间从NameNode拷贝FsImage,和EditLog文件进行 合并,然后把更新后的FsImage复制回NameNode

  • 若NameNode宕机,可以启动其他机器,从Secondary NameNode获得FsImage、EditLog,恢复宕机之前的最新的 元信息,当作新的NameNode,也可以直接作为主NameNode

  • Secondary NameNode保存的出状态总是滞后于主节点,需要 从NFS获取NameNode部分丢失的metadata

  • Secondary NameNode需要运行在另一台机器,需要和主 NameNode一样规模的CPU计算能力、内存,以便完成元信息 管理

想要从失效的NameNode恢复,需要启动一个拥有文件系统数据副本的 新NameNode,并配置DataNode和客户端以便使用新的NameNode

  • 将namespace的映像导入内存中
  • 重做编辑日志
  • 接收到足够多的来自DataNode的数据块报告,并退出安全模式

DataNode

HDFS中保存数据的节点

  • 数据被切割为多个数据块,以冗余备份的形式存储在多个 DataNode中,因此不需要再每个节点上安装RAID存储获得硬件上 可靠存储支持。DataNode之间可以拷贝数据副本,从而可以重新 平衡每个节点存储数据量、保证数据可靠性(保证副本数量)

  • DotaNode定期向NameNode报告其存储的数据块列表,以备使用者 通过直接访问DataNode获得相应数据

  • 所有NameNode和DataNode之间的通讯,包括DataNode的注册、 心跳信息、报告数据块元信息,都是由DataNode发起请求,由 NameNode被动应答和完成管理

HDFS高可用性

对于大型集群,NN冷启动需要30min甚至更长,因此Hadoop2.x中添加 对高可用性HA(high-availability)的支持

  • 配置Active-Standby NameNode

    • ANN失效后,SNN就会接管任务并开始服务,没有明显中断
    • ANN、SNN应该具有相同的硬件配置
  • NN之间需要通过高可用的共享存储(JounalNode)实现 Editlog共享

    • JN进程轻量,可以和其他节点部署在同一台机器
    • JN至少为3个,最好为奇数个,这样JN失效$(n-1)/2$个时 仍然可以正常工作
    • SNN接管工作后,将通读共享编辑日志直到末尾,实现与ANN 状态同步
  • DN需要同时向两个NN发送数据块处理报告,因为数据块映射信息 存储在NN内存中

  • 客户端需要使用特定机制处理NN失效问题,且机制对用户透明

  • 如果两个namenode同时失效,同样可以冷启动其他namenode, 此时情况就和no-HA模式冷启动类似

注意:HA模式下,不应该再次配置Secondary NameNode

Note that, in an HA cluster, the Standby NameNode also performs checkpoints of the namespace state, and thus it is not necessary to run a Secondary NameNode, CheckpointNode, or BackupNode in an HA cluster. In fact, to do so would be an error. This also allows one who is reconfiguring a non-HA-enabled HDFS cluster to be HA-enabled to reuse the hardware which they had previously dedicated to the Secondary NameNode.

Failover Controller

故障转移控制器系统中有一个新实体管理者管理namenode之间切换,

  • Failover Controller最初实现基于Zookeeper,可插拔

  • 每个namenode运行着一个Failover Controller,用于监视宿主 namenode是否失效(heart beat机制), 并在失效时进行故障 切换

    • 管理员也可以手动发起故障切换,称为平稳故障转移
  • 在非平稳故障切换时,无法确切知道失效namenode是否已经停止 运行,如网速慢、网络切割均可能激发故障转移,引入fencing 机制

    • 杀死namenode进程
    • 收回对共享存储目录权限
    • 屏蔽相应网络端口
    • STONITH:shoot the other node in the head,断电

联邦HDFS

NameNode在内存中保存文件系统中每个文件、数据块的引用关系, 所以对于拥有大量文件的超大集群,内存将成为系统扩展的瓶颈, 2.x中引入的联邦HDFS可以添加NameNode实现扩展

  • 每个NameNode维护一个namespace volume,包括命名空间的 元数据、命令空间下的文件的所有数据块、数据块池
  • namespace volume之间相互独立、不通信,其中一个NameNode 失效也不会影响其他NameNode维护的命名空间的可用性
  • 数据块池不再切分,因此集群中的DataNode需要注册到每个 NameNode,并且存储来自多个数据块池的数据块

Hadoop文件系统

Hadoop有一个抽象问的文件系统概念,HDFS只是其中的一个实现, Java抽象类org.apche.hadoop.fs.FileSystem定义了Hadoop中的 一个文件系统接口,包括以下具体实现

文件系统 URI方案 Java实现 描述
Local file fs.LocalFileSystem 使用客户端校验和本地磁盘文件系统,没有使用校验和文件系统RawLocalFileSystem
HDFS hdfs hdfs.DistributedFileSystem HDFS设计为与MapReduce结合使用实现高性能
HFTP hftp hdfs.HftpFileSystem 在HTTP上提供对HDFS只读访问的文件系统,通常与distcp结合使用,以实现在运行不同版本HDFS集群之间复制数据
HSFTP hsftp hdfs.HsftpFileSystem 在HTTPS上同以上
WebHDFS Webhdfs hdfs.web.WebHdfsFileSystem 基于HTTP,对HDFS提供安全读写访问的文件系统,为了替代HFTP、HFSTP而构建
HAR har fs.HarFileSystem 构建于其他文件系统之上,用于文件存档的文件系统,通常用于需要将HDFS中的文件进行存档时,以减少对NN内存的使用
hfs kfs fs.kfs.kosmosFileSystem CloudStore(前身为Kosmos文件系统)类似于HDFS(GFS),C++编写
FTP ftp fs.ftp.FTPFileSystem 由FTP服务器支持的文件系统
S3(原生) S3n fs.s3native.NativeS3FileSystem 由Amazon S3支持的文件系统
S3(基于块) S3 fs.sa.S3FileSystem 由Amazon S3支持的文件系统,以块格式存储文件(类似于HDFS),以解决S3Native 5GB文件大小限制
分布式RAID hdfs hdfs.DistributedRaidFileSystem RAID版本的HDFS是为了存档而设计的。针对HDFS中每个文件,创建一个更小的检验文件,并允许数据副本变为2,同时数据丢失概率保持不变。需要在集群中运行一个RaidNode后台进程
View viewfs viewfs.ViewFileSystem 针对其他Hadoop文件系统挂载的客户端表,通常用于联邦NN创建挂载点

文件系统接口

Hadoop对文件系统提供了许多接口,一般使用URI方案选择合适的 文件系统实例进行交互

命令行接口

1
2
3
4
5
6
7
8
$ hadoop fs -copyFromLocal file hdfs://localhost/user/xyy15926/file
# 调用Hadoop文件系统的shell命令`fs`
# `-copyFromLocalFile`则是`fs`子命令
# 事实上`hfds://localhost`可以省略,使用URI默认设置,即
# 在`core-site.xml`中的默认设置
# 类似的默认复制文件路径为HDFS中的`$HOME`

$ hadoop fs -copyToLocal file file

HTTP

  • 直接访问:HDFS后台进程直接服务来自于客户端的请求

    • 由NN内嵌的web服务器提供目录服务(默认50070端口)
    • DN的web服务器提供文件数据(默认50075端口)
  • 代理访问:依靠独立代理服务器通过HTTP访问HDFS

    • 代理服务器可以使用更严格的防火墙策略、贷款限制策略

C

Hadoop提供libhdfs的C语言库,是Java FileSystem接口类的 镜像

  • 被写成访问HDFS的C语言库,但其实可以访问全部的Hadoop文件 系统
  • 使用Java原生接口(JNI)调用Java文件系统客户端

FUSE

Filesystem in Userspace允许把按照用户空间实现的文件系统整合 成一个Unix文件系统

  • 使用Hadoop Fuse-DFS功能模块,任何一个Hadoop文件系统可以 作为一个标准文件系统进行挂载
    • Fuse_DFS使用C语言实现,调用libhdfs作为访问HDFS的接口
  • 然后可以使用Unix工具(lscat等)与文件系统交互
  • 还可以使用任何编程语言调用POSIX库访问文件系统

读文件

  1. 客户端程序使用要读取的文件名、Read Range的开始偏移量、 读取范围的程度等信息,询问NameNode

  2. NameNode返回落在读取范围内的数据块的Location信息,根据 与客户端的临近性(Proximity)进行排序,客户端一般选择 最临近的DataNode发送读取请求

具体实现如下

  1. 客户端调用FileSystem对象open方法,打开文件,获得 DistributedFileSystem类的一个实例

  2. DistributedFileSystem返回FSDataInputStream类的实例, 支持文件的定位、数据读取

    • DistributedFileSystem通过RPC调用NameNode,获得 文件首批若干数据块的位置信息(Locations of Blocks)
    • 对每个数据块,NameNode会返回拥有其副本的所有DataNode 地址
    • 其包含一个DFSInputStream对象,负责管理客户端对HDFS 中DataNode、NameNode存取
  3. 客户端从输入流调用函数read,读取文件第一个数据块,不断 调用read方法从DataNode获取数据

    • DFSInputStream保存了文件首批若干数据块所在的 DataNode地址,连接到closest DataNode
    • 当达到数据块末尾时,DFSInputStream关闭到DataNode 的连接,创建到保存其他数据块DataNode的连接
    • 首批数据块读取完毕之后,DFSInputStream向NameNode 询问、提取下一批数据块的DataNode的位置信息
  4. 客户端完成文件的读取,调用FSDataInputStream实例close 方法关闭文件

写文件

  • 客户端询问NameNode,了解应该存取哪些DataNode,然后客户端 直接和DataNode进行通讯,使用Data Transfer协议传输数据, 这个流式数据传输协议可以提高数据传输效率

  • 创建文件时,客户端把文件数据缓存在一个临时的本地文件上, 当本地文件累计超过一个数据块大小时,客户端程序联系 NameNode,NameNode更新文件系统的NameSpace,返回Newly Allocated数据块的位置信息,客户端根据此信息本文件数据块 从临时文件Flush到DataNode进行保存

具体实现如下:

  1. 客户端调用DistributedFileSystemcreate方法

    • DistributedFileSystem通过发起RPC告诉NameNode在 其NameSpace创建一个新文件,此时新文件没有任何数据块
    • NameNode检查:文件是否存在、客户端权限等,检查通过 NameNode为新文件创建新记录、保存其信息,否则文件创建 失败
  2. DistributedFileSystem返回FSDataOutputStream给客户端

    • 其包括一个DFSOutputStream对象,负责和NameNode、 DataNode的通讯
  3. 客户端调用FSDataOutputStream对象write方法写入数据

    • DFSOutputStream把数据分解为数据包Packet,写入内部 Data Queue
    • DataSteamer消费这个队列,写入本地临时文件中
    • 当写入数据超过一个数据块大小时,DataStreamer请求 NameNode为新的数据块分配空间,即选择一系列合适的 DataNode存放各个数据块各个副本
    • 存放各个副本的DataNode形成一个Pipeline,流水线上的 Replica Factor数量的DataNode接收到数据包之后转发给 下个DataNode
    • DFSOutputStream同时维护数据包内部Ack Queue,用于 等待接受DataNode应答信息,只有某个数据包已经被流水线 上所有DataNode应答后,才会从Ack Queue上删除
  4. 客户端完成数据写入,调用FSDataOutputStreamclose 方法

    • DFSOutputStream把所有的剩余的数据包发送到DataNode 流水线上,等待应答信息
    • 最后通知NameNode文件结束
    • NameNode自身知道文件由哪些数据块构成,其等待数据块 复制完成,然后返回文件创建成功

Hadoop平台上的列存储

列存储的优势

  • 更少的IO操作:读取数据的时候,支持Prject Pushdown,甚至 是Predicate Pushdown,可大大减少IO操作

  • 更大的压缩比:每列中数据类型相同,可以针对性的编码、压缩

  • 更少缓存失效:每列数据相同,可以使用更适合的Cpu Pipline 编码方式,减少CPU cache miss

RCFile

Record Columnar File Format:FB、Ohio州立、中科院计算所合作 研发的列存储文件格式,首次在Hadoop中引入列存储格式

  • 允许按行查询,同时提供列存储的压缩效率的列存储格式

    • 具备相当于行存储的数据加载速度、负载适应能力
    • 读优化可以在扫描表格时,避免不必要的数据列读取
    • 使用列维度压缩,有效提升存储空间利用率
  • 具体存储格式

    • 首先横向分割表格,生成多个Row Group,大小可以由用户 指定
    • 在RowGroup内部,按照列存储一般做法,按列把数据分开, 分别连续保存
      • 写盘时,RCFile针对每列数据,使用Zlib/LZO算法进行 压缩,减少空间占用
      • 读盘时,RCFile采用Lazy Decompression策略,即用户 查询只涉及表中部分列时,会跳过不需要列的解压缩、 反序列化的过程

ORC存储格式

Optimized Row Columnar File:对RCFile优化的存储格式

  • 支持更加丰富的数据类型

    • 包括Date Time、Decimal
    • Hive的各种Complex Type,包括:Struct、List、Map、 Union
  • Self Describing的列存储文件格式

    • 为Streaming Read操作进行了优化
    • 支持快速查找少数数据行
  • Type Aware的列存储文件格式

    • 文件写入时,针对不同的列的数据类型,使用不同的编码器 进行编码,提高压缩比
      • 整数类型:Variable Length Compression
      • 字符串类型:Dictionary Encoding
  • 引入轻量级索引、基本统计信息

    • 包括各数据列的最大/小值、总和、记录数等信息
    • 在查询过程中,通过谓词下推,可以忽略大量不符合查询 条件的记录

文件结构

一个ORC文件由多个Stripe、一个包含辅助信息的FileFooter、以及 Postscript构成

Stripe

每个stripe包含index data、row data、stripe footer

  • stripe就是ORC File中划分的row group

    • 默认大小为256MB,可扩展的长度只受HDFS约束
    • 大尺寸的strip、对串行IO的优化,能提高数据吞吐量、 读取更少的文件,同时能把减轻NN负担
  • Index Data部分

    • 包含每个列的极值
    • 一系列Row Index Entry记录压缩模块的偏移量,用于跳转 到正确的压缩块的位置,实现数据的快速读取,缺省可以 跳过10000行
  • Row Data部分;包含每个列的数据,每列由若干Data Stream 构成

  • Stripe Footer部分

    • Data Stream位置信息
    • 每列数据的编码方式

包含该ORCFile文件中所有stripe的元信息

  • 每个Stripe的位置
  • 每个Stripe的行数
  • 每列的数据类型
  • 还有一些列级别的聚集结果,如:记录数、极值、总和
Postscript
  • 用来存储压缩参数
  • 压缩过后的Footer的长度

Parquet

灵感来自于Google关于Drenel系统的论文,其介绍了一种支持嵌套 结构的列存储格式,以提升查询性能

支持

Parquet为hadoop生态系统中的所有项目,提供支持高压缩率的 列存储格式

  • 兼容各种数据处理框架

    • MapReduce
    • Spark
    • Cascading
    • Crunch
    • Scalding
    • Kite
  • 支持多种对象模型

    • Avro
    • Thrift
    • Protocol Buffers
  • 支持各种查询引擎

    • Hive
    • Impala
    • Presto
    • Drill
    • Tajo
    • HAWQ
    • IBM Big SQL

Parquet组件

  • Storage Format:存储格式,定义了Parquet内部的数据类型、 存储格式

  • Object Model Converter:对象转换器,由Parquet-mr实现, 完成对象模型与Parquet数据类型的映射

    • 如Parquet-pig子项目,负责把内存中的Pig Tuple序列化 并按存储格式保存为Parquet格式的文件,已经反过来, 把Parquet格式文件的数据反序列化为Pig Tuple
  • Object Model:对象模型,可以理解为内存中的数据表示,包括 Avro、Thrift、Protocal Buffer、Hive Serde、Pig Tuple、 SparkSQL Internal Row等对象模型

Parquet数据schema

数据schema(结构)可以用一个棵树表达

  • 有一个根节点,根节点包含多个Feild(子节点),子节点可以 包含子节点

  • 每个field包含三个属性

    • repetition:field出现的次数

      • required:必须出现1次
      • optional:出现0次或1次
      • repeated:出现0次或多次
    • type:数据类型

      • primitive:原生类惬
      • group:衍生类型
    • name:field名称

  • Parquet通过把多个schema结构按树结构组合,提供对复杂类型 支持

    • List、Set:repeated field
    • Map:包含键值对的Repeated Group(key为Required)
  • schema中有多少叶子节点,Parquet格式实际存储多少列, 父节点则是在表头组合成schema的结构

Parquet文件结构

  • HDFS文件:包含数据和元数据,数据存储在多个block中
  • HDFS Block:HDFS上最小的存储单位
  • Row Group:按照行将数据表格划分多个单元,每个行组包含 一定行数,行组包含该行数据各列对应的列块
    • 一般建议采用更大的行组(512M-1G),此意味着更大的 列块,有毅力在磁盘上串行IO
    • 由于可能依次需要读取整个行组,所以一般让一个行组刚好 在一个HDFS数据块中,HDFS Block需要设置得大于等于行组 大小
  • Column Chunk:每个行组中每列保存在一个列块中
    • 行组中所有列连续的存储在行组中
    • 不同列块使用不同压缩算法压缩
    • 列块存储时保存相应统计信息,极值、空值数量,用于加快 查询处理
    • 列块由多个页组成
  • Page:每个列块划分为多个Page
    • Page是压缩、编码的单元
    • 列块的不同页可以使用不同的编码方式

HDFS命令

用户

  • HDFS的用户就是当前linux登陆的用户

Hadoop组件

Hadoop Streaming

Hadoop安装配置

Hadoop安装

依赖

  • Java

  • ssh:必须安装且保证sshd一直运行,以便使用hadoop脚本管理 远端hadoop守护进程

    • pdsh:建议安装获得更好的ssh资源管理
    • 要设置免密登陆

机器环境配置

~/.bashrc

这里所有的设置都只是设置环境变量

  • 所以这里所有环境变量都可以放在hadoop-env.sh

  • 放在.bashrc中不是基于用户隔离的考虑

    • 因为hadoop中配置信息大部分放在.xml,放在这里无法 实现用户隔离
    • 更多的考虑是给hive等依赖hadoop的应用提供hadoop配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
export HADOOP_PREFIX=/opt/hadoop
# 自定义部分
# 此处是直接解压放在`/opt`目录下
export HADOOP_HOME=$HADOOP_PREFIX
export HADOOP_COMMON_HOME=$HADOOP_PREFIX
# hadoop common
export HADOOP_HDFS_HOME=$HADOOP_PREFIX
# hdfs
export HADOOP_MAPRED_HOME=$HADOOP_PREFIX
# mapreduce
export HADOOP_YARN_HOME=$HADOOP_PREFIX
# YARN
export HADOOP_CONF_DIR=$HADOOP_PREFIX/etc/hadoop

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_COMMON_LIB_NATIVE_DIR"
# 这里`-Djava`间不能有空格

export CLASSPATH=$CLASS_PATH:$HADOOP_PREFIX/lib/*
export PATH=$PATH:$HADOOP_PREFIX/sbin:$HADOOP_PREFIX/bin

/etc/hosts

1
2
3
4
192.168.31.129 hd-master
192.168.31.130 hd-slave1
192.168.31.131 hd-slave2
127.0.0.1 localhost
  • 这里配置的ip地址是各个主机的ip,需要自行配置
  • hd-masterhd-slave1等就是主机ip-主机名映射
  • todo?一定需要在/etc/hostname中设置各个主机名称

firewalld

必须关闭所有节点的防火墙

1
2
$ sudo systemctl stop firewalld.service
$ sudo systemctl disable firewalld.service

文件夹建立

  • 所有节点都需要建立
1
2
$ mkdir tmp
$ mkdir -p hdfs/data hdfs/name

Hadoop配置

Hadoop全系列(包括hive、tez等)配置取决于以下两类配置文件

  • 只读默认配置文件

    • core-defualt.xml
    • hdfs-default.xml
    • mapred-default.xml
  • 随站点变化的配置文件

    • etc/hadoop/core-site.xml
    • etc/hadoop/hdfs-site.xml
    • etc/hadoop/mapred-site.xml
    • etc/hadoop/yarn-env.xml
  • 环境设置文件:设置随站点变化的值,从而控制bin/中的 hadoop脚本行为

    • etc/hadoop/hadoop-env.sh
    • etc/hadoop/yarn-env.sh
    • etc/hadoop/mapred-env.sh

    中一般是环境变量配置,补充在shell中未设置的环境变量

  • 注意

    • .xml配置信息可在不同应用的配置文件中继承使用, 如在tez的配置中可以使用core-site.xml${fs.defaultFS}变量

    • 应用会读取/执行相应的*_CONF_DIR目录下所有 .xml/.sh文件,所以理论上可以在etc/hadoop中存放 所以配置文件,因为hadoop是最底层应用,在其他所有应用 启动前把环境均已设置完毕???

Hadoop集群有三种运行模式

  • Standalone Operation
  • Pseudo-Distributed Operation
  • Fully-Distributed Operation

针对不同的运行模式有,hadoop有三种不同的配置方式

Standalone Operation

hadoop被配置为以非分布模式运行的一个独立Java进程,对调试有 帮助

  • 默认为单机模式,无需配置
测试
1
2
3
4
5
$ cd /path/to/hadoop
$ mkdir input
$ cp etc/hadoop/*.xml input
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar grep input output 'dfs[a-z.]+'
$ cat output/*

Pseudo-Distributed Operation

在单节点(服务器)上以所谓的伪分布式模式运行,此时每个Hadoop 守护进程作为独立的Java进程运行

core-site.xml
1
2
3
4
5
6
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
hdfs-site.xml
1
2
3
4
5
6
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
mapred-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configruration>

<configuration>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
</preperty>
</configruation>
yarn-site.xml
1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>

Fully-Distributed Operation

  • 单节点配置完hadoop之后,需要将其同步到其余节点
core-site.xml

模板:core-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hd-master:9000</value>
<description>namenode address</description>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>file:///opt/hadoop/tmp</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131702</value>
</property>

<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<!-- 为将用户`root`设置为超级代理,代理所有用户,如果是其他用户需要相应的将root修改为其用户名 -->
<!-- 是为hive的JDBCServer远程访问而设置,应该有其他情况也需要 -->
</configuration>
hdfs-site.xml

模板:hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<configuration>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hd-master:9001</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///opt/hadoop/hdfs/name</value>
<description>namenode data directory</description>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///opt/hadoop/hdfs/data</value>
<description>datanode data directory</description>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
<description>replication number</description>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>

<property>
<name>dfs.datanode.directoryscan.throttle.limit.ms.per.sec</name>
<value>1000</value>
</property>
<!--bug-->
</configuration>
yarn-site.xml
  • 模板:yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hd-master</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>hd-master:9032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>hd-master:9030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>hd-master:9031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>hd-master:9033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>hd-master:9099</value>
</property>

<!-- container -->
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>512</value>
<description>maximum memory allocation per container</description>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>256</value>
<description>minimum memory allocation per container</description>
</property>
<!-- container -->

<!-- node -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>1024</value>
<description>maximium memory allocation per node</description>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>8</value>
<description>virtual memmory ratio</description>
</property>
<!-- node -->

<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>384</value>
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Xms128m -Xmx256m</value>
</property>

<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>1</value>
</property>

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</configuration>
mapred-site.xml
  • 模板:mapred-site.xml.template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
<!--
<value>yarn-tez</value>
设置整个hadoop运行在Tez上,需要配置好Tez
-->
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>hd-master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hd-master:19888</value>
</property>

<!-- mapreduce -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>256</value>
<description>memory allocation for map task, which should between minimum container and maximum</description>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>256</value>
<description>memory allocation for reduce task, which should between minimum container and maximum</description>
</property>
<!-- mapreduce -->

<!-- java heap size options -->
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xms128m -Xmx256m</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xms128m -Xmx256m</value>
</property>
<!-- java heap size options -->

</configuration>
参数说明
  • yarn.scheduler.minimum-allocation-mb:container内存 单位,也是container分配的内存最小值

  • yarn.scheduler.maximum-allocation-mb:container内存 最大值,应该为最小值整数倍

  • mapreduce.map.memeory.mb:map task的内存分配

    • hadoop2x中mapreduce构建于YARN之上,资源由YARN统一管理
    • 所以maptask任务的内存应设置container最小值、最大值间
    • 否则分配一个单位,即最小值container
  • mapreduce.reduce.memeory.mb:reduce task的内存分配

    • 设置一般为map task的两倍
  • *.java.opts:JVM进程参数设置

    • 每个container(其中执行task)中都会运行JVM进程
    • -Xmx...m:heap size最大值设置,所以此参数应该小于 task(map、reduce)对应的container分配内存的最大值, 如果超出会出现physical memory溢出
    • -Xms...m:heap size最小值?#todo
  • yarn.nodemanager.vmem-pmem-ratio:虚拟内存比例

    • 以上所有配置都按照此参数放缩
    • 所以在信息中会有physical memory、virtual memory区分
  • yarn.nodemanager.resource.memory-mb:节点内存设置

    • 整个节点被设置的最大内存,剩余内存共操作系统使用
  • yarn.app.mapreduce.am.resource.mb:每个Application Manager分配的内存大小

主从文件

masters
  • 设置主节点地址,根据需要设置
1
hd-master
slaves
  • 设置从节点地址,根据需要设置
1
2
hd-slave1
hd-slave2

环境设置文件

  • 这里环境设置只是起补充作用,在~/.bashrc已经设置的 环境变量可以不设置
  • 但是在这里设置环境变量,然后把整个目录同步到其他节点, 可以保证在其余节点也能同样的设置环境变量
hadoop-env.sh

设置JAVA_HOME为Java安装根路径

1
JAVA_HOME=/opt/java/jdk
hdfs-env.sh

设置JAVA_HOME为Java安装根路径

1
JAVA_HOME=/opt/java/jdk
yarn-env.sh

设置JAVA_HOME为Java安装根路径

1
2
JAVA_HOME=/opt/java/jdk
JAVA_HEAP_MAX=Xmx3072m

初始化、启动、测试

HDFS
  • 格式化、启动

    1
    2
    3
    4
    5
    6
    $ hdfs namenode -format
    # 格式化文件系统
    $ start-dfs.sh
    # 启动NameNode和DataNode
    # 此时已可访问NameNode,默认http://localhost:9870/
    $ stop-dfs.sh
  • 测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    $ hdfs dfsadmin -report
    # 应该输出3个节点的情况

    $ hdfs dfs -mkdir /user
    $ hdfs dfs -mkdir /user/<username>
    # 创建执行MapReduce任务所需的HDFS文件夹
    $ hdfs dfs -mkdir input
    $ hdfs dfs -put etc/hadoop/*.xml input
    # 复制文件至分布式文件系统
    $ hadoop jar /opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar grep input output 'dfs[a-z]+'
    # 执行自带样例
    # 样例名称取决于版本

    $ hdfs dfs -get output outut
    $ cat output/*
    # 检查输出文件:将所有的输出文件从分布式文件系统复制
    # 至本地文件系统,并检查
    $ hdfs dfs -cat output/*
    # 或者之间查看分布式文件系统上的输出文件


    $ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar \
    -input /path/to/hdfs_file \
    -output /path/to/hdfs_dir \
    -mapper "/bin/cat" \
    -reducer "/user/bin/wc" \
    -file /path/to/local_file \
    -numReduceTasks 1
YARN
1
2
3
4
5
$ sbin/start-yarn.sh
# 启动ResourceManger守护进程、NodeManager守护进程
# 即可访问ResourceManager的web接口,默认:http://localhost:8088/
$ sbin/stop-yarn.sh
# 关闭守护进程

其他

注意事项

  • hdfs namenode -format甚至可以在datanode节点没有java时 成功格式化

  • 没有关闭防火墙时,整个集群可以正常启动,甚至可以在hdfs里 正常建立文件夹,但是无法写入文件,尝试写入文件时报错

可能错误

节点启动不全
  • 原因

    • 服务未正常关闭,节点状态不一致
  • 关闭服务、删除存储数据的文件夹dfs/data、格式化namenode

文件无法写入

could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and 2 node(s) are excluded in this operation.

  • 原因

    • 未关闭防火墙
    • 存储空间不够
    • 节点状态不一致、启动不全
    • 在log里面甚至可能会出现一个连接超时1000ms的ERROR
  • 处理

    • 关闭服务、删除存储数据的文件夹dfs/data、格式化 namenode
      • 这样处理会丢失数据,不能用于生产环境
    • 尝试修改节点状态信息文件VERSION一致
      • ${hadoop.tmp.dir}
      • ${dfs.namenode.name.dir}
      • ${dfs.datanode.data.dir}
Unhealthy Node

1/1 local-dirs are bad: /opt/hadoop/tmp/nm-local-dir; 1/1 log-dirs are bad: /opt/hadoop/logs/userlogs

  • 原因:磁盘占用超过90%

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scp -r /opt/hadoop/etc/hadoop centos2:/opt/hadoop/etc
scp -r /opt/hadoop/etc/hadoop centos3:/opt/hadoop/etc
# 同步配置

scp /root/.bashrc centos2:/root
scp /root/.bashrc centos3:/root
# 同步环境

rm -r /opt/hadoop/tmp /opt/hadoop/hdfs
mkdir -p /opt/hadoop/tmp /opt/hadoop/hdfs
ssh centos2 rm -r /opt/hadoop/tmp /opt/hadoop/hdfs
ssh centos2 mkdir -p /opt/hadoop/tmp /opt/hadoop/hdfs/name /opt/hadoop/hdfs/data
ssh centos3 rm -r /opt/hadoop/tmp /opt/hadoop/hdfs/name /opt/hadoop/data
ssh centos3 mkdir -p /opt/hadoop/tmp /opt/hadoop/hdfs/name /opt/hadoop/hdfs/data
# 同步清除数据

rm -r /opt/hadoop/logs/*
ssh centos2 rm -r /opt/hadoop/logs/*
ssh centos3 rm -r /opt/hadoop/logs/*
# 同步清除log

Hive

依赖

  • hadoop:配置完成hadoop,则相应java等也配置完成
  • 关系型数据库:mysql、derby等

机器环境配置

~/.bashrc

1
2
3
4
5
export HIVE_HOME=/opt/hive
# self designed
export HIVE_CONF_DIR=$HIVE_HOME/conf
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASS_PATH:$HIVE_HOME/lib/*

文件夹建立

HDFS
1
2
3
4
$ hdfs dfs -rm -r /user/hive
$ hdfs dfs -mkdir -p /user/hive/warehouse /user/hive/tmp /user/hive/logs
# 这三个目录与配置文件中对应
$ hdfs dfs -chmod 777 /user/hive/warehouse /user/hive/tmp /user/hive/logs
FS
1
2
3
4
5
6
$ mkdir data
$ chmod 777 data
# hive数据存储文件夹
$ mkdir logs
$ chmod 777 logs
# log目录

Hive配置

XML参数

conf/hive-site.xml
  • 模板:conf/hive-default.xml.template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hd-master:3306/metastore_db?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.mariadb.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
</value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>1234</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.exec.scratchdir</name>
<value>/user/hive/tmp</value>
</property>

<!--
<property>
<name>hive.exec.local.scratchdir</name>
<value>${system:java.io.tmpdir}/${system:user.name}</value>
</property>
<property>
<name>hive.downloaded.resources.dir</name>
<valeu>${system:java.io.tmpdir}/${hive.session.id}_resources</value>
</property>
<property>«
<name>hive.server2.logging.operation.log.location</name>«
<value>${system:java.io.tmpdir}/${system:user.name}/operation_logs</value>«
<description>Top level directory where operation logs are stored if logging functionality is enabled</description>«
</property>«
所有`${system.java.io.tmpdir}`都要被替换为相应的`/opt/hive/tmp`,
可以通过设置这两个变量即可,基本是用于设置路径
-->

<property>
<name>system:java.io.tmpdir</name>
<value>/opt/hive/tmp</value>
</property>
<property>
<name>system:user.name</name>
<value>hive</value>
<property>

<!--
<property>
<name>hive.querylog.location</name>
<value>/user/hive/logs</value>
<description>Location of Hive run time structured log file</description>
</property>
这里应该不用设置,log放在本地文件系统更合适吧
-->

<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.31.129:19083</value>
</property>
<!--这个是配置metastore,如果配置此选项,每次启动hive必须先启动metastore,否则hive实可直接启动-->

<property>
<name>hive.server2.logging.operation.enabled</name>
<value>true</value>
</property>
<!-- 使用JDBCServer时需要配置,否则无法自行建立log文件夹,然后报错,手动创建可行,但是每次查询都会删除文件夹,必须查一次建一次 -->
  • /user开头的路径一般表示hdfs中的路径,而${}变量开头 的路径一般表示本地文件系统路径

    • 变量system:java.io.tmpdirsystem:user.name在 文件中需要自己设置,这样就避免需要手动更改出现这些 变量的地方
    • hive.querylog.location设置在本地更好,这个日志好像 只在hive启动时存在,只是查询日志,不是hive运行日志, hive结束运行时会被删除,并不是没有生成日志、${}表示 HDFS路径
  • 配置中出现的目录(HDFS、locaL)有些手动建立

    • HDFS的目录手动建立?
    • local不用
  • hive.metastore.uris若配置,则hive会通过metastore服务 访问元信息

    • 使用hive前需要启动metastore服务
    • 并且端口要和配置文件中一样,否则hive无法访问

环境设置文件

conf/hive-env.sh
  • 模板:conf/hive-env.sh.template
1
2
3
4
5
export JAVA_HOME=/opt/java/jdk
export HADOOP_HOME=/opt/hadoop
export HIVE_CONF_DIR=/opt/hive/conf
# 以上3者若在`~/.bashrc`中设置,则无需再次设置
export HIVE_AUX_JARS_PATH=/opt/hive/lib
conf/hive-exec-log4j2.properties
  • 模板:hive-exec-log4j2.properties.template

    1
    2
    3
    property.hive.log.dir=/opt/hive/logs
    # 原为`${sys:java.io.tmpdir}/${sys:user.name}`
    # 即`/tmp/root`(root用户执行)
conf/hive-log4j2.properties
  • 模板:hive-log4j2.properties.template

MetaStore

MariaDB
  • 安装MariaDB

  • 修改MariaDB配置

    1
    $ cp /user/share/mysql/my-huge.cnf /etc/my.cnf
  • 创建用户,注意新创建用户可能无效,见mysql配置

    • 需要注意用户权限:创建数据库权限、修改表权限
    • 初始化时Hive要自己创建数据库(hive-site中配置), 所以对权限比较严格的环境下,可能需要先行创建同名 数据库、赋权、删库
  • 下载mariadb-java-client-x.x.x-jar包,复制到lib

初始化数据库
1
$ schematool -initSchema -dbType mysql

这个命令要在所有配置完成之后执行

服务设置

1
2
3
4
5
6
7
8
9
10
11
$ hive --service metastore -p 19083 &
# 启动metastore服务,端口要和hive中的配置相同
# 否则hive无法连接metastore服务,无法使用
# 终止metastore服务只能根据进程号`kill`
$ hive --service hiveserver2 --hiveconf hive.server2.thrift.port =10011 &
# 启动JDBC Server
# 此时可以通过JDBC Client(如beeline)连接JDBC Server对
# Hive中数据进行操作
$ hive --service hiveserver2 --stop
# 停止JDBC Server
# 或者直接kill

测试

Hive可用性

需要先启动hdfs、YARN、metastore database(mysql),如果有 设置独立metastore server,还需要在正确端口启动

1
2
3
4
5
6
hive>	create table if not exists words(id INT, word STRING)
row format delimited fields terminated by " "
lines terminated by "\n";
hive> load data local inpath "/opt/hive-test.txt" overwrite into
table words;
hive> select * from words;
JDBCServer可用性
  • 命令行连接

    1
    $ beeline -u jdbc:hive2://localhost:10011 -n hive -p 1234
  • beeline中连接

    1
    2
    3
    $ beeline
    beeline> !connect jdbc:hive2://localhost:10011
    # 然后输入用户名、密码(metastore数据库用户名密码)

其他

可能错误

Failed with exception Unable to move source file

  • linux用户权限问题,无法操作原文件
  • hdfs用户权限问题,无法写入目标文件
  • hdfs配置问题,根本无法向hdfs写入:参见hdfs问题

org.apache.hive.service.cli.HiveSQLException: Couldn’t find log associated with operation handle:

  • 原因:hiveserver2查询日志文件夹不存在

  • 可以在hive中通过

    1
    $ set hive.server2.logging.operation.log.location;

    查询日志文件夹,建立即可,默认为 ${system:java.io.tmpdir}/${system:user.name}/operation_logs ,并设置权限为777

    • 好像如果不设置权限为777,每次查询文件夹被删除,每 查询一次建立一次文件夹?#todo
    • hive-sitex.xml中配置允许自行创建?

User: root is not allowed to impersonate hive

  • 原因:当前用户(不一定是root)不被允许通过代理操作 hadoop用户、用户组、主机

    • hadoop引入安全伪装机制,不允许上层系统直接将实际用户 传递给超级代理,此代理在hadoop上执行操作,避免客户端 随意操作hadoop
  • 配置hadoop的core-site.xml,使得当前用户作为超级代理

Tez

依赖

  • hadoop

机器环境配置

.bashrc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
export TEZ_HOME=/opt/tez
export TEZ_CONF_DIR=$TEZ_HOME/conf

for jar in `ls $TEZ_HOME | grep jar`; do
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$TEZ_HOME/$jar
done
for jar in `ls $TEZ_HOME/lib`; do
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$TEZ_HOME/lib/$jar
done
# this part could be replaced with line bellow
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$TEZ_HOME/*:$TEZ_HOME/lib/*
# `hadoop-env.sh`中说`HADOOP_CLASSPATH`是Extra Java CLASSPATH
# elements
# 这意味着hadoop组件只需要把其jar包加到`HADOOP_CLASSPATH`中既可

HDFS

  • 上传$TEZ_HOME/share/tez.tar.gz至HDFS中

    1
    2
    $ hdfs dfs -mkdir /apps
    $ hdfs dfs -copyFromLocal tez.tar.gz /apps

HadoopOnTez

在hadoop中配置Tez

  • 侵入性较强,对已有的hadoop集群全体均有影响

  • 所有hadoop集群执行的MapReduce任务都通过tez执行

    • 这里所有的任务应该是指直接在hadoop上执行、能在 webRM上看到的任务
    • hive这样的独立组件需要独立配置

XML参数

tez-site.xml
  • 模板:conf/tez-default-tmplate.xml
  • 好像还是需要复制到hadoop的配置文件夹中
1
2
3
4
5
6
7
8
9
10
11
<property>
<name>tez.lib.uris</name>
<value>${fs.defaultFS}/apps/tez.tar.gz</value>
<!--设置tez安装包位置-->
</property>
<!--
<property>
<name>tez.container.max.java.heap.fraction</name>
<value>0.2</value>
<property>
内存不足时-->
mapred-site.xml
  • 修改mapred-site.xml文件:配置mapreduce基于yarn-tez, (配置修改在hadoop部分也有)
1
2
3
4
<property>
<name>mapreduce.framework.name</name>
<value>yarn-tez</value>
</property>

环境参数

HiveOnTez

  • 此模式下Hive可以在mapreduce、tez计算模型下自由切换?

    1
    2
    3
    4
    5
    hive> set hive.execution.engine=tez;
    # 切换查询引擎为tez
    hive> set hive.execution.engine=mr;
    # 切换查询引擎为mapreduce
    # 这些命令好像没用,只能更改值,不能更改实际查询模型
  • 只有Hive会受到影响,其他基于hadoop平台的mapreduce作业 仍然使用tez计算模型

Hive设置

  • 若已经修改了mapred-site.xml设置全局基于tez,则无需复制 jar包,直接修改hive-site.xml即可
Jar包复制

复制$TEZ_HOME$TEZ_HOME/lib下的jar包到$HIVE_HOME/lib 下即可

hive-site.xml
1
2
3
4
<property>
<name>hive.execution.engine</name>
<value>tez</value>
</property>

其他

可能错误

SLF4J: Class path contains multiple SLF4J bindings.

  • 原因:包冲突的
  • 解决方案:根据提示冲突包删除即可

Spark

依赖

  • java
  • scala
  • python:一般安装anaconda,需要额外配置
    1
    2
    export PYTHON_HOME=/opt/anaconda3
    export PATH=$PYTHON_HOME/bin:$PATH
  • 相应资源管理框架,如果不以standalone模式运行

机器环境配置

~/.bashrc

1
2
3
4
5
6
7
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYTHON_PATH=$PYTHON_PATH:$SPARK_HOME/python:$SPARK_HOME/python/lib/*
# 把`pyshark`、`py4j`模块对应的zip文件添加进路径
# 这里用的是`*`通配符应该也可以,手动添加所有zip肯定可以
# 否则无法在一般的python中对spark进行操作
# 似乎只要master节点有设置`/lib/*`添加`pyspark`、`py4j`就行

Standalone

环境设置文件

conf/spark-env.sh
  • 模板:conf/spark-env.sh.template

这里应该有些配置可以省略、移除#todo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
export JAVA_HOME=/opt/jdk
export HADOOP_HOME=/opt/hadoop
export hADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export HIVE_HOME=/opt/hive

export SCALA_HOME=/opt/scala
export SCALA_LIBRARY=$SPARK_HOME/lib
# `~/.bashrc`设置完成之后,前面这段应该就这个需要设置

export SPARK_HOME=/opt/spark
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
# 这里是执行命令获取classpath
# todo
# 这里看文档的意思,应该也是类似于`$HADOOP_CLASSPATH`
# 可以直接添加进`$CLASSPATH`而不必设置此变量
export SPARK_LIBRARY_PATH=$SPARK_HOME/lib

export SPARK_MASTER_HOST=hd-master
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_WORKER_WEBUI_PORT=8081
export SPARK_WORKER_MEMORY=1024m
# spark能在一个container内执行多个task
export SPARK_LOCAL_DIRS=$SPARK_HOME/data
# 需要手动创建

export SPARK_MASTER_OPTS=
export SPARK_WORKER_OPTS=
export SPARK_DAEMON_JAVA_OPTS=
export SPARK_DAEMON_MEMORY=
export SPARK_DAEMON_JAVA_OPTS=
文件夹建立
1
2
$ mkdir /opt/spark/spark_data
# for `$SPARK_LOCAL_DIRS`

Spark配置

conf/slaves

文件不存在,则在当前主机单节点运行

  • 模板:conf/slaves.template
1
2
hd-slave1
hd-slave2
conf/hive-site.xml

这里只是配置Spark,让Spark作为“thrift客户端”能正确连上 metastore server

  • 模板:/opt/hive/conf/hive-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.31.129:19083</value>
<description>Thrift URI for the remote metastor. Used by metastore client to connect to remote metastore</description>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10011</value>
</property>
<!--配置spark对外界thrift服务,以便可通过JDBC客户端存取spark-->
<!--这里启动端口同hive的配置,所以两者不能默认同时启动-->
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hd-master</value>
</property>
</configuration>

测试

启动Spark服务

需要启动hdfs、正确端口启动的metastore server

1
2
3
4
5
6
7
8
9
10
$ start-master.sh
# 在执行**此命令**机器上启动master实例
$ start-slaves.sh
# 在`conf/slaves`中的机器上启动worker实例
$ start-slave.sh
# 在执行**此命令**机器上启动worker实例

$ stop-master.sh
$ stop-slaves.sh
$ stop-slave.sh
启动Spark Thrift Server
1
2
3
4
5
6
7
$ start-thriftserver.sh --master spark://hd-master:7077 \
--hiveconf hive.server2.thrift.bind.host hd-master \
--hiveconf hive.server2.thrift.port 10011
# 这里在命令行启动thrift server时动态指定host、port
# 如果在`conf/hive-site.xml`有配置,应该不需要

# 然后使用beeline连接thrift server,同hive
Spark-Sql测试
1
2
3
4
5
6
$ spark-sql --master spark://hd-master:7077
# 在含有配置文件的节点上启动时,配置文件中已经指定`MASTER`
# 因此不需要指定后面配置

spark-sql> set spark.sql.shuffle.partitions=20;
spark-sql> select id, count(*) from words group by id order by id;
pyspark测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ MASTER=spark://hd-master:7077 pyspark
# 这里应该是调用`$PATH`中第一个python,如果未默认指定

from pyspark.sql import HiveContext
sql_ctxt = HiveContext(sc)
# 此`sc`是pyspark启动时自带的,是`SparkContext`类型实例
# 每个连接只能有一个此实例,不能再次创建此实例

ret = sql_ctxt.sql("show tables").collect()
# 这里语句结尾不能加`;`

file = sc.textFile("hdfs://hd-master:9000/user/root/input/capacity-scheduler.xml")
file.count()
file.first()
Scala测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ MASTER=spark://hd-master:7077 spark-shell \
executor-memory 1024m \
--total-executor-cores 2 \
--excutor-cores 1 \
# 添加参数启动`spark-shell`

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("select * from words").collect().foreach(println)
sqlContext.sql("select id, word from words order by id").collect().foreach(println)

sqlContext.sql("insert into words values(7, \"jd\")")
val df = sqlContext.sql("select * from words");
df.show()

var df = spark.read.json("file:///opt/spark/example/src/main/resources/people.json")
df.show()

Spark on YARN

其他

可能错误

Initial job has not accepted any resources;

  • 原因:内存不足,spark提交application时内存超过分配给 worker节点内存

  • 说明

    • 根据结果来看,pysparkspark-sql需要内存比 spark-shell少? (设置worker内存512m,前两者可以正常运行)
    • 但是前两者的内存分配和scala不同,scala应该是提交任务 、指定内存大小的方式,这也可以从web-ui中看出来,只有 spark-shell开启时才算是application
  • 解决方式

    • 修改conf/spark-env.shSPARK_WORKER_MEMORY更大, (spark默认提交application内存为1024m)
    • 添加启动参数--executor-memory XXXm不超过分配值
ERROR KeyProviderCache:87 - Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider
  • 无影响

HBase

依赖

  • java
  • hadoop
  • zookeeper:建议,否则日志不好管理

机器环境

~/.bashrc

1
2
3
export HBASE_HOME=/opt/hbase
export PATH=$PAHT:$HBASE_HOME/bin
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/lib/*

建立目录

1
$ mkdir /tmp/hbase/tmpdir

HBase配置

环境变量

conf/hbase-env.sh
1
2
export HBASE_MANAGES_ZK=false
# 不使用自带zookeeper
conf/zoo.cfg

若设置使用独立zookeeper,需要复制zookeeper配置至HBase配置 文件夹中

1
$ cp /opt/zookeeper/conf/zoo.cfg /opt/hbase/conf

Standalone模式

conf/hbase-site.xml
1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>hbase.rootdir</name>
<value>file://${HBASE_HOME}/data</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/tmp/zookeeper/zkdata</value>
</property>
</configuration>

Pseudo-Distributed模式

conf/hbase-site.xml
  • 在Standalone配置上修改
1
2
3
4
5
6
7
8
<proeperty>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hd-master:9000/hbase</value>
</property>

Fully-Distributed模式

conf/hbase-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<property>
<name>hbase.rootdir</name>
<value>hdfs://hd-master:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</name>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hd-master,hd-slave1,hd-slave2</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/tmp/zookeeper/zkdata</value>
</property>

测试

  • 需要首先启动HDFS、YARN
  • 使用独立zookeeper还需要先行在每个节点启动zookeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ start-hbase.sh
# 启动HBase服务
$ local-regionservers.sh start 2 3 4 5
# 启动额外的4个RegionServer
$ hbase shell
hbase> create 'test', 'cf'
hbase> list 'test'
hbase> put 'test', 'row7', 'cf:a', 'value7a'
put 'test', 'row7', 'cf:b', 'value7b'
put 'test', 'row7', 'cf:c', 'value7c'
put 'test', 'row8', 'cf:b', 'value8b',
put 'test', 'row9', 'cf:c', 'value9c'
hbase> scan 'test'
hbase> get 'test', 'row7'
hbase> disable 'test'
hbase> enable 'test'
hbaee> drop 'test'
hbase> quit

Zookeeper

依赖

  • java

  • 注意:zookeeper集群中工作超过半数才能对外提供服务,所以 一般配置服务器数量为奇数

机器环境

~/.bashrc

1
2
3
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$ZOOKEEPER_HOME/lib

创建文件夹

  • 在所有节点都需要创建相应文件夹、myid文件
1
2
3
4
5
6
7
mkdir -p /tmp/zookeeper/zkdata /tmp/zookeeper/zkdatalog
echo 0 > /tmp/zookeeper/zkdatalog/myid

ssh centos2 mkdir -p /tmp/zookeeper/zkdata /tmp/zookeeper/zkdatalog
ssh centos3 mkdir -p /tmp/zookeeper/zkdata /tmp/zookeeper/zkdatalog
ssh centos2 "echo 2 > /tmp/zookeeper/zkdata/myid"
ssh centos3 "echo 3 > /tmp/zookeeper/zkdata/myid"

Zookeeper配置

Conf

conf/zoo.cfg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
tickTime=2000
# The number of milliseconds of each tick
initLimit=10
# The number of ticks that the initial
# synchronization phase can take
syncLimit=5
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
dataDir=/tmp/zookeeper/zkdata
dataLogDir=/tmp/zookeeper/zkdatalog
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
clientPort=2181
# the port at which the clients will connect

autopurge.snapRetainCount=3
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
# The number of snapshots to retain in dataDir
autopurge.purgeInterval=1
# Purge task interval in hours
# Set to "0" to disable auto purge feature

server.0=hd-master:2888:3888
server.1=hd-slave1:2888:3888
server.2=hd-slave2:2888:3888
# Determine the zookeeper servers
# fromation: server.NO=HOST:PORT1:PORT2
# PORT1: port used to communicate with leader
# PORT2: port used to reelect leader when current leader fail
$dataDir/myid
  • $dataDirconf/zoo.cfg中指定目录
  • myid文件里就一个id,指明当前zookeeper server的id,服务 启动时读取文件确定其id,需要自行创建
1
0

启动、测试、清理

启动zookeeper

1
2
3
4
5
6
7
8
9
$ zkServer.sh start
# 开启zookeeper服务
# zookeeper服务要在各个节点分别手动启动

$ zkServer.sh status
# 查看服务状态

$ zkCleanup.sh
# 清理旧的快照、日志文件

Flume

依赖

  • java

机器环境配置

~/.bashrc

1
export PATH=$PATH:/opt/flume/bin

Flume配置

环境设置文件

conf/flume-env.sh
  • 模板:conf/flume-env.sh.template
1
JAVA_HOME=/opt/jdk

Conf文件

conf/flume.conf
  • 模板:conf/flume-conf.properties.template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
agent1.channels.ch1.type=memory
# define a memory channel called `ch1` on `agent1`
agent1.sources.avro-source1.channels=ch1
agent1.sources.avro-source1.type=avro
agent1.sources.avro-source1.bind=0.0.0.0
agent1.sources.avro-source1.prot=41414
# define an Avro source called `avro-source1` on `agent1` and tell it
agent1.sink.log-sink1.channels=ch1
agent1.sink.log-sink1.type=logger
# define a logger sink that simply logs all events it receives
agent1.channels=ch1
agent1.sources=avro-source1
agent1.sinks=log-sink1
# Finally, all components have been defined, tell `agent1` which one to activate

启动、测试

1
2
3
4
5
6
7
8
9
10
11
$ flume-ng agent --conf /opt/flume/conf \
-f /conf/flume.conf \
-D flume.root.logger=DEBUG,console \
-n agent1
# the agent name specified by -n agent1` must match an agent name in `-f /conf/flume.conf`

$ flume-ng avro-client --conf /opt/flume/conf \
-H localhost -p 41414 \
-F /opt/hive-test.txt \
-D flume.root.logger=DEBUG, Console
# 测试flume

其他

Kafka

依赖

  • java
  • zookeeper

机器环境变量

~/.bashrc

1
2
export PATH=$PATH:/opt/kafka/bin
export KAFKA_HOME=/opt/kafka

多brokers配置

Conf

config/server-1.properties
  • 模板:config/server.properties
  • 不同节点broker.id不能相同
  • 可以多编写几个配置文件,在不同节点使用不同配置文件启动
1
2
3
broker.id=0
listeners=PLAINTEXT://:9093
zookeeper.connect=hd-master:2181, hd-slave1:2181, hd-slave2:2181

测试

  • 启动zookeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ kafka-server-start.sh /opt/kafka/config/server.properties &
# 开启kafka服务(broker)
# 这里是指定使用单个默认配置文件启动broker
# 启动多个broker需要分别使用多个配置启动多次
$ kafka-server-stop.sh /opt/kafka/config/server.properties

$ kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test1
# 开启话题
$ kafka-topics.sh --list zookeeper localhost:2181
#
$ kafka-topics.shd --delete --zookeeper localhost:2181
--topic test1
# 关闭话题

$ kafka-console-producer.sh --broker-list localhost:9092 \
--topic test1
# 新终端开启producer,可以开始发送消息

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test1 \
--from-beginning

$ kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic test1 \
--from beginning
# 新终端开启consumer,可以开始接收信息
# 这个好像是错的

其他

Storm

依赖

  • java
  • zookeeper
  • python2.6+
  • ZeroMQ、JZMQ

机器环境配置

~/.bashrc

1
2
export STORM_HOME=/opt/storm
export PAT=$PATH:$STORM_HOME/bin

Storm配置

配置文件

conf/storm.yaml
  • 模板:conf/storm.yarml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
storm.zookeeper.servers:
-hd-master
-hd-slave1
-hd-slave2
storm.zookeeper.port: 2181

nimbus.seeds: [hd-master]
storm.local.dir: /tmp/storm/tmp
nimbus.host: hd-master
supervisor.slots.ports:
-6700
-6701
-6702
-6703

启动、测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
storm nimbus &> /dev/null &
storm logviewer &> /dev/null &
storm ui &> /dev/null &
# master节点启动nimbus

storm sueprvisor &> /dev/null &
storm logviewer &> /dev/nulla &
# worker节点启动


storm jar /opt/storm/example/..../storm-start.jar \
storm.starter.WordCountTopology
# 测试用例
stom kill WordCountTopology

http://hadoop.apache.org/docs/r3.1.1