spring-cloud-gateway过滤器实践

概述

这里是 SpringCloud Gateway 实践的第一篇,主要讲过滤器的相关实现。Spring-Cloud-Gateway是以 WebFlux 为基础的响应式架构设计, 是异步非阻塞式的,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。

本篇将基于 spring-cloud-gateway简介 基础环境进行改造。

工作原理

Spring-Cloud-Gateway基于过滤器实现,同zuul类似,有prepost两种方式的filter,分别处理前置逻辑后置逻辑。客户端的请求先经过pre类型的filter,然后将请求转发到具体的业务服务,收到业务服务的响应之后,再经过post类型的filter处理,最后返回响应到客户端。

过滤器执行流程如下,order越大,优先级越低

接下来我们来验证下 filter 执行顺序。

这里创建3个过滤器,分别配置不同的优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Slf4j
public class AFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("AFilter前置逻辑");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("AFilter后置逻辑");
}));
}
}

@Slf4j
public class BFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("BFilter前置逻辑");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("BFilter后置逻辑");
}));
}
}

@Slf4j
public class CFilter implements GlobalFilter {

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("CFilter前置逻辑");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("CFilter后置逻辑");
}));
}
}

@Configuration
public class FilterConfig {

@Bean
@Order(-1)
public GlobalFilter a() {
return new AFilter();
}

@Bean
@Order(0)
public GlobalFilter b() {
return new BFilter();
}

@Bean
@Order(1)
public GlobalFilter c() {
return new CFilter();
}
}
1
2
3
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1

curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1

查看网关输出日志

1
2
3
4
5
6
7
2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter前置逻辑
2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter前置逻辑
2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter前置逻辑

2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter后置逻辑
2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter后置逻辑
2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter后置逻辑

自定义过滤器

现在假设我们要统计某个服务的响应时间,我们可以在代码中

1
2
3
4
long beginTime = System.currentTimeMillis();
// do something...
long elapsed = System.currentTimeMillis() - beginTime;
log.info("elapsed: {}ms", elapsed);

每次都要这么写是不是很烦?Spring 告诉我们有个东西叫 AOP。但是我们是微服务啊,在每个服务里都写也很烦。这时候就该网关的过滤器登台表演了。

自定义过滤器需要实现 GatewayFilterOrdered 。其中 GatewayFilter 中的这个方法就是用来实现你的自定义的逻辑的

1
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);

Ordered 中的 int getOrder() 方法是来给过滤器设定优先级别的,值越大则优先级越低。

好了,让我们来撸代码吧.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 此过滤器功能为计算请求完成时间
*/
public class ElapsedFilter implements GatewayFilter, Ordered {

private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
return chain.filter(exchange).then(
Mono.fromRunnable(() -> {
Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
if (startTime != null) {
System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");
}
})
);
}

/*
*过滤器存在优先级,order越大,优先级越低
*/
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}

我们在请求刚刚到达时,往 ServerWebExchange 中放入了一个属性 elapsedTimeBegin,属性值为当时的毫秒级时间戳。然后在请求执行结束后,又从中取出我们之前放进去的那个时间戳,与当前时间的差值即为该请求的耗时。因为这是与业务无关的日志所以将 Ordered 设为 Integer.MAX_VALUE 以降低优先级。

现在再来看我们之前的问题:怎么来区分是 “pre” 还是 “post” 呢?其实就是 chain.filter(exchange) 之前的就是 “pre” 部分,之后的也就是 then 里边的是 “post” 部分。

创建好 Filter 之后我们将它添加到我们的 Filter Chain 里边

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
public class FilterConfig {


/**
* http://localhost:8100/filter/provider
* @param builder
* @return
*/
@Bean
public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
// @formatter:off
// 可以对比application.yml中关于路由转发的配置
return builder.routes()
.route(r -> r.path("/filter/**")
.filters(f -> f.stripPrefix(1)
.filter(new ElapsedFilter()))
.uri("lb://idc-cloud-provider")
.order(0)
.id("filter")
)
.build();
// @formatter:on
}

}

