WebFlux之动态路由

因为想基于 WebFlux 实现一个网关的路由转发功能,且 Gateway 也是基于 WebFlux 实现的网关,网上也有许多 Gateway 的动态路由教程,而本章主要将单纯基于 WebFlux 如何实现动态路由

WebFlux 接口实现

首先是最底层 [HttpHandler]{.label .primary} 接口,该接口是对 [HTTP]{.label .primary} 请求处理的最底层抽象,通过这个接口将不同“接口”实现方式统一起来。所以不管是注解控制器方式的编程还是函数式的编程,在初始化环境时,都需要初始化这么一个类的实例,[HttpHandler]{.label .primary} 的定义如下所示

package org.springframework.http.server.reactive;
import reactor.core.publisher.Mono;
public interface HttpHandler {
    Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
}

注解方式例子

基于如下就实现了一个 /api/mono2 接口

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/")
@Slf4j
public class HelloController {
    @GetMapping(value = "/mono2")
    public Mono<String> monoDemo() {
        log.info("start...");
        Mono<String> mono = Mono.fromSupplier(() -> someString("hello webflux"));
        log.info("end...");
        return mono;
    }
}

函数方式例子

基于如下就实现了一个 /api/mono 接口

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.RouterFunction;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import reactor.core.publisher.Mono;
@Configuration
@Slf4j
public class RouterConfig {
    
    @Bean
    public RouterFunction<ServerResponse> timerRouter() {
        return route(GET("/api/mono"), req -> this.monoDemo(req));
    }
    public RouterFunction<ServerResponse> monoDemo(ServerRequest serverRequest){
        log.info("start...");
        Mono<String> mono = Mono.fromSupplier(() -> someString("hello webflux"));
        log.info("end...");
        return ok().body(mono,String.class);
    }
}

所以我将使用函数方式实现动态路由

函数式接口请求过程

首先 [DispatcherHandler]{.label .primary} 类

该类是接口 WebHandler 的实现。WebHandler 在 HttpHandler 功能上提供链式处理能力,可以组合异常处理器(WebExceptionHandler)、过滤器(WebFilter)以及目标。

// 地址:package org.springframework.web.reactive
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
    if (this.handlerMappings == null) { //是否有处理工具
        return createNotFoundError();
    }
    return Flux.fromIterable(this.handlerMappings) // 便利 handlerMappings
        .concatMap(mapping -> mapping.getHandler(exchange))// 一个一个取出去执行 getHandler
        .next() //Flux转Mono
        .switchIfEmpty(createNotFoundError()) // 如果前面数据为空
        .flatMap(handler -> invokeHandler(exchange, handler))//只取一个数据
        .flatMap(result -> handleResult(exchange, result));//只取一个数据
}

在 [getHandler]{.label .primary} 方法处进入 [AbstractHandlerMapping]{.label .primary} 类

// 地址:package org.springframework.web.reactive.handler;
@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
	return getHandlerInternal(exchange).map(handler -> { // 注意 getHandlerInternal 方法
		if (logger.isDebugEnabled()) {
			logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
		}
		ServerHttpRequest request = exchange.getRequest();
        if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) 		 {
			CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
			CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
			config = (config != null ? config.combine(handlerConfig) : handlerConfig);
			if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
				return REQUEST_HANDLED_HANDLER;
			}
		}
		return handler;
	});
}

通过 [getHandlerInternal]{.label .primary} 方法进入 [RouterFunctionMapping]{.label .primary} 类

// 地址:package org.springframework.web.reactive.function.server.support;
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
	if (this.routerFunction != null) { //注意 routerFunction 变量。从这可看到熟悉接口路径
		ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
		return this.routerFunction.route(request)
				.doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
	}
	else {
		return Mono.empty();
	}
}

总结:程序运行时需要动态改变 [RouterFunctionMapping]{.label .primary} 类下的 [routerFunction]{.label .primary} 变量。增加接口和函数的映射关系。

WebHandler 和 HttpHandler 关系图

img

函数式接口初始化过程

