此案例参考书籍《Spark大数据商业实战三部曲》,特做学习笔记,巩固学习过程。
案例预览:
给每位员工的年龄增加100给特定的员工年龄增加70,其他增加30对人员信息中的重复数据进行去重按年龄进行降序排序将两个表格进行合并 进行随机采样分析选择某一列进行展示进行分组统计分析将两列合并为一列计算平均年龄、最大年龄等
1 数据来源
企业人员管理系统应用案例的数据使用的是Spark官网提供的实例数据。其数据地址为:下载spark-2.3.3-bin-hadoop2.7版本或其他版本,数据在文件夹spark-2.3.3-bin-hadoop2.7\examples\src\main\resources下,名为people.json文件,该文件内容数据较少,可直接复制粘贴以下内容,进行分析:
用户信息people.json文件,每一列含义:姓名,年龄
{“name”:“Andy”, “age”:30}
{“name”:“Justin”, “age”:19}
{“name”:“Justin”, “age”:29}
{“name”:“Michael”, “age”:46}
企业人员分数评分信息peopleScores.json文件,每一列含义:姓名,评分。
{“n”:“Andy”, “score”:100}
{“n”:“Justin”, “score”:89}
2 读取数据
package SparkHadoop.SparkBusinessExample
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
println(“—————-个人信息原始数据dataset————————-“)
personDS.show()
println(“—————–个人信息数据结构信息——————————-“)
persons.printSchema()
println(“—————-评分原始数据dataset————————-“)
personScoreDS.show()
println(“—————–评分数据结构信息——————————-“)
personScoreDS.printSchema()
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
结果:
—————-个人信息原始数据dataset————————-
+—+——-+
|age| name|
+—+——-+
| 16|Michael|
| 30| Andy|
| 19| Justin|
| 29| Justin|
| 46|Michael|
+—+——-+
—————–个人信息数据结构信息——————————-
root
|– age: long (nullable = true)
|– name: string (nullable = true)
—————-评分原始数据dataset————————-
+——-+—–+
| n|score|
+——-+—–+
|Michael| 88|
| Andy| 100|
| Justin| 89|
+——-+—–+
—————–评分数据结构信息——————————-
root
|– n: string (nullable = true)
|– score: long (nullable = true)
实例操作1 给每位员工的年龄增加100
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
//读取数据 val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
println(“—————-原始数据dataset————————-“)
personDS.show()
println(“—————–数据结构信息——————————-“)
personDS.printSchema()
//将Dataset[Person]转为Dataset[(String, Long)] 类型
val DSperson: Dataset[(String, Long)] = personDS.map {
person => {
(person.name, person.age + 100L)//给每位员工的年龄增加100
}
}
println(“—————-dataset————————-“)
DSperson.show()
println(“—————–数据结构信息——————————-“)
DSperson.printSchema()
}
}
case class Person(name:String,age:Long)
结果:
—————-原始数据dataset————————-
+—+——-+
|age| name|
+—+——-+
| 16|Michael|
| 30| Andy|
| 19| Justin|
| 29| Justin|
| 46|Michael|
+—+——-+
—————–数据结构信息——————————-
root
|– age: long (nullable = true)
|– name: string (nullable = true)
—————-dataset————————-
+——-+—+
| _1| _2|
+——-+—+
|Michael|116|
| Andy|130|
| Justin|119|
| Justin|129|
|Michael|146|
+——-+—+
—————–数据结构信息——————————-
root
|– _1: string (nullable = true)
|– _2: long (nullable = false)
相关算子可参考:
实例操作2 给特定的员工年龄增加70,其他增加30
package SparkHadoop.SparkBusinessExample
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
println(“—————-原始数据dataset————————-“)
personDS.show()
println(“—————–数据结构信息——————————-“)
personDS.printSchema()
personDS.flatMap( persons => persons match {
case Person(name,age) if (name == “Andy”) => List((name,age+70))
case Person(name,age) => List((name,age + 30))
}).show()
}
}
case class Person(name:String,age:Long)
结果:
—————-原始数据dataset————————-
+—+——-+
|age| name|
+—+——-+
| 16|Michael|
| 30| Andy|
| 19| Justin|
| 29| Justin|
| 46|Michael|
+—+——-+
—————–数据结构信息——————————-
root
|– age: long (nullable = true)
|– name: string (nullable = true)
+——-+—+
| _1| _2|
+——-+—+
|Michael| 46|
| Andy|100|
| Justin| 49|
| Justin| 59|
|Michael| 76|
+——-+—+
flatmap操作可参考:
实例操作3 对人员信息中的重复数据进行去重
在person.json中有两条Justin的记录,需要删除其中一条。
import org.apache.spark.sql.{DataFrame, Dataset,SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
//删除姓名中重复的记录
println(“————–dropDuplicates—————–“)
personDS.dropDuplicates(“name”).show()
//dropDuplicates和distinct()的区别在于,distinct()会返回一个新的dataset,而 //dropDuplicates是在原始数据上进行操作的
println(“————–distinct()—————–“)
val distvalue: Dataset[Person] = personDS.distinct()
distvalue.show()
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
实例操作4 按年龄进行降序排序
import org.apache.spark.sql.{DataFrame, Dataset,SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据 val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
//按年龄进行降序排序
println(“使用sort算子对年龄进行降序排序”)
personDS.sort($“age”.desc).show()
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
结果:
使用sort算子对年龄进行降序排序
+—+——-+
|age| name|
+—+——-+
| 46|Michael|
| 30| Andy|
| 29| Justin|
| 19| Justin|
| 16|Michael|
+—+——-+
实例操作5 将两个表格进行合并
import org.apache.spark.sql.{DataFrame, Dataset,SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
println(“使用join算子关联企业人员信息,企业人员评分信息”)
personDS.join(personScoreDS,$“name”=== $“n”).show()
//使用joinWith进行内关联,返回一个tuple println(“使用joinWith算子关联企业人员信息、企业人员分数评估”)
personDS.joinWith(personScoreDS,$“name” === $“n”).show()
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
结果:
使用join算子关联企业人员信息,企业人员评分信息
+—+——-+——-+—–+
|age| name| n|score|
+—+——-+——-+—–+
| 16|Michael|Michael| 88|
| 30| Andy| Andy| 100|
| 19| Justin| Justin| 89|
| 29| Justin| Justin| 89|
| 46|Michael|Michael| 88|
+—+——-+——-+—–+
使用joinWith算子关联企业人员信息、企业人员分数评估
+————-+————-+
| _1| _2|
+————-+————-+
|[16, Michael]|[Michael, 88]|
| [30, Andy]| [Andy, 100]|
| [19, Justin]| [Justin, 89]|
| [29, Justin]| [Justin, 89]|
|[46, Michael]|[Michael, 88]|
+————-+————-+
实例操作6 进行随机采样分析
import org.apache.spark.sql.{DataFrame, Dataset,SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
//
println(“使用randoomsplit算子进行随机切分”)
//randoomsplit的参数weights表示权重 personDS.randomSplit(Array(10,20)).foreach(dataset => dataset.show())
println(“使用sample进行随机采样,第一个参数为true时,表示有放回采样”)
personDS.sample(false,0.5).show()
//select选择年龄进行展示
println(“使用select算子选择列”)
personDS.select(“name”).show()
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
结果:
使用randoomsplit算子进行随机切分
+—+—-+
|age|name|
+—+—-+
| 30|Andy|
+—+—-+
+—+——-+
|age| name|
+—+——-+
| 16|Michael|
| 19| Justin|
| 29| Justin|
| 46|Michael|
+—+——-+
使用sample进行随机采样,第一个参数为true时,表示有放回采样
+—+——-+
|age| name|
+—+——-+
| 16|Michael|
| 30| Andy|
+—+——-+
使用select算子选择列
+——-+
| name|
+——-+
|Michael|
| Andy|
| Justin|
| Justin|
|Michael|
+——-+
实例操作7 选择某一列进行展示
import org.apache.spark.sql.{DataFrame, Dataset,SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
//select选择年龄进行展示
println(“使用select算子选择列”)
personDS.select(“name”).show()
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
结果:
使用select算子选择列
+——-+
| name|
+——-+
|Michael|
| Andy|
| Justin|
| Justin|
|Michael|
+——-+
实例操作8 进行分组统计分析
package SparkHadoop.SparkBusinessExample
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset,SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
println(“按照姓名,年龄进行分组”)
//$”name”和col(“name”)等价
val personDSGrouped: DataFrame = personDS.groupBy($“name”,$“age”).count()
personDSGrouped.show()
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
结果:
按照姓名,年龄进行分组
+——-+—+—–+
| name|age|count|
+——-+—+—–+
| Justin| 29| 1|
| Andy| 30| 1|
|Michael| 16| 1|
|Michael| 46| 1|
| Justin| 19| 1|
+——-+—+—–+
实例操作9 将两列合并为一列
package SparkHadoop.SparkBusinessExample
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset,SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//读取数据 val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset
val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
println(“使用agg算子concat内置函数,将姓名、年龄链接在一起,成为单个字符串列”)
personDS.groupBy($“name”,$“age”).agg(concat($“name”,$“name”)).show
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
结果:
使用agg算子concat内置函数,将姓名、年龄链接在一起,成为单个字符串列
+——-+—+——————+
| name|age|concat(name, name)|
+——-+—+——————+
| Justin| 29| JustinJustin|
| Andy| 30| AndyAndy|
|Michael| 16| MichaelMichael|
|Michael| 46| MichaelMichael|
| Justin| 19| JustinJustin|
+——-+—+——————+
实例操作10 计算平均年龄、最大年龄等
import org.apache.spark.sql.{DataFrame, Dataset,SparkSession}
object PersonSpark {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster(“local[3]”).setAppName(“personspark”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//读取数据
val persons: DataFrame = spark.read.json(“InData/peoplemanagedata/people.json”)
val personScores: DataFrame = spark.read.json(“InData/peoplemanagedata/peopleScores.json”)
//将读取的数据转换为强类型数据
val personDS: Dataset[Person] = persons.as[Person]//dataFrame转dataset val personScoreDS: Dataset[PersonScore] = personScores.as[PersonScore]
//进行简单统计计算
personDS.groupBy($“name”)
.agg(sum($“age”),avg($“age”),max($“age”),min($“age”),count($“age”),countDistinct($“age”),mean($“age”),
current_date())
.show()
}
}
case class Person(name:String,age:Long)
case class PersonScore(n:String,score:Long)
结果:
+——-+——–+——–+——–+——–+———-+——————-+——–+————–+
| name|sum(age)|avg(age)|max(age)|min(age)|count(age)|count(DISTINCT age)|avg(age)|current_date()|
+——-+——–+——–+——–+——–+———-+——————-+——–+————–+
|Michael| 62| 31.0| 46| 16| 2| 2| 31.0| 2021–08–20|
| Andy| 30| 30.0| 30| 30| 1| 1| 30.0| 2021–08–20|
| Justin| 48| 24.0| 29| 19| 2| 2| 24.0| 2021–08–20|
+——-+——–+——–+——–+——–+———-+——————-+——–+————–+
暂无评论内容