当前位置:
文档之家› Hadoop的MapReduce中多文件输出-北风网
Hadoop的MapReduce中多文件输出-北风网
[Hadoop系列]Hadoop的MapReduce中多文件输出
2010-01-08 11:14 2883人阅读 评论(4) 收藏 举报 inkfish原创,请勿商业性质转载,转载请注明来源(作者:inkfish;原文出处链接在本转载版本中已损坏,待确认)。
Hadoop默认的输出是TextOutputFormat,输出文件名不可定制。hadoop 0.19.X中有一个org.apache.hadoop.mapred.lib.MultipleOutputFormat,可以输出多份文件且可以自定义文件名,但是从hadoop 0.20.x中MultipleOutputFormat所在包的所有类被标记为“已过时”,当前如果再使用MultipleOutputFormat,在将来版本的hadoop中可能无法使用。本篇文章中,我们自己实现一个简单的MultipleOutputFormat,并修改hadoop自带的WordCount 示例程序来测试结果。
环境:
Ubuntu 8.0.4 Server 32bit
Hadoop 0.20.1
JDK 1.6.0_16-b01
Eclipse 3.5
所有代码分为3个类:
1.LineRecordWriter:
RecordWriter的一个实现,用于把key和value转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个内部类存在,protected访问权限,因此普通程序无法访问。这里仅仅是把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共类使用。
view plain
1.package inkfish.hadoop.study;
2.import java.io.DataOutputStream;
3.import java.io.IOException;
4.import java.io.UnsupportedEncodingException;
5.import org.apache.hadoop.io.NullWritable;
6.import org.apache.hadoop.io.Text;
7.import org.apache.hadoop.mapreduce.RecordWriter;
8.import org.apache.hadoop.mapreduce.TaskAttemptContext;
9.import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
10./**摘自{@link TextOutputFormat}中的LineRecordWriter。 */
11.public class LineRecordWriter extends RecordWriter {
12.private static final String utf8 = "UTF-8";
13.private static final byte[] newline;
14.static {
15.try {
16. newline = "/n".getBytes(utf8);
17. } catch (UnsupportedEncodingException uee) {
18.throw new IllegalArgumentException("can't find " + utf8 + " enco
ding");
19. }
20. }
21.protected DataOutputStream out;
22.private final byte[] keyValueSeparator;
23.public LineRecordWriter(DataOutputStream out, String keyValueSeparator)
{
24.this.out = out;
25.try {
26.this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
27. } catch (UnsupportedEncodingException uee) {
28.throw new IllegalArgumentException("can't find " + utf8 + " enco
ding");
29. }
30. }
31.public LineRecordWriter(DataOutputStream out) {
32.this(out, "/t");
33. }
34.private void writeObject(Object o) throws IOException {
35.if (o instanceof Text) {
36. Text to = (Text) o;
37. out.write(to.getBytes(), 0, to.getLength());
38. } else {
39. out.write(o.toString().getBytes(utf8));
40. }
41. }
42.public synchronized void write(K key, V value) throws IOException {
43.boolean nullKey = key == null || key instanceof NullWritable;
44.boolean nullValue = value == null || value instanceof NullWritable;
45.if (nullKey && nullValue) {
46.return;
47. }
48.if (!nullKey) {
49. writeObject(key);
50. }
51.if (!(nullKey || nullValue)) {
52. out.write(keyValueSeparator);
53. }
54.if (!nullValue) {
55. writeObject(value);
56. }
57. out.write(newline);
58. }
59.public synchronized void close(TaskAttemptContext context) throws IOExce
ption {
60. out.close();
61. }
62.}
2.MultipleOutputFormat:
抽象类,主要参考org.apache.hadoop.mapred.lib.MultipleOutputFormat。子类唯一需要实现的方法是:String generateFileNameForKeyValue(K key, V value, Configuration conf),即通过key和value及conf配置信息决定文件名(含扩展名)。
view plain
1.package inkfish.hadoop.study;
2.import java.io.DataOutputStream;
3.import java.io.IOException;
4.import java.util.HashMap;
5.import java.util.Iterator;
6.import org.apache.hadoop.conf.Configuration;
7.import org.apache.hadoop.fs.FSDataOutputStream;
8.import org.apache.hadoop.fs.Path;
9.import org.apache.hadoop.io.Writable;
10.import org.apache.hadoop.io.WritableComparable;
11.import https://www.doczj.com/doc/07889494.html,pressionCodec;
12.import https://www.doczj.com/doc/07889494.html,press.GzipCodec;
13.import org.apache.hadoop.mapreduce.OutputCommitter;
14.import org.apache.hadoop.mapreduce.RecordWriter;
15.import org.apache.hadoop.mapreduce.TaskAttemptContext;
16.import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
17.import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18.import org.apache.hadoop.util.ReflectionUtils;
19.public abstract class MultipleOutputFormat,
V extends Writable>
20.extends FileOutputFormat {
21.private MultiRecordWriter writer = null;
22.public RecordWriter getRecordWriter(TaskAttemptContext job) throws
IOException,
23. InterruptedException {
24.if (writer == null) {
25. writer = new MultiRecordWriter(job, getTaskOutputPath(job));
26. }
27.return writer;
28. }
29.private Path getTaskOutputPath(TaskAttemptContext conf) throws IOExcepti
on {
30. Path workPath = null;
31. OutputCommitter committer = super.getOutputCommitter(conf);
32.if (committer instanceof FileOutputCommitter) {
33. workPath = ((FileOutputCommitter) committer).getWorkPath();
34. } else {
35. Path outputPath = super.getOutputPath(conf);
36.if (outputPath == null) {
37.throw new IOException("Undefined job output-path");
38. }
39. workPath = outputPath;
40. }
41.return workPath;
42. }
43./**通过key, value, conf来确定输出文件名(含扩展名)*/
44.protected abstract String generateFileNameForKeyValue(K key, V value, Co
nfiguration conf);
45.public class MultiRecordWriter extends RecordWriter {
46./**RecordWriter的缓存*/
47.private HashMap> recordWriters = null;
48.private TaskAttemptContext job = null;
49./**输出目录*/
50.private Path workPath = null;
51.public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
52.super();
53.this.job = job;
54.this.workPath = workPath;
55. recordWriters = new HashMap>();
56. }
57.@Override
58.public void close(TaskAttemptContext context) throws IOException, In
terruptedException {
59. Iterator> values = this.recordWriters.values(
).iterator();
60.while (values.hasNext()) {
61. values.next().close(context);
62. }
63.this.recordWriters.clear();
64. }
65.@Override
66.public void write(K key, V value) throws IOException, InterruptedExc
eption {
67.//得到输出文件名
68. String baseName = generateFileNameForKeyValue(key, value, job.ge
tConfiguration());
69. RecordWriter rw = this.recordWriters.get(baseName);
70.if (rw == null) {
71. rw = getBaseRecordWriter(job, baseName);
72.this.recordWriters.put(baseName, rw);
73. }
74. rw.write(key, value);
75. }
76.// ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
77.private RecordWriter getBaseRecordWriter(TaskAttemptContext jo
b, String baseName)
78.throws IOException, InterruptedException {
79. Configuration conf = job.getConfiguration();
80.boolean isCompressed = getCompressOutput(job);
81. String keyValueSeparator = ",";
82. RecordWriter recordWriter = null;
83.if (isCompressed) {
84. Class extends CompressionCodec> codecClass = getOutputComp
ressorClass(job,
85. GzipCodec.class);
86. CompressionCodec codec = ReflectionUtils.newInstance(codecCl
ass, conf);
87. Path file = new Path(workPath, baseName + codec.getDefaultEx
tension());
88. FSDataOutputStream fileOut = file.getFileSystem(conf).create
(file, false);
89. recordWriter = new LineRecordWriter(new DataOutputStre
am(codec
90. .createOutputStream(fileOut)), keyValueSeparator);
91. } else {
92. Path file = new Path(workPath, baseName);
93. FSDataOutputStream fileOut = file.getFileSystem(conf).create
(file, false);
94. recordWriter = new LineRecordWriter(fileOut, keyValueS
eparator);
95. }
96.return recordWriter;
97. }
98. }
99.}
3.WordCount:
基本上维持hadoop示例中的WordCount原样,主要增加一个静态内部类AlphabetOutputFormat,这个类继承了抽象类MultipleOutputFormat并实现其generateFileNameForKeyValue方法,文件命名规则是:以英文字母开头的单词以“首字母.txt”为文件名保存,其他以“other.txt”保存。
view plain
1.package inkfish.hadoop.study;
2.import java.io.IOException;
3.import java.util.StringTokenizer;
4.import org.apache.hadoop.conf.Configuration;
5.import org.apache.hadoop.fs.Path;
6.import org.apache.hadoop.io.IntWritable;
7.import org.apache.hadoop.io.Text;
8.import org.apache.hadoop.mapreduce.Job;
9.import org.apache.hadoop.mapreduce.Mapper;
10.import org.apache.hadoop.mapreduce.Reducer;
11.import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12.import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13.import org.apache.hadoop.util.GenericOptionsParser;
14.public class WordCount {
15.public static class TokenizerMapper extends Mapper