你好,游客 登录 注册 搜索
背景:
阅读新闻

初识Hadoop's Ecosystem

[日期:2015-12-10] 来源:航行萬里 不留邊際   作者: [字体: ]

时至今日, 我还是不太能用外行人听得懂的简单的话来解释什么是大数据.

Hadoop的生态系统发展到现在越来越成熟, 不断有新的项目迭代更新, 向成熟规模使用的项目发起挑战.对于我们这种中小企业工作的人来说, 现在还只能用接触到的一部分工具组合来窥视此生态的"冰山一角".

node -> clusterp

数据的存储不再依赖于单机之上, 而是通过聚合大量的机器节点, 就有了集群的概念.

通过网络联通一群机器, Hadoop就是利用了这样的物理存储引擎, 设计了其自身的文件管理系统 --Hadoop分布式文件系统, Hadoop Distributed File System (HDFS).

看到的一个HDFS路径 /tmp/foobar/test.data 看上去是一个文件名称, 但实际有可能是存放在不同的节点上. 如果你不想关心这些底层存储的逻辑, 那么往下看数据的处理使用.

Hadoop1.0 的生态系统

图源: Apache Hadoop YARN: Moving beyond MapReduce and Batch Processing with Apache Hadoop 2

HDFS的设计

通常来说, 我们会这样粗略理解: HDFS是用来存储 "bigger than bigger" 的大型文件. 这里的大我们可以有个量级的概念. 目前比较主流的是Gb, Tb级别的数据, 但这些数据是用HDFS的block来存储的 .Block是HDFS中最小的数据读写单元, 默认大小为128MB(但一个小于128MB的文件实际消耗的磁盘空间不是128, 仍为原大小).

关于128的大小: 一个BLOCK大小128MB 主要是考虑到查询的成本, 同时也兼顾数据传输的速率问题. 如果一个BLOCK过大, 在每个BLOCK上的map任务都会消耗时间.

data streaming: 一套文件系统的创建必须要考虑存储在上面的数据文件的读写效率. 最典型的场景就是 一次写, 多次读 的模式.

namenodes & datanodes

HDFS集群上也有两类节点, 一类是干活的datanodes(workders), 另一类就算是发号的namenodes(master). master用于管理文件系统的namespace, 维护着当前文件系统树的目录关系. 同时也清楚其它datanodes 现在都存储了哪些文件.

串行 v 并行

真正的业务已经不可能再使用单机\串行的方式来处理数据了 -- 因为数据量之量 已经无法容忍速度最快的机器的时长, 因此就要实现机器之间的数据交互, 以达到并行处理的效果. MapReduce, Tez, Spark 这些计算框架皆是出于此. 其中MR是第一代计算引擎, 是由Google的科学家们首先提出 (业界著名的paper想必是容易找到的).

比如手上有个全球的天气数据, 先来看一个shell下awk处理的代码, 作为串行的代表:

#!/usr/bin/env bash
for year in all/*
do
  echo -ne `basename $year .gz`"\t"
  gunzip -c $year | \
    awk '{ temp = substr($0, 88, 5) + 0;
           q = substr($0, 93, 1);
           if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }'
done

代码片段来自 Hadoop: The Definitive Guide 4 th Edition , 目的是求每年的最高温度.

但是, 我们会不会这么想, 如果每年处理要5分钟, 有100年就得500分钟(8个小时), 能不能每年将由一个节点处理, 然后再将100个结果汇总到一起?

图源: Hadoop: The Definitive Guide 4 th Edition

mapper of java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}

reducer of java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}

代码片段来自 Hadoop: The Definitive Guide 4 th Edition

MapReduce

MapReduce 实际是 map 和 reduce的处理流, 假设我们有100台机器,将数据分成100份之后同时处理数据, 然后再通过reduce(之间可能会有其它的步骤)来实现聚合每台机器上的分治结果.

有兴趣的人可去寻找很多人列举的wordCount 这一经典如同hello world 的例子.实际上这也是并行处理最为直白的方式, 非常简单粗暴. 但比较笨重, Spark通过减少数据的io, 以更方便的方式来提高了效率, 现在已经逐渐在业界有取代之势, 不过由于MR的根基深厚, 目前可能还是MR为主流.

不过因为写MR的代码比较麻烦, 不管是使用Java, Python, 甚至Ruby等语言都还需要一定的学习成本, 对于一些不那么技术的人来说(可能对方更希望使用的一些简单如SQL语句的方式来进行数据的处理), 因此有相继的Apache项目诞生了出来. 这其中比较有代表性的如Pig, 如Hive. 通过PigLatin, 或 Hive SQL 真的是几乎和SQL一样的语言来提供了MapReduce的"傻瓜式"编译. 大大简化了人编写的工作, 释放了人的时间.

于是, 数据分析师可以用这些工具来不再依赖工程师的"Hard core"技能. 也可以独立进行数据的处理. 更重要的: 让CS背景的人们也开始向往大数据的领域.

那么问题来了

随着这些简化工具的普及推广, 问题也接踵而至, 就是速度. 于是有更多的新颖的东西也应此需求诞生( Impala, Presto, Drill ... ) 这些工具的往往特点就是通过牺牲小部分(可接受的)系统稳定性来获取大幅度性能的提升.后来, Spark SQL 的提出可能是一次划时代的, 基本上完全的超越了Hive.

为什么我们一个劲的强调处理数据的速度?想象你是Facebook的后台数据人员, 需要实时的进行信息处理或者模型工具, 你的数据量是多少? 全球至少10亿的人在使用, ... 我们必须要根据自己的业务或者是当前业务一定倍数来使用, 甚至不断的尝试一些新奇的数据技术.

一些 key-value 工具

有些数据存储是根据键值的形式存储的, Cassandra,HBase,MongoDB还有很多我们想不出来名字的模块. 这是根据key来索引数据的. 比如我们全中国人的数据, 假设是用身份证号来索引的. 这个动作通过kv会比MapReduce更方便一些. 比如一些电商网站的订单管理系统, 可能就是用订单号和其它的字段作为联合key.

调度大师

上面提到了一些工具框架, 但在几百台的大集群上有序进行(大家和睦相片)就得有个commander -- 调度系统. 目前最主流的就是Yarn. 比如我们CEO可能就每天负责技术部分今天做xxx, 之后再由产品来xxx, 再是销售人员来...

在1.0时, Hadoop使用 JobTracker的机制来负责资源管理(时间统筹, 资源监管等任务). JobTracker又细分成全局性的Resource Manager(RM)和每个应用任务的单独Manager -- Application Master(AM).

增加了Yarn的Hadoop生态系统

图源: Apache Hadoop YARN: Moving beyond MapReduce and Batch Processing with Apache Hadoop 2





收藏 推荐 打印 | 录入:elainebo | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款