Spark Core

Spark特性

数据处理速度快

得益于Spark的内存处理技术、DAG执行引擎

内存计算

  • Spark尽量把数据(中间结果等)驻留在内存中,必要时才写入 磁盘,避免I/O操作,提高处理效率

  • 支持保存部分数据在内存中,剩下部分保存在磁盘中

  • 数据完全驻留于内存时,数据处理达到hadoop系统的 几十~上百倍,数据存在磁盘上时,处理速度能够达到hadoop的 10倍左右

DAG执行引擎

  • Spark执行任务前,根据任务之间依赖关系生成DAG图,优化数据 处理流程(减少任务数量)、减少I/O操作

  • 除了简单的map、reduce,Spark提供了超过80个数据处理的 Operator Primitives

  • 对于数据查询操作,Spark采用Lazy Evaluation方式执行, 帮助优化器对整个数据处力工作流进行优化

易用性/API支持

  • Spark使用Scala编写,经过编译后在JVM上运行

  • 支持各种编程语言,提供了简洁、一致的编程接口

    • Scala
    • Java
    • Python
    • Clojure
    • R

通用性

  • Spark是通用平台,支持以DAG(有向无环图)形式表达的复杂 数据处理流程,能够对数据进行复杂的处理操作,而不用将复杂 任务分解成一系列MapReduce作业

  • Spark生态圈DBAS(Berkely Data Analysis Stack)包含组件, 支持批处理、流数据处理、图数据处理、机器学习

兼容性

  • DataStorage

    • 一般使用HDFS、Amazon S3等分布式系统存储数据
    • 支持Hive、Hbase、Cassandra等数据源
    • 支持Flume、Kafka、Twitter等流式数据
  • Resource Management

    • 能以YARN、Mesos等分布式资源管理框架为资源管理器
    • 也可以使用自身资源的管理器以Standalone Mode独立运行
  • 使用支持

    • 可以使用shell程序,交互式的对数据进行查询
    • 支持流处理、批处理
  • 数据类型、计算表达能力

    • Spark可以管理各种类型的数据集:文本

Spark架构

核心组件

  • Spark StreamingSpark SQLSpark GraphXSpark MLLib为BDAS所包含的组件
  • Spark Streaming:提供对实时数据流高吞吐、高容错、可 扩展的流式处理系统

    • 采用Micro Batch数据处理方式,实现更细粒度资源分配, 实现动态负载均衡
    • 可以对多种数据源(Kafka、Flume、Twitter、ZeroMQ),进行 包括map、reduce、join等复杂操作
  • Spark SQL:结构化数据查询模块

    • 通过JDBC API暴露Spark数据集,让客户程序可以在其上 直接执行SQL查询
    • 可以连接传统的BI、可视化工具至数据集
    • 前身Shark即为Hive on Spark,后出于维护、优化、 性能考虑放弃
  • Spark GraphX:图数据的并行处理模块

    • 扩展RDD为Resilient Distributed Property Graph, 将属性赋予各个节点、边的有向图
    • 可利用此模块对图数据进行ExploratoryAnalysis、Iterative Graph Computation
  • Spark MLLib:可扩展的机器学习模块

    • 大数据平台使得在全量数据上进行学习成为可能
    • 实现包括以下算法
      • Classification
      • Regression
      • Clustering
      • Collaborative Filtering
      • Dimensionality Reduction

周围组件

  • BlinkDB:近似查询处理引擎

    • 可以在大规模数据集上,交互式执行SQL查询
    • 允许用户在查询精度、响应时间之间做出折中
      • 用户可以指定查询响应时间、查询结果精度要求之一
      • BlinkDB在Data Sample上执行查询,获得近似结果
      • 查询结果会给Error Bar标签,帮助决策
  • Tachyon:基于内存的分布式文件系统

    • 支持不同处理框架
      • 可在不同计算框架之间实现可靠的文件共享
      • 支持不同的上层查询处理框架,可以以极高的速度对集群 内存中的文件进行访问
    • 将workset文件缓存在内存中,避免对磁盘的存取,如果数据集 不断被扫描、处理,数据处理速度将极大提升

Spark实体

spark_entities

  • Spark Context:负责和CM的交互、协调应用

    • 所有的Spark应用作为独立进程运行,由各自的SC协调
    • SC向CM申请在集群中worker创建executor执行应用
  • Driver:执行应用主函数、创建Spark Context的节点

  • Worker:数据处理的节点

  • Cluster Manager:为每个driver中的应用分配资源

    • 以下3种资源管理器,在sceduling、security、monitoring 有所不同,根据需要选择不同的CM
      • Standalone
      • Mesos
      • YARN
    • CM对Spark是agnostic

Spark Context

spark.SparkContext

1
2
3
4
5
6
7
8
9
10
11
class SparkConf{
// 设置Spark应用名
def setAppName(appName: String)

// 设置集群地址:yarn master节点地址、"local[4]"本地standalone
def setMaster(master: String)
}
class SparkContext(?conf: SparkConf){
// 将driver中节点分块
def parallelize(?val: ?AnyRef, ?numPartition: Int)
}
  • SparkContext是Spark应用执行环境封装,任何应用都需要、 也只能拥有一个活动实例,有些shell可能默认已经实例化
1
2
3
4
5
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("app name")
.setMaster("local[4]")
val sc = new SparkContext(conf)

Share Variable

共享变量:可以是一个值、也可以是一个数据集,Spark提供了两种 共享变量

Broadcast Variable

广播变量:缓存在各个节点上,而不随着计算任务调度的发送变量 拷贝,可以避免大量的数据移动

1
2
val broadcastVal = sc.breadcast(Array(1,2,3))
println(broadcastVal.value)

Accumulator

收集变量/累加器:用于实现计数器、计算总和

  • 集群上各个任务可以向变量执行增加操作,但是不能读取值, 只有Driver Program(客户程序)可以读取

  • 累加符合结合律,所以集群对收集变量的累加没有顺序要求, 能够高效应用于并行环境

  • Spark原生支持数值类型累加器,可以自定义类型累加器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建数值类型累加器
val accum = sc.accumulator(0, "accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)
println(accum.value)

// 自定义向量累加器工具
object VectorAccumulatorParam extends AccumulatorParam[Vector]{
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector){
v1 += v2
}
}
// 创建向量累加器
val vecAccum = sc.accumulator(new Vector(1,2,3))(VectorAccumulator)

数据源

