作者:蒯曼荷学_936 | 来源:互联网 | 2023-10-10 10:49
一、Zookeeper的节点结构和节点类型1.1.zookeeper数据结构1、层次化的目录结构,命名符合常规文件系统规范(见下图)2、每个节点在zookeeper中叫做znode,并且其有一个唯
一、Zookeeper的节点结构和节点类型
1.1. zookeeper数据结构
1、层次化的目录结构,命名符合常规文件系统规范(见下图)
2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
3、节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
4、客户端应用可以在节点上设置监视器
1.2. 节点类型
1、Znode有两种类型:
短暂(ephemeral)(断开连接自己删除)
也叫临时节点,临时节点不能有子节点。
持久(persistent)(断开连接不删除)
2、Znode有四种形式的目录节点(默认是persistent )
PERSISTENT
PERSISTENT_SEQUENTIAL(持久序列/test0000000019)
EPHEMERAL
EPHEMERAL_SEQUENTIAL
3、创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
4、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
二、常用命令
运行 zkCli.sh –server
客户端登陆
1、使用 ls 命令来查看当前 ZooKeeper 中所包含的内容:
[zk: 202.115.36.251:2181(CONNECTED) 1] ls /
使用这个命令后会监听节点的变化,当节点变化时会出现类似下面的信息:
#WATCHER::
#...
2、创建一个新的 znode ,使用 create /zk myData 。这个命令创建了一个新的 znode 节点“ zk ”以及与它关联的字符串:
[zk: 202.115.36.251:2181(CONNECTED) 2] create /zk "myData“
3、我们运行 get 命令来确认 znode 是否包含我们所创建的字符串:
[zk: 202.115.36.251:2181(CONNECTED) 3] get /zk
#监听这个节点数据的变化,当另外一个客户端改变/zk时,它会打出下面的
#WATCHER::
#WatchedEvent state:SyncConnectedtype:NodeDataChanged path:/zk
[zk: localhost:2181(CONNECTED) 4] get /zkwatch
4、下面我们通过 set 命令来对 zk 所关联的字符串进行设置:
[zk: 202.115.36.251:2181(CONNECTED) 4] set /zk "zsl“
5、下面我们将刚才创建的 znode 删除:
[zk: 202.115.36.251:2181(CONNECTED) 5] delete /zk
6、删除节点:rmr
[zk: 202.115.36.251:2181(CONNECTED) 5] rmr /zk
quit退出客户端
7、远程开启服务端
ssh 192.168.65.135 "source /etc/profile;/root/apps/zookeeper-3.4.5/bin/zkServer.sh start"
三、常用API
功能 |
描述 |
create |
在本地目录树中创建一个节点 |
delete |
删除一个节点 |
exists |
测试本地是否存在目标节点 |
get/set data |
从目标节点上读取 / 写数据 |
get/set ACL |
获取 / 设置目标节点访问控制列表信息 |
get children |
检索一个子节点上的列表 |
sync |
等待要被传送的数据 |
增删查改操作:
public class SimpleZkClient {
private static final String cOnnectString= "mini1:2181,mini2:2181,mini3:2181";
private static final int sessiOnTimeout= 2000;
ZooKeeper zkClient = null;
@Before
public void init() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
System.out.println(event.getType() + "---" + event.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
}
}
});
}
/**
* 数据的增删改查
*
* @throws InterruptedException
* @throws KeeperException
*/
// 创建数据节点到zk中
public void testCreate() throws KeeperException, InterruptedException {
// 参数1:要创建的节点的路径 参数2:节点大数据 参数3:节点的权限 参数4:节点的类型
String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//上传的数据可以是任何类型,但都要转成byte[]
}
//判断znode是否存在
@Test
public void testExist() throws Exception{
Stat stat = zkClient.exists("/eclipse", false);
System.out.println(stat==null?"not exist":"exist");
}
// 获取子节点
@Test
public void getChildren() throws Exception {
List children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
//获取znode的数据
@Test
public void getData() throws Exception {
byte[] data = zkClient.getData("/eclipse", false, null);
System.out.println(new String(data));
}
//删除znode
@Test
public void deleteZnode() throws Exception {
//参数2:指定要删除的版本,-1表示删除所有版本
zkClient.delete("/eclipse", -1);
}
//删除znode
@Test
public void setData() throws Exception {
zkClient.setData("/app1", "imissyou angelababy".getBytes(), -1);
byte[] data = zkClient.getData("/app1", false, null);
System.out.println(new String(data));
}
}
四、案例
实现对分布式系统中服务器上下线的监控:
public class DistributedServer {
private static final String cOnnectString= "mini1:2181,mini2:2181,mini3:2181";
private static final int sessiOnTimeout= 2000;
private static final String parentNode = "/servers";
private ZooKeeper zk = null;
/**
* 创建到zk的客户端连接
*
* @throws Exception
*/
public void getConnect() throws Exception {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
System.out.println(event.getType() + "---" + event.getPath());
try {
zk.getChildren("/", true);
} catch (Exception e) {
}
}
});
}
/**
* 向zk集群注册服务器信息
*
* @param hostname
* @throws Exception
*/
public void registerServer(String hostname) throws Exception {
String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + "is online.." + create);
}
/**
* 业务功能
*
* @throws InterruptedException
*/
public void handleBussiness(String hostname) throws InterruptedException {
System.out.println(hostname + "start working.....");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 获取zk连接
DistributedServer server = new DistributedServer();
server.getConnect();
// 利用zk连接注册服务器信息
server.registerServer(args[0]);
// 启动业务功能
server.handleBussiness(args[0]);
}
}
public class DistributedClient {private static final String cOnnectString= "mini1:2181,mini2:2181,mini3:2181";private static final int sessiOnTimeout= 2000;private static final String parentNode = "/servers";// 注意:加volatile的意义何在?private volatile List serverList;private ZooKeeper zk = null;/** * 创建到zk的客户端连接 * * @throws Exception */public void getConnect() throws Exception {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)try {//重新更新服务器列表,并且注册了监听getServerList();} catch (Exception e) {}}});}/** * 获取服务器信息列表 * * @throws Exception */public void getServerList() throws Exception {// 获取服务器子节点信息,并且对父节点进行监听List children = zk.getChildren(parentNode, true);// 先创建一个局部的list来存服务器信息List servers = new ArrayList();for (String child : children) {// child只是子节点的节点名byte[] data = zk.getData(parentNode + "/" + child, false, null);servers.add(new String(data));}// 把servers赋值给成员变量serverList,已提供给各业务线程使用serverList = servers;//打印服务器列表System.out.println(serverList);}/** * 业务功能 * * @throws InterruptedException */public void handleBussiness() throws InterruptedException {System.out.println("client start working.....");Thread.sleep(Long.MAX_VALUE);}public static void main(String[] args) throws Exception {// 获取zk连接DistributedClient client = new DistributedClient();client.getConnect();// 获取servers的子节点信息(并监听),从中获取服务器信息列表client.getServerList();// 业务线程启动client.handleBussiness();}}