基于全局过滤器实现审计功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// AdaptCachedBodyGlobalFilter

@Component
public class LogFilter implements GlobalFilter, Ordered {

private Logger log = LoggerFactory.getLogger(LogFilter.class);

private final ObjectMapper objectMapper = new ObjectMapper();
private static final String START_TIME = "startTime";
private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

ServerHttpRequest request = exchange.getRequest();
// 请求路径
String path = request.getPath().pathWithinApplication().value();
// 请求schema: http/https
String scheme = request.getURI().getScheme();
// 请求方法
HttpMethod method = request.getMethod();
// 路由服务地址
URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
// 请求头
HttpHeaders headers = request.getHeaders();
// 设置startTime
exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
// 获取请求地址
InetSocketAddress remoteAddress = request.getRemoteAddress();


MultiValueMap<String, String> formData = null;



AccessRecord accessRecord = new AccessRecord();
accessRecord.setPath(path);
accessRecord.setSchema(scheme);
accessRecord.setMethod(method.name());
accessRecord.setTargetUri(targetUri.toString());
accessRecord.setRemoteAddress(remoteAddress.toString());
accessRecord.setHeaders(headers);

if (method == HttpMethod.GET) {
formData = request.getQueryParams();
accessRecord.setFormData(formData);
writeAccessRecord(accessRecord);
}

if (method == HttpMethod.POST) {
Mono<Void> voidMono = null;
if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {
// JSON
voidMono = readBody(exchange, chain, accessRecord);
}

if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) {
// x-www-form-urlencoded
voidMono = readFormData(exchange, chain, accessRecord);
}

if (voidMono != null) {
return voidMono;
}

}

return chain.filter(exchange);
}

private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
return null;
}

private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {

return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
DataBufferUtils.retain(buffer);
return Mono.just(buffer);
});


// 重写请求体,因为请求体数据只能被消费一次
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};

ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();

return ServerRequest.create(mutatedExchange, messageReaders)
.bodyToMono(String.class)
.doOnNext(objectValue -> {
accessRecord.setBody(objectValue);
writeAccessRecord(accessRecord);
}).then(chain.filter(mutatedExchange));
});
}

@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}

/**
* TODO 异步日志
* @param accessRecord
*/
private void writeAccessRecord(AccessRecord accessRecord) {

log.info("\n\n start------------------------------------------------- \n " +
"请求路径:{}\n " +
"scheme:{}\n " +
"请求方法:{}\n " +
"目标服务:{}\n " +
"请求头:{}\n " +
"远程IP地址:{}\n " +
"表单参数:{}\n " +
"请求体:{}\n " +
"end------------------------------------------------- \n ",
accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody());
}
}
1
2
3
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1

curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1

输出结果

1
2
3
4
5
6
7
8
9
10
start------------------------------------------------- 
请求路径:/provider1
scheme:http
请求方法:POST
目标服务:http://192.168.124.5:2001/provider1
请求头:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"]
远程IP地址:/192.168.124.5:49969
表单参数:null
请求体:{"name":"admin"}
end-------------------------------------------------

接下来,我们来配置日志,方便日志系统提取日志。SpringBoot默认的日志为logback。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" />

<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable
</Pattern>
</layout>
</appender>

<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOGS}/spring-boot-logger.log</file>
<encoder
class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d %p %C{1.} [%t] %m%n</Pattern>
</encoder>

<rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- rollover daily and when the file reaches 10 MegaBytes -->
<fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>10MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>

<!-- LOG everything at INFO level -->
<root level="info">
<!--<appender-ref ref="RollingFile" />-->
<appender-ref ref="Console" />
</root>

<!-- LOG "cn.idea360*" at TRACE level additivity:是否向上级loger传递打印信息。默认是true-->
<logger name="cn.idea360.gateway" level="info" additivity="false">
<appender-ref ref="RollingFile" />
<appender-ref ref="Console" />
</logger>

</configuration>

这样console和日志目录下就都有日志了。

自定义过滤器工厂

如果你看过静态路由的配置,你应该对如下配置有印象。

