本章将介绍如何通过 Java 方式对 Zookeeper 节点进行新增操作(同步/异步),项目 maven 依赖信息见 “ZooKeeper Java 简单应用” 章节。
在 ZooKeeper 提供的 API 中对节点的 CRUD 操作均是通过 org.apache.zookeeper.ZooKeeper 类完成的,该类是 ZooKeeper 客户端库的主类,要使用 ZooKeeper 服务,应用程序必须首先实例化一个 ZooKeeper 类的对象。
注意:ZooKeeper 类的方法都是线程安全的。
ZooKeeper 类提供了两个方法来新增节点,方法定义如下:
String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 用给定的路径创建一个节点(同步版本)。
void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback cb, Object ctx) 创建一个节点,异步版本。
参数说明:
path - 节点路径
data - 节点初始化数据
acl - 节点 ACL 列表,我们可以使用 ZooDefs.Ids 进行指定,取值如下:
static Id ANYONE_ID_UNSAFE 这个 Id 代表任何人。
static Id AUTH_IDS 此 Id 仅用于设置 ACL。
static ArrayList<ACL> CREATOR_ALL_ACL 创建者身份验证 ID 所有权限。
static ArrayList<ACL> OPEN_ACL_UNSAFE 完全开放的 ACL。
static ArrayList<ACL> READ_ACL_UNSAFE 拥有读能力。
createMode - 指定要创建的节点是临时节点、还是顺序节点,CreateMode 值决定在 ZooKeeper 上创建 znode 的方式。CreateMode 枚举类取值如下:
EPHEMERAL 客户端断开连接后,znode 将被删除。
EPHEMERAL_SEQUENTIAL 客户端断开连接后,znode 将被删除,其名称将附加一个单调递增的数字。
PERSISTENT 客户端断开连接后,znode 不会自动删除。
PERSISTENT_SEQUENTIAL 客户端断开连接时,znode 不会被自动删除,其名称将附加一个单调递增的数字。
cb - 回调接口,用于获取节点的名称。
ctx - 一个上下文对象,用于在回调时传递额外的上下文信息。
下面通过一个例子展示如何通过 ZooKeeper 类实现同步和异步创建节点,代码如下:
package com.hxstrive.zookeeper; import org.apache.zookeeper.*; import org.junit.Before; import org.junit.Test; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; /** * 创建节点 * @author hxstrive.com */ public class CreateNode { private static ZooKeeper zooKeeper; @Before public void init() throws Exception { zooKeeper = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("触发了 " + watchedEvent.getType() + " 事件"); } }); } /** * 同步创建节点 */ @Test public void syncDemo() throws Exception { // ZooDefs.Ids.OPEN_ACL_UNSAFE 完全开放权限 // CreateMode.PERSISTENT 持久化节点 String nodeName = zooKeeper.create("/sync_create_node", "hello world".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("nodeName = " + nodeName); //输出: //触发了 None 事件 //nodeName = /sync_create_node } /** * 异步创建节点 */ @Test public void asyncDemo() throws Exception { // ZooDefs.Ids.OPEN_ACL_UNSAFE 完全开放权限 // CreateMode.PERSISTENT 持久化节点 Map<String,String> extData = new HashMap<>(); extData.put("code", "C100"); extData.put("title", "这是标题"); // 这里的 CountDownLatch 仅仅是为了能够等待 AsyncCallback.StringCallback // 被触发后才结束程序 CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper.create("/async_create_node", "hello world".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, // 回调接口实现 new AsyncCallback.StringCallback() { public void processResult(int i, String s, Object o, String s1) { System.out.println("i = " + i); // 返回代码或调用结果。 System.out.println("s = " + s); // 节点名称 System.out.println("o = " + o); // 扩展数据,即外面传递的 extData // 创建的 Z 节点的名称。创建成功后,名称和路径通常相等,除非创建的是顺序节点。 System.out.println("s1 = " + s1); countDownLatch.countDown(); } }, // 传递扩展数据,可以在回调 processResult 方法中获取到 extData); countDownLatch.await(); //输出: //触发了 None 事件 //i = 0 //s = /async_create_node //o = {code=C100, title=这是标题} //s1 = /async_create_node } }