热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

在Java中操作Zookeeper的示例代码详解

这篇文章主要介绍了在Java中操作Zookeeper的示例代码详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

依赖

 
    org.apache.zookeeper
    zookeeper
    3.6.0
  

连接到zkServer

//连接字符串,zkServer的ip、port,如果是集群逗号分隔
  String cOnnectStr= "192.168.1.9:2181";

  //zookeeper就是一个zkCli
  ZooKeeper zooKeeper = null;

  try {
     //初始次数为1。后面要在内部类中使用,三种写法:1、写成外部类成员变量,不用加final;2、作为函数局部变量,放在try外面,写成final;3、写在try中,不加final
     CountDownLatch countDownLatch = new CountDownLatch(1);
    //超时时间ms,监听器
    zooKeeper = new ZooKeeper(connectStr, 5000, new Watcher() {
      public void process(WatchedEvent watchedEvent) {
        //如果状态变成已连接
        if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
          System.out.println("连接成功");
          //次数-1
          countDownLatch.countDown();
        }
      }
    });
    //等待,次数为0时才会继续往下执行。等待监听器监听到连接成功,才能操作zk
    countDownLatch.await();
  } catch (IOException | InterruptedException e) {
    e.printStackTrace();
  }


  //...操作zk。后面的demo都是写在此处的


  //关闭连接
  try {
    zooKeeper.close();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

检测节点是否存在

// 检测节点是否存在

  // 同步方式
  Stat exists = null;
  try {
    //如果存在,返回节点状态Stat;如果不存在,返回null。第二个参数是watch
    exists = zooKeeper.exists("/mall",false);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  if (exists==null){
    System.out.println("节点不存在");
  }
  else {
    System.out.println("节点存在");
  }


  //异步回调
  zooKeeper.exists("/mall",false, new AsyncCallback.StatCallback() {
    //第二个是path znode路径,第三个是ctx 后面传入实参,第四个是znode的状态
    public void processResult(int i, String s, Object o, Stat stat) {
      //如果节点不存在,返回的stat是null
      if (stat==null){
        System.out.println("节点不存在");
      }
      else{
        System.out.println("节点存在");
      }
    }
  // 传入ctx,Object类型
  },null);

操作后,服务端会返回处理结果,返回void、null也算处理结果。

同步指的是当前线程阻塞,等待服务端返回数据,收到返回的数据才继续往下执行;

异步回调指的是,把对结果(返回的数据)的处理写在回调函数中,当前线程不等待返回的数据,继续往下执行,收到返回的数据时自动调用回调函数来处理。

如果处理返回数据的代码之后的操作,不依赖返回数据、对返回数据的处理,那么可以把返回数据的处理写成回调函数。

创建节点

//创建节点

  //同步方式
  try {
    //数据要写成byte[],不携带数据写成null;默认acl权限使用ZooDefs.Ids.OPEN_ACL_UNSAFE;最后一个是节点类型,P是永久,E是临时,S是有序
    zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    System.out.println("已创建节点/mall");
    //如果节点已存在,会抛出异常
  } catch (KeeperException | InterruptedException e) {     System.out.println("创建节点/mall失败,请检查节点是否已存在");
    e.printStackTrace();
  }

  //异步回调
  zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.Create2Callback(){
    //第二个path,第三个ctx,第四个节点状态
    public void processResult(int i, String s, Object o, String s1, Stat stat) {
      //回调方式不抛出异常,返回的stat是创建节点的状态,如果节点已存在,返回的stat是null
      if (stat==null){
        System.out.println("创建节点/mall失败,请检查节点是否已存在");
      }
      else {
        System.out.println("节点创建成功");
      }
    }
    //ctx实参
  },null);

删除节点

//删除节点

  //同步方式
  try {
    //第二个参数是版本号,-1表示可以是任何版本
    zooKeeper.delete("/mall1",-1);
    System.out.println("成功删除节点/mall");
  } catch (InterruptedException | KeeperException e) {
    System.out.println("删除节点/mall失败");
    e.printStackTrace();
  }


  //异步回调
  zooKeeper.delete("/mall2", -1, new AsyncCallback.VoidCallback() {
    //第二个是path,第三个是ctx
    public void processResult(int i, String s, Object o) {
      
    }
  //
  //ctx实参
  },null);

delete()只能删除没有子节点的znode,如果该znode有子节点会抛出异常。

没有提供递归删除子节点的方法,如果要删除带有子节点的znode,需要自己实现递归删除。可以先getChildren()获取子节点列表,遍历列表依次删除子节点,再删除父节点。

获取子节点列表

//获取子节点列表,List,比如/mall/user,/mall/order,返回的是["user"、"order"]

  //同步方式
  List children = null;
  try {
    //第二个参数是watch
    children = zooKeeper.getChildren("/mall", false);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  System.out.println("子节点列表:" + children);


  //异步
  zooKeeper.getChildren("/mall", false, new AsyncCallback.ChildrenCallback() {
    //第二个起依次是:path、ctx、返回的子节点列表
    public void processResult(int i, String s, Object o, List list) {
      System.out.println("子节点列表:" + list);
    }
  //ctx实参
  }, null);

只获取子节点,不获取孙节点。

watch都是:可以写boolean,要添加监听就写true,不监听写false;也可以写Watcher对象,new一个Watcher对象表示要监听,null表示不监听。

获取节点数据

//获取节点数据,返回byte[]

  //同步方式
  byte[] data = null;
  try {
    //第二个参数是watch,第三个是stat
    data = zooKeeper.getData("/mall", false, null);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  //调用new String()时要判断data是否为null,如果是null会抛NPE
  if (data==null){
    System.out.println("该节点没有数据");
  }
  else{
    System.out.println("节点数据:"+new String(data));
  }


  //异步回调
  zooKeeper.getData("/mall", false, new AsyncCallback.DataCallback() {
    //第二个起依次是:path、ctx、返回的节点数据、节点状态
    public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
      //不必判断bytes是否是null,如果节点没有数据,不会调用回调函数;执行到此,说明bytes不是null
      System.out.println("节点数据:" + new String(bytes) );
    }
    //ctx实参
  }, null);

设置|修改节点数据

//设置|更新节点据

  //同步方式
  try {
    //最后一个参数是版本号
    zooKeeper.setData("/mall", "1234".getBytes(), -1);
    System.out.println("设置节点数据成功");
  } catch (KeeperException | InterruptedException e) {
    System.out.println("设置节点数据失败");
    e.printStackTrace();
  }


  //异步回调
  zooKeeper.setData("/mall", "1234".getBytes(), -1, new AsyncCallback.StatCallback() {
    //第二个是path,第三个是ctx
    public void processResult(int i, String s, Object o, Stat stat) {

    }
  // ctx
  },null);

设置acl权限

//设置acl权限
    
  //第一个参数指定权限,第二个是Id对象
  ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("auth", "chy:abcd"));
  
  List aclList = new ArrayList<>();
  aclList.add(acl);
  
  //如果List中只有一个ACL对象,也可以这样写
  //List aclList = Collections.singletonList(auth);
    
  //验证权限,需写在设置权限之前。如果之前没有设置权限,也需要先验证本次即将设置的用户
  zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());

  
  //方式一 setAcl
  try {
    //第二个参数是List,第三个参数是版本号
    zooKeeper.setACL("/mall", aclList, -1);
    System.out.println("设置权限成功");
  } catch (KeeperException | InterruptedException e) {
    System.out.println("设置权限失败");
    e.printStackTrace();
  }

  
  //方式二 在创建节点时设置权限
  try {
    zooKeeper.create("/mall","abcd".getBytes(),aclList,CreateMode.PERSISTENT);
    System.out.println("已创建节点并设置权限");
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

设置权限之后,连接zkServer进行操作时,都需要先验证用户。

此处未写对应的异步回调。

查看acl权限

//查看acl权限
    
  //设置权限之后,以后操作时需要先验证用户,一次session中验证一次即可
  zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());

  
  //同步方式
  try {
    List aclList = zooKeeper.getACL("/mall", null);
    System.out.println("acl权限:"+aclList);
  } catch (KeeperException | InterruptedException e) {
    System.out.println("获取acl权限失败");
    e.printStackTrace();
  }


  //异步回调
  zooKeeper.getACL("/mall3", null, new AsyncCallback.ACLCallback() {
    //第二个起:path、ctx、获取到的List、节点状态
    public void processResult(int i, String s, Object o, List list, Stat stat) {
      //就算没有手动设置acl权限,默认也是有值的
      System.out.println("acl权限:"+list);
    }
  //ctx实参
  },null);

