热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Zookeeper深入理解(三)kazoo接口

 zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,

 zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,现在主流的纯python的zookeeper接口是kazoo。因此如何使用kazoo开发基于python的分布式程序是必须掌握的。

 

1.安装kazoo

yum install python-pip

pip install kazoo

  

安装过程中会出现一些python依赖包未安装的情况,安装即可。

 

2.运行kazoo基础例子kazoo_basic.py

 

#!/usr/bin/env python
# _*_coding:utf-8_*_
import time

from kazoo.client import KazooClient

from kazoo.client import KazooState


def main():
	zk = KazooClient(hosts='127.0.0.1:2182')

	zk.start()

	@zk.add_listener
	def my_listener(state):

		if state == KazooState.LOST:

			print("LOST")

		elif state == KazooState.SUSPENDED:

			print("SUSPENDED")

		else:

			print("Connected")

	# Creating Nodes

	# Ensure a path, create if necessary

	zk.ensure_path("/my/favorite")

	# Create a node with data

	zk.create("/my/favorite/node", b"")

	zk.create("/my/favorite/node/a", b"A")

	# Reading Data

	# Determine if a node exists

	if zk.exists("/my/favorite"):
		print("/my/favorite is existed")

	@zk.ChildrenWatch("/my/favorite/node")
	def watch_children(children):

		print("Children are now: %s" % children)

	# Above function called immediately, and from then on

	@zk.DataWatch("/my/favorite/node")
	def watch_node(data, stat):

		print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

	# Print the version of a node and its data

	data, stat = zk.get("/my/favorite/node")

	print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

	# List the children

	children = zk.get_children("/my/favorite/node")

	print("There are %s children with names %s" % (len(children), children))

	# Updating Data

	zk.set("/my/favorite", b"some data")

	# Deleting Nodes

	zk.delete("/my/favorite/node/a")

	# Transactions

	transaction = zk.transaction()

	transaction.check('/my/favorite/node', version=-1)

	transaction.create('/my/favorite/node/b', b"B")

	results = transaction.commit()

	print ("Transaction results is %s" % results)

	zk.delete("/my/favorite/node/b")

	zk.delete("/my", recursive=True)

	time.sleep(2)

	zk.stop()


if __name__ == "__main__":

	try:

		main()

	except Exception, ex:

		print "Ocurred Exception: %s" % str(ex)

		quit()

  

运行结果:

Children are now: [u'a']

Version: 0, data: 

Version: 0, data: 

There are 1 children with names [u'a']

Children are now: []

Transaction results is [True, u'/my/favorite/node/b']

Children are now: [u'b']

Children are now: []

No handlers could be found for logger "kazoo.recipe.watchers"

LOST

  

以上程序运行了基本kazoo接口命令,包括创建删除加watcher等操作,通过调试并对比zookeeper服务节点znode目录结构的变化,就可以理解具体的操作结果。

 

3.运行通过kazoo实现的分布式锁程序kazoo_lock.py

 

import logging, os, time

from kazoo.client import KazooClient

from kazoo.client import KazooState

from kazoo.recipe.lock import Lock

 

class ZooKeeperLock():

    def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):

        self.hosts = hosts

        self.id_str = id_str

        self.zk_client = None

        self.timeout = timeout

        self.logger = logger

        self.name = lock_name

        self.lock_handle = None

        self.create_lock()

    def create_lock(self):

        try:

            self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)

            self.zk_client.start(timeout=self.timeout)

        except Exception, ex:

            self.init_ret = False

            self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return

        try:

            lock_path = os.path.join("/", "locks", self.name)

            self.lock_handle = Lock(self.zk_client, lock_path)

        except Exception, ex:

            self.init_ret = False

            self.err_str = "Create lock failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return

    def destroy_lock(self):

        #self.release()

        if self.zk_client != None:

            self.zk_client.stop()

            self.zk_client = None

    def acquire(self, blocking=True, timeout=None):

        if self.lock_handle == None:

            return None

        try:

            return self.lock_handle.acquire(blocking=blocking, timeout=timeout)

        except Exception, ex:

            self.err_str = "Acquire lock failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return None

    def release(self):

        if self.lock_handle == None:

            return None

        return self.lock_handle.release()

    def __del__(self):

        self.destroy_lock()

 

def main():

    logger = logging.getLogger()

    logger.setLevel(logging.INFO)

    sh = logging.StreamHandler()

    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')

    sh.setFormatter(formatter)

    logger.addHandler(sh)

    zookeeper_hosts = "127.0.0.1:2182"

    lock_name = "test"

    lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)

    ret = lock.acquire()

    if not ret:

        logging.info("Can't get lock! Ret: %s", ret)

        return

    logging.info("Get lock! Do something! Sleep 10 secs!")

    for i in range(1, 11):

        time.sleep(1)

        print str(i)

    lock.release()

 

if __name__ == "__main__":

    try:

        main()

    except Exception, ex:

        print "Ocurred Exception: %s" % str(ex)

        quit()

  

 

将该测试文件copy到多个服务器,同时运行,就可以看到分布式锁的效果了。

 

参考链接:

http://kazoo.readthedocs.org/en/latest/basic_usage.html

http://yunjianfei.iteye.com/blog/2164888

 

 

 

 

1.安装kazoo

yum install python-pip

pip install kazoo

  

安装过程中会出现一些python依赖包未安装的情况,安装即可。

 

2.运行kazoo基础例子kazoo_basic.py

 

#!/usr/bin/env python
# _*_coding:utf-8_*_
import time

from kazoo.client import KazooClient

from kazoo.client import KazooState


