WebFlux 之 WebClient的使用记录

WebFlux 是一款基于 Netty 和 Reactive 技术栈构开发的一款 Spring 框架。该框架相比原来 Web 框架能有效提高 TPS 。其原理是基于 Netty 实现的事件驱动达到对线程的高可用,也实现了非阻塞。通过 Reactor 设计模式,使原有请求(线程)不需要通过等待过程实现(如:方法内的处理过程或远程调用或者数据库调用)导致系统资源的损耗。而 WebFlux 则是请求调用后则高高挂起,等待程序通知返回响应结果,然后它再离开。这个过程就是一种观察者模式。观察是否有处理结果了再做反应。而这也是 WebFlux 的响应式编程。

WebClient 是什么

[WebClient]{.label .primary}是非阻塞式 [Reactive HTTP]{.label .primary} 客户端,是 [Spring 5]{.label .primary} 中引入了。而以前用的 [RestTemplate]{.label .primary} 是阻塞式 [HTTP]{.label .primary} 客户端。

WebClient 使用

普通的接口

@GetMapping(value = "/test")
public String monoDemo(String name) {
    try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    return name;
}

[WebClient]{.label .primary} 调用

@GetMapping(value = "/test2")
public Mono<String> monoDemo2() {
    System.out.println("start...");
	Mono<String> stringMono = WebClient.create("http://127.0.0.1:8080/test?name=昔日长廊")
        .get() // 请求方式 也可 .method(HttpMethod.GET)
        .retrieve() // 对返回结果的简易处理,对应结果集 .exchange()是高级处理,对应响应体
        .bodyToMono(String.class) //对数据进行转为 Mono(T)
        .flatMap(f -> { // 对第一个数据进行处理并返回一个 Mono(T) 格式对象。也有可能是 Flux 要看上游
            System.out.println(f);
            return Mono.just(f); //将数据转为 Mono(T)
        });
    System.out.println("end...");
    return stringMono;
}

调用 [test2]{.label .primary}。打印结果

start...
end...
昔日长廊
// 返回结果是5秒后的“昔日长廊”字符串

WebClient 发送带有自定义字段的请求头

@GetMapping(value = "/mono4")
public Mono<String> monoDemo3() {
    WebClient webClient = WebClient.create();
    Map<String, String> stringStringMap = new HashMap<>();
    stringStringMap.put("name","昔日织");
    stringStringMap.put("password","昔日长廊");
    return webClient.method(HttpMethod.GET)
            .uri("http://127.0.0.1:8090/list?current=1&size=1000&buildState=true")
            .headers(p->{
                // 重点:通过设置 Access-Control-Expose-Headers 防止自定义请求头丢失。可设 token
                p.add("Access-Control-Expose-Headers","*");
                // 携带自定义请求头凭据
				p.add("token","eyJleHBpcmF0aW9uIjoxNj");
            })
            .body(BodyInserters.fromValue(stringStringMap))
            .retrieve()
            .bodyToMono(String.class);
}

使用 WebClient 做网关转发

WebFlux 框架的特性非常适用于网关。像 Gateway 也是基于 WebFlux 基础实现的一个微服务网关。

实现:将 [/almp/api/user]{.label .primary} 接口转发到 [/api/user]{.label .primary} 接口下。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.RouterFunction;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;
import java.util.*;

