pyspark处理数据基本语法

作为一个和数据相关的专业,想学习pyspark,从而了解并学习pyspark ,以便更好的应用到工作中。

1、连接数据库

import findspark #初始化 findspark.init() import warnings warnings.filterwarnings(ignore) from pyspark.sql import SparkSession # 定义数据库的地址,以及表,登录用户及密码 url = “jdbc:mysql://localhost:3306/xx” table=“table” #密码账户需要字典的形式传入 properties ={“user”:“root”,“password”:“12345678”} spark = SparkSession.builder.appName(My first app).getOrCreate() df = spark.read.jdbc(url=url,table=table,properties=properties) df.show(4)

2、查看数据维度

df.count(),len(df.columns)# 查看数据维度

3、查看字段类型

df.printSchema()# 元数据分析,查看字段类型

4、使用select 按照黎明筛选数据

df.select([cxkh,jymc]) # 按照列名选择

5、使用filter 过滤筛选数据

df.filter((df[jyje]>100000) & (df[jysj]>2015-01-01)) #按照条件进行筛选

6、数据类型转换

#在元数据上更改字段 df.withColumn(“jyje”,df3.jyje.astype(“int”))

7、描述性统计

df3.describe().show()

8、增加字段

df.withColumn(test,(df[jyje]+100))

9、筛选空值

df[df[jydfmc].isNull()] # 筛选未空

10、缺失值填充

dd.fillna(wz).show(2) #缺失值填充 dd.na.fill(wz).show(2) #缺失值填充

11、分组处理

#按照一个index分组 df.groupBy(jymc).count().orderBy(count,ascending=False).show(3) #按照两个index分组 df.groupBy(jymc,cxkh).agg({jyje:sum,jysj:max}).orderBy(sum(jyje),ascending=False).show(5,False) #特定函数聚合求解

12、去重

df.select([cxkh,jymc]).dropDuplicates().show(5)

13、遍历处理,类似于Python 的lambda

from pyspark.sql.functions import udf from pyspark.sql.types import StringType #类似于lambda brand_udf=udf(jyje_level,StringType()) #注册udf df.withColumn(jyje_level,brand_udf(df[jyje])).show(5)

14、模糊匹配

df.select([cxkh,jymc,jdbz,jyje,jydfmc]).filter(“jydfmc rlike 基金|理财|证券”) # 模糊匹配

15、修改指定列名

df.select(cxkh,jysj,jyje).groupBy(cxkh).agg({jysj:max,jyje:max}).\ withColumnRenamed(cxkh,cxkh1).withColumnRenamed(max(jysj),最晚一次交易时间).\ withColumnRenamed(max(jyje),最大交易金额) #修改指定字段的名字, df.seleceExpr(first_name as lase_name),这个好像需要字段名全部写上,否则最终显示结果只有更改了列名的那一列。

16、数据框拼接

df_person.join(df_lastime,df_person[cxkh]==df_lastime[cxkh1],\ left_outer).orderBy(cxkh,ascending=True)# 两表拼接指定字段的拼接,其他的连接方式:inner, outer, left_outer, right_outer, leftsemi

17、pyspark 中实现sql语句的查询

df.registerTempTable(df1) # 将数据框注册为一个table spark.sql(“select * from df1where jymc=xxx”).show(2)#程序 mysql程序

18、取交集、差集、并集

df.select(jymc).dropDuplicates().intersect(df.select(jydfmc).dropDuplicates())#交集 df.select(jymc).dropDuplicates().subtract(df.select(jydfmc).dropDuplicates())#差集 df.select(jymc).dropDuplicates().union(df.select(jydfmc).dropDuplicates())#并集

19、并集去重

df.select(jymc).union(df.select(jydfmc)).distinct()#并集+去重

20、数据拆分

from pyspark.sql.functions import split df.withColumn(jyrq,split(df_year[jysj],” “).getItem(0))#拆分后取第一个元素 df.withColumn(jytime,split(df_year[jysj],” “)[1])#拆分后取第二个元素

21、获取对应的年,月,日,一周内第几天,一年内第几天

from pyspark.sql.functions import month,year,dayofmonth,dayofweek,dayofyear df.withColumn(year,year(jysj)).\ withColumn(month,month(jysj)).\ withColumn(day,dayofmonth(jysj)).\ withColumn(week,dayofweek(jysj)).\ withColumn(day_num,dayofyear(jysj)) # 获取对应的年,月,日,一周内第几天,一年内第几天
拆分后的数据结果,对应的年月日等
拆分后的字段类型

22、数据写出

#写出在多个csv文件中,一个csv文件为一行数据 df.write.save(path.csv) #写出在一个csv 文件中 df.repartition(1).write.csv(“path.csv”,header=True, sep=”,”, mode=overwrite,encoding=utf-8) #保存为parquet格式数据。读取速度快,占用内存小 df_person_info_lasttime.write.format(parquet).save(path4.parquet) #读取parquet数据 d1=spark.read.format(parquet).load(path.csv4.parquet)

以上是第一次实践的结果,有什么不正确的地方,欢迎各位大神多多指教!不胜感激。

    THE END
    喜欢就支持一下吧
    点赞8 分享
    评论 抢沙发
    头像
    欢迎您留下宝贵的见解!
    提交
    头像

    昵称

    取消
    昵称表情代码图片

      暂无评论内容