你好,游客 登录
背景:
阅读新闻

SparkStreaming向Hbase中写数据(一)

[日期:2015-08-26] 来源:肖的博客  作者: [字体: ]

在SparkStreaming中统计了数据之后,我们需要将结果写入外部文件系统。

本文,以向Hbase中写数据,为例,说一下,SparkStreaming怎么向Hbase中写数据。

首先,需要说一下,下面的这个方法。

foreachRDD(func)

最通用的输出操作,把func作用于从stream生成的每一个RDD。

注意:这个函数是在 运行streaming程序的driver进程 中执行的。

下面跟着思路,看一下,怎么优雅的向Hbase中写入数据

向外部写数据 常见的错误:

向外部数据库写数据,通常会建立连接,使用连接发送数据(也就是保存数据)。

开发者可能 在driver中创建连接,而在spark worker 中保存数据

例如:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 这个会在driver中执行
  rdd.foreach { record =>
    connection.send(record) //这个会在 worker中执行
  }
}

上面这种写法是错误的!上面的写法,需要connection 对象被序列化,然后从driver发送到worker。

这样的connection是很少在机器之间传输的。知道这个问题后,我们可以写出以下的,修改后的代码:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

很遗憾!这种写法也是不对的。这会导致,对于每条数据,都创建一个connection(创建connection是消耗资源的)。

下面的方法会好一些:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

上面的方法,使用 rdd.foreachPartition 创建一个connection 对象, 一个RDD分区中的所有数据,都使用这一个connection。

是不是,很机智啊~~~

事实上,还可以更机智点

在多个RDD之间,connection对象是可以重用的,所以可以创建一个连接池。如下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool是一个静态的,延迟初始化的连接池
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 返回到池中 以便别人使用  }
}

注意:连接池中的连接应该是,应需求而延迟创建,并且,如果一段时间没用,就超时了(也就是关闭该连接)

到此,SparkStreaming向外部数据库写数据的原理就讲完了。

下篇文章,将,依据此原理,写一个生产环境中的demo

原文链接:http://www.xiaofateng.com/?p=1023&utm_source=tuicool





收藏 推荐 打印 | 录入: | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款