首先 [RouterFunctionMapping]{.label .primary} 类初始化

// 地址:package org.springframework.web.reactive.function.server.support;
protected void initRouterFunctions() { // 通过这个方法初始化路由 routerFunction 变量
	List<RouterFunction<?>> routerFunctions = routerFunctions(); //RouterFunction 函数
	this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
	logRouterFunctions(routerFunctions);
}
//通过 routerFunctions 方法。获取所有的 RouterFunction。
private List<RouterFunction<?>> routerFunctions() {
	List<RouterFunction<?>> functions = obtainApplicationContext() // 获取上下文
			.getBeanProvider(RouterFunction.class) // 获取 RouterFunction 的bean
			.orderedStream()
			.map(router -> (RouterFunction<?>)router)
			.collect(Collectors.toList());
	return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
}

实现动态路由

实现修改[Spring]{.label .primary} 上下文中的单例 [bean]{.label .primary}

package com.cn.demotest.bean;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;

/**
 * <P>
 * bean工厂
 * </p>
 *
 * @author 昔日织
 * @since 2021-07-02
 */
@Component
public class RouterBeanFactory implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * 获取bean注册表
     * @return
     */
    public DefaultSingletonBeanRegistry getBeanRegistry(){
        return (DefaultSingletonBeanRegistry)((AbstractApplicationContext)applicationContext).getBeanFactory();
    }

    public void upDateBean(String name,Object o){
        DefaultSingletonBeanRegistry beanRegistry = getBeanRegistry();
        if(beanRegistry.containsSingleton(name)){
            beanRegistry.destroySingleton(name);
        }
        beanRegistry.registerSingleton(name,o);
    }
}

新增或者修改路由的 [updateRouter]{.label .primary}

import org.springframework.stereotype.Component;
import static org.springframework.web.reactive.function.server.RouterFunctions.route
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.support.RouterFunctionMapping;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;

@Component
public class updateRouter implements ApplicationContextAware {
	@Autowired
    private RouterBeanFactory routerBeanFactory;
    
    private ApplicationContext applicationContext;
    
    public void creat(String appName){
        String format = String.format("/%s/**", appName);
        RouterFunction<ServerResponse> appRouter = route(path(format), serverRequest -> requestTranspond(serverRequest, appName)); //requestTranspond 方法是用于接口转发。
        routerBeanFactory.upDateBean("bean"+appName,appRouter);
        RouterFunctionMapping routerFunctionMapping = applicationContext.getBean(RouterFunctionMapping.class);
        Class<? extends RouterFunctionMapping> aClass = routerFunctionMapping.getClass();
        try {
            Method initRouterFunctions = aClass.getDeclaredMethod("initRouterFunctions", new Class[] {});
            initRouterFunctions.setAccessible(true);
            initRouterFunctions.invoke(routerFunctionMapping,new Object[] {});
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

至此调用 [creat]{.label .primary} 方法可以增加一个新的接口。

实现新服务接入增加接口(简)

网关可以通过服务注册发现来对外提供接口。如果新服务来了则增加一套对应接口。

实现思路

  • 实现 [ApplicationRunner]{.label .primary} 接口。使[Spring boot]{.label .primary} 启动完毕后会自动回调这个方法。
  • 实现 [ApplicationContextAware]{.label .primary} 方法获取上下文 [applicationContext]{.label .primary} 。
  • 通过 [applicationContext]{.label .primary} 获取 [Controller]{.label .primary} 注解过的单例对象。
  • 遍历对象拼接得到所有接口
  • 带个唯一标识和这些接口与 [ip]{.label .primary} 和端口信息注册到网关上。
  • 通过 [creat]{.label .primary}方法网关增加一个 /唯一标识/** 的路由接口。
  • 访问该服务就是 网关的网址/唯一标识/服务的接口路径。网关在通过 [requestTranspond]{.label .primary} 方法进行转发

[requestTranspond]{.label .primary}方法请求参考文章 WebFlux 之 WebClient的使用记录

总结

功能实现有些潦草,有什么建议可以留言。如有不足之处,还望指出。