Dubbo源码解析(十一) Dubbo Exchanger

先看一下Exchanger的接口定义,就是bindconnectbind是服务端调用的,绑定一个端口用来接收客户端的请求。connect是作为一个客户端去连接服务端,进行和服务端交换。

@SPI(HeaderExchanger.NAME)
public interface Exchanger {

    /** * bind. * * @param url * @param handler * @return message server */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    /** * connect. * * @param url * @param handler * @return message channel */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

那么Exchanger是被谁调用了?
这里写图片描述
可以看到主要是被Exchangers这个工具类调用了,那么这个工具类具体做了什么事情
这里写图片描述
就是bindconnect方法以及创建Exchanger的功能

创建Exchanger

public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

bind

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

connect方法

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

可以看到 逻辑性不是很强,但是这个分层到底是为了什么?直接在Protocol发布/引用服务的时候创建一个Exchanger不就好了吗?

为什么要抽象一个工具类呢?

这个后面再思考思考,什么时候写公共类,什么时候写这种工具类

先聊聊Exchanger的作用是什么?

exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer

RequestResponse,很熟悉,和Servlet是不是很像,那么这一层应该是类似于Tomcat接收外界的请求,封装为RequestResponse进行封装与转发,作为内部与外部连接的转接层。确实是哦,Protocol这一层主要是进行服务的发布与引用,相当于内部的处理都做好了,但是现在与外部个还没有连接,所以对于injvm这些不需要与外界进行沟通的协议或者如rmi这种有自己格式固定的通讯并有相应的支支撑的协议,就不需要这个交换层了。

Dubbo里面只有一个HeaderExchanger的默认实现,对心跳的功能的包装

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // 开启一个Transporters的客户端
        // 解码 -> 心跳handler包装原始的handler 装饰者模式 哇哇哇哇
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // 开启一个Transporters的服务端
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

创建一个交换层的客户端,也就是ReferenceBean的消息发送者

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);
        // 获取dubbo协议版本
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        // 获取心跳超时时间
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        // 如果心跳超时小于两个心跳间隔的时间 
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        // 如果需要发送心跳
        if (needHeartbeat) {
            startHeartbeatTimer();
        }
    }

如何发送心跳的

  1. 停止之前的心跳发送器
  2. 启动一个新的心跳
    通过 ScheduledThreadPoolExecutor 定时任务线程池
// 过heartbeat时间后开始没heartbeat发送心跳
    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
            heartBeatTask,
            heartbeat, 
            heartbeat, 
            TimeUnit.MILLISECONDS);
        }
    }

heartBeatTask 心跳任务

new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
        public Collection<Channel> getChannels() {
            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
        }
    }, heartbeat, heartbeatTimeout);

HeaderExchangeServer 也是类似的功能,不一样的地方就是里面的属性serverclient,这些都是Transport这里面的,后面还是要梳理梳理整个dubbo的流程 且听下回分解

相关文章
相关标签/搜索