Background
A while back, while designing the operator layer for our in-house computing framework, I surveyed and compared the operator layers of several open-source computing frameworks. This post is a coarse-grained write-up of that comparison.
The figure below is my breakdown of the abstraction layers of a computing framework; for details, see the Spark SQL slides I presented at last Sunday's Hangzhou Spark meetup.
Pig Latin
- A = LOAD 'xx' AS (c1:int, c2:chararray, c3:float)
- B = GROUP A BY c1
- C = FOREACH B GENERATE group, COUNT(A)
- C = FOREACH B GENERATE $0, $1.c2
- X = COGROUP A BY a1, B BY b1
- Y = JOIN A BY a1 (LEFT|RIGHT|FULL) [OUTER], B BY b1
Cascading
A wrapper layer over Hadoop MapReduce; Twitter's Summingbird is built on top of Cascading. Every operator is created with new, and the Pipe instance is passed "iteratively" into each new operator.
- // define source and sink Taps.
- Scheme sourceScheme = new TextLine( new Fields( "line" ) );
- Tap source = new Hfs( sourceScheme, inputPath );
- Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
- Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );
- // the 'head' of the pipe assembly
- Pipe assembly = new Pipe( "wordcount" );
- // For each input Tuple
- // parse out each word into a new Tuple with the field name "word"
- // regular expressions are optional in Cascading
- String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
- Function function = new RegexGenerator( new Fields( "word" ), regex );
- assembly = new Each( assembly, new Fields( "line" ), function );
- // group the Tuple stream by the "word" value
- assembly = new GroupBy( assembly, new Fields( "word" ) );
- // For every Tuple group
- // count the number of occurrences of "word" and store result in
- // a field named "count"
- Aggregator count = new Count( new Fields( "count" ) );
- assembly = new Every( assembly, count );
- // initialize app properties, tell Hadoop which jar file to use
- Properties properties = new Properties();
- AppProps.setApplicationJarClass( properties, Main.class );
- // plan a new Flow from the assembly using the source and sink Taps
- // with the above properties
- FlowConnector flowConnector = new HadoopFlowConnector( properties );
- Flow flow = flowConnector.connect( "word-count", source, sink, assembly );
- // execute the flow, block until complete
- flow.complete();
Trident
Provides high-level abstract primitives on top of Storm, carrying over the exactly-once semantics of Transactional Topology to support transactional processing. The primitives are overly abstract, and constructing a topology is full of repetitive field definitions.
- TridentState urlToTweeters =
- topology.newStaticState(getUrlToTweetersState());
- TridentState tweetersToFollowers =
- topology.newStaticState(getTweeterToFollowersState());
- topology.newDRPCStream("reach")
- .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
- .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
- .shuffle()
- .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
- .parallelismHint(200)
- .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
- .groupBy(new Fields("follower"))
- .aggregate(new One(), new Fields("one"))
- .parallelismHint(20)
- .aggregate(new Count(), new Fields("reach"));
RDD
Spark's resilient distributed dataset, with a rich set of primitives. The flexibility of the RDD primitives comes from Scala's functional nature and syntactic sugar, and their richness from the richness of Scala's own collection API. Java can hardly match this expressiveness (a rough Java rendering of the same word count is sketched after the Scala example below), but RDD is well worth studying as a reference.
- scala> val textFile = sc.textFile("README.md")
- textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
- scala> textFile.count() // Number of items in this RDD
- res0: Long = 126
- scala> textFile.first() // First item in this RDD
- res1: String = # Apache Spark
- scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
- linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
- scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
- res3: Long = 15
- scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
- res4: Int = 15
- scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
- wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
- scala> wordCounts.collect()
- res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
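To make the expressiveness comparison concrete, here is a rough sketch of the same word count written against Spark's Java API of that era (Spark 1.x, pre-Java-8 anonymous inner classes). It assumes an already-constructed JavaSparkContext named sc; the snippet is illustrative only and is not from the original post.
- import java.util.Arrays;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import scala.Tuple2;
- // Assumes an existing JavaSparkContext named sc.
- JavaRDD<String> lines = sc.textFile("README.md");
- // Split each line into words.
- JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-     public Iterable<String> call(String line) { return Arrays.asList(line.split(" ")); }
- });
- // Pair each word with 1, then sum the counts per word.
- JavaPairRDD<String, Integer> wordCounts = words
-     .mapToPair(new PairFunction<String, String, Integer>() {
-         public Tuple2<String, Integer> call(String w) { return new Tuple2<String, Integer>(w, 1); }
-     })
-     .reduceByKey(new Function2<Integer, Integer, Integer>() {
-         public Integer call(Integer a, Integer b) { return a + b; }
-     });
- wordCounts.collect();
Compared with the one-line Scala version above, the same pipeline takes a dozen lines of boilerplate, which is exactly the expressiveness gap noted above.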
SchemaRDD
The "table"-like RDD in Spark SQL, which additionally provides a DSL for SQL. But this DSL only fits SQL-style queries; its expressiveness is limited and rather "vertical".
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
- import sqlContext.createSchemaRDD
- // Define the schema using a case class.
- case class Person(name: String, age: Int)
- // Create an RDD of Person objects and register it as a table.
- val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
- people.registerAsTable("people")
- // SQL statements can be run by using the sql methods provided by sqlContext.
- val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
- // DSL: where(), select(), as(), join(), limit(), groupBy(), orderBy() etc.
- val teenagers = people.where('age >= 10).where('age <= 19).select('name)
- teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
Apache Crunch
An open-source implementation of Google's FlumeJava paper and a standard operator layer; it now supports both Hadoop and Spark jobs.
Crunch follows the FlumeJava design: it implements distributed, immutable data representations such as PCollection and PTable, provides the four basic primitives parallelDo(), groupByKey(), combineValues() and flatten(), and derives count(), join(), top() and others from them (a minimal sketch of such a derivation follows the WordCount example below). It also implements deferred evaluation and the optimization of MSCR (MapShuffleCombineReduce) operations. Writing Crunch jobs depends heavily on Hadoop; in essence it exists to write MapReduce pipelines on a batch computing framework. Its primitives are not rich enough, and parallelDo() does not fit a streaming context well. Many of its features are things we do not need, but its abstract data representations, interface model, and flow control are worth referencing.
- public class WordCount extends Configured implements Tool, Serializable {
- public int run(String[] args) throws Exception {
- // Create an object to coordinate pipeline creation and execution.
- Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
- // Reference a given text file as a collection of Strings.
- PCollection<String> lines = pipeline.readTextFile(args[0]);
- PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
- public void process(String line, Emitter<String> emitter) {
- for (String word : line.split("\\s+")) {
- emitter.emit(word);
- }
- }
- }, Writables.strings()); // Indicates the serialization format
- PTable<String, Long> counts = words.count();
- // Instruct the pipeline to write the resulting counts to a text file.
- pipeline.writeTextFile(counts, args[1]);
- // Execute the pipeline as a MapReduce.
- PipelineResult result = pipeline.done();
- return result.succeeded() ? 0 : 1;
- }
- public static void main(String[] args) throws Exception {
- int result = ToolRunner.run(new Configuration(), new WordCount(), args);
- System.exit(result);
- }
- }
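To make the "derived from the basic primitives" claim concrete, here is a minimal sketch (not Crunch's actual implementation of count()) of a count-style helper built only from parallelDo(), groupByKey() and combineValues(), using Crunch's public MapFn, Pair, Writables and Aggregators types; the helper name countByValue is hypothetical.
- import org.apache.crunch.MapFn;
- import org.apache.crunch.PCollection;
- import org.apache.crunch.PTable;
- import org.apache.crunch.Pair;
- import org.apache.crunch.fn.Aggregators;
- import org.apache.crunch.types.writable.Writables;
- // A count() rebuilt from the basic primitives: map each element to (element, 1L),
- // group by key, then combine the per-key 1s into a sum.
- public static PTable<String, Long> countByValue(PCollection<String> elems) {
-     PTable<String, Long> ones = elems.parallelDo(
-         new MapFn<String, Pair<String, Long>>() {
-             public Pair<String, Long> map(String input) {
-                 return Pair.of(input, 1L);
-             }
-         },
-         Writables.tableOf(Writables.strings(), Writables.longs()));
-     return ones.groupByKey().combineValues(Aggregators.SUM_LONGS());
- }
join() and top() can be expressed in a similar way, which is why the FlumeJava model treats them as derived operations rather than additional primitives.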
Summary
The final figure compares the implementation layers of the various data pipeline projects built on top of Hadoop:
Original post (Chinese): http://blog.csdn.net/pelick/article/details/39076223