你好,游客 登录
背景:
阅读新闻

Hadoop - MapReduce

[日期:2017-07-10] 来源:公众账号  作者: [字体: ]

通过MapReduce框架,我们可以编写应用程序在商用机器集群上以可靠的方式并行处理大量的数据。

MapReduce是什么?

MapReduce是基于java的分布式计算程序模型和处理技术。 MapReduce算法包含两个重要的任务,即Map和Reduce。 Map接受一组数据并将其转换为另一组数据,这些独立的元素分解成元组(键/值对)。 然后是reduce任务,它接受map的输出作为输入,并将这些数据元组组成一组更小的元组。 就像MapReduce的名字所暗示的那样,reduce任务总是在map之后执行。

MapReduce的主要优势是,它很容易在多个计算节点上作大规模的数据处理。 在MapReduce模式下,数据处理原语被称为mappers和reducers。 将数据处理应用程序分解为mappers和reducers有时是不容易的。 但是,一旦我们以MapReduce形式编写应用程序,那么扩展应用程序,让它运行在成百上千,甚至上万的机器集群中只是一个修改配置的问题。 正是这一点可伸缩性吸引了许多程序员使用MapReduce模型。

算法

  • 通常MapReduce范例基于将计算实体发送到数据所在的地方。

  • MapReduce程序执行分三个阶段,即map阶段, shuffle阶段,和reduce阶段。

map阶段 :map或mapper的工作是处理输入数据。 一般输入数据是以文件或目录的形式存在,存储在hadoop文件系统(HDFS)。 输入文件逐行传递给mapper函数。mapper处理数据并创建一些小数据块。

reduce阶段 :这个阶段是Shuffle 阶段和 Reduce阶段的组合。Reducer的工作是处理来自于mapper的数据。 处理完成后,生成一组新的输出存储到HDFS中。

  • MapReduce任务期间,Hadoop 发送Map和Reduce任务给集群中相应的服务器。

  • 该框架管理有关数据传递的所有细节,如发布任务,验证任务完成,在集群的节点之间复制数据。

  • 大部分的计算发生在本地节点,这些节点在磁盘中存储有数据,这减少了网络流量。

  • 给定的任务完成后,由集群归集数据,产生一个适当的结果,并将其发送回Hadoop服务器。

输入和输出(Java视角)

MapReduce框架操作< key,value>对,也就是说,框架将任务的输入视为一组<key,value>对,并生成一组< key,value>对作为任务的输出,只是类型不同。

键和值相关的类应该通过框架进行序列化,需要实现Writable接口。此外,key类必须实现Writable-Comparable接口,使框架方便排序。一个MapReduce任务的输入和输出类型:(Input)< k1,v1 > - >map- > < k2,v2 > - > - >reduce- > < k3,v3 >(Output)。

InputOutput
Map <k1, v1> list (<k2, v2>)
Reduce <k2, list(v2)> list (<k3, v3>)

术语

  • PayLoad -实现Map和Reduce方法的 应用程序 ,是任务的核心。

  • Mapper - Mapper将输入的键/值对映射为一组中间键/值对。

  • NamedNode -管理Hadoop分布式文件系统(HDFS)的节点。

  • DataNode —在进行任何处理之前提前展示数据的节点。

  • MasterNode —JobTracker运行的节点,并接受来自客户端的任务请求。

  • SlaveNode -Map和Reduce程序运行的节点。

  • JobTracker -调度任务并跟踪分配的任务到任务跟踪器。

  • Task Tracker -跟踪任务,向JobTracker报告任务状态。

  • Job —数据集上执行Mapper和Reducer的程序。

  • Task —在数据块的Mapper或Reducer任务。

  • Task Attempt —尝试在SlaveNode上执行任务的特定实例

示例场景

以下是某机构的用电量的数据。它包含了每月的用电量和几年的平均用电量。

JanFebMarAprMayJunJulAugSepOctNovDecAvg

19792323243242526262626252625

198026272828283031313130303029

198131323232333435363634343434

198439383939394142434039383840

198538393939394141410040393945

