博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Sql 和DataFrame总结
阅读量:3748 次
发布时间:2019-05-22

本文共 6092 字,大约阅读时间需要 20 分钟。

Spark Sql 和DataFrame总结

1. Spark SQL概述

  • 它是spark中用于处理结构化数据的一个模块

  • 功能跟hive类似,最早shark, 为了解决hive速度慢的问题

  • Spark SQL 和 Spark Core的关系(RDD)与 hive 和 MapReduce关系类似

  • Spark SQL 优势

    • 代码少

    • spark SQL操作数据的两种方式

      • 通过sql语句
      • 自带了DataFrame的API
    • 速度快(对比直接写RDD代码)

      • SparkSQL API 转换成RDD的时候会做执行优化(catalyst优化器、钨丝计划、代码生成器)
      • 优化引擎转化的RDD代码比自己写的效率更高
    • 在这里插入图片描述

  • DataFrame 在操作结构化数据的时候

    • 引入了schema(表结构)
    • off-heap 突破虚拟机的限制,能够使用操作系统层面上的内存
    • 解决了RDD序列化、反序列化开销大频繁创建和销毁对象造成大量的GC(Garbage Collection) 垃圾回收机制的缺点
    • 丢失了RDD编译时进行类型检查,具有面向对象编程的特点的优点

2. DataFrame

2.1 DataFrame概述

  • RDD为基础分布式数据集,类似于传统关系型数据库的二维表,dataframe记录了对应列的名称和类型
  • DataFrame是一个分布式的行集合dataset[ROW]
  • 基于RDD
    • Immuatable 不可变的 只能生成新的RDD
    • Lazy Evaluations
      • transformation 延迟执行
      • action 执行了action之后transformation才会触发
    • Distributed 分布式
    • dataframe和dataset统一,dataframe只是**dataset[ROW]**的类型别名。由于Python是弱类型语言,只能使用DataFrame

2.2 DataFrame vs RDD 区别

  • RDD是分布式的对象的集合,Spark并不知道对象的详细模式信息;DataFrame相当于是一个带着schema(提供由列组成的详细模式信息)的RDD

  • 在这里插入图片描述

  • DataFrame还引入了off-heap,意味着可以使用JVM堆以外的内存

  • RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合

  • 解决了RDD序列化、反序列化开销大频繁创建和销毁对象造成大量的GC(Garbage Collection) 垃圾回收机制的缺点

  • 丢失了RDD编译时进行类型检查,具有面向对象编程的特点的优点

  • DataFrame提供比RDD更丰富的算子,能够提升执行效率,减少数据读取以及执行计划的优化

  • 通过DataFrame API或SQL处理数据,会自动经过Spark 优化器Catalyst的优化,即使你写的程序或SQL不仅高效,也可以运行的很快。

2.3 Pandas DataFrame vs Spark DataFrame

  • 单机 VS 分布式集群并行计算
  • Spark DataFrame 延迟执行
  • Spark DataFrame 不可变
  • Pandas DataFrame API 更丰富

3. DataFrame 操作

3.1 创建DataFrame

  • 启动spark集群

    cd ~/bigdata/spark/sbin./start-master.sh - h 192.168.19.137./start-slave.sh spark://192.168.19.137source active py365jupyter notebook --ip 0.0.0.0 --allow-root
  • 配置环境 python和java解释器

    import osJAVA_HOME = '/root/bigdata/jdk'PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"os.environ["JAVA_HOME"] = JAVA_HOMEos.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONfrom pyspark import SparkContext,SparkConffrom pyspark.sql import SparkSession
  • 创建DataFrame需要一个spark session 与创建RDD需要spark context

    SPARK_APP_NAME = "dataframetest"SPARK_URL = "spark://192.168.19.137:7077"conf = SparkConf()    # 创建spark config对象config = (  ("spark.app.name", SPARK_APP_NAME),    # 设置启动的spark的app名称,没有提供,将随机产生一个名称  ("spark.executor.memory", "6g"),    # 设置该app启动时占用的内存用量,默认1g  ("spark.master", SPARK_URL),    # spark master的地址  ("spark.executor.cores", "4"),    # 设置spark executor使用的CPU核心数conf.setAll(config)# 利用config对象,创建spark sessionspark = SparkSession.builder.config(conf=conf).getOrCreate()

3.1.1 从RDD创建DataFrame

from pyspark.sql import SparkSessionfrom pyspark.sql import Rowspark = SparkSession.builder.appName('test').getOrCreate()# DataFrame -> RDDsc = spark.sparkContext l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]rdd = sc.parallelize(l)#为数据添加列名 注意row对象people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))#RDD->DataFrameschemaPeople = spark.createDataFrame(people)df = spark.createDataFrame(Row对象的RDD)

3.1.2 从CSV文件创建DataFrame

df = spark.read.format('csv').option('header','true').load('/iris.csv')#显示数据结构df.printSchema()#显示前10条数据df.show(10)#统计总量df.count()#列名df.columns

3.1.3连接数据库

jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()

3.1.4 读取json数据

