ZooKeeper Watcher 机制源码解释(转)

2018/03/13 10:09:18 No Comments

本文转自 https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/index.html

ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

ZooKeeper 的 Watcher 机制主要包括客户端线程、客户端 WatchManager 和 ZooKeeper 服务器三部分。在具体工作流程上,简单地讲,客户端在向 ZooKeeper 服务器注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManager 中。当 ZooKeeper 服务器端触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。

    private final HashMap<String, HashSet<Watcher>> watchTable =new HashMap<String, HashSet<Watcher>>();

    public synchronized void addWatch(String path, Watcher watcher) {
        HashSet<Watcher> list = watchTable.get(path);
        if(list == null) {
            list = new HashSet<Watcher>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);

        HashSet<String> paths = watch2Paths.get(watcher);
        if(paths == null) {
            // cnxns typically have many watches, so use default cap here
            paths = new HashSet<String>();
            watch2Paths.put(watcher, paths);
        }
        paths.add(path);
    }

Watcher 接口

Watcher 的理念是启动一个客户端去接收从 ZooKeeper 服务端发过来的消息并且同步地处理这些信息。ZooKeeper 的 Java API 提供了公共接口 Watcher,具体操作类通过实现这个接口相关的方法来实现从所连接的 ZooKeeper 服务端接收数据。如果要处理这个消息,需要为客户端注册一个 CallBack(回调)对象。Watcher 接口定义在 org.apache.zookeeper 包里面,代码如下所示

    public interface Watcher {
        abstract public void process(WatchedEvent event);
    }

在 Watcher 接口里面,除了回调函数 process 以外,还包含 KeeperState 和 EventType 两个枚举类,分别代表了通知状态和事件类型,如下所示:

Watcher通知状态和事件类型表

process 方法是 Watcher 接口中的一个回调方法,当 ZooKeeper 向客户端发送一个 Watcher 事件通知时,客户端就会对相应的 process 方法进行回调,从而实现对事件的处理。

process 方法包含 WatcherEvent 类型的参数,WatchedEvent 包含了每一个事件的三个基本属性:通知状态(KeeperState)、事件类型(EventType)和节点路径(Path),ZooKeeper 使用 WatchedEvent 对象来封装服务端事件并传递给 Watcher,从而方便回调方法 process 对服务端事件进行处理。

客户端注册 Watcher 流程

客户端的请求基本都是在 ClientCnxn 里面进行操作,当收到请求后,客户端会对当前客户端请求进行标记,将其设置为使用 Watcher 监听,同时会封装一个 Watcher 的注册信息 WatchRegistration 对象,用于暂时保存数据节点的路径和 Watcher 的对应关系。

