热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

php的memcache队列类-PHP源码

php的memcache队列类

memcacheQueue.class.php

add('1asdf');
 *		$obj->getQueueLength();
 *		$obj->read(11);
 *		$obj->get(8);
 */

class memcacheQueue{
	public static	$client;			//memcache客户端连接
	public			$access;			//队列是否可更新	
	private 		$currentSide;		//当前轮值的队列面:A/B
	private			$lastSide;			//上一轮值的队列面:A/B
	private 		$sideAHead;			//A面队首值
	private 		$sideATail;			//A面队尾值
	private 		$sideBHead;			//B面队首值
	private 		$sideBTail;			//B面队尾值
	private			$currentHead;		//当前队首值
	private			$currentTail;		//当前队尾值
	private			$lastHead;			//上轮队首值
	private			$lastTail;			//上轮队尾值	
	private 		$expire;			//过期时间,秒,1~2592000,即30天内;0为永不过期
	private			$sleepTime;			//等待解锁时间,微秒
	private			$queueName;			//队列名称,唯一值
	private			$retryNum;			//重试次数,= 10 * 理论并发数
	
	const	MAXNUM		= 2000;					//(单面)最大队列数,建议上限10K
	const	HEAD_KEY	= '_lkkQueueHead_';		//队列首kye
	const	TAIL_KEY	= '_lkkQueueTail_';		//队列尾key
	const	VALU_KEY	= '_lkkQueueValu_';		//队列值key
	const	LOCK_KEY	= '_lkkQueueLock_';		//队列锁key
	const	SIDE_KEY	= '_lkkQueueSide_';		//轮值面key
	
	/*
	 * 构造函数
	 * @param	[config]	array	memcache服务器参数
	 * @param	[queueName]	string	队列名称
	 * @param	[expire]	string	过期时间
	 * @return	NULL
	 */
	public function __construct($queueName ='',$expire='',$cOnfig=''){
		if(empty($config)){
			self::$client = memcache_pconnect('localhost',11211);
		}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
			self::$client = memcache_pconnect($config['host'],$config['port']);
		}elseif(is_string($config)){//"127.0.0.1:11211"
			$tmp = explode(':',$config);
			$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
			$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
			self::$client = memcache_pconnect($conf['host'],$conf['port']);		
		}
		if(!self::$client) return false;
		
		ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
		set_time_limit(0);//取消脚本执行延时上限
		
		$this->access = false;
		$this->sleepTime = 1000;
		$expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
		$this->expire = $expire;
		$this->queueName = $queueName;
		$this->retryNum = 10000;
		