from pyspark.sql import SparkSessionfrom pyspark.sql.types import *jsonString = [  """{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",  """{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""]spark =  SparkSession.builder.appName('json_demo').getOrCreate()sc = spark.sparkContextjsonRDD = sc.parallelize(jsonString)#定义结构类型 不带嵌套#StructType:schema的整体结构,表示JSON的对象结构#XXXStype:指的是某一列的数据类型jsonSchema = StructType() \  .add("id", StringType(),True) \  .add("city", StringType()) \  .add("pop" , LongType()) \  .add("state",StringType())  # 带嵌套 ArrayTypejsonSchema = StructType([  StructField("id", StringType(), True),  StructField("city", StringType(), True),  StructField("loc" , ArrayType(DoubleType())),  StructField("pop", LongType(), True),  StructField("state", StringType(), True)])# 读取表结构reader = spark.read.schema(jsonSchema)jsonDF = reader.json(jsonRDD)jsonDF.printSchema()jsonDF.show()jsonDF.filter(jsonDF.pop>4000).show(10)

3.2 DataFrame操作

在这里插入图片描述

增加或替换一列

df.withColumn('列名',数值).show()

删除一列

df.drop('列名').show()

统计信息

df.describe().show()#计算某一列的描述信息df.describe('cls').show()

提取部分列

df.select('列名','列名').show()

基本统计功能

df.select('列名').distinct().count()

分组统计

# 分组统计 groupby(colname).agg({'col':'fun','col2':'fun2'})df.groupby('列名').agg({
'列名':'mean','列名':'max'}).show()# avg(), count(), countDistinct(), first(), kurtosis(),# max(), mean(), min(), skewness(), stddev(), stddev_pop(),# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()

自定义方法 并重命名

# 自定义的汇总方法import pyspark.sql.functions as fn#调用函数并起一个别名df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()

拆分数据集

trainDF, testDF = df.randomSplit([0.6, 0.4])

采样数据

#第一个参数withReplacement:是否有放回的采样#第二个参数fraction:采样比例#第三个参数seed:随机种子sdf = df.sample(False,0.2,100)

查看两个数据集在类别上的差异

#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))diff_in_train_test.distinct().count()

交叉表

df.crosstab('cls','SepalLength').show()

udf

udf:自定义函数

创建udf,udf函数需要两个参数: 1. Function 2. Return type (in my case StringType()) 在RDD中可以直接定义函数,交给rdd的transformatioins方法进行执行 在DataFrame中需要通过udf将自定义函数封装成udf函数再交给DataFrame进行调用执行

3.3 综合案例

#================== 综合案例 + udf================# 测试数据集中有些类别在训练集中是不存在的,找到这些数据集做后续处理from pyspark.sql.types import StringTypefrom pyspark.sql.functions import udftrainDF,testDF = df.randomSplit([0.99,0.01])diff_in_train_test = trainDF.select('cls').subtract(testDF.select('cls')).distinct().show()#首先找到这些类,整理到一个列表not_exist_cls = trainDF.select('cls').subtract(testDF.select('cls')).distinct().rdd.map(lambda x :x[0]).collect()#定义一个方法,用于检测def should_remove(x):    if x in not_exist_cls:        return -1    else :        return xcheck = udf(should_remove,StringType())resultDF = trainDF.withColumn('New_cls',check(trainDF['cls'])).filter('New_cls <> -1')resultDF.show()

4. 用DataFrame对数据进行去重、缺失值处理、异常值处理

详见

转载地址:http://smdsn.baihongyu.com/

你可能感兴趣的文章
算法训练 1的个数(输入正整数n,判断从1到n之中,数字1一共要出现几次。例如1123这个数,则出现了两次1。例如15,那么从1到15之中,一共出现了8个1。)
查看>>
算法训练 素因子去重(给定一个正整数n,求一个正整数p,满足p仅包含n的所有素因子,且每个素因子的次数不大于1)
查看>>
算法训练 二进制数数( 给定L,R。统计[L,R]区间内的所有数在二进制下包含的“1”的个数之和。   如5的二进制为101,包含2个“1”。)
查看>>
第十届MathorCup高校数学建模D题解题思路
查看>>
2020年高教社杯全国大学生数学建模竞赛赛题 C题分析与思路!(持续更新)
查看>>
2020年高教社杯全国大学生数学建模竞赛赛题 B题分析与思路!(持续更新)
查看>>
蓝桥杯真题 18省4-测试次数 x星球的居民脾气不太好,但好在他们生气的时候唯一的异常举动是:摔手机。 各大厂商也就纷纷推出各种耐摔型手机。x星球的质监局规定了手机必须经过耐摔测试,并且评定出一个耐
查看>>
蓝桥杯真题 19省3-数列求值 给定数列 1, 1, 1, 3, 5, 9, 17, …,从第 4 项开始,每项都是前 3 项的和。求第 20190324 项的最后 4 位数字。
查看>>
大小写字母转换函数tolower();的用法
查看>>
蓝桥杯 15校4-7对数字 今有7对数字:两个1,两个2,两个3,...两个7,把它们排成一行。 要求,两个1间有1个其它数字,两个2间有2个其它数字,以此类推,两个7之间有7个其它数字。如下就是
查看>>
蓝桥杯真题 17省10-k倍区间 给定一个长度为N的数列,A1, A2, ... AN,如果其中一段连续的子序列Ai, Ai+1, ... Aj(i <= j)之和是K的倍数,我们就称这个区间[i
查看>>
TCP协议的流量控制
查看>>
TCP连接的三次握手过程,为什么不是两次或四次?
查看>>
小白都能看懂的DNS解析过程
查看>>
HTTP和HTTPS的区别?描述HTTPS的工作过程
查看>>
简述一下HTTP的状态码
查看>>
20210227vulhub靶场之环境配置---无法获得靶机IP的疑难解决方式(可以解决VBox和VMware不兼容问题)
查看>>
20210226web渗透学习之SSRF总结
查看>>
2021-06-01web渗透学习之sqlserver提权(转)
查看>>
大数据之Flume
查看>>