你好,游客 登录
背景:
阅读新闻

Hadoop基础之MapReduce原理、序列化和源码分析

[日期:2014-11-19] 来源:CSDN博客  作者:Fortyone41 [字体: ]

3. Writable接口以及常用的Writable实现类

3.1. Writable接口

根据DataInput和DataOutput实现的简单的、有效的序列化对象。

MapReduce的任意key和value都必须实现Writable接口并实现write()和readFields()方法:

write 是把每个对象序列化到输出流

readFields是把输入流字节反序列化

MapReduce的任意key都必须实现WritableComparable接口:

3.2. 常用的Writable实现类

Java基本类型与hadoop中类型之间的转换如下图所示:

3.3.基于文件的存储结构

SequenceFile 无序存储

MapFile 会对key建立索引文件,value按key顺序存储

基于MapFile的结构有:

ArrayFile 像我们使用的数组一样,key值为序列化的数字

SetFile 他只有key,value为不可变的数据

BloomMapFile 在 MapFile 的基础上增加了一个 /bloom 文件,包含的是二进制的过滤表,在每一次写操作完成时,会更新这个过滤表。

3.4.自定义WritableKpi

package mavshuang.mapreduce;
 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class KpiDemo {
public static final String INPUT_PATH = "hdfs://hadoop0:9000/kpi";
public static final String OUT_PATH = "hdfs://hadoop0:9000/out";
 
public static void main(String[] args) throws Exception {
final Configuration configuration = new Configuration();
//final FileSystem fileSystem = FileSystem.get(configuration);//这是导致出错的原因
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
// 判断输出的路径是否已经存在
if (fileSystem.exists(new Path(OUT_PATH))) {
fileSystem.delete(new Path(OUT_PATH), true);
}
      
final String jobName = KpiDemo.class.getSimpleName();
      
final Job job = new Job(configuration,jobName);
      
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
      
job.setMapperClass(MyMapper.class);
      
job.setReducerClass(MyReducer.class);
      
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(KpiWritable.class);
      
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
      
job.waitForCompletion(true);
}
 
static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
 
protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws java.io.IOException ,InterruptedException {
final String[] splited = value.toString().split("\t");
final String phoneNo = splited[1];
final Text k2 = new Text(phoneNo);
      
final KpiWritable v2 = new KpiWritable(splited[6], splited[7], splited[8], splited[9]);
      
context.write(k2, v2);
};
}
 
static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
 
protected void reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
long upPackNum = 0L;
long downPackNum = 0L;
long upPayLoad = 0L;
long downPayLoad = 0L;
      
for (KpiWritable kpiWritable : v2s) {
upPackNum += kpiWritable.upPackNum;
downPackNum += kpiWritable.downPackNum;
upPayLoad += kpiWritable.upPayLoad;
downPayLoad += kpiWritable.downPayLoad;
}
      
final KpiWritable v3 = new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
      
context.write(k2, v3);
};
}
}
 
class KpiWritable implements Writable {
long upPackNum;// 上传数据包数,单位是个
long downPackNum;// 下载数据包数,单位是个
long upPayLoad;// 上行总流量,单位是bytes
long downPayLoad;// 下行总流量,单位是bytes
 
public KpiWritable() {
}
 
public KpiWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
this.upPackNum = upPackNum;
this.downPackNum = downPackNum;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
}
 
public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
this(Long.parseLong(upPackNum), Long.parseLong(downPackNum), Long.parseLong(upPayLoad), Long.parseLong(downPayLoad));
}
 
public void readFields(DataInput in) throws IOException {
this.upPackNum = in.readLong();
this.downPackNum = in.readLong();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
}
 
public void write(DataOutput out) throws IOException {
out.writeLong(this.upPackNum);
out.writeLong(this.downPackNum);
out.writeLong(this.upPayLoad);
out.writeLong(this.downPayLoad);
}
 
public String toString() {
return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
}
 
}

出现如下错误:

//final FileSystem fileSystem = FileSystem.get(configuration);// 这是导致出错的原因

正确的运行结果如下所示:

查看运行后生成的文件:

4.MapReduce的执行过程源码分析

4.1.InputFormat

InputFormat负责处理MR的输入部分,有三个作用:

l  验证作业的输入是否规范,即验证输入信息的合法性,包括输入路径是否存在等;

l  把输入文件切分成InputSpilt,即把HDFS中的文件按照一定规则拆分成InputSpilt,每个InputSpilt由一个Mapper执行;

l  提供RecordReader,把InputSpilt中的每一行解析出来供map函数处理。

4.2. FileInputFormat

FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为Job输入的所有文件,并实现了对输入文件计算splits的方法;而获取记录的方法是由TextInputFormat进行实现的。

在上图中,第247行计算minSize,是提供给后面计算使用的,其中getFormatMinSplitSize()方法的值是1, getMinSplitSize (job) 方法的值由配置参数 mapred.min.split.size 指定的,默认是1,所以minSize的值就是1。第248行计算maxSize,也是同后面使用的,值由 mapred.max.split.size 指定,默认值是Long的最大值。在第252行files列表中存放的是输入文件,可以有多个。从第253行开始,循环处理每一个输入文件;第254行是 获取文件路径,L256获取文件长度,L257获取文件块位置。如果文件非空,并且文件允许被分割为输入块,从而进入L258中。

从上图看出输入块size由三个因素决定,分别为 blockSize, minSize,maxSize ,根据之前的数值,输入分片的默认Size就是文件块Size。

4.3.InputSplit

l  在执行MapReduce之前,原始数据被分割成若干spilt,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会一次处理每一个记录;

l  FileInputFormat只划分比HDFS block大的文件;如果一个文件的大小比block小,将不会被划分,这是Hadoop处理大文件的效率比处理很多小文件效率高的原因;

l  当Hadoop处理很多小文件时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个spilt并分配一个Map任务,从而导致效率低下。

4.4.TextInputFormat

l  TextInputFormat是默认处理类,处理普通文本文件;文件中每一行作为一个记录,将每一行在文件中的起始偏移量作为key,每一行的内容作为value,默认以\n或者回车键作为一行记录;

l  TextInputFormat继承FileInputFormat。

InputFormat类的层次结构:

4.5.其他输入类

CombineFileInputFormat:处理小文件;

KeyValueTextInputFormat:当输入数据的每一行是两列,并且使用Tab键分离的时候;

NLineInputFormat:控制在每个split中数据的行数;

SequenceFileInputFormat:当输入文件格式是sequencefile的时候要使用SequenceFileInputFormat作为输入。

4.6.自定义输入格式

继承FileInputFormat基类;

重写getSplits(JobContextjob)方法。

4.7.Hadoop的输出

TextOutputFormat:默认的输出格式,key和value中间值使用Tab键隔开;

在上图中发现,当文本输出的时候使用UTF-8编码,从L47中可看出,划分行的符号是”\n”,从L65中可以看出,输出的键值对默认是以制表符(Tab)分割的。

SequenceFileOutputFormat:将key和value以sequenceFile格式输出;

SequenceFileAsBinaryOutputFormat:将key和value以原始二进制的格式输出;

MapFileOutputFormat:将key和value写入MapFile中,由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。

MultipleOutputFormat:默认情况下一个reducer会产生一个输出,此类可以实现一个reducer产生多个输出。





收藏 推荐 打印 | 录入: | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款