WebFlux之动态路由
因为想基于 WebFlux 实现一个网关的路由转发功能,且 Gateway 也是基于 WebFlux 实现的网关,网上也有许多 Gateway 的动态路由教程,而本章主要将单纯基于 WebFlux 如何实现动态路由
WebFlux 接口实现
首先是最底层 [HttpHandler]{.label .primary} 接口,该接口是对 [HTTP]{.label .primary} 请求处理的最底层抽象,通过这个接口将不同“接口”实现方式统一起来。所以不管是注解控制器方式的编程还是函数式的编程,在初始化环境时,都需要初始化这么一个类的实例,[HttpHandler]{.label .primary} 的定义如下所示
package org.springframework.http.server.reactive;
import reactor.core.publisher.Mono;
public interface HttpHandler {
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}
注解方式例子
基于如下就实现了一个 /api/mono2 接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/")
@Slf4j
public class HelloController {
@GetMapping(value = "/mono2")
public Mono<String> monoDemo() {
log.info("start...");
Mono<String> mono = Mono.fromSupplier(() -> someString("hello webflux"));
log.info("end...");
return mono;
}
}
函数方式例子
基于如下就实现了一个 /api/mono 接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.RouterFunction;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import reactor.core.publisher.Mono;
@Configuration
@Slf4j
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> timerRouter() {
return route(GET("/api/mono"), req -> this.monoDemo(req));
}
public RouterFunction<ServerResponse> monoDemo(ServerRequest serverRequest){
log.info("start...");
Mono<String> mono = Mono.fromSupplier(() -> someString("hello webflux"));
log.info("end...");
return ok().body(mono,String.class);
}
}
所以我将使用函数方式实现动态路由
函数式接口请求过程
首先 [DispatcherHandler]{.label .primary} 类
该类是接口 WebHandler 的实现。WebHandler 在 HttpHandler 功能上提供链式处理能力,可以组合异常处理器(WebExceptionHandler)、过滤器(WebFilter)以及目标。
// 地址:package org.springframework.web.reactive
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) { //是否有处理工具
return createNotFoundError();
}
return Flux.fromIterable(this.handlerMappings) // 便利 handlerMappings
.concatMap(mapping -> mapping.getHandler(exchange))// 一个一个取出去执行 getHandler
.next() //Flux转Mono
.switchIfEmpty(createNotFoundError()) // 如果前面数据为空
.flatMap(handler -> invokeHandler(exchange, handler))//只取一个数据
.flatMap(result -> handleResult(exchange, result));//只取一个数据
}
在 [getHandler]{.label .primary} 方法处进入 [AbstractHandlerMapping]{.label .primary} 类
// 地址:package org.springframework.web.reactive.handler;
@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
return getHandlerInternal(exchange).map(handler -> { // 注意 getHandlerInternal 方法
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
}
ServerHttpRequest request = exchange.getRequest();
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return REQUEST_HANDLED_HANDLER;
}
}
return handler;
});
}
通过 [getHandlerInternal]{.label .primary} 方法进入 [RouterFunctionMapping]{.label .primary} 类
// 地址:package org.springframework.web.reactive.function.server.support;
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
if (this.routerFunction != null) { //注意 routerFunction 变量。从这可看到熟悉接口路径
ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
return this.routerFunction.route(request)
.doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
}
else {
return Mono.empty();
}
}
总结:程序运行时需要动态改变 [RouterFunctionMapping]{.label .primary} 类下的 [routerFunction]{.label .primary} 变量。增加接口和函数的映射关系。
WebHandler 和 HttpHandler 关系图
函数式接口初始化过程
首先 [RouterFunctionMapping]{.label .primary} 类初始化
// 地址:package org.springframework.web.reactive.function.server.support;
protected void initRouterFunctions() { // 通过这个方法初始化路由 routerFunction 变量
List<RouterFunction<?>> routerFunctions = routerFunctions(); //RouterFunction 函数
this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
logRouterFunctions(routerFunctions);
}
//通过 routerFunctions 方法。获取所有的 RouterFunction。
private List<RouterFunction<?>> routerFunctions() {
List<RouterFunction<?>> functions = obtainApplicationContext() // 获取上下文
.getBeanProvider(RouterFunction.class) // 获取 RouterFunction 的bean
.orderedStream()
.map(router -> (RouterFunction<?>)router)
.collect(Collectors.toList());
return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
}
实现动态路由
实现修改[Spring]{.label .primary} 上下文中的单例 [bean]{.label .primary}
package com.cn.demotest.bean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;
/**
* <P>
* bean工厂
* </p>
*
* @author 昔日织
* @since 2021-07-02
*/
@Component
public class RouterBeanFactory implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 获取bean注册表
* @return
*/
public DefaultSingletonBeanRegistry getBeanRegistry(){
return (DefaultSingletonBeanRegistry)((AbstractApplicationContext)applicationContext).getBeanFactory();
}
public void upDateBean(String name,Object o){
DefaultSingletonBeanRegistry beanRegistry = getBeanRegistry();
if(beanRegistry.containsSingleton(name)){
beanRegistry.destroySingleton(name);
}
beanRegistry.registerSingleton(name,o);
}
}
新增或者修改路由的 [updateRouter]{.label .primary}
import org.springframework.stereotype.Component;
import static org.springframework.web.reactive.function.server.RouterFunctions.route
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.support.RouterFunctionMapping;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
@Component
public class updateRouter implements ApplicationContextAware {
@Autowired
private RouterBeanFactory routerBeanFactory;
private ApplicationContext applicationContext;
public void creat(String appName){
String format = String.format("/%s/**", appName);
RouterFunction<ServerResponse> appRouter = route(path(format), serverRequest -> requestTranspond(serverRequest, appName)); //requestTranspond 方法是用于接口转发。
routerBeanFactory.upDateBean("bean"+appName,appRouter);
RouterFunctionMapping routerFunctionMapping = applicationContext.getBean(RouterFunctionMapping.class);
Class<? extends RouterFunctionMapping> aClass = routerFunctionMapping.getClass();
try {
Method initRouterFunctions = aClass.getDeclaredMethod("initRouterFunctions", new Class[] {});
initRouterFunctions.setAccessible(true);
initRouterFunctions.invoke(routerFunctionMapping,new Object[] {});
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
至此调用 [creat]{.label .primary} 方法可以增加一个新的接口。
实现新服务接入增加接口(简)
网关可以通过服务注册发现来对外提供接口。如果新服务来了则增加一套对应接口。
实现思路
- 实现 [ApplicationRunner]{.label .primary} 接口。使[Spring boot]{.label .primary} 启动完毕后会自动回调这个方法。
- 实现 [ApplicationContextAware]{.label .primary} 方法获取上下文 [applicationContext]{.label .primary} 。
- 通过 [applicationContext]{.label .primary} 获取 [Controller]{.label .primary} 注解过的单例对象。
- 遍历对象拼接得到所有接口
- 带个唯一标识和这些接口与 [ip]{.label .primary} 和端口信息注册到网关上。
- 通过 [creat]{.label .primary}方法网关增加一个 /唯一标识/** 的路由接口。
- 访问该服务就是 网关的网址/唯一标识/服务的接口路径。网关在通过 [requestTranspond]{.label .primary} 方法进行转发
[requestTranspond]{.label .primary}方法请求参考文章 WebFlux 之 WebClient的使用记录
总结
功能实现有些潦草,有什么建议可以留言。如有不足之处,还望指出。