如果将上述数据作为输入,我们必须编写应用程序来处理它并产生结果,例如查找最大用电量年、最小用电量年,等等。如果记录数量有限,对程序员来说,这是一个简单的过程。他们将简单地编写逻辑来生成所需的输出,并将数据传递给应用程序。

但是,如果数据展示的是一个特定州的所有大型工业的从开始到现在的电力消耗。

  • 当我们编写应用程序来处理这些批量数据时,程序需要大量的时间来执行。

  • 当我们将数据从源头转移到网络服务器,将会消耗大量的网络流量。

为了解决这些问题,我们需要MapReduce框架。

输入数据

以下数据保存在sample.txt,作为输入数据:

示例程序

以下的程序使用MapReduce框架处理样本数据

package hadoop; 
 
import java.util.*; 
import java.io.IOException; 
import java.io.IOException; 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*; 
 
public class ProcessUnits { 
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   { 
      
      //Map function 
   public void map(LongWritable key, Text value, 
   OutputCollector<Text, IntWritable> output,   
   Reporter reporter) throws IOException 
    { 
     String line = value.toString(); 
     String lasttoken = null; 
     StringTokenizer s = new StringTokenizer(line,"\t"); 
     String year = s.nextToken(); 
         
     while(s.hasMoreTokens())
        {
          lasttoken=s.nextToken();
        } 
            
      int avgprice = Integer.parseInt(lasttoken); 
      output.collect(new Text(year), new IntWritable(avgprice)); 
    } 
 } 
   
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements 
   Reducer< Text, IntWritable, Text, IntWritable > 
   {  
   
      //Reduce function 
    public void reduce( Text key, Iterator <IntWritable> values, 
    OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
     { 
       int maxavg=30; 
       int val=Integer.MIN_VALUE; 
            
       while (values.hasNext()) 
        { 
          if((val=values.next().get())>maxavg) 
           { 
            output.collect(key, new IntWritable(val)); 
           } 
         } 
 
      } 
   }  
   
   
   //Main function 
   public static void main(String args[])throws Exception 
   { 
    JobConf conf = new JobConf(ProcessUnits.class); 
      
    conf.setJobName("max_eletricityunits"); 
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class); 
    conf.setMapperClass(E_EMapper.class); 
    conf.setCombinerClass(E_EReduce.class); 
    conf.setReducerClass(E_EReduce.class); 
    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 
      
    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
    JobClient.runJob(conf); 
   } 
 
} 

将上面的程序保存为ProcessUnits.java。 程序的编译和执行解释如下

编译和单元程序的执行过程

假设我们进入Hadoop用户的主目录中(例如,/home/Hadoop)。

按照下面的步骤编译并执行上面的程序。

步骤1

下面的命令是创建一个目录来存储已编译的java类 

$ mkdir units 

步骤2

下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序。访问以下链接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下载jar。假设下载的文件夹是/home/hadoop/。

步骤3

下面的命令用于编译ProcessUnits.java,为程序创建一个jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 

$ jar -cvf units.jar -C units/ . 

步骤4

下面的命令用于在HDFS中创建一个输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir 

步骤5

下面的命令用于复制名为sample的输入文件。txtin是HDFS的输入目录。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 

步骤6

下面的命令用于验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/ 

步骤7

下面的命令是用于从输入的目录获取输入文件,运行eleunit_max应用。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 

等待一段时间,直到文件被执行。执行后,如下图所示,输出将包含输入细分的数目、Map任务的数量、reducer任务的数量等。

INFO mapreduce.Job: Job job_1414748220717_0002 

completed successfully 

14/10/31 06:02:52 

INFO mapreduce.Job: Counters: 49 

File System Counters 

FILE: Number of bytes read=61 

FILE: Number of bytes written=279400 

FILE: Number of read operations=0 

FILE: Number of large read operations=0   

FILE: Number of write operations=0 

HDFS: Number of bytes read=546 

HDFS: Number of bytes written=40 

HDFS: Number of read operations=9 

HDFS: Number of large read operations=0 

HDFS: Number of write operations=2 Job Counters 

Launched map tasks=2  

Launched reduce tasks=1 

Data-local map tasks=2  

Total time spent by all maps in occupied slots (ms)=146137 