1
2
3
filters:
- StripPrefix=1
- AddResponseHeader=X-Response-Default-Foo, Default-Bar

StripPrefixAddResponseHeader 这两个实际上是两个过滤器工厂(GatewayFilterFactory),用这种配置的方式更灵活方便。

我们就将之前的那个 ElapsedFilter 改造一下,让它能接收一个 boolean 类型的参数,来决定是否将请求参数也打印出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> {

private static final Log log = LogFactory.getLog(GatewayFilter.class);
private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";
private static final String KEY = "withParams";


public List<String> shortcutFieldOrder() {
return Arrays.asList(KEY);
}

public ElapsedGatewayFilterFactory() {
super(Config.class);
}


public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
return chain.filter(exchange).then(
Mono.fromRunnable(() -> {
Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
if (startTime != null) {
StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())
.append(": ")
.append(System.currentTimeMillis() - startTime)
.append("ms");
if (config.isWithParams()) {
sb.append(" params:").append(exchange.getRequest().getQueryParams());
}
log.info(sb.toString());
}
})
);
};
}


public static class Config {

private boolean withParams;

public boolean isWithParams() {
return withParams;
}

public void setWithParams(boolean withParams) {
this.withParams = withParams;
}

}
}

过滤器工厂的顶级接口是 GatewayFilterFactory,我们可以直接继承它的两个抽象类来简化开发 AbstractGatewayFilterFactoryAbstractNameValueGatewayFilterFactory,这两个抽象类的区别就是前者接收一个参数(像 StripPrefix 和我们创建的这种),后者接收两个参数(像 AddResponseHeader)。

GatewayFilter apply(Config config) 方法内部实际上是创建了一个 GatewayFilter 的匿名类,具体实现和之前的几乎一样,就不解释了。

静态内部类 Config 就是为了接收那个 boolean 类型的参数服务的,里边的变量名可以随意写,但是要重写 List shortcutFieldOrder() 这个方法。

这里注意一下,一定要调用一下父类的构造器把 Config 类型传过去,否则会报 ClassCastException

1
2
3
public ElapsedGatewayFilterFactory() {
super(Config.class);
}

工厂类我们有了,再把它注册到 Spring 当中

1
2
3
4
@Bean
public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() {
return new ElapsedGatewayFilterFactory();
}

然后添加配置(主要改动在 default-filters 配置)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
server:
port: 2000
spring:
application:
name: idc-gateway
redis:
host: localhost
port: 6379
timeout: 6000ms # 连接超时时长(毫秒)
jedis:
pool:
max-active: 1000 # 连接池最大连接数(使用负值表示没有限制)
max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 10 # 连接池中的最大空闲连接
min-idle: 5 # 连接池中的最小空闲连接
cloud:
consul:
host: localhost
port: 8500
gateway:
discovery:
locator:
enabled: true
# 修改在这里。gateway可以通过开启以下配置来打开根据服务的serviceId来匹配路由,默认是大写
default-filters:
- Elapsed=true
routes:
- id: provider # 路由 ID,保持唯一
uri: lb://idc-provider1 # uri指目标服务地址,lb代表从注册中心获取服务
predicates: # 路由条件。Predicate 接受一个输入参数,返回一个布尔值结果。该接口包含多种默认方法来将 Predicate 组合成其他复杂的逻辑(比如:与,或,非)
- Path=/p/**
filters:
- StripPrefix=1 # 过滤器StripPrefix,作用是去掉请求路径的最前面n个部分截取掉。StripPrefix=1就代表截取路径的个数为1,比如前端过来请求/test/good/1/view,匹配成功后,路由到后端的请求路径就会变成http://localhost:8888/good/1/view

结语

本文到此结束。关于 Webflux 的学习刚入门,觉得可以像 Rxjava 那样在 onNext 中拿到异步数据,然而在 post 获取body中没生效。经测试可知 getBody 获得的数据输出为null,而自己通过 Flux.create 创建的数据可以在订阅者中获取到。此处还有待研究,希望抛砖引玉,大家有研究出来的不吝赐教。同时,希望大家关注公众号【当我遇上你】。

参考