MapReduce源码分析完整版
- 格式:doc
- 大小:754.50 KB
- 文档页数:16
第四章分布式计算框架MapReduce4.1初识MapReduceMapReduce是一种面向大规模数据并行处理的编程模型,也一种并行分布式计算框架。
在Hadoop流行之前,分布式框架虽然也有,但是实现比较复杂,基本都是大公司的专利,小公司没有能力和人力来实现分布式系统的开发。
Hadoop的出现,使用MapReduce框架让分布式编程变得简单。
如名称所示,MapReduce主要由两个处理阶段:Map阶段和Reduce 阶段,每个阶段都以键值对作为输入和输出,键值对类型可由用户定义。
程序员只需要实现Map和Reduce两个函数,便可实现分布式计算,而其余的部分,如分布式实现、资源协调、内部通信等,都是由平台底层实现,无需开发者关心。
基于Hadoop开发项目相对简单,小公司也可以轻松的开发分布式处理软件。
4.1.1 MapReduce基本过程MapReduce是一种编程模型,用户在这个模型框架下编写自己的Map函数和Reduce函数来实现分布式数据处理。
MapReduce程序的执行过程主要就是调用Map函数和Reduce函数,Hadoop把MapReduce程序的执行过程分为Map和Reduce两个大的阶段,如果细分可以为Map、Shuffle(洗牌)、Reduce三个阶段。
Map含义是映射,将要操作的每个元素映射成一对键和值,Reduce含义是归约,将要操作的元素按键做合并计算,Shuffle在第三节详细介绍。
下面以一个比较简单的示例,形象直观介绍一下Map、Reduce阶段是如何执行的。
有一组图形,包含三角形、圆形、正方形三种形状图形,要计算每种形状图形的个数,见下图4-1。
图:4-1 map/reduce计算不同形状的过程在Map阶段,将每个图形映射成形状(键Key)和数量(值Value),每个形状图形的数量值是“1”;Shuffle阶段的Combine(合并),相同的形状做归类;在Reduce阶段,对相同形状的值做求和计算。
mapreduce编程实验报告心得【实验报告心得】总结:本次mapreduce编程实验通过实际操作,使我对mapreduce编程框架有了更深入的理解。
在实验过程中,我学会了如何编写map和reduce函数,并利用这些函数从大数据集中进行数据提取和聚合分析。
通过这个实验,我还掌握了如何调试和优化mapreduce任务,以提高数据处理效率和性能。
一、实验目的:本次实验的目的是掌握mapreduce编程框架的使用方法,理解其实现原理,并在实际编程中熟练运用map和reduce函数进行数据处理和分析。
二、实验环境和工具:本次实验使用Hadoop分布式计算框架进行mapreduce编程。
使用的工具包括Hadoop集群、HDFS分布式文件系统以及Java编程语言。
三、实验过程:1. 实验准备:在开始实验前,我首先了解了mapreduce的基本概念和特点,以及Hadoop集群的配置和使用方法。
2. 实验设计:根据实验要求,我选择了一个适当的数据集,并根据具体需求设计了相应的map和reduce函数。
在设计过程中,我充分考虑了数据的结构和处理逻辑,以保证mapreduce任务的高效完成。
3. 实验编码:在实验编码过程中,我使用Java编程语言来实现map 和reduce函数。
我按照mapreduce编程模型,利用输入键值对和中间结果键值对来进行数据处理。
在编码过程中,我注意了代码的规范性和可读性,并进行了适当的优化。
4. 实验测试:完成编码后,我在Hadoop集群上部署和运行了我的mapreduce任务。
通过对数据集进行分析和处理,我验证了自己编写的map和reduce函数的正确性和性能。
5. 实验总结:在实验结束后,我对本次实验进行了总结。
我分析了实验中遇到的问题和挑战,并提出了相应的解决方法。
我还对mapreduce编程框架的优缺点进行了评估,并给出了自己的观点和建议。
四、实验结果和观点:通过本次实验,我成功实现了对选定数据集的mapreduce处理。
mapreduce总结MapReduce一个由Google出的分布式编程模型,它将大数据处理转化为分布式程序模型,提供了一种简单却强大的方法来处理海量的数据。
MapReduce优点在于提供了一种既可以低成本、高效率地处理大规模数据的数据处理框架,在大数据的处理和管理方面发挥了重要作用。
本文将对MapReduce的相关概念及其实现原理、特点和应用进行综述。
一、MapReduce的概念MapReduceGoogle发明的一种解决海量数据处理的分布式编程模型,它是一种计算框架,可以将一个大型数据集分割成多个小任务,并把任务分发到多台机器上执行,并最终将小任务的结果合并成最终结果。
MapReduce模型由Google在2004年提出,并于2005年在著名论文“MapReduce:A Flexible Data Processing Tool”中被正式发表,其主要贡献者为Google公司的三位研究人员:Jeff Dean、Sanjay Ghemawat Andrew Tomkins。
二、MapReduce的实现原理MapReduce实现原理主要分2个阶段。
1. Map:Map是利用已有的数据,进行数据归类和分块的过程,将大规模的数据量分割成多个中等规模的数据块,每个数据块用一个子任务来处理;2. Reduce阶段:Reduce是从 Map的多个子任务的结果中汇总出最终的结果;MapReduce框架建立在分布式环境之上,将一个大规模的计算任务拆分成小任务,分发到各个节点运行,最后把小任务的结果合并成一个总结果。
三、MapReduce的特点MapReduce模型提供了一种机制,可以实现以下处理大规模数据的特点:1.发处理大数据:MapReduce过将大数据集分成多个小数据集,并由多个节点并行处理,从而提供了大规模数据处理的并发能力,可以提升处理效率;2.错性:MapReduce型支持容错性处理,也即当某台机器出现故障或是宕机,MapReduce架会将任务重新分发到其它机器上执行,从而保证了数据的正确性;3.可伸缩性:MapReduce型具有较高的可伸缩性,即可以根据需求随时增加或减少计算任务的数量,从而改变计算的规模;4.持低延迟的数据处理:MapReduce数据处理过程中,可以有效避免数据倾斜现象,从而减少任务处理的时间。
关键字: 分布式云计算Google的核心竞争技术是它的计算平台。
Google的大牛们用了下面5篇文章,介绍了它们的计算设施。
GoogleCluster: /archive/googlecluster.htmlChubby:/papers/chubby.htmlGFS:/papers/gfs.htmlBigTable:/papers/bigtable.htmlMapReduce:/papers/mapreduce.html很快,Apache上就出现了一个类似的解决方案,目前它们都属于Apache的Hadoop项目,对应的分别是:Chubby-->ZooKeeperGFS-->HDFSBigTable-->HBaseMapReduce-->Hadoop目前,基于类似思想的Open Source项目还很多,如Facebook用于用户分析的Hive。
HDFS作为一个分布式文件系统,是所有这些项目的基础。
分析好HDFS,有利于了解其他系统。
由于Hadoop的HDFS和MapReduce 是同一个项目,我们就把他们放在一块,进行分析。
下图是MapReduce整个项目的顶层包图和他们的依赖关系。
Hadoop包之间的依赖关系比较复杂,原因是HDFS提供了一个分布式文件系统,该系统提供API,可以屏蔽本地文件系统和分布式文件系统,甚至象Amazon S3这样的在线存储系统。
这就造成了分布式文件系统的实现,或者是分布式文件系统的底层的实现,依赖于某些貌似高层的功能。
功能的相互引用,造成了蜘蛛网型的依赖关系。
一个典型的例子就是包conf,conf用于读取系统配置,它依赖于fs,主要是读取配置文件的时候,需要使用文件系统,而部分的文件系统的功能,在包fs中被抽象了。
Hadoop的关键部分集中于图中蓝色部分,这也是我们考察的重点。
下面给出了Hadoop的包的功能分析。
Hadoop源代码分析(三)由于Hadoop的MapReduce和HDFS都有通信的需求,需要对通信的对象进行序列化。
MapReduce编程一、实验目的1、理解MapReduce编程模型基本知识2、掌握MapReduce开发环境的搭建3、掌握MapReduce基本知识,能够运用MapReduce进行基本的开发二、实验原理MapReduce 是Hadoop两个最基础最重要的核心成员之一。
它是大规模数据(TB 级)计算的利器,Map 和Reduce 是它的主要思想,来源于函数式编程语言。
从编程的角度来说MapReduce分为Map函数和Reduce函数,Map负责将数据打散,Reduce负责对数据进行聚集,用户只需要实现map 和reduce 两个接口,即可完成TB级数据的计算。
Hadoop Map Reduce的实现采用了Master/Slave 结构。
Master 叫做JobTracker,而Slave 叫做TaskTracker。
用户提交的计算叫做Job,每一个Job会被划分成若干个Tasks。
JobTracker负责Job 和Tasks 的调度,而TaskTracker负责执行Tasks。
常见的应用包括:日志分析和数据挖掘等数据分析应用,另外,还可用于科学数据计算,如圆周率PI 的计算等。
MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。
当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。
Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出。
按照以上基本的描述,其工作图如下。
从工作流程来讲,MapReduce对应的作业Job首先把输入的数据集切分为若干独立的数据块,并由Map组件以Task的方式并行处理。
处理结果经过排序后,依次输入给Reduce 组件,并且以Task的形式并行处理。
mapreduce编程模型的原理MapReduce编程模型是一种分布式计算模型,用于处理大规模数据集。
它的原理是将数据集划分成小的数据块,然后并行地在集群的多个节点上执行Map和Reduce操作,最终将结果合并起来形成最终结果。
MapReduce编程模型的主要原理可以归纳为以下几个方面:1. 数据划分MapReduce会将大规模数据集划分为小的数据块,每个数据块通常在64MB到1GB之间。
将数据划分为小的数据块可以方便地并行处理,也可以减少网络传输的数据量。
2. Map操作Map操作是MapReduce中的第一步。
Map操作会对数据块中的每个数据进行处理,其中Map会将每个数据转化为一个中间键-值对(key-value),key表示数据属性,value表示值。
Map操作通常包括以下步骤:(1)输入:从输入数据中读取数据块(2)映射:将输入数据转换为中间键-值对(3)缓存:将处理后的中间键-值对缓存在内存中3. Shuffle操作Shuffle操作是MapReduce中的第二步,Shuffle操作会将Map操作生成的中间键-值对重新组合,并按照key值将它们分组。
Shuffle操作通常包括以下步骤:(1)数据的拷贝:将Map输出的中间键-值对按照key值拷贝到Reduce操作的计算节点上(2)数据的排序:按照key值对中间键-值对进行排序,便于Reduce操作的处理(3)数据的分区:将排序后的中间键-值对分成多个分区,每个分区包含相同key值的中间键-值对4. Reduce操作Reduce操作是MapReduce中的第三步。
在Reduce操作中,Map操作生成的中间键-值对被分成多个分区,每个分区都包含相同key值的键值对。
在Reduce操作中,对每个分区中的中间键-值对进行处理,并生成一个输出结果。
Reduce操作通常包括以下步骤:(1)输入:从Map操作的输出获取中间键-值对分组信息(2)缓存:将Map操作输出的中间键-值对缓存到内存中(3)分组:将缓存中的中间键-值对按照key值分组(4)Reduce:对每个分组中的中间键-值对进行Reduce操作,并将结果输出5. 在Master节点上进行控制和协调MapReduce编程模型中,由Master节点来进行任务的分配、管理和协调。
第8章 MapReduce 相关特性详解184 if(key.toString().equals("hello")){ //判断计数条件 context.setStatus("BadKey is coming!"); //写入Reduce 状态context.getCounter(ReportTest.ReduceReport).increment(1); //计数增加 }context.write(key, new IntWritable(sum));};}}从以上程序的黑体部分中可以看到,使用了大量的关键字进行计数器的设置,因此可以通过触发条件对计数值进行增加。
图8-4展示了最终结果。
图8-4 程序8-1运行结果计数器视图最上面部分是设置的计数器计数,根据获取条件产生了若干个计数器对结果进行输出。
8.1.3 动态计数器对于设定的计数器,可以通过在初始处设置枚举,使得计数器能够引用枚举中的类型,从而提供计数的功能。
小提示:有些时候,某些问题的产生并不适合在枚举处提供,例如一些产生的错误并不能在一开始定义,因此需要一个动态定义计数器的方法对数据进行定义。
除了前面所述的使用getCounter 方法获取枚举中值的方式外,Context 类中还有一个重载的方法能够对当前计数器进行动态定义,其源码如下所示:public Counter getCounter(String groupName, String counterName) { return reporter.getCounter(groupName, counterName);}此方法通过重新动态定义计数器实现对信息的动态捕获。
代码如程序8-2所示。
大数据实例代码大数据在各行各业中的应用越来越广泛,并且通过实例代码的使用,我们可以更好地理解和应用大数据技术。
本文将介绍几个常见的大数据实例代码,以帮助读者更好地掌握和应用这些技术。
一、MapReduce 实现 Word CountMapReduce 是一种用于处理大数据集的编程模型,它将数据分为若干个小块进行并行处理,并最终合并结果。
下面是一个使用MapReduce 实现的 Word Count 实例代码:```javaimport java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount {public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();String[] words = line.split(" ");for (String w : words) {word.set(w);context.write(word, one);}}}public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}public static void main(String[] args) throws Exception { Configuration conf = new Configuration();Job job = new Job(conf, "wordcount");job.setJarByClass(WordCount.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true);}}```二、Spark 实现数据清洗Spark 是一种快速、通用的大数据处理引擎,支持在大规模数据集上进行高效的数据处理。
mapreduce实验报告总结一、引言MapReduce是一种用于处理和生成大数据集的编程模型和模型化工具,它由Google提出并广泛应用于各种大数据处理场景。
通过MapReduce,我们可以将大规模数据集分解为多个小任务,并分配给多个计算节点并行处理,从而大大提高了数据处理效率。
在本实验中,我们通过实践操作,深入了解了MapReduce的工作原理,并尝试解决了一些实际的大数据处理问题。
二、实验原理MapReduce是一种编程模型,它通过两个核心阶段——Map阶段和Reduce阶段,实现了对大规模数据的处理。
Map阶段负责处理输入数据集中的每个元素,生成一组中间结果;Reduce阶段则对Map阶段的输出进行汇总和聚合,生成最终结果。
通过并行处理和分布式计算,MapReduce可以在大量计算节点上高效地处理大规模数据集。
在本实验中,我们使用了Hadoop平台来实现MapReduce模型。
Hadoop是一个开源的分布式计算框架,它提供了包括MapReduce在内的一系列数据处理功能。
通过Hadoop,我们可以方便地搭建分布式计算环境,实现大规模数据处理。
三、实验操作过程1.数据准备:首先,我们需要准备一个大规模的数据集,可以是结构化数据或非结构化数据。
在本实验中,我们使用了一个包含大量文本数据的CSV文件。
2.编写Map任务:根据数据处理的需求,我们编写了一个Map任务,该任务从输入数据集中读取文本数据,提取出关键词并进行分类。
3.编写Reduce任务:根据Map任务的输出,我们编写了一个Reduce任务,该任务将相同关键词的文本数据进行汇总,生成最终结果。
4.运行MapReduce作业:将Map和Reduce任务编译成可执行脚本,并通过Hadoop作业调度器提交作业,实现并行处理。
5.数据分析:获取处理后的结果,并进行数据分析,以验证数据处理的有效性。
四、实验结果与分析实验结束后,我们得到了处理后的数据结果。
MapReduce案例-流量统计(⼀)### 需求⼀: 统计求和统计每个⼿机号的上⾏数据包总和,下⾏数据包总和,上⾏总流量之和,下⾏总流量之和分析:以⼿机号码作为key值,上⾏流量,下⾏流量,上⾏总流量,下⾏总流量四个字段作为value值,然后以这个key,和value作为map阶段的输出,reduce阶段的输⼊##### Step 1: ⾃定义map的输出value对象FlowBean```javapublic class FlowBean implements Writable {private Integer upFlow; //上⾏数据包数private Integer downFlow; //下⾏数据包数private Integer upCountFlow; //上⾏流量总和private Integer downCountFlow;//下⾏流量总和public Integer getUpFlow() {return upFlow;}public void setUpFlow(Integer upFlow) {this.upFlow = upFlow;}public Integer getDownFlow() {return downFlow;}public void setDownFlow(Integer downFlow) {this.downFlow = downFlow;}public Integer getUpCountFlow() {return upCountFlow;}public void setUpCountFlow(Integer upCountFlow) {this.upCountFlow = upCountFlow;}public Integer getDownCountFlow() {return downCountFlow;}public void setDownCountFlow(Integer downCountFlow) {this.downCountFlow = downCountFlow;}@Overridepublic String toString() {return upFlow +"\t" + downFlow +"\t" + upCountFlow +"\t" + downCountFlow;}//序列化⽅法@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(upFlow);out.writeInt(downFlow);out.writeInt(upCountFlow);out.writeInt(downCountFlow);}//反序列化@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readInt();this.downFlow = in.readInt();this.upCountFlow = in.readInt();this.downCountFlow = in.readInt();}}```##### Step 2: 定义FlowMapper类```javapublic class FlowCountMapper extends Mapper<LongWritable,Text,Text,FlowBean> {/*将K1和V1转为K2和V2:K1 V10 1360021750219 128 1177 16852 200------------------------------K2 V2136******** FlowBean(19 128 1177 16852)*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1:拆分⾏⽂本数据,得到⼿机号--->K2String[] split = value.toString().split("\t");String phoneNum = split[1];//2:创建FlowBean对象,并从⾏⽂本数据拆分出流量的四个四段,并将四个流量字段的值赋给FlowBean对象FlowBean flowBean = new FlowBean();flowBean.setUpFlow(Integer.parseInt(split[6]));flowBean.setDownFlow(Integer.parseInt(split[7]));flowBean.setUpCountFlow(Integer.parseInt(split[8]));flowBean.setDownCountFlow(Integer.parseInt(split[9]));//3:将K2和V2写⼊上下⽂中context.write(new Text(phoneNum), flowBean);}}```##### Step 3: 定义FlowReducer类```javapublic class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { //1:遍历集合,并将集合中的对应的四个字段累计Integer upFlow = 0; //上⾏数据包数Integer downFlow = 0; //下⾏数据包数Integer upCountFlow = 0; //上⾏流量总和Integer downCountFlow = 0;//下⾏流量总和for (FlowBean value : values) {upFlow += value.getUpFlow();downFlow += value.getDownFlow();upCountFlow += value.getUpCountFlow();downCountFlow += value.getDownCountFlow();}//2:创建FlowBean对象,并给对象赋值 V3FlowBean flowBean = new FlowBean();flowBean.setUpFlow(upFlow);flowBean.setDownFlow(downFlow);flowBean.setUpCountFlow(upCountFlow);flowBean.setDownCountFlow(downCountFlow);//3:将K3和V3下⼊上下⽂中context.write(key, flowBean);}}```##### Step 4: 程序main函数⼊⼝FlowMain```javapublic class JobMain extends Configured implements Tool {//该⽅法⽤于指定⼀个job任务@Overridepublic int run(String[] args) throws Exception {//1:创建⼀个job任务对象Job job = Job.getInstance(super.getConf(), "mapreduce_flowcount");//如果打包运⾏出错,则需要加该配置job.setJarByClass(JobMain.class);//2:配置job任务对象(⼋个步骤)//第⼀步:指定⽂件的读取⽅式和读取路径job.setInputFormatClass(TextInputFormat.class);//TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount")); TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\flowcount_input")); //第⼆步:指定Map阶段的处理⽅式和数据类型job.setMapperClass(FlowCountMapper.class);//设置Map阶段K2的类型job.setMapOutputKeyClass(Text.class);//设置Map阶段V2的类型job.setMapOutputValueClass(FlowBean.class);//第三(分区),四(排序)//第五步: 规约(Combiner)//第六步分组//第七步:指定Reduce阶段的处理⽅式和数据类型job.setReducerClass(FlowCountReducer.class);//设置K3的类型job.setOutputKeyClass(Text.class);//设置V3的类型job.setOutputValueClass(FlowBean.class);//第⼋步: 设置输出类型job.setOutputFormatClass(TextOutputFormat.class);//设置输出的路径TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\flowcount_out")); //等待任务结束boolean bl = job.waitForCompletion(true);return bl ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//启动job任务int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}}```。
一MapReduce概述Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的,Google已经将它完整的MapReduce论文公开发布了。
其中对它的定义是,Map/Reduce是一个编程模型(programming model),是一个用于处理和生成大规模数据集(processing and generating large data sets)的相关的实现。
用户定义一个map函数来处理一个key/value对以生成一批中间的key/value对,再定义一个reduce函数将所有这些中间的有着相同key的values合并起来。
很多现实世界中的任务都可用这个模型来表达。
二MapReduce工作原理1 Map-ReduceMap-Reduce框架的运作完全基于<key,value>对,即数据的输入是一批<key,value>对,生成的结果也是一批<key,value>对,只是有时候它们的类型不一样而已。
Key和value的类由于需要支持被序列化(serialize)操作,所以它们必须要实现Writable接口,而且key的类还必须实现WritableComparable接口,使得可以让框架对数据集的执行排序操作。
一个Map-Reduce任务的执行过程以及数据输入输出的类型如下所示:Map:<k1,v1> ——> list<k2,v2>Reduce:<k2,list<v2>> ——> <k3,v3>2例子下面通过一个的例子来详细说明这个过程。
WordCount是Hadoop自带的一个例子,目标是统计文本文件中单词的个数。
假设有如下的两个文本文件来运行WorkCount程序:Hello World Bye WorldHello Hadoop GoodBye Hadoop2.1 map数据输入Hadoop针对文本文件缺省使用LineRecordReader类来实现读取,一行一个key/value对,key取偏移量,value为行内容。
如下是map1的输入数据:Key1 Value10 Hello World Bye World如下是map2的输入数据:Key1Value10 Hello Hadoop GoodBye Hadoop2.2 map输出/combine输入如下是map1的输出结果Key2Value2Hello 1World 1Bye 1World 1 如下是map2的输出结果Key2Value2Hello 1Hadoop 1GoodBye 1Hadoop 12.3 combine输出Combiner类实现将相同key的值合并起来,它也是一个Reducer的实现。
如下是combine1的输出Key2Value2Hello 1World 2Bye 1如下是combine2的输出Key2Value2Hello 1Hadoop 2GoodBye 12.4 reduce输出Reducer类实现将相同key的值合并起来。
如下是reduce的输出Key2Value2Hello 2World 2Bye 1Hadoop 2GoodBye 1三MapReduce框架结构1 角色1.1 JobClient每一个job都会在用户端通过JobClient类将应用程序以及配置参数打包成jar文件存储在HDFS,并把路径提交到JobTracker,然后由JobTracker创建每一个Task(即MapTask和ReduceTask)并将它们分发到各个TaskTracker服务中去执行。
1.2 JobTrackerJobTracker是一个master服务,JobTracker负责调度job的每一个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。
一般情况应该把JobTracker部署在单独的机器上。
1.3 TaskTrackerTaskTracker是运行于多个节点上的slaver服务。
TaskTracker则负责直接执行每一个task。
TaskTracker都需要运行在HDFS的DataNode上,2 数据结构2.1 Mapper和Reducer运行于Hadoop的MapReduce应用程序最基本的组成部分包括一个Mapper和一个Reducer类,以及一个创建JobConf的执行程序,在一些应用中还可以包括一个Combiner 类,它实际也是Reducer的实现。
2.2 JobInProgressJobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。
JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。
2.3 TaskInProgressJobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似,是JobTracker内部类)用于监控和调度该Task。
启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。
TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。
2.4 MapTask和ReduceTask一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask 调用,Combiner实际也是Reducer接口类的实现。
Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。
MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。
这个过程在下一部分再详细介绍。
下图描述了Map/Reduce框架中主要组成和它们之间的关系:3 流程一道MapRedcue作业是通过JobClient.rubJob(job)向master节点的JobTracker提交的, JobTracker接到JobClient的请求后把其加入作业队列中。
JobTracker一直在等待JobClient 通过RPC提交作业,而TaskTracker一直通过RPC向JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。
如果JobTracker的作业队列不为空, 则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。
这是一道pull过程。
slave节点的TaskTracker接到任务后在其本地发起Task,执行任务。
以下是简略示意图:下面详细介绍一下Map/Reduce处理一个工作的流程。
四JobClient在编写MapReduce程序时通常是上是这样写的:Configuration conf = new Configuration(); // 读取hadoop配置Job job = new Job(conf, "作业名称"); // 实例化一道作业job.setMapperClass(Mapper类型);job.setCombinerClass(Combiner类型);job.setReducerClass(Reducer类型);job.setOutputKeyClass(输出Key的类型);job.setOutputValueClass(输出Value的类型);FileInputFormat.addInputPath(job, new Path(输入hdfs路径));FileOutputFormat.setOutputPath(job, new Path(输出hdfs路径));// 其它初始化配置JobClient.runJob(job);1 配置JobJobConf是用户描述一个job的接口。
下面的信息是MapReduce过程中一些较关键的定制信息:2 JobClient.runJob()运行Job并分解输入数据集一个MapReduce的Job会通过JobClient类根据用户在JobConf类中定义的InputFormat实现类来将输入的数据集分解成一批小的数据集,每一个小数据集会对应创建一个MapTask来处理。
JobClient会使用缺省的FileInputFormat类调用FileInputFormat.getSplits()方法生成小数据集,如果判断数据文件是isSplitable()的话,会将大的文件分解成小的FileSplit,当然只是记录文件在HDFS里的路径及偏移量和Split大小。
这些信息会统一打包到jobFile的jar中。
JobClient然后使用submitJob(job)方法向master提交作业。
submitJob(job)内部是通过submitJobInternal(job)方法完成实质性的作业提交。
submitJobInternal(job)方法首先会向hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.split和job.xml。
job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。
job.jar: jar包,里面包含了执行此任务需要的各种类,比如Mapper,Reducer等实现。
job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。