Spark SQL分享 by 张包峰
- 格式:pdf
- 大小:983.83 KB
- 文档页数:26
SparkSQL(⼆)——基本操作SparkSession新的起点在⽼的版本中,SparkSQL提供两种SQL查询起始点:⼀个叫SQLContext,⽤于Spark⾃⼰提供的SQL查询;⼀个叫HiveContext,⽤于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可⽤的API在SparkSession上同样是可以使⽤的。
SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext或者HiveContext完成的。
DataFrame基本操作创建在Spark SQL中SparkSession是创建DataFrame和执⾏SQL的⼊⼝,创建DataFrame有三种⽅式:通过Spark的数据源进⾏创建;从⼀个存在的RDD进⾏转换;还可以从Hive Table进⾏查询返回。
1)通过spark的数据源创建查看SparkSession⽀持哪些⽂件格式创建dataframe(在spark shell中,spark.read.+tab)csv format jdbc json load option options orc parquet schema table text textFile以json格式为例:{"name":"zhangsan","age":20}{"name":"lisi","age":21}{"name":"wangwu","age":22}scala> spark.read.json("file:///home/chxy/spark/user.json")res2: org.apache.spark.sql.DataFrame = [age: bigint, name: string]它可以⾃动地判断出数据的字段和字段类型2)从⼀个存在的RDD中进⾏转换注意:如果需要RDD与DF或者DS之间操作,那么都需要引⼊ import spark.implicits._(1)⼿动转换//⾸先引⼊隐式转换scala> import spark.implicits._import spark.implicits._//创建⼀个RDDscala> def rdd = spark.sparkContext.makeRDD(List(("zhangsan",21),("lisi",22),("wangwu",23)))rdd: org.apache.spark.rdd.RDD[(String, Int)]//⼿动指定dataframe的数据结构scala> val dataframe = rdd.toDF("name","age")dataframe: org.apache.spark.sql.DataFrame = [name: string, age: int](2)通过case类来转换⾸先创建样例类scala> case class People(name:String, age:Int)defined class People将rdd中的数据转换为样例类的实例,rdd中的数据类型变为Peoplescala> val peopleRdd = rdd.map{ d => {People(d._1,d._2)}}peopleRdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at将peopleRdd转换为dataframe,此时⽆需指定数据结构,spark可以直接将含有case类的RDD转换为DataFramescala> val peopleDataframe = peopleRdd.toDFpeopleDataframe: org.apache.spark.sql.DataFrame = [name: string, age: int]将dataframe转换为rddscala> peopleDataframe.rddres3: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[7] at rdd at <console>:32注意:转换后的数据类型已经不是People,⽽是Row,也就是⾏,它⽆法还原出原来的数据类型。
SparkSQL⼊门⽤法与原理分析Spark SQL是为了让开发⼈员摆脱⾃⼰编写RDD等原⽣Spark代码⽽产⽣的,开发⼈员只需要写⼀句SQL语句或者调⽤API,就能⽣成(翻译成)对应的SparkJob代码并去执⾏,开发变得更简洁注意:本⽂全部基于SparkSQL1.6参考:⼀. APISpark SQL的API⽅案:3种SQLthe DataFrames APIthe Datasets API.但会使⽤同⼀个执⾏引擎the same execution engine is used(⼀)数据转为Dataframe1、(半)格式化数据(HDFS⽂件)SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)// Parquet files are self-describing so the schema is preserved.⽂件格式⾃带描述性DataFrame df= sqlContext.read().parquet("people.parquet");//SQLContext.read().json() on either an RDD of String, or a JSON file. not a typical JSON file(见下⾯的⼩实验)DataFrame df = sqlContext.read().json("/testDir/people.json");Load默认是parquet格式,通过format指定格式DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet");DataFrame df = sqlContext.read().format("json").load("main/resources/people.json");旧API 已经被废弃DataFrame df2 =sqlContext.jsonFile("/xxx.json");DataFrame df2 =sqlContext.parquetFile("/xxx.parquet");2、RDD数据SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)a. 通过类利⽤JAVA类的反射机制已有:JavaRDD<Person> peopleDataFrame df= sqlContext.createDataFrame(people, Person.class);b. 通过schema转换RDD已有:StructType schema = DataTypes.createStructType(fields);和JavaRDD<Row> rowRDDDataFrame df= sqlContext.createDataFrame(rowRDD, schema);3、 Hive数据(HDFS⽂件在数据库中的表(schema)对应关系)HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);DataFrame df = sqlContext.sql("select count(*) from wangke.wangke where ns_date=20161224");sqlContext.refreshTable("my_table")//(if configured,sparkSQL caches metadata)sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");sqlContext.sql("LOAD DATA LOCAL INPATH 'resources/kv1.txt' INTO TABLE src");Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();4、特殊⽤法DataFrame df = sqlContext.sql("SELECT * FROM parquet.`main/resources/users.parquet`");//查询临时表peopleDataFrame teenagers = sqlContext.sql("SELECT name FROMpeople WHERE age >= 13 AND age <= 19")(⼆)、Dataframe使⽤1、展⽰df.show();df.printSchema();2、过滤选择df.select("name").show();df.select(df.col("name"), df.col("age").plus(1)).show();df.filter(df.col("age").gt(21)).show();df.groupBy("age").count().show();3、写⽂件df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");df.write().parquet("people.parquet");4、注册临时表df.registerTempTable("people");之后就可以⽤SQL在上⾯去查了DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")5、保存Hive表When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command只有HiveContext⽣成的Dataframe才能调⽤saveAsTable去持久化hive表(三)、直接SQL操作sqlContext.sql("create table xx.tmp like xx.xx");sqlContext.sql("insert into table xx.tmp partition(day=20160816) select * from xx.xx where day=20160816");sqlContext.sql("insert overwrite table xx.xx partition(day=20160816) select * from xx.tmp where day=20160816");⼆. 原理将上⾯的所有操作总结为如下图:Dataframe本质是数据 + 数据的描述信息(结构元信息)所有的上述SQL及dataframe操作最终都通过Catalyst翻译成spark程序RDD操作代码sparkSQL前⾝是shark,⼤量依赖Hive项⽬的jar包与功能,但在上⾯的扩展越来越难,因此出现了SparkSQL,它重写了分析器,执⾏器脱离了对Hive项⽬的⼤部分依赖,基本可以独⽴去运⾏,只⽤到两个地⽅:1.借⽤了hive的词汇分析的jar即HiveQL解析器2.借⽤了hive的metastore和数据访问API即hiveCatalog也就是说上图的左半部分的操作全部⽤的是sparkSQL本⾝⾃带的内置SQL解析器解析SQL进⾏翻译,⽤到内置元数据信息(⽐如结构化⽂件中⾃带的结构元信息,RDD的schema中的结构元信息)右半部分则是⾛的Hive的HQL解析器,还有Hive元数据信息因此左右两边的API调⽤的底层类会有不同SQLContext使⽤:简单的解析器(scala语⾔写的sql解析器)【⽐如:1.在半结构化的⽂件⾥⾯使⽤sql查询时,是⽤这个解析器解析的,2.访问(半)结构化⽂件的时候,通过sqlContext使⽤schema,类⽣成Dataframe,然后dataframe注册为表时,registAsTmpTable 然后从这个表⾥⾯进⾏查询时,即使⽤的简单的解析器,⼀些hive语法应该是不⽀持的,有待验证)】simpleCatalog【此对象中存放关系(表),⽐如我们指定的schema信息,类的信息,都是关系信息】HiveContext使⽤:HiveQL解析器【⽀持hive的hql语法,如只有通过HiveContext⽣成的dataframe才能调⽤saveAsTable操作】hiveCatalog(存放数据库和表的元数据信息)三. Catalyst所有的SQL操作最终都通过Catalyst翻译成spark程序代码四. ⽂件⼩实验(关于sparkSQL使⽤json的坑)SQLContext sqlContext = new SQLContext(sc); DataFrame df = sqlContext.read().json("/testDir/people.json");将json⽂件放在⽂件系统中,⼀直⽆法找到原来它是从HDFS⾥⾯取数据的sc.textFile("/testDir/people.txt")也是默认从HDFS中读注意这个路径,最开始的斜杠很重要如果没有,则是相对路径,前⾯会⾃动加上user和⽤户名的路径hdfs://10.67.1.98:9000/user/wangke/testDir/people.txt创建了⼀个合法的json⽂件放在了HDFS下尝试其API,发现⼀直报错org.apache.spark.sql.AnalysisException: Cannot resolve column name "age" among (_corrupt_record);原因(很坑很坑)1. 不能写成合法的json数据[{"name": "Michael","age": 91},{"name": "Andy","age": 30},{"name": "justin","age": 19}]这个是标准的,spark不识别,呵呵呵改:{"name": "Michael","age": 91} {"name": "Andy","age": 3 }{"name": "justin","age": 19}依然报错2. Json数据不能换⾏{"name": "Michael","age": 91}{"name": "Andy","age": 30}{"name": "justin","age": 19}原因:Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.json要在⼀⾏的原因初步猜测是因为和spark json file to DF的步骤有关,猜测的步骤:1. val jsonRdd= sc.textFile('your_json_file')2. jsonRdd.map(line => )实现⽅式是先读text⽂件,然后map line to row or tuple,然后 toDF不在⼀⾏不好识别⼀个json string 有⼏⾏,也⽆法确定 df的schema。
SparkSql之ThriftServer和Beeline的使⽤概述ThriftServer相当于service层,⽽ThriftServer通过Beeline来连接数据库。
客户端⽤于连接JDBC的Server的⼀个⼯具步骤1:启动metastore服务./sbin/start-thriftserver.sh2:连接[rachel@bigdata-senior01 spark-2.2.0-bin]$ ./bin/beelineBeeline version 1.2.1.spark2 by Apache Hivebeeline> !connect jdbc:hive2://192.168.1.11:10000Connecting to jdbc:hive2://192.168.1.11:10000Enter username for jdbc:hive2://192.168.1.11:10000: rachelEnter password for jdbc:hive2://192.168.1.11:10000: ******18/09/2311:09:58 INFO Utils: Supplied authorities: 192.168.1.11:100003:Spark编译时版本选择和Hive的关系只要保证HDFS\MySQL没有问题,基本上Spark和Hive集成没有问题对于spark2.0以后版本编译,默认选择hive的1.2.*之后的版本hive可以理解为外⾯封装了数据服务的代理。
在spark和hive集成,虽然说是1.2,因为hive只提供了⼀个metastore只要保证HDFS数据没有问题,metastore没有问题,hive中的数据就是没有问题的4:连接beeline0: jdbc:hive2://192.168.1.11:10000> 这类似⼀个mysql的客户端。
0: jdbc:hive2://192.168.1.11:10000> select * from rachel.test;+---------+-----------+--+| userid | username |+---------+-----------+--+| 1 | rachel || 2 | allen || 3 | nike || 4 | joy |+---------+-----------+--+5:页⾯上查看Jobhttp://192.168.1.11:4040/jobs/总结基于Spark的thirftserver来访问hive中的数据,可以让多个客户端连接到同⼀个服务器端,跑的是同⼀个application Thirftserver作为服务端,beeline作为客户端来访问服务端,⽀持多个客户端同时访问,有助于多个客户端之间数据的共享。
Spark之使⽤SparkSql操作Hive的Scala程序实现依赖<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.1.3</version></dependency>scala代码package com.zy.sparksqlimport org.apache.spark.SparkContextimport org.apache.spark.sql.SparkSession/*** 通过spark操作hive 把hive.site.xml放到resources中即可把元数据信息写⼊配置的mysql中*/object HiveSupport {def main(args: Array[String]): Unit = {//创建sparkSessionval sparkSession: SparkSession = SparkSession.builder().appName("HiveSupport").master("local[2]").enableHiveSupport().getOrCreate() //获取scval sc: SparkContext = sparkSession.sparkContextsc.setLogLevel("WARN")//操作hive// sparkSession.sql("create table if not exists person(id int,name string,age int) row format delimited fields terminated by ','")// sparkSession.sql("load data local inpath './data/person.txt' into table person")sparkSession.sql("select * from person").show()sparkSession.stop()}}hive-site.xml<configuration><property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://192.168.44.31:3306/hive?createDatabaseIfNotExist=true</value><description>JDBC connect string for a JDBC metastore</description></property><property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.jdbc.Driver</value><description>Driver class name for a JDBC metastore</description></property><property><name>javax.jdo.option.ConnectionUserName</name><value>root</value><description>username to use against metastore database</description></property><property><name>javax.jdo.option.ConnectionPassword</name><value>root</value><description>password to use against metastore database</description></property></configuration>还需要把hdfs上的user/hive/warehouse⽬录 chmod 777,不然程序访问不了会报错。
⼩记--------sparksql执⾏全过程1案例2 def main(args: Array[String]): Unit = {34// 1.创建sparkconf5 val conf = new SparkConf()6 .setMaster("local")7 .setAppName("test-sql")8910// 2.创建sparksession11 val session: SparkSession = SparkSession12 .builder()13 .config(conf)14 .getOrCreate()151617// 3.创建数据表并读取数据 , 并创建了student的数据表(视图)18// 读取本地student.json ⽂件。
19//{"id": 1 , "name" : "Kate" , "age" : 29}20//{"id": 2 , "name" : "Andy" , "age" : 39}21//{"id": 3 , "name" : "Tony" , "age" : 10}22 session23 .read24 .json("D:\\daima\\work\\1011\\spark-test-zhonghuashishan\\src\\test\\file\\student.json")25 .createOrReplaceTempView("student")262728// SQL查询29 session.sql("select name from student where age > 18 ").show()30 }⼀般来讲,对于sparkSQL系统,从SQL到spark中的RDD的执⾏需要经过两个⼤的阶段、逻辑计划(LogicalPlan)物理计划(PhysicalPlan)SQL执⾏过程概览逻辑计划阶段会将⽤户所写的SQL语句转换成树型数据结构(逻辑算⼦树),SQL语句中蕴含的逻辑映射到逻辑算⼦树的不同节点,逻辑计划阶段⽣成的逻辑算⼦树并不会直接提交执⾏,仅作为中间阶段。
【Spark篇】---SparkSQLonHive的配置和使⽤⼀、前述Spark on Hive: Hive只作为储存⾓⾊,Spark负责sql解析优化,执⾏。
⼆、具体配置1、在Spark客户端配置Hive On Spark在Spark客户端安装包下spark-1.6.0/conf中创建⽂件hive-site.xml:配置hive的metastore路径<configuration><property><name>hive.metastore.uris</name><value>thrift://node1:9083</value></property></configuration>2、启动Hive的metastore服务hive --service metastore3、启动zookeeper集群,启动HDFS集群。
4、启动SparkShell 读取Hive中的表总数,对⽐hive中查询同⼀表查询总数测试时间。
./spark-shell--master spark://node1:7077,node2:7077--executor-cores 1--executor-memory 1g--total-executor-cores 1import org.apache.spark.sql.hive.HiveContextval hc = new HiveContext(sc)hc.sql("show databases").showhc.sql("user default").showhc.sql("select count(*) from jizhan").show可以发现性能明显提升!!!注意:如果使⽤Spark on Hive 查询数据时,出现错误:找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径:export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop三、读取Hive中的数据加载成DataFrame1、HiveContext是SQLContext的⼦类,连接Hive建议使⽤HiveContext。
Spark SQL配置及使⽤教程⽬录SparkSQL版本:SparkSQL DSL语法SparkSQL和Hive的集成Spark应⽤依赖第三⽅jar包⽂件解决⽅案SparkSQL的ThriftServer服务SparkSQL的ThriftServer服务测试Spark中beeline的使⽤通过jdbc来访问spark的ThriftServer接⼝SparkSQL案例案例⼀:SparkSQL读取HDFS上Json格式的⽂件案例⼆:DataFrame和Dataset和RDD之间的互相转换SparkSQL的函数XY个⼈记SparkSQL是spark的⼀个模块,主⼊⼝是SparkSession,将SQL查询与Spark程序⽆缝混合。
DataFrames和SQL提供了访问各种数据源(通过JDBC或ODBC连接)的常⽤⽅法包括Hive,Avro,Parquet,ORC,JSON和JDBC。
您甚⾄可以跨这些来源加⼊数据。
以相同⽅式连接到任何数据源。
Spark SQL还⽀持HiveQL语法以及Hive SerDes和UDF,允许您访问现有的Hive仓库。
Spark SQL包括基于成本的优化器,列式存储和代码⽣成,以快速进⾏查询。
同时,它使⽤Spark引擎扩展到数千个节点和多⼩时查询,该引擎提供完整的中间查询容错。
不要担⼼使⽤不同的引擎来获取历史数据。
SparkSQL版本:Spark2.0之前⼊⼝:SQLContext和HiveContextSQLContext:主要DataFrame的构建以及DataFrame的执⾏,SQLContext指的是spark中SQL模块的程序⼊⼝HiveContext:是SQLContext的⼦类,专门⽤于与Hive的集成,⽐如读取Hive的元数据,数据存储到Hive表、Hive的窗⼝分析函数等Spark2.0之后⼊⼝:SparkSession(spark应⽤程序的⼀个整体⼊⼝),合并了SQLContext和HiveContextSparkSQL核⼼抽象:DataFrame/Dataset type DataFrame = Dataset[Row] //type 给某个数据类型起个别名SparkSQL DSL语法SparkSQL除了⽀持直接的HQL语句的查询外,还⽀持通过DSL语句/API进⾏数据的操作,主要DataFrame API列表如下:select:类似于HQL语句中的select,获取需要的字段信息where/filter:类似HQL语句中的where语句,根据给定条件过滤数据sort/orderBy: 全局数据排序功能,类似Hive中的order by语句,按照给定字段进⾏全部数据的排序sortWithinPartitions:类似Hive的sort by语句,按照分区进⾏数据排序groupBy:数据聚合操作limit:获取前N条数据记录SparkSQL和Hive的集成集成步骤:-1. namenode和datanode启动-2. 将hive配置⽂件软连接或者复制到spark的conf⽬录下⾯$ ln -s /opt/modules/apache/hive-1.2.1/conf/hive-site.xmlor$ cp /opt/modules/apache/hive-1.2.1/conf/hive-site.xml ./-3. 根据hive-site.xml中不同配置项,采⽤不同策略操作根据hive.metastore.uris参数-a. 当hive.metastore.uris参数为空的时候(默认值)将Hive元数据库的驱动jar⽂件添加spark的classpath环境变量中即可完成SparkSQL到hive的集成-b. 当hive.metastore.uris⾮空时候-1. 启动hive的metastore服务./bin/hive --service metastore &-2. 完成SparkSQL与Hive集成⼯作-4.启动spark-SQL($ bin/spark-sql)时候发现报错:ng.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriverat .URLClassLoader$1.run(URLClassLoader.java:366)at .URLClassLoader$1.run(URLClassLoader.java:355)at java.security.AccessController.doPrivileged(Native Method)at .URLClassLoader.findClass(URLClassLoader.java:354)at ng.ClassLoader.loadClass(ClassLoader.java:425)at ng.ClassLoader.loadClass(ClassLoader.java:358)at ng.Class.forName0(Native Method)at ng.Class.forName(Class.java:270)at org.apache.spark.util.Utils$.classForName(Utils.scala:228)at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:693) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.You need to build Spark with -Phive and -Phive-thriftserver.解决办法:将spark源码中sql/hive-thriftserver/target/spark-hive-thriftserver_2.11-2.0.2.jar拷贝到spark的jars⽬录下完成。
SparkSQL优化知识点引言SparkSQL是Apache Spark中用于处理结构化数据的模块,它提供了一种高效的方式来执行SQL查询。
然而,在处理大规模数据集时,性能优化变得至关重要。
本文将介绍一些SparkSQL的优化知识点,帮助您提高查询性能。
1. 数据分区数据分区是优化SparkSQL查询性能的关键。
SparkSQL使用分布式计算的方式处理数据,数据的分区决定了计算的并行度。
因此,合理划分数据分区可以加速查询过程。
1.1 合理的分区数量合理的分区数量可以提高计算并行度。
通常,根据集群的规模和数据的大小来决定分区数量。
较小的集群和较小的数据集可以选择较少的分区,而较大的集群和较大的数据集则可以选择更多的分区。
1.2 数据倾斜数据倾斜是指某些分区中的数据量远远超过其他分区。
这会导致某些计算节点负载过重,从而降低整体性能。
处理数据倾斜的方法包括使用更细粒度的分区、使用随机前缀等。
2. 数据压缩数据压缩可以减少存储空间的占用,并提高数据的读取效率。
SparkSQL支持多种压缩算法,如Snappy、Gzip等。
在选择压缩算法时,需要考虑数据的压缩比和解压缩的速度。
3. 数据过滤数据过滤是优化SparkSQL查询性能的一种重要方法。
通过尽早过滤掉不必要的数据,可以减少后续计算的数据量。
3.1 列式存储列式存储可以提高查询性能,特别是当仅需要查询部分列时。
SparkSQL默认采用列式存储,但可以通过设置pressed参数来进一步优化存储性能。
3.2 分区剪枝分区剪枝是指根据查询条件选择需要扫描的分区。
当数据集有大量分区时,只扫描符合查询条件的分区可以提高查询效率。
可以通过使用谓词下推等技术来实现分区剪枝。
4. 数据缓存数据缓存是一种将数据存储在内存中,以减少后续查询的计算量的方法。
SparkSQL提供了内存缓存机制,通过将频繁访问的数据缓存到内存中,可以提高查询的响应速度。
4.1 RDD缓存通过将查询得到的结果以RDD的形式缓存到内存中,可以加速后续的查询操作。