Dubbo 服务引用

Dubbo 服务引用

服务引用

  • 大家都知道Dubbo是由consumer,provider,registry这三大部分组成
  • 那么consumer是如何发现provider并调用的呢,就是通过服务引用来实现的,也就是通过发现服务,然后进行调用

服务引用的流程

  • dubbo服务引用的流程大概如上图,不难发现其流程跟dubbo服务暴露互逆,(关于Dubbo服务暴露Dubbo服务暴露)但最终也是通过invoker来完成我们服务引用
  • dubbo服务引用最终通过ProxyFactory将Invoker转化为调用的Service
  • dubbo服务引用过程与dubbo服务暴露相似,都是通过SPI,适配相应的协议,并将服务注册到注册中心,并最终完成服务引用

源码解析

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {

//省略一部分代码

//获取服务接口
@Override
public Object getObject() {
return get();
}

@Override
@SuppressWarnings({"unchecked"})
public void afterPropertiesSet() throws Exception {
//此处省略 配置校验代码
Boolean b = isInit();
if (b == null && getConsumer() != null) {
b = getConsumer().isInit();
}
if (b != null && b) {
//发现服务
getObject();
}
}
}
  • 首先我们来看一下ReferenceBean, ReferenceBean实现了InitializingBean, ApplicationContextAware, ApplicationListener这里同服务暴露一样,通过spring在初始化的时候进行服务引用

服务引用

  • 我们看到这里都调用了getObject()方法,其实是调用了ReferenceConfig中的get()方法,接下来我们一起看下ReferenceConfig中的get()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
public synchronized T get() {
//配置校验
checkAndUpdateSubConfigs();
//如果该服务已被销毁,则抛出异常
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
//如果服务为空,则进行初始化,否则直接返回
if (ref == null) {
init();
}
return ref;
}
  • 这里看到ReferenceConfig.get方法上加了一个锁,用来保证不会重复发现服务,而该方法的核心在于init()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void init() {
if (initialized) {
return;
}
initialized = true;
checkStubAndLocal(interfaceClass);
//校验mock
checkMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();

//省略对参数解析设置 ...

//创建代理对象
ref = createProxy(map);

ApplicationModel.initConsumerModel(getUniqueServiceName(), buildConsumerModel(attributes));
}
  • 这里通过对参数的解析来创建服务代理, createProxy()方法是整个服务引用初始化的关键
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
 private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { // 如果指定了url,则不要进行本地引用
isJvmRefer = false;
} else {
// 默认情况下,引用本地服务(如果有)
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else {
isJvmRefer = isInjvm();
}

if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // 用户指定的URL,可以是对等地址,也可以是注册中心的地址.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // x来自注册中心配置的URL
checkRegistry();
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
//这里的refprotocol.refer即通过registryProtocol来进行发现
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 使用最后一个注册表网址
}
}
if (registryURL != null) { // 注册表网址可用
// 仅在寄存器的群集可用时才使用RegistryAwareCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
//调用者包装关系将是:RegistryAwareClusterInvoker(StaticDirectory) - > FailoverClusterInvoker(RegistryDirectory,将执行路由) - > Invoker invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是注册表网址,必须直接调用。
//这里要注意 cluster 最终都会被包装成 MockClusterWrapper(SPI的依赖注入)
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}

Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
// 如果提供者暂时不可用,则允许消费者稍后重试
initialized = false;
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
  • 这里可以看到dubbo在服务引用中也可以使用本地服务的发现,但是可看到这一块已经被标记为过时,我的理解是dubbo作为一个RPC框架,本地服务还通过dubbo去调用,肯定与dubbo本身的意义不相匹配,所以便不推荐使用
  • 这块代码我们可以发现同服务暴露一样,会将consumer注册到所有配置的注册中心上去,而refprotocol.refer则是服务引用的核心代码
  • cluster对invoker进行了一层包装,以便应对后续服务调用中出现的异常情况进行处理
  • 最后我们的invoker将通过代理工厂转换为可以调用的代理服务

RegistryProtocal中的refer

RegistryProtocal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY);
//获取注册中心
Registry registry = registryFactory.getRegistry(url);
//如果是注册中心的服务,直接返回注册中心类型的invoker
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//发现服务
return doRefer(cluster, registry, type, url);
}

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//创建并设置注册目录对象
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
//注册服务
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//订阅服务
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
//装饰Invoker
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
  • 在RegistryProtocal中,我们看到了cluster.join(directory),在ReferenceConfig中也出现过,在ReferenceConfig中没有注册中心的时候将直接使用装饰invoker,以供我们接下来服务调用来做集群容错
  • 服务引用在RegistryProtocal中的核心方法即为doRefer方法

RegistryDirectory

RegistryDirectory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* 将网址转换为调用者,如果网址已被引用,则不会重新引用。
*
* @param urls
* @return invokers
*/
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// 如果在参考侧配置协议,则仅选择匹配协议
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);

String key = url.toFullString(); // 参数URL已排序
if (keys.contains(key)) { //重复的网址
continue;
}
keys.add(key);
// 缓存键是不与消费者方参数合并的URL,无论消费者如何组合参数,如果服务器URL更改,则再次引用
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // 本地发现
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 不在缓存中,请再次发现
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
  • 那我们的服务最后是如何通相应协议打开consumer和provider的链接呢,关键代码就在RegistryDirectory的toInvokers方法,将url转换成具体的invoker,这个方法在订阅服务的时候会被触发,并且这里做了一层缓存,防止服务被多次引用

DubboProtocal中的refer

DubboProtocol
1
2
3
4
5
6
7
8
9
10
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);

// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);

return invoker;
}
  • 这里我们以Dubbo协议为例,看到DubboProtocal中的refer很简单,就是创建一个netty客户端,与provider进行连接返回一个Invoker即完成了一次服务的引用
  • 最后通过ProxyFactory的字节码结束,生成代理的可供调用的服务,到这里dubbo服务引用的流程就结束了,可以看出服务引用与服务暴露的过程中有很多类似的地方,其中还有很多细节没有展开,这也将是后续学习的重点
文章作者: 怀风
文章链接: http://blog.leishunyu.com/2019/03/16/2019-03-16-Dubbo 服务引用/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Maple