Webflux使用示例

示例

pom.xml

1
2
3
4
<!-- Spring Boot starter dependency for WebFlux; no version is declared in this snippet. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package cn.idea360.data.gateway.flux;

import cn.idea360.data.gateway.task.ClueTask;
import cn.idea360.data.gateway.task.OrderTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static cn.idea360.data.gateway.config.Constant.DATE_TIME_FORMATTER;

/**
 * Demo controller exposing two {@code text/event-stream} endpoints under {@code /flux}:
 * one driven by a timer ({@link #fluxEndpoint()}) and one pushed from a background
 * thread through a sink ({@link #getMessages()}).
 *
 * @author cuishiying
 * @date 2024-07-23
 */
@Slf4j
@RestController
@RequestMapping("/flux")
public class FluxController {

    // NOTE(review): this pool is not used by any handler in this class and is never shut down
    // here; confirm whether it is still needed before relying on it.
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    /** Marker element appended after the last data chunk of a normally finished stream. */
    private static final String DONE = "done";

    /** Fallback element emitted in place of an upstream error. */
    private static final String ERROR = "error";

    /**
     * Streams ten timer-driven chunks followed by the {@code "done"} marker.
     * If the upstream fails, the error is replaced by a single {@code "error"} element
     * and the stream completes.
     *
     * <p>http://localhost:2033/api/flux/hello
     *
     * @return the event stream of chunk strings
     */
    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> fluxEndpoint() {
        return Flux.concat(generateData(), Flux.just(DONE))
                .onErrorReturn(ERROR)
                // Parameterized logging: same message, no eager string concatenation.
                .doFinally(signalType -> log.info("Stream ended due to: {}", signalType));
    }

    /**
     * Produces {@code "Data chunk 0"} through {@code "Data chunk 9"}, one element per second.
     *
     * @return a finite flux of ten chunk strings
     */
    private Flux<String> generateData() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> "Data chunk " + sequence)
                .take(10);
    }

    /**
     * Streams five messages pushed from a dedicated producer thread, pausing 500 ms after
     * each one, then completes the stream.
     *
     * <p>The producer stops early when the subscriber cancels, and terminates the stream
     * with an error (emitting nothing further) when the thread is interrupted.
     *
     * <p>http://localhost:2033/api/flux/sink
     *
     * @return the event stream of pushed messages
     */
    @GetMapping(value = "/sink", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Object> getMessages() {
        return Flux.create(sink -> {
            Thread producer = new Thread(() -> {
                try {
                    // Stop producing as soon as the subscriber has gone away.
                    for (int i = 0; i < 5 && !sink.isCancelled(); i++) {
                        sink.next("pushMessgae:" + i);
                        Thread.sleep(500);
                    }
                    sink.complete();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and terminate exactly once: the try block is
                    // left immediately, so no next()/complete() can follow the error signal.
                    Thread.currentThread().interrupt();
                    sink.error(e);
                }
            });
            producer.start();
        });
    }
}

测试请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.idea360.data.gateway.flux;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
* @author cuishiying
* @date 2024-07-24
*/
@Slf4j
class FluxTest {

@DisplayName("WebClient请求stream")
@Test
void webClient() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
WebClient client = WebClient.create("http://localhost:2033");
Flux<String> result = client
.get()
.uri(uriBuilder -> uriBuilder
.path("/api/flux/sink")
.queryParam("startTime", "2024-08-16 13:00:00")
.queryParam("endTime", "2024-08-16 13:59:59")
.build())
.acceptCharset(StandardCharsets.UTF_8)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class);
result
.doOnComplete(() -> {
log.info("Data fetching completed");
countDownLatch.countDown();
})
.subscribe(log::info,
error -> log.error("Error occurred", error),
() -> log.info("Subscription closed"));
countDownLatch.await();
}
}