一:SparkSQL
1.SparkSQL简介
Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。
Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式。
2.SparkSQL运行原理
将Spark SQL转化为RDD,然后提交到集群执行。
3.SparkSQL特点
(1)容易整合,Spark SQL已经集成在Spark中
(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquet等都是使用统一的方式进行访问
(3)兼容 Hive
(4)标准的数据连接:JDBC、ODBC
二、SparkSQL运用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
package sql
import org.apache.avro.ipc.specific.Person import org.apache.spark import org.apache.spark.rdd.RDD import org.apache.spark.sql import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.junit.Test
class Intro { @Test def dsIntro(): Unit ={ val spark: SparkSession = new sql.SparkSession.Builder() .appName( "ds intro" ) .master( "local[6]" ) .getOrCreate()
//导入隐算是shi转换 import spark.implicits._
val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person( "张三" , 10 ),Person( "李四" , 15 ))) val personDS: Dataset[Person] =sourceRDD.toDS(); //personDS.printSchema()打印出错信息
val resultDS: Dataset[Person] =personDS.where('age> 10 ) .select( 'name,' age) .as[Person] resultDS.show()
} @Test def dfIntro(): Unit ={ val spark: SparkSession = new SparkSession.Builder() .appName( "ds intro" ) .master( "local" ) .getOrCreate()
import spark.implicits._ val sourceRDD: RDD[Person] = spark.sparkContext.parallelize(Seq(Person( "张三" , 10 ),Person( "李四" , 15 ))) val df: DataFrame = sourceRDD.toDF() //隐shi转换
df.createOrReplaceTempView( "person" ) //创建表 val resultDF: DataFrame =spark.sql( "select name from person where age>=10 and age<=20" ) resultDF.show()
} @Test def database1(): Unit ={ //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName( "database1" ) .master( "local[6]" ) .getOrCreate() //2.导入引入shi子转换 import spark.implicits._
//3.演示 val sourceRDD: RDD[Person] =spark.sparkContext.parallelize(Seq(Person( "张三" , 10 ),Person( "李四" , 15 ))) val dataset: Dataset[Person] =sourceRDD.toDS()
//Dataset 支持强类型的API dataset.filter(item => item.age > 10 ).show() //Dataset 支持若弱类型的API dataset.filter('age> 10 ).show() //Dataset 可以直接编写SQL表达式 dataset.filter( "age>10" ).show() }
@Test def database2(): Unit ={ val spark: SparkSession = new SparkSession.Builder() .master( "local[6]" ) .appName( "database2" ) .getOrCreate() import spark.implicits._
val dataset: Dataset[Person] =spark.createDataset(Seq(Person( "张三" , 10 ),Person( "李四" , 20 ))) //无论Dataset中放置的是什么类型的对象,最终执行计划中的RDD上都是internalRow //直接获取到已经分析和解析过得Dataset的执行计划,从中拿到RDD val executionRdd: RDD[InternalRow] =dataset.queryExecution.toRdd
//通过将Dataset底层的RDD通过Decoder转成了和Dataset一样的类型RDD val typedRdd:RDD[Person] = dataset.rdd
println(executionRdd.toDebugString) println() println() println(typedRdd.toDebugString) }
@Test def database3(): Unit = { //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName( "database1" ) .master( "local[6]" ) .getOrCreate() //2.导入引入shi子转换 import spark.implicits._
val dataFrame: DataFrame = Seq(Person( "zhangsan" , 15 ), Person( "lisi" , 20 )).toDF() //3.看看DataFrame可以玩出什么花样 //select name from... dataFrame.where('age > 10 ) .select('name) .show() } // @Test // def database4(): Unit = { // //1.创建sparkSession // val spark: SparkSession = new SparkSession.Builder() // .appName("database1") // .master("local[6]") // .getOrCreate() // //2.导入引入shi子转换 // import spark.implicits._ // val personList=Seq(Person("zhangsan",15),Person("lisi",20)) // // //1.toDF // val df1: DataFrame =personList.toDF() // val df2: DataFrame =spark.sparkContext.parallelize(personList).toDF() // //2.createDataFrame // val df3: DataFrame =spark.createDataFrame(personList) // // //3.read // val df4: DataFrame =spark.read.csv("") // df4.show() // } //toDF()是转成DataFrame,toDs是转成Dataset // DataFrame就是Dataset[Row] 代表弱类型的操作,Dataset代表强类型的操作,中的类型永远是row,DataFrame可以做到运行时类型安全,Dataset可以做到 编译时和运行时都安全 @Test def database4(): Unit = { //1.创建sparkSession val spark: SparkSession = new SparkSession.Builder() .appName( "database1" ) .master( "local[6]" ) .getOrCreate() //2.导入引入shi子转换 import spark.implicits._ val personList=Seq(Person( "zhangsan" , 15 ),Person( "lisi" , 20 )) //DataFrame代表弱类型操作是编译时不安全 val df: DataFrame =personList.toDF()
//Dataset是强类型的 val ds: Dataset[Person] =personList.toDS() ds.map((person:Person) =>Person(person.name,person.age)) } @Test def row(): Unit ={ //1.Row如何创建,它是什么 //row对象必须配合Schema对象才会有列名 val p: Person =Person( "zhangsan" , 15 ) val row: Row =Row( "zhangsan" , 15 ) //2.如何从row中获取数据 row.getString( 0 ) row.getInt( 1 ) //3.Row也是样例类、 row match { case Row(name,age) => println(name,age) } }
} case class Person(name: String, age: Int) |
到此这篇关于深入了解SparkSQL的运用及方法的文章就介绍到这了,更多相关SparkSQL运用内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
原文链接:https://blog.csdn.net/m0_62491934/article/details/123840863
查看更多关于深入了解SparkSQL的运用及方法的详细内容...