你好,游客 登录 注册 搜索
背景:
阅读新闻

浅谈Hadoop

[日期:2016-07-05] 来源:简书  作者:codertom [字体: ]

Hadoop是一个十分流行的分布式存储和计算框架,也是业内大数据处理和分析最通用的框架之一。

Hadoop

hadoop_icon.png

Hadoop2.0 由HDFS(Hadoop Distributed File System)、MapReduce和Yarn三部分组成。

Hadoop的设计原型来源于google的三篇论文,即GFS、MapReduce和BigTable,同时作为Lucene的子项目Nutch的一部分在2005年引入Apache,Hadoop的得名是Hadoop之父Doug Cutting儿子的玩具,值得一提的是他的妻子叫Lucene。

Hadoop生态

hbase生态.png

"永远不把鸡蛋放在一个篮子里"——HDFS

在分布式的文件系统之前,往往使用大型机和存储服务器去做存储,无论大型机和存储服务器都是十分昂贵的,同时也是有瓶颈的,横向扩展能力很差。而分布式文件系统的横向扩展能力以及容错性十分好,也越来越受到人们的青睐。HDFS的定位是用比较廉价的机器,做高可用的海量数据的存储。主要采用多副本的分块存储机制,在部分机器宕机或数据损坏的情况下,依然能提供可靠服务。

  1. 集群拓扑
hdfs.jpg
  • NameNode:文件元数据存储节点,Hadoop1中存在单点问题,Hadoop2中通过备用节点实现高可用,同时有元数据存储瓶颈,不适于存储小文件。

  • DataNode:数据节点,单文件被分成多块,每一块多副本跨机架、机房存储,保证数据的高可用。

  • Block:文件的存储单位,单个文件可被分成多块,默认为128M。同时也是MapReduce默认的输入块大小。文件读取流程:

hdfsread.png

open DistributedFileSystem 去NameNode获取文件的块列表,NameNode根据Client距离各节点的网络距离给出Block列表。

根据Block列表去一次读取文件,读取后在Client进行文件汇总。

  1. 写入流程:
hdfs_write.png
  • 实例化DistributeFileSystem,在NameNode进行写文件的申请,申请成功后创建元数据,并返回数据存放的位置信息。

  • 过位置信息,对DataNode进行流式写入,将数据分成多个数据包,作为一个数据队列。写入时每次取一个数据包,写入全部副本,且三个节点均写入成功,则返回ACK信号表名当前buff写入成功,Client内部维护着数据包的Ack队列,收到Ack之后会移除这个数据包。

  • 最后想NameNode发送Complete信号,确认文件写入全部成功。如果中途节点写失败:写入部分数据的节点将在管线中移除,同时后续恢复正常后会删除这部分数据。随后写入后续两个节点,原则上写入一个块的时候就可以写成功,因为namenode发现数据不一致会做复制操作。

"分而治之"——MapReduce

分布式计算出现之前,数据的计算往往依靠性能比较好的单机计算。但是单机受限于本身的计算资源,往往计算速度都不如人意。

一天小明接到产品的一个需求:

产品:小明啊,这里有一天的日志信息,大概5个G,我要统计一下一共有多少。

小明:OK啊,就5个G,一个shell搞定,看我 cat * | wc -l,我简直就是个天才。

产品:对不起啊小明,需求变了,一天的看不出来效果,我需要统计1个月的数据,大概有150G。

小明:有点大啊,不怕,我线上服务器内存120G,40核,看我用多线程搞定,过了2个小时,终于搞 定了还有点费劲。

产品:我保证这是我最后一次变更需求,我想要最近一年的数据1800G左右。

小明:数据上T了,搞不定了啊。

上面的例子告诉我们,在大数量的场景下,高性能的单机有时也是解决不了问题。所以我们就需要MapReduce帮助我们。

MapReduce是一种采用分治和规约的一种并行的批处理框架,先将数据做分割计算,最后汇总结果。看上去和多线程的处理机制一样,但是Hadoop将它封装在了框架中,编程十分简单,吞吐量十分高,目前支持Java、C++、Python等多种API编程。

1.MapReduce运行模型总体概览:

