热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

用于大数据的并查集(基于HBase)的java类

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。 首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。

首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)。既然是借助硬盘,那就要文件存取。而又由于在处理过程中需要快速的查找数据是否存在于某个集合内和将数据集合关联等操作,选择使用并查集。

这样选择之后算是有一个解决方案了,但是还需要最后一个关键的部分,就是需要建立文件索引和缓存机制以便快速进行合并和查询过程。这里选择使用的工具还是最趁手的hbase,很好的解决这两个问题。

这个类主要解决的问题就是原始数据的聚类,有关联的聚在一起。核心的两个方法是:

public byte[] findSet(byte[] pos);
public void union(byte[] pos1, byte[] pos2);

其中还有一个

public byte[] findSet(byte[] pos)

是递归实现。两个方法都使用了路径压缩进行优化。union()方法的两个参数有顺序要求,其作用是后者集合连接到前者集合的根节点。

最后,计算的并行是使用MapReduce计算框架。

package recommendsystem;
?
import java.io.IOException;
import java.lang.reflect.Array;
?
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
?
public class UnionFindSet {
	private Configuration _conf;
	private HBaseAdmin _hbAdmin;
	private HTable _unionTable;
?
	public static void main(String[] args) throws IOException {
		UnionFindSet ufs = new UnionFindSet("test");
		ufs.union(Bytes.toBytes("7"), Bytes.toBytes("8"));
		ufs.union(Bytes.toBytes("5"), Bytes.toBytes("9"));
		ufs.union(Bytes.toBytes("3"), Bytes.toBytes("7"));
		ufs.union(Bytes.toBytes("4"), Bytes.toBytes("6"));
		ufs.union(Bytes.toBytes("1"), Bytes.toBytes("7"));
		for (int i = 1; i <10; i++) {
			System.out.println(Bytes.toString(ufs.findSet(Bytes.toBytes(String
					.valueOf(i)))));
		}
?
	}
?
	public UnionFindSet(String tableName) throws IOException {
		_cOnf= HBaseConfiguration.create();
		init(tableName);
	}
?
	public UnionFindSet(Configuration conf, String tableName)
			throws IOException {
		// _cOnf=new Configuration(conf);
		_cOnf= conf;
		init(tableName);
	}
?
	public UnionFindSet(HTable htable) {
		_uniOnTable= htable;
	}
?
	public void clear() throws IOException {
		_hbAdmin.close();
		_unionTable.close();
	}
?
	public int printSets() throws IOException {
		Scan scan = new Scan();
		scan.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		scan.setCaching(1000);
		ResultScanner rs = _unionTable.getScanner(scan);
		int count = 0;
		for (Result r : rs) {
			if (arrayCompare(r.getRow(), r.value()) == 0) {
				System.out.println(Bytes.toString(r.getRow()));
				count++;
			}
		}
		return count;
	}
?
	public ResultScanner getSets() throws IOException {
		Scan scan = new Scan();
		scan.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		scan.setCaching(1000);
		return _unionTable.getScanner(scan);
	}
?
	public byte[] RecursionFindSet(byte[] pos) throws IOException {
		byte[] tmpRes, tmpPre = pos;
		Result r = queryUnionHBase(pos);
		if (r.isEmpty()) {
			insertUnionHBase(pos, pos);
			return pos;
		}
		tmpRes = r.value();
		if (arrayCompare(r.value(), pos) != 0) {
			tmpPre = findSet(tmpRes);
			insertUnionHBase(pos, tmpPre);
		}
		return tmpPre;
	}
?
	public byte[] findSet(byte[] pos) throws IOException {
		byte[] tmpRes = pos, tmpPre;
		Result r = queryUnionHBase(pos);
		if (r.isEmpty()) {
			insertUnionHBase(pos, pos);
			return pos;
		}
		tmpPre = r.value();
		while (arrayCompare(tmpRes, tmpPre) != 0) {
			r = queryUnionHBase(tmpPre);
			tmpRes = tmpPre;
			tmpPre = r.value();
		}
		tmpRes = pos;
		while (arrayCompare(tmpPre, tmpRes) != 0) {
			insertUnionHBase(tmpPre, tmpRes);
			r = queryUnionHBase(tmpRes);
			tmpRes = r.value();
		}
		return tmpPre;
	}
?
	public void union(byte[] pos1, byte[] pos2) throws IOException {
		byte[] t1 = findSet(pos1);
		byte[] t2 = findSet(pos2);
		if (arrayCompare(t1, t2) == 0)
			return;
		insertUnionHBase(t2, t1);
	}
?
	private void init(String tableName) throws IOException {
		_hbAdmin = new HBaseAdmin(_conf);
		if (_hbAdmin.tableExists(tableName)) {
			_hbAdmin.disableTable(tableName);
			_hbAdmin.deleteTable(tableName);
		}
		createDB(tableName);
		_uniOnTable= new HTable(_conf, tableName);
	}
?
	private void createDB(String tableName) throws IOException {
		HTableDescriptor hd = new HTableDescriptor(tableName);
		hd.addFamily(new HColumnDescriptor(GlobalName.CLASSIFICATION_FAMILY));
		CreateDB.createTable(_hbAdmin, hd,
				CreateDB.getHexSplits("0", "110000000", 10));
	}
?
	private void insertUnionHBase(byte[] row, byte[] value) throws IOException {
		Put put = new Put(value);
		put.add(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL, row);
		_unionTable.put(put);
	}
?
	private Result queryUnionHBase(byte[] row) throws IOException {
		Get get = new Get(row);
		get.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		return _unionTable.get(get);
	}
?
	private int arrayCompare(byte[] o1, byte[] o2) {
		int len = Array.getLength(o1);
		if (len != Array.getLength(o2)) {
			return -1;
		}
		for (int i = 0; i 
    
推荐阅读
author-avatar
平凡小柏
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有