Z


  • 首页

  • 标签

  • 分类

  • 归档

  • 关于

soul-dubbo例子使用

发表于 2021-01-27 | 分类于 网关 , soul , Java

运行项目

依次启动soul-admin,soul-bootstrap,soul-examples-apache-dubbo-service三个服务,观察apache-dubbo的日志,启动的时候就会将配置的接口注册到网关中,项目中使用@SoulDubboClient注解定义了网关要代理的路径,注解本身,是利用Spring的beanPostProcesstor特性进行加工处理,当然也可以通过控制台在页面进行操作。

阅读全文 »

soul源码阅读-sofa插件使用

发表于 2021-01-27 | 分类于 网关 , soul , Java

启动

  • 参考官网介绍,在soul-bootstarp中,打开sofa相关依赖,启动soul-admin和soul-bootstrap,在启动example下soul-examples-sofa
  • 在启动过程中遇到sofa注解的接口失败的问题,debug源码的时候发现,接口在注册的时候,整体流程如下

阅读全文 »

soul源码阅读-springCloud插件使用

发表于 2021-01-27 | 分类于 网关 , soul , Java

启动

  • 在soul-bootstarp的pom中,打开Spring cloud的依赖。
  • 参考官网介绍启动soul-admin、soul-bootstrap、eureka-server,再启动example下soul-examples-spring cloud
阅读全文 »

soul源码阅读-soulSpringCloudExample调用失败的原因及插件解析

发表于 2021-01-27 | 分类于 网关 , soul , Java

原因

  • 在一开始使用spring cloud插件的时候,调用网关没有得到请求,一直返回:Can not find url, please check your configuration!
  • 先说原因,是因为在bootstrap中,没有打开spring cloud的相关依赖导致的,具体分析如下。

阅读全文 »

soul源码阅读-ApacheDubboPlugin插件解析

发表于 2021-01-27 | 分类于 网关 , soul , Java
  • 本文介绍apache dubbo插件的具体实现,在看代码之前,介绍一下dubbo插件干的活,以及dubbo插件的关键数据结构,再具体看看doExecute方法实现

  • dubbo插件,主要作用是用来配合使用dubbo的项目,在使用dubbo协议进行调用时候,网关也可以拦截请求进行转发,相当于为了dubbo项目做了适配,而在dubbo里面,有一个元数据的概念,里面保存了使用dubbo进行rpc的时候,双方的接口信息,所以soul在实现dubbo协议的时候,也对其元数据做了适配,以此保证rpc的正确性。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
      
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    // 获取dubbo调用的相关数据校验
    String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
    SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    assert soulContext != null;
    MetaData metaData = exchange.getAttribute(Constants.META_DATA);
    if (!checkMetaData(metaData)) {
    assert metaData != null;
    log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());
    exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
    Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);
    return WebFluxResultUtils.result(exchange, error);
    }
    if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {
    exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
    Object error = SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);
    return WebFluxResultUtils.result(exchange, error);
    }
    // 最终通过dubbo泛化调用得到返回值
    final Mono<Object> result = dubboProxyService.genericInvoker(body, metaData, exchange);
    return result.then(chain.execute(exchange));
    }
  • 可以看到,插件的主要部分是进行调用接口的相关参数做处理,核心的地方在处理完成之后使用泛化调用对真实后端服务进行调用的部分,看看soul与dubbo集成时,在泛化调用的时候都做了什么

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
    // issue(https://github.com/dromara/soul/issues/471), add dubbo tag route
    // 确定路由标签
    String dubboTagRouteFromHttpHeaders = exchange.getRequest().getHeaders().getFirst(Constants.DUBBO_TAG_ROUTE);
    if (StringUtils.isNotBlank(dubboTagRouteFromHttpHeaders)) {
    RpcContext.getContext().setAttachment(CommonConstants.TAG_KEY, dubboTagRouteFromHttpHeaders);
    }
    // 从元信息中获取调用路径,得到ReferenceConfig并对其进行初始化判断。
    ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
    if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
    ApplicationConfigCache.getInstance().invalidate(metaData.getPath());
    reference = ApplicationConfigCache.getInstance().initRef(metaData);
    }
    // 获取泛化调用对象genericService,对之前从http的body中获取的dubbo调用的相关参数进行检查,最终得到调用的键值对进行远程调用
    GenericService genericService = reference.get();
    Pair<String[], Object[]> pair;
    if (ParamCheckUtils.dubboBodyIsEmpty(body)) {
    pair = new ImmutablePair<>(new String[]{}, new Object[]{});
    } else {
    pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
    }
    CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
    return Mono.fromFuture(future.thenApply(ret -> {
    if (Objects.isNull(ret)) {
    ret = Constants.DUBBO_RPC_RESULT_EMPTY;
    }
    exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
    return ret;
    })).onErrorMap(exception -> exception instanceof GenericException ? new SoulException(((GenericException) exception).getExceptionMessage()) : new SoulException(exception));
    }
  • 至此网关收到客户端请求后,使用dubbo进行一次远程调用的过程就结束了,大体流程相对比较简单,在调用之前先进行参数的校验,组合处理,之后直接使用泛化调用来拿到实际结果。

