你好,游客 登录
背景:
阅读新闻

大数据学习系列之三 ----- HBase Java Api 图文详解

[日期:2018-01-04] 来源:虚无境的博客  作者: [字体: ]

引言

在上一篇中 大数据学习系列之二 —– HBase环境搭建(单机) 中,成功搭建了hadoop+HBase的环境,本文则主要讲述使用Java 对HBase的一些操作。

一、事前准备

1.确认hadoop和hbase成功启动

2.确认防火墙是否关闭

3.maven所需要的依赖架包

<!--hadoop 相关架包 -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-yarn-common</artifactId>
			<version>2.8.2</version>
		</dependency>

<!--HBase相关jar -->
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-hadoop-compat</artifactId>
			<version>1.3.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-server</artifactId>
			<version>1.1.2</version>
		</dependency>
<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>1.1.2</version>
		</dependency>
<dependency>
			<groupId>org.apache.hbase</groupId>
		    <artifactId>hbase-common</artifactId>
	        <version>1.1.2</version>
	    </dependency>

4.修改hosts文件(可选)

修改Windows C:\Windows\System32\drivers\etc 目录下的hosts文件,添加hbase的主机ip和主机名做关系映射。

192.168.238.128 master

注:如果不使用映射,那么将代码中的主机名改成IP即可。

5.HBase的原理

这篇文章介绍得很详细:

http://blog.csdn.net/woshiwanxin102213/article/details/17584043

二、测试示例

1.创建表

创建两张表 t_student、t_student_info 这两张表,并添加两个列族

创建成功之后可以在 hbase shell和16010界面中看到。

2.添加数据

成功创建表之后,在这两张表中插入数据。

因为HBase是动态数据库,所以列是可以新增的。

HBase的新增和修改是一个方法,数据相同的,后来的数据会将前面的覆盖掉!

3.查询数据

分别根据表名、行健、列族、列来查询

4.删除数据

删除其中的一条数据

三、代码示例

工具类

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import com.alibaba.fastjson.JSONObject;

/**
 * 
 * Title: HBaseUtil
 * Description: HBase工具类 
 * Version:1.0.0
 * @author pancm
 * @date 2017年12月6日
 */
public class HBaseUtil {
	/** hadoop 连接 */
	private static Configuration conf = null;
	/** hbase 连接 */
	private static Connection con = null;
	/** 会话 */
	private static Admin admin = null;

	private static String ip ="master";
    private static String port ="2181";
    private static String port1 ="9001";
	   
   // 初始化连接
   static {
	   // 获得配制文件对象
       conf = HBaseConfiguration.create(); 
       // 设置配置参数
		conf.set("hbase.zookeeper.quorum", ip);
		conf.set("hbase.zookeeper.property.clientPort", port);  
		//如果hbase是集群,这个必须加上 
		//这个ip和端口是在hadoop/mapred-site.xml配置文件配置的
		conf.set("hbase.master", ip+":"+port1); 
   }
		

	/**
	 * 获取连接
	 * 
	 * @return
	 */
	public synchronized static Connection getConnection() {
		try {
			if (null == con || con.isClosed()) {
				// 获得连接对象
				con = ConnectionFactory.createConnection(conf);
			}
		} catch (IOException e) {
			System.out.println("获取连接失败!");
			e.printStackTrace();
		}

		return con;
	}

	/**
	 * 连接关闭
	 */
	public static void close() {
		try {
			if (admin != null) {
				admin.close();
			}
			if (con != null) {
				con.close();
			}
		} catch (IOException e) {
			System.out.println("连接关闭失败!");
			e.printStackTrace();
		}
	}

	/**
	 * 创建表
	 * 
	 * @param tableName
	 *            表名
	 * @param columnFamily
	 *            列族
	 */
	public static void creatTable(String tableName, String[] columnFamily) {
		if(null==tableName||tableName.length()==0){
			return;
		}
		if(null==columnFamily||columnFamily.length==0){
			return;
		}
		// 创建表名对象
		TableName tn = TableName.valueOf(tableName);
		// a.判断数据库是否存在
		try {
			// 获取会话
			admin = getConnection().getAdmin();
			if (admin.tableExists(tn)) {
				System.out.println(tableName + " 表存在,删除表....");
				// 先使表设置为不可编辑
				admin.disableTable(tn);
				// 删除表
				admin.deleteTable(tn);
				System.out.println("表删除成功.....");
			}
			// 创建表结构对象
			HTableDescriptor htd = new HTableDescriptor(tn);
			for (String str : columnFamily) {
				// 创建列族结构对象
				HColumnDescriptor hcd = new HColumnDescriptor(str);
				htd.addFamily(hcd);
			}
			// 创建表
			admin.createTable(htd);
			System.out.println(tableName + " 表创建成功!");
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			close();
		}
	}