添加监听器

//添加监听 方式一
  try {
    CountDownLatch countDownLatch = new CountDownLatch(1);

    zooKeeper.getData("/mall", new Watcher() {
      public void process(WatchedEvent watchedEvent) {
        //watcher会监听该节点所有的事件,不管发生什么事件都会调用process()来处理,需要先判断一下事件类型
        if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
          System.out.println("节点数据改变了");
          //会一直监听,如果只监听一次数据改变,将下面这句代码取消注释即可
          //countDownLatch.countDown();
        }
      }
    }, null);
    //默认watcher是一次性的,如果要一直监听,需要借助CountDownLatch
    countDownLatch.await();
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

ZooKeeper类的exists()、getData()、getChildren()方法都具有添加监听的功能,用法类似。

watchedEvent.getType().equals(Event.EventType.NodeDataChanged)
watchedEvent.getState().equals(Event.KeeperState.SyncConnected)

getType是获取事件类型,getState是获取连接状态。

上面这种方式,会递归监听子孙节点,子孙节点的数据改变也算NodeDataChanged,子孙节点的创建|删除也算NodeCreated|NodeDeleted。

//添加监听 方式二  
   try {
    CountDownLatch countDownLatch1 = new CountDownLatch(1);
    zooKeeper.addWatch("/mall", new Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
          System.out.println("节点数据改变了");
          //如果只监听一次数据改变,将下面这句代码注释掉
          //countDownLatch1.countDown();
        }
      }
    //监听模式,PERSISTENT是不监听子孙节点,PERSISTENT_RECURSIVE是递归监听子孙节点
    }, AddWatchMode.PERSISTENT_RECURSIVE);
    countDownLatch1.await();
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

