Dubbo学习(三)—Dubbo服务发布端启动流程源码剖析

Scroll Down

Dubbo学习(三)—Dubbo服务端启动与消费流程源码剖析

dubbo服务端暴露服务的概要过程

image-20200722115543981

ServiceConfig 类引用对外提供服务的实现类ref(如GreetingServiceImpl),然后通过ProxyFactory 接口的扩展实现类的getInvoker()方法使用ref 生成一个AbstractProxyInvoker 实例,到这一步就完成了具体服务到Invoker 的转化。接下来就是Invoker转换到Exporter的过程。Dubbo协议的Invoker转为Exporter发生在DubboProtocol类的export ()方法中,Dubbo处理服务暴露的关键就在Invoker转换到Exporter的过程中,在这个过程中会先启动Netty Server监听服务连接,然后将服务注册到服务注册中心。

Dubbo 会在 Spring 实例化完 bean 之后,在刷新容器最后一步发布 ContextRefreshEvent 事件的时候,通知实现了 ApplicationListener 的 ServiceBean 类进行回调 onApplicationEvent 事件方法,Dubbo 会在这个方法中调用 ServiceBean 父类 ServiceConfig 的 export 方法,而该方法真正实现了服务的(异步或者非异步)发布。

dubbo服务流程具体分析

dubbo服务提供端的启动流程图,这里我们通过前面的API发布服务进行剖析整体的流程
image-20200722101327599