@Configuration
public class RouterConfig {
    @Bean
    public RouterFunction<ServerResponse> dynamicRouter() {
        return route(path("/almp/**"),res->this.send(res,"almp"));
    }
    /**
     * 转发器
     * @param serverRequest
     * @return
     */
    public Mono<ServerResponse> send(ServerRequest serverRequest,String appName) {
        // url 去除前面的 almp 
        String url = serverRequest.path().replace("/"+appName, "");
        // 拼接一个带 GET 请求参数的路径
        String path = String.format("http://127.0.0.1:8080%s?%s",url,serverRequest.uri().getQuery());
        WebClient webClient = WebClient.create();
        Mono<String> stringMono = serverRequest.bodyToMono(Map.class) //将json传参转义为Map
            // //如果参数为空则设置一个空的Map。不然无法进入 flatMap
            .switchIfEmpty(Mono.just(new HashMap<>()))
            .flatMap(p -> {
         		return webClient.method(serverRequest.method()) //设置请求的类型
                				.uri(path)//设置接口
                				.headers(g -> {
                                    // 设置请求头防止过滤自定义字段
                    g.setAll(serverRequest.headers().asHttpHeaders().toSingleValueMap());
                    g.set("Access-Control-Expose-Headers","*");
                			})
                         		.body(BodyInserters.fromValue(p)) //传递json格式参数
                         		.retrieve() //获取简易的返回结果
                         		.bodyToMono(String.class);//将返回结果转为字符串
        });
        // contentType 返回内容是 application/json。可理解为被@ResponseBody注解的接口返回的格式
        // body 这个Mono(T) T的类型是什么
        return ok().contentType(MediaType.APPLICATION_JSON).body(stringMono, String.class);
    }
}

经测试可以转发 [GET]{.label .primary} 请求,[PROD]{.label .primary} 请求[JSON]{.label .primary} 传值方式的接口。不支持 [form]{.label .primary} 表达传值的方式。未测试应该可实现对其它请求传值方式的接口转发。

遇到的问题

从 ServerRequest 取出参数问题

[ServerRequest]{.label .primary} 类在使用过程中要和最后的返回 [body]{.label .primary} 里面要有关联,不然会导致 [ServerRequest]{.label .primary} 的[bodyToMono]{.label .primary} 产生的 [Mono]{.label .primary} 或者 [Flux]{.label .primary} 不执行。如果使用 [block]{.label .primary} 等结束操作会直接导致报错,报错的原因应该是连接断掉了。也不能通过 [subscribe]{.label .primary} 订阅的方式获取参数。只有让 [ServerRequest]{.label .primary} 产生的 [Mono]{.label .primary} 或者 [Flux]{.label .primary} 在返回结果的 [ServerResponse]{.label .primary} 里,整体构成一个回路。才会去执行 [ServerRequest]{.label .primary} 的 [bodyToMono]{.label .primary} 后面的数据操作流程。所以如果我将上面情况改成如下,则 从 [serverRequest.bodyToMono]{.label .primary} 开始不会运行。

return ok().contentType(MediaType.APPLICATION_JSON).body(stringMono, String.class);
return ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just("自定义字符串"), String.class);

其问题的原因是 [Mono]{.label .primary} 和 [Flux]{.label .primary} 的执行原理。而 [ServerRequest]{.label .primary} 正好又是一个特殊的东西。所以才会有这种情况。

设置 headers 问题

// 最开始的代码
.headers(g -> {
    g = serverRequest.headers().asHttpHeaders();
    g.set("Access-Control-Expose-Headers","*");
})
// 现在
.headers(g -> {
    g.setAll(serverRequest.headers().asHttpHeaders().toSingleValueMap());
    g.set("Access-Control-Expose-Headers","*");
})

最开始直接将 [ServerRequest]{.label .primary} 的请求头对象替换 [WebClient]{.label .primary} 发起的请求头,这个步骤没有报错,但之后对 [WebClient]{.label .primary} 的请求头设置 [Access-Control-Expose-Headers=*]{.label .primary} 的时候会报错。

原因是 [ServerRequest]{.label .primary} 请求体内的 [HttpHeaders]{.label .primary} 对象是 基于 [HttpHeaders]{.label .primary} 接口实现的 [ReadOnlyHttpHeaders]{.label .primary} 实例对象。这个实例的对象不能对请求头进行编辑修改。所以之后请求头设置 [Access-Control-Expose-Headers=*]{.label .primary} 的时候会报错。

总结

第一次知道 [WebFlux]{.label .primary} 并对此进行了解和学习。感觉 WebFlux 的响应式编程有点烧脑。

本文如果有什么问题还望各位多多包涵,请指出我的错误。互相学习。