# Dubbo学习(二)—使用Dubbo的API配置方式
[TOC]
使用Dubbo的API配置方式,先声明了三个模块,Sdk模块是公用的POJO或者接口类的模块,Consume是消费者,Provider是服务提供者

## Dubbo下同步发布与调用
### Sdk代码编写
这里Sdk代码主要是一个公用的代码,编写一个POJO类
```java
public class PoJo {
private String id;
private String name;
}
```
编写一个Result类,POJO的返回值类型,用于演示泛化调用时的参数转换
```java
public class Result<T> implements Serializable {
private static final long serialVersionUID = 1L;
private T data;
private boolean sucess;
private String msg;
}
```
编写一个GreetingService接口
```java
public interface GreetingService {
String sayHello(String name);
Result<String> testGeneric(PoJo poJo);
}
```
这样SDK模块代码已经完成,下一步开始写服务提供者的代码
### Provider代码编写
编写一个具体的接口实现类GreetingServiceImpl
```java
public class GreetingServiceImpl implements GreetingService {
@Override
public String sayHello(String name) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello " + name + " " + RpcContext.getContext().getAttachment("company");
}
@Override
public Result<String> testGeneric(PoJo poJo) {
Result<String> result = new Result<String>();
result.setSucess(true);
try {
result.setData(JSON.json(poJo));
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
}
```
编写服务端的启动类:
```java
public class ApiProvider {
public static void main(String[] args) throws InterruptedException {
ServiceConfig<GreetingService> serviceConfig = new ServiceConfig<>();
//设置应用程序配置
serviceConfig.setApplication(new ApplicationConfig("first-dubbo-provider"));
//设置服务注册中心信息
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:21822");
registryConfig.setUsername("admin");
registryConfig.setPassword("admin");
serviceConfig.setRegistry(registryConfig);
//设置接口
serviceConfig.setInterface(GreetingService.class);
//设置实现类
serviceConfig.setRef(new GreetingServiceImpl());
//设置服务分组和版本
serviceConfig.setVersion("1.0.0");
serviceConfig.setGroup("dubbo");
//导出服务并启动NettyServer监听链接请求,并将服务注册到服务注册中心
serviceConfig.export();
System.out.println("server is started!");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
```
### Consume代码编写
编写一个客户端启动类:
```java
public class ApiConsume {
public static void main(String[] args) {
//创建引用实例
ReferenceConfig<GreetingService> config = new ReferenceConfig<>();
//设置应用程序信息
config.setApplication(new ApplicationConfig("first-dubbo-consumer"));
//设置注册中心
config.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:21822"));
//设置接口类
config.setInterface(GreetingService.class);
//设置超时时间
config.setTimeout(5000);
//设置版本
config.setVersion("1.0.0");
//设置负载均衡策略和集群容错策略
//config.setLoadbalance("loadBalance");
//config.setCluster("myBroadcase");
//设置分组
config.setGroup("dubbo");
//引用服务
GreetingService greetingService = config.get();
//设置全局参数
RpcContext.getContext().setAttachment("company", "yonyou");
//输出结果
System.out.println(greetingService.sayHello("world"));
}
}
```
执行结果为:已经可以成功获取到服务端返回的信息了

## 消费端异步消费
Dubbo 2.7.X版本提供的是基于CompletableFuture的异步调用,下面需要修改消费端的代码
新增ApiConsumeForCompletableFuture类
```java
public class ApiConsumeForCompletableFuture {
public static void main(String[] args) throws InterruptedException {
//创建引用实例
ReferenceConfig<GreetingService> config = new ReferenceConfig<>();
//设置应用程序信息
config.setApplication(new ApplicationConfig("first-dubbo-consumer"));
//设置注册中心
config.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:21822"));
//设置接口类
config.setInterface(GreetingService.class);
//设置超时时间
config.setTimeout(5000);
//设置版本
config.setVersion("1.0.0");
//设置负载均衡策略和集群容错策略
//config.setLoadbalance("loadBalance");
//config.setCluster("myBroadcase");
//设置分组
config.setGroup("dubbo");
//设置为异步
config.setAsync(true);
//引用服务
GreetingService greetingService = config.get();
//设置全局参数
RpcContext.getContext().setAttachment("company", "yonyou");
//输出结果
System.out.println(greetingService.sayHello("world"));
System.out.println(greetingService.sayHello("world2"));
System.out.println(greetingService.sayHello("world3"));
CompletableFuture<String> future = RpcContext.getContext().getCompletableFuture();
future.whenComplete((v,t)->{
if (t != null) {
t.printStackTrace();
}else {
System.out.println(v);
}
});
System.out.println("结束查询");
Thread.currentThread().join();
}
}
```
使用RpcContext.getContext().getCompletableFuture()获取CompletableFuture类型的future,然后就可以基于CompletableFuture的能力做一系列操作,发起调用的时候会立即返回一个null,其次通过调用whenComplete()方法设置了回调函数,作用是当服务提供端产生响应结果后调用设置的回调函数,函数内判断如果异常t不为null,则打印异常信息,否则打印响应结果。
这里多次调用服务端返回的也是最后一次调用的结果:

## 服务端提供异步消费
### 基于定义CompletableFuture签名的接口实现异步执行
首先在Sdk模块下需要增加一个提供异步能力的接口GrettingServiceAsync
```java
public interface GrettingServiceAsync {
CompletableFuture<String> sayHello(String name);
}
```
在Provider模块中,基于CompletableFuture签名的接口实现异步执行的接口实现类为GrettingServiceAsyncImpl
```java
public class GrettingServiceAsyncImpl implements GrettingServiceAsync {
/**
* 声明线程池
*/
private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
new SynchronousQueue<>(),new NamedThreadFactory("com.dubbo.async"),new ThreadPoolExecutor.CallerRunsPolicy());
@Override
public CompletableFuture<String> sayHello(String name) {
RpcContext rpcContext = RpcContext.getContext();
return CompletableFuture.supplyAsync(()->{
System.out.println("sync.return");
return "Hello " + name + " " + rpcContext.getAttachment("company");
},threadPoolExecutor);
}
}
```
基于定义CompletableFuture签名的接口实现异步执行需要接口方法的返回值为CompletableFuture,并且方法内部使用CompletableFuture.supplyAsync让本该由Dubbo内部线程池中的线程处理的服务,转换为由业务自定义线程池中的线程来处理,CompletableFuture.supplyAsync()方法会马上返回一个CompletableFuture对象(所以Dubbo内部线程池中的线程会得到及时释放),传递的业务函数则由业务线程池threadPoolExecutor执行。
增加服务端发布类ApiProviderSync
```java
public class ApiProviderSync {
public static void main(String[] args) throws InterruptedException {
ServiceConfig<GrettingServiceAsync> serviceConfig = new ServiceConfig<>();
ApplicationConfig applicationConfig = new ApplicationConfig("first-dubbo-provider");
applicationConfig.setQosPort(33333);
//设置应用程序配置
serviceConfig.setApplication(applicationConfig);
//设置服务注册中心信息
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:21822");
registryConfig.setUsername("admin");
registryConfig.setPassword("admin");
serviceConfig.setRegistry(registryConfig);
//设置接口
serviceConfig.setInterface(GrettingServiceAsync.class);
//设置实现类
serviceConfig.setRef(new GrettingServiceAsyncImpl());
//设置服务分组和版本
serviceConfig.setVersion("1.0.0");
serviceConfig.setGroup("dubbo");
//导出服务并启动NettyServer监听链接请求,并将服务注册到服务注册中心
serviceConfig.export();
System.out.println("server is started!");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
```
消费端需要根据原有代码略微修改一下,新增ApiConsumeForProviderSync类
```java
public class ApiConsumeForProviderSync {
public static void main(String[] args) throws InterruptedException {
//创建引用实例
ReferenceConfig<GrettingServiceAsync> config = new ReferenceConfig<>();
//设置应用程序信息
ApplicationConfig applicationConfig = new ApplicationConfig("first-dubbo-consumer");
//设置端口
applicationConfig.setQosPort(33333);
config.setApplication(applicationConfig);
//设置注册中心
config.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:21822"));
//设置接口类
config.setInterface(GrettingServiceAsync.class);
//设置超时时间
config.setTimeout(5000);
//设置版本
config.setVersion("1.0.0");
//设置负载均衡策略和集群容错策略
//config.setLoadbalance("loadBalance");
//config.setCluster("myBroadcase");
//设置分组
config.setGroup("dubbo");
//设置为异步
config.setAsync(true);
//引用服务
GrettingServiceAsync greetingService = config.get();
//设置全局参数
RpcContext.getContext().setAttachment("company", "yonyou");
//输出结果
CompletableFuture<String> future = greetingService.sayHello("world");
System.out.println(future);
future.whenComplete((v,t)->{
if (t != null) {
t.printStackTrace();
}else {
System.out.println(v);
}
});
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
```

它与客户端使用异步消费的差异是,返回就是异步的CompletableFuture结果,直接在该结果上面进行whenComplete事件处理
### 基于AsyncContext实现异步执行
服务端增加GrettingServiceAsyncContextImpl代码,RpcContext.startAsync()方法开启服务异步执行,返回一个asyncContext,然后把服务处理任务提交到业务线程池,这样Dubbo的线程调用马上返回的是null,就不会阻塞Dubbo的线程,使用的是业务线程。
```java
public class GrettingServiceAsyncContextImpl implements GrettingServiceRpcContext {
// 1.创建业务自定义线程池
private final ThreadPoolExecutor bizThreadpool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"),
new ThreadPoolExecutor.CallerRunsPolicy());
// 2.创建服务处理接口,返回值为CompletableFuture
@Override
public String sayHello(String name) {
// 2.1开启异步
final AsyncContext asyncContext = RpcContext.startAsync();
bizThreadpool.execute(() -> {
// 2.2 如果要使用上下文,则必须要放在第一句执行
asyncContext.signalContextSwitch();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 2.3写回响应
asyncContext.write("Hello " + name + " " + RpcContext.getContext().getAttachment("company"));
});
return null;
}
}
```
Dubbo学习(二)—使用Dubbo的API配置方式