1
2
3
4
5
6
7
// 按行读取文本文件
def sc.textFile(?fileName: String, ?slices: Int): RDD[String]
// 读取包含多个小文件的目录
def sc.wholeTextFile(?directoryName: String): Map[String, RDD[String]]
// #todo
def sc.SequenceFiles(fileName: String)
def sc.hadoopRDD()
  • slices:切片数目,缺省为每个文件块创建切片,不能设置 小于文件块数目的切片值
  • Spark基于文件的方法,原生支持

    • 文件目录
    • 压缩文件:gz
    • 简单通配符

      |通配符|含义| |——-|——-| |[]:范围| |[^]|范围补集| |?|单个字符| |*|0、多个字符| |{}|整体或选择|

Spark Session

SparkSession:Spark2.0中新入口,封装有SparkConfSparkContextSQLContextHiveContext等接口

1
2
3
4
5
6
7
8
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
.builder()
.appName("App")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
spark.conf.set("spark.executor.memory", "2g")

Resilient Distributed Dataset

RDD

RDD:容错的、immutable、分布式、确定可重复计算的数据集

  • RDD可分配在集群中的多个节点中以支持并行处理

    • 隶属于同一RDD数据,被划分为不同的Partition,以此为 单位分布到集群环境中各个节点上
  • RDD是无结构的数据表,可以存放任何数据类型

  • RDD immutable

    • 对RDD进行转换,不会修改原RDD,只是返回新RDD
    • 这也是基于Lineage容错机制的要求
  • 是Spark软件系统的核心概念

RDD容错机制

  • RDD采用基于Lineage的容错机制

    • 每个RDD记住确定性操作的lineage,即从其他RDD转换而来 的路径
    • 若所有RDD的转换操作是确定的,则最终转换数据一致, 无论机器发生的错误
    • 当某个RDD损坏时,Spark可以从上游RDD重新计算、创建 其数据
  • 容错语义

    • 输入源文件:Spark运行在HDFS、S3等容错文件系统上,从 任何容错数据而来的RDD都是容错的
    • receiver:可靠接收者告知可靠数据源,保证所有数据总是 会被恰好处理一次
    • 输出:输出操作可能会使得数据被重复写出,但文件会被 之后写入覆盖
  • 故障类型

    • worker节点故障:节点中内存数据丢失,其上接收者缓冲 数据丢失
    • driver节点故障:spark context丢失,所有执行算子丢失

RDD操作

1
import org.apache.spark.rdd.RDD

Transformation

转换:从已有数据集创建新数据集

  • 返回新RDD,原RDD保持不变

  • 转换操作Lazy

    • 仅记录转换操作作用的基础数据集
    • 仅当某个动作被Driver Program(客户操作)调用DAG 的动作操作时,动作操作的一系列proceeding转换操作才会 被启动
Transformation RDD DStream
map(func)
flatMap(func)
filter(func)
reduceByKey(func[, numTasks]) 包含(K, V)键值对,返回按键聚集键值对
groupByKey()
aggregateByKey()
pipe()
coalesce()
repartition(numPartitions)
union(other)
  • XXXByKey:RDD中应为(K, V)键值对

spark_rdd_transformation

  • 绿色、黑色:单、多RDD窄依赖转换
  • 紫色:KV shuffle转换
  • 黄色:重分区转换
  • 蓝色:特例转换

Action

动作:在数据集上进行计算后返回值到驱动程序

  • 施加于一个RDD,通过对RDD数据集的计算返回新的结果
    • 默认RDD会在每次执行动作时重新计算,但可以使用 cachepersist持久化RDD至内存、硬盘中,加速下次 查询速度
