程序员子龙(Java面试 + Java学习) 程序员子龙(Java面试 + Java学习)
首页
学习指南
工具
开源项目
技术书籍

程序员子龙

Java 开发从业者
首页
学习指南
工具
开源项目
技术书籍
  • 基础

  • JVM

  • Spring

  • 并发编程

  • Mybatis

  • 网络编程

  • 数据库

  • 缓存

  • 设计模式

  • 分布式

    • 分布式基础

    • zookeeper

      • zookpeer 入门
      • zookeeper监听机制
        • 原生的监听方式
        • 缓存的监听模式
        • node cache
        • path cache
        • tree cache
      • Zookeeper分布式锁
      • Java整合Zookeeper实战
    • 分布式事务

    • 分布式锁

  • 高并发

  • SpringBoot

  • SpringCloudAlibaba

  • Nginx

  • 面试

  • 生产问题

  • 系统设计

  • 消息中间件

  • Java
  • 分布式
  • zookeeper
程序员子龙
2024-01-29
目录

zookeeper监听机制

# 原生的监听方式

ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。∙

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","hello world".getBytes());

// 注册观察者,当节点变动时触发
byte[] data = client.getData().usingWatcher(new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println("获取 test 节点 监听器 : " + event);
    }
}).forPath("/test");

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","hello".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","world".getBytes());
Thread.sleep(1000);
System.out.println("节点数据: "+ new String(data));
Thread.sleep(10000);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解 为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。

# 缓存的监听模式

Curator引入了Cache来实现对Zookeeper服务端事件监听,Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache分为两类注册类型:节点监听和子节点监听。

Cache的监听模式,可以理解为利用了一个缓存的机制,同时Cache提供了反复注册的功能。简单来说Cache就是在客户端缓存了指定节点的状态,当感知到ZNode的节点变化的时候,就会触发event事件,注册的监听器就会处理这个事件。

Cache的事件有三种类型,Node Cache,Path Cache,Tree Cache。

  • Node Cache是用于ZNode节点的监听
  • Path Cache是用于ZNode子节点的监听
  • Tree Cache 相当于前两者的组合。

# node cache

//path 监听路径
public NodeCache(CuratorFramework client,String path)
1
2

通过注册监听器来实现,对当前节点数据变化的处理


public void addListener(NodeCacheListener listener)

1
2
3

代码示例:

     String nodePath = "/watch";

        //1.初始化一个nodeCache
        NodeCache nodeCache = new NodeCache(curatorFramework, nodePath, false);

        //2.初始化一个NodeCacheListener
        NodeCacheListener nodeCacheListener = new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData childData = nodeCache.getCurrentData();
                System.out.println("ZNode 节点的状态变化,path=" + childData.getPath());
                System.out.println("ZNode 节点的状态变化,data=" + new String(childData.getData(), "utf-8"));
                System.out.println("ZNode 节点的状态变化,stat=" + childData.getStat());
            }
        };

        nodeCache.getListenable().addListener(nodeCacheListener);
        /**
         * 唯一的一个参数buildInitial代表着是否将该节点的数据立即进行缓存。
         * 如果设置为true的话,在start启动时立即调用NodeCache的getCurrentData方法就能够得到对应节点的信息ChildData类,
         * 如果设置为false的就得不到对应的信息。
         *
         * 使用NodeCache来监听节点的事件
         */
        nodeCache.start();//start

        curatorFramework.create().forPath(nodePath);
        // 第1次变更节点数据
        curatorFramework.setData().forPath(nodePath, "hello".getBytes());
        Thread.sleep(1000);

        // 第2次变更节点数据
        curatorFramework.setData().forPath(nodePath, "world".getBytes());

        Thread.sleep(1000);

        // 第3次变更节点数据
        curatorFramework.setData().forPath(nodePath, "hello world".getBytes());
        Thread.sleep(1000);

        //目的是阻塞客户端关闭
        System.in.read();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

ZNode 节点的状态变化,path=/watch ZNode 节点的状态变化,data=hello ZNode 节点的状态变化,stat=415,416,1657780730498,1657780730505,1,0,0,0,5,0,415

ZNode 节点的状态变化,path=/watch ZNode 节点的状态变化,data=world ZNode 节点的状态变化,stat=415,417,1657780730498,1657780731524,2,0,0,0,5,0,415

ZNode 节点的状态变化,path=/watch ZNode 节点的状态变化,data=hello world ZNode 节点的状态变化,stat=415,418,1657780730498,1657780732534,3,0,0,0,11,0,415

