Dubbo ZK订阅流程

ZK订阅流程

由RegistryProtocol开始,再由ZookeeperREgistry完成最后的订阅。

RegistryProtocol

RegistryProtocol 实现Protocol接口

@SPI("dubbo")
public interface Protocol {

    /** * 获取缺省端口,当用户没有配置端口时使用。 * * @return 缺省端口 */
    int getDefaultPort();

    /** * 暴露远程服务:<br> * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br> * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br> * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br> * * @param <T> 服务的类型 * @param invoker 服务的执行体 * @return exporter 暴露服务的引用,用于取消暴露 * @throws RpcException 当暴露服务出错时抛出,比如端口已占用 */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /** * 引用远程服务:<br> * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br> * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br> * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br> * * @param <T> 服务的类型 * @param type 服务的类型 * @param url 远程服务的URL地址 * @return invoker 服务的本地代理 * @throws RpcException 当连接服务提供方失败时抛出 */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /** * 释放协议:<br> * 1. 取消该协议所有已经暴露和引用的服务。<br> * 2. 释放协议所占用的所有资源,比如连接和端口。<br> * 3. 协议在释放后,依然能暴露和引用新的服务。<br> */
    void destroy();

}

RegistryProtocal的暴露方法代码如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    registry.register(registedProviderUrl);
    // 订阅override数据
    // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //保证每次export都返回一个新的exporter实例
    return new Exporter<T>() {
        public Invoker<T> getInvoker() {
            return exporter.getInvoker();
        }
        public void unexport() {
           try {
              exporter.unexport();
           } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
            try {
               registry.unregister(registedProviderUrl);
            } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
            try {
               overrideListeners.remove(overrideSubscribeUrl);
               registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
            } catch (Throwable t) {
               logger.warn(t.getMessage(), t);
            }
        }
    };
}

最核心的订阅方法如下:

registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

该方法实现RegistryService。

/** * 订阅符合条件的已注册数据,当有注册数据变更时自动推送. * * 订阅需处理契约:<br> * 1. 当URL设置了check=false时,订阅失败后不报错,在后台定时重试。<br> * 2. 当URL设置了category=routers,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。<br> * 3. 允许以interface,group,version,classifier作为条件查询,如:interface=com.alibaba.foo.BarService&version=1.0.0<br> * 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*<br> * 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。<br> * 6. 允许URI相同但参数不同的URL并存,不能覆盖。<br> * 7. 必须阻塞订阅过程,等第一次通知完后再返回。<br> * * @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @param listener 变更事件监听器,不允许为空 */
void subscribe(URL url, NotifyListener listener);

在 subscribe方法调用时,进入FailbackRegistry方法中。在该类中,会订阅对应的消息,同时对失败的消息进行定时重试。

@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // 向服务器端发送订阅请求
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (urls != null && urls.size() > 0) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // 如果开启了启动时检测,则直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if(skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // 将失败的订阅请求记录到失败列表,定时重试
        addFailedSubscribed(url, listener);
    }
}

super.subscribe方法,调用父类,在父类中对参数检查和增加对应的监听器。

public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()){
        logger.info("Subscribe: " + url);
    }
    Set<NotifyListener> listeners = subscribed.get(url);
    if (listeners == null) {
        subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
        listeners = subscribed.get(url);
    }
    listeners.add(listener);
}

在下一步的doSubscribe方法中,实现订阅。
订阅分为两种,一种订阅所有的,使用*通配符,第二种指定了Service。以下代码为第二种订阅方式:

List<URL> urls = new ArrayList<URL>();
    for (String path : toCategoriesPath(url)) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners == null) {
            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
            listeners = zkListeners.get(url);
        }
        ChildListener zkListener = listeners.get(listener);
        if (zkListener == null) {
            listeners.putIfAbsent(listener, new ChildListener() {
                public void childChanged(String parentPath, List<String> currentChilds) {
                   ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                }
            });
            zkListener = listeners.get(listener);
        }
        zkClient.create(path, false);
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
           urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    notify(url, listener, urls);
}

最终增加监听器,同时创建zk目录。

ZookeeperRegistry

ZookeeperRegistry中,对服务进行订阅。

for (String path : toCategoriesPath(url)) {
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners == null) {
        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
        listeners = zkListeners.get(url);
    }
    ChildListener zkListener = listeners.get(listener);
    if (zkListener == null) {
        listeners.putIfAbsent(listener, new ChildListener() {
            public void childChanged(String parentPath, List<String> currentChilds) {
               ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
            }
        });
        zkListener = listeners.get(listener);
    }
    zkClient.create(path, false);
    List<String> children = zkClient.addChildListener(path, zkListener);
    if (children != null) {
       urls.addAll(toUrlsWithEmpty(url, path, children));
    }
}
notify(url, listener, urls);

分别获取对应providers,configurators,routers三个目录,分别订阅。

if (zkListener == null) {
    listeners.putIfAbsent(listener, new ChildListener() {
        public void childChanged(String parentPath, List<String> currentChilds) {
           ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
        }
    });

最终执行notify方法。

protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
   super.notify(url, listener, urls);
}

notify由父类AbstractRegistry方法完成。

for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
    String category = entry.getKey();
    List<URL> categoryList = entry.getValue();
    categoryNotified.put(category, categoryList);
    saveProperties(url);
    listener.notify(categoryList);
}
private void saveProperties(URL url) {
    if (file == null) {
        return;
    }

    try {
        StringBuilder buf = new StringBuilder();
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified != null) {
            for (List<URL> us : categoryNotified.values()) {
                for (URL u : us) {
                    if (buf.length() > 0) {
                        buf.append(URL_SEPARATOR);
                    }
                    buf.append(u.toFullString());
                }
            }
        }
        properties.setProperty(url.getServiceKey(), buf.toString());
        long version = lastCacheChanged.incrementAndGet();
        if (syncSaveFile) {
            doSaveProperties(version);
        } else {
            registryCacheExecutor.execute(new SaveProperties(version));
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

最终通过线程池来完成更新文件。

registryCacheExecutor.execute(new SaveProperties(version));
相关文章
相关标签/搜索