	/**
	 * 数据单条插入或更新
	 * 
	 * @param tableName
	 *            表名
	 * @param rowKey
	 *            行健 (主键)
	 * @param family
	 *            列族
	 * @param qualifier
	 *            列
	 * @param value
	 *            存入的值
	 * @return
	 */
	public static void insert(String tableName, String rowKey, String family,
			String qualifier, String value) {
		Table t = null;
		try {
			t = getConnection().getTable(TableName.valueOf(tableName));
			Put put = new Put(Bytes.toBytes(rowKey));
			put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier),
					Bytes.toBytes(value));
			t.put(put);
			System.out.println(tableName + " 更新成功!");
		} catch (IOException e) {
			System.out.println(tableName + " 更新失败!");
			e.printStackTrace();
		} finally {
			close();
		}
	}

	/**
	 * 数据批量插入或更新
	 * 
	 * @param tableName
	 *            表名
	 * @param list
	 *            hbase的数据 
	 * @return
	 */
	public static void insertBatch(String tableName, List<?> list) {
		if (null == tableName ||tableName.length()==0) {
			return;
		}
		if( null == list || list.size() == 0){
			return;
		}
		Table t = null;
		Put put = null;
		JSONObject json = null;
		List<Put> puts = new ArrayList<Put>();
		try {
			t = getConnection().getTable(TableName.valueOf(tableName));
			for (int i = 0, j = list.size(); i < j; i++) {
				json = (JSONObject) list.get(i);
				put = new Put(Bytes.toBytes(json.getString("rowKey")));
				put.addColumn(Bytes.toBytes(json.getString("family")),
						Bytes.toBytes(json.getString("qualifier")),
						Bytes.toBytes(json.getString("value")));
				puts.add(put);
			}
			t.put(puts);
			System.out.println(tableName + " 更新成功!");
		} catch (IOException e) {
			System.out.println(tableName + " 更新失败!");
			e.printStackTrace();
		} finally {
			close();
		}
	}
	
	/**
	 * 数据删除 
	 * @param tableName 表名
	 * @param rowKey	行健
	 * @return
	 */
    public static void delete(String tableName, String rowKey) {
    	delete(tableName,rowKey,"","");
    }
	
	/**
	 * 数据删除 
	 * @param tableName 表名
	 * @param rowKey	行健
	 * @param family	列族
	 * @return
	 */
    public static void delete(String tableName, String rowKey, String family) {
    	delete(tableName,rowKey,family,"");
    }
	
	/**
	 * 数据删除 
	 * @param tableName 表名
	 * @param rowKey	行健
	 * @param family	列族
	 * @param qualifier 列
	 * @return
	 */
    public static void delete(String tableName, String rowKey, String family,
            String qualifier) {
    	if (null == tableName ||tableName.length()==0) {
			return;
		}
		if( null == rowKey || rowKey.length() == 0){
			return;
		}
    	Table t = null;
        try {
            t = getConnection().getTable(TableName.valueOf(tableName));
            Delete del = new Delete(Bytes.toBytes(rowKey));
            // 如果列族不为空
 			if (null != family && family.length() > 0) {
 				// 如果列不为空
 				if (null != qualifier && qualifier.length() > 0) {
 					del.addColumn(Bytes.toBytes(family),
 							Bytes.toBytes(qualifier));
 				} else {
 					del.addFamily(Bytes.toBytes(family));
 				}
 			}      
            t.delete(del);    
        } catch (IOException e) {
        	System.out.println("删除失败!");
            e.printStackTrace();
        } finally {
          close();
        }
    }
	
	/**
	 * 查询该表中的所有数据
	 * 
	 * @param tableName
	 *            表名
	 */
	public static void select(String tableName) {
		if(null==tableName||tableName.length()==0){
			return;
		}
		Table t = null;
		List<Map<String,Object>> list=new ArrayList<Map<String,Object>>();
		try {
			t = getConnection().getTable(TableName.valueOf(tableName));
			// 读取操作
			Scan scan = new Scan();
			// 得到扫描的结果集
			ResultScanner rs = t.getScanner(scan);
			if (null == rs ) {
				return;
			}
			for (Result result : rs) {
				// 得到单元格集合
				List<Cell> cs = result.listCells();
				if (null == cs || cs.size() == 0) {
					continue;
				}
				for (Cell cell : cs) {
					Map<String,Object> map=new HashMap<String, Object>();
					map.put("rowKey", Bytes.toString(CellUtil.cloneRow(cell)));// 取行健
					map.put("timestamp", cell.getTimestamp());// 取到时间戳
					map.put("family", Bytes.toString(CellUtil.cloneFamily(cell)));// 取到列族
					map.put("qualifier", Bytes.toString(CellUtil.cloneQualifier(cell)));// 取到列
					map.put("value", Bytes.toString(CellUtil.cloneValue(cell)));// 取到值
					list.add(map);
				}
			}
			System.out.println("查询的数据:"+list);
		} catch (IOException e) {
			System.out.println("查询失败!");
			e.printStackTrace();
		} finally {
			close();
		}
	}

	/**
	 * 根据表名和行健查询
	 * @param tableName
	 * @param rowKey
	 */
	public static void select(String tableName, String rowKey) {
		select(tableName,rowKey,"","");
	}
	
	/**
	 * 根据表名、行健和列族查询
	 * @param tableName
	 * @param rowKey
	 * @param family
	 */
	public static void select(String tableName, String rowKey, String family) {
		select(tableName,rowKey,family,"");
	}
	
	/**
	 * 根据条件明细查询
	 * 
	 * @param tableName
	 *            表名
	 * @param rowKey
	 *            行健 (主键)
	 * @param family
	 *            列族
	 * @param qualifier
	 *            列
	 */
	public static void select(String tableName, String rowKey, String family,
			String qualifier) {
		Table t = null;
		List<Map<String,Object>> list=new ArrayList<Map<String,Object>>();
		try {
			t = getConnection().getTable(TableName.valueOf(tableName));
			// 通过HBase中的 get来进行查询
			Get get = new Get(Bytes.toBytes(rowKey));
			// 如果列族不为空
			if (null != family && family.length() > 0) {
				// 如果列不为空
				if (null != qualifier && qualifier.length() > 0) {
					get.addColumn(Bytes.toBytes(family),
							Bytes.toBytes(qualifier));
				} else {
					get.addFamily(Bytes.toBytes(family));
				}
			}
			Result r = t.get(get);
			List<Cell> cs = r.listCells();
			if (null == cs || cs.size() == 0) {
				return;
			}
			for (Cell cell : cs) {
				Map<String,Object> map=new HashMap<String, Object>();
				map.put("rowKey", Bytes.toString(CellUtil.cloneRow(cell)));// 取行健
				map.put("timestamp", cell.getTimestamp());// 取到时间戳
				map.put("family", Bytes.toString(CellUtil.cloneFamily(cell)));// 取到列族
				map.put("qualifier", Bytes.toString(CellUtil.cloneQualifier(cell)));// 取到列
				map.put("value", Bytes.toString(CellUtil.cloneValue(cell)));// 取到值
				list.add(map);
			}
			System.out.println("查询的数据:"+list);
		} catch (IOException e) {
			System.out.println("查询失败!");
			e.printStackTrace();
		} finally {
			close();
		}
	}
}