ServiceConfig中的ref转换为Invoker步骤

  1. 进入serviceConfig.export();方法

    public synchronized void export() {
           //判断是否需要导出服务
            if (!shouldExport()) {
                return;
            }
    		//是否需要延迟发布,使用ScheduledExecutorService来实现,如果需要设置延迟发布,可以使用serviceConfig.setDelay();方法
            if (shouldDelay()) {
                delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
            } else {
                doExport();
            }
        }
    
  2. 如果不需要延迟发布,则直接进入doExport()方法

  3. doExport()最终调用的是doExportUrls()方法

    private void doExportUrls() {
        	//加载所有的服务注册中心对象
            List<URL> registryURLs = loadRegistries(true);
            for (ProtocolConfig protocolConfig : protocols) {
                String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
                ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
                ApplicationModel.initProviderModel(pathKey, providerModel);
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
    
  4. 通过调用loadRegistries()方法加载所有的服务注册中心对象,在Dubbo中,一个服务可以被注册到多个服务注册中心

  5. doExportUrlsFor1Protocol(protocolConfig, registryURLs)方法首先把参数封装为URL(在Dubbo里会把所有参数封装到一个URL里),然后具体执行服务导出

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            //.....这里省略代码,主要是用于解析MethodConfig对象设置的方法级别的配置并保存到参数map中;
     		//判断调用类型,如果为泛型调用,则设置泛型类型(true、nativejava或bean方式)   
            if (ProtocolUtils.isGeneric(generic)) {
                map.put(Constants.GENERIC_KEY, generic);
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
            	//拼接url中的版本参数    
                String revision = Version.getVersion(interfaceClass, version);
    
                if (revision != null && revision.length() > 0) {
                    map.put("revision", revision);
                }
    			//这里很重要,dubbo会为所有的会给每个服务提供者的实现类生产一个Wrapper类,这个Wrapper类里面最终调用服务提供者的接口实现类,Wrapper类的存在是为了减少反射的调用,后面会再讲解一下这一步
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
                } else {
                    map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
                }
            }
            if (!ConfigUtils.isEmpty(token)) {
                if (ConfigUtils.isDefault(token)) {
                    map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
                } else {
                    map.put(Constants.TOKEN_KEY, token);
                }
            }
            // 拼接URL对象
            String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
            Integer port = this.findConfigedPorts(protocolConfig, name, map);
            URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
    		//根据服务类型来决定导出的是本地服务还是远程服务,本地导出使用的是injvm协议,是一个伪协议,它不会开启端口,只在JVM内部调用,但是还是会执行dubbo的filter,在默认情况下,Dubbo同时支持本地导出与远程导出协议,可以通过ServiceConfig的setScope()方法设置
            String scope = url.getParameter(Constants.SCOPE_KEY);
            // don't export when none is configured
            if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
    
                // export to local if the config is not remote (export to remote only when config is remote)
                if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                    exportLocal(url);
                }
                // export to remote if the config is not local (export to local only when config is local)
                //当模式不等于local的时候,则导出远程服务
                if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
    
                            // For providers, this is used to enable custom proxy to generate invoker
                            String proxy = url.getParameter(Constants.PROXY_KEY);
                            if (StringUtils.isNotEmpty(proxy)) {
                                registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                            }
    						//导出服务的核心代码
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            exporters.add(exporter);
                        }
                    } else {
                    //这里是配置为直连方式    
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                    /**
                     * @since 2.7.0
                     * ServiceData Store
                     */
                    //元数据存储,将元数据保存到指定配置中心。在Dubbo 2.7.0中对服务元数据进行了改造,其把原来都保存到服务注册中心的元数据进行了分类存储,注册中心将只用于存储关键服务信息,比如服务提供者地址列表、完整的接口定义等。
                    MetadataReportService metadataReportService = null;
                    if ((metadataReportService = getMetadataReportService()) != null) {
                        metadataReportService.publishProvider(url);
                    }
                }
            }
            this.urls.add(url);
        }
    
  6. 服务端导出的核心代码为

    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            exporters.add(exporter);
    

    其中proxyFactory和protocol都是dubbo使用SPI的扩展类,来源于

    
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    
    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    
    

    执行代码proxyFactory.getInvoker(ref,(Class) interfaceClass,url) 时,我们发现实际上是首先执行扩展接口ProxyFactory 的适配器类ProxyFactory$Adaptive 的getInvoker()方法,其内部根据URL里的proxy的类型选择具体的代理工厂,这里默认proxy 类型为javassist,所以又调用了JavassistProxyFactory的getInvoker()方法获取了代理类。

    image-20200722114053452

    而JavassistProxyFactory中的getInvoker()方法代码如下

        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    

    这里首先把服务实现类转换为Wrapper 类,是为了减少反射的调用,这里返回的是AbstractProxyInvoker对象,其内部重写doInvoke()方法,并委托给Wrapper 实现具体功能,这样我们就获取到了最终的Invoker

    Invoker转换为Exporter

    image-20200722143655421

    当执行protocol.export(wrapperInvoker) 方法的时候,实际调用了Protocol 的适配器类Protocol$Adaptive 的export() 方法。如果为远程服务暴露,则其内部根据URL中Protocol的类型为registry,会选择Protocol 的实现类RegistryProtocol。如果为本地服务暴露,则其内部根据URL 中Protocol的类型为injvm,会选择Protocol 的实现类InjvmProtocol。但由于Dubbo SPI 的扩展点使用了Wrapper 自动增强,这里就使用了ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper 对其进行了增强,所以需要一层层调用才会调用到RegistryProtocol的export()方法。

    image-20200722150508981

​ 进入RegistryProtocol的export()方法后,在此方法中启动了相应的NettyServer,将服务注册到相应的注册中心上

image-20200722155212540

RegistryProtocol如何启动netty服务器

首先先来看RegistryProtocol是如何启动Netty服务器的,查看RegistryProtocol启动nettyServer的时序图,

image-20200722155701795

进入RegistryProtocol中的doLocalExport()方法

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
        });
    }

这里会根据URL的协议进入到不同的协议处理类中,这里我们使用的是dubbo,即会进入DubboProtocol中,

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
    	//将invoker转换为Exporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
		......
        //开启netty服务,同一台机器上不同的service也只会开启一个nettyServer
        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

image-20200722152601929

这里可以看到服务开启之后会以IP+端口为Key放入缓存中,如果Map中已经有这个启动好的服务缓存了,那么就不会再开启新的netty服务了,如果没有启动服务,那么会进入createServer(url)方法来创建

image-20200722160338754

这里因为使用的是netty协议,最终会进入到NettyTransporter类中,并执行bind()方法

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

这里看到new了一个NettyServer,进入对应的构造方法查看,调用了父类的初始化方法

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

这里使用子类的doOpen()方法启动了netty的服务监听

 public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
       ...
        try {
            //启动netty服务监听
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
     ...
    }