mapreduceAllGraph.png

  • InputSplit: InputSplit是单个map任务的输入文件片,默认文件的一个block。

  • Map函数:数据处理逻辑的主体,用开发者开发。

  • Partition:map的结果发送到相应的reduce。

  • Combain:reduce之前进行一次预合并,减小网络IO。当然,部分场景不适合。

  • Shuffle:map输出数据按照Partition分发到各个reduce。

*reduce:将不同map汇总来的数据做reduce逻辑。

2.多reduce:

datatrans.png

3.经典wordcount:

wordcountdatatrans.png

mapreducedataStream.png

4.Map类的实现:

  • 必须继承org.apache.hadoop.mapreduce.Mapper 类

  • map()函数,对于每一个输入K/V都会调用一次map函数,逻辑实现(必须)。

  • setup()函数,在task开始前调用一次,做maptask的一些初始化工作,如连接数据库、加载配置(可选)。

  • cleanup()函数,在task结束前调用一次,做maptask的收尾清理工作,如批处理的收尾,关闭连接等(可选)

  • Context上下文环境对象,包含task相关的配置、属性和状态等。

5.Reduce类的实现:

  • 必须继承org.apache.hadoop.mapreduce.Reducer类。

  • reduce(key, Iterable<>values,Context context)对于每一个key值调用一次reduce函数。

  • setup():在task开始前调用一次,做reducetask的一些初始化工作。

  • cleanup():在task结束时调用一次,做reducetask的收尾清理工作。

6.作业整体配置:

  • 参数解析:String[]otherArgs= new GenericOptionsParser(conf, args).getRemainingArgs();

  • 创建job: Jobjob= Job.getInstance(conf, "word count");

  • 设置map类,reduce类。

  • 设置map和reduce输出的KV类型,二者输出类型一致的话则可以只设置Reduce的输出类型。

  • 设置reduce的个数 :默认为1,综合考虑,建议单个reduce处理数据量<10G。不想启用reduce设置为0即可。

  • 设置InputFormat

  • 设置OutputFormat

  • 设置输入,输出路径。

  • job.waitForCompletion(true) (同步提交)和job.submit()(异步提交)

wordcount:

```public class WordCountTask {

private static final Logger logger = Logger.getLogger(WordCountTask.class);

public static class WordCountMap extends Mapper<Object, Text, Text, IntWritable>{

private static final IntWritable one = new IntWritable(1);

private Text word = new Text();

@Override

protected void cleanup(Context context)

throws IOException, InterruptedException {

logger.info("mapTaskEnd.....");

}

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
           StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
              this.word.set(itr.nextToken());
              context.write(this.word, one);
            }
        }

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        logger.info("mapTaskStart.....");
    }

}

public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{

    private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException
      {
        int sum = 0;
        for (IntWritable val : values) {
          sum += val.get();
        }
        this.result.set(sum);
        context.write(key, this.result);
      }
}


 public static void main(String[] args)
            throws Exception
          {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf,     args).getRemainingArgs();
            if (otherArgs.length < 2) {
              System.err.println("Usage: wordcount <in> [<in>...] <out>");
              System.exit(2);
            }
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCountTask.class);
            job.setMapperClass(WordCountMap.class);
            job.setReducerClass(WordCountReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileSystem fs = FileSystem.get(conf);
            for (int i = 0; i < otherArgs.length - 1; i++) {
              FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            if(fs.exists(new Path(otherArgs[otherArgs.length - 1]))){
                fs.delete(new Path(otherArgs[otherArgs.length - 1]));
              }
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
            job.setNumReduceTasks(1);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
          }
}

6.提交:

​hadoop jar hadoop-examples.jar demo.wordcount(主类名) Dmapreduce.job.queuename=XX(系统参数) input output

​缺点:无定时调度

  1. 常用的InputFormat:

  2. TextInputFormat key:行便宜 value:文本内容,split计算:splitSize=max("mapred.min.split.size",min("mapred.max.split.size",blockSize)) mapred.min.split.size 在大量文本输入的情况下,需要控制map的数量,可以调此选项。

  3. CombineTextInputFormat(集群默认),多个小文件分片送到一个map中处理,主要解决多个小文件消耗map资源的问题。

  4. sequenceFileInputFormat,采用自己的序列化方式,通常文件名为key,value为文件内容,可在存储上解决小文件对namenode的影响。





收藏 推荐 打印 | 录入:elainebo | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款