Dubbo学习(二)—使用Dubbo的API配置方式

Scroll Down

Dubbo学习(二)—使用Dubbo的API配置方式

使用Dubbo的API配置方式,先声明了三个模块,Sdk模块是公用的POJO或者接口类的模块,Consume是消费者,Provider是服务提供者

image-20200715165347772

Dubbo下同步发布与调用

Sdk代码编写

这里Sdk代码主要是一个公用的代码,编写一个POJO类

public class PoJo {

	private String  id;
	private String name;
}

编写一个Result类,POJO的返回值类型,用于演示泛化调用时的参数转换

public class Result<T> implements Serializable {

	private static final long serialVersionUID = 1L;
	private T data;
	private boolean sucess;
	private String msg;
}

编写一个GreetingService接口

public interface GreetingService {
	String sayHello(String name);
	
	Result<String> testGeneric(PoJo poJo);
}

这样SDK模块代码已经完成,下一步开始写服务提供者的代码

Provider代码编写

编写一个具体的接口实现类GreetingServiceImpl

public class GreetingServiceImpl implements GreetingService {

	@Override
	public String sayHello(String name) {
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "Hello " + name + " " + RpcContext.getContext().getAttachment("company");
	}

	@Override
	public Result<String> testGeneric(PoJo poJo) {
		Result<String> result = new Result<String>();
		result.setSucess(true);
		try {
			result.setData(JSON.json(poJo));
		} catch (IOException e) {
			e.printStackTrace();
		}
		
		return result;
	}
}   

编写服务端的启动类:

public class ApiProvider {
    public static void main(String[] args) throws InterruptedException {
        ServiceConfig<GreetingService> serviceConfig = new ServiceConfig<>();
        //设置应用程序配置
        serviceConfig.setApplication(new ApplicationConfig("first-dubbo-provider"));
        //设置服务注册中心信息
        RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:21822");
        registryConfig.setUsername("admin");
        registryConfig.setPassword("admin");
        serviceConfig.setRegistry(registryConfig);
        //设置接口
        serviceConfig.setInterface(GreetingService.class);
        //设置实现类
        serviceConfig.setRef(new GreetingServiceImpl());
        //设置服务分组和版本
        serviceConfig.setVersion("1.0.0");
        serviceConfig.setGroup("dubbo");
        //导出服务并启动NettyServer监听链接请求,并将服务注册到服务注册中心
        serviceConfig.export();
        System.out.println("server is started!");
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}

Consume代码编写

编写一个客户端启动类:

public class ApiConsume {
    public static void main(String[] args) {
        //创建引用实例
        ReferenceConfig<GreetingService> config = new ReferenceConfig<>();
        //设置应用程序信息
        config.setApplication(new ApplicationConfig("first-dubbo-consumer"));
        //设置注册中心
        config.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:21822"));
        //设置接口类
        config.setInterface(GreetingService.class);
        //设置超时时间
        config.setTimeout(5000);
        //设置版本
        config.setVersion("1.0.0");
        //设置负载均衡策略和集群容错策略
        //config.setLoadbalance("loadBalance");
        //config.setCluster("myBroadcase");
        //设置分组
        config.setGroup("dubbo");
        //引用服务
        GreetingService greetingService = config.get();
        //设置全局参数
        RpcContext.getContext().setAttachment("company", "yonyou");
        //输出结果
        System.out.println(greetingService.sayHello("world"));
    }
}

执行结果为:已经可以成功获取到服务端返回的信息了

image-20200715170202727

消费端异步消费

Dubbo 2.7.X版本提供的是基于CompletableFuture的异步调用,下面需要修改消费端的代码

新增ApiConsumeForCompletableFuture类

public class ApiConsumeForCompletableFuture {
    public static void main(String[] args) throws InterruptedException {
        //创建引用实例
        ReferenceConfig<GreetingService> config = new ReferenceConfig<>();
        //设置应用程序信息
        config.setApplication(new ApplicationConfig("first-dubbo-consumer"));
        //设置注册中心
        config.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:21822"));
        //设置接口类
        config.setInterface(GreetingService.class);
        //设置超时时间
        config.setTimeout(5000);
        //设置版本
        config.setVersion("1.0.0");
        //设置负载均衡策略和集群容错策略
        //config.setLoadbalance("loadBalance");
        //config.setCluster("myBroadcase");
        //设置分组
        config.setGroup("dubbo");
        //设置为异步
        config.setAsync(true);
        //引用服务
        GreetingService greetingService = config.get();
        //设置全局参数
        RpcContext.getContext().setAttachment("company", "yonyou");
        //输出结果
        System.out.println(greetingService.sayHello("world"));
        System.out.println(greetingService.sayHello("world2"));
        System.out.println(greetingService.sayHello("world3"));
        CompletableFuture<String> future = RpcContext.getContext().getCompletableFuture();
        future.whenComplete((v,t)->{
            if (t != null) {
                t.printStackTrace();
            }else {
                System.out.println(v);
            }
        });
        System.out.println("结束查询");
        Thread.currentThread().join();
    }
}

使用RpcContext.getContext().getCompletableFuture()获取CompletableFuture类型的future,然后就可以基于CompletableFuture的能力做一系列操作,发起调用的时候会立即返回一个null,其次通过调用whenComplete()方法设置了回调函数,作用是当服务提供端产生响应结果后调用设置的回调函数,函数内判断如果异常t不为null,则打印异常信息,否则打印响应结果。

这里多次调用服务端返回的也是最后一次调用的结果:

image-20200715170546946

服务端提供异步消费

基于定义CompletableFuture签名的接口实现异步执行

首先在Sdk模块下需要增加一个提供异步能力的接口GrettingServiceAsync

public interface GrettingServiceAsync {
	CompletableFuture<String> sayHello(String name);
}

在Provider模块中,基于CompletableFuture签名的接口实现异步执行的接口实现类为GrettingServiceAsyncImpl

public class GrettingServiceAsyncImpl implements GrettingServiceAsync {
    /**
     * 声明线程池
     */
    private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
            new SynchronousQueue<>(),new NamedThreadFactory("com.dubbo.async"),new ThreadPoolExecutor.CallerRunsPolicy());
    @Override
    public CompletableFuture<String> sayHello(String name) {
        RpcContext rpcContext = RpcContext.getContext();
        return   CompletableFuture.supplyAsync(()->{
            System.out.println("sync.return");
            return "Hello " + name + " " + rpcContext.getAttachment("company");
        },threadPoolExecutor);
    }
}

