当前位置:文档之家› study hadoop note

study hadoop note

Hadoop MapReduce 对外提供了5 个可编程组件:InputFormat、Mapper、Partitioner、Reducer 和OutputFormat
HDFS 是一个具有高度容错性的分布式文件系统,适合部署在廉价的机器上。HDFS 能
提供高吞吐量的数据访问,非常适合大规模数据集上的应用。
总体上采用了master/slave 架构,主要由以下几个组件组成:
Client、NameNode、SecondaryNameNode 和DataNode。
(1)Client
Client(代表用户)通过与NameNode 和DataNode 交互访问HDFS 中的文件。Client
提供了一个类似POSIX 的文件系统接口供用户调用。26
(2)NameNode
整个Hadoop 集群中只有一个NameNode。它是整个系统的“总管”,负责管理HDFS
的目录树和相关的文件元数据信息。这些信息是以“fsimage”(HDFS 元数据镜像文件)
和“ editlog”(HDFS 文件改动日志)两个文件 形式存放在本地磁盘,当 HDFS 重启时重
 在Hadoop 0.21.0 版本中,SecondaryNameNode 被Checkpoint Node 代替。
 在Hadoop 0.21.0 版本中,这两个文件被合并成一个。
新构造出来的。此外,NameNode 还负责监控各个DataNode 的健康状态,一旦发现某个
DataNode 宕掉,则将该DataNode 移出HDFS 并重新备份其上面的数据。
(3)Secondary NameNode
Secondary NameNode 最重要的任务并不是为NameNode 元数据进行热备份,而是定期
合并fsimage 和edits 日志,并传输给NameNode。这里需要注意的是,为了减小NameNode
压力,NameNode 自己并不会合并fsimage 和edits,并将文件存储到磁盘上,而是交由
Secondary NameNode 完成。
(4)DataNode
一般而言,每个Slave 节点上安装一个DataNode,它负责实际的数据存储,并将数据
信息定期汇报给NameNode。DataNode 以固定大小的block 为基本单位组织文件内容,默
认情况下block 大小为64MB。当用户上传一个大的文件到HDFS 上时,该文件会被切分成
若干个block,分别存储到不同的DataNode ;同时,为了保证数据可靠,会将同一个block
以流水线方式写到若干个(默认是3,该参数可配置)不同的DataNode 上。这种文件切割
后存储的过程是对用户透明的。

Hadoop MapReduce 也采用了Master/Slave(M/S)架构,具体如图2-5
所示。它主要由以下几个组件组成:Client、JobTracker、 TaskTracker 和Task。下面分别对
这几个组件进行介绍。


(1)Client
用户编写的MapReduce 程序通过Client 提交到JobTracker 端;同时,用户可通过Client 提
供的一些接口查看作业运行状态。在Hadoop 内部用“作业”(Job)表示MapReduce 程序。一个
MapReduce 程序可对应若干个作业,而每个作业会被分解成若干个Map/Reduce 任务(Task)。
(2)JobTracker
JobTracker 主要负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与作业的
健康状况,一旦发现

失败情况后,其会将相应的任务转移到其他节点;同时,JobTracker 会
跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在
资源出现空闲时,选择合适的任务使用这些资源。在Hadoop 中,任务调度器是一个可插拔
的模块,用户可以根据自己的需要设计相应的调度器。
(3)TaskTracker
TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报
给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死
任务等)。TaskTracker 使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、
内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个
TaskTracker 上的空闲slot 分配给Task 使用。slot 分为Map slot 和Reduce slot 两种,分别供Map
Task 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。
(4)Task
Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动。从上一小节中我们知道,
HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。
split 与block 的对应关系如图2-6 所示。split 是一个逻辑概念,它只包含一些元数据信息,比如
数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意
的是,split 的多少决定了Map Task 的数目,因为每个split 会交由一个Map Task 处理。\
InputFormat 主要用于描述输入数据的格式,它提供以下两个功能。
? 数据切分:按照某个策略将输入数据切分成若干个split,以便确定 Map Task 个数以
及对应的split。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的
节点列表等。
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
getSplits 方法主要完成数据切分的功能,它会尝试着将输入数据切分成numSplits 个
InputSplit。InputSplit 有以下两个特点。