		$side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
		$this->getHeadNTail($queueName);
		if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
		if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
		if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
		if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
	}
	
	/*
	 * 获取队列首尾值
	 * @param	[queueName]	string	队列名称
	 * @return	NULL
	 */
	private function getHeadNTail($queueName){
		$this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
		$this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
		$this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
		$this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
	}
	
	/*
	 * 获取当前轮值的队列面
	 * @return	string	队列面名称
	 */
	public function getCurrentSide(){
		$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
		if($currentSide == 'A'){
			$this->currentSide = 'A';
			$this->lastSide = 'B';	

			$this->currentHead	= $this->sideAHead;
			$this->currentTail	= $this->sideATail;
			$this->lastHead		= $this->sideBHead;
			$this->lastTail		= $this->sideBTail;			
		}else{
			$this->currentSide = 'B';
			$this->lastSide = 'A';

			$this->currentHead	= $this->sideBHead;
			$this->currentTail	= $this->sideBTail;
			$this->lastHead		= $this->sideAHead;
			$this->lastTail		= $this->sideATail;						
		}
		
		return $this->currentSide;
	}
	
	/*
	 * 队列加锁
	 * @return boolean
	 */
	private function getLock(){
		if($this->access === false){
			while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
				usleep($this->sleepTime);
				@$i++;
				if($i > $this->retryNum){//尝试等待N次
					return false;
					break;
				}
			}
			return $this->access = true;
		}
		return false;
	}
	
	/*
	 * 队列解锁
	 * @return NULL
	 */
	private function unLock(){
		memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
		$this->access = false;
	}
	
	/*
	 * 添加数据
	 * @param	[data]	要存储的值
	 * @return	boolean
	 */
	public function add($data){
		$result = false;
		if(!$this->getLock()){
			return $result;
		} 
		$this->getHeadNTail($this->queueName);
		$this->getCurrentSide();
		
		if($this->isFull()){
			$this->unLock();
			return false;
		}
		
		if($this->currentTail queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
			if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
				$this->changeTail();
				$result = true;
			}
		}else{//当前队列已满,更换轮值面
			$this->unLock();
			$this->changeCurrentSide();
			return $this->add($data);
		}

		$this->unLock();
		return $result;
	}
	
	/*
	 * 取出数据
	 * @param	[length]	int	数据的长度
	 * @return	array
	 */
	public function get($length=0){
		if(!is_numeric($length)) return false;
		if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
		if(!$this->getLock()) return false;

		if($this->isEmpty()){
			$this->unLock();
			return false;
		}
		
		$keyArray	= $this->getKeyArray($length);
		$lastKey	= $keyArray['lastKey'];
		$currentKey	= $keyArray['currentKey'];
		$keys		= $keyArray['keys'];
		$this->changeHead($this->lastSide,$lastKey);
		$this->changeHead($this->currentSide,$currentKey);
		
		$data	= @memcache_get(self::$client, $keys);
		foreach($keys as $v){//取出之后删除
			@memcache_delete(self::$client, $v, 0);
		}
		$this->unLock();

		return $data;
	}
	
	/*
	 * 读取数据
	 * @param	[length]	int	数据的长度
	 * @return	array
	 */
	public function read($length=0){
		if(!is_numeric($length)) return false;
		if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
		$keyArray	= $this->getKeyArray($length);
		$data	= @memcache_get(self::$client, $keyArray['keys']);
		return $data;
	}
	
	/*
	 * 获取队列某段长度的key数组
	 * @param	[length]	int	队列长度
	 * @return	array
	 */
	private function getKeyArray($length){
		$result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());
		$this->getHeadNTail($this->queueName);
		$this->getCurrentSide();
		if(empty($length)) return $result;
		
		//先取上一面的key
		$i = $result['lastKey'] = 0;
		for($i=0;$i<$length;$i++){
			$result[&#39;lastKey&#39;] = $this->lastHead + $i;
			if($result[&#39;lastKey&#39;] >= $this->lastTail) break;
			$result[&#39;keys&#39;][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result[&#39;lastKey&#39;];
		}
		
		//再取当前面的key
		$j = $length - $i;
		$k = $result[&#39;currentKey&#39;] = 0;
		for($k=0;$k<$j;$k++){
			$result[&#39;currentKey&#39;] = $this->currentHead + $k;
			if($result[&#39;currentKey&#39;] >= $this->currentTail) break;
			$result[&#39;keys&#39;][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result[&#39;currentKey&#39;];
		}

		return $result;
	}
	
	/*
	 * 更新当前轮值面队列尾的值
	 * @return	NULL
	 */
	private function changeTail(){
		$tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
		memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
		//memcache_increment(self::$client, $tail_key, 1);//队列尾+1
		$v = memcache_get(self::$client, $tail_key) +1;
		memcache_set(self::$client, $tail_key,$v,false,$this->expire);
	}
	
	/*
	 * 更新队列首的值
	 * @param	[side]		string	要更新的面
	 * @param	[headValue]	int		队列首的值
	 * @return	NULL
	 */
	private function changeHead($side,$headValue){
		if($headValue <1) return false;
		$head_key = $this->queueName .$side . self::HEAD_KEY;
		$tail_key = $this->queueName .$side . self::TAIL_KEY;
		$sideTail = memcache_get(self::$client, $tail_key);
		if($headValue <$sideTail){
			memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
		}elseif($headValue >= $sideTail){
			$this->resetSide($side);
		}
	}
	
	/*
	 * 重置队列面,即将该队列面的队首、队尾值置为0
	 * @param	[side]	string	要重置的面
	 * @return	NULL
	 */
	private function resetSide($side){
		$head_key = $this->queueName .$side . self::HEAD_KEY;
		$tail_key = $this->queueName .$side . self::TAIL_KEY;
		memcache_set(self::$client, $head_key,0,false,$this->expire);
		memcache_set(self::$client, $tail_key,0,false,$this->expire);
	}
	
	
	/*
	 * 改变当前轮值队列面
	 * @return	string
	 */
	private function changeCurrentSide(){
		$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
		if($currentSide == &#39;A&#39;){
			memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;B&#39;,false,$this->expire);
			$this->currentSide = &#39;B&#39;;
		}else{
			memcache_set(self::$client, $this->queueName . self::SIDE_KEY,&#39;A&#39;,false,$this->expire);
			$this->currentSide = &#39;A&#39;;
		}
		return $this->currentSide;
	}
	
	/*
	 * 检查当前队列是否已满
	 * @return	boolean
	 */
	public function isFull(){
		$result = false;
		if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
			$result = true;
		}
		return $result;
	}
	
	/*
	 * 检查当前队列是否为空
	 * @return	boolean
	 */
	public function isEmpty(){
		$result = true;
		if($this->sideATail > 0 || $this->sideBTail > 0){
			$result = false;
		}
		return $result;
	}
	
	/*
	 * 获取当前队列的长度
	 * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
	 * @return	int
	 */
	public function getQueueLength(){
		$this->getHeadNTail($this->queueName);
		$this->getCurrentSide();

		$sideALength = $this->sideATail - $this->sideAHead;
		$sideBLength = $this->sideBTail - $this->sideBHead;
		$result = $sideALength + $sideBLength;
		
		return $result;
	}
	

	/*
	 * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
	 * @return	boolean
	 */
	public function clear(){
		if(!$this->getLock()) return false;
		for($i=0;$iqueueName.&#39;A&#39;. self::VALU_KEY .$i, 0);
			@memcache_delete(self::$client, $this->queueName.&#39;B&#39;. self::VALU_KEY .$i, 0);
		}
		$this->unLock();
		$this->resetSide(&#39;A&#39;);
		$this->resetSide(&#39;B&#39;);
		return true;
	}
	
	/*
	 * 清除所有memcache缓存数据
	 * @return	NULL
	 */
	public function memFlush(){
		memcache_flush(self::$client);
	}


}

推荐阅读
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文详细介绍了SQL日志收缩的方法,包括截断日志和删除不需要的旧日志记录。通过备份日志和使用DBCC SHRINKFILE命令可以实现日志的收缩。同时,还介绍了截断日志的原理和注意事项,包括不能截断事务日志的活动部分和MinLSN的确定方法。通过本文的方法,可以有效减小逻辑日志的大小,提高数据库的性能。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的详细步骤
    本文详细介绍了搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的步骤,包括环境说明、相关软件下载的地址以及所需的插件下载地址。 ... [详细]
  • PHP设置MySQL字符集的方法及使用mysqli_set_charset函数
    本文介绍了PHP设置MySQL字符集的方法,详细介绍了使用mysqli_set_charset函数来规定与数据库服务器进行数据传送时要使用的字符集。通过示例代码演示了如何设置默认客户端字符集。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • 这是原文链接:sendingformdata许多情况下,我们使用表单发送数据到服务器。服务器处理数据并返回响应给用户。这看起来很简单,但是 ... [详细]
  • 本文介绍了使用AJAX的POST请求实现数据修改功能的方法。通过ajax-post技术,可以实现在输入某个id后,通过ajax技术调用post.jsp修改具有该id记录的姓名的值。文章还提到了AJAX的概念和作用,以及使用async参数和open()方法的注意事项。同时强调了不推荐使用async=false的情况,并解释了JavaScript等待服务器响应的机制。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 本文介绍了在Hibernate配置lazy=false时无法加载数据的问题,通过采用OpenSessionInView模式和修改数据库服务器版本解决了该问题。详细描述了问题的出现和解决过程,包括运行环境和数据库的配置信息。 ... [详细]
  • 本文介绍了如何找到并终止在8080端口上运行的进程的方法,通过使用终端命令lsof -i :8080可以获取在该端口上运行的所有进程的输出,并使用kill命令终止指定进程的运行。 ... [详细]
author-avatar
315热点关注
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有