你好,游客 登录 注册 搜索
背景:
阅读新闻

Hadoop the Definitive 4th

[日期:2016-01-25] 来源:航行萬里 不留邊際  作者: [字体: ]

Developing a MapReduce Application

Writing a program in MapReduce follows a certain pattern. Before we start writing a MapReduce program, however, we need to set up and configure the development.

  • Configuration class ( org.apache.hadoop.conf package)

Running on a Cluster

In a distributed setting, things are a little more complex. For a start, a jobs' classes must be packaged into a job JAR file to send to the cluster. Hadoop will find the job JAR automactically by searching for the JAR on the drivers' classpath that contains the class set in the setJarByClass() method ( on JobConf or Job).

  • The client class path

    user's client-side classpath set by hadoop jar is made up of:

    - The job Jar file
    - Any JAR files in the lib directory of the job JAR file, and the classes direcotry ( if present)
    - The classpath defined by HADOOP_CLASSPATH, if set. 

Launching a Job

$ hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver \
    -conf conf/hadoop-cluster.xml input/ncdc/all max-temp

Job, Task, and Task Attempt IDs

job ID 根据 YARN application ID 生成. YARN application ID 由 YARN RM 生成 (与 RM 的composed time, counter maintained, 以及后面添加的app唯一标识序号 有关), app ID 一般长得样子是 application_141041231231_0003 , 对应的jobID 就是把这前的application 替换为 job_. ==> job_141041231231_0003

一个Job再细分为几个Task, 它们的ID是 将job_前缀替换为 task ==> task_141041231231_0003_m_00003. 前后缀加上任务编号, 以区别不同的task.

MapReduce Web UI

YARN Page: resource-manager-host:8088

  • resource manager page
  • job history page
  • mapreduce job page

Tuning a Job

" Can I Make it run faster" may come out of our minds, after our job is working.

You should think about these profile in checklist before you start trying to optimize at the task level.

  • Number of mappers 如果每个mapper只运行几秒就停了, 你应该让他们运行更长时间, 1分钟甚至更长. 对于small files 去看看 CombineFileInputFormat (本博客中有单独文章提及)
  • Number of reducers 每个reducer的建议时长至少是5分钟, 产出数据大小应是blocksize水平的. 书中后面会单独讲解.

  • Combiners 在 mapper-reducer 之间的shuffle过程中能否使用某Combiner 提升效率

  • Intermidiate compression map输出进行压缩可能提升效率

  • Custom serialization

  • Shuffle tweaks

MapReduce Workflows

这里继 求当年最高温度之后再尝试寻找一个新的例子 -- 求一年366天中, 历年在多种天气状态下的平均最大温度. 1.1号为例, 先取下雨天气的天气, 然后求1901年至2000年 每年1.1号的下雨天气的最大温度, 找到最大值, 再对各类天气状态的最大求均值.

这个分析需要进行分解:

  1. 计算 (日期-无年, 天气状态) 为key下的最大温度
  2. 求上面输出的key下平均值

结合之前的分析, 能看到这里的任务要分2步或以上的MapReduce完成.

How MapReduce Works

MapReduce Job 的Lifetime:

  • hadoop客户端, 用户提交MapReduce Job
  • YARN RM(resource manager) 来分配cluster上的资源
  • YARN NM(NameNode) 准备用于运算的containers
  • MapRededuce application master, 在job运行过程中的"协调人".
  • HDFS share job间的文件

Job Submission

submit() 完成: 向 RM 申领一个新的 application Id, 用来指向给MapReduce Job ID; 检测Job, 例如输出的目录(output files)是否已经存在, 如果已经存在则会报错; 运行Input的Split, 若这过程有问题, 例如Input Path 不存在, 则会报错返回给 MapReduce Program; 对运行Job时需要的文件: JAR file, 配置文件, 计算好的 Input Splits, 进行copy, 待JobId分配成功后将放入到以Id命名的Share directory. 在集群运行该JOB时会有大量NM对Jar 进行访问, 因此其copy的量会较大, 程序中用 mapreduce.client.submit.file.replication来的控制, 默认因子设置为10.

提交成功.

Job Intialization

RM 接收到 某项 submit的请求后, 会将由 YARN 的 scheduler 处理该请求 -- 给其分配container(RM 启动任务, NM管理的地方).