soul源码阅读,divide插件探活,负载均衡,路径选择解析

发表于 2021-01-27 | 分类于 网关 , soul , Java
  • Divide插件,作为soul进行http协议请求处理的核心插件,本文介绍divide具体工作流程,插件提供的负载均衡,服务探活具体实现,下面是插件的主体逻辑,网关在接收到客户端的请求时divide插件干的活全在这里。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
// 确保请求的上游服务存在
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
if (CollectionUtils.isEmpty(upstreamList)) {
log.error("divide upstream configuration error: {}", rule.toString());
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
// 负载均衡,确定实际的上游后端服务地址
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
if (Objects.isNull(divideUpstream)) {
log.error("divide has no upstream");
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// set the http url
String domain = buildDomain(divideUpstream);
String realURL = buildRealURL(domain, soulContext, exchange);
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
// set the http timeout
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
return chain.execute(exchange);
}
  • 之前文章说了关于http请求,实际的执行请求转发插件是webClientPlugin,其他的比如divide,springCloud插件,都是对当前http请求进行加工处理,处理完成交给webClientP lugin发送。
  • UpstreamCacheManager是上游后端服务的缓存,divide插件处理过程中要确保此次请求的服务是存在的,才能保证后面的转发插件能正确转发请求到实际服务,因此就需要本地保存目前真实存在的后端服务,不影响性能的情况下还能确保请求转发的正确性,同时还需要发送心跳检查后端服务健康情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private UpstreamCacheManager() {
// 初始化探活调度器
boolean check = Boolean.parseBoolean(System.getProperty("soul.upstream.check", "false"));
if (check) {
new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("scheduled-upstream-task", false))
.scheduleWithFixedDelay(this::scheduled,
30, Integer.parseInt(System.getProperty("soul.upstream.scheduledTime", "30")), TimeUnit.SECONDS);
}
}
// 根据选择器,选择是否添加到上游服务缓存
public void submit(final SelectorData selectorData) {
final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);
if (null != upstreamList && upstreamList.size() > 0) {
UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);
} else {
UPSTREAM_MAP.remove(selectorData.getId());
UPSTREAM_MAP_TEMP.remove(selectorData.getId());
}
}

// 在进行调度的时候去检查服务是否可用
private void scheduled() {
if (UPSTREAM_MAP.size() > 0) {
UPSTREAM_MAP.forEach((k, v) -> {
List<DivideUpstream> result = check(v);
if (result.size() > 0) {
UPSTREAM_MAP_TEMP.put(k, result);
} else {
UPSTREAM_MAP_TEMP.remove(k);
}
});
}
}


