好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

深入了解SparkSQL的运用及方法

一:SparkSQL

1.SparkSQL简介

Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。
Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式。

2.SparkSQL运行原理

将Spark SQL转化为RDD,然后提交到集群执行。

3.SparkSQL特点

(1)容易整合,Spark SQL已经集成在Spark中

(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquet等都是使用统一的方式进行访问

(3)兼容 Hive

(4)标准的数据连接:JDBC、ODBC

二、SparkSQL运用

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

package sql

 

import org.apache.avro.ipc.specific.Person

import org.apache.spark

import org.apache.spark.rdd.RDD

import org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import org.junit.Test

 

class Intro {

   @Test

   def dsIntro(): Unit ={

     val spark: SparkSession = new sql.SparkSession.Builder()

       .appName( "ds intro" )

       .master( "local[6]" )

       .getOrCreate()

 

     //导入隐算是shi转换

     import spark.implicits._

 

     val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person( "张三" , 10 ),Person( "李四" , 15 )))

     val personDS: Dataset[Person] =sourceRDD.toDS();

//personDS.printSchema()打印出错信息

 

     val resultDS: Dataset[Person] =personDS.where('age> 10 )

       .select( 'name,' age)

       .as[Person]

     resultDS.show()

 

   }

   @Test

   def dfIntro(): Unit ={

     val spark: SparkSession = new SparkSession.Builder()

       .appName( "ds intro" )

       .master( "local" )

       .getOrCreate()

 

     import spark.implicits._

     val sourceRDD: RDD[Person] = spark.sparkContext.parallelize(Seq(Person( "张三" , 10 ),Person( "李四" , 15 )))

     val df: DataFrame = sourceRDD.toDF() //隐shi转换

 

     df.createOrReplaceTempView( "person" ) //创建表

     val resultDF: DataFrame =spark.sql( "select name from person where age>=10 and age<=20" )

     resultDF.show()

 

   }

   @Test

   def database1(): Unit ={

     //1.创建sparkSession

     val spark: SparkSession = new SparkSession.Builder()

       .appName( "database1" )

       .master( "local[6]" )

       .getOrCreate()

       //2.导入引入shi子转换

     import spark.implicits._

 

     //3.演示

     val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person( "张三" , 10 ),Person( "李四" , 15 )))

     val dataset: Dataset[Person] =sourceRDD.toDS()

 

     //Dataset 支持强类型的API

     dataset.filter(item => item.age > 10 ).show()

     //Dataset 支持若弱类型的API

     dataset.filter('age> 10 ).show()

     //Dataset 可以直接编写SQL表达式

     dataset.filter( "age>10" ).show()

   }

 

   @Test

   def database2(): Unit ={

     val spark: SparkSession = new SparkSession.Builder()

       .master( "local[6]" )

       .appName( "database2" )

       .getOrCreate()

     import spark.implicits._

 

     val dataset: Dataset[Person] =spark.createDataset(Seq(Person( "张三" , 10 ),Person( "李四" , 20 )))

     //无论Dataset中放置的是什么类型的对象,最终执行计划中的RDD上都是internalRow

     //直接获取到已经分析和解析过得Dataset的执行计划,从中拿到RDD

     val executionRdd: RDD[InternalRow] =dataset.queryExecution.toRdd

 

     //通过将Dataset底层的RDD通过Decoder转成了和Dataset一样的类型RDD

     val typedRdd:RDD[Person] = dataset.rdd

 

     println(executionRdd.toDebugString)

     println()

     println()

     println(typedRdd.toDebugString)

   }

 

   @Test

   def database3(): Unit = {

     //1.创建sparkSession

     val spark: SparkSession = new SparkSession.Builder()

       .appName( "database1" )

       .master( "local[6]" )

       .getOrCreate()

     //2.导入引入shi子转换

     import spark.implicits._

 

     val dataFrame: DataFrame = Seq(Person( "zhangsan" , 15 ), Person( "lisi" , 20 )).toDF()

     //3.看看DataFrame可以玩出什么花样

     //select name from...

     dataFrame.where('age > 10 )

       .select('name)

       .show()

   }

//  @Test

//  def database4(): Unit = {

//    //1.创建sparkSession

//    val spark: SparkSession = new SparkSession.Builder()

//      .appName("database1")

//      .master("local[6]")

//      .getOrCreate()

//    //2.导入引入shi子转换

//    import spark.implicits._

//    val personList=Seq(Person("zhangsan",15),Person("lisi",20))

//

//    //1.toDF

//    val df1: DataFrame =personList.toDF()

//    val df2: DataFrame =spark.sparkContext.parallelize(personList).toDF()

//      //2.createDataFrame

//    val df3: DataFrame =spark.createDataFrame(personList)

//

//    //3.read

//    val df4: DataFrame =spark.read.csv("")

//    df4.show()

//  }

   //toDF()是转成DataFrame,toDs是转成Dataset

   //  DataFrame就是Dataset[Row] 代表弱类型的操作,Dataset代表强类型的操作,中的类型永远是row,DataFrame可以做到运行时类型安全,Dataset可以做到 编译时和运行时都安全

@Test

def database4(): Unit = {

   //1.创建sparkSession

   val spark: SparkSession = new SparkSession.Builder()

     .appName( "database1" )

     .master( "local[6]" )

     .getOrCreate()

   //2.导入引入shi子转换

   import spark.implicits._

   val personList=Seq(Person( "zhangsan" , 15 ),Person( "lisi" , 20 ))

   //DataFrame代表弱类型操作是编译时不安全

   val df: DataFrame =personList.toDF()

 

   //Dataset是强类型的

   val ds: Dataset[Person] =personList.toDS()

   ds.map((person:Person) =>Person(person.name,person.age))

}

   @Test

   def row(): Unit ={

     //1.Row如何创建,它是什么

     //row对象必须配合Schema对象才会有列名

     val p: Person =Person( "zhangsan" , 15 )

     val row: Row =Row( "zhangsan" , 15 )

     //2.如何从row中获取数据

     row.getString( 0 )

     row.getInt( 1 )

     //3.Row也是样例类、

     row match {

       case Row(name,age) => println(name,age)

     }

   }

 

}

case class Person(name: String, age: Int)

到此这篇关于深入了解SparkSQL的运用及方法的文章就介绍到这了,更多相关SparkSQL运用内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://blog.csdn.net/m0_62491934/article/details/123840863

查看更多关于深入了解SparkSQL的运用及方法的详细内容...

  阅读:16次