你好,游客 登录
背景:
阅读新闻

hadoop分布式程序 总结

[日期:2016-07-28] 来源:你知道DCY吗  作者: [字体: ]

首先hadoop的集群和虚机是op配置的。

因为hadoop是java开发的,而笔者不会java,所以通过hadoop streaming使用python来编写的程序。

Hadoop

网上一段资料讲的挺好:通过将用其他语言编写的mapper和reducer通过参数传给一个事先写好的Java程序(Hadoop自带的*-streaming.jar),这个Java程序会负责创建MR作业,另开一个进程来运行mapper,将得到的输入通过stdin传给它,再将mapper处理后输出到stdout的数据交给Hadoop,partition和sort之后,再另开进程运行reducer,同样地通过stdin/stdout得到最终结果。因此,我们只需要在其他语言编写的程序里,通过stdin接收数据,再将处理过的数据输出到stdout,Hadoop streaming就能通过这个Java的wrapper帮我们解决中间繁琐的步骤,运行分布式程序。

所以从原理上讲,只要能处理标准输入输出流的都可以来写mapper和reducer。

我在写hadoop的时候,基本的文件结构是

mapper.py

reducer.py

config #配置项

run.sh #执行hadoop命令

在这里给一个wordcount的例子,我们唯一需要做的是利用Python的sys.stdin读取输入数据,并把我们的输出传送给sys.stdout。

我们的测试数据是 “foo foo quux labs foo bar quux”

首先编写我们的mapper.py (mapper.py的作用就是分割字符串,我们需要输出到stdout的数据格式key \t value的形式,也就是 word\t1)

#!/usr/bin/env python
import sys
for line in sys.stdin:
   line = line.strip()
   words = line.split()
   for word in words:
      print "%s\t%s" % (word, 1)

我们先执行以下这个示例

echo “foo foo quux labs foo bar quux”|./mapper.py

结果是

foo 1
foo 1
quux    1
labs    1
foo 1
bar 1
quux    1

然后此处进行partition和sort,分片就是分成若干个reduce任务,此处测试时我们默认只进行sort。

排序之后我们编写 reduce任务代码,统计相同的词出现的总次数,排序后相同的key是连续的,相同的word输出一次,count相加。

#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:  #count如果不是数字的话,直接忽略掉
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:  #不要忘记最后的输出
    print "%s\t%s" % (current_word, current_count)

随即执行reduce的代码。

$echo “foo foo quux labs foo bar quux” | ./mapper.py | sort -k1,1 | ./reducer.py

如此这般,结果是

bar 1
foo 3
labs    1
quux    2

这样词频就统计出来结果了。

而针对于实际的应用场景,例如日志分析。

比如看这样一段日志:

"GET /pv.gif?uigs_productid=tugeleapp&platform=1&id=07e079a8-c7f4-49a2-b958-b3c3d8ecae71&version=2.1.0&visitor=1&company=apple&machine=iPhone%205s&osversion=7.1.2&- HTTP/1.0"
"GET /pv.gif?uigs_productid=tugeleapp&platform=1&id=07e079a8-c7f4-49a2-b958-b3c3d8ecae72&version=2.1.0&visitor=1&company=apple&machine=iPhone%205s&osversion=7.1.2&- HTTP/1.0"
"GET /pv.gif?uigs_productid=tugeleapp&platform=1&id=07e079a8-c7f4-49a2-b958-b3c3d8ecae71&version=2.1.0&visitor=1&company=apple&machine=iPhone%205s&osversion=7.1.2&- HTTP/1.0"
"GET /pv.gif?uigs_productid=tugeleapp&platform=1&id=07e079a8-c7f4-49a2-b958-b3c3d8ecae73&version=2.1.0&visitor=1&company=apple&machine=iPhone%205s&osversion=7.1.2&- HTTP/1.0"
"GET /pv.gif?uigs_productid=tugeleapp&platform=1&id=07e079a8-c7f4-49a2-b958-b3c3d8ecae72&version=2.1.0&visitor=1&company=apple&machine=iPhone%205s&osversion=7.1.2&- HTTP/1.0"
"GET /pv.gif?uigs_productid=tugeleapp&platform=1&id=07e079a8-c7f4-49a2-b958-b3c3d8ecae71&version=2.1.0&visitor=1&company=apple&machine=iPhone%205s&osversion=7.1.2&- HTTP/1.0"
"GET /pv.gif?uigs_productid=tugeleapp&platform=1&id=07e079a8-c7f4-49a2-b958-b3c3d8ecae71&version=2.1.0&visitor=1&company=apple&machine=iPhone%205s&osversion=7.1.2&- HTTP/1.0"

我们需要统计出来这样的结果

我们需要一个单独的配置文件config,根据相应的公式来写

比如 这个统计项的公式是(格式是自定义的)

APP_USAGE>id>machine>osversion|id=* APP_USAGE是统计项名称

左侧的格式是 在key中加入 id,machine,osversion等字段,分别代表用户id,机型,系统版本。 除了这些默认的有版本号,平台两个字段,key中不同字段在输出时候以’|’分割。

然后我们还需要公式,何时代表这一条log满足这个统计项目的需要。

配置文件中’|’ 后面的id=* 就是这个统计项目的公式,id在每一条log中出现一次,就计1次。

我们也可以对公式进行很多自定义的设置,满足不同的统计项目的需求。

此时我们可以仿照上面的词频统计的例子来按照配置文件编写代码,然后此时我们写好了mapper和reducer的代码。

然后是在hadoop中执行,我们通过一段shell来完成整个执行过程。

input_path="*/*"
output_path="*/*"

hadoop jar  /hadoop-streaming-*.jar \
        -Dhadoop.client.ugi=user,pass \# hadoop用户名和密码
        -input $input_path \ #输入路径
        -output $output_path \ #输出路径
        -mapper mapper.py \ #mapper文件
        -reducer reducer.py \ #reducer文件
        -file mapper.py \
        -file reducer.py \
        -file config \
        -jobconf mapred.map.tasks=500 \
        -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
        -jobconf stream.map.input.ignoreKey=true \
        -numReduceTasks 10 \ #reduce分片任务数量设置  此处为10
        -cmdenv "" \
        -jobconf "mapred.job.name="

# 此时我们在输出路径中得到hadoop结果,然后我们拿到一个有10个输出结果文件的文件夹。我们将其merge 到本地
hadoop dfs -getmerge "$output_path" res.txt
# 因为10个部分,可能会有重复的,所以需要再执行一次reduce任务,可以再启动一次hadoop,此处因为行数比较少,所以直接写脚本处理了。
cat res.txt |sort |python all_reduce.py > result.txt
# 然后将不同的result.txt分割,按照配置文件的统计项
cat result.txt | grep APP_USAGE | sort -t $'\t' -k2 -nr | head -200 > ./temp/APP_USAGE.txt
cat result.txt | grep APP_ANAL | sort -t $'\t' -k2 -nr > ./temp/APP_ANAL.txt
# 然后再写脚本对每个txt文件,分割和处理结果后,写入到数据库里。
python sql.py
#然后删除 res.txt 
rm -f res.txt

此处所有的处理结果完毕。

如果需要在绿皮上展示,在写入数据库的时候,建议选择latin1 编码,或者将文件改为gbk编码。这样子不会乱码。另外数据库的格式也设置为latin1编码。





收藏 推荐 打印 | 录入:elainebo | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款