热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于memcachedforjava实现通用分布式缓存和集群分布式缓存

本文参考借鉴:http:guazi.iteye.comblog107164前提:基于memcachedclientforjava的基础进行的二次封装,实现缓存存储的两种模式:通用分布式

本文参考借鉴:http://guazi.iteye.com/blog/107164


前提:基于memcached client for java 的基础进行的二次封装,实现缓存存储的两种模式:通用分布式缓存和集群分布式缓存。以下是对于memcached client for Java 二次封装的UML图。


对于memcached的客户端初始化在CacheFactory中通过读取配置文件cacheConfig.xml完成。通用分布式缓存,只是一个简单的封装,利用memcached client for java提供的分布式支持来实现,这里主要说一下clusterCache的实现思想:对存入的缓存对象的key值进行一次hash,找到对应的服务器存入,然后根据一定的规则再次进行hash,找到另外一个不同的服务器存入,取缓存时,先对要取的key值进行一次hash,找到主服务器,如果获取失败或者获取到的值为null,就对key进行再次hash,找到其从服务器,从这台服务器取缓存结果(如果取到结果就异步的更新到主服务器),这样就形成了主从式集群缓存。特点是:没有绝对的主节点和从节点,正常情况下所有服务器共同承担缓存服务器,在一台服务器出现异常时其他服务器共同承担增加的访问压力。

拓扑结构如下:


源代码

package com.yx.cache;

public interface Cache {
/**
* 获取缓存中的数据
*
* @param key
* @return
*/
T get(String key);

/**
* 把数据放入缓存 如果存在与key对应的值,则返回失败
*
* @param key
* @param value
* @return
*/
boolean add(String key, T value);

/**
* 把数据放入缓存 如果存在与key对应的值,则覆盖原有的值
*
* @param key
* @param value
* @return
*/
boolean set(String key, T value);

/**
* 缓存更新 如果不存在与key对应的缓存值,则不更新
*
* @param key
* @param value
* @return
*/
boolean update(String key, T value);

/**
* 删除缓存
*
* @param key
* @return
*/
boolean delete(String key);
}

通用分布式缓存实现类:

package com.yx.cache;

import com.danga.MemCached.MemCachedClient;

public class CommonCache implements Cache {
private static MemCachedClient memCachedClient = null;

private String base = null;

CommonCache(Class t, MemCachedClient client) {
memCachedClient = client;
base = t.getSimpleName() + "-";
}

public T get(String key) {
return (T) memCachedClient.get(base + key);
}

public boolean set(String key, T value) {
return memCachedClient.set(base + key, value);
}

@Override
public boolean update(String key, T value) {
return memCachedClient.replace(base + key, value);
}

@Override
public boolean delete(String key) {
return memCachedClient.delete(base + key);
}

@Override
public boolean add(String key, T value) {
return memCachedClient.add(base + key, value);
}
}


集群分布式缓存实现类

package com.yx.cache;

import com.danga.MemCached.MemCachedClient;
import com.schooner.MemCached.SchoonerSockIOPool;
import com.yx.cache.util.HashCodeUtil;
import com.yx.task.ThreadPoolManager;