splitSize = max{minSize, min{goalSize, blockSize}}
splitSize = max{minSize, min{maxSize, blockSize}}
? 逻辑分片 :它只是在逻辑上对输入数据进行分片,并不会在磁盘上将其切分成分片
进行存储。InputSplit 只记录了分片的元数据信息,比如起始位置、长度以及所在的
节点列表等。
? 可序列化:在 Hadoop 中,对象序列化主要有两个作用:进程间通信和永久存储。此
处,InputSplit 支持序列化操作主要是为了进程间通信。作业被提交到JobTracker 之
前,Client 会调用作业InputFormat 中的getSplits 函数,并将得到的InputSplit 序列
化到文件中。这样,当作业提交到JobTracker 端对作业初

始化时,可直接读取该文
件,解析出所有InputSplit,并创建对应的Map Task。

? 为 Mapper 提供输入数据:给定某个 split,能将其解析成一个个 key/value 对。
RecordReader getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
FileInputFormat:
1.文件切割算法: splitSize = max{minSize, min{goalSize, blockSize}}
如果想让InputSplit 尺寸大于block 尺寸,则直接增大配置参
数mapred.min.split.size 即可。
2。host 选择算法
Partitioner 的作用是对Mapper 产生的中间结果进行分片,以便将同一分组的数据交给
同一个Reducer 处理,它直接影响Reduce 阶段的负载均衡。旧版API 中Partitioner 的类图如
图3-20 所示。它继承了JobConfigurable,可通过configure 方法初始化。它本身只包含一
个待实现的方法getPartition。该方法包含三个参数,均由框架自动传入,前面两个参数是
key/value,第三个参数numPartitions 表示每个Mapper 的分片数,也就是Reducer 的个数。

MapReduce 提供了两个Partitioner 实现:HashPartitioner 和TotalOrderPartitioner。其
中HashPartitioner 是默认实现,它实现了一种基于哈希值的分片方法,代码如下:
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
TotalOrderPartitioner 提供了一种基于区间的分片方法,通常用在数据全排序中。在
MapReduce 环境中,容易想到的全排序方案是归并排序,即在Map 阶段,每个Map Task
进行局部排序;在Reduce 阶段,启动一个Reduce Task 进行全局排序。由于作业只能有一
个Reduce Task,因而Reduce 阶段会成为作业的瓶颈。为了提高全局排序的性能和扩展性,
MapReduce 提供了TotalOrderPartitioner。它能够按照大小将数据分成若干个区间(分片),
并保证后一个区间的所有数据均大于前一个区间数据,这使得全排序的步骤如下:
步骤1 数据采样。在Client 端通过采样获取分片的分割点。Hadoop 自带了几个采样
算法, 如IntercalSampler、RandomSampler、SplitSampler 等( 具体见org.apache.hadoop.
mapred.lib 包中的InputSampler 类)。下面举例说明。
采样数据为:b,abc,abd,bcd,abcd,efg,hii,afd,rrr,mnk
经排序后得到:abc,abcd,abd,afd,b,bcd,efg,hii,mnk,rrr
如果Reduce Task 个数为4,则采样数据的四等分点为abd、bcd、mnk,将这3 个字符
串作为分割点.

Map 阶段。本阶段涉及两个组件,分别是Mapper 和Partitioner。其中,Mapper 可
采用IdentityMapper,直接将输入数据输出,但Partitioner 必须选用TotalOrderPartitioner,
它将步骤1 中获取的分割点保存到trie 树中以便快速定位任意一个记录所在的区间,这样,
每个Map Task 产生R(Reduce T

ask 个数)个区间,且区间之间有序。
TotalOrderPartitioner 通过trie 树查找每条记录所对应的Reduce Task 编号。如图3-21
所示,我们将分割点保存在深度为2 的trie 树中,假设输入数据中有两个字符串“abg”
和“mnz”,则字符串“abg”对应partition1,即第2 个Reduce Task,字符串“mnz”对应
partition3,即第4 个Reduce Task。