基于定义CompletableFuture签名的接口实现异步执行需要接口方法的返回值为CompletableFuture,并且方法内部使用CompletableFuture.supplyAsync让本该由Dubbo内部线程池中的线程处理的服务,转换为由业务自定义线程池中的线程来处理,CompletableFuture.supplyAsync()方法会马上返回一个CompletableFuture对象(所以Dubbo内部线程池中的线程会得到及时释放),传递的业务函数则由业务线程池threadPoolExecutor执行。

增加服务端发布类ApiProviderSync

public class ApiProviderSync {
    public static void main(String[] args) throws InterruptedException {
        ServiceConfig<GrettingServiceAsync> serviceConfig = new ServiceConfig<>();
        ApplicationConfig applicationConfig = new ApplicationConfig("first-dubbo-provider");
        applicationConfig.setQosPort(33333);
        //设置应用程序配置
        serviceConfig.setApplication(applicationConfig);
        //设置服务注册中心信息
        RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:21822");
        registryConfig.setUsername("admin");
        registryConfig.setPassword("admin");
        serviceConfig.setRegistry(registryConfig);
        //设置接口
        serviceConfig.setInterface(GrettingServiceAsync.class);
        //设置实现类
        serviceConfig.setRef(new GrettingServiceAsyncImpl());
        //设置服务分组和版本
        serviceConfig.setVersion("1.0.0");
        serviceConfig.setGroup("dubbo");
        //导出服务并启动NettyServer监听链接请求,并将服务注册到服务注册中心
        serviceConfig.export();
        System.out.println("server is started!");
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}

消费端需要根据原有代码略微修改一下,新增ApiConsumeForProviderSync类

public class ApiConsumeForProviderSync {
    public static void main(String[] args) throws InterruptedException {
        //创建引用实例
        ReferenceConfig<GrettingServiceAsync> config = new ReferenceConfig<>();
        //设置应用程序信息
        ApplicationConfig applicationConfig = new ApplicationConfig("first-dubbo-consumer");
        //设置端口
        applicationConfig.setQosPort(33333);
        config.setApplication(applicationConfig);
        //设置注册中心
        config.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:21822"));
        //设置接口类
        config.setInterface(GrettingServiceAsync.class);

        //设置超时时间
        config.setTimeout(5000);
        //设置版本
        config.setVersion("1.0.0");
        //设置负载均衡策略和集群容错策略
        //config.setLoadbalance("loadBalance");
        //config.setCluster("myBroadcase");
        //设置分组
        config.setGroup("dubbo");
        //设置为异步
        config.setAsync(true);
        //引用服务
        GrettingServiceAsync greetingService = config.get();
        //设置全局参数
        RpcContext.getContext().setAttachment("company", "yonyou");
        //输出结果
        CompletableFuture<String> future = greetingService.sayHello("world");
        System.out.println(future);
        future.whenComplete((v,t)->{
            if (t != null) {
                t.printStackTrace();
            }else {
                System.out.println(v);
            }
        });
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}

image-20200715172727208

它与客户端使用异步消费的差异是,返回就是异步的CompletableFuture结果,直接在该结果上面进行whenComplete事件处理

基于AsyncContext实现异步执行

服务端增加GrettingServiceAsyncContextImpl代码,RpcContext.startAsync()方法开启服务异步执行,返回一个asyncContext,然后把服务处理任务提交到业务线程池,这样Dubbo的线程调用马上返回的是null,就不会阻塞Dubbo的线程,使用的是业务线程。

public class GrettingServiceAsyncContextImpl implements GrettingServiceRpcContext {

	// 1.创建业务自定义线程池
	private final ThreadPoolExecutor bizThreadpool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
			new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"),
			new ThreadPoolExecutor.CallerRunsPolicy());

	// 2.创建服务处理接口,返回值为CompletableFuture
	@Override
	public String sayHello(String name) {

		// 2.1开启异步
		final AsyncContext asyncContext = RpcContext.startAsync();
		bizThreadpool.execute(() -> {
			// 2.2 如果要使用上下文,则必须要放在第一句执行
			asyncContext.signalContextSwitch();
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			// 2.3写回响应
			asyncContext.write("Hello " + name + " " + RpcContext.getContext().getAttachment("company"));
		});

		return null;
	}
}