public class ClusterCache implements Cache {
private static MemCachedClient memCachedClient = null;

private static ThreadPoolManager taskManager = ThreadPoolManager
.getInstance("cache");

private String base = null;

private SchoonerSockIOPool pool = SchoonerSockIOPool.getInstance();

ClusterCache(Class t, MemCachedClient client) {
memCachedClient = client;
base = "i-" + t.getSimpleName() + "-";
}

@Override
public T get(String key) {
T value = null;
if (key == null) {
return null;
}
key = base + key;
if (pool.getServers().length <2) {
value = (T) memCachedClient.get(key);
} else {
int hashCode = HashCodeUtil.getHash(key);

value = (T) memCachedClient.get(key, hashCode);
if (value == null) {
hashCode = this.getRehashCode(key, hashCode);

value = (T) memCachedClient.get(key, hashCode);
if (value != null) {// 如果在另外一台服务器上取到了缓存,则恢复第一台服务器
UpdateTask task = new UpdateTask(key, value);
taskManager.submit(task);
}

}
}
return value;
}

@Override
public boolean set(String key, T value) {
if (key == null) {
return false;
}
key = base + key;
boolean result = false;
if (pool.getServers().length <2) {
result = memCachedClient.set(key, value);
} else {
int hashCode = HashCodeUtil.getHash(key);

result = memCachedClient.set(key, value, hashCode);
// if (result) {
hashCode = getRehashCode(key, hashCode);
memCachedClient.set(key, value, hashCode);

// }
}
return result;
}

private int getRehashCode(String key, int oldHashcode) {
String host = pool.getHost(key, oldHashcode);
int rehashTries = 0;
// if (result) {
int hashCode = HashCodeUtil.getHash(rehashTries + key);
while (host.equals(pool.getHost(key, hashCode))) {
rehashTries++;
hashCode = HashCodeUtil.getHash(rehashTries + key);
}
return hashCode;
}

@Override
public boolean update(String key, T value) {
if (key == null) {
return false;
}
key = base + key;
boolean result = false;
if (pool.getServers().length <2) {
result = memCachedClient.replace(key, value);
} else {
int hashCode = HashCodeUtil.getHash(key);

result = memCachedClient.replace(key, value, hashCode);
// if (result) {
hashCode = getRehashCode(key, hashCode);
memCachedClient.replace(key, value, hashCode);

// }
}
return result;
}

@Override
public boolean delete(String key) {
if (key == null) {
return false;
}
key = base + key;
boolean result = false;
if (pool.getServers().length <2) {
result = memCachedClient.delete(key);
} else {
int hashCode = HashCodeUtil.getHash(key);

result = memCachedClient.delete(key, hashCode, null);
// if (result) {
hashCode = this.getRehashCode(key, hashCode);

memCachedClient.delete(key, hashCode, null);

// }
}
return result;
}

@Override
public boolean add(String key, T value) {
if (key == null) {
return false;
}
key = base + key;
boolean result = false;
if (pool.getServers().length <2) {
result = memCachedClient.add(key, value);
} else {
int hashCode = HashCodeUtil.getHash(key);

result = memCachedClient.add(key, value, hashCode);
// if (result) {
hashCode = getRehashCode(key, hashCode);
memCachedClient.add(key, value, hashCode);

// }
}
return result;
}

static class UpdateTask implements Runnable {

private String key;
private Object value;

UpdateTask(String key, Object value) {
this.key = key;
this.value = value;
}

@Override
public void run() {
memCachedClient.set(key, value, HashCodeUtil.getHash(key));
}

}
}


基于工厂模式创建memcached 存储模式(通用模式还是集群模式)

package com.yx.cache;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.danga.MemCached.MemCachedClient;
import com.danga.MemCached.SockIOPool;
import com.yx.cache.util.ConfigUtil;

