Spark Core

Spark特性

数据处理速度快

得益于Spark的内存处理技术、DAG执行引擎

内存计算

  • Spark尽量把数据(中间结果等)驻留在内存中,必要时才写入 磁盘,避免I/O操作,提高处理效率

  • 支持保存部分数据在内存中,剩下部分保存在磁盘中

  • 数据完全驻留于内存时,数据处理达到hadoop系统的 几十~上百倍,数据存在磁盘上时,处理速度能够达到hadoop的 10倍左右

DAG执行引擎

  • Spark执行任务前,根据任务之间依赖关系生成DAG图,优化数据 处理流程(减少任务数量)、减少I/O操作

  • 除了简单的map、reduce,Spark提供了超过80个数据处理的 Operator Primitives

  • 对于数据查询操作,Spark采用Lazy Evaluation方式执行, 帮助优化器对整个数据处力工作流进行优化

易用性/API支持

  • Spark使用Scala编写,经过编译后在JVM上运行

  • 支持各种编程语言,提供了简洁、一致的编程接口

    • Scala
    • Java
    • Python
    • Clojure
    • R

通用性

  • Spark是通用平台,支持以DAG(有向无环图)形式表达的复杂 数据处理流程,能够对数据进行复杂的处理操作,而不用将复杂 任务分解成一系列MapReduce作业

  • Spark生态圈DBAS(Berkely Data Analysis Stack)包含组件, 支持批处理、流数据处理、图数据处理、机器学习

兼容性

  • DataStorage

    • 一般使用HDFS、Amazon S3等分布式系统存储数据
    • 支持Hive、Hbase、Cassandra等数据源
    • 支持Flume、Kafka、Twitter等流式数据
  • Resource Management

    • 能以YARN、Mesos等分布式资源管理框架为资源管理器
    • 也可以使用自身资源的管理器以Standalone Mode独立运行
  • 使用支持

    • 可以使用shell程序,交互式的对数据进行查询
    • 支持流处理、批处理
  • 数据类型、计算表达能力

    • Spark可以管理各种类型的数据集:文本

Spark架构

核心组件

  • Spark StreamingSpark SQLSpark GraphXSpark MLLib为BDAS所包含的组件
  • Spark Streaming:提供对实时数据流高吞吐、高容错、可 扩展的流式处理系统

    • 采用Micro Batch数据处理方式,实现更细粒度资源分配, 实现动态负载均衡
    • 可以对多种数据源(Kafka、Flume、Twitter、ZeroMQ),进行 包括map、reduce、join等复杂操作
  • Spark SQL:结构化数据查询模块

    • 通过JDBC API暴露Spark数据集,让客户程序可以在其上 直接执行SQL查询
    • 可以连接传统的BI、可视化工具至数据集
    • 前身Shark即为Hive on Spark,后出于维护、优化、 性能考虑放弃
  • Spark GraphX:图数据的并行处理模块

    • 扩展RDD为Resilient Distributed Property Graph, 将属性赋予各个节点、边的有向图
    • 可利用此模块对图数据进行ExploratoryAnalysis、Iterative Graph Computation
  • Spark MLLib:可扩展的机器学习模块

    • 大数据平台使得在全量数据上进行学习成为可能
    • 实现包括以下算法
      • Classification
      • Regression
      • Clustering
      • Collaborative Filtering
      • Dimensionality Reduction

周围组件

  • BlinkDB:近似查询处理引擎

    • 可以在大规模数据集上,交互式执行SQL查询
    • 允许用户在查询精度、响应时间之间做出折中
      • 用户可以指定查询响应时间、查询结果精度要求之一
      • BlinkDB在Data Sample上执行查询,获得近似结果
      • 查询结果会给Error Bar标签,帮助决策
  • Tachyon:基于内存的分布式文件系统

    • 支持不同处理框架
      • 可在不同计算框架之间实现可靠的文件共享
      • 支持不同的上层查询处理框架,可以以极高的速度对集群 内存中的文件进行访问
    • 将workset文件缓存在内存中,避免对磁盘的存取,如果数据集 不断被扫描、处理,数据处理速度将极大提升

Spark实体

spark_entities

  • Spark Context:负责和CM的交互、协调应用

    • 所有的Spark应用作为独立进程运行,由各自的SC协调
    • SC向CM申请在集群中worker创建executor执行应用
  • Driver:执行应用主函数、创建Spark Context的节点

  • Worker:数据处理的节点

  • Cluster Manager:为每个driver中的应用分配资源

    • 以下3种资源管理器,在sceduling、security、monitoring 有所不同,根据需要选择不同的CM
      • Standalone
      • Mesos
      • YARN
    • CM对Spark是agnostic

Spark Context

spark.SparkContext

1
2
3
4
5
6
7
8
9
10
11
class SparkConf{
// 设置Spark应用名
def setAppName(appName: String)

// 设置集群地址:yarn master节点地址、"local[4]"本地standalone
def setMaster(master: String)
}
class SparkContext(?conf: SparkConf){
// 将driver中节点分块
def parallelize(?val: ?AnyRef, ?numPartition: Int)
}
  • SparkContext是Spark应用执行环境封装,任何应用都需要、 也只能拥有一个活动实例,有些shell可能默认已经实例化
1
2
3
4
5
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("app name")
.setMaster("local[4]")
val sc = new SparkContext(conf)

Share Variable

共享变量:可以是一个值、也可以是一个数据集,Spark提供了两种 共享变量

Broadcast Variable

广播变量:缓存在各个节点上,而不随着计算任务调度的发送变量 拷贝,可以避免大量的数据移动

1
2
val broadcastVal = sc.breadcast(Array(1,2,3))
println(broadcastVal.value)

