Dubbo 服务暴露

Dubbo 服务暴露

服务暴露

  • 大家都知道Dubbo是由consumer,provider,registry这三大部分组成
  • 那么provider的如何将服务提供给consumer调用呢,就是通过服务暴露来实现的,也就是把我们原来单机架构中的接口,对外部暴露

服务暴露的流程

  • dubbo服务暴露的流程大概如上图,在dubbo中,所有的服务都会被包装成一个invoker,这一点也将贯穿今后整个学习
  • dubbo服务暴露可以理解为两部分:本地暴露,远程暴露
  • 本地暴露的接口通常用于我们直接invoke dubbo的接口,以及有些时候我们的服务既是provider又是consumer,避免远程调用造成的资源浪费
  • 远程暴露则是将服务信息注册到registry,并且将服务通过网络提供给其他应用调用

源码解析

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
private static final long serialVersionUID = 213195494150089726L;
/**
*此处省略其他代码
**/

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
//服务暴露
export();
}
}

@Override
@SuppressWarnings({"unchecked", "deprecation"})
public void afterPropertiesSet() throws Exception {
//此处省略....
if (!supportedApplicationListener) {
//服务暴露
export();
}
}
}
  • 首先我们来看一下ServiceBean,ServiceBean实现了InitializingBean, ApplicationContextAware, ApplicationListener有没有觉得很熟悉,实现这几个类就能在spring初始化的时候do something

暴露服务

  • 我们看到这里都调用了export()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}

if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
  • 这里看到ServiceConfig.export方法上加了一个锁,用来保证不会重复暴露服务,抛开上面的逻辑判断,在第一次初始化的时候,是直接走到了doExport()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
protected synchronized void doExport() {
//省略判断代码
doExportUrls();
}

private void doExportUrls() {
//加载注册中心的配置
List<URL> registryURLs = loadRegistries(true);
//把使用的协议注册到注册中心
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
  • 这里dubbo支持多协议,可以看到通过for循环可以把配置的多种协议都导出,进行暴露
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//省略判断代码
// 导出服务
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}

String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}

String scope = url.getParameter(Constants.SCOPE_KEY);
// 没有配置时不导出
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// 导出本地服务
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 导出远程服务
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
//将服务都注册到当前已有的注册中心上去
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
//判断是否有监控中心
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//对于providers,这用于启用自定义代理以生成invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//包装调用者和所有元数据的Invoker包装器
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
//没有注册中心直接暴露
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
//包装调用者和所有元数据的Invoker包装器
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}

//暴露本地服务
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
//手动暴露一个本地服务
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
  • 这你scope配置默认值是null,则本地服务和远程服务都导出,另外如果没有配置注册中心,将直接将接口暴露出去,我们可以根据自己所在的场景,选择都暴露还是指定暴露
  • 由于dubbo也是支持多注册中心的,所以可以通过for循环,将多个服务都注册到当前已有的注册中心上去
  • 在exportLocal方法这是将配置中解析好的url参数手动修改成本地协议进行服务暴露
  • ProxyFactory是通过SPI获取JavassistProxyFactory靠Javassist字节码技术动态的生成Invoker类,大家有兴趣的可以下去了解一下

暴露的细节

ProtocolFilterWrapper
1
2
3
4
5
6
7
8
9
10
11
public class ProtocolFilterWrapper implements Protocol {

private final Protocol protocol;
//装饰者模式
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
}
  • 在ServiceConfig中,通过SPI获取相应的Protocol,SPI中会对实现类进行装饰,每次执行protocol.exprot()方法的时候,其实都是执行的ProtocolFilterWrapper的protocol.exprot方法
ProtocolFilterWrapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//如果是注册中心协议直接导出
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
//如果不是则执行整个filter的责任链
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

//责任链模式,对filter进行逐个执行
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {

@Override
public Class<T> getInterface() {
return invoker.getInterface();
}

@Override
public URL getUrl() {
return invoker.getUrl();
}

@Override
public boolean isAvailable() {
return invoker.isAvailable();
}

@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = filter.invoke(next, invocation);
if (result instanceof AsyncRpcResult) {
AsyncRpcResult asyncResult = (AsyncRpcResult) result;
asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
return asyncResult;
} else {
return filter.onResponse(result, invoker, invocation);
}
}

@Override
public void destroy() {
invoker.destroy();
}

@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
  • 由上述代码我们可以看到,dubbo中运用装饰者模式和责任链模式,对我们提供的服务做了一次封装,最终转换成我们需要的invoker对外暴露
  • 要注意到的是,当我们的协议是registry也就是注册协议的时候,是不需要进行构建责任链的

本地暴露

InjvmProtocol
1
2
3
4
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
  • 如果是本地暴露,则通过SPI拿到InjvmProtocol,最终通过injvm协议导出InjvmExporter

远程暴露

DubboProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 导出服务.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//导出根服务以进行调度事件
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//打开服务
openServer(url);
optimizeSerialization(url);
return exporter;
}

//打开服务
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//客户端可以导出仅供服务器调用的服务
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//如果服务不存在,创建服务
serverMap.put(key, createServer(url));
}
}
} else {
// 服务器支持重置,与override一起使用
server.reset(url);
}
}
}
  • 在dubbo协议中,我们看到在服务导出的时候会根据配置地址,打开netty服务,也就是通过这一步,开启了RPC端口,使consumer通过TCP协议进行服务调用

服务注册

RegistryProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url在本地导出
URL providerUrl = getProviderUrl(originInvoker);
// 订阅覆盖数据
// 同样的服务由于订阅是带有服务名称的缓存密钥,因此会导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//导出invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
//将url注册到注册中心
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//判断我们是否需要推迟发布
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//确保每次导出时都返回一个新的导出器实例
return new DestroyableExporter<>(exporter);
}

//真正导出服务的地方
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {

final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
//以dubbo协议为例,这里才是真正调用DubboProtocol.exprot的地方
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
  • 大家这里可以跟进源码,会发现,在ServiceConfig中通过proxyFactory生成的Invoker的url指向的协议其实是registry,所以在ServiceConfig中protocol.exprot调用的是RegistryProtocol的exprot方法
  • 在RegistryProtocol中调用了真正的远程服务暴露的方法,即DubboProtocol(以dubbo协议为例),在远程服务暴露成功后,将服务信息注册到registry上去,由此完成了一个服务的导出
  • 至此Dubbo服务暴露中的大致流程已经完成了,后面将会对Dubbo如何通过ProxyFactory生成Invoker,以及Registry是如何进行注册的进行更加深入的学习
文章作者: 怀风
文章链接: http://blog.leishunyu.com/2019/01/25/2019-01-25-Dubbo 服务暴露/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Maple