概述
这里是 SpringCloud Gateway
实践的第一篇,主要讲过滤器的相关实现。Spring-Cloud-Gateway是以 WebFlux
为基础的响应式架构设计, 是异步非阻塞式的,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。
本篇将基于 spring-cloud-gateway简介 基础环境进行改造。
工作原理
Spring-Cloud-Gateway基于过滤器实现,同zuul类似,有pre和post两种方式的filter,分别处理前置逻辑和后置逻辑。客户端的请求先经过pre类型的filter,然后将请求转发到具体的业务服务,收到业务服务的响应之后,再经过post类型的filter处理,最后返回响应到客户端。
过滤器执行流程如下,order越大,优先级越低
接下来我们来验证下 filter
执行顺序。
这里创建3个过滤器,分别配置不同的优先级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| @Slf4j public class AFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("AFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("AFilter后置逻辑"); })); } }
@Slf4j public class BFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("BFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("BFilter后置逻辑"); })); } }
@Slf4j public class CFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("CFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("CFilter后置逻辑"); })); } }
@Configuration public class FilterConfig {
@Bean @Order(-1) public GlobalFilter a() { return new AFilter(); }
@Bean @Order(0) public GlobalFilter b() { return new BFilter(); }
@Bean @Order(1) public GlobalFilter c() { return new CFilter(); } }
|
1 2 3
| curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1
curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
|
查看网关输出日志
1 2 3 4 5 6 7
| 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter前置逻辑 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter前置逻辑 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter前置逻辑
2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter后置逻辑 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter后置逻辑 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter后置逻辑
|
自定义过滤器
现在假设我们要统计某个服务的响应时间,我们可以在代码中
1 2 3 4
| long beginTime = System.currentTimeMillis();
long elapsed = System.currentTimeMillis() - beginTime; log.info("elapsed: {}ms", elapsed);
|
每次都要这么写是不是很烦?Spring 告诉我们有个东西叫 AOP。但是我们是微服务啊,在每个服务里都写也很烦。这时候就该网关的过滤器登台表演了。
自定义过滤器需要实现 GatewayFilter
和 Ordered
。其中 GatewayFilter
中的这个方法就是用来实现你的自定义的逻辑的
1
| Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
|
而 Ordered
中的 int getOrder()
方法是来给过滤器设定优先级别的,值越大则优先级越低。
好了,让我们来撸代码吧.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public class ElapsedFilter implements GatewayFilter, Ordered {
private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis()); return chain.filter(exchange).then( Mono.fromRunnable(() -> { Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN); if (startTime != null) { System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms"); } }) ); }
@Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } }
|
我们在请求刚刚到达时,往 ServerWebExchange
中放入了一个属性 elapsedTimeBegin
,属性值为当时的毫秒级时间戳。然后在请求执行结束后,又从中取出我们之前放进去的那个时间戳,与当前时间的差值即为该请求的耗时。因为这是与业务无关的日志所以将 Ordered
设为 Integer.MAX_VALUE
以降低优先级。
现在再来看我们之前的问题:怎么来区分是 “pre” 还是 “post” 呢?其实就是 chain.filter(exchange)
之前的就是 “pre” 部分,之后的也就是 then
里边的是 “post” 部分。
创建好 Filter 之后我们将它添加到我们的 Filter Chain 里边
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Configuration public class FilterConfig {
@Bean public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) { return builder.routes() .route(r -> r.path("/filter/**") .filters(f -> f.stripPrefix(1) .filter(new ElapsedFilter())) .uri("lb://idc-cloud-provider") .order(0) .id("filter") ) .build(); }
}
|
基于全局过滤器实现审计功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
@Component public class LogFilter implements GlobalFilter, Ordered {
private Logger log = LoggerFactory.getLogger(LogFilter.class);
private final ObjectMapper objectMapper = new ObjectMapper(); private static final String START_TIME = "startTime"; private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest(); String path = request.getPath().pathWithinApplication().value(); String scheme = request.getURI().getScheme(); HttpMethod method = request.getMethod(); URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); HttpHeaders headers = request.getHeaders(); exchange.getAttributes().put(START_TIME, System.currentTimeMillis()); InetSocketAddress remoteAddress = request.getRemoteAddress();
MultiValueMap<String, String> formData = null;
AccessRecord accessRecord = new AccessRecord(); accessRecord.setPath(path); accessRecord.setSchema(scheme); accessRecord.setMethod(method.name()); accessRecord.setTargetUri(targetUri.toString()); accessRecord.setRemoteAddress(remoteAddress.toString()); accessRecord.setHeaders(headers);
if (method == HttpMethod.GET) { formData = request.getQueryParams(); accessRecord.setFormData(formData); writeAccessRecord(accessRecord); }
if (method == HttpMethod.POST) { Mono<Void> voidMono = null; if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) { voidMono = readBody(exchange, chain, accessRecord); }
if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) { voidMono = readFormData(exchange, chain, accessRecord); }
if (voidMono != null) { return voidMono; }
}
return chain.filter(exchange); }
private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) { return null; }
private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); Flux<DataBuffer> cachedFlux = Flux.defer(() -> { DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); DataBufferUtils.retain(buffer); return Mono.just(buffer); });
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public Flux<DataBuffer> getBody() { return cachedFlux; } };
ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
return ServerRequest.create(mutatedExchange, messageReaders) .bodyToMono(String.class) .doOnNext(objectValue -> { accessRecord.setBody(objectValue); writeAccessRecord(accessRecord); }).then(chain.filter(mutatedExchange)); }); }
@Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; }
private void writeAccessRecord(AccessRecord accessRecord) {
log.info("\n\n start------------------------------------------------- \n " + "请求路径:{}\n " + "scheme:{}\n " + "请求方法:{}\n " + "目标服务:{}\n " + "请求头:{}\n " + "远程IP地址:{}\n " + "表单参数:{}\n " + "请求体:{}\n " + "end------------------------------------------------- \n ", accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody()); } }
|
1 2 3
| curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1
curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
|
输出结果
1 2 3 4 5 6 7 8 9 10
| start------------------------------------------------- 请求路径:/provider1 scheme:http 请求方法:POST 目标服务:http://192.168.124.5:2001/provider1 请求头:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"] 远程IP地址:/192.168.124.5:49969 表单参数:null 请求体:{"name":"admin"} end-------------------------------------------------
|
接下来,我们来配置日志,方便日志系统提取日志。SpringBoot默认的日志为logback。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| <?xml version="1.0" encoding="UTF-8"?> <configuration>
<property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" />
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender"> <layout class="ch.qos.logback.classic.PatternLayout"> <Pattern> %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable </Pattern> </layout> </appender>
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOGS}/spring-boot-logger.log</file> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <Pattern>%d %p %C{1.} [%t] %m%n</Pattern> </encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log </fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>10MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> </rollingPolicy> </appender>
<root level="info"> <appender-ref ref="Console" /> </root>
<logger name="cn.idea360.gateway" level="info" additivity="false"> <appender-ref ref="RollingFile" /> <appender-ref ref="Console" /> </logger>
</configuration>
|
这样console和日志目录下就都有日志了。
自定义过滤器工厂
如果你看过静态路由的配置,你应该对如下配置有印象。
1 2 3
| filters: - StripPrefix=1 - AddResponseHeader=X-Response-Default-Foo, Default-Bar
|
StripPrefix
、AddResponseHeader
这两个实际上是两个过滤器工厂(GatewayFilterFactory),用这种配置的方式更灵活方便。
我们就将之前的那个 ElapsedFilter
改造一下,让它能接收一个 boolean
类型的参数,来决定是否将请求参数也打印出来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> {
private static final Log log = LogFactory.getLog(GatewayFilter.class); private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin"; private static final String KEY = "withParams";
public List<String> shortcutFieldOrder() { return Arrays.asList(KEY); }
public ElapsedGatewayFilterFactory() { super(Config.class); }
public GatewayFilter apply(Config config) { return (exchange, chain) -> { exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis()); return chain.filter(exchange).then( Mono.fromRunnable(() -> { Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN); if (startTime != null) { StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath()) .append(": ") .append(System.currentTimeMillis() - startTime) .append("ms"); if (config.isWithParams()) { sb.append(" params:").append(exchange.getRequest().getQueryParams()); } log.info(sb.toString()); } }) ); }; }
public static class Config {
private boolean withParams;
public boolean isWithParams() { return withParams; }
public void setWithParams(boolean withParams) { this.withParams = withParams; }
} }
|
过滤器工厂的顶级接口是 GatewayFilterFactory
,我们可以直接继承它的两个抽象类来简化开发 AbstractGatewayFilterFactory
和 AbstractNameValueGatewayFilterFactory
,这两个抽象类的区别就是前者接收一个参数(像 StripPrefix
和我们创建的这种),后者接收两个参数(像 AddResponseHeader
)。
GatewayFilter apply(Config config)
方法内部实际上是创建了一个 GatewayFilter
的匿名类,具体实现和之前的几乎一样,就不解释了。
静态内部类 Config
就是为了接收那个 boolean
类型的参数服务的,里边的变量名可以随意写,但是要重写 List shortcutFieldOrder()
这个方法。
这里注意一下,一定要调用一下父类的构造器把 Config
类型传过去,否则会报 ClassCastException
1 2 3
| public ElapsedGatewayFilterFactory() { super(Config.class); }
|
工厂类我们有了,再把它注册到 Spring 当中
1 2 3 4
| @Bean public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() { return new ElapsedGatewayFilterFactory(); }
|
然后添加配置(主要改动在 default-filters
配置)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| server: port: 2000 spring: application: name: idc-gateway redis: host: localhost port: 6379 timeout: 6000ms jedis: pool: max-active: 1000 max-wait: -1ms max-idle: 10 min-idle: 5 cloud: consul: host: localhost port: 8500 gateway: discovery: locator: enabled: true default-filters: - Elapsed=true routes: - id: provider uri: lb://idc-provider1 predicates: - Path=/p/** filters: - StripPrefix=1
|
结语
本文到此结束。关于 Webflux
的学习刚入门,觉得可以像 Rxjava
那样在 onNext
中拿到异步数据,然而在 post
获取body中没生效。经测试可知 getBody
获得的数据输出为null,而自己通过 Flux.create
创建的数据可以在订阅者中获取到。此处还有待研究,希望抛砖引玉,大家有研究出来的不吝赐教。同时,希望大家关注公众号【当我遇上你】。
参考