作为一个和数据相关的专业,想学习pyspark,从而了解并学习pyspark ,以便更好的应用到工作中。
1、连接数据库
import findspark
#初始化
findspark.init()
import warnings
warnings.filterwarnings(ignore)
from pyspark.sql import SparkSession
# 定义数据库的地址,以及表,登录用户及密码
url = “jdbc:mysql://localhost:3306/xx”
table=“table”
#密码账户需要字典的形式传入
properties ={“user”:“root”,“password”:“12345678”}
spark = SparkSession.builder.appName(My first app).getOrCreate()
df = spark.read.jdbc(url=url,table=table,properties=properties)
df.show(4)
2、查看数据维度
df.count(),len(df.columns)# 查看数据维度
3、查看字段类型
df.printSchema()# 元数据分析,查看字段类型
4、使用select 按照黎明筛选数据
df.select([cxkh,jymc]) # 按照列名选择
5、使用filter 过滤筛选数据
df.filter((df[jyje]>100000) & (df[jysj]>2015-01-01)) #按照条件进行筛选
6、数据类型转换
#在元数据上更改字段
df.withColumn(“jyje”,df3.jyje.astype(“int”))
7、描述性统计
df3.describe().show()
8、增加字段
df.withColumn(test,(df[jyje]+100))
9、筛选空值
df[df[jydfmc].isNull()] # 筛选未空
10、缺失值填充
dd.fillna(wz).show(2) #缺失值填充
dd.na.fill(wz).show(2) #缺失值填充
11、分组处理
#按照一个index分组
df.groupBy(jymc).count().orderBy(count,ascending=False).show(3)
#按照两个index分组
df.groupBy(jymc,cxkh).agg({jyje:sum,jysj:max}).orderBy(sum(jyje),ascending=False).show(5,False) #特定函数聚合求解
12、去重
df.select([cxkh,jymc]).dropDuplicates().show(5)
13、遍历处理,类似于Python 的lambda
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType #类似于lambda
brand_udf=udf(jyje_level,StringType()) #注册udf
df.withColumn(jyje_level,brand_udf(df[jyje])).show(5)
14、模糊匹配
df.select([cxkh,jymc,jdbz,jyje,jydfmc]).filter(“jydfmc rlike 基金|理财|证券”) # 模糊匹配
15、修改指定列名
df.select(cxkh,jysj,jyje).groupBy(cxkh).agg({jysj:max,jyje:max}).\
withColumnRenamed(cxkh,cxkh1).withColumnRenamed(max(jysj),最晚一次交易时间).\
withColumnRenamed(max(jyje),最大交易金额) #修改指定字段的名字, df.seleceExpr(first_name as lase_name),这个好像需要字段名全部写上,否则最终显示结果只有更改了列名的那一列。
16、数据框拼接
df_person.join(df_lastime,df_person[cxkh]==df_lastime[cxkh1],\
left_outer).orderBy(cxkh,ascending=True)# 两表拼接指定字段的拼接,其他的连接方式:inner, outer, left_outer, right_outer, leftsemi
17、pyspark 中实现sql语句的查询
df.registerTempTable(df1) # 将数据框注册为一个table
spark.sql(“select * from df1where jymc=xxx”).show(2)#程序 mysql程序
18、取交集、差集、并集
df.select(jymc).dropDuplicates().intersect(df.select(jydfmc).dropDuplicates())#交集
df.select(jymc).dropDuplicates().subtract(df.select(jydfmc).dropDuplicates())#差集
df.select(jymc).dropDuplicates().union(df.select(jydfmc).dropDuplicates())#并集
19、并集去重
df.select(jymc).union(df.select(jydfmc)).distinct()#并集+去重
20、数据拆分
from pyspark.sql.functions import split
df.withColumn(jyrq,split(df_year[jysj],” “).getItem(0))#拆分后取第一个元素
df.withColumn(jytime,split(df_year[jysj],” “)[1])#拆分后取第二个元素
21、获取对应的年,月,日,一周内第几天,一年内第几天
from pyspark.sql.functions import month,year,dayofmonth,dayofweek,dayofyear
df.withColumn(year,year(jysj)).\
withColumn(month,month(jysj)).\
withColumn(day,dayofmonth(jysj)).\
withColumn(week,dayofweek(jysj)).\
withColumn(day_num,dayofyear(jysj)) # 获取对应的年,月,日,一周内第几天,一年内第几天
拆分后的数据结果,对应的年月日等拆分后的字段类型22、数据写出
#写出在多个csv文件中,一个csv文件为一行数据
df.write.save(path.csv)
#写出在一个csv 文件中
df.repartition(1).write.csv(“path.csv”,header=True, sep=”,”, mode=overwrite,encoding=utf-8)
#保存为parquet格式数据。读取速度快,占用内存小
df_person_info_lasttime.write.format(parquet).save(path4.parquet)
#读取parquet数据
d1=spark.read.format(parquet).load(path.csv4.parquet)
以上是第一次实践的结果,有什么不正确的地方,欢迎各位大神多多指教!不胜感激。
THE END
暂无评论内容