可以看到,这里多次修改数据,能够实现多次监听。

# path cache

PathChildrenCache会对子节点进行监听,但是不会对二级子节点进行监听。

public PathChildrenCache(CuratorFramework client,String path,boolean cacheData)
1

通过注册监听器来实现,对当前节点的子节点数据变化的处理

public void addListener(PathChildrenCacheListener listener)
1

代码示例:


        String nodePath = "/watchNode";

        curatorFramework.create().forPath(nodePath);

        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, nodePath, true);
        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                ChildData childData = pathChildrenCacheEvent.getData();
                switch (pathChildrenCacheEvent.getType()) {
                    case CHILD_ADDED:
                        System.out.println("子节点增加,path=" + childData.getPath() + ",data=" + new String(childData.getData(), "utf-8"));
                        break;
                    case CHILD_UPDATED:
                        System.out.println("子节点更新,path=" + childData.getPath() + ",data=" + new String(childData.getData(), "utf-8"));
                        break;
                    case CHILD_REMOVED:
                        System.out.println("子节点删除,path=" + childData.getPath() + ",data=" + new String(childData.getData(), "utf-8"));
                        break;
                    default:
                        break;
                }
            }
        };

        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        Thread.sleep(1000);
        for (int i = 0; i < 3; i++) {
            String childPath = nodePath + "/" + i;
            byte[] data = childPath.getBytes();
            curatorFramework.create().forPath(childPath, data);
        }

        //主线程睡眠,然后再删除子节点,这里依旧会触发子节点变更事件
        Thread.sleep(5000);
        for (int i = 0; i < 3; i++) {
            String childPath = nodePath + "/" + i;
            curatorFramework.delete().forPath(childPath);
        }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

子节点增加,path=/watchNode/0,data=/watchNode/0 子节点增加,path=/watchNode/1,data=/watchNode/1 子节点增加,path=/watchNode/2,data=/watchNode/2 子节点删除,path=/watchNode/0,data=/watchNode/0 子节点删除,path=/watchNode/1,data=/watchNode/1 子节点删除,path=/watchNode/2,data=/watchNode/2

# tree cache

TreeCache使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache可以监听当前节点下所有节点的事件。

public TreeCache(CuratorFramework client,String path,boolean cacheData)
1

通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理。

public void addListener(TreeCacheListener listener)
1

代码示例:

String nodePath = "/watch";

TreeCache treeCache = new TreeCache(curatorFramework, nodePath);
TreeCacheListener treeCacheListener = new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
        ChildData data = treeCacheEvent.getData();
        switch (treeCacheEvent.getType()) {
            case NODE_ADDED:
                System.out.println("[TreeNode]节点增加,path="+data.getPath()+",data="+new String(data.getData(),"utf-8"));
                break;
            case NODE_UPDATED:
                System.out.println("[TreeNode]节点更新,path="+data.getPath()+",data="+new String(data.getData(),"utf-8"));
                break;
            case NODE_REMOVED:
                System.out.println("[TreeNode]节点删除,path="+data.getPath()+",data="+new String(data.getData(),"utf-8"));
                break;
            default:
                break;
        }
    }
};

treeCache.getListenable().addListener(treeCacheListener);

treeCache.start();
Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
    String childPath = nodePath + "/" + i;
    byte[] data = childPath.getBytes();
    curatorFramework.create().creatingParentsIfNeeded().forPath(childPath, data);
}

Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
    String childPath = nodePath + "/" + i;
    curatorFramework.delete().forPath(childPath);
}

curatorFramework.setData().forPath(nodePath,"update parent".getBytes());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

[TreeNode]节点增加,path=/watch,data=update parent [TreeNode]节点增加,path=/watch/0,data=/watch/0 [TreeNode]节点增加,path=/watch/1,data=/watch/1 [TreeNode]节点增加,path=/watch/2,data=/watch/2 [TreeNode]节点删除,path=/watch/0,data=/watch/0 [TreeNode]节点删除,path=/watch/1,data=/watch/1 [TreeNode]节点删除,path=/watch/2,data=/watch/2 [TreeNode]节点更新,path=/watch,data=update parent

上次更新: 2024/01/30, 15:08:57
zookpeer 入门
Zookeeper分布式锁

← zookpeer 入门 Zookeeper分布式锁→

最近更新
01
一个注解,优雅的实现接口幂等性
11-17
02
MySQL事务(超详细!!!)
10-14
03
阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
10-09
更多文章>
Theme by Vdoing | Copyright © 2024-2024

    辽ICP备2023001503号-2

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式