Hadoop概述

  • Hadoop(核心):HDFSMapReduce/YARN
  • Hadoop家族:建立在Hadoop基础上的一系列开源工具

hadoop_relations

Hadoop

HadoopApache的一个分布式计算、java语言实现的开源框架, 实现在大量计算机组成的集群中对海量数据进行分布式计算。相比于 依赖硬件的可靠性,Hadoop被设计为可以检测、处理应用层面的 failures,能够提供构建于电脑集群上的可靠服务。

HadoopApache的分布式计算开源框架,提供分布式文件系统 HDFSMapReduce/YARN分布式计算的软件架构

Hadoop Common

支持其它Hadoop模块的公用组件

Hadoop Distributed File System(HDFS)

虚拟文件系统,让整个系统表面上看起来是一个空间,实际上是很多 服务器的磁盘构成的

Hadoop YARN

Yet Another Resource Negotiator,通用任务、集群资源分配框架 ,面向Hadoop的编程模型

  • YARN将classic/MapReduce1中Jobtracker职能划分为多个独立 实体,改善了其面临的扩展瓶颈问题

  • YARN比MapReduce更具一般性,MapReduce只是YARN应用的一种 形式,可以运行Spark、Storm等其他通用计算框架

  • YARN精妙的设计可以让不同的YARN应用在同一个集群上共存, 如一个MapReduce应用可以同时作为MPI应用运行,提高可管理性 和集群利用率

Hadoop MapReduce

YARN基础上的大数据集并行处理系统(框架)

  • 包括两个阶段

    • Map:映射
    • Reduce:归一
  • 在分布式系统上进行计算操作基本都是由Map、Reduce概念步骤 组成

    • 分布式系统,不像一般的数据库、文件系统,无法从上至下 、从头到尾进行求和等操作
    • 需要由分散的节点不断向一个点聚拢的计算过程
  • 不适合实时性要求的应用,只适合大数据离线处理

Apache下Hadoop相关项目

高频

Ambari

用于部署(供应)、管理、监控Hadoop集群的Web工具

  • 支持HDFSMapReduceHiveHCatalogHBaseOozieZooKeeperPigSqoop

  • 提供dashboard用于查看集群健康程度,如:热度图

  • 能够直观的查看MapReducePigHive应用特点,提供 易用的方式考察其执行情况

HBase

