Dubbo Source Code Analysis (10): Dubbo Registry, Using Zookeeper as an Example

The default implementation is DubboRegistryFactory, as declared by the @SPI("dubbo") annotation:

@SPI("dubbo")
public interface RegistryFactory {

    /**
     * Connect to the registry
     * <p>
     * Connecting the registry needs to support the contract: <br>
     * 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
     * 2. Support username:password authority authentication on URL.<br>
     * 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
     * 4. Support file=registry.cache local disk file cache.<br>
     * 5. Support the timeout=1000 request timeout setting.<br>
     * 6. Support session=60000 session timeout or expiration settings.<br>
     *
     * @param url Registry address, is not allowed to be empty
     * @return Registry reference, never return empty value
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

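This interface is simple: it returns the registry that services are published to, and the concrete implementation is selected by the protocol of the URL passed in. So where exactly is this interface called?

It is called when RegistryProtocol exports a service, so the question has more or less answered itself. That's the art of asking questions...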

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // Export the service locally, then wrap the exporter
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        // Get the registry address.
        // It comes from the service's registry configuration, for example:
        //<dubbo:registry id="asRegistry" client="zkclient" protocol="zookeeper" register="true" address="zookeeper:2181" file="D:/dubbo/user/dubbo.registry"/>
        //<dubbo:protocol name="dubbo" port="20880" dispatcher="direct" threadpool="fixed" threads="2"/>
        //<dubbo:service connections="10" interface="com.wuhulala.dubbo.user.service.UserService" ref="demoService" registry="asRegistry"/>

        URL registryUrl = getRegistryUrl(originInvoker);

        // Get the registry for the registry URL
        final Registry registry = getRegistry(originInvoker);

        // Work out the provider URL that should be registered with the registry
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

        // Check whether registration is needed (why is this check not done at the very top?)
        boolean register = registedProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

        if (register) {
            // Registration logic
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }
        // Omitted ...

    }

    public void register(URL registryUrl, URL registedProviderUrl) {
        // At initialization, registryFactory is injected with an adaptive extension obtained via
        // ExtensionLoader.getExtensionLoader(type).getAdaptiveExtension(),
        // so at publish time the matching extension is looked up adaptively and used to register with the registry
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }
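
To make the adaptive lookup concrete, here is a minimal sketch of the idea: an adaptive factory holds no registry logic of its own and only delegates to the extension named by the URL's protocol, falling back to the default name "dubbo". Every class and method name below (AdaptiveDispatchSketch, AdaptiveRegistryFactory, addExtension, newFactory) is a hypothetical stand-in, and URLs are plain strings; this is not Dubbo's ExtensionLoader or the adaptive class it generates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT Dubbo's real classes.
class AdaptiveDispatchSketch {

    interface Registry {
        String name();
    }

    interface RegistryFactory {
        Registry getRegistry(String url);
    }

    // Plays the role of the adaptive extension: it only picks a concrete
    // factory by the protocol part of the URL and delegates to it.
    static class AdaptiveRegistryFactory implements RegistryFactory {
        private final Map<String, RegistryFactory> extensions = new HashMap<>();
        private final String defaultName;

        AdaptiveRegistryFactory(String defaultName) {
            this.defaultName = defaultName;
        }

        void addExtension(String name, RegistryFactory factory) {
            extensions.put(name, factory);
        }

        @Override
        public Registry getRegistry(String url) {
            // Use the text before "://" as the extension name, else the default.
            int idx = url.indexOf("://");
            String protocol = idx > 0 ? url.substring(0, idx) : defaultName;
            RegistryFactory factory = extensions.get(protocol);
            if (factory == null) {
                throw new IllegalStateException("No RegistryFactory extension named " + protocol);
            }
            return factory.getRegistry(url);
        }
    }

    // Builds an adaptive factory with two illustrative extensions.
    static AdaptiveRegistryFactory newFactory() {
        AdaptiveRegistryFactory adaptive = new AdaptiveRegistryFactory("dubbo");
        adaptive.addExtension("dubbo", url -> () -> "DubboRegistry");
        adaptive.addExtension("zookeeper", url -> () -> "ZookeeperRegistry");
        return adaptive;
    }

    public static void main(String[] args) {
        RegistryFactory factory = newFactory();
        System.out.println(factory.getRegistry("zookeeper://127.0.0.1:2181").name());
        System.out.println(factory.getRegistry("127.0.0.1:9090").name());
    }
}
```

With a zookeeper:// address the call lands in the Zookeeper factory; an address without a protocol falls back to the default extension.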

The rest of this article uses Zookeeper as the example.
AbstractRegistryFactory

public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceString();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
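
The pattern in getRegistry above is a lock-guarded cache: one registry instance per service-string key, created on first request. A minimal, self-contained sketch of that pattern follows. RegistryCacheSketch and its creator function are hypothetical names, and a plain string key stands in for url.toServiceString(); only the lock/lookup/create/store sequence mirrors the source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch of the lock-guarded cache used by getRegistry above.
class RegistryCacheSketch<R> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, R> registries = new HashMap<>();
    private final Function<String, R> creator; // stands in for createRegistry(url)

    RegistryCacheSketch(Function<String, R> creator) {
        this.creator = creator;
    }

    R getRegistry(String key) {
        // Serialize access so that each key is created at most once.
        lock.lock();
        try {
            R registry = registries.get(key);
            if (registry != null) {
                return registry;
            }
            registry = creator.apply(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            registries.put(key, registry);
            return registry;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        RegistryCacheSketch<Object> cache = new RegistryCacheSketch<>(key -> {
            created.incrementAndGet();
            return new Object();
        });
        Object first = cache.getRegistry("zookeeper://127.0.0.1:2181");
        Object second = cache.getRegistry("zookeeper://127.0.0.1:2181");
        System.out.println(first == second); // same cached instance
        System.out.println(created.get());   // creator ran once
    }
}
```

Because the whole lookup-and-create sequence runs under the lock, two threads asking for the same key cannot both create a registry, at the cost of serializing all lookups.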

ZookeeperRegistryFactory

public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

ZookeeperRegistry

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }

Registering a node

protected void doRegister(URL url) {
        try {
            // The second argument indicates whether the node is ephemeral
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

Node directory structure

private String toUrlPath(URL url) {
        // category path / encoded URL
        return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
    }
private String toCategoryPath(URL url) {
        return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
    }
private String toServicePath(URL url) {
        String name = url.getServiceInterface();
        if (Constants.ANY_VALUE.equals(name)) {
            return toRootPath();
        }
        return toRootDir() + URL.encode(name);
    }
private String toRootDir() {
        if (root.equals(Constants.PATH_SEPARATOR)) {
            return root;
        }
        return root + Constants.PATH_SEPARATOR;
    }
// Recall from the ZookeeperRegistry constructor how root is derived from the group parameter:
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;

So the path of a registered node is /group{default: dubbo}/serviceName/category{default: providers}/encoded URL

Overall, the registry part is not particularly complicated.
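
As a worked example of that layout, the sketch below rebuilds the path with plain strings. ZkPathSketch, its parameters, and the sample provider URL are hypothetical; java.net.URLEncoder with UTF-8 is assumed as the stand-in for URL.encode, and the Charset overload used here requires Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the node path layout; not the real ZookeeperRegistry code.
class ZkPathSketch {

    static String toUrlPath(String group, String serviceInterface, String category, String fullUrl) {
        // root: the group with a leading "/" (as in the constructor)
        String root = group.startsWith("/") ? group : "/" + group;
        // rootDir: root with a trailing "/", unless root is already "/"
        String rootDir = root.equals("/") ? root : root + "/";
        String servicePath = rootDir + encode(serviceInterface);
        String categoryPath = servicePath + "/" + category;
        return categoryPath + "/" + encode(fullUrl);
    }

    // Assumed stand-in for URL.encode: URL-encode the value as UTF-8.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String path = toUrlPath(
                "dubbo",
                "com.example.UserService",
                "providers",
                "dubbo://10.0.0.1:20880/com.example.UserService");
        // Prints:
        // /dubbo/com.example.UserService/providers/dubbo%3A%2F%2F10.0.0.1%3A20880%2Fcom.example.UserService
        System.out.println(path);
    }
}
```

Encoding the full provider URL is what lets it serve as a single node name: the "/" and ":" characters it contains would otherwise be read as path separators.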
