Apache_Spark源码走读系列篇二
- 格式:docx
- 大小:25.49 KB
- 文档页数:12
学会使用ApacheSpark进行大数据分析和处理的基本操作Apache Spark是一个快速、通用、可扩展的大数据处理引擎,被广泛应用于大数据分析和处理中。
学会使用Apache Spark进行大数据分析和处理的基本操作,对于数据科学家和大数据工程师来说至关重要。
本文将介绍Apache Spark的基本概念和操作,包括数据加载、转换、过滤、聚合以及输出等,以帮助读者快速上手使用Apache Spark进行大数据分析和处理。
第一章:Apache Spark简介与安装Apache Spark是一款开源的大数据处理框架,提供了高效的分布式计算能力,可以处理大规模的数据集。
在使用Apache Spark 之前,我们需要先安装Spark并配置好相应的环境。
具体的安装过程可以在Apache Spark官方网站上找到,并根据操作系统类型和版本进行安装、设置和配置。
第二章:数据加载与存储在使用Apache Spark进行大数据分析和处理之前,我们需要先将数据加载到Spark中。
Spark支持多种数据源和格式,如文本文件、CSV文件、JSON文件、数据库等。
可以使用Spark的API或工具(如spark-submit或spark-shell)来加载和读取数据。
除了加载数据,我们还可以将结果保存到各种外部存储介质中,如HDFS、S3或关系型数据库等。
第三章:数据转换与过滤在数据分析和处理过程中,常常需要对数据进行转换和过滤以满足需求。
Apache Spark提供了丰富的转换和过滤操作,如映射、过滤、排序、去重等。
通过这些操作,我们可以对数据集进行加工和处理,以便于后续的分析和挖掘。
第四章:数据聚合与计算数据聚合是大数据处理中常见的操作之一,Apache Spark提供了多种聚合和计算函数,如求和、平均值、最大值、最小值、统计等。
通过这些函数,我们可以对数据集进行统计和计算,以获取更有价值的信息。
此外,Spark还支持自定义聚合函数和窗口函数,可以满足更加复杂的需求。
Spark源码分析之Driver和Excutor是怎么跑起来的?(2.2.0版本)今天抽空回顾了⼀下Spark相关的源码,本来想要了解⼀下Block的管理机制,但是看着看着就回到了SparkContext的创建与使⽤。
正好之前没有正式的整理过这部分的内容,这次就顺带着回顾⼀下。
更多内容参考:Spark作为⽬前最流⾏的⼤数据计算框架,已经发展了⼏个年头了。
版本也从我刚接触的1.6升级到了2.2.1。
由于⽬前⼯作使⽤的是2.2.0,所以这次的分析也就从2.2.0版本⼊⼿了。
涉及的内容主要有:Standalone模式中的Master与Workerclient、driver、excutor的关系下⾯就按照顺序依次介绍⼀下。
在最开始编程的时候,很少会涉及分布式,因为数据量也不⼤。
后来随着硬件的发展cpu的瓶颈,开始流⾏多线程编程,基于多线程来加快处理速度;再后来,衍⽣出了⽹格计算、CPU与GPU的异构并⾏计算以及当时流⾏的mapreduce分布式计算。
但是mapreduce由于存储以及计算流程的限制,spark开始流⾏起来。
Spark凭借内存计算、强⼤的DAG回溯能⼒,快速的占领并⾏计算的风⼝。
那么并⾏计算肯定是需要分布式集群的,常见的集群管理⽅式,有Master-Slave模式、P2P模式等等。
⽐如Mysql的主从复制,就是Master-Slave模式;Elasticsearch的分⽚管理就是P2P模式。
在Spark中有不同的部署⽅式,但是计算的模式都是Master-Slave模式,只不过Slave换了名字叫做worker⽽已。
集群的部署模式如下所⽰:流程就是⽤户以client的⾝份向master提交任务,master去worker上⾯创建执⾏任务的载体(driver和excutor)。
Master和Worker是服务器的部署⾓⾊,程序从执⾏上,则分成了client、driver、excutor三种⾓⾊。
Spark集群启动之Master、Worker启动流程源码分析Spark集群启动Master可以使用脚本启动:start-master,shell脚本细节自行查看。
最终启动命令为:Java -cp /home/daxin/bigdata/spark/conf/:/home/daxin/bigdata/spark/jars/*:/home/daxin/bigdata/Hadoop/et c/hadoop/ -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --host node --port 7077 --webui-port 8080最终转换为java命令启动Master的过程,所以我们就需要查看一下Master的main方法代码如下:[java] view plain copy 在CODE上查看代码片派生到我的代码片val systemName = "sparkMaster"private val actorName = "Master"/*** spark-class脚本调用,启动master** @param argStrings*/def main(argStrings: Array[String]) {SignalLogger.register(log)//参数配置准备val conf = new SparkConfval args = new MasterArguments(argStrings, conf)//创建actorSystem// (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf) actorSystem.awaitTermination()}通过代码可以可以知道调用startSystemAndActor方法完成ActorSystem和Actor的创建。
代码⾛读 ⼀、代码⾛读的内容 代码⾛读在软件开发过程⼗分的重要,能及时的发现并解决问题,那么代码⾛读有哪些内容呢? 1、检查是否符合编程规范:开发⼈员的编码风格是否规范,是否有注释,编写的代码能否让其他的编程⼈员阅读及维护,编程中的变量命名是否合适,是否缺少空格等。
2、寻找编译器中的设计陷阱:编程和设计过程中常见的和可防⽌的问题,能顺利通过编译,没有任何警告和错误信息,⽽且计算机能严格按照代码执⾏。
3、快速理解源代码,找出流程设计中的问题:将源代码编译成可执⾏程序,也可以阅读代码来了解程序的功能及其⼯作⽅式,还可以修改源代码来改变程序的功能从⽽找出逻辑上存在的问题,要求检查者要读懂代码,并且熟悉业务。
4、架构:包含类之间的关系,某个函数的实现。
如果不考虑后期维护可以忽略这层,或是有强⼤的架构设计师。
其实这类问题⽐逻辑更容易发现,例如某个类功能太多或函数if\switch太多等。
5、对原有代码的重构:重构就是在不破坏可观察功能的前提下,借由搬移、提炼、打散、凝聚……,改善事务的体质、强化当前的可读性、为将来的扩充性和维护性做准备、乃⾄于在过程中找出潜在的错误。
⼆、代码⾛读的⽅法 1、反复推敲 同⼀个逻辑可以有很多⽅式描述,但⽤哪个更好更合适可以在⾛读时细细体会,推敲的标准是1.⾼内聚低耦合 2.接⼝优先 3.好看好理解4.⾼效,运⾏速度快。
2、过段时间复读 ⽂章放段时间再拿出来看能发现很多问题,代码⼀样,⼀段时间后⼈的思维惯性没那么强了,改代码的抵触⼼理也会少很多,更容易发现问题。
除了开发⼈员需要进⾏代码⾛读外,⽩盒测试⼈员在进⾏测试时也需要简单的进⾏代码⾛读,从测试⾓度找出编码中存在的问题,及时的让开发⼈员改正,从⽽保证代码的⾼质量。
Spark内核源码解析⼗⼆:shuffle原理解析第⼀个特点,在Spark早期版本中,那个bucket缓存是⾮常⾮常重要的,因为需要将⼀个ShuffleMapTask所有的数据都写⼊内存缓存之后,才会刷新到磁盘。
但是这就有⼀个问题,如果map side数据过多,那么很容易造成内存溢出。
所以spark在新版本中,优化了,默认那个内存缓存是100kb,然后呢,写⼊⼀点数据达到了刷新到磁盘的阈值之后,就会将数据⼀点⼀点地刷新到磁盘。
这种操作的优点,是不容易发⽣内存溢出。
缺点在于,如果内存缓存过⼩的话,那么可能发⽣过多的磁盘写io操作。
所以,这⾥的内存缓存⼤⼩,是可以根据实际的业务情况进⾏优化的。
第⼆个特点,与MapReduce完全不⼀样的是,MapReduce它必须将所有的数据都写⼊本地磁盘⽂件以后,才能启动reduce操作,来拉取数据。
为什么?因为mapreduce要实现默认的根据key的排序!所以要排序,肯定得写完所有数据,才能排序,然后reduce来拉取。
但是Spark不需要,spark默认情况下,是不会对数据进⾏排序的。
因此ShuffleMapTask每写⼊⼀点数据,ResultTask就可以拉取⼀点数据,然后在本地执⾏我们定义的聚合函数和算⼦,进⾏计算。
spark这种机制的好处在于,速度⽐mapreduce快多了。
但是也有⼀个问题,mapreduce提供的reduce,是可以处理每个key对应的value上的,很⽅便。
但是spark中,由于这种实时拉取的机制,因此提供不了,直接处理key对应的values的算⼦,只能通过groupByKey,先shuffle,有⼀个MapPartitionsRDD,然后⽤map算⼦,来处理每个key对应的values。
就没有mapreduce的计算模型那么⽅便。
shuffle原理图如下优化后也就是加⼊consolidation机制后的原理图如下,主要解决产⽣的⽂件太多在executor中执⾏任务时,主要是task的实现类来执⾏任务,其中shuffleMapTask,将针对rdd执⾏算⼦后的结果写⼊磁盘// 有mapstatus返回值,override def runTask(context: TaskContext): MapStatus = {// Deserialize the RDD using the broadcast variable.// 对要处理的rdd相关数据,做⼀些反序列化的,这个rdd是怎么拿到的,多个task运⾏在executor⾥⾯,并⾏运⾏或者并发运⾏// 可能不在⼀个地⽅,但是⼀个stage的task,要处理的rdd都是⼀样的,通过broadcast variable拿到val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)metrics = Some(context.taskMetrics)var writer: ShuffleWriter[Any, Any] = nulltry {// 获取shuffleManagerval manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)// 调⽤rdd的iterator⽅法,并且传⼊当前task要处理哪个partition,核⼼逻辑就在rdd的iterator// ⽅法中在这⾥实现了针对某个partition执⾏算⼦和函数,针对rdd的partition进⾏处理,有返回数据通过shuffleWriter经过// HashPartition写⼊⾃⼰的分区,mapstatus封装了shufflemaptask计算后的数据,存储在那⾥,就是blockmanager信息// blockmanager就是spark底层内存、数据、磁盘管理组件writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])return writer.stop(success = true).get} catch {case e: Exception =>try {if (writer != null) {writer.stop(success = false)}} catch {case e: Exception =>log.debug("Could not stop writer", e)}throw e}}shuffle写的⼊⼝再HashShuffleWriter⾥⾯/** Write a bunch of records to this task's output* 将每个shuffleMapTask计算出来的新的RDD的partition数据,写⼊磁盘* */override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {// ⾸先判断,是否需要在map端本地进⾏聚合,这⾥的话,如果是reduceBykey这种操作,它的dep.aggregator.isDefined就是true// 包括dep.mapSideCombine也是true// 那么就就进⾏map端的本地聚合val iter = if (dep.aggregator.isDefined) {if (dep.mapSideCombine) {// 执⾏本地聚合,如(hello,1)(hello,1)就成了(hello,2)bineValuesByKey(records, context)} else {records}//} else {require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")records}// 如果要本地聚合,那么先本地聚合,然后遍历数据,对每个数据掉⽤partitioner// ,默认是hashPartitioner,⽣成bucketId,也就是决定每⼀份数据要写⼊那个bucket。
SparkSQL源码解析(⼆)Antlr4解析Sql并⽣成树Spark SQL原理解析前⾔:这⼀次要开始真正介绍Spark解析SQL的流程,⾸先是从Sql Parse阶段开始,简单点说,这个阶段就是使⽤Antlr4,将⼀条Sql语句解析成语法树。
可能有童鞋没接触过antlr4这个内容,推荐看看《antlr4权威指南》前四章,看完起码知道antlr4能⼲嘛。
我这⾥就不多介绍了。
这篇⾸先先介绍调⽤spark.sql()时候的流程,再看看antlr4在这个其中的主要功能,最后再将探究Logical Plan究竟是什么东西。
初始流程当你调⽤spark.sql的时候,会调⽤下⾯的⽅法:def sql(sqlText: String): DataFrame = {Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))}parse sql阶段主要是parsePlan(sqlText)这⼀部分。
⽽这⾥⼜会辗转去org.apache.spark.sql.catalyst.parser.AbstractSqlParser调⽤parse⽅法。
这⾥贴下关键代码。
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {logDebug(s"Parsing command: $command")val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))lexer.removeErrorListeners()lexer.addErrorListener(ParseErrorListener)lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforcedval tokenStream = new CommonTokenStream(lexer)val parser = new SqlBaseParser(tokenStream)parser.addParseListener(PostProcessor)parser.removeErrorListeners()parser.addErrorListener(ParseErrorListener)parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforcedtry {try {// first, try parsing with potentially faster SLL modeparser.getInterpreter.setPredictionMode(PredictionMode.SLL)toResult(parser)}catch {case e: ParseCancellationException =>// if we fail, parse with LL modetokenStream.seek(0) // rewind input streamparser.reset()// Try Again.parser.getInterpreter.setPredictionMode(PredictionMode.LL)toResult(parser)}}catch {case e: ParseException if mand.isDefined =>throw ecase e: ParseException =>throw e.withCommand(command)case e: AnalysisException =>val position = Origin(e.line, e.startPosition)throw new ParseException(Option(command), e.message, position, position)}}可以发现,这⾥⾯的处理逻辑,⽆论是SqlBaseLexer还是SqlBaseParser都是Antlr4的东西,包括最后的toResult(parser)也是调⽤访问者模式的类去遍历语法树来⽣成Logical Plan。
ApacheSpark框架详细介绍Apache Spark框架详细介绍Apache Spark是一个快速、通用的大数据处理框架,由加州大学伯克利分校的AMPLab开发。
它提供了一种高级的分布式编程模型,可以处理包含数十亿行数据的大规模数据集。
本文将详细介绍Apache Spark框架的核心组件、特点和使用场景。
一、Spark的核心组件Apache Spark框架由以下核心组件构成:1. Spark Core:Spark的基础功能模块,提供了任务调度、内存管理、错误恢复等核心功能。
它还包含了RDD(弹性分布式数据集)的实现,RDD是Spark的主要数据结构,它是一个可分区、可并行计算的数据集合。
2. Spark SQL:提供了对结构化数据的查询和分析功能,支持SQL查询、数据流处理和机器学习等操作。
Spark SQL可以将结构化数据映射成DataFrame,DataFrame是一种类似于关系型数据库的数据结构。
3. Spark Streaming:用于实时流式数据的处理和分析,可以处理实时生成的数据流,并将其划分成小批次进行计算。
Spark Streaming支持各种数据源,如Kafka、Flume等。
4. MLlib:为Spark提供了机器学习功能,包括常见的分类、回归、聚类和推荐算法等。
MLlib提供了丰富的机器学习工具和算法库,能够处理大规模的机器学习任务。
5. GraphX:用于图计算的组件,支持图的创建、操作和分析。
GraphX提供了图的基本算法和图计算的API,可以用于社交网络分析、推荐系统等领域。
二、Spark的特点Apache Spark相比于其他大数据处理框架,具有以下几个显著特点:1. 快速性能:Spark采用内存计算,能够将数据存储在内存中,从而加速数据处理的速度。
它还支持并行计算,能够在多个节点上同时执行任务,提高了处理效率。
2. 容错性:Spark具有良好的容错能力,能够在节点故障时自动恢复任务,保证计算的可靠性。
Spark源码系列(⼗)spark源码解析⼤全第1章Spark 整体概述1.1 整体概念 A p a c h e Sp a r k是⼀个开源的通⽤集群计算系统,它提供了H ig h-l e v e l编程A P I,⽀持Sc a l a、J a v a和P y t h o n三种编程语⾔。
Sp a r k内核使⽤Sc a l a 语⾔编写,通过基于Sc a l a的函数式编程特性,在不同的计算层⾯进⾏抽象,代码设计⾮常优秀。
1.2 RDD 抽象1.3 计算抽象在St a n d a l o n e模式下,默认使⽤的是FI FO这种简单的调度策略,在进⾏调度的过程中,⼤概流程如下图所⽰:1.4 集群模式 Sp a r k集群在设计的时候,并没有在资源管理的设计上对外封闭,⽽是充分考虑了未来对接⼀些更强⼤的资源管理系统,如Y A R N、M e s o s等,所以Sp a r k架构设计将资源管理单独抽象出⼀层,通过这种抽象能够构建⼀种适合企业当前技术栈的插件式资源管理模块,从⽽为不同的计算场景提供不同的资源分配与调度策略。
Sp a r k集群模式架构,如下图所⽰:上图中,Sp a r k集群C l u s t e r M a n a g e r⽬前⽀持如下三种模式:1)St a n d a l o n e模式 ·St a n d a l o n e模式是Sp a r k内部默认实现的⼀种集群管理模式,这种模式是通过集群中的M a s t e r来统⼀管理资源,⽽与M a s t e r进⾏资源请求协商的是D r iv e r内部的St a n d a l o n e Sc h e d u l e r B a c k e n d(实际上是其内部的St a n d a l o n e A p p C l ie n t真正与M a s t e r通信),后⾯会详细说明。
2)Y A R N模式 ·Y A R N模式下,可以将资源的管理统⼀交给Y A R N集群的R e s o u r c e M a n a g e r去管理,选择这种模式,可以更⼤限度的适应企业内部已有的技术栈,如果企业内部已经在使⽤H a d o o p技术构建⼤数据处理平台。
SparkConf源码解读------------恢复内容开始------------1.主要功能:SparkConf是Spark的配置类,配置spark的application的应⽤程序,使⽤(key,value)来进⾏存储配置信息。
2.主要形式:val conf=new SparkConf(),读取任何spark.*的配置,包括开发⼈员所设置的配置,因为SparkConf中含有辅助构造器:def this()=this(true),此辅助构造器中布尔值为true说明读取外部配置信息。
在配置单元⾥可以设置def this()=this(false),跳过外在配置信息。
3.Spark配置: Spark中的每⼀个组件都直接或者间接的使⽤SparkConf所存储的配置属性,这些属性都存储在数据结构ConcurrentHashMap中即 private val settings=new ConcurrentHashMap[String,String]() 参数配置的优先级为SparkConf > spark-submit和spark-shell命令的参数指定 > 配置⽂件spark-defaults.conf。
4.如何获取SparkConf配置: (1)来源于系统参数(即System.getproperties获取的属性)中以Spark.作为前缀的那部分属性 (2)使⽤SparkConf的API进⾏设置 (3)从其它SparkConf中克隆(1)系统属性中配置 在SparkConf中有⼀个Boolean类型属性loadDefaults,当loadDefaults为true时,将从系统属性中加载Spark配置,代码如下:/** Create a SparkConf that loads defaults from system properties and the classpath */def this() = this(true) //构造⽅法if (loadDefaults) {loadFromSystemProperties(false)}private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {// Load any spark.* system properties 加载以spark. 开头的系统属性for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {set(key, value, silent)}this}loadFromSystemProperties 上述代码调⽤了Utils⼯具类的getSystemProperties⽅法,其作⽤为获取系统的键值对属性,loadFromSystemProperties获取到系统属性后,使⽤scala守卫过滤出以"spark."字符串为前缀的Key和value并且调⽤set⽅法,最终设置到settings中private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {if (key == null) {throw new NullPointerException("null key")}if (value == null) {throw new NullPointerException("null value for " + key)}if (!silent) {logDeprecationWarning(key)}settings.put(key, value)this}配置属性存储到settings中(2)使⽤SparkConf配置的API 给SparkConf添加配置的⼀种常见⽅式是使⽤SparkConf提供的API,其中这些API最终实际调⽤了set的重载⽅法如:重载的set⽅法/** Set a configuration variable. */def set(key: String, value: String): SparkConf = {set(key, value, false)} Sparkconf的setMaster,setAppName,setJars,setExecutorEnv,setSparkHome,setAll等⽅法都是通过上述的set⽅法完成Spark配置的,如setMaster和setAppName/*** The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.*/def setMaster(master: String): SparkConf = {set("spark.master", master)}/** Set a name for your application. Shown in the Spark web UI. */def setAppName(name: String): SparkConf = {set("", name)}添加配置(3)克隆SparkConf配置 在某些情况下,同⼀个SparkConf实例中的配置信息需要被多个组件公⽤,⽽我们往往会想到的⽅法是将SparkConf实例定义为全局变量或者通过参数传递给其他组件,但是这样会引⼊并发问题,虽然settings数据结构为ConcurrentHashMap是线程安全的,⽽且ConcurrentHashMap也被证明是⾼并发下性能表现不错的数据结构,但是存在并发,就⼀定有性能的损失问题,也可以创建⼀个SparkConf 实例b,并将a中的配置信息全部拷贝到b中,这样会浪费内存,导致代码散落在程序的各个部分。
大数据:Spark Shuffle(二)Executor、Driver之间Shuffle结果消息传递、追踪1. 前言输出Shuffle结果到Shuffle_shuffleId_mapId_0.data数据文件中,每个executor需要向Driver汇报当前节点的Shuffle结果状态,Driver保存结果信息进行下个Task的调度。
2. StatusUpdate消息当Executor运行完Task的时候需要向Driver汇报StatusUpdate的消息[plain] view plain copyoverride def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {val msg = StatusUpdate(executorId, taskId, state, data)driver match {case Some(driverRef) => driverRef.send(msg)case None => logWarning(s"Drop $msg because has not yet connected to driver")}}整个结构体中包含了ExecutorId: Executor自己的IDTaskId: task分配的IDState: Task的运行状态[plain] view plain copyLAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOSTData: 保存序列化的Result2.1 Executor端发送在Task运行后的结果,Executor会将结果首先序列化成ByteBuffer封装成DirectTaskResult,再次序列化DirectTaskResult成ByteBuffer,很显然序列化的结果的大小会决定不同的传递策略。
ApacheSpark简单介绍、安装及使⽤Apache Spark简介Apache Spark是⼀个⾼速的通⽤型计算引擎,⽤来实现分布式的⼤规模数据的处理任务。
分布式的处理⽅式可以使以前单台计算机⾯对⼤规模数据时处理不了的情况成为可能。
Apache Spark安装及配置(OS X下的Ubuntu虚拟机)学习新东西最好是在虚拟机下操作,以免对现在的开发环境造成影响,我的系统是OS X,安装的是VirtualBox虚拟机,然后在虚拟机⾥安装的Ubuntu系统。
VirtualBox安装⽅法请查看教程:注意在安装过程中设置4GB的RAM和20GB的空间,否则会出现不够⽤的情况。
安装 AnacondaAnaconda 是Python科学计算包的合集,在接下来的例⼦中,会⽤到其中的matplotlib⽤来⽣成⼀张柱状图。
下载地址:然后在Terminal中输⼊命令:bash Anaconda2-4.1.1-Linux-x86_64.sh安装 Java SDKSpark运⾏在JVM上,所以还需要安装Java SDK:$ sudo apt-get install software-properties-common$ sudo add-apt-repository ppa:webupd8team/java$ sudo apt-get update$ sudo apt-get install oracle-java8-installer设置JAVA_HOME打开.bashrc⽂件gedit .bashrc在.bashrc中添加如下设置:JAVA_HOME=/usr/lib/jvm/java-8-oracleexport JAVA_HOMEPATH=$PATH:$JAVA_HOMEexport PATH安装Spark去官⽹下载压缩包,下载地址将安装包解压,命令如下:$ tar -zxvf spark-2.0.0-bin-hadoop2.7.tgz$ rm spark-2.0.0-bin-hadoop2.7.tgz启⽤IPython Notebook打开.bashrc⽂件gedit .bashrc在.bashrc中添加如下设置:export PYSPARK_DRIVER_PYTHON=ipythonexport PYSPARK_DRIVER_PYTHON_OPTS=notebook检查是否安装成功(需重启Terminal)cd ~/spark-2.0.0-bin-hadoop2.7./bin/pysparkApache Spark简单使⽤打开Spark服务后,点击new - Notebooks - Python新建⼀个Notebook⽂件。
Spark学习笔记(2)---Spark消息通信源码分析Spark消息通信Spark启动消息通信Spark启动过程中主要是进⾏Master和Worker之间的通信,其消息发送关系如下,⾸先由worker节点向Master发送注册消息,然后Master处理完毕后,返回注册成功消息或失败消息。
其详细过程如下:(1) 当Master启动后,随之启动各Worker,Worker启动时会创建通信环境RpcEnv和终端点EndPoint,并向Master发送注册Worker的消息RegisterWorker.Worker.tryRegisterAllMasters⽅法如下:``` scala // 因为Master可能不⽌⼀个 private def tryRegisterAllMasters(): Array[JFuture[_]] = { masterRpcAddresses.map { masterAddress => registerMasterThreadPool.submit(new Runnable { override def run(): Unit = { try { logInfo("Connecting to master " + masterAddress + "...") // 获取Master终端点的引⽤ val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) registerWithMaster(masterEndpoint) } catch {} ... } private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {// 根据Master节点的引⽤发送注册信息masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl)).onComplete {// 返回注册成功或失败的结果// This is a very fast action so we can use "ThreadUtils.sameThread"case Success(msg) =>Utils.tryLogNonFatalError {handleRegisterResponse(msg)}case Failure(e) =>logError(s"Cannot register with master: ${masterEndpoint.address}", e)System.exit(1)}(ThreadUtils.sameThread)}(2) Master收到消息后,需要对Worker发送的信息进⾏验证、记录。
Spark作业执⾏流程源码解析本⽂梳理⼀下Spark作业执⾏的流程。
⽬录Spark作业和任务调度系统是其核⼼,通过内部RDD的依赖DAG,使得模块之间的调⽤和处理变得游刃有余。
相关概念Job(作业):通过⾏动操作⽣成的⼀个或多个调度阶段Stage:根据依赖关系划分的多个任务集,称为调度阶段,也叫做TaskSet(任务集)。
划分Stage是由DAGScheduler进⾏的,任务阶段分为Shuffle Map Stage和Result Stage。
Task:是Spark执⾏计算的最⼩单位,会被分发到Executor中执⾏。
DAGScheduler:是⾯向调度阶段的任务调度器,接收Spark应⽤提交的作业,根据依赖关系划分stage,并提交给TaskScheduler。
TaskScheduler:是⾯向任务的调度器,接收DAGScheduler划分好的stage,发送给Worker节点的Executor运⾏任务。
关于RDD相关知识、⾏动操作、宽窄依赖请参考概述Spark作业主要是根据我们编写的业务处理代码,⽣成⼀系列相互依赖的调度阶段,之后将调度阶段中的任务提交Executor的执⾏的过程。
上图是spark作业运⾏流程图。
主要分为四块:构建DAG⾏动操作触发提交作业,提交之后根据依赖关系构造DAG。
划分调度阶段、提交调度阶段DAGScheduler中根据宽依赖划分调度阶段(stage)。
每个stage包含多个task,组成taskset提交给TaskScheduler执⾏通过集群管理器启动任务TaskScheduler收到DAGScheduler提交的任务集,以任务的形式⼀个个分发到Executor中进⾏执⾏。
Executor端执⾏任务,完成后存储报告结果Executor接到任务后,扔到线程池中执⾏任务。
任务完成后,报告结果给Driver。
源码解析从以下的代码展开叙述:def main(args: Array[String]): Unit = {val sc = new SparkContext("local", "word-count", new SparkConf())val words = Seq("hello spark", "hello scala", "hello java")val rdd = sc.makeRDD(words)rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).sortByKey().foreach(println(_))}这是⼀个简单的WordCount案例。
Spark(⼗五)SparkCore的源码解读⼀、启动脚本分析独⽴部署模式下,主要由master和slaves组成,master可以利⽤zk实现⾼可⽤性,其driver,work,app等信息可以持久化到zk上;slaves由⼀台⾄多台主机构成。
Driver通过向Master申请资源获取运⾏环境。
启动master和slaves主要是执⾏/usr/dahua/spark/sbin⽬录下的start-master.sh和start-slaves.sh,或者执⾏start-all.sh,其中star-all.sh本质上就是调⽤start-master.sh和start-slaves.sh1.1 start-all.sh#1.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thenexport SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi#2.执⾏${SPARK_HOME}/sbin/spark-config.sh,见以下分析. "${SPARK_HOME}/sbin/spark-config.sh"#3.执⾏"${SPARK_HOME}/sbin"/start-master.sh,见以下分析"${SPARK_HOME}/sbin"/start-master.sh#4.执⾏"${SPARK_HOME}/sbin"/start-slaves.sh,见以下分析"${SPARK_HOME}/sbin"/start-slaves.sh其中start-master.sh和start-slave.sh分别调⽤的是org.apache.spark.deploy.master.Master和org.apache.spark.deploy.worker.Worker1.2 start-master.shstart-master.sh调⽤了spark-daemon.sh,注意这⾥指定了启动的类#1.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thenexport SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi# NOTE: This exact class name is matched downstream by SparkSubmit.# Any changes need to be reflected there.#2.设置CLASS="org.apache.spark.deploy.master.Master"CLASS="org.apache.spark.deploy.master.Master"#3.如果参数结尾包含--help或者-h则打印帮助信息,并退出if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; thenecho "Usage: ./sbin/start-master.sh [options]"pattern="Usage:"pattern+="\|Using Spark's default log4j profile:"pattern+="\|Registered signal handlers for""${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2exit 1fi#4.设置ORIGINAL_ARGS为所有参数ORIGINAL_ARGS="$@"#5.执⾏${SPARK_HOME}/sbin/spark-config.sh. "${SPARK_HOME}/sbin/spark-config.sh"#6.执⾏${SPARK_HOME}/bin/load-spark-env.sh. "${SPARK_HOME}/bin/load-spark-env.sh"#7.SPARK_MASTER_PORT为空则赋值7077if [ "$SPARK_MASTER_PORT" = "" ]; thenSPARK_MASTER_PORT=7077fi#8.SPARK_MASTER_HOST为空则赋值本主机名(hostname)if [ "$SPARK_MASTER_HOST" = "" ]; thencase `uname` in(SunOS)SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`";;(*)SPARK_MASTER_HOST="`hostname -f`";;esacfi#9.SPARK_MASTER_WEBUI_PORT为空则赋值8080if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; thenSPARK_MASTER_WEBUI_PORT=8080fi#10.执⾏脚本"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \$ORIGINAL_ARGS其中10肯定是重点,分析之前我们看看5,6都⼲了些啥,最后直译出最后⼀个脚本1.3 spark-config.sh(1.2的第5步)#判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thenexport SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi#SPARK_CONF_DIR存在就⽤此⽬录,不存在⽤${SPARK_HOME}/confexport SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"# Add the PySpark classes to the PYTHONPATH:if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; thenexport PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:${PYTHONPATH}"export PYSPARK_PYTHONPATH_SET=1fi1.4 load-spark-env.sh(1.2的第6步)#1.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thensource "$(dirname "$0")"/find-spark-homefi#2.判断SPARK_ENV_LOADED是否有值,没有将其设置为1if [ -z "$SPARK_ENV_LOADED" ]; thenexport SPARK_ENV_LOADED=1#3.设置user_conf_dir为SPARK_CONF_DIR或SPARK_HOME/confexport SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"#4.执⾏"${user_conf_dir}/spark-env.sh" [注:set -/+a含义再做研究]if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then# Promote all variable declarations to environment (exported) variablesset -a. "${SPARK_CONF_DIR}/spark-env.sh"set +afifi# Setting SPARK_SCALA_VERSION if not already set.#5.选择scala版本,2.11和2.12都存在的情况下,优先选择2.11if [ -z "$SPARK_SCALA_VERSION" ]; thenASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; thenecho -e "Presence of build for multiple Scala versions detected." 1>&2echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION in spark-env.sh.' 1>&2exit 1fiif [ -d "$ASSEMBLY_DIR2" ]; thenexport SPARK_SCALA_VERSION="2.11"elseexport SPARK_SCALA_VERSION="2.12"fifi1.5 spark-env.sh列举很多种模式的选项配置1.6 spark-daemon.sh回过头来看看1.2第10步中需要直译出的最后⼀个脚本,如下:sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --host hostname --port 7077 --webui-port 8080上⾯搞了半天只是设置了变量,最终才进⼊主⾓,继续分析spark-daemon.sh脚本#1.参数个数⼩于等于1,打印帮助if [ $# -le 1 ]; thenecho $usageexit 1fi#2.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thenexport SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi#3.执⾏${SPARK_HOME}/sbin/spark-config.sh,见上述分析 [类似脚本是否有重复?原因是有的⼈是直接⽤spark-daemon.sh启动的服务,反正重复设置下变量不需要什么代价]. "${SPARK_HOME}/sbin/spark-config.sh"# get arguments# Check if --config is passed as an argument. It is an optional parameter.# Exit if the argument is not a directory.#4.判断第⼀个参数是否是--config,如果是取空格后⼀个字符串,然后判断该⽬录是否存在,不存在则打印错误信息并退出,存在设置SPARK_CONF_DIR为该⽬录,shift到下⼀个参数#[注:--config只能⽤在第⼀参数上]if [ "$1" == "--config" ]thenshiftconf_dir="$1"if [ ! -d "$conf_dir" ]thenecho "ERROR : $conf_dir is not a directory"echo $usageexit 1elseexport SPARK_CONF_DIR="$conf_dir"fishiftfi#5.分别设置option、command、instance为后⾯的三个参数(如:option=start,command=org.apache.spark.deploy.master.Master,instance=1) #[注:很多⼈⽤spark-daemon.sh启动服务不成功的原因是名字不全]option=$1shiftcommand=$1shiftinstance=$1shift#6.⽇志回滚函数,主要⽤于更改⽇志名,如log-->log.1等,略过spark_rotate_log (){log=$1;num=5;if [ -n "$2" ]; thennum=$2fiif [ -f "$log" ]; then # rotate logswhile [ $num -gt 1 ]; doprev=`expr $num - 1`[ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"num=$prevdonemv "$log" "$log.$num";fi}#7.执⾏${SPARK_HOME}/bin/load-spark-env.sh,见上述分析. "${SPARK_HOME}/bin/load-spark-env.sh"#8.判断SPARK_IDENT_STRING是否有值,没有将其设置为$USER(linux⽤户)if [ "$SPARK_IDENT_STRING" = "" ]; thenexport SPARK_IDENT_STRING="$USER"fi#9.设置SPARK_PRINT_LAUNCH_COMMAND=1export SPARK_PRINT_LAUNCH_COMMAND="1"# get log directory#10.判断SPARK_LOG_DIR是否有值,没有将其设置为${SPARK_HOME}/logs,并创建改⽬录,测试创建⽂件,修改权限if [ "$SPARK_LOG_DIR" = "" ]; thenexport SPARK_LOG_DIR="${SPARK_HOME}/logs"fimkdir -p "$SPARK_LOG_DIR"touch "$SPARK_LOG_DIR"/.spark_test > /dev/null 2>&1TEST_LOG_DIR=$?if [ "${TEST_LOG_DIR}" = "0" ]; thenrm -f "$SPARK_LOG_DIR"/.spark_testelsechown "$SPARK_IDENT_STRING" "$SPARK_LOG_DIR"fi#11.判断SPARK_PID_DIR是否有值,没有将其设置为/tmpif [ "$SPARK_PID_DIR" = "" ]; thenSPARK_PID_DIR=/tmpfi# some variables#12.设置log和pidlog="$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out"pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"# Set default scheduling priority#13.判断SPARK_NICENESS是否有值,没有将其设置为0 [注:调度优先级,见后⾯]if [ "$SPARK_NICENESS" = "" ]; thenexport SPARK_NICENESS=0fi#14.execute_command()函数,暂且略过,调⽤时再作分析execute_command() {if [ -z ${SPARK_NO_DAEMONIZE+set} ]; thennohup -- "$@" >> $log 2>&1 < /dev/null &newpid="$!"echo "$newpid" > "$pid"# Poll for up to 5 seconds for the java process to startfor i in {1..10}doif [[ $(ps -p "$newpid" -o comm=) =~ "java" ]]; thenbreakfisleep 0.5donesleep 2# Check if the process has died; in that case we'll tail the log so the user can seeif [[ ! $(ps -p "$newpid" -o comm=) =~ "java" ]]; thenecho "failed to launch: $@"tail -10 "$log" | sed 's/^/ /'echo "full log in $log"fielse"$@"fi}#15.进⼊case语句,判断option值,进⼊该分⽀,我们以start为例# 执⾏run_command class "$@",其中$@此时为空,经验证,启动带上此参数后,关闭也需,不然关闭不了,后⾯再分析此参数作⽤# 我们正式进⼊run_command()函数,分析# I.设置mode=class,创建SPARK_PID_DIR,上⾯的pid⽂件是否存在,# II.SPARK_MASTER不为空,同步删除某些⽂件# III.回滚log⽇志# IV.进⼊case,command=org.apache.spark.deploy.master.Master,最终执⾏# nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &# newpid="$!"# echo "$newpid" > "$pid"# 重点转向bin/spark-class org.apache.spark.deploy.master.Masterrun_command() {mode="$1"shiftmkdir -p "$SPARK_PID_DIR"if [ -f "$pid" ]; thenTARGET_ID="$(cat "$pid")"if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; thenecho "$command running as process $TARGET_ID. Stop it first."exit 1fifiif [ "$SPARK_MASTER" != "" ]; thenecho rsync from "$SPARK_MASTER"rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' "$SPARK_MASTER/" "${SPARK_HOME}" fispark_rotate_log "$log"echo "starting $command, logging to $log"case "$mode" in(class)execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@";;(submit)execute_command nice -n "$SPARK_NICENESS" bash "${SPARK_HOME}"/bin/spark-submit --class "$command" "$@";;(*)echo "unknown mode: $mode"exit 1;;esac}case $option in(submit)run_command submit "$@";;(start)run_command class "$@";;(stop)if [ -f $pid ]; thenTARGET_ID="$(cat "$pid")"if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; thenecho "stopping $command"kill "$TARGET_ID" && rm -f "$pid"elseecho "no $command to stop"fielseecho "no $command to stop"fi;;(status)if [ -f $pid ]; thenTARGET_ID="$(cat "$pid")"if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; thenecho $command is running.exit 0elseecho $pid file is present but $command not runningexit 1fielseecho $command not running.exit 2fi;;(*)echo $usageexit 1;;esac1.7 spark-class#1.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thensource "$(dirname "$0")"/find-spark-homefi#2.执⾏${SPARK_HOME}/bin/load-spark-env.sh,见上述分析. "${SPARK_HOME}"/bin/load-spark-env.sh# Find the java binary#3.判断JAVA_HOME是否为NULL,不是则设置RUNNER="${JAVA_HOME}/bin/java",否则找系统⾃带,在没有则报未设置,并退出if [ -n "${JAVA_HOME}" ]; thenRUNNER="${JAVA_HOME}/bin/java"elseif [ "$(command -v java)" ]; thenRUNNER="java"elseecho "JAVA_HOME is not set" >&2exit 1fifi# Find Spark jars.#4.查找SPARK_JARS_DIR,若${SPARK_HOME}/RELEASE⽂件存在,则SPARK_JARS_DIR="${SPARK_HOME}/jars",否则#SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"if [ -d "${SPARK_HOME}/jars" ]; thenSPARK_JARS_DIR="${SPARK_HOME}/jars"elseSPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"fi#5.若SPARK_JARS_DIR不存在且$SPARK_TESTING$SPARK_SQL_TESTING有值[注:⼀般我们不设置这两变量],报错退出,否则LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*" if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; thenecho "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2echo "You need to build Spark with the target \"package\" before running this program." 1>&2exit 1elseLAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"fi# Add the launcher build dir to the classpath if requested.#6.SPARK_PREPEND_CLASSES不是NULL,则LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH",#添加编译相关⾄LAUNCH_CLASSPATHif [ -n "$SPARK_PREPEND_CLASSES" ]; thenLAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"fi# For tests#7.SPARK_TESTING不是NULL,则unset YARN_CONF_DIR和unset HADOOP_CONF_DIR,暂且当做是为了某种测试if [[ -n "$SPARK_TESTING" ]]; thenunset YARN_CONF_DIRunset HADOOP_CONF_DIRfi#8.build_command函数,略过build_command() {"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" uncher.Main "$@"printf "%d\0" $?}# Turn off posix mode since it does not allow process substitutionset +o posixCMD=()while IFS= read -d '' -r ARG; doCMD+=("$ARG")#9.最终调⽤"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" uncher.Main "$@",#直译:java -Xmx128m -cp "$LAUNCH_CLASSPATH" uncher.Main "$@"#转向java类uncher.Main,这就是java⼊⼝类done < <(build_command "$@")COUNT=${#CMD[@]}LAST=$((COUNT - 1))LAUNCHER_EXIT_CODE=${CMD[$LAST]}# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes# the code that parses the output of the launcher to get confused. In those cases, check if the# exit code is an integer, and if it's not, handle it as a special error case.if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; thenecho "${CMD[@]}" | head -n-1 1>&2exit 1fiif [ $LAUNCHER_EXIT_CODE != 0 ]; thenexit $LAUNCHER_EXIT_CODEfiCMD=("${CMD[@]:0:$LAST}")exec "${CMD[@]}"1.8 start-slaves.sh#1.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thenexport SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi#2.执⾏${SPARK_HOME}/sbin/spark-config.sh,见上述分析. "${SPARK_HOME}/sbin/spark-config.sh"#3.执⾏${SPARK_HOME}/bin/load-spark-env.sh,见上述分析. "${SPARK_HOME}/bin/load-spark-env.sh"# Find the port number for the master#4.SPARK_MASTER_PORT为空则设置为7077if [ "$SPARK_MASTER_PORT" = "" ]; thenSPARK_MASTER_PORT=7077fi#5.SPARK_MASTER_HOST为空则设置为`hostname`if [ "$SPARK_MASTER_HOST" = "" ]; thencase `uname` in(SunOS)SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`";;(*)SPARK_MASTER_HOST="`hostname -f`";;esacfi# Launch the slaves#6.启动slaves,# "${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"# 遍历conf/slaves中主机,其中有设置SPARK_SSH_OPTS,ssh每⼀台机器执⾏"${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT" "${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"1.9 转向start-slave.sh#1.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thenexport SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi#2.设置CLASS="org.apache.spark.deploy.worker.Worker"CLASS="org.apache.spark.deploy.worker.Worker"#3.如果参数结尾包含--help或者-h则打印帮助信息,并退出if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; thenecho "Usage: ./sbin/start-slave.sh [options] <master>"pattern="Usage:"pattern+="\|Using Spark's default log4j profile:"pattern+="\|Registered signal handlers for""${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2exit 1fi#4.执⾏${SPARK_HOME}/sbin/spark-config.sh,见上述分析. "${SPARK_HOME}/sbin/spark-config.sh"#5.执⾏${SPARK_HOME}/bin/load-spark-env.sh,见上述分析. "${SPARK_HOME}/bin/load-spark-env.sh"#6.MASTER=$1,这⾥MASTER=spark://hostname:7077,然后shift,也就是说单独启动单个slave使⽤start-slave.sh spark://hostname:7077MASTER=$1shift#7.SPARK_WORKER_WEBUI_PORT为空则设置为8081if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; thenSPARK_WORKER_WEBUI_PORT=8081fi#8.函数start_instance,略过function start_instance {#设置WORKER_NUM=$1WORKER_NUM=$1shiftif [ "$SPARK_WORKER_PORT" = "" ]; thenPORT_FLAG=PORT_NUM=elsePORT_FLAG="--port"PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 ))fiWEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 ))#直译:spark-daemon.sh start org.apache.spark.deploy.worker.Worker 1 --webui-port 7077 spark://hostname:7077#代码再次转向spark-daemon.sh,见上诉分析"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"}#9.判断SPARK_WORKER_INSTANCES(可以认为是单节点Worker进程数)是否为空# 为空,则start_instance 1 "$@"# 不为空,则循环# for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do# start_instance $(( 1 + $i )) "$@"# doneif [ "$SPARK_WORKER_INSTANCES" = "" ]; thenstart_instance 1 "$@"elsefor ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do#10.转向start_instance函数start_instance $(( 1 + $i )) "$@"donefi⼆、其他脚本2.1 start-history-server.sh#1.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thenexport SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi#2.执⾏${SPARK_HOME}/sbin/spark-config.sh,见上述分析. "${SPARK_HOME}/sbin/spark-config.sh"#3.执⾏${SPARK_HOME}/bin/load-spark-env.sh,见上述分析. "${SPARK_HOME}/bin/load-spark-env.sh"#4.exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 $@ ,见上诉分析exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 "$@"2.2 start-shuffle-service.sh#1.判断SPARK_HOME是否有值,没有将其设置为当前⽂件所在⽬录的上级⽬录if [ -z "${SPARK_HOME}" ]; thenexport SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"fi#2.执⾏${SPARK_HOME}/sbin/spark-config.sh,见上述分析. "${SPARK_HOME}/sbin/spark-config.sh"#3.执⾏${SPARK_HOME}/bin/load-spark-env.sh,见上述分析. "${SPARK_HOME}/bin/load-spark-env.sh"#4.exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.ExternalShuffleService 1 ,见上诉分析exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.ExternalShuffleService 12.3 start-thriftserver.sh开启thriftserver,略三、spark-submit处理逻辑分析以上主要是介绍了spark启动的⼀些脚本,这⾥主要分析⼀下Spark源码中提交任务脚本的处理逻辑,从spark-submit⼀步步深⼊进去看看任务提交的整体流程,⾸先看⼀下整体的流程概要图:3.1 spark-submit# -z是检查后⾯变量是否为空(空则真) shell可以在双引号之内引⽤变量,单引号不可#这⼀步作⽤是检查SPARK_HOME变量是否为空,为空则执⾏then后⾯程序#source命令: source filename作⽤在当前bash环境下读取并执⾏filename中的命令#$0代表shell脚本⽂件本⾝的⽂件名,这⾥即使spark-submit#dirname⽤于取得脚本⽂件所在⽬录 dirname $0取得当前脚本⽂件所在⽬录#$(命令)表⽰返回该命令的结果#故整个if语句的含义是:如果SPARK_HOME变量没有设置值,则执⾏当前⽬录下的find-spark-home脚本⽂件,设置SPARK_HOME值if [ -z "${SPARK_HOME}" ]; thensource "$(dirname "$0")"/find-spark-homefi# disable randomized hash for string in Python 3.3+export PYTHONHASHSEED=0#执⾏spark-class脚本,传递参数org.apache.spark.deploy.SparkSubmit 和"$@"#这⾥$@表⽰之前spark-submit接收到的全部参数exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"所以spark-submit脚本的整体逻辑就是:⾸先检查SPARK_HOME是否设置;if 已经设置执⾏spark-class⽂件否则加载执⾏find-spark-home⽂件3.2 find-spark-home#定义⼀个变量⽤于后续判断是否存在定义SPARK_HOME的python脚本⽂件FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"# Short cirtuit if the user already has this set.##如果SPARK_HOME为不为空值,成功退出程序if [ ! -z "${SPARK_HOME}" ]; thenexit 0# -f⽤于判断这个⽂件是否存在并且是否为常规⽂件,是的话为真,这⾥不存在为假,执⾏下⾯语句,给SPARK_HOME变量赋值elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then# If we are not in the same directory as find_spark_home.py we are not pip installed so we don't# need to search the different Python directories for a Spark installation.# Note only that, if the user has pip installed PySpark but is directly calling pyspark-shell or# spark-submit in another directory we want to use that version of PySpark rather than the# pip installed version of PySpark.export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"else# We are pip installed, use the Python script to resolve a reasonable SPARK_HOME# Default to standard python interpreter unless told otherwiseif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; thenPYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}"fiexport SPARK_HOME=$($PYSPARK_DRIVER_PYTHON "$FIND_SPARK_HOME_PYTHON_SCRIPT")fi可以看到,如果事先⽤户没有设定SPARK_HOME的值,这⾥程序也会⾃动设置并且将其注册为环境变量,供后⾯程序使⽤当SPARK_HOME的值设定完成之后,就会执⾏Spark-class⽂件,这也是我们分析的重要部分,源码如下:3.3 spark-class#!/usr/bin/env bash#依旧是检查设置SPARK_HOME的值if [ -z "${SPARK_HOME}" ]; thensource "$(dirname "$0")"/find-spark-homefi#执⾏load-spark-env.sh脚本⽂件,主要⽬的在于加载设定⼀些变量值#设定spark-env.sh中的变量值到环境变量中,供后续使⽤#设定scala版本变量值. "${SPARK_HOME}"/bin/load-spark-env.sh# Find the java binary#检查设定java环境值#-n代表检测变量长度是否为0,不为0时候为真#如果已经安装Java没有设置JAVA_HOME,command -v java返回的值为${JAVA_HOME}/bin/javaif [ -n "${JAVA_HOME}" ]; thenRUNNER="${JAVA_HOME}/bin/java"elseif [ "$(command -v java)" ]; thenRUNNER="java"elseecho "JAVA_HOME is not set" >&2exit 1fifi# Find Spark jars.#-d检测⽂件是否为⽬录,若为⽬录则为真#设置⼀些关联Class⽂件if [ -d "${SPARK_HOME}/jars" ]; thenSPARK_JARS_DIR="${SPARK_HOME}/jars"elseSPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"fiif [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; thenecho "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2echo "You need to build Spark with the target \"package\" before running this program." 1>&2exit 1elseLAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"fi# Add the launcher build dir to the classpath if requested.if [ -n "$SPARK_PREPEND_CLASSES" ]; thenLAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"fi# For testsif [[ -n "$SPARK_TESTING" ]]; thenunset YARN_CONF_DIRunset HADOOP_CONF_DIRfi# The launcher library will print arguments separated by a NULL character, to allow arguments with# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating# an array that will be used to exec the final command.## The exit code of the launcher is appended to the output, so the parent shell removes it from the# command array and checks the value to see if the launcher succeeded.#执⾏类⽂件uncher.Main,返回解析后的参数build_command() {"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" uncher.Main "$@"printf "%d\0" $?}# Turn off posix mode since it does not allow process substitution#将build_command⽅法解析后的参数赋给CMDset +o posixCMD=()while IFS= read -d '' -r ARG; doCMD+=("$ARG")done < <(build_command "$@")COUNT=${#CMD[@]}LAST=$((COUNT - 1))LAUNCHER_EXIT_CODE=${CMD[$LAST]}# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes# the code that parses the output of the launcher to get confused. In those cases, check if the# exit code is an integer, and if it's not, handle it as a special error case.if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; thenecho "${CMD[@]}" | head -n-1 1>&2exit 1fiif [ $LAUNCHER_EXIT_CODE != 0 ]; thenexit $LAUNCHER_EXIT_CODEfiCMD=("${CMD[@]:0:$LAST}")#执⾏CMD中的某个参数类org.apache.spark.deploy.SparkSubmitexec "${CMD[@]}"spark-class⽂件的执⾏逻辑稍显复杂,总体上应该是这样的:检查SPARK_HOME的值----》执⾏load-spark-env.sh⽂件,设定⼀些需要⽤到的环境变量,如scala环境值,这其中也加载了spark-env.sh⽂件-------》检查设定java的执⾏路径变量值-------》寻找spark jars,设定⼀些引⽤相关类的位置变量------》执⾏类⽂件uncher.Main,返回解析后的参数给CMD-------》判断解析参数是否正确(代表了⽤户设置的参数是否正确)--------》正确的话执⾏org.apache.spark.deploy.SparkSubmit这个类3.4 SparkSubmit2.1最后提交语句,D:\src\spark-2.3.0\core\src\main\scala\org\apache\spark\deploy\SparkSubmit.scalaexec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"override def main(args: Array[String]): Unit = {// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to// be reset before the application starts.val uninitLog = initializeLogIfNecessary(true, silent = true)//拿到submit脚本传⼊的参数val appArgs = new SparkSubmitArguments(args)if (appArgs.verbose) {// scalastyle:off printlnprintStream.println(appArgs)// scalastyle:on println}//根据传⼊的参数匹配对应的执⾏⽅法appArgs.action match {//根据传⼊的参数提交命令case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)//只有standalone和mesos集群模式才触发case SparkSubmitAction.KILL => kill(appArgs)//只有standalone和mesos集群模式才触发case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)}}3.4.1 submit⼗分关键,主要分为两步骤(1)调⽤prepareSubmitEnvironment(2)调⽤doRunMain。
Apache Spark的基本原理和使用指南Apache Spark是一个快速通用的大数据处理引擎,它可以在集群上进行分布式计算,并支持各种语言编写的应用程序。
Spark的主要特点是高速、易用和可扩展性,为大数据应用程序提供了高效的内存计算和高速数据处理能力,广泛应用于机器学习、图形处理、实时流处理、数据挖掘等多个领域。
在本文中,我们将介绍Apache Spark的基本原理和使用指南,包括Spark的架构、数据处理模型、编程接口、执行计划和优化等方面的知识,帮助读者了解Spark的内部机制和如何合理使用Spark进行数据处理和分析。
一、Spark的架构Spark的架构包括Driver、Cluster Manager和Executor三个主要组件,如下图所示:- Driver:驱动器负责Spark应用程序的执行和调度。
它可以向集群管理器请求资源,并将Spark应用程序分解成任务,将这些任务分配给不同的Executor执行。
- Cluster Manager:集群管理器负责分配资源给Spark应用程序,并监控集群的状态。
Spark支持多种集群管理器,如Standalone、YARN、Mesos、Kubernetes等。
- Executor:执行器是Spark中负责执行具体任务的工作节点。
它负责在本地执行任务,并向驱动器报告执行结果。
二、Spark的数据处理模型Spark的数据处理模型是面向数据集的,它可以将数据集分为多个分布式数据集(RDD),并对RDD进行各种数据操作和转换。
RDD是Spark数据处理的核心概念,它是一个不可变、分布式的数据集合,可以在集群节点之间传输和操作。
Spark的数据处理模型可以分为三个基本阶段:Transform、Cache和Action。
Python编写Apache Spark应用的经验分享Apache Spark是一个快速、通用的大数据处理框架,它提供了许多强大的功能和丰富的API,使得数据分析和处理变得更加高效和便捷。
在使用Apache Spark时,使用Python编写应用程序是一种流行的选择。
本文将分享一些关于使用Python编写Apache Spark应用的经验和技巧。
一、概述Apache Spark是一个用于大数据处理和分析的开源集群计算框架,它提供了一系列的API和工具,能够实现高速、可扩展的数据处理。
Python是一种常用的编程语言,在数据分析和机器学习领域有广泛的应用。
Python编写Apache Spark应用,可以发挥Python易学易用和Spark强大性能的优势。
二、安装和设置在开始编写Python应用之前,我们需要安装Apache Spark和相应的Python库。
首先,下载并安装Apache Spark,并将其配置为可以在Python中使用。
其次,安装Python依赖库,如pyspark和py4j等。
完成这些设置后,我们就可以使用Python编写Apache Spark应用了。
三、数据准备在开始数据处理之前,我们需要准备好相应的数据。
Apache Spark支持各种数据源,如CSV、JSON、Parquet等。
在Python中,我们可以使用pyspark库来读取和处理这些数据。
例如,通过以下代码读取一个CSV文件:```pythonfrom pyspark.sql import SparkSessionspark = SparkSession.builder.master("local").appName("Data Processing").getOrCreate()data = spark.read.csv("data.csv", header=True, inferSchema=True)```四、数据处理一旦我们准备好数据,接下来可以开始进行数据处理。
ApacheSpark大数据处理框架详细分析ApacheSpark是一个开源的大数据处理框架,它提供了一种快速、高效、可扩展的方式来处理大规模数据集。
本文将对ApacheSpark的架构、特点和应用进行详细分析。
一、ApacheSpark的架构ApacheSpark的架构包括四个主要组件:驱动器程序、执行器、集群管理器和分布式文件系统。
1. 驱动器程序:驱动器程序是用户编写的Spark应用程序的入口点,它负责定义任务的执行流程,包括任务的划分、调度和结果的收集等。
2. 执行器:执行器是分布式计算的核心组件,它负责在工作节点上执行具体的任务。
每个执行器都管理了若干个工作线程,用来并行地执行任务。
3. 集群管理器:集群管理器负责对Spark应用程序进行资源调度和管理,它可以使用各种集群管理工具,如YARN、Mesos等。
4. 分布式文件系统:分布式文件系统提供了数据的存储和访问功能,Spark支持多种分布式文件系统,如HDFS、S3等。
二、ApacheSpark的特点ApacheSpark具有以下几个显著的特点:1. 快速:Spark通过内存计算和基于DAG(有向无环图)的任务调度,实现了比传统MapReduce更快的计算速度。
同时,Spark还提供了丰富的缓存机制,可以将数据加载到内存中,以减少磁盘IO的开销。
2. 强大的API支持:Spark提供了丰富的API接口,支持Java、Scala、Python和R等多种编程语言,开发者可以根据自己的需求选择适合的API进行开发。
3. 可扩展性:Spark的执行引擎可以在大规模集群上进行横向扩展,支持高并发和大数据量的处理。
此外,Spark还提供了一些高级功能,如动态资源调整和任务隔离等,进一步提升了应用程序的可扩展性。
4. 多种数据处理功能:Spark不仅支持常见的批处理和交互式查询,还提供了流处理和机器学习等高级数据处理功能,可以满足各种应用场景的需求。
超人学院—Apache Spark源码走读之Task运行期之函数调用关系分析欢迎转载,转载请注明出处,超人学院。
概要本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。
准备1.spark已经安装完毕2.spark运行在local mode或local-cluster modelocal-cluster modelocal-cluster模式也称为伪分布式,可以使用如下指令运行MASTER=local[1,2,1024] bin/spark-shell[1,2,1024]分别表示,executor number, core number和内存大小,其中内存大小不应小于默认的512MDriver Programme的初始化过程分析初始化过程的涉及的主要源文件1.SparkContext.scala 整个初始化过程的入口2.SparkEnv.scala 创建BlockManager,MapOutputTrackerMaster, ConnectionManager, CacheManager3.DAGScheduler.scala 任务提交的入口,即将Job划分成各个stage的关键4.TaskSchedulerImpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行5.SchedulerBackend1.最简单的单机运行模式的话,看LocalBackend.scala2.如果是集群模式,看源文件SparkDeploySchedulerBackend初始化过程步骤详解步骤1:根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManagerprivate[spark] val env = SparkEnv.create(conf,"",conf.get("spark.driver.host"),conf.get("spark.driver.port").toInt,isDriver = true,isLocal = isLocal)SparkEnv.set(env)步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键private[spark] var taskScheduler =SparkContext.createTaskScheduler(this, master, appName)taskScheduler.start()TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测overridedef start() {backend.start()if (!isLocal && conf.getBoolean("spark.speculation", false)) {logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatchersc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,SPECULATION_INTERVAL milliseconds) {checkSpeculatableTasks()}}}步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行@volatileprivate[spark] var dagScheduler = new DAGScheduler(taskScheduler)dagScheduler.start()步骤4:启动WEB UIui.start()RDD的转换过程还是以最简单的wordcount为例说明rdd的转换过程sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果步骤1:val rawFile = sc.textFile("README.md")textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析scala> sc.textFile("README.md")14/04/2313:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes14/04/2313:11:48 INFO MemoryStore: ensureFreeSpace(119741)called with curMem=0, maxMem=31138775014/04/2313:11:48 INFO MemoryStore: Block broadcast_0 stored asvalues to memory (estimated size 116.9 KB, free 296.8 MB)14/04/2313:11:48 DEBUG BlockManager: Put block broadcast_0locally took 277 ms14/04/2313:11:48 DEBUG BlockManager: Put for block broadcast_0without replication took 281 msres0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] attextFile at :13步骤2: val splittedText = rawFile.flatMap(line =>line.split(" "))flatMap将原来的MappedRDD转换成为FlatMappedRDDdef flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]= new FlatMappedRDD(this, sc.clean(f))步骤3:val wordCount = splittedText.map(word => (word, 1))利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为MappedRDD步骤4:val reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。
reduceByKey 的定义出现在源文件PairRDDFunctions.scala细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctionsimplicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =new PairRDDFunctions(rdd)这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scala implicit method"进行查询,会有不少的文章对此进行详尽的介绍。
接下来再看一看reduceByKey的定义def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {reduceByKey(defaultPartitioner(self), func)}def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {combineByKey[V]((v: V) => v, func, func, partitioner)}def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializerClass: String = null): RDD[(K, C)] = {if (getKeyClass().isArray) {if (mapSideCombine) {thrownew SparkException("Cannot use map-side combining with array keys.")}if (partitioner.isInstanceOf[HashPartitioner]) {thrownew SparkException("Default partitioner cannot partition array keys.")}}val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)if (self.partitioner == Some(partitioner)) {self.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context,bineValuesByKey(iter, context))}, preservesPartitioning = true)} elseif (mapSideCombine) {val combined = self.mapPartitionsWithContext((context, iter) => {bineValuesByKey(iter, context)}, preservesPartitioning = true)val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializerClass)partitioned.mapPartitionsWithContext((context, iter) => {new InterruptibleIterator(context,bineCombinersByKey(iter, context))}, preservesPartitioning = true)} else {// Don't apply map-side combiner.val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)values.mapPartitionsWithContext((context, iter) => { new InterruptibleIterator(context,bineValuesByKey(iter, context))}, preservesPartitioning = true)}}reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDDLog输出能证明我们的分析res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13RDD转换小结小结一下整个RDD转换过程HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunc tions->ShuffleRDD->MapPartitionsRDD整个转换过程好长啊,这一切的转换都发生在任务提交之前。