Hadoop项目子项目,高可靠、高性能、面向列、可伸缩的分布式 存储系统

  • 该技术源于Fay Chang撰写的Google论文《Bigtable:一个 结构化数据的分布式存储系统》,类似于Bigtable在Google 文件系统上提供的分布式数据存储一样,HBaseHadoop的 基础上提供了类似于Bigtable的能力

  • 适合非结构化数据存储

  • 可用于在廉价PC Server上搭建大规模结构化存储集群,是 NoSQL数据库的两个首选项目(MongoDB

Hive

基于Hadoop的数据仓库工具

  • Hive中建立表,将表映射为结构化数据文件

  • 可以通过类SQL语句直接查询数据实现简单的MapReduce统计, 而不必开发专门的MapReduce应用

    • Hive会将SQL语句转换为MapReduce任务查询Hadoop
    • 速度很慢
    • 适合数据仓库的统计分析
    • 支持SQL语法有限

Pig

基于Hadoop的大规模数据高层分析工具(类似于Hive

  • 提供SQL-Like语言PigLatin

    • 其编译器会把类SQL的数据分析请求,转换为一系列经过 优化处理的MapReduce运算

    • 是一种过程语言,和Hive中的类SQL语句相比,更适合写 脚本,而Hive的类SQL语句适合直接在命令行执行

Zookeeper

Hadoop正式子项目,针对大型分布式应用设计的分布式、开源协调 系统

  • 提供功能:配置维护、名字服务、分布式同步、组服务

  • 封装好复杂、易出错的关键服务,提供简单易用、功能稳定、 性能高效的接口(系统),解决分布式应用中经常遇到的数据 管理问题,简化分布式应用协调及管理难度,提供高性能分布式 服务

  • 通常为HBase提供节点间的协调,部署HDFSHA模式时是 必须的

Spark

基于内存计算的开源集群计算系统,目的是让数据分析更加快速

低频

Mahout

基于Hadoop的机器学习、数据挖掘的分布式框架

  • 使用MapReduce实现了部分数据挖掘算法,解决了并行挖掘问题

    • 包括聚类、分类、推荐过滤、频繁子项挖掘
  • 通过使用Hadoop库,Mahout可以有效扩展至云端

Cassandra

开源分布式NoSQL数据库系统,最初由Facebook开发,用于存储 简单格式数据,集Google BigTable数据模型和Amazon Dynamo 的完全分布式架构于一身

Avro

数据序列化系统,设计用于支持数据密集型、大批量数据交换应用, 是新的数据序列化格式、传输工具,将逐步取代Hadoop原有的 IPC机制

Chukwa

用于监控大型分布式系统的开源数据收集系统,可以将各种类型的 数据收集成适合Hadoop处理的文件,保存在HDFS中供MapReduce 操作

Tez

基于YARN的泛用数据流编程平台

  • 提供强力、灵活的引擎用于执行任何DAG任务,为批处理和 交互用例处理数据

Tez正逐渐被HivePigHadoop生态框架采用,甚至被一些 商业公司用于替代MapReduce作为底层执行引擎

其他Hadoop相关项目

高频

Sqoop

用于将Hadoop和关系型数据库中数据相互转移的开源工具

  • 可以将关系型数据库(MySQLOraclePostgres)中 数据转移至HadoopHDFS

  • 也可以将HDFS的数据转移进关系型数据库中

Impala

Cloudera发布的实时查询开源项目

  • 模仿Google Dremel

  • 称比基于MapReduceHive SQL查询速度提升3~30倍,更加 灵活易用

Phoenix

apache顶级项目,在HBase上构建了一层关系型数据库,可以用 SQL查询HBase数据库,且速度比Impala更快,还支持包括 二级索引在内的丰富特性,借鉴了很多关系型数据库优化查询方法

Oozie

工作流引擎服务器,用于管理、协调运行在Hadoop平台 (HDFSPigMapReduce)的任务

Cloudera Hue

基于Web的监控、管理系统,实现对HDFSMapReduce/YARNHBaseHivePigWeb化操作和管理

低频

Hama

基于HDFSBSP(Bulk Synchronous Parallel)并行 计算框架,可以用包括图、矩阵、网络算法在内的大规模、 大数据计算

Flume

分布的、可靠的、高可用的海量日志聚合系统,可用于日志数据 收集、处理、传输

Giraph

基于Hadoop的可伸缩的分布式迭代图处理系统,灵感来自于BSPGoogle Pregel

Crunch

基于Google FlumeJava库编写的Java库,用于创建MapReduce 流水线(程序)

  • 类似于HivePig,提供了用于实现如连接数据、执行聚合 、排序记录等常见任务的模式库

    • 但是Crunch不强制所有输入遵循同一数据类型

    • 其使用一种定制的类型系统,非常灵活,能直接处理复杂 数据类型,如:时间序列、HDF5文件、HBase、序列化 对象(protocol bufferAvro记录)

  • 尝试简化MapReduce的思考方式

    • MapReduce有很多优点,但是对很多问题,并不是合适的 抽象级别

    • 出于性能考虑,需要将逻辑上独立的操作(数据过滤、投影 、变换)组合为一个物理上的MapReduce操作

Whirr

运行于云服务的类库(包括Hadoop),提供高度互补性

  • 相对中立
  • 支持AmazonEC2Rackspace的服务

Bigtop

Hadoop及其周边生态打包、分发、测试的工具

HCatalog

基于Hadoop的数据表、存储管理,实现中央的元数据、模式管理, 跨越HadoopRDBMS,利用PigHive提供关系视图

Llama

让外部服务器从YARN获取资源的框架

CDH组件

Fuse

HDFS系统看起来像普通文件系统

Hadoop Streamin

MapReduce代码其他语言支持,包括:C/C++PerlPythonBash

MapReduce YARN

MapReduce1.0组件

MapReduce1.0是指Hadoop1.0中组件,不是指MapReduce计算模型

优势

  • 方便扩展:能够运行在普通服务器构成的超大规模集群上

  • IO瓶颈:通过将IO分散在大规模集群的各个节点上,可以提高 数据装载速度(大规模数据时只有部分数据可以状态在内存中)

局限

  • MapReduce计算模型问题(参见MapReduce计算模型

  • 数据处理延迟大

    • MapReduce作业在Map阶段、Reduce阶段执行过程中,需要 把中间结果存盘
    • 在MR作业间也需要通过磁盘实现作业间的数据交换
  • 资源利用率低

    • 任务调度方法远未达到优化资源利用率的效果,给每个 TaskTracker分配任务的过程比较简单

资源分配

  • 每个TaskTracker拥有一定数量的slots,每个活动的Map、 Reduce任务占用一个slot

  • JobTracker把任务分配给最靠近数据、有slot空闲TT

  • 不考虑Task运算量大小,所有Task视为相同,如果有某个TT 当前负载过高,会影响整体的执行

  • 也可以通过Speculative Execution模式,在多个slave上 启动同一个任务,只要其中有一个任务完成即可

执行引擎

MapReduce执行引擎运行在HDFS上

  • JobTracker:运行在NameNode上

    • 分解客户端提交的job为数据处理tasks,分发给集群里相关 节点上的TaskTacker运行
    • 发送任务原则:尽量把任务推送到离数据最近的节点上, 甚至推送到数据所在的节点上运行
  • TaskTracker:运行在DataNode上

    • 在节点上执行数据处理map、reduce tasks
    • 可能需要从其他DataNode中获取需要数据

MapRedudce2.0

Shuffle

Shuffle:系统执行排序的过程

  • 为了确保MapReduce的输入是按键排序的

Map端

每个Map Task都有一个内存缓冲区用于存储map输出结果,缓冲区 快满时需要将缓冲区数据以临时文件方式存放到磁盘中,整个Task 结束后再对此Map Task产生所有临时作合并,生成最终正式输出文件 ,等待Reduce Task拉数据

YARN

Yet Another Resource Negotiator,通用任务、集群资源分配框架 ,面向Hadoop的编程模型

YARN优势

扩展性

  • YARN将classic/MapReduce1中Jobtracker职能划分为多个独立 实体,改善了其面临的扩展瓶颈问题

  • MapReduce现在只是批数据处理框架,是YARN支持的数据处理 框架的一种,独立于资源管理层,单独演化、改进

  • YARN精妙的设计可以让不同的YARN应用在同一个集群上共存, 如一个MapReduce应用可以同时作为MPI应用运行,提高可管理性 、集群利用率

高效率

  • ResourceManager是单独的资源管理器

  • Job Scheduler指负责作业调度

  • 根据资源预留要求、公平性、Service Level Agreement等标准 ,优化整个集群的资源利用

一般性

YARN是通用资源管理框架,在其上可以搭建多种数据处理框架

  • 批处理:MapReduce
  • 交互式处理:Tez
  • 迭代处理:Spark
  • 实时流处理:Storm
  • 图数据处理:GraphLab/Giraph

YARN中实体

ResourceManager

RM物理上对应主节点,逻辑上管理集群上的资源使用,其功能由 Scheduler、ApplicationManager协调完成

  • AppplicatonManager:接受、监控任务

    • 接受客户端提交的job

    • 判断启动该job的ApplicationMaster所需的资源

    • 监控ApplicationMaster的状态,并在失败时重启其

  • Scheduler:分配资源、调度

    • Schedular计算启动ApplicationManager提交的job的AM所需 资源,将资源封装成Container

    • 然后根据调度算法调度,在某个NM上启动job的AM

    • 不提供失败重启、监控功能

    • Scheduler收到AM任务完成汇报之后,回收资源、向RM返回 执行结果

    • 调度算法可自定以,YARN根据不同场景提供

      • FIFO Scheduler
      • Capacity Scheduler
      • Fair Scheduler

NodeManager

NM物理上对应计算节点,逻辑上监控、管理当前节点资源

  • 仅仅抽象本节点资源(cpu、内存、磁盘、网络等),并且定时 像RM的Scheduler汇报

  • 接受并处理AM的tasks启动、停止等请求

ApplicationMaster

AM管理集群上运行任务生命周期

  • 每个job都有一个专用的AM

  • AM启动后会计算job所需资源,并向Scheduler申请资源

  • AM运行在job运行期间,负责整个job执行过程的监控

    • NM分配完任务container后,AM开始监控这些containers 、tasks状态
    • 任务失败则回收资源重新生成
    • 成功则释放资源
    • 任务执行完毕后回报Scheduler

Containers

YARN为将来的资源隔离提出的框架,是一组资源的集合,每个task 对应一个container,只能在container中运行

  • 容器有特定的内存分配范围

    • 容器内存最小值即为内存分配单位,内存最大值也应该是 内存分配单位整数倍

    • 根据任务所需资源多少分配给容器整数倍内存单位,但是 如果任务所需内存大于容器内存最大值,运行时可能会报错

  • 由NM确保task使用的资源不会超过分配的资源

  • 注意

    • AM并不运行于container中,真正的task才运行在container

yarn_procedure

Job运行过程

作业提交

  • 从RM获取新的作业ID
  • 作业客户端检查作业输出说明,计算输入分片(也可以配置 在集群中产生分片)
  • 将作业资源复制到HDFS
  • 调用RM上的submitApplication方法提交作业

作业初始化

  1. RM收到调用submitApplication消息后,将请求传递给 内部scheduler,scheduler分配一个container
  2. NM在RM的管理下在容器中启动应用程序的master进程AM, 其对作业进行初始化
  3. AM创建多个簿记对象用于接受任务进度、完成报告,保持 对作业进度的跟踪
  4. AM接受来自共享文件系统的在客户端计算的输入分片,对 每个分片创建一个map对象,及由mapreduce.job.reduces 属性确定的多个reduce任务对象
  5. AM根据任务大小决定如何运行job,如果在新容器中分配、 运行任务的开销大于并行运行时的开销,AM会在单个节点 上运行,这样的作业称为uberized
  6. AM在任何tasks执行之前通过job的setup方法设置job的 OutputCommiter,建立作业输出目录

任务分配

  1. 若作业不适合作为uber任务运行,AM为该作业中所有map 、reduce任务向RM请求容器
  2. 请求附着heart beat,包括每个map任务的数据本地化信息 ,特别是输入分片所在的主机、机架信息,scheduler据此 做调度决策
    • 理想化情况下任务分配到数据本地化节点
    • 否则优先使用机架本地化
  3. 请求同时指定了任务内存需求,YARN中的资源分为更细粒度 ,task可以请求最小到最大限制范围、任意最小值倍数的 内存容量

任务执行

  1. 当NM的scheduler为task分配了container,AM就可以通过 与NM通信启动容器
  2. 任务由YarnChild执行,在执行任务之前,需要将任务 所需资源本地化,包括作业的配置、JAR文件、所有来自 分布式缓存的文件,然后运行map、reduce任务
  3. 对于Streaming、Pipes程序,YarnChild启动Streaming、 Pipes进程,使用标准输入输出、socket与其通信(以 MapReduce1方式运行)

进度和状态更新

  1. task每3s通过umbilical接口向AM汇报进度、状态(包括 计数器),作为job的aggregate view
  2. 客户端则默认没1s查询AM接受进度更新

作业完成

  1. 客户端每5s通过调用job的waitForCompletion检查作业 是否完成,也可以通过HTTP callback完成作业
  2. 作业完成后AM和task容器清理工作状态,OutputCommiter 作业清理方法被调用

todo:这里逻辑有问题,要删

MapReduce计算模型

  • 分布式系统,不像一般的数据库、文件系统,无法从上至下 、从头到尾进行求和等操作
  • 需要由分散的节点不断向一个点聚拢的计算过程,即分布式系统 上计算模型基本都是由map、reduce步骤组成

MapReduce

MapReduce每步数据处理流程包括两个阶段

  • Map:映射

    • map过程相互独立、各mapper见不通信,所以mapreduce 只适合处理独立计算的任务
  • Reduce:归一

    • reduce直接处理map的输出,reduce的为map输出

数据处理过程

  • 需要把任何计算任务转换为一系列MapReduce作业,然后依次 执行这些作业

  • 计算过程的各个步骤之间,各个作业输出的中间结果需要存盘, 然后才能被下个步骤使用(因为各个步骤之间没有明确流程)

  • One Pass Computation:只需要一遍扫描处理的计算任务 MapReduce计算模型非常有效

  • Multi Pass Computation:需要在数据上进行多遍扫描、处理 的计算任务,需要执行多个MapReduce作业计算任务,因为 多副本复制、磁盘存取,其效率不高

Mapred on DAG

Directed Acyclic Graph;表示数据处理流程的有向无环图

  • 顶点:数据处理任务,反映一定的业务逻辑,即如何对数据进行 转换和分析
  • 边:数据在不同的顶点间的传递

比较

比较方面 MapReduce DAG
操作原语 map、reduce 较多
抽象层次
表达能力
易用性 要手动处理job之间依赖关系,易用性差 DAG本身体现数据处理流程
可读性 处理逻辑隐藏在代码中,没有整体逻辑 较好
  • 正是MapReduce提供操作原语少、抽象层次低,所以其表达能力 差,同时需要用户处理更多的逻辑,易用性、可读性差

    • 复杂数据处理任务,如:机器学习算法、SQL连接查询很难 表示用MapReduce计算默认表达

    • 操作原语多并不是DAG本身的要求,DAG本身只是有向无环图, 只是使用DAG计算模型可以提供更多的操作原语

  • 由于DAG的表达能力强于MapReduce,对某些处理逻辑,DAG 所需作业数目小于MapReduce,消除不必要的任务

    • DAG显著提高了数据处理效率,对小规模、低延迟和大规模、 高吞吐量的负载均有效

    • MapReduce需要通过把中间结果存盘实现同步,而DAG整合 部分MapReduce作业,减少磁盘I/O

    • reduce任务需要等待map任务全部完成才能继续,DAG优化 数据处理流程,减少同步Barrier

  • DAG部分计算模型也由map、reduce任务构成,只是不像传统 MapReduce计算模型中map、reduce必须成对出现

    • 或者说DAG只有一次map任务(扫描数据时),其余都是 reduce任务?

    • 从MapReduce配置也可以看出,MapReduce可以选择基于 yarnyarn-tez

Tez

Tez简介

Tezm目标就是建立执行框架,支持大数据上DAG表达的作业处理

  • YARN将资源管理功能从数据处理模型中独立出来,使得在Hadoop 执行DAG表达的作业处理成为可能,Tez成为可扩展、高效的执行 引擎

    • Tez在YARN和Hive、Pig之间提供一种通用数据处理模型DAG
    • Hive、Pig、Cascading作业在Tez上执行更快,提供交互式 查询响应
  • Tez把DAG的每个顶点建模为Input、Processer、Output模块的 组合

    • Input、Output决定数据格式、输入、输出
    • Processor包装了数据转换、处理逻辑
    • Processor通过Input从其他顶点、管道获取数据输入,通过 Output向其他顶点、管道传送生成数据
    • 通过把不同的Input、Processor、Output模块组合成顶点, 建立DAG数据处理工作流,执行特定数据处理逻辑
  • Tez自动把DAG映射到物理资源,将其逻辑表示扩展为物理表示, 并执行其中的业务逻辑

    • Tez能为每个节点增加并行性,即使用多个任务执行节点 的计算任务

    • Tez能动态地优化DAG执行过程,能够根据执行过程中获得地 信息,如:数据采样,优化DAG执行计划,优化资源使用、 提高性能

      • 去除了连续作业之间的写屏障
      • 去除了工作流中多余的Map阶段

Tez执行过程

  • 初始化例程,提供context/configuration信息给Tez runtime

  • 对每个顶点的每个任务(任务数量根据并行度创建)进行初始化

  • 执行任务Processor直到所有任务完成,则节点完成

  • Output把从Processor接收到的数据,通过管道传递给下游顶点 的Input

  • 直到整个DAG所有节点任务执行完毕

MSSQL Puzzles

访问其他数据库服务器

SQL默认阻止对组件Ad Hoc Distributed QueriesSTATEMENT OpenRowSet/OpenDatasource的访问,需要使用sp_configure 启用Ad Hoc Distributed Queries

  • 开启Ad Hoc Distributed Queries

    1
    2
    3
    4
    exec sp_configure 'show advanced options',1
    reconfigure
    exec sp_configure 'Ad Hoc Distributed Queries',1
    reconfigure
  • 关闭

    1
    2
    3
    4
    exec sp_configure 'Ad Hoc Distributed Queries',0
    reconfigure
    exec sp_configure 'show advanced options',0
    reconfigure

特殊语法

数据导入

  • mssql中换行符设置为\n表示的是\r\n,即永远无法单独 指定\n或者\r,尽量使用ASCII码0xXX表示

    1
    > bulk insert tbl_name from /path/to/file with (FILEDTERMINATOR="|", ROWTERMINATOR="0x0a");

SQL数据库Puzzles

数据迁移

直接查询、插入

同库

1
2
insert into dst_tb select * from src_tb;
insert into dst_tb(field1, field2, ...) select (field_a, field_b, ...) from src_tb;

异库、同服务器

1
2
3
4
5
6
insert into db1.dst_db select * from db2.src_db;
# 插入已有表
create table db1.dst_tb as select * from db2.src_tb;
# 创建表并插入数据
rename table src_db.src_tb to dst_db.dst_tb;
# 重命名迁移完整表

异服务器

文件中介、跨实例

.sql

1
2
3
4
5
$ mysqldump [-u user] -p --single-transaction [--where=""] src_db src_tb > src_db.src_tb.sql
# 导入数据
# 加上`-d`仅导出表结构
$ mysql [-u user] -p dst_db < src_db.src_tb.sql
# 导入数据
1
source src_db.src_tb.sql;

.csv

secure_file_priv

load data infileinto outfile需要mysql开启 secure_file_priv选项,可以通过查看

1
show global variables like `%secure%`;

mysql默认值NULL不允许执行,需要更改配置文件

1
2
[mysqld]
secure_file_priv=''
本机Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
select * from src_tb into outfile file_name.csv
fields terminated by ','
optionally enclosed by '"'
escaped by '"'
lines terminated by '\r\n';
# 导出至`.csv`

load data infile file_name.csv [replace] into table dst_tb(field1, field2, @dummy...)
fields terminated by ','
optionally enclosed by '"'
escaped by '"'
lines terminated by '\r\n';
# 从`.csv`数据导入
# 表结构不同时可以设置对应字段,多余字段`@dummy`表示丢弃
异机Server
1
2
3
4
$ mysql -h host -u user -p src_db -N -e "select * from src_tb;" > file_name.csv
# 只能通过*shell*查询并导出至文件
# 需要`file`权限
# `-N`:skip column names
1
2
load data local infile filename.csv;
# 指定`local`则从*client*读取文件,否则从*server*读取

大表分块迁移

  • 容易分块的字段
    • 自增id
    • 时间

注意

Hadoop HDFS

HDFS设计模式

  • 数据读取、写入

    • HDFS一般存储不可更新的文件,只能对文件进行数据追加 ,也不支持多个写入者的操作
    • 认为一次写入、多次读取是最高效的访问模式
    • namenode将metadata存储在内存中,所以文件系统所能 存储的文件总数受限于NameNode的内存
  • 适合模式

    • 每次分析都涉及数据集大部分数据甚至全部,因此读取整个 数据集的大部分数据、甚至全部,因此读取整个数据集的 时间比读取第一条记录时间延迟更重要
    • HDFS不适合要求地时间延迟的数据访问应用,HDFS是为 高数据吞吐量应用优化的,可能会提高时间延迟
  • 硬件:HDFS无需高可靠硬件,HDFS被设计为即使节点故障也能 继续运行,且不让用户察觉

数据块

和普通文件系统一样,HDFS同样也有块的概念,默认为64MB

  • HDFS上的文件也被划分为块大小的多个chunk,作为独立的存储 单元,但是HDFS中小于块大小的文件不会占据整个块空间

  • 对分布式文件系统块进行抽象的好处

    • 文件大小可以大于网络中任意一个磁盘的容量
    • 使用抽象块而非整个文件作为存储单元,简化了存储子系统 的设计
      • 简化存储管理,块大小固定,计算磁盘能存储块数目 相对容易
      • 消除了对元素见的顾虑,文件的元信息(权限等), 不需要和块一起存储,可由其他系统单独管理
  • 块非常适合用于数据备份,进而提供数据容错能力、提高可用性

NameNode

HDFS系统中的管理者

  • 集中存储了HDFS的元信息Metadata

    • 维护文件系统的文件树、全部的文件和文件夹的元数据
    • 管理文件系统的Namespace:创建、删除、修改、列出所有 文件、目录
  • 执行数据块的管理操作

    • 把文件映射到所有的数据块
    • 创建、删除数据块
    • 管理副本的Placement、Replication
  • 负责DataNode的成员管理

    • 接受DataNode的Registration
    • 接受DataNode周期性的Heart Beat

Hadoop上层模块,根据NameNode上的元信息,就可以知道每个数据块 有多少副本、存放在哪些节点上,据此分配计算任务,通过 Move Computation to Data,而不是移动数据本身,减少数据 移动开销,加快计算过程

Metadata的保存

为了支持高效的存取操作,NameNode把所有的元信息保存在主内存, 包括文件和数据块的命名空间、文件到数据块的映射、数据块副本 的位置信息。文件命名空间、文件到数据块的映射信息也会持久化 到NameNode本地文件系统

  • FsImage:命名空间镜像文件,保存整个文件系统命名空间、 文件到数据块的映射信息
  • EditLog:编辑日志文件,是一个Transaction Log文件,记录了 对文件系统元信息的所有更新操作:创建文件、改变文件的 Replication Factor

NameNode启动时,读取FsImage、EditLog文件,把EditLog的所有 事务日志应用到从FsImage文件中装载的旧版本元信息上,生成新的 FsImage并保存,然后截短EditLog

NameNode可恢复性

多个文件系统备份

备份文件系统元信息的持久化版本

  • 在NameNode写入元信息的持久化版本时,同步、atomic写入多个 文件系统(一般是本地磁盘、mount为本地目录的NFS)
Secondary NameNode

运行Secondary NameNode:负责周期性的使用EditLog更新 FsImage,保持EditLog在一定规模内

  • Seconadary NameNode保存FsImage、EditLog文件副本, 每个一段时间从NameNode拷贝FsImage,和EditLog文件进行 合并,然后把更新后的FsImage复制回NameNode

  • 若NameNode宕机,可以启动其他机器,从Secondary NameNode获得FsImage、EditLog,恢复宕机之前的最新的 元信息,当作新的NameNode,也可以直接作为主NameNode

  • Secondary NameNode保存的出状态总是滞后于主节点,需要 从NFS获取NameNode部分丢失的metadata

  • Secondary NameNode需要运行在另一台机器,需要和主 NameNode一样规模的CPU计算能力、内存,以便完成元信息 管理

想要从失效的NameNode恢复,需要启动一个拥有文件系统数据副本的 新NameNode,并配置DataNode和客户端以便使用新的NameNode

  • 将namespace的映像导入内存中
  • 重做编辑日志
  • 接收到足够多的来自DataNode的数据块报告,并退出安全模式

DataNode

HDFS中保存数据的节点

  • 数据被切割为多个数据块,以冗余备份的形式存储在多个 DataNode中,因此不需要再每个节点上安装RAID存储获得硬件上 可靠存储支持。DataNode之间可以拷贝数据副本,从而可以重新 平衡每个节点存储数据量、保证数据可靠性(保证副本数量)

  • DotaNode定期向NameNode报告其存储的数据块列表,以备使用者 通过直接访问DataNode获得相应数据

  • 所有NameNode和DataNode之间的通讯,包括DataNode的注册、 心跳信息、报告数据块元信息,都是由DataNode发起请求,由 NameNode被动应答和完成管理

HDFS高可用性

对于大型集群,NN冷启动需要30min甚至更长,因此Hadoop2.x中添加 对高可用性HA(high-availability)的支持

  • 配置Active-Standby NameNode

    • ANN失效后,SNN就会接管任务并开始服务,没有明显中断
    • ANN、SNN应该具有相同的硬件配置
  • NN之间需要通过高可用的共享存储(JounalNode)实现 Editlog共享

    • JN进程轻量,可以和其他节点部署在同一台机器
    • JN至少为3个,最好为奇数个,这样JN失效$(n-1)/2$个时 仍然可以正常工作
    • SNN接管工作后,将通读共享编辑日志直到末尾,实现与ANN 状态同步
  • DN需要同时向两个NN发送数据块处理报告,因为数据块映射信息 存储在NN内存中

  • 客户端需要使用特定机制处理NN失效问题,且机制对用户透明

  • 如果两个namenode同时失效,同样可以冷启动其他namenode, 此时情况就和no-HA模式冷启动类似

注意:HA模式下,不应该再次配置Secondary NameNode

Note that, in an HA cluster, the Standby NameNode also performs checkpoints of the namespace state, and thus it is not necessary to run a Secondary NameNode, CheckpointNode, or BackupNode in an HA cluster. In fact, to do so would be an error. This also allows one who is reconfiguring a non-HA-enabled HDFS cluster to be HA-enabled to reuse the hardware which they had previously dedicated to the Secondary NameNode.

Failover Controller

故障转移控制器系统中有一个新实体管理者管理namenode之间切换,

  • Failover Controller最初实现基于Zookeeper,可插拔

  • 每个namenode运行着一个Failover Controller,用于监视宿主 namenode是否失效(heart beat机制), 并在失效时进行故障 切换

    • 管理员也可以手动发起故障切换,称为平稳故障转移
  • 在非平稳故障切换时,无法确切知道失效namenode是否已经停止 运行,如网速慢、网络切割均可能激发故障转移,引入fencing 机制

    • 杀死namenode进程
    • 收回对共享存储目录权限
    • 屏蔽相应网络端口
    • STONITH:shoot the other node in the head,断电

联邦HDFS

NameNode在内存中保存文件系统中每个文件、数据块的引用关系, 所以对于拥有大量文件的超大集群,内存将成为系统扩展的瓶颈, 2.x中引入的联邦HDFS可以添加NameNode实现扩展

  • 每个NameNode维护一个namespace volume,包括命名空间的 元数据、命令空间下的文件的所有数据块、数据块池
  • namespace volume之间相互独立、不通信,其中一个NameNode 失效也不会影响其他NameNode维护的命名空间的可用性
  • 数据块池不再切分,因此集群中的DataNode需要注册到每个 NameNode,并且存储来自多个数据块池的数据块

Hadoop文件系统

Hadoop有一个抽象问的文件系统概念,HDFS只是其中的一个实现, Java抽象类org.apche.hadoop.fs.FileSystem定义了Hadoop中的 一个文件系统接口,包括以下具体实现

文件系统 URI方案 Java实现 描述
Local file fs.LocalFileSystem 使用客户端校验和本地磁盘文件系统,没有使用校验和文件系统RawLocalFileSystem
HDFS hdfs hdfs.DistributedFileSystem HDFS设计为与MapReduce结合使用实现高性能
HFTP hftp hdfs.HftpFileSystem 在HTTP上提供对HDFS只读访问的文件系统,通常与distcp结合使用,以实现在运行不同版本HDFS集群之间复制数据
HSFTP hsftp hdfs.HsftpFileSystem 在HTTPS上同以上
WebHDFS Webhdfs hdfs.web.WebHdfsFileSystem 基于HTTP,对HDFS提供安全读写访问的文件系统,为了替代HFTP、HFSTP而构建
HAR har fs.HarFileSystem 构建于其他文件系统之上,用于文件存档的文件系统,通常用于需要将HDFS中的文件进行存档时,以减少对NN内存的使用
hfs kfs fs.kfs.kosmosFileSystem CloudStore(前身为Kosmos文件系统)类似于HDFS(GFS),C++编写
FTP ftp fs.ftp.FTPFileSystem 由FTP服务器支持的文件系统
S3(原生) S3n fs.s3native.NativeS3FileSystem 由Amazon S3支持的文件系统
S3(基于块) S3 fs.sa.S3FileSystem 由Amazon S3支持的文件系统,以块格式存储文件(类似于HDFS),以解决S3Native 5GB文件大小限制
分布式RAID hdfs hdfs.DistributedRaidFileSystem RAID版本的HDFS是为了存档而设计的。针对HDFS中每个文件,创建一个更小的检验文件,并允许数据副本变为2,同时数据丢失概率保持不变。需要在集群中运行一个RaidNode后台进程
View viewfs viewfs.ViewFileSystem 针对其他Hadoop文件系统挂载的客户端表,通常用于联邦NN创建挂载点

文件系统接口

Hadoop对文件系统提供了许多接口,一般使用URI方案选择合适的 文件系统实例进行交互

命令行接口

1
2
3
4
5
6
7
8
$ hadoop fs -copyFromLocal file hdfs://localhost/user/xyy15926/file
# 调用Hadoop文件系统的shell命令`fs`
# `-copyFromLocalFile`则是`fs`子命令
# 事实上`hfds://localhost`可以省略,使用URI默认设置,即
# 在`core-site.xml`中的默认设置
# 类似的默认复制文件路径为HDFS中的`$HOME`

$ hadoop fs -copyToLocal file file

HTTP

  • 直接访问:HDFS后台进程直接服务来自于客户端的请求

    • 由NN内嵌的web服务器提供目录服务(默认50070端口)
    • DN的web服务器提供文件数据(默认50075端口)
  • 代理访问:依靠独立代理服务器通过HTTP访问HDFS

    • 代理服务器可以使用更严格的防火墙策略、贷款限制策略

C

Hadoop提供libhdfs的C语言库,是Java FileSystem接口类的 镜像

  • 被写成访问HDFS的C语言库,但其实可以访问全部的Hadoop文件 系统
  • 使用Java原生接口(JNI)调用Java文件系统客户端

FUSE

Filesystem in Userspace允许把按照用户空间实现的文件系统整合 成一个Unix文件系统

  • 使用Hadoop Fuse-DFS功能模块,任何一个Hadoop文件系统可以 作为一个标准文件系统进行挂载
    • Fuse_DFS使用C语言实现,调用libhdfs作为访问HDFS的接口
  • 然后可以使用Unix工具(lscat等)与文件系统交互
  • 还可以使用任何编程语言调用POSIX库访问文件系统

读文件

  1. 客户端程序使用要读取的文件名、Read Range的开始偏移量、 读取范围的程度等信息,询问NameNode

  2. NameNode返回落在读取范围内的数据块的Location信息,根据 与客户端的临近性(Proximity)进行排序,客户端一般选择 最临近的DataNode发送读取请求

具体实现如下

  1. 客户端调用FileSystem对象open方法,打开文件,获得 DistributedFileSystem类的一个实例

  2. DistributedFileSystem返回FSDataInputStream类的实例, 支持文件的定位、数据读取

    • DistributedFileSystem通过RPC调用NameNode,获得 文件首批若干数据块的位置信息(Locations of Blocks)
    • 对每个数据块,NameNode会返回拥有其副本的所有DataNode 地址
    • 其包含一个DFSInputStream对象,负责管理客户端对HDFS 中DataNode、NameNode存取
  3. 客户端从输入流调用函数read,读取文件第一个数据块,不断 调用read方法从DataNode获取数据

    • DFSInputStream保存了文件首批若干数据块所在的 DataNode地址,连接到closest DataNode
    • 当达到数据块末尾时,DFSInputStream关闭到DataNode 的连接,创建到保存其他数据块DataNode的连接
    • 首批数据块读取完毕之后,DFSInputStream向NameNode 询问、提取下一批数据块的DataNode的位置信息
  4. 客户端完成文件的读取,调用FSDataInputStream实例close 方法关闭文件

写文件

  • 客户端询问NameNode,了解应该存取哪些DataNode,然后客户端 直接和DataNode进行通讯,使用Data Transfer协议传输数据, 这个流式数据传输协议可以提高数据传输效率

  • 创建文件时,客户端把文件数据缓存在一个临时的本地文件上, 当本地文件累计超过一个数据块大小时,客户端程序联系 NameNode,NameNode更新文件系统的NameSpace,返回Newly Allocated数据块的位置信息,客户端根据此信息本文件数据块 从临时文件Flush到DataNode进行保存

具体实现如下:

  1. 客户端调用DistributedFileSystemcreate方法

    • DistributedFileSystem通过发起RPC告诉NameNode在 其NameSpace创建一个新文件,此时新文件没有任何数据块
    • NameNode检查:文件是否存在、客户端权限等,检查通过 NameNode为新文件创建新记录、保存其信息,否则文件创建 失败
  2. DistributedFileSystem返回FSDataOutputStream给客户端

    • 其包括一个DFSOutputStream对象,负责和NameNode、 DataNode的通讯
  3. 客户端调用FSDataOutputStream对象write方法写入数据

    • DFSOutputStream把数据分解为数据包Packet,写入内部 Data Queue
    • DataSteamer消费这个队列,写入本地临时文件中
    • 当写入数据超过一个数据块大小时,DataStreamer请求 NameNode为新的数据块分配空间,即选择一系列合适的 DataNode存放各个数据块各个副本
    • 存放各个副本的DataNode形成一个Pipeline,流水线上的 Replica Factor数量的DataNode接收到数据包之后转发给 下个DataNode
    • DFSOutputStream同时维护数据包内部Ack Queue,用于 等待接受DataNode应答信息,只有某个数据包已经被流水线 上所有DataNode应答后,才会从Ack Queue上删除
  4. 客户端完成数据写入,调用FSDataOutputStreamclose 方法

    • DFSOutputStream把所有的剩余的数据包发送到DataNode 流水线上,等待应答信息
    • 最后通知NameNode文件结束
    • NameNode自身知道文件由哪些数据块构成,其等待数据块 复制完成,然后返回文件创建成功

Hadoop平台上的列存储

列存储的优势

  • 更少的IO操作:读取数据的时候,支持Prject Pushdown,甚至 是Predicate Pushdown,可大大减少IO操作

  • 更大的压缩比:每列中数据类型相同,可以针对性的编码、压缩

  • 更少缓存失效:每列数据相同,可以使用更适合的Cpu Pipline 编码方式,减少CPU cache miss

RCFile

Record Columnar File Format:FB、Ohio州立、中科院计算所合作 研发的列存储文件格式,首次在Hadoop中引入列存储格式

  • 允许按行查询,同时提供列存储的压缩效率的列存储格式

    • 具备相当于行存储的数据加载速度、负载适应能力
    • 读优化可以在扫描表格时,避免不必要的数据列读取
    • 使用列维度压缩,有效提升存储空间利用率
  • 具体存储格式

    • 首先横向分割表格,生成多个Row Group,大小可以由用户 指定
    • 在RowGroup内部,按照列存储一般做法,按列把数据分开, 分别连续保存
      • 写盘时,RCFile针对每列数据,使用Zlib/LZO算法进行 压缩,减少空间占用
      • 读盘时,RCFile采用Lazy Decompression策略,即用户 查询只涉及表中部分列时,会跳过不需要列的解压缩、 反序列化的过程

ORC存储格式

Optimized Row Columnar File:对RCFile优化的存储格式

  • 支持更加丰富的数据类型

    • 包括Date Time、Decimal
    • Hive的各种Complex Type,包括:Struct、List、Map、 Union
  • Self Describing的列存储文件格式

    • 为Streaming Read操作进行了优化
    • 支持快速查找少数数据行
  • Type Aware的列存储文件格式

    • 文件写入时,针对不同的列的数据类型,使用不同的编码器 进行编码,提高压缩比
      • 整数类型:Variable Length Compression
      • 字符串类型:Dictionary Encoding
  • 引入轻量级索引、基本统计信息

    • 包括各数据列的最大/小值、总和、记录数等信息
    • 在查询过程中,通过谓词下推,可以忽略大量不符合查询 条件的记录

文件结构

一个ORC文件由多个Stripe、一个包含辅助信息的FileFooter、以及 Postscript构成

Stripe

每个stripe包含index data、row data、stripe footer

  • stripe就是ORC File中划分的row group

    • 默认大小为256MB,可扩展的长度只受HDFS约束
    • 大尺寸的strip、对串行IO的优化,能提高数据吞吐量、 读取更少的文件,同时能把减轻NN负担
  • Index Data部分

    • 包含每个列的极值
    • 一系列Row Index Entry记录压缩模块的偏移量,用于跳转 到正确的压缩块的位置,实现数据的快速读取,缺省可以 跳过10000行
  • Row Data部分;包含每个列的数据,每列由若干Data Stream 构成

  • Stripe Footer部分

    • Data Stream位置信息
    • 每列数据的编码方式

包含该ORCFile文件中所有stripe的元信息

  • 每个Stripe的位置
  • 每个Stripe的行数
  • 每列的数据类型
  • 还有一些列级别的聚集结果,如:记录数、极值、总和
Postscript
  • 用来存储压缩参数
  • 压缩过后的Footer的长度

Parquet

灵感来自于Google关于Drenel系统的论文,其介绍了一种支持嵌套 结构的列存储格式,以提升查询性能

支持

Parquet为hadoop生态系统中的所有项目,提供支持高压缩率的 列存储格式

  • 兼容各种数据处理框架

    • MapReduce
    • Spark
    • Cascading
    • Crunch
    • Scalding
    • Kite
  • 支持多种对象模型

    • Avro
    • Thrift
    • Protocol Buffers
  • 支持各种查询引擎

    • Hive
    • Impala
    • Presto
    • Drill
    • Tajo
    • HAWQ
    • IBM Big SQL

Parquet组件

  • Storage Format:存储格式,定义了Parquet内部的数据类型、 存储格式

  • Object Model Converter:对象转换器,由Parquet-mr实现, 完成对象模型与Parquet数据类型的映射

    • 如Parquet-pig子项目,负责把内存中的Pig Tuple序列化 并按存储格式保存为Parquet格式的文件,已经反过来, 把Parquet格式文件的数据反序列化为Pig Tuple
  • Object Model:对象模型,可以理解为内存中的数据表示,包括 Avro、Thrift、Protocal Buffer、Hive Serde、Pig Tuple、 SparkSQL Internal Row等对象模型

Parquet数据schema

数据schema(结构)可以用一个棵树表达

  • 有一个根节点,根节点包含多个Feild(子节点),子节点可以 包含子节点

  • 每个field包含三个属性

    • repetition:field出现的次数

      • required:必须出现1次
      • optional:出现0次或1次
      • repeated:出现0次或多次
    • type:数据类型

      • primitive:原生类惬
      • group:衍生类型
    • name:field名称

  • Parquet通过把多个schema结构按树结构组合,提供对复杂类型 支持

    • List、Set:repeated field
    • Map:包含键值对的Repeated Group(key为Required)
  • schema中有多少叶子节点,Parquet格式实际存储多少列, 父节点则是在表头组合成schema的结构

Parquet文件结构

  • HDFS文件:包含数据和元数据,数据存储在多个block中
  • HDFS Block:HDFS上最小的存储单位
  • Row Group:按照行将数据表格划分多个单元,每个行组包含 一定行数,行组包含该行数据各列对应的列块
    • 一般建议采用更大的行组(512M-1G),此意味着更大的 列块,有毅力在磁盘上串行IO
    • 由于可能依次需要读取整个行组,所以一般让一个行组刚好 在一个HDFS数据块中,HDFS Block需要设置得大于等于行组 大小
  • Column Chunk:每个行组中每列保存在一个列块中
    • 行组中所有列连续的存储在行组中
    • 不同列块使用不同压缩算法压缩
    • 列块存储时保存相应统计信息,极值、空值数量,用于加快 查询处理
    • 列块由多个页组成
  • Page:每个列块划分为多个Page
    • Page是压缩、编码的单元
    • 列块的不同页可以使用不同的编码方式

HDFS命令

用户

  • HDFS的用户就是当前linux登陆的用户

Hadoop组件

Hadoop Streaming

Hadoop安装配置

Hadoop安装

依赖

  • Java

  • ssh:必须安装且保证sshd一直运行,以便使用hadoop脚本管理 远端hadoop守护进程

    • pdsh:建议安装获得更好的ssh资源管理
    • 要设置免密登陆

机器环境配置

~/.bashrc

这里所有的设置都只是设置环境变量

  • 所以这里所有环境变量都可以放在hadoop-env.sh

  • 放在.bashrc中不是基于用户隔离的考虑

    • 因为hadoop中配置信息大部分放在.xml,放在这里无法 实现用户隔离
    • 更多的考虑是给hive等依赖hadoop的应用提供hadoop配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
export HADOOP_PREFIX=/opt/hadoop
# 自定义部分
# 此处是直接解压放在`/opt`目录下
export HADOOP_HOME=$HADOOP_PREFIX
export HADOOP_COMMON_HOME=$HADOOP_PREFIX
# hadoop common
export HADOOP_HDFS_HOME=$HADOOP_PREFIX
# hdfs
export HADOOP_MAPRED_HOME=$HADOOP_PREFIX
# mapreduce
export HADOOP_YARN_HOME=$HADOOP_PREFIX
# YARN
export HADOOP_CONF_DIR=$HADOOP_PREFIX/etc/hadoop

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_COMMON_LIB_NATIVE_DIR"
# 这里`-Djava`间不能有空格

export CLASSPATH=$CLASS_PATH:$HADOOP_PREFIX/lib/*
export PATH=$PATH:$HADOOP_PREFIX/sbin:$HADOOP_PREFIX/bin

/etc/hosts

1
2
3
4
192.168.31.129 hd-master
192.168.31.130 hd-slave1
192.168.31.131 hd-slave2
127.0.0.1 localhost
  • 这里配置的ip地址是各个主机的ip,需要自行配置
  • hd-masterhd-slave1等就是主机ip-主机名映射
  • todo?一定需要在/etc/hostname中设置各个主机名称

firewalld

必须关闭所有节点的防火墙

1
2
$ sudo systemctl stop firewalld.service
$ sudo systemctl disable firewalld.service

文件夹建立

  • 所有节点都需要建立
1
2
$ mkdir tmp
$ mkdir -p hdfs/data hdfs/name

Hadoop配置

Hadoop全系列(包括hive、tez等)配置取决于以下两类配置文件

  • 只读默认配置文件

    • core-defualt.xml
    • hdfs-default.xml
    • mapred-default.xml
  • 随站点变化的配置文件

    • etc/hadoop/core-site.xml
    • etc/hadoop/hdfs-site.xml
    • etc/hadoop/mapred-site.xml
    • etc/hadoop/yarn-env.xml
  • 环境设置文件:设置随站点变化的值,从而控制bin/中的 hadoop脚本行为

    • etc/hadoop/hadoop-env.sh
    • etc/hadoop/yarn-env.sh
    • etc/hadoop/mapred-env.sh

    中一般是环境变量配置,补充在shell中未设置的环境变量

  • 注意

    • .xml配置信息可在不同应用的配置文件中继承使用, 如在tez的配置中可以使用core-site.xml${fs.defaultFS}变量

    • 应用会读取/执行相应的*_CONF_DIR目录下所有 .xml/.sh文件,所以理论上可以在etc/hadoop中存放 所以配置文件,因为hadoop是最底层应用,在其他所有应用 启动前把环境均已设置完毕???

Hadoop集群有三种运行模式

  • Standalone Operation
  • Pseudo-Distributed Operation
  • Fully-Distributed Operation

针对不同的运行模式有,hadoop有三种不同的配置方式

Standalone Operation

hadoop被配置为以非分布模式运行的一个独立Java进程,对调试有 帮助

  • 默认为单机模式,无需配置
测试
1
2
3
4
5
$ cd /path/to/hadoop
$ mkdir input
$ cp etc/hadoop/*.xml input
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar grep input output 'dfs[a-z.]+'
$ cat output/*

Pseudo-Distributed Operation

在单节点(服务器)上以所谓的伪分布式模式运行,此时每个Hadoop 守护进程作为独立的Java进程运行

core-site.xml
1
2
3
4
5
6
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
hdfs-site.xml
1
2
3
4
5
6
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
mapred-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configruration>

<configuration>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
</preperty>
</configruation>
yarn-site.xml
1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>

Fully-Distributed Operation

  • 单节点配置完hadoop之后,需要将其同步到其余节点
core-site.xml

模板:core-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hd-master:9000</value>
<description>namenode address</description>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>file:///opt/hadoop/tmp</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131702</value>
</property>

<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<!-- 为将用户`root`设置为超级代理,代理所有用户,如果是其他用户需要相应的将root修改为其用户名 -->
<!-- 是为hive的JDBCServer远程访问而设置,应该有其他情况也需要 -->
</configuration>
hdfs-site.xml

模板:hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<configuration>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hd-master:9001</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///opt/hadoop/hdfs/name</value>
<description>namenode data directory</description>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///opt/hadoop/hdfs/data</value>
<description>datanode data directory</description>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
<description>replication number</description>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>

<property>
<name>dfs.datanode.directoryscan.throttle.limit.ms.per.sec</name>
<value>1000</value>
</property>
<!--bug-->
</configuration>
yarn-site.xml
  • 模板:yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hd-master</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>hd-master:9032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>hd-master:9030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>hd-master:9031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>hd-master:9033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>hd-master:9099</value>
</property>

<!-- container -->
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>512</value>
<description>maximum memory allocation per container</description>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>256</value>
<description>minimum memory allocation per container</description>
</property>
<!-- container -->

<!-- node -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>1024</value>
<description>maximium memory allocation per node</description>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>8</value>
<description>virtual memmory ratio</description>
</property>
<!-- node -->

<property>
<name>yarn.app.mapreduce.am.resource.mb</name>
<value>384</value>
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<value>-Xms128m -Xmx256m</value>
</property>

<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>1</value>
</property>

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</configuration>
mapred-site.xml
  • 模板:mapred-site.xml.template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
<!--
<value>yarn-tez</value>
设置整个hadoop运行在Tez上,需要配置好Tez
-->
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>hd-master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hd-master:19888</value>
</property>

<!-- mapreduce -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>256</value>
<description>memory allocation for map task, which should between minimum container and maximum</description>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>256</value>
<description>memory allocation for reduce task, which should between minimum container and maximum</description>
</property>
<!-- mapreduce -->

<!-- java heap size options -->
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xms128m -Xmx256m</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xms128m -Xmx256m</value>
</property>
<!-- java heap size options -->

</configuration>
参数说明
  • yarn.scheduler.minimum-allocation-mb:container内存 单位,也是container分配的内存最小值

  • yarn.scheduler.maximum-allocation-mb:container内存 最大值,应该为最小值整数倍

  • mapreduce.map.memeory.mb:map task的内存分配

    • hadoop2x中mapreduce构建于YARN之上,资源由YARN统一管理
    • 所以maptask任务的内存应设置container最小值、最大值间
    • 否则分配一个单位,即最小值container
  • mapreduce.reduce.memeory.mb:reduce task的内存分配

    • 设置一般为map task的两倍
  • *.java.opts:JVM进程参数设置

    • 每个container(其中执行task)中都会运行JVM进程
    • -Xmx...m:heap size最大值设置,所以此参数应该小于 task(map、reduce)对应的container分配内存的最大值, 如果超出会出现physical memory溢出
    • -Xms...m:heap size最小值?#todo
  • yarn.nodemanager.vmem-pmem-ratio:虚拟内存比例

    • 以上所有配置都按照此参数放缩
    • 所以在信息中会有physical memory、virtual memory区分
  • yarn.nodemanager.resource.memory-mb:节点内存设置

    • 整个节点被设置的最大内存,剩余内存共操作系统使用
  • yarn.app.mapreduce.am.resource.mb:每个Application Manager分配的内存大小

主从文件

masters
  • 设置主节点地址,根据需要设置
1
hd-master
slaves
  • 设置从节点地址,根据需要设置
1
2
hd-slave1
hd-slave2

环境设置文件

  • 这里环境设置只是起补充作用,在~/.bashrc已经设置的 环境变量可以不设置
  • 但是在这里设置环境变量,然后把整个目录同步到其他节点, 可以保证在其余节点也能同样的设置环境变量
hadoop-env.sh

设置JAVA_HOME为Java安装根路径

1
JAVA_HOME=/opt/java/jdk
hdfs-env.sh

设置JAVA_HOME为Java安装根路径

1
JAVA_HOME=/opt/java/jdk
yarn-env.sh

设置JAVA_HOME为Java安装根路径

1
2
JAVA_HOME=/opt/java/jdk
JAVA_HEAP_MAX=Xmx3072m

初始化、启动、测试

HDFS
  • 格式化、启动

    1
    2
    3
    4
    5
    6
    $ hdfs namenode -format
    # 格式化文件系统
    $ start-dfs.sh
    # 启动NameNode和DataNode
    # 此时已可访问NameNode,默认http://localhost:9870/
    $ stop-dfs.sh
  • 测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    $ hdfs dfsadmin -report
    # 应该输出3个节点的情况

    $ hdfs dfs -mkdir /user
    $ hdfs dfs -mkdir /user/<username>
    # 创建执行MapReduce任务所需的HDFS文件夹
    $ hdfs dfs -mkdir input
    $ hdfs dfs -put etc/hadoop/*.xml input
    # 复制文件至分布式文件系统
    $ hadoop jar /opt/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar grep input output 'dfs[a-z]+'
    # 执行自带样例
    # 样例名称取决于版本

    $ hdfs dfs -get output outut
    $ cat output/*
    # 检查输出文件:将所有的输出文件从分布式文件系统复制
    # 至本地文件系统,并检查
    $ hdfs dfs -cat output/*
    # 或者之间查看分布式文件系统上的输出文件


    $ hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar \
    -input /path/to/hdfs_file \
    -output /path/to/hdfs_dir \
    -mapper "/bin/cat" \
    -reducer "/user/bin/wc" \
    -file /path/to/local_file \
    -numReduceTasks 1
YARN
1
2
3
4
5
$ sbin/start-yarn.sh
# 启动ResourceManger守护进程、NodeManager守护进程
# 即可访问ResourceManager的web接口,默认:http://localhost:8088/
$ sbin/stop-yarn.sh
# 关闭守护进程

其他

注意事项

  • hdfs namenode -format甚至可以在datanode节点没有java时 成功格式化

  • 没有关闭防火墙时,整个集群可以正常启动,甚至可以在hdfs里 正常建立文件夹,但是无法写入文件,尝试写入文件时报错

可能错误

节点启动不全
  • 原因

    • 服务未正常关闭,节点状态不一致
  • 关闭服务、删除存储数据的文件夹dfs/data、格式化namenode

文件无法写入

could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and 2 node(s) are excluded in this operation.

  • 原因

    • 未关闭防火墙
    • 存储空间不够
    • 节点状态不一致、启动不全
    • 在log里面甚至可能会出现一个连接超时1000ms的ERROR
  • 处理

    • 关闭服务、删除存储数据的文件夹dfs/data、格式化 namenode
      • 这样处理会丢失数据,不能用于生产环境
    • 尝试修改节点状态信息文件VERSION一致
      • ${hadoop.tmp.dir}
      • ${dfs.namenode.name.dir}
      • ${dfs.datanode.data.dir}
Unhealthy Node

1/1 local-dirs are bad: /opt/hadoop/tmp/nm-local-dir; 1/1 log-dirs are bad: /opt/hadoop/logs/userlogs

  • 原因:磁盘占用超过90%

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scp -r /opt/hadoop/etc/hadoop centos2:/opt/hadoop/etc
scp -r /opt/hadoop/etc/hadoop centos3:/opt/hadoop/etc
# 同步配置

scp /root/.bashrc centos2:/root
scp /root/.bashrc centos3:/root
# 同步环境

rm -r /opt/hadoop/tmp /opt/hadoop/hdfs
mkdir -p /opt/hadoop/tmp /opt/hadoop/hdfs
ssh centos2 rm -r /opt/hadoop/tmp /opt/hadoop/hdfs
ssh centos2 mkdir -p /opt/hadoop/tmp /opt/hadoop/hdfs/name /opt/hadoop/hdfs/data
ssh centos3 rm -r /opt/hadoop/tmp /opt/hadoop/hdfs/name /opt/hadoop/data
ssh centos3 mkdir -p /opt/hadoop/tmp /opt/hadoop/hdfs/name /opt/hadoop/hdfs/data
# 同步清除数据

rm -r /opt/hadoop/logs/*
ssh centos2 rm -r /opt/hadoop/logs/*
ssh centos3 rm -r /opt/hadoop/logs/*
# 同步清除log

Hive

依赖

  • hadoop:配置完成hadoop,则相应java等也配置完成
  • 关系型数据库:mysql、derby等

机器环境配置

~/.bashrc

1
2
3
4
5
export HIVE_HOME=/opt/hive
# self designed
export HIVE_CONF_DIR=$HIVE_HOME/conf
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASS_PATH:$HIVE_HOME/lib/*

文件夹建立

HDFS
1
2
3
4
$ hdfs dfs -rm -r /user/hive
$ hdfs dfs -mkdir -p /user/hive/warehouse /user/hive/tmp /user/hive/logs
# 这三个目录与配置文件中对应
$ hdfs dfs -chmod 777 /user/hive/warehouse /user/hive/tmp /user/hive/logs
FS
1
2
3
4
5
6
$ mkdir data
$ chmod 777 data
# hive数据存储文件夹
$ mkdir logs
$ chmod 777 logs
# log目录

Hive配置

XML参数

conf/hive-site.xml
  • 模板:conf/hive-default.xml.template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hd-master:3306/metastore_db?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.mariadb.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
</value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>1234</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.exec.scratchdir</name>
<value>/user/hive/tmp</value>
</property>

<!--
<property>
<name>hive.exec.local.scratchdir</name>
<value>${system:java.io.tmpdir}/${system:user.name}</value>
</property>
<property>
<name>hive.downloaded.resources.dir</name>
<valeu>${system:java.io.tmpdir}/${hive.session.id}_resources</value>
</property>
<property>«
<name>hive.server2.logging.operation.log.location</name>«
<value>${system:java.io.tmpdir}/${system:user.name}/operation_logs</value>«
<description>Top level directory where operation logs are stored if logging functionality is enabled</description>«
</property>«
所有`${system.java.io.tmpdir}`都要被替换为相应的`/opt/hive/tmp`,
可以通过设置这两个变量即可,基本是用于设置路径
-->

<property>
<name>system:java.io.tmpdir</name>
<value>/opt/hive/tmp</value>
</property>
<property>
<name>system:user.name</name>
<value>hive</value>
<property>

<!--
<property>
<name>hive.querylog.location</name>
<value>/user/hive/logs</value>
<description>Location of Hive run time structured log file</description>
</property>
这里应该不用设置,log放在本地文件系统更合适吧
-->

<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.31.129:19083</value>
</property>
<!--这个是配置metastore,如果配置此选项,每次启动hive必须先启动metastore,否则hive实可直接启动-->

<property>
<name>hive.server2.logging.operation.enabled</name>
<value>true</value>
</property>
<!-- 使用JDBCServer时需要配置,否则无法自行建立log文件夹,然后报错,手动创建可行,但是每次查询都会删除文件夹,必须查一次建一次 -->
  • /user开头的路径一般表示hdfs中的路径,而${}变量开头 的路径一般表示本地文件系统路径

    • 变量system:java.io.tmpdirsystem:user.name在 文件中需要自己设置,这样就避免需要手动更改出现这些 变量的地方
    • hive.querylog.location设置在本地更好,这个日志好像 只在hive启动时存在,只是查询日志,不是hive运行日志, hive结束运行时会被删除,并不是没有生成日志、${}表示 HDFS路径
  • 配置中出现的目录(HDFS、locaL)有些手动建立

    • HDFS的目录手动建立?
    • local不用
  • hive.metastore.uris若配置,则hive会通过metastore服务 访问元信息

    • 使用hive前需要启动metastore服务
    • 并且端口要和配置文件中一样,否则hive无法访问

环境设置文件

conf/hive-env.sh
  • 模板:conf/hive-env.sh.template
1
2
3
4
5
export JAVA_HOME=/opt/java/jdk
export HADOOP_HOME=/opt/hadoop
export HIVE_CONF_DIR=/opt/hive/conf
# 以上3者若在`~/.bashrc`中设置,则无需再次设置
export HIVE_AUX_JARS_PATH=/opt/hive/lib
conf/hive-exec-log4j2.properties
  • 模板:hive-exec-log4j2.properties.template

    1
    2
    3
    property.hive.log.dir=/opt/hive/logs
    # 原为`${sys:java.io.tmpdir}/${sys:user.name}`
    # 即`/tmp/root`(root用户执行)
conf/hive-log4j2.properties
  • 模板:hive-log4j2.properties.template

MetaStore

MariaDB
  • 安装MariaDB

  • 修改MariaDB配置

    1
    $ cp /user/share/mysql/my-huge.cnf /etc/my.cnf
  • 创建用户,注意新创建用户可能无效,见mysql配置

    • 需要注意用户权限:创建数据库权限、修改表权限
    • 初始化时Hive要自己创建数据库(hive-site中配置), 所以对权限比较严格的环境下,可能需要先行创建同名 数据库、赋权、删库
  • 下载mariadb-java-client-x.x.x-jar包,复制到lib

初始化数据库
1
$ schematool -initSchema -dbType mysql

这个命令要在所有配置完成之后执行

服务设置

1
2
3
4
5
6
7
8
9
10
11
$ hive --service metastore -p 19083 &
# 启动metastore服务,端口要和hive中的配置相同
# 否则hive无法连接metastore服务,无法使用
# 终止metastore服务只能根据进程号`kill`
$ hive --service hiveserver2 --hiveconf hive.server2.thrift.port =10011 &
# 启动JDBC Server
# 此时可以通过JDBC Client(如beeline)连接JDBC Server对
# Hive中数据进行操作
$ hive --service hiveserver2 --stop
# 停止JDBC Server
# 或者直接kill

测试

Hive可用性

需要先启动hdfs、YARN、metastore database(mysql),如果有 设置独立metastore server,还需要在正确端口启动

1
2
3
4
5
6
hive>	create table if not exists words(id INT, word STRING)
row format delimited fields terminated by " "
lines terminated by "\n";
hive> load data local inpath "/opt/hive-test.txt" overwrite into
table words;
hive> select * from words;
JDBCServer可用性
  • 命令行连接

    1
    $ beeline -u jdbc:hive2://localhost:10011 -n hive -p 1234
  • beeline中连接

    1
    2
    3
    $ beeline
    beeline> !connect jdbc:hive2://localhost:10011
    # 然后输入用户名、密码(metastore数据库用户名密码)

其他

可能错误

Failed with exception Unable to move source file

  • linux用户权限问题,无法操作原文件
  • hdfs用户权限问题,无法写入目标文件
  • hdfs配置问题,根本无法向hdfs写入:参见hdfs问题

org.apache.hive.service.cli.HiveSQLException: Couldn’t find log associated with operation handle:

  • 原因:hiveserver2查询日志文件夹不存在

  • 可以在hive中通过

    1
    $ set hive.server2.logging.operation.log.location;

    查询日志文件夹,建立即可,默认为 ${system:java.io.tmpdir}/${system:user.name}/operation_logs ,并设置权限为777

    • 好像如果不设置权限为777,每次查询文件夹被删除,每 查询一次建立一次文件夹?#todo
    • hive-sitex.xml中配置允许自行创建?

User: root is not allowed to impersonate hive

  • 原因:当前用户(不一定是root)不被允许通过代理操作 hadoop用户、用户组、主机

    • hadoop引入安全伪装机制,不允许上层系统直接将实际用户 传递给超级代理,此代理在hadoop上执行操作,避免客户端 随意操作hadoop
  • 配置hadoop的core-site.xml,使得当前用户作为超级代理

Tez

依赖

  • hadoop

机器环境配置

.bashrc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
export TEZ_HOME=/opt/tez
export TEZ_CONF_DIR=$TEZ_HOME/conf

for jar in `ls $TEZ_HOME | grep jar`; do
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$TEZ_HOME/$jar
done
for jar in `ls $TEZ_HOME/lib`; do
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$TEZ_HOME/lib/$jar
done
# this part could be replaced with line bellow
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$TEZ_HOME/*:$TEZ_HOME/lib/*
# `hadoop-env.sh`中说`HADOOP_CLASSPATH`是Extra Java CLASSPATH
# elements
# 这意味着hadoop组件只需要把其jar包加到`HADOOP_CLASSPATH`中既可

HDFS

  • 上传$TEZ_HOME/share/tez.tar.gz至HDFS中

    1
    2
    $ hdfs dfs -mkdir /apps
    $ hdfs dfs -copyFromLocal tez.tar.gz /apps

HadoopOnTez

在hadoop中配置Tez

  • 侵入性较强,对已有的hadoop集群全体均有影响

  • 所有hadoop集群执行的MapReduce任务都通过tez执行

    • 这里所有的任务应该是指直接在hadoop上执行、能在 webRM上看到的任务
    • hive这样的独立组件需要独立配置

XML参数

tez-site.xml
  • 模板:conf/tez-default-tmplate.xml
  • 好像还是需要复制到hadoop的配置文件夹中
1
2
3
4
5
6
7
8
9
10
11
<property>
<name>tez.lib.uris</name>
<value>${fs.defaultFS}/apps/tez.tar.gz</value>
<!--设置tez安装包位置-->
</property>
<!--
<property>
<name>tez.container.max.java.heap.fraction</name>
<value>0.2</value>
<property>
内存不足时-->
mapred-site.xml
  • 修改mapred-site.xml文件:配置mapreduce基于yarn-tez, (配置修改在hadoop部分也有)
1
2
3
4
<property>
<name>mapreduce.framework.name</name>
<value>yarn-tez</value>
</property>

环境参数

HiveOnTez

  • 此模式下Hive可以在mapreduce、tez计算模型下自由切换?

    1
    2
    3
    4
    5
    hive> set hive.execution.engine=tez;
    # 切换查询引擎为tez
    hive> set hive.execution.engine=mr;
    # 切换查询引擎为mapreduce
    # 这些命令好像没用,只能更改值,不能更改实际查询模型
  • 只有Hive会受到影响,其他基于hadoop平台的mapreduce作业 仍然使用tez计算模型

Hive设置

  • 若已经修改了mapred-site.xml设置全局基于tez,则无需复制 jar包,直接修改hive-site.xml即可
Jar包复制

复制$TEZ_HOME$TEZ_HOME/lib下的jar包到$HIVE_HOME/lib 下即可

hive-site.xml
1
2
3
4
<property>
<name>hive.execution.engine</name>
<value>tez</value>
</property>

其他

可能错误

SLF4J: Class path contains multiple SLF4J bindings.

  • 原因:包冲突的
  • 解决方案:根据提示冲突包删除即可

Spark

依赖

  • java
  • scala
  • python:一般安装anaconda,需要额外配置
    1
    2
    export PYTHON_HOME=/opt/anaconda3
    export PATH=$PYTHON_HOME/bin:$PATH
  • 相应资源管理框架,如果不以standalone模式运行

机器环境配置

~/.bashrc

1
2
3
4
5
6
7
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYTHON_PATH=$PYTHON_PATH:$SPARK_HOME/python:$SPARK_HOME/python/lib/*
# 把`pyshark`、`py4j`模块对应的zip文件添加进路径
# 这里用的是`*`通配符应该也可以,手动添加所有zip肯定可以
# 否则无法在一般的python中对spark进行操作
# 似乎只要master节点有设置`/lib/*`添加`pyspark`、`py4j`就行

Standalone

环境设置文件

conf/spark-env.sh
  • 模板:conf/spark-env.sh.template

这里应该有些配置可以省略、移除#todo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
export JAVA_HOME=/opt/jdk
export HADOOP_HOME=/opt/hadoop
export hADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export HIVE_HOME=/opt/hive

export SCALA_HOME=/opt/scala
export SCALA_LIBRARY=$SPARK_HOME/lib
# `~/.bashrc`设置完成之后,前面这段应该就这个需要设置

export SPARK_HOME=/opt/spark
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
# 这里是执行命令获取classpath
# todo
# 这里看文档的意思,应该也是类似于`$HADOOP_CLASSPATH`
# 可以直接添加进`$CLASSPATH`而不必设置此变量
export SPARK_LIBRARY_PATH=$SPARK_HOME/lib

export SPARK_MASTER_HOST=hd-master
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_WORKER_WEBUI_PORT=8081
export SPARK_WORKER_MEMORY=1024m
# spark能在一个container内执行多个task
export SPARK_LOCAL_DIRS=$SPARK_HOME/data
# 需要手动创建

export SPARK_MASTER_OPTS=
export SPARK_WORKER_OPTS=
export SPARK_DAEMON_JAVA_OPTS=
export SPARK_DAEMON_MEMORY=
export SPARK_DAEMON_JAVA_OPTS=
文件夹建立
1
2
$ mkdir /opt/spark/spark_data
# for `$SPARK_LOCAL_DIRS`

Spark配置

conf/slaves

文件不存在,则在当前主机单节点运行

  • 模板:conf/slaves.template
1
2
hd-slave1
hd-slave2
conf/hive-site.xml

这里只是配置Spark,让Spark作为“thrift客户端”能正确连上 metastore server

  • 模板:/opt/hive/conf/hive-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.31.129:19083</value>
<description>Thrift URI for the remote metastor. Used by metastore client to connect to remote metastore</description>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10011</value>
</property>
<!--配置spark对外界thrift服务,以便可通过JDBC客户端存取spark-->
<!--这里启动端口同hive的配置,所以两者不能默认同时启动-->
<property>
<name>hive.server2.thrift.bind.host</name>
<value>hd-master</value>
</property>
</configuration>

测试

启动Spark服务

需要启动hdfs、正确端口启动的metastore server

1
2
3
4
5
6
7
8
9
10
$ start-master.sh
# 在执行**此命令**机器上启动master实例
$ start-slaves.sh
# 在`conf/slaves`中的机器上启动worker实例
$ start-slave.sh
# 在执行**此命令**机器上启动worker实例

$ stop-master.sh
$ stop-slaves.sh
$ stop-slave.sh
启动Spark Thrift Server
1
2
3
4
5
6
7
$ start-thriftserver.sh --master spark://hd-master:7077 \
--hiveconf hive.server2.thrift.bind.host hd-master \
--hiveconf hive.server2.thrift.port 10011
# 这里在命令行启动thrift server时动态指定host、port
# 如果在`conf/hive-site.xml`有配置,应该不需要

# 然后使用beeline连接thrift server,同hive
Spark-Sql测试
1
2
3
4
5
6
$ spark-sql --master spark://hd-master:7077
# 在含有配置文件的节点上启动时,配置文件中已经指定`MASTER`
# 因此不需要指定后面配置

spark-sql> set spark.sql.shuffle.partitions=20;
spark-sql> select id, count(*) from words group by id order by id;
pyspark测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ MASTER=spark://hd-master:7077 pyspark
# 这里应该是调用`$PATH`中第一个python,如果未默认指定

from pyspark.sql import HiveContext
sql_ctxt = HiveContext(sc)
# 此`sc`是pyspark启动时自带的,是`SparkContext`类型实例
# 每个连接只能有一个此实例,不能再次创建此实例

ret = sql_ctxt.sql("show tables").collect()
# 这里语句结尾不能加`;`

file = sc.textFile("hdfs://hd-master:9000/user/root/input/capacity-scheduler.xml")
file.count()
file.first()
Scala测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ MASTER=spark://hd-master:7077 spark-shell \
executor-memory 1024m \
--total-executor-cores 2 \
--excutor-cores 1 \
# 添加参数启动`spark-shell`

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("select * from words").collect().foreach(println)
sqlContext.sql("select id, word from words order by id").collect().foreach(println)

sqlContext.sql("insert into words values(7, \"jd\")")
val df = sqlContext.sql("select * from words");
df.show()

var df = spark.read.json("file:///opt/spark/example/src/main/resources/people.json")
df.show()

Spark on YARN

其他

可能错误

Initial job has not accepted any resources;

  • 原因:内存不足,spark提交application时内存超过分配给 worker节点内存

  • 说明

    • 根据结果来看,pysparkspark-sql需要内存比 spark-shell少? (设置worker内存512m,前两者可以正常运行)
    • 但是前两者的内存分配和scala不同,scala应该是提交任务 、指定内存大小的方式,这也可以从web-ui中看出来,只有 spark-shell开启时才算是application
  • 解决方式

    • 修改conf/spark-env.shSPARK_WORKER_MEMORY更大, (spark默认提交application内存为1024m)
    • 添加启动参数--executor-memory XXXm不超过分配值
ERROR KeyProviderCache:87 - Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider
  • 无影响

HBase

依赖

  • java
  • hadoop
  • zookeeper:建议,否则日志不好管理

机器环境

~/.bashrc

1
2
3
export HBASE_HOME=/opt/hbase
export PATH=$PAHT:$HBASE_HOME/bin
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/lib/*

建立目录

1
$ mkdir /tmp/hbase/tmpdir

HBase配置

环境变量

conf/hbase-env.sh
1
2
export HBASE_MANAGES_ZK=false
# 不使用自带zookeeper
conf/zoo.cfg

若设置使用独立zookeeper,需要复制zookeeper配置至HBase配置 文件夹中

1
$ cp /opt/zookeeper/conf/zoo.cfg /opt/hbase/conf

Standalone模式

conf/hbase-site.xml
1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>hbase.rootdir</name>
<value>file://${HBASE_HOME}/data</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/tmp/zookeeper/zkdata</value>
</property>
</configuration>

Pseudo-Distributed模式

conf/hbase-site.xml
  • 在Standalone配置上修改
1
2
3
4
5
6
7
8
<proeperty>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hd-master:9000/hbase</value>
</property>

Fully-Distributed模式

conf/hbase-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<property>
<name>hbase.rootdir</name>
<value>hdfs://hd-master:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</name>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hd-master,hd-slave1,hd-slave2</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/tmp/zookeeper/zkdata</value>
</property>

测试

  • 需要首先启动HDFS、YARN
  • 使用独立zookeeper还需要先行在每个节点启动zookeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ start-hbase.sh
# 启动HBase服务
$ local-regionservers.sh start 2 3 4 5
# 启动额外的4个RegionServer
$ hbase shell
hbase> create 'test', 'cf'
hbase> list 'test'
hbase> put 'test', 'row7', 'cf:a', 'value7a'
put 'test', 'row7', 'cf:b', 'value7b'
put 'test', 'row7', 'cf:c', 'value7c'
put 'test', 'row8', 'cf:b', 'value8b',
put 'test', 'row9', 'cf:c', 'value9c'
hbase> scan 'test'
hbase> get 'test', 'row7'
hbase> disable 'test'
hbase> enable 'test'
hbaee> drop 'test'
hbase> quit

Zookeeper

依赖

  • java

  • 注意:zookeeper集群中工作超过半数才能对外提供服务,所以 一般配置服务器数量为奇数

机器环境

~/.bashrc

1
2
3
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$ZOOKEEPER_HOME/lib

创建文件夹

  • 在所有节点都需要创建相应文件夹、myid文件
1
2
3
4
5
6
7
mkdir -p /tmp/zookeeper/zkdata /tmp/zookeeper/zkdatalog
echo 0 > /tmp/zookeeper/zkdatalog/myid

ssh centos2 mkdir -p /tmp/zookeeper/zkdata /tmp/zookeeper/zkdatalog
ssh centos3 mkdir -p /tmp/zookeeper/zkdata /tmp/zookeeper/zkdatalog
ssh centos2 "echo 2 > /tmp/zookeeper/zkdata/myid"
ssh centos3 "echo 3 > /tmp/zookeeper/zkdata/myid"

Zookeeper配置

Conf

conf/zoo.cfg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
tickTime=2000
# The number of milliseconds of each tick
initLimit=10
# The number of ticks that the initial
# synchronization phase can take
syncLimit=5
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
dataDir=/tmp/zookeeper/zkdata
dataLogDir=/tmp/zookeeper/zkdatalog
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
clientPort=2181
# the port at which the clients will connect

autopurge.snapRetainCount=3
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
# The number of snapshots to retain in dataDir
autopurge.purgeInterval=1
# Purge task interval in hours
# Set to "0" to disable auto purge feature

server.0=hd-master:2888:3888
server.1=hd-slave1:2888:3888
server.2=hd-slave2:2888:3888
# Determine the zookeeper servers
# fromation: server.NO=HOST:PORT1:PORT2
# PORT1: port used to communicate with leader
# PORT2: port used to reelect leader when current leader fail
$dataDir/myid
  • $dataDirconf/zoo.cfg中指定目录
  • myid文件里就一个id,指明当前zookeeper server的id,服务 启动时读取文件确定其id,需要自行创建
1
0

启动、测试、清理

启动zookeeper

1
2
3
4
5
6
7
8
9
$ zkServer.sh start
# 开启zookeeper服务
# zookeeper服务要在各个节点分别手动启动

$ zkServer.sh status
# 查看服务状态

$ zkCleanup.sh
# 清理旧的快照、日志文件

Flume

依赖

  • java

机器环境配置

~/.bashrc

1
export PATH=$PATH:/opt/flume/bin

Flume配置

环境设置文件

conf/flume-env.sh
  • 模板:conf/flume-env.sh.template
1
JAVA_HOME=/opt/jdk

Conf文件

conf/flume.conf
  • 模板:conf/flume-conf.properties.template
1
2
3
4
5
6
7
8
9
10
11
12
13
14
agent1.channels.ch1.type=memory
# define a memory channel called `ch1` on `agent1`
agent1.sources.avro-source1.channels=ch1
agent1.sources.avro-source1.type=avro
agent1.sources.avro-source1.bind=0.0.0.0
agent1.sources.avro-source1.prot=41414
# define an Avro source called `avro-source1` on `agent1` and tell it
agent1.sink.log-sink1.channels=ch1
agent1.sink.log-sink1.type=logger
# define a logger sink that simply logs all events it receives
agent1.channels=ch1
agent1.sources=avro-source1
agent1.sinks=log-sink1
# Finally, all components have been defined, tell `agent1` which one to activate

启动、测试

1
2
3
4
5
6
7
8
9
10
11
$ flume-ng agent --conf /opt/flume/conf \
-f /conf/flume.conf \
-D flume.root.logger=DEBUG,console \
-n agent1
# the agent name specified by -n agent1` must match an agent name in `-f /conf/flume.conf`

$ flume-ng avro-client --conf /opt/flume/conf \
-H localhost -p 41414 \
-F /opt/hive-test.txt \
-D flume.root.logger=DEBUG, Console
# 测试flume

其他

Kafka

依赖

  • java
  • zookeeper

机器环境变量

~/.bashrc

1
2
export PATH=$PATH:/opt/kafka/bin
export KAFKA_HOME=/opt/kafka

多brokers配置

Conf

config/server-1.properties
  • 模板:config/server.properties
  • 不同节点broker.id不能相同
  • 可以多编写几个配置文件,在不同节点使用不同配置文件启动
1
2
3
broker.id=0
listeners=PLAINTEXT://:9093
zookeeper.connect=hd-master:2181, hd-slave1:2181, hd-slave2:2181

测试

  • 启动zookeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ kafka-server-start.sh /opt/kafka/config/server.properties &
# 开启kafka服务(broker)
# 这里是指定使用单个默认配置文件启动broker
# 启动多个broker需要分别使用多个配置启动多次
$ kafka-server-stop.sh /opt/kafka/config/server.properties

$ kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test1
# 开启话题
$ kafka-topics.sh --list zookeeper localhost:2181
#
$ kafka-topics.shd --delete --zookeeper localhost:2181
--topic test1
# 关闭话题

$ kafka-console-producer.sh --broker-list localhost:9092 \
--topic test1
# 新终端开启producer,可以开始发送消息

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test1 \
--from-beginning

$ kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic test1 \
--from beginning
# 新终端开启consumer,可以开始接收信息
# 这个好像是错的

其他

Storm

依赖

  • java
  • zookeeper
  • python2.6+
  • ZeroMQ、JZMQ

机器环境配置

~/.bashrc

1
2
export STORM_HOME=/opt/storm
export PAT=$PATH:$STORM_HOME/bin

Storm配置

配置文件

conf/storm.yaml
  • 模板:conf/storm.yarml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
storm.zookeeper.servers:
-hd-master
-hd-slave1
-hd-slave2
storm.zookeeper.port: 2181

nimbus.seeds: [hd-master]
storm.local.dir: /tmp/storm/tmp
nimbus.host: hd-master
supervisor.slots.ports:
-6700
-6701
-6702
-6703

启动、测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
storm nimbus &> /dev/null &
storm logviewer &> /dev/null &
storm ui &> /dev/null &
# master节点启动nimbus

storm sueprvisor &> /dev/null &
storm logviewer &> /dev/nulla &
# worker节点启动


storm jar /opt/storm/example/..../storm-start.jar \
storm.starter.WordCountTopology
# 测试用例
stom kill WordCountTopology

http://hadoop.apache.org/docs/r3.1.1