countDownLatch1.await();要阻塞线程,最好启动一条新线程来监听。

只有设置了监听的zkCli,该节点发生事件时才会收到zkServer的通知。

watch只保存在zkServer的内存中(zk依赖jdk,运行在jvm上,堆中的session对象),不持久化到硬盘,就是说设置的监听只在本次会话期间有效,zkCli关闭连接,zkServer在指定时间后(默认连续没有收到10个心跳),zkServer会自动删除相关session,watcher丢失。

移除监听

//移除监听 方式一
  try {
    zooKeeper.addWatch("/mall",null,AddWatchMode.PERSISTENT);
    System.out.println("已移除监听");
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

就是上面添加监听的哪些方法,watch|watcher参数,如果是boolean类型,设置为false即可关闭监听;如果是Watcher类型,可以设置null覆盖掉之前设置的监听。

//移除监听 方式二
  try {
    //第二个参数是Watcher,原来添加的那个Watcher监听对象,不能是null
    //第三个参数指定要移除监听的哪部分,Any是移除整个监听,Data是移除对数据的监听,Children是移除对子节点的递归监听
    //最后一个参数指定未连接到zkServe时,是否移除本地监听部分
    zooKeeper.removeWatches("/mall",watcher, Watcher.WatcherType.Children,true);
  } catch (InterruptedException | KeeperException e) {
    e.printStackTrace();
  }

监听由2部分组成,一部分在zkServer上,事件发生时通知对应的zkCli;一部分在zkCli,收到zkServer的通知时做出一些处理。

最后一个参数指定未连接到zkServer,是否移除本地(zkCli)监听部分,true——移除,false——不移除。

比如说没有连接到zkServer,移除本地监听,10个心跳内连上了zkServer,zkServer的监听部分仍在,发生事件时仍会通知此zkCli,但zkCli本地监听已经移除了,对通知不会做出处理。

第一种方式会移除整个监听,不需要传入监听对象watcher;

第二种方式功能更全,可以指定移除监听的哪个部分,但需要传入watcher对象,添加监听时要用一个变量来保存watcher对象。

到此这篇关于在Java中操作Zookeeper的示例代码详解的文章就介绍到这了,更多相关Java操作Zookeeper内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!


推荐阅读
  • 本文由编程笔记#小编为大家整理,主要介绍了StartingzookeeperFAILEDTOSTART相关的知识,希望对你有一定的参考价值。下载路径:https://ar ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • 开发笔记:读《分布式一致性原理》JAVA客户端API操作2
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了读《分布式一致性原理》JAVA客户端API操作2相关的知识,希望对你有一定的参考价值。创 ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • 我正在使用sql-serverkafka-connect和debezium监视sqlserver数据库,但是当我发布并运行我的wo ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 如何提高PHP编程技能及推荐高级教程
    本文介绍了如何提高PHP编程技能的方法,推荐了一些高级教程。学习任何一种编程语言都需要长期的坚持和不懈的努力,本文提醒读者要有足够的耐心和时间投入。通过实践操作学习,可以更好地理解和掌握PHP语言的特异性,特别是单引号和双引号的用法。同时,本文也指出了只走马观花看整体而不深入学习的学习方式无法真正掌握这门语言,建议读者要从整体来考虑局部,培养大局观。最后,本文提醒读者完成一个像模像样的网站需要付出更多的努力和实践。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • Android日历提醒软件开源项目分享及使用教程
    本文介绍了一款名为Android日历提醒软件的开源项目,作者分享了该项目的代码和使用教程,并提供了GitHub项目地址。文章详细介绍了该软件的主界面风格、日程信息的分类查看功能,以及添加日程提醒和查看详情的界面。同时,作者还提醒了读者在使用过程中可能遇到的Android6.0权限问题,并提供了解决方法。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文讨论了在shiro java配置中加入Shiro listener后启动失败的问题。作者引入了一系列jar包,并在web.xml中配置了相关内容,但启动后却无法正常运行。文章提供了具体引入的jar包和web.xml的配置内容,并指出可能的错误原因。该问题可能与jar包版本不兼容、web.xml配置错误等有关。 ... [详细]
  • 负载均衡_Nginx反向代理动静分离负载均衡及rewrite隐藏路径详解(Nginx Apache MySQL Redis)–第二部分
    nginx反向代理、动静分离、负载均衡及rewrite隐藏路径详解 ... [详细]
author-avatar
码农
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有