Accumulator

收集变量/累加器:用于实现计数器、计算总和

  • 集群上各个任务可以向变量执行增加操作,但是不能读取值, 只有Driver Program(客户程序)可以读取

  • 累加符合结合律,所以集群对收集变量的累加没有顺序要求, 能够高效应用于并行环境

  • Spark原生支持数值类型累加器,可以自定义类型累加器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建数值类型累加器
val accum = sc.accumulator(0, "accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)
println(accum.value)

// 自定义向量累加器工具
object VectorAccumulatorParam extends AccumulatorParam[Vector]{
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector){
v1 += v2
}
}
// 创建向量累加器
val vecAccum = sc.accumulator(new Vector(1,2,3))(VectorAccumulator)

数据源

1
2
3
4
5
6
7
// 按行读取文本文件
def sc.textFile(?fileName: String, ?slices: Int): RDD[String]
// 读取包含多个小文件的目录
def sc.wholeTextFile(?directoryName: String): Map[String, RDD[String]]
// #todo
def sc.SequenceFiles(fileName: String)
def sc.hadoopRDD()
  • slices:切片数目,缺省为每个文件块创建切片,不能设置 小于文件块数目的切片值
  • Spark基于文件的方法,原生支持

    • 文件目录
    • 压缩文件:gz
    • 简单通配符

      |通配符|含义| |——-|——-| |[]:范围| |[^]|范围补集| |?|单个字符| |*|0、多个字符| |{}|整体或选择|

Spark Session

SparkSession:Spark2.0中新入口,封装有SparkConfSparkContextSQLContextHiveContext等接口

1
2
3
4
5
6
7
8
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
.builder()
.appName("App")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
spark.conf.set("spark.executor.memory", "2g")

Resilient Distributed Dataset

RDD

RDD:容错的、immutable、分布式、确定可重复计算的数据集

  • RDD可分配在集群中的多个节点中以支持并行处理

    • 隶属于同一RDD数据,被划分为不同的Partition,以此为 单位分布到集群环境中各个节点上
  • RDD是无结构的数据表,可以存放任何数据类型

  • RDD immutable

    • 对RDD进行转换,不会修改原RDD,只是返回新RDD
    • 这也是基于Lineage容错机制的要求
  • 是Spark软件系统的核心概念

RDD容错机制

  • RDD采用基于Lineage的容错机制

    • 每个RDD记住确定性操作的lineage,即从其他RDD转换而来 的路径
    • 若所有RDD的转换操作是确定的,则最终转换数据一致, 无论机器发生的错误
    • 当某个RDD损坏时,Spark可以从上游RDD重新计算、创建 其数据
  • 容错语义

    • 输入源文件:Spark运行在HDFS、S3等容错文件系统上,从 任何容错数据而来的RDD都是容错的
    • receiver:可靠接收者告知可靠数据源,保证所有数据总是 会被恰好处理一次
    • 输出:输出操作可能会使得数据被重复写出,但文件会被 之后写入覆盖
  • 故障类型

    • worker节点故障:节点中内存数据丢失,其上接收者缓冲 数据丢失
    • driver节点故障:spark context丢失,所有执行算子丢失

RDD操作

1
import org.apache.spark.rdd.RDD

Transformation

转换:从已有数据集创建新数据集

  • 返回新RDD,原RDD保持不变

  • 转换操作Lazy

    • 仅记录转换操作作用的基础数据集
    • 仅当某个动作被Driver Program(客户操作)调用DAG 的动作操作时,动作操作的一系列proceeding转换操作才会 被启动
Transformation RDD DStream
map(func)
flatMap(func)
filter(func)
reduceByKey(func[, numTasks]) 包含(K, V)键值对,返回按键聚集键值对
groupByKey()
aggregateByKey()
pipe()
coalesce()
repartition(numPartitions)
union(other)
  • XXXByKey:RDD中应为(K, V)键值对

spark_rdd_transformation

  • 绿色、黑色:单、多RDD窄依赖转换
  • 紫色:KV shuffle转换
  • 黄色:重分区转换
  • 蓝色:特例转换

Action

动作:在数据集上进行计算后返回值到驱动程序

  • 施加于一个RDD,通过对RDD数据集的计算返回新的结果
    • 默认RDD会在每次执行动作时重新计算,但可以使用 cachepersist持久化RDD至内存、硬盘中,加速下次 查询速度
