hadoop API介绍
- 格式:pdf
- 大小:159.38 KB
- 文档页数:7
如何使用HDFS Java API 在Eclipse中进行文件操作一、概述在本文中,我将向您介绍如何使用Hadoop分布式文件系统(HDFS)的Java API在Eclipse中进行文件操作的基本流程。
HDFS是Hadoop生态系统中的一个核心组件,用于在分布式环境下存储和处理大规模数据。
通过本文的学习,您将能够深入了解HDFS的基本操作,并掌握在Eclipse中使用Java API对HDFS进行文件操作的技巧。
二、准备工作在开始之前,您需要进行一些准备工作:1. 安装Hadoop和Eclipse您需要安装Hadoop和Eclipse。
您可以从官方网站下载Hadoop和Eclipse的安装包,并按照官方指南进行安装。
2. 配置Hadoop环境在安装Hadoop之后,您需要配置Hadoop的环境变量。
确保Hadoop的bin目录已经加入到系统的PATH变量中,以便在Eclipse 中调用Hadoop命令。
3. 创建Hadoop项目在Eclipse中,您需要创建一个新的Java项目,并将Hadoop库添加到项目的Build Path中。
这样您才能够在项目中使用Hadoop的Java API。
三、基本流程1. 创建HDFS客户端要在Eclipse中使用Hadoop的Java API对HDFS进行文件操作,首先需要创建一个HDFS客户端。
您可以通过以下代码来创建一个HDFS客户端并连接到HDFS集群:```javaConfiguration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://localhost:9000");FileSystem fs = FileSystem.get(conf);```在这段代码中,我们首先创建了一个Configuration对象,并设置了HDFS的默认文件系统为hdfs://localhost:9000。
Hadoop 生态系统介绍Hadoop生态系统是一个开源的大数据处理平台,它由Apache基金会支持和维护,可以在大规模的数据集上实现分布式存储和处理。
Hadoop生态系统是由多个组件和工具构成的,包括Hadoop 核心,Hive、HBase、Pig、Spark等。
接下来,我们将对每个组件及其作用进行介绍。
一、Hadoop核心Hadoop核心是整个Hadoop生态系统的核心组件,它主要由两部分组成,一个是Hadoop分布式文件系统(HDFS),另一个是MapReduce编程模型。
HDFS是一个高可扩展性的分布式文件系统,可以将海量数据存储在数千台计算机上,实现数据的分散储存和高效访问。
MapReduce编程模型是基于Hadoop的针对大数据处理的一种模型,它能够对海量数据进行分布式处理,使大规模数据分析变得容易和快速。
二、HiveHive是一个开源的数据仓库系统,它使用Hadoop作为其计算和存储平台,提供了类似于SQL的查询语法,可以通过HiveQL 来查询和分析大规模的结构化数据。
Hive支持多种数据源,如文本、序列化文件等,同时也可以将结果导出到HDFS或本地文件系统。
三、HBaseHBase是一个开源的基于Hadoop的列式分布式数据库系统,它可以处理海量的非结构化数据,同时也具有高可用性和高性能的特性。
HBase的特点是可以支持快速的数据存储和检索,同时也支持分布式计算模型,提供了易于使用的API。
四、PigPig是一个基于Hadoop的大数据分析平台,提供了一种简单易用的数据分析语言(Pig Latin语言),通过Pig可以进行数据的清洗、管理和处理。
Pig将数据处理分为两个阶段:第一阶段使用Pig Latin语言将数据转换成中间数据,第二阶段使用集合行处理中间数据。
五、SparkSpark是一个快速、通用的大数据处理引擎,可以处理大规模的数据,支持SQL查询、流式数据处理、机器学习等多种数据处理方式。
黑马程序员hadoop笔记Hadoop是当前最流行的大数据处理框架之一,具备高可靠性、高扩展性和高效性等特点。
本文将全面介绍Hadoop的相关内容,包括其基本概念、架构设计、应用场景以及使用方法等。
1. Hadoop的基本概念Hadoop是一个开源的分布式计算平台,其核心由Hadoop分布式文件系统(HDFS)和MapReduce计算框架组成。
HDFS采用主从架构,支持海量数据的分布式存储和处理;MapReduce则是一种分布式计算模型,提供了高效的数据处理能力。
2. Hadoop的架构设计Hadoop采用了分布式存储和计算的架构设计,主要包括主节点(NameNode)和多个工作节点(DataNode)组成。
主节点负责管理整个系统的元数据信息,存储在内存中,而工作节点则负责存储和计算任务的执行。
3. Hadoop的应用场景Hadoop广泛应用于大规模数据处理和分析领域。
它可以处理各种类型的数据,包括结构化数据、半结构化数据和非结构化数据等。
常见的应用场景包括日志分析、推荐系统、搜索引擎和数据仓库等。
4. Hadoop的使用方法使用Hadoop进行数据处理通常需要编写MapReduce程序,它由Mapper和Reducer两个组件组成。
Mapper负责将输入数据切分成若干键值对,然后执行相应的逻辑处理;Reducer负责对Mapper的输出结果进行归纳和聚合。
在编写MapReduce程序时,我们需要定义数据的输入和输出路径,并指定Mapper和Reducer的逻辑处理方式。
通过Hadoop提供的命令行工具和API,可以方便地操作Hadoop集群,提交任务并监控任务的执行状态。
本文对Hadoop的概念、架构设计、常见应用场景和使用方法进行了简要介绍。
Hadoop作为一种强大的大数据处理框架,具备高可靠性和高扩展性,适用于处理大规模数据和复杂计算任务。
通过深入学习和掌握Hadoop的知识,我们可以更好地应对现实中的数据挑战,并开展相关的数据分析和应用开发工作。
hadoop技术、方法以及原理的理解Hadoop技术、方法以及原理的理解Hadoop是一个开源的分布式计算框架,它能够存储和处理海量的数据。
它由Apache基金会开发和维护,是目前最流行的大数据处理解决方案之一。
Hadoop的技术、方法以及原理是构成Hadoop 的核心部分,下面我们将对其进行详细的解析。
一、Hadoop的技术1. HDFSHadoop分布式文件系统(HDFS)是Hadoop的核心组件之一。
它是一种高度容错的分布式文件系统,具有高可靠性和高可用性。
该文件系统将海量数据分散存储在多个节点上,以实现快速访问和处理。
2. MapReduceMapReduce是Hadoop的另一个核心组件,它是一种编程模型和处理数据的方式。
MapReduce将数据分成小的块,然后在分布式计算机集群上处理这些块。
MapReduce将任务分为Map和Reduce两个阶段。
在Map阶段,数据被分割并分配给不同的节点进行计算。
在Reduce阶段,计算的结果被合并起来并输出。
3. YARNHadoop资源管理器(YARN)是另一个重要的组件,它是一个分布式的集群管理系统,用于管理Hadoop集群中的资源。
YARN允许多个应用程序同时运行在同一个Hadoop集群上,通过动态管理资源来提高集群的使用效率。
二、Hadoop的方法1. 大数据存储Hadoop通过HDFS实现对海量数据的存储和管理。
HDFS的设计目标是支持大型数据集的分布式处理,它通过多个节点存储数据,提供高可靠性和高可用性。
2. 数据处理Hadoop通过MapReduce实现对海量数据的处理。
MapReduce 将数据分成小的块,然后在分布式计算机集群上处理这些块。
在Map阶段,数据被分割并分配给不同的节点进行计算。
在Reduce 阶段,计算的结果被合并起来并输出。
3. 数据分析Hadoop通过Hive、Pig和Spark等工具实现数据分析。
这些工具提供了高级查询和数据分析功能,可以通过SQL和其他编程语言来处理海量数据。
hadoop基本架构和工作原理Hadoop是一个分布式开源框架,用于处理海量数据。
它能够使用廉价的硬件来搭建集群,同时还提供了高度可靠性和容错性。
Hadoop基本架构包括Hadoop Common、Hadoop Distributed File System (HDFS)和Hadoop MapReduce三个部分,下面将详细介绍Hadoop的工作原理。
1. Hadoop CommonHadoop Common是整个Hadoop架构的基础部分,是一个共享库,它包含了大量的Java类和应用程序接口。
Hadoop集群的每一台机器上都要安装Hadoop Common,并保持相同版本。
2. HDFSHadoop Distributed File System(HDFS)是Hadoop的分布式文件存储部分。
它的目的是将大型数据集分成多个块,并且将这些块在集群中的多个节点间分布式存储。
HDFS可以实现高度可靠性,因为它将每个块在存储节点之间备份。
HDFS可以在不同的节点中进行数据备份,这确保了数据发生故障时,可以轻松恢复。
3. MapReduceHadoop MapReduce是一种编程模型,用于处理大型数据集。
它将处理任务分成两个主要阶段,即Map阶段和Reduce阶段。
在Map阶段,MapReduce将数据集分成小块,并将每个块分配给不同的节点进行处理。
在Reduce阶段,结果被聚合,以生成最终的输出结果。
总的来说,MapReduce作为Hadoop的核心组件,负责对数据集进行处理和计算。
它充当的角色是一个调度员,它会将不同的任务分发到集群中的不同节点上,并尽力保证每个任务都可以获得足够的计算资源。
Hadoop采用多种技术来提供MapReduce的分布式计算能力,其中包括TaskTracker、JobTracker和心跳机制等。
TaskTracker是每个集群节点的一个守护程序,负责处理MapReduce任务的具体实现。
HadoopHDFS编程API⼊门系列之从本地上传⽂件到HDFS(⼀) 不多说,直接上代码。
代码版本11package zhouls.bigdata.myWholeHadoop.HDFS.hdfs5;23import java.io.IOException;45import .URI;6import .URISyntaxException;78import org.apache.hadoop.conf.Configuration;9import org.apache.hadoop.fs.FileSystem;10import org.apache.hadoop.fs.Path;1112/**13 *14 * @author15 * @function Copying from Local file system to HDFS,即把本地⽂件(如windows或linux⽂件拷贝到hdfs上)16 *17*/18public class CopyingLocalFileToHDFS19 {20/**21 * @function Main() ⽅法22 * @param args23 * @throws IOException24 * @throws URISyntaxException25*/26public static void main(String[] args) throws IOException,URISyntaxException{27// 本地⽂件路径(如windows或linux⽂件)28// String source = "D://Data/weibo.txt";29 String source = "./data/weibo.txt";30// hdfs⽂件路径31 String dest = "hdfs://HadoopMaster:9000/middle/weibo/";32 copyFromLocal(source, dest);33 }3435/**36 * @function 本地⽂件上传⾄ HDFS37 * @param source 原⽂件路径38 * @param dest ⽬的⽂件路径39 * @throws IOException40 * @throws URISyntaxException41*/42public static void copyFromLocal(String source, String dest)throws IOException, URISyntaxException { 43// 读取hadoop⽂件系统的配置44 Configuration conf = new Configuration();45 URI uri = new URI("hdfs://HadoopMaster:9000");46// FileSystem是⽤户操作HDFS的核⼼类,它获得URI对应的HDFS⽂件系统47 FileSystem fileSystem = FileSystem.get(uri, conf);48// 源⽂件路径49 Path srcPath = new Path(source);50// ⽬的路径51 Path dstPath = new Path(dest);52// 查看⽬的路径是否存在53if (!(fileSystem.exists(dstPath))) {54// 如果路径不存在,即刻创建55 fileSystem.mkdirs(dstPath);56 }57// 得到本地⽂件名称58 String filename = source.substring(stIndexOf('/') + 1,source.length());59try {60// 将本地⽂件上传到HDFS61 fileSystem.copyFromLocalFile(srcPath, dstPath);62 System.out.println("File " + filename + " copied to " + dest);63 } catch (Exception e) {64 System.err.println("Exception caught! :" + e);65 System.exit(1);66 } finally {67 fileSystem.close();68 }69 }7071 }代码版本21package com.dajiangtai.Hadoop.HDFS;23import java.io.IOException;4import .URI;5import .URISyntaxException;67import org.apache.hadoop.conf.Configuration;8import org.apache.hadoop.fs.FSDataInputStream;9import org.apache.hadoop.fs.FSDataOutputStream;10import org.apache.hadoop.fs.FileStatus;11import org.apache.hadoop.fs.FileSystem;12import org.apache.hadoop.fs.FileUtil;13import org.apache.hadoop.fs.Path;14import org.apache.hadoop.fs.PathFilter;15/**16 * @function 将指定格式的多个⽂件上传⾄ HDFS17 * 使⽤⽂件模式,实现多⽂件上传⾄HDFS18 * @author⼩讲19 *20*/21 @SuppressWarnings("unused")22public class CopyManyFilesToHDFS {2324private static FileSystem fs = null;//FileSystem实例对象,即fs25private static FileSystem local = null;//FileSystem实例对象,即Local,本地⽂件系统2627/**28 * @function Main ⽅法29 * @param args30 * @throws IOException31 * @throws URISyntaxException32*/33public static void main(String[] args) throws IOException,URISyntaxException {34//⽂件上传路径35// Path dstPath = new Path("hdfs://djt002:9000/outData/copyManyFilesToHDFS/");//这样会在这个默认的copyManyFilesToHDFS.txt⾥36 Path dstPath = new Path("hdfs://djt002:9000/outCopyManyFilesToHDFS/");//要么,你先可以新建好outCopyManyFilesToHDFS这个⽬录373839//调⽤⽂件上传 list ⽅法40 list(dstPath);41 }4243/**44 * function 过滤⽂件格式将多个⽂件上传⾄ HDFS45 * @param dstPath ⽬的路径46 * @throws IOException47 * @throws URISyntaxException48*/49public static void list(Path dstPath) throws IOException, URISyntaxException {50//读取hadoop⽂件系统的配置51 Configuration conf = new Configuration();52//HDFS 接⼝53 URI uri = new URI("hdfs://djt002:9000");5455// URL、URI与Path三者的区别56// Hadoop⽂件系统中通过Hadoop Path对象来代表⼀个⽂件57// URL(相当于绝对路径) -> (⽂件) -> URI(相当于相对路径,即代表URL前⾯的那⼀部分)58// URI:如hdfs://dajiangtai:900059// 如,URL.openStream606162//获得FileSystem实例fs63 fs = FileSystem.get(uri, conf);64// 返回类型是FileSystem,等价于 FileSystem fs = FileSystem.get(uri, conf);656667//获得FileSystem实例,即Local68 local = FileSystem.getLocal(conf);69// 返回类型是LocalFileSystem,等价于 LocalFileSystem local = FileSystem.getLocal(conf);7071// 为什么要获取到Local呢,因为,我们要把本地D盘下data/74⽬录下的⽂件要合并后,上传到HDFS⾥,所以,我们需先获取到Local,再来做复制⼯作啦! 727374//只上传data/testdata ⽬录下 txt 格式的⽂件75 FileStatus[] localStatus = local.globStatus(new Path("D://data/74/*"),new RegexAcceptPathFilter("^.*txt$"));76// FileStatus[] localStatus = local.globStatus(new Path("./data/copyManyFilesToHDFS/*"),new RegexAcceptPathFilter("^.*txt$"));77// ^表⽰匹配我们字符串开始的位置 *代表0到多个字符 $代表字符串结束的位置78// RegexAcceptPathFilter来只接收我们需要的,即格式79// RegexAcceptPathFilter这个⽅法我们⾃⼰写8081// 但是我们,最终是要处理⽂件⾥的东西,最终是要转成Path类型,因为Path对象f,它对应着⼀个⽂件。
Hadoop(四)HDFS的⾼级API操作⼀ HDFS客户端环境准备1.1 jar包准备1)解压hadoop-2.7.6.tar.gz到⾮中⽂⽬录2)进⼊share⽂件夹,查找所有jar包,并把jar包拷贝到_lib⽂件夹下3)在全部jar包中查找sources.jar,并剪切到_source⽂件夹。
4)在全部jar包中查找tests.jar,并剪切到_test⽂件夹1.2 Eclipse准备1)根据⾃⼰电脑的操作系统拷贝对应的编译后的hadoop jar包到⾮中⽂路径(例如:E:\02_software\hadoop-2.7.6)。
(如果不⽣效,重新启动eclipse)2)配置HADOOP_HOME环境变量3)创建第⼀个java⼯程HdfsClientDemo14)创建lib⽂件夹,然后添加jar包5)创建包,HdfsClient测试类public class HdfsClient {// 上传⽂件public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {// 1 获取⽂件系统Configuration configuration = new Configuration();// 配置在集群上运⾏// configuration.set("fs.defaultFS", "hdfs://node21:9000");// FileSystem fs = FileSystem.get(configuration);FileSystem fs = FileSystem.get(new URI("hdfs://node21:9000"), configuration, "admin");// 2 上传⽂件fs.copyFromLocalFile(new Path("e:/hello.txt"), new Path("/hello2.txt"));// 3 关闭资源fs.close();System.out.println("over");}}6)执⾏程序运⾏时需要配置⽤户名称,客户端去操作hdfs时,是有⼀个⽤户⾝份的。
hadoop大数据原理与应用Hadoop大数据原理与应用随着信息技术的飞速发展,大数据成为当今社会的热门话题之一。
而Hadoop作为大数据处理的重要工具,因其可靠性和高效性而备受关注。
本文将介绍Hadoop大数据的原理和应用。
一、Hadoop的原理Hadoop是一个开源的分布式计算框架,可以处理大规模数据集。
其核心组件包括Hadoop分布式文件系统(HDFS)和Hadoop分布式计算框架(MapReduce)。
HDFS是一个可靠的分布式文件系统,能够将大文件分成多个块并存储在不同的计算机节点上,以实现高容错性和高可用性。
而MapReduce是一种编程模型,将大规模数据集分成多个小的子集,然后在分布式计算集群上进行并行处理。
Hadoop的工作流程如下:首先,将大文件切分成多个块,并将这些块存储在不同的计算机节点上。
然后,在计算机节点上进行并行计算,每个节点都可以处理自己所存储的数据块。
最后,将每个节点的计算结果进行整合,得到最终的结果。
Hadoop的优势在于其可扩展性和容错性。
由于其分布式计算的特性,Hadoop可以轻松地处理大规模数据集。
同时,Hadoop还具有高容错性,即使某个计算机节点发生故障,整个计算任务也不会中断,系统会自动将任务分配给其他可用节点。
二、Hadoop的应用Hadoop广泛应用于大数据分析和处理领域。
以下是几个典型的应用场景:1.数据仓库:Hadoop可以存储和处理海量的结构化和非结构化数据,为企业提供全面的数据仓库解决方案。
通过使用Hadoop,企业可以轻松地将各种类型的数据整合到一个统一的平台上,从而更好地进行数据分析和挖掘。
2.日志分析:随着互联网的普及,各种网站和应用产生的日志数据越来越庞大。
Hadoop可以帮助企业对这些日志数据进行实时分析和处理,从而发现潜在的问题和机会。
3.推荐系统:在电子商务和社交媒体领域,推荐系统起着重要的作用。
Hadoop可以帮助企业分析用户的行为和偏好,从而提供个性化的推荐服务。
Hadoop生态中的大数据处理与分析第一章介绍Hadoop生态Hadoop是由Apache基金会开发的一个开源Java框架,用于处理大数据。
Hadoop生态系统是由许多不同的组件组成的,包括Hadoop文件系统(HDFS)、MapReduce、Hive、Pig、HBase等。
每个组件都有不同的目的和特点。
Hadoop生态系统为大数据处理提供了一整套完备的工具。
在Hadoop生态系统中,MapReduce是最常用的一项工具,它提供了分布式的数据处理功能。
在大数据处理中,MapReduce通常用于将大量数据分解为不同的小块,并在不同的节点间并行运算和处理。
第二章大数据的处理与分析大数据处理和分析是指处理大量数据并提取有用信息的过程。
大数据处理和分析可以帮助企业了解其业务、排除风险和改进业务决策。
但是,对于大数据的处理和分析来说,非结构化数据和半结构化数据是一个巨大的挑战。
这时候Hadoop生态系统可以帮助企业解决这个问题。
Hadoop生态系统的组件,如Hive、Pig、Spark和Storm等可以处理非常大的数据集,并提供高效的并行计算。
这些工具可以从海量的数据中提取有用的信息。
Hive和Pig可以将非结构化数据转换成结构化数据,并通过SQL查询进行分析。
Spark和Storm可以通过Stream Processing技术进行数据分析和处理。
Hadoop生态系统可以帮助企业在分析和处理大数据时提高效率并节省成本。
第三章 Hadoop生态系统的组件1. Hadoop文件系统(HDFS)HDFS是Hadoop生态系统中的核心组件,用于存储和管理大量数据。
在HDFS中,数据被分解为多个块,并分布在不同的服务器上,使得数据存储和处理更加高效。
HDFS提供了高可靠性、高可用性和高扩展性。
HDFS可以容错处理所有的节点故障,同时支持横向扩展。
2. MapReduceMapReduce是Hadoop生态系统中最常用的一项组件,用于分布式计算。
Hadoop API 使用介绍2009-11-17 00:57Hadoop API被分成(divide into)如下几种主要的包(package)org.apache.hadoop.conf 定义了系统参数的配置文件处理API。
org.apache.hadoop.fs 定义了抽象的文件系统API。
org.apache.hadoop.dfs Hadoop分布式文件系统(HDFS)模块的实现。
org.apache.hadoop.io 定义了通用的I/O API,用于针对网络,数据库,文件等数据对象做读写操作;org.apache.hadoop.ipc 用于网络服务端和客户端的工具,封装了网络异步I/O的基础模块。
org.apache.hadoop.mapred Hadoop分布式计算系统(MapReduce)模块的实现,包括任务的分发调度等。
org.apache.hadoop.metrics 定义了用于性能统计信息的API,主要用于mapred和dfs模块。
org.apache.hadoop.record 定义了针对记录的I/O API类以及一个记录描述语言翻译器,用于简化将记录序列化成语言中性的格式(language-neutral manner)。
org.apache.hadoop.tools 定义了一些通用的工具。
org.apache.hadoop.util 定义了一些公用的API。
MapReduce框架结构Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的,Google已经将它完整的MapReduce论文公开发布了。
其中对它的定义是,Map/Reduce是一个编程模型(programming model),是一个用于处理和生成大规模数据集(processing and generating large data sets)的相关的实现。
用户定义一个map函数来处理一个key/value对以生成一批中间的key/value对,再定义一个reduce函数将所有这些中间的有着相同key的values合并起来。
很多现实世界中的任务都可用这个模型来表达。
Hadoop的Map/Reduce框架也是基于这个原理实现的,下面简要介绍一下Map/Reduce 框架主要组成及相互的关系。
2.1 总体结构2.1.1 Mapper和Reducer运行于Hadoop的MapReduce应用程序最基本的组成部分包括一个Mapper和一个Reducer类,以及一个创建JobConf的执行程序,在一些应用中还可以包括一个Combiner 类,它实际也是Reducer的实现。
2.1.2 JobTracker和TaskTracker它们都是由一个master服务JobTracker和多个运行于多个节点的slaver服务TaskTracker两个类提供的服务调度的。
master负责调度job的每一个子任务task运行于slave上,并监控它们,如果发现有失败的task就重新运行它,slave则负责直接执行每一个task。
TaskTracker都需要运行在HDFS的DataNode上,而JobTracker则不需要,一般情况应该把JobTracker部署在单独的机器上。
2.1.3 JobClient每一个job都会在用户端通过JobClient类将应用程序以及配置参数Configuration 打包成jar文件存储在HDFS,并把路径提交到JobTracker的master服务,然后由master 创建每一个Task(即MapTask和ReduceTask)将它们分发到各个TaskTracker服务中去执行。
2.1.4 JobInProgressJobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。
JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。
2.1.5 TaskInProgressJobTracker启动任务时通过每一个TaskInProgress来launch Task,这时会把Task 对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker 收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。
启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。
TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。
2.1.6 MapTask和ReduceTask一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask 调用,Combiner实际也是Reducer接口类的实现。
Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。
MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。
这个过程在下一部分再详细介绍。
下图描述了Map/Reduce框架中主要组成和它们之间的关系:2.2 Job创建过程2.2.1 JobClient.runJob() 开始运行job并分解输入数据集一个MapReduce的Job会通过JobClient类根据用户在JobConf类中定义的InputFormat实现类来将输入的数据集分解成一批小的数据集,每一个小数据集会对应创建一个MapTask来处理。
JobClient会使用缺省的FileInputFormat类调用FileInputFormat.getSplits()方法生成小数据集,如果判断数据文件是isSplitable()的话,会将大的文件分解成小的FileSplit,当然只是记录文件在HDFS里的路径及偏移量和Split大小。
这些信息会统一打包到jobFile的jar中并存储在HDFS中,再将jobFile路径提交给JobTracker去调度和执行。
2.2.2 JobClient.submitJob() 提交job到JobTrackerjobFile的提交过程是通过RPC模块(有单独一章来详细介绍)来实现的。
大致过程是,JobClient类中通过RPC实现的Proxy接口调用JobTracker的submitJob()方法,而JobTracker必须实现JobSubmissionProtocol接口。
JobTracker则根据获得的jobFile路径创建与job有关的一系列对象(即JobInProgress和TaskInProgress等)来调度并执行job。
JobTracker创建job成功后会给JobClient传回一个JobStatus对象用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。
JobClient会根据这个JobStatus 对象创建一个NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。
与创建Job过程相关的类和方法如下图所示2.3 Job执行过程上面已经提到,job是统一由JobTracker来调度的,具体的Task分发给各个TaskTracker节点来执行。
下面通过源码来详细解析执行过程,首先先从JobTracker收到JobClient的提交请求开始。
2.3.1 JobTracker初始化Job和Task队列过程2.3.1.1 JobTracker.submitJob() 收到请求当JobTracker接收到新的job请求(即submitJob()函数被调用)后,会创建一个JobInProgress对象并通过它来管理和调度任务。
JobInProgress在创建的时候会初始化一系列与任务有关的参数,如job jar的位置(会把它从HDFS复制本地的文件系统中的临时目录里),Map和Reduce的数据,job的优先级别,以及记录统计报告的对象等。
2.3.1.2 JobTracker.resortPriority() 加入队列并按优先级排序JobInProgress创建后,首先将它加入到jobs队列里,分别用一个map成员变量jobs 用来管理所有jobs对象,一个list成员变量jobsByPriority用来维护jobs的执行优先级别。
之后JobTracker会调用resortPriority()函数,将jobs先按优先级别排序,再按提交时间排序,这样保证最高优先并且先提交的job会先执行。
2.3.1.3 JobTracker.JobInitThread 通知初始化线程然后JobTracker会把此job加入到一个管理需要初始化的队列里,即一个list成员变量jobInitQueue里。
通过此成员变量调用notifyAll()函数,会唤起一个用于初始化job的线程JobInitThread来处理(JobTracker会有几个内部的线程来维护jobs队列,它们的实现都在JobTracker代码里,稍候再详细介绍)。
JobInitThread收到信号后即取出最靠前的job,即优先级别最高的job,调用JobInProgress的initTasks()函数执行真正的初始化工作。
2.3.1.4 JobInProgress.initTasks() 初始化TaskInProgressTask的初始化过程稍复杂些,首先步骤JobInProgress会创建Map的监控对象。
在initTasks()函数里通过调用JobClient的readSplitFile()获得已分解的输入数据的RawSplit列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。