查看具体的org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen方法,可以看到就是启动了一个netty的服务监听

 protected void doOpen() throws Throwable {
     //创建ServerBootstrap
        bootstrap = new ServerBootstrap();
	//设置netty的线程池
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

     //配置nettyServer
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
   
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            //编码器
                                .addLast("decoder", adapter.getDecoder())
                            //解码器
                                .addLast("encoder", adapter.getEncoder())
                            //心跳的handler
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            //业务处理handler
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind本地端口,启动netty服务
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

RegistryProtocol如何获取注册中心并将服务注册到注册中心

image-20200722163202039

首先先查看的RegistryProtocol下的getRegistry方法,这里invoker会返回相对应的URL,然后根据url获取对应的注册中心,这里是使用的zookeeper注册中心,所以这里最终会使用ZookeeperRegistry

    private Registry getRegistry(final Invoker<?> originInvoker) {
        //返回注册URL
        URL registryUrl = getRegistryUrl(originInvoker);
        //根据url中的找到对应的注册中心,这里使用的是ZookeeperRegistry
        return registryFactory.getRegistry(registryUrl);
    }

    private URL getRegistryUrl(Invoker<?> originInvoker) {
        URL registryUrl = originInvoker.getUrl();
        if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_DIRECTORY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
        }
        return registryUrl;
    }
    public Registry getRegistry(URL url) {
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY)
                .build();
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //创建服务注册中心
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
//初始化zookeeper注册中心连接实例
public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

然后步骤6中的register(registryUrl, registeredProviderUrl)方法将具体的url注册到zookeeper中,最终调用的是zookeeper中的doRegister方法

    public void doRegister(URL url) {
        try {
            //创建对应的节点
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

原始URL为:

dubbo://169.254.136.26:20880/com.zhengyao.api.service.GreetingService?anyhost=true&application=first-dubbo-provider&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&group=dubbo&interface=com.zhengyao.api.service.GreetingService&methods=sayHello,testGeneric&pid=19576&register=true&release=2.7.1&revision=1.0.0&side=provider&timestamp=1595406360934&version=1.0.0

经过toUrlPath转换后的URL为:

/dubbo/com.zhengyao.api.service.GreetingService/providers/dubbo://169.254.136.26:20880/com.zhengyao.api.service.GreetingService?anyhost=true&application=first-dubbo-provider&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&group=dubbo&interface=com.zhengyao.api.service.GreetingService&methods=sayHello,testGeneric&pid=19576&register=true&release=2.7.1&revision=1.0.0&side=provider&timestamp=1595406360934&version=1.0.0

create()方法是递归函数,首先其调用了方法createPersistent()分别创建了临时节点/dubbo、/dubbo/com.zhengyao.api.service.GreetingService和/dubbo/com.zhengyao.api.service.GreetingService/providers

最终,建立的zookeeper路径如下:

image-20200722164953136

第一层Root节点说明ZooKeeper的服务分组为Dubbo,第二层Service节点说明注册的服务为service接口,第三层Type节点说明是为服务提供者注册的服务,第四层URL记录服务提供者的地址信息(这里只是简单地显示了服务提供者的IP信息,对于真实情况则不只是IP信息)。
第一个服务提供者注册时需要ZooKeeper服务端创建第一层的Dubbo节点、第二层的Service节点、第三层的Type节点,但是同一个Service的其他机器在注册服务时因为上面三层节点已经存在了,所以只需在Providers下也就是第四层插入服务提供者信息节点就可以了。
服务注册到ZooKeeper 后,消费端就可以在Providers节点下找到对应service服务的所有服务提供者,然后根据设置的负载均衡策略选择机器进行远程调用了。

dubbo服务端如何处理一次请求

消费端发起一次消费请求后,会与服务端建立TCP连接,服务端的处理时序图

image-20200723143243341

消费端发起TCP链接并完成后,服务端NettyServer的connected方法会被激活,该方法执行是在Netty的I/O线程上执行的,为了及时释放I/O线程,Netty默认的线程模型为All,所有消息派发到Dubbo内部的业务线程池中进行处理,这些消息包括了请求事件、响应事件、断开事件、心跳事件等,对应的AllChannelHandler把I/O线程接收到的所有消息包装为ChannelEventRunnable任务都投递到线程池中。

线程池中的任务执行后,最终会调用DubboProtocol的connected()方法

public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
    //创建Invocation对象
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
    //不为null的话调用received进行处理
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }
public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);

            } else {
                super.received(channel, message);
            }
 }