该项MapReduce Job的直接master其实是 Java application 中的主类 MRAppMaster , 这个类中会创建一系列的 bookkeeping objects 用来跟踪记录Job的处理进度. 因为Job是会以被再细分为若干项Task, 所以每项Task都会单独向MRAppMaster汇报其完成情况. 另外, 它还会接收到用集群对输入切分好的Input Splits, 然后为每Input Split创建mapper task, 同样也完成reducer的初始化. (reducer个数由 mapreduce.job.reducers指定).

uberized

application master 决定如何运行MapReduce job下的各项task. 一般来说, 每一项task单独被申请各自container, 等task执行完毕, container被NM回收. 这种情况下, 每个JVM仅仅执行一次task. 如果Job太小, application master 可能会用相同的JVM执行多个任务, 实现JVM的重用 -- 每个task依次在这个container里的JVM里顺序执行, 直到所有task被执行完毕. 这样master不必申请多次, 达到了 uberlized 的效果.

application master 在 OutputCommitter上 执行 setupJob() 方法, 为task的输出创建临时woking space及输出目录. 详情还要查询 Output Committers

Task Assignment

non-uber task. application master 开始向 RM 为他创建好的 mapper/reducer申请container 资源完成任务. mapper的请求优先级会高于reducer -- 这是因为在执行sort, reducer 任务之前 所有mapper结果要完成. 对于 reduce的请求, 至少要等 5% 的map任务完成 才会开始接受受理.

Reduce 任务可以在集群的任意位置执行, 但map task 受到数据局部性(data locality)制约. map, reduce task默认的分配内存是 1024MB. 该值的properties:mapreduce.map.memory.mb mapreduce.reduce.memory.mb, mapreduce.map.cpo.vcores, mapreduce.reduce.cpu.vcores

data locality , 在quora中找到了一个 答案 中的解释:

Data locality is a core concept of Hadoop, based on several assumptions around the use of MapReduce. In short, keep data on disks that are close to the CPU and RAM that will be used to process and store it. If you had a cluster of 100 machines, and needed to read a selection of records, the records should be adjacent on disk, fit into RAM and be processable (e.g sorted or computed) using that machine's CPU.

Task Execution

Streaming

Progress and Status Updates

user有必要获得他提交的job目录的运行情况. 包括每个task的 status (运行中, 成功, 失败), Mapper和Reducer的完成进度. Mapper的完成度就是和task完成个数比例有关. Reducer则要复杂一些, 涉及到shuffle, reduce.

JOB Completion

application master, task containers 负责打扫, 清理. 执行OutputCommitter 's commitJob, 让用户看到预期的历史记录, 服务器使用信息.

Failures

任务失败, 遇到这种情况很正常, 我们应该学习怎样避免失败. 或者说如何解决这样的问题, 并避免.

失败的维度: task, application master, node manager, resource manger.

  • task failure 正常来说, 是最易看到的一类错误, 如果一个map/reduce task 抛出 runtime型的异常, JVM将会在其exit之前先把该信息报告给application master. app master 标记这项task为 failed. 然后释放container资源, 供下一个task使用.

另, 对于一个failed的task, app master 会再给他4次重试的机会(4 可进行修改, 参数为: mapreduce.map.maxattempts ) app master 会在重新调度的时候尽可能的使用与先前不同的Node来执行这个失败的任务 4次不行的话 则整个Job标记为失败.

如果失败的task 超过一定个数, 则会激活 job failure的改变. Profiles:mapreudce.map.failures.maxpercent, mapreduce.reduce.failures.maxpercent (也就是说失败的个数小于这个百分比, 那么 appication将会继续执行其它的task.

  • application master failure

  • node manager failure

    当node manager fail RM上看不到其返回的heartbeat. (比如10分钟内看不到, 时间配置: yearn.resourcemanager.nm.liveness-monitor.expiry-interval-ms )

  • resource manager failure

这个问题就比较严重了. 暂时先不想了解.

Shuffle and Sort

mapper的输出经过排序(按key)后传给reducer, 在这个排序并转移数据的过程, 叫做shuffle.

注, 上图中能看出, map的输出结果不是直接写到disk中, 而是先到一个内存中的缓存区(memory buffer), 这个默认大小是100MB(可通过 mapreduce.task.io.sort.mb 参数配置), 当缓存区的空间达到一定水平(默认是 80% mapreduce.map.sort.splil





收藏 推荐 打印 | 录入:elainebo | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款