Action RDD DStream
count() 返回包含单元素RDD的DStream
collect() 将RDD数据聚集至本地
countByValue() 返回(T, long)键值对
countByKey()
first() 返回包含单元素RDDd的DStream
reduce(func) 返回包含单元素RDD的DStream
take(func)
foreach(func)
foreachPartition(func)
join(other[, numTasks]) 包含(K,V),与另一(K,W)连接
cogroup(other[, numTasks]) 包含(K,V)、输入(K,W),返回(K, Seq(V), Seq(W)
  • numTasks:默认使用Spark默认并发数目

DStream RDD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// RDD级`map`:`func`以RDD为参数,自定义转换操作
def transform(func)
// RDD级`foreach`
def foreachRDD(func)

// RDD级`reduce`
def updateStateByKey[S: ClassTag](
// `K`、`Seq[V]`:当前RDD中键`K`对应值`V`集合
// `Option[S]`:上个RDD结束后键`K`对应状态
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
// 分区算法

partitioner: Partitioner,
// 是否在接下来Streaming执行过程中产生的RDD使用相同分区算法
remmemberPartition: Boolean,
// 键值对的初始状态
initRDD: RDD[(K,S)]
)
  • RDD分布在多个worker节点上,对不可序列化传递对象,需要在 每个worker节点独立创建

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    dstream.foreachRDD(rdd => {
    rdd.foreachPartition(partitionOfRecords => {
    // 为每个partition创建不可序列化网络连接
    // 为每个record创建成本过高
    val connection = createNewConnnection()
    // 进一步可以维护静态连接对象池
    // val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
    })
    })

DStream Window Action

Window Action DStream
window(windowLength, slideInterval) 基于DStream产生的窗口化批数据产生DStream
countByWindow(windowLenght, slideInterval) 返回滑动窗口数
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slidenInterval[, numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval[, numTasks]) 须提供invFunc消除离开窗口RDD对reduce结果影响
countByValueAndWindow(windowLength, slideInterval[, numTasks])
  • windowLength:窗口长度
  • slideInterval:滑动间隔

  • 以上操作默认会持久化RDD至内存,无需手动调用persist等方法

spark_streaming_dstream_window_based_operation

Output

Output Operation RDD DStream
print 打印前10条元素 每个RDD打印前10条元素
saveAsObjectFile(prefix[, suffix]) 保存为序列化文件 命名为<prefix>-TIME_IN_M.<suffix>
saveAsTextFile(prefix[, suffix]) 保存为文本文件
saveAsHadoopFile(prefix[, suffix]) 保存为Hadoop文件

Persistence

Persistence RDD DStream
persist()
cache()

Directed Asycled Graph

Spark中DAG:可以看作由RDD、转换操作、动作操作构成,用于表达 复杂计算

  • 当需要执行某个操作时,将重新从上游RDD进行计算

  • 也可以对RDD进行缓存、持久化,以便再次存取,获得更高查询 速度

    • In-mem Storage as Deserialized Java Objects
    • In-mem Storage as Serialized Data
    • On-Disk Storage

DAG工作流示例

spark_dag_procedure

  • 从HDFS装载数据至两个RDD中
  • 对RDD(和中间生成的RDD)施加一系列转换操作
  • 最后动作操作施加于最后的RDD生成最终结果、存盘

宽依赖、窄依赖

spark_dag_wide_narrow_dependencies

  • 窄依赖:每个父RDD最多被一个子RDD分区使用

    • 即单个父RDD分区经过转换操作生成子RDD分区
    • 窄依赖可以在一台机器上处理,无需Data Shuffling, 在网络上进行数据传输
  • 宽依赖:多个子RDD分区,依赖同一个父RDD分区

    • 涉及宽依赖操作
      • groupByKey
      • reduceByKey
      • sortByKey
    • 宽依赖一般涉及Data Shuffling

DAG Scheduler

DAG SchedulerStage-Oriented的DAG执行调度器

spark_dag_job_stage

  • 使用Job、Stage概念进行作业调度

    • 作业:一个提交到DAG Scheduler的工作项目,表达成DAG, 以一个RDD结束
    • 阶段:一组并行任务,每个任务对应RDD一个分区,是作业 的一部分、数据处理基本单元,负责计算部分结果,
  • DAG Scheduler检查依赖类型

    • 把一系列窄依赖RDD组织成一个阶段
      • 所以说阶段中并行的每个任务对应RDD一个分区
    • 宽依赖需要跨越连续阶段
      • 因为宽依赖子RDD分区依赖多个父RDD分区,涉及 Data Shuffling,数据处理不能在单独节点并行执行
      • 或者说阶段就是根据宽依赖进行划分
  • DAG Scheduler对整个DAG进行分析

    • 为作业产生一系列阶段、其间依赖关系
    • 确定需要持久化的RDD、阶段的输出
    • 找到作业运行最小代价的执行调度方案、根据Cache Status 确定的运行每个task的优选位置,把信息提交给 Task Sheduler执行
  • 容错处理

    • DAG Scheduler负责对shuffle output file丢失情况进行 处理,把已经执行过的阶段重新提交,以便重建丢失的数据
    • stage内其他失败情况由Task Scheduler本身进行处理, 将尝试执行任务一定次数、直到取消整个阶段

DataFrame

  • DataFrame:类似关系型数据库中表,数据被组织到具名列中

    • 相较于RDD是对分布式、结构化数据集的高层次抽象,提供 特定领域的专用API进行处理
    • 静态类型安全:相较于SQL查询语句,在编译时即可发现 语法错误
  • Dataset:有明确类型数据、或无明确数据类型集合,相应API 也分为两类

    • 相较于DataFrame,也可组织半结构化数据,同样提供方便 易用的结构化API
    • 静态类型、运行时类型安全:相较于DataFrame,集合元素 有明确类型,在编译时即可发现分析错误
  • Spark2.0中二者API统一
  • DataFrame可视为无明确数据类型Dataset[Row]别名,每行是 无类型JVM对象

创建方式

  • .toDF

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // `.toDF` + `Seq`创建
    val df = Seq(
    (1, "F", java.sql.Date.valueOf("2019-08-02")),
    (2, "G", java.sql.Date.valueOf("2019-08-01"))
    ).toDF("id", "level", "date")
    // 不指定列名,则默认为`_1`、`_2`等


    // `.toDF` + `case Class`创建
    case class Person(name: String, age: Int)
    val people = sc.textFile("people.txt")
    .map(_.split(","))
    .map(p => Person(p(0),p(1).trim.toInt))
    .toDF()
  • .createDataFrame

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import org.apache.spark.sql.types._
    val schema = StrucType(List(
    StructField("id", IntegerType, nullable=False),
    StructField("level", StringType, nullable=False),
    StructField("date", DateType, nullable=False)
    ))
    val rdd = sc.parallelize(Seq(
    (1, "F", java.sql.Date.valueOf("2019-08-02")),
    (2, "G", java.sql.Date.valueOf("2019-08-01"))
    ))
    val df = sqlContext.createDataFrame(rdd, schema)
  • 读取文件创建

    1
    2
    3
    4
    5
    6
    7
    val df = sqlContext.read.parquet("hdfs:/peopole.parq")
    val df = spark.read.json("people.json")
    // 读取csv仅2.0版本`SparkSession`后可
    val df = spark.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("people.csv")

三种数据集对比

  • 空间、时间效率:DataFrame >= Dataset > RDD

    • DataFrame、Dataset基于SparkSQL引擎构建,使用Catalyst 生成优化后的逻辑、物理查询计划;无类型DataFrame相较 有类型Dataset运行更快
    • Spark作为编译器可以理解Dataset类型JVM对象,会使用 编码器将其映射为Tungsten内存内部表示
  • RDD适合场景

    • 对数据集进行最基本的转换、处理、控制
    • 希望以函数式编程而不是以特定领域表达处理数据
    • 数据为非结构化,如:流媒体、字符流
  • DataFrame、Dataset使用场景

    • 需要语义丰富、高级抽象、通用平台、特定领域API
    • 需要对半结构化数据进行高级处理,如:filter、SQL查询
    • 需要编译时/运行时类型安全、Catalyst优化、内存优化

Spark Streaming

Spark Streaming

Spark Streaming:提供对实时数据流高吞吐、高容错、可扩展的 流式处理系统

  • 可以对多种数据源(Kafka、Flume、Twitter、ZeroMQ),进行 包括map、reduce、join等复杂操作

  • 采用Micro Batch数据处理方式,实现更细粒度资源分配,实现 动态负载均衡

    • 离散化数据流为为小的RDDs得到DStream交由Spark引擎处理
    • 数据存储在内存实现数据流处理、批处理、交互式一体化
    • 故障节点任务将均匀分散至集群中,实现更快的故障恢复

streaming.StreamingContext

1
2
3
4
5
6
7
8
9
10
import org.apache.spark.streaming.StreamingContext
class StreamingContext(?conf: SparkConf, ?slices: Int){

// 开始接受、处理流式数据
def start()
// 结束流式处理过程
def stop(?stop_spark_context=True)
// 等待计算完成
def awaitTermination()
}
1
2
3
4
5
6
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("app name").setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

Discreted Stream

DStream:代表持续性的数据流,是Spark Streaming的基础抽象

spark_streaming_dstream_transformation

  • 可从外部输入源、已有DStream转换得到

    • 可在流应用中并行创建多个输入DStream接收多个数据流
  • 在内部实现

    • DStream由时间上连续的RDD表示,每个RDD包含特定时间 间隔内的数据流
    • 对DStream中各种操作也是映射到内部RDD上分别进行 (部分如transform等则以RDD为基本单元)
      • 转换操作仍然得到DStream
      • 最终结果也是以批量方式生成的batch
    • 对DStream操作参见tools/spark/spark_rdd
  • (大部分)输入流DStream和一个Receiver对象相关联

    • Recevier对象作为长期任务运行,会占用独立计算核, 若分配核数量不够,系统将只能接收数据而不能处理
    • reliable receiver:可靠的receiver正确应答可靠源, 数据收到、且被正确复制至Spark
    • unreliable receiver:不可靠recevier不支持应答

Basic Sources

基本源:在StreamingContext中直接可用

  • 套接字连接
  • Akka中Actor
  • RDD队列数据流
1
2
3
4
5
6
7
8
// 套接字连接TCP源获取数据
def ssc.socketTextStream(?host: String, ?port: Int): DStream

// 自定义actor流
def ssc.actorStream(actorProps: ?, actorName: String): DStream

// RDD队列流
def ssc.queueStream(queueOfRDDs: Seq[RDD])

文件系统

1
2
3
4
// 文件流获取数据
def ssc.fileStream[keyClass, valueClass, inputFormatClass]
(dataDirectory: String): DStream
def ssc.textFileStream(dataDirectory)

文件系统:StreamingContext将监控目标目录,处理目录下任何 文件(不包括嵌套目录)

  • 文件须具有相同数据格式
  • 文件需要直接位于目标目录下
  • 已处理文件追加数据不被处理
  • 文件流无需运行receiver

Advanced Sources

高级源:需要额外的依赖

  • Flume
  • Kinesis
  • Twitter

Kafka

1
2
3
// 创建多个Kafka输入流
val kafkaStreams = (1 to numStreams).map(_ => kafkaUtils.createStream())
val unifiedStream = streamingContext.union(kafkaStreams)

性能调优

  • 减少批数据处理时间

    • 创建多个receiver接收输入流,提高数据接受并行水平
    • 提高数据处理并行水平
    • 减少输入数据序列化、RDD数据序列化成本
    • 减少任务启动开支
  • 设置正确的批容量,保证系统能正常、稳定处理数据

  • 内存调优,调整内存使用、应用的垃圾回收行为

Checkpoint

1
2
3
4
// 设置checkpoint存储信息目录
def ssc.checkpoint(?checkpointDirectory: String)
// 从checkpoint中恢复(若目录存在)、或创建新streaming上下文
def StreamingContext.getOrCreate(?checkPointDirectory: String, ?functionToCreateContext: () => StreamingContext)
  • 为保证流应用程序全天运行,需要checkpoint足够信息到容错 存储系统,使得系统能从程序逻辑无关错误中恢复

    • metadata checkpointing:流计算的定义信息,用于恢复 worker节点故障
    • configuration:streaming程序配置
    • DStream operation:streaming程序操作集合
    • imcomplete batches:操作队列中未完成批
    • data checkpointing:中间生成的RDD,在有状态的转换 操作中必须,避免RDD依赖链无限增长
  • 需要开启checkpoint场合

    • 使用有状态转换操作:updateStateByKeyreduceByKeyAndWindow
    • 从程序的driver故障中恢复
1
2
3
4
5
6
7
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf()
val ssc = new StreamingContext(conf)
// other streaming setting
ssc.checkpoint("checkpointDirectory")
ssc
}

