Spark SQL入门基础

Spark SQL简介

从Shark说起

Shark即hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划、翻译执行计划优化等逻辑,可以近似认为将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作。Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高。

Shark的设计导致了两个问题:

  • 执行计划优化完全依赖于Hive,不方便添加新的优化策略。

  • 因为Shark是线程级并行,而MapReduce是进程级。因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支。

2014年6月1日Shark项目和SparkSQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放在SparkSQL项目上。至此,Shark的发展画上了句号,也因此发展出了两个方向:SparkSQL和Hive on Spark

  • Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive。

  • Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

Spark SQL设计

mark
Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。

Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。Spark SQL提供DataFrame API,可以对内部和外部各种数据源执行各种关系操作。

Spark SQL可以支持大量的数据源和数据分析算法。Spark SQL可以融合传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力。

DataFrame

DataFrame使得Spark具备了对大规模结构化数据的处理能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算能力。

  • RDD是分布式的Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的。

  • DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息。
    Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。
    RDD分布式对象的集合。

DataFrame的创建

从Spark2.0开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。

Spark支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。

现在在”/usr/local/spark/examples/src/main/resources/”这个目录下有两个样例数据people.json和people.txt。

people.json文件的内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

people.txt文件的内容如下:
Michael,29
Andy,30
Justin,19

编写代码读取文件数据,创建DataFrame。

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
//是支持RDDs转换为DataFrames及后续sql操作
import spark.implictis._
val df = spark.read.json("file://usr/local/spark/examples/src/main/resources/people.json")
df.show()
//打印模式信息
df.printSchema()
df.select(df("name"), df("age")+1).show()
//分组聚合
df.groupBy("age").count().show()

从RDD到DataFrame

利用反射机制推断RDD模式

在利用反射机制推断RDD模式时,需要首先顶一个case class。因为,只有case class才能被Spark隐式地转换为DataFrame。

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ //支持把一个RDD隐式转换为一个DataFrame
import spark.implicits._
case class Person(name:String, age:Long) # 定义case class
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
    .map(_.split(",")).map(attributes => Person(attributes(0),attributes(1).trim.toInt)).toDF()
peopleDF.createOrReplaceTempView("people") #必须注册为临时表才能供下面的查询使用
//最终生成一个DataFrame
val personsRDD = spark.sql("select name, age from people where age>20")
//DataFrame中的么个元素都是一行记录,包含name和age两个字段,分别用t(0),t(1)来获取值
personsRDD.map(t=>"Name:"+t(0)+","+"Age:"+t(1)).show()

使用编程方式定义RDD模式

当无法提前定义case clas时,就需要采用编程方式定义RDD模式。

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//定义一个模式字符串
val schemaString="name age"
//根据模式字符串生成模式
val fields = schemaString.split(" ").map(fieldName=>StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields) //模式中包含name和age两个字段
val rowRDD = peopleRDD.map(_.split(",")).map(attributes=>Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("select name,age from people")
results.map(attributes=>"name"+attributes(0)+"."+"age:"+attributes(1)).show()

把DataFrame保存成文件

  • 第一个方法
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("csv")
    .save("file:///usr/local/spark/examples/src/main/resources/newpeople.csv")

write.format()支持输出json,parquet,jdbc.orc,libsvm,csv,text等格式文件。
- 第二种方式

val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
//转换成rdd然后再保存
peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")

读取和保存parquet数据

Spark SQL可以支持Parquet、JSON、Hive等数据源,并且可以通过JDBC连接外部数据源。

Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合使用的组件有:

  • 查询引擎:Hive,Impala,Pig,Presto等
  • 计算框架:MapReduce,Spark,Cascading等
  • 数据模型:Avro,Thrift,Protocol Buffers, POJOs

从parquet文件中加载数据生成DataFrame

import spark.implicits._
val parquetFileDF = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("select * from parquetFile")
namesDF.foreach(attributes=>println("Name:"+attributes(0)+"favorite color:"+attributes(1)))

将DataFrame保存成parquet文件

import spark.implicits._
val parquetFileDF = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")

读取和插入MySQL

准备工作:

  • 下载MySQL的JDBC驱动,比如mysql-connector-java-5.1.40.tar.gz
  • 把该驱动程序拷贝到spark的安装目录“/usr/local/spark/jars”下
  • 启动一个spark-shell,启动Spark Shell时,必须指定mysql连接驱动jar包
$ ./bin/spark-shell --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar

在MySQL数据库中创建了一个名称为spark的数据库,并创建了一个名称为student的表。
执行以下命令连接数据库,读取数据,并显示:

val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark")
    .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student")
    .option("user", "root").option("password","hadoop").load()
jdbcDF.show()

向student表中插入两条记录:

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

//下面我们设置两条数据表示两个学生信息
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26", "4 Guanhua M 27")).map(_.split(" "))

//下面要设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("gender", StringType, true), StructField("age", IntegerType, true)))

//下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p=>Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))

//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)

//下面创建一个prop变量用来保存jdbc连接参数
val prop = new Properties()
prop.put("user", "root") // 表示用户名是root
prop.put("password", "hadoop") //表示密码是hadoop
prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序

//连接数据库,采用append模式,表示追加记录到数据表中
studentDF.wirte.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "student", prop)
相关文章
相关标签/搜索