Action RDD DStream
count() 返回包含单元素RDD的DStream
collect() 将RDD数据聚集至本地
countByValue() 返回(T, long)键值对
countByKey()
first() 返回包含单元素RDDd的DStream
reduce(func) 返回包含单元素RDD的DStream
take(func)
foreach(func)
foreachPartition(func)
join(other[, numTasks]) 包含(K,V),与另一(K,W)连接
cogroup(other[, numTasks]) 包含(K,V)、输入(K,W),返回(K, Seq(V), Seq(W)
  • numTasks:默认使用Spark默认并发数目

DStream RDD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// RDD级`map`:`func`以RDD为参数,自定义转换操作
def transform(func)
// RDD级`foreach`
def foreachRDD(func)

// RDD级`reduce`
def updateStateByKey[S: ClassTag](
// `K`、`Seq[V]`:当前RDD中键`K`对应值`V`集合
// `Option[S]`:上个RDD结束后键`K`对应状态
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
// 分区算法

partitioner: Partitioner,
// 是否在接下来Streaming执行过程中产生的RDD使用相同分区算法
remmemberPartition: Boolean,
// 键值对的初始状态
initRDD: RDD[(K,S)]
)
  • RDD分布在多个worker节点上,对不可序列化传递对象,需要在 每个worker节点独立创建

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    dstream.foreachRDD(rdd => {
    rdd.foreachPartition(partitionOfRecords => {
    // 为每个partition创建不可序列化网络连接
    // 为每个record创建成本过高
    val connection = createNewConnnection()
    // 进一步可以维护静态连接对象池
    // val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
    })
    })

DStream Window Action

Window Action DStream
window(windowLength, slideInterval) 基于DStream产生的窗口化批数据产生DStream
countByWindow(windowLenght, slideInterval) 返回滑动窗口数
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slidenInterval[, numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval[, numTasks]) 须提供invFunc消除离开窗口RDD对reduce结果影响
countByValueAndWindow(windowLength, slideInterval[, numTasks])
  • windowLength:窗口长度
  • slideInterval:滑动间隔

  • 以上操作默认会持久化RDD至内存,无需手动调用persist等方法

spark_streaming_dstream_window_based_operation

Output

Output Operation RDD DStream
print 打印前10条元素 每个RDD打印前10条元素
saveAsObjectFile(prefix[, suffix]) 保存为序列化文件 命名为<prefix>-TIME_IN_M.<suffix>
saveAsTextFile(prefix[, suffix]) 保存为文本文件
saveAsHadoopFile(prefix[, suffix]) 保存为Hadoop文件

Persistence

Persistence RDD DStream
persist()
cache()

Directed Asycled Graph

Spark中DAG:可以看作由RDD、转换操作、动作操作构成,用于表达 复杂计算

  • 当需要执行某个操作时,将重新从上游RDD进行计算

  • 也可以对RDD进行缓存、持久化,以便再次存取,获得更高查询 速度

    • In-mem Storage as Deserialized Java Objects
    • In-mem Storage as Serialized Data
    • On-Disk Storage

DAG工作流示例

spark_dag_procedure

  • 从HDFS装载数据至两个RDD中
  • 对RDD(和中间生成的RDD)施加一系列转换操作
  • 最后动作操作施加于最后的RDD生成最终结果、存盘

宽依赖、窄依赖

spark_dag_wide_narrow_dependencies

  • 窄依赖:每个父RDD最多被一个子RDD分区使用

    • 即单个父RDD分区经过转换操作生成子RDD分区
    • 窄依赖可以在一台机器上处理,无需Data Shuffling, 在网络上进行数据传输
  • 宽依赖:多个子RDD分区,依赖同一个父RDD分区

    • 涉及宽依赖操作
      • groupByKey
      • reduceByKey
      • sortByKey
    • 宽依赖一般涉及Data Shuffling

DAG Scheduler

DAG SchedulerStage-Oriented的DAG执行调度器

spark_dag_job_stage

  • 使用Job、Stage概念进行作业调度

    • 作业:一个提交到DAG Scheduler的工作项目,表达成DAG, 以一个RDD结束
    • 阶段:一组并行任务,每个任务对应RDD一个分区,是作业 的一部分、数据处理基本单元,负责计算部分结果,
  • DAG Scheduler检查依赖类型

    • 把一系列窄依赖RDD组织成一个阶段
      • 所以说阶段中并行的每个任务对应RDD一个分区
    • 宽依赖需要跨越连续阶段
      • 因为宽依赖子RDD分区依赖多个父RDD分区,涉及 Data Shuffling,数据处理不能在单独节点并行执行
      • 或者说阶段就是根据宽依赖进行划分
  • DAG Scheduler对整个DAG进行分析

    • 为作业产生一系列阶段、其间依赖关系
    • 确定需要持久化的RDD、阶段的输出
    • 找到作业运行最小代价的执行调度方案、根据Cache Status 确定的运行每个task的优选位置,把信息提交给 Task Sheduler执行
  • 容错处理

    • DAG Scheduler负责对shuffle output file丢失情况进行 处理,把已经执行过的阶段重新提交,以便重建丢失的数据
    • stage内其他失败情况由Task Scheduler本身进行处理, 将尝试执行任务一定次数、直到取消整个阶段

DataFrame

  • DataFrame:类似关系型数据库中表,数据被组织到具名列中

    • 相较于RDD是对分布式、结构化数据集的高层次抽象,提供 特定领域的专用API进行处理
    • 静态类型安全:相较于SQL查询语句,在编译时即可发现 语法错误
  • Dataset:有明确类型数据、或无明确数据类型集合,相应API 也分为两类

    • 相较于DataFrame,也可组织半结构化数据,同样提供方便 易用的结构化API
    • 静态类型、运行时类型安全:相较于DataFrame,集合元素 有明确类型,在编译时即可发现分析错误
  • Spark2.0中二者API统一
  • DataFrame可视为无明确数据类型Dataset[Row]别名,每行是 无类型JVM对象

创建方式

  • .toDF

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // `.toDF` + `Seq`创建
    val df = Seq(
    (1, "F", java.sql.Date.valueOf("2019-08-02")),
    (2, "G", java.sql.Date.valueOf("2019-08-01"))
    ).toDF("id", "level", "date")
    // 不指定列名,则默认为`_1`、`_2`等


    // `.toDF` + `case Class`创建
    case class Person(name: String, age: Int)
    val people = sc.textFile("people.txt")
    .map(_.split(","))
    .map(p => Person(p(0),p(1).trim.toInt))
    .toDF()
  • .createDataFrame

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import org.apache.spark.sql.types._
    val schema = StrucType(List(
    StructField("id", IntegerType, nullable=False),
    StructField("level", StringType, nullable=False),
    StructField("date", DateType, nullable=False)
    ))
    val rdd = sc.parallelize(Seq(
    (1, "F", java.sql.Date.valueOf("2019-08-02")),
    (2, "G", java.sql.Date.valueOf("2019-08-01"))
    ))
    val df = sqlContext.createDataFrame(rdd, schema)
  • 读取文件创建

    1
    2
    3
    4
    5
    6
    7
    val df = sqlContext.read.parquet("hdfs:/peopole.parq")
    val df = spark.read.json("people.json")
    // 读取csv仅2.0版本`SparkSession`后可
    val df = spark.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("people.csv")

三种数据集对比

  • 空间、时间效率:DataFrame >= Dataset > RDD

    • DataFrame、Dataset基于SparkSQL引擎构建,使用Catalyst 生成优化后的逻辑、物理查询计划;无类型DataFrame相较 有类型Dataset运行更快
    • Spark作为编译器可以理解Dataset类型JVM对象,会使用 编码器将其映射为Tungsten内存内部表示
  • RDD适合场景

    • 对数据集进行最基本的转换、处理、控制
    • 希望以函数式编程而不是以特定领域表达处理数据
    • 数据为非结构化,如:流媒体、字符流
  • DataFrame、Dataset使用场景

    • 需要语义丰富、高级抽象、通用平台、特定领域API
    • 需要对半结构化数据进行高级处理,如:filter、SQL查询
    • 需要编译时/运行时类型安全、Catalyst优化、内存优化

Spark Streaming

Spark Streaming

Spark Streaming:提供对实时数据流高吞吐、高容错、可扩展的 流式处理系统

  • 可以对多种数据源(Kafka、Flume、Twitter、ZeroMQ),进行 包括map、reduce、join等复杂操作

  • 采用Micro Batch数据处理方式,实现更细粒度资源分配,实现 动态负载均衡

    • 离散化数据流为为小的RDDs得到DStream交由Spark引擎处理
    • 数据存储在内存实现数据流处理、批处理、交互式一体化
    • 故障节点任务将均匀分散至集群中,实现更快的故障恢复

streaming.StreamingContext

1
2
3
4
5
6
7
8
9
10
import org.apache.spark.streaming.StreamingContext
class StreamingContext(?conf: SparkConf, ?slices: Int){

// 开始接受、处理流式数据
def start()
// 结束流式处理过程
def stop(?stop_spark_context=True)
// 等待计算完成
def awaitTermination()
}
1
2
3
4
5
6
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("app name").setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

Discreted Stream

DStream:代表持续性的数据流,是Spark Streaming的基础抽象

spark_streaming_dstream_transformation

  • 可从外部输入源、已有DStream转换得到

    • 可在流应用中并行创建多个输入DStream接收多个数据流
  • 在内部实现

    • DStream由时间上连续的RDD表示,每个RDD包含特定时间 间隔内的数据流
    • 对DStream中各种操作也是映射到内部RDD上分别进行 (部分如transform等则以RDD为基本单元)
      • 转换操作仍然得到DStream
      • 最终结果也是以批量方式生成的batch
    • 对DStream操作参见tools/spark/spark_rdd
  • (大部分)输入流DStream和一个Receiver对象相关联

    • Recevier对象作为长期任务运行,会占用独立计算核, 若分配核数量不够,系统将只能接收数据而不能处理
    • reliable receiver:可靠的receiver正确应答可靠源, 数据收到、且被正确复制至Spark
    • unreliable receiver:不可靠recevier不支持应答

Basic Sources

基本源:在StreamingContext中直接可用

  • 套接字连接
  • Akka中Actor
  • RDD队列数据流
1
2
3
4
5
6
7
8
// 套接字连接TCP源获取数据
def ssc.socketTextStream(?host: String, ?port: Int): DStream

// 自定义actor流
def ssc.actorStream(actorProps: ?, actorName: String): DStream

// RDD队列流
def ssc.queueStream(queueOfRDDs: Seq[RDD])

文件系统

1
2
3
4
// 文件流获取数据
def ssc.fileStream[keyClass, valueClass, inputFormatClass]
(dataDirectory: String): DStream
def ssc.textFileStream(dataDirectory)

文件系统:StreamingContext将监控目标目录,处理目录下任何 文件(不包括嵌套目录)

  • 文件须具有相同数据格式
  • 文件需要直接位于目标目录下
  • 已处理文件追加数据不被处理
  • 文件流无需运行receiver

Advanced Sources

高级源:需要额外的依赖

  • Flume
  • Kinesis
  • Twitter

Kafka

1
2
3
// 创建多个Kafka输入流
val kafkaStreams = (1 to numStreams).map(_ => kafkaUtils.createStream())
val unifiedStream = streamingContext.union(kafkaStreams)

性能调优

  • 减少批数据处理时间

    • 创建多个receiver接收输入流,提高数据接受并行水平
    • 提高数据处理并行水平
    • 减少输入数据序列化、RDD数据序列化成本
    • 减少任务启动开支
  • 设置正确的批容量,保证系统能正常、稳定处理数据

  • 内存调优,调整内存使用、应用的垃圾回收行为

Checkpoint

1
2
3
4
// 设置checkpoint存储信息目录
def ssc.checkpoint(?checkpointDirectory: String)
// 从checkpoint中恢复(若目录存在)、或创建新streaming上下文
def StreamingContext.getOrCreate(?checkPointDirectory: String, ?functionToCreateContext: () => StreamingContext)
  • 为保证流应用程序全天运行,需要checkpoint足够信息到容错 存储系统,使得系统能从程序逻辑无关错误中恢复

    • metadata checkpointing:流计算的定义信息,用于恢复 worker节点故障
    • configuration:streaming程序配置
    • DStream operation:streaming程序操作集合
    • imcomplete batches:操作队列中未完成批
    • data checkpointing:中间生成的RDD,在有状态的转换 操作中必须,避免RDD依赖链无限增长
  • 需要开启checkpoint场合

    • 使用有状态转换操作:updateStateByKeyreduceByKeyAndWindow
    • 从程序的driver故障中恢复
1
2
3
4
5
6
7
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf()
val ssc = new StreamingContext(conf)
// other streaming setting
ssc.checkpoint("checkpointDirectory")
ssc
}