def main():
	zk = KazooClient(hosts='127.0.0.1:2182')

	zk.start()

	@zk.add_listener
	def my_listener(state):

		if state == KazooState.LOST:

			print("LOST")

		elif state == KazooState.SUSPENDED:

			print("SUSPENDED")

		else:

			print("Connected")

	# Creating Nodes

	# Ensure a path, create if necessary

	zk.ensure_path("/my/favorite")

	# Create a node with data

	zk.create("/my/favorite/node", b"")

	zk.create("/my/favorite/node/a", b"A")

	# Reading Data

	# Determine if a node exists

	if zk.exists("/my/favorite"):
		print("/my/favorite is existed")

	@zk.ChildrenWatch("/my/favorite/node")
	def watch_children(children):

		print("Children are now: %s" % children)

	# Above function called immediately, and from then on

	@zk.DataWatch("/my/favorite/node")
	def watch_node(data, stat):

		print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

	# Print the version of a node and its data

	data, stat = zk.get("/my/favorite/node")

	print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

	# List the children

	children = zk.get_children("/my/favorite/node")

	print("There are %s children with names %s" % (len(children), children))

	# Updating Data

	zk.set("/my/favorite", b"some data")

	# Deleting Nodes

	zk.delete("/my/favorite/node/a")

	# Transactions

	transaction = zk.transaction()

	transaction.check('/my/favorite/node', version=-1)

	transaction.create('/my/favorite/node/b', b"B")

	results = transaction.commit()

	print ("Transaction results is %s" % results)

	zk.delete("/my/favorite/node/b")

	zk.delete("/my", recursive=True)

	time.sleep(2)

	zk.stop()


if __name__ == "__main__":

	try:

		main()

	except Exception, ex:

		print "Ocurred Exception: %s" % str(ex)

		quit()

  

运行结果:

Children are now: [u'a']

Version: 0, data: 

Version: 0, data: 

There are 1 children with names [u'a']

Children are now: []

Transaction results is [True, u'/my/favorite/node/b']

Children are now: [u'b']

Children are now: []

No handlers could be found for logger "kazoo.recipe.watchers"

LOST

  

以上程序运行了基本kazoo接口命令,包括创建删除加watcher等操作,通过调试并对比zookeeper服务节点znode目录结构的变化,就可以理解具体的操作结果。

 

3.运行通过kazoo实现的分布式锁程序kazoo_lock.py

 

import logging, os, time

from kazoo.client import KazooClient

from kazoo.client import KazooState

from kazoo.recipe.lock import Lock

 

class ZooKeeperLock():

    def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):

        self.hosts = hosts

        self.id_str = id_str

        self.zk_client = None

        self.timeout = timeout

        self.logger = logger

        self.name = lock_name

        self.lock_handle = None

        self.create_lock()

    def create_lock(self):

        try:

            self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)

            self.zk_client.start(timeout=self.timeout)

        except Exception, ex:

            self.init_ret = False

            self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return

        try:

            lock_path = os.path.join("/", "locks", self.name)

            self.lock_handle = Lock(self.zk_client, lock_path)

        except Exception, ex:

            self.init_ret = False

            self.err_str = "Create lock failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return

    def destroy_lock(self):

        #self.release()

        if self.zk_client != None:

            self.zk_client.stop()

            self.zk_client = None

    def acquire(self, blocking=True, timeout=None):

        if self.lock_handle == None:

            return None

        try:

            return self.lock_handle.acquire(blocking=blocking, timeout=timeout)

        except Exception, ex:

            self.err_str = "Acquire lock failed! Exception: %s" % str(ex)

            logging.error(self.err_str)

            return None

    def release(self):

        if self.lock_handle == None:

            return None

        return self.lock_handle.release()

    def __del__(self):

        self.destroy_lock()

 

def main():

    logger = logging.getLogger()

    logger.setLevel(logging.INFO)

    sh = logging.StreamHandler()

    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')

    sh.setFormatter(formatter)

    logger.addHandler(sh)

    zookeeper_hosts = "127.0.0.1:2182"

    lock_name = "test"

    lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)

    ret = lock.acquire()

    if not ret:

        logging.info("Can't get lock! Ret: %s", ret)

        return

    logging.info("Get lock! Do something! Sleep 10 secs!")

    for i in range(1, 11):

        time.sleep(1)

        print str(i)

    lock.release()

 

if __name__ == "__main__":

    try:

        main()

    except Exception, ex:

        print "Ocurred Exception: %s" % str(ex)

        quit()

  

 

将该测试文件copy到多个服务器,同时运行,就可以看到分布式锁的效果了。

 

参考链接:

http://kazoo.readthedocs.org/en/latest/basic_usage.html

http://yunjianfei.iteye.com/blog/2164888

 

 

 


推荐阅读
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • Commit1ced2a7433ea8937a1b260ea65d708f32ca7c95eintroduceda+Clonetraitboundtom ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • YOLOv7基于自己的数据集从零构建模型完整训练、推理计算超详细教程
    本文介绍了关于人工智能、神经网络和深度学习的知识点,并提供了YOLOv7基于自己的数据集从零构建模型完整训练、推理计算的详细教程。文章还提到了郑州最低生活保障的话题。对于从事目标检测任务的人来说,YOLO是一个熟悉的模型。文章还提到了yolov4和yolov6的相关内容,以及选择模型的优化思路。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
  • 开发笔记:Java是如何读取和写入浏览器Cookies的
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Java是如何读取和写入浏览器Cookies的相关的知识,希望对你有一定的参考价值。首先我 ... [详细]
author-avatar
518094haha
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有