在 ZooKeeper 中,Packet 是一个最小的通信协议单元,即数据包。Pakcet 用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。在 ClientCnxn 中 WatchRegistration 也会被封装到 Pakcet 中,然后由 SendThread 线程调用 queuePacke 方法把 Packet 放入发送队列中等待客户端发送,这又是一个异步过程,分布式系统采用异步通信是一个普遍认同的观念。随后,SendThread 线程会通过 readResponse 方法接收来自服务端的响应,异步地调用 finishPacket 方法从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager 中去,如下所示:

    private void finishPacket(Packet p) {
        if(p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if(p.cb == null) {
            synchronized(p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }

除了上面介绍的方式以外,ZooKeeper 客户端也可以通过 getData、getChildren 和 exist 三个接口来向 ZooKeeper 服务器注册 Watcher,无论使用哪种方式,注册 Watcher 的工作原理是一致的。如下所示,getChildren 方法调用了 WatchManager 类的 addWatch 方法添加了 watcher 事件。

    public ArrayList<String> getChildren(String path, Stat stat,
                                         Watcher watcher) throws KeeperException.NoNodeException {
        DataNodeV1 n = nodes.get(path);
        if(n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized(n) {
            ArrayList<String> children = new ArrayList<String>();
            children.addAll(n.children);
            if(watcher != null) {
                childWatches.addWatch(path, watcher);
            }
            return children;
        }
    }

如上所示,现在需要从这个封装对象中再次提取出 Watcher 对象来,在 register 方法里面,客户端将 Watcher 对象转交给 ZKWatchManager,并最终保存在一个 Map 类型的数据结构 dataWatches 里面,用于将数据节点的路径和 Watcher 对象进行一一映射后管理起来。

服务端处理 Watcher 流程

如下所示是服务端处理 Watcher 的一个完整序列图。

对于注册 Watcher 请求,FinalRequestProcessor 的 ProcessRequest 方法会判断当前请求是否需要注册 Watcher,如果为 true,就会将当前的 ServerCnxn 对象和数据节点路径传入 getData 方法中去。ServerCnxn 是一个 ZooKeeper 客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接,我们后面讲到的 process 回调方法,实际上也是从这里回调的,所以可以把 ServerCnxn 看作是一个 Watcher 对象。数据节点的节点路径和 ServerCnxn 最终会被存储在 WatchManager 的 watchTable 和 watch2Paths 中。

WatchManager 负责 Watcher 事件的触发,它是一个统称,在服务端 DataTree 会托管两个 WatchManager,分别是 dataWatches 和 childWatches,分别对应数据变更 Watcher 和子节点变更 Watcher。

当发生 Create、Delete、NodeChange(数据变更)这样的事件后,DataTree 会调用相应方法去触发 WatchManager 的 triggerWatch 方法,该方法返回 ZNODE 的信息,自此进入到回调本地 process 的序列。

    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
//将事件类型(EventType)、通知状态(WatchedEvent)、节点路径封装成一个 WatchedEvent 对象
        HashSet<Watcher> watchers;
        synchronized(this) {
//根据数据节点的节点路径从 watchTable 里面取出对应的 Watcher。如果没有找到 Watcher 对象,
            说明没有任何客户端在该数据节点上注册过 Watcher,直接退出。如果找打了 Watcher 就将其提取出来,
            同时会直接从 watchTable 和 watch2Paths 里删除 Watcher,即 Watcher 是一次性的,触发一次就失效了。
            watchers = watchTable.remove(path);
            for(Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
            }
        }
        for(Watcher w : watchers) {
            if(supress != null && supress.contains(w)) {
                continue;
            }
//对于需要注册 Watcher 的请求,ZooKeeper 会把请求对应的恶 ServerCnxn 作为一个 Watcher 存储,
            所以这里调用的 process 方法实质上是 ServerCnxn 的对应方法
            w.process(e);
        }
        return watchers;
    }

从上面的代码我们可以总结出,如果想要处理一个 Watcher,需要执行的步骤如下所示:

1. 将事件类型(EventType)、通知状态(WatchedEvent)、节点路径封装成一个 WatchedEvent 对象。

2. 根据数据节点的节点路径从 watchTable 里面取出对应的 Watcher。如果没有找到 Watcher 对象,说明没有任何客户端在该数据节点上注册过 Watcher,直接退出。如果找到了 Watcher 就将其提取出来,同时会直接从 watchTable 和 watch2Paths 里删除 Watcher,即 Watcher 是一次性的,触发一次就失效了。

3. 对于需要注册 Watcher 的请求,ZooKeeper 会把请求对应的 ServerCnxn 作为一个 Watcher 存储,所以这里调用的 process 方法实质上是 ServerCnxn 的对应方法,如清单 24 所示,在请求头标记“-1”表示当前是一个通知,将 WatchedEvent 包装成 WatcherEvent 用于网络传输序列化,向客户端发送通知,真正的回调方法在客户端,就是如下的 process() 方法。

    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if(LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                    "Deliver event " + event + " to 0x"
                            + Long.toHexString(this.sessionId)
                            + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        sendResponse(h, e, "notification");
    }

客户端收到消息后,会调用 ClientCnxn 的 SendThread.readResponse 方法来进行统一处理,如清单所示。如果响应头 replyHdr 中标识的 Xid 为 02,表示是 ping,如果为-4,表示是验证包,如果是-1,表示这是一个通知类型的响应,然后进行反序列化、处理 chrootPath、还原 WatchedEvent、回调 Watcher 等步骤,其中回调 Watcher 步骤将 WacthedEvent 对象交给 EventThread 线程,在下一个轮询周期中进行 Watcher 回调

SendThread 接收到服务端的通知事件后,会通过调用 EventThread 类的 queueEvent 方法将事件传给 EventThread 线程,queueEvent 方法根据该通知事件,从 ZKWatchManager 中取出所有相关的 Watcher,如下所示:

    class EventThread extends ZooKeeperThread {
        public void queueEvent(WatchedEvent event) {
            if(event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();

            // materialize the watchers based on the event
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                    event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }

客户端在识别出事件类型 EventType 之后,会从相应的 Watcher 存储中删除对应的 Watcher,获取到相关的 Watcher 之后,会将其放入 waitingEvents 队列,该队列从字面上就能理解是一个待处理队列,线程的 run 方法会不断对该该队列进行处理,这就是一种异步处理思维的实现。

    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath) {
        Set<Watcher> result = new HashSet<Watcher>();
        case NodeCreated:
        synchronized(dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        synchronized(existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        break;
    }

Watcher 特性总结

1. 注册只能确保一次消费
无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,开发人员在 Watcher 的使用上要记住的一点是需要反复注册。这样的设计有效地减轻了服务端的压力。如果注册一个 Watcher 之后一直有效,那么针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。

2. 客户端串行执行
客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调。

3. 轻量级设计
WatchedEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分的内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对 NodeDataChanged 事件,ZooKeeper 的 Watcher 只会通知客户指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据,这也是 ZooKeeper 的 Watcher 机制的一个非常重要的特性。另外,客户端向服务端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递到服务端,仅仅只是在客户端请求中使用 boolean 类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的 ServerCnxn 对象。这样轻量级的 Watcher 机制设计,在网络开销和服务端内存开销上都是非常廉价的。

转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/java-programe/201803130001.html

相关文章:

留下足迹