Spark GraphX

GraphX

Spark GraphX:图数据的并行处理模块

  • GraphX扩展RDD为Resilient Distributed Property Graph, 边、顶点都有属性的有向图

  • 可利用此模块对图数据进行ExploratoryAnalysis、Iterative Graph Computation

  • GraphX提供了包括:子图、顶点连接、信息聚合等操作在内的 基础原语,并且对的Pregel API提供了优化变量的

  • GraphX包括了正在不断增加的一系列图算法、构建方法,用于 简化图形分析任务

    • 提供了一系列操作

      • Sub Graph:子图
      • Join Vertices:顶点连接
      • Aggregate Message:消息聚集
      • Pregel API变种
    • 经典图处理算法

      • PageRank

Spark MLLib

MLLib

Spark MLLib:Spark平台的机器学习库

  • 能直接操作RDD数据集,可以和其他BDAS其他组件无缝集成, 使得在全量数据上进行学习成为可能

  • 实现包括以下算法

    • Classification
    • Regression
    • Clustering
    • Collaborative Filtering
    • Dimensionality Reduction
  • MLLib是MLBase中的一部分

    • MLLib
    • MLI
    • MLOptimizer
    • MLRuntime
  • 从Spark1.2起被分为两个模块

    • spark.mllib:包含基于RDD的原始算法API
    • spark.ml:包含基于DataFrame的高层次API
      • 可以用于构建机器学习PipLine
      • ML PipLine API可以方便的进行数据处理、特征转换、 正则化、联合多个机器算法,构建单一完整的机器学习 流水线
  • MLLib算法代码可以在examples目录下找到,数据则在data 目录下
  • 机器学习算法往往需要多次迭代到收敛为止,Spark内存计算、 DAG执行引擎象相较MapReduce更理想
  • 由于Spark核心模块的高性能、通用性,Mahout已经放弃 MapReduce计算模型,选择Spark作为执行引擎

