转载请注明出处:http://blog.csdn.net/xiaojimanman/article/details/40184581
最近几天一直在看hadoop相关的书籍,目前稍微有点感觉,自己就仿照着WordCount程序自己编写了一个统计关联商品。
需求描述:
根据超市的销售清单,计算商品之间的关联程度(即统计同时买A商品和B商品的次数)。
数据格式:
超市销售清单简化为如下格式:一行表示一个清单,每个商品采用 "," 分割,如下图所示:

需求分析:
采用hadoop中的mapreduce对该需求进行计算。
map函数主要拆分出关联的商品,输出结果为 key为商品A,value为商品B,对于第一条三条结果拆分结果如下图所示:

这里为了统计出和A、B两件商品想关联的商品,所以商品A、B之间的关系输出两条结果即 A-B、B-A。
reduce函数分别对和商品A相关的商品进行分组统计,即分别求value中的各个商品出现的次数,输出结果为key为商品A|商品B,value为该组合出现的次数。针对上面提到的5条记录,对map输出中key值为R的做下分析:
通过map函数的处理,得到如下图所示的记录:

reduce中对map输出的value值进行分组计数,得到的结果如下图所示

将商品A B作为key,组合个数作为value输出,输出结果如下图所示:

对于需求的实现过程的分析到目前就结束了,下面就看下具体的代码实现
代码实现:
关于代码就不做详细的介绍,具体参照代码之中的注释吧。
- package com;
 - import java.io.IOException;
 - import java.util.HashMap;
 - import java.util.Map.Entry;
 - import org.apache.hadoop.conf.Configuration;
 - import org.apache.hadoop.conf.Configured;
 - import org.apache.hadoop.fs.Path;
 - import org.apache.hadoop.io.IntWritable;
 - import org.apache.hadoop.io.LongWritable;
 - import org.apache.hadoop.io.Text;
 - import org.apache.hadoop.mapreduce.Job;
 - import org.apache.hadoop.mapreduce.Mapper;
 - import org.apache.hadoop.mapreduce.Reducer;
 - import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 - import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 - import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 - import org.apache.hadoop.util.Tool;
 - import org.apache.hadoop.util.ToolRunner;
 - public class Test extends Configured implements Tool{
 - /**
 - * map类,实现数据的预处理
 - * 输出结果key为商品A value为关联商品B
 - * @author lulei
 - */
 - public static class MapT extends Mapper<LongWritable, Text, Text, Text> {
 - public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
 - String line = value.toString();
 - if (!(line == null || "".equals(line))) {
 - //分割商品
 - String []vs = line.split(",");
 - //两两组合,构成一条记录
 - for (int i = 0; i < (vs.length - 1); i++) {
 - if ("".equals(vs[i])) {//排除空记录
 - continue;
 - }
 - for (int j = i+1; j < vs.length; j++) {
 - if ("".equals(vs[j])) {
 - continue;
 - }
 - //输出结果
 - context.write(new Text(vs[i]), new Text(vs[j]));
 - context.write(new Text(vs[j]), new Text(vs[i]));
 - }
 - }
 - }
 - }
 - }
 - /**
 - * reduce类,实现数据的计数
 - * 输出结果key 为商品A|B value为该关联次数
 - * @author lulei
 - */
 - public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> {
 - private int count;
 - /**
 - * 初始化
 - */
 - public void setup(Context context) {
 - //从参数中获取最小记录个数
 - String countStr = context.getConfiguration().get("count");
 - try {
 - this.count = Integer.parseInt(countStr);
 - } catch (Exception e) {
 - this.count = 0;
 - }
 - }
 - public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
 - String keyStr = key.toString();
 - HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
 - //利用hash统计B商品的次数
 - for (Text value : values) {
 - String valueStr = value.toString();
 - if (hashMap.containsKey(valueStr)) {
 - hashMap.put(valueStr, hashMap.get(valueStr) + 1);
 - } else {
 - hashMap.put(valueStr, 1);
 - }
 - }
 - //将结果输出
 - for (Entry<String, Integer> entry : hashMap.entrySet()) {
 - if (entry.getValue() >= this.count) {//只输出次数不小于最小值的
 - context.write(new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue()));
 - }
 - }
 - }
 - }
 - @Override
 - public int run(String[] arg0) throws Exception {
 - // TODO Auto-generated method stub
 - Configuration conf = getConf();
 - conf.set("count", arg0[2]);
 - Job job = new Job(conf);
 - job.setJobName("jobtest");
 - job.setOutputFormatClass(TextOutputFormat.class);
 - job.setOutputKeyClass(Text.class);
 - job.setOutputValueClass(Text.class);
 - job.setMapperClass(MapT.class);
 - job.setReducerClass(ReduceT.class);
 - FileInputFormat.addInputPath(job, new Path(arg0[0]));
 - FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
 - job.waitForCompletion(true);
 - return job.isSuccessful() ? 0 : 1;
 - }
 - /**
 - * @param args
 - */
 - public static void main(String[] args) {
 - // TODO Auto-generated method stub
 - if (args.length != 3) {
 - System.exit(-1);
 - }
 - try {
 - int res = ToolRunner.run(new Configuration(), new Test(), args);
 - System.exit(res);
 - } catch (Exception e) {
 - // TODO Auto-generated catch block
 - e.printStackTrace();
 - }
 - }
 - }
 
 上传运行:
将程序打包成jar文件,上传到机群之中。将测试数据也上传到HDFS分布式文件系统中。
命令运行截图如下图所示:
    运行结束后查看相应的HDFS文件系统,如下图所示:

到此一个完整的mapreduce程序就完成了,关于hadoop的学习,自己还将继续~
			