Spark SQL
Spark SQL:结构化数据查询模块
- 前身Shark即为Hive on Spark,后出于维护、优化、
性能考虑放弃
- Extraction Transformation Loading:ETL
sql.SQLContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import org.apache.spark.sql.{SQLContext, HiveContext}
class SQLContext{
def cacheTable(tableName: String)
def implicit createSchemaRDD(rdd: RDD): SchemaRDD
def parquetFile(fileName: String): SchemaRDD
def jsonFile(fileName: String): SchemaRDD def jsonRDD(rdd: RDD[String]): SchemaRDD
def sql(query: String): SchemeRDD }
|
HiveContext
支持SQLContext
支持功能的超集,增加在
MetaStore发现表、利用HiveSQL写查询功能
sql.SchemaRDD
1 2 3 4 5 6 7 8 9 10 11
| class SchemaRDD{
def saveAsParquetFile(fileName: String)
def registerTempTable(tableName: String)
def printSchema() }
|
在数据存储层面对数据进行结构化描述的schema
由SchemaRDD(上个版本)发展而来,在其上增加schema层
,以便对各个数据列命名、数据类型描述
可以通过DF API把过程性处理、Relational Processing
(对表格的选择、投影、连接等操作)集成
DF API操作是Lazy的,使得Spark可以对关系操作、数据处理
工作流进行深入优化
结构化的DF可以通过调用DF API重新转换为无结构的RDD数据集
可以通过不同Data Source创建DF
- 已经存在的RDD数据集
- 结构化数据文件
- JSON数据集
- Hive表格
- 外部数据库表
Data Source
数据源:通过DS API可以存取不同格式保存的结构化数据
- Parquet
- JSON
- Apache Avro数据序列化格式
- JDBC DS:可以通过JDBC读取关系型数据库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import org.apache.spark.sql.{SQLContext, StructType, StructField, Row} import org.apache.spark.sql.HiveContext
val sqlContext = new SQLContext(sc) import sqlContext.createSchemeRDD
case class Person(name: String, age: Int)
val people: RDD[Person] = sc.textFile("people.txt") .map(_.split(",")) .map(p => Person(p(0), p(1).trim.toInt))
val schemaString = "name age" val people = sc.textFile("people.txt") val schema = StructType(schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, true)) ) val rowRDD = people.map(_.split(",")) .map(p => Row(p(0), p(1).trim)) val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema) peopleSchemaRDD.registerTempTable("people")
val teenagers = people.where("age >= 13").select("name")
people.registerTempTable("people") val teenagers = sqlContext.sql("SELECT name FORM people WHERE age >= 13")
val apRDD = sc.parallelize( """{"name": "Tom", "address": { "city": "Columbus", "state": "Ohio" }}""" :: Nil) val anotherPeople = sqlContext.jsonRDD(apRDD)
|