你好,游客 登录
背景:
阅读新闻

Hadoop 学习记录之基础篇

[日期:2018-03-29] 来源:Latte_z博客  作者: [字体: ]

经历了一个学期的洗礼之后,最终我还是选择了走大数据处理这条道路,个人觉得自己不是一个愿意扎实看论文潜心研究的人,所以机器学习->深度学习这条路不是特别适合我,还是更加愿意去写一些工程代码锻炼自己的能力。

1、MapReduce的理论基础

​ 那么什么是 MapReduce 呢?MapReduce 就是一种“分治”的思想,把一个大规模的数据操作,分解成一个个小数据集操作,同时分发到一个主节点管理的集群中进行任务工作。然后再把各个节点完成的工作合并在一起,这就得到了最终的结果。总得来说,MapReduce 就是 Map + Reduce,即任务的“分解”与结果的“汇总”。

hadoop 中共有两种任务机器角色,一种是 JobTracker,一种是 TaskTracker,顾名思义,JobTracker 是用于工作的调度,而 TaskTracker 是用于执行工作的。其中,一个集群中只有一台 JobTracker。

​ MapReduce 框架负责了并行计算中的诸多问题,比如分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等问题,抽象成了以上提到的 Map + Reduce 两个方法函数。

2、MapReduce 处理过程

​ 整个过程可以简单做成以上的流程,『 原始数据 -> 分割 -> map -> map 端排序 -> Combine -> Reduce 端排序 -> Reduce 输出 』,具体的内容,我们通过一个 WordCount 例子来说明。

3、WordCount 解析

​ WordCount是一个非常简单,但是又很能够体现 MapReduce 思想的程序,这个程序被 Hadoop 内置作为了一个测试程序,功能很简单,就是统计一个输入文件内每个单词的个数。

​ 我们暂时利用 maven 导入的 Hadoop 环境作为测试平台,pom 文件如下:

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
        </dependency>
    </dependencies>
/* 这里我们使用新版 API 来编写代码,原因如下:
     * 新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。
     * 例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
     *
     * 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。
     * 例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
     */
public class WordCount{

    /*
     * Hadoop 提供了如下内容数据类型,均实现了 WritableComparable 接口,以便用这些类型定义的数据可以被序列化,用以网络传输和文件存储
     * BooleanWritable: 标准布尔
     * ByteWritable: 单字节
     * DoubleWritable: 双字节
     * FloatWritable: 浮点数
     * IntWritable: 整型数
     * LongWritable: 长整型数
     * Text: UTF8 格式存储的文本
     * NullWritable: <key,value> 中 key 或 value 为空的时候使用
     */

    // KEYIN, VALUEIN, KEYOUT, VALUEOUT
    public static class TokenizerMapperextends Mapper<Object,Text,Text,IntWritable>{
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            // map 方法中,value 值存储的是文本文件中的一行(回车符为行结束标记)
            // key 值为该行首字母相对于文本文件首字母偏移量

            /*
             * 分割过程,例如:
             * Hello World      ->      <0, "Hello World">
             * Bye World        ->      <12, "Bye World">
             * 其中偏移量包括了回车所占的字符数(注意 Windows 和 Linux/macOS/*nix 环境下换行符的差别)
             * Windows: \r\n
             * Linux/macOS/*nix: \n
             * 老版本的 Mac OS, OS X: \r
             */

            StringTokenizer itr = new StringTokenizer(value.toString()); // StringTokenizer 将每一行拆分成一个个单词

            /*
             * Map 过程,例如:
             * <0, "Hello World>      ->      <Hello, 1> + <World, 1>
             * <12, "Bye World>       ->      <Bye, 1> + <World, 1>
             */

            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                // 以 <word, 1> 作为方法结果输出,等于是做了一个词频的统计
                context.write(word, one);
                // 剩余工作交给 MapReduce 框架处理
            }

            /*
             * 得到 map 方法输出的 <key,value> 后,Mapper 对这一些键值对按照 key 值进行排序(字典序)
             * 再执行 Combine 过程,将 key 值相同的 value 累加,得到 Mapper 最终输出结果
             * <Bye, 1>         ->      <Bye, 1>
             * <Hello, 1>       ->      <Hello, 1>
             * <World, 1>       ->      <World, 2>
             * <World, 1>            ↗
             */
        }
    }

    // KEYIN, VALUEIN, KEYOUT, VALUEOUT
    public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable>{
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {
            // key 为单个单词,values 则是对应单词计数值组成的列表
            // reduce 的过程就是遍历 values 求和的操作,得到某个单词的总词数

            /*
             * 首先 Reducer 对从 Mapper 接收的数据进行排序操作,再交给用户重写的 reduce 方法进行处理
             * 得到新的 <key,value> 对,作为 WordCount 的输出结果
             * <Bye, list(1,1)>       ->      <Bye, 2>
             * <Hadoop, list(2)>      ->      <Hadoop, 1>
             * <Hello, list(1,1)>     ->      <Hello, 2>
             * <Word, list(2)>        ->      <Word, 2>
             */

            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args)throws Exception {
        // 用 Configuration 类对 MapReduce Job 进行一个初始化
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (2 != otherArgs.length) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        // 由 Job 对象负责管理和运行一个计算任务,通过 Job 的一些方法来对任务参数进行相关设置
        // 对 Job 进行命名操作,使得在 JobTracker 和 TaskTracker 页面进行监视
        Job job = Job.getInstance(conf, "word count");

        // 设置主类
        job.setJarByClass(WordCount.class);

        // 设置 Job 的 Map(拆分操作)、Combiner(中间结果合并)、Reduce(合并操作)三个相关处理类
        // 原始数据 -> 分割 -> map -> map 端排序 -> Combine -> Reduce 端排序 -> Reduce 输出
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        // 设置 Job 输出结果,为 <key,value>
        // 本例子中是 <单词,个数>,故 key 为 Text 型,Value 为 IntWritable 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出的路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 等待任务结束后退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}




收藏 推荐 打印 | 录入:Cstor | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款