Spark GraphX

GraphX

Spark GraphX:图数据的并行处理模块

  • GraphX扩展RDD为Resilient Distributed Property Graph, 边、顶点都有属性的有向图

  • 可利用此模块对图数据进行ExploratoryAnalysis、Iterative Graph Computation

  • GraphX提供了包括:子图、顶点连接、信息聚合等操作在内的 基础原语,并且对的Pregel API提供了优化变量的

  • GraphX包括了正在不断增加的一系列图算法、构建方法,用于 简化图形分析任务

    • 提供了一系列操作

      • Sub Graph:子图
      • Join Vertices:顶点连接
      • Aggregate Message:消息聚集
      • Pregel API变种
    • 经典图处理算法

      • PageRank

Spark MLLib

MLLib

Spark MLLib:Spark平台的机器学习库

  • 能直接操作RDD数据集,可以和其他BDAS其他组件无缝集成, 使得在全量数据上进行学习成为可能

  • 实现包括以下算法

    • Classification
    • Regression
    • Clustering
    • Collaborative Filtering
    • Dimensionality Reduction
  • MLLib是MLBase中的一部分

    • MLLib
    • MLI
    • MLOptimizer
    • MLRuntime
  • 从Spark1.2起被分为两个模块

    • spark.mllib:包含基于RDD的原始算法API
    • spark.ml:包含基于DataFrame的高层次API
      • 可以用于构建机器学习PipLine
      • ML PipLine API可以方便的进行数据处理、特征转换、 正则化、联合多个机器算法,构建单一完整的机器学习 流水线
  • MLLib算法代码可以在examples目录下找到,数据则在data 目录下
  • 机器学习算法往往需要多次迭代到收敛为止,Spark内存计算、 DAG执行引擎象相较MapReduce更理想
  • 由于Spark核心模块的高性能、通用性,Mahout已经放弃 MapReduce计算模型,选择Spark作为执行引擎

mllib.classification

Classification

Logistic Regression

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.mllib.classification import \
LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabledPoint

def parse_point(line):
value = [float(i) for i line.split(", \r\n\t")

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsed_data = data.map(parse_point)
# map `parse_point` to all data

model = LogisticRegressionWithLBFGS.train(parsed_data)
labels_and_preds = parsed_data.map(lambda p: (p.label, model.predict(p.features)))
train_err = labels_and_preds \
.filter(lambda lp: lp[0] != lp[1]) \
.count() / float(parsed_data.count())

model.save(sc, "model_path")
same_model = LogisticRegressionModel.load(sc, "model.path")
  • Decision Tree
  • Random Forest
  • Gradient
  • boosted tree
  • Multilaye Perceptron
  • Support Vector Machine
  • One-vs-Rest Classifier
  • Naive Bayes

Clustering

K-means

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import numpy as np
from pyspark.mllib.clustering import KMeans, KMeansModel

data = sc.textFile("data/mllib/kmeans_data.txt")
parsed_data = data.map(lambda line: np.array([float(i) for i in line.split()]))

cluster_model = KMeans.train(
parsed_data,
maxIteration=10,
initializationMode="random"
)
def error(point):
center = cluster_model.centers[cluster.predict(point)]
return np.sqrt(sum([i**2 for i in (point - center)]))
WSSSE = parsed_data \
.map(lambda point.error(point)) \
.reduce(lambd x, y: x + y)

cluster_model.save(sc, "model_path")
same_model = KMeansModel.load(sc, "model_path")

Gaussian Mixture Model(GMM)

  • 混合密度模型
    • 有限混合模型:正态分布混合模型可以模拟所有分布
    • 迪利克莱混合模型:类似于泊松过程
  • 应用
    • 聚类:检验聚类结果是否合适
    • 预测:

      todo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import numpy as np
from pyspark.mllib.clustering import GussianMixture, \
GussianMixtureModel

data = sc.textFile("data/mllib/gmm_data.txt")
parsed_data = data.map(lambda line: np.array[float(i) for i in line.strip()]))

gmm = GaussianMixture.train(parsed_data, 2)
for w, g in zip(gmm.weights, gmm.gaussians):
print("weight = ", w,
"mu = ", g.mu,
"sigma = ", g.sigma.toArray())

gmm.save(sc, "model_path")
same_model = GussainMixtureModel.load(sc, "model_path")

Latent Dirichlet Allocation(LDA)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

data = sc.textFile("data/mllib/sample_lda_data.txt")
parsed_data = data.map(lambda line: Vector.dense([float(i) for i in line.strip()]))

corpus = parsed_data.zipWithIndex() \
.map(lambda x: [x[1], x[0]).cache()
ldaModel = LDA.train(corpus, k=3)

topics = ldaModel.topicsMatrix()

for word in range(0, ldaModel.vocabSize()):
for topic in word:
print(topic)

ldaModel.save(sc, "model_path")
same_model = LDAModel.load("model_path")
  • Disecting K-means

Regression

Linear Regression

  • 耗时长、无法计算解析解(无意义)
  • 使用MSE作为极小化目标函数,使用SGD算法求解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from pyspark.mllib.regression import LabledPoint, \
LinearRegressionWithSGD, LinearRegressionModel

def parse_point(line):
value = [float(i) for i line.split(", \r\n\t")

data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsed_data = data.map(parse_point)
# map `parse_point` to all data

model = LinearRegressionWithSGD.train(
parsed_data,
iteration=100,
step=0.00000001
)
values_and_preds = parsed_data.map(lambda p:(p.label, model.predict(p.features)))
MSE = values_and_preds \
.map(lambda vp: (vp[0] - vp[1]) ** 2) \
.reduce(lambda x, y: x + y) / values_and_preds.count()

model.save(sc, "model_path")
# save model
same_model = LinearRegressionModel.load(sc, "model_path")
# load saved model
  • Generalized Linear Regression
  • Decision Tree Regression
  • Random Forest Regression
  • Gradient-boosted Tree Regression
  • Survival Regression
  • Isotonic Regression

Collaborative Filtering

Spark SQL

Spark SQL

Spark SQL:结构化数据查询模块

  • 内置JDBC服务器,通过JDBC API暴露Spark数据集,让客户程序 可以在其上直接执行SQL查询

  • 通过ETL工具从不同格式数据源装载数据,并运行一些 Ad-Hoc Query

  • 可以连接传统的BI、可视化工具至数据集

  • 前身Shark即为Hive on Spark,后出于维护、优化、 性能考虑放弃
  • Extraction Transformation Loading:ETL

sparksql_structure

sql.SQLContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.spark.sql.{SQLContext, HiveContext}

class SQLContext{

// 缓存使用柱状格式的表
// Spark将仅仅浏览需要的列、自动压缩数据减少内存使用
def cacheTable(tableName: String)

// 将普通RDD转换为SchemaRDD
def implicit createSchemaRDD(rdd: RDD): SchemaRDD

// 载入parquet格式文件
def parquetFile(fileName: String): SchemaRDD

// 载入json格式文件
def jsonFile(fileName: String): SchemaRDD
def jsonRDD(rdd: RDD[String]): SchemaRDD

// 执行SQL query
def sql(query: String): SchemeRDD
}
  • HiveContext支持SQLContext支持功能的超集,增加在 MetaStore发现表、利用HiveSQL写查询功能

sql.SchemaRDD

1
2
3
4
5
6
7
8
9
10
11
class SchemaRDD{

// 存储为parquet文件
def saveAsParquetFile(fileName: String)

// 注册为临时表,然后可以使用SQL语句查询
def registerTempTable(tableName: String)

// 打印表schema
def printSchema()
}

在数据存储层面对数据进行结构化描述的schema

  • 由SchemaRDD(上个版本)发展而来,在其上增加schema层 ,以便对各个数据列命名、数据类型描述

  • 可以通过DF API把过程性处理、Relational Processing (对表格的选择、投影、连接等操作)集成

  • DF API操作是Lazy的,使得Spark可以对关系操作、数据处理 工作流进行深入优化

  • 结构化的DF可以通过调用DF API重新转换为无结构的RDD数据集

  • 可以通过不同Data Source创建DF

    • 已经存在的RDD数据集
    • 结构化数据文件
    • JSON数据集
    • Hive表格
    • 外部数据库表

Data Source

数据源:通过DS API可以存取不同格式保存的结构化数据

  • Parquet
  • JSON
  • Apache Avro数据序列化格式
  • JDBC DS:可以通过JDBC读取关系型数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.spark.sql.{SQLContext, StructType, StructField, Row}
import org.apache.spark.sql.HiveContext

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemeRDD


case class Person(name: String, age: Int)

// 通过反射推断包含特定对象类型的RDD的模式
// 需要编写时已知模式
// 代码更简洁、工作更好
val people: RDD[Person] = sc.textFile("people.txt")
.map(_.split(","))
.map(p => Person(p(0), p(1).trim.toInt))


// 编程指定模式:构建模式,然后在已经存在的RDDs上使用
// 运行在运行期前不知道列、列类型情况下构造SchemaRDDs
val schemaString = "name age"
val people = sc.textFile("people.txt")
val schema = StructType(schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, true))
)
val rowRDD = people.map(_.split(","))
.map(p => Row(p(0), p(1).trim))
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
peopleSchemaRDD.registerTempTable("people")


// 查询语言集成
val teenagers = people.where("age >= 13").select("name")


people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FORM people WHERE age >= 13")


