[TOC]
spark SQL语法 与 DSL语法
无论是hadoop、spark、flink其都具备一些共性的功能,都试图不断完善自己的功能。
包括:离线批处理api,离线批处理sql编写能力、DSL语法,实时处理能力
Hadoop:只有离线批处理api hive:离线批处理sql编写能力,对hadoop进行功能进行完善
Spark:离线批处理api,离线批处理sql编写能力、DSL语法,实时处理能力
Flink:不区分批处理和流处理,统一表编写程序。其只区分了基础底层datastream api,以及高级接口table api和sql 语法
rdd,dataframe,dataset三者都是分布式弹性数据集Resilient
RDD相比DataFrame不支持sql操作,一般与mlib一起使用。DataFrame是指定了列名的,可以通过列名访问。
DataFrame是Dataset的一个特例,其类型为Dataset[Row]。两者都支持sql操作,比如select,groupby。
spark sql 与hive集成
https://blog.csdn.net/Clown_34/article/details/122421267
共用几种方案:
1将hive的配置文件链接到spark的conf文件夹,还有mysql connector、hdfs的配置文件等。使用spark-shell执行sql语法。或在程序中使用spark程序的sparkcontext,借助spark.sql执行sql语句。
2将hive的配置文件链接到spark的conf文件夹,还有mysql connector、hdfs的配置文件等。spark-sql执行sql语句。也可以开启thriftserver,使用beeline直接执行sql语句,和hiveserver2形式一样。
SQL 与 DSL的转换
#一个dataframe或dataset想要执行sql语句,需要创建View表,才能操作。
val df=spark.read.json(“data/user.json”)
df.createOrReplaceTempView(“people”)
#一个sql语句的查询结果就是dataframe,之后就可以执行DSL语法的语句。
val sqlDF=spark.sql(“SELECT * FROM people”)
sqlDF.show()
SQL语法
sql语法风格是指查询数据时使用sql语句来查询,这种风格的查询必须要有临时试图或者全局视图来辅助
#对DataFrame创建一个临时表,这样可以使用sql进行操作
val df=spark.read.json(“data/user.json”)
df.createOrReplaceTempView(“people”)
val sqlDF=spark.sql(“SELECT * FROM people”)
sqlDF.show()
#普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使
用全局临时表时需要全路径访问,如:global_temp.people
#创建全局表
df.createGlobalTempView(“people”)
spark.sql(“SELECT * FROM global_temp.people”).show()
spark.newSession().sql(“SELECT * FROM global_temp.people”).show()
DataFrame
创建DataFrame
#从本地文件系统的json文件创建dataframe
val df=spark.read.json(“data/user.json”)
#从RDD创建dataframe
#从hive table进行查询返回
DSL语法
domain-specific language,DSL语法用于管理结构化数据,可以使用scala、java、python等编写DSL语法语句,无需创建临时视图使用sql了。
无需编写符合sql规范的语句,可以灵活的与编程语言粘合。
#等同于sql的select语句,注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
df.select($”username”,$”age” + 1).show
df.select(username, age + 1).show()
#
df.filter($”age”>30).show
df.groupBy(“age”).count.show
DSL语法与sql差异
where 和 filter的差异,where是filter的别名
$取列值是语法糖,本质是返回一个column对象
RDD与DataFrame互相转换
在IDEA开发程序时,如果需要将RDD于DF和DS之间互相操作,需要import spark.implicits._
这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必 须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。
val idRDD=sc.textFile(“id.txt”)
idRDD.toDF(“id”).show
#DataFrame转RDD
val df = sc.makeRDD(List((“zhangsan”,30), (“lisi”,40))).map(t=>User(t._1,
t._2)).toDF
val rdd = df.rdd
val array = rdd.collect
array(0)
array(0)(0)
array(0).getAs[String](“name”)
#开发中,常常通过样例类将rdd转换为dataframe,如果是一个样例类,那么可以直接取属性名作为dataframe的列名
case class User(name:String,age:Int)
sc.makeRDD(List((“zhangsan”,30),(“lisi”,40))).map(t=>User(t._1,t._2)).toDF.show
直接调用toDF实际上是借助隐式转换完成的,一般不使用,我们可以通过createDataFrame转换rdd到df或ds
https://blog.csdn.net/sunyiyuan1213/article/details/91450379
#其中spark是创建的sparkSession
val classDF: DataFrame = spark.createDataFrame(usersRow)
val structDf: DataFrame = spark.createDataFrame(structRow,structSchema)
Dataset
Dataset是具有强类型的数据集合,需要提供对应的类型信息。
创建Dataset
#使用样例类序列创建DataSet
case class Person(name:String,age:Long)
val caseClassDS=Seq(Person(“zhangsan”,2)).toDS()
caseClassDS.show
#使用基本类型的序列创建DataSet
val ds=Seq(1,2,3,4,5).toDS
ds.show
#通过 SparkSession.createDataset() 直接创建
val spark = SparkSession.builder().config(conf).getOrCreate();
import spark.implicits._;
val ds = spark.createDataset(List(Person(“Jason”,34,”DBA”),Person(“Tom”,20,”Dev”)));
ds.show();
Dataset与其他类型之间的转换
#注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
#RDD转DataSet
ds1=sc.makeRDD(List((“zhangsna”,30),(“lisa”,60))).map(t=>User(t._1,t._2)).toDS
#Dataset转RDD
ds1.rdd
#DataFrame和DataSet转换
val df=sc.makeRDD(List((“zhangsan”,30))).toDF(“name”,”age”)
val ds=df.as[User]
VAL df=ds.toDF
三者的共性
1.都是spark平台下的分布式弹性数据集,为处理大型数据提供便利。
2.三者都有惰性机制,在创建、转换,如map方法时,不会立即执行,只有碰到action如foreach时,三者才会开始遍历运算。
3.三者有一些共同的函数,如filter,排序等。
4.在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在 创建好 SparkSession 对象后尽量直接导入)
5.都有partition的概念
Dataframe相比于rdd,多了列名,可以方便进行sql。rdd无法直接查看每一列的值,必须通过解析。
Dataframe时Dataset的特例,相当于指定类型为Row,类型可以为person、teacher等。 Row是无法知道每列字段的具体类型的,所以其是弱类型的,
暂无评论内容