你好,游客 登录 注册 搜索
背景:
阅读新闻

去除Hadoop-Streaming行末多餘的TAB

[日期:2015-01-15] 来源:新青年  作者:新青年 [字体: ]

单位有一组业务一直都是使用Streaming压缩文本日志,大体上就是设置作业输出为BZ2格式,怎么输入就怎么输出,没有任何处理功能在里面。但是每行结尾都多出来一个TAB。终于,有一个业务需要使用TAB前的最后一个字段,不去掉不行了。

虽然是个小问题,但是网上搜了一圈,也没有很好的解决。很多人都遇到了,但是单位的业务比较特殊,只有map没有reduce。 http://stackoverflow.com/questions/20137618/hadoop-streaming-api-how-to- remove-unwanted-delimiters这个上面直接说“As I discussed with friends, there's no easy way to achieve the goal,...”。

Streaming有个特点,默认是按照TAB去区分Key和Value。如果没有设置Key字段的数目,默认一行里面第一个TAB之前的做Key,后面的是Value。如果没有找到Tab,就全都是Key字段,Value是空。之所以后面会多出个Tab,正是Key和Value之间的那个Tab。

首先是考察Streaming的Map,在PipeMapper.java。InputWriter处理输出,所以尝试实现自定义输出。在 MapReduce作业配置里面,stream.map.input.writer.class负责指定InputWriter是哪一个,默认是 TextInputWriter。Streaming在这里比较坑,增加-Dstream.map.input.writer.class=XXX的选项并不能令Streaming使用自定义的实现类,必须实现自己的IdentifierResolver,然后在其中对不同类型的输入设定不同类型的 InputWriter,而其中的输入类型,必须由stream.map.input选项传入。是否设置成功以作业运行时候JobTracker的配置参数表为准。

不巧的是,使用自定义的InputWriter代替TextInputWriter,行尾的Tab是没了,行首又多了个数字。估计是Hadoop给Mapper传入的Key被打印出来了。oooorz....不能瞎猜了,还是看看代码吧。

好在代码蛮短的还是。

Streaming会把本身、以及用户-file -cacheFile -cacheArchive 等选项指定的文件,打成一个Jar包提交到集群进行MR作业。把集群的输出,作为用户实现Mapper的输入;读取用户实现 Mapper的输出,作为整个Map作业的输出。Input/Output相对于用户自定义作业,Writer/Reader体现为Streaming的行为,因此是InputWriter和OutputReader。简单来讲,

Hadoop给出的(K,V)---streaming---> 用户自定义Mapper ---streaming--->Hadoop的Mapper输出

Streaming由PipeMapRunner启动作业,异步收集用户作业输出,进而向Hadoop匯报作业进度。整个作业的基础设置、作业提交都是由StreamJob类完成。

作业的执行是PipeMapRed/PipeMapper/PipReducer/PipCombiner这几个类。解决方案也就在这里。在 MROutputThread的run方法里面,outCollector.collect(key, value);这句之前,加上下面的代码片段即可。

          if (value instanceof Text) {
            if (value.toString().isEmpty())
              value = NullWritable.get();
          }

是不是很简单。

 

为什么这样做是可行的?还是源于org.apache.hadoop.mapred.TextOutputFormat。直接上代码。

package org.apache.hadoop.mapred;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.*;

/** An {@link OutputFormat} that writes plain text files. 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {

  protected static class LineRecordWriter<K, V>
    implements RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special
     * case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    public synchronized void write(K key, V value)
      throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return; 
      } 
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }   
      
    public synchronized void close(Reporter reporter) throws IOException {
      out.close();
    }
  }
    
  public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                                  JobConf job,
                                                  String name,
                                                  Progressable progress)
    throws IOException {
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",
                                       "\t");
    if (!isCompressed) {
      Path file = FileOutputFormat.getTaskOutputPath(job, name);
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else { 
      Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, GzipCodec.class);
      // create the named codec
      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
      // build the filename including the extension
      Path file = 
        FileOutputFormat.getTaskOutputPath(job,
        Path file =
        FileOutputFormat.getTaskOutputPath(job,
                                           name + codec.getDefaultExtension());
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new LineRecordWriter<K, V>(new DataOutputStream
                                        (codec.createOutputStream(fileOut)),
                                        keyValueSeparator);
    }
  }
}

注意到LineRecordWriter.write了么?

 

后记:

A. 网上很多是想办法修改分隔符,把TAB换成空字符。这是一个非常粗暴的做法,基本上就是埋坑!为什么呢?

日志文本内容可以是很丰富的,这次出问题是因为每行没有TAB。如果换做含有TAB的文本,把分隔符变为空串,就把日志中塬有的TAB去掉了。

B. 之所以这么搞,也是受到了stackoverflow的这个Q&A的启发。http://stackoverflow.com /questions/18133290/hadoop-streaming-remove-trailing-tab-from-reducer- output。类似的,Q&A也是采用修改分隔符的办法,是不可取的。但是仔细发现,是可以在自己重写的 TextOutputFormat<K,V>里,修改LineRecordWriter.write方法的。

重写TextOutputFormat是十分优雅的解决,看似修改了Hadoop本身的东西,但是在Streaming最新版没有加入这个fix之前,防止对每个版本的Streaming都要变更、重新编译打包。另外,Streaming不是独立的项目,编译它需要同时编译Hadoop!

加上下面这段

    if (!nullValue) {
      if (value instanceof Text) {
        if (value.toString().isEmpty())
          nullValue = true;
      }   
    }

C. 虽然是修改了Streaming代码,但是不需要考虑会影响同一机器所有用户的问题,也不用修改$HADOOP_HOME下的Streaming包。streaming提供了这个参数stream.shipped.hadoopstreaming。

D. 有些设置似乎是指对Reducer生效,对于这种只有Mapper的作业不起作用。比如

mapred.textoutputformat.ignoreseparator
mapred.textoutputformat.separator

设置了,没看到什么效果。

再有就是,命令行选项里面如果写-DXXX= \ 这样的语句,似乎也没有把这个参数设置为空串的效果,写-DXXX= "http://go.rritw.com/luckybins.blog.51cto.com/"也是一样。

本文出自 “新青年” 博客,请务必保留此出处http://go.rritw.com/luckybins.blog.51cto.com/786164/1601722





收藏 推荐 打印 | 录入:Cstor | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款