Reduce 阶段。每个Reducer 对分配到的区间数据进行局部排序,最终得到全
排序数据。
从以上步骤可以看出,基于TotalOrderPartitioner 全排序的效率跟key 分布规律和采样
算法有直接关系;key 值分布越均匀且采样越具有代表性,则Reduce Task 负载越均衡,全
排序效率越高。
TotalOrderPartitioner 有两个典型的应用实例:TeraSort 和HBase 批量数据导入。其中,
TeraSort 是Hadoop 自带的一个应用程序实例。它曾在TB 级数据排序基准评估中赢得第一
名,而TotalOrderPartitioner 正是从该实例中提炼出来的。HBase 是一个构建在Hadoop
之上的NoSQL 数据仓库。它以Region 为单位划分数据,Region 内部数据有序(按key 排
序), Region 之间也有序。很明显,一个MapReduce 全排序作业的R 个输出文件正好可对
应HBase 的R 个Region。

Reduce 阶段。每个Reducer 对分配到的区间数据进行局部排序,最终得到全
排序数据。
从以上步骤可以看出,基于TotalOrderPartitioner 全排序的效率跟key 分布规律和采样
算法有直接关系;key 值分布越均匀且采样越具有代表性,则Reduce Task 负载越均衡,全
排序效率越高。
TotalOrderPartitioner 有两个典型的应用实例:TeraSort 和HBase 批量数据导入。其中,
TeraSort 是Hadoop 自带的一个应用程序实例。它曾在TB 级数据排序基准评估中赢得第一
名,而TotalOrderPartitioner 正是从该实例中提炼出来的。HBase 是一个构建在Hadoop
之上的NoSQL 数据仓库。它以Region 为单位划分数据,Region 内部数据有序(按key 排
序), Region 之间也有序。很明显,一个MapReduce 全排序作业的R 个输出文件正好可对
应HBase 的R 个Region。






1. JobControl 编程实例
我们以第2 章中的贝叶斯分类为例介绍。一个完整的贝叶斯分类算法可能需要4 个有
依赖关系的MapReduce 作业完成,传统的做法是:为每个作业创建相应的JobConf 对象,
并按照依赖关系依次(串行)提交各个作业,如下所示:
// 为4 个作业分别创建JobConf 对象
JobConf extractJobConf = new JobConf(ExtractJob.class);
JobConf classPriorJobConf = new JobConf(ClassPriorJob.class);
JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob.class);
JobConf predictJobConf = new JobConf(PredictJob.class);
...// 配置各个JobConf
// 按照依赖关系依次提交作业
JobClient.runJob(extractJobConf);

JobClient.runJob(classPriorJobConf);
JobClient.runJob(conditionalProbilityJobConf);
JobClient.runJob(predictJobConf);
如果使用JobControl,则用户只需使用addDepending() 函数添加作业依赖关系接口,
JobControl 会按照依赖关系调度各个作业,具体代码如下:
Configuration extractJobConf = new Configuration();
Configuration classPriorJobConf = new Configuration();
Configuration conditionalProbilityJobConf = new Configuration();
Configuration predictJobConf = new Configuration();
...// 设置各个Configuration
// 创建Job 对象。注意,JobControl 要求作业必须封装成Job 对象
Job extractJob = new Job(extractJobConf);
Job classPriorJob = new Job(classPriorJobConf);
Job conditionalProbilityJob = new Job(conditionalProbilityJobConf);
Job predictJob = new Job(predictJobConf);
// 设置依赖关系,构造一个DAG 作业
classPriorJob.addDepending(extractJob);
conditionalProbilityJob.addDepending(extractJob);
predictJob.addDepending(classPriorJob);
predictJob.addDepending(conditionalProbilityJob);
// 创建JobControl 对象,由它对作业进行监控和调度
JobControl JC = new JobControl("Native Bayes");
JC.addJob(extractJob);// 把4 个作业加入JobControl 中
JC.addJob(classPriorJob);
68 第二部分 MapReduce 编程模型篇
JC.addJob(conditionalProbilityJob);
JC.addJob(predictJob);
JC.run(); // 提交DAG 作业
在实际运行过程中,不依赖于其他任何作业的extractJob 会优先得到调度,一旦运行完
成,classPriorJob 和conditionalProbilityJob 两个作业同时被调度,待它们全部运行完成后,
predictJob 被调度。
对比以上两种方案,可以得到一个简单的结论:使用JobControl 编写DAG 作业更加简
便,且能使多个无依赖关系的作业并行运行。


相关主题
文本预览
相关文档 最新文档