soul源码阅读-soul-web使用websocket策略同步配置原理解析

前言

  • 在之前的文章中说到了soul的http长轮询数据同步策略,这次说一下另一个跟http有点关系的同步策略websocket,websocket策略与长轮询相反,它是admin主动向网关push数据,在官网的文章中,也介绍了websocket的同步原理,结合官网的文章看一下websocket的整体同步流程。

连接建立

在我们将网关同步策略配置为websocket后,启动项目,加载我们在网关配置的同步策略

1
2
3
4
soul :
sync:
websocket :
urls: ws://localhost:9095/websocket # 如果是网关集群,这里可以配置多个,用','分割,比如ws://localhost:9095/websocket,ws://localhost:9096/websocket

之后会向目标地址建立websocket连接,在建立websocket连接的时候,会对每个连接设置30s间隔的断线重连,具体代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// 解析url,根据url的数量,创建对应的守护调度线程
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
// 对每个连接都设置同步阻塞方式,30s超时时间
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
// 连接成功后,每30s检查一下心跳
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}

}

主动Push

  • 此时web与admin的连接就建立完成了,只需要等待admin发生修改操作,将数据push到web更新本地缓存就好,我们来看看admin的push过程,以及web收到通知后的操作,在admin修改一个配置提交,定位到PluginServiceImpl#createOrUpdate方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public String createOrUpdate(final PluginDTO pluginDTO) {
final String msg = checkData(pluginDTO);
if (StringUtils.isNoneBlank(msg)) {
return msg;
}
PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);
DataEventTypeEnum eventType = DataEventTypeEnum.CREATE;
if (StringUtils.isBlank(pluginDTO.getId())) {
pluginMapper.insertSelective(pluginDO);
} else {
eventType = DataEventTypeEnum.UPDATE;
pluginMapper.updateSelective(pluginDO);
}

// publish change event.
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, eventType,
Collections.singletonList(PluginTransfer.INSTANCE.mapToData(pluginDO))));
return StringUtils.EMPTY;
}

在admin中,与其他的数据同步策略处理一样,都是通过Spring事件发布机制处理,看下最终通过DataChangedEventDispatcher定位到的方法

1
2
3
4
5
public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
WebsocketData<PluginData> websocketData =
new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}

通过listener定位到了websocketDataChangedListener中,之后通过WebsocketCollector#send方法发送数据,看看WebsocketCollector#send方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
if (DataEventTypeEnum.MYSELF == type) {
try {
Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
if (session != null) {
session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
  • 在admin通过websocket发送之后,SoulWebsocketClient#onMessage接收数据最终在handleResult方法进行处理
1
2
3
4
5
6
7
8
9
10
11
12
private void handleResult(final String result) {
// json反序列化之后,获取到configGroup和eventType事件类型最终进行处理
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());
websocketDataHandler.executor(groupEnum, json, eventType);
}
// WebsocketDataHandler#executor
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
ENUM_MAP.get(type).handle(json, eventType);
}
  • 然后定位到了AbstractDataHandler中的handle方法(又是一个运用了模版模式的地方)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void handle(final String json, final String eventType) {
// 解析json后根据不同的具体实现类和eventType,定位到具体的实现类中进行执行
List<T> dataList = convert(json);
if (CollectionUtils.isNotEmpty(dataList)) {
DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
switch (eventTypeEnum) {
case REFRESH:
case MYSELF:
doRefresh(dataList);
break;
case UPDATE:
case CREATE:
doUpdate(dataList);
break;
case DELETE:
doDelete(dataList);
break;
default:
break;
}
}
}
  • 我们修改插件,因此是Plugin类型,最终定位到了PluginDataHandler#doUpdate方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected void doUpdate(final List<PluginData> dataList) {
// 就一行代码,依次订阅,而具体查看这个onSubscribe方法时,PluginDataSubscriber就只有一个具体实现CommonPluginDataSubscriber,最终会定位到CommonPluginDataSubscriber的具体实现中
dataList.forEach(pluginDataSubscriber::onSubscribe);
}

//CommonPluginDataSubscriber
//逻辑上处理也相对简单,判断事件类型,更新对应类型的缓存
//每个分支下面都用到了BaseDataCache,见名知意,网关里面各种插件数据类型的数据都缓存在这里了。
//之后的handler.xxx方法,其实就是对应的数据类型的处理器了,接口里面大多使用了默认实现,具体处理器实现接口根据需要进行重写
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cachePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) {
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) {
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 比如divide插件,就只充血了3个方法,其余的都是用接口默认方法。
public class DividePluginDataHandler implements PluginDataHandler {

@Override
public void handlerSelector(final SelectorData selectorData) {
UpstreamCacheManager.getInstance().submit(selectorData);
}

@Override
public void removeSelector(final SelectorData selectorData) {
UpstreamCacheManager.getInstance().removeByKey(selectorData.getId());
}

@Override
public String pluginNamed() {
return PluginEnum.DIVIDE.getName();
}
}

结尾

  • 到这里,websocket的数据同步策略一次具体的同步流程就完成了,soul在进行设计的时候进行了高度抽象,大部分的代码都做到了复用,具体的功能特性只需进行少量定制实现就好。最后理一下websocket数据同步策略的流程:

img

-------------本文结束感谢您的阅读-------------