你好,游客 登录 注册 搜索
背景:
阅读新闻

Spark读HBase多表组成一个RDD

[日期:2016-01-28] 来源:Linux公社  作者: [字体: ]

环境:Spark-1.5.0 HBase-1.0.0。

场景:HBase中按天分表存数据,要求将任意时间段的数据合并成一个RDD以做后续计算。

尝试1: 寻找一次读取多个表的API,找到最接近的是一个叫MultiTableInputFormat的东西,它在MapReduce中使用良好,

但没有找到用于RDD读HBase的方法。

尝试2: 每个表生成一个RDD,再用union合并,代码逻辑如下:

var totalRDD = xxx // 读取第一张表

for { // 循环读表并合并到totalRDD

val sRDD = xxx

totalRDD.union(sRDD)

}

代码放到集群上执行,totalRDD并不是正确的union结果,用var还真是不行。

尝试3: 思路类似2,但使用SparkContext.union来一次合并多个RDD,代码逻辑如下:

var rddSet: xxx = Set() // 创建RDD列表

dateSet.foreach(date => { // 将所有表的RDD放入列表中

val sRDD = xxx

rddSet += sRDD

}

val totalRDD = sc.union(rddSet.toSeq) // 合并列表中的所有RDD

完整代码如下:

import java.text.SimpleDateFormat

import org.apache.Hadoop.hbase.client.Result

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import scala.collection.mutable.Set

/**

* 时间处理类

*/

object Htime {

/**

* 根据起止日期获取日期列表

* 例如起止时间为20160118,20160120,那么日期列表为(20160118,20160119,20160120)

*

* @param sDate 开始日期

* @param eDate 结束日期

* @return 日期列表

*/

def getDateSet(sDate:String, eDate:String): Set[String] = {

// 定义要生成的日期列表

var dateSet: Set[String] = Set()

// 定义日期格式val sdf = new SimpleDateFormat("yyyyMMdd")

// 按照上边定义的日期格式将起止时间转化成毫秒数

val sDate_ms = sdf.parse(sDate).getTime

val eDate_ms = sdf.parse(eDate).getTime

// 计算一天的毫秒数用于后续迭代val day_ms = 24*60*60*1000

// 循环生成日期列表

var tm = sDate_ms

while (tm <= eDate_ms) {

val dateStr = sdf.format(tm)

dateSet += dateStr

tm = tm + day_ms

}

// 日期列表作为返回

dateSet

}

}

/**

* 从HBase中读取行为数据计算人群分类

*/

object Classify {

/**

* @param args 命令行参数,第一个参数为行为数据开始日期,第二个为结束日期,例如20160118

*/

def main(args: Array[String]) {

// 命令行参数个数必须为2

if (args.length != 2) {

System.err.println("参数个数错误")

System.err.println("Usage: Classify <开始日期> <结束日期>")

System.exit(1)

}

// 获取命令行参数中的行为数据起止日期

val startDate = args(0)

val endDate  = args(1)

// 根据起止日志获取日期列表

// 例如起止时间为20160118,20160120,那么日期列表为(20160118,20160119,20160120)

val dateSet = Htime.getDateSet(startDate, endDate)

// Spark上下文

val sparkConf = new SparkConf().setAppName("Classify")

val sc = new SparkContext(sparkConf)

// 初始化HBase配置val conf = HBaseConfiguration.create()

// 按照日期列表读出多个RDD存在一个Set中,再用SparkContext.union()合并成一个RDD

var rddSet: Set[RDD[(ImmutableBytesWritable, Result)] ] = Set()

dateSet.foreach(date => {

conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // 设置表名

val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],

classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

classOf[org.apache.hadoop.hbase.client.Result])

rddSet += bRdd

})

 

val behavRdd = sc.union(rddSet.toSeq)

 

behavRdd.collect().foreach(println)

}

}





收藏 推荐 打印 | 录入:elainebo | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款