public class CacheFactory {
private static MemCachedClient memCachedClient = null;

@SuppressWarnings("rawtypes")
private static final Map map = new ConcurrentHashMap();

static {
String serverStr = ConfigUtil.getConfigValue("servers", "");

List servers = new ArrayList();
for (String s : serverStr.split(",")) {
s = s.trim();
if (!"".equals(s)) {
servers.add(s);
}
}
if (servers.size() <1) {
throw new RuntimeException("cache 初始化失败!");
}
SockIOPool pool = SockIOPool.getInstance();
pool.setServers(servers.toArray(new String[] {}));
pool.setFailover(Boolean.valueOf(ConfigUtil.getConfigValue("failover",
"true")));
pool.setInitConn(Integer.valueOf(ConfigUtil.getConfigValue("initConn",
"100")));
pool.setMinConn(Integer.valueOf(ConfigUtil.getConfigValue("minConn",
"25")));
pool.setMaxConn(Integer.valueOf(ConfigUtil.getConfigValue("maxConn",
"250")));
pool.setMaintSleep(Integer.valueOf(ConfigUtil.getConfigValue(
"maintSleep", "30")));
pool.setNagle(Boolean.valueOf(ConfigUtil.getConfigValue("nagle",
"false")));// 关闭nagle算法
pool.setSocketTO(Integer.valueOf(ConfigUtil.getConfigValue("socketTO",
"3000")));
pool.setAliveCheck(Boolean.valueOf(ConfigUtil.getConfigValue(
"aliveCheck", "true")));
pool.setHashingAlg(Integer.valueOf(ConfigUtil.getConfigValue(
"hashingAlg", "0")));
pool.setSocketConnectTO(Integer.valueOf(ConfigUtil.getConfigValue(
"socketConnectTO", "3000")));
String wStr = ConfigUtil.getConfigValue("weights", "");
List weights = new ArrayList();
for (String s : wStr.split(",")) {
s = s.trim();
if (!"".equals(s)) {
weights.add(Integer.valueOf(s));

}
}
if (weights.size() == servers.size()) {
pool.setWeights(weights.toArray(new Integer[] {}));
}
pool.initialize();
memCachedClient = new MemCachedClient();

}

public static Cache getCommonCache(Class t) {
Cache cache = map.get(t.getName());
if (cache == null) {
cache = createCommonCache(t);
}
return cache;
}

public static Cache getClusterCache(Class t) {
Cache cache = map.get("i-" + t.getName());
if (cache == null) {
cache = createClusterCache(t);
}
return cache;
}

private static synchronized Cache createCommonCache(Class t) {
Cache cache = map.get(t.getName());
if (cache == null) {
cache = new CommonCache(t, memCachedClient);
map.put(t.getName(), cache);
}
return cache;
}

private static synchronized Cache createClusterCache(Class t) {
Cache cache = map.get(t.getName());
if (cache == null) {
cache = new ClusterCache(t, memCachedClient);
map.put(t.getName(), cache);
}
return cache;
}
}


读取配置文件工具类封装和生成Hash代码工具类

ConfigUtil.Java和HashCodeUtil.java

package com.yx.cache.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

