你好,游客 登录
背景:
阅读新闻

常见计算框架算子层对比

[日期:2014-09-29] 来源:CSDN博客  作者:张包峰 [字体: ]

背景

      前段时间在为内部自研的计算框架设计算子层,参考对比了一些开源的计算框架的算子层,本文做一个粗粒度的梳理。

      下面这张图是我对计算框架抽象层次的一个拆分,具体可以参考上周日杭州Spark meetup上我做的Spark SQL分享 slides。

Pig-latin

      hadoop MR上的DSL,面向过程,适用于large-scale的数据分析语法很美,可惜只适合CLI 。
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. A = load 'xx' AS (c1:int, c2:chararray, c3:float
  2. B = GROUP A BY c1 
  3. C = FOREACH B GENERATE group, COUNT(A) 
  4. C = FOREACH B GENERATE $0. $1.c2 
  5.  
  6. X = COGROUP A by a1, B BY b1 
  7. Y = JOIN A by a1 (LEFT|FULL|LEFT OUTER), B BY b1 

Cascading

      Hadoop MR上的封装,Twitter Summingbird正是基于Cascading的。 每个算子都是new出来的,Pipe实例被"迭代式"地传入新的算子里 。
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. // define source and sink Taps. 
  2. Scheme sourceScheme = new TextLine( new Fields( "line" ) ); 
  3. Tap source = new Hfs( sourceScheme, inputPath ); 
  4. Scheme sinkScheme = new TextLine( new Fields( "word""count" ) ); 
  5. Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE ); 
  6.  
  7. // the 'head' of the pipe assembly 
  8. Pipe assembly = new Pipe( "wordcount" ); 
  9.  
  10. // For each input Tuple 
  11. // parse out each word into a new Tuple with the field name "word" 
  12. // regular expressions are optional in Cascading 
  13. String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)"
  14. Function function = new RegexGenerator( new Fields( "word" ), regex ); 
  15. assembly = new Each( assembly, new Fields( "line" ), function ); 
  16.  
  17. // group the Tuple stream by the "word" value 
  18. assembly = new GroupBy( assembly, new Fields( "word" ) ); 
  19.  
  20. // For every Tuple group 
  21. // count the number of occurrences of "word" and store result in 
  22. // a field named "count" 
  23. Aggregator count = new Count( new Fields( "count" ) ); 
  24. assembly = new Every( assembly, count ); 
  25.  
  26. // initialize app properties, tell Hadoop which jar file to use 
  27. Properties properties = new Properties(); 
  28. AppProps.setApplicationJarClass( properties, Main.class ); 
  29.  
  30. // plan a new Flow from the assembly using the source and sink Taps 
  31. // with the above properties 
  32. FlowConnector flowConnector = new HadoopFlowConnector( properties ); 
  33. Flow flow = flowConnector.connect( "word-count", source, sink, assembly ); 
  34.  
  35. // execute the flow, block until complete 
  36. flow.complete();<strong> 
  37. </strong> 

Trident

      在Storm上提供高级的抽象原语,延续Transactional Topology的exactly-once的语义,满足事务性。 原语过于抽象,构造过程充斥重复性的字段定义。
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. TridentState urlToTweeters = 
  2.        topology.newStaticState(getUrlToTweetersState()); 
  3. TridentState tweetersToFollowers = 
  4.        topology.newStaticState(getTweeterToFollowersState()); 
  5.  
  6. topology.newDRPCStream("reach"
  7.        .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) 
  8.        .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) 
  9.        .shuffle() 
  10.        .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) 
  11.        .parallelismHint(200
  12.        .each(new Fields("followers"), new ExpandList(), new Fields("follower")) 
  13.        .groupBy(new Fields("follower")) 
  14.        .aggregate(new One(), new Fields("one")) 
  15.        .parallelismHint(20
  16.        .aggregate(new Count(), new Fields("reach")); 

RDD

      Spark上的分布式弹性数据集,具备丰富的原语。 RDD原语的灵活性归功于Scala语言本身的FP性质以及语法糖,而其丰富性又源自Scala语言本身API的丰富性。Java难以实现如此强大的表达能力。但RDD确实是非常有参考价值的。
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. scala> val textFile = sc.textFile("README.md"
  2. textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 
  3.  
  4. scala> textFile.count() // Number of items in this RDD 
  5. res0: Long = 126 
  6.  
  7. scala> textFile.first() // First item in this RDD 
  8. res1: String = # Apache Spark 
  9.  
  10. scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) 
  11. linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09 
  12.  
  13. scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? 
  14. res3: Long = 15 
  15.  
  16. scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) 
  17. res4: Long = 15 
  18.  
  19. scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) 
  20. wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8 
  21.  
  22. scala> wordCounts.collect() 
  23. res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