测试代码

import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
/**
 * 
* Title: hbaseTest
* Description: HBase 相关测试
* Version:1.0.0  
* @author pancm
* @date 2017年11月23日
 */
public class hbaseTest {
	
	public static void main(String[] args) {
		test();
	}

	/**
	 * 一些测试
	 */
	private static void test() {
		String tableName1="t_student",tableName2="t_student_info";
		String []columnFamily1={"st1","st2"};
		String []columnFamily2={"stf1","stf2"};
		HBaseUtil.creatTable(tableName1, columnFamily1);
		HBaseUtil.creatTable(tableName2, columnFamily2);
		
		HBaseUtil.insert(tableName1, "1001", columnFamily1[0], "name", "zhangsan");
		HBaseUtil.insert(tableName1, "1002", columnFamily1[0], "name", "lisi");
		HBaseUtil.insert(tableName1, "1001", columnFamily1[1], "age", "18");
		HBaseUtil.insert(tableName1, "1002", columnFamily1[1], "age", "20");
		
		HBaseUtil.insert(tableName2, "1001", columnFamily2[0], "phone", "123456");
		HBaseUtil.insert(tableName2, "1002", columnFamily2[0], "phone", "234567");
		HBaseUtil.insert(tableName2, "1001", columnFamily2[1], "mail", "123@163.com");
		HBaseUtil.insert(tableName2, "1002", columnFamily2[1], "mail", "234@163.com");
		
		HBaseUtil.select(tableName1); //查询该表所有数据
		HBaseUtil.select(tableName1, "1001"); //根据表名和行健查询
		HBaseUtil.select(tableName2, "1002",columnFamily2[0]); //根据表名、行健和列族查询
		HBaseUtil.select(tableName2, "1002",columnFamily2[1],"mail"); //根据表名、行健、列族、和列查询
		
		HBaseUtil.select(tableName1, "1002"); //根据表名和行健查询
		HBaseUtil.delete(tableName1, "1002", columnFamily1[0]);//删除数据
		HBaseUtil.select(tableName1, "1002"); //根据表名和行健查询
		
	}
}




收藏 推荐 打印 | 录入:Cstor | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款