private Invocation createInvocation(Channel channel, URL url, String methodKey) {
     //如果url中不包含key,直接返回null      
    String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
	//根据method创建相应的RpcInvocation对象
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }

            return invocation;
        }
    };

在这里的URL内不包含Constants.ON_CONNECT_KEY,所以直接返回了null。

然后进入步骤12-22的received部分,类似于connected,received事件被投递到线程池后进行异步处理。线程池任务被激活后调用了HeaderExchangeHandler的received()方法

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            //判断是处理请求还是返回结果
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    //需要有返回值的请求,req-res,twoWay
                    if (request.isTwoWay()) {
                        handleRequest(exchangeChannel, request);
                    } else {
                    //不需要有返回值的请求req,oneway ,handler是DubboProtocol,直接调用handle的received方法 
                        handler.received(exchangeChannel, request.getData());
                    }
                }
             //判断是处理请求还是返回结果    
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

如果请求不需要响应结果则直接调用DubboProtocol的received()方法,否则执行handleRequest()方法

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
		.........
        // find handler by message class.
        Object msg = req.getData();
        try {
            // 最终还是调用DubboProtocol的reply方法
            CompletableFuture<Object> future = handler.reply(channel, msg);
            if (future.isDone()) {
               //如果请求处理完成,则设置相应结果并写回 
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
            //否则等待返回结果后再进行回调
            future.whenComplete((result, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(result);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                }
            });
        } catch (Throwable e) {
        }
    }

如果请求需要返回值则执行handleRequest()方法,其也是委托给DubboProtocol的reply()方法来执行的。如果执行结果已经完成,则直接将结果写回消费端,否则使用异步回调方式(避免当前线程被阻塞),等执行完毕并拿到结果后再把结果写回消费端,所以无论是有没有返回值,最终都要调用DubboProtocol类的reply()方法

        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            Invocation inv = (Invocation) message;
            //获取调用需要使用的invoker
            Invoker<?> invoker = getInvoker(channel, inv);
			.....
            //获取上下文对象,并设置对端的地址    
            RpcContext rpcContext = RpcContext.getContext();
            rpcContext.setRemoteAddress(channel.getRemoteAddress());
            //执行invoker调用链
            Result result = invoker.invoke(inv);
			//返回结果
            //如果是AsyncRpcResult说明为服务提供方的异步实行
            if (result instanceof AsyncRpcResult) {
                return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);

            } else {
            //同步执行,转换结果为CompletableFuture
                return CompletableFuture.completedFuture(result);
            }
        }

服务端启动的时候导出的DubboExporter对象会保存到exporterMap中,这里的getInvoker获取的是RegistryProtocol$InvokerDelegate

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {

        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    //从exporterMap中根据serviceKey获取exporter
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                    ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
        }
	//获取最终的invoker
        return exporter.getInvoker();
    }

在调用InvokerDelegate的invoke()方法前会先经过Filter链,然后InvokerDelegate会调用服务提供方启动时AbstractProxyInvoker代理类的invoke()方法,其代码如下:

public Result invoke(Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        try {
            //执行服务调用
            Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            //判断返回结果是不是CompletableFuture类型
            if (RpcUtils.isReturnTypeFuture(invocation)) {
                return new AsyncRpcResult((CompletableFuture<Object>) obj);
            //判断是否使用了rpcContext.startAsync()开启异步执行    
            } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
                return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
            } else {
            //同步执行    
                return new RpcResult(obj);
            }
        } catch (InvocationTargetException e) {
            }
            return new RpcResult(e.getTargetException());
        } 
    }

而doInvoker就会使用JavaAssist来执行本地服务,以便减少反射调用耗时

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // 使用JavaAssist创建Wrapper
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

AbstractProxyInvoker的doInvoke()方法委托Wrapper类的invokeMethod执行具体逻辑,后者则通过调用服务提供方接口的实现类来执行本地服务