public class ConfigUtil {

private static final String COnFILE= "cacheConfig.xml";
private static final Map map = new HashMap();

static {
SAXReader saxReader = new SAXReader();
InputStream ins = ConfigUtil.class.getClassLoader()
.getResourceAsStream(CONFILE);
try {
if (ins != null) {
Document doc = saxReader.read(ins);
Element root = doc.getRootElement();
Iterator iter = root.elementIterator();
while (iter.hasNext()) {
Element e = iter.next();
map.put(e.getName(), e.getTextTrim());
}
}
} catch (DocumentException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new RuntimeException("找不到配置文件:" + CONFILE);
} finally {
try {
if (ins != null) {
ins.close();
} else {
throw new RuntimeException("找不到配置文件:" + CONFILE);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

public static String getConfigValue(String key, String defaultValue) {
String tmp = map.get(key);
return isEmpty(tmp) ? defaultValue : tmp;
}

public static void main(String[] args) {
System.out.println(map);
}

private static boolean isEmpty(String str) {
if (str == null || "".equals(str)) {
return true;
}
return false;
}
}


package com.yx.cache.util;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.CRC32;

import com.schooner.MemCached.SchoonerSockIOPool;

public class HashCodeUtil {

public static final int NATIVE_HASH = 0; // native String.hashCode();
public static final int OLD_COMPAT_HASH = 1; // original compatibility
public static final int NEW_COMPAT_HASH = 2; // new CRC32 based
public static final int CONSISTENT_HASH = 3; // MD5 Based -- Stops

private static int hashingAlg = SchoonerSockIOPool.getInstance()
.getHashingAlg();

/**
* Returns a bucket to check for a given key.
*
* @param key
* String key cache is stored under
* @return int bucket
*/
public static final int getHash(String key) {

switch (hashingAlg) {
case NATIVE_HASH:
return key.hashCode();
case OLD_COMPAT_HASH:
return origCompatHashingAlg(key);
case NEW_COMPAT_HASH:
return newCompatHashingAlg(key);
case CONSISTENT_HASH:
return md5HashingAlg(key);
default:
// use the native hash as a default
hashingAlg = NATIVE_HASH;
return key.hashCode();
}
}

private static int origCompatHashingAlg(String key) {
int hash = 0;
char[] cArr = key.toCharArray();

for (int i = 0; i hash = (hash * 33) + cArr[i];
}

return hash;
}

private static int newCompatHashingAlg(String key) {
CRC32 checksum = new CRC32();
checksum.update(key.getBytes());
int crc = (int) checksum.getValue();
return (crc >> 16) & 0x7fff;
}

private static int md5HashingAlg(String key) {
MessageDigest md5 = MD5.get();
md5.reset();
md5.update(key.getBytes());
byte[] bKey = md5.digest();
int res = ((bKey[3] & 0xFF) <<24) | ((bKey[2] & 0xFF) <<16)
| ((bKey[1] & 0xFF) <<8) | (bKey[0] & 0xFF);
return res;
}

private static ThreadLocal MD5 = new ThreadLocal() {
@Override
protected final MessageDigest initialValue() {
try {
return MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(" no md5 algorythm found");
}
}
};

}


对于集群分布式缓存还缺少一个工具类ThreadPoolManage.java

package com.yx.task;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
* @author liuyuxiao
* @Date 2011-5-30 下午04:34:16
*/
public class ThreadPoolManager {

private static final Map map = new HashMap();

final int CORE_SIZE = 5;

private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
.newFixedThreadPool(CORE_SIZE);

public void submit(Runnable task) {
executor.submit(task);
}

public boolean finished() {
return executor.getCompletedTaskCount() == executor.getTaskCount();
}

private ThreadPoolManager() {

}

public static synchronized ThreadPoolManager getInstance(String key) {
ThreadPoolManager t = map.get(key);
if (t == null) {
t = new ThreadPoolManager();
map.put(key, t);
}
return t;
}
}

对于集群缓存模式和通用缓存模式测试:

package com.yx.cache.test;

import com.yx.cache.Cache;
import com.yx.cache.CacheFactory;

public class TestCommonCache {

/**
* @param args
*/
public static void main(String[] args) {

Cache cache = CacheFactory.getCommonCache(String.class);
int count = 0;
for (int i = 0; i <100; i++) {

// cache.set("" + i, "Hello!" + i);

String result = cache.get("" + i);
// System.out.println(String.format("set( %d ): %s", i, success));
if (result == null) {
count++;
}
System.out.println(String.format("get( %d ): %s", i, result));
}

System.out.println(count);
// for (int i = 0; i <500; i++) {
// MemTask task = new MemTask();
// Thread t = new Thread(task);
// t.start();
// }

}
}

package com.yx.cache.test;

import com.yx.cache.Cache;
import com.yx.cache.CacheFactory;

public class TestClusterCache {

public static void main(String[] args) {
Cache cache = CacheFactory.getClusterCache(String.class);
int count = 0;
for (int i = 0; i <100; i++) {

// cache.set("" + i, "Hello!" + i);

String result = cache.get("" + i);
// System.out.println(String.format("set( %d ): %s", i, success));
if (result == null) {
count++;
}
System.out.println(String.format("get( %d ): %s", i, result));
}
}
}






推荐阅读
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • 单页面应用 VS 多页面应用的区别和适用场景
    本文主要介绍了单页面应用(SPA)和多页面应用(MPA)的区别和适用场景。单页面应用只有一个主页面,所有内容都包含在主页面中,页面切换快但需要做相关的调优;多页面应用有多个独立的页面,每个页面都要加载相关资源,页面切换慢但适用于对SEO要求较高的应用。文章还提到了两者在资源加载、过渡动画、路由模式和数据传递方面的差异。 ... [详细]
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • Hibernate延迟加载深入分析-集合属性的延迟加载策略
    本文深入分析了Hibernate延迟加载的机制,特别是集合属性的延迟加载策略。通过延迟加载,可以降低系统的内存开销,提高Hibernate的运行性能。对于集合属性,推荐使用延迟加载策略,即在系统需要使用集合属性时才从数据库装载关联的数据,避免一次加载所有集合属性导致性能下降。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
author-avatar
航头党员之家
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有