前面介绍使用 ZooKeeper 类进行创建、删除、修改数据均是一次仅仅对 zookeeper 操作一次,下面将介绍通过 multi() 方法一次提交多个操作到 zookeeper 服务。
List<org.apache.zookeeper.OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) 执行多个或全部 ZooKeeper 操作。执行成功时,将返回一个结果列表。如果失败,则会引发异常,其中包含部分结果和错误详情,请参阅 KeeperException.getResults()
注意:单次请求中所有 setData 操作的所有数据数组的最大允许大小通常为 1 MB(1,048,576 字节)。服务器通过 jute.maxbuffer 指定了这一限制。大于此值的请求将导致 KeeperException 异常。
void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx) 异步版本。
参数说明:
ops - 迭代器,包含要执行的操作。这些操作应使用 Op 工厂方法创建。
cb - 回调接口
ctx - 上下文数据,用来向回调接口传递扩展数据
下面示例显示了如何批量操作 zookeeper,如一次创建三个节点,然后删除一个节点。代码如下:
package com.hxstrive.zookeeper;
import com.alibaba.fastjson.JSONObject;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
/**
* 批量执行 zookeeper 操作
* @author hxstrive.com
*/
public class MultiNode {
private static ZooKeeper zooKeeper;
@Before
public void init() throws Exception {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println("触发了 " + watchedEvent.getType() + " 事件");
}
});
// 如果节点不存在,则创建节点
Stat stat = zooKeeper.exists("/multi_node", false);
if(null == stat) {
String nodeName = zooKeeper.create("/multi_node", "init value".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("nodeName = " + nodeName);
}
}
/**
* 同步
*/
@Test
public void syncDemo() throws Exception {
// 1.批量创建和删除节点
List<Op> opList = new ArrayList<>();
// 创建三个节点
opList.add(Op.create("/multi_node/node1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/multi_node/node2", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/multi_node/node3", "data3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
// 删除一个节点
opList.add(Op.delete("/multi_node/node3", -1));
List<OpResult> results = zooKeeper.multi(opList);
for(OpResult result : results) {
System.out.println("结果:" + JSONObject.toJSONString(result));
}
// 2.获取 /multi_node 节点的子节点列表
List<String> list = zooKeeper.getChildren("/multi_node", false);
System.out.println("/multi_node 节点的子节点有 " + Arrays.toString(list.toArray()));
//输出:
//触发了 None 事件
//nodeName = /multi_node
//结果:{"path":"/multi_node/node1","type":1}
//结果:{"path":"/multi_node/node2","type":1}
//结果:{"path":"/multi_node/node3","type":1}
//结果:{"type":2}
///multi_node 节点的子节点有 [node2, node1]
}
/**
* 异步
*/
@Test
public void asyncDemo() throws Exception {
// 1.批量创建和删除节点
List<Op> opList = new ArrayList<>();
// 创建三个节点
opList.add(Op.create("/multi_node/node1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/multi_node/node2", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/multi_node/node3", "data3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
// 删除一个节点
opList.add(Op.delete("/multi_node/node3", -1));
zooKeeper.multi(opList, new AsyncCallback.MultiCallback() {
@Override
public void processResult(int i, String s, Object o, List<OpResult> results) {
try {
System.out.println("i = " + i);
System.out.println("s = " + s);
System.out.println("o = " + o);
for (OpResult result : results) {
System.out.println("结果:" + JSONObject.toJSONString(result));
}
// 2.获取 /multi_node 节点的子节点列表
List<String> list = zooKeeper.getChildren("/multi_node", false);
System.out.println("/multi_node 节点的子节点有 " + Arrays.toString(list.toArray()));
} catch (Exception e) {
e.printStackTrace();
}
}
}, "我是扩展数据");
// 暂停片刻
Thread.sleep(1000);
//输出:
//触发了 None 事件
//i = -110
//s = null
//o = 我是扩展数据
//结果:{"err":-110,"type":-1}
//结果:{"err":-2,"type":-1}
//结果:{"err":-2,"type":-1}
//结果:{"err":-2,"type":-1}
///multi_node 节点的子节点有 [node2, node1]
}
}