mllib.classification

Classification

Logistic Regression

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.mllib.classification import \
LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabledPoint

def parse_point(line):
value = [float(i) for i line.split(", \r\n\t")

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsed_data = data.map(parse_point)
# map `parse_point` to all data

model = LogisticRegressionWithLBFGS.train(parsed_data)
labels_and_preds = parsed_data.map(lambda p: (p.label, model.predict(p.features)))
train_err = labels_and_preds \
.filter(lambda lp: lp[0] != lp[1]) \
.count() / float(parsed_data.count())

model.save(sc, "model_path")
same_model = LogisticRegressionModel.load(sc, "model.path")
  • Decision Tree
  • Random Forest
  • Gradient
  • boosted tree
  • Multilaye Perceptron
  • Support Vector Machine
  • One-vs-Rest Classifier
  • Naive Bayes

Clustering

K-means

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import numpy as np
from pyspark.mllib.clustering import KMeans, KMeansModel

data = sc.textFile("data/mllib/kmeans_data.txt")
parsed_data = data.map(lambda line: np.array([float(i) for i in line.split()]))

cluster_model = KMeans.train(
parsed_data,
maxIteration=10,
initializationMode="random"
)
def error(point):
center = cluster_model.centers[cluster.predict(point)]
return np.sqrt(sum([i**2 for i in (point - center)]))
WSSSE = parsed_data \
.map(lambda point.error(point)) \
.reduce(lambd x, y: x + y)

cluster_model.save(sc, "model_path")
same_model = KMeansModel.load(sc, "model_path")

Gaussian Mixture Model(GMM)

  • 混合密度模型
    • 有限混合模型:正态分布混合模型可以模拟所有分布
    • 迪利克莱混合模型:类似于泊松过程
  • 应用
    • 聚类:检验聚类结果是否合适
    • 预测:

      todo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import numpy as np
from pyspark.mllib.clustering import GussianMixture, \
GussianMixtureModel

data = sc.textFile("data/mllib/gmm_data.txt")
parsed_data = data.map(lambda line: np.array[float(i) for i in line.strip()]))

gmm = GaussianMixture.train(parsed_data, 2)
for w, g in zip(gmm.weights, gmm.gaussians):
print("weight = ", w,
"mu = ", g.mu,
"sigma = ", g.sigma.toArray())

gmm.save(sc, "model_path")
same_model = GussainMixtureModel.load(sc, "model_path")

Latent Dirichlet Allocation(LDA)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

data = sc.textFile("data/mllib/sample_lda_data.txt")
parsed_data = data.map(lambda line: Vector.dense([float(i) for i in line.strip()]))

