你好,游客 登录 注册 搜索
背景:
阅读新闻

翻译:Hadoop权威指南之Spark-1

[日期:2016-07-18] 来源:简书  作者:qinm08 [字体: ]

本文翻译自O'Reilly出版Tom White所著《Hadoop: The Definitive Guide》第4版第19章,向作者致敬。该书英文第4版已于2015年4月出版,至今已近15个月,而市面上中文第3版还在大行其道。Spark一章是第4版新增的内容,笔者在学习过程中顺便翻译记录,由于笔者也在学习,并无实战经验,难免翻译不妥或出错,欢迎方家来信斧正。翻译纯属兴趣,不做商业用途。秦铭,Email地址 qinm08@gmail.com 。

Hadoop

Apache Spark 是一个大规模数据处理的集群计算框架。和本书中讨论的大多数其他处理框架不同,Spark不使用MapReduce作为执行引擎,作为替代,Spark使用自己的分布式运行环境(distributed runtime)来执行集群上的工作。然而,Spark与MapReduce在API和runtime方面有许多相似,本章中我们将会看到。Spark和Hadoop紧密集成:它可以运行在YARN上,处理Hadoop的文件格式,后端存储采用HDFS。

Spark最著名的是它拥有把大量的工作数据集保持在内存中的能力。这种能力使得Spark胜过对应的MapReduce工作流(某些情况下差别显著),在MapReduce中数据集总是要从磁盘加载。两种类型的应用从Spark这种处理模型中受益巨大:1)迭代算法,一个函数在某数据集上反复执行直到满足退出条件。2)交互式分析,用户在某数据集上执行一系列的特定查询。

即使你不需要内存缓存,Spark依然有充满魅力的理由:它的DAG引擎和用户体验。与MapReduce不同,Spark的DAG引擎能够处理任意的多个操作组成的管道(pipelines of operators)并翻译为单个Job。

Spark的用户体验也是首屈一指的(second to none),它有丰富的API用来执行很多常见的数据处理任务,比如join。行文之时,Spark提供三种语言的API:Scala,Java和Python。本章中的大多数例子将采用Scala API,但翻译为别的语言也是容易的。Spark还带有一个基于Scala或Python的REPL(read-eval-print loop)环境,可以快速简便的查看数据集。

Spark是个构建分析工具的好平台,为达此目的,Apache Spark项目包含了众多的模块:机器学习(MLlib),图形处理(GraphX),流式处理(Spark Streaming),还有SQL(Spark SQL)。本章内容不涉及这些模块,感兴趣的读者可以访问 Apache Spark 网站 。

Installing Spark

从 下载页面 下载Spark二进制分发包的稳定版本(选择和你正在使用的Hadoop版本相匹配的)。在合适的地方解压这个tar包。

% tar xzf spark-x.y.z-bin-distro.tgz

把Spark加入到PATH环境变量中

% export SPARK_HOME=~/sw/spark-x.y.z-bin-distro
% export PATH=$PATH:$SPARK_HOME/bin

我们现在可以运行Spark的例子了。

An Example

为了介绍Spark,我们使用spark-shell来运行一个交互式会话,这是带有Spark附加组件的Scala REPL,用下面的命令启动shell:

% spark-shell
Spark context available as sc.
scala>

从控制台的输出,我们可以看到shell创建了一个Scala变量,sc,用来存储SparkContext实例。这是Spark的入口,我们可以这样加载一个文本文件:

scala> val lines = sc.textFile("input/ncdc/micro-tab/sample.txt")
lines: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

lines变量是对一个弹性数据集(RDD)的引用,RDD是Spark的核心抽象:分区在集群中多台机器上的只读的对象集合。在典型的Spark程序中,一个或多个RDD被加载进来作为输入,经过一系列的转变(transformation),成为一组目标RDD,可以对其执行action(比如计算结果或者写入持久化存储) 。“弹性数据集”中的“弹性”是指,Spark会通过从源RDD中重新计算的方式,来自动重建一个丢失的分区。

加载RDD和执行transformation不会触发数据处理,仅仅是创建一个执行计算的计划。当action(比如 foreach())执行的时候,才会触发计算。

我们要做的第一个transformation,是把lines拆分为fields:

scala> val records = lines.map(_.split("\t"))
records: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

这里使用了RDD的map()方法,对RDD中的每一个元素,执行一个函数。本例中,我们把每一行(字符串String)拆分为 Scala 的字符串数组(Array of Strings)。

接下来,我们使用过滤器(filter)来去掉可能存在的坏记录:

scala> val filtered = records.filter(rec => (rec(1) != "9999" && rec(2).matches("[01459]")))
filtered: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[3] at filter at <console>:16

RDD的filter方法接收一个返回布尔值的函数作为参数。这个函数过滤掉那些温度缺失(由9999表示)或者质量不好的记录。

为了找到每一年的最高气温,我们需要在year字段上执行分组操作,这样才能处理每一年的所有温度值。Spark提供reduceByKey()方法来做这件事情,但它需要一个键值对RDD,因此我们需要通过另一个map来把现有的RDD转变为正确的形式:

scala> val tuples = filtered.map(rec => (rec(0).toInt, rec(1).toInt))
tuples: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[4] at map at <console>:18

现在可以执行聚合了。reduceByKey()方法的参数是一个函数,这个函数接受两个数值并联合为一个单独的数值。这里我们使用Java的Math.max函数:

scala> val maxTemps = tuples.reduceByKey((a, b) => Math.max(a, b))
maxTemps: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:21

现在可以展示maxTemps的内容了,调用foreach()方法并传入println(),把每个元素打印到控制台:

scala> maxTemps.foreach(println(_))
(1950,22)
(1949,111)

这个foreach()方法,与标准Scala集合(比如List)中的等价物相同,对RDD中的每个元素应用一个函数(此函数具有副作用)。正是这个操作,促使Spark运行一个job来计算RDD中的数据,使之能够跑步通过println()方法:-)

或者,也可以把RDD保存到文件系统:

scala> maxTemps.saveAsTextFile("output")

这样会创建一个output目录,包含分区文件:

% cat output/part-*
(1950,22)
(1949,111)

这个saveAsTextFile()方法也会触发一个Spark job。主要的区别是没有返回值,而是把RDD的计算结果及其分区文件写入output目录中。

Spark Applications, Jobs, Stages, Tasks

示例中我们看到,和MapReduce一样,Spark也有job的概念。然而,Spark的job比MapReduce的job更通用,因为它是由任意的stage的有向无环图(DAG)组成。每个stage大致等同于MapReduce中的map或者reduce阶段。

Stages被Spark 运行时拆分为tasks,并行地运行在RDD的分区之上,就像MapReduce的task一样。

一个Job总是运行于一个application的上下文中,由SparkContext实例表示,application的作用是分组RDD和共享变量。一个application可以运行多个job,串行或者并行,并且提供一种机制,使得一个job可以访问同一application中的前一个job缓存的RDD。一个交互式的Spark会话,比如spark-shell会话,就是一个application的实例。





收藏 推荐 打印 | 录入:elainebo | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款