
5. DataFrame & Dataset

1. Background of DataFrame

The DataFrame was not invented by Spark SQL; it already existed much earlier in Python, R, and Pandas.

One of Spark's goals from the very beginning was to give the big-data ecosystem a simple, easy-to-use API based on a general-purpose language.

1. To program against Spark RDDs you first have to learn Java, Scala, or Python, which is a relatively high barrier to entry.
2. The DataFrame in R and similar languages only supports single-machine processing; as Spark kept growing, it needed to reach a wider audience that could use Spark for distributed processing.

2. DataFrame Overview

A Dataset is a distributed collection of data.
A DataFrame is a Dataset organized into named columns (an RDD with a schema) - a distributed dataset made up of columns (column name, column type, column values), where each column is given a name.

It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
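For intuition, here is a minimal sketch (the column names and sample rows are made up for illustration) of what "a Dataset organized into named columns" means in practice: the DataFrame carries its schema and can be displayed like a relational table.

import org.apache.spark.sql.SparkSession

object DataFrameIntro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameIntro").master("local[2]").getOrCreate()
    import spark.implicits._

    // Build a DataFrame from in-memory data and give the columns names
    val df = Seq(("Michael", 29), ("Andy", 30), ("Justin", 19)).toDF("name", "age")

    df.printSchema()   // column name + type: name is a string, age is an int
    df.show()          // prints the data as a table, just like a relational table

    spark.stop()
  }
}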


3. Comparing DataFrame and RDD

RDD: a distributed collection that can be processed in parallel
java/scala ==> JVM
python ==> python runtime

DataFrame: also a distributed dataset, but it behaves more like a table in a traditional database: besides the data itself, it also knows the column names, column types, and column values, and it supports some more complex data structures.
java/scala/python ==> logical plan

From a usability point of view, the DataFrame has a lower learning cost: since R and Python already have DataFrames, development with it feels familiar and convenient.
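To make the comparison concrete, here is a minimal sketch (the Person case class and sample data are hypothetical) of the same filter written against an RDD and against a DataFrame. With the RDD, Spark only sees an opaque lambda running on JVM (or Python) objects; with the DataFrame, the filter is expressed over named columns and becomes a logical plan that can be optimized, regardless of the language used.

import org.apache.spark.sql.SparkSession

// Hypothetical record type used only for this sketch
case class Person(name: String, age: Int)

object RddVsDataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RddVsDataFrameApp").master("local[2]").getOrCreate()
    import spark.implicits._

    val people = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))

    // RDD: the filter is a black-box function applied to JVM objects
    val rdd = spark.sparkContext.parallelize(people)
    rdd.filter(_.age > 20).collect().foreach(println)

    // DataFrame: the same filter expressed on named columns, compiled into a logical plan
    val df = people.toDF()
    df.filter($"age" > 20).show()

    spark.stop()
  }
}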


4. Basic DataFrame API Operations


A look at the source code of the load method:

/**
 * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// The return value is a DataFrame
def load(path: String): DataFrame = {
  option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}
package com.gwf.spark

import org.apache.spark.sql.SparkSession

object DataFrameApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()

    // Load a JSON file as a DataFrame
    val peopleDF = spark.read.format("json")
      .load("file:///Users/gaowenfeng/software/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")

    // Print the schema of the DataFrame
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)

    // Show the first 20 rows of the dataset
    peopleDF.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+

    // Select all values of one column: select name from table
    peopleDF.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

    // Select several columns and compute on them: select name, age+10 as age2 from table
    peopleDF.select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2")).show()
    // +-------+----+
    // |   name|age2|
    // +-------+----+
    // |Michael|null|
    // |   Andy|  40|
    // | Justin|  29|
    // +-------+----+

    // Filter rows by a column value: select * from table where age > 19
    peopleDF.filter(peopleDF.col("age") > 19).show()
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|Andy|
    // +---+----+

    // Group by a column, then aggregate: select age, count(1) from table group by age
    peopleDF.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+

    spark.stop()
  }
}

5. Interoperating Between DataFrames and RDDs


1. Using reflection

Prerequisite: you need to know your fields and their types in advance.

package com.gwf.spark

import org.apache.spark.sql.SparkSession

/**
 * Interoperation between DataFrames and RDDs
 */
object DataFrameRDDAPP {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameRDDAPP").master("local[2]").getOrCreate()

    val rdd = spark.sparkContext.textFile("file:///Users/gaowenfeng/project/idea/MySparkSqlProject/src/main/resources/infos.txt")

    // The implicit conversions are required for toDF()
    import spark.implicits._
    val infoDf = rdd.map(_.split(","))
      .map(line => Info(line(0).toInt, line(1), line(2).toInt))
      .toDF()

    infoDf.printSchema()
    infoDf.filter(infoDf.col("age") > 30).show()

    // Creates a local temporary view using the given name. The lifetime of this
    // temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
    infoDf.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  case class Info(id: Int, name: String, age: Int)
}

2. Programmatically specifying the schema

Use this when the first approach cannot meet your needs, i.e. when the fields and types are not known in advance.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val rdd = spark.sparkContext.textFile("file:///Users/gaowenfeng/project/idea/MySparkSqlProject/src/main/resources/infos.txt")

// 1. Create an RDD of Rows from the original RDD
val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

// 2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1
val structType = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))

// 3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession
val infoDF = spark.createDataFrame(infoRDD, structType)
infoDF.printSchema()
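The point of this approach is that the schema does not have to be hard-coded. A minimal sketch (the schemaString value and the infos_dynamic view name are made up for illustration; it reuses the spark session, rdd, and imports from the snippet above) that assembles the schema from a string only available at runtime:

// Column names arriving at runtime, e.g. from a config file
val schemaString = "id name age"
val dynamicSchema = StructType(schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true)))

// The Rows must match the schema: here every field is kept as a string
val rowRDD = rdd.map(_.split(",")).map(attrs => Row(attrs(0), attrs(1), attrs(2)))

val dynamicDF = spark.createDataFrame(rowRDD, dynamicSchema)
dynamicDF.createOrReplaceTempView("infos_dynamic")
spark.sql("select name, age from infos_dynamic").show()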

3. Which one to choose: prefer the first (reflection-based) approach.

6. Dataset Overview and Usage

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally: row.columnName). The case for R is similar.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row], while in the Java API users need to use Dataset<Row> to represent a DataFrame.
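A minimal sketch of the Dataset API (the Sales case class and the sample values are made up for illustration): a Dataset can be built directly from JVM objects, and a DataFrame, which is just Dataset[Row], can be turned back into a typed Dataset with as[T].

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical record type used only for this sketch
case class Sales(itemId: Int, amount: Double)

object DatasetApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DatasetApp").master("local[2]").getOrCreate()
    import spark.implicits._

    // 1. Construct a Dataset directly from JVM objects
    val ds: Dataset[Sales] = Seq(Sales(1, 10.5), Sales(2, 23.0)).toDS()
    ds.filter(_.amount > 15).show()        // the lambda works on typed objects

    // 2. A DataFrame (Dataset[Row]) becomes a typed Dataset again via as[T]
    val df = ds.toDF()
    val typedAgain: Dataset[Sales] = df.as[Sales]
    typedAgain.map(_.itemId).show()

    spark.stop()
  }
}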

SQL: seletf name from table -- compiles (it is just a string); the error only shows up at runtime
DF: df.seletc("name") -- does not compile (no such method); df.select("naem") -- compiles, but fails at runtime (no such column)
DS: ds.map(_.naem) -- does not compile, because field names are checked at compile time

DataFrame = Dataset[Row]
Dataset is strongly typed (typed, backed by case classes)
DataFrame is weakly typed (untyped, generic Row objects)
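A small sketch of what the comparison above means in code (it assumes the Sales case class and spark session from the previous sketch): with a DataFrame a misspelled column name still compiles and only fails when the job runs, while with a Dataset the Scala compiler checks the field name.

import spark.implicits._

val ds = Seq(Sales(1, 10.5)).toDS()
val df = ds.toDF()

df.select("amont").show()   // compiles, but throws AnalysisException at runtime: no such column
ds.map(_.amount).show()     // field name is checked by the Scala compiler
// ds.map(_.amont)          // does not compile: value amont is not a member of Sales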