corpus = parsed_data.zipWithIndex() \
.map(lambda x: [x[1], x[0]).cache()
ldaModel = LDA.train(corpus, k=3)

topics = ldaModel.topicsMatrix()

for word in range(0, ldaModel.vocabSize()):
for topic in word:
print(topic)

ldaModel.save(sc, "model_path")
same_model = LDAModel.load("model_path")
  • Disecting K-means

Regression

Linear Regression

  • 耗时长、无法计算解析解(无意义)
  • 使用MSE作为极小化目标函数,使用SGD算法求解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from pyspark.mllib.regression import LabledPoint, \
LinearRegressionWithSGD, LinearRegressionModel

def parse_point(line):
value = [float(i) for i line.split(", \r\n\t")

data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsed_data = data.map(parse_point)
# map `parse_point` to all data

model = LinearRegressionWithSGD.train(
parsed_data,
iteration=100,
step=0.00000001
)
values_and_preds = parsed_data.map(lambda p:(p.label, model.predict(p.features)))
MSE = values_and_preds \
.map(lambda vp: (vp[0] - vp[1]) ** 2) \
.reduce(lambda x, y: x + y) / values_and_preds.count()

model.save(sc, "model_path")
# save model
same_model = LinearRegressionModel.load(sc, "model_path")
# load saved model
  • Generalized Linear Regression
  • Decision Tree Regression
  • Random Forest Regression
  • Gradient-boosted Tree Regression
  • Survival Regression
  • Isotonic Regression

Collaborative Filtering

Spark SQL

Spark SQL

Spark SQL:结构化数据查询模块

  • 内置JDBC服务器,通过JDBC API暴露Spark数据集,让客户程序 可以在其上直接执行SQL查询

  • 通过ETL工具从不同格式数据源装载数据,并运行一些 Ad-Hoc Query

  • 可以连接传统的BI、可视化工具至数据集

  • 前身Shark即为Hive on Spark,后出于维护、优化、 性能考虑放弃
  • Extraction Transformation Loading:ETL

sparksql_structure

sql.SQLContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.spark.sql.{SQLContext, HiveContext}

class SQLContext{

// 缓存使用柱状格式的表
// Spark将仅仅浏览需要的列、自动压缩数据减少内存使用
def cacheTable(tableName: String)

// 将普通RDD转换为SchemaRDD
def implicit createSchemaRDD(rdd: RDD): SchemaRDD

// 载入parquet格式文件
def parquetFile(fileName: String): SchemaRDD

// 载入json格式文件
def jsonFile(fileName: String): SchemaRDD
def jsonRDD(rdd: RDD[String]): SchemaRDD

// 执行SQL query
def sql(query: String): SchemeRDD
}
  • HiveContext支持SQLContext支持功能的超集,增加在 MetaStore发现表、利用HiveSQL写查询功能

sql.SchemaRDD

1
2
3
4
5
6
7
8
9
10
11
class SchemaRDD{

// 存储为parquet文件
def saveAsParquetFile(fileName: String)

// 注册为临时表,然后可以使用SQL语句查询
def registerTempTable(tableName: String)

// 打印表schema
def printSchema()
}

在数据存储层面对数据进行结构化描述的schema

  • 由SchemaRDD(上个版本)发展而来,在其上增加schema层 ,以便对各个数据列命名、数据类型描述

  • 可以通过DF API把过程性处理、Relational Processing (对表格的选择、投影、连接等操作)集成

  • DF API操作是Lazy的,使得Spark可以对关系操作、数据处理 工作流进行深入优化

  • 结构化的DF可以通过调用DF API重新转换为无结构的RDD数据集

  • 可以通过不同Data Source创建DF

    • 已经存在的RDD数据集
    • 结构化数据文件
    • JSON数据集
    • Hive表格
    • 外部数据库表

Data Source

数据源:通过DS API可以存取不同格式保存的结构化数据

  • Parquet
  • JSON
  • Apache Avro数据序列化格式
  • JDBC DS:可以通过JDBC读取关系型数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.spark.sql.{SQLContext, StructType, StructField, Row}
import org.apache.spark.sql.HiveContext

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemeRDD


case class Person(name: String, age: Int)

// 通过反射推断包含特定对象类型的RDD的模式
// 需要编写时已知模式
// 代码更简洁、工作更好
val people: RDD[Person] = sc.textFile("people.txt")
.map(_.split(","))
.map(p => Person(p(0), p(1).trim.toInt))


// 编程指定模式:构建模式,然后在已经存在的RDDs上使用
// 运行在运行期前不知道列、列类型情况下构造SchemaRDDs
val schemaString = "name age"
val people = sc.textFile("people.txt")
val schema = StructType(schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, true))
)
val rowRDD = people.map(_.split(","))
.map(p => Row(p(0), p(1).trim))
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
peopleSchemaRDD.registerTempTable("people")


// 查询语言集成
val teenagers = people.where("age >= 13").select("name")


people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FORM people WHERE age >= 13")