SchemaRDD

      Spark SQL里的"Table"型RDD,额外为SQL提供了一套DSL。 但是这套DSL只适合SQL,表达能力不够,偏"垂直"。
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
  2. // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD. 
  3. import sqlContext.createSchemaRDD 
  4.  
  5. // Define the schema using a case class. 
  6. case class Person(name: String, age: Int) 
  7.  
  8. // Create an RDD of Person objects and register it as a table. 
  9. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) 
  10. people.registerAsTable("people"
  11.  
  12. // SQL statements can be run by using the sql methods provided by sqlContext. 
  13. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"
  14.  
  15. // DSL: where(), select(), as(), join(), limit(), groupBy(), orderBy() etc. 
  16. val teenagers = people.where('age >= 10).where('age <= 19).select('name) 
  17. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

Apache Crunch

      Google FlumeJava论文的开源实现,是一个标准的算子层,现在支持Hadoop任务和Spark任务。
      Crunch 符合FlumeJava的设定,实现了PCollection和PTable这样的分布式、不可变数据表示集,实现了 parallelDo(),groupByKey(),combineValues(),flattern()四种基本原语,且基于此原语可以衍生出 count(),join(),top()。也实现了Deffered Evalution 以及 针对MSCR(MapShuffleCombineReduce) Operation的优化。Crunch的任务编写 严重依赖Hadoop,其本质是为了在批量计算框架上写MapReduce Pipeline。原语方面不够丰富,且parallelDo()不太适合流式语境。此外,其很多特性和功能是我们不需要具备的,但是抽象数据表示、接口 模型、流程控制是可以参考的。
[java] view plaincopy在CODE上查看代码片派生到我的代码片
  1. public class WordCount extends Configured implements Tool, Serializable { 
  2.   public int run(String[] args) throws Exception { 
  3.     // Create an object to coordinate pipeline creation and execution. 
  4.     Pipeline pipeline = new MRPipeline(WordCount.class, getConf()); 
  5.     // Reference a given text file as a collection of Strings. 
  6.     PCollection<String> lines = pipeline.readTextFile(args[0]); 
  7.  
  8.     PCollection<String> words = lines.parallelDo(new DoFn<String, String>() { 
  9.       public void process(String line, Emitter<String> emitter) { 
  10.         for (String word : line.split("\\s+")) { 
  11.           emitter.emit(word); 
  12.         } 
  13.       } 
  14.     }, Writables.strings()); // Indicates the serialization format 
  15.  
  16.     PTable<String, Long> counts = words.count(); 
  17.     // Instruct the pipeline to write the resulting counts to a text file. 
  18.     pipeline.writeTextFile(counts, args[1]); 
  19.     // Execute the pipeline as a MapReduce. 
  20.     PipelineResult result = pipeline.done(); 
  21.  
  22.     return result.succeeded() ? 0 : 1
  23.   } 
  24.  
  25.   public static void main(String[] args) throws Exception { 
  26.     int result = ToolRunner.run(new Configuration(), new WordCount(), args); 
  27.     System.exit(result); 
  28.   } 

总结

      最后这张图展示了Hadoop之上各种Data Pipeline项目的实现层次对比: 

  原文链接:http://blog.csdn.net/pelick/article/details/39076223





收藏 推荐 打印 | 录入: | 阅读:
相关新闻      
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款