private List<DivideUpstream> check(final List<DivideUpstream> upstreamList) {
List<DivideUpstream> resultList = Lists.newArrayListWithCapacity(upstreamList.size());
for (DivideUpstream divideUpstream : upstreamList) {
// 通过socket连接检查上游后端服务
final boolean pass = UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl());
if (pass) {
if (!divideUpstream.isStatus()) {
divideUpstream.setTimestamp(System.currentTimeMillis());
divideUpstream.setStatus(true);
log.info("UpstreamCacheManager detect success the url: {}, host: {} ", divideUpstream.getUpstreamUrl(), divideUpstream.getUpstreamHost());
}
resultList.add(divideUpstream);
} else {
divideUpstream.setStatus(false);
log.error("check the url={} is fail ", divideUpstream.getUpstreamUrl());
}
}
return resultList;

}
  • 这个缓存管理器里面,有2个缓存,一个UPSTREAM_MAP,一个UPSTREAM_MAP_TEMP,后者是内部实时更新,对外提供健康服务地址的缓存,前者是作为一致性检查的缓存,在进行调度的时候,根据UPSTREAM_MAP的内容,会对UPSTREAM_MAP_TEMP进行更新,保证每次心跳检测时两个缓存内容一致。
  • 对缓存地址的添加则是在DividePluginDataHandler中操作的,当admin更新了后端服务地址时,通过发布订阅机制,本地的缓存会收到消息进行更新。

soul源码阅读-使用@SoulSpringMvcClient将接口注册到网关流程解析

发表于 2021-01-27 | 分类于 网关 , soul , Java
  • 在使用soul将我们编写的controller接口注册到网关,由网关统一代理时,一般情况,http方式,我们只需要使用@SoulSpringMvcClient注解标注在对应的接口上就行了,那么,我们使用了注解之后,soul是如何将我们的接口注册到网关的呢
  • 先看看注解在哪些地方被使用了

阅读全文 »

soul源码阅读-soul-web使用http长轮询策略同步配置原理解析

发表于 2021-01-27 | 分类于 网关 , soul , Java

简介

  • 在参考soul官网研究soul的数据同步策略的时候发现,soul目前支持的数据同步策略有zk,websocket和http长轮询,而zk和websocke是主动push的策略,在admin进行配置修改的时候才会触发,长轮询作为pull策略,通过soul-web主动向admin发起数据同步策略,唯一的pull策略引发了兴趣,研究下soul如何实现的。
  • 在官网的介绍里说了http长轮询的执行流程,web网关,会定时向admin发起数据同步的长轮询请求,而admin则在接受到请求后,使用servlet3.0的异步特性,先将请求放入一个阻塞队列ArrayBlockingQueue中保存,发生了数据变更时,会将队列中的所有任务出队依次处理并响应,如果超时,则将队列中的头部元素取出进行处理然后响应。
  • 既然是数据同步的策略,那具体的实现就应该在soul-sync-data-center中,http长轮询就应该对应soul-sync-data-http模块(soul的项目结构本身就设计的很好,并且命名规范,按照功能进行模块拆分,具体的功能点可以在对应的模块中找到,在加上阅读源码时的‘假设性原则’,定位到核心代码处更是分分钟的事情,再不济也能猜到入口,一步步debug)

阅读全文 »

soul源码阅读-zookeeper数据同步策略流程详解

发表于 2021-01-27 | 分类于 网关 , soul , Java

前言

  • 这次讲解soul中使用zookeeper网关同步数据的流程,照旧参考官方文档的说明。

  • 依赖zookeeper的 watch 机制,soul-web会监听配置的节点,soul-admin在启动的时候,会将数据全量写入 zookeeper,后续数据发生变更时,会增量更新zookeeper的节点,与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存

阅读全文 »

soul源码阅读-nacos数据同步策略流程详解

发表于 2021-01-27 | 分类于 网关 , soul , Java

前言

  • 在soul新版本中添加了nacos的数据同步策略,大致的同步流程如下

    1
    2
    3
    4
    5
    6
    graph LR

    admin[admin启动向nacos中发送数据] --> nacos[nacos]
    web[web启动时从nacos中拿取数据更新本地缓存] -->nacos[nacos]

    nacos1[nacos中的数据发生更改,发送更改数据到web] --> web1[web]
  • 因为目前版本admin中的nacos数据同步,没有在启动时将数据同步到nacos中,所以目前来说,nacos数据同步方案在启动的时候需要手动进行同步,关于nacos同步策略的使用以及可能遇到的坑,可以参考Soul网关源码阅读(十六)Nacos数据同步示例运行

阅读全文 »
12
Zhoutzzz

Zhoutzzz

一枚Java Coder的个人学习基地

17 日志
7 分类
8 标签
GitHub E-Mail
© 2021 Zhoutzzz
本站访客数人
| 本站总访问量次