val apRDD = sc.parallelize(
"""{"name": "Tom", "address": { "city": "Columbus", "state": "Ohio" }}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(apRDD)

Catalyst 优化器

结构

Catalyst优化器:利用Scala模式匹配和quasiquotes机制构建的 可扩展查询优化器

sparksql_optimization

sparksql_procedure

  • SparkSQL Pipeline的中间核心部分

Parser模块

Parser模块:将SQL语句切分为token,根据一定语义规则解析为AST

sql_parser_ast

  • Spark1.x使用Scala原生Parser Combinator构建的词法、语法 分析器

  • Spark2.x使用采用第三方语法解析器工具ANTLR4

    • ANTLR4根据语法文件SqlBase.g4自动解析生成两个Java类 ,将sql语句解析成ParseTree的语法结构

      • SqlBaseLexer:词法解析器
      • SqlBaseParser:语法解析器
    • 随后ParsePlan过程,使用AstBuilder.scala将ParseTree 转换为catalyst表达式逻辑计划Unresovled Logical Plan

      • Unsolved Relation
      • Unsolved Function
      • Unsolved Attribute

Analyzer模块

Analyzer模块:使用Analysis Rules,借助数据元数据 (session cataloghive metastore)将ULP解析为 Logical Plan

sparksql_catalyst_analyzer

  • ULP虽然具备基本骨架,但是系统对表的字段信息不清楚,需要 基本元数据信息表达ULP中token

  • 遍历整个AST,对树上每个结点进行数据类型绑定、函数绑定, 得到LP

Schema Catalog

元数据信息:表的模式信息

  • 表的基本定义:表名、列名、数据类型
  • 表的数据格式:json、text、parquet、压缩格式
  • 表的物理位置

Optimizer模块

Optimizer模块:使用Optimization Rules对LP进行合并、列 裁剪、过滤器下推等优化工作,生成等价Optimized Logical Plan

  • 分为RBO、CBO两种优化策略,是catalyst核心

Spark Planner

Spark Planner模块:将OLP转换为spark能够理解的 Physical Plan

sql_optimization_physical_plan

  • 将逻辑上可行的执行计划变为Spark真正可以执行的物理计划
  • 物理计划实际上是逻辑计划中耗时最小的算法实现

Join

Join类型

SparkSQL目前支持三种join算符

  • shuffle hash join
  • broadcast hash join
  • sort merge join

Broadcast Hash Join

broadcast hash join:将小表广播分发到大表所在的结点上, 并行在各节点上进行hash join

sparksql_broadcast_hash_join

  • 适合小表很小,可以直接广播的场合

    • spark.sql.autoBroadcastJoinThreshold设置广播小表 大小上限
  • broadcast阶段:将所小表广播分发到大表所在的所有主机

    • driver分发
    • p2p分发
  • hash join结点:各结点独立并行hash join

    • 小表构建hash表
    • 各结点本地大表试探

Shuffle Hash Join

shuffle hash join:按照join key分区,在每个结点独立并行 进行hash join

sparksql_shuffle_hash_join

  • 类似分布式GHJ,不同块位于不同结点

  • shuffle阶段:将表按照join key分区,将具有相同join key 的记录重分布到同一结点

  • hash jon阶段:各结点使用本地数据独立并行hash join

Sort Merge Join

SMJ:按照join key分区,在各节点独立并行SMJ

sparksql_sort_merge_join

  • shuffle阶段:将表按照join key分区,将具有相同join key 的记录重分布到同一结点

  • sort阶段:各节点并行对本地数据排序

    • spark当前shuffle算法使用sort-based shuffle算法
    • 理论上shuffle过后各分区数据已经排序完毕,无需再次 sort,效率很高
  • merge阶段:各节点对排序好表数据执行join操作

Join Reorder

  • 基于CBO的join重排序优化:用统计信息预估的基修正join顺序

  • 使用动态规划算法,考虑所有可能组合,选择代价最小的方案

    • 单个join操作成本,join树的成本是所有中间join成本总和

      • carinality:对应CPU成本
      • size:对应IO成本
    • 没有任何join条件同时包含左、右子树时,修剪笛卡尔积 减少搜索范围

Statistics Collection Framework

CBO依赖统计细节信息优化查询计划

  • CBO自下而上遍历LP,统计信息可以随之传播到上层算子

统计信息类型

  • Numeric、Date、Timestamp

    • Distinct Count
    • Max
    • Min
    • Null Count
    • Average Length:定长
    • Max Length:定长
  • String、Binary

    • Distinct Count
    • Null Count
    • Average Length
    • Max Length

统计方式

  • 扫描全表:简单、统计信息准确,代价大
  • 抽样统计:

应用

Filter Selectivity

过滤选择率:估计应用谓词表达式过滤的选择率

逻辑运算符
  • AND:左侧过滤条件选择率、右侧过滤条件选择率之积

  • OR:左侧、右侧过滤条件选择率之和,减去其乘积

  • NOT:1减去原始过滤条件选择率

比较运算符
  • =:等于条件

    • 若常数取值在当前列取值范围之外,则过滤选择率为0
    • 否则根据柱状图、均匀分布得到过滤选择率
  • <:小于条件

    • 若常数取值小于当前列最小值,则过滤选择率为0
    • 否则根据柱状图、均匀分数得到过滤选择率

Join Carinality

联接基:估计联接操作结果的基

  • inner:其他基估计值可由inner join计算

    • num(A):join操作前表A的有效记录数
    • distinct(A.k):表A中列k唯一值数量
  • left-outer:取inner join、左表中基较大者

  • right-outer:取inner join、右表中基较大者

  • full-outer

B)

$$

数据库背景

数据库术语

数据库结构

  • 数据源:多源数据继承
  • 数据仓库继承工具:FTL工具、MapReduce
  • 数据仓库服务器:列存储数据库引擎
  • 数据即使:数据仓库的数据子集、聚集数据继
  • OLAP服务器:提供多维数据视图
  • 前台数据分析工具
    • 报表工具
    • 多维分析工具
    • 数据挖掘工具

数据库类型

  • 传统关系型数据库
  • MPP大规模并行处理数据库
  • NoSQL数据库
  • 图数据库
  • NewSQL数据库
  • GPU数据库

数据查询

  • Project Pushdown:投影下推

    • 只读取、查询需要的
    • 减少每次查询的IO数据量
  • Predicate Pushdown:谓词下推

    • 将过滤条件尽快执行,跳过不满足条件

数据压缩算法

  • Run Length Encoding:重复数据
  • Delta Encoding:有序数据集
  • Dictionary Encoding:小规模数据集合
  • Prefix Encoding:字符串版Delta Encoding

数据库调优

数据库调优:是数据库具有更高的吞吐量、更快响应

  • 被调优对象是数据库整体
  • 需要考虑很多资源、数据库配置

人工调优

  • 依赖人工,效率低下
  • 要求操作者理解查询原理,对应用、DBMS、操作系统、硬件有 一定理解

基于案例调优

  • 总结典型应用案例情况中数据库参数推荐配置值、逻辑层设计等 情况,为用户调优工作提供参考、借鉴
  • 忽略系统动态性、不同系统之间差异

自调优

  • 为DBMS建立模型,根据“影响数据库性能效率的因素”,自动进行 参数配置
  • 部分商业数据库实现了自调优技术

需求分析期

应用情况估算

  • 应用使用方式
    • 将业务逻辑转换为读写分布逻辑,以读多写少、读写均衡 区分OLAP、OLTP
    • 应用对数据库的并发情况、并发是否可池化
  • 数据量
  • 对数据库压力、峰值压力

系统选型策略

  • 确定适合的数据库:开源、商业;集群、单机
  • 操作系统、中间件、硬件、网络选型

项目设计期

数据模型设计

根据业务逻辑,从以下角度考虑表结构

  • E-R模型设计:遵循E-R模型设计原理,适当非规范化可以改善 系统查询性能
  • 数据逻辑分布策略:减少数据请求中不必要的数据量
    • 分区
    • 利用E-R模型分表
  • 数据物理存储策略:减少IO操作
    • 启用压缩
    • 分开存储索引、表数据
    • 不同表数据分布在不同表空间
    • 不同表空间分布在不同物理存储,尤其是读写量大的表空间 分布在不同物理存储上
    • 日志、索引、数据分布在不同物理存储上
  • 索引:在查询频繁的对象上建立恰当索引

开发期

SQL设计

  • 编写正确、查询效率高的SQL语句,依据“查询重写规则”
    • 有意识地保障SQL能利用到索引

数据库功能启用

  • 查询重用
  • 数据库参数设计

测试、试运行、上线、维护

模型系统预运行

  • 在备用系统上模型实际运行环境,加大压力进行相似测试

系统监控分析

  • 应用系统表示:收集用户使用意见、系统存在问题
  • OS环境监控:实时监控CPU、内存、IO等,对比历史正常情况
  • 数据库内部状态监控:系统表、视图、工具、锁的情况
  • 日志分析:在数据库的日志、操作系统日志中找出异常

HiveSQL

命令行参数

  • -d/--define <key=value>:替换脚本中shell形式变量
  • --hivevar <key=value>:替换脚本中shell形式变量
    • 结合hive脚本中设置shell变量使用
  • -h <hostname>:hive服务器
  • -p <port>:hive服务器端口
  • -database <database>:连接数据库
  • -e <quoted-query-string>:从命令行获取、执行hive脚本
  • -f <filename>:从文件获取、执行hive脚本
  • -i <filename>:初始化hive脚本
  • --hiveconf <property=value>:设置hive参数
  • -S/--slient:安静模式启动交互hive shell
  • -v/--verbose:详细模式
  • -H/--help:帮助

辅助语句

结果输出

  • INSERT INTO/OVERWRITE:查询结果追加/覆盖在hive表中
  • INSERT INTO/OVERWRITE [LOCAL] DIRECTORY:查询结果追加/ 覆盖本地/HDFS目录
  • 有分区情况下,仅覆盖当前分区

内置函数

聚合函数

  • collect_set():配合group by合并、消除重复字段,返回 array
  • concat_ws():连接字符串
  • if(<condition>, <true_value>, <false_value>):判断条件
  • size():返回array长度
  • length():返回字符串大小

配置相关语句

文本分隔符

  • 记录分隔:\n
  • 字段分隔:\001(八进制)ASCII码1字符
  • Array、Struct、Map等集合中元素分隔:\002ASCII码1字符
  • Map中键值对分隔:\003ASCII码1字符
1
2
3
4
line terminated by `\n`
row format delimited fields terminated by `\001`
collection items terminated by `\002`
map keys terminated by `\003`

空值

  • hive中空值一般有两种存储方式

    • NULL:底层存储NULL,查询显示为NULL
    • \N:底层存储\N,查询显示为NULL,查询输出为\N
  • 空值查询:<field> is NULL

    • NULL:也可<field> = 'NULL'
    • \N:也可<field> = '\\N'(转义)
  • 底层存储设置参见表存储
  • 空字符串不是空值,需要用<field> = ''查询

表存储配置

分区

属性

serdeproperties

  • 设置空值存储方式

    1
    alter <table> SET serdeproperites('serialization.null.format' = '\N')

Mysql/Mariadb安装配置

安装

Mysql

大部分系统常用源中包含mysql,直接使用自带的包管理工具安装 即可,对于源中无法找到mysql的系统可以访问官网获取 安装方法

1
$ sudo zypper install mysql-server mysql-client

CentOS7

CentOS7的常用源中即不含mysql,安装mysql则需要添加mysql源, 同样在官网中找到添加方式:

  1. 下载的是RPM源rpm包
  2. $ sudo yum localintall安装后即添加源
  3. 使用yum直接安装mysql,需要注意的是默认情况下只有最新版本 mysql源是enabled,其他版本的需要--enablerepo指定或者 /etc/yum.repo.d下修改文件

Mariadb

mariadb和mysql大部分兼容,添加了一些新的特性,是mysql的一个 开源分支,由mysql的作者维护,避免mysql在被Oracle收购后闭源。

  • 大部分情况下,mariadb可以完全看作是mysql
  • 甚至在某些系统中,mariadb服务有别名mysql
  • mariadb控制台命令也是mysql

配置

配置文件

  • /etc/mysql/my.cnf:mysql主配置文件

    • mysql的此配置文件内包含有具体的配置
    • mariadb此文件中则不包含具体配置,而是导入配置文件
      1
      2
      !includedir /etc/mysql/conf.d/
      !includedir /etc/mysql/mariadb.cond.d/
  • ~/.my.cnf

    • 一般不存在,需要自行建文件,注意需要设置文件权限, 只能root用户可写,否则mysql会忽略此配置文件
    • mysqld服务启动需要root权限,因此~目录下的配置文件 基本不可能影响mysql-server状态,即[server]下的配置 基本是无效的

数据位置

  • 数据库存放数据位置:/var/lib/mysql/db_name/

Mysql-Client

登陆

@todo 一个问题,我安装的mariadb,默认的root用户无法在一般用户账户 登陆,必须sudo才能正常登陆

参数登陆

1
$ mysql -h host -P port -u user -p

mysql不带参数启动则是默认cur_user_name@localhost:3306, 表示使用当前用户名作为用户名登陆,如果该用户设置密码,-p 参数不能省略

文件

1
$ mysql --defaults-file=file_name

文件内容格式类似于配置文件

1
2
3
4
5
[client]
host=
user=
password=
database=(可选)

注意

  • mysql中默认存在一个用户名为空的账户,只要在本地,可以 不用账户、密码登陆mysql,因为这个账号存在,使用新建用户 无法通过密码登陆,需要在
    1
    2
    3
    $ use mysql;
    $ delete from user where User="";
    $ flush privileges;

Mysql交互命令

Show信息类

  • SHOW DATABASES:列出MySQLServer数据库。
  • SHOW TABLES [FROM db_name]:列出数据库数据表。
  • SHOW TABLE STATUS [FROM db_name]:列出数据表及表状态信息。
  • SHOW COLUMNS FROM tbl_name [FROM db_name]:列出资料表字段
    • DESC tbl_name:同
  • SHOW FIELDS FROM tbl_name [FROM db_name]
  • DESCRIBE tbl_name [col_name]
  • SHOW FULL COLUMNS FROM tbl_name [FROM db_name]:列出字段及详情
  • SHOW FULL FIELDS FROM tbl_name [FROM db_name]:列出字段完整属性
  • SHOW INDEX FROM tbl_name [FROM db_name]:列出表索引
  • SHOW STATUS:列出 DB Server 状态
  • SHOW VARIABLES [like pattern]:列出 MySQL 系统环境变量
  • SHOW PROCESSLIST:列出执行命令。
  • SHOW GRANTS FOR user:列出某用户权限

User

  • CREATE USER 'USER_NAME'@'ADDRESS' IDENTIFIED BY 'PASSWORD'

    • IDENTIFIED BY PASSWORD这个语法好像已经被丢弃了
  • SET PASSWORD FOR 'USER_NAME'@'ADDRESS' = PASSWORD('NEW_PWD')

  • SET PASSWORD = PASSWORD('NEW_PWD'):修改当前用户密码

  • GRANT PRIVILEGES ON DB_NAME.TBL_NAME TO 'USER_NAME'@'ADDRESS' [WITH GRANT OPTION]

    • WITH GRANT OPTION:允许用户授权
  • REVOKE PRIVILIEGES ON DB_NAME.TBL_NAME TO 'USER_NAME'@'ADDRES%'

  • DROP 'USER_NAME'@'ADDRESS'

说明

  • revoke和grant中的权限需要一致才能取消相应授权

    • grant select不能通过revoke all取消select
    • grant all也不能通过revoke select取消select
  • 特殊符号

    • %:所有address,也可以是ip部分,如:111.111.111.%
      • 这个其实在sql语句中可以表示任意长度字符串
    • *:所有数据库、表

Priviledges

名称 权限
alter alter table
alter routine alter or drop routines
create create table
create routine create routine
create temporary table create temporary table
create user create, drop, rename users and revoke all privilieges
create view create view
delete delete
drop drop table
execute run stored routines
file select info outfile and load data infile
index create index and drop index
insert insert
lock tables lock tables on tables for which select is granted
process show full processlist
reload use flush
replicati on client ask where slave or master server are
replicati on slave
select select
show databases show databases
show view show view
shutdown use mysqladmin shutdown
super change master, kill, purge master logs,
set global sql statements,    use mysqladmin
debug command, create an extra connection
even reach the maximum amount|

|update|update| |usage|connect without any specific priviliege|

执行Sql脚本

  • shell内执行

    1
    mysql -h host -D db_name -u user -p < file_name.sql;
  • mysql命令行执行

    1
    source file_name.sql;

导入、导出数据

导入数据

  • shell内

  • mysql命令行内

    1
    2
    3
    4
    5
    load data [local] infile '/path/to/file' into table tbl_name
    fields terminated by 'sep_char'
    optionally enclosed by 'closure_char'
    escaped by 'esc_char'
    lines terminated by `\r\n`;
    • /path/to/file不是绝对路径,则被认为是相对于当前 数据库存储数据目录的相对路径,而不是当前目录
    • 关键字local表示从客户端主机导入数据,否则从服务器 导入数据

导出数据

  • shell内

  • mysql命令行内

注意:远程登陆mysql时,两种方式导出数据不同,shell导出 数据可以导出至client,而mysql命令行内导出至server

Mysql-Server

数据库字符编码方式

查看

  • 只查看数据库编码方式

    1
    show variables like "character_set_database;
  • 查看数据库相关的编码方式

    1
    show variables like "character%";

    |variable_name|value| |——-|——-| |character_set_client|latin1| |character_set_connection|latin1| |character_set_database|latin1| |character_set_filesystem|binary| |character_set_results|latin1| |character_set_server|latin1| |character_set_system|utf8| |character_sets_dir|/usr/share/mysql/charsets/|

  • 另一种查询数据库编码方式

    1
    show variables like "collation%";

    |Variable_name|Value| |——-|——-| |collation_connection|utf8mb4_general_ci| |collation_database|utf8mb4_general_ci| |collation_server|utf8mb4_general_ci|

相关变量说明

  • character_set_client:客户端编码方式
  • character_set_connection:建立连接是所用编码
  • character_set_database:数据库编码
  • character_set_results:结果集编码
  • character_set_server:服务器编码

保证以上编码方式相同则不会出现乱码问题,还需要注意其他连接 数据库的方式不一定同此编码方式,可能需要额外指定

修改编码方式

修改数据库默认编码方式

修改mysql配置文件(/etc/mysql/my.cnf

#todo
mysql配置文件的优先级
utf8mb4意义
1
2
3
4
5
[client]
default-character-set=utf8mb4
[mysqld]
default-character-set=utf8mb4
init_connect="SET NAMES utf8mb4"

重启mysql即可

修改单个数据库
  • 创建时
    1
    create database db_name character set utf8 collate utf8_general_ci;
    1
    create database if not exists db_name defualt charater set utf8;
脚本、窗口
1
set names gbk;

只修改character_set_clientcharacter_set_connectioncharacter_set_results编码方式,且只对当前窗口、脚本有效, 不影响数据库底层编码方式