Zookeeper Java 批量操作

前面介绍使用 ZooKeeper 类进行创建、删除、修改数据均是一次仅仅对 zookeeper 操作一次,下面将介绍通过 multi() 方法一次提交多个操作到 zookeeper 服务。

方法定义

List<org.apache.zookeeper.OpResult> multi(Iterable<org.apache.zookeeper.Op> ops)  执行多个或全部 ZooKeeper 操作。执行成功时,将返回一个结果列表。如果失败,则会引发异常,其中包含部分结果和错误详情,请参阅 KeeperException.getResults()

注意:单次请求中所有 setData 操作的所有数据数组的最大允许大小通常为 1 MB(1,048,576 字节)。服务器通过 jute.maxbuffer 指定了这一限制。大于此值的请求将导致 KeeperException 异常。

void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx)  异步版本。

参数说明:

  • ops - 迭代器,包含要执行的操作。这些操作应使用 Op 工厂方法创建。

  • cb - 回调接口

  • ctx - 上下文数据,用来向回调接口传递扩展数据

完整示例

下面示例显示了如何批量操作 zookeeper,如一次创建三个节点,然后删除一个节点。代码如下:

package com.hxstrive.zookeeper;

import com.alibaba.fastjson.JSONObject;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.util.*;

/**
* 批量执行 zookeeper 操作
* @author hxstrive.com
*/
public class MultiNode {
   private static ZooKeeper zooKeeper;

   @Before
   public void init() throws Exception {
       zooKeeper = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() {
           public void process(WatchedEvent watchedEvent) {
               System.out.println("触发了 " + watchedEvent.getType() + " 事件");
           }
       });

       // 如果节点不存在,则创建节点
       Stat stat = zooKeeper.exists("/multi_node", false);
       if(null == stat) {
           String nodeName = zooKeeper.create("/multi_node", "init value".getBytes(),
                   ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
           System.out.println("nodeName = " + nodeName);
       }
   }

   /**
    * 同步
    */
   @Test
   public void syncDemo() throws Exception {
       // 1.批量创建和删除节点
       List<Op> opList = new ArrayList<>();
       // 创建三个节点
       opList.add(Op.create("/multi_node/node1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
       opList.add(Op.create("/multi_node/node2", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
       opList.add(Op.create("/multi_node/node3", "data3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
       // 删除一个节点
       opList.add(Op.delete("/multi_node/node3", -1));
       List<OpResult> results = zooKeeper.multi(opList);
       for(OpResult result : results) {
           System.out.println("结果:" + JSONObject.toJSONString(result));
       }

       // 2.获取 /multi_node 节点的子节点列表
       List<String> list = zooKeeper.getChildren("/multi_node", false);
       System.out.println("/multi_node 节点的子节点有 " + Arrays.toString(list.toArray()));

       //输出:
       //触发了 None 事件
       //nodeName = /multi_node
       //结果:{"path":"/multi_node/node1","type":1}
       //结果:{"path":"/multi_node/node2","type":1}
       //结果:{"path":"/multi_node/node3","type":1}
       //结果:{"type":2}
       ///multi_node 节点的子节点有 [node2, node1]
   }

   /**
    * 异步
    */
   @Test
   public void asyncDemo() throws Exception {
       // 1.批量创建和删除节点
       List<Op> opList = new ArrayList<>();
       // 创建三个节点
       opList.add(Op.create("/multi_node/node1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
       opList.add(Op.create("/multi_node/node2", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
       opList.add(Op.create("/multi_node/node3", "data3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
       // 删除一个节点
       opList.add(Op.delete("/multi_node/node3", -1));
       zooKeeper.multi(opList, new AsyncCallback.MultiCallback() {
           @Override
           public void processResult(int i, String s, Object o, List<OpResult> results) {
               try {
                   System.out.println("i = " + i);
                   System.out.println("s = " + s);
                   System.out.println("o = " + o);
                   for (OpResult result : results) {
                       System.out.println("结果:" + JSONObject.toJSONString(result));
                   }

                   // 2.获取 /multi_node 节点的子节点列表
                   List<String> list = zooKeeper.getChildren("/multi_node", false);
                   System.out.println("/multi_node 节点的子节点有 " + Arrays.toString(list.toArray()));
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       }, "我是扩展数据");

       // 暂停片刻
       Thread.sleep(1000);

       //输出:
       //触发了 None 事件
       //i = -110
       //s = null
       //o = 我是扩展数据
       //结果:{"err":-110,"type":-1}
       //结果:{"err":-2,"type":-1}
       //结果:{"err":-2,"type":-1}
       //结果:{"err":-2,"type":-1}
       ///multi_node 节点的子节点有 [node2, node1]
   }

}

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号