Total time spent by all reduces in occupied slots (ms)=441   

Total time spent by all map tasks (ms)=14613 

Total time spent by all reduce tasks (ms)=44120 

Total vcore-seconds taken by all map tasks=146137 

 

Total vcore-seconds taken by all reduce tasks=44120 

Total megabyte-seconds taken by all map tasks=149644288 

Total megabyte-seconds taken by all reduce tasks=45178880 

 

Map-Reduce Framework 

Map input records=5  

Map output records=5   

Map output bytes=45  

Map output materialized bytes=67  

Input split bytes=208 

Combine input records=5  

Combine output records=5 

Reduce input groups=5  

Reduce shuffle bytes=6  

Reduce input records=5  

Reduce output records=5  

Spilled Records=10  

Shuffled Maps =2  

Failed Shuffles=0  

Merged Map outputs=2  

GC time elapsed (ms)=948  

CPU time spent (ms)=5160  

Physical memory (bytes) snapshot=47749120  

Virtual memory (bytes) snapshot=2899349504  

Total committed heap usage (bytes)=277684224

 

File Output Format Counters 

Bytes Written=40 

步骤8

下面的命令用于验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/ 

步骤9

下面的命令是用来看part- 00000文件输出。这个文件是由HDFS生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 

下面是MapReduce程序生成的输出。

1981    34 

1984    40 

1985    45 

步骤10

下面的命令用来从HDFS复制输出文件夹到本地文件系统,用作分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 

重要命令

所有Hadoop命令都由$HADOOP_HOME/bin/hadoop命令调用。 运行Hadoop脚本不加任何参数会打印所有命令的描述。

Usage : hadoop [--config confdir] COMMAND

下表罗列了可用选项及其描述

OptionsDescription
namenode -format Formats the DFS filesystem.
secondarynamenode Runs the DFS secondary namenode.
namenode Runs the DFS namenode.
datanode Runs a DFS datanode.
dfsadmin Runs a DFS admin client.
mradmin Runs a Map-Reduce admin client.
fsck Runs a DFS filesystem checking utility.
fs Runs a generic filesystem user client.
balancer Runs a cluster balancing utility.
oiv Applies the offline fsimage viewer to an fsimage.
fetchdt Fetches a delegation token from the NameNode.
jobtracker Runs the MapReduce job Tracker node.
pipes Runs a Pipes job.
tasktracker Runs a MapReduce task Tracker node.
historyserver Runs job history servers as a standalone daemon.
job Manipulates the MapReduce jobs.
queue Gets information regarding JobQueues.
version Prints the version.
jar <jar> Runs a jar file.
distcp <srcurl> <desturl> Copies file or directories recursively.
distcp2 <srcurl> <desturl> DistCp version 2.
archive -archiveName NAME -p Creates a hadoop archive.
<parent path> <src>* <dest>
classpath Prints the class path needed to get the Hadoop jar and the required libraries.
daemonlog Get/Set the log level for each daemon

如何与mapreduce任务交互

以下是Hadoop任务中可用的通用选项。

GENERIC_OPTIONSDescription
-submit <job-file> Submits the job.
-status <job-id> Prints the map and reduce completion percentage and all job counters.
-counter <job-id> <group-name> <countername> Prints the counter value.
-kill <job-id> Kills the job.
-events <job-id> <fromevent-#> <#-of-events> Prints the events' details received by jobtracker for the given range.
-history [all] <jobOutputDir> - history < jobOutputDir> Prints job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed by specifying the [all] option.
-list[all] Displays all jobs. -list displays only jobs which are yet to complete.
-kill-task <task-id> Kills the task. Killed tasks are NOT counted against failed attempts.
-fail-task <task-id> Fails the task. Failed tasks are counted against failed attempts.
-set-priority <job-id> <priority> Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

查看job状态

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
 e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 

查看job输出历史记录

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 

终止job

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004 

java达人

ID:java_daren

(长按识别)

作者:tutorialspoint

译者:java达人

来源:https://www.tutorialspoint.com/hadoop/hadoop_mapreduce.htm(点击文末阅读原文前往)





收藏 推荐 打印 | 录入: | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款