本文共 6092 字,大约阅读时间需要 20 分钟。
它是spark中用于处理结构化数据的一个模块
功能跟hive类似,最早shark, 为了解决hive速度慢的问题
Spark SQL 和 Spark Core的关系(RDD)与 hive 和 MapReduce关系类似
Spark SQL 优势
代码少
spark SQL操作数据的两种方式
速度快(对比直接写RDD代码)
DataFrame 在操作结构化数据的时候
RDD是分布式的对象的集合,Spark并不知道对象的详细模式信息;DataFrame相当于是一个带着schema(提供由列组成的详细模式信息)的RDD
DataFrame还引入了off-heap,意味着可以使用JVM堆以外的内存
RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合
解决了RDD序列化、反序列化开销大,频繁创建和销毁对象造成大量的GC(Garbage Collection) 垃圾回收机制的缺点
丢失了RDD编译时进行类型检查,具有面向对象编程的特点的优点
DataFrame提供比RDD更丰富的算子,能够提升执行效率,减少数据读取以及执行计划的优化
通过DataFrame API或SQL处理数据,会自动经过Spark 优化器Catalyst的优化,即使你写的程序或SQL不仅高效,也可以运行的很快。
启动spark集群
cd ~/bigdata/spark/sbin./start-master.sh - h 192.168.19.137./start-slave.sh spark://192.168.19.137source active py365jupyter notebook --ip 0.0.0.0 --allow-root
配置环境 python和java解释器
import osJAVA_HOME = '/root/bigdata/jdk'PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"os.environ["JAVA_HOME"] = JAVA_HOMEos.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONfrom pyspark import SparkContext,SparkConffrom pyspark.sql import SparkSession
创建DataFrame需要一个spark session 与创建RDD需要spark context
SPARK_APP_NAME = "dataframetest"SPARK_URL = "spark://192.168.19.137:7077"conf = SparkConf() # 创建spark config对象config = ( ("spark.app.name", SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称 ("spark.executor.memory", "6g"), # 设置该app启动时占用的内存用量,默认1g ("spark.master", SPARK_URL), # spark master的地址 ("spark.executor.cores", "4"), # 设置spark executor使用的CPU核心数conf.setAll(config)# 利用config对象,创建spark sessionspark = SparkSession.builder.config(conf=conf).getOrCreate()
from pyspark.sql import SparkSessionfrom pyspark.sql import Rowspark = SparkSession.builder.appName('test').getOrCreate()# DataFrame -> RDDsc = spark.sparkContext l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]rdd = sc.parallelize(l)#为数据添加列名 注意row对象people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))#RDD->DataFrameschemaPeople = spark.createDataFrame(people)df = spark.createDataFrame(Row对象的RDD)
df = spark.read.format('csv').option('header','true').load('/iris.csv')#显示数据结构df.printSchema()#显示前10条数据df.show(10)#统计总量df.count()#列名df.columns
jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
from pyspark.sql import SparkSessionfrom pyspark.sql.types import *jsonString = [ """{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""", """{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""]spark = SparkSession.builder.appName('json_demo').getOrCreate()sc = spark.sparkContextjsonRDD = sc.parallelize(jsonString)#定义结构类型 不带嵌套#StructType:schema的整体结构,表示JSON的对象结构#XXXStype:指的是某一列的数据类型jsonSchema = StructType() \ .add("id", StringType(),True) \ .add("city", StringType()) \ .add("pop" , LongType()) \ .add("state",StringType()) # 带嵌套 ArrayTypejsonSchema = StructType([ StructField("id", StringType(), True), StructField("city", StringType(), True), StructField("loc" , ArrayType(DoubleType())), StructField("pop", LongType(), True), StructField("state", StringType(), True)])# 读取表结构reader = spark.read.schema(jsonSchema)jsonDF = reader.json(jsonRDD)jsonDF.printSchema()jsonDF.show()jsonDF.filter(jsonDF.pop>4000).show(10)
增加或替换一列
df.withColumn('列名',数值).show()
删除一列
df.drop('列名').show()
统计信息
df.describe().show()#计算某一列的描述信息df.describe('cls').show()
提取部分列
df.select('列名','列名').show()
基本统计功能
df.select('列名').distinct().count()
分组统计
# 分组统计 groupby(colname).agg({'col':'fun','col2':'fun2'})df.groupby('列名').agg({ '列名':'mean','列名':'max'}).show()# avg(), count(), countDistinct(), first(), kurtosis(),# max(), mean(), min(), skewness(), stddev(), stddev_pop(),# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()
自定义方法 并重命名
# 自定义的汇总方法import pyspark.sql.functions as fn#调用函数并起一个别名df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()
拆分数据集
trainDF, testDF = df.randomSplit([0.6, 0.4])
采样数据
#第一个参数withReplacement:是否有放回的采样#第二个参数fraction:采样比例#第三个参数seed:随机种子sdf = df.sample(False,0.2,100)
查看两个数据集在类别上的差异
#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))diff_in_train_test.distinct().count()
交叉表
df.crosstab('cls','SepalLength').show()
udf
udf:自定义函数
创建udf,udf函数需要两个参数: 1. Function 2. Return type (in my case StringType()) 在RDD中可以直接定义函数,交给rdd的transformatioins方法进行执行 在DataFrame中需要通过udf将自定义函数封装成udf函数再交给DataFrame进行调用执行
#================== 综合案例 + udf================# 测试数据集中有些类别在训练集中是不存在的,找到这些数据集做后续处理from pyspark.sql.types import StringTypefrom pyspark.sql.functions import udftrainDF,testDF = df.randomSplit([0.99,0.01])diff_in_train_test = trainDF.select('cls').subtract(testDF.select('cls')).distinct().show()#首先找到这些类,整理到一个列表not_exist_cls = trainDF.select('cls').subtract(testDF.select('cls')).distinct().rdd.map(lambda x :x[0]).collect()#定义一个方法,用于检测def should_remove(x): if x in not_exist_cls: return -1 else : return xcheck = udf(should_remove,StringType())resultDF = trainDF.withColumn('New_cls',check(trainDF['cls'])).filter('New_cls <> -1')resultDF.show()
详见
转载地址:http://smdsn.baihongyu.com/