下面介绍怎样使用 ZooKeeper 类判断节点是否存在,以及在节点上添加监听器。
Stat exists(String path, boolean watch) 返回指定路径节点的状态,如果节点不存在,则返回 null。
void exists(String path, boolean watch, AsyncCallback.StatCallback cb, Object ctx) 异步版本
Stat exists(String path, Watcher watcher) 返回指定路径节点的状态,如果节点不存在,则返回 null。
void exists(String path, Watcher watcher, AsyncCallback.StatCallback cb, Object ctx) 异步版本
参数说明:
path - 节点路径
watch - 是否需要观察这个节点。如果 watch 为 true 且调用成功(无异常抛出),则会在监视该节点。如果创建/删除节点或在节点上设置数据操作成功,就会触发监视。
watcher - 监视器,用来接收节点的监视事件
cb - 回调接口
ctx - 上下文对象,传递扩展数据到回调接口
该示例将演示如何判断节点是否存在,以及如何在判断是否存在时添加监视器,监听节点状态的变化。代码如下:
package com.hxstrive.zookeeper; import com.alibaba.fastjson.JSONObject; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; /** * 判断节点是否存在 * @author hxstrive.com */ public class ExistsNode { private static ZooKeeper zooKeeper; @Before public void init() throws Exception { zooKeeper = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("触发了 " + watchedEvent.getType() + " 事件"); } }); // 如果节点不存在,则创建节点 Stat stat = zooKeeper.exists("/exists_node", false); if(null == stat) { String nodeName = zooKeeper.create("/exists_node", "init value".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("nodeName = " + nodeName); } } /** * 同步判断节点是否存在 */ @Test public void syncDemo() throws Exception { // 注释监听器 zooKeeper.register(new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.getType() + " = " + watchedEvent.getPath()); } }); // 判断节点是否存在,然后监听节点 Stat stat = zooKeeper.exists("/exists_node", true); if(null != stat) { System.out.println("节点存在1"); // 修改节点数据,会触发监听 zooKeeper.setData("/exists_node", "new value 1".getBytes(), -1); } // 判断节点是否存在,不添加监听 stat = zooKeeper.exists("/exists_node", false); if(null != stat) { System.out.println("节点存在2"); // 修改节点数据,不会触发监听 zooKeeper.setData("/exists_node", "new value 2".getBytes(), -1); } //输出:触发了 None 事件 //节点存在1 //NodeDataChanged = /exists_node //节点存在2 } @Test public void syncDemo2() throws Exception { // 判断节点是否存在,然后添加监听器 Stat stat = zooKeeper.exists("/exists_node", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.getType() + " = " + watchedEvent.getPath()); } }); if(null != stat) { System.out.println("节点存在"); // 修改节点数据,会触发监听 zooKeeper.setData("/exists_node", "new value 1".getBytes(), -1); } //输出: //触发了 None 事件 //节点存在 //NodeDataChanged = /exists_node } /** * 异步判断节点是否存在 */ @Test public void asyncDemo() throws Exception { Map<String,String> extData = new HashMap<>(); extData.put("code", "C100"); extData.put("title", "这是标题"); zooKeeper.register(new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.getType() + " = " + watchedEvent.getPath()); } }); // 这里的 CountDownLatch 仅仅是为了能够等待 AsyncCallback.StatCallback CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper.exists("/exists_node", true, // 回调接口实现 new AsyncCallback.StatCallback() { @Override public void processResult(int i, String s, Object o, Stat stat) { System.out.println("i = " + i); System.out.println("s = " + s); System.out.println("o = " + o); // 扩展数据 System.out.println("stat = " + JSONObject.toJSONString(stat)); countDownLatch.countDown(); } }, // 传递扩展数据,可以在回调 processResult 方法中获取到 extData); countDownLatch.await(); // 修改节点数据,会触发监听 zooKeeper.setData("/exists_node", "new value 1".getBytes(), -1); //输出: //触发了 None 事件 //i = 0 //s = /exists_node //o = {code=C100, title=这是标题} //stat = {"aversion":0,"ctime":1703682864376,"cversion":0,"czxid":162,"dataLength":11, // "ephemeralOwner":0,"mtime":1703772652389,"mzxid":211,"numChildren":0,"pzxid":162,"version":6} //NodeDataChanged = /exists_node } }