val apRDD = sc.parallelize(
"""{"name": "Tom", "address": { "city": "Columbus", "state": "Ohio" }}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(apRDD)

Catalyst 优化器

结构

Catalyst优化器:利用Scala模式匹配和quasiquotes机制构建的 可扩展查询优化器

sparksql_optimization

sparksql_procedure

  • SparkSQL Pipeline的中间核心部分

Parser模块

Parser模块:将SQL语句切分为token,根据一定语义规则解析为AST

sql_parser_ast

  • Spark1.x使用Scala原生Parser Combinator构建的词法、语法 分析器

  • Spark2.x使用采用第三方语法解析器工具ANTLR4

    • ANTLR4根据语法文件SqlBase.g4自动解析生成两个Java类 ,将sql语句解析成ParseTree的语法结构

      • SqlBaseLexer:词法解析器
      • SqlBaseParser:语法解析器
    • 随后ParsePlan过程,使用AstBuilder.scala将ParseTree 转换为catalyst表达式逻辑计划Unresovled Logical Plan

      • Unsolved Relation
      • Unsolved Function
      • Unsolved Attribute

Analyzer模块

Analyzer模块:使用Analysis Rules,借助数据元数据 (session cataloghive metastore)将ULP解析为 Logical Plan

sparksql_catalyst_analyzer

  • ULP虽然具备基本骨架,但是系统对表的字段信息不清楚,需要 基本元数据信息表达ULP中token

  • 遍历整个AST,对树上每个结点进行数据类型绑定、函数绑定, 得到LP

Schema Catalog

元数据信息:表的模式信息

  • 表的基本定义:表名、列名、数据类型
  • 表的数据格式:json、text、parquet、压缩格式
  • 表的物理位置

Optimizer模块

Optimizer模块:使用Optimization Rules对LP进行合并、列 裁剪、过滤器下推等优化工作,生成等价Optimized Logical Plan

  • 分为RBO、CBO两种优化策略,是catalyst核心

Spark Planner

Spark Planner模块:将OLP转换为spark能够理解的 Physical Plan

sql_optimization_physical_plan

  • 将逻辑上可行的执行计划变为Spark真正可以执行的物理计划
  • 物理计划实际上是逻辑计划中耗时最小的算法实现

Join

Join类型

SparkSQL目前支持三种join算符

  • shuffle hash join
  • broadcast hash join
  • sort merge join

Broadcast Hash Join

broadcast hash join:将小表广播分发到大表所在的结点上, 并行在各节点上进行hash join

sparksql_broadcast_hash_join

  • 适合小表很小,可以直接广播的场合

    • spark.sql.autoBroadcastJoinThreshold设置广播小表 大小上限
  • broadcast阶段:将所小表广播分发到大表所在的所有主机

    • driver分发
    • p2p分发
  • hash join结点:各结点独立并行hash join

    • 小表构建hash表
    • 各结点本地大表试探

Shuffle Hash Join

shuffle hash join:按照join key分区,在每个结点独立并行 进行hash join

sparksql_shuffle_hash_join

  • 类似分布式GHJ,不同块位于不同结点

  • shuffle阶段:将表按照join key分区,将具有相同join key 的记录重分布到同一结点

  • hash jon阶段:各结点使用本地数据独立并行hash join

Sort Merge Join

SMJ:按照join key分区,在各节点独立并行SMJ

sparksql_sort_merge_join

  • shuffle阶段:将表按照join key分区,将具有相同join key 的记录重分布到同一结点

  • sort阶段:各节点并行对本地数据排序

    • spark当前shuffle算法使用sort-based shuffle算法
    • 理论上shuffle过后各分区数据已经排序完毕,无需再次 sort,效率很高
  • merge阶段:各节点对排序好表数据执行join操作

Join Reorder

  • 基于CBO的join重排序优化:用统计信息预估的基修正join顺序

  • 使用动态规划算法,考虑所有可能组合,选择代价最小的方案

    • 单个join操作成本,join树的成本是所有中间join成本总和

      • carinality:对应CPU成本
      • size:对应IO成本
    • 没有任何join条件同时包含左、右子树时,修剪笛卡尔积 减少搜索范围

Statistics Collection Framework

CBO依赖统计细节信息优化查询计划

  • CBO自下而上遍历LP,统计信息可以随之传播到上层算子

统计信息类型

  • Numeric、Date、Timestamp

    • Distinct Count
    • Max
    • Min
    • Null Count
    • Average Length:定长
    • Max Length:定长
  • String、Binary

    • Distinct Count
    • Null Count
    • Average Length
    • Max Length

统计方式

  • 扫描全表:简单、统计信息准确,代价大
  • 抽样统计:

应用

Filter Selectivity

过滤选择率:估计应用谓词表达式过滤的选择率

逻辑运算符
  • AND:左侧过滤条件选择率、右侧过滤条件选择率之积

  • OR:左侧、右侧过滤条件选择率之和,减去其乘积

  • NOT:1减去原始过滤条件选择率

比较运算符
  • =:等于条件

    • 若常数取值在当前列取值范围之外,则过滤选择率为0
    • 否则根据柱状图、均匀分布得到过滤选择率
  • <:小于条件

    • 若常数取值小于当前列最小值,则过滤选择率为0
    • 否则根据柱状图、均匀分数得到过滤选择率

Join Carinality

联接基:估计联接操作结果的基

  • inner:其他基估计值可由inner join计算

    • num(A):join操作前表A的有效记录数
    • distinct(A.k):表A中列k唯一值数量
  • left-outer:取inner join、左表中基较大者

  • right-outer:取inner join、右表中基较大者

  • full-outer

B)

$$

SparkSQL2.4中启用CBO时JoinReorder分析

背景

Spark Join方式

SparkSQL目前支持三种join方式

  • broadcast hash join:将小表广播分发到大表所在的结点上 ,并行在各节点上进行hash join

    • 仅适合内表非常小的场合
  • shuffle hash join:按照join key分区,每个结点独立并行 进行hash join

    • 类似分布式GHJ,不同块位于不同结点
  • sort merge join:按照join key分区,在各节点独立并行SMJ

    • spark当前shuffle算法使用sort-based shuffle算法
    • 理论上shuffle过后各分区数据已经排序完毕,无需再次 sort,效率很高

Join类型

SparkSQL支持的Join类型可以分为以下两类

  • 顺序结果无关Join

    • inner join
    • (full)outer join
  • 顺序结果相关Join

    • left(outer) join
    • right(outer) join
    • left semi join
    • right semi join

考虑到JoinReorder的结果

  • 仅支持连接重排序的连接类型只可能是inner join outer join

  • outer join重排序虽然不影响结果,但是处理不方便,所以 联接重排序一般仅限于inner join???

    • 有些情况下RBO可以将外联接等价转换为内联接
    • SparkSQL2.4中支持的连接重排序仅限于内连接

Cost-Based Opitimization/Optimizer

CBO:基于成本的优化(器)

  • 根据SQL的执行成本制定、优化查询作业执行计划,生成可能 的执行计划中代价最小的计划

    • 数据表统计数据
      • 基/势
      • 唯一值数量
      • 空值数量
      • 平均、最大长度
    • SQL执行路径I/O
    • 网络资源
    • CPU使用情况
  • 在SparkSQL Hash Join中可以用于

    • 选择正确hash建表方
    • 选择正确join类型:广播hash、全洗牌hash
    • join reorder:调整多路join顺序
  • CBO本身需要耗费一定资源,需要平衡CBO和查询计划优化程度

    • 数据表的数据统计资源耗费
    • 优化查询计划即时资源耗费
  • CBO是相较于Rule-Based Optimization的概念

CBO中的独特概念

  • cardinality:集的势,结果集的行数

    • 表示SQL执行成本值
    • SQL执行返回的结果集包含的行数越多,成本越大
  • selectivity:可选择率,施加指定谓语条件后返回结果集的 记录数占未施加任何谓语条件的原始结果记录数的比率

    • 值越小,说明可选择性越好
    • 值越大,说明可选择性越差,成本值越大

Join Reorder

Join Reorder:基于CBO的多表连接顺序重排

  • 用统计信息预估的基修正join顺序

  • 主要涉及到以下两个方面

    • 查询代价估算
    • 多表连接顺序搜索算法

查询代价估计

代价模型

  • 单个join操作成本

    • carinality:对应CPU成本
    • size:对应IO成本
  • join树的成本是所有中间join成本总和

Filter Selectivity估计

过滤选择率:估计应用谓词表达式过滤的选择率

逻辑运算符
  • AND:左侧过滤条件选择率、右侧过滤条件选择率之积

  • OR:左侧、右侧过滤条件选择率之和,减去其乘积

  • NOT:1减去原始过滤条件选择率

比较运算符
  • =:等于条件

    • 若常数取值在当前列取值范围之外,则过滤选择率为0
    • 否则根据柱状图、均匀分布得到过滤选择率
  • <:小于条件

    • 若常数取值小于当前列最小值,则过滤选择率为0
    • 否则根据柱状图、均匀分数得到过滤选择率

Join Carinality估计

联接基:估计联接操作结果的基

  • inner:其他基估计值可由inner join计算

    • num(A):join操作前表A的有效记录数
    • distinct(A.k):表A中列k唯一值数量
  • left-outer:取inner join、左表中基较大者

  • right-outer:取inner join、右表中基较大者

  • full-outer

多表连接顺序搜索算法

SparkSQL2.4中使用动态规划算法对可能联接顺序进行搜索,从中 选择最优的联接顺序作为执行计划

  • 最优子结构:一旦前k个表联接顺序确定,则联接前中间表和 第k+1个表方案和前k个表的联接顺序无关

  • 动态规划表:从单表代价开始,逐层向上计算各层多表联接代价 ,直到求得所有表联接最小代价

  • 减少搜索空间启发式想法:尽可能优先有谓词限制的内连接、 中间表

评价

  • 优势:动态规划算法能够求得整个搜索空间中最优解
  • 缺陷:当联接表数量增加时,算法需要搜索的空间增加的非常快 ,计算最优联接顺序代价很高

PostgreSQL

代价模型

Postgres的查询代价估计模型基于CPU开销、IO开销,另外还增加 了启动代价

动态规划算法

类似SparkSQL2.4多表连接算法(假设联接n个表)

  1. 构造第一层关系:每个关系的最优路径就是关系的最优单表扫描 方式

  2. 迭代依次构造之后n-1层关系联接最优解

    • 左深联接树方式:将第k-1层每个关系同第1层关系联接
    • 紧密树联接方式:将第m(m > 2)层每个关系同第k-m层关系 联接

    left_deep_tree_bushy_tree

遗传算法

遗传算法:模拟自然界生物进化过程,采用人工进化的方式对目标 空间进行搜索

  • 本质是高效、并行、全局搜索方法
  • 能在搜索过程中自动获取、积累有关搜索空间的知识,并自适应 的控制搜索过程以求的最佳解

思想

  • 将问题域中可能解看作是染色体,将其编码为符号串的形式
  • 对染色体群体反复进行基于遗传学的操作:选择、交叉、变异
  • 根据预定目标适应度函数对每个个体进行评价,不断得到更优 群体,从中全局并行搜索得到优化群体中最优个体

MySQL

代价模型

  • 因为多表联接顺序采用贪心算法,多个表已经按照一定规则排序 (可访问元组数量升序排序)
  • 所以MySQL认为,找到每个表的最小花费就是最终联接最小代价

贪心算法

贪心算法:认为每次连接表的连接方式都是最优的,即从未联接表中 选择使得下次联接代价最小者

  • 多表排序一般为

    • 常量表最前
    • 其他表按可访问元组数量升序排序
  • 贪心算法得到的联接方式都是最优的

    • 则每次联接主要求解要联接表对象的最佳访问方式
    • 即每次代价估计的重点在于单表扫描的代价
  • 求解结束后,局部最优查询计划生成

    • 得到左深树
    • 最初始表位于最左下端叶子节点处

优化方案

以下分别从查询代价估计、多表连接顺序搜索算法给出方案

查询代价估计

  • 考虑在现有代价模型上增加网络通信开销

  • 在现有直方图估计选择率基础上,增加选择率估计方法

    • Parametric Method:参数方法,使用预先估计分布函数 逼近真实分布

    • Curve Fitting:曲线拟合法,使用多项式函数、最小 标准差逼近属性值分布

多表连接顺序搜索算法

考虑到动态规划算法随着联接表数量增加时,计算代价过于庞大, 可以考虑引入其他算法优化多表连接顺序

  • 遗传算法
  • 退火算法
  • 贪心算法
  • 遗传算法
  • 退火算法
  • 贪心算法