前面介绍使用 ZooKeeper 类进行创建、删除、修改数据均是一次仅仅对 zookeeper 操作一次,下面将介绍通过 multi() 方法一次提交多个操作到 zookeeper 服务。
List<org.apache.zookeeper.OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) 执行多个或全部 ZooKeeper 操作。执行成功时,将返回一个结果列表。如果失败,则会引发异常,其中包含部分结果和错误详情,请参阅 KeeperException.getResults()
注意:单次请求中所有 setData 操作的所有数据数组的最大允许大小通常为 1 MB(1,048,576 字节)。服务器通过 jute.maxbuffer 指定了这一限制。大于此值的请求将导致 KeeperException 异常。
void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx) 异步版本。
参数说明:
ops - 迭代器,包含要执行的操作。这些操作应使用 Op 工厂方法创建。
cb - 回调接口
ctx - 上下文数据,用来向回调接口传递扩展数据
下面示例显示了如何批量操作 zookeeper,如一次创建三个节点,然后删除一个节点。代码如下:
package com.hxstrive.zookeeper; import com.alibaba.fastjson.JSONObject; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.util.*; /** * 批量执行 zookeeper 操作 * @author hxstrive.com */ public class MultiNode { private static ZooKeeper zooKeeper; @Before public void init() throws Exception { zooKeeper = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("触发了 " + watchedEvent.getType() + " 事件"); } }); // 如果节点不存在,则创建节点 Stat stat = zooKeeper.exists("/multi_node", false); if(null == stat) { String nodeName = zooKeeper.create("/multi_node", "init value".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("nodeName = " + nodeName); } } /** * 同步 */ @Test public void syncDemo() throws Exception { // 1.批量创建和删除节点 List<Op> opList = new ArrayList<>(); // 创建三个节点 opList.add(Op.create("/multi_node/node1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); opList.add(Op.create("/multi_node/node2", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); opList.add(Op.create("/multi_node/node3", "data3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); // 删除一个节点 opList.add(Op.delete("/multi_node/node3", -1)); List<OpResult> results = zooKeeper.multi(opList); for(OpResult result : results) { System.out.println("结果:" + JSONObject.toJSONString(result)); } // 2.获取 /multi_node 节点的子节点列表 List<String> list = zooKeeper.getChildren("/multi_node", false); System.out.println("/multi_node 节点的子节点有 " + Arrays.toString(list.toArray())); //输出: //触发了 None 事件 //nodeName = /multi_node //结果:{"path":"/multi_node/node1","type":1} //结果:{"path":"/multi_node/node2","type":1} //结果:{"path":"/multi_node/node3","type":1} //结果:{"type":2} ///multi_node 节点的子节点有 [node2, node1] } /** * 异步 */ @Test public void asyncDemo() throws Exception { // 1.批量创建和删除节点 List<Op> opList = new ArrayList<>(); // 创建三个节点 opList.add(Op.create("/multi_node/node1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); opList.add(Op.create("/multi_node/node2", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); opList.add(Op.create("/multi_node/node3", "data3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); // 删除一个节点 opList.add(Op.delete("/multi_node/node3", -1)); zooKeeper.multi(opList, new AsyncCallback.MultiCallback() { @Override public void processResult(int i, String s, Object o, List<OpResult> results) { try { System.out.println("i = " + i); System.out.println("s = " + s); System.out.println("o = " + o); for (OpResult result : results) { System.out.println("结果:" + JSONObject.toJSONString(result)); } // 2.获取 /multi_node 节点的子节点列表 List<String> list = zooKeeper.getChildren("/multi_node", false); System.out.println("/multi_node 节点的子节点有 " + Arrays.toString(list.toArray())); } catch (Exception e) { e.printStackTrace(); } } }, "我是扩展数据"); // 暂停片刻 Thread.sleep(1000); //输出: //触发了 None 事件 //i = -110 //s = null //o = 我是扩展数据 //结果:{"err":-110,"type":-1} //结果:{"err":-2,"type":-1} //结果:{"err":-2,"type":-1} //结果:{"err":-2,"type":-1